xstc.py   [plain text]


#!/usr/bin/env python

#
# This is the MS subset of the W3C test suite for XML Schemas.
# This file is generated from the MS W3c test suite description file.
#

import sys, os
import exceptions, optparse
import libxml2

opa = optparse.OptionParser()

opa.add_option("-b", "--base", action="store", type="string", dest="baseDir",
			   default="",
			   help="""The base directory; i.e. the parent folder of the
			   "nisttest", "suntest" and "msxsdtest" directories.""")

opa.add_option("-o", "--out", action="store", type="string", dest="logFile",
			   default="test.log",
			   help="The filepath of the log file to be created")

opa.add_option("--log", action="store_true", dest="enableLog",
			   default=False,
			   help="Create the log file")

opa.add_option("--no-test-out", action="store_true", dest="disableTestStdOut",
			   default=False,
			   help="Don't output test results")

opa.add_option("-s", "--silent", action="store_true", dest="silent", default=False,
			   help="Disables display of all tests")

opa.add_option("-v", "--verbose", action="store_true", dest="verbose",
			   default=False,
			   help="Displays all tests (only if --silent is not set)")

opa.add_option("-x", "--max", type="int", dest="maxTestCount",
			   default="-1",
			   help="The maximum number of tests to be run")

opa.add_option("-t", "--test", type="string", dest="singleTest",
			   default=None,
			   help="Runs the specified test only")
			   
opa.add_option("--tsw", "--test-starts-with", type="string", dest="testStartsWith",
			   default=None,
			   help="Runs the specified test(s), starting with the given string")

opa.add_option("--rieo", "--report-internal-errors-only", action="store_true",
			   dest="reportInternalErrOnly", default=False,
			   help="Display erroneous tests of type 'internal' only")

opa.add_option("--rueo", "--report-unimplemented-errors-only", action="store_true",
			   dest="reportUnimplErrOnly", default=False,
			   help="Display erroneous tests of type 'unimplemented' only")

opa.add_option("--rmleo", "--report-mem-leak-errors-only", action="store_true",
			   dest="reportMemLeakErrOnly", default=False,
			   help="Display erroneous tests of type 'memory leak' only")

opa.add_option("-c", "--combines", type="string", dest="combines",
			   default=None,
			   help="Combines to be run (all if omitted)")
			   
opa.add_option("--csw", "--csw", type="string", dest="combineStartsWith",
			   default=None,
			   help="Combines to be run (all if omitted)")			   

opa.add_option("--rc", "--report-combines", action="store_true",
			   dest="reportCombines", default=False,
			   help="Display combine reports")

opa.add_option("--rec", "--report-err-combines", action="store_true",
			   dest="reportErrCombines", default=False,
			   help="Display erroneous combine reports only")

opa.add_option("--debug", action="store_true",
			   dest="debugEnabled", default=False,
			   help="Displays debug messages")

opa.add_option("--info", action="store_true",
			   dest="info", default=False,
			   help="Displays info on the suite only. Does not run any test.")
opa.add_option("--sax", action="store_true",
			   dest="validationSAX", default=False,
			   help="Use SAX2-driven validation.")
opa.add_option("--tn", action="store_true",
			   dest="displayTestName", default=False,
			   help="Display the test name in every case.")

(options, args) = opa.parse_args()

if options.combines is not None:
	options.combines = options.combines.split()

################################################
# The vars below are not intended to be changed.
#

msgSchemaNotValidButShould =  "The schema should be valid."
msgSchemaValidButShouldNot = "The schema should be invalid."
msgInstanceNotValidButShould = "The instance should be valid."
msgInstanceValidButShouldNot = "The instance should be invalid."
vendorNIST = "NIST"
vendorNIST_2 = "NIST-2"
vendorSUN  = "SUN"
vendorMS   = "MS"

###################
# Helper functions.
#
vendor = None

def handleError(test, msg):
	global options
	if not options.silent:
		test.addLibLog("'%s'   LIB: %s" % (test.name, msg))
	if msg.find("Unimplemented") > -1:
		test.failUnimplemented()
	elif msg.find("Internal") > -1:
		test.failInternal()
		
	
def fixFileNames(fileName):
	if (fileName is None) or (fileName == ""):
		return ""
	dirs = fileName.split("/")
	if dirs[1] != "Tests":
		fileName = os.path.join(".", "Tests")
		for dir in dirs[1:]:
			fileName = os.path.join(fileName, dir)	
	return fileName

class XSTCTestGroup:
	def __init__(self, name, schemaFileName, descr):
		global vendor, vendorNIST_2
		self.name = name
		self.descr = descr
		self.mainSchema = True
		self.schemaFileName = fixFileNames(schemaFileName)
		self.schemaParsed = False
		self.schemaTried = False

	def setSchema(self, schemaFileName, parsed):
		if not self.mainSchema:			
			return
		self.mainSchema = False
		self.schemaParsed = parsed
		self.schemaTried = True

class XSTCTestCase:

		   # <!-- groupName, Name, Accepted, File, Val, Descr
	def __init__(self, isSchema, groupName, name, accepted, file, val, descr):
		global options
		#
		# Constructor.
		#
		self.testRunner = None
		self.isSchema = isSchema
		self.groupName = groupName
		self.name = name
		self.accepted = accepted		
		self.fileName = fixFileNames(file)
		self.val = val
		self.descr = descr
		self.failed = False
		self.combineName = None

		self.log = []
		self.libLog = []
		self.initialMemUsed = 0
		self.memLeak = 0
		self.excepted = False
		self.bad = False
		self.unimplemented = False
		self.internalErr = False
		self.noSchemaErr = False
		self.failed = False
		#
		# Init the log.
		#
		if not options.silent:
			if self.descr is not None:
				self.log.append("'%s'   descr: %s\n" % (self.name, self.descr))		
			self.log.append("'%s'   exp validity: %d\n" % (self.name, self.val))

	def initTest(self, runner):
		global vendorNIST, vendorSUN, vendorMS, vendorNIST_2, options, vendor
		#
		# Get the test-group.
		#
		self.runner = runner
		self.group = runner.getGroup(self.groupName)				
		if vendor == vendorMS or vendor == vendorSUN:
			#
			# Use the last given directory for the combine name.
			#
			dirs = self.fileName.split("/")
			self.combineName = dirs[len(dirs) -2]					
		elif vendor == vendorNIST:
			#
			# NIST files are named in the following form:
			# "NISTSchema-short-pattern-1.xsd"
			#						
			tokens = self.name.split("-")
			self.combineName = tokens[1]
		elif vendor == vendorNIST_2:
			#
			# Group-names have the form: "atomic-normalizedString-length-1"
			#
			tokens = self.groupName.split("-")
			self.combineName = "%s-%s" % (tokens[0], tokens[1])
		else:
			self.combineName = "unkown"
			raise Exception("Could not compute the combine name of a test.")
		if (not options.silent) and (self.group.descr is not None):
			self.log.append("'%s'   group-descr: %s\n" % (self.name, self.group.descr))
		

	def addLibLog(self, msg):		
		"""This one is intended to be used by the error handler
		function"""
		global options		
		if not options.silent:
			self.libLog.append(msg)

	def fail(self, msg):
		global options
		self.failed = True
		if not options.silent:
			self.log.append("'%s' ( FAILED: %s\n" % (self.name, msg))

	def failNoSchema(self):
		global options
		self.failed = True
		self.noSchemaErr = True
		if not options.silent:
			self.log.append("'%s' X NO-SCHEMA\n" % (self.name))

	def failInternal(self):
		global options
		self.failed = True
		self.internalErr = True
		if not options.silent:
			self.log.append("'%s' * INTERNAL\n" % self.name)

	def failUnimplemented(self):
		global options
		self.failed = True
		self.unimplemented = True
		if not options.silent:
			self.log.append("'%s' ? UNIMPLEMENTED\n" % self.name)

	def failCritical(self, msg):
		global options
		self.failed = True
		self.bad = True
		if not options.silent:
			self.log.append("'%s' ! BAD: %s\n" % (self.name, msg))

	def failExcept(self, e):
		global options
		self.failed = True
		self.excepted = True
		if not options.silent:
			self.log.append("'%s' # EXCEPTION: %s\n" % (self.name, e.__str__()))

	def setUp(self):
		#
		# Set up Libxml2.
		#
		self.initialMemUsed = libxml2.debugMemory(1)
		libxml2.initParser()
		libxml2.lineNumbersDefault(1)
		libxml2.registerErrorHandler(handleError, self)

	def tearDown(self):
		libxml2.schemaCleanupTypes()
		libxml2.cleanupParser()
		self.memLeak = libxml2.debugMemory(1) - self.initialMemUsed

	def isIOError(self, file, docType):
		err = None
		try:
			err = libxml2.lastError()
		except:
			# Suppress exceptions.
			pass
		if (err is None):
			return False
		if err.domain() == libxml2.XML_FROM_IO:
			self.failCritical("failed to access the %s resource '%s'\n" % (docType, file))

	def debugMsg(self, msg):
		global options
		if options.debugEnabled:
			sys.stdout.write("'%s'   DEBUG: %s\n" % (self.name, msg))

	def finalize(self):
		global options
		"""Adds additional info to the log."""
		#
		# Add libxml2 messages.
		#
		if not options.silent:
			self.log.extend(self.libLog)
			#
			# Add memory leaks.
			#
			if self.memLeak != 0:
				self.log.append("%s + memory leak: %d bytes\n" % (self.name, self.memLeak))

	def run(self):
		"""Runs a test."""
		global options

		##filePath = os.path.join(options.baseDir, self.fileName)
		# filePath = "%s/%s/%s/%s" % (options.baseDir, self.test_Folder, self.schema_Folder, self.schema_File)
		if options.displayTestName:
			sys.stdout.write("'%s'\n" % self.name)
		try:
			self.validate()
		except (Exception, libxml2.parserError, libxml2.treeError), e:
			self.failExcept(e)
			
def parseSchema(fileName):
	schema = None
	ctxt = libxml2.schemaNewParserCtxt(fileName)
	try:
		try:
			schema = ctxt.schemaParse()
		except:
			pass
	finally:		
		del ctxt
		return schema
				

class XSTCSchemaTest(XSTCTestCase):

	def __init__(self, groupName, name, accepted, file, val, descr):
		XSTCTestCase.__init__(self, 1, groupName, name, accepted, file, val, descr)

	def validate(self):
		global msgSchemaNotValidButShould, msgSchemaValidButShouldNot
		schema = None
		filePath = self.fileName
		# os.path.join(options.baseDir, self.fileName)
		valid = 0
		try:
			#
			# Parse the schema.
			#
			self.debugMsg("loading schema: %s" % filePath)
			schema = parseSchema(filePath)
			self.debugMsg("after loading schema")						
			if schema is None:
				self.debugMsg("schema is None")
				self.debugMsg("checking for IO errors...")
				if self.isIOError(file, "schema"):
					return
			self.debugMsg("checking schema result")
			if (schema is None and self.val) or (schema is not None and self.val == 0):
				self.debugMsg("schema result is BAD")
				if (schema == None):
					self.fail(msgSchemaNotValidButShould)
				else:
					self.fail(msgSchemaValidButShouldNot)
			else:
				self.debugMsg("schema result is OK")
		finally:
			self.group.setSchema(self.fileName, schema is not None)
			del schema

class XSTCInstanceTest(XSTCTestCase):

	def __init__(self, groupName, name, accepted, file, val, descr):
		XSTCTestCase.__init__(self, 0, groupName, name, accepted, file, val, descr)

	def validate(self):
		instance = None
		schema = None
		filePath = self.fileName
		# os.path.join(options.baseDir, self.fileName)

		if not self.group.schemaParsed and self.group.schemaTried:
			self.failNoSchema()
			return
					
		self.debugMsg("loading instance: %s" % filePath)
		parserCtxt = libxml2.newParserCtxt()
		if (parserCtxt is None):
			# TODO: Is this one necessary, or will an exception
			# be already raised?
			raise Exception("Could not create the instance parser context.")
		if not options.validationSAX:
			try:
				try:
					instance = parserCtxt.ctxtReadFile(filePath, None, libxml2.XML_PARSE_NOWARNING)
				except:
					# Suppress exceptions.
					pass
			finally:
				del parserCtxt
			self.debugMsg("after loading instance")
			if instance is None:
				self.debugMsg("instance is None")
				self.failCritical("Failed to parse the instance for unknown reasons.")
				return		
		try:
			#
			# Validate the instance.
			#
			self.debugMsg("loading schema: %s" % self.group.schemaFileName)
			schema = parseSchema(self.group.schemaFileName)
			try:
				validationCtxt = schema.schemaNewValidCtxt()
				#validationCtxt = libxml2.schemaNewValidCtxt(None)
				if (validationCtxt is None):
					self.failCritical("Could not create the validation context.")
					return
				try:
					self.debugMsg("validating instance")
					if options.validationSAX:
						instance_Err = validationCtxt.schemaValidateFile(filePath, 0)
					else:
						instance_Err = validationCtxt.schemaValidateDoc(instance)
					self.debugMsg("after instance validation")
					self.debugMsg("instance-err: %d" % instance_Err)
					if (instance_Err != 0 and self.val == 1) or (instance_Err == 0 and self.val == 0):
						self.debugMsg("instance result is BAD")
						if (instance_Err != 0):
							self.fail(msgInstanceNotValidButShould)
						else:
							self.fail(msgInstanceValidButShouldNot)

					else:
								self.debugMsg("instance result is OK")
				finally:
					del validationCtxt
			finally:
				del schema
		finally:
			if instance is not None:
				instance.freeDoc()


####################
# Test runner class.
#

class XSTCTestRunner:

	CNT_TOTAL = 0
	CNT_RAN = 1
	CNT_SUCCEEDED = 2
	CNT_FAILED = 3
	CNT_UNIMPLEMENTED = 4
	CNT_INTERNAL = 5
	CNT_BAD = 6
	CNT_EXCEPTED = 7
	CNT_MEMLEAK = 8
	CNT_NOSCHEMA = 9
	CNT_NOTACCEPTED = 10
	CNT_SCHEMA_TEST = 11

	def __init__(self):
		self.logFile = None
		self.counters = self.createCounters()
		self.testList = []
		self.combinesRan = {}
		self.groups = {}
		self.curGroup = None

	def createCounters(self):
		counters = {self.CNT_TOTAL:0, self.CNT_RAN:0, self.CNT_SUCCEEDED:0,
		self.CNT_FAILED:0, self.CNT_UNIMPLEMENTED:0, self.CNT_INTERNAL:0, self.CNT_BAD:0,
		self.CNT_EXCEPTED:0, self.CNT_MEMLEAK:0, self.CNT_NOSCHEMA:0, self.CNT_NOTACCEPTED:0,
		self.CNT_SCHEMA_TEST:0}

		return counters

	def addTest(self, test):
		self.testList.append(test)
		test.initTest(self)

	def getGroup(self, groupName):
		return self.groups[groupName]

	def addGroup(self, group):
		self.groups[group.name] = group

	def updateCounters(self, test, counters):
		if test.memLeak != 0:
			counters[self.CNT_MEMLEAK] += 1
		if not test.failed:
			counters[self.CNT_SUCCEEDED] +=1
		if test.failed:
			counters[self.CNT_FAILED] += 1
		if test.bad:
			counters[self.CNT_BAD] += 1
		if test.unimplemented:
			counters[self.CNT_UNIMPLEMENTED] += 1
		if test.internalErr:
			counters[self.CNT_INTERNAL] += 1
		if test.noSchemaErr:
			counters[self.CNT_NOSCHEMA] += 1
		if test.excepted:
			counters[self.CNT_EXCEPTED] += 1
		if not test.accepted:
			counters[self.CNT_NOTACCEPTED] += 1
		if test.isSchema:
			counters[self.CNT_SCHEMA_TEST] += 1
		return counters

	def displayResults(self, out, all, combName, counters):
		out.write("\n")
		if all:
			if options.combines is not None:
				out.write("combine(s): %s\n" % str(options.combines))
		elif combName is not None:
			out.write("combine : %s\n" % combName)
		out.write("  total           : %d\n" % counters[self.CNT_TOTAL])
		if all or options.combines is not None:
			out.write("  ran             : %d\n" % counters[self.CNT_RAN])
			out.write("    (schemata)    : %d\n" % counters[self.CNT_SCHEMA_TEST])
		# out.write("    succeeded       : %d\n" % counters[self.CNT_SUCCEEDED])
		out.write("  not accepted    : %d\n" % counters[self.CNT_NOTACCEPTED])
		if counters[self.CNT_FAILED] > 0:		    
			out.write("    failed                  : %d\n" % counters[self.CNT_FAILED])
			out.write("     -> internal            : %d\n" % counters[self.CNT_INTERNAL])
			out.write("     -> unimpl.             : %d\n" % counters[self.CNT_UNIMPLEMENTED])
			out.write("     -> skip-invalid-schema : %d\n" % counters[self.CNT_NOSCHEMA])
			out.write("     -> bad                 : %d\n" % counters[self.CNT_BAD])
			out.write("     -> exceptions          : %d\n" % counters[self.CNT_EXCEPTED])
			out.write("    memory leaks            : %d\n" % counters[self.CNT_MEMLEAK])

	def displayShortResults(self, out, all, combName, counters):
		out.write("Ran %d of %d tests (%d schemata):" % (counters[self.CNT_RAN],
				  counters[self.CNT_TOTAL], counters[self.CNT_SCHEMA_TEST]))
		# out.write("    succeeded       : %d\n" % counters[self.CNT_SUCCEEDED])
		if counters[self.CNT_NOTACCEPTED] > 0:
			out.write(" %d not accepted" % (counters[self.CNT_NOTACCEPTED]))
		if counters[self.CNT_FAILED] > 0 or counters[self.CNT_MEMLEAK] > 0:
			if counters[self.CNT_FAILED] > 0:
				out.write(" %d failed" % (counters[self.CNT_FAILED]))
				out.write(" (")
				if counters[self.CNT_INTERNAL] > 0:
					out.write(" %d internal" % (counters[self.CNT_INTERNAL]))
				if counters[self.CNT_UNIMPLEMENTED] > 0:
					out.write(" %d unimplemented" % (counters[self.CNT_UNIMPLEMENTED]))
				if counters[self.CNT_NOSCHEMA] > 0:
					out.write(" %d skip-invalid-schema" % (counters[self.CNT_NOSCHEMA]))
				if counters[self.CNT_BAD] > 0:
					out.write(" %d bad" % (counters[self.CNT_BAD]))
				if counters[self.CNT_EXCEPTED] > 0:
					out.write(" %d exception" % (counters[self.CNT_EXCEPTED]))
				out.write(" )")
			if counters[self.CNT_MEMLEAK] > 0:
				out.write(" %d leaks" % (counters[self.CNT_MEMLEAK]))			
			out.write("\n")
		else:
			out.write(" all passed\n")

	def reportCombine(self, combName):
		global options

		counters = self.createCounters()
		#
		# Compute evaluation counters.
		#
		for test in self.combinesRan[combName]:
			counters[self.CNT_TOTAL] += 1
			counters[self.CNT_RAN] += 1
			counters = self.updateCounters(test, counters)
		if options.reportErrCombines and (counters[self.CNT_FAILED] == 0) and (counters[self.CNT_MEMLEAK] == 0):
			pass
		else:
			if options.enableLog:
				self.displayResults(self.logFile, False, combName, counters)				
			self.displayResults(sys.stdout, False, combName, counters)

	def displayTestLog(self, test):
		sys.stdout.writelines(test.log)
		sys.stdout.write("~~~~~~~~~~\n")

	def reportTest(self, test):
		global options

		error = test.failed or test.memLeak != 0
		#
		# Only erroneous tests will be written to the log,
		# except @verbose is switched on.
		#
		if options.enableLog and (options.verbose or error):
			self.logFile.writelines(test.log)
			self.logFile.write("~~~~~~~~~~\n")
		#
		# if not @silent, only erroneous tests will be
		# written to stdout, except @verbose is switched on.
		#
		if not options.silent:
			if options.reportInternalErrOnly and test.internalErr:
				self.displayTestLog(test)
			if options.reportMemLeakErrOnly and test.memLeak != 0:
				self.displayTestLog(test)
			if options.reportUnimplErrOnly and test.unimplemented:
				self.displayTestLog(test)
			if (options.verbose or error) and (not options.reportInternalErrOnly) and (not options.reportMemLeakErrOnly) and (not options.reportUnimplErrOnly):
				self.displayTestLog(test)


	def addToCombines(self, test):
		found = False
		if self.combinesRan.has_key(test.combineName):
			self.combinesRan[test.combineName].append(test)
		else:
			self.combinesRan[test.combineName] = [test]

	def run(self):

		global options

		if options.info:
			for test in self.testList:
				self.addToCombines(test)
			sys.stdout.write("Combines: %d\n" % len(self.combinesRan))
			sys.stdout.write("%s\n" % self.combinesRan.keys())
			return

		if options.enableLog:
			self.logFile = open(options.logFile, "w")
		try:
			for test in self.testList:
				self.counters[self.CNT_TOTAL] += 1
				#
				# Filter tests.
				#
				if options.singleTest is not None and options.singleTest != "":
					if (test.name != options.singleTest):
						continue
				elif options.combines is not None:
					if not options.combines.__contains__(test.combineName):
						continue
				elif options.testStartsWith is not None:
					if not test.name.startswith(options.testStartsWith):
						continue
				elif options.combineStartsWith is not None:
					if not test.combineName.startswith(options.combineStartsWith):
						continue
				
				if options.maxTestCount != -1 and self.counters[self.CNT_RAN] >= options.maxTestCount:
					break
				self.counters[self.CNT_RAN] += 1
				#
				# Run the thing, dammit.
				#
				try:
					test.setUp()
					try:
						test.run()
					finally:
						test.tearDown()
				finally:
					#
					# Evaluate.
					#
					test.finalize()
					self.reportTest(test)
					if options.reportCombines or options.reportErrCombines:
						self.addToCombines(test)
					self.counters = self.updateCounters(test, self.counters)
		finally:
			if options.reportCombines or options.reportErrCombines:
				#
				# Build a report for every single combine.
				#
				# TODO: How to sort a dict?
				#
				self.combinesRan.keys().sort(None)
				for key in self.combinesRan.keys():
					self.reportCombine(key)

			#
			# Display the final report.
			#
			if options.silent:
				self.displayShortResults(sys.stdout, True, None, self.counters)
			else:
				sys.stdout.write("===========================\n")
				self.displayResults(sys.stdout, True, None, self.counters)