cc3m / routers /terminal.py
kokokoasd's picture
Upload 21 files
36ce73b verified
"""
Terminal WebSocket with persistent PTY sessions.
Single Responsibility: only handles PTY lifecycle and WebSocket communication.
Depends on storage.get_zone_path for path resolution (Dependency Inversion).
"""
import asyncio
import collections
import fcntl
import json
import os
import pty
import select
import struct
import termios
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from config import SCROLLBACK_SIZE
from storage import get_zone_path, check_zone_owner
from auth import get_ws_user
router = APIRouter(tags=["terminal"])
# Active terminals: {zone_name: {fd, pid, buffer, buffer_size, bg_task, ws}}
active_terminals: dict[str, dict] = {}
# ── PTY Management ────────────────────────────
def _spawn_shell(zone_name: str) -> dict:
"""Spawn a new PTY shell for a zone."""
zone_path = get_zone_path(zone_name)
master_fd, slave_fd = pty.openpty()
child_pid = os.fork()
if child_pid == 0:
os.setsid()
os.dup2(slave_fd, 0)
os.dup2(slave_fd, 1)
os.dup2(slave_fd, 2)
os.close(master_fd)
os.close(slave_fd)
os.chdir(str(zone_path))
env = os.environ.copy()
env["TERM"] = "xterm-256color"
env["HOME"] = str(zone_path)
env["PS1"] = f"[{zone_name}] \\w $ "
os.execvpe("/bin/bash", ["/bin/bash", "--norc"], env)
else:
os.close(slave_fd)
flag = fcntl.fcntl(master_fd, fcntl.F_GETFL)
fcntl.fcntl(master_fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
return {"fd": master_fd, "pid": child_pid, "buffer": collections.deque(), "buffer_size": 0}
def _resize_terminal(zone_name: str, rows: int, cols: int):
if zone_name in active_terminals:
fd = active_terminals[zone_name]["fd"]
winsize = struct.pack("HHHH", rows, cols, 0, 0)
fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)
def _append_buffer(info: dict, data: bytes):
info["buffer"].append(data)
info["buffer_size"] += len(data)
while info["buffer_size"] > SCROLLBACK_SIZE:
old = info["buffer"].popleft()
info["buffer_size"] -= len(old)
def _get_buffer(info: dict) -> bytes:
return b"".join(info["buffer"])
def _is_alive(zone_name: str) -> bool:
if zone_name not in active_terminals:
return False
try:
pid = active_terminals[zone_name]["pid"]
return os.waitpid(pid, os.WNOHANG) == (0, 0)
except ChildProcessError:
active_terminals.pop(zone_name, None)
return False
async def _bg_reader(zone_name: str):
"""Background: continuously read PTY output into the ring buffer."""
info = active_terminals.get(zone_name)
if not info:
return
fd = info["fd"]
while _is_alive(zone_name):
await asyncio.sleep(0.02)
try:
r, _, _ = select.select([fd], [], [], 0)
if r:
data = os.read(fd, 4096)
if data:
_append_buffer(info, data)
ws = info.get("ws")
if ws:
try:
await ws.send_bytes(data)
except Exception:
info["ws"] = None
except (OSError, BlockingIOError):
pass
except Exception:
break
def kill_terminal(zone_name: str):
"""Kill terminal process for a zone."""
if zone_name in active_terminals:
info = active_terminals.pop(zone_name)
bg = info.get("bg_task")
if bg:
bg.cancel()
try:
os.kill(info["pid"], 9)
os.waitpid(info["pid"], os.WNOHANG)
except (ProcessLookupError, ChildProcessError):
pass
try:
os.close(info["fd"])
except OSError:
pass
# ── WebSocket Handler ─────────────────────────
@router.websocket("/ws/terminal/{zone_name}")
async def terminal_ws(websocket: WebSocket, zone_name: str):
# Authenticate via query parameter
user = get_ws_user(websocket)
if not user:
await websocket.close(code=4001, reason="Chưa đăng nhập")
return
# Check zone ownership
try:
check_zone_owner(zone_name, user.sub, user.role)
except ValueError as e:
await websocket.close(code=4003, reason=str(e))
return
await websocket.accept()
try:
get_zone_path(zone_name)
except ValueError as e:
await websocket.send_json({"error": str(e)})
await websocket.close()
return
# Spawn or reuse terminal
if not _is_alive(zone_name):
kill_terminal(zone_name)
try:
info = _spawn_shell(zone_name)
info["ws"] = None
active_terminals[zone_name] = info
info["bg_task"] = asyncio.create_task(_bg_reader(zone_name))
except Exception as e:
await websocket.send_json({"error": f"Cannot create terminal: {e}"})
await websocket.close()
return
info = active_terminals[zone_name]
fd = info["fd"]
# Replay buffered scrollback
buf = _get_buffer(info)
if buf:
await websocket.send_bytes(buf)
# Register this WebSocket as the active receiver
info["ws"] = websocket
try:
while True:
msg = await websocket.receive()
if msg.get("type") == "websocket.disconnect":
break
if "text" in msg:
data = json.loads(msg["text"])
if data.get("type") == "resize":
_resize_terminal(zone_name, data.get("rows", 24), data.get("cols", 80))
elif data.get("type") == "input":
os.write(fd, data["data"].encode("utf-8"))
elif "bytes" in msg:
os.write(fd, msg["bytes"])
except WebSocketDisconnect:
pass
except Exception:
pass
finally:
if zone_name in active_terminals and active_terminals[zone_name].get("ws") is websocket:
active_terminals[zone_name]["ws"] = None