#!/usr/bin/env python
# Copyright (c) 2009 Twisted Matrix Laboratories.
# See LICENSE for details.
"""
A Twisted Conch SSH server that presents a fake shell.

The server advertises an OpenSSH/Debian version banner, accepts any
username/password pair (remembering the first password seen for each
username), and also accepts public-key logins for the user ``user`` with the
key embedded below.  Once a shell is opened the client gets a canned login
banner; the first few chunks of input are echoed back, after which every
further chunk of input is answered with a random entry from the ``fortunes``
file.

The module builds a ``twisted.application`` service on TCP port 2222 and
exposes it as ``application``.
"""

import random
import sys

from twisted.application import internet, service
from twisted.conch import avatar, error
from twisted.conch.checkers import SSHPublicKeyDatabase
from twisted.conch.ssh import (
    connection, factory, keys, session, transport, userauth)
from twisted.cred import checkers, credentials, portal
from twisted.internet import defer, protocol, reactor
from twisted.python import components, failure
from zope.interface import implements


# Text sent to the client as soon as the shell is opened.  Newlines are
# converted to CRLF before it is written to the terminal.
header = """Linux kischt 2.6.48-5-amd23 #1 SMP Wed Jan 12 04:22:50 UTC 2011 i687

The programs included with the Debian GNU/Linux system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.

Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent
permitted by applicable law.
Follow the white rabbit.
Last login: Fri Mar 11 15:33:45 2011 from 127.0.0.1
bash-# """

# Entries in the "fortunes" file (looked up relative to the current working
# directory) are separated by a newline followed by "%".  The file is read
# once at import time; the ``with`` block makes sure the handle is closed.
with open("fortunes") as fortuneFile:
    fortunes = fortuneFile.read().split("\n%")


class ExampleAvatar(avatar.ConchUser):
    """
    The avatar handed out for every successful login.

    It remembers the username and allows clients to open ``session``
    channels.
    """

    def __init__(self, username):
        avatar.ConchUser.__init__(self)
        self.username = username
        self.channelLookup.update({'session': session.SSHSession})


class ExampleRealm:
    """
    Realm that creates a fresh L{ExampleAvatar} for every avatar id.
    """
    implements(portal.IRealm)

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Return a C{(interface, avatar, logout)} tuple using the first
        requested interface and a no-op logout callable.
        """
        return interfaces[0], ExampleAvatar(avatarId), lambda: None


class EchoProtocol(protocol.Protocol):
    """
    The fake shell shown to a connected client.

    The protocol has two states:

      - C{"count"}: received data is logged to stdout and echoed back.  The
        state switches to C{"bomb"} on the 11th chunk of data (that chunk is
        still echoed).
      - C{"bomb"}: every chunk of received data is answered with a randomly
        chosen fortune.

    Receiving exactly C{'\\x03'} (Ctrl-C) closes the connection in either
    state.
    """

    def makeConnection(self, transport):
        """
        Attach the transport, send the login banner and reset the state.
        """
        protocol.Protocol.makeConnection(self, transport)
        self.transport.write("\r\n".join(header.split("\n")))
        self.state = "count"
        self.ctr = 0

    def dataReceived(self, data):
        """
        Handle one chunk of input according to the current state.
        """
        if data == '\x03':  # ^C
            self.transport.loseConnection()
            return
        elif self.state == "count":  # echo mode
            self.ctr += 1
            if self.ctr > 10:
                self.state = "bomb"
            # Writes the same bytes the Python 2 statement
            # ``print "received: ",data`` produced, but parses on any
            # Python version.
            sys.stdout.write("received:  %s\n" % (data,))
            if data == '\r':
                data = '\n\r'
            self.transport.write(data)
        elif self.state == "bomb":
            self.transport.write(
                "\r\n".join(random.choice(fortunes).split("\n")))
            self.transport.write("\n\r")


# NOTE(review): the host key pair is embedded in the source, so anybody with
# access to this file holds the server's private key.  Replace it before
# using this anywhere that matters.
publicKey = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJSkbh/C+BR3utDS555mV'

privateKey = """-----BEGIN RSA PRIVATE KEY-----
MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW
4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw
vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb
Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1
xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8
PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2
gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu
DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML
pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP
EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg==
-----END RSA PRIVATE KEY-----"""


class InMemoryPublicKeyChecker(SSHPublicKeyDatabase):
    """
    Public-key checker that knows exactly one user and one key.
    """

    def checkKey(self, credentials):
        """
        Accept the credentials only for the username C{'user'} presenting
        the key stored in C{publicKey}.
        """
        return (credentials.username == 'user' and
                keys.Key.fromString(data=publicKey).blob() ==
                credentials.blob)


class ExampleSession:
    """
    Session adapter for L{ExampleAvatar}.

    Only opening a shell is supported; PTY requests, window changes, EOF and
    close notifications are ignored, and executing commands is refused.
    """

    def __init__(self, avatar):
        """
        We don't use it, but the adapter is passed the avatar as its first
        argument.
        """

    def getPty(self, term, windowSize, attrs):
        pass

    def execCommand(self, proto, cmd):
        raise Exception("no executing commands")

    def windowChanged(self, *a):
        pass

    def openShell(self, trans):
        """
        Connect a new L{EchoProtocol} to the session transport C{trans}.
        """
        ep = EchoProtocol()
        ep.makeConnection(trans)
        trans.makeConnection(session.wrapProtocol(ep))

    def eofReceived(self):
        pass

    def closed(self):
        pass


components.registerAdapter(ExampleSession, ExampleAvatar, session.ISession)


class ModifiedSSHTransport(transport.SSHServerTransport):
    """
    Server transport whose version string is
    C{'SSH-2.0-OpenSSH_4.3p2 Debian-9etch2'}.
    """
    protocolVersion = '2.0'
    version = 'OpenSSH_4.3p2'
    comment = 'Debian-9etch2'
    ourVersionString = ('SSH-' + protocolVersion + '-' + version + ' '
                        + comment).strip()


class ExampleFactory(factory.SSHFactory):
    """
    SSH factory using L{ModifiedSSHTransport} and the embedded RSA host key.
    """
    protocol = ModifiedSSHTransport
    publicKeys = {
        'ssh-rsa': keys.Key.fromString(data=publicKey)
    }
    privateKeys = {
        'ssh-rsa': keys.Key.fromString(data=privateKey)
    }
    services = {
        'ssh-userauth': userauth.SSHUserAuthServer,
        'ssh-connection': connection.SSHConnection
    }


class PasswordDatabase:
    """
    In-memory credentials checker that lets everybody in.

    Unknown usernames are added to C{self.users} together with the password
    they presented; the password itself is never verified.
    """
    implements(checkers.ICredentialsChecker)

    credentialInterfaces = (credentials.IUsernamePassword,
                            credentials.IUsernameHashedPassword)

    def __init__(self, **users):
        """
        @param users: initial mapping of username to password.
        """
        self.users = users

    def addUser(self, username, password):
        """
        Store (or overwrite) the password recorded for C{username}.
        """
        self.users[username] = password

    def _cbPasswordMatch(self, matched, username):
        """
        Callback for a password comparison: return C{username} on a match,
        otherwise a L{failure.Failure} wrapping L{error.UnauthorizedLogin}.

        Currently only referenced by the disabled strict check in
        L{requestAvatarId}.
        """
        if matched:
            return username
        else:
            return failure.Failure(error.UnauthorizedLogin())

    def requestAvatarId(self, credentials):
        """
        Record the password of a not-yet-seen username and accept the login.

        @return: a L{defer.Deferred} that has already fired with the
            username.
        """
        if credentials.username not in self.users:
            self.users[credentials.username] = credentials.password
        return defer.succeed(credentials.username)
        # Strict variant, intentionally disabled:
        # if credentials.username in self.users:
        #     return defer.maybeDeferred(
        #         credentials.checkPassword,
        #         self.users[credentials.username]).addCallback(
        #         self._cbPasswordMatch, str(credentials.username))
        # else:
        #     return defer.fail(error.UnauthorizedLogin())


# NOTE: from here on the name ``portal`` refers to the Portal instance, not
# to the twisted.cred.portal module imported above.
portal = portal.Portal(ExampleRealm())
passwdDB = PasswordDatabase()
portal.registerChecker(passwdDB)
portal.registerChecker(InMemoryPublicKeyChecker())
ExampleFactory.portal = portal

sshservice = internet.TCPServer(2222, ExampleFactory())
application = service.Application('SSH Server')
sshservice.setServiceParent(application)

#if __name__ == '__main__':
#    reactor.listenTCP(2222, ExampleFactory())
#    reactor.run()