""" Port Configuration and Status API Router Endpoints for port configuration, status querying, and process management. """ import json import os import platform import subprocess import time from pathlib import Path from fastapi import APIRouter, HTTPException from fastapi.responses import JSONResponse from pydantic import BaseModel, Field, field_validator router = APIRouter(prefix="/api/ports", tags=["ports"]) # Config file path _CONFIG_DIR = Path(__file__).parent.parent.parent _PORTS_CONFIG_FILE = _CONFIG_DIR / "ports_config.json" class PortConfig(BaseModel): """Port configuration model.""" fastapi_port: int = Field(default=2048, ge=1024, le=65535) camoufox_debug_port: int = Field(default=9222, ge=1024, le=65535) stream_proxy_port: int = Field(default=3120, ge=0, le=65535) stream_proxy_enabled: bool = True @field_validator("fastapi_port", "camoufox_debug_port") @classmethod def validate_required_port(cls, v: int) -> int: if v < 1024: raise ValueError("Port must be >= 1024") return v class ProcessInfo(BaseModel): """Information about a process.""" pid: int name: str class PortStatus(BaseModel): """Status of a port.""" port: int port_type: str in_use: bool processes: list[ProcessInfo] = [] class KillRequest(BaseModel): """Request to kill a process.""" pid: int = Field(..., ge=1, description="PID of the process to terminate") confirm: bool = Field(default=False, description="Confirm termination") def _load_port_config() -> PortConfig: """Load port config from file or environment.""" # Environment variables take priority config = PortConfig( fastapi_port=int(os.environ.get("DEFAULT_FASTAPI_PORT", "2048")), camoufox_debug_port=int(os.environ.get("DEFAULT_CAMOUFOX_PORT", "9222")), stream_proxy_port=int(os.environ.get("STREAM_PORT", "3120")), stream_proxy_enabled=os.environ.get("STREAM_PORT", "3120") != "0", ) # Override with saved config if exists if _PORTS_CONFIG_FILE.exists(): try: data = json.loads(_PORTS_CONFIG_FILE.read_text(encoding="utf-8")) config = PortConfig(**data) except Exception: pass return config def _save_port_config(config: PortConfig) -> None: """Save port config to file.""" _PORTS_CONFIG_FILE.write_text( json.dumps(config.model_dump(), ensure_ascii=False, indent=2), encoding="utf-8", ) def _find_processes_on_port(port: int) -> list[ProcessInfo]: """Find processes listening on a port.""" processes: list[ProcessInfo] = [] system = platform.system() try: if system in ("Linux", "Darwin"): cmd = f"lsof -ti tcp:{port} -sTCP:LISTEN" result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=5 ) if result.returncode == 0 and result.stdout.strip(): pids = [ int(p) for p in result.stdout.strip().splitlines() if p.isdigit() ] for pid in pids: name = _get_process_name(pid) processes.append(ProcessInfo(pid=pid, name=name)) elif system == "Windows": cmd = "netstat -ano -p TCP" result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=10, creationflags=subprocess.CREATE_NO_WINDOW, # type: ignore[attr-defined] ) if result.returncode == 0: for line in result.stdout.strip().splitlines(): parts = line.split() if ( len(parts) >= 5 and parts[0].upper() == "TCP" and parts[3].upper() == "LISTENING" ): local_addr = parts[1] if local_addr.endswith(f":{port}"): pid_str = parts[4] if pid_str.isdigit(): pid = int(pid_str) name = _get_process_name(pid) processes.append(ProcessInfo(pid=pid, name=name)) except Exception: pass # Deduplicate by PID seen_pids: set[int] = set() unique_processes: list[ProcessInfo] = [] for p in processes: if p.pid not in seen_pids: seen_pids.add(p.pid) unique_processes.append(p) return unique_processes def _get_process_name(pid: int) -> str: """Get process name by PID.""" system = platform.system() try: if system == "Linux": result = subprocess.run( ["ps", "-p", str(pid), "-o", "comm="], capture_output=True, text=True, timeout=3, ) if result.returncode == 0 and result.stdout.strip(): return result.stdout.strip() elif system == "Darwin": result = subprocess.run( ["ps", "-p", str(pid), "-o", "comm="], capture_output=True, text=True, timeout=3, ) if result.returncode == 0 and result.stdout.strip(): return result.stdout.strip() elif system == "Windows": result = subprocess.run( ["tasklist", "/NH", "/FO", "CSV", "/FI", f"PID eq {pid}"], capture_output=True, text=True, timeout=3, creationflags=subprocess.CREATE_NO_WINDOW, # type: ignore[attr-defined] ) if result.returncode == 0 and result.stdout.strip(): parts = result.stdout.strip().split('","') if parts: return parts[0].strip('"') except Exception: pass return "Unknown" def _kill_process(pid: int) -> tuple[bool, str]: """Kill a process by PID. Returns (success, message).""" system = platform.system() try: if system in ("Linux", "Darwin"): # Try SIGTERM first subprocess.run(["kill", "-TERM", str(pid)], capture_output=True, timeout=3) time.sleep(0.5) # Check if still alive check = subprocess.run( ["kill", "-0", str(pid)], capture_output=True, text=True ) if check.returncode != 0: return True, f"Process {pid} terminated (SIGTERM)" # Force kill subprocess.run(["kill", "-KILL", str(pid)], capture_output=True, timeout=3) time.sleep(0.2) # Verify check = subprocess.run( ["kill", "-0", str(pid)], capture_output=True, text=True ) if check.returncode != 0: return True, f"Process {pid} force terminated (SIGKILL)" else: return False, f"Unable to terminate process {pid}" elif system == "Windows": result = subprocess.run( ["taskkill", "/PID", str(pid), "/T", "/F"], capture_output=True, text=True, timeout=5, creationflags=subprocess.CREATE_NO_WINDOW, # type: ignore[attr-defined] ) if result.returncode == 0: return True, f"Process {pid} terminated" else: return False, f"Unable to terminate process {pid}: {result.stderr}" except Exception as e: return False, f"Error terminating process: {e}" return False, "Unsupported OS" @router.get("/config") async def get_port_config() -> JSONResponse: """Get port configuration.""" config = _load_port_config() return JSONResponse(content=config.model_dump()) @router.post("/config") async def update_port_config(config: PortConfig) -> JSONResponse: """ Update port configuration. Note: Changes will take effect on next server restart. """ _save_port_config(config) return JSONResponse( content={ "success": True, "config": config.model_dump(), "message": "Configuration saved. Changes will take effect on next restart.", } ) @router.get("/status") async def get_port_status() -> JSONResponse: """Get port occupation status.""" config = _load_port_config() statuses: list[PortStatus] = [] # Check FastAPI port fastapi_processes = _find_processes_on_port(config.fastapi_port) statuses.append( PortStatus( port=config.fastapi_port, port_type="FastAPI", in_use=len(fastapi_processes) > 0, processes=fastapi_processes, ) ) # Check Camoufox debug port camoufox_processes = _find_processes_on_port(config.camoufox_debug_port) statuses.append( PortStatus( port=config.camoufox_debug_port, port_type="Camoufox Debug", in_use=len(camoufox_processes) > 0, processes=camoufox_processes, ) ) # Check Stream proxy port (if enabled) if config.stream_proxy_enabled and config.stream_proxy_port > 0: stream_processes = _find_processes_on_port(config.stream_proxy_port) statuses.append( PortStatus( port=config.stream_proxy_port, port_type="Stream Proxy", in_use=len(stream_processes) > 0, processes=stream_processes, ) ) return JSONResponse(content={"ports": [s.model_dump() for s in statuses]}) @router.post("/kill") async def kill_process(request: KillRequest) -> JSONResponse: """ Terminate process with specified PID. Security validation: - Requires confirm=true - PID must belong to a process on a configured port """ if not request.confirm: raise HTTPException( status_code=400, detail="Please set confirm=true to confirm process termination", ) # Security: Validate PID belongs to a tracked port config = _load_port_config() tracked_pids: set[int] = set() # Collect PIDs from all configured ports for port in [config.fastapi_port, config.camoufox_debug_port]: for proc in _find_processes_on_port(port): tracked_pids.add(proc.pid) # Also check stream proxy port if enabled if config.stream_proxy_enabled and config.stream_proxy_port > 0: for proc in _find_processes_on_port(config.stream_proxy_port): tracked_pids.add(proc.pid) if request.pid not in tracked_pids: raise HTTPException( status_code=403, detail=f"Security validation failed: PID {request.pid} does not belong to a configured port. Only processes running on FastAPI ({config.fastapi_port}), Camoufox ({config.camoufox_debug_port}) or Stream Proxy ({config.stream_proxy_port}) ports can be terminated.", ) success, message = _kill_process(request.pid) return JSONResponse( content={"success": success, "message": message, "pid": request.pid}, status_code=200 if success else 500, )