esistools.py   [plain text]


"""Miscellaneous utility functions useful for dealing with ESIS streams."""

import re
import string

import xml.dom.pulldom

import xml.sax
import xml.sax.handler
import xml.sax.xmlreader


_data_match = re.compile(r"[^\\][^\\]*").match

def decode(s):
    r = ''
    while s:
        m = _data_match(s)
        if m:
            r = r + m.group()
            s = s[m.end():]
        elif s[1] == "\\":
            r = r + "\\"
            s = s[2:]
        elif s[1] == "n":
            r = r + "\n"
            s = s[2:]
        elif s[1] == "%":
            s = s[2:]
            n, s = s.split(";", 1)
            r = r + unichr(int(n))
        else:
            raise ValueError, "can't handle " + `s`
    return r


_charmap = {}
for c in map(chr, range(256)):
    _charmap[c] = c
_charmap["\n"] = r"\n"
_charmap["\\"] = r"\\"
del c

_null_join = ''.join
def encode(s):
    return _null_join(map(_charmap.get, s))


class ESISReader(xml.sax.xmlreader.XMLReader):
    """SAX Reader which reads from an ESIS stream.

    No verification of the document structure is performed by the
    reader; a general verifier could be used as the target
    ContentHandler instance.

    """
    _decl_handler = None
    _lexical_handler = None

    _public_id = None
    _system_id = None

    _buffer = ""
    _is_empty = 0
    _lineno = 0
    _started = 0

    def __init__(self, contentHandler=None, errorHandler=None):
        xml.sax.xmlreader.XMLReader.__init__(self)
        self._attrs = {}
        self._attributes = Attributes(self._attrs)
        self._locator = Locator()
        self._empties = {}
        if contentHandler:
            self.setContentHandler(contentHandler)
        if errorHandler:
            self.setErrorHandler(errorHandler)

    def get_empties(self):
        return self._empties.keys()

    #
    #  XMLReader interface
    #

    def parse(self, source):
        raise RuntimeError
        self._locator._public_id = source.getPublicId()
        self._locator._system_id = source.getSystemId()
        fp = source.getByteStream()
        handler = self.getContentHandler()
        if handler:
            handler.startDocument()
        lineno = 0
        while 1:
            token, data = self._get_token(fp)
            if token is None:
                break
            lineno = lineno + 1
            self._locator._lineno = lineno
            self._handle_token(token, data)
        handler = self.getContentHandler()
        if handler:
            handler.startDocument()

    def feed(self, data):
        if not self._started:
            handler = self.getContentHandler()
            if handler:
                handler.startDocument()
            self._started = 1
        data = self._buffer + data
        self._buffer = None
        lines = data.split("\n")
        if lines:
            for line in lines[:-1]:
                self._lineno = self._lineno + 1
                self._locator._lineno = self._lineno
                if not line:
                    e = xml.sax.SAXParseException(
                        "ESIS input line contains no token type mark",
                        None, self._locator)
                    self.getErrorHandler().error(e)
                else:
                    self._handle_token(line[0], line[1:])
            self._buffer = lines[-1]
        else:
            self._buffer = ""

    def close(self):
        handler = self.getContentHandler()
        if handler:
            handler.endDocument()
        self._buffer = ""

    def _get_token(self, fp):
        try:
            line = fp.readline()
        except IOError, e:
            e = SAXException("I/O error reading input stream", e)
            self.getErrorHandler().fatalError(e)
            return
        if not line:
            return None, None
        if line[-1] == "\n":
            line = line[:-1]
        if not line:
            e = xml.sax.SAXParseException(
                "ESIS input line contains no token type mark",
                None, self._locator)
            self.getErrorHandler().error(e)
            return
        return line[0], line[1:]

    def _handle_token(self, token, data):
        handler = self.getContentHandler()
        if token == '-':
            if data and handler:
                handler.characters(decode(data))
        elif token == ')':
            if handler:
                handler.endElement(decode(data))
        elif token == '(':
            if self._is_empty:
                self._empties[data] = 1
            if handler:
                handler.startElement(data, self._attributes)
            self._attrs.clear()
            self._is_empty = 0
        elif token == 'A':
            name, value = data.split(' ', 1)
            if value != "IMPLIED":
                type, value = value.split(' ', 1)
                self._attrs[name] = (decode(value), type)
        elif token == '&':
            # entity reference in SAX?
            pass
        elif token == '?':
            if handler:
                if ' ' in data:
                    target, data = string.split(data, None, 1)
                else:
                    target, data = data, ""
                handler.processingInstruction(target, decode(data))
        elif token == 'N':
            handler = self.getDTDHandler()
            if handler:
                handler.notationDecl(data, self._public_id, self._system_id)
            self._public_id = None
            self._system_id = None
        elif token == 'p':
            self._public_id = decode(data)
        elif token == 's':
            self._system_id = decode(data)
        elif token == 'e':
            self._is_empty = 1
        elif token == 'C':
            pass
        else:
            e = SAXParseException("unknown ESIS token in event stream",
                                  None, self._locator)
            self.getErrorHandler().error(e)

    def setContentHandler(self, handler):
        old = self.getContentHandler()
        if old:
            old.setDocumentLocator(None)
        if handler:
            handler.setDocumentLocator(self._locator)
        xml.sax.xmlreader.XMLReader.setContentHandler(self, handler)

    def getProperty(self, property):
        if property == xml.sax.handler.property_lexical_handler:
            return self._lexical_handler

        elif property == xml.sax.handler.property_declaration_handler:
            return self._decl_handler

        else:
            raise xml.sax.SAXNotRecognizedException("unknown property %s"
                                                    % `property`)

    def setProperty(self, property, value):
        if property == xml.sax.handler.property_lexical_handler:
            if self._lexical_handler:
                self._lexical_handler.setDocumentLocator(None)
            if value:
                value.setDocumentLocator(self._locator)
            self._lexical_handler = value

        elif property == xml.sax.handler.property_declaration_handler:
            if self._decl_handler:
                self._decl_handler.setDocumentLocator(None)
            if value:
                value.setDocumentLocator(self._locator)
            self._decl_handler = value

        else:
            raise xml.sax.SAXNotRecognizedException()

    def getFeature(self, feature):
        if feature == xml.sax.handler.feature_namespaces:
            return 1
        else:
            return xml.sax.xmlreader.XMLReader.getFeature(self, feature)

    def setFeature(self, feature, enabled):
        if feature == xml.sax.handler.feature_namespaces:
            pass
        else:
            xml.sax.xmlreader.XMLReader.setFeature(self, feature, enabled)


class Attributes(xml.sax.xmlreader.AttributesImpl):
    # self._attrs has the form {name: (value, type)}

    def getType(self, name):
        return self._attrs[name][1]

    def getValue(self, name):
        return self._attrs[name][0]

    def getValueByQName(self, name):
        return self._attrs[name][0]

    def __getitem__(self, name):
        return self._attrs[name][0]

    def get(self, name, default=None):
        if self._attrs.has_key(name):
            return self._attrs[name][0]
        return default

    def items(self):
        L = []
        for name, (value, type) in self._attrs.items():
            L.append((name, value))
        return L

    def values(self):
        L = []
        for value, type in self._attrs.values():
            L.append(value)
        return L


class Locator(xml.sax.xmlreader.Locator):
    _lineno = -1
    _public_id = None
    _system_id = None

    def getLineNumber(self):
        return self._lineno

    def getPublicId(self):
        return self._public_id

    def getSystemId(self):
        return self._system_id


def parse(stream_or_string, parser=None):
    if type(stream_or_string) in [type(""), type(u"")]:
        stream = open(stream_or_string)
    else:
        stream = stream_or_string
    if not parser:
        parser = ESISReader()
    return xml.dom.pulldom.DOMEventStream(stream, parser, (2 ** 14) - 20)