import sys, re, os, time, subprocess
import datetime
import svntest
from svntest import wc, actions
# Short module-level aliases for the decorator helpers exposed by
# svntest.testcase, so a test can be annotated as e.g. @Issue(2529).
Skip = svntest.testcase.Skip_deco
SkipUnless = svntest.testcase.SkipUnless_deco
XFail = svntest.testcase.XFail_deco
Issues = svntest.testcase.Issues_deco
Issue = svntest.testcase.Issue_deco
Wimp = svntest.testcase.Wimp_deco
# Alias for wc.StateItem; used throughout this file to describe the nodes of
# expected output / disk / status trees.
Item = wc.StateItem
def test_stderr(re_string, stderr):
    """Raise svntest.Failure unless a line of STDERR matches RE_STRING.

    RE_STRING is compiled as a regular expression and searched for in each
    element of STDERR (an iterable of text lines).  Returns None as soon as
    one line matches.  Despite the parameter name, callers in this file also
    pass captured stdout lines.

    When no line matches and verbose mode is on, the lines and the expected
    expression are echoed to stdout before the failure is raised.
    """
    pattern = re.compile(re_string)
    if any(pattern.search(text) for text in stderr):
        return

    if svntest.main.options.verbose:
        # Dump what we actually got so the mismatch can be diagnosed.
        sys.stdout.writelines(stderr)
        print("Expected stderr reg-ex: '" + re_string + "'")

    raise svntest.Failure("Checkout failed but not in the expected way")
def make_local_tree(sbox, mod_files=False, add_unversioned=False):
    """Populate sbox.wc_dir with a partial, unversioned copy of the repository.

    The sandbox repository is built without a working copy, the repository
    is exported into sbox.wc_dir, and the directories A/B, A/C and A/D are
    then deleted from the export.

    If MOD_FILES is true, 'iota' and 'A/mu' are overwritten with local text.
    If ADD_UNVERSIONED is true, the extra files 'sigma' and 'A/upsilon' and
    the extra directory 'A/Z' are created.

    Returns a wc.State rooted at sbox.wc_dir in which the paths left on disk
    ('A', 'A/mu', 'iota') carry status 'E ' and all other tree paths carry
    status 'A '.
    """
    sbox.build(create_wc=False)

    target = sbox.wc_dir
    if os.path.exists(target):
        svntest.main.safe_rmtree(target)

    # Expected export output: every node of the tree (plus the root) added.
    export_output = svntest.main.greek_state.copy()
    export_output.wc_dir = target
    export_output.desc[""] = Item()
    export_output.tweak(contents=None, status="A ")
    svntest.actions.run_and_verify_export(sbox.repo_url,
                                          target,
                                          export_output,
                                          svntest.main.greek_state.copy())

    # Prune three subtrees so the export is only a partial tree.
    for pruned in ("B", "C", "D"):
        svntest.main.safe_rmtree(os.path.join(target, "A", pruned))

    if mod_files:
        svntest.main.file_write(
            os.path.join(target, "iota"),
            "This is the local version of the file 'iota'.\n")
        svntest.main.file_write(
            os.path.join(target, "A", "mu"),
            "This is the local version of the file 'mu'.\n")

    if add_unversioned:
        svntest.main.file_append(os.path.join(target, "sigma"),
                                 "unversioned sigma")
        svntest.main.file_append(os.path.join(target, "A", "upsilon"),
                                 "unversioned upsilon")
        os.mkdir(os.path.join(target, "A", "Z"))

    # Paths still present on disk get 'E '; everything pruned gets 'A '.
    still_on_disk = ("A", "A/mu", "iota")
    tree_paths = (
        "A", "A/B", "A/B/lambda", "A/B/E", "A/B/E/alpha", "A/B/E/beta",
        "A/B/F", "A/mu", "A/C", "A/D", "A/D/gamma", "A/D/G", "A/D/G/pi",
        "A/D/G/rho", "A/D/G/tau", "A/D/H", "A/D/H/chi", "A/D/H/omega",
        "A/D/H/psi", "iota",
    )
    desc = {}
    for path in tree_paths:
        if path in still_on_disk:
            desc[path] = Item(status='E ')
        else:
            desc[path] = Item(status='A ')
    return wc.State(target, desc)
def checkout_with_obstructions(sbox):
    """co with obstructions conflicts without --force"""
    # Check out (without --force) on top of the partial unversioned tree made
    # by make_local_tree(), verify the tree conflicts that are reported, then
    # remove the obstructions, revert, and verify an update restores the tree.
    make_local_tree(sbox, False, False)

    url = sbox.repo_url
    wc_dir = sbox.wc_dir

    # 'iota' and 'A' are the conflict victims; everything beneath 'A' is
    # reported with treeconflict='A'.
    beneath_A = (
        'A/D', 'A/D/gamma', 'A/D/G', 'A/D/G/rho', 'A/D/G/pi', 'A/D/G/tau',
        'A/D/H', 'A/D/H/chi', 'A/D/H/omega', 'A/D/H/psi',
        'A/B', 'A/B/E', 'A/B/E/beta', 'A/B/E/alpha', 'A/B/F', 'A/B/lambda',
        'A/C', 'A/mu',
    )
    output_desc = {}
    for victim in ('iota', 'A'):
        output_desc[victim] = Item(status=' ', treeconflict='C')
    for path in beneath_A:
        output_desc[path] = Item(status=' ', treeconflict='A')
    expected_output = svntest.wc.State(wc_dir, output_desc)

    # On disk only the obstructing items remain; 'A/mu' is one of them.
    expected_disk = svntest.main.greek_state.copy()
    expected_disk.remove(*[path for path in beneath_A if path != 'A/mu'])

    actions.run_and_verify_checkout2(False, url, wc_dir, expected_output,
                                     expected_disk, None, None, None, None)

    # Status: both victims and everything below 'A' show as 'D '.
    expected_status = actions.get_virginal_state(wc_dir, 1)
    expected_status.tweak('A', 'iota', status='D ', wc_rev=1,
                          treeconflict='C')
    expected_status.tweak(*beneath_A, status='D ')
    actions.run_and_verify_unquiet_status(wc_dir, expected_status)

    # Remove the obstructions and revert the two victims.
    A_path = os.path.join(wc_dir, 'A')
    iota_path = os.path.join(wc_dir, 'iota')
    svntest.main.safe_rmtree(A_path)
    os.remove(iota_path)
    svntest.main.run_svn(None, 'revert', '-R', A_path, iota_path)

    # An update should now report nothing and leave a pristine tree.
    actions.run_and_verify_update(wc_dir,
                                  svntest.wc.State(wc_dir, {}),
                                  svntest.main.greek_state.copy(),
                                  actions.get_virginal_state(wc_dir, 1),
                                  None, None, None, None, None, False,
                                  wc_dir)
def forced_checkout_of_file_with_dir_obstructions(sbox):
    """forced co flags conflict if a dir obstructs a file"""
    # Create a directory named 'iota' inside a fresh checkout target, then
    # run a --force checkout and expect a tree conflict on 'iota' while the
    # rest of the tree is added normally.
    sbox.build()
    url = sbox.repo_url
    target = sbox.add_wc_path('other')

    # The obstruction: a directory where the repository has a file.
    os.makedirs(os.path.join(target, 'iota'))

    added_paths = (
        'A', 'A/B', 'A/B/E', 'A/B/E/alpha', 'A/B/E/beta', 'A/B/F',
        'A/B/lambda', 'A/D', 'A/D/H', 'A/D/H/chi', 'A/D/H/omega',
        'A/D/H/psi', 'A/D/G', 'A/D/G/pi', 'A/D/G/rho', 'A/D/G/tau',
        'A/D/gamma', 'A/C', 'A/mu',
    )
    output_desc = {}
    for path in added_paths:
        output_desc[path] = Item(status='A ')
    output_desc['iota'] = Item(status=' ', treeconflict='C')
    expected_output = svntest.wc.State(target, output_desc)

    # 'iota' stays a directory on disk, hence no contents.
    expected_disk = svntest.main.greek_state.copy()
    expected_disk.tweak('iota', contents=None)

    actions.run_and_verify_checkout(url, target, expected_output,
                                    expected_disk, None, None, None, None,
                                    '--force')
def forced_checkout_of_dir_with_file_obstructions(sbox):
    """forced co flags conflict if a file obstructs a dir"""
    # Put a plain file named 'A' in a fresh checkout target, run a --force
    # checkout and expect a tree conflict on 'A'; then remove the file,
    # revert 'A' and verify that an update yields a pristine tree.
    make_local_tree(sbox, False, False)

    url = sbox.repo_url
    target = sbox.add_wc_path('other')
    obstructing_file = os.path.join(target, 'A')

    # The obstruction: a file where the repository has a directory.
    os.makedirs(target)
    svntest.main.file_write(obstructing_file, 'The file A\n')

    beneath_A = (
        'A/mu', 'A/D', 'A/D/G', 'A/D/G/tau', 'A/D/G/pi', 'A/D/G/rho',
        'A/D/H', 'A/D/H/psi', 'A/D/H/omega', 'A/D/H/chi', 'A/D/gamma',
        'A/C', 'A/B', 'A/B/E', 'A/B/E/beta', 'A/B/E/alpha', 'A/B/F',
        'A/B/lambda',
    )
    output_desc = {
        'iota': Item(status='A '),
        'A': Item(status=' ', treeconflict='C'),
    }
    for path in beneath_A:
        output_desc[path] = Item(status=' ', treeconflict='A')
    expected_output = svntest.wc.State(target, output_desc)

    # Nothing below 'A' reaches the disk; 'A' itself is still the file.
    expected_disk = svntest.main.greek_state.copy()
    expected_disk.remove(*beneath_A)
    expected_disk.tweak('A', contents='The file A\n')

    actions.run_and_verify_checkout(url, target, expected_output,
                                    expected_disk, None, None, None, None,
                                    '--force')

    # Remove the obstruction, revert 'A', and expect a clean update.
    os.remove(obstructing_file)
    svntest.main.run_svn(None, 'revert', '-R', os.path.join(target, 'A'))

    actions.run_and_verify_update(target,
                                  svntest.wc.State(target, {}),
                                  svntest.main.greek_state.copy(),
                                  actions.get_virginal_state(target, 1),
                                  None, None, None, None, None, False,
                                  target)
def forced_checkout_with_faux_obstructions(sbox):
    """co with faux obstructions ok with --force"""
    # The local tree holds unmodified copies of some repository items, so a
    # --force checkout over it should end with the pristine tree on disk.
    checkout_output = make_local_tree(sbox,
                                      mod_files=False,
                                      add_unversioned=False)
    disk_tree = svntest.main.greek_state.copy()

    svntest.actions.run_and_verify_checkout(sbox.repo_url, sbox.wc_dir,
                                            checkout_output, disk_tree,
                                            None, None, None, None,
                                            '--force')
def forced_checkout_with_real_obstructions(sbox):
    """co with real obstructions ok with --force"""
    # 'iota' and 'A/mu' are locally modified in the obstructing tree; after
    # a --force checkout the disk is expected to keep those local contents.
    checkout_output = make_local_tree(sbox,
                                      mod_files=True,
                                      add_unversioned=False)

    disk_tree = svntest.main.greek_state.copy()
    local_contents = (
        ('A/mu', "This is the local version of the file 'mu'.\n"),
        ('iota', "This is the local version of the file 'iota'.\n"),
    )
    for path, text in local_contents:
        disk_tree.tweak(path, contents=text)

    svntest.actions.run_and_verify_checkout(sbox.repo_url, sbox.wc_dir,
                                            checkout_output, disk_tree,
                                            None, None, None, None,
                                            '--force')
def forced_checkout_with_real_obstructions_and_unversioned_files(sbox):
    """co with real obstructions and unversioned files"""
    # As the "real obstructions" test, but the local tree also contains the
    # unversioned items 'sigma', 'A/upsilon' and 'A/Z', which are expected
    # to remain on disk after the --force checkout.
    checkout_output = make_local_tree(sbox,
                                      mod_files=True,
                                      add_unversioned=True)

    disk_tree = svntest.main.greek_state.copy()
    local_contents = (
        ('A/mu', "This is the local version of the file 'mu'.\n"),
        ('iota', "This is the local version of the file 'iota'.\n"),
    )
    for path, text in local_contents:
        disk_tree.tweak(path, contents=text)

    unversioned = {
        'sigma': Item("unversioned sigma"),
        'A/upsilon': Item("unversioned upsilon"),
        'A/Z': Item(),
    }
    disk_tree.add(unversioned)

    svntest.actions.run_and_verify_checkout(sbox.repo_url, sbox.wc_dir,
                                            checkout_output, disk_tree,
                                            None, None, None, None,
                                            '--force')
def forced_checkout_with_versioned_obstruction(sbox):
    """forced co with versioned obstruction"""
    # Scenario:
    #   1. Copy the sandbox repository to a second repository ("other").
    #   2. Check out /A of the first repository into fresh/A and /A of the
    #      other repository into other/A.
    #   3. Run a --force checkout of the whole first repository over both
    #      'fresh' and 'other'; in each case 'iota' is added and 'A' is
    #      reported as Skipped.
    #   4. Verify other/A still points at the other repository, while
    #      'other' itself points at the first repository.
    sbox.build(read_only = True)

    repo_dir = sbox.repo_dir
    repo_url = sbox.repo_url
    other_repo_dir, other_repo_url = sbox.add_repo_path("other")
    svntest.main.copy_repos(repo_dir, other_repo_dir, 1, 1)

    fresh_wc_dir = sbox.add_wc_path('fresh')
    fresh_wc_dir_A = os.path.join(fresh_wc_dir, 'A')
    os.mkdir(fresh_wc_dir)

    other_wc_dir = sbox.add_wc_path("other")
    other_wc_dir_A = os.path.join(other_wc_dir, "A")
    os.mkdir(other_wc_dir)

    # Create the versioned obstructions: an 'A' working copy in each target.
    svntest.actions.run_and_verify_svn("Unexpected error during co",
                                       svntest.verify.AnyOutput, [],
                                       "co", repo_url + "/A",
                                       fresh_wc_dir_A)
    svntest.actions.run_and_verify_svn("Unexpected error during co",
                                       svntest.verify.AnyOutput, [],
                                       "co", other_repo_url + "/A",
                                       other_wc_dir_A)

    # Forced checkout of the first repository over 'fresh'.
    expected_output = svntest.wc.State(fresh_wc_dir, {
        'iota' : Item(status='A '),
        'A' : Item(verb='Skipped'),
    })
    expected_wc = svntest.main.greek_state.copy()
    svntest.actions.run_and_verify_checkout(repo_url, fresh_wc_dir,
                                            expected_output, expected_wc,
                                            None, None, None, None,
                                            '--force')

    # Forced checkout of the first repository over 'other'.
    expected_output = svntest.wc.State(other_wc_dir, {
        'iota' : Item(status='A '),
        'A' : Item(verb='Skipped'),
    })
    expected_wc = svntest.main.greek_state.copy()
    svntest.actions.run_and_verify_checkout(repo_url, other_wc_dir,
                                            expected_output, expected_wc,
                                            None, None, None, None,
                                            '--force')

    svntest.actions.run_and_verify_svn("empty status output", None,
                                       [], "st", other_wc_dir_A)

    # The URLs are matched literally: re.escape() keeps '.', '+' and other
    # regex metacharacters in a repository URL from being interpreted.
    exit_code, sout, serr = svntest.actions.run_and_verify_svn(
        "it should still point to other_repo_url/A", None, [], "info",
        other_wc_dir_A)
    test_stderr("URL: " + re.escape(other_repo_url) + '/A$', sout)

    # 'other' itself was checked out from the first repository.
    exit_code, sout, serr = svntest.actions.run_and_verify_svn(
        "it should point to repo_url", None, [], "info",
        other_wc_dir)
    test_stderr("URL: " + re.escape(sbox.repo_url) + '$', sout)
def import_and_checkout(sbox):
    """import and checkout"""
    # Export the sandbox repository to a directory, import that directory
    # into a newly created second repository, then run a --force checkout
    # of the second repository over the same directory.  Every path is
    # expected to be reported with status 'E '.
    sbox.build(read_only = True)

    other_repo_dir, other_repo_url = sbox.add_repo_path("other")
    import_from_dir = sbox.add_wc_path("other")

    # Export the first repository into import_from_dir.
    expected_output = svntest.main.greek_state.copy()
    expected_output.wc_dir = import_from_dir
    expected_output.desc[''] = Item()
    expected_output.tweak(contents=None, status='A ')
    svntest.actions.run_and_verify_export(sbox.repo_url,
                                          import_from_dir,
                                          expected_output,
                                          svntest.main.greek_state.copy())

    # Create the second repository and import the exported tree into it.
    svntest.main.create_repos(other_repo_dir)
    svntest.actions.run_and_verify_svn(None, None, [], 'import',
                                       '-m', 'import', import_from_dir,
                                       other_repo_url)

    # Forced checkout over the directory the import came from.
    existing_paths = (
        "A", "A/B", "A/B/lambda", "A/B/E", "A/B/E/alpha", "A/B/E/beta",
        "A/B/F", "A/mu", "A/C", "A/D", "A/D/gamma", "A/D/G", "A/D/G/pi",
        "A/D/G/rho", "A/D/G/tau", "A/D/H", "A/D/H/chi", "A/D/H/omega",
        "A/D/H/psi", "iota",
    )
    output_desc = {}
    for path in existing_paths:
        output_desc[path] = Item(status='E ')
    expected_output = wc.State(import_from_dir, output_desc)

    expected_wc = svntest.main.greek_state.copy()
    svntest.actions.run_and_verify_checkout(other_repo_url, import_from_dir,
                                            expected_output, expected_wc,
                                            None, None, None, None,
                                            '--force')
@Issue(2529)
def checkout_broken_eol(sbox):
    "checkout file with broken eol style"
    # Load a prepared dump file (located relative to the running script)
    # into the sandbox repository, then check it out and expect the single
    # file 'file' with contents 'line\nline2\n'.
    script_dir = os.path.dirname(sys.argv[0])
    dump_path = os.path.join(script_dir,
                             'update_tests_data',
                             'checkout_broken_eol.dump')
    svntest.actions.load_repo(sbox, dump_path)

    repo_url = sbox.repo_url

    checkout_output = svntest.wc.State(sbox.wc_dir, {
        'file': Item(status='A '),
    })
    disk_tree = svntest.wc.State('', {
        'file': Item(contents='line\nline2\n'),
    })

    svntest.actions.run_and_verify_checkout(repo_url, sbox.wc_dir,
                                            checkout_output, disk_tree)
def checkout_creates_intermediate_folders(sbox):
    "checkout and create some intermediate folders"
    # Check out into <wc_dir>/a/b/c.  No working copy is created by
    # sbox.build(), and this test creates none of the intermediate
    # directories itself.
    sbox.build(create_wc = False, read_only = True)

    checkout_target = os.path.join(sbox.wc_dir, 'a', 'b', 'c')

    expected_output = svntest.main.greek_state.copy()
    expected_output.wc_dir = checkout_target
    expected_output.tweak(status='A ', contents=None)

    # Work on a copy, like every other test in this file, so the shared
    # module-level greek_state can never be modified through this reference.
    expected_wc = svntest.main.greek_state.copy()

    svntest.actions.run_and_verify_checkout(sbox.repo_url,
                                            checkout_target,
                                            expected_output,
                                            expected_wc)
def checkout_peg_rev(sbox):
    "checkout with peg revision"
    # Commit a change to A/mu, then check out URL@1 into a new directory
    # and expect the original, unmodified tree.
    sbox.build()
    wc_dir = sbox.wc_dir

    # Modify and commit A/mu.
    svntest.main.file_append(os.path.join(wc_dir, 'A', 'mu'),
                             'appended mu text')
    svntest.actions.run_and_verify_svn(None, None, [],
                                       'ci', '-m', 'changed file mu', wc_dir)

    # Check out the repository pegged at r1 into a separate directory.
    target = sbox.add_wc_path('checkout')
    os.mkdir(target)

    checkout_output = svntest.main.greek_state.copy()
    checkout_output.wc_dir = target
    checkout_output.tweak(status='A ', contents=None)

    disk_tree = svntest.main.greek_state.copy()

    svntest.actions.run_and_verify_checkout(sbox.repo_url + '@1', target,
                                            checkout_output, disk_tree)
@Issue(2602)
def checkout_peg_rev_date(sbox):
    "checkout with peg revision date"
    # Read the svn:date of r1, commit a change to A/mu, then check out the
    # repository pegged at two dates -- r1's date plus one microsecond, and
    # r1's exact date -- and expect the original tree both times.
    sbox.build()
    wc_dir = sbox.wc_dir

    exit_code, output, errput = svntest.main.run_svn(None, 'propget',
                                                     'svn:date',
                                                     '--revprop', '-r1',
                                                     '--strict',
                                                     sbox.repo_url)
    if exit_code or errput != [] or len(output) != 1:
        raise svntest.Failure("svn:date propget failed")
    r1_string = output[0]

    # Parse the date; fail the test cleanly instead of tripping over a
    # None match object if the property is not in the expected format.
    date_pattern = re.compile(
        r'(\d+)-(\d+)-(\d+)T(\d\d):(\d\d):(\d\d)\.(\d+)Z$')
    date_match = date_pattern.match(r1_string)
    if date_match is None:
        raise svntest.Failure("svn:date of r1 has an unexpected format: '"
                              + r1_string + "'")
    r1_time = datetime.datetime(*map(int, date_match.groups()))

    # A peg date just after r1.
    peg_time = r1_time + datetime.timedelta(microseconds=1)
    assert r1_time != peg_time
    peg_string = '%04d-%02d-%02dT%02d:%02d:%02d.%06dZ' % \
        tuple(getattr(peg_time, x)
              for x in ["year", "month", "day", "hour", "minute",
                        "second", "microsecond"])

    # Modify and commit A/mu.
    mu_path = os.path.join(wc_dir, 'A', 'mu')
    svntest.main.file_append(mu_path, 'appended mu text')
    svntest.actions.run_and_verify_svn(None, None, [],
                                       'ci', '-m', 'changed file mu', wc_dir)

    # Both peg dates must yield the original tree.
    for wc_name, date_string in (('checkout', peg_string),
                                 ('checkout2', r1_string)):
        checkout_target = sbox.add_wc_path(wc_name)
        os.mkdir(checkout_target)

        expected_output = svntest.main.greek_state.copy()
        expected_output.wc_dir = checkout_target
        expected_output.tweak(status='A ', contents=None)

        expected_wc = svntest.main.greek_state.copy()

        svntest.actions.run_and_verify_checkout(sbox.repo_url +
                                                '@{' + date_string + '}',
                                                checkout_target,
                                                expected_output,
                                                expected_wc)
def co_with_obstructing_local_adds(sbox):
    "co handles obstructing paths scheduled for add"
    # Part 1: paths added and committed from one working copy are also
    #         scheduled for addition in a backup working copy; a --force
    #         checkout over the backup is then verified.
    # Part 2: URL-to-URL copies are made in the repository while paths of
    #         the same name are added by local copies; --force checkouts of
    #         the parent directories are expected to flag tree conflicts.
    sbox.build()
    wc_dir = sbox.wc_dir

    # Keep a pristine duplicate of the working copy.
    wc_backup = sbox.add_wc_path('backup')
    svntest.actions.duplicate_dir(wc_dir, wc_backup)

    # Create new files and directories in the first working copy.
    upsilon_path = os.path.join(wc_dir, 'A', 'B', 'upsilon')
    svntest.main.file_append(upsilon_path, "This is the file 'upsilon'\n")
    nu_path = os.path.join(wc_dir, 'A', 'C', 'nu')
    svntest.main.file_append(nu_path, "This is the file 'nu'\n")
    kappa_path = os.path.join(wc_dir, 'A', 'D', 'kappa')
    svntest.main.file_append(kappa_path, "This is REPOS file 'kappa'\n")
    I_path = os.path.join(wc_dir, 'A', 'D', 'H', 'I')
    os.mkdir(I_path)
    J_path = os.path.join(I_path, 'J')
    os.mkdir(J_path)
    K_path = os.path.join(I_path, 'K')
    os.mkdir(K_path)
    L_path = os.path.join(I_path, 'L')
    os.mkdir(L_path)
    xi_path = os.path.join(K_path, 'xi')
    svntest.main.file_append(xi_path, "This is file 'xi'\n")
    eta_path = os.path.join(K_path, 'eta')
    svntest.main.file_append(eta_path, "This is REPOS file 'eta'\n")

    svntest.main.run_svn(None, 'add', upsilon_path, nu_path,
                         kappa_path, I_path)

    # Commit the additions.
    expected_output = wc.State(wc_dir, {
        'A/B/upsilon' : Item(verb='Adding'),
        'A/C/nu' : Item(verb='Adding'),
        'A/D/kappa' : Item(verb='Adding'),
        'A/D/H/I' : Item(verb='Adding'),
        'A/D/H/I/J' : Item(verb='Adding'),
        'A/D/H/I/K' : Item(verb='Adding'),
        'A/D/H/I/K/xi' : Item(verb='Adding'),
        'A/D/H/I/K/eta' : Item(verb='Adding'),
        'A/D/H/I/L' : Item(verb='Adding'),
    })

    expected_status = svntest.actions.get_virginal_state(wc_dir, 1)
    expected_status.add({
        'A/B/upsilon' : Item(status=' ', wc_rev=2),
        'A/C/nu' : Item(status=' ', wc_rev=2),
        'A/D/kappa' : Item(status=' ', wc_rev=2),
        'A/D/H/I' : Item(status=' ', wc_rev=2),
        'A/D/H/I/J' : Item(status=' ', wc_rev=2),
        'A/D/H/I/K' : Item(status=' ', wc_rev=2),
        'A/D/H/I/K/xi' : Item(status=' ', wc_rev=2),
        'A/D/H/I/K/eta' : Item(status=' ', wc_rev=2),
        'A/D/H/I/L' : Item(status=' ', wc_rev=2),
    })

    svntest.actions.run_and_verify_commit(wc_dir, expected_output,
                                          expected_status, None, wc_dir)

    # In the backup, create and schedule for addition some of the same
    # paths.  'upsilon' and 'xi' get identical text; 'kappa' and 'eta' get
    # different text; 'nu' and 'I/L' are not created at all.
    upsilon_backup_path = os.path.join(wc_backup, 'A', 'B', 'upsilon')
    svntest.main.file_append(upsilon_backup_path,
                             "This is the file 'upsilon'\n")
    kappa_backup_path = os.path.join(wc_backup, 'A', 'D', 'kappa')
    svntest.main.file_append(kappa_backup_path,
                             "This is WC file 'kappa'\n")
    I_backup_path = os.path.join(wc_backup, 'A', 'D', 'H', 'I')
    os.mkdir(I_backup_path)
    J_backup_path = os.path.join(I_backup_path, 'J')
    os.mkdir(J_backup_path)
    K_backup_path = os.path.join(I_backup_path, 'K')
    os.mkdir(K_backup_path)
    xi_backup_path = os.path.join(K_backup_path, 'xi')
    svntest.main.file_append(xi_backup_path, "This is file 'xi'\n")
    eta_backup_path = os.path.join(K_backup_path, 'eta')
    svntest.main.file_append(eta_backup_path, "This is WC file 'eta'\n")

    svntest.main.run_svn(None, 'add',
                         upsilon_backup_path,
                         kappa_backup_path,
                         I_backup_path)

    # Expected results of a forced checkout over the backup working copy.
    expected_output = wc.State(wc_backup, {
        'A/B/upsilon' : Item(status='E '),
        'A/C/nu' : Item(status='A '),
        'A/D/H/I' : Item(status='E '),
        'A/D/H/I/J' : Item(status='E '),
        'A/D/H/I/K' : Item(status='E '),
        'A/D/H/I/K/xi' : Item(status='E '),
        'A/D/H/I/K/eta' : Item(status='C '),
        'A/D/H/I/L' : Item(status='A '),
        'A/D/kappa' : Item(status='C '),
    })

    expected_disk = svntest.main.greek_state.copy()
    expected_disk.add({
        'A/B/upsilon' : Item("This is the file 'upsilon'\n"),
        'A/C/nu' : Item("This is the file 'nu'\n"),
        'A/D/H/I' : Item(),
        'A/D/H/I/J' : Item(),
        'A/D/H/I/K' : Item(),
        'A/D/H/I/K/xi' : Item("This is file 'xi'\n"),
        'A/D/H/I/K/eta' : Item("\n".join(["<<<<<<< .mine",
                                          "This is WC file 'eta'",
                                          "=======",
                                          "This is REPOS file 'eta'",
                                          ">>>>>>> .r2",
                                          ""])),
        'A/D/H/I/L' : Item(),
        'A/D/kappa' : Item("\n".join(["<<<<<<< .mine",
                                      "This is WC file 'kappa'",
                                      "=======",
                                      "This is REPOS file 'kappa'",
                                      ">>>>>>> .r2",
                                      ""])),
    })

    expected_status = svntest.actions.get_virginal_state(wc_backup, 2)
    expected_status.add({
        'A/B/upsilon' : Item(status=' ', wc_rev=2),
        'A/C/nu' : Item(status=' ', wc_rev=2),
        'A/D/H/I' : Item(status=' ', wc_rev=2),
        'A/D/H/I/J' : Item(status=' ', wc_rev=2),
        'A/D/H/I/K' : Item(status=' ', wc_rev=2),
        'A/D/H/I/K/xi' : Item(status=' ', wc_rev=2),
        'A/D/H/I/K/eta' : Item(status='C ', wc_rev=2),
        'A/D/H/I/L' : Item(status=' ', wc_rev=2),
        'A/D/kappa' : Item(status='C ', wc_rev=2),
    })

    # Patterns for the extra files left behind by the two text conflicts.
    # Raw strings: '\.' is not a valid escape in an ordinary string literal.
    extra_files = [r'eta\.r0', r'eta\.r2', r'eta\.mine',
                   r'kappa\.r0', r'kappa\.r2', r'kappa\.mine']

    svntest.actions.run_and_verify_checkout(sbox.repo_url, wc_backup,
                                            expected_output, expected_disk,
                                            svntest.tree.detect_conflict_files,
                                            extra_files, None, None,
                                            '--force')

    svntest.actions.run_and_verify_status(wc_backup, expected_status)

    # Part 2.  URL-to-URL copy of A/D/G to A/D/M in the repository ...
    G_URL = sbox.repo_url + '/A/D/G'
    M_URL = sbox.repo_url + '/A/D/M'
    svntest.actions.run_and_verify_svn("Copy error:", None, [],
                                       'cp', G_URL, M_URL, '-m', '')

    # ... and a local copy of A/D/H to the same name A/D/M.
    D_path = os.path.join(wc_dir, 'A', 'D')
    H_path = os.path.join(wc_dir, 'A', 'D', 'H')
    M_path = os.path.join(wc_dir, 'A', 'D', 'M')
    svntest.actions.run_and_verify_svn("Copy error:", None, [],
                                       'cp', H_path, M_path)

    # URL-to-URL copy of A/B/E/alpha to A/B/F/omicron in the repository ...
    alpha_URL = sbox.repo_url + '/A/B/E/alpha'
    omicron_URL = sbox.repo_url + '/A/B/F/omicron'
    svntest.actions.run_and_verify_svn("Copy error:", None, [],
                                       'cp', alpha_URL, omicron_URL,
                                       '-m', '')

    # ... and a local copy of A/D/H/chi to the same name A/B/F/omicron.
    F_path = os.path.join(wc_dir, 'A', 'B', 'F')
    omicron_path = os.path.join(wc_dir, 'A', 'B', 'F', 'omicron')
    chi_path = os.path.join(wc_dir, 'A', 'D', 'H', 'chi')
    svntest.actions.run_and_verify_svn("Copy error:", None, [],
                                       'cp', chi_path,
                                       omicron_path)

    # Status before the forced checkouts: the local copies are scheduled
    # for addition with history.
    expected_status = svntest.actions.get_virginal_state(wc_dir, 1)
    expected_status.add({
        'A/B/F/omicron' : Item(status='A ', copied='+', wc_rev='-'),
        'A/B/upsilon' : Item(status=' ', wc_rev=2),
        'A/C/nu' : Item(status=' ', wc_rev=2),
        'A/D/kappa' : Item(status=' ', wc_rev=2),
        'A/D/H/I' : Item(status=' ', wc_rev=2),
        'A/D/H/I/J' : Item(status=' ', wc_rev=2),
        'A/D/H/I/K' : Item(status=' ', wc_rev=2),
        'A/D/H/I/K/xi' : Item(status=' ', wc_rev=2),
        'A/D/H/I/K/eta' : Item(status=' ', wc_rev=2),
        'A/D/H/I/L' : Item(status=' ', wc_rev=2),
        'A/D/M' : Item(status='A ', copied='+', wc_rev='-'),
        'A/D/M/psi' : Item(status=' ', copied='+', wc_rev='-'),
        'A/D/M/chi' : Item(status=' ', copied='+', wc_rev='-'),
        'A/D/M/omega' : Item(status=' ', copied='+', wc_rev='-'),
        'A/D/M/I' : Item(status='A ', copied='+', wc_rev='-',
                         entry_status=' '),
        'A/D/M/I/J' : Item(status=' ', copied='+', wc_rev='-'),
        'A/D/M/I/K' : Item(status=' ', copied='+', wc_rev='-'),
        'A/D/M/I/K/xi' : Item(status=' ', copied='+', wc_rev='-'),
        'A/D/M/I/K/eta' : Item(status=' ', copied='+', wc_rev='-'),
        'A/D/M/I/L' : Item(status=' ', copied='+', wc_rev='-'),
    })
    svntest.actions.run_and_verify_status(wc_dir, expected_status)

    # Forced checkout of /A/D over A/D: 'M' is expected to be a tree
    # conflict victim.
    expected_output = wc.State(wc_dir, {
        'A/D/M' : Item(status=' ', treeconflict='C'),
        'A/D/M/rho' : Item(status=' ', treeconflict='A'),
        'A/D/M/pi' : Item(status=' ', treeconflict='A'),
        'A/D/M/tau' : Item(status=' ', treeconflict='A'),
    })
    expected_disk = wc.State('', {
        'gamma' : Item("This is the file 'gamma'.\n"),
        'G/pi' : Item("This is the file 'pi'.\n"),
        'G/rho' : Item("This is the file 'rho'.\n"),
        'G/tau' : Item("This is the file 'tau'.\n"),
        'H/I' : Item(),
        'H/I/J' : Item(),
        'H/I/K' : Item(),
        'H/I/K/xi' : Item("This is file 'xi'\n"),
        'H/I/K/eta' : Item("This is REPOS file 'eta'\n"),
        'H/I/L' : Item(),
        'H/chi' : Item("This is the file 'chi'.\n"),
        'H/psi' : Item("This is the file 'psi'.\n"),
        'H/omega' : Item("This is the file 'omega'.\n"),
        'M/I' : Item(),
        'M/I/J' : Item(),
        'M/I/K' : Item(),
        'M/I/K/xi' : Item("This is file 'xi'\n"),
        'M/I/K/eta' : Item("This is REPOS file 'eta'\n"),
        'M/I/L' : Item(),
        'M/chi' : Item("This is the file 'chi'.\n"),
        'M/psi' : Item("This is the file 'psi'.\n"),
        'M/omega' : Item("This is the file 'omega'.\n"),
        'kappa' : Item("This is REPOS file 'kappa'\n"),
    })
    svntest.actions.run_and_verify_checkout(sbox.repo_url + '/A/D',
                                            D_path,
                                            expected_output,
                                            expected_disk,
                                            None, None, None, None,
                                            '--force')

    # Status after the first forced checkout.
    expected_status.tweak('A/D/M', treeconflict='C', status='R ')
    expected_status.tweak(
        'A/D',
        'A/D/G',
        'A/D/G/pi',
        'A/D/G/rho',
        'A/D/G/tau',
        'A/D/gamma',
        'A/D/kappa',
        'A/D/H',
        'A/D/H/I',
        'A/D/H/I/J',
        'A/D/H/I/K',
        'A/D/H/I/K/xi',
        'A/D/H/I/K/eta',
        'A/D/H/I/L', wc_rev=4)
    expected_status.add({
        'A/D/H/chi' : Item(status=' ', wc_rev=4),
        'A/D/H/psi' : Item(status=' ', wc_rev=4),
        'A/D/H/omega' : Item(status=' ', wc_rev=4),
        'A/D/M/pi' : Item(status='D ', wc_rev=4),
        'A/D/M/rho' : Item(status='D ', wc_rev=4),
        'A/D/M/tau' : Item(status='D ', wc_rev=4),
    })
    svntest.actions.run_and_verify_status(wc_dir, expected_status)

    # Forced checkout of /A/B/F over A/B/F: 'omicron' is expected to be a
    # tree conflict victim and to keep the locally copied text on disk.
    expected_output = wc.State(wc_dir, {
        'A/B/F/omicron' : Item(status=' ', treeconflict='C'),
    })
    expected_disk = wc.State('', {
        'omicron' : Item("This is the file 'chi'.\n"),
    })
    svntest.actions.run_and_verify_checkout(sbox.repo_url + '/A/B/F',
                                            F_path,
                                            expected_output,
                                            expected_disk,
                                            None, None, None, None,
                                            '--force')

    # Status after the second forced checkout.
    expected_status.tweak('A/B/F/omicron', treeconflict='C', status='R ')
    expected_status.add({
        'A/B/F' : Item(status=' ', wc_rev=4),
    })
    svntest.actions.run_and_verify_status(wc_dir, expected_status)
def checkout_wc_from_drive(sbox):
    "checkout from the root of a Windows drive"
    # Map the sandbox repository directory to a free drive letter with
    # 'subst', then check out from the drive root URL and from two
    # subdirectory URLs.  The mapping is removed again in a finally clause.
    # The test is skipped when not on Windows or when no letter in G..Z
    # is free.

    def find_the_next_available_drive_letter():
        "find the first available drive"
        # Keep the try body to the import alone so that only a missing
        # win32api module selects the fallback.
        try:
            import win32api
        except ImportError:
            win32api = None

        if win32api is not None:
            drives = win32api.GetLogicalDriveStrings().split('\000')

            def in_use(root):
                return root in drives
        else:
            # Without win32api, treat a drive as taken if its root is an
            # existing directory.
            in_use = os.path.isdir

        for d in range(ord('G'), ord('Z') + 1):
            drive = chr(d)
            if not in_use(drive + ':\\'):
                return drive

        return None

    if not svntest.main.windows:
        raise svntest.Skip

    sbox.build(create_wc = False)
    svntest.main.safe_rmtree(sbox.wc_dir)
    os.mkdir(sbox.wc_dir)

    drive = find_the_next_available_drive_letter()
    if drive is None:
        raise svntest.Skip

    # Create the virtual drive.  Fail right away if that does not work,
    # rather than later with a less obvious checkout error.
    if subprocess.call(['subst', drive + ':', sbox.repo_dir]) != 0:
        raise svntest.Failure("Could not map drive " + drive +
                              ": to the repository with 'subst'")
    repo_url = 'file:///' + drive + ':/'
    wc_dir = sbox.wc_dir
    was_cwd = os.getcwd()

    try:
        # Check out the whole repository from the drive root.
        expected_wc = svntest.main.greek_state.copy()
        expected_output = wc.State(wc_dir, {
            'A' : Item(status='A '),
            'A/D' : Item(status='A '),
            'A/D/H' : Item(status='A '),
            'A/D/H/psi' : Item(status='A '),
            'A/D/H/chi' : Item(status='A '),
            'A/D/H/omega' : Item(status='A '),
            'A/D/G' : Item(status='A '),
            'A/D/G/tau' : Item(status='A '),
            'A/D/G/pi' : Item(status='A '),
            'A/D/G/rho' : Item(status='A '),
            'A/D/gamma' : Item(status='A '),
            'A/C' : Item(status='A '),
            'A/mu' : Item(status='A '),
            'A/B' : Item(status='A '),
            'A/B/E' : Item(status='A '),
            'A/B/E/alpha' : Item(status='A '),
            'A/B/E/beta' : Item(status='A '),
            'A/B/F' : Item(status='A '),
            'A/B/lambda' : Item(status='A '),
            'iota' : Item(status='A '),
        })
        svntest.actions.run_and_verify_checkout(repo_url, wc_dir,
                                                expected_output, expected_wc,
                                                None, None, None, None)

        # Check out /A into a second working copy.
        wc2_dir = sbox.add_wc_path('2')
        expected_output = wc.State(wc2_dir, {
            'D' : Item(status='A '),
            'D/H' : Item(status='A '),
            'D/H/psi' : Item(status='A '),
            'D/H/chi' : Item(status='A '),
            'D/H/omega' : Item(status='A '),
            'D/G' : Item(status='A '),
            'D/G/tau' : Item(status='A '),
            'D/G/pi' : Item(status='A '),
            'D/G/rho' : Item(status='A '),
            'D/gamma' : Item(status='A '),
            'C' : Item(status='A '),
            'mu' : Item(status='A '),
            'B' : Item(status='A '),
            'B/E' : Item(status='A '),
            'B/E/alpha' : Item(status='A '),
            'B/E/beta' : Item(status='A '),
            'B/F' : Item(status='A '),
            'B/lambda' : Item(status='A '),
        })
        expected_wc = wc.State('', {
            'C' : Item(),
            'B/E/beta' : Item(contents="This is the file 'beta'.\n"),
            'B/E/alpha' : Item(contents="This is the file 'alpha'.\n"),
            'B/lambda' : Item(contents="This is the file 'lambda'.\n"),
            'B/F' : Item(),
            'D/H/omega' : Item(contents="This is the file 'omega'.\n"),
            'D/H/psi' : Item(contents="This is the file 'psi'.\n"),
            'D/H/chi' : Item(contents="This is the file 'chi'.\n"),
            'D/G/rho' : Item(contents="This is the file 'rho'.\n"),
            'D/G/tau' : Item(contents="This is the file 'tau'.\n"),
            'D/G/pi' : Item(contents="This is the file 'pi'.\n"),
            'D/gamma' : Item(contents="This is the file 'gamma'.\n"),
            'mu' : Item(contents="This is the file 'mu'.\n"),
        })
        svntest.actions.run_and_verify_checkout(repo_url + '/A', wc2_dir,
                                                expected_output, expected_wc,
                                                None, None, None, None)

        # Check out /A/D into a third working copy.
        wc3_dir = sbox.add_wc_path('3')
        expected_output = wc.State(wc3_dir, {
            'H' : Item(status='A '),
            'H/psi' : Item(status='A '),
            'H/chi' : Item(status='A '),
            'H/omega' : Item(status='A '),
            'G' : Item(status='A '),
            'G/tau' : Item(status='A '),
            'G/pi' : Item(status='A '),
            'G/rho' : Item(status='A '),
            'gamma' : Item(status='A '),
        })
        expected_wc = wc.State('', {
            'H/chi' : Item(contents="This is the file 'chi'.\n"),
            'H/psi' : Item(contents="This is the file 'psi'.\n"),
            'H/omega' : Item(contents="This is the file 'omega'.\n"),
            'G/pi' : Item(contents="This is the file 'pi'.\n"),
            'G/tau' : Item(contents="This is the file 'tau'.\n"),
            'G/rho' : Item(contents="This is the file 'rho'.\n"),
            'gamma' : Item(contents="This is the file 'gamma'.\n"),
        })
        svntest.actions.run_and_verify_checkout(repo_url + '/A/D', wc3_dir,
                                                expected_output, expected_wc,
                                                None, None, None, None)
    finally:
        os.chdir(was_cwd)
        # Always remove the virtual drive again.
        subprocess.call(['subst', '/D', drive + ':'])
# The tests run by this module, in order.
# NOTE(review): the leading None is presumably a placeholder so that real
# tests are numbered from 1 -- confirm against svntest.main.run_tests.
test_list = [
    None,
    checkout_with_obstructions,
    forced_checkout_of_file_with_dir_obstructions,
    forced_checkout_of_dir_with_file_obstructions,
    forced_checkout_with_faux_obstructions,
    forced_checkout_with_real_obstructions,
    forced_checkout_with_real_obstructions_and_unversioned_files,
    forced_checkout_with_versioned_obstruction,
    import_and_checkout,
    checkout_broken_eol,
    checkout_creates_intermediate_folders,
    checkout_peg_rev,
    checkout_peg_rev_date,
    co_with_obstructing_local_adds,
    checkout_wc_from_drive,
]

# Hand the list to the svntest driver when executed as a script.
if __name__ == "__main__":
    svntest.main.run_tests(test_list)