# merge_authz_tests.py [plain text]
import shutil, sys, re, os
import time
import svntest
from svntest import wc
Item = wc.StateItem
XFail = svntest.testcase.XFail
Skip = svntest.testcase.Skip
SkipUnless = svntest.testcase.SkipUnless
from merge_tests import set_up_branch
from merge_tests import expected_merge_output
from svntest.main import is_ra_type_dav
from svntest.main import is_ra_type_svn
from svntest.main import SVN_PROP_MERGEINFO
from svntest.main import write_restrictive_svnserve_conf
from svntest.main import write_authz_file
from svntest.main import is_ra_type_dav
from svntest.main import is_ra_type_svn
from svntest.main import server_has_mergeinfo
from svntest.actions import fill_file_with_lines
from svntest.actions import make_conflict_marker_text
from svntest.actions import inject_conflict_into_expected_state
def mergeinfo_and_skipped_paths(sbox):
  "skipped paths get overriding mergeinfo"
  # NOTE(review): the one-line docstring above is presumably what the svntest
  # harness prints as this test's description -- keep it short and unchanged.
  #
  # Scenario: run several merges from /A (and /A/D/H) into branch copies that
  # live in a working copy checked out under restrictive authz rules, and
  # verify for each merge the expected output, on-disk tree (including
  # svn:mergeinfo properties), status and set of skipped paths.
  #
  # NOTE(review): in every run_and_verify_merge() call below, the trailing
  # positional arguments (None x5, 1[, 0]) are presumably error pattern,
  # singleton handlers/batons, check_props and dry_run -- confirm against
  # svntest.actions.run_and_verify_merge.

  sbox.build()
  wc_dir = sbox.wc_dir
  # Only wc_status is used later (for the commit of 'zeta'); wc_disk is unused.
  wc_disk, wc_status = set_up_branch(sbox, False, 3)

  # Give the test author rw on the root but no access at all to one
  # directory of the merge source (/A/B/E), a file and a directory in
  # A_COPY_2, and B/E in A_COPY_3.
  write_restrictive_svnserve_conf(sbox.repo_dir)
  write_authz_file(sbox, {"/" : svntest.main.wc_author +"=rw",
                          "/A/B/E" : svntest.main.wc_author + "=",
                          "/A_COPY_2/D/H/psi" : svntest.main.wc_author + "=",
                          "/A_COPY_2/D/G" : svntest.main.wc_author + "=",
                          "/A_COPY_3/B/E" : svntest.main.wc_author + "=",
                          })

  # Check out a second working copy now that the restrictions are in place.
  wc_restricted = sbox.add_wc_path('restricted')
  svntest.actions.run_and_verify_svn(None, None, [], 'checkout',
                                     sbox.repo_url,
                                     wc_restricted)

  # Paths used below.  Everything is inside the restricted working copy
  # except zeta_path, which lives in the original working copy.
  A_COPY_path = os.path.join(wc_restricted, "A_COPY")
  A_COPY_2_path = os.path.join(wc_restricted, "A_COPY_2")
  A_COPY_2_H_path = os.path.join(wc_restricted, "A_COPY_2", "D", "H")
  A_COPY_3_path = os.path.join(wc_restricted, "A_COPY_3")
  omega_path = os.path.join(wc_restricted, "A_COPY", "D", "H", "omega")
  zeta_path = os.path.join(wc_dir, "A", "D", "H", "zeta")

  # Delete omega from disk only (no 'svn delete'), so it is versioned but
  # missing when the first merge runs.
  os.remove(omega_path)

  # Merge 1: revisions '4'..'8' of /A into A_COPY.  B/E (unreadable in the
  # source) and the missing D/H/omega are expected to be skipped.
  expected_output = wc.State(A_COPY_path, {
    'D/G/rho' : Item(status='U '),
    'D/H/psi' : Item(status='U '),
    })
  expected_status = wc.State(A_COPY_path, {
    '' : Item(status=' M', wc_rev=8),
    'D/H/chi' : Item(status=' ', wc_rev=8),
    'D/H/psi' : Item(status='M ', wc_rev=8),
    'D/H/omega' : Item(status='!M', wc_rev=8),
    'D/H' : Item(status=' ', wc_rev=8),
    'D/G/pi' : Item(status=' ', wc_rev=8),
    'D/G/rho' : Item(status='M ', wc_rev=8),
    'D/G/tau' : Item(status=' ', wc_rev=8),
    'D/G' : Item(status=' ', wc_rev=8),
    'D/gamma' : Item(status=' ', wc_rev=8),
    'D' : Item(status=' ', wc_rev=8),
    'B/lambda' : Item(status=' ', wc_rev=8),
    'B/E' : Item(status=' M', wc_rev=8),
    'B/E/alpha' : Item(status=' ', wc_rev=8),
    'B/E/beta' : Item(status=' ', wc_rev=8),
    'B/F' : Item(status=' ', wc_rev=8),
    'B' : Item(status=' ', wc_rev=8),
    'mu' : Item(status=' ', wc_rev=8),
    'C' : Item(status=' ', wc_rev=8),
    })
  expected_disk = wc.State('', {
    '' : Item(props={SVN_PROP_MERGEINFO : '/A:5-8'}),
    'D/H/psi' : Item("New content"),
    'D/H/chi' : Item("This is the file 'chi'.\n"),
    'D/H' : Item(),
    'D/G/pi' : Item("This is the file 'pi'.\n"),
    'D/G/rho' : Item("New content"),
    'D/G/tau' : Item("This is the file 'tau'.\n"),
    'D/G' : Item(),
    'D/gamma' : Item("This is the file 'gamma'.\n"),
    'D' : Item(),
    'B/lambda' : Item("This is the file 'lambda'.\n"),
    'B/E' : Item(props={SVN_PROP_MERGEINFO : ''}),
    'B/E/alpha' : Item("This is the file 'alpha'.\n"),
    'B/E/beta' : Item("This is the file 'beta'.\n"),
    'B/F' : Item(),
    'B' : Item(),
    'mu' : Item("This is the file 'mu'.\n"),
    'C' : Item(),
    })
  expected_skip = wc.State(A_COPY_path, {
    'B/E' : Item(),
    'D/H/omega' : Item(),
    })
  svntest.actions.run_and_verify_merge(A_COPY_path, '4', '8',
                                       sbox.repo_url + '/A',
                                       expected_output,
                                       expected_disk,
                                       expected_status,
                                       expected_skip,
                                       None, None, None, None,
                                       None, 1)

  # The mergeinfo property on the skipped, missing omega must print as a
  # single empty line.
  svntest.actions.run_and_verify_svn(None, ['\n'], [],
                                     'pg', SVN_PROP_MERGEINFO, omega_path)

  # Merge 2: the same revisions into A_COPY_2, where D/G and D/H/psi are
  # unreadable in the target.  A tree conflict is expected on D/G; B/E and
  # D/H/psi are expected to be skipped.
  expected_output = wc.State(A_COPY_2_path, {
    'D/G' : Item(status=' ', treeconflict='C'),
    'D/H/omega' : Item(status='U '),
    })
  expected_status = wc.State(A_COPY_2_path, {
    '' : Item(status=' M', wc_rev=8),
    'D/G' : Item(status='! ', treeconflict='C'),
    'D/H/chi' : Item(status=' M', wc_rev=8),
    'D/H/omega' : Item(status='MM', wc_rev=8),
    'D/H' : Item(status=' M', wc_rev=8),
    'D/gamma' : Item(status=' M', wc_rev=8),
    'D' : Item(status=' M', wc_rev=8),
    'B/lambda' : Item(status=' ', wc_rev=8),
    'B/E' : Item(status=' M', wc_rev=8),
    'B/E/alpha' : Item(status=' ', wc_rev=8),
    'B/E/beta' : Item(status=' ', wc_rev=8),
    'B/F' : Item(status=' ', wc_rev=8),
    'B' : Item(status=' ', wc_rev=8),
    'mu' : Item(status=' ', wc_rev=8),
    'C' : Item(status=' ', wc_rev=8),
    })
  expected_disk = wc.State('', {
    '' : Item(props={SVN_PROP_MERGEINFO : '/A:5-8'}),
    'D/H/omega' : Item("New content",
                       props={SVN_PROP_MERGEINFO : '/A/D/H/omega:5-8'}),
    'D/H/chi' : Item("This is the file 'chi'.\n",
                     props={SVN_PROP_MERGEINFO : '/A/D/H/chi:5-8'}),
    'D/H' : Item(props={SVN_PROP_MERGEINFO : '/A/D/H:5-8*'}),
    'D/gamma' : Item("This is the file 'gamma'.\n",
                     props={SVN_PROP_MERGEINFO : '/A/D/gamma:5-8'}),
    'D' : Item(props={SVN_PROP_MERGEINFO : '/A/D:5-8*'}),
    'B/lambda' : Item("This is the file 'lambda'.\n"),
    'B/E' : Item(props={SVN_PROP_MERGEINFO : ''}),
    'B/E/alpha' : Item("This is the file 'alpha'.\n"),
    'B/E/beta' : Item("This is the file 'beta'.\n"),
    'B/F' : Item(),
    'B' : Item(),
    'mu' : Item("This is the file 'mu'.\n"),
    'C' : Item(),
    })
  expected_skip = wc.State(A_COPY_2_path, {
    'B/E' : Item(),
    'D/H/psi' : Item(),
    })
  svntest.actions.run_and_verify_merge(A_COPY_2_path, '4', '8',
                                       sbox.repo_url + '/A',
                                       expected_output,
                                       expected_disk,
                                       expected_status,
                                       expected_skip,
                                       None, None, None, None,
                                       None, 1, 0)

  # Merge 3: revisions '5'..'7' of /A into A_COPY_3, where B/E is unreadable
  # in both the source and the target.  Only B/E is expected to be skipped.
  expected_output = wc.State(A_COPY_3_path, {
    'D/G/rho' : Item(status='U '),
    })
  expected_status = wc.State(A_COPY_3_path, {
    '' : Item(status=' M', wc_rev=8),
    'D/H/chi' : Item(status=' ', wc_rev=8),
    'D/H/omega' : Item(status=' ', wc_rev=8),
    'D/H/psi' : Item(status=' ', wc_rev=8),
    'D/H' : Item(status=' ', wc_rev=8),
    'D/gamma' : Item(status=' ', wc_rev=8),
    'D' : Item(status=' ', wc_rev=8),
    'D/G' : Item(status=' ', wc_rev=8),
    'D/G/pi' : Item(status=' ', wc_rev=8),
    'D/G/rho' : Item(status='M ', wc_rev=8),
    'D/G/tau' : Item(status=' ', wc_rev=8),
    'B/lambda' : Item(status=' M', wc_rev=8),
    'B/F' : Item(status=' M', wc_rev=8),
    'B' : Item(status=' M', wc_rev=8),
    'mu' : Item(status=' ', wc_rev=8),
    'C' : Item(status=' ', wc_rev=8),
    })
  expected_disk = wc.State('', {
    '' : Item(props={SVN_PROP_MERGEINFO : '/A:6-7'}),
    'D/H/omega' : Item("This is the file 'omega'.\n"),
    'D/H/chi' : Item("This is the file 'chi'.\n"),
    'D/H/psi' : Item("This is the file 'psi'.\n"),
    'D/H' : Item(),
    'D/gamma' : Item("This is the file 'gamma'.\n"),
    'D' : Item(),
    'D/G' : Item(),
    'D/G/pi' : Item("This is the file 'pi'.\n"),
    'D/G/rho' : Item("New content"),
    'D/G/tau' : Item("This is the file 'tau'.\n"),
    'B/lambda' : Item("This is the file 'lambda'.\n",
                      props={SVN_PROP_MERGEINFO : '/A/B/lambda:6-7'}),
    'B/F' : Item(props={SVN_PROP_MERGEINFO : '/A/B/F:6-7'}),
    'B' : Item(props={SVN_PROP_MERGEINFO : '/A/B:6-7*'}),
    'mu' : Item("This is the file 'mu'.\n"),
    'C' : Item(),
    })
  expected_skip = wc.State(A_COPY_3_path, {'B/E' : Item()})
  svntest.actions.run_and_verify_merge(A_COPY_3_path, '5', '7',
                                       sbox.repo_url + '/A',
                                       expected_output,
                                       expected_disk,
                                       expected_status,
                                       expected_skip,
                                       None, None, None, None,
                                       None, 1, 0)

  # Throw away all local modifications in the restricted working copy.
  svntest.actions.run_and_verify_svn(None, None, [], 'revert', '--recursive',
                                     wc_restricted)

  # Merge 4: /A/D/H into A_COPY_2/D/H with the extra arguments '-c5' and
  # '-c8'; the unreadable psi is expected to be skipped and the resulting
  # mergeinfo to list revisions 5 and 8 separately.
  expected_output = wc.State(A_COPY_2_H_path, {
    'omega' : Item(status='U '),
    })
  expected_status = wc.State(A_COPY_2_H_path, {
    '' : Item(status=' M', wc_rev=8),
    'chi' : Item(status=' M', wc_rev=8),
    'omega' : Item(status='MM', wc_rev=8),
    })
  expected_disk = wc.State('', {
    '' : Item(props={SVN_PROP_MERGEINFO : '/A/D/H:5*,8*'}),
    'omega' : Item("New content",
                   props={SVN_PROP_MERGEINFO : '/A/D/H/omega:5,8'}),
    'chi' : Item("This is the file 'chi'.\n",
                 props={SVN_PROP_MERGEINFO : '/A/D/H/chi:5,8'}),
    })
  expected_skip = wc.State(A_COPY_2_H_path, {
    'psi' : Item(),
    })
  svntest.actions.run_and_verify_merge(A_COPY_2_H_path, '4', '5',
                                       sbox.repo_url + '/A/D/H',
                                       expected_output,
                                       expected_disk,
                                       expected_status,
                                       expected_skip,
                                       None, None, None, None,
                                       None, 1, 0, '-c5', '-c8')

  svntest.actions.run_and_verify_svn(None, None, [], 'revert', '--recursive',
                                     wc_restricted)

  # Add and commit a new file A/D/H/zeta from the original working copy;
  # the expected status records it at revision 9.
  svntest.main.file_write(zeta_path, "This is the file 'zeta'.\n")
  svntest.actions.run_and_verify_svn(None, None, [], 'add', zeta_path)
  expected_output = wc.State(wc_dir, {'A/D/H/zeta' : Item(verb='Adding')})
  wc_status.add({'A/D/H/zeta' : Item(status=' ', wc_rev=9)})
  svntest.actions.run_and_verify_commit(wc_dir, expected_output,
                                        wc_status, None, wc_dir)

  # Merge 5: revisions '7'..'9' of /A/D/H into A_COPY_2/D/H.  zeta is
  # expected to be added and nothing is expected to be skipped.
  expected_output = wc.State(A_COPY_2_H_path, {
    'omega' : Item(status='U '),
    'zeta' : Item(status='A '),
    })
  expected_status = wc.State(A_COPY_2_H_path, {
    '' : Item(status=' M', wc_rev=8),
    'chi' : Item(status=' M', wc_rev=8),
    'omega' : Item(status='MM', wc_rev=8),
    'zeta' : Item(status='A ', copied='+', wc_rev='-'),
    })
  expected_disk = wc.State('', {
    '' : Item(props={SVN_PROP_MERGEINFO : '/A/D/H:8-9*'}),
    'omega' : Item("New content",
                   props={SVN_PROP_MERGEINFO : '/A/D/H/omega:8-9'}),
    'chi' : Item("This is the file 'chi'.\n",
                 props={SVN_PROP_MERGEINFO : '/A/D/H/chi:8-9'}),
    'zeta' : Item("This is the file 'zeta'.\n",
                  props={SVN_PROP_MERGEINFO : '/A/D/H/zeta:8-9'}),
    })
  expected_skip = wc.State(A_COPY_2_H_path, {})
  svntest.actions.run_and_verify_merge(A_COPY_2_H_path, '7', '9',
                                       sbox.repo_url + '/A/D/H',
                                       expected_output,
                                       expected_disk,
                                       expected_status,
                                       expected_skip,
                                       None, None, None, None,
                                       None, 1, 0)
def reintegrate_fails_if_no_root_access(sbox):
  "reintegrate fails if no root access"
  # NOTE(review): the one-line docstring above is presumably what the svntest
  # harness prints as this test's description -- keep it short and unchanged.
  #
  # Scenario: sync branch A_COPY with A, then deny everyone access to the
  # repository root while granting rw on /A, /A_COPY and /iota, and run
  # 'merge --reintegrate' from A_COPY back into A.
  #
  # NOTE(review): despite the description, the final reintegrate is checked
  # against ordinary expected output/disk/status trees, so the test appears
  # to assert that the merge succeeds -- confirm the intended meaning.

  # Build the sandbox first, then derive every path from its working copy
  # directory (same order as the other tests in this file).
  sbox.build()
  wc_dir = sbox.wc_dir

  A_path = os.path.join(wc_dir, 'A')
  A_COPY_path = os.path.join(wc_dir, 'A_COPY')
  beta_COPY_path = os.path.join(wc_dir, 'A_COPY', 'B', 'E', 'beta')
  rho_COPY_path = os.path.join(wc_dir, 'A_COPY', 'D', 'G', 'rho')
  omega_COPY_path = os.path.join(wc_dir, 'A_COPY', 'D', 'H', 'omega')
  psi_COPY_path = os.path.join(wc_dir, 'A_COPY', 'D', 'H', 'psi')

  expected_disk, expected_status = set_up_branch(sbox)

  # Change 'mu' on the branch and commit it; the expected status records the
  # file at revision 7.
  svntest.main.file_write(os.path.join(wc_dir, "A_COPY", "mu"),
                          "Changed on the branch.")
  expected_output = wc.State(wc_dir, {'A_COPY/mu' : Item(verb='Sending')})
  expected_status.tweak('A_COPY/mu', wc_rev=7)
  svntest.actions.run_and_verify_commit(wc_dir, expected_output,
                                        expected_status, None, wc_dir)
  expected_disk.tweak('A_COPY/mu', contents='Changed on the branch.')

  # Update, merge everything from A into A_COPY, commit that sync merge and
  # update again.
  svntest.main.run_svn(None, 'up', wc_dir)
  expected_output = expected_merge_output([[2,7]],
                                          ['U ' + beta_COPY_path + '\n',
                                           'U ' + rho_COPY_path + '\n',
                                           'U ' + omega_COPY_path + '\n',
                                           'U ' + psi_COPY_path + '\n',
                                           ' U ' + A_COPY_path + '\n'])
  svntest.actions.run_and_verify_svn(None, expected_output, [], 'merge',
                                     sbox.repo_url + '/A', A_COPY_path)
  svntest.main.run_svn(None, 'ci', '-m', 'synch A_COPY with A', wc_dir)
  svntest.main.run_svn(None, 'up', wc_dir)

  # For svn:// and DAV access, deny everyone the repository root but allow
  # rw on the reintegrate source, target and iota.
  if is_ra_type_svn() or is_ra_type_dav():
    write_restrictive_svnserve_conf(sbox.repo_dir)
    write_authz_file(sbox, {"/" : "* =",
                            "/A" : "* = rw",
                            "/A_COPY" : "* = rw",
                            "/iota" : "* = rw"})

  # Reintegrate A_COPY into A: only 'mu' is expected to change, and A gets
  # mergeinfo '/A_COPY:2-8'.
  expected_output = wc.State(A_path, {
    'mu' : Item(status='U '),
    })
  expected_disk = wc.State('', {
    '' : Item(props={SVN_PROP_MERGEINFO : '/A_COPY:2-8'}),
    'B' : Item(),
    'B/lambda' : Item("This is the file 'lambda'.\n"),
    'B/E' : Item(),
    'B/E/alpha' : Item("This is the file 'alpha'.\n"),
    'B/E/beta' : Item("New content"),
    'B/F' : Item(),
    'mu' : Item("Changed on the branch."),
    'C' : Item(),
    'D' : Item(),
    'D/gamma' : Item("This is the file 'gamma'.\n"),
    'D/G' : Item(),
    'D/G/pi' : Item("This is the file 'pi'.\n"),
    'D/G/rho' : Item("New content"),
    'D/G/tau' : Item("This is the file 'tau'.\n"),
    'D/H' : Item(),
    'D/H/chi' : Item("This is the file 'chi'.\n"),
    'D/H/omega' : Item("New content"),
    'D/H/psi' : Item("New content"),
    })
  expected_status = wc.State(A_path, {
    "B" : Item(status=' ', wc_rev=8),
    "B/lambda" : Item(status=' ', wc_rev=8),
    "B/E" : Item(status=' ', wc_rev=8),
    "B/E/alpha" : Item(status=' ', wc_rev=8),
    "B/E/beta" : Item(status=' ', wc_rev=8),
    "B/F" : Item(status=' ', wc_rev=8),
    "mu" : Item(status='M ', wc_rev=8),
    "C" : Item(status=' ', wc_rev=8),
    "D" : Item(status=' ', wc_rev=8),
    "D/gamma" : Item(status=' ', wc_rev=8),
    "D/G" : Item(status=' ', wc_rev=8),
    "D/G/pi" : Item(status=' ', wc_rev=8),
    "D/G/rho" : Item(status=' ', wc_rev=8),
    "D/G/tau" : Item(status=' ', wc_rev=8),
    "D/H" : Item(status=' ', wc_rev=8),
    "D/H/chi" : Item(status=' ', wc_rev=8),
    "D/H/omega" : Item(status=' ', wc_rev=8),
    "D/H/psi" : Item(status=' ', wc_rev=8),
    "" : Item(status=' M', wc_rev=8),
    })
  expected_skip = wc.State(A_path, {})
  # NOTE(review): the trailing 'True, True' are presumably check_props and
  # dry_run -- confirm against svntest.actions.run_and_verify_merge.
  svntest.actions.run_and_verify_merge(A_path, None, None,
                                       sbox.repo_url + '/A_COPY',
                                       expected_output,
                                       expected_disk,
                                       expected_status,
                                       expected_skip,
                                       None, None, None, None,
                                       None, True, True,
                                       '--reintegrate')
def merge_fails_if_subtree_is_deleted_on_src(sbox):
  "merge fails if subtree is deleted on src"
  # NOTE(review): the one-line docstring above is presumably what the svntest
  # harness prints as this test's description -- keep it short and unchanged.
  #
  # Scenario: modify A/D/gamma, copy A to A_copy, modify gamma again, delete
  # it on the source, merge the file itself into the copy, create an
  # unrelated revision, and finally merge the whole of A into A_copy.

  sbox.build()
  wc_dir = sbox.wc_dir

  # For svn:// and DAV access: everyone gets rw on the root, while
  # /unrelated is readable/writable only by the second test author.
  if is_ra_type_svn() or is_ra_type_dav():
    write_authz_file(sbox, {"/" : "* = rw",
                            "/unrelated" : ("* =\n" +
                                            svntest.main.wc_author2 + " = rw")})

  # Working-copy paths and repository URLs for the source and the copy.
  src_gamma = os.path.join(wc_dir, 'A', 'D', 'gamma')
  branch_root = os.path.join(wc_dir, 'A_copy')
  branch_D = os.path.join(wc_dir, 'A_copy', 'D')
  branch_gamma = os.path.join(wc_dir, 'A_copy', 'D', 'gamma')
  src_url = sbox.repo_url + '/A'
  branch_url = sbox.repo_url + '/A_copy'

  # Replace gamma's contents with five numbered lines and commit; the
  # expected status puts gamma at revision 2.
  gamma_text = "line1\nline2\nline3\nline4\nline5\n"
  svntest.main.file_write(src_gamma, gamma_text)
  commit_output = wc.State(wc_dir, {
    'A/D/gamma' : Item(verb='Sending'),
    })
  status = svntest.actions.get_virginal_state(wc_dir, 1)
  status.tweak('A/D/gamma', wc_rev=2)
  svntest.actions.run_and_verify_commit(wc_dir, commit_output,
                                        status, None, wc_dir)

  # Server-side copy of A to A_copy, then bring the working copy up to date.
  svntest.actions.run_and_verify_svn(None, None, [], 'cp', src_url,
                                     branch_url,
                                     '-m', 'create a new copy of A')
  svntest.actions.run_and_verify_svn(None, None, [], 'up', wc_dir)

  # Edit the first line of gamma on the source and commit; the expected
  # status has everything at revision 3 except gamma at revision 4.
  svntest.main.file_substitute(src_gamma, "line1", "this is line1")
  commit_output = wc.State(wc_dir, {
    'A/D/gamma' : Item(verb='Sending'),
    })
  status.tweak(wc_rev=3)
  status.tweak('A/D/gamma', wc_rev=4)
  # Every node of the copied tree is expected unmodified at revision 3.
  branch_nodes = (
    'A_copy',
    'A_copy/B',
    'A_copy/B/lambda',
    'A_copy/B/E',
    'A_copy/B/E/alpha',
    'A_copy/B/E/beta',
    'A_copy/B/F',
    'A_copy/mu',
    'A_copy/C',
    'A_copy/D',
    'A_copy/D/gamma',
    'A_copy/D/G',
    'A_copy/D/G/pi',
    'A_copy/D/G/rho',
    'A_copy/D/G/tau',
    'A_copy/D/H',
    'A_copy/D/H/chi',
    'A_copy/D/H/omega',
    'A_copy/D/H/psi',
    )
  status.add(dict((node, Item(status=' ', wc_rev=3))
                  for node in branch_nodes))
  svntest.actions.run_and_verify_commit(wc_dir, commit_output,
                                        status, None, wc_dir)

  # Delete gamma on the source and commit the deletion.
  svntest.actions.run_and_verify_svn(None, None, [], 'delete', src_gamma)
  commit_output = wc.State(wc_dir, {
    'A/D/gamma' : Item(verb='Deleting'),
    })
  status.remove('A/D/gamma')
  svntest.actions.run_and_verify_commit(wc_dir,
                                        commit_output,
                                        status,
                                        None,
                                        wc_dir, wc_dir)

  # Merge -r1:4 of the (now deleted) source file, addressed at peg
  # revision 4, into the copy's gamma.
  file_merge_output = expected_merge_output([[3,4]],
                                            'U ' + branch_gamma + '\n')
  svntest.actions.run_and_verify_svn(None, file_merge_output,
                                     [], 'merge', '-r1:4',
                                     src_url + '/D/gamma' + '@4',
                                     branch_gamma)

  # Create /unrelated as the second test author, adding one more revision.
  svntest.main.run_svn(None, 'mkdir', sbox.repo_url + '/unrelated',
                       '--username', svntest.main.wc_author2,
                       '-m', 'creating a rev with no paths.')

  # Forced merge of -r1:6 from A into A_copy: gamma is expected to be
  # deleted and a conflict reported on A_copy/D.
  tree_merge_output = expected_merge_output([[6], [3,6]],
                                            ['D ' + branch_gamma + '\n',
                                             'C ' + branch_D + '\n'])
  svntest.actions.run_and_verify_svn(None, tree_merge_output,
                                     [], 'merge', '-r1:6', '--force',
                                     src_url, branch_root)
# Tests handed to the svntest runner.  The first slot is a None placeholder.
# NOTE(review): presumably Skip(test, cond) skips the test when cond holds
# and SkipUnless(test, cond) skips it unless cond holds -- confirm against
# svntest.testcase.
test_list = [
  None,
  SkipUnless(
    Skip(mergeinfo_and_skipped_paths, svntest.main.is_ra_type_file),
    svntest.main.server_has_mergeinfo),
  SkipUnless(
    Skip(reintegrate_fails_if_no_root_access, svntest.main.is_ra_type_file),
    svntest.main.server_has_mergeinfo),
  SkipUnless(
    merge_fails_if_subtree_is_deleted_on_src,
    server_has_mergeinfo),
]

if __name__ == "__main__":
  # Run the tests with serial_only=True when executed as a script.
  svntest.main.run_tests(test_list, serial_only=True)