import sys, re, os, shutil, bisect, textwrap, shlex
import svntest
from svntest import main, actions, tree
from svntest import Failure
if sys.version_info[0] >= 3:
from io import StringIO
else:
from StringIO import StringIO
def make(wc_dir, commands, prev_status=None, prev_disk=None, verbose=True):
"""The Factory Invocation Function. This is typically the only one
called from outside this file. See top comment in factory.py.
Prints the resulting py script to stdout when verbose is True and
returns the resulting line-list containing items as:
[ ['pseudo-shell input line #1', ' translation\n to\n py #1'], ...]"""
fac = TestFactory(wc_dir, prev_status, prev_disk)
fac.make(commands)
fac.print_script()
return fac.lines
class TestFactory:
"""This class keeps all state around a factory.make() call."""
def __init__(self, sbox, prev_status=None, prev_disk=None):
self.sbox = sbox
self.lines = []
self.prev_status = None
if prev_status:
self.prev_status = [None, prev_status]
self.prev_disk = None
if prev_disk:
self.prev_disk = [None, prev_disk]
self.keep_args_of = ['--depth', '--encoding', '-r',
'--changelist', '-m', '--message']
self.prevdirs = []
self.vars = {}
self.sorted_vars_by_pathlen = []
self.used_wc_dir = False
self.used_url = False
self.other_wc_dirs = {}
def make(self, commands):
"internal main function, delegates everything except final output."
self.add_line(None, None)
init = ""
if not self.sbox.is_built():
self.sbox.build()
init += "sbox.build()\n"
try:
input_lines = commands.replace(';','\n').splitlines()
for str in input_lines:
if len(str.strip()) > 0:
self.add_line(str)
for i in range(len(self.lines)):
if self.lines[i][0] is not None:
self.lines[i][1] = self.switch(self.lines[i][0])
self.add_line(
None,
"Remember, this only saves you typing. Doublecheck everything.")
if self.used_wc_dir:
init += 'wc_dir = sbox.wc_dir\n'
if self.used_url:
init += 'url = sbox.repo_url\n'
sorted_names = self.get_sorted_other_wc_dir_names()
for name in sorted_names:
init += name + ' = ' + self.other_wc_dirs[name][0] + '\n'
if len(init) > 0:
init += '\n'
sorted_names = self.get_sorted_var_names()
for name in sorted_names:
init += name + ' = ' + self.vars[name][0] + '\n'
if len(init) > 0:
self.lines[0][1] = init
return self.lines
except:
for line in self.lines:
if line[1] is not None:
print(line[1])
raise
def print_script(self, stream=sys.stdout):
"Output the resulting script of the preceding make() call"
if self.lines is not None:
for line in self.lines:
if line[1] is None:
stripped = line[0].strip()
if not stripped.startswith('#'):
stream.write(" # don't know how to handle:\n")
stream.write(" " + line[0].strip() + '\n')
else:
if line[0] is not None:
stream.write( wrap_each_line(line[0].strip(),
" # ", " # ", True) + '\n')
stream.write(wrap_each_line(line[1], " ", " ", False) + '\n\n')
else:
stream.write(" # empty.\n")
stream.flush()
def switch(self, line):
"Given one input line, delegates to the appropriate sub-functions."
args = shlex.split(line)
if len(args) < 1:
return ""
first = args[0]
if first == 'svn':
second = args[1]
if second == 'add':
return self.cmd_svn(args[1:], False, self.keep_args_of)
if second in ['changelist', 'cl']:
keep_count = 2
if '--remove' in args:
keep_count = 1
return self.cmd_svn(args[1:], False, self.keep_args_of, keep_count)
if second in ['status','stat','st']:
return self.cmd_svn_status(args[2:])
if second in ['commit','ci']:
return self.cmd_svn_commit(args[2:])
if second in ['update','up']:
return self.cmd_svn_update(args[2:])
if second in ['switch','sw']:
return self.cmd_svn_switch(args[2:])
if second in ['copy', 'cp',
'move', 'mv', 'rename', 'ren']:
return self.cmd_svn_copy_move(args[1:])
if second in ['checkout', 'co']:
return self.cmd_svn_checkout(args[2:])
if second in ['propset','pset','ps']:
return self.cmd_svn(args[1:], False,
self.keep_args_of, 3)
if second in ['delete','del','remove', 'rm']:
return self.cmd_svn(args[1:], False,
self.keep_args_of + ['--with-revprop'])
return self.cmd_svn(args[1:], False, self.keep_args_of)
if first == 'echo':
return self.cmd_echo(args[1:])
if first == 'mkdir':
return self.cmd_mkdir(args[1:])
if first == 'rm':
return self.cmd_rm(args[1:])
if first == 'mv':
return self.cmd_mv(args[1:])
if first == 'cp':
return self.cmd_cp(args[1:])
return None
def cmd_svn_standard_run(self, pyargs, runargs, do_chdir, wc):
"The generic invocation of svn, helper function."
pychdir = self.chdir(do_chdir, wc)
code, out, err = main.run_svn("Maybe", *runargs)
if code == 0 and len(err) < 1:
pylist = self.strlist2py(out)
if len(out) <= 1:
py = "expected_stdout = " + pylist + "\n\n"
else:
py = "expected_stdout = verify.UnorderedOutput(" + pylist + ")\n\n"
py += pychdir
py += "actions.run_and_verify_svn2('OUTPUT', expected_stdout, [], 0"
else:
pylist = self.strlist2py(err)
if len(err) <= 1:
py = "expected_stderr = " + pylist + "\n\n"
else:
py = "expected_stderr = verify.UnorderedOutput(" + pylist + ")\n\n"
py += pychdir
py += ("actions.run_and_verify_svn2('OUTPUT', " +
"[], expected_stderr, " + str(code))
if len(pyargs) > 0:
py += ", " + ", ".join(pyargs)
py += ")\n"
py += self.chdir_back(do_chdir)
return py
def cmd_svn(self, svnargs, append_wc_dir_if_missing = False,
keep_args_of = [], keep_first_count = 1,
drop_with_arg = []):
"Handles all svn calls not handled by more specific functions."
pyargs, runargs, do_chdir, targets = self.args2svntest(svnargs,
append_wc_dir_if_missing, keep_args_of,
keep_first_count, drop_with_arg)
return self.cmd_svn_standard_run(pyargs, runargs, do_chdir,
self.get_first_wc(targets))
def cmd_svn_status(self, status_args):
"Runs svn status, looks what happened and writes the script for it."
pyargs, runargs, do_chdir, targets = self.args2svntest(
status_args, True, self.keep_args_of, 0)
py = ""
for target in targets:
if not target.wc:
py += '# SKIPPING NON-WC ' + target.runarg + '\n'
continue
if '-q' in status_args:
pystatus = self.get_current_status(target.wc, True)
py += (pystatus +
"actions.run_and_verify_status(" + target.wc.py +
", expected_status)\n")
else:
pystatus = self.get_current_status(target.wc, False)
py += (pystatus +
"actions.run_and_verify_unquiet_status(" + target.wc.py +
", expected_status)\n")
return py
def cmd_svn_commit(self, commit_args):
"Runs svn commit, looks what happened and writes the script for it."
commit_arg_opts = [
"--depth",
"--with-revprop",
"--changelist",
]
pyargs, runargs, do_chdir, targets = self.args2svntest(
commit_args, True, commit_arg_opts, 0, ['-m', '--message'])
wc = self.get_first_wc(targets)
pychdir = self.chdir(do_chdir, wc)
code, output, err = main.run_svn("Maybe", 'ci',
'-m', 'log msg',
*runargs)
if code == 0 and len(err) < 1:
output = actions.process_output_for_commit(output)
actual_out = tree.build_tree_from_commit(output)
py = ("expected_output = " +
self.tree2py(actual_out, wc) + "\n\n")
pystatus = self.get_current_status(wc)
py += pystatus
py += pychdir
py += ("actions.run_and_verify_commit(" + wc.py + ", " +
"expected_output, expected_status, " +
"None")
else:
py = "expected_error = " + self.strlist2py(err) + "\n\n"
py += pychdir
py += ("actions.run_and_verify_commit(" + wc.py + ", " +
"None, None, expected_error")
if len(pyargs) > 0:
py += ', ' + ', '.join(pyargs)
py += ")"
py += self.chdir_back(do_chdir)
return py
def cmd_svn_update(self, update_args):
"Runs svn update, looks what happened and writes the script for it."
pyargs, runargs, do_chdir, targets = self.args2svntest(
update_args, True, self.keep_args_of, 0)
wc = self.get_first_wc(targets)
pychdir = self.chdir(do_chdir, wc)
code, output, err = main.run_svn('Maybe', 'up', *runargs)
if code == 0 and len(err) < 1:
actual_out = svntest.wc.State.from_checkout(output).old_tree()
py = ("expected_output = " +
self.tree2py(actual_out, wc) + "\n\n")
pydisk = self.get_current_disk(wc)
py += pydisk
pystatus = self.get_current_status(wc)
py += pystatus
py += pychdir
py += ("actions.run_and_verify_update(" + wc.py + ", " +
"expected_output, expected_disk, expected_status, " +
"None, None, None, None, None, False")
else:
py = "expected_error = " + self.strlist2py(err) + "\n\n"
py += pychdir
py += ("actions.run_and_verify_update(" + wc.py + ", None, None, " +
"None, expected_error, None, None, None, None, False")
if len(pyargs) > 0:
py += ', ' + ', '.join(pyargs)
py += ")"
py += self.chdir_back(do_chdir)
return py
def cmd_svn_switch(self, switch_args):
"Runs svn switch, looks what happened and writes the script for it."
pyargs, runargs, do_chdir, targets = self.args2svntest(
switch_args, True, self.keep_args_of, 0)
if len(targets) < 2:
raise Failure("Sorry, I'm currently enforcing two targets for svn " +
"switch. If you want to supply less, remove this " +
"check and implement whatever seems appropriate.")
wc_arg = targets[1]
del pyargs[wc_arg.argnr]
del runargs[wc_arg.argnr]
url_arg = targets[0]
del pyargs[url_arg.argnr]
del runargs[url_arg.argnr]
wc = wc_arg.wc
if not wc:
raise Failure("Unexpected argument ordering to factory's 'svn switch'?")
pychdir = self.chdir(do_chdir, wc)
code, output, err = main.run_svn('Maybe', 'sw',
url_arg.runarg, wc_arg.runarg,
*runargs)
py = ""
if code == 0 and len(err) < 1:
actual_out = tree.build_tree_from_checkout(output)
py = ("expected_output = " +
self.tree2py(actual_out, wc) + "\n\n")
pydisk = self.get_current_disk(wc)
py += pydisk
pystatus = self.get_current_status(wc)
py += pystatus
py += pychdir
py += ("actions.run_and_verify_switch(" + wc.py + ", " +
wc_arg.pyarg + ", " + url_arg.pyarg + ", " +
"expected_output, expected_disk, expected_status, " +
"None, None, None, None, None, False")
else:
py = "expected_error = " + self.strlist2py(err) + "\n\n"
py += pychdir
py += ("actions.run_and_verify_switch(" + wc.py + ", " +
wc_arg.pyarg + ", " + url_arg.pyarg + ", " +
"None, None, None, expected_error, None, None, None, None, False")
if len(pyargs) > 0:
py += ', ' + ', '.join(pyargs)
py += ")"
py += self.chdir_back(do_chdir)
return py
def cmd_svn_checkout(self, checkout_args):
"Runs svn checkout, looks what happened and writes the script for it."
pyargs, runargs, do_chdir, targets = self.args2svntest(
checkout_args, True, self.keep_args_of, 0)
if len(targets) < 2:
raise Failure("Sorry, I'm currently enforcing two targets for svn " +
"checkout. If you want to supply less, remove this " +
"check and implement whatever seems appropriate.")
wc_arg = targets[1]
del pyargs[wc_arg.argnr]
del runargs[wc_arg.argnr]
url_arg = targets[0]
del pyargs[url_arg.argnr]
del runargs[url_arg.argnr]
wc = wc_arg.wc
pychdir = self.chdir(do_chdir, wc)
code, output, err = main.run_svn('Maybe', 'co',
url_arg.runarg, wc_arg.runarg,
*runargs)
py = ""
if code == 0 and len(err) < 1:
actual_out = tree.build_tree_from_checkout(output)
pyout = ("expected_output = " +
self.tree2py(actual_out, wc) + "\n\n")
py += pyout
pydisk = self.get_current_disk(wc)
py += pydisk
py += pychdir
py += ("actions.run_and_verify_checkout(" +
url_arg.pyarg + ", " + wc_arg.pyarg +
", expected_output, expected_disk, None, None, None, None")
else:
pylist = self.strlist2py(err)
if len(err) <= 1:
py += "expected_stderr = " + pylist + "\n\n"
else:
py += "expected_stderr = verify.UnorderedOutput(" + pylist + ")\n\n"
py += pychdir
py += ("actions.run_and_verify_svn2('OUTPUT', " +
"[], expected_stderr, " + str(code) +
", " + url_arg.pyarg + ", " + wc_arg.pyarg)
if len(pyargs) > 0:
py += ', ' + ', '.join(pyargs)
py += ")"
py += self.chdir_back(do_chdir)
return py
def cmd_svn_copy_move(self, args):
"Runs svn copy or move, looks what happened and writes the script for it."
pyargs, runargs, do_chdir, targets = self.args2svntest(args,
False, self.keep_args_of, 1)
if len(targets) == 2 and targets[1].is_url:
has_message = False
for arg in runargs:
if arg.startswith('-m') or arg == '--message':
has_message = True
break
if not has_message:
runargs += [ '-m', 'copy log' ]
pyargs = []
for arg in runargs:
pyargs += [ self.str2svntest(arg) ]
return self.cmd_svn_standard_run(pyargs, runargs, do_chdir,
self.get_first_wc(targets))
def cmd_echo(self, echo_args):
"Writes a string to a file and writes the script for it."
target_arg = None
replace = True
contents = None
for i in range(len(echo_args)):
arg = echo_args[i]
if arg.startswith('>'):
if len(arg) > 1:
if arg[1] == '>':
replace = False
arg = arg[2:]
else:
arg = arg[1:]
if len(arg) > 0:
target_arg = arg
if target_arg is None:
if i+1 != len(echo_args)-1:
raise Failure("don't understand: echo " + " ".join(echo_args))
target_arg = echo_args[i+1]
else:
if i != len(echo_args)-1:
raise Failure("don't understand: echo " + " ".join(echo_args))
contents = " ".join(echo_args[:i]) + '\n'
if target_arg is None:
raise Failure("echo needs a '>' pipe to a file name: echo " +
" ".join(echo_args))
target = self.path2svntest(target_arg)
if replace:
main.file_write(target.runarg, contents)
py = "main.file_write("
else:
main.file_append(target.runarg, contents)
py = "main.file_append("
py += target.pyarg + ", " + self.str2svntest(contents) + ")"
return py
def cmd_mkdir(self, mkdir_args):
"Makes a new directory and writes the script for it."
out = ""
for arg in mkdir_args:
if not arg.startswith('-'):
target = self.path2svntest(arg)
os.makedirs(target.runarg)
out += "os.makedirs(" + target.pyarg + ")\n"
return out
def cmd_rm(self, rm_args):
"Removes a directory tree and writes the script for it."
out = ""
for arg in rm_args:
if not arg.startswith('-'):
target = self.path2svntest(arg)
if os.path.isfile(target.runarg):
os.remove(target.runarg)
out += "os.remove(" + target.pyarg + ")\n"
else:
self.really_safe_rmtree(target.runarg)
out += "main.safe_rmtree(" + target.pyarg + ")\n"
return out
def cmd_mv(self, mv_args):
"Moves things in the filesystem and writes the script for it."
out = ""
sources = []
target = None
for arg in mv_args:
if not arg.startswith('-'):
if target is not None:
sources += [target]
target = self.path2svntest(arg)
out = ""
for source in sources:
out += "shutil.move(" + source.pyarg + ", " + target.pyarg + ")\n"
shutil.move(source.runarg, target.runarg)
return out
def cmd_cp(self, mv_args):
"Copies in the filesystem and writes the script for it."
out = ""
sources = []
target = None
for arg in mv_args:
if not arg.startswith('-'):
if target is not None:
sources += [target]
target = self.path2svntest(arg)
if not target:
raise Failure("cp needs a source and a target 'cp wc_dir wc_dir_2'")
out = ""
for source in sources:
if os.path.exists(target.runarg):
raise Failure("cp target exists, remove first: " + target.pyarg)
if os.path.isdir(source.runarg):
shutil.copytree(source.runarg, target.runarg)
out += "shutil.copytree(" + source.pyarg + ", " + target.pyarg + ")\n"
elif os.path.isfile(source.runarg):
shutil.copy2(source.runarg, target.runarg)
out += "shutil.copy2(" + source.pyarg + ", " + target.pyarg + ")\n"
else:
raise Failure("cp copy source does not exist: " + source.pyarg)
return out
class WorkingCopy:
"Defines the list of info we need around a working copy."
def __init__(self, py, realpath, suffix):
self.py = py
self.realpath = realpath
self.suffix = suffix
class Target:
"Defines the list of info we need around a command line supplied target."
def __init__(self, pyarg, runarg, argnr, is_url=False, wc=None):
self.pyarg = pyarg
self.runarg = runarg
self.argnr = argnr
self.is_url = is_url
self.wc = wc
def add_line(self, args, translation=None):
"Definition of how to add a new in/out line pair to LINES."
self.lines += [ [args, translation] ]
def really_safe_rmtree(self, dir):
if dir.find('svn-test-work') < 0:
raise Failure("Tried to remove path outside working area: " + dir)
main.safe_rmtree(dir)
def get_current_disk(self, wc):
"Probes the given working copy and writes an expected_disk for it."
actual_disk = svntest.wc.State.from_wc(wc.realpath, False, True)
actual_disk.wc_dir = wc.realpath
make_py, prev_disk = self.get_prev_disk(wc)
actual_disk_tree = actual_disk.old_tree()
prev_disk_tree = prev_disk.old_tree()
tweaks = self.diff_trees(prev_disk_tree, actual_disk_tree, wc)
if tweaks == 'Purge':
make_py = ''
else:
tweaks = self.optimize_tweaks(tweaks, actual_disk_tree, wc)
self.remember_disk(wc, actual_disk)
pydisk = make_py + self.tweaks2py(tweaks, "expected_disk", wc)
if len(pydisk) > 0:
pydisk += '\n'
return pydisk
def get_prev_disk(self, wc):
"Retrieves the last used expected_disk tree if any."
make_py = ""
if self.prev_disk is None or \
not self.prev_disk[0] in [None, wc.realpath]:
disk = svntest.main.greek_state.copy()
disk.wc_dir = wc.realpath
self.remember_disk(wc, disk)
make_py = "expected_disk = svntest.main.greek_state.copy()\n"
else:
disk = self.prev_disk[1]
return make_py, disk
def remember_disk(self, wc, actual):
"Remembers the current disk tree for future reference."
self.prev_disk = [wc.realpath, actual]
def get_current_status(self, wc, quiet=True):
"Probes the given working copy and writes an expected_status for it."
if quiet:
code, output, err = main.run_svn(None, 'status', '-v', '-u', '-q',
wc.realpath)
else:
code, output, err = main.run_svn(None, 'status', '-v', '-u',
wc.realpath)
if code != 0 or len(err) > 0:
raise Failure("Hmm. `svn status' failed. What now.")
make_py, prev_status = self.get_prev_status(wc)
actual_status = svntest.wc.State.from_status(output)
prev_status_tree = prev_status.old_tree()
actual_status_tree = actual_status.old_tree()
tweaks = self.diff_trees(prev_status_tree, actual_status_tree, wc)
if tweaks == 'Purge':
make_py = "expected_status = wc.State(" + wc.py + ", {})\n"
tweaks = []
else:
tweaks = self.optimize_tweaks(tweaks, actual_status_tree, wc)
self.remember_status(wc, actual_status)
pystatus = make_py + self.tweaks2py(tweaks, "expected_status", wc)
if len(pystatus) > 0:
pystatus += '\n'
return pystatus
def get_prev_status(self, wc):
"Retrieves the last used expected_status tree if any."
make_py = ""
prev_status = None
if self.prev_status is None or \
not self.prev_status[0] in [None, wc.realpath]:
try:
base_rev = actions.get_wc_base_rev(wc.realpath)
except:
base_rev = 0
prev_status = actions.get_virginal_state(wc.realpath, base_rev)
make_py += ("expected_status = actions.get_virginal_state(" +
wc.py + ", " + str(base_rev) + ")\n")
else:
prev_status = self.prev_status[1]
return make_py, prev_status
def remember_status(self, wc, actual_status):
"Remembers the current status tree for future reference."
self.prev_status = [wc.realpath, actual_status]
def chdir(self, do_chdir, wc):
"Pushes the current dir onto the dir stack, does an os.chdir()."
if not do_chdir:
return ""
self.prevdirs.append(os.getcwd())
os.chdir(wc.realpath)
py = ("orig_dir = os.getcwd() # Need to chdir because of '^/' args\n" +
"os.chdir(" + wc.py + ")\n")
return py
def chdir_back(self, do_chdir):
"Does os.chdir() back to the directory popped from the dir stack's top."
if not do_chdir:
return ""
os.chdir(self.prevdirs.pop())
return "os.chdir(orig_dir)\n"
def get_sorted_vars_by_pathlen(self):
"""Compose a listing of variable names to be expanded in script output.
This is intended to be stored in self.sorted_vars_by_pathlen."""
lst = []
for dict in [self.vars, self.other_wc_dirs]:
for name in dict:
runpath = dict[name][1]
if not runpath:
continue
strlen = len(runpath)
item = [strlen, name, runpath]
bisect.insort(lst, item)
return lst
def get_sorted_var_names(self):
"""Compose a listing of variable names to be declared.
This is used by TestFactory.make()."""
paths = []
urls = []
for name in self.vars:
if name.startswith('url_'):
bisect.insort(urls, [name.lower(), name])
else:
bisect.insort(paths, [name.lower(), name])
list = []
for path in paths:
list += [path[1]]
for url in urls:
list += [url[1]]
return list
def get_sorted_other_wc_dir_names(self):
"""Compose a listing of working copies to be declared with sbox.
This is used by TestFactory.make()."""
list = []
for name in self.other_wc_dirs:
bisect.insort(list, [name.lower(), name])
names = []
for item in list:
names += [item[1]]
return names
def str2svntest(self, str):
"Like str2py(), but replaces any known paths with variable names."
if str is None:
return "None"
str = str2py(str)
quote = str[0]
def replace(str, path, name, quote):
return str.replace(path, quote + " + " + name + " + " + quote)
for var in reversed(self.sorted_vars_by_pathlen):
name = var[1]
path = var[2]
str = replace(str, path, name, quote)
str = replace(str, self.sbox.wc_dir, 'wc_dir', quote)
str = replace(str, self.sbox.repo_url, 'url', quote)
str = str.replace("'' + ",'').replace(" + ''",'')
str = str.replace('"" + ',"").replace(' + ""',"")
if not self.used_wc_dir:
self.used_wc_dir = (re.search('\bwc_dir\b', str) is not None)
if not self.used_url:
self.used_url = str.find('url') >= 0
return str
def strlist2py(self, list):
"Given a list of strings, composes a py script that produces the same."
if list is None:
return "None"
if len(list) < 1:
return "[]"
if len(list) == 1:
return "[" + self.str2svntest(list[0]) + "]"
py = "[\n"
for line in list:
py += " " + self.str2svntest(line) + ",\n"
py += "]"
return py
def get_node_path(self, node, wc):
"Tries to return the node path relative to the given working copy."
path = node.get_printable_path()
if path.startswith(wc.realpath + os.sep):
path = path[len(wc.realpath + os.sep):]
elif path.startswith(wc.realpath):
path = path[len(wc.realpath):]
return path
def node2py(self, node, wc, prepend="", drop_empties=True):
"Creates a line like 'A/C' : Item({ ... }) for wc.State composition."
buf = StringIO()
node.print_script(buf, wc.realpath, prepend, drop_empties)
return buf.getvalue()
def tree2py(self, node, wc):
"Writes the wc.State definition for the given SVNTreeNode in given WC."
buf = StringIO()
tree.dump_tree_script(node, stream=buf, subtree=wc.realpath,
wc_varname=wc.py)
return buf.getvalue()
def diff_trees(self, left, right, wc):
"""Compares the two trees given by the SVNTreeNode instances LEFT and
RIGHT in the given working copy and composes an internal list of
tweaks necessary to make LEFT into RIGHT."""
if not right.children:
return 'Purge'
return self._diff_trees(left, right, wc)
def _diff_trees(self, left, right, wc):
"Used by self.diff_trees(). No need to call this. See there."
tweaks = []
path = self.get_node_path(left, wc)
tweak = []
if ((left.contents is None) != (right.contents is None)) or \
(left.contents != right.contents):
tweak += [ ["contents", right.contents] ]
for key in left.props:
if key not in right.props:
tweak += [ [key, None] ]
elif left.props[key] != right.props[key]:
tweak += [ [key, right.props[key]] ]
for key in right.props:
if key not in left.props:
tweak += [ [key, right.props[key]] ]
for key in left.atts:
if key not in right.atts:
tweak += [ [key, None] ]
elif left.atts[key] != right.atts[key]:
tweak += [ [key, right.atts[key]] ]
for key in right.atts:
if key not in left.atts:
tweak += [ [key, right.atts[key]] ]
if len(tweak) > 0:
changetweak = [ 'Change', [path], tweak]
tweaks += [changetweak]
if left.children is not None:
for leftchild in left.children:
rightchild = None
if right.children is not None:
rightchild = tree.get_child(right, leftchild.name)
if rightchild is None:
paths = leftchild.recurse(lambda n: self.get_node_path(n, wc))
removetweak = [ 'Remove', paths ]
tweaks += [removetweak]
if right.children is not None:
for rightchild in right.children:
leftchild = None
if left.children is not None:
leftchild = tree.get_child(left, rightchild.name)
if leftchild is None:
paths_and_nodes = rightchild.recurse(
lambda n: [ self.get_node_path(n, wc), n ] )
addtweak = [ 'Add', paths_and_nodes ]
tweaks += [addtweak]
else:
tweaks += self._diff_trees(leftchild, rightchild, wc)
return tweaks
def optimize_tweaks(self, tweaks, actual_tree, wc):
"Given an internal list of tweaks, make them optimal by common sense."
if tweaks == 'Purge':
return tweaks
subtree = actual_tree.find_node(wc.realpath)
if not subtree:
subtree = actual_tree
remove_paths = []
additions = []
changes = []
for tweak in tweaks:
if tweak[0] == 'Remove':
remove_paths += tweak[1]
elif tweak[0] == 'Add':
additions += tweak[1]
else:
changes += [tweak]
removal = []
if len(remove_paths) > 0:
removal = [ [ 'Remove', remove_paths] ]
addition = []
if len(additions) > 0:
addition = [ [ 'Add', additions ] ]
def remove_mod(mod):
for change in changes:
if mod in change[2]:
change[2].remove(mod)
seen = []
tweak_all = []
for change in changes:
tweak = change[2]
for mod in tweak:
if mod in seen:
continue
seen += [mod]
name = mod[0]
val = mod[1]
if name == 'contents' and val is None:
continue;
def check_node(node):
if (
(name == 'contents' and node.contents == val)
or
(node.props and (name in node.props) and node.props[name] == val)
or
(node.atts and (name in node.atts) and node.atts[name] == val)):
return [node, None]
return [None, node]
results = subtree.recurse(check_node)
have = []
havent = []
for result in results:
if result[0]:
have += [result[0]]
else:
havent += [result[1]]
if havent == []:
remove_mod(mod)
tweak_all += [mod]
elif len(havent) < len(have) * 3: remove_mod(mod)
tweak_all += [mod]
for node in havent:
name = mod[0]
if name == 'contents':
value = node.contents
elif name in node.props:
value = node.props[name]
elif name in node.atts:
value = node.atts[name]
else:
continue
changes += [ ['Change',
[self.get_node_path(node, wc)],
[[name, value]]
]
]
i = 0
j = 0
while i < len(changes):
j = i + 1
while j < len(changes):
if changes[i][2] == changes[j][2]:
changes[i][1] += changes[j][1]
del changes[j]
else:
j += 1
i += 1
i = 0
j = 0
while i < len(changes):
j = i + 1
while j < len(changes):
if changes[i][1] == changes[j][1]:
changes[i][2] += changes[j][2]
del changes[j]
else:
j += 1
i += 1
if tweak_all != []:
changes = [ ['Change', [], tweak_all ] ] + changes
return removal + addition + changes
def tweaks2py(self, tweaks, var_name, wc):
"Given an internal list of tweaks, write the tweak script for it."
py = ""
if tweaks is None:
return ""
if tweaks == 'Purge':
return var_name + " = wc.State(" + wc.py + ", {})\n"
for tweak in tweaks:
if tweak[0] == 'Remove':
py += var_name + ".remove("
paths = tweak[1]
py += self.str2svntest(paths[0])
for path in paths[1:]:
py += ", " + self.str2svntest(path)
py += ")\n"
elif tweak[0] == 'Add':
py += var_name + ".add({"
adds = tweak[1]
for add in adds:
path = add[0]
node = add[1]
py += self.node2py(node, wc, "\n ", False)
py += "\n})\n"
else:
paths = tweak[1]
mods = tweak[2]
if mods != []:
py += var_name + ".tweak("
for path in paths:
py += self.str2svntest(path) + ", "
def mod2py(mod):
return mod[0] + "=" + self.str2svntest(mod[1])
py += mod2py(mods[0])
for mod in mods[1:]:
py += ", " + mod2py(mod)
py += ")\n"
return py
def path2svntest(self, path, argnr=None, do_remove_on_new_wc_path=True):
"""Given an input argument, do one hell of a path expansion on it.
ARGNR is simply inserted into the resulting Target.
Returns a self.Target instance.
"""
wc = self.WorkingCopy('wc_dir', self.sbox.wc_dir, None)
url = self.sbox.repo_url
pathsep = '/'
if path.find('/') < 0 and path.find('\\') >= 0:
pathsep = '\\'
is_url = False
wc_dir_wildcards = ['wc_dir', 'wcdir', '$WC_DIR', '$WC']
url_wildcards = ['url', '$URL']
first = path.split(pathsep, 1)[0]
if first in wc_dir_wildcards:
path = path[len(first):]
elif first in url_wildcards:
path = path[len(first):]
is_url = True
else:
for url_scheme in ['^/', 'file:/', 'http:/', 'svn:/', 'svn+ssh:/']:
if path.startswith(url_scheme):
is_url = True
pyarg = self.str2svntest(path)
runarg = path
return self.Target(pyarg, runarg, argnr, is_url, None)
for wc_dir_wildcard in wc_dir_wildcards:
if first.startswith(wc_dir_wildcard):
suffix = ''
if first[len(wc_dir_wildcard)] in ['.','-','_']:
suffix = first[len(wc_dir_wildcard) + 1:]
if len(suffix) < 1:
suffix = first[len(wc_dir_wildcard):]
if len(suffix) < 1:
raise Failure("no suffix supplied to other-wc_dir arg")
suffix = suffix.replace('.','_').replace('-','_')
other_wc_dir_varname = 'wc_dir_' + suffix
path = path[len(first):]
real_path = self.get_other_wc_real_path(other_wc_dir_varname,
suffix,
do_remove_on_new_wc_path)
wc = self.WorkingCopy(other_wc_dir_varname,
real_path, suffix)
break
if len(path) < 1 or path == pathsep:
if is_url:
self.used_url = True
pyarg = 'url'
runarg = url
wc = None
else:
if wc.suffix is None:
self.used_wc_dir = True
pyarg = wc.py
runarg = wc.realpath
else:
pathelements = split_remove_empty(path, pathsep)
if is_url:
pyarg, runarg = self.ensure_url_var(pathelements)
wc = None
else:
pyarg, runarg = self.ensure_path_var(wc, pathelements)
return self.Target(pyarg, runarg, argnr, is_url, wc)
def get_other_wc_real_path(self, varname, suffix, do_remove):
"Create or retrieve the path of an alternate working copy."
if varname in self.other_wc_dirs:
return self.other_wc_dirs[varname][1]
path = self.sbox.wc_dir + '.' + suffix
if path in self.sbox.test_paths:
py = "sbox.wc_dir + '." + suffix + "'"
else:
path = self.sbox.add_wc_path(suffix, do_remove)
py = "sbox.add_wc_path(" + str2py(suffix)
if not do_remove:
py += ", remove=False"
py += ')'
value = [py, path]
self.other_wc_dirs[varname] = [py, path]
self.sorted_vars_by_pathlen = self.get_sorted_vars_by_pathlen()
return path
def define_var(self, name, value):
"Add a variable definition, don't allow redefinitions."
if name in self.vars:
if self.vars[name] != value:
raise Failure("Variable name collision. Hm, fix factory.py?")
return
self.vars[name] = value
self.sorted_vars_by_pathlen = self.get_sorted_vars_by_pathlen()
def ensure_path_var(self, wc, pathelements):
"Given a path in a working copy, make sure we have a variable for it."
name = "_".join(pathelements)
if wc.suffix is not None:
name = wc.suffix + "_" + name
if name[0].isdigit():
name = "_" + name
else:
self.used_wc_dir = True
py = 'os.path.join(' + wc.py
if len(pathelements) > 0:
py += ", '" + "', '".join(pathelements) + "'"
py += ')'
wc_dir_real_path = wc.realpath
run = os.path.join(wc_dir_real_path, *pathelements)
value = [py, run]
self.define_var(name, value)
return name, run
def ensure_url_var(self, pathelements):
"Given a path in the test repository, ensure we have a url var for it."
name = "url_" + "_".join(pathelements)
joined = "/" + "/".join(pathelements)
py = 'url'
if len(pathelements) > 0:
py += " + " + str2py(joined)
self.used_url = True
run = self.sbox.repo_url + joined
value = [py, run]
self.define_var(name, value)
return name, run
def get_first_wc(self, target_list):
"""In a list of Target instances, find the first one that is in a
working copy and return that WorkingCopy. Default to sbox.wc_dir.
This is useful if we need a working copy for a '^/' URL."""
for target in target_list:
if target.wc:
return target.wc
return self.WorkingCopy('wc_dir', self.sbox.wc_dir, None)
def args2svntest(self, args, append_wc_dir_if_missing = False,
keep_args_of = [], keep_first_count = 1,
drop_with_arg = []):
"""Tries to be extremely intelligent at parsing command line arguments.
It needs to know which args are file targets that should be in a
working copy. File targets are magically expanded.
args: list of string tokens as passed to factory.make(), e.g.
['svn', 'commit', '--force', 'wc_dir2']
append_wc_dir_if_missing: It's a switch.
keep_args_of: See TestFactory.keep_args_of (comment in __init__)
keep_first_count: Don't expand the first N non-option args. This is used
to preserve e.g. the token 'update' in '[svn] update wc_dir'
(the 'svn' is usually split off before this function is called).
drop_with_arg: list of string tokens that are commandline options with
following argument which we want to drop from the list of args
(e.g. -m message).
"""
wc_dir = self.sbox.wc_dir
url = self.sbox.repo_url
target_supplied = False
pyargs = []
runargs = []
do_chdir = False
targets = []
wc_dirs = []
i = 0
while i < len(args):
arg = args[i]
if arg in drop_with_arg:
if not arg.startswith('--') and len(arg) > 2:
i = i
else:
i += 1
elif arg.startswith('-'):
pyargs += [ self.str2svntest(arg) ]
runargs += [ arg ]
if arg in keep_args_of:
i += 1
if i < len(args):
arg = args[i]
pyargs += [ self.str2svntest(arg) ]
runargs += [ arg ]
elif keep_first_count > 0:
pyargs += [ self.str2svntest(arg) ]
runargs += [ arg ]
keep_first_count -= 1
elif arg.startswith('^/'):
do_chdir = True
pyarg = str2py(arg)
targets += [ self.Target(pyarg, arg, len(pyargs), True, None) ]
pyargs += [ pyarg ]
runargs += [ arg ]
else:
target = self.path2svntest(arg, argnr=len(pyargs))
pyargs += [ target.pyarg ]
runargs += [ target.runarg ]
target_supplied = True
targets += [ target ]
i += 1
if not target_supplied and append_wc_dir_if_missing:
self.used_wc_dir = True
wc = self.WorkingCopy('wc_dir', wc_dir, None)
targets += [ self.Target('wc_dir', wc_dir, len(pyargs), False, wc) ]
pyargs += [ 'wc_dir' ]
runargs += [ wc_dir ]
return pyargs, runargs, do_chdir, targets
def find_quote_end(text, i):
"In string TEXT, find the end of the qoute that starts at TEXT[i]"
quote = text[i]
i += 1
while i < len(text):
if text[i] == '\\':
i += 1
elif text[i] == quote:
return i
i += 1
return len(text) - 1
class MyWrapper(textwrap.TextWrapper):
"A textwrap.TextWrapper that doesn't break a line within quotes."
def _split(self, text):
parts = []
i = 0
start = 0
while i < len(text):
if text[i] in ['"', "'"]:
if text[i:i+3] == '"""':
end = text[i+3:].find('"""')
if end >= 0:
i += end + 2
else:
i = len(text) - 1
else:
i = find_quote_end(text, i)
elif text[i].isspace():
if start < i:
parts += [text[start:i]]
start = i
parts += [text[i]]
start = i + 1
i += 1
if start < len(text):
parts += [text[start:]]
return parts
def wrap_each_line(str, ii, si, blw):
"""Wrap lines to a defined width (<80 chars). Feed the lines single to
MyWrapper, so that it preserves the current line endings already in there.
We only want to insert new wraps, not remove existing newlines."""
wrapper = MyWrapper(77, initial_indent=ii,
subsequent_indent=si)
lines = str.splitlines()
for i in range(0,len(lines)):
if lines[i] != '':
lines[i] = wrapper.fill(lines[i])
return '\n'.join(lines)
def sh2str(string):
"un-escapes away /x sequences"
if string is None:
return None
return string.decode("string-escape")
def get_quote_style(str):
"""find which quote is the outer one, ' or "."""
quote_char = None
at = None
found = str.find("'")
found2 = str.find('"')
if found != found2:
if found >= 0 and found2 >= 0:
if found < found2:
found2 = -1
else:
found = -1
if found >= 0:
at = found + 1
quote_char = "'"
elif found2 >= 0:
at = found2 + 1
quote_char = '"'
return quote_char, at
def split_remove_empty(str, sep):
"do a split, then remove empty elements."
list = str.split(sep)
return filter(lambda item: item and len(item) > 0, list)
def str2py(str):
"returns the string enclosed in quotes, suitable for py scripts."
if str is None:
return "None"
if str.find("'") >= 0:
return '"' + str.encode("string-escape"
).replace("\\'", "'"
).replace('"', '\\"') + '"'
else:
return "'" + str.encode("string-escape") + "'"
return str