Spaces:
Paused
Paused
File size: 7,141 Bytes
b92ee48 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
import os
import webssh
import logging
import paramiko
import subprocess
import tornado.web
import tornado.ioloop
from tornado.options import define, options
#from webssh.settings import get_origin_setting
from webssh.handler import IndexHandler, WsockHandler, NotFoundHandler
from .Fonts import Font, get_font_filename, base_dir, font_dirs, max_body_size
from .Tools import get_ssl_context, get_origin_setting, get_server_settings, check_encoding_setting, print_version
logging.getLogger("tornado.access").setLevel(logging.DEBUG)
logging.getLogger("tornado.application").setLevel(logging.DEBUG)
logging.getLogger("tornado.general").setLevel(logging.DEBUG)
# Define application options
define('address', default='0.0.0.0', help='Listen address')
define('port', type=int, default=7860, help='Listen port')
define('ssladdress', default='', help='SSL listen address')
define('sslport', type=int, default=4433, help='SSL listen port')
define('certfile', default='', help='SSL certificate file')
define('keyfile', default='', help='SSL private key file')
define('debug', type=bool, default=True, help='Debug mode')
define('policy', default='autoadd', help='Missing host key policy, reject|autoadd|warning')
define('hostfile', default='', help='User defined host keys file')
define('syshostfile', default='', help='System wide host keys file')
define('tdstream', default='', help='Trusted downstream, separated by comma')
define('redirect', type=bool, default=True, help='Redirecting http to https')
define('fbidhttp', type=bool, default=False, help='Forbid public plain http incoming requests')
define('xheaders', type=bool, default=False, help='Support xheaders')
define('xsrf', type=bool, default=False, help='CSRF protection')
define(
"origin",
default="*",
help="""Origin policy,
'same': same origin policy, matches host name and port number;
'primary': primary domain policy, matches primary domain only;
'<domains>': custom domains policy, matches any domain in the <domains> list
separated by comma;
'*': wildcard policy, matches any domain, allowed in debug mode only.""",
)
define('wpintvl', type=float, default=30, help='Websocket ping interval')
define('timeout', type=float, default=60, help='SSH connection timeout')
define('delay', type=float, default=60, help='The delay to call recycle_worker')
define('maxconn', type=int, default=5, help='Maximum live connections (ssh sessions) per client')
define('font', default='', help='custom font filename')
define(
"encoding",
default="utf-8",
help="""The default character encoding of ssh servers.
Example: --encoding='utf-8' to solve the problem with some switches&routers""",
)
define('version', type=bool, help='Show version information',callback=print_version)
# New handler for your custom page
class AppHandler(tornado.web.RequestHandler):
# Define the path to the templates directory
HandlerPath = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),'WebSSH', 'templates')
template_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'WebSSH', 'templates')
def get(self):
self.render(os.path.join(self.template_folder, 'index.html'))
def post(self):
username = self.get_argument("username")
password = self.get_argument("password")
# Check if the user already exists
user_exists = subprocess.run(["id", "-u", username], capture_output=True)
if user_exists.returncode == 0:
# User exists, read the existing SSH key
ssh_dir = f"/home/{username}/.ssh"
private_key_path = f"{ssh_dir}/id_rsa"
if os.path.exists(private_key_path):
with open(private_key_path, "r") as file:
private_key = file.read()
else:
self.set_status(404)
self.write("SSH key not found for existing user.")
return
else:
"""
# Create the user directory and .ssh directory manually
user_home = f"/home/{username}"
ssh_dir = f"{user_home}/.ssh"
os.makedirs(ssh_dir, exist_ok=True)
# Generate SSH key pair for the new user
subprocess.run(["ssh-keygen", "-t", "rsa", "-b", "2048", "-f", f"{ssh_dir}/id_rsa", "-N", ""])
# Read the private key
with open(f"{ssh_dir}/id_rsa", "r") as file:
private_key = file.read()
"""
self.set_status(404)
self.write("SSH key not found for existing user.")
return
# Return the private key to the user
self.set_header('Content-Type', 'application/octet-stream')
self.set_header('Content-Disposition', f'attachment; filename=id_rsa.txt')
self.write(private_key)
def make_app(loop):
# Set SSH host key policy
policy = paramiko.AutoAddPolicy() if options.policy == 'autoadd' else paramiko.RejectPolicy()
# Define host keys settings
host_keys_settings = {
'system_host_keys': paramiko.util.load_host_keys('/app/ssh/ssh_known_hosts'),
'host_keys': paramiko.HostKeys(),
'host_keys_filename': None
}
template_path=os.path.join(base_dir, '', 'templates'),
static_path=os.path.join(base_dir, '', 'static'),
#print(f"* Path: {base_dir},\n* Template Path: {template_path}, \n* Static Path: {static_path}")
# Create Tornado application
return tornado.web.Application(
[
(r'/', IndexHandler, dict(loop=loop, policy=policy, host_keys_settings=host_keys_settings)),
(r'/ws', WsockHandler, dict(loop=loop)),
(r'/app', AppHandler), # Route for your new page
],
template_path=os.path.join(base_dir, '', 'templates'),
static_path=os.path.join(base_dir, '', 'static'),
xsrf_cookies=options.xsrf,
debug=options.debug,
font=Font(
get_font_filename(options.font, os.path.join(base_dir, *font_dirs)),
font_dirs[1:]
),
websocket_ping_interval=options.wpintvl,
origin_policy=get_origin_setting(options),
origin=options.origin
)
def app_listen(app, port, address, server_settings):
app.listen(port, address, **server_settings)
if not server_settings.get('ssl_options'):
server_type = 'http'
else:
server_type = 'https'
handler.redirecting = True if options.redirect else False
logging.info('Listening on {}:{} ({})'.format(address, port, server_type))
if __name__ == '__main__':
loop = tornado.ioloop.IOLoop.current()
check_encoding_setting(options.encoding)
# Parse command-line options
tornado.options.parse_command_line()
# Create and start the application
app = make_app(loop)
ssl_ctx = get_ssl_context(options)
server_settings = get_server_settings(options)
app_listen(app, options.port, options.address, server_settings)
if ssl_ctx:
server_settings.update(ssl_options=ssl_ctx)
app_listen(app, options.sslport, options.ssladdress, server_settings)
loop.start()
|