Spaces:
Paused
Paused
File size: 3,817 Bytes
a5784e9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | """
Server Control API Router
Provides endpoints for server status and control operations.
"""
import logging
import os
import time
from datetime import datetime
from typing import Optional
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from ..app import VERSION
logger = logging.getLogger("CamoufoxLauncher")
router = APIRouter(prefix="/api/server", tags=["server"])
# Track server start time
_SERVER_START_TIME: Optional[float] = None
def _init_start_time() -> None:
"""Initialize server start time (called once at startup)."""
global _SERVER_START_TIME
if _SERVER_START_TIME is None:
_SERVER_START_TIME = time.time()
_init_start_time()
class ServerStatus(BaseModel):
"""Server status information."""
status: str
uptime_seconds: float
uptime_formatted: str
launch_mode: str
server_port: int
stream_port: int
version: str
python_version: str
started_at: str
class RestartRequest(BaseModel):
"""Restart request with mode."""
mode: str = "headless" # headless, debug, virtual_display
confirm: bool = False
def _format_uptime(seconds: float) -> str:
"""Format uptime in human-readable format."""
days = int(seconds // 86400)
hours = int((seconds % 86400) // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
parts = []
if days > 0:
parts.append(f"{days}d")
if hours > 0:
parts.append(f"{hours}h")
if minutes > 0:
parts.append(f"{minutes}m")
parts.append(f"{secs}s")
return " ".join(parts)
@router.get("/status")
async def get_server_status() -> JSONResponse:
"""Get server status information."""
import sys
uptime = time.time() - (_SERVER_START_TIME or time.time())
started_at = datetime.fromtimestamp(_SERVER_START_TIME or time.time())
status = ServerStatus(
status="running",
uptime_seconds=round(uptime, 2),
uptime_formatted=_format_uptime(uptime),
launch_mode=os.environ.get("LAUNCH_MODE", "unknown"),
server_port=int(
os.environ.get("SERVER_PORT_INFO", os.environ.get("PORT", 2048))
),
stream_port=int(os.environ.get("STREAM_PORT", 3120)),
version=VERSION,
python_version=f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
started_at=started_at.isoformat(),
)
return JSONResponse(content=status.model_dump())
@router.post("/restart")
async def restart_server(request: RestartRequest) -> JSONResponse:
"""
Request server restart.
Note: This operation terminates the current process and requires an external process manager to restart.
"""
if not request.confirm:
return JSONResponse(
content={
"success": False,
"message": "Restart operation requires confirmation. Please set confirm=true",
},
status_code=400,
)
valid_modes = ["headless", "debug", "virtual_display"]
if request.mode not in valid_modes:
return JSONResponse(
content={
"success": False,
"message": f"Invalid launch mode. Valid options: {valid_modes}",
},
status_code=400,
)
logger.info(f"[Server] Received restart request, target mode: {request.mode}")
# Set environment variable for next launch
os.environ["REQUESTED_RESTART_MODE"] = request.mode
# Return success - actual restart needs to be handled by process manager
return JSONResponse(
content={
"success": True,
"message": f"Server will restart in {request.mode} mode. Please refresh the page.",
"mode": request.mode,
}
)
|