nmea.py   [plain text]


# Twisted, the Framework of Your Internet
# Copyright (C) 2001-2002 Matthew W. Lefkowitz
# 
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
# 
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 

"""NMEA 0183 implementation

Maintainer: U{Bob Ippolito<mailto:bob@redivi.com>}

The following NMEA 0183 sentences are currently understood::
    GPGGA (fix)
    GPGLL (position)
    GPRMC (position and time)
    GPGSA (active satellites)
 
The following NMEA 0183 sentences require implementation::
    None really, the others aren't generally useful or implemented in most devices anyhow

Other desired features::
    - A NMEA 0183 producer to emulate GPS devices (?)
"""

import time, operator
from twisted.protocols import basic

POSFIX_INVALID, POSFIX_SPS, POSFIX_DGPS, POSFIX_PPS = 0, 1, 2, 3
MODE_AUTO, MODE_FORCED = 'A', 'M'
MODE_NOFIX, MODE_2D, MODE_3D = 1, 2, 3

class InvalidSentence(Exception):
    pass

class InvalidChecksum(Exception):
    pass

class NMEAReceiver(basic.LineReceiver):
    """This parses most common NMEA-0183 messages, presumably from a serial GPS device at 4800 bps
    """
    delimiter = '\r\n'
    dispatch = {
        'GPGGA': 'fix',
        'GPGLL': 'position',
        'GPGSA': 'activesatellites',
        'GPRMC': 'positiontime',
        'GPGSV': 'viewsatellites',    # not implemented
        'GPVTG': 'course',            # not implemented
        'GPALM': 'almanac',           # not implemented
        'GPGRS': 'range',             # not implemented
        'GPGST': 'noise',             # not implemented
        'GPMSS': 'beacon',            # not implemented
        'GPZDA': 'time',              # not implemented
    }
    # generally you may miss the beginning of the first message
    ignore_invalid_sentence = 1
    # checksums shouldn't be invalid
    ignore_checksum_mismatch = 0
    # ignore unknown sentence types
    ignore_unknown_sentencetypes = 0
    # do we want to even bother checking to see if it's from the 20th century?
    convert_dates_before_y2k = 1

    def lineReceived(self, line):
        if not line.startswith('$'):
            if self.ignore_invalid_sentence:
                return
            raise InvalidSentence("%r does not begin with $" % (line,))
        # message is everything between $ and *, checksum is xor of all ASCII values of the message
        strmessage, checksum = line[1:].strip().split('*')
        message = strmessage.split(',')
        sentencetype, message = message[0], message[1:]
        dispatch = self.dispatch.get(sentencetype, None)
        if (not dispatch) and (not self.ignore_unknown_sentencetypes):
            raise InvalidSentence("sentencetype %r" % (sentencetype,))
        if not self.ignore_checksum_mismatch:
            checksum, calculated_checksum = int(checksum, 16), reduce(operator.xor, map(ord, strmessage))
        if checksum != calculated_checksum:
            raise InvalidChecksum("Given 0x%02X != 0x%02X" % (checksum, calculated_checksum))
        handler = getattr(self, "handle_%s" % dispatch, None)
        decoder = getattr(self, "decode_%s" % dispatch, None)
        if not (dispatch and handler and decoder):
            # missing dispatch, handler, or decoder
            return
        # return handler(*decoder(*message))
        try:
            decoded = decoder(*message)
        except Exception, e:
            raise InvalidSentence("%r is not a valid %s (%s) sentence" % (line, sentencetype, dispatch))
        return handler(*decoded)

    def decode_position(self, latitude, ns, longitude, ew, utc, status):
        latitude, longitude = self._decode_latlon(latitude, ns, longitude, ew)
        utc = self._decode_utc(utc)
        if status == 'A':
            status = 1
        else:
            status = 0
        return (
            latitude,
            longitude,
            utc,
            status,
        )

    def decode_positiontime(self, utc, status, latitude, ns, longitude, ew, speed, course, utcdate, magvar, magdir):
        utc = self._decode_utc(utc)
        latitude, longitude = self._decode_latlon(latitude, ns, longitude, ew)
        if speed != '':
            speed = float(speed)
        else:
            speed = None
        if course != '':
            course = float(course)
        else:
            course = None
        utcdate = 2000+int(utcdate[4:6]), int(utcdate[2:4]), int(utcdate[0:2])
        if self.convert_dates_before_y2k and utcdate[0] > 2073:
            # GPS was invented by the US DoD in 1973, but NMEA uses 2 digit year.
            # Highly unlikely that we'll be using NMEA or this twisted module in 70 years,
            # but remotely possible that you'll be using it to play back data from the 20th century.
            utcdate = (utcdate[0] - 100, utcdate[1], utcdate[2])
        if magvar != '':
            magvar = float(magvar)
        if magdir == 'W':
            magvar = -magvar
        else:
            magvar = None
        return (
            latitude,
            longitude,
            speed,
            course,
            # UTC seconds past utcdate
            utc,
            # UTC (year, month, day)
            utcdate,
            # None or magnetic variation in degrees (west is negative)
            magvar,
        )

    def _decode_utc(self, utc):
        utc_hh, utc_mm, utc_ss = map(float, (utc[:2], utc[2:4], utc[4:]))
        return utc_hh * 3600.0 + utc_mm * 60.0 + utc_ss

    def _decode_latlon(self, latitude, ns, longitude, ew):
        latitude = float(latitude[:2]) + float(latitude[2:])/60.0
        if ns == 'S':
            latitude = -latitude
        longitude = float(longitude[:3]) + float(longitude[3:])/60.0
        if ew == 'W':
            longitude = -longitude
        return (latitude, longitude)

    def decode_activesatellites(self, mode1, mode2, *args):
        satellites, (pdop, hdop, vdop) = args[:12], map(float, args[12:])
        satlist = []
        for n in satellites:
            if n:
                satlist.append(int(n))
            else:
                satlist.append(None)
        mode = (mode1, int(mode2))
        return (
            # satellite list by channel
            tuple(satlist),
            # (MODE_AUTO/MODE_FORCED, MODE_NOFIX/MODE_2DFIX/MODE_3DFIX)
            mode,
            # position dilution of precision
            pdop,
            # horizontal dilution of precision
            hdop,
            # vertical dilution of precision
            vdop,
        )
    
    def decode_fix(self, utc, latitude, ns, longitude, ew, posfix, satellites, hdop, altitude, altitude_units, geoid_separation, geoid_separation_units, dgps_age, dgps_station_id):
        latitude, longitude = self._decode_latlon(latitude, ns, longitude, ew)
        utc = self._decode_utc(utc)
        posfix = int(posfix)
        satellites = int(satellites)
        hdop = float(hdop)
        altitude = (float(altitude), altitude_units)
        if geoid_separation != '':
            geoid = (float(geoid_separation), geoid_separation_units)
        else:
            geoid = None
        if dgps_age != '':
            dgps = (float(dgps_age), dgps_station_id)
        else:
            dgps = None
        return (
            # seconds since 00:00 UTC
            utc,                 
            # latitude (degrees)
            latitude,       
            # longitude (degrees)
            longitude,     
            # position fix status (POSFIX_INVALID, POSFIX_SPS, POSFIX_DGPS, POSFIX_PPS)
            posfix,           
            # number of satellites used for fix 0 <= satellites <= 12 
            satellites,   
            # horizontal dilution of precision
            hdop,               
            # None or (altitude according to WGS-84 ellipsoid, units (typically 'M' for meters)) 
            altitude,
            # None or (geoid separation according to WGS-84 ellipsoid, units (typically 'M' for meters))
            geoid,
            # (age of dgps data in seconds, dgps station id)
            dgps,
        )