xstc.py   [plain text]


#!/usr/bin/env python

#
# This is the MS subset of the W3C test suite for XML Schemas.
# This file is generated from the MS W3c test suite description file.
#

import sys, os
import exceptions, optparse
import libxml2

opa = optparse.OptionParser()

opa.add_option("-b", "--base", action="store", type="string", dest="baseDir",
               default="",
               help="""The base directory; i.e. the parent folder of the
               "nisttest", "suntest" and "msxsdtest" directories.""")

opa.add_option("-o", "--out", action="store", type="string", dest="logFile",
               default="test.log",
               help="The filepath of the log file to be created")
               
opa.add_option("--no-log", action="store_true", dest="disableLog",
               default=False,
               help="The filepath of the log file to be created")    
               
opa.add_option("--no-test-out", action="store_true", dest="disableTestStdOut",
               default=False,
               help="The filepath of the log file to be created")                           

opa.add_option("-s", "--silent", action="store_true", dest="silent", default=False,
               help="Disables display of all tests")

opa.add_option("-v", "--verbose", action="store_true", dest="verbose",
               default=False,
               help="Displays all tests (only if --silent is not set)")

opa.add_option("-x", "--max", type="int", dest="maxTestCount",
               default="-1",
               help="The maximum number of tests to be run")

opa.add_option("-t", "--test", type="string", dest="singleTest",
               default=None,
               help="Runs the specified test only")
               
opa.add_option("--rieo", "--report-internal-errors-only", action="store_true",
               dest="reportInternalErrOnly", default=False,
               help="Display erroneous tests of type 'internal' only")
               
opa.add_option("--rmleo", "--report-mem-leak-errors-only", action="store_true",
               dest="reportMemLeakErrOnly", default=False,
               help="Display erroneous tests of type 'memory leak' only")

opa.add_option("-c", "--combines", type="string", dest="combines",
               default=None,
               help="Combines to be run (all if omitted)")

opa.add_option("--rc", "--report-combines", action="store_true",
               dest="reportCombines", default=False,
               help="Display combine reports")

opa.add_option("--rec", "--report-err-combines", action="store_true",
               dest="reportErrCombines", default=False,
               help="Display erroneous combine reports only")

opa.add_option("--debug", action="store_true",
               dest="debugEnabled", default=False,
               help="Displays debug messages")
               
opa.add_option("--info", action="store_true",
               dest="info", default=False,
               help="Displays info on the suite only. Does not run any test.")            

(options, args) = opa.parse_args()

if options.combines is not None:
    options.combines = options.combines.split()
    
################################################
# The vars below are not intended to be changed.
#

msgSchemaNotValidButShould =  "The schema should be valid."
msgSchemaValidButShouldNot = "The schema should be invalid."
msgInstanceNotValidButShould = "The instance should be valid."
msgInstanceValidButShouldNot = "The instance should be invalid."
testFolderNIST = "nisttest"
testFolderMS   = "msxsdtest"
testFolderSUN  = "suntest"

###################
# Helper functions.
#

def handleError(test, msg):
    test.addLibLog("'%s'   LIB: %s" % (test.name, msg))
    if msg.find("Unimplemented") > -1:
        test.failUnimplemented()
    elif msg.find("Internal") > -1:
        test.failInternal()        
    

##################
# Test case class.
#

class MSTestCase:
           
    def __init__(self, name, descr, tFolder, sFolder, sFile, sVal, iExists, iFolder, iFile, iVal):
        global testFolderNIST, testFolderSUN, testFolderMS
        #
        # Init.
        #
        self.name = name
        self.descr = descr
        self.test_Folder = tFolder
        self.schema_Folder = sFolder
        self.schema_File = sFile
        self.schema_Val = sVal
        self.instance_Exists = iExists
        self.instance_Folder = iFolder
        self.instance_File = iFile
        self.instance_Val = iVal
        self.failed = False
        self.log = []
        self.libLog = []
        self.phase = ""
        self.initialMemUsed = 0
        self.memLeak = 0
        self.excepted = False
        self.bad = False
        self.unimplemented = False
        self.internalErr = False
        #
        # Compute combine name of this test.
        #       
        if self.test_Folder == testFolderMS or self.test_Folder == testFolderSUN:
            #
            # Use the last given directory for the combine name.
            #
            dirs = self.schema_Folder.split("/")
            self.combineName = dirs[len(dirs) -1]
	    if self.test_Folder == testFolderMS:
	        if self.combineName == "group":		    
	            self.schema_Folder = "Group"
		    self.instance_Folder = "Group"
        elif self.test_Folder == testFolderNIST:
            #
            # NIST files are named in the following form: 
            # "NISTSchema-short-pattern-1.xsd"
            #
            tokens = self.schema_File.split("-")
            self.combineName = tokens[1]            
        else:
            self.combineName = "unkown"
            raise Exception("Could not compute the combine name of a test.")
        #
        # Init the log.
        #
        self.log.append("'%s'   descr: %s\n" % (self.name, self.descr))
        self.log.append("'%s'   exp schema   valid: %d\n" % (self.name, self.schema_Val))
        if (self.instance_Exists):
            self.log.append("'%s'   exp instance valid: %d\n" % (self.name, self.instance_Val))                       
       
    def addLibLog(self, msg):
        """This one is intended to be used by the error handler
        function"""
        self.libLog.append(msg)

    def fail(self, msg):       
        self.failed = True         
        self.log.append("'%s' ( FAILED: %s\n" % (self.name, msg))
        
    def failInternal(self):
        self.failed = True
        self.internalErr = True
        self.log.append("'%s' * INTERNAL\n" % self.name)
        
    def failUnimplemented(self):
        self.failed = True
        self.unimplemented = True
        self.log.append("'%s' ? UNIMPLEMENTED\n" % self.name)

    def failCritical(self, msg):        
        self.failed = True
        self.bad = True
        self.log.append("'%s' ! BAD: %s\n" % (self.name, msg))  

    def failExcept(self, e):      
        self.failed = True
        self.excepted = True
        self.log.append("'%s' # EXCEPTION: %s\n" % (self.name, e.__str__()))
    
    def setUp(self):            
        #
        # Set up Libxml2.
        #   
        self.initialMemUsed = libxml2.debugMemory(1)
        libxml2.initParser()
        libxml2.lineNumbersDefault(1)
        libxml2.registerErrorHandler(handleError, self)
        
    def tearDown(self):        
        libxml2.schemaCleanupTypes()
        libxml2.cleanupParser()      
        self.memLeak = libxml2.debugMemory(1) - self.initialMemUsed

    def isIOError(self, file, docType):
        err = None
        try:
            err = libxml2.lastError()
        except:
            # Suppress exceptions.
            pass
        if (err is None):
            return False
        if err.domain() == libxml2.XML_FROM_IO:
            self.failCritical("failed to access the %s resource '%s'\n" % (docType, file))

    def debugMsg(self, msg):
        global options 
        if options.debugEnabled:
            sys.stdout.write("'%s'   DEBUG: %s\n" % (self.name, msg))
            
    def finalize(self):
        """Adds additional info to the log."""
        #
        # Add libxml2 messages.
        #
        self.log.extend(self.libLog)
        #
        # Add memory leaks.
        #        
        if self.memLeak != 0:            
            self.log.append("%s + memory leak: %d bytes\n" % (self.name, self.memLeak))
            
    def processSchema(self, filePath):
        global msgSchemaNotValidButShould, msgSchemaValidButShouldNot
        schema = None
        
        #
        # Parse the schema.
        #
        self.debugMsg("loading schema: %s" % filePath)
        schema_ParserCtxt = libxml2.schemaNewParserCtxt(filePath)
        try:
            try:
                schema = schema_ParserCtxt.schemaParse()
            except:
                pass
        finally:
            self.debugMsg("after loading schema")
            del schema_ParserCtxt
        if schema is None:
            self.debugMsg("schema is None")
            self.debugMsg("checking for IO errors...")
            if self.isIOError(file, "schema"):
                return None
        self.debugMsg("checking schema result")
        if (schema is None and self.schema_Val) or (schema is not None and self.schema_Val == 0):
            self.debugMsg("schema result is BAD")
            if (schema == None):
                self.fail(msgSchemaNotValidButShould)
            else:
                self.fail(msgSchemaValidButShouldNot)
        else:
	    self.debugMsg("schema result is OK")
            return schema

    def processInstance(self, filePath, schema):
        global msgInstanceNotValidButShould, msgInstanceValidButShouldNot
        
        instance = None
        self.debugMsg("loading instance: %s" % filePath)            
        instance_parserCtxt = libxml2.newParserCtxt()
        if (instance_parserCtxt is None):
            # TODO: Is this one necessary, or will an exception 
            # be already raised?
            raise Exception("Could not create the instance parser context.")
        try:
            try:
                instance = instance_parserCtxt.ctxtReadFile(filePath, None, libxml2.XML_PARSE_NOWARNING)
            except:
                # Suppress exceptions.
                pass
        finally:
            del instance_parserCtxt
        self.debugMsg("after loading instance")
        if instance is None:
            self.debugMsg("instance is None")
            self.failCritical("Failed to parse the instance for unknown reasons.")
            return
        else:
            try:
                #
                # Validate the instance.
                #
		
                validation_Ctxt = schema.schemaNewValidCtxt()
		#validation_Ctxt = libxml2.schemaNewValidCtxt(None)
                if (validation_Ctxt is None):
                    self.failCritical("Could not create the validation context.")
                    return
                try:
                    self.debugMsg("validating instance") 
                    instance_Err = validation_Ctxt.schemaValidateDoc(instance)
                    self.debugMsg("after instance validation") 
                    self.debugMsg("instance-err: %d" % instance_Err)
                    if (instance_Err != 0 and self.instance_Val == 1) or (instance_Err == 0 and self.instance_Val == 0):
                        self.debugMsg("instance result is BAD")
                        if (instance_Err != 0):
                            self.fail(msgInstanceNotValidButShould)
                        else:
                            self.fail(msgInstanceValidButShouldNot)
                            
                    else:                        
                                self.debugMsg("instance result is OK")
                finally:
                    del validation_Ctxt
            finally:
                instance.freeDoc()
            

    def run(self):
        """Runs a test.""" 
        global options
        
        # os.path.join(options.baseDir, self.test_Folder, self.schema_Folder, self.schema_File)
        filePath = "%s/%s/%s/%s" % (options.baseDir, self.test_Folder, self.schema_Folder, self.schema_File)
        schema = None
        try:                
            schema = self.processSchema(filePath)
            try:
                if self.instance_Exists and (schema is not None) and (not self.failed):
                    filePath = "%s/%s/%s/%s" % (options.baseDir, self.test_Folder, self.instance_Folder, self.instance_File)
                    self.processInstance(filePath, schema)
            finally:
                if schema is not None:
                   del schema

        except (Exception, libxml2.parserError, libxml2.treeError), e:
            self.failExcept(e)

            
####################
# Test runner class.
#
              
class MSTestRunner:

    CNT_TOTAL = 0
    CNT_RAN = 1
    CNT_SUCCEEDED = 2
    CNT_FAILED = 3
    CNT_UNIMPLEMENTED = 4
    CNT_INTERNAL = 5
    CNT_BAD = 6
    CNT_EXCEPTED = 7
    CNT_MEMLEAK = 8

    def __init__(self):
        self.logFile = None
        self.counters = self.createCounters()
        self.testList = []
        self.combinesRan = {}
        
    def createCounters(self):
        counters = {self.CNT_TOTAL:0, self.CNT_RAN:0, self.CNT_SUCCEEDED:0,
        self.CNT_FAILED:0, self.CNT_UNIMPLEMENTED:0, self.CNT_INTERNAL:0, self.CNT_BAD:0, 
        self.CNT_EXCEPTED:0, self.CNT_MEMLEAK:0}
        
        return counters

    def addTest(self, test):
        self.testList.append(test)
        
    def updateCounters(self, test, counters):
        if test.memLeak != 0:
           counters[self.CNT_MEMLEAK] += 1
        if not test.failed:
           counters[self.CNT_SUCCEEDED] +=1
        if test.failed:
           counters[self.CNT_FAILED] += 1
        if test.bad:
           counters[self.CNT_BAD] += 1
        if test.unimplemented:
           counters[self.CNT_UNIMPLEMENTED] += 1   
        if test.internalErr:
           counters[self.CNT_INTERNAL] += 1                      
        if test.excepted:
           counters[self.CNT_EXCEPTED] += 1
        return counters
           
    def displayResults(self, out, all, combName, counters):
        out.write("\n")
        if all:
            if options.combines is not None:
                out.write("combine(s): %s\n" % str(options.combines))
        elif combName is not None:             
            out.write("combine : %s\n" % combName)
        out.write("  total             : %d\n" % counters[self.CNT_TOTAL])
        if all or options.combines is not None:
            out.write("    ran             : %d\n" % counters[self.CNT_RAN])
        # out.write("    succeeded       : %d\n" % counters[self.CNT_SUCCEEDED])
        if counters[self.CNT_FAILED] > 0:
            out.write("    failed          : %d\n" % counters[self.CNT_FAILED])
            out.write("     -> internal    : %d\n" % counters[self.CNT_INTERNAL])
            out.write("     -> unimpl.     : %d\n" % counters[self.CNT_UNIMPLEMENTED])
            out.write("     -> bad         : %d\n" % counters[self.CNT_BAD])            
            out.write("     -> exceptions  : %d\n" % counters[self.CNT_EXCEPTED])
        if counters[self.CNT_MEMLEAK] > 0:
            out.write("    memory leaks    : %d\n" % counters[self.CNT_MEMLEAK])

    def displayShortResults(self, out, all, combName, counters):
        out.write("Ran %d of %d tests:" % (counters[self.CNT_RAN],
                  counters[self.CNT_TOTAL]))
        # out.write("    succeeded       : %d\n" % counters[self.CNT_SUCCEEDED])
        if counters[self.CNT_FAILED] > 0 or counters[self.CNT_MEMLEAK] > 0:
            out.write(" %d failed" % (counters[self.CNT_FAILED]))
            if counters[self.CNT_INTERNAL] > 0:
                out.write(" %d internal" % (counters[self.CNT_INTERNAL]))
            if counters[self.CNT_UNIMPLEMENTED] > 0:
                out.write(" %d unimplemented" % (counters[self.CNT_UNIMPLEMENTED]))
            if counters[self.CNT_BAD] > 0:
                out.write(" %d bad" % (counters[self.CNT_BAD]))
            if counters[self.CNT_EXCEPTED] > 0:
                out.write(" %d exception" % (counters[self.CNT_EXCEPTED]))
            if counters[self.CNT_MEMLEAK] > 0:
                out.write(" %d leaks" % (counters[self.CNT_MEMLEAK]))
            out.write("\n")
        else:
            out.write(" all passed\n")
    
    def reportCombine(self, combName):
        global options
        
        counters = self.createCounters()
        #
        # Compute evaluation counters.
        #
        for test in self.combinesRan[combName]:
            counters[self.CNT_TOTAL] += 1
            counters[self.CNT_RAN] += 1
            counters = self.updateCounters(test, counters)
        if options.reportErrCombines and (counters[self.CNT_FAILED] == 0) and (counters[self.CNT_MEMLEAK] == 0):
            pass
        else:
            if not options.disableLog:
                self.displayResults(self.logFile, False, combName, counters)
            self.displayResults(sys.stdout, False, combName, counters)
        
    def displayTestLog(self, test):
        sys.stdout.writelines(test.log)
        sys.stdout.write("~~~~~~~~~~\n")
    
    def reportTest(self, test):
        global options
        
        error = test.failed or test.memLeak != 0
        #
        # Only erroneous tests will be written to the log,
        # except @verbose is switched on.
        #        
        if not options.disableLog and (options.verbose or error):
            self.logFile.writelines(test.log)
            self.logFile.write("~~~~~~~~~~\n")
        #
        # if not @silent, only erroneous tests will be
        # written to stdout, except @verbose is switched on.
        #
        if not options.silent: 
            if options.reportInternalErrOnly and test.internalErr:
                self.displayTestLog(test)
            if options.reportMemLeakErrOnly and test.memLeak != 0: 
                self.displayTestLog(test)
            if (options.verbose or error) and (not options.reportInternalErrOnly) and (not options.reportMemLeakErrOnly):
                self.displayTestLog(test)
                
    def addToCombines(self, test):
        found = False
        if self.combinesRan.has_key(test.combineName):
            self.combinesRan[test.combineName].append(test)
        else:
            self.combinesRan[test.combineName] = [test]

    def run(self):

        global options
        
        if options.info:
            for test in self.testList:
                self.addToCombines(test)               
            sys.stdout.write("Combines: %d\n" % len(self.combinesRan))
            sys.stdout.write("%s\n" % self.combinesRan.keys())
            return
        
        if not options.disableLog:
            self.logFile = open(options.logFile, "w")
        try:
            for test in self.testList:
                self.counters[self.CNT_TOTAL] += 1
                #
                # Filter tests.
                #   
                if options.singleTest is not None and options.singleTest != "":
                    if (test.name != options.singleTest):
                        continue
                elif options.combines is not None:
                    if not options.combines.__contains__(test.combineName):
                        continue
                if options.maxTestCount != -1 and self.counters[self.CNT_RAN] >= options.maxTestCount:
                    break
                self.counters[self.CNT_RAN] += 1
                #
                # Run the thing, dammit.
                #
                try:
                    test.setUp()
                    try:
                        test.run()
                    finally:
                        test.tearDown()
                finally:
                    #
                    # Evaluate.
                    #
                    test.finalize()
                    self.reportTest(test)
                    if options.reportCombines or options.reportErrCombines:
                        self.addToCombines(test)
                    self.counters = self.updateCounters(test, self.counters)
        finally:        
            if options.reportCombines or options.reportErrCombines:
                #
                # Build a report for every single combine.
                #
                # TODO: How to sort a dict?
                #
                self.combinesRan.keys().sort(None)
                for key in self.combinesRan.keys():
                    self.reportCombine(key)
            
            #
            # Display the final report.
            #
            if options.silent:
                self.displayShortResults(sys.stdout, True, None, self.counters)
            else:
                sys.stdout.write("===========================\n")
                self.displayResults(sys.stdout, True, None, self.counters)