| | |
| | import argparse |
| | from flask import Flask, render_template |
| | from flask_socketio import SocketIO |
| | import pty |
| | import os |
| | import subprocess |
| | import select |
| | import termios |
| | import struct |
| | import fcntl |
| | import shlex |
| | import logging |
| | import sys |
| |
|
| | logging.getLogger("werkzeug").setLevel(logging.ERROR) |
| |
|
| | __version__ = "0.5.0.2" |
| |
|
| | app = Flask(__name__, template_folder=".", static_folder=".", static_url_path="") |
| | app.config["SECRET_KEY"] = "secret!" |
| | app.config["fd"] = None |
| | app.config["child_pid"] = None |
| | socketio = SocketIO(app) |
| |
|
| |
|
| | def set_winsize(fd, row, col, xpix=0, ypix=0): |
| | logging.debug("setting window size with termios") |
| | winsize = struct.pack("HHHH", row, col, xpix, ypix) |
| | fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize) |
| |
|
| |
|
| | def read_and_forward_pty_output(): |
| | max_read_bytes = 1024 * 20 |
| | while True: |
| | socketio.sleep(0.01) |
| | if app.config["fd"]: |
| | timeout_sec = 0 |
| | (data_ready, _, _) = select.select([app.config["fd"]], [], [], timeout_sec) |
| | if data_ready: |
| | output = os.read(app.config["fd"], max_read_bytes).decode( |
| | errors="ignore" |
| | ) |
| | socketio.emit("pty-output", {"output": output}, namespace="/pty") |
| |
|
| |
|
| | @app.route("/") |
| | def index(): |
| | return render_template("index.html") |
| |
|
| |
|
| | @socketio.on("pty-input", namespace="/pty") |
| | def pty_input(data): |
| | """write to the child pty. The pty sees this as if you are typing in a real |
| | terminal. |
| | """ |
| | if app.config["fd"]: |
| | logging.debug("received input from browser: %s" % data["input"]) |
| | os.write(app.config["fd"], data["input"].encode()) |
| |
|
| |
|
| | @socketio.on("resize", namespace="/pty") |
| | def resize(data): |
| | if app.config["fd"]: |
| | logging.debug(f"Resizing window to {data['rows']}x{data['cols']}") |
| | set_winsize(app.config["fd"], data["rows"], data["cols"]) |
| |
|
| |
|
| | @socketio.on("connect", namespace="/pty") |
| | def connect(): |
| | """new client connected""" |
| | logging.info("new client connected") |
| | if app.config["child_pid"]: |
| | |
| | return |
| |
|
| | |
| | (child_pid, fd) = pty.fork() |
| | if child_pid == 0: |
| | |
| | |
| | |
| | subprocess.run(app.config["cmd"]) |
| | else: |
| | |
| | |
| | app.config["fd"] = fd |
| | app.config["child_pid"] = child_pid |
| | set_winsize(fd, 50, 50) |
| | cmd = " ".join(shlex.quote(c) for c in app.config["cmd"]) |
| | |
| | |
| | socketio.start_background_task(target=read_and_forward_pty_output) |
| |
|
| | logging.info("child pid is " + child_pid) |
| | logging.info( |
| | f"starting background task with command `{cmd}` to continously read " |
| | "and forward pty output to client" |
| | ) |
| | logging.info("task started") |
| |
|
| |
|
| | def main(): |
| | parser = argparse.ArgumentParser( |
| | description=( |
| | "A fully functional terminal in your browser. " |
| | "https://github.com/cs01/pyxterm.js" |
| | ), |
| | formatter_class=argparse.ArgumentDefaultsHelpFormatter, |
| | ) |
| | parser.add_argument( |
| | "-p", "--port", default=5000, help="port to run server on", type=int |
| | ) |
| | parser.add_argument( |
| | "--host", |
| | default="127.0.0.1", |
| | help="host to run server on (use 0.0.0.0 to allow access from other hosts)", |
| | ) |
| | parser.add_argument("--debug", action="store_true", help="debug the server") |
| | parser.add_argument("--version", action="store_true", help="print version and exit") |
| | parser.add_argument( |
| | "--command", default="bash", help="Command to run in the terminal" |
| | ) |
| | parser.add_argument( |
| | "--cmd-args", |
| | default="", |
| | help="arguments to pass to command (i.e. --cmd-args='arg1 arg2 --flag')", |
| | ) |
| | args = parser.parse_args() |
| | if args.version: |
| | print(__version__) |
| | exit(0) |
| | app.config["cmd"] = [args.command] + shlex.split(args.cmd_args) |
| | green = "\033[92m" |
| | end = "\033[0m" |
| | log_format = ( |
| | green |
| | + "pyxtermjs > " |
| | + end |
| | + "%(levelname)s (%(funcName)s:%(lineno)s) %(message)s" |
| | ) |
| | logging.basicConfig( |
| | format=log_format, |
| | stream=sys.stdout, |
| | level=logging.DEBUG if args.debug else logging.INFO, |
| | ) |
| | logging.info(f"serving on http://{args.host}:{args.port}") |
| | socketio.run(app, debug=args.debug, port=args.port, host=args.host) |
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|