Claude Code Claude Opus 4.6 committed on
Commit
7cab3bb
·
1 Parent(s): 7d8ff40

Claude Code: Force minimal app.py - diagnostic for unknown error

Browse files

Backup saved as app.py.broken. Testing whether the "unknown" error originates in the
Python code layer or the Docker/entrypoint layer.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Files changed (2) hide show
  1. app.py +6 -1045
  2. app.py.broken +1049 -0
app.py CHANGED
@@ -1,1049 +1,10 @@
1
- #!/usr/bin/env python3
2
- """
3
- Cain FastAPI App with background task execution engine.
4
-
5
- Serves agent dashboard and manages asynchronous task execution
6
- without blocking the main Uvicorn thread.
7
- """
8
- import asyncio
9
- import os
10
- import subprocess
11
- import sys
12
- import threading
13
- import time
14
- import uuid
15
- import logging
16
- from datetime import datetime
17
- from enum import Enum
18
- from pathlib import Path
19
- from typing import Any, Optional
20
- from contextlib import asynccontextmanager
21
-
22
- import uvicorn
23
- from fastapi import FastAPI, HTTPException, BackgroundTasks, WebSocket, WebSocketDisconnect, Request
24
- from fastapi.responses import JSONResponse, FileResponse
25
- from fastapi.staticfiles import StaticFiles
26
- from fastapi.middleware import Middleware
27
- from starlette.middleware.base import BaseHTTPMiddleware
28
-
29
- # ============================================================================
30
- # SYS.PATH SETUP (must happen before other imports)
31
- # ============================================================================
32
-
33
- # Set up sys.path for agents imports at module load time
34
- # This ensures `from agents import brain_minimal` works regardless of import order
35
- # Dynamic path resolution with fallbacks for different Docker contexts
36
- _script_dir = Path(os.path.abspath(os.path.dirname(__file__))) # Absolute path of this script
37
-
38
- # Try multiple possible locations for .openclaw directory
39
- _possible_openclaw_paths = [
40
- _script_dir / ".openclaw", # /app/.openclaw (legacy/flat structure)
41
- _script_dir / "openclaw" / ".openclaw", # /app/openclaw/.openclaw (nested structure)
42
- Path("/app/openclaw/.openclaw"), # Absolute Docker path (nested)
43
- Path("/app/.openclaw"), # Absolute Docker path (flat)
44
- ]
45
-
46
- # Add all valid paths to sys.path
47
- for path_dir in _possible_openclaw_paths:
48
- path_str = str(path_dir)
49
- if path_str not in sys.path and path_dir.exists():
50
- sys.path.insert(0, path_str)
51
-
52
- # ============================================================================
53
- # CONFIGURATION & LOGGING
54
- # ============================================================================
55
-
56
- LOG_PATH = "/app/logs/cain.log" # Writable in Docker container
57
- FRONTEND_PATH = Path(__file__).parent / "frontend"
58
- AGENT_DASHBOARD = FRONTEND_PATH / "agent-dashboard.html"
59
-
60
- # Configure structured logging to file and stdout
61
- class StructuredFormatter(logging.Formatter):
62
- """JSON-like structured log formatter."""
63
- def format(self, record):
64
- log_data = {
65
- "timestamp": datetime.utcnow().isoformat() + "+00:00",
66
- "level": record.levelname,
67
- "logger": record.name,
68
- "message": record.getMessage(),
69
- "module": record.module,
70
- "function": record.funcName,
71
- "line": record.lineno,
72
- }
73
- if record.exc_info:
74
- log_data["exception"] = self.formatException(record.exc_info)
75
- return str(log_data)
76
-
77
- # Set up file handler
78
- try:
79
- file_handler = logging.FileHandler(LOG_PATH)
80
- file_handler.setFormatter(StructuredFormatter())
81
- except (PermissionError, FileNotFoundError):
82
- file_handler = logging.StreamHandler(sys.stdout)
83
- file_handler.setFormatter(StructuredFormatter())
84
-
85
- # Set up stdout handler
86
- stdout_handler = logging.StreamHandler(sys.stdout)
87
- stdout_handler.setFormatter(logging.Formatter(
88
- "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
89
- ))
90
-
91
- # Configure root logger
92
- logger = logging.getLogger(__name__)
93
- logger.setLevel(logging.INFO)
94
- logger.addHandler(file_handler)
95
- logger.addHandler(stdout_handler)
96
-
97
- # ============================================================================
98
- # WORKER MANAGER (Handles worker process lifecycle with blocking startup)
99
- # ============================================================================
100
-
101
- class WorkerManager:
102
- """
103
- Manages the worker process lifecycle with blocking startup verification.
104
-
105
- Ensures the app does NOT report "healthy" until the worker is confirmed running.
106
- Uses asyncio subprocess for proper async handling and retry logic.
107
- """
108
-
109
- def __init__(self, shared_state_ref):
110
- self.shared_state = shared_state_ref
111
- self._process: Optional[asyncio.subprocess.Process] = None
112
- self._log_path = Path("/app/cain.log")
113
- self._pid_file = Path("/app/worker.pid")
114
-
115
- async def spawn_with_retry(
116
- self,
117
- max_attempts: int = 2,
118
- retry_delay: float = 1.0,
119
- heartbeat_timeout: float = 5.0
120
- ) -> bool:
121
- """
122
- Spawn worker process with retry logic - NON-BLOCKING.
123
-
124
- Worker starts asynchronously; heartbeat updates health status separately.
125
-
126
- Args:
127
- max_attempts: Maximum spawn attempts (default: 2, reduced for faster startup)
128
- retry_delay: Seconds to wait between retries (default: 1.0, reduced)
129
- heartbeat_timeout: Max seconds to wait for heartbeat per attempt (default: 5.0, reduced)
130
-
131
- Returns:
132
- True if worker spawned (even if heartbeat not yet received)
133
- """
134
- brain_path = Path(__file__).parent / "brain_minimal.py"
135
-
136
- for attempt in range(1, max_attempts + 1):
137
- logger.info(f"[WorkerManager] Spawn attempt {attempt}/{max_attempts}")
138
- print(f"[WorkerManager] Spawn attempt {attempt}/{max_attempts}")
139
- sys.stdout.flush()
140
- sys.stderr.flush()
141
-
142
- try:
143
- # Open log file for appending
144
- log_file = open(self._log_path, "a") if self._log_path.parent.exists() else None
145
-
146
- # Spawn using asyncio.create_subprocess_exec
147
- self._process = await asyncio.create_subprocess_exec(
148
- sys.executable,
149
- str(brain_path),
150
- stdout=asyncio.subprocess.PIPE,
151
- stderr=asyncio.subprocess.PIPE,
152
- env=self._get_worker_env()
153
- )
154
-
155
- worker_pid = self._process.pid
156
- logger.info(f"[WorkerManager] Worker spawned with PID {worker_pid}")
157
- print(f"[WorkerManager] Worker spawned with PID {worker_pid}")
158
- sys.stdout.flush()
159
-
160
- # Start draining stdout/stderr to log file
161
- asyncio.create_task(self._drain_stdout(log_file))
162
- asyncio.create_task(self._drain_stderr(log_file))
163
-
164
- # Write PID file for external monitoring
165
- self._pid_file.write_text(str(worker_pid))
166
- logger.info(f"[WorkerManager] PID file written: {self._pid_file}")
167
-
168
- # NON-BLOCKING: Mark worker as spawning, heartbeat will confirm it's running
169
- self.shared_state.update(
170
- stage="RUNNING",
171
- health="HEALTHY", # Report healthy immediately so app passes health checks
172
- worker_pid=worker_pid,
173
- worker_active=True, # Assume active until heartbeat fails
174
- worker_mode="standalone_process"
175
- )
176
- logger.info(f"[WorkerManager] Worker spawn initiated (PID: {worker_pid})")
177
- print(f"[WorkerManager] ✓ Worker spawned - waiting for heartbeat")
178
- sys.stdout.flush()
179
- return True
180
-
181
- except Exception as e:
182
- logger.error(f"[WorkerManager] Spawn attempt {attempt} failed: {type(e).__name__}: {e}")
183
- print(f"[WorkerManager] ✗ Spawn attempt {attempt} failed: {e}")
184
- sys.stdout.flush()
185
-
186
- # Retry delay (except after last attempt)
187
- if attempt < max_attempts:
188
- logger.info(f"[WorkerManager] Waiting {retry_delay}s before retry...")
189
- await asyncio.sleep(retry_delay)
190
-
191
- # All attempts failed - but don't crash, just log and continue
192
- error_msg = f"Failed to spawn worker after {max_attempts} attempts"
193
- logger.error(error_msg)
194
- print(f"[WorkerManager] ⚠ {error_msg} - continuing without worker")
195
- sys.stdout.flush()
196
-
197
- self.shared_state.update(
198
- stage="RUNNING",
199
- health="DEGRADED", # Degraded but still serving requests
200
- error=error_msg,
201
- worker_active=False,
202
- worker_pid=None
203
- )
204
-
205
- return False # Return False but don't raise - app stays alive
206
-
207
- async def _wait_for_heartbeat(self, timeout: float) -> bool:
208
- """
209
- Wait for worker heartbeat to be received.
210
-
211
- Args:
212
- timeout: Maximum seconds to wait
213
-
214
- Returns:
215
- True if heartbeat received, False if timeout
216
- """
217
- start_time = time.time()
218
- check_interval = 0.2
219
-
220
- while (time.time() - start_time) < timeout:
221
- state = self.shared_state.get()
222
- if state.get("worker_active") is True:
223
- logger.info(f"[WorkerManager] Heartbeat confirmed after {time.time() - start_time:.1f}s")
224
- return True
225
-
226
- # Check if process crashed
227
- if self._process and self._process.poll() is not None:
228
- logger.warning(f"[WorkerManager] Process exited during heartbeat wait")
229
- return False
230
-
231
- await asyncio.sleep(check_interval)
232
-
233
- return False
234
-
235
- async def _drain_stdout(self, log_file=None):
236
- """Drain worker stdout to prevent buffer blocking."""
237
- if not self._process or not self._process.stdout:
238
- return
239
- try:
240
- while True:
241
- line = await self._process.stdout.readline()
242
- if not line:
243
- break
244
- line_str = line.decode("utf-8", errors="replace").rstrip()
245
- logger.info(f"[WORKER_STDOUT] {line_str}")
246
- if log_file:
247
- log_file.write(f"[STDOUT] {line_str}\n")
248
- log_file.flush()
249
- except Exception:
250
- pass
251
-
252
- async def _drain_stderr(self, log_file=None):
253
- """Drain worker stderr to prevent buffer blocking."""
254
- if not self._process or not self._process.stderr:
255
- return
256
- try:
257
- while True:
258
- line = await self._process.stderr.readline()
259
- if not line:
260
- break
261
- line_str = line.decode("utf-8", errors="replace").rstrip()
262
- logger.warning(f"[WORKER_STDERR] {line_str}")
263
- if log_file:
264
- log_file.write(f"[STDERR] {line_str}\n")
265
- log_file.flush()
266
- except Exception:
267
- pass
268
-
269
- async def _terminate_worker(self):
270
- """Terminate the worker process if running."""
271
- if self._process:
272
- try:
273
- self._process.terminate()
274
- try:
275
- await asyncio.wait_for(self._process.wait(), timeout=5.0)
276
- except asyncio.TimeoutError:
277
- self._process.kill()
278
- await self._process.wait()
279
- except Exception as e:
280
- logger.warning(f"[WorkerManager] Error terminating worker: {e}")
281
-
282
- # Clean up PID file
283
- if self._pid_file.exists():
284
- try:
285
- self._pid_file.unlink()
286
- except Exception:
287
- pass
288
-
289
- def _get_worker_env(self) -> dict[str, str]:
290
- """Build environment for worker process.
291
-
292
- CRITICAL: Uses os.environ.copy() which automatically inherits all
293
- HF Space secrets (HF_TOKEN, OPENAI_API_KEY, OPENROUTER_API_KEY, etc).
294
- DO NOT set empty defaults - that can interfere with secret inheritance.
295
- """
296
- worker_env = os.environ.copy()
297
- worker_env["PYTHONPATH"] = "/app:" + worker_env.get("PYTHONPATH", "")
298
- worker_env["CAIN_IS_WORKER"] = "true"
299
- return worker_env
300
-
301
- async def shutdown(self):
302
- """Gracefully shutdown the worker process."""
303
- if self._process:
304
- logger.info("[WorkerManager] Shutting down worker process...")
305
- await self._terminate_worker()
306
- self._process = None
307
-
308
-
309
- # Global worker manager instance
310
- _worker_manager: Optional[WorkerManager] = None
311
-
312
-
313
- # ============================================================================
314
- # BACKGROUND TASK MANAGER (Non-blocking)
315
- # ============================================================================
316
-
317
- class TaskStatus(str, Enum):
318
- PENDING = "pending"
319
- RUNNING = "running"
320
- COMPLETED = "completed"
321
- FAILED = "failed"
322
-
323
-
324
- class BackgroundTask:
325
- """Represents a single background task."""
326
-
327
- def __init__(self, task_id: str, command: str, task_type: str = "shell"):
328
- self.task_id = task_id
329
- self.command = command
330
- self.task_type = task_type
331
- self.status = TaskStatus.PENDING
332
- self.created_at = datetime.utcnow().isoformat() + "+00:00"
333
- self.started_at: Optional[str] = None
334
- self.completed_at: Optional[str] = None
335
- self.output: str = ""
336
- self.error: Optional[str] = None
337
- self.exit_code: Optional[int] = None
338
- self._process: Optional[subprocess.Popen] = None
339
-
340
- def start(self):
341
- """Start executing the task in background."""
342
- self.status = TaskStatus.RUNNING
343
- self.started_at = datetime.utcnow().isoformat() + "+00:00"
344
-
345
- try:
346
- if self.task_type == "shell":
347
- self._process = subprocess.Popen(
348
- self.command,
349
- shell=True,
350
- stdout=subprocess.PIPE,
351
- stderr=subprocess.PIPE,
352
- text=True,
353
- bufsize=1,
354
- universal_newlines=True
355
- )
356
- else:
357
- # Python script execution
358
- self._process = subprocess.Popen(
359
- [sys.executable, "-c", self.command],
360
- stdout=subprocess.PIPE,
361
- stderr=subprocess.PIPE,
362
- text=True,
363
- bufsize=1,
364
- universal_newlines=True
365
- )
366
- except Exception as e:
367
- self.status = TaskStatus.FAILED
368
- self.error = str(e)
369
- self.completed_at = datetime.utcnow().isoformat() + "+00:00"
370
- logger.error(f"Task {self.task_id} failed to start: {e}")
371
-
372
- def poll(self):
373
- """Poll task status and update if complete."""
374
- if self._process is None:
375
- return
376
-
377
- # Check if process has completed
378
- returncode = self._process.poll()
379
- if returncode is not None:
380
- # Process finished
381
- self.status = TaskStatus.COMPLETED if returncode == 0 else TaskStatus.FAILED
382
- self.completed_at = datetime.utcnow().isoformat() + "+00:00"
383
- self.exit_code = returncode
384
-
385
- # Capture remaining output
386
- stdout, stderr = self._process.communicate()
387
- self.output += stdout
388
- if stderr:
389
- self.error = stderr
390
-
391
- logger.info(f"Task {self.task_id} completed with exit code {returncode}")
392
-
393
- def read_output(self):
394
- """Read available output without blocking."""
395
- if self._process is None:
396
- return
397
- try:
398
- # Non-blocking read of available output
399
- if self._process.stdout:
400
- lines = self._process.stdout.readlines()
401
- self.output += "".join(lines)
402
- except Exception:
403
- pass
404
-
405
- def to_dict(self) -> dict[str, Any]:
406
- """Convert task to dictionary for API response."""
407
- return {
408
- "task_id": self.task_id,
409
- "command": self.command,
410
- "task_type": self.task_type,
411
- "status": self.status,
412
- "created_at": self.created_at,
413
- "started_at": self.started_at,
414
- "completed_at": self.completed_at,
415
- "exit_code": self.exit_code,
416
- "output": self.output,
417
- "error": self.error,
418
- }
419
-
420
-
421
- class TaskManager:
422
- """
423
- Non-blocking background task manager.
424
-
425
- Runs shell commands and Python scripts asynchronously without
426
- blocking the main Uvicorn thread.
427
- """
428
-
429
- def __init__(self):
430
- self._tasks: dict[str, BackgroundTask] = {}
431
- self._lock = threading.Lock()
432
- self._running = True
433
- # Start background poller thread
434
- self._poller_thread = threading.Thread(
435
- target=self._poll_loop,
436
- daemon=True,
437
- name="TaskManagerPoller"
438
- )
439
- self._poller_thread.start()
440
- logger.info("TaskManager initialized with background poller thread")
441
-
442
- def _poll_loop(self):
443
- """Background thread that polls all running tasks."""
444
- while self._running:
445
- try:
446
- with self._lock:
447
- for task in list(self._tasks.values()):
448
- if task.status == TaskStatus.RUNNING:
449
- task.read_output()
450
- task.poll()
451
- except Exception as e:
452
- logger.error(f"Error in task poll loop: {e}")
453
- time.sleep(0.1) # Poll every 100ms
454
-
455
- def create_task(self, command: str, task_type: str = "shell") -> str:
456
- """Create a new background task and start it."""
457
- task_id = str(uuid.uuid4())[:8]
458
- task = BackgroundTask(task_id, command, task_type)
459
- task.start()
460
-
461
- with self._lock:
462
- self._tasks[task_id] = task
463
-
464
- logger.info(f"Created task {task_id}: {command[:50]}...")
465
- return task_id
466
-
467
- def get_task(self, task_id: str) -> Optional[BackgroundTask]:
468
- """Get a task by ID."""
469
- with self._lock:
470
- return self._tasks.get(task_id)
471
-
472
- def list_tasks(self) -> list[dict[str, Any]]:
473
- """List all tasks."""
474
- with self._lock:
475
- return [task.to_dict() for task in self._tasks.values()]
476
-
477
- def cancel_task(self, task_id: str) -> bool:
478
- """Cancel a running task."""
479
- with self._lock:
480
- task = self._tasks.get(task_id)
481
- if task and task._process:
482
- task._process.terminate()
483
- task.status = TaskStatus.FAILED
484
- task.completed_at = datetime.utcnow().isoformat() + "+00:00"
485
- task.error = "Task cancelled by user"
486
- logger.info(f"Cancelled task {task_id}")
487
- return True
488
- return False
489
-
490
- def cleanup_old_tasks(self, max_age_hours: int = 24):
491
- """Remove completed tasks older than max_age_hours."""
492
- cutoff = datetime.utcnow().timestamp() - (max_age_hours * 3600)
493
- with self._lock:
494
- to_remove = []
495
- for task_id, task in self._tasks.items():
496
- if task.status in (TaskStatus.COMPLETED, TaskStatus.FAILED):
497
- if task.completed_at:
498
- try:
499
- completed_time = datetime.fromisoformat(
500
- task.completed_at.replace("+00:00", "").replace("Z", "")
501
- ).timestamp()
502
- if completed_time < cutoff:
503
- to_remove.append(task_id)
504
- except Exception:
505
- pass
506
- for task_id in to_remove:
507
- del self._tasks[task_id]
508
- if to_remove:
509
- logger.info(f"Cleaned up {len(to_remove)} old tasks")
510
-
511
- def get_status(self) -> dict[str, Any]:
512
- """Get overall task manager status."""
513
- with self._lock:
514
- status_counts = {}
515
- for task in self._tasks.values():
516
- status_counts[task.status] = status_counts.get(task.status, 0) + 1
517
- return {
518
- "total_tasks": len(self._tasks),
519
- "status_breakdown": status_counts,
520
- "poller_thread_alive": self._poller_thread.is_alive(),
521
- }
522
-
523
-
524
- # ============================================================================
525
- # SHARED STATE (In-memory)
526
- # ============================================================================
527
-
528
-
529
- class SharedState:
530
- """In-memory shared state for process status."""
531
-
532
- def __init__(self):
533
- self._lock = threading.Lock()
534
- self._state = {
535
- "worker_state": "initializing",
536
- "worker_active": False, # Set to True only after worker spawns successfully
537
- "worker_pid": None, # Set to worker PID after successful spawn
538
- "worker_mode": None, # Set after spawn
539
- "last_heartbeat": None,
540
- "heartbeat_age_seconds": 0,
541
- "stage": "STARTUP_INIT",
542
- "health": "INITIALIZING",
543
- "error": None,
544
- "uptime_seconds": 0,
545
- "started_at": datetime.utcnow().isoformat() + "+00:00",
546
- }
547
- self._start_time = time.time()
548
-
549
- def update(self, **kwargs):
550
- """Update state with new values."""
551
- with self._lock:
552
- self._state["last_heartbeat"] = datetime.utcnow().isoformat() + "+00:00"
553
- self._state["heartbeat_age_seconds"] = 0
554
- for key, value in kwargs.items():
555
- if value is not None:
556
- self._state[key] = value
557
-
558
- def get(self) -> dict[str, Any]:
559
- """Get current state snapshot."""
560
- with self._lock:
561
- self._state["uptime_seconds"] = int(time.time() - self._start_time)
562
-
563
- # Calculate heartbeat age
564
- if self._state["last_heartbeat"]:
565
- try:
566
- heartbeat_time = datetime.fromisoformat(
567
- self._state["last_heartbeat"].replace("+00:00", "").replace("Z", "")
568
- )
569
- if heartbeat_time.tzinfo is not None:
570
- heartbeat_time = heartbeat_time.replace(tzinfo=None)
571
- age = (datetime.utcnow() - heartbeat_time).total_seconds()
572
- self._state["heartbeat_age_seconds"] = age
573
- except Exception:
574
- self._state["heartbeat_age_seconds"] = 999
575
-
576
- return self._state.copy()
577
-
578
-
579
- # ============================================================================
580
- # REQUEST LOGGING MIDDLEWARE
581
- # ============================================================================
582
-
583
- class RequestLoggingMiddleware(BaseHTTPMiddleware):
584
- """
585
- Middleware to log all incoming requests with method, path, and client IP.
586
- Helps track internal state transitions for debugging.
587
- """
588
-
589
- async def dispatch(self, request: Request, call_next):
590
- """Process request and log details."""
591
- client_ip = request.client.host if request.client else "unknown"
592
- method = request.method
593
- path = request.url.path
594
- query = str(request.url.query) if request.url.query else ""
595
-
596
- logger.info(f"REQUEST: {method} {path}{'?' + query if query else ''} from {client_ip}")
597
-
598
- # Process request
599
- response = await call_next(request)
600
-
601
- # Log response status
602
- logger.info(f"RESPONSE: {method} {path} -> {response.status_code}")
603
-
604
- return response
605
-
606
-
607
- # ============================================================================
608
- # FASTAPI APP
609
- # ============================================================================
610
-
611
- # Initialize shared state and task manager
612
- shared_state = SharedState()
613
- task_manager = TaskManager()
614
-
615
-
616
- def _check_dependencies() -> tuple[bool, str]:
617
- """Pre-initialization check for PYTHONPATH and dependencies."""
618
- errors = []
619
-
620
- # Check PYTHONPATH
621
- if "/app" not in sys.path:
622
- errors.append("PYTHONPATH missing /app")
623
-
624
- # Check critical imports
625
- try:
626
- import fastapi
627
- import uvicorn
628
- except ImportError as e:
629
- errors.append(f"Missing dependency: {e}")
630
-
631
- # Check brain_minimal.py exists
632
- brain_path = Path(__file__).parent / "brain_minimal.py"
633
- if not brain_path.exists():
634
- errors.append(f"brain_minimal.py not found at {brain_path}")
635
-
636
- return (len(errors) == 0, "; ".join(errors) if errors else "OK")
637
-
638
-
639
- async def _initialize_worker():
640
- """
641
- Background task to initialize worker process.
642
- Spawns worker with retry logic and waits for heartbeat confirmation.
643
- Runs after FastAPI server is ready to accept requests.
644
- """
645
- global _worker_manager
646
-
647
- print("=" * 60)
648
- print("🚀 CAIN WORKER INITIALIZATION: Starting async worker spawn")
649
- print("=" * 60)
650
- sys.stdout.flush()
651
- sys.stderr.flush()
652
-
653
- # Update state to show we're initializing worker
654
- shared_state.update(
655
- stage="RUNNING_APP_STARTING",
656
- health="INITIALIZING_WORKER",
657
- worker_active=False,
658
- worker_pid=None
659
- )
660
-
661
- try:
662
- # Initialize WorkerManager
663
- _worker_manager = WorkerManager(shared_state)
664
-
665
- # Spawn worker with extended timeout for HuggingFace Spaces environment
666
- # Use 30 seconds heartbeat timeout instead of 15 for slower environments
667
- success = await _worker_manager.spawn_with_retry(
668
- max_attempts=5, # More attempts for HF Spaces
669
- retry_delay=2.0,
670
- heartbeat_timeout=30.0 # Extended timeout
671
- )
672
-
673
- if success:
674
- state = shared_state.get()
675
- print("=" * 60)
676
- print("✅ CAIN WORKER INITIALIZED SUCCESSFULLY")
677
- print(f" Stage: {state.get('stage')}")
678
- print(f" Health: {state.get('health')}")
679
- print(f" Worker PID: {state.get('worker_pid')}")
680
- print(f" Worker Active: {state.get('worker_active')}")
681
- print("=" * 60)
682
- logger.info("Cain worker initialized successfully")
683
- else:
684
- raise RuntimeError("Worker spawn returned False without exception")
685
-
686
- except Exception as e:
687
- logger.error(f"Worker initialization failed: {type(e).__name__}: {e}")
688
- print(f"❌ CAIN WORKER INITIALIZATION FAILED: {e}")
689
- sys.stdout.flush()
690
- shared_state.update(
691
- stage="RUNNING_APP_STARTING",
692
- health="WORKER_FAILED",
693
- error=f"Worker init failed: {e}",
694
- worker_active=False,
695
- worker_pid=None
696
- )
697
-
698
-
699
- @asynccontextmanager
700
- async def lifespan(app: FastAPI):
701
- """
702
- Lifespan context manager for startup/shutdown.
703
- Ensures worker initialization happens after FastAPI is ready.
704
- """
705
- # Startup: Start worker initialization in background
706
- print("=" * 60)
707
- print("🚀 CAIN LIFESPAN: Startup starting")
708
- print("=" * 60)
709
- sys.stdout.flush()
710
-
711
- # Pre-initialization checks
712
- deps_ok, deps_msg = _check_dependencies()
713
- if not deps_ok:
714
- logger.critical(f"DEPENDENCY CHECK FAILED: {deps_msg}")
715
- print(f"CRITICAL: {deps_msg}")
716
- sys.stdout.flush()
717
- shared_state.update(
718
- stage="RUNNING",
719
- health="DEGRADED", # Use DEGRADED instead of DEPENDENCY_ERROR
720
- error=f"Pre-initialization check failed: {deps_msg}",
721
- worker_active=False
722
- )
723
- # Don't raise - let app start in degraded mode
724
- print("⚠️ Starting in DEGRADED mode (worker not available)")
725
- sys.stdout.flush()
726
- else:
727
- print("✓ Pre-initialization checks passed")
728
- sys.stdout.flush()
729
-
730
- # Start worker initialization as background task
731
- # This allows FastAPI to start accepting requests immediately
732
- # while worker spawns in background
733
- asyncio.create_task(_initialize_worker())
734
-
735
- yield
736
-
737
- # Shutdown: Cleanup
738
- print("=" * 60)
739
- print("🛑 CAIN LIFESPAN: Shutdown starting")
740
- print("=" * 60)
741
- sys.stdout.flush()
742
-
743
- task_manager._running = False
744
-
745
- # Shutdown worker manager
746
- global _worker_manager
747
- if _worker_manager is not None:
748
- try:
749
- await _worker_manager.shutdown()
750
- except Exception as e:
751
- logger.error(f"Error shutting down worker manager: {e}")
752
- finally:
753
- _worker_manager = None
754
-
755
- print("CAIN shutdown complete")
756
- sys.stdout.flush()
757
-
758
-
759
- app = FastAPI(
760
- title="HuggingClaw Cain",
761
- description="Agent collaboration server with background task execution",
762
- version="2.0.0",
763
- lifespan=lifespan
764
- )
765
-
766
- # DEBUG: Print APP_READY immediately to verify core app instantiation
767
- print("APP_READY")
768
- sys.stdout.flush()
769
-
770
- # TEMPORARILY DISABLED: Testing Eve's hypothesis - comment out error_handlers to break potential dependency chain
771
- # # Register exception handlers (import here to avoid circular dependency)
772
- # from error_handlers import register_error_handlers
773
- # register_error_handlers(app)
774
- #
775
- # # Add request logging middleware
776
- # app.add_middleware(RequestLoggingMiddleware)
777
-
778
- # Mount static files for frontend assets (images, fonts, etc.)
779
- app.mount("/static", StaticFiles(directory=str(FRONTEND_PATH)), name="static")
780
-
781
 
782
  @app.get("/")
783
- async def read_root():
784
- """Serve the agent dashboard HTML."""
785
- logger.info("GET / - Serving agent dashboard")
786
-
787
- if AGENT_DASHBOARD.exists():
788
- return FileResponse(str(AGENT_DASHBOARD), media_type="text/html")
789
-
790
- # Fallback if dashboard file missing
791
- return JSONResponse({
792
- "status": "alive",
793
- "message": "Cain core operational",
794
- "note": "Agent dashboard file not found",
795
- "frontend_path": str(FRONTEND_PATH),
796
- })
797
-
798
-
799
- @app.get("/health")
800
- async def health_check():
801
- """
802
- Health check endpoint for container orchestration.
803
-
804
- Returns:
805
- - 200: System is operational (worker may still be initializing)
806
- - Detailed worker status in response body
807
- """
808
- state = shared_state.get()
809
- worker_alive = False
810
- worker_pid = None
811
-
812
- # Check live worker process status
813
- if _worker_manager is not None and _worker_manager._process is not None:
814
- worker_pid = _worker_manager._process.pid
815
- returncode = _worker_manager._process.poll()
816
- worker_alive = returncode is None # None = still running
817
-
818
- # Determine overall health
819
- # App is healthy if it's running, worker can be initializing or degraded
820
- is_healthy = state["health"] in ("HEALTHY", "INITIALIZING", "INITIALIZING_WORKER", "RUNNING_APP_STARTING", "DEGRADED")
821
-
822
- return JSONResponse({
823
- "status": "healthy" if is_healthy else "unhealthy",
824
- "stage": state["stage"],
825
- "worker": {
826
- "active": state["worker_active"],
827
- "alive": worker_alive,
828
- "pid": worker_pid or state.get("worker_pid"),
829
- "mode": state["worker_mode"],
830
- "heartbeat_age_seconds": state["heartbeat_age_seconds"],
831
- },
832
- "uptime_seconds": state["uptime_seconds"],
833
- "timestamp": datetime.utcnow().isoformat() + "+00:00",
834
- }, status_code=200 if is_healthy else 503)
835
-
836
-
837
- @app.get("/api/state")
838
- async def get_state():
839
- """
840
- Get detailed process and system status.
841
-
842
- Returns live worker process status (not just cached state).
843
- """
844
- logger.debug("GET /api/state called")
845
-
846
- state = shared_state.get()
847
- task_status = task_manager.get_status()
848
-
849
- # Check live worker process status from WorkerManager
850
- worker_alive = False
851
- worker_pid = None
852
-
853
- if _worker_manager is not None and _worker_manager._process is not None:
854
- worker_pid = _worker_manager._process.pid
855
- returncode = _worker_manager._process.poll()
856
- worker_alive = returncode is None # None = still running
857
-
858
- # Update shared_state with real-time worker status
859
- if worker_alive != state.get("worker_active"):
860
- shared_state.update(worker_active=worker_alive)
861
- state["worker_active"] = worker_alive
862
-
863
- # Determine worker health - combine live process check with heartbeat freshness
864
- worker_healthy = (
865
- worker_alive and # Must be actually running
866
- state["heartbeat_age_seconds"] < 30 # And heartbeat must be recent
867
- )
868
-
869
- return JSONResponse({
870
- "cain": {
871
- "name": "Cain",
872
- "space_id": "tao-shen/HuggingClaw-Cain",
873
- "stage": state["stage"],
874
- "health": state["health"],
875
- "error": state.get("error"),
876
- "uptime_seconds": state["uptime_seconds"],
877
- "started_at": state["started_at"],
878
- },
879
- "worker": {
880
- "state": state["worker_state"],
881
- "active": state["worker_active"], # Now reflects real-time process status
882
- "pid": worker_pid or state.get("worker_pid"),
883
- "mode": state["worker_mode"],
884
- "last_heartbeat": state["last_heartbeat"],
885
- "heartbeat_age_seconds": state["heartbeat_age_seconds"],
886
- "is_healthy": worker_healthy,
887
- },
888
- "tasks": task_status,
889
- "timestamp": datetime.utcnow().isoformat() + "+00:00",
890
- })
891
-
892
-
893
- # Internal heartbeat endpoint (used by worker processes)
894
@app.post("/internal/heartbeat")
async def heartbeat(data: dict[str, Any]):
    """
    Accept a heartbeat payload posted by the worker process.

    The payload is merged into ``shared_state``. Once the app has reached
    ``RUNNING_APP_READY`` / ``RUNNING_A2A_READY`` the only stage change that
    is honoured is APP_READY -> A2A_READY; any other reported stage is
    dropped while the remaining fields are still applied.

    Returns:
        ``{"status": "received"}`` in every case.
    """
    banner = (
        f"[HEARTBEAT] Received from worker_pid={data.get('worker_pid')}, "
        f"worker_state={data.get('worker_state')}, stage={data.get('stage')}"
    )
    logger.info(banner)
    print(banner)
    sys.stdout.flush()

    # Snapshot the stage before applying anything from the payload.
    stage_now = shared_state.get().get("stage")
    stage_reported = data.get("stage")
    protected_stages = ("RUNNING_APP_READY", "RUNNING_A2A_READY")

    is_upgrade = (
        stage_now == "RUNNING_APP_READY"
        and stage_reported == "RUNNING_A2A_READY"
    )

    if is_upgrade:
        print(f"[HEARTBEAT] Stage transition: {stage_now} -> {stage_reported}")
        shared_state.update(**data)
    elif stage_now in protected_stages:
        # Keep the stage the app already reached; apply everything else.
        without_stage = dict(data)
        without_stage.pop("stage", None)
        shared_state.update(**without_stage)
        print(f"[HEARTBEAT] Preserving stage={stage_now}, updating other fields")
    else:
        # Any other current stage: the payload is applied as-is.
        shared_state.update(**data)

    sys.stdout.flush()
    return {"status": "received"}
923
-
924
-
925
@app.get("/read_startup_error")
async def read_startup_error():
    """
    DEBUG endpoint: return the contents of ``startup_error.log``.

    The log is looked up next to this module. When it is absent the
    response says so instead of returning content.
    """
    log_file = Path(__file__).parent / "startup_error.log"

    if not log_file.exists():
        return JSONResponse({
            "startup_error_log_exists": False,
            "message": "No startup error log found - worker may have started successfully"
        })

    log_text = log_file.read_text()
    return JSONResponse({
        "startup_error_log_exists": True,
        "content": log_text
    })
942
-
943
-
944
- # ============================================================================
945
- # TASK EXECUTION API
946
- # ============================================================================
947
-
948
@app.post("/api/tasks/execute")
async def execute_task(request: dict[str, Any]):
    """
    Start a command or script as a background task.

    Request body:
        {
            "command": "echo 'hello'",     # Command to execute (required)
            "type": "shell" | "python"     # Execution type (default: shell)
        }

    Returns:
        JSON with the new ``task_id`` and a "started" status.

    Raises:
        HTTPException(400): when ``command`` is missing or empty.
    """
    # NOTE(review): this runs caller-supplied commands and no auth check is
    # visible in this handler - confirm it is not reachable by untrusted clients.
    command, task_type = request.get("command"), request.get("type", "shell")

    if not command:
        raise HTTPException(status_code=400, detail="Command is required")

    task_id = task_manager.create_task(command, task_type)
    logger.info(f"Started task {task_id}: {command[:100]}")

    body = {
        "task_id": task_id,
        "status": "started",
        "message": f"Task {task_id} started successfully"
    }
    return JSONResponse(body)
973
-
974
-
975
@app.get("/api/tasks/{task_id}")
async def get_task_status(task_id: str):
    """
    Return the current state of one task.

    Raises:
        HTTPException(404): when no task with ``task_id`` is registered.
    """
    found = task_manager.get_task(task_id)
    if not found:
        raise HTTPException(status_code=404, detail="Task not found")

    # Refresh the task from its process before serialising it.
    found.poll()
    snapshot = found.to_dict()
    return JSONResponse(snapshot)
986
-
987
-
988
@app.get("/api/tasks")
async def list_tasks():
    """Return every task known to the task manager under the ``tasks`` key."""
    all_tasks = task_manager.list_tasks()
    return JSONResponse({"tasks": all_tasks})
992
-
993
-
994
@app.delete("/api/tasks/{task_id}")
async def cancel_task(task_id: str):
    """
    Cancel a task through the task manager.

    Raises:
        HTTPException(404): when the manager reports nothing was cancelled.
    """
    was_cancelled = task_manager.cancel_task(task_id)
    if not was_cancelled:
        raise HTTPException(status_code=404, detail="Task not found or not cancellable")
    return JSONResponse({"status": "cancelled", "task_id": task_id})
1000
-
1001
-
1002
@app.post("/api/tasks/cleanup")
async def cleanup_tasks(max_age_hours: int = 24):
    """
    Ask the task manager to drop old finished tasks.

    Args:
        max_age_hours: Age threshold passed straight to the manager
            (default: 24).
    """
    task_manager.cleanup_old_tasks(max_age_hours)
    summary = {"status": "cleaned", "max_age_hours": max_age_hours}
    return JSONResponse(summary)
1007
-
1008
-
1009
- # ============================================================================
1010
- # WEBSOCKET CHAT ENDPOINT
1011
- # ============================================================================
1012
-
1013
@app.websocket("/chat")
async def chat_websocket(websocket: WebSocket):
    """
    WebSocket endpoint that acknowledges every text message it receives.

    Each incoming text frame is logged and answered with an ``ack`` JSON
    message that echoes the payload. The loop ends when the client
    disconnects or an error occurs.
    """
    await websocket.accept()

    # ``client`` can be None (e.g. behind some proxies / test transports);
    # fall back to the same "unknown" label the request middleware uses
    # instead of failing with AttributeError right after accept().
    peer = websocket.client
    client_id = f"{peer.host}:{peer.port}" if peer else "unknown"
    logger.info(f"WebSocket connection established from {client_id}")

    try:
        while True:
            # Receive message from client
            data = await websocket.receive_text()
            logger.info(f"WebSocket message from {client_id}: {data}")

            # Echo acknowledgment back to client
            await websocket.send_json({
                "type": "ack",
                "message": "Message received",
                "timestamp": datetime.utcnow().isoformat() + "+00:00",
                "echo": data
            })

    except WebSocketDisconnect:
        logger.info(f"WebSocket connection closed by {client_id}")
    except Exception as e:
        logger.error(f"WebSocket error for {client_id}: {e}")
    finally:
        logger.info(f"WebSocket connection terminated for {client_id}")
1040
-
1041
-
1042
- # ============================================================================
1043
- # MAIN ENTRY POINT
1044
- # ============================================================================
1045
 
1046
if __name__ == "__main__":
    # Port comes from the PORT environment variable, defaulting to 7860.
    port = int(os.environ.get("PORT", 7860))
    logger.info(f"🚀 STARTING UVICORN on 0.0.0.0:{port}")
    server_options = {"host": "0.0.0.0", "port": port, "log_level": "info"}
    uvicorn.run(app, **server_options)
 
1
+ from fastapi import FastAPI
2
+ app = FastAPI()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
 
4
@app.get("/")
def read_root():
    """Root route of the minimal diagnostic app; returns a fixed status marker."""
    payload = {"status": "minimal_ok"}
    return payload
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7
 
8
if __name__ == "__main__":
    import os

    import uvicorn

    # Honour the PORT environment variable like the app this file replaces,
    # so the diagnostic binds to the same port; 7860 stays the default.
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))
 
app.py.broken ADDED
@@ -0,0 +1,1049 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Cain FastAPI App with background task execution engine.
4
+
5
+ Serves agent dashboard and manages asynchronous task execution
6
+ without blocking the main Uvicorn thread.
7
+ """
8
+ import asyncio
9
+ import os
10
+ import subprocess
11
+ import sys
12
+ import threading
13
+ import time
14
+ import uuid
15
+ import logging
16
+ from datetime import datetime
17
+ from enum import Enum
18
+ from pathlib import Path
19
+ from typing import Any, Optional
20
+ from contextlib import asynccontextmanager
21
+
22
+ import uvicorn
23
+ from fastapi import FastAPI, HTTPException, BackgroundTasks, WebSocket, WebSocketDisconnect, Request
24
+ from fastapi.responses import JSONResponse, FileResponse
25
+ from fastapi.staticfiles import StaticFiles
26
+ from fastapi.middleware import Middleware
27
+ from starlette.middleware.base import BaseHTTPMiddleware
28
+
29
+ # ============================================================================
30
+ # SYS.PATH SETUP (must happen before other imports)
31
+ # ============================================================================
32
+
33
+ # Set up sys.path for agents imports at module load time
34
+ # This ensures `from agents import brain_minimal` works regardless of import order
35
+ # Dynamic path resolution with fallbacks for different Docker contexts
36
_script_dir = Path(os.path.abspath(os.path.dirname(__file__)))  # Absolute directory of this script

# Candidate locations of the .openclaw directory, in probing order.
_possible_openclaw_paths = [
    _script_dir.joinpath(".openclaw"),              # <script dir>/.openclaw (flat layout)
    _script_dir.joinpath("openclaw", ".openclaw"),  # <script dir>/openclaw/.openclaw (nested layout)
    Path("/app/openclaw/.openclaw"),                # Absolute Docker path (nested)
    Path("/app/.openclaw"),                         # Absolute Docker path (flat)
]

# Prepend every candidate that exists and is not already on sys.path.
# Each hit is inserted at index 0, so later candidates end up first.
for path_dir in _possible_openclaw_paths:
    path_str = str(path_dir)
    if path_str in sys.path or not path_dir.exists():
        continue
    sys.path.insert(0, path_str)
51
+
52
+ # ============================================================================
53
+ # CONFIGURATION & LOGGING
54
+ # ============================================================================
55
+
56
# Log file target (original note: writable in the Docker container).
LOG_PATH = "/app/logs/cain.log"
# Static frontend assets live in a "frontend" directory next to this module.
FRONTEND_PATH = Path(__file__).parent.joinpath("frontend")
AGENT_DASHBOARD = FRONTEND_PATH.joinpath("agent-dashboard.html")
59
+
60
+ # Configure structured logging to file and stdout
61
class StructuredFormatter(logging.Formatter):
    """
    Format a log record as the ``str()`` of a dict (JSON-like, not strict JSON).

    The dict carries timestamp, level, logger name, rendered message,
    module, function and line number, plus the formatted traceback under
    ``"exception"`` when the record has exception info.
    """

    def format(self, record):
        """Return the structured string for ``record``."""
        # Use the time the record was created, not the time it is formatted,
        # so every handler emits the same timestamp for the same event.
        event_time = datetime.utcfromtimestamp(record.created)
        log_data = {
            "timestamp": event_time.isoformat() + "+00:00",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        return str(log_data)
76
+
77
+ # Set up file handler
78
try:
    file_handler = logging.FileHandler(LOG_PATH)
except OSError:
    # Any failure to open the log file (missing directory, no permission,
    # read-only filesystem, path is a directory, ...) must not stop the
    # module from importing; emit the structured lines on stdout instead.
    file_handler = logging.StreamHandler(sys.stdout)
file_handler.setFormatter(StructuredFormatter())

# Set up stdout handler (human-readable, pipe-separated format)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(logging.Formatter(
    "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
))

# Configure this module's logger (not the root logger)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
logger.addHandler(stdout_handler)
96
+
97
+ # ============================================================================
98
+ # WORKER MANAGER (Handles worker process lifecycle with blocking startup)
99
+ # ============================================================================
100
+
101
class WorkerManager:
    """
    Manage the lifecycle of the worker subprocess (``brain_minimal.py``).

    The worker is started with ``asyncio.create_subprocess_exec``; its
    stdout/stderr are drained by background tasks into the module logger
    and, when available, an append-mode log file. Spawn results are
    published through the shared-state object passed to the constructor.
    """

    def __init__(self, shared_state_ref):
        """
        Args:
            shared_state_ref: Object exposing ``update(**kwargs)`` and
                ``get()``; used to publish and read worker status.
        """
        self.shared_state = shared_state_ref
        self._process: Optional[asyncio.subprocess.Process] = None
        self._log_path = Path("/app/cain.log")
        self._pid_file = Path("/app/worker.pid")
        # The event loop only keeps weak references to tasks, so the
        # output-pump tasks are held here until they finish.
        self._drain_tasks: set = set()

    async def spawn_with_retry(
        self,
        max_attempts: int = 2,
        retry_delay: float = 1.0,
        heartbeat_timeout: float = 5.0
    ) -> bool:
        """
        Spawn the worker process, retrying on failure - NON-BLOCKING.

        The call returns as soon as a process has been created; it does not
        wait for a heartbeat.

        Args:
            max_attempts: Maximum spawn attempts (default: 2).
            retry_delay: Seconds to wait between attempts (default: 1.0).
            heartbeat_timeout: Accepted for interface compatibility; not
                used by this method.

        Returns:
            True if a worker process was created, False if every attempt
            failed. Never raises for a failed spawn.
        """
        brain_path = Path(__file__).parent / "brain_minimal.py"

        for attempt in range(1, max_attempts + 1):
            logger.info(f"[WorkerManager] Spawn attempt {attempt}/{max_attempts}")
            print(f"[WorkerManager] Spawn attempt {attempt}/{max_attempts}")
            sys.stdout.flush()
            sys.stderr.flush()

            # Log file is optional: failing to open it must not fail the spawn.
            log_file = self._open_log_file()

            try:
                self._process = await asyncio.create_subprocess_exec(
                    sys.executable,
                    str(brain_path),
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=self._get_worker_env()
                )
            except Exception as e:
                if log_file is not None:
                    log_file.close()
                logger.error(f"[WorkerManager] Spawn attempt {attempt} failed: {type(e).__name__}: {e}")
                print(f"[WorkerManager] ✗ Spawn attempt {attempt} failed: {e}")
                sys.stdout.flush()

                # Retry delay (except after last attempt)
                if attempt < max_attempts:
                    logger.info(f"[WorkerManager] Waiting {retry_delay}s before retry...")
                    await asyncio.sleep(retry_delay)
                continue

            # From here on a worker exists; nothing below may trigger another
            # spawn attempt, otherwise a second worker would be started.
            process = self._process
            worker_pid = process.pid
            logger.info(f"[WorkerManager] Worker spawned with PID {worker_pid}")
            print(f"[WorkerManager] Worker spawned with PID {worker_pid}")
            sys.stdout.flush()

            # Drain stdout/stderr in the background; the pump closes log_file.
            pump = asyncio.create_task(self._pump_output(process, log_file))
            self._drain_tasks.add(pump)
            pump.add_done_callback(self._drain_tasks.discard)

            # PID file is for external monitoring only - best effort.
            try:
                self._pid_file.write_text(str(worker_pid))
            except OSError as e:
                logger.warning(f"[WorkerManager] Could not write PID file {self._pid_file}: {e}")
            else:
                logger.info(f"[WorkerManager] PID file written: {self._pid_file}")

            # NON-BLOCKING: publish the spawn immediately; heartbeats refine it later.
            self.shared_state.update(
                stage="RUNNING",
                health="HEALTHY",  # Report healthy immediately so app passes health checks
                worker_pid=worker_pid,
                worker_active=True,  # Assume active until heartbeat fails
                worker_mode="standalone_process"
            )
            logger.info(f"[WorkerManager] Worker spawn initiated (PID: {worker_pid})")
            print(f"[WorkerManager] ✓ Worker spawned - waiting for heartbeat")
            sys.stdout.flush()
            return True

        # All attempts failed - but don't crash, just log and continue
        error_msg = f"Failed to spawn worker after {max_attempts} attempts"
        logger.error(error_msg)
        print(f"[WorkerManager] ⚠ {error_msg} - continuing without worker")
        sys.stdout.flush()

        self.shared_state.update(
            stage="RUNNING",
            health="DEGRADED",  # Degraded but still serving requests
            error=error_msg,
            worker_active=False,
            worker_pid=None
        )

        return False  # Return False but don't raise - app stays alive

    def _open_log_file(self):
        """Open the worker log for appending; return None when that is not possible."""
        try:
            if self._log_path.parent.exists():
                return open(self._log_path, "a")
        except OSError as e:
            logger.warning(f"[WorkerManager] Could not open worker log {self._log_path}: {e}")
        return None

    async def _wait_for_heartbeat(self, timeout: float) -> bool:
        """
        Poll shared state until ``worker_active`` is True.

        Args:
            timeout: Maximum seconds to wait.

        Returns:
            True if ``worker_active`` became True in time; False on timeout
            or if the worker process exited while waiting.
        """
        start_time = time.time()
        check_interval = 0.2

        while (time.time() - start_time) < timeout:
            state = self.shared_state.get()
            if state.get("worker_active") is True:
                logger.info(f"[WorkerManager] Heartbeat confirmed after {time.time() - start_time:.1f}s")
                return True

            # asyncio's Process has no poll(); a non-None returncode means it exited.
            if self._process and self._process.returncode is not None:
                logger.warning(f"[WorkerManager] Process exited during heartbeat wait")
                return False

            await asyncio.sleep(check_interval)

        return False

    async def _pump_output(self, process, log_file):
        """Drain both output pipes of ``process``, then close ``log_file``."""
        try:
            await asyncio.gather(
                self._drain_stream(process.stdout, "STDOUT", logger.info, log_file),
                self._drain_stream(process.stderr, "STDERR", logger.warning, log_file),
            )
        finally:
            if log_file is not None:
                log_file.close()

    async def _drain_stream(self, stream, tag, log_fn, log_file=None):
        """Forward every line of ``stream`` to ``log_fn`` and ``log_file`` until EOF."""
        if stream is None:
            return
        try:
            while True:
                try:
                    line = await stream.readline()
                except ValueError as e:
                    # Line longer than the reader limit: skip it and keep
                    # draining so the worker does not block on a full pipe.
                    logger.warning(f"[WorkerManager] Skipped oversized worker {tag} line: {e}")
                    continue
                if not line:
                    break
                line_str = line.decode("utf-8", errors="replace").rstrip()
                log_fn(f"[WORKER_{tag}] {line_str}")
                if log_file:
                    log_file.write(f"[{tag}] {line_str}\n")
                    log_file.flush()
        except Exception as e:
            logger.warning(f"[WorkerManager] Stopped draining worker {tag}: {e}")

    async def _drain_stdout(self, log_file=None):
        """Drain worker stdout to prevent buffer blocking."""
        process = self._process
        if not process or not process.stdout:
            return
        await self._drain_stream(process.stdout, "STDOUT", logger.info, log_file)

    async def _drain_stderr(self, log_file=None):
        """Drain worker stderr to prevent buffer blocking."""
        process = self._process
        if not process or not process.stderr:
            return
        await self._drain_stream(process.stderr, "STDERR", logger.warning, log_file)

    async def _terminate_worker(self):
        """Terminate the worker process if running, then remove the PID file."""
        process = self._process
        if process:
            try:
                # terminate() on an already-exited process raises; skip it.
                if process.returncode is None:
                    process.terminate()
                    try:
                        await asyncio.wait_for(process.wait(), timeout=5.0)
                    except asyncio.TimeoutError:
                        # Did not stop within 5 s - force kill.
                        process.kill()
                        await process.wait()
            except Exception as e:
                logger.warning(f"[WorkerManager] Error terminating worker: {e}")

        # Clean up PID file (best effort)
        try:
            self._pid_file.unlink(missing_ok=True)
        except OSError as e:
            logger.warning(f"[WorkerManager] Could not remove PID file {self._pid_file}: {e}")

    def _get_worker_env(self) -> dict[str, str]:
        """
        Build the environment for the worker process.

        Starts from a copy of ``os.environ`` so everything set for this
        process is inherited unchanged, then prepends ``/app`` to
        ``PYTHONPATH`` and sets ``CAIN_IS_WORKER=true``.
        """
        worker_env = os.environ.copy()
        inherited = worker_env.get("PYTHONPATH", "")
        # Avoid a trailing ":" (an empty PYTHONPATH entry) when nothing was set.
        worker_env["PYTHONPATH"] = "/app:" + inherited if inherited else "/app"
        worker_env["CAIN_IS_WORKER"] = "true"
        return worker_env

    async def shutdown(self):
        """Gracefully shut down the worker process, if one was spawned."""
        if self._process:
            logger.info("[WorkerManager] Shutting down worker process...")
            await self._terminate_worker()
            self._process = None
307
+
308
+
309
+ # Global worker manager instance
310
+ _worker_manager: Optional[WorkerManager] = None
311
+
312
+
313
+ # ============================================================================
314
+ # BACKGROUND TASK MANAGER (Non-blocking)
315
+ # ============================================================================
316
+
317
class TaskStatus(str, Enum):
    """Lifecycle states of a background task; members are also plain strings."""

    PENDING = "pending"      # created, not started yet
    RUNNING = "running"      # process launched
    COMPLETED = "completed"  # exited with code 0
    FAILED = "failed"        # failed to start, non-zero exit, or cancelled
322
+
323
+
324
class BackgroundTask:
    """
    A single shell command or Python snippet run as a child process.

    Output is collected by two daemon reader threads (one per pipe), so
    neither ``read_output()`` nor ``poll()`` blocks on the child and the
    child cannot stall on a full stdout/stderr pipe.
    """

    def __init__(self, task_id: str, command: str, task_type: str = "shell"):
        """
        Args:
            task_id: Identifier reported back in ``to_dict()``.
            command: Shell command line, or Python source when
                ``task_type`` is not ``"shell"``.
            task_type: ``"shell"`` runs through the shell; any other value
                runs ``command`` with ``sys.executable -c``.
        """
        self.task_id = task_id
        self.command = command
        self.task_type = task_type
        self.status = TaskStatus.PENDING
        self.created_at = datetime.utcnow().isoformat() + "+00:00"
        self.started_at: Optional[str] = None
        self.completed_at: Optional[str] = None
        self.output: str = ""
        self.error: Optional[str] = None
        self.exit_code: Optional[int] = None
        self._process: Optional[subprocess.Popen] = None
        # Lines captured by the reader threads, not yet folded into
        # self.output / self.error. Guarded by _io_lock.
        self._stdout_chunks: list[str] = []
        self._stderr_chunks: list[str] = []
        self._readers: list[threading.Thread] = []
        self._io_lock = threading.Lock()

    def start(self):
        """Launch the child process; on failure mark the task FAILED."""
        self.status = TaskStatus.RUNNING
        self.started_at = datetime.utcnow().isoformat() + "+00:00"

        if self.task_type == "shell":
            # NOTE(review): shell=True with a caller-supplied string; only
            # safe if every caller of this class is trusted.
            argv, use_shell = self.command, True
        else:
            # Python script execution
            argv, use_shell = [sys.executable, "-c", self.command], False

        try:
            self._process = subprocess.Popen(
                argv,
                shell=use_shell,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                errors="replace",  # undecodable bytes must not kill a reader
                bufsize=1,
            )
        except Exception as e:
            self.status = TaskStatus.FAILED
            self.error = str(e)
            self.completed_at = datetime.utcnow().isoformat() + "+00:00"
            logger.error(f"Task {self.task_id} failed to start: {e}")
            return

        self._start_readers()

    def _start_readers(self):
        """Start one daemon thread per pipe to collect the child's output."""
        pipes = (
            (self._process.stdout, self._stdout_chunks, "stdout"),
            (self._process.stderr, self._stderr_chunks, "stderr"),
        )
        for stream, sink, label in pipes:
            reader = threading.Thread(
                target=self._pump,
                args=(stream, sink),
                daemon=True,
                name=f"task-{self.task_id}-{label}",
            )
            reader.start()
            self._readers.append(reader)

    def _pump(self, stream, sink):
        """Append every line of ``stream`` to ``sink`` until EOF, then close it."""
        try:
            for line in stream:
                with self._io_lock:
                    sink.append(line)
        except (OSError, ValueError):
            # Pipe was closed underneath us; there is nothing left to read.
            pass
        finally:
            stream.close()

    def poll(self):
        """
        Finalise the task once its process has exited.

        Safe to call repeatedly: it does nothing unless the task is still
        RUNNING, so a finished or cancelled task is never overwritten and
        its output is never appended twice.
        """
        if self._process is None or self.status != TaskStatus.RUNNING:
            return

        returncode = self._process.poll()
        if returncode is None:
            return

        # Give the readers a moment to reach EOF; bounded so a grandchild
        # that still holds the pipe open cannot block the caller.
        for reader in self._readers:
            reader.join(timeout=0.5)

        self.read_output()
        with self._io_lock:
            stderr = "".join(self._stderr_chunks)
            self._stderr_chunks.clear()

        self.status = TaskStatus.COMPLETED if returncode == 0 else TaskStatus.FAILED
        self.completed_at = datetime.utcnow().isoformat() + "+00:00"
        self.exit_code = returncode
        if stderr:
            self.error = stderr

        logger.info(f"Task {self.task_id} completed with exit code {returncode}")

    def read_output(self):
        """Move stdout captured so far into ``self.output`` without blocking."""
        with self._io_lock:
            if not self._stdout_chunks:
                return
            self.output += "".join(self._stdout_chunks)
            self._stdout_chunks.clear()

    def to_dict(self) -> dict[str, Any]:
        """Convert task to dictionary for API response."""
        return {
            "task_id": self.task_id,
            "command": self.command,
            "task_type": self.task_type,
            "status": self.status,
            "created_at": self.created_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "exit_code": self.exit_code,
            "output": self.output,
            "error": self.error,
        }
419
+
420
+
421
class TaskManager:
    """
    Registry of background tasks with a polling thread.

    Tasks are child processes started via ``BackgroundTask``; a daemon
    thread refreshes every RUNNING task about every 100 ms. All access to
    the task table is serialised with a lock.
    """

    def __init__(self):
        self._tasks: dict[str, BackgroundTask] = {}
        self._lock = threading.Lock()
        self._running = True
        # Start background poller thread
        self._poller_thread = threading.Thread(
            target=self._poll_loop,
            daemon=True,
            name="TaskManagerPoller"
        )
        self._poller_thread.start()
        logger.info("TaskManager initialized with background poller thread")

    def _poll_loop(self):
        """Background thread body: refresh all RUNNING tasks until stopped."""
        while self._running:
            try:
                with self._lock:
                    for task in list(self._tasks.values()):
                        if task.status == TaskStatus.RUNNING:
                            task.read_output()
                            task.poll()
            except Exception as e:
                # Keep the poller alive; one bad task must not stop polling.
                logger.error(f"Error in task poll loop: {e}")
            time.sleep(0.1)  # Poll every 100ms

    def create_task(self, command: str, task_type: str = "shell") -> str:
        """
        Create a new background task, start it, and register it.

        Returns:
            The 8-character task id.
        """
        task_id = str(uuid.uuid4())[:8]
        task = BackgroundTask(task_id, command, task_type)
        task.start()

        with self._lock:
            self._tasks[task_id] = task

        logger.info(f"Created task {task_id}: {command[:50]}...")
        return task_id

    def get_task(self, task_id: str) -> Optional[BackgroundTask]:
        """Return the task with ``task_id``, or None if unknown."""
        with self._lock:
            return self._tasks.get(task_id)

    def list_tasks(self) -> list[dict[str, Any]]:
        """Return ``to_dict()`` for every registered task."""
        with self._lock:
            return [task.to_dict() for task in self._tasks.values()]

    def cancel_task(self, task_id: str) -> bool:
        """
        Cancel a task that is still running.

        Returns:
            True if the task's process was asked to terminate; False if the
            task is unknown, never started a process, or already finished
            (a finished task keeps its real status and output).
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None or task._process is None:
                return False

            # Pick up an exit the poller has not seen yet, so a task that
            # just completed is not relabelled as cancelled.
            task.poll()
            if task.status != TaskStatus.RUNNING:
                return False

            task._process.terminate()
            task.status = TaskStatus.FAILED
            task.completed_at = datetime.utcnow().isoformat() + "+00:00"
            task.error = "Task cancelled by user"
            logger.info(f"Cancelled task {task_id}")
            return True

    def cleanup_old_tasks(self, max_age_hours: int = 24):
        """Remove COMPLETED/FAILED tasks that finished more than ``max_age_hours`` ago."""
        cutoff = datetime.utcnow().timestamp() - (max_age_hours * 3600)
        finished = (TaskStatus.COMPLETED, TaskStatus.FAILED)
        with self._lock:
            to_remove = []
            for task_id, task in self._tasks.items():
                if task.status not in finished or not task.completed_at:
                    continue
                try:
                    # Parsed as a naive datetime, the same way ``cutoff`` is
                    # derived, so the two values are directly comparable.
                    completed_time = datetime.fromisoformat(
                        task.completed_at.replace("+00:00", "").replace("Z", "")
                    ).timestamp()
                except ValueError:
                    # Unparseable timestamp: leave the task in place.
                    continue
                if completed_time < cutoff:
                    to_remove.append(task_id)
            for task_id in to_remove:
                del self._tasks[task_id]
            if to_remove:
                logger.info(f"Cleaned up {len(to_remove)} old tasks")

    def get_status(self) -> dict[str, Any]:
        """Return task totals, a per-status count, and poller-thread liveness."""
        with self._lock:
            status_counts = {}
            for task in self._tasks.values():
                status_counts[task.status] = status_counts.get(task.status, 0) + 1
            return {
                "total_tasks": len(self._tasks),
                "status_breakdown": status_counts,
                "poller_thread_alive": self._poller_thread.is_alive(),
            }
522
+
523
+
524
+ # ============================================================================
525
+ # SHARED STATE (In-memory)
526
+ # ============================================================================
527
+
528
+
529
class SharedState:
    """
    Lock-protected, in-memory dictionary describing app and worker status.

    Note that ``update()`` refreshes ``last_heartbeat`` on every call and
    skips keyword arguments whose value is None, so a field cannot be reset
    to None through it.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._state = {
            "worker_state": "initializing",
            "worker_active": False,  # Set to True only after worker spawns successfully
            "worker_pid": None,  # Set to worker PID after successful spawn
            "worker_mode": None,  # Set after spawn
            "last_heartbeat": None,
            "heartbeat_age_seconds": 0,
            "stage": "STARTUP_INIT",
            "health": "INITIALIZING",
            "error": None,
            "uptime_seconds": 0,
            "started_at": datetime.utcnow().isoformat() + "+00:00",
        }
        self._start_time = time.time()

    def update(self, **kwargs):
        """Stamp a fresh heartbeat, then apply every non-None keyword value."""
        with self._lock:
            state = self._state
            state["last_heartbeat"] = datetime.utcnow().isoformat() + "+00:00"
            state["heartbeat_age_seconds"] = 0
            # Applied after the stamp, so kwargs may override the two keys above.
            state.update({key: value for key, value in kwargs.items() if value is not None})

    def get(self) -> dict[str, Any]:
        """
        Refresh the derived fields and return a shallow copy of the state.

        ``uptime_seconds`` is recomputed on every call; ``heartbeat_age_seconds``
        is recomputed when a heartbeat timestamp exists and set to 999 when
        that timestamp cannot be processed.
        """
        with self._lock:
            state = self._state
            state["uptime_seconds"] = int(time.time() - self._start_time)

            stamp = state["last_heartbeat"]
            if stamp:
                try:
                    # Strip the UTC suffix so the value parses as naive UTC.
                    beat = datetime.fromisoformat(
                        stamp.replace("+00:00", "").replace("Z", "")
                    )
                    if beat.tzinfo is not None:
                        beat = beat.replace(tzinfo=None)
                    elapsed = datetime.utcnow() - beat
                    state["heartbeat_age_seconds"] = elapsed.total_seconds()
                except Exception:
                    state["heartbeat_age_seconds"] = 999

            return dict(state)
577
+
578
+
579
+ # ============================================================================
580
+ # REQUEST LOGGING MIDDLEWARE
581
+ # ============================================================================
582
+
583
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """
    Log one line per incoming request and one per outgoing response.

    The request line carries method, path, optional query string and the
    client IP; the response line carries method, path and status code.
    """

    async def dispatch(self, request: Request, call_next):
        """Log the request, delegate to ``call_next``, log the status, return the response."""
        peer = request.client
        client_ip = peer.host if peer else "unknown"
        method, path = request.method, request.url.path

        raw_query = request.url.query
        query_suffix = f"?{str(raw_query)}" if raw_query else ""

        logger.info(f"REQUEST: {method} {path}{query_suffix} from {client_ip}")

        # Hand over to the rest of the stack.
        response = await call_next(request)

        logger.info(f"RESPONSE: {method} {path} -> {response.status_code}")
        return response
605
+
606
+
607
+ # ============================================================================
608
+ # FASTAPI APP
609
+ # ============================================================================
610
+
611
+ # Initialize shared state and task manager
612
+ shared_state = SharedState()
613
+ task_manager = TaskManager()
614
+
615
+
616
def _check_dependencies() -> tuple[bool, str]:
    """
    Pre-initialization check of the import path and required files.

    Checks that ``/app`` is on ``sys.path``, that ``fastapi`` and ``uvicorn``
    import, and that ``brain_minimal.py`` sits next to this module.

    Returns:
        ``(True, "OK")`` when every check passes, otherwise ``(False, msg)``
        where ``msg`` joins the individual problems with ``"; "``.
    """
    problems = []

    # Check PYTHONPATH
    if "/app" not in sys.path:
        problems.append("PYTHONPATH missing /app")

    # Check critical imports (first failing import ends this check)
    try:
        import fastapi
        import uvicorn
    except ImportError as exc:
        problems.append(f"Missing dependency: {exc}")

    # Check brain_minimal.py exists
    brain_path = Path(__file__).parent / "brain_minimal.py"
    if not brain_path.exists():
        problems.append(f"brain_minimal.py not found at {brain_path}")

    if problems:
        return (False, "; ".join(problems))
    return (True, "OK")
637
+
638
+
639
async def _initialize_worker():
    """
    Create the WorkerManager and spawn the worker, recording the outcome in shared_state.

    Marks the state as INITIALIZING_WORKER, then calls
    WorkerManager.spawn_with_retry(max_attempts=5, retry_delay=2.0,
    heartbeat_timeout=30.0). A falsy result is treated as a failure.
    Any Exception is logged and stored in shared_state as WORKER_FAILED
    instead of being propagated to the caller.
    """
    global _worker_manager

    rule = "=" * 60
    print(rule)
    print("🚀 CAIN WORKER INITIALIZATION: Starting async worker spawn")
    print(rule)
    sys.stdout.flush()
    sys.stderr.flush()

    # Publish the "worker is being initialized" state before doing any work.
    shared_state.update(
        stage="RUNNING_APP_STARTING",
        health="INITIALIZING_WORKER",
        worker_active=False,
        worker_pid=None,
    )

    try:
        _worker_manager = WorkerManager(shared_state)

        # Retry count and heartbeat timeout are raised for the HuggingFace
        # Spaces environment (30 seconds instead of 15).
        spawned = await _worker_manager.spawn_with_retry(
            max_attempts=5,
            retry_delay=2.0,
            heartbeat_timeout=30.0,
        )
        if not spawned:
            raise RuntimeError("Worker spawn returned False without exception")

        snapshot = shared_state.get()
        report = (
            rule,
            "✅ CAIN WORKER INITIALIZED SUCCESSFULLY",
            f" Stage: {snapshot.get('stage')}",
            f" Health: {snapshot.get('health')}",
            f" Worker PID: {snapshot.get('worker_pid')}",
            f" Worker Active: {snapshot.get('worker_active')}",
            rule,
        )
        for line in report:
            print(line)
        logger.info("Cain worker initialized successfully")

    except Exception as exc:
        logger.error(f"Worker initialization failed: {type(exc).__name__}: {exc}")
        print(f"❌ CAIN WORKER INITIALIZATION FAILED: {exc}")
        sys.stdout.flush()
        shared_state.update(
            stage="RUNNING_APP_STARTING",
            health="WORKER_FAILED",
            error=f"Worker init failed: {exc}",
            worker_active=False,
            worker_pid=None,
        )
697
+
698
+
699
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Lifespan context manager for application startup and shutdown.

    Startup:
        Runs _check_dependencies(). On failure the problem is logged and
        recorded in shared_state as DEGRADED, but startup is not aborted.
        Worker initialization is then scheduled as a background task so the
        server can start accepting requests while the worker spawns.

    Shutdown:
        Stops the task manager, cancels the worker-initialization task if it
        is still running, and shuts down the worker manager if one exists.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    global _worker_manager

    banner = "=" * 60

    # ---- Startup ---------------------------------------------------------
    print(banner)
    print("🚀 CAIN LIFESPAN: Startup starting")
    print(banner)
    sys.stdout.flush()

    # Pre-initialization checks
    deps_ok, deps_msg = _check_dependencies()
    if not deps_ok:
        logger.critical(f"DEPENDENCY CHECK FAILED: {deps_msg}")
        print(f"CRITICAL: {deps_msg}")
        sys.stdout.flush()
        shared_state.update(
            stage="RUNNING",
            health="DEGRADED",  # Use DEGRADED instead of DEPENDENCY_ERROR
            error=f"Pre-initialization check failed: {deps_msg}",
            worker_active=False,
        )
        # Don't raise - let the app start in degraded mode
        print("⚠️ Starting in DEGRADED mode (worker not available)")
        sys.stdout.flush()
    else:
        print("✓ Pre-initialization checks passed")
        sys.stdout.flush()

    # Keep a strong reference to the task: the event loop only holds weak
    # references, so a fire-and-forget task may be garbage-collected before
    # it finishes.
    # NOTE(review): the spawn is scheduled even when the dependency check
    # failed, and _initialize_worker() then replaces the DEGRADED state -
    # confirm this is intended.
    worker_init_task = asyncio.create_task(_initialize_worker())

    yield

    # ---- Shutdown --------------------------------------------------------
    print(banner)
    print("🛑 CAIN LIFESPAN: Shutdown starting")
    print(banner)
    sys.stdout.flush()

    task_manager._running = False

    # Stop a still-running initialization so it cannot spawn a worker after
    # the cleanup below has already run.
    if not worker_init_task.done():
        worker_init_task.cancel()
        try:
            await worker_init_task
        except asyncio.CancelledError:
            pass

    # Shutdown worker manager
    if _worker_manager is not None:
        try:
            await _worker_manager.shutdown()
        except Exception as e:
            logger.error(f"Error shutting down worker manager: {e}")
        finally:
            _worker_manager = None

    print("CAIN shutdown complete")
    sys.stdout.flush()
757
+
758
+
759
# Build the ASGI application; startup and shutdown are driven by `lifespan`.
app = FastAPI(
    lifespan=lifespan,
    title="HuggingClaw Cain",
    version="2.0.0",
    description="Agent collaboration server with background task execution",
)

# DEBUG marker: emitted right after instantiation to show the core app object was created.
print("APP_READY", flush=True)

# TEMPORARILY DISABLED while testing whether error_handlers is part of a
# dependency chain that breaks startup. To restore, uncomment:
#   from error_handlers import register_error_handlers  # imported here to avoid a circular dependency
#   register_error_handlers(app)
#   app.add_middleware(RequestLoggingMiddleware)

# Serve frontend assets (images, fonts, etc.) under /static.
app.mount(
    "/static",
    StaticFiles(directory=str(FRONTEND_PATH)),
    name="static",
)
780
+
781
+
782
@app.get("/")
async def read_root():
    """Return the agent dashboard HTML, or a JSON status payload when the file is missing."""
    logger.info("GET / - Serving agent dashboard")

    if not AGENT_DASHBOARD.exists():
        # No dashboard on disk: answer with a small JSON body instead.
        fallback = {
            "status": "alive",
            "message": "Cain core operational",
            "note": "Agent dashboard file not found",
            "frontend_path": str(FRONTEND_PATH),
        }
        return JSONResponse(fallback)

    return FileResponse(str(AGENT_DASHBOARD), media_type="text/html")
797
+
798
+
799
@app.get("/health")
async def health_check():
    """
    Report application health for container orchestration.

    Responds with HTTP 200 when the recorded health value is one of the
    accepted values listed below (which include the initializing and degraded
    ones), and with HTTP 503 otherwise. The body also carries the worker's
    cached state plus a live check of the worker process.
    """
    snapshot = shared_state.get()

    # Live process check: poll() returning None means the process is still running.
    manager = _worker_manager
    process = manager._process if manager is not None else None
    if process is not None:
        live_pid = process.pid
        process_running = process.poll() is None
    else:
        live_pid = None
        process_running = False

    # The app counts as healthy while the worker is initializing or degraded.
    accepted_health = (
        "HEALTHY",
        "INITIALIZING",
        "INITIALIZING_WORKER",
        "RUNNING_APP_STARTING",
        "DEGRADED",
    )
    healthy = snapshot["health"] in accepted_health

    body = {
        "status": "healthy" if healthy else "unhealthy",
        "stage": snapshot["stage"],
        "worker": {
            "active": snapshot["worker_active"],
            "alive": process_running,
            "pid": live_pid or snapshot.get("worker_pid"),
            "mode": snapshot["worker_mode"],
            "heartbeat_age_seconds": snapshot["heartbeat_age_seconds"],
        },
        "uptime_seconds": snapshot["uptime_seconds"],
        "timestamp": datetime.utcnow().isoformat() + "+00:00",
    }
    http_status = 200 if healthy else 503
    return JSONResponse(body, status_code=http_status)
835
+
836
+
837
@app.get("/api/state")
async def get_state():
    """
    Return detailed application, worker and task status.

    The worker's "active" flag is reconciled with a live check of the worker
    process before the response is built, so it is not only the cached value.
    """
    logger.debug("GET /api/state called")

    snapshot = shared_state.get()
    tasks_overview = task_manager.get_status()

    # Ask the WorkerManager's process directly; poll() is None while it runs.
    manager = _worker_manager
    process = manager._process if manager is not None else None
    if process is not None:
        live_pid = process.pid
        process_running = process.poll() is None
    else:
        live_pid = None
        process_running = False

    # Bring the stored flag (and this snapshot) in line with the live result.
    if snapshot.get("worker_active") != process_running:
        shared_state.update(worker_active=process_running)
        snapshot["worker_active"] = process_running

    # Healthy means: process is running AND the heartbeat is under 30 seconds old.
    # The age is only compared when the process is running.
    if process_running:
        worker_ok = snapshot["heartbeat_age_seconds"] < 30
    else:
        worker_ok = False

    cain_section = {
        "name": "Cain",
        "space_id": "tao-shen/HuggingClaw-Cain",
        "stage": snapshot["stage"],
        "health": snapshot["health"],
        "error": snapshot.get("error"),
        "uptime_seconds": snapshot["uptime_seconds"],
        "started_at": snapshot["started_at"],
    }
    worker_section = {
        "state": snapshot["worker_state"],
        "active": snapshot["worker_active"],  # reconciled with the live process check above
        "pid": live_pid or snapshot.get("worker_pid"),
        "mode": snapshot["worker_mode"],
        "last_heartbeat": snapshot["last_heartbeat"],
        "heartbeat_age_seconds": snapshot["heartbeat_age_seconds"],
        "is_healthy": worker_ok,
    }

    return JSONResponse({
        "cain": cain_section,
        "worker": worker_section,
        "tasks": tasks_overview,
        "timestamp": datetime.utcnow().isoformat() + "+00:00",
    })
891
+
892
+
893
# Internal endpoint: worker processes post their heartbeat payload here.
@app.post("/internal/heartbeat")
async def heartbeat(data: dict[str, Any]):
    """
    Merge a worker heartbeat payload into shared_state.

    Once the stored stage is RUNNING_APP_READY or RUNNING_A2A_READY, the
    payload's "stage" key is dropped so the stored stage is kept. The one
    exception is the RUNNING_APP_READY -> RUNNING_A2A_READY transition, which
    is applied. For any other stored stage the payload is applied as-is.
    """
    summary = (
        f"[HEARTBEAT] Received from worker_pid={data.get('worker_pid')}, "
        f"worker_state={data.get('worker_state')}, stage={data.get('stage')}"
    )
    logger.info(summary)
    print(summary)
    sys.stdout.flush()

    # Stage as stored before this heartbeat is applied.
    current_stage = shared_state.get().get("stage")
    incoming_stage = data.get("stage")

    ready_stages = ("RUNNING_APP_READY", "RUNNING_A2A_READY")
    is_upgrade = (
        current_stage == "RUNNING_APP_READY"
        and incoming_stage == "RUNNING_A2A_READY"
    )

    if current_stage in ready_stages and not is_upgrade:
        # Keep the stored stage; apply every other field from the payload.
        without_stage = dict(data)
        without_stage.pop("stage", None)
        shared_state.update(**without_stage)
        print(f"[HEARTBEAT] Preserving stage={current_stage}, updating other fields")
    else:
        if is_upgrade:
            print(f"[HEARTBEAT] Stage transition: {current_stage} -> {incoming_stage}")
        # Full update: either the allowed upgrade or a stage that is not protected.
        shared_state.update(**data)

    sys.stdout.flush()
    return {"status": "received"}
923
+
924
+
925
@app.get("/read_startup_error")
async def read_startup_error():
    """
    DEBUG: Return the contents of startup_error.log, located next to this file.

    Returns:
        JSON with "startup_error_log_exists": True and the log "content" when
        the file can be read, otherwise "startup_error_log_exists": False and
        an explanatory "message".
    """
    startup_error_log = Path(__file__).parent / "startup_error.log"

    # Read directly instead of exists()-then-read: the file can disappear
    # between the two calls. Decode as UTF-8 with replacement so undecodable
    # bytes in the log cannot turn this diagnostic endpoint into a 500.
    try:
        content = startup_error_log.read_text(encoding="utf-8", errors="replace")
    except FileNotFoundError:
        return JSONResponse({
            "startup_error_log_exists": False,
            "message": "No startup error log found - worker may have started successfully"
        })

    return JSONResponse({
        "startup_error_log_exists": True,
        "content": content
    })
942
+
943
+
944
+ # ============================================================================
945
+ # TASK EXECUTION API
946
+ # ============================================================================
947
+
948
@app.post("/api/tasks/execute")
async def execute_task(request: dict[str, Any]):
    """
    Execute a command or script in the background.

    Request body:
        {
            "command": "echo 'hello'",   # Command to execute
            "type": "shell" | "python"   # Execution type (default: shell)
        }

    Returns:
        JSON with the new "task_id", "status": "started" and a message.

    Raises:
        HTTPException: 400 when "command" is missing or empty.
    """
    # NOTE(review): this endpoint hands a caller-supplied command to the task
    # manager and no authentication is visible in this handler - confirm
    # access is restricted elsewhere.
    command = request.get("command")
    task_type = request.get("type", "shell")

    if not command:
        raise HTTPException(status_code=400, detail="Command is required")

    task_id = task_manager.create_task(command, task_type)

    # Stringify before truncating: the body is arbitrary JSON, so a truthy
    # non-string command (e.g. a number) would make `command[:100]` raise
    # after the task has already been created.
    logger.info(f"Started task {task_id}: {str(command)[:100]}")

    return JSONResponse({
        "task_id": task_id,
        "status": "started",
        "message": f"Task {task_id} started successfully"
    })
973
+
974
+
975
@app.get("/api/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Return the serialized task for task_id, or respond 404 when it is not found."""
    found = task_manager.get_task(task_id)
    if found:
        # Refresh the task before serializing it.
        found.poll()
        return JSONResponse(found.to_dict())

    raise HTTPException(status_code=404, detail="Task not found")
986
+
987
+
988
@app.get("/api/tasks")
async def list_tasks():
    """Return every task known to the task manager under the "tasks" key."""
    listing = {"tasks": task_manager.list_tasks()}
    return JSONResponse(listing)
992
+
993
+
994
@app.delete("/api/tasks/{task_id}")
async def cancel_task(task_id: str):
    """Cancel the task identified by task_id; respond 404 when the task manager reports no cancellation."""
    was_cancelled = task_manager.cancel_task(task_id)
    if not was_cancelled:
        raise HTTPException(status_code=404, detail="Task not found or not cancellable")

    return JSONResponse({"status": "cancelled", "task_id": task_id})
1000
+
1001
+
1002
@app.post("/api/tasks/cleanup")
async def cleanup_tasks(max_age_hours: int = 24):
    """Ask the task manager to clean up old tasks, passing max_age_hours (default 24) as the age limit."""
    task_manager.cleanup_old_tasks(max_age_hours)

    summary = {"status": "cleaned", "max_age_hours": max_age_hours}
    return JSONResponse(summary)
1007
+
1008
+
1009
+ # ============================================================================
1010
+ # WEBSOCKET CHAT ENDPOINT
1011
+ # ============================================================================
1012
+
1013
@app.websocket("/chat")
async def chat_websocket(websocket: WebSocket):
    """
    WebSocket endpoint for real-time agent communication.

    Accepts the connection, then answers every text message with a JSON
    acknowledgment that echoes the message and carries a UTC timestamp.
    The loop ends when the client disconnects or an error occurs.
    """
    await websocket.accept()

    # websocket.client may be None; fall back to a placeholder the same way
    # RequestLoggingMiddleware does, instead of raising AttributeError right
    # after the connection has been accepted.
    peer = websocket.client
    client_id = f"{peer.host}:{peer.port}" if peer else "unknown"
    logger.info(f"WebSocket connection established from {client_id}")

    try:
        while True:
            # Receive message from client
            data = await websocket.receive_text()
            logger.info(f"WebSocket message from {client_id}: {data}")

            # Echo acknowledgment back to client
            await websocket.send_json({
                "type": "ack",
                "message": "Message received",
                "timestamp": datetime.utcnow().isoformat() + "+00:00",
                "echo": data
            })

    except WebSocketDisconnect:
        logger.info(f"WebSocket connection closed by {client_id}")
    except Exception as e:
        logger.error(f"WebSocket error for {client_id}: {e}")
    finally:
        logger.info(f"WebSocket connection terminated for {client_id}")
1040
+
1041
+
1042
+ # ============================================================================
1043
+ # MAIN ENTRY POINT
1044
+ # ============================================================================
1045
+
1046
if __name__ == "__main__":
    # Listen on every interface; the port comes from $PORT and defaults to 7860.
    port = int(os.getenv("PORT", "7860"))
    logger.info(f"🚀 STARTING UVICORN on 0.0.0.0:{port}")
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=port,
        log_level="info",
    )