AlpineLinux / WebSSH /__main__.py
blenders
Initial
b92ee48
import os
import webssh
import logging
import paramiko
import subprocess
import tornado.web
import tornado.ioloop
from tornado.options import define, options
#from webssh.settings import get_origin_setting
from webssh.handler import IndexHandler, WsockHandler, NotFoundHandler
from .Fonts import Font, get_font_filename, base_dir, font_dirs, max_body_size
from .Tools import get_ssl_context, get_origin_setting, get_server_settings, check_encoding_setting, print_version
logging.getLogger("tornado.access").setLevel(logging.DEBUG)
logging.getLogger("tornado.application").setLevel(logging.DEBUG)
logging.getLogger("tornado.general").setLevel(logging.DEBUG)
# Define application options
define('address', default='0.0.0.0', help='Listen address')
define('port', type=int, default=7860, help='Listen port')
define('ssladdress', default='', help='SSL listen address')
define('sslport', type=int, default=4433, help='SSL listen port')
define('certfile', default='', help='SSL certificate file')
define('keyfile', default='', help='SSL private key file')
define('debug', type=bool, default=True, help='Debug mode')
define('policy', default='autoadd', help='Missing host key policy, reject|autoadd|warning')
define('hostfile', default='', help='User defined host keys file')
define('syshostfile', default='', help='System wide host keys file')
define('tdstream', default='', help='Trusted downstream, separated by comma')
define('redirect', type=bool, default=True, help='Redirecting http to https')
define('fbidhttp', type=bool, default=False, help='Forbid public plain http incoming requests')
define('xheaders', type=bool, default=False, help='Support xheaders')
define('xsrf', type=bool, default=False, help='CSRF protection')
define(
"origin",
default="*",
help="""Origin policy,
'same': same origin policy, matches host name and port number;
'primary': primary domain policy, matches primary domain only;
'<domains>': custom domains policy, matches any domain in the <domains> list
separated by comma;
'*': wildcard policy, matches any domain, allowed in debug mode only.""",
)
define('wpintvl', type=float, default=30, help='Websocket ping interval')
define('timeout', type=float, default=60, help='SSH connection timeout')
define('delay', type=float, default=60, help='The delay to call recycle_worker')
define('maxconn', type=int, default=5, help='Maximum live connections (ssh sessions) per client')
define('font', default='', help='custom font filename')
define(
"encoding",
default="utf-8",
help="""The default character encoding of ssh servers.
Example: --encoding='utf-8' to solve the problem with some switches&routers""",
)
define('version', type=bool, help='Show version information',callback=print_version)
# New handler for your custom page
class AppHandler(tornado.web.RequestHandler):
# Define the path to the templates directory
HandlerPath = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),'WebSSH', 'templates')
template_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'WebSSH', 'templates')
def get(self):
self.render(os.path.join(self.template_folder, 'index.html'))
def post(self):
username = self.get_argument("username")
password = self.get_argument("password")
# Check if the user already exists
user_exists = subprocess.run(["id", "-u", username], capture_output=True)
if user_exists.returncode == 0:
# User exists, read the existing SSH key
ssh_dir = f"/home/{username}/.ssh"
private_key_path = f"{ssh_dir}/id_rsa"
if os.path.exists(private_key_path):
with open(private_key_path, "r") as file:
private_key = file.read()
else:
self.set_status(404)
self.write("SSH key not found for existing user.")
return
else:
"""
# Create the user directory and .ssh directory manually
user_home = f"/home/{username}"
ssh_dir = f"{user_home}/.ssh"
os.makedirs(ssh_dir, exist_ok=True)
# Generate SSH key pair for the new user
subprocess.run(["ssh-keygen", "-t", "rsa", "-b", "2048", "-f", f"{ssh_dir}/id_rsa", "-N", ""])
# Read the private key
with open(f"{ssh_dir}/id_rsa", "r") as file:
private_key = file.read()
"""
self.set_status(404)
self.write("SSH key not found for existing user.")
return
# Return the private key to the user
self.set_header('Content-Type', 'application/octet-stream')
self.set_header('Content-Disposition', f'attachment; filename=id_rsa.txt')
self.write(private_key)
def make_app(loop):
# Set SSH host key policy
policy = paramiko.AutoAddPolicy() if options.policy == 'autoadd' else paramiko.RejectPolicy()
# Define host keys settings
host_keys_settings = {
'system_host_keys': paramiko.util.load_host_keys('/app/ssh/ssh_known_hosts'),
'host_keys': paramiko.HostKeys(),
'host_keys_filename': None
}
template_path=os.path.join(base_dir, '', 'templates'),
static_path=os.path.join(base_dir, '', 'static'),
#print(f"* Path: {base_dir},\n* Template Path: {template_path}, \n* Static Path: {static_path}")
# Create Tornado application
return tornado.web.Application(
[
(r'/', IndexHandler, dict(loop=loop, policy=policy, host_keys_settings=host_keys_settings)),
(r'/ws', WsockHandler, dict(loop=loop)),
(r'/app', AppHandler), # Route for your new page
],
template_path=os.path.join(base_dir, '', 'templates'),
static_path=os.path.join(base_dir, '', 'static'),
xsrf_cookies=options.xsrf,
debug=options.debug,
font=Font(
get_font_filename(options.font, os.path.join(base_dir, *font_dirs)),
font_dirs[1:]
),
websocket_ping_interval=options.wpintvl,
origin_policy=get_origin_setting(options),
origin=options.origin
)
def app_listen(app, port, address, server_settings):
app.listen(port, address, **server_settings)
if not server_settings.get('ssl_options'):
server_type = 'http'
else:
server_type = 'https'
handler.redirecting = True if options.redirect else False
logging.info('Listening on {}:{} ({})'.format(address, port, server_type))
if __name__ == '__main__':
loop = tornado.ioloop.IOLoop.current()
check_encoding_setting(options.encoding)
# Parse command-line options
tornado.options.parse_command_line()
# Create and start the application
app = make_app(loop)
ssl_ctx = get_ssl_context(options)
server_settings = get_server_settings(options)
app_listen(app, options.port, options.address, server_settings)
if ssl_ctx:
server_settings.update(ssl_options=ssl_ctx)
app_listen(app, options.sslport, options.ssladdress, server_settings)
loop.start()