import os import webssh import logging import paramiko import subprocess import tornado.web import tornado.ioloop from tornado.options import define, options #from webssh.settings import get_origin_setting from webssh.handler import IndexHandler, WsockHandler, NotFoundHandler from .Fonts import Font, get_font_filename, base_dir, font_dirs, max_body_size from .Tools import get_ssl_context, get_origin_setting, get_server_settings, check_encoding_setting, print_version logging.getLogger("tornado.access").setLevel(logging.DEBUG) logging.getLogger("tornado.application").setLevel(logging.DEBUG) logging.getLogger("tornado.general").setLevel(logging.DEBUG) # Define application options define('address', default='0.0.0.0', help='Listen address') define('port', type=int, default=7860, help='Listen port') define('ssladdress', default='', help='SSL listen address') define('sslport', type=int, default=4433, help='SSL listen port') define('certfile', default='', help='SSL certificate file') define('keyfile', default='', help='SSL private key file') define('debug', type=bool, default=True, help='Debug mode') define('policy', default='autoadd', help='Missing host key policy, reject|autoadd|warning') define('hostfile', default='', help='User defined host keys file') define('syshostfile', default='', help='System wide host keys file') define('tdstream', default='', help='Trusted downstream, separated by comma') define('redirect', type=bool, default=True, help='Redirecting http to https') define('fbidhttp', type=bool, default=False, help='Forbid public plain http incoming requests') define('xheaders', type=bool, default=False, help='Support xheaders') define('xsrf', type=bool, default=False, help='CSRF protection') define( "origin", default="*", help="""Origin policy, 'same': same origin policy, matches host name and port number; 'primary': primary domain policy, matches primary domain only; '': custom domains policy, matches any domain in the list separated by comma; '*': wildcard policy, matches any domain, allowed in debug mode only.""", ) define('wpintvl', type=float, default=30, help='Websocket ping interval') define('timeout', type=float, default=60, help='SSH connection timeout') define('delay', type=float, default=60, help='The delay to call recycle_worker') define('maxconn', type=int, default=5, help='Maximum live connections (ssh sessions) per client') define('font', default='', help='custom font filename') define( "encoding", default="utf-8", help="""The default character encoding of ssh servers. Example: --encoding='utf-8' to solve the problem with some switches&routers""", ) define('version', type=bool, help='Show version information',callback=print_version) # New handler for your custom page class AppHandler(tornado.web.RequestHandler): # Define the path to the templates directory HandlerPath = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),'WebSSH', 'templates') template_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'WebSSH', 'templates') def get(self): self.render(os.path.join(self.template_folder, 'index.html')) def post(self): username = self.get_argument("username") password = self.get_argument("password") # Check if the user already exists user_exists = subprocess.run(["id", "-u", username], capture_output=True) if user_exists.returncode == 0: # User exists, read the existing SSH key ssh_dir = f"/home/{username}/.ssh" private_key_path = f"{ssh_dir}/id_rsa" if os.path.exists(private_key_path): with open(private_key_path, "r") as file: private_key = file.read() else: self.set_status(404) self.write("SSH key not found for existing user.") return else: """ # Create the user directory and .ssh directory manually user_home = f"/home/{username}" ssh_dir = f"{user_home}/.ssh" os.makedirs(ssh_dir, exist_ok=True) # Generate SSH key pair for the new user subprocess.run(["ssh-keygen", "-t", "rsa", "-b", "2048", "-f", f"{ssh_dir}/id_rsa", "-N", ""]) # Read the private key with open(f"{ssh_dir}/id_rsa", "r") as file: private_key = file.read() """ self.set_status(404) self.write("SSH key not found for existing user.") return # Return the private key to the user self.set_header('Content-Type', 'application/octet-stream') self.set_header('Content-Disposition', f'attachment; filename=id_rsa.txt') self.write(private_key) def make_app(loop): # Set SSH host key policy policy = paramiko.AutoAddPolicy() if options.policy == 'autoadd' else paramiko.RejectPolicy() # Define host keys settings host_keys_settings = { 'system_host_keys': paramiko.util.load_host_keys('/app/ssh/ssh_known_hosts'), 'host_keys': paramiko.HostKeys(), 'host_keys_filename': None } template_path=os.path.join(base_dir, '', 'templates'), static_path=os.path.join(base_dir, '', 'static'), #print(f"* Path: {base_dir},\n* Template Path: {template_path}, \n* Static Path: {static_path}") # Create Tornado application return tornado.web.Application( [ (r'/', IndexHandler, dict(loop=loop, policy=policy, host_keys_settings=host_keys_settings)), (r'/ws', WsockHandler, dict(loop=loop)), (r'/app', AppHandler), # Route for your new page ], template_path=os.path.join(base_dir, '', 'templates'), static_path=os.path.join(base_dir, '', 'static'), xsrf_cookies=options.xsrf, debug=options.debug, font=Font( get_font_filename(options.font, os.path.join(base_dir, *font_dirs)), font_dirs[1:] ), websocket_ping_interval=options.wpintvl, origin_policy=get_origin_setting(options), origin=options.origin ) def app_listen(app, port, address, server_settings): app.listen(port, address, **server_settings) if not server_settings.get('ssl_options'): server_type = 'http' else: server_type = 'https' handler.redirecting = True if options.redirect else False logging.info('Listening on {}:{} ({})'.format(address, port, server_type)) if __name__ == '__main__': loop = tornado.ioloop.IOLoop.current() check_encoding_setting(options.encoding) # Parse command-line options tornado.options.parse_command_line() # Create and start the application app = make_app(loop) ssl_ctx = get_ssl_context(options) server_settings = get_server_settings(options) app_listen(app, options.port, options.address, server_settings) if ssl_ctx: server_settings.update(ssl_options=ssl_ctx) app_listen(app, options.sslport, options.ssladdress, server_settings) loop.start()