Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Supervisor for OpenClaw Agent Army | |
| Manages multiple agents, restarts on failure, writes system state | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import subprocess | |
| import threading | |
| import http.server | |
| import socketserver | |
| from datetime import datetime | |
| from pathlib import Path | |
# Configuration
# Registry of supervised agents, keyed by agent name. Insertion order is
# gig workers, then trading workers, then the editor.
AGENTS = {
    name: {'file': 'agent_army/agents/gig_real.py', 'type': 'gig'}
    for name in ('gig1', 'gig2', 'gig3', 'gig4', 'gig5')
}
AGENTS.update({
    name: {
        'file': 'agent_army/agents/trading_real.py',
        'type': 'trading',
        'strategy': strategy,
    }
    for name, strategy in (
        ('trading1', 'trend'),
        ('trading2', 'reversion'),
        ('trading3', 'memecoin'),
    )
})
AGENTS['editor'] = {'file': 'agent_army/agents/editor_improved.py', 'type': 'supervisor'}

# Working directories sit next to this file and are created at import time.
BASE_DIR = Path(__file__).parent
STATE_DIR = BASE_DIR.joinpath("state")
LOGS_DIR = BASE_DIR.joinpath("logs")
STATE_DIR.mkdir(exist_ok=True)
LOGS_DIR.mkdir(exist_ok=True)
class AgentProcess:
    """Supervise one agent script running as a child Python process.

    Each instance owns the ``subprocess.Popen`` handle for its agent and two
    per-agent files: ``STATE_DIR/<name>.json`` (read by ``get_state``) and
    ``LOGS_DIR/<name>.log`` (receives the child's stdout/stderr as well as
    the supervisor's own start-up diagnostics).
    """

    # Seconds to wait after spawning before deciding the child survived start-up.
    STARTUP_GRACE_SECONDS = 2
    # Number of trailing log lines echoed to the console when a child dies.
    LOG_TAIL_LINES = 10
    # Seconds to wait for a graceful exit after terminate() before kill().
    STOP_TIMEOUT_SECONDS = 5

    def __init__(self, name, config):
        """Record the agent's identity; no process is started here.

        Args:
            name: Agent name; also the stem of its state and log file names.
            config: Launch settings; ``config['file']`` is the script path
                relative to ``BASE_DIR``.
        """
        self.name = name
        self.config = config
        self.process = None
        self.start_time = None
        self.restart_count = 0
        self.state_file = STATE_DIR / f"{name}.json"
        self.log_file = LOGS_DIR / f"{name}.log"

    def _append_log(self, message):
        """Append a timestamped supervisor note to the agent's log (best effort)."""
        try:
            with open(self.log_file, 'a') as f:
                f.write(f"[{datetime.now()}] {message}\n")
        except OSError as e:
            print(f"[{self.name}] Could not write log: {e}")

    def _print_log_tail(self):
        """Echo the last ``LOG_TAIL_LINES`` lines of the agent's log to stdout."""
        try:
            # errors='replace': the child may emit bytes that are not valid text.
            with open(self.log_file, 'r', errors='replace') as f:
                tail = f.readlines()[-self.LOG_TAIL_LINES:]
        except OSError as e:
            print(f"[{self.name}] Could not read log: {e}")
            return
        print(f"[{self.name}] Last log lines:")
        for line in tail:
            print(f" {line.rstrip()}")

    def start(self):
        """Launch the agent script and confirm it survives the start-up grace period.

        The child runs with ``sys.executable``, ``cwd=BASE_DIR``, a copy of the
        current environment, and stdout/stderr appended to the agent's log.

        Returns:
            True if the child is still running after ``STARTUP_GRACE_SECONDS``;
            False if the script is missing, could not be spawned, or exited
            during the grace period.
        """
        script_path = BASE_DIR / self.config['file']
        if not script_path.exists():
            print(f"[{self.name}] Script not found: {script_path}")
            # Log to file for debugging
            self._append_log(f"Script not found: {script_path}")
            return False

        try:
            # The child receives its own copy of the descriptor, so the
            # supervisor's handle is closed as soon as the spawn returns;
            # keeping it open leaked one descriptor per (re)start.
            with open(self.log_file, 'a') as log_f:
                self.process = subprocess.Popen(
                    [sys.executable, str(script_path)],
                    stdout=log_f,
                    stderr=subprocess.STDOUT,
                    cwd=BASE_DIR,
                    env=os.environ.copy(),
                )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            print(f"[{self.name}] Failed to start: {e}")
            self._append_log(f"Failed to start: {e}")
            return False

        self.start_time = datetime.now()
        print(f"[{self.name}] Started (PID {self.process.pid})")

        # Give it a moment to see if it crashes
        time.sleep(self.STARTUP_GRACE_SECONDS)
        exit_code = self.process.poll()
        if exit_code is None:
            return True

        print(f"[{self.name}] Exited immediately with code {exit_code}")
        self._print_log_tail()
        self._append_log(f"Exited immediately with code {exit_code}")
        # Drop the dead handle so a later check() does not report the same
        # exit a second time.
        self.process = None
        return False

    def check(self):
        """Report whether the child is alive, logging details if it has exited.

        Returns:
            True while the child is running. False if no child is tracked or
            the child has exited; in the latter case the exit code and log
            tail are printed and the handle is cleared.
        """
        proc = self.process
        if proc is None:
            return False
        exit_code = proc.poll()
        if exit_code is None:
            return True
        # Process died
        print(f"[{self.name}] Process exited with code {exit_code}")
        self._print_log_tail()
        self.process = None
        return False

    def stop(self):
        """Terminate the child, escalating to kill after ``STOP_TIMEOUT_SECONDS``."""
        proc = self.process
        if proc is not None and proc.poll() is None:
            proc.terminate()
            try:
                proc.wait(timeout=self.STOP_TIMEOUT_SECONDS)
            except subprocess.TimeoutExpired:
                proc.kill()
                # Reap the killed child so it does not linger as a zombie.
                proc.wait()
        self.process = None

    def get_state(self):
        """Return the JSON object stored in the agent's state file.

        Returns:
            The decoded dict, or ``{}`` when the file is missing, unreadable,
            not valid JSON, or does not contain a JSON object (callers merge
            the result with ``dict.update``).
        """
        try:
            with open(self.state_file) as f:
                state = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and text decoding errors.
            return {}
        return state if isinstance(state, dict) else {}

    def run(self):
        """Keep the agent alive forever: check every 5s, (re)start when down.

        Never returns. ``restart_count`` is incremented before every start
        attempt, including the first one made by this loop. A failed start is
        retried after 10 seconds.
        """
        while True:
            if not self.check():
                self.restart_count += 1
                print(f"[{self.name}] Restarted (attempt {self.restart_count})...")
                if not self.start():
                    print(f"[{self.name}] Start failed, waiting 10s...")
                    time.sleep(10)
                    continue
            # Read state periodically
            state = self.get_state()
            if state:
                # Could send to central logger
                pass
            time.sleep(5)
def system_state(agents, state_dir=None):
    """Write overall system state to ``system.json``.

    For every agent the snapshot records ``restart_count``, ``running``,
    ``pid`` and ``started_at``, then merges in the dict returned by the
    agent's ``get_state()``; keys from the agent's own state take precedence
    over the supervisor's fields because they are applied last.

    Args:
        agents: Mapping of agent name to an object exposing ``process``,
            ``restart_count``, ``start_time`` and ``get_state()``.
        state_dir: Directory that receives ``system.json``. Defaults to the
            module-level ``STATE_DIR``.
    """
    target_dir = STATE_DIR if state_dir is None else Path(state_dir)

    state = {
        "timestamp": datetime.now().isoformat(),
        "agents": {}
    }
    for name, agent in agents.items():
        # Take one snapshot of the handle and poll it once so "running" and
        # "pid" always agree with each other.
        proc = agent.process
        running = proc is not None and proc.poll() is None
        proc_state = {
            "restart_count": agent.restart_count,
            "running": running,
            "pid": proc.pid if running else None,
            "started_at": agent.start_time.isoformat() if agent.start_time else None
        }
        # Merge agent's own state
        proc_state.update(agent.get_state())
        state["agents"][name] = proc_state

    # Write to a sibling temp file and swap it in, so a concurrent reader
    # never sees a half-written system.json.
    state_file = target_dir / "system.json"
    tmp_file = target_dir / "system.json.tmp"
    with open(tmp_file, 'w') as f:
        json.dump(state, f, indent=2)
    os.replace(tmp_file, state_file)
def start_xvfb():
    """Start Xvfb for headless Chrome.

    Xvfb is launched in the background on display ``:99``. The previous
    implementation used ``subprocess.run``, which waits for the command to
    finish and therefore blocked for as long as the display server was up.

    Returns:
        The ``subprocess.Popen`` handle of the running Xvfb, or ``None`` if
        it could not be spawned or exited within the 2-second start-up wait.
    """
    command = ['Xvfb', ':99', '-screen', '0', '1920x1080x24', '-nolisten', 'tcp', '-fbdir', '/var/tmp']
    print("[Xvfb] Starting virtual display on :99")
    try:
        # Start Xvfb in background
        proc = subprocess.Popen(command)
    except OSError as e:
        print(f"[Xvfb] Failed to start: {e}")
        print("[Xvfb] Chrome may not work properly without virtual display")
        return None

    try:
        # wait() returning within the timeout means Xvfb already exited.
        exit_code = proc.wait(timeout=2)
    except subprocess.TimeoutExpired:
        print("[Xvfb] Virtual display started successfully")
        return proc

    print(f"[Xvfb] Failed to start: exited with code {exit_code}")
    print("[Xvfb] Chrome may not work properly without virtual display")
    return None
def main():
    """Run the supervisor: start Xvfb, the web UI and all agents, then monitor.

    Every 30 seconds each agent is checked and restarted if it is not
    running; ``system.json`` is written once at start-up and then every five
    minutes. On Ctrl-C or a fatal error all agents and Xvfb are stopped; a
    fatal error additionally exits with status 1.
    """
    print(f"[Supervisor] Starting at {datetime.now()}")

    # Start Xvfb FIRST - Chrome depends on it
    print("[Supervisor] Starting Xvfb...")
    xvfb_proc = None  # stays None when Xvfb is unavailable, so cleanup is safe
    try:
        xvfb_proc = subprocess.Popen(['Xvfb', ':99', '-screen', '0', '1920x1080x24', '-nolisten', 'tcp'])
        time.sleep(2)  # Give Xvfb time to start
        if xvfb_proc.poll() is None:
            print("[Supervisor] Xvfb started")
            # Agents are spawned with a copy of os.environ, so export the
            # display for them; an already-configured DISPLAY is left alone.
            os.environ.setdefault('DISPLAY', ':99')
        else:
            print(f"[Supervisor] Xvfb failed: exited with code {xvfb_proc.returncode}")
            print("[Supervisor] Chrome may not work properly")
            xvfb_proc = None
    except (OSError, subprocess.SubprocessError) as e:
        print(f"[Supervisor] Xvfb failed: {e}")
        print("[Supervisor] Chrome may not work properly")

    # Build the full registry before anything reads it, so the web thread
    # never iterates a dict that is still growing.
    agents = {name: AgentProcess(name, config) for name, config in AGENTS.items()}

    # Start web UI in background thread (before the agents, whose start-up
    # checks each take a couple of seconds)
    web_thread = threading.Thread(target=start_web_server, args=(agents,), daemon=True)
    web_thread.start()

    check_interval = 30
    state_interval = 300  # write system state every 5 minutes

    try:
        # Start all agents
        for agent in agents.values():
            agent.start()

        system_state(agents)
        last_state_write = time.monotonic()

        # Monitor loop
        while True:
            time.sleep(check_interval)

            # Check all agents and bring back any that are down
            for agent in agents.values():
                if not agent.check():
                    agent.restart_count += 1
                    print(f"[{agent.name}] Restarting (attempt {agent.restart_count})...")
                    agent.start()

            # A monotonic interval does not skip or repeat writes when an
            # iteration takes longer than the check interval.
            now = time.monotonic()
            if now - last_state_write >= state_interval:
                system_state(agents)
                last_state_write = now
    except KeyboardInterrupt:
        print("\n[Supervisor] Shutting down...")
    except Exception as e:
        print(f"\n[Supervisor] Fatal error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        # Runs on every exit path (including the sys.exit above) so that no
        # child process is left behind.
        for agent in agents.values():
            agent.stop()
        if xvfb_proc is not None:
            xvfb_proc.terminate()
            try:
                xvfb_proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                xvfb_proc.kill()
| # Web UI Handler | |
# Web UI Handler
class AgentHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler exposing supervisor status and agent logs.

    The owning server object must carry an ``agents`` attribute mapping agent
    names to objects with ``process``, ``restart_count``, ``log_file`` and
    ``get_state()``.

    Routes (query strings are ignored except on ``/log``):
        GET /, /health       JSON status of every agent.
        GET /logs, /archive  JSON with the last 20 log lines of every agent.
        GET /log?name=<id>   Plain-text last 50 log lines of one agent.
        POST (any path)      501 Not Implemented.
    """

    def _send(self, status, body, content_type=None):
        """Send a complete response with the given status, body bytes and type."""
        self.send_response(status)
        if content_type:
            self.send_header('Content-type', content_type)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    @staticmethod
    def _read_tail(log_file, count):
        """Return the last ``count`` lines of ``log_file``; raises OSError on failure."""
        # errors='replace': agent output is not guaranteed to be valid text.
        with open(log_file, errors='replace') as f:
            return f.readlines()[-count:]

    def _send_health(self):
        """Respond with the JSON status document for all agents."""
        status = {
            "status": "ok",
            "timestamp": datetime.now().isoformat(),
            "agents": {}
        }
        for name, agent in self.server.agents.items():
            # Snapshot the handle: the supervisor thread may set
            # agent.process to None between two separate reads.
            proc = agent.process
            running = proc is not None and proc.poll() is None
            agent_status = {
                "running": running,
                "restart_count": agent.restart_count,
                "pid": proc.pid if running else None,
                "last_checked": datetime.now().isoformat()
            }
            # Merge agent's own state (read from file)
            agent_status.update(agent.get_state())
            status["agents"][name] = agent_status
        self._send(200, json.dumps(status, indent=2).encode(), 'application/json')

    def _send_all_logs(self):
        """Respond with the last 20 log lines of every agent as JSON."""
        logs = {}
        for name, agent in self.server.agents.items():
            try:
                logs[name] = [line.strip() for line in self._read_tail(agent.log_file, 20)]
            except OSError:
                # Missing or unreadable log: report it as empty.
                logs[name] = []
        resp = {
            "timestamp": datetime.now().isoformat(),
            "logs": logs
        }
        self._send(200, json.dumps(resp, indent=2).encode(), 'application/json')

    def _send_agent_log(self, name):
        """Respond with the last 50 log lines of the named agent as plain text."""
        # Only names present in the registry are accepted, so the request
        # cannot select an arbitrary file.
        agent = self.server.agents.get(name) if name else None
        if agent is None:
            self._send(404, b'Agent not found')
            return
        try:
            body = ''.join(self._read_tail(agent.log_file, 50)).encode()
        except FileNotFoundError:
            body = b"No log file found"
        except OSError as e:
            body = f"Error reading log: {e}".encode()
        self._send(200, body, 'text/plain')

    def do_GET(self):
        """Dispatch a GET request by URL path."""
        import urllib.parse
        parsed = urllib.parse.urlparse(self.path)
        route = parsed.path
        if route in ('/health', '/'):
            self._send_health()
        elif route in ('/logs', '/archive'):
            self._send_all_logs()
        elif route == '/log':
            # Single agent log endpoint
            params = urllib.parse.parse_qs(parsed.query)
            self._send_agent_log(params.get('name', [None])[0])
        else:
            self._send(404, b'Not Found')

    def do_POST(self):
        """Reject every POST request with 501."""
        self._send(501, b'Not Implemented')
def start_web_server(agents, port=None):
    """Start the HTTP server in a separate thread.

    Blocks in ``serve_forever`` and is therefore meant to be the target of a
    background thread.

    Args:
        agents: Mapping of agent name to agent object; attached to the server
            as ``httpd.agents`` for ``AgentHandler`` to read.
        port: TCP port to bind on all interfaces. When ``None`` the ``PORT``
            environment variable is used, falling back to 7860.
    """
    default_port = 7860  # Use HF's default port
    if port is None:
        raw_port = os.getenv('PORT', default_port)
        try:
            port = int(raw_port)
        except (TypeError, ValueError):
            # A malformed PORT must not silently kill the web thread.
            print(f"[Web UI] Invalid PORT value {raw_port!r}; using {default_port}")
            port = default_port

    class _AgentServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
        # Rebind immediately after a restart instead of failing with
        # "Address already in use".
        allow_reuse_address = True
        # One handler thread per request, so a stalled client cannot block
        # health checks; daemon threads do not hold up interpreter exit.
        daemon_threads = True

    try:
        with _AgentServer(("0.0.0.0", port), AgentHandler) as httpd:
            httpd.agents = agents
            print(f"[Web UI] Running on port {port}")
            httpd.serve_forever()
    except OSError as e:
        print(f"[Web UI] Failed to start on port {port}: {e}")
# Script entry point: run the supervisor only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()