"""FTP tests.
Maintainer: slyphon (Jonathan D. Simms)
"""
from __future__ import nested_scopes
import os, sys, types, re
import os.path as osp
from StringIO import StringIO
from twisted import internet
from twisted.trial import unittest
from twisted.protocols import basic, loopback, policies
from twisted.internet import reactor, protocol, defer, interfaces
from twisted.cred import error, portal, checkers, credentials
from twisted.python import log, components, failure
sys.path.append(osp.dirname(os.getcwd()))
from ftp_refactor import ftp, ftpdav
# Canned directory entries used by the fake avatar in this module:
# bogusfiles[0]['size'] is the default payload size for BogusAvatar.retr and
# each 'listrep' is the line BogusAvatar.list emits for that entry.
# NOTE(review): the two 'date' values use different layouts (packed digits vs.
# space-separated fields); nothing in this file reads 'date' -- confirm intent.
bogusfiles = [
    dict(
        type='f',
        name='foo.txt',
        size=1586,
        date='20030102125902',
        listrep='-rwxr-xr-x 1 anonymous anonymous 1586 Jan 02 12:59 foo.txt\r\n'
    ),
    dict(
        type='f',
        name='bar.tar.gz',
        size=4872,
        date='2003 11 22 01 55 22',
        listrep='-rwxr-xr-x 1 anonymous anonymous 4872 Nov 22 01:55 bar.tar.gz\r\n'
    ),
]
class BogusAvatar(object):
    """Fake FTP shell that records its arguments and serves canned data.

    Commands that would change the filesystem only remember what they were
    asked to do (``delePath``, ``mkdPath``, ``rmdPath``, ``cdupCalled``);
    query commands answer with the fixed ``bogus*`` class attributes.
    """
    __implements__ = (ftp.IShell,)

    # When not None, retr() uses this instead of bogusfiles[0]['size'].
    filesize = None
    bogusSize = 42
    bogusMdtm = 'YYMMDDHHMMSS'
    bogusPwd = '/alt/fun/your_ass'

    def __init__(self):
        self.user = self.uid = self.gid = self.clientwd = None
        self.tld = None
        self.debug = True

    def pwd(self):
        """Return the fixed working directory ``bogusPwd``."""
        return self.bogusPwd

    def cwd(self, path):
        """Accept any directory change without doing anything."""

    def cdup(self):
        """Flag that CDUP reached the shell."""
        self.cdupCalled = True

    def dele(self, path):
        """Remember the path that was asked to be deleted."""
        self.delePath = path

    def mkd(self, path):
        """Remember the directory that was asked to be created."""
        self.mkdPath = path

    def rmd(self, path):
        """Remember the directory that was asked to be removed."""
        self.rmdPath = path

    def retr(self, path=None):
        """Build an in-memory file and return ``(fileobj, length)``.

        The special path 'ASCII' yields a fixed text with mixed line
        endings.  Any other path is stored on ``self.path`` and answered
        with generated lines of the lowercase alphabet.  The file object is
        also kept as ``self.sentfile`` and its length as
        ``self.finalFileSize``.
        """
        log.debug('BogusAvatar.retr')
        if path == 'ASCII':
            payload = ("this is a line with a dos terminator\r\n\n"
                       "this is a line with a unix terminator\n")
        else:
            self.path = path
            size = self.filesize
            if size is None:
                size = bogusfiles[0]['size']
            terminator = ['\r', '\n']
            alphabetLine = [chr(code) for code in xrange(97, 123)] + terminator
            chars = []
            for unused in xrange(size / 26):
                chars.extend(alphabetLine)
            # Cut back to at most size - 2 characters, then close with CRLF.
            del chars[(size - 2):]
            chars.extend(terminator)
            payload = ''.join(chars)
        sio = NonClosingStringIO(payload)
        self.finalFileSize = len(sio.getvalue())
        log.msg("BogusAvatar.retr: file size = %d" % self.finalFileSize)
        sio.seek(0)
        self.sentfile = sio
        return (sio, self.finalFileSize)

    def stor(self, params):
        """Accept an upload request without doing anything."""

    def list(self, path):
        """Return ``(fileobj, length)`` holding every bogusfiles 'listrep'.

        ``path`` is ignored; the file object is kept as ``self.sentlist``.
        """
        sio = NonClosingStringIO()
        sio.write(''.join([entry['listrep'] for entry in bogusfiles]))
        sio.seek(0)
        self.sentlist = sio
        return (sio, len(sio.getvalue()))

    def mdtm(self, path):
        """Remember ``path`` and return the fixed ``bogusMdtm``."""
        self.mdtmPath = path
        return self.bogusMdtm

    def size(self, path):
        """Remember ``path`` and return the fixed ``bogusSize``."""
        self.sizePath = path
        return self.bogusSize

    def nlist(self, path):
        """Accept a name-list request without doing anything."""
class MyVirtualFTP(ftp.FTP):
    """FTP protocol whose shell can be replaced by a BogusAvatar on demand."""

    def loadAvatar(self):
        """Install a fresh BogusAvatar as ``self.shell``."""
        avatar = BogusAvatar()
        self.shell = avatar

    def connectionMade(self):
        # Straight pass-through to the stock implementation.
        ftp.FTP.connectionMade(self)
class SimpleProtocol(object, basic.LineReceiver, policies.TimeoutMixin):
    """Test client that stores received data and signals events via Deferreds.

    ``conn`` fires on connect, ``lineRx`` fires once ``maxNumLines`` lines
    have arrived, and ``connLostCB`` fires on disconnect.  Each Deferred is
    replaced by a fresh one just before it is fired.
    """
    connLost = None
    maxNumLines = None

    def __init__(self, timeout=None):
        self.conn = defer.Deferred()
        self.lineRx = defer.Deferred()
        self.connLostCB = defer.Deferred()
        self.rawData = []
        self.lines = []
        self.setTimeout(timeout)

    def _getNumLines(self):
        return len(self.lines)

    def _getLastLine(self):
        return self.lines[-1]

    numLines = property(_getNumLines)
    lastline = property(_getLastLine)

    def _fireAndReplace(self, name):
        """Swap the Deferred stored under ``name`` for a new one, then fire the old one."""
        fired = getattr(self, name)
        setattr(self, name, defer.Deferred())
        fired.callback(None)

    def connectionMade(self):
        self.resetTimeout()
        self._fireAndReplace('conn')

    def lineReceived(self, line):
        """Store ``line``; fire ``lineRx`` when the line limit is reached."""
        self.resetTimeout()
        self.lines.append(line)
        limit = self.maxNumLines
        if limit is None or self.numLines < limit:
            return
        self._fireAndReplace('lineRx')

    def connectionLost(self, reason):
        self.connLost = 1
        self._fireAndReplace('connLostCB')

    def loseConnection(self):
        """Disable the timeout and close the transport if there is one."""
        self.setTimeout(None)
        transport = self.transport
        if transport:
            transport.loseConnection()

    def timeoutConnection(self):
        print('Connection Timed Out!!!')
        policies.TimeoutMixin.timeoutConnection(self)

    def rawDataReceived(self, data):
        """Collect raw-mode chunks in ``rawData``."""
        self.rawData.append(data)
class TestRealm(ftpdav.Realm):
    """Realm that hands every requester a BogusAvatar."""

    def requestAvatar(self, avatarID, mind, *interfaces):
        """Return ``(ftp.IShell, avatar, None)``; all arguments are ignored.

        The avatar's ``tld`` is set to the current working directory.
        """
        avatar = BogusAvatar()
        avatar.tld = os.getcwd()
        return (ftp.IShell, avatar, None)
class VeryInsecureChecker:
    """Credentials checker that accepts any username/password pair."""
    __implements__ = (checkers.ICredentialsChecker,)
    credentialInterfaces = [credentials.IUsernamePassword]

    def requestAvatarId(self, credentials):
        """Return the supplied username without checking the password."""
        avatarId = credentials.username
        return avatarId
class SlightlyLessInsecureChecker(VeryInsecureChecker):
    """Checker that accepts exactly one hard-coded username/password pair."""

    def requestAvatarId(self, credentials):
        """Return the username for the known pair, else raise UnauthorizedLogin."""
        if credentials.username != 'studebaker_hawk':
            raise error.UnauthorizedLogin()
        if credentials.password != 'ethel_a_tree':
            raise error.UnauthorizedLogin()
        return credentials.username
def resp(code, arg=None):
    """Look up the reply template for ``code`` in ``ftp.RESPONSE``.

    ``arg`` is accepted but not used; callers apply ``%`` formatting to the
    returned template themselves.
    """
    templates = ftp.RESPONSE
    return templates[code]
def _eb(r):
    """Errback used throughout this module: pass ``r`` to ``log.err``."""
    log.err(r)
class SimpleTestBase(unittest.TestCase):
    """Fixture that builds an unconnected FTP server/client pair per test."""
    loopbackFunc = loopback.loopback

    def setUp(self):
        self.testpath = '/a/file/path/to/delete'
        self.loadTestRealm = False
        self.upChecker = VeryInsecureChecker()
        pair = self.createServerAndClient(timeout=2)
        self.s, self.c = pair

    def tearDown(self):
        """Cancel every pending delayed call and drop the server/client pair."""
        for pending in reactor.getDelayedCalls():
            pending.cancel()
        del self.s
        del self.c

    def createServerAndClient(self, timeout=None):
        """Return ``(server, client)``, not yet connected to each other.

        The server is a MyVirtualFTP with a portal that allows anonymous
        access and checks username/password logins with ``self.upChecker``.
        ``self.loadTestRealm`` selects TestRealm over the ftpdav realm
        rooted at the current directory.  ``timeout`` goes to the client.
        """
        factory = ftp.Factory()
        factory.maxProtocolInstances = None
        server = MyVirtualFTP()
        server.factory = factory

        if self.loadTestRealm:
            realm = TestRealm()
        else:
            realm = ftpdav.Realm(os.getcwd())
        thePortal = portal.Portal(realm)
        thePortal.registerChecker(checkers.AllowAnonymousAccess(),
                                  credentials.IAnonymous)
        thePortal.registerChecker(self.upChecker,
                                  credentials.IUsernamePassword)
        server.portal = thePortal

        client = SimpleProtocol(timeout)
        client.factory = internet.protocol.Factory()
        return (server, client)
class SimpleTests(SimpleTestBase):
    """Command/reply checks run over the loopback connection."""
    user = 'studebaker_hawk'

    # -- helpers ----------------------------------------------------------

    def _hangUpOnLineLimit(self):
        """Close the client once its ``lineRx`` Deferred fires."""
        def hangUp(ignored):
            return self.c.loseConnection()
        self.c.lineRx.addCallback(hangUp).addErrback(_eb)

    def _lineSender(self, cmd):
        """Return a callback that sends ``cmd`` from the client."""
        def send(ignored):
            return self.c.sendLine(cmd)
        return send

    def _greetingExchange(self):
        """Run a fresh loopback session that ends after two received lines."""
        self.s, self.c = self.createServerAndClient()
        self.c.maxNumLines = 2
        self._hangUpOnLineLimit()
        loopback.loopback(self.s, self.c)

    def _restart(self):
        """Tear the fixture down and build it again."""
        self.tearDown()
        self.setUp()

    def _reloadWithTestRealm(self):
        """Rebuild the pair on top of TestRealm and preset the user name."""
        self.loadTestRealm = True
        self.tearDown()
        self.s, self.c = self.createServerAndClient(timeout=2)
        self.failUnless(isinstance(self.s.portal.realm, TestRealm),
                        'self.s.portal.realm is not TestRealm!')
        self.s.user = self.user

    def runRespTest(self, cmd, resp):
        """Send ``cmd`` and require the last received line to equal ``resp``."""
        self.run_reply_test(self._lineSender(cmd), 3)
        self.failUnlessEqual(self.c.lastline, resp)

    def run_reply_test(self, sendALine, numLines):
        """Run a loopback session: call ``sendALine`` on connect and hang up
        after ``numLines`` lines have been received."""
        self.c.conn.addCallback(sendALine).addErrback(_eb)
        self._hangUpOnLineLimit()
        self.c.maxNumLines = numLines
        self.loopbackFunc.im_func(self.s, self.c)

    # -- tests ------------------------------------------------------------

    def test_Loopback(self):
        self._greetingExchange()
        self.failUnless(self.c.numLines == 2)

    def test_Greeting(self):
        self._greetingExchange()
        expected = resp(ftp.WELCOME_MSG).split('\r\n')
        self.failUnlessEqual(self.c.lines, expected)

    def test_USER_no_params(self):
        expected = resp(ftp.SYNTAX_ERR) % 'USER with no parameters'
        self.runRespTest('USER', expected)

    def test_USER_anon_login(self):
        self.runRespTest('USER anonymous', resp(ftp.GUEST_NAME_OK_NEED_EMAIL))

    def test_USER_user_login(self):
        expected = resp(ftp.USR_NAME_OK_NEED_PASS) % self.user
        self.runRespTest('USER %s' % self.user, expected)

    def test_PASS_anon(self):
        self.s.user = ftp.Factory.userAnonymous
        self.runRespTest('PASS %s' % 'nooone@example.net',
                         resp(ftp.GUEST_LOGGED_IN_PROCEED))

    def test_PASS_user(self):
        self._reloadWithTestRealm()
        self.runRespTest('PASS %s' % 'apassword',
                         resp(ftp.USR_LOGGED_IN_PROCEED))

    def test_PASS_user_fail(self):
        self.upChecker = SlightlyLessInsecureChecker()
        self._reloadWithTestRealm()
        self.runRespTest('PASS %s' % 'apassword', resp(ftp.AUTH_FAILURE) % '')

    def test_TYPE(self):
        accepted = ['I', 'A', 'L', 'i', 'a', 'l']
        for arg in accepted:
            self.s.shell = True
            self.runRespTest('TYPE %s' % arg,
                             resp(ftp.TYPE_SET_OK) % arg.upper())
            self._restart()
        # Every other lowercase letter must be rejected.
        rejected = [chr(code) for code in xrange(97, 123)
                    if chr(code) not in accepted]
        for arg in rejected:
            self.s.shell = True
            self.runRespTest('TYPE %s' % arg,
                             resp(ftp.SYNTAX_ERR_IN_ARGS) % arg.upper())
            self._restart()

    def test_SYST(self):
        self.s.shell = True
        self.runRespTest('SYST', resp(ftp.NAME_SYS_TYPE))

    def test_SIZE(self):
        self.s.loadAvatar()
        expected = resp(ftp.FILE_STATUS) % self.s.shell.bogusSize
        self.runRespTest('SIZE %s' % self.testpath, expected)
        self.failUnlessEqual(self.testpath, self.s.shell.sizePath)

    def test_MDTM(self):
        self.s.loadAvatar()
        expected = resp(ftp.FILE_STATUS) % self.s.shell.bogusMdtm
        self.runRespTest('MDTM %s' % self.testpath, expected)
        self.failUnlessEqual(self.testpath, self.s.shell.mdtmPath)

    def test_PWD(self):
        self.s.loadAvatar()
        self.runRespTest('PWD', resp(ftp.PWD_REPLY) % self.s.shell.bogusPwd)

    def test_PASV(self):
        self.s.loadAvatar()
        self.run_reply_test(self._lineSender('PASV'), 3)
        pattern = r'227 =.*,[0-2]?[0-9]?[0-9],[0-2]?[0-9]?[0-9]'
        self.assert_(re.search(pattern, self.c.lastline))

    def test_PORT(self):
        self.s.loadAvatar()
        self.runRespTest('PORT', resp(ftp.CMD_NOT_IMPLMNTD) % 'PORT')

    def test_CDUP(self):
        self.s.loadAvatar()
        self.runRespTest('CDUP', resp(ftp.REQ_FILE_ACTN_COMPLETED_OK))
        self.failUnless(hasattr(self.s.shell, 'cdupCalled'))

    def test_STRU(self):
        self.s.loadAvatar()
        self.runRespTest('STRU F', resp(ftp.CMD_OK))
        self._restart()
        self.s.loadAvatar()
        arg = 'Q'
        self.runRespTest('STRU %s' % arg,
                         resp(ftp.CMD_NOT_IMPLMNTD_FOR_ARG) % arg)

    def test_MODE(self):
        self.s.loadAvatar()
        self.runRespTest('MODE S', resp(ftp.CMD_OK))
        self._restart()
        self.s.loadAvatar()
        arg = 'Q'
        self.runRespTest('MODE %s' % arg,
                         resp(ftp.CMD_NOT_IMPLMNTD_FOR_ARG) % arg)

    def test_DELE(self):
        self.s.loadAvatar()
        self.runRespTest('DELE %s' % self.testpath,
                         resp(ftp.REQ_FILE_ACTN_COMPLETED_OK))
        self.failUnless(self.s.shell.delePath == self.testpath)

    def test_MKD(self):
        self.s.loadAvatar()
        self.runRespTest('MKD %s' % self.testpath,
                         resp(ftp.REQ_FILE_ACTN_COMPLETED_OK))
        self.failUnless(self.s.shell.mkdPath == self.testpath)

    def test_RMD(self):
        self.s.loadAvatar()
        self.runRespTest('RMD %s' % self.testpath,
                         resp(ftp.REQ_FILE_ACTN_COMPLETED_OK))
        self.failUnless(self.s.shell.rmdPath == self.testpath)

    def test_STOU(self):
        cmd = 'STOU'
        self.s.loadAvatar()
        self.runRespTest(cmd, resp(ftp.CMD_NOT_IMPLMNTD) % cmd)

    def test_QUIT(self):
        self.s.loadAvatar()
        self.run_reply_test(self._lineSender('QUIT'), 3)
        self.failUnless(self.c.connLost)
from twisted.protocols.loopback import LoopbackRelay
from twisted.internet import main
class LoopbackPump(object):
    """Connect a server and a client through two LoopbackRelay transports.

    ``slog`` and ``clog`` are the in-memory logs passed to the relays.
    """

    def __init__(self, s, c):
        from cStringIO import StringIO
        self.slog = StringIO()
        self.clog = StringIO()
        self.s = s
        self.c = c
        self.serverToClient = LoopbackRelay(self.c, self.slog)
        self.clientToServer = LoopbackRelay(self.s, self.clog)
        self.s.makeConnection(self.serverToClient)
        self.c.makeConnection(self.clientToServer)

    def pump(self):
        """Iterate the reactor and clear both relays until both buffers are empty."""
        toClient = self.serverToClient
        toServer = self.clientToServer
        while True:
            if len(toClient.buffer) == 0 and len(toServer.buffer) == 0:
                break
            reactor.iterate(0.01)
            toClient.clearBuffer()
            toServer.clearBuffer()

    def cleanup(self):
        """Report a clean disconnect to the client, then the server."""
        for proto in (self.c, self.s):
            proto.connectionLost(failure.Failure(main.CONNECTION_DONE))
        reactor.iterate()
class DtpRelatedTests(unittest.TestCase):
    """Data-connection tests run through a LoopbackPump (currently skipped)."""
    skip = 'this is broken for now'
    loopbackFunc = loopback.loopback

    def setUp(self):
        self.testpath = '/a/file/path/to/delete'
        self.loadTestRealm = False
        self.upChecker = VeryInsecureChecker()
        self.s, self.c = self.createServerAndClient(timeout=2)
        self.lo = LoopbackPump(self.s, self.c)

    def tearDown(self):
        """Cancel pending calls, log both relay logs, then disconnect."""
        for pending in reactor.getDelayedCalls():
            pending.cancel()
        log.msg("logged by server: \n\n%s" % self.lo.slog.getvalue())
        log.msg("logged by client: \n\n%s" % self.lo.clog.getvalue())
        self.lo.cleanup()
        del self.s
        del self.c

    def createServerAndClient(self, timeout=None):
        """Return an unconnected ``(server, client)`` pair.

        The server factory's ``dtpInterface`` is set to the loopback
        address.  ``self.loadTestRealm`` selects TestRealm over the ftpdav
        realm rooted at the current directory.
        """
        factory = ftp.Factory()
        factory.dtpInterface = '127.0.0.1'
        factory.maxProtocolInstances = None
        server = MyVirtualFTP()
        server.factory = factory
        if self.loadTestRealm:
            realm = TestRealm()
        else:
            realm = ftpdav.Realm(os.getcwd())
        p = portal.Portal(realm)
        p.registerChecker(checkers.AllowAnonymousAccess(), credentials.IAnonymous)
        p.registerChecker(self.upChecker, credentials.IUsernamePassword)
        server.portal = p
        client = SimpleProtocol(timeout)
        client.factory = internet.protocol.Factory()
        return (server, client)

    def hookUpDtp(self):
        """Attach a DTP instance and factory to the server by hand."""
        self.s.dtpTxfrMode = ftp.PASV
        ds = ftp.DTP()
        ds.pi = self.s
        ds.pi.dtpInstance = ds
        ds.pi.dtpPort = ('i', '', 0)
        ds.factory = ftp.DTPFactory(self.s)
        self.s.dtpFactory = ds.factory
        self.ds = ds
        # NOTE(review): ConnectedFTPServer.hookUpDTP sets the un-mangled name
        # ``TestingSoJustSkipTheReactorStep``; confirm which one ftp.FTP reads.
        self.s._FTP__TestingSoJustSkipTheReactorStep = True
        self.dc = self.createDtpClient()

    def createDtpClient(self):
        """Return a raw-mode SimpleProtocol with a ClientFactory attached."""
        dc = SimpleProtocol()
        dc.factory = protocol.ClientFactory()
        dc.factory.protocol = SimpleProtocol
        dc.setRawMode()
        return dc

    def test_PASV_transfer(self):
        """Issue PASV and open a TCP connection to the advertised port."""
        self.s.loadAvatar()
        self.c.sendLine('PASV')
        self.lo.pump()
        log.msg('control lines after PASV: %r' % (self.c.lines,))
        # The port is carried in the last two comma-separated fields; take
        # them from the end so any number of leading fields is tolerated.
        fields = self.c.lastline.split(',')
        port = (int(fields[-2]) << 8) + int(fields[-1])
        log.msg('passive data port: %d' % port)
        dc = self.createDtpClient()
        # connectTCP requires a factory as its third argument.
        reactor.connectTCP('127.0.0.1', port, dc.factory)
        # TODO: the transfer itself is not exercised yet; the class stays
        # skipped until this test is finished.
class NonClosingStringIO(StringIO):
    """StringIO whose contents remain readable after ``close()``."""

    def close(self):
        """Do nothing, so ``getvalue()`` keeps working afterwards."""
class CustomFileWrapper(protocol.FileWrapper):
    """FileWrapper subclass used as the transport for the in-memory tests."""
    def write(self, data):
        # Adds nothing of its own: forwards straight to FileWrapper.write.
        protocol.FileWrapper.write(self, data)
class IOPump:
    """Utility to pump data between clients and servers for protocol testing.
    Perhaps this is a utility worthy of being in protocol.py?

    ``clientIO`` and ``serverIO`` are the file objects each side writes to;
    pump() empties them and delivers the contents to the opposite protocol.
    """

    def __init__(self, client, server, clientIO, serverIO):
        self.client, self.server = client, server
        self.clientIO, self.serverIO = clientIO, serverIO

    def _drain(self, stream):
        """Return everything in ``stream`` and leave it empty at offset 0."""
        stream.seek(0)
        data = stream.read()
        stream.seek(0)
        stream.truncate()
        return data

    def flush(self):
        "Pump until there is no more input or output."
        reactor.iterate()
        while self.pump():
            reactor.iterate()
        reactor.iterate()

    def pumpAndCount(self):
        """Call pump() until it moves nothing; return the summed results."""
        total = 0
        moved = self.pump()
        while moved != 0:
            total += moved
            moved = self.pump()
        return total

    def pump(self):
        """Move data back and forth.
        Returns whether any data was moved.
        """
        fromClient = self._drain(self.clientIO)
        fromServer = self._drain(self.serverIO)
        self.client.transport._checkProducer()
        self.server.transport._checkProducer()
        # Deliver one element at a time so the receivers see the smallest
        # possible chunks.
        deliver = self.server.dataReceived
        for piece in fromClient:
            deliver(piece)
        deliver = self.client.dataReceived
        for piece in fromServer:
            deliver(piece)
        return int(bool(fromClient or fromServer))

    def getTuple(self):
        """Return ``(client, server, pump, flush)``."""
        return (self.client, self.server, self.pump, self.flush)
class ServerFactoryForTest(protocol.Factory):
    """Factory carrying the settings the in-memory FTP server is given."""

    def __init__(self, portal):
        self.portal = portal
        self.protocol = ftp.FTP
        self.instances = []
        self.maxProtocolInstances = 100
        # Anonymous-login settings.
        self.allowAnonymous = True
        self.userAnonymous = 'anonymous'
        # Timeout values for the control and data connections.
        self.timeOut = 30
        self.dtpTimeout = 10
class ConnectedFTPServer(object):
    """An FTP server and a SimpleProtocol client joined through IOPumps.

    ``c``/``s``/``iop`` are the control-connection client, server and pump;
    ``dc``/``ds``/``diop`` are the data-connection equivalents, filled in
    by hookUpDTP().
    """
    c = None
    s = None
    iop = None
    dc = None
    ds = None
    diop = None
    loadTestRealm = None
    upChecker = VeryInsecureChecker()

    def __init__(self):
        self.deferred = defer.Deferred()
        s = ftp.FTP()
        c = SimpleProtocol()
        c.logname = 'ftp-pi'
        self.cio, self.sio = NonClosingStringIO(), NonClosingStringIO()
        if not self.loadTestRealm:
            realm = ftpdav.Realm(os.getcwd())
        else:
            realm = TestRealm()
        p = portal.Portal(realm)
        p.registerChecker(checkers.AllowAnonymousAccess(), credentials.IAnonymous)
        p.registerChecker(self.upChecker, credentials.IUsernamePassword)
        # Attach the portal to the server instance.  This used to assign to
        # ``protocol.portal``, which is the imported twisted.internet.protocol
        # module here, so the server never received it.
        s.portal = p
        s.factory = ServerFactoryForTest(p)
        s.factory.timeOut = None
        s.factory.dtpTimeout = None
        c.factory = protocol.ClientFactory()
        c.factory.protocol = SimpleProtocol
        c.makeConnection(CustomFileWrapper(self.cio))
        s.makeConnection(CustomFileWrapper(self.sio))
        iop = IOPump(c, s, self.cio, self.sio)
        self.c, self.s, self.iop = c, s, iop

    def hookUpDTP(self):
        """Create the data-connection pair and its pump, then flush the
        control connection."""
        log.debug('hooking up dtp')
        self.dcio, self.dsio = NonClosingStringIO(), NonClosingStringIO()
        ds = ftp.DTP()
        ds.pi = self.s
        ds.pi.dtpInstance = ds
        ds.pi.dtpPort = ('i', '', 0)
        ds.factory = ftp.DTPFactory(self.s)
        self.s.dtpFactory = ds.factory
        # NOTE(review): DtpRelatedTests.hookUpDtp sets the name-mangled
        # ``_FTP__TestingSoJustSkipTheReactorStep``; confirm which one
        # ftp.FTP actually reads.
        self.s.TestingSoJustSkipTheReactorStep = True
        ds.makeConnection(CustomFileWrapper(self.dsio))
        dc = SimpleProtocol()
        dc.logname = 'ftp-dtp'
        dc.factory = protocol.ClientFactory()
        dc.factory.protocol = SimpleProtocol
        dc.setRawMode()
        # The data client runs in raw mode and collects into rawData only.
        del dc.lines
        dc.makeConnection(CustomFileWrapper(self.dcio))
        iop = IOPump(dc, ds, self.dcio, self.dsio)
        self.dc, self.ds, self.diop = dc, ds, iop
        log.debug('flushing pi buffer')
        self.iop.flush()
        log.debug('hooked up dtp')

    def getDtpCSTuple(self):
        """Return ``(dc, ds, diop)``, loading the avatar and hooking up the
        data connection first if that has not happened yet."""
        if not self.s.shell:
            self.loadAvatar()
        if not self.dc:
            self.hookUpDTP()
        return (self.dc, self.ds, self.diop)

    def getCSTuple(self):
        """Return ``(c, s, iop, srvReceive)``."""
        return (self.c, self.s, self.iop, self.srvReceive)

    def srvReceive(self, *args):
        """Feed each line to the server, flushing the pump after each one."""
        for msg in args:
            self.s.lineReceived(msg)
            self.iop.flush()

    def loadAvatar(self):
        """Install a BogusAvatar as the server's shell, mark the server's
        user as 'anonymous', and return the avatar."""
        log.debug('BogusAvatar.loadAvatar')
        shell = BogusAvatar()
        shell.tld = '/home/foo'
        shell.debug = True
        shell.clientwd = '/'
        shell.user = 'anonymous'
        shell.uid = 1000
        shell.gid = 1000
        self.s.shell = shell
        self.s.user = 'anonymous'
        return shell
class FTPTestCase(unittest.TestCase):
    """Base fixture giving each test a fresh ConnectedFTPServer as ``cnx``."""

    def setUp(self):
        self.cnx = ConnectedFTPServer()

    def tearDown(self):
        """Cancel all pending delayed calls and drop the connection object."""
        for pending in reactor.getDelayedCalls():
            pending.cancel()
        self.cnx = None
class TestFTPServer(FTPTestCase):
    """Server-side command tests run through the in-memory IOPump."""

    def testNotLoggedInReply(self):
        """Each listed command must be refused before login."""
        cli, sr, iop, send = self.cnx.getCSTuple()
        cmdlist = ['CDUP', 'CWD', 'LIST', 'MODE', 'PASV', 'PORT',
                   'PWD', 'RETR', 'STRU', 'SYST', 'TYPE']
        for cmd in cmdlist:
            send(cmd)
            # Compare the length: a list compared with an int is always
            # "greater" in Python 2, so the old check could never fail.
            self.failUnless(len(cli.lines) > 0)
            self.assertEqual(cli.lines[-1], ftp.RESPONSE[ftp.NOT_LOGGED_IN])

    def testBadCmdSequenceReply(self):
        """PASS without a preceding USER gets the bad-sequence reply."""
        cli, sr, iop, send = self.cnx.getCSTuple()
        send('PASS')
        self.failUnless(len(cli.lines) > 0)
        self.assertEqual(cli.lines[-1],
                         ftp.RESPONSE[ftp.BAD_CMD_SEQ] % 'USER required before PASS')

    def testBadCmdSequenceReplyPartTwo(self):
        """ftp_RETR on a freshly loaded avatar raises BadCmdSequenceError."""
        cli, sr, iop, send = self.cnx.getCSTuple()
        self.cnx.loadAvatar()
        self.failUnlessRaises(ftp.BadCmdSequenceError, self.cnx.s.ftp_RETR, 'foo')
        log.flushErrors(ftp.BadCmdSequenceError)

    def testCmdNotImplementedForArgErrors(self):
        cli, sr, iop, send = self.cnx.getCSTuple()
        self.cnx.loadAvatar()
        self.failUnlessRaises(ftp.CmdNotImplementedForArgError, self.cnx.s.ftp_MODE, 'z')
        self.failUnlessRaises(ftp.CmdNotImplementedForArgError, self.cnx.s.ftp_STRU, 'I')

    def testDecodeHostPort(self):
        self.assertEquals(self.cnx.s.decodeHostPort('25,234,129,22,100,23'),
                          ('25.234.129.22', 25623))

    def testPASV(self):
        cli, sr, iop, send = self.cnx.getCSTuple()
        self.cnx.loadAvatar()
        self.cnx.s.ftp_PASV()
        iop.flush()
        self.assert_(re.search(r'227 =.*,[0-2]?[0-9]?[0-9],[0-2]?[0-9]?[0-9]', cli.lines[-1]))

    def testTYPE(self):
        """Check TYPE replies and the ``binary`` flag, then an ASCII RETR.

        Marked ``todo`` below and ends in an unconditional failure.
        """
        cli, sr, iop, send = self.cnx.getCSTuple()
        self.cnx.loadAvatar()
        for n in ['I', 'A', 'L', 'i', 'a', 'l']:
            self.cnx.s.ftp_TYPE(n)
            iop.flush()
            self.assertEquals(cli.lines[-1], ftp.RESPONSE[ftp.TYPE_SET_OK] % n.upper())
            if n in ['I', 'L', 'i', 'l']:
                self.assertEquals(self.cnx.s.binary, True)
            else:
                self.assertEquals(self.cnx.s.binary, False)
        s = ftp.FTP()
        okParams = ['i', 'a', 'l']
        for n in [chr(x) for x in xrange(97, 123)]:
            if n not in okParams:
                self.failUnlessRaises(ftp.CmdArgSyntaxError, s.ftp_TYPE, n)
        self.cnx.hookUpDTP()
        dc, ds, diop = self.cnx.getDtpCSTuple()
        sr.dtpTxfrMode = ftp.PASV
        sr.ftp_TYPE('A')
        self.assertEquals(self.cnx.s.binary, False)
        iop.flush()
        diop.flush()
        log.debug('flushed buffers, about to run RETR')
        sr.ftp_RETR('ASCII')
        iop.flush()
        diop.flush()
        self.failUnless(len(dc.rawData) >= 1)
        log.msg(dc.rawData)
        rx = ''.join(dc.rawData)
        log.msg(rx)
        self.failUnlessEqual(rx.count('\r\n'), 2, "more than 2 \\r\\n's ")
        self.fail('test is not complete')
    testTYPE.todo = 'rework tests to make sure only binary is supported'

    def testRETR(self):
        """RETR over the data connection delivers exactly what the avatar sent."""
        cli, sr, iop, send = self.cnx.getCSTuple()
        avatar = self.cnx.loadAvatar()
        sr.ftp_TYPE('L')
        self.assert_(self.cnx.s.binary == True)
        iop.flush()
        self.cnx.hookUpDTP()
        dc, ds, diop = self.cnx.getDtpCSTuple()
        sr.dtpTxfrMode = ftp.PASV
        self.assert_(sr.blocked is None)
        self.assert_(sr.dtpTxfrMode == ftp.PASV)
        log.msg('about to send RETR command')
        filename = '/home/foo/foo.txt'
        send('RETR %s' % filename)
        iop.flush()
        diop.flush()
        log.msg('dc.rawData: %s' % dc.rawData)
        self.assert_(len(dc.rawData) >= 1)
        self.failUnlessEqual(avatar.path, filename)
        rx = ''.join(dc.rawData)
        self.failUnlessEqual(rx, avatar.sentfile.getvalue())

    def testSYST(self):
        cli, sr, iop, send = self.cnx.getCSTuple()
        self.cnx.loadAvatar()
        self.cnx.s.ftp_SYST()
        iop.flush()
        self.assertEquals(cli.lines[-1], ftp.RESPONSE[ftp.NAME_SYS_TYPE])

    def testLIST(self):
        """LIST over the data connection delivers the avatar's listing."""
        cli, sr, iop, send = self.cnx.getCSTuple()
        avatar = self.cnx.loadAvatar()
        sr.dtpTxfrMode = ftp.PASV
        self.cnx.hookUpDTP()
        dc, ds, diop = self.cnx.getDtpCSTuple()
        self.assert_(hasattr(self.cnx.s, 'binary'))
        self.cnx.s.binary = True
        sr.ftp_LIST('/')
        iop.flush()
        diop.flush()
        log.debug('dc.rawData: %s' % dc.rawData)
        self.assert_(len(dc.rawData) > 1)
        avatarsent = avatar.sentlist.getvalue()
        dcrx = ''.join(dc.rawData)
        self.assertEqual(avatarsent, dcrx,
                         ("\n"
                          "avatar's sentlist != dtp client's ''.join(rawData)\n"
                          "avatar's sentlist:\n"
                          "%s\n"
                          "''.join(dc.rawData):\n"
                          "%s\n") % (avatarsent, dcrx))
class TestDTPTesting(FTPTestCase):
    """Sanity check for the in-memory data-connection plumbing."""

    def testDTPTestingSanityCheck(self):
        """Run runtest() for sizes 10000 to 10900 in steps of 100, each on
        a rebuilt fixture."""
        for size in xrange(10000, 11000, 100):
            self.tearDown()
            self.setUp()
            self.runtest(size)

    def runtest(self, filesize):
        """Retrieve a generated file of ``filesize`` and require that the
        data client received as many bytes as the avatar produced."""
        cli, server, ctlPump, send = self.cnx.getCSTuple()
        avatar = self.cnx.loadAvatar()
        avatar.filesize = filesize
        server.dtpTxfrMode = ftp.PASV
        server.binary = True
        self.cnx.hookUpDTP()
        dataClient, dataServer, dataPump = self.cnx.getDtpCSTuple()
        server.ftp_RETR('')
        ctlPump.flush()
        dataPump.flush()
        log.debug('dc.rawData size: %d' % len(dataClient.rawData))
        received = len(''.join(dataClient.rawData))
        expected = avatar.finalFileSize
        sizes = 'filesize before txmit: %d, filesize after txmit: %d' % (expected, received)
        ratio = float(received) / float(expected)
        percent = 'percent actually received %f' % (ratio * 100)
        log.debug(sizes)
        log.debug(percent)
        self.assertEquals(received, expected)