File size: 3,930 Bytes
41564f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python3
"""
Shared in-memory state for Cain worker process.

This module provides a thread-safe in-memory cache for worker state
that replaces file-based heartbeat writes. The worker process (brain_minimal.py)
updates this state via HTTP POST to app.py's internal endpoint.

Key design:
- No file I/O for heartbeat (eliminates race conditions)
- In-memory state cache in app.py
- Worker state reflects activity level, not just PID existence
"""
import threading
import time
from datetime import datetime, timezone
from typing import Any, Optional


class WorkerState:
    """
    Thread-safe in-memory cache for worker state.

    This is the source of truth for worker activity status.
    Updated by brain_minimal.py via HTTP POST to /internal/heartbeat.

    All public methods take ``self._lock`` exactly once. The lock is a plain
    (non-reentrant) ``threading.Lock``, so public methods must never call each
    other while holding it; shared logic lives in ``_*_locked`` helpers that
    expect the caller to already hold the lock.
    """

    # Age (seconds) reported when the stored heartbeat timestamp is unparseable.
    _UNPARSEABLE_AGE_SECONDS = 999

    def __init__(self):
        self._lock = threading.Lock()
        self._state: dict[str, Any] = {
            # Worker activity state (decoupled from PID)
            "worker_state": "idle",  # idle, active, processing
            "worker_active": False,
            "current_state": "idle",  # backward compatibility

            # Process info (for logging/debugging)
            "worker_pid": None,
            "worker_mode": None,

            # Heartbeat timestamps
            "last_heartbeat": None,
            "heartbeat_age_seconds": 0,

            # Stage info
            "stage": "RUNNING_A2A_READY",

            # Health status
            "health": "HEALTHY",
            "error": None,
        }

    def _refresh_heartbeat_age_locked(self) -> float:
        """
        Recompute, store and return the heartbeat age in seconds.

        The caller must already hold ``self._lock``.

        Returns:
            Seconds elapsed since ``last_heartbeat``. If no heartbeat has been
            recorded yet, the stored age is returned unchanged (0 on a fresh
            instance). If the stored timestamp is not a parseable ISO-8601
            string, ``_UNPARSEABLE_AGE_SECONDS`` is stored and returned.
        """
        raw = self._state["last_heartbeat"]
        if not raw:
            return self._state["heartbeat_age_seconds"]

        age: float = self._UNPARSEABLE_AGE_SECONDS
        if isinstance(raw, str):
            # fromisoformat() only accepts a trailing "Z" on newer Pythons,
            # so normalise it to an explicit UTC offset first.
            text = raw[:-1] + "+00:00" if raw.endswith("Z") else raw
            try:
                stamp = datetime.fromisoformat(text)
            except ValueError:
                pass  # keep the "unparseable" sentinel age
            else:
                if stamp.tzinfo is None:
                    # Naive timestamps are treated as UTC.
                    stamp = stamp.replace(tzinfo=timezone.utc)
                # Aware arithmetic converts any non-UTC offset correctly.
                age = (datetime.now(timezone.utc) - stamp).total_seconds()

        self._state["heartbeat_age_seconds"] = age
        return age

    def update(self, **kwargs):
        """
        Record a heartbeat and merge the given fields into the state.

        The heartbeat timestamp is set to the current UTC time and the cached
        age is reset to 0 before the keyword arguments are applied, so an
        explicit ``last_heartbeat`` keyword overrides the automatic timestamp.

        Args:
            **kwargs: State fields to overwrite. Values that are ``None`` are
                skipped, which means a field cannot be reset to ``None``
                through this method.
        """
        with self._lock:
            # Update heartbeat timestamp
            self._state["last_heartbeat"] = datetime.now(timezone.utc).isoformat()
            self._state["heartbeat_age_seconds"] = 0

            # Update provided fields
            for key, value in kwargs.items():
                if value is not None:
                    self._state[key] = value

    def get(self) -> dict[str, Any]:
        """
        Get current state snapshot (thread-safe).

        Returns:
            A shallow copy of the state dict with ``heartbeat_age_seconds``
            freshly recomputed from ``last_heartbeat``.
        """
        with self._lock:
            self._refresh_heartbeat_age_locked()
            return self._state.copy()

    def is_healthy(self, max_age_seconds: int = 15) -> bool:
        """
        Check if worker is healthy based on heartbeat age.

        The age is recomputed on every call rather than read from the cached
        field, so the result does not depend on when ``get()`` was last called.

        Args:
            max_age_seconds: Maximum acceptable heartbeat age (default 15s)

        Returns:
            True if the heartbeat age is below ``max_age_seconds``, False
            otherwise. Before any heartbeat is recorded the age is 0, so a
            fresh instance reports True.
        """
        with self._lock:
            return self._refresh_heartbeat_age_locked() < max_age_seconds

    def get_worker_state(self) -> str:
        """
        Get worker activity state (decoupled from PID).

        Returns:
            "idle" when the heartbeat is stale (age of 15s or more); otherwise
            the stored ``worker_state`` value ("idle", "active", or
            "processing" by convention).
        """
        with self._lock:
            # Evaluate freshness via the locked helper: calling is_healthy()
            # here would try to re-acquire the non-reentrant lock and deadlock.
            if not self._refresh_heartbeat_age_locked() < 15:
                return "idle"

            # Return the worker activity state (not PID-based)
            return self._state.get("worker_state", "idle")


# Global singleton, plus the lock that serialises its lazy creation
_worker_state: Optional[WorkerState] = None
_worker_state_lock = threading.Lock()


def get_worker_state() -> WorkerState:
    """
    Get or create the global worker state singleton.

    Creation is guarded by a module-level lock so that concurrent first calls
    from different threads all receive the same instance.

    Returns:
        The process-wide ``WorkerState`` instance.
    """
    global _worker_state
    # Fast path: skip the lock once the singleton exists.
    if _worker_state is None:
        with _worker_state_lock:
            # Re-check under the lock; another thread may have created it
            # between the first check and acquiring the lock.
            if _worker_state is None:
                _worker_state = WorkerState()
    return _worker_state