summaryrefslogtreecommitdiff
path: root/offssh.py
blob: 575524c9dd866388a8d17a4b77254d9c29fc7f49 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#!/usr/bin/env python

# Copyright (c) 2009 Twisted Matrix Laboratories.
# See LICENSE for details.

from twisted.cred import portal, checkers, credentials
from twisted.conch import error, avatar
from twisted.conch.checkers import SSHPublicKeyDatabase
from twisted.conch.ssh import factory, userauth, connection, keys, session, transport
from twisted.internet import reactor, protocol, defer
from zope.interface import implements
import sys
import random

# Fake login banner, ending in a shell prompt, that EchoProtocol.makeConnection
# writes to the client (with "\n" converted to "\r\n") when a shell is opened.
header = """Linux kischt 2.6.48-5-amd23 #1 SMP Wed Jan 12 04:22:50 UTC 2011 i687

The programs included with the Debian GNU/Linux system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.

Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent
permitted by applicable law.
Follow the white rabbit.
Last login: Fri Mar 11 15:33:45 2011 from 127.0.0.1
bash-# """

# Pool of fortune texts that EchoProtocol picks from in its "bomb" state.
# The file "fortunes" is opened relative to the current working directory at
# import time and split on the "\n%" separator.  It is read inside a ``with``
# block so the file handle is closed right away instead of being leaked.
with open("fortunes") as fortune_file:
    fortunes = fortune_file.read().split("\n%")

class ExampleAvatar(avatar.ConchUser):
    """
    Conch user avatar that remembers its username and can open 'session'
    channels.
    """

    def __init__(self, username):
        """
        @param username: the avatar id this user logged in with; stored as
            C{self.username}.
        """
        avatar.ConchUser.__init__(self)
        self.username = username
        # Requests for a 'session' channel are served by SSHSession.
        self.channelLookup['session'] = session.SSHSession

class ExampleRealm:
    """
    Realm that creates a fresh L{ExampleAvatar} for every avatar id.
    """
    implements(portal.IRealm)

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Return an C{(interface, avatar, logout)} triple: the first requested
        interface, a new L{ExampleAvatar} for C{avatarId}, and a logout
        callable that does nothing.  C{mind} is not used.
        """
        def noLogout():
            return None

        requested = interfaces[0]
        return requested, ExampleAvatar(avatarId), noLogout

class EchoProtocol(protocol.Protocol):
    def makeConnection(self, transport):
        protocol.Protocol.makeConnection(self, transport)
        self.transport.write( "\r\n".join(header.split("\n")))
        self.state = "count"
        self.ctr = 0

    def dataReceived(self, data):
        if data == '\x03': #^C
            self.transport.loseConnection()
            return
        elif self.state == "count": #echo mode
            self.ctr += 1
            if self.ctr > 10:
                self.state = "bomb"
            print "received: ",data
            if data == '\r':
                data = '\n\r'
            self.transport.write(data)
        elif self.state == "bomb":
            self.transport.write( "\r\n".join( random.choice(fortunes).split("\n") ) )
            self.transport.write("\n\r")

# RSA key pair embedded as string literals.  Both strings are parsed with
# keys.Key.fromString: ExampleFactory uses them for its 'ssh-rsa' publicKeys /
# privateKeys entries, and InMemoryPublicKeyChecker compares client keys
# against publicKey.
# NOTE(review): the private key is stored in plain text in this source file,
# so it provides no secrecy -- presumably acceptable for this fake server;
# confirm before reusing it anywhere else.
publicKey = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJSkbh/C+BR3utDS555mV'

privateKey = """-----BEGIN RSA PRIVATE KEY-----
MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW
4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw
vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb
Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1
xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8
PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2
gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu
DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML
pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP
EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg==
-----END RSA PRIVATE KEY-----"""


class InMemoryPublicKeyChecker(SSHPublicKeyDatabase):
    """
    Public key checker that knows a single key: the module-level
    C{publicKey}, valid only for the user named 'user'.
    """

    def checkKey(self, credentials):
        """
        Tell whether C{credentials} carries the known key for 'user'.

        @return: a true value only when the username is 'user' and the
            presented blob equals the blob of C{publicKey}.
        """
        if credentials.username != 'user':
            return False
        expectedBlob = keys.Key.fromString(data=publicKey).blob()
        return expectedBlob == credentials.blob

class ExampleSession:
    """
    Session adapter for L{ExampleAvatar}: shells are served by an
    L{EchoProtocol}, command execution is refused, and every other session
    event is ignored.
    """

    def __init__(self, avatar):
        """
        The adapter machinery passes the avatar as first argument; it is
        not stored or used.
        """

    def getPty(self, term, windowSize, attrs):
        """Accept the pty request without doing anything."""
        return None

    def execCommand(self, proto, cmd):
        """Refuse to execute commands by always raising C{Exception}."""
        raise Exception("no executing commands")

    def windowChanged(self, *a):
        """Ignore window size changes."""
        return None

    def openShell(self, trans):
        """
        Start a shell on C{trans}: connect a new L{EchoProtocol} to it, then
        connect C{trans} to the wrapped protocol.
        """
        shellProto = EchoProtocol()
        shellProto.makeConnection(trans)
        wrapped = session.wrapProtocol(shellProto)
        trans.makeConnection(wrapped)

    def eofReceived(self):
        """Ignore end-of-file from the client."""
        return None

    def closed(self):
        """Nothing to clean up when the session closes."""
        return None

from twisted.python import components
# Register ExampleSession as the adapter from ExampleAvatar to
# session.ISession.
components.registerAdapter(ExampleSession, ExampleAvatar, session.ISession)

class ModifiedSSHTransport(transport.SSHServerTransport):
    """
    Server transport whose version attributes are set to look like an
    OpenSSH build on Debian.
    """
    protocolVersion = '2.0'
    version = 'OpenSSH_4.3p2'
    comment = 'Debian-9etch2'
    # Assembled as "SSH-<protocolVersion>-<version> <comment>", stripped of
    # surrounding whitespace.
    ourVersionString = ('SSH-%s-%s %s' % (protocolVersion, version, comment)).strip()


class ExampleFactory(factory.SSHFactory):
    """
    SSH factory that uses L{ModifiedSSHTransport} as its protocol and the
    module-level RSA key pair as its 'ssh-rsa' keys.
    """
    protocol = ModifiedSSHTransport

    # Both keys are parsed once, when the class body is executed.
    publicKeys = {'ssh-rsa': keys.Key.fromString(data=publicKey)}
    privateKeys = {'ssh-rsa': keys.Key.fromString(data=privateKey)}

    # SSH service name -> class implementing that service.
    services = {
        'ssh-connection': connection.SSHConnection,
        'ssh-userauth': userauth.SSHUserAuthServer,
    }

class PasswordDatabase:
    """
    Credentials checker that lets every login through and records passwords.

    C{self.users} maps usernames to passwords.  L{requestAvatarId} never
    rejects a login: for a username not seen before it stores the supplied
    password, and in every case it succeeds with the username.
    """
    implements(checkers.ICredentialsChecker)

    credentialInterfaces = (credentials.IUsernamePassword, credentials.IUsernameHashedPassword)

    def __init__(self, **users):
        """
        @param users: initial username -> password entries, given as keyword
            arguments.
        """
        self.users = users

    def addUser(self, username, password):
        """Store (or overwrite) the password recorded for C{username}."""
        self.users[username] = password

    def _cbPasswordMatch(self, matched, username):
        """
        Turn the result of a password comparison into a login result.

        Only referenced by the disabled strict check kept at the end of
        L{requestAvatarId}.

        @param matched: true when the password comparison succeeded.
        @param username: the avatar id to return on success.
        @return: C{username} on a match, otherwise a C{Failure} wrapping
            C{error.UnauthorizedLogin}.
        """
        # Imported here because the module never imports
        # twisted.python.failure; the bare name used to raise NameError on
        # the mismatch branch.
        from twisted.python import failure

        if matched:
            return username
        return failure.Failure(error.UnauthorizedLogin())

    def requestAvatarId(self, credentials):
        """
        Accept the login unconditionally.

        The password of a previously unknown username is remembered in
        C{self.users}; the password of a known username is not checked.

        @return: a Deferred that has already fired with the username.
        """
        if credentials.username not in self.users:
            self.users[credentials.username] = credentials.password
        return defer.succeed(credentials.username)
# Disabled strict check, kept for reference:
#        if credentials.username in self.users:
#            return defer.maybeDeferred(
#                credentials.checkPassword,
#                self.users[credentials.username]).addCallback(
#                self._cbPasswordMatch, str(credentials.username))
#        else:
#            return defer.fail(error.UnauthorizedLogin())

# Build the portal around ExampleRealm, register the password checker and the
# public key checker with it, and attach it to ExampleFactory as a class
# attribute.
# NOTE(review): rebinding the name `portal` shadows the twisted.cred.portal
# module imported at the top of the file from this line onwards.
portal = portal.Portal(ExampleRealm())
passwdDB = PasswordDatabase()
portal.registerChecker(passwdDB)
portal.registerChecker(InMemoryPublicKeyChecker())
ExampleFactory.portal = portal

from twisted.application import service, internet
# Wrap the factory in a TCP service on port 2222 and parent it to an
# Application named 'SSH Server'.
# NOTE(review): the module-level `application` is presumably what twistd
# looks up when loading this file -- confirm how the script is launched.
sshservice = internet.TCPServer(2222, ExampleFactory())
application = service.Application('SSH Server')
sshservice.setServiceParent(application)

#if __name__ == '__main__':
#    reactor.listenTCP(2222, ExampleFactory())
#    reactor.run()