# insultsexample.py


from twisted.cred import authorizer
from twisted.conch import identity, error
from twisted.conch.insults import client, colors
from twisted.conch.ssh import factory, userauth, connection, channel, session,keys
from twisted.internet import reactor, defer
from twisted.python import log
import sys
# Send Twisted's log output to standard error.
log.startLogging(sys.stderr)

class InsultsDemoClient(client.InsultsClient):
    """Demo screen that draws a greeting and echoes each key it receives."""

    def connectionMade(self):
        """Draw the initial screen: greeting, coloured sample text, prompts."""
        self.initScreen()
        self.clearScreen()
        self.writeStr("Hello, and welcome to Insults!")
        self.gotoXY(10,10)
        self.setAttributes(colors.FG_RED, colors.BG_BLUE)
        self.writeStr("Here is some red text on a blue background!")
        # Reset the attributes so the text below is not drawn red-on-blue.
        self.setAttributes(colors.CLEAR)
        self.gotoXY(0,1)
        self.writeStr("Press 'Q' to disconnect.")
        self.gotoXY(0, 5)
        self.writeStr("You pressed: ")
        self.refresh()

    def keyReceived(self, key):
        """Display ``key`` after the "You pressed: " label; quit on 'Q'.

        A single character whose ordinal is below 27 is shown in caret
        notation (ordinal 1 becomes ``^A``).  Any other value is written
        unchanged.

        ``ord()`` raises TypeError for a string whose length is not exactly
        one, so the caret conversion is only attempted on single characters.
        NOTE(review): the base client appears to report some keys as
        multi-character names (e.g. '<Up>') -- confirm against
        twisted.conch.insults.client.
        """
        # Column 13 is the first column after the 13-character label that
        # connectionMade wrote at (0, 5).
        self.gotoXY(13, 5)
        self.eraseToLine()
        if len(key) == 1 and ord(key) < 27:
            key = '^' + chr(ord(key) + 64)
        self.writeStr(key)
        self.refresh()
        if key.upper() == 'Q':
            self.transport.loseConnection()

class Identity(identity.ConchIdentity):
    """Identity with no public keys and one hard-coded name/password pair."""

    def validatePublicKey(self, data):
        """Return a failed Deferred; ``data`` is ignored."""
        reason = error.ConchError('no public key')
        return defer.fail(reason)

    def verifyPlainPassword(self, password):
        """Return a Deferred that succeeds only for 'user' / 'password'.

        On success the Deferred fires with an empty string; otherwise it
        fails with a ConchError.
        """
        if password != 'password' or self.name != 'user':
            return defer.fail(error.ConchError('bad password'))
        return defer.succeed('')

class Authorizer(authorizer.Authorizer):
    """Authorizer that creates a new Identity for whatever name is asked for."""

    def getIdentityRequest(self, name):
        """Return an already-fired Deferred holding an Identity for ``name``."""
        requested = Identity(name, self)
        return defer.succeed(requested)

class SSHConnection(connection.SSHConnection):
    """Connection service that only hands out 'session' channels."""

    def gotGlobalRequest(self, *args):
        """Return 0 for every global request, whatever its arguments.

        NOTE(review): 0 presumably tells the caller the request was not
        accepted -- confirm against the base class.
        """
        return 0

    def getChannel(self, channelType, windowSize, maxPacket, data):
        """Return an SSHSession for a 'session' channel, otherwise 0."""
        if channelType != 'session':
            return 0
        return SSHSession(remoteWindow=windowSize,
                          remoteMaxPacket=maxPacket,
                          conn=self)

class SSHSession(channel.SSHChannel):
    """Channel that attaches an InsultsDemoClient once a pty and shell are requested."""

    def channelOpen(self, data):
        """Start with no window size recorded."""
        self.winSize = None

    def request_pty_req(self, data):
        """Parse the pty request, remember terminal name and window size, return 1."""
        terminal, dimensions, _modes = session.parseRequest_pty_req(data)
        self.term, self.winSize = terminal, dimensions
        return 1

    def request_shell(self, data):
        """Attach the demo client to this channel.

        Returns 0 when no window size has been recorded (no successful pty
        request yet), otherwise 1.
        """
        size = self.winSize
        if not size:
            return 0
        demo = self.client = InsultsDemoClient()
        # setSize receives the second window-size element first.
        # NOTE(review): presumably winSize is (rows, cols, ...) and setSize
        # takes (width, height) -- confirm.
        demo.setSize(size[1], size[0])
        # Route incoming channel data straight to the demo client.
        self.dataReceived = demo.dataReceived
        # makeConnection is scheduled through the reactor with a zero delay
        # rather than called directly here.
        reactor.callLater(0, demo.makeConnection, self)
        return 1

class SSHFactory(factory.SSHFactory):
    """Factory wiring the host keys, SSH services and authorizer together."""

    # Both key files are read while this class body executes.
    publicKeys = {'ssh-rsa': keys.getPublicKeyString('/etc/ssh_host_rsa_key.pub')}
    privateKeys = {'ssh-rsa': keys.getPrivateKeyObject('/etc/ssh_host_rsa_key')}

    # SSH service name -> class providing that service.
    services = {
        'ssh-userauth': userauth.SSHUserAuthServer,
        'ssh-connection': SSHConnection,
    }

    # A single Authorizer instance stored on the class.
    authorizer = Authorizer()
    
# Listen on TCP port 5022 with a new SSHFactory, then start the reactor.
reactor.listenTCP(5022, SSHFactory())
reactor.run()