"""Rockwell Semiconductor Zodiac Serial Protocol
Coded from official protocol specs (Order No. GPS-25, 09/24/1996, Revision 11)
Maintainer: U{Bob Ippolito<mailto:bob@redivi.com>}
The following Rockwell Zodiac messages are currently understood::
EARTHA\\r\\n (a hack to "turn on" a DeLorme Earthmate)
1000 (Geodesic Position Status Output)
1002 (Channel Summary)
1003 (Visible Satellites)
1011 (Receiver ID)
The following Rockwell Zodiac messages require implementation::
None really, the others aren't quite so useful and require bidirectional communication w/ the device
Other desired features::
- Compatibility with the DeLorme Tripmate and other devices with this chipset (?)
"""
import struct, operator, math, string
from twisted.internet import protocol
from twisted.python import log
# When true, ZodiacParseError is raised with a descriptive message that
# includes the offending values/bytes; when false, the bare exception class
# is raised with no detail.
DEBUG = 1
class ZodiacParseError(ValueError):
    """Raised when received Zodiac data cannot be parsed.

    Used for a bad sync word, a header or data checksum mismatch, an
    oversized frame, and an unknown message id.
    """
class Zodiac(protocol.Protocol):
    """Receive-side parser for the Rockwell Zodiac binary serial protocol.

    Incoming data is buffered in ``recvd``, split into framed messages,
    checksum-verified and dispatched.  For a message id mapped to the name
    ``X`` in ``dispatch``, ``decode_X(message)`` is called and the tuple it
    returns is passed as positional arguments to ``handle_X``.  A message is
    dropped silently unless both methods are found on the instance.
    """

    # Output message id -> name used to look up decode_<name>/handle_<name>.
    dispatch = {
        1000: 'fix',
        1001: 'ecef',
        1002: 'channels',
        1003: 'satellites',
        1005: 'dgps',
        1007: 'channelmeas',
        1011: 'id',
        1012: 'usersettings',
        1100: 'testresults',
        1102: 'meastimemark',
        1108: 'utctimemark',
        1130: 'serial',
        1135: 'eepromupdate',
        1136: 'eepromstatus',
    }

    # Name -> message id table; not referenced by any method below.
    messages = {
        'fix': 1200,
        'udatum': 1210,
        'mdatum': 1211,
        'smask': 1212,
        'sselect': 1213,
        'dgpsc': 1214,
        'startc': 1216,
        'svalid': 1217,
        'antenna': 1218,
        'altinput': 1219,
        'appctl': 1220,
        'navcfg': 1221,
        'test': 1300,
        'restart': 1303,
        'serial': 1330,
        'msgctl': 1331,
        'dgpsd': 1351,
    }

    # Largest accepted frame in bytes (header + data + data checksum).
    # NOTE(review): decode_channelmeas expects 296 data bytes (308-byte
    # frame) and decode_meastimemark 494 data bytes, so with this limit
    # those messages are rejected before they can be decoded -- confirm the
    # intended value.
    MAX_LENGTH = 296

    # Cleared after the Earthmate greeting has been echoed back once.
    allow_earthmate_hack = 1

    # Receive buffer holding bytes not yet consumed by dataReceived.
    recvd = ""

    @staticmethod
    def _checksum(words):
        """Return the checksum of *words* as a signed 16-bit integer.

        The checksum is the two's-complement negation of the 16-bit sum of
        the words.  It is computed arithmetically rather than by a
        ``struct.pack('<h', ...)`` round trip, because packing a value
        outside the signed-short range raises ``struct.error`` on
        Python 2.7 and later instead of wrapping.
        """
        value = -sum(words) & 0xFFFF
        if value & 0x8000:
            value -= 0x10000
        return value

    def dataReceived(self, recd):
        """Buffer *recd* and dispatch every complete message now available.

        A frame is a 10-byte header of five little-endian 16-bit words
        (sync, message id, data word count, ack/nak, header checksum),
        followed -- when the word count is non-zero -- by that many data
        words and one data checksum word.  Incomplete trailing data stays in
        ``self.recvd`` until a later call completes it.

        Raises:
            ZodiacParseError: on a bad sync word, a header or data checksum
                mismatch, or a frame larger than ``MAX_LENGTH``.  The buffer
                is not advanced past the offending data.
        """
        self.recvd = self.recvd + recd
        while len(self.recvd) >= 10:
            # Earthmate hack: echo the greeting back the first time it is
            # seen (this "turns on" a DeLorme Earthmate), and always strip
            # it from the buffer.
            if self.recvd[:8] == 'EARTHA\r\n':
                if self.allow_earthmate_hack:
                    self.allow_earthmate_hack = 0
                    self.transport.write('EARTHA\r\n')
                self.recvd = self.recvd[8:]
                continue

            if self.recvd[0:2] != '\xFF\x81':
                if DEBUG:
                    raise ZodiacParseError('Invalid Sync %r' % self.recvd)
                else:
                    raise ZodiacParseError

            sync, msg_id, nwords, acknak, checksum = struct.unpack('<HHHHh', self.recvd[:10])
            cksum = self._checksum((sync, msg_id, nwords, acknak))
            if cksum != checksum:
                if DEBUG:
                    raise ZodiacParseError('Invalid Header Checksum %r != %r %r' % (checksum, cksum, self.recvd[:8]))
                else:
                    raise ZodiacParseError

            # The header counts 16-bit words; a non-empty data section is
            # followed by its own 2-byte checksum.
            length = nwords * 2
            neededBytes = 10
            if length:
                neededBytes += length + 2

            # Reject oversized frames before waiting for their data, so a
            # bogus length cannot make the buffer grow while we wait.
            if neededBytes > self.MAX_LENGTH:
                raise ZodiacParseError("Invalid Header??")
            if len(self.recvd) < neededBytes:
                break

            message = ''
            if length:
                message = self.recvd[10:10 + length]
                checksum, = struct.unpack('<h', self.recvd[10 + length:neededBytes])
                cksum = self._checksum(struct.unpack('<%dH' % nwords, message))
                if cksum != checksum:
                    if DEBUG:
                        log.debug('msg_id = %r length = %r' % (msg_id, length))
                        raise ZodiacParseError('Invalid Data Checksum %r != %r %r' % (checksum, cksum, message))
                    else:
                        raise ZodiacParseError

            # Consume the frame before dispatching it.
            self.recvd = self.recvd[neededBytes:]
            self.receivedMessage(msg_id, message, acknak)

    def receivedMessage(self, msg_id, message, acknak):
        """Decode one verified message and pass the result to its handler.

        Args:
            msg_id: numeric message id from the frame header.
            message: raw data section of the frame (may be empty).
            acknak: ack/nak word from the header (not used here).

        Returns:
            Whatever ``handle_<name>`` returns, or None when the instance
            lacks either ``handle_<name>`` or ``decode_<name>``.

        Raises:
            ZodiacParseError: if *msg_id* is not a key of ``dispatch``.
        """
        dispatch = self.dispatch.get(msg_id, None)
        if not dispatch:
            raise ZodiacParseError('Unknown msg_id = %r' % msg_id)
        handler = getattr(self, 'handle_%s' % dispatch, None)
        decoder = getattr(self, 'decode_%s' % dispatch, None)
        if not (handler and decoder):
            return
        decoded = decoder(message)
        return handler(*decoded)

    def decode_fix(self, message):
        """Decode message 1000 (Geodesic Position Status Output).

        Returns:
            ``(utc, latitude, longitude, posfix, satellites, hdop, altitude,
            geoid, dgps)`` where ``utc`` is built from the hour, minute,
            second and nanosecond fields as seconds, ``altitude`` and
            ``geoid`` are ``(value, 'M')`` pairs, and ``dgps`` is always None.
        """
        assert len(message) == 98, "Geodesic Position Status Output should be 55 words total (98 byte message)"
        (ticks, msgseq, satseq, navstatus, navtype, nmeasure, polar, gpswk, gpses, gpsns,
         utcdy, utcmo, utcyr, utchr, utcmn, utcsc, utcns,
         latitude, longitude, height, geoidalsep, speed, course, magvar, climb, mapdatum,
         exhposerr, exvposerr, extimeerr, exphvelerr,
         clkbias, clkbiasdev, clkdrift, clkdriftdev) = struct.unpack('<LhhHHHHHLLHHHHHHLlllhLHhhHLLLHllll', message)

        utc = (utchr * 3600.0) + (utcmn * 60.0) + utcsc + (float(utcns) * 0.000000001)
        log.debug('utchr, utcmn, utcsc, utcns = ' + repr((utchr, utcmn, utcsc, utcns)))

        # Scale factor 1.8e-6 / pi: presumably converts units of 1e-8
        # radians to degrees -- confirm against the protocol spec.
        latitude = float(latitude) * 0.00000180 / math.pi
        longitude = float(longitude) * 0.00000180 / math.pi
        # True only when bits 2-4 of the navigation status word are clear.
        posfix = not (navstatus & 0x001c)
        satellites = nmeasure
        # NOTE(review): the hdop slot is filled from the field named
        # exhposerr, scaled by 0.01 -- confirm this substitution is intended.
        hdop = float(exhposerr) * 0.01
        altitude = float(height) * 0.01, 'M'
        geoid = float(geoidalsep) * 0.01, 'M'
        dgps = None
        return (
            utc,
            latitude,
            longitude,
            posfix,
            satellites,
            hdop,
            altitude,
            geoid,
            dgps,
        )

    def decode_id(self, message):
        """Decode message 1011 (Receiver ID).

        Returns:
            ``((software_version, software_date), (minimize_rom,
            minimize_ram))``; the version is a float, the date is the raw
            NUL-trimmed string, and the two flags are bits 0 and 1 of the
            options field parsed as hexadecimal.
        """
        assert len(message) == 106, "Receiver ID Message should be 59 words total (106 byte message)"
        ticks, msgseq, channels, software_version, software_date, options_list, reserved = struct.unpack('<Lh20s20s20s20s20s', message)
        # Each text field is NUL-padded; keep only what precedes the first NUL.
        channels, software_version, software_date, options_list = [
            field.split('\0')[0]
            for field in (channels, software_version, software_date, options_list)
        ]
        software_version = float(software_version)
        # Parsed but not returned; a non-numeric field raises ValueError.
        channels = int(channels)
        options_list = int(options_list[:4], 16)
        minimize_rom = (options_list & 0x01) > 0
        minimize_ram = (options_list & 0x02) > 0
        return ((software_version, software_date), (minimize_rom, minimize_ram))

    def decode_channels(self, message):
        """Decode message 1002 (Channel Summary).

        Returns:
            A 1-tuple holding a tuple of 12 ``(flags, prn, cno)`` entries,
            where ``flags`` is the 4-tuple of the flag word masked with
            0x01, 0x02, 0x04 and 0x08 (masked integers, not booleans).
        """
        assert len(message) == 90, "Channel Summary Message should be 51 words total (90 byte message)"
        ticks, msgseq, satseq, gpswk, gpsws, gpsns = struct.unpack('<LhhHLL', message[:18])
        channels = []
        message = message[18:]
        for i in range(12):
            flags, prn, cno = struct.unpack('<HHH', message[6 * i:6 * (i + 1)])
            flags = (flags & 0x01, flags & 0x02, flags & 0x04, flags & 0x08)
            channels.append((flags, prn, cno))
        return (tuple(channels),)

    def decode_satellites(self, message):
        """Decode message 1003 (Visible Satellites).

        Returns:
            ``(satellites, (gdop, pdop, hdop, vdop, tdop))`` where
            ``satellites`` is a tuple of ``(prn, azi, elev)`` entries, one
            per satellite counted in the message, and the DOP values are the
            wire values scaled by 0.01.
        """
        assert len(message) == 90, "Visible Satellites Message should be 51 words total (90 byte message)"
        ticks, msgseq, gdop, pdop, hdop, vdop, tdop, numsatellites = struct.unpack('<LhhhhhhH', message[:18])
        gdop, pdop, hdop, vdop, tdop = [float(n) * 0.01 for n in (gdop, pdop, hdop, vdop, tdop)]
        satellites = []
        message = message[18:]
        for i in range(numsatellites):
            prn, azi, elev = struct.unpack('<Hhh', message[6 * i:6 * (i + 1)])
            # Scale factor 0.018 / pi: presumably converts units of 1e-4
            # radians to degrees -- confirm against the protocol spec.
            azi, elev = [float(n) * 0.0180 / math.pi for n in (azi, elev)]
            satellites.append((prn, azi, elev))
        return (tuple(satellites), (gdop, pdop, hdop, vdop, tdop))

    # The decoders below only validate the payload size; decoding itself is
    # not implemented.

    def decode_dgps(self, message):
        """Validate the size of message 1005; decoding is not implemented."""
        assert len(message) == 38, "Differential GPS Status Message should be 25 words total (38 byte message)"
        raise NotImplementedError

    def decode_ecef(self, message):
        """Validate the size of message 1001; decoding is not implemented."""
        assert len(message) == 96, "ECEF Position Status Output Message should be 54 words total (96 byte message)"
        raise NotImplementedError

    def decode_channelmeas(self, message):
        """Validate the size of message 1007; decoding is not implemented."""
        assert len(message) == 296, "Channel Measurement Message should be 154 words total (296 byte message)"
        raise NotImplementedError

    def decode_usersettings(self, message):
        """Validate the size of message 1012; decoding is not implemented."""
        assert len(message) == 32, "User-Settings Output Message should be 22 words total (32 byte message)"
        raise NotImplementedError

    def decode_testresults(self, message):
        """Validate the size of message 1100; decoding is not implemented."""
        assert len(message) == 28, "Built-In Test Results Message should be 20 words total (28 byte message)"
        raise NotImplementedError

    def decode_meastimemark(self, message):
        """Validate the size of message 1102; decoding is not implemented."""
        assert len(message) == 494, "Measurement Time Mark Message should be 253 words total (494 byte message)"
        raise NotImplementedError

    def decode_utctimemark(self, message):
        """Validate the size of message 1108; decoding is not implemented."""
        assert len(message) == 28, "UTC Time Mark Pulse Output Message should be 20 words total (28 byte message)"
        raise NotImplementedError

    def decode_serial(self, message):
        """Validate the size of message 1130; decoding is not implemented."""
        assert len(message) == 30, "Serial Port Communication Paramaters In Use Message should be 21 words total (30 byte message)"
        raise NotImplementedError

    def decode_eepromupdate(self, message):
        """Validate the size of message 1135; decoding is not implemented."""
        assert len(message) == 8, "EEPROM Update Message should be 10 words total (8 byte message)"
        raise NotImplementedError

    def decode_eepromstatus(self, message):
        """Validate the size of message 1136; decoding is not implemented."""
        assert len(message) == 24, "EEPROM Status Message should be 18 words total (24 byte message)"
        raise NotImplementedError