Spaces:
Paused
Paused
| """ | |
| Port Configuration and Status API Router | |
| Endpoints for port configuration, status querying, and process management. | |
| """ | |
| import json | |
| import os | |
| import platform | |
| import subprocess | |
| import time | |
| from pathlib import Path | |
| from fastapi import APIRouter, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel, Field, field_validator | |
| router = APIRouter(prefix="/api/ports", tags=["ports"]) | |
| # Config file path | |
| _CONFIG_DIR = Path(__file__).parent.parent.parent | |
| _PORTS_CONFIG_FILE = _CONFIG_DIR / "ports_config.json" | |
| class PortConfig(BaseModel): | |
| """Port configuration model.""" | |
| fastapi_port: int = Field(default=2048, ge=1024, le=65535) | |
| camoufox_debug_port: int = Field(default=9222, ge=1024, le=65535) | |
| stream_proxy_port: int = Field(default=3120, ge=0, le=65535) | |
| stream_proxy_enabled: bool = True | |
| def validate_required_port(cls, v: int) -> int: | |
| if v < 1024: | |
| raise ValueError("Port must be >= 1024") | |
| return v | |
| class ProcessInfo(BaseModel): | |
| """Information about a process.""" | |
| pid: int | |
| name: str | |
| class PortStatus(BaseModel): | |
| """Status of a port.""" | |
| port: int | |
| port_type: str | |
| in_use: bool | |
| processes: list[ProcessInfo] = [] | |
| class KillRequest(BaseModel): | |
| """Request to kill a process.""" | |
| pid: int = Field(..., ge=1, description="PID of the process to terminate") | |
| confirm: bool = Field(default=False, description="Confirm termination") | |
| def _load_port_config() -> PortConfig: | |
| """Load port config from file or environment.""" | |
| # Environment variables take priority | |
| config = PortConfig( | |
| fastapi_port=int(os.environ.get("DEFAULT_FASTAPI_PORT", "2048")), | |
| camoufox_debug_port=int(os.environ.get("DEFAULT_CAMOUFOX_PORT", "9222")), | |
| stream_proxy_port=int(os.environ.get("STREAM_PORT", "3120")), | |
| stream_proxy_enabled=os.environ.get("STREAM_PORT", "3120") != "0", | |
| ) | |
| # Override with saved config if exists | |
| if _PORTS_CONFIG_FILE.exists(): | |
| try: | |
| data = json.loads(_PORTS_CONFIG_FILE.read_text(encoding="utf-8")) | |
| config = PortConfig(**data) | |
| except Exception: | |
| pass | |
| return config | |
| def _save_port_config(config: PortConfig) -> None: | |
| """Save port config to file.""" | |
| _PORTS_CONFIG_FILE.write_text( | |
| json.dumps(config.model_dump(), ensure_ascii=False, indent=2), | |
| encoding="utf-8", | |
| ) | |
| def _find_processes_on_port(port: int) -> list[ProcessInfo]: | |
| """Find processes listening on a port.""" | |
| processes: list[ProcessInfo] = [] | |
| system = platform.system() | |
| try: | |
| if system in ("Linux", "Darwin"): | |
| cmd = f"lsof -ti tcp:{port} -sTCP:LISTEN" | |
| result = subprocess.run( | |
| cmd, shell=True, capture_output=True, text=True, timeout=5 | |
| ) | |
| if result.returncode == 0 and result.stdout.strip(): | |
| pids = [ | |
| int(p) for p in result.stdout.strip().splitlines() if p.isdigit() | |
| ] | |
| for pid in pids: | |
| name = _get_process_name(pid) | |
| processes.append(ProcessInfo(pid=pid, name=name)) | |
| elif system == "Windows": | |
| cmd = "netstat -ano -p TCP" | |
| result = subprocess.run( | |
| cmd, | |
| shell=True, | |
| capture_output=True, | |
| text=True, | |
| timeout=10, | |
| creationflags=subprocess.CREATE_NO_WINDOW, # type: ignore[attr-defined] | |
| ) | |
| if result.returncode == 0: | |
| for line in result.stdout.strip().splitlines(): | |
| parts = line.split() | |
| if ( | |
| len(parts) >= 5 | |
| and parts[0].upper() == "TCP" | |
| and parts[3].upper() == "LISTENING" | |
| ): | |
| local_addr = parts[1] | |
| if local_addr.endswith(f":{port}"): | |
| pid_str = parts[4] | |
| if pid_str.isdigit(): | |
| pid = int(pid_str) | |
| name = _get_process_name(pid) | |
| processes.append(ProcessInfo(pid=pid, name=name)) | |
| except Exception: | |
| pass | |
| # Deduplicate by PID | |
| seen_pids: set[int] = set() | |
| unique_processes: list[ProcessInfo] = [] | |
| for p in processes: | |
| if p.pid not in seen_pids: | |
| seen_pids.add(p.pid) | |
| unique_processes.append(p) | |
| return unique_processes | |
| def _get_process_name(pid: int) -> str: | |
| """Get process name by PID.""" | |
| system = platform.system() | |
| try: | |
| if system == "Linux": | |
| result = subprocess.run( | |
| ["ps", "-p", str(pid), "-o", "comm="], | |
| capture_output=True, | |
| text=True, | |
| timeout=3, | |
| ) | |
| if result.returncode == 0 and result.stdout.strip(): | |
| return result.stdout.strip() | |
| elif system == "Darwin": | |
| result = subprocess.run( | |
| ["ps", "-p", str(pid), "-o", "comm="], | |
| capture_output=True, | |
| text=True, | |
| timeout=3, | |
| ) | |
| if result.returncode == 0 and result.stdout.strip(): | |
| return result.stdout.strip() | |
| elif system == "Windows": | |
| result = subprocess.run( | |
| ["tasklist", "/NH", "/FO", "CSV", "/FI", f"PID eq {pid}"], | |
| capture_output=True, | |
| text=True, | |
| timeout=3, | |
| creationflags=subprocess.CREATE_NO_WINDOW, # type: ignore[attr-defined] | |
| ) | |
| if result.returncode == 0 and result.stdout.strip(): | |
| parts = result.stdout.strip().split('","') | |
| if parts: | |
| return parts[0].strip('"') | |
| except Exception: | |
| pass | |
| return "Unknown" | |
| def _kill_process(pid: int) -> tuple[bool, str]: | |
| """Kill a process by PID. Returns (success, message).""" | |
| system = platform.system() | |
| try: | |
| if system in ("Linux", "Darwin"): | |
| # Try SIGTERM first | |
| subprocess.run(["kill", "-TERM", str(pid)], capture_output=True, timeout=3) | |
| time.sleep(0.5) | |
| # Check if still alive | |
| check = subprocess.run( | |
| ["kill", "-0", str(pid)], capture_output=True, text=True | |
| ) | |
| if check.returncode != 0: | |
| return True, f"Process {pid} terminated (SIGTERM)" | |
| # Force kill | |
| subprocess.run(["kill", "-KILL", str(pid)], capture_output=True, timeout=3) | |
| time.sleep(0.2) | |
| # Verify | |
| check = subprocess.run( | |
| ["kill", "-0", str(pid)], capture_output=True, text=True | |
| ) | |
| if check.returncode != 0: | |
| return True, f"Process {pid} force terminated (SIGKILL)" | |
| else: | |
| return False, f"Unable to terminate process {pid}" | |
| elif system == "Windows": | |
| result = subprocess.run( | |
| ["taskkill", "/PID", str(pid), "/T", "/F"], | |
| capture_output=True, | |
| text=True, | |
| timeout=5, | |
| creationflags=subprocess.CREATE_NO_WINDOW, # type: ignore[attr-defined] | |
| ) | |
| if result.returncode == 0: | |
| return True, f"Process {pid} terminated" | |
| else: | |
| return False, f"Unable to terminate process {pid}: {result.stderr}" | |
| except Exception as e: | |
| return False, f"Error terminating process: {e}" | |
| return False, "Unsupported OS" | |
| async def get_port_config() -> JSONResponse: | |
| """Get port configuration.""" | |
| config = _load_port_config() | |
| return JSONResponse(content=config.model_dump()) | |
| async def update_port_config(config: PortConfig) -> JSONResponse: | |
| """ | |
| Update port configuration. | |
| Note: Changes will take effect on next server restart. | |
| """ | |
| _save_port_config(config) | |
| return JSONResponse( | |
| content={ | |
| "success": True, | |
| "config": config.model_dump(), | |
| "message": "Configuration saved. Changes will take effect on next restart.", | |
| } | |
| ) | |
| async def get_port_status() -> JSONResponse: | |
| """Get port occupation status.""" | |
| config = _load_port_config() | |
| statuses: list[PortStatus] = [] | |
| # Check FastAPI port | |
| fastapi_processes = _find_processes_on_port(config.fastapi_port) | |
| statuses.append( | |
| PortStatus( | |
| port=config.fastapi_port, | |
| port_type="FastAPI", | |
| in_use=len(fastapi_processes) > 0, | |
| processes=fastapi_processes, | |
| ) | |
| ) | |
| # Check Camoufox debug port | |
| camoufox_processes = _find_processes_on_port(config.camoufox_debug_port) | |
| statuses.append( | |
| PortStatus( | |
| port=config.camoufox_debug_port, | |
| port_type="Camoufox Debug", | |
| in_use=len(camoufox_processes) > 0, | |
| processes=camoufox_processes, | |
| ) | |
| ) | |
| # Check Stream proxy port (if enabled) | |
| if config.stream_proxy_enabled and config.stream_proxy_port > 0: | |
| stream_processes = _find_processes_on_port(config.stream_proxy_port) | |
| statuses.append( | |
| PortStatus( | |
| port=config.stream_proxy_port, | |
| port_type="Stream Proxy", | |
| in_use=len(stream_processes) > 0, | |
| processes=stream_processes, | |
| ) | |
| ) | |
| return JSONResponse(content={"ports": [s.model_dump() for s in statuses]}) | |
| async def kill_process(request: KillRequest) -> JSONResponse: | |
| """ | |
| Terminate process with specified PID. | |
| Security validation: | |
| - Requires confirm=true | |
| - PID must belong to a process on a configured port | |
| """ | |
| if not request.confirm: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Please set confirm=true to confirm process termination", | |
| ) | |
| # Security: Validate PID belongs to a tracked port | |
| config = _load_port_config() | |
| tracked_pids: set[int] = set() | |
| # Collect PIDs from all configured ports | |
| for port in [config.fastapi_port, config.camoufox_debug_port]: | |
| for proc in _find_processes_on_port(port): | |
| tracked_pids.add(proc.pid) | |
| # Also check stream proxy port if enabled | |
| if config.stream_proxy_enabled and config.stream_proxy_port > 0: | |
| for proc in _find_processes_on_port(config.stream_proxy_port): | |
| tracked_pids.add(proc.pid) | |
| if request.pid not in tracked_pids: | |
| raise HTTPException( | |
| status_code=403, | |
| detail=f"Security validation failed: PID {request.pid} does not belong to a configured port. Only processes running on FastAPI ({config.fastapi_port}), Camoufox ({config.camoufox_debug_port}) or Stream Proxy ({config.stream_proxy_port}) ports can be terminated.", | |
| ) | |
| success, message = _kill_process(request.pid) | |
| return JSONResponse( | |
| content={"success": success, "message": message, "pid": request.pid}, | |
| status_code=200 if success else 500, | |
| ) | |