| import socket |
| import paramiko |
|
|
| |
| host_key = paramiko.RSAKey(filename='test_rsa.key') |
| host, port = 'localhost', 2222 |
|
|
| |
| class SSHServer(paramiko.ServerInterface): |
| def __init__(self): |
| self.event = threading.Event() |
|
|
| def check_channel_request(self, kind, chanid): |
| if kind == 'session': |
| return paramiko.OPEN_SUCCEEDED |
| return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED |
|
|
| def check_auth_password(self, username, password): |
| if username == 'test' and password == 'password': |
| return paramiko.AUTH_SUCCESSFUL |
| return paramiko.AUTH_FAILED |
|
|
| |
| def start_server(): |
| server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| server.bind((host, port)) |
| server.listen(100) |
|
|
| print(f'[*] Listening for connection on {host}:{port}...') |
|
|
| while True: |
| client, addr = server.accept() |
| print(f'[*] Accepted connection from {addr[0]}:{addr[1]}') |
|
|
| try: |
| transport = paramiko.Transport(client) |
| transport.add_server_key(host_key) |
| server = SSHServer() |
|
|
| transport.start_server(server=server) |
|
|
| chan = transport.accept(20) |
| if chan is None: |
| print('*** No channel.') |
| continue |
|
|
| print(f'[*] Authenticated!') |
| chan.send('Welcome to my SSH server!') |
|
|
| while True: |
| command = chan.recv(1024) |
| if not command: |
| break |
| print(f'[*] Received command: {command.decode()}') |
| chan.send(command) |
|
|
| transport.close() |
|
|
| except Exception as e: |
| print(f'[-] Caught exception: {e.__class__.__name__}: {e}') |
| try: |
| transport.close() |
| except: |
| pass |
|
|
| if __name__ == '__main__': |
| start_server() |
|
|