#!/usr/bin/env python3
"""
Shared in-memory state for Cain worker process.

This module provides a thread-safe in-memory cache for worker state that
replaces file-based heartbeat writes. The worker process (brain_minimal.py)
updates this state via HTTP POST to app.py's internal endpoint.

Key design:
- No file I/O for heartbeat (eliminates race conditions)
- In-memory state cache in app.py
- Worker state reflects activity level, not just PID existence
"""

import threading
import time
from datetime import datetime, timezone
from typing import Any, Optional

# Heartbeats older than this many seconds mark the worker as stale.
_DEFAULT_MAX_HEARTBEAT_AGE = 15

# Age reported when the stored heartbeat timestamp cannot be parsed.
_UNPARSEABLE_HEARTBEAT_AGE = 999


class WorkerState:
    """
    Thread-safe in-memory cache for worker state.

    This is the source of truth for worker activity status.
    Updated by brain_minimal.py via HTTP POST to /internal/heartbeat.

    All public methods take ``self._lock`` exactly once; private helpers
    whose docstring says "caller must hold the lock" never acquire it, so
    the non-reentrant lock cannot be taken twice on one thread.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._state = {
            # Worker activity state (decoupled from PID)
            "worker_state": "idle",  # idle, active, processing
            "worker_active": False,
            "current_state": "idle",  # backward compatibility
            # Process info (for logging/debugging)
            "worker_pid": None,
            "worker_mode": None,
            # Heartbeat timestamps
            "last_heartbeat": None,
            "heartbeat_age_seconds": 0,
            # Stage info
            "stage": "RUNNING_A2A_READY",
            # Health status
            "health": "HEALTHY",
            "error": None,
        }

    def _refresh_heartbeat_age(self) -> float:
        """
        Recompute and store the heartbeat age. Caller must hold the lock.

        Returns:
            Seconds since ``last_heartbeat``. If no heartbeat has been
            recorded yet the stored age is returned untouched (0 on a fresh
            instance). If the timestamp cannot be parsed, 999 is stored and
            returned.
        """
        raw = self._state["last_heartbeat"]
        if not raw:
            return self._state["heartbeat_age_seconds"]

        try:
            # datetime.fromisoformat() only accepts a trailing "Z" from
            # Python 3.11 on, so normalise it to an explicit UTC offset.
            text = raw[:-1] + "+00:00" if raw.endswith("Z") else raw
            stamp = datetime.fromisoformat(text)
            if stamp.tzinfo is None:
                # Naive timestamps are taken to be UTC.
                stamp = stamp.replace(tzinfo=timezone.utc)
            age = (datetime.now(timezone.utc) - stamp).total_seconds()
        except (AttributeError, TypeError, ValueError):
            # Not a string, or not a valid ISO-8601 timestamp.
            age = _UNPARSEABLE_HEARTBEAT_AGE

        self._state["heartbeat_age_seconds"] = age
        return age

    def update(self, **kwargs: Any) -> None:
        """
        Record a heartbeat and merge the given fields into the state.

        The heartbeat timestamp is set to the current UTC time before the
        keyword arguments are applied, so a caller-supplied
        ``last_heartbeat`` overrides it. Arguments whose value is ``None``
        are ignored and leave the existing value in place.
        """
        with self._lock:
            # isoformat() on an aware UTC datetime already ends in "+00:00".
            self._state["last_heartbeat"] = datetime.now(timezone.utc).isoformat()
            self._state["heartbeat_age_seconds"] = 0

            for key, value in kwargs.items():
                if value is not None:
                    self._state[key] = value

    def get(self) -> dict[str, Any]:
        """
        Return a shallow copy of the current state.

        ``heartbeat_age_seconds`` is refreshed before the copy is taken.
        """
        with self._lock:
            self._refresh_heartbeat_age()
            return self._state.copy()

    def is_healthy(self, max_age_seconds: int = _DEFAULT_MAX_HEARTBEAT_AGE) -> bool:
        """
        Check if worker is healthy based on heartbeat age.

        The age is recomputed on every call, so a worker that has stopped
        sending heartbeats is detected even if ``get()`` was never called.

        Args:
            max_age_seconds: Maximum acceptable heartbeat age (default 15s)

        Returns:
            True if the heartbeat age is below ``max_age_seconds``, False
            otherwise. Before any heartbeat has been recorded the age is 0,
            so the result is True for any positive threshold.
        """
        with self._lock:
            return self._refresh_heartbeat_age() < max_age_seconds

    def get_worker_state(self) -> str:
        """
        Get worker activity state (decoupled from PID).

        Returns:
            "idle" when the heartbeat is stale (15s or older); otherwise the
            stored ``worker_state`` value ("idle", "active" or "processing").
        """
        with self._lock:
            # Evaluate staleness with the lock-free helper: calling
            # is_healthy() here would try to take self._lock a second time
            # and block forever.
            if self._refresh_heartbeat_age() >= _DEFAULT_MAX_HEARTBEAT_AGE:
                return "idle"
            return self._state.get("worker_state", "idle")


# Global singleton
_worker_state: Optional[WorkerState] = None

# Serialises first-time creation of the singleton across threads.
_worker_state_lock = threading.Lock()


def get_worker_state() -> WorkerState:
    """Get or create the global worker state singleton."""
    global _worker_state
    with _worker_state_lock:
        if _worker_state is None:
            _worker_state = WorkerState()
        return _worker_state