Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Cain FastAPI App with background task execution engine. | |
| Serves agent dashboard and manages asynchronous task execution | |
| without blocking the main Uvicorn thread. | |
| """ | |
| import asyncio | |
| import os | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| import uuid | |
| import logging | |
| from datetime import datetime | |
| from enum import Enum | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| from contextlib import asynccontextmanager | |
| import uvicorn | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks, WebSocket, WebSocketDisconnect, Request | |
| from fastapi.responses import JSONResponse, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware import Middleware | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| # ============================================================================ | |
| # SYS.PATH SETUP (must happen before other imports) | |
| # ============================================================================ | |
| # Set up sys.path for agents imports at module load time | |
| # This ensures `from agents import brain_minimal` works regardless of import order | |
| # Dynamic path resolution with fallbacks for different Docker contexts | |
# Directory containing this file, made absolute with os.path.abspath (symlinks are not resolved).
_script_dir = Path(os.path.abspath(os.path.dirname(__file__))) # Absolute path of this script
# Try multiple possible locations for .openclaw directory
_possible_openclaw_paths = [
    _script_dir / ".openclaw", # /app/.openclaw (legacy/flat structure)
    _script_dir / "openclaw" / ".openclaw", # /app/openclaw/.openclaw (nested structure)
    Path("/app/openclaw/.openclaw"), # Absolute Docker path (nested)
    Path("/app/.openclaw"), # Absolute Docker path (flat)
]
# Add all valid paths to sys.path
# Each existing candidate is inserted at index 0, so a candidate that appears LATER in the
# list ends up EARLIER in sys.path. The membership test also de-duplicates candidates that
# resolve to the same string (e.g. when this script itself lives in /app).
for path_dir in _possible_openclaw_paths:
    path_str = str(path_dir)
    if path_str not in sys.path and path_dir.exists():
        sys.path.insert(0, path_str)
| # ============================================================================ | |
| # CONFIGURATION & LOGGING | |
| # ============================================================================ | |
# Destination of the structured log file; the handler setup below falls back to stdout
# when this file cannot be opened.
LOG_PATH = "/app/logs/cain.log" # Writable in Docker container
# Directory holding the frontend assets; it is also what gets mounted at /static.
FRONTEND_PATH = Path(__file__).parent / "frontend"
# Dashboard page returned by read_root() when the file exists.
AGENT_DASHBOARD = FRONTEND_PATH / "agent-dashboard.html"
| # Configure structured logging to file and stdout | |
class StructuredFormatter(logging.Formatter):
    """Render a log record as the ``str()`` of a flat dict of fields.

    The output is "JSON-like" (a Python dict repr), not strict JSON; the
    format is kept as-is so existing log consumers are not broken.
    """

    def format(self, record):
        """Return the dict-repr line for *record*.

        The ``timestamp`` field is the record's creation time in UTC, so it
        stays correct even when a handler formats the record late.
        """
        # Local import: the file-level import only brings in `datetime`.
        from datetime import timezone

        stamp = datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat()
        log_data = {
            "timestamp": stamp,
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        return str(log_data)
# Set up file handler
try:
    file_handler = logging.FileHandler(LOG_PATH)
except OSError:
    # OSError covers PermissionError and FileNotFoundError as before, plus cases such as
    # a read-only filesystem or LOG_PATH being a directory, which previously crashed import.
    # NOTE(review): in this fallback every record reaches stdout twice (structured + plain).
    file_handler = logging.StreamHandler(sys.stdout)
file_handler.setFormatter(StructuredFormatter())
# Set up stdout handler
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(logging.Formatter(
    "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
))
# Configure this module's logger (not the root logger)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
logger.addHandler(stdout_handler)
| # ============================================================================ | |
| # WORKER MANAGER (Handles worker process lifecycle with blocking startup) | |
| # ============================================================================ | |
class WorkerManager:
    """
    Manages the lifecycle of the ``brain_minimal.py`` worker subprocess.

    The worker is started with ``asyncio.create_subprocess_exec``; its stdout
    and stderr are drained by background tasks so the pipes never fill up.
    Spawning is non-blocking: shared state is marked HEALTHY as soon as the
    process exists, without waiting for a heartbeat.
    """

    def __init__(self, shared_state_ref):
        """Store the shared-state object and set up paths; nothing is spawned here."""
        self.shared_state = shared_state_ref
        self._process: Optional[asyncio.subprocess.Process] = None
        self._log_path = Path("/app/cain.log")
        self._pid_file = Path("/app/worker.pid")
        # Strong references to the drain tasks: the event loop only keeps weak
        # references, so unreferenced tasks may be garbage-collected mid-run.
        self._drain_tasks: list = []

    async def spawn_with_retry(
        self,
        max_attempts: int = 2,
        retry_delay: float = 1.0,
        heartbeat_timeout: float = 5.0
    ) -> bool:
        """
        Spawn worker process with retry logic - NON-BLOCKING.
        Worker starts asynchronously; heartbeat updates health status separately.
        Args:
            max_attempts: Maximum spawn attempts (default: 2)
            retry_delay: Seconds to wait between retries (default: 1.0)
            heartbeat_timeout: Accepted for backward compatibility; currently
                unused because this method does not wait for a heartbeat.
        Returns:
            True if worker spawned (even if heartbeat not yet received);
            False after all attempts failed (shared state is set to DEGRADED).
        """
        brain_path = Path(__file__).parent / "brain_minimal.py"
        for attempt in range(1, max_attempts + 1):
            logger.info(f"[WorkerManager] Spawn attempt {attempt}/{max_attempts}")
            print(f"[WorkerManager] Spawn attempt {attempt}/{max_attempts}")
            sys.stdout.flush()
            sys.stderr.flush()
            try:
                # Only the spawn itself is retried. Failures in the bookkeeping
                # below must not trigger a retry, or a second worker would be
                # started while the first is still running.
                self._process = await asyncio.create_subprocess_exec(
                    sys.executable,
                    str(brain_path),
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=self._get_worker_env()
                )
            except Exception as e:
                logger.error(f"[WorkerManager] Spawn attempt {attempt} failed: {type(e).__name__}: {e}")
                print(f"[WorkerManager] ✗ Spawn attempt {attempt} failed: {e}")
                sys.stdout.flush()
                # Retry delay (except after last attempt)
                if attempt < max_attempts:
                    logger.info(f"[WorkerManager] Waiting {retry_delay}s before retry...")
                    await asyncio.sleep(retry_delay)
                continue

            worker_pid = self._process.pid
            logger.info(f"[WorkerManager] Worker spawned with PID {worker_pid}")
            print(f"[WorkerManager] Worker spawned with PID {worker_pid}")
            sys.stdout.flush()
            # Start draining stdout/stderr (to the logger and, if possible, the log file)
            self._start_drains(self._open_worker_log())
            # Write PID file for external monitoring (best effort)
            self._write_pid_file(worker_pid)
            # NON-BLOCKING: report healthy right away; heartbeats refine the state later
            self.shared_state.update(
                stage="RUNNING",
                health="HEALTHY",  # Report healthy immediately so app passes health checks
                worker_pid=worker_pid,
                worker_active=True,  # Assume active until heartbeat fails
                worker_mode="standalone_process"
            )
            logger.info(f"[WorkerManager] Worker spawn initiated (PID: {worker_pid})")
            print("[WorkerManager] ✓ Worker spawned - waiting for heartbeat")
            sys.stdout.flush()
            return True

        # All attempts failed - but don't crash, just log and continue
        error_msg = f"Failed to spawn worker after {max_attempts} attempts"
        logger.error(error_msg)
        print(f"[WorkerManager] ⚠ {error_msg} - continuing without worker")
        sys.stdout.flush()
        self.shared_state.update(
            stage="RUNNING",
            health="DEGRADED",  # Degraded but still serving requests
            error=error_msg,
            worker_active=False,
            worker_pid=None
        )
        return False  # Return False but don't raise - app stays alive

    def _open_worker_log(self):
        """Open the worker log for appending; return None when that is not possible."""
        if not self._log_path.parent.exists():
            return None
        try:
            return open(self._log_path, "a")
        except OSError as e:
            logger.warning(f"[WorkerManager] Cannot open worker log {self._log_path}: {e}")
            return None

    def _start_drains(self, log_file):
        """Start the stdout/stderr drain tasks and arrange for *log_file* to be closed."""
        drains = [
            asyncio.create_task(self._drain_stdout(log_file)),
            asyncio.create_task(self._drain_stderr(log_file)),
        ]
        tracked = list(drains)
        if log_file is not None:
            tracked.append(asyncio.create_task(self._close_log_when_drained(drains, log_file)))
        self._drain_tasks = tracked

    async def _close_log_when_drained(self, drains, log_file):
        """Close *log_file* once both drain tasks have finished (or on cancellation)."""
        try:
            await asyncio.gather(*drains, return_exceptions=True)
        finally:
            log_file.close()

    def _write_pid_file(self, worker_pid):
        """Record the worker PID on disk; a failure is logged but never fatal."""
        try:
            self._pid_file.write_text(str(worker_pid))
            logger.info(f"[WorkerManager] PID file written: {self._pid_file}")
        except OSError as e:
            logger.warning(f"[WorkerManager] Could not write PID file {self._pid_file}: {e}")

    async def _wait_for_heartbeat(self, timeout: float) -> bool:
        """
        Wait for worker heartbeat to be received.
        Args:
            timeout: Maximum seconds to wait
        Returns:
            True if heartbeat received, False if timeout or the process exited
        """
        start_time = time.time()
        check_interval = 0.2
        while (time.time() - start_time) < timeout:
            state = self.shared_state.get()
            if state.get("worker_active") is True:
                logger.info(f"[WorkerManager] Heartbeat confirmed after {time.time() - start_time:.1f}s")
                return True
            # asyncio.subprocess.Process has no poll(); returncode stays None while running.
            if self._process and self._process.returncode is not None:
                logger.warning("[WorkerManager] Process exited during heartbeat wait")
                return False
            await asyncio.sleep(check_interval)
        return False

    async def _drain(self, stream, label, log, log_file):
        """Copy lines from *stream* to *log* and, when given, to *log_file* until EOF."""
        while True:
            try:
                line = await stream.readline()
            except ValueError as e:
                # Raised for a line longer than the reader's buffer limit; the reader
                # has already discarded it, so keep draining to avoid blocking the worker.
                logger.warning(f"[WorkerManager] Skipped oversized worker {label} line: {e}")
                continue
            except Exception as e:
                logger.warning(f"[WorkerManager] Stopped draining worker {label}: {e}")
                return
            if not line:
                return
            line_str = line.decode("utf-8", errors="replace").rstrip()
            log(f"[WORKER_{label}] {line_str}")
            if log_file is not None:
                try:
                    log_file.write(f"[{label}] {line_str}\n")
                    log_file.flush()
                except (OSError, ValueError) as e:
                    # Keep draining to the logger even if the file became unusable.
                    logger.warning(f"[WorkerManager] Worker log write failed: {e}")
                    log_file = None

    async def _drain_stdout(self, log_file=None):
        """Drain worker stdout to prevent buffer blocking."""
        proc = self._process
        if not proc or not proc.stdout:
            return
        await self._drain(proc.stdout, "STDOUT", logger.info, log_file)

    async def _drain_stderr(self, log_file=None):
        """Drain worker stderr to prevent buffer blocking."""
        proc = self._process
        if not proc or not proc.stderr:
            return
        await self._drain(proc.stderr, "STDERR", logger.warning, log_file)

    async def _terminate_worker(self):
        """Terminate the worker process if running, then remove the PID file."""
        proc = self._process
        if proc is not None and proc.returncode is None:
            try:
                proc.terminate()
                try:
                    await asyncio.wait_for(proc.wait(), timeout=5.0)
                except asyncio.TimeoutError:
                    # Worker ignored SIGTERM; force it down.
                    proc.kill()
                    await proc.wait()
            except Exception as e:
                logger.warning(f"[WorkerManager] Error terminating worker: {e}")
        # Clean up PID file
        try:
            self._pid_file.unlink(missing_ok=True)
        except OSError:
            pass

    def _get_worker_env(self) -> dict[str, str]:
        """Build environment for worker process.
        CRITICAL: Uses os.environ.copy() which automatically inherits all
        HF Space secrets (HF_TOKEN, OPENAI_API_KEY, OPENROUTER_API_KEY, etc).
        DO NOT set empty defaults - that can interfere with secret inheritance.
        """
        worker_env = os.environ.copy()
        inherited = worker_env.get("PYTHONPATH", "")
        # Avoid a trailing separator: an empty PYTHONPATH entry puts the worker's
        # current directory on its import path.
        worker_env["PYTHONPATH"] = "/app" + (os.pathsep + inherited if inherited else "")
        worker_env["CAIN_IS_WORKER"] = "true"
        return worker_env

    async def shutdown(self):
        """Gracefully shutdown the worker process and finish the drain tasks."""
        if self._process:
            logger.info("[WorkerManager] Shutting down worker process...")
            await self._terminate_worker()
            self._process = None
        if self._drain_tasks:
            # The pipes reach EOF once the worker is gone; give the drains a moment,
            # then cancel stragglers (the log file is closed either way).
            _, pending = await asyncio.wait(self._drain_tasks, timeout=2.0)
            for task in pending:
                task.cancel()
            self._drain_tasks = []
# Global worker manager instance
# Stays None until _initialize_worker() assigns it; lifespan shutdown resets it to None.
_worker_manager: Optional[WorkerManager] = None
| # ============================================================================ | |
| # BACKGROUND TASK MANAGER (Non-blocking) | |
| # ============================================================================ | |
class TaskStatus(str, Enum):
    """Lifecycle states of a BackgroundTask; members are also plain strings."""
    PENDING = "pending"      # created, start() not yet called
    RUNNING = "running"      # start() was called
    COMPLETED = "completed"  # process exited with code 0
    FAILED = "failed"        # failed to start, non-zero exit, or cancelled
class BackgroundTask:
    """Represents a single background task run in a child process.

    stdout and stderr are consumed by two daemon reader threads, so neither
    ``read_output()`` nor ``poll()`` ever blocks on a pipe and the child can
    never stall on a full pipe buffer.
    """

    def __init__(self, task_id: str, command: str, task_type: str = "shell"):
        """Create a PENDING task; nothing is executed until ``start()``."""
        self.task_id = task_id
        self.command = command
        self.task_type = task_type
        self.status = TaskStatus.PENDING
        self.created_at = self._utc_now()
        self.started_at: Optional[str] = None
        self.completed_at: Optional[str] = None
        self.output: str = ""
        self.error: Optional[str] = None
        self.exit_code: Optional[int] = None
        self._process: Optional[subprocess.Popen] = None
        # Guards the chunk buffers shared with the reader threads.
        self._io_lock = threading.Lock()
        # Serialises poll(), which is called from several threads.
        self._poll_lock = threading.Lock()
        self._stdout_chunks: list[str] = []
        self._stderr_chunks: list[str] = []
        self._readers: list[threading.Thread] = []
        self._finalized = False

    @staticmethod
    def _utc_now() -> str:
        """Return the current UTC time as an ISO-8601 string ending in ``+00:00``."""
        from datetime import timezone  # file-level import only provides `datetime`

        return datetime.now(timezone.utc).isoformat()

    def _pump(self, stream, chunks):
        """Reader-thread body: append every line of *stream* to *chunks* until EOF."""
        try:
            for line in stream:
                with self._io_lock:
                    chunks.append(line)
        except (OSError, ValueError) as e:
            logger.warning(f"Task {self.task_id} output reader stopped: {e}")
        finally:
            try:
                stream.close()
            except OSError:
                pass

    def start(self):
        """Start executing the task in background."""
        self.status = TaskStatus.RUNNING
        self.started_at = self._utc_now()
        if self.task_type == "shell":
            # SECURITY: the command string is handed to the shell verbatim; callers
            # must only pass trusted input.
            args, use_shell = self.command, True
        else:
            # Python script execution
            args, use_shell = [sys.executable, "-c", self.command], False
        try:
            proc = subprocess.Popen(
                args,
                shell=use_shell,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                errors="replace",  # undecodable bytes must not kill a reader thread
                bufsize=1,
            )
        except Exception as e:
            self.status = TaskStatus.FAILED
            self.error = str(e)
            self.completed_at = self._utc_now()
            logger.error(f"Task {self.task_id} failed to start: {e}")
            return
        for stream, chunks in ((proc.stdout, self._stdout_chunks), (proc.stderr, self._stderr_chunks)):
            reader = threading.Thread(
                target=self._pump,
                args=(stream, chunks),
                daemon=True,
                name=f"task-{self.task_id}-reader",
            )
            reader.start()
            self._readers.append(reader)
        # Publish the process only after the readers exist, so a concurrent poll()
        # cannot finalise the task before its output is being collected.
        self._process = proc

    def poll(self):
        """Poll task status and update if complete.

        Safe to call repeatedly and from several threads; the task is
        finalised exactly once. A task already marked terminal (e.g.
        cancelled) keeps its status, ``completed_at`` and ``error``; only
        the exit code and remaining output are recorded.
        """
        proc = self._process
        if proc is None:
            return
        with self._poll_lock:
            if self._finalized:
                return
            returncode = proc.poll()
            if returncode is None:
                return
            # Process finished: give the readers a moment to reach EOF.
            for reader in self._readers:
                reader.join(timeout=1.0)
            self.read_output()
            with self._io_lock:
                stderr = "".join(self._stderr_chunks)
                self._stderr_chunks.clear()
            self.exit_code = returncode
            if self.status in (TaskStatus.COMPLETED, TaskStatus.FAILED):
                if stderr and self.error is None:
                    self.error = stderr
            else:
                self.status = TaskStatus.COMPLETED if returncode == 0 else TaskStatus.FAILED
                self.completed_at = self._utc_now()
                if stderr:
                    self.error = stderr
            self._finalized = True
            logger.info(f"Task {self.task_id} completed with exit code {returncode}")

    def read_output(self):
        """Move stdout collected so far into ``self.output`` without blocking."""
        if self._process is None:
            return
        with self._io_lock:
            if self._stdout_chunks:
                self.output += "".join(self._stdout_chunks)
                self._stdout_chunks.clear()

    def to_dict(self) -> dict[str, Any]:
        """Convert task to dictionary for API response (includes the latest output)."""
        self.read_output()
        return {
            "task_id": self.task_id,
            "command": self.command,
            "task_type": self.task_type,
            "status": self.status,
            "created_at": self.created_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "exit_code": self.exit_code,
            "output": self.output,
            "error": self.error,
        }
class TaskManager:
    """
    Non-blocking background task manager.
    Runs shell commands and Python scripts asynchronously without
    blocking the main Uvicorn thread. A daemon thread polls running
    tasks every 100 ms.
    """

    def __init__(self):
        """Create an empty registry and start the poller thread."""
        self._tasks: dict[str, BackgroundTask] = {}
        self._lock = threading.Lock()
        # Set to False (the lifespan shutdown does this) to stop the poller loop.
        self._running = True
        # Start background poller thread
        self._poller_thread = threading.Thread(
            target=self._poll_loop,
            daemon=True,
            name="TaskManagerPoller"
        )
        self._poller_thread.start()
        logger.info("TaskManager initialized with background poller thread")

    def _poll_loop(self):
        """Background thread that polls all running tasks."""
        while self._running:
            # Snapshot under the lock, do the per-task work outside it, so slow
            # task I/O can never stall API calls waiting on the registry lock.
            with self._lock:
                running = [
                    task for task in self._tasks.values()
                    if task.status == TaskStatus.RUNNING
                ]
            for task in running:
                try:
                    task.read_output()
                    task.poll()
                except Exception as e:
                    logger.error(f"Error in task poll loop: {e}")
            time.sleep(0.1)  # Poll every 100ms

    def create_task(self, command: str, task_type: str = "shell") -> str:
        """Create a new background task, start it, and return its 8-character ID."""
        with self._lock:
            task_id = uuid.uuid4().hex[:8]
            # 8 hex characters can collide; never overwrite an existing task.
            while task_id in self._tasks:
                task_id = uuid.uuid4().hex[:8]
        task = BackgroundTask(task_id, command, task_type)
        task.start()
        with self._lock:
            self._tasks[task_id] = task
        logger.info(f"Created task {task_id}: {command[:50]}...")
        return task_id

    def get_task(self, task_id: str) -> Optional[BackgroundTask]:
        """Get a task by ID, or None when unknown."""
        with self._lock:
            return self._tasks.get(task_id)

    def list_tasks(self) -> list[dict[str, Any]]:
        """List all tasks as dictionaries."""
        with self._lock:
            return [task.to_dict() for task in self._tasks.values()]

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a running task.

        Returns True when a RUNNING task was signalled; False when the task is
        unknown, never started a process, or has already finished (finished
        tasks keep their real result).
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None or task._process is None or task.status != TaskStatus.RUNNING:
                return False
            try:
                task._process.terminate()
            except OSError as e:
                # The process may have exited between the status check and the signal.
                logger.warning(f"Could not signal task {task_id}: {e}")
            task.status = TaskStatus.FAILED
            task.completed_at = datetime.utcnow().isoformat() + "+00:00"
            task.error = "Task cancelled by user"
            logger.info(f"Cancelled task {task_id}")
            return True

    def cleanup_old_tasks(self, max_age_hours: int = 24):
        """Remove completed/failed tasks that finished more than max_age_hours ago."""
        from datetime import timezone  # file-level import only provides `datetime`

        cutoff = time.time() - (max_age_hours * 3600)
        with self._lock:
            to_remove = []
            for task_id, task in self._tasks.items():
                if task.status not in (TaskStatus.COMPLETED, TaskStatus.FAILED):
                    continue
                if not task.completed_at:
                    continue
                try:
                    finished = datetime.fromisoformat(task.completed_at.replace("Z", "+00:00"))
                except ValueError:
                    continue  # unparseable timestamp: keep the task
                if finished.tzinfo is None:
                    # Timestamps in this file are UTC; treat a bare value as such.
                    finished = finished.replace(tzinfo=timezone.utc)
                if finished.timestamp() < cutoff:
                    to_remove.append(task_id)
            for task_id in to_remove:
                del self._tasks[task_id]
        if to_remove:
            logger.info(f"Cleaned up {len(to_remove)} old tasks")

    def get_status(self) -> dict[str, Any]:
        """Get overall task manager status (task counts per status, poller liveness)."""
        with self._lock:
            status_counts = {}
            for task in self._tasks.values():
                status_counts[task.status] = status_counts.get(task.status, 0) + 1
            return {
                "total_tasks": len(self._tasks),
                "status_breakdown": status_counts,
                "poller_thread_alive": self._poller_thread.is_alive(),
            }
| # ============================================================================ | |
| # SHARED STATE (In-memory) | |
| # ============================================================================ | |
class SharedState:
    """In-memory, lock-protected status record shared across the app.

    NOTE: every ``update()`` call refreshes ``last_heartbeat``, including
    updates that do not originate from the worker.
    """

    def __init__(self):
        """Initialise the state dict with STARTUP_INIT / INITIALIZING defaults."""
        self._lock = threading.Lock()
        self._state = {
            "worker_state": "initializing",
            "worker_active": False,  # Set to True only after worker spawns successfully
            "worker_pid": None,  # Set to worker PID after successful spawn
            "worker_mode": None,  # Set after spawn
            "last_heartbeat": None,
            "heartbeat_age_seconds": 0,
            "stage": "STARTUP_INIT",
            "health": "INITIALIZING",
            "error": None,
            "uptime_seconds": 0,
            "started_at": self._utc_now_iso(),
        }
        self._start_time = time.time()

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as an ISO-8601 string ending in ``+00:00``."""
        from datetime import timezone  # file-level import only provides `datetime`

        return datetime.now(timezone.utc).isoformat()

    def update(self, **kwargs):
        """Update state with new values and refresh the heartbeat timestamp.

        Keyword arguments whose value is None are ignored, so an existing
        value cannot be cleared through this method.
        """
        with self._lock:
            self._state["last_heartbeat"] = self._utc_now_iso()
            self._state["heartbeat_age_seconds"] = 0
            for key, value in kwargs.items():
                if value is not None:
                    self._state[key] = value

    def get(self) -> dict[str, Any]:
        """Get current state snapshot (a shallow copy).

        ``uptime_seconds`` and ``heartbeat_age_seconds`` are recomputed on
        every call; an unparseable ``last_heartbeat`` yields an age of 999.
        """
        from datetime import timezone

        with self._lock:
            self._state["uptime_seconds"] = int(time.time() - self._start_time)
            # Calculate heartbeat age
            raw = self._state["last_heartbeat"]
            if raw:
                try:
                    heartbeat_time = datetime.fromisoformat(raw.replace("Z", "+00:00"))
                    if heartbeat_time.tzinfo is None:
                        # Bare timestamps are taken to be UTC, as before.
                        heartbeat_time = heartbeat_time.replace(tzinfo=timezone.utc)
                    # Aware arithmetic converts any other offset to UTC correctly
                    # instead of just dropping it.
                    age = (datetime.now(timezone.utc) - heartbeat_time).total_seconds()
                    self._state["heartbeat_age_seconds"] = age
                except Exception:
                    self._state["heartbeat_age_seconds"] = 999
            return self._state.copy()
| # ============================================================================ | |
| # REQUEST LOGGING MIDDLEWARE | |
| # ============================================================================ | |
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """
    Logs one line per incoming request (method, path, query, client IP)
    and one line per outgoing response (method, path, status code).
    """

    async def dispatch(self, request: Request, call_next):
        """Log the request, delegate to the next handler, log the response status."""
        peer = request.client
        client_ip = peer.host if peer else "unknown"
        method, path = request.method, request.url.path
        raw_query = request.url.query
        # Only append "?..." when a query string is actually present.
        suffix = f"?{raw_query}" if raw_query else ""
        logger.info(f"REQUEST: {method} {path}{suffix} from {client_ip}")
        response = await call_next(request)
        logger.info(f"RESPONSE: {method} {path} -> {response.status_code}")
        return response
| # ============================================================================ | |
| # FASTAPI APP | |
| # ============================================================================ | |
# Initialize shared state and task manager
# NOTE: constructing TaskManager starts its daemon poller thread, so importing this
# module has that side effect.
shared_state = SharedState()
task_manager = TaskManager()
| def _check_dependencies() -> tuple[bool, str]: | |
| """Pre-initialization check for PYTHONPATH and dependencies.""" | |
| errors = [] | |
| # Check PYTHONPATH | |
| if "/app" not in sys.path: | |
| errors.append("PYTHONPATH missing /app") | |
| # Check critical imports | |
| try: | |
| import fastapi | |
| import uvicorn | |
| except ImportError as e: | |
| errors.append(f"Missing dependency: {e}") | |
| # Check brain_minimal.py exists | |
| brain_path = Path(__file__).parent / "brain_minimal.py" | |
| if not brain_path.exists(): | |
| errors.append(f"brain_minimal.py not found at {brain_path}") | |
| return (len(errors) == 0, "; ".join(errors) if errors else "OK") | |
async def _initialize_worker():
    """
    Background task to initialize worker process.
    Creates the global WorkerManager and spawns the worker with retries.
    Does not wait for a heartbeat (spawn_with_retry is non-blocking).

    Outcomes:
      - spawn succeeded: state as set by spawn_with_retry (HEALTHY).
      - spawn_with_retry returned False: the DEGRADED state it recorded is
        left in place so the health check keeps passing while the app runs
        without a worker.
      - unexpected exception: health is set to WORKER_FAILED.
    """
    global _worker_manager
    print("=" * 60)
    print("🚀 CAIN WORKER INITIALIZATION: Starting async worker spawn")
    print("=" * 60)
    sys.stdout.flush()
    sys.stderr.flush()
    # Update state to show we're initializing worker
    shared_state.update(
        stage="RUNNING_APP_STARTING",
        health="INITIALIZING_WORKER",
        worker_active=False,
        worker_pid=None
    )
    try:
        # Initialize WorkerManager
        _worker_manager = WorkerManager(shared_state)
        # More attempts and a longer retry delay for the HuggingFace Spaces environment.
        # NOTE(review): heartbeat_timeout is passed through but spawn_with_retry in this
        # file does not use it.
        success = await _worker_manager.spawn_with_retry(
            max_attempts=5,  # More attempts for HF Spaces
            retry_delay=2.0,
            heartbeat_timeout=30.0  # Extended timeout
        )
    except Exception as e:
        logger.error(f"Worker initialization failed: {type(e).__name__}: {e}")
        print(f"❌ CAIN WORKER INITIALIZATION FAILED: {e}")
        sys.stdout.flush()
        shared_state.update(
            stage="RUNNING_APP_STARTING",
            health="WORKER_FAILED",
            error=f"Worker init failed: {e}",
            worker_active=False,
            worker_pid=None
        )
        return

    if not success:
        # spawn_with_retry already logged the cause and recorded DEGRADED; do not
        # overwrite that with WORKER_FAILED, which the health check treats as unhealthy.
        logger.error("Worker initialization failed: all spawn attempts were unsuccessful")
        print("❌ CAIN WORKER INITIALIZATION FAILED: all spawn attempts were unsuccessful")
        sys.stdout.flush()
        return

    state = shared_state.get()
    print("=" * 60)
    print("✅ CAIN WORKER INITIALIZED SUCCESSFULLY")
    print(f" Stage: {state.get('stage')}")
    print(f" Health: {state.get('health')}")
    print(f" Worker PID: {state.get('worker_pid')}")
    print(f" Worker Active: {state.get('worker_active')}")
    print("=" * 60)
    sys.stdout.flush()
    logger.info("Cain worker initialized successfully")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Lifespan context manager for startup/shutdown.

    Startup runs the dependency checks and, when they pass, schedules worker
    initialization as a background task so the server can accept requests
    immediately. Shutdown stops the task poller, cancels a still-pending
    initialization and shuts the worker down.
    """
    global _worker_manager
    # Startup
    print("=" * 60)
    print("🚀 CAIN LIFESPAN: Startup starting")
    print("=" * 60)
    sys.stdout.flush()
    # Held for the whole lifespan: the event loop keeps only weak references to
    # tasks, so an unreferenced task could be garbage-collected before it finishes.
    init_task = None
    # Pre-initialization checks
    deps_ok, deps_msg = _check_dependencies()
    if not deps_ok:
        logger.critical(f"DEPENDENCY CHECK FAILED: {deps_msg}")
        print(f"CRITICAL: {deps_msg}")
        sys.stdout.flush()
        shared_state.update(
            stage="RUNNING",
            health="DEGRADED",  # Use DEGRADED instead of DEPENDENCY_ERROR
            error=f"Pre-initialization check failed: {deps_msg}",
            worker_active=False
        )
        # Don't raise - let app start in degraded mode
        print("⚠️ Starting in DEGRADED mode (worker not available)")
        sys.stdout.flush()
    else:
        print("✓ Pre-initialization checks passed")
        sys.stdout.flush()
        # Start worker initialization as background task.
        # NOTE(review): the worker is only spawned when the checks pass, matching the
        # "worker not available" message above - confirm this is the intended nesting.
        init_task = asyncio.create_task(_initialize_worker())
    yield
    # Shutdown: Cleanup
    print("=" * 60)
    print("🛑 CAIN LIFESPAN: Shutdown starting")
    print("=" * 60)
    sys.stdout.flush()
    task_manager._running = False
    # Stop an initialization that is still retrying so it cannot spawn a worker
    # after shutdown has begun.
    if init_task is not None and not init_task.done():
        init_task.cancel()
        await asyncio.gather(init_task, return_exceptions=True)
    # Shutdown worker manager
    if _worker_manager is not None:
        try:
            await _worker_manager.shutdown()
        except Exception as e:
            logger.error(f"Error shutting down worker manager: {e}")
        finally:
            _worker_manager = None
    print("CAIN shutdown complete")
    sys.stdout.flush()
# The ASGI application; startup and shutdown are driven by lifespan().
app = FastAPI(
    title="HuggingClaw Cain",
    description="Agent collaboration server with background task execution",
    version="2.0.0",
    lifespan=lifespan
)
# DEBUG: Print APP_READY immediately to verify core app instantiation
print("APP_READY")
sys.stdout.flush()
# TEMPORARILY DISABLED: Testing Eve's hypothesis - comment out error_handlers to break potential dependency chain
# # Register exception handlers (import here to avoid circular dependency)
# from error_handlers import register_error_handlers
# register_error_handlers(app)
#
# # Add request logging middleware
# app.add_middleware(RequestLoggingMiddleware)
# Mount static files for frontend assets (images, fonts, etc.)
# StaticFiles raises at construction time when the directory is missing, which would
# abort the import; skip the mount instead so the app can still start and read_root()
# can serve its JSON fallback.
if FRONTEND_PATH.is_dir():
    app.mount("/static", StaticFiles(directory=str(FRONTEND_PATH)), name="static")
else:
    logger.warning(f"Frontend directory not found, /static not mounted: {FRONTEND_PATH}")
async def read_root():
    """Return the agent dashboard page, or a JSON status stub when the file is absent."""
    logger.info("GET / - Serving agent dashboard")
    if not AGENT_DASHBOARD.exists():
        # Dashboard file missing: answer with a small liveness payload instead.
        fallback = {
            "status": "alive",
            "message": "Cain core operational",
            "note": "Agent dashboard file not found",
            "frontend_path": str(FRONTEND_PATH),
        }
        return JSONResponse(fallback)
    return FileResponse(str(AGENT_DASHBOARD), media_type="text/html")
async def health_check():
    """
    Health check endpoint for container orchestration.
    Returns:
    - 200: System is operational (worker may still be initializing or degraded)
    - 503: shared state reports any other health value
    - Detailed worker status in response body
    """
    state = shared_state.get()
    worker_alive = False
    worker_pid = None
    # Check live worker process status
    process = _worker_manager._process if _worker_manager is not None else None
    if process is not None:
        worker_pid = process.pid
        # asyncio.subprocess.Process has no poll(); returncode is None while running.
        worker_alive = process.returncode is None
    # Determine overall health
    # App is healthy if it's running, worker can be initializing or degraded
    is_healthy = state["health"] in ("HEALTHY", "INITIALIZING", "INITIALIZING_WORKER", "RUNNING_APP_STARTING", "DEGRADED")
    return JSONResponse({
        "status": "healthy" if is_healthy else "unhealthy",
        "stage": state["stage"],
        "worker": {
            "active": state["worker_active"],
            "alive": worker_alive,
            "pid": worker_pid or state.get("worker_pid"),
            "mode": state["worker_mode"],
            "heartbeat_age_seconds": state["heartbeat_age_seconds"],
        },
        "uptime_seconds": state["uptime_seconds"],
        "timestamp": datetime.utcnow().isoformat() + "+00:00",
    }, status_code=200 if is_healthy else 503)
async def get_state():
    """
    Get detailed process and system status.
    Returns live worker process status (not just cached state) together
    with task-manager counters.
    """
    logger.debug("GET /api/state called")
    state = shared_state.get()
    task_status = task_manager.get_status()
    # Check live worker process status from WorkerManager
    worker_alive = False
    worker_pid = None
    process = _worker_manager._process if _worker_manager is not None else None
    if process is not None:
        worker_pid = process.pid
        # asyncio.subprocess.Process has no poll(); returncode is None while running.
        worker_alive = process.returncode is None
    # Update shared_state with real-time worker status
    # NOTE(review): SharedState.update() also refreshes last_heartbeat, so this write
    # makes later heartbeat-age readings look fresh - confirm that is acceptable.
    if worker_alive != state.get("worker_active"):
        shared_state.update(worker_active=worker_alive)
        state["worker_active"] = worker_alive
    # Determine worker health - combine live process check with heartbeat freshness
    worker_healthy = (
        worker_alive and  # Must be actually running
        state["heartbeat_age_seconds"] < 30  # And heartbeat must be recent
    )
    return JSONResponse({
        "cain": {
            "name": "Cain",
            "space_id": "tao-shen/HuggingClaw-Cain",
            "stage": state["stage"],
            "health": state["health"],
            "error": state.get("error"),
            "uptime_seconds": state["uptime_seconds"],
            "started_at": state["started_at"],
        },
        "worker": {
            "state": state["worker_state"],
            "active": state["worker_active"],  # Now reflects real-time process status
            "pid": worker_pid or state.get("worker_pid"),
            "mode": state["worker_mode"],
            "last_heartbeat": state["last_heartbeat"],
            "heartbeat_age_seconds": state["heartbeat_age_seconds"],
            "is_healthy": worker_healthy,
        },
        "tasks": task_status,
        "timestamp": datetime.utcnow().isoformat() + "+00:00",
    })
# Internal heartbeat endpoint (used by worker processes)
async def heartbeat(data: dict[str, Any]):
    """Record a worker heartbeat in shared state.

    The payload is merged into shared state. Once the stage is
    RUNNING_APP_READY or RUNNING_A2A_READY, the payload's "stage" is ignored,
    except for the single upgrade RUNNING_APP_READY -> RUNNING_A2A_READY.
    """
    summary = f"[HEARTBEAT] Received from worker_pid={data.get('worker_pid')}, worker_state={data.get('worker_state')}, stage={data.get('stage')}"
    logger.info(summary)
    print(summary)
    sys.stdout.flush()

    # Stage as it was before this heartbeat is applied
    current_stage = shared_state.get().get("stage")
    incoming_stage = data.get("stage")
    protected_stages = ("RUNNING_APP_READY", "RUNNING_A2A_READY")

    is_upgrade = current_stage == "RUNNING_APP_READY" and incoming_stage == "RUNNING_A2A_READY"
    if is_upgrade:
        print(f"[HEARTBEAT] Stage transition: {current_stage} -> {incoming_stage}")
        shared_state.update(**data)
    elif current_stage in protected_stages:
        # Keep the app-ready stage; apply every other field from the payload.
        without_stage = {key: value for key, value in data.items() if key != "stage"}
        shared_state.update(**without_stage)
        print(f"[HEARTBEAT] Preserving stage={current_stage}, updating other fields")
    else:
        # Any other stage: apply the payload unchanged.
        shared_state.update(**data)
    sys.stdout.flush()
    return {"status": "received"}
async def read_startup_error():
    """
    DEBUG: Expose the contents of startup_error.log (located next to this file)
    so worker spawn failures can be inspected over HTTP.
    """
    startup_error_log = Path(__file__).parent / "startup_error.log"
    if not startup_error_log.exists():
        return JSONResponse({
            "startup_error_log_exists": False,
            "message": "No startup error log found - worker may have started successfully"
        })
    return JSONResponse({
        "startup_error_log_exists": True,
        "content": startup_error_log.read_text()
    })
| # ============================================================================ | |
| # TASK EXECUTION API | |
| # ============================================================================ | |
async def execute_task(request: dict[str, Any]):
    """
    Execute a command or script in the background.
    Request body:
    {
    "command": "echo 'hello'", # Command to execute
    "type": "shell" | "python" # Execution type (default: shell)
    }
    Raises:
        HTTPException(400): command missing or not a string, or unknown type.

    SECURITY: this runs caller-supplied commands on the host; it must only be
    reachable by trusted clients.
    """
    command = request.get("command")
    task_type = request.get("type", "shell")
    if not command:
        raise HTTPException(status_code=400, detail="Command is required")
    if not isinstance(command, str):
        # A non-string command would otherwise fail later with a 500.
        raise HTTPException(status_code=400, detail="Command must be a string")
    if task_type not in ("shell", "python"):
        # Unknown types used to be executed silently as Python code.
        raise HTTPException(status_code=400, detail="Type must be 'shell' or 'python'")
    task_id = task_manager.create_task(command, task_type)
    logger.info(f"Started task {task_id}: {command[:100]}")
    return JSONResponse({
        "task_id": task_id,
        "status": "started",
        "message": f"Task {task_id} started successfully"
    })
async def get_task_status(task_id: str):
    """Return the current state of one task; 404 when the ID is unknown."""
    task = task_manager.get_task(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")
    # Refresh the task before serialising it
    task.poll()
    payload = task.to_dict()
    return JSONResponse(payload)
async def list_tasks():
    """Return every known task under the "tasks" key."""
    all_tasks = task_manager.list_tasks()
    return JSONResponse({"tasks": all_tasks})
async def cancel_task(task_id: str):
    """Cancel a task; 404 when the task manager reports it could not be cancelled."""
    cancelled = task_manager.cancel_task(task_id)
    if not cancelled:
        raise HTTPException(status_code=404, detail="Task not found or not cancellable")
    return JSONResponse({"status": "cancelled", "task_id": task_id})
async def cleanup_tasks(max_age_hours: int = 24):
    """Ask the task manager to drop finished tasks older than max_age_hours."""
    task_manager.cleanup_old_tasks(max_age_hours)
    result = {"status": "cleaned", "max_age_hours": max_age_hours}
    return JSONResponse(result)
| # ============================================================================ | |
| # WEBSOCKET CHAT ENDPOINT | |
| # ============================================================================ | |
async def chat_websocket(websocket: WebSocket):
    """WebSocket endpoint for real-time agent communication.

    Accepts the connection, then acknowledges every text message by echoing
    it back in an "ack" JSON envelope until the client disconnects.
    """
    await websocket.accept()
    # The client address can be absent; fall back to "unknown" as the request
    # logging middleware does, instead of raising AttributeError.
    peer = websocket.client
    client_id = f"{peer.host}:{peer.port}" if peer else "unknown"
    logger.info(f"WebSocket connection established from {client_id}")
    try:
        while True:
            # Receive message from client
            data = await websocket.receive_text()
            logger.info(f"WebSocket message from {client_id}: {data}")
            # Echo acknowledgment back to client
            await websocket.send_json({
                "type": "ack",
                "message": "Message received",
                "timestamp": datetime.utcnow().isoformat() + "+00:00",
                "echo": data
            })
    except WebSocketDisconnect:
        logger.info(f"WebSocket connection closed by {client_id}")
    except Exception as e:
        logger.error(f"WebSocket error for {client_id}: {e}")
    finally:
        logger.info(f"WebSocket connection terminated for {client_id}")
| # ============================================================================ | |
| # MAIN ENTRY POINT | |
| # ============================================================================ | |
if __name__ == "__main__":
    # Listen port comes from the PORT environment variable, defaulting to 7860.
    port = int(os.getenv("PORT", 7860))
    logger.info(f"🚀 STARTING UVICORN on 0.0.0.0:{port}")
    uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")