Spaces:
Paused
Paused
| import os | |
| import webssh | |
| import logging | |
| import paramiko | |
| import subprocess | |
| import tornado.web | |
| import tornado.ioloop | |
| from tornado.options import define, options | |
| #from webssh.settings import get_origin_setting | |
| from webssh.handler import IndexHandler, WsockHandler, NotFoundHandler | |
| from .Fonts import Font, get_font_filename, base_dir, font_dirs, max_body_size | |
| from .Tools import get_ssl_context, get_origin_setting, get_server_settings, check_encoding_setting, print_version | |
| logging.getLogger("tornado.access").setLevel(logging.DEBUG) | |
| logging.getLogger("tornado.application").setLevel(logging.DEBUG) | |
| logging.getLogger("tornado.general").setLevel(logging.DEBUG) | |
| # Define application options | |
| define('address', default='0.0.0.0', help='Listen address') | |
| define('port', type=int, default=7860, help='Listen port') | |
| define('ssladdress', default='', help='SSL listen address') | |
| define('sslport', type=int, default=4433, help='SSL listen port') | |
| define('certfile', default='', help='SSL certificate file') | |
| define('keyfile', default='', help='SSL private key file') | |
| define('debug', type=bool, default=True, help='Debug mode') | |
| define('policy', default='autoadd', help='Missing host key policy, reject|autoadd|warning') | |
| define('hostfile', default='', help='User defined host keys file') | |
| define('syshostfile', default='', help='System wide host keys file') | |
| define('tdstream', default='', help='Trusted downstream, separated by comma') | |
| define('redirect', type=bool, default=True, help='Redirecting http to https') | |
| define('fbidhttp', type=bool, default=False, help='Forbid public plain http incoming requests') | |
| define('xheaders', type=bool, default=False, help='Support xheaders') | |
| define('xsrf', type=bool, default=False, help='CSRF protection') | |
| define( | |
| "origin", | |
| default="*", | |
| help="""Origin policy, | |
| 'same': same origin policy, matches host name and port number; | |
| 'primary': primary domain policy, matches primary domain only; | |
| '<domains>': custom domains policy, matches any domain in the <domains> list | |
| separated by comma; | |
| '*': wildcard policy, matches any domain, allowed in debug mode only.""", | |
| ) | |
| define('wpintvl', type=float, default=30, help='Websocket ping interval') | |
| define('timeout', type=float, default=60, help='SSH connection timeout') | |
| define('delay', type=float, default=60, help='The delay to call recycle_worker') | |
| define('maxconn', type=int, default=5, help='Maximum live connections (ssh sessions) per client') | |
| define('font', default='', help='custom font filename') | |
| define( | |
| "encoding", | |
| default="utf-8", | |
| help="""The default character encoding of ssh servers. | |
| Example: --encoding='utf-8' to solve the problem with some switches&routers""", | |
| ) | |
| define('version', type=bool, help='Show version information',callback=print_version) | |
| # New handler for your custom page | |
| class AppHandler(tornado.web.RequestHandler): | |
| # Define the path to the templates directory | |
| HandlerPath = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),'WebSSH', 'templates') | |
| template_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'WebSSH', 'templates') | |
| def get(self): | |
| self.render(os.path.join(self.template_folder, 'index.html')) | |
| def post(self): | |
| username = self.get_argument("username") | |
| password = self.get_argument("password") | |
| # Check if the user already exists | |
| user_exists = subprocess.run(["id", "-u", username], capture_output=True) | |
| if user_exists.returncode == 0: | |
| # User exists, read the existing SSH key | |
| ssh_dir = f"/home/{username}/.ssh" | |
| private_key_path = f"{ssh_dir}/id_rsa" | |
| if os.path.exists(private_key_path): | |
| with open(private_key_path, "r") as file: | |
| private_key = file.read() | |
| else: | |
| self.set_status(404) | |
| self.write("SSH key not found for existing user.") | |
| return | |
| else: | |
| """ | |
| # Create the user directory and .ssh directory manually | |
| user_home = f"/home/{username}" | |
| ssh_dir = f"{user_home}/.ssh" | |
| os.makedirs(ssh_dir, exist_ok=True) | |
| # Generate SSH key pair for the new user | |
| subprocess.run(["ssh-keygen", "-t", "rsa", "-b", "2048", "-f", f"{ssh_dir}/id_rsa", "-N", ""]) | |
| # Read the private key | |
| with open(f"{ssh_dir}/id_rsa", "r") as file: | |
| private_key = file.read() | |
| """ | |
| self.set_status(404) | |
| self.write("SSH key not found for existing user.") | |
| return | |
| # Return the private key to the user | |
| self.set_header('Content-Type', 'application/octet-stream') | |
| self.set_header('Content-Disposition', f'attachment; filename=id_rsa.txt') | |
| self.write(private_key) | |
| def make_app(loop): | |
| # Set SSH host key policy | |
| policy = paramiko.AutoAddPolicy() if options.policy == 'autoadd' else paramiko.RejectPolicy() | |
| # Define host keys settings | |
| host_keys_settings = { | |
| 'system_host_keys': paramiko.util.load_host_keys('/app/ssh/ssh_known_hosts'), | |
| 'host_keys': paramiko.HostKeys(), | |
| 'host_keys_filename': None | |
| } | |
| template_path=os.path.join(base_dir, '', 'templates'), | |
| static_path=os.path.join(base_dir, '', 'static'), | |
| #print(f"* Path: {base_dir},\n* Template Path: {template_path}, \n* Static Path: {static_path}") | |
| # Create Tornado application | |
| return tornado.web.Application( | |
| [ | |
| (r'/', IndexHandler, dict(loop=loop, policy=policy, host_keys_settings=host_keys_settings)), | |
| (r'/ws', WsockHandler, dict(loop=loop)), | |
| (r'/app', AppHandler), # Route for your new page | |
| ], | |
| template_path=os.path.join(base_dir, '', 'templates'), | |
| static_path=os.path.join(base_dir, '', 'static'), | |
| xsrf_cookies=options.xsrf, | |
| debug=options.debug, | |
| font=Font( | |
| get_font_filename(options.font, os.path.join(base_dir, *font_dirs)), | |
| font_dirs[1:] | |
| ), | |
| websocket_ping_interval=options.wpintvl, | |
| origin_policy=get_origin_setting(options), | |
| origin=options.origin | |
| ) | |
| def app_listen(app, port, address, server_settings): | |
| app.listen(port, address, **server_settings) | |
| if not server_settings.get('ssl_options'): | |
| server_type = 'http' | |
| else: | |
| server_type = 'https' | |
| handler.redirecting = True if options.redirect else False | |
| logging.info('Listening on {}:{} ({})'.format(address, port, server_type)) | |
| if __name__ == '__main__': | |
| loop = tornado.ioloop.IOLoop.current() | |
| check_encoding_setting(options.encoding) | |
| # Parse command-line options | |
| tornado.options.parse_command_line() | |
| # Create and start the application | |
| app = make_app(loop) | |
| ssl_ctx = get_ssl_context(options) | |
| server_settings = get_server_settings(options) | |
| app_listen(app, options.port, options.address, server_settings) | |
| if ssl_ctx: | |
| server_settings.update(ssl_options=ssl_ctx) | |
| app_listen(app, options.sslport, options.ssladdress, server_settings) | |
| loop.start() | |