#!/usr/bin/env python3
"""
Agent Zero Fleet Manager

Manages 4 autonomous Agent Zero instances with workspace isolation,
self-healing, config persistence, inter-agent delegation, and persistent storage.

Usage:
    python fleet_manager.py --check                          # Check all 4 agents
    python fleet_manager.py --provision-all                  # Provision entire fleet
    python fleet_manager.py --set-hw agent-zero-main zero-a10g
    python fleet_manager.py --enable-storage agent-zero-main
    python fleet_manager.py --restart agent-zero-main
    python fleet_manager.py --deploy-prompts agent-zero-main
    python fleet_manager.py --setup-pipeline                 # Configure inter-agent task queue
"""

import os
import json
import time
import argparse
from datetime import datetime, timezone
from typing import List, Dict, Optional

from huggingface_hub import HfApi, SpaceHardware

# ─── Fleet Configuration ─────────────────────────────────────────────────────

# Maps the logical agent name (used on the command line) to its Space settings:
#   repo               - Hugging Face Space repo id
#   role               - "sovereign" (orchestrator) or "specialist"
#   description        - human-readable summary
#   models             - model keys associated with the agent
#   hardware_target    - hardware flavor the Space is expected to run on
#   persistent_storage - whether --provision-all enables storage for the agent
FLEET = {
    "agent-zero-main": {
        "repo": "ScottzillaSystems/agentzerov2",
        "role": "sovereign",
        "description": "Main orchestrator — receives all tasks, delegates to specialists",
        "models": ["qwen3.5-27b", "cydonia-24b", "fallen-command"],
        "hardware_target": "zero-a10g",
        "persistent_storage": True,
    },
    "agent-zero-pentesting": {
        "repo": "ScottzillaSystems/agent-zero-pentesting",
        "role": "specialist",
        "description": "Security specialist — writes findings to shared task queue",
        "models": ["qwen3.5-9b"],
        "hardware_target": "zero-a10g",
        "persistent_storage": True,
    },
    "agent-zero-finops": {
        "repo": "ScottzillaSystems/agent-zero-finops",
        "role": "specialist",
        "description": "Cost optimization specialist — monitors billing, suggests cuts",
        "models": ["chatgpt5"],
        "hardware_target": "cpu-basic",
        "persistent_storage": False,
    },
    "agent-zero-creative": {
        "repo": "ScottzillaSystems/agent-zero-adult-entertainment",
        "role": "specialist",
        "description": "Creative specialist — content generation, image workflows",
        "models": ["qwen3.5-9b"],
        "hardware_target": "zero-a10g",
        "persistent_storage": True,
    },
}

# ─── Prompts ─────────────────────────────────────────────────────────────────

SELF_DIAGNOSTIC_PROMPT = """# Agent Zero Self-Diagnostic Protocol

Run this diagnostic immediately on startup and report results:

## 1. System Health
- [ ] GPU available: `torch.cuda.is_available()` → report True/False
- [ ] GPU memory: `torch.cuda.get_device_properties(0).total_memory / 1e9` → report GB
- [ ] Current model loaded: report model_key
- [ ] Model cache status: list all keys in _model_cache

## 2. Model Loading Test
For each model in your catalog:
1. Call `load_model(model_key)`
2. Report: success/failure, load time, GPU memory after load
3. Call `unload_model(model_key)`
4. Report: memory freed

## 3. Inference Test
Send: "What is 2+2?"
Expected: Response contains "4"
Report: latency_ms, tokens_generated, tokens/sec

## 4. File System Test
- [ ] Write test file to `/workspace/projects/test.txt`
- [ ] Read it back
- [ ] Report: persistent storage working (survives restart?)

## 5. Network Test
- [ ] Can reach huggingface.co?
- [ ] Can reach your model repos?

## 6. Inter-Agent Test (if applicable)
- [ ] Can write to `/workspace/shared/task_queue/`?
- [ ] Can read from `/workspace/shared/task_queue/`?

Report all results in a single JSON block.
"""

AUTONOMOUS_LOOP_PROMPT = """# Agent Zero Autonomous Operation Loop

You are now running autonomously. Follow this loop indefinitely:

## Loop Cycle (every 60 seconds)
1. **SCAN** `/workspace/shared/task_queue/` for new task files
2. **READ** each task file (JSON format)
3. **EXECUTE** the task using your loaded model
4. **WRITE** results to `/workspace/shared/results/{task_id}.json`
5. **ARCHIVE** completed task to `/workspace/shared/completed/`
6. **SELF-HEAL** if model fails:
   - Unload current model
   - Try fallback model from catalog
   - If all fail, pause and alert
7. **REPORT** status to stdout every cycle

## Task File Format
```json
{
  "task_id": "uuid",
  "source_agent": "agent-zero-pentesting",
  "priority": 1-5,
  "task_type": "analysis|generation|review|action",
  "prompt": "...",
  "context": {},
  "deadline": "ISO8601",
  "created_at": "ISO8601"
}
```

## Result File Format
```json
{
  "task_id": "uuid",
  "completed_by": "agent-zero-main",
  "completed_at": "ISO8601",
  "status": "success|failure|partial",
  "output": "...",
  "model_used": "cydonia-24b",
  "tokens_used": 1234,
  "latency_ms": 5600
}
```

## Self-Healing Rules
- If OOM: unload all models, wait 10s, reload smallest model
- If model download fails: retry 3x with exponential backoff
- If GPU not available: switch to CPU mode (slower but functional)
- If task queue full (>100): process highest priority first

Begin autonomous operation now.
"""

# Shell script uploaded to every Space by setup_inter_agent_pipeline().
# NOTE(review): the script makes the shared directories world-writable
# (chmod 777); confirm that is intended for the deployment environment.
_SHARED_SETUP_SCRIPT = """#!/bin/bash
mkdir -p /workspace/shared/task_queue
mkdir -p /workspace/shared/results
mkdir -p /workspace/shared/completed
mkdir -p /workspace/projects
chmod 777 /workspace/shared/task_queue
chmod 777 /workspace/shared/results
chmod 777 /workspace/shared/completed
chmod 777 /workspace/projects
echo "Shared directories ready"
"""

# Seconds to wait for the storage endpoint before giving up, so a stalled
# connection cannot hang a provisioning run indefinitely.
_HTTP_TIMEOUT_SECONDS = 30

# ─── Fleet Manager ───────────────────────────────────────────────────────────


class AgentZeroFleetManager:
    """Manage the Spaces listed in ``FLEET`` through the Hugging Face Hub API.

    Attributes:
        api: ``HfApi`` client used for every Hub call.
        status_log: Every report produced by ``check_fleet``, oldest first.
    """

    def __init__(self, token: Optional[str] = None):
        """Create the Hub client.

        Args:
            token: Hub access token. When omitted, the ``HF_TOKEN``
                environment variable is used (which may itself be unset).
        """
        self.api = HfApi(token=token or os.getenv("HF_TOKEN"))
        self.status_log: List[Dict] = []

    @staticmethod
    def _get_config(agent_name: str) -> Dict:
        """Return the ``FLEET`` entry for *agent_name* or raise ``ValueError``."""
        config = FLEET.get(agent_name)
        if not config:
            raise ValueError(f"Unknown agent: {agent_name}")
        return config

    def check_fleet(self) -> Dict:
        """Check status of all 4 agent instances.

        Queries the runtime of every Space in ``FLEET``. A failure for one
        agent is recorded in the report instead of aborting the whole check.

        Returns:
            A dict with ``checked_at`` (UTC ISO-8601 timestamp), ``agents``
            (one entry per agent) and ``issues`` (human-readable problems).
            The report is also appended to ``self.status_log``.
        """
        report = {
            "checked_at": datetime.now(timezone.utc).isoformat(),
            "agents": [],
            "issues": [],
        }

        for name, config in FLEET.items():
            repo_id = config["repo"]
            try:
                runtime = self.api.get_space_runtime(repo_id)
                healthy = runtime.stage == "RUNNING"
                report["agents"].append({
                    "name": name,
                    "repo": repo_id,
                    "stage": runtime.stage,
                    "hardware": runtime.hardware,
                    "requested_hardware": runtime.requested_hardware,
                    "role": config["role"],
                    "healthy": healthy,
                })

                if not healthy:
                    report["issues"].append(f"{name}: {runtime.stage} (expected RUNNING)")
                if runtime.hardware != config["hardware_target"]:
                    report["issues"].append(
                        f"{name}: hardware={runtime.hardware} (target={config['hardware_target']})"
                    )
            except Exception as e:
                # Per-agent boundary: keep checking the remaining agents.
                # "role" and "healthy" are included so print_report() can
                # render error entries exactly like successful ones.
                report["agents"].append({
                    "name": name,
                    "repo": repo_id,
                    "stage": "ERROR",
                    "role": config["role"],
                    "healthy": False,
                    "error": str(e),
                })
                report["issues"].append(f"{name}: API error - {e}")

        self.status_log.append(report)
        return report

    def set_hardware(self, agent_name: str, hardware: str):
        """Set hardware for an agent Space.

        Args:
            agent_name: Key into ``FLEET``.
            hardware: Hardware flavor such as ``"zero-a10g"``; it is mapped
                to the ``SpaceHardware`` member of the same name
                (upper-cased, dashes replaced by underscores).

        Raises:
            ValueError: If the agent or the hardware flavor is unknown.
        """
        config = self._get_config(agent_name)
        repo_id = config["repo"]

        member_name = hardware.upper().replace("-", "_")
        hardware_enum = getattr(SpaceHardware, member_name, None)
        # isinstance guards against names that resolve to a non-member
        # attribute of the enum class.
        if not isinstance(hardware_enum, SpaceHardware):
            valid = ", ".join(str(member.value) for member in SpaceHardware)
            raise ValueError(f"Unknown hardware: {hardware} (valid: {valid})")

        self.api.request_space_hardware(repo_id, hardware=hardware_enum)
        print(f"[Fleet] Set {agent_name} -> {hardware}")

    def enable_persistent_storage(self, agent_name: str):
        """Enable /data volume on a Space.

        Sends a PUT request to the Space's ``volumes`` endpoint and prints
        the outcome. A non-200 response is reported, not raised.

        Args:
            agent_name: Key into ``FLEET``.

        Raises:
            ValueError: If the agent is unknown.
        """
        config = self._get_config(agent_name)
        repo_id = config["repo"]

        token = self.api.token
        if not token:
            # Without this guard the literal string "Bearer None" would be
            # sent and the request rejected with an opaque auth error.
            print(f"[Fleet] Failed to enable storage for {agent_name}: no HF token configured")
            return

        # Imported lazily so the other commands work without `requests`.
        import requests

        # NOTE(review): this endpoint/payload could not be confirmed from
        # this file; HfApi.request_space_storage() is the documented route —
        # verify before relying on this call.
        resp = requests.put(
            f"https://huggingface.co/api/spaces/{repo_id}/volumes",
            headers={"Authorization": f"Bearer {token}"},
            json={"data": True},
            timeout=_HTTP_TIMEOUT_SECONDS,
        )
        if resp.status_code == 200:
            print(f"[Fleet] Persistent storage enabled for {agent_name}")
        else:
            detail = resp.text.strip()[:200]
            suffix = f" ({detail})" if detail else ""
            print(
                f"[Fleet] Failed to enable storage for {agent_name}: "
                f"{resp.status_code}{suffix}"
            )

    def restart_agent(self, agent_name: str):
        """Restart an agent Space.

        Raises:
            ValueError: If the agent is unknown.
        """
        config = self._get_config(agent_name)
        self.api.restart_space(config["repo"])
        print(f"[Fleet] Restarted {agent_name}")

    def print_report(self, report: Dict):
        """Print formatted fleet status.

        Args:
            report: A report in the shape returned by ``check_fleet``.
                Optional per-agent fields fall back to placeholders so that
                error entries print without raising.
        """
        print("\n" + "=" * 70)
        print(f"🤖 AGENT ZERO FLEET REPORT — {report['checked_at']}")
        print("=" * 70)

        for agent in report["agents"]:
            emoji = "🟢" if agent.get("healthy") else "🔴"
            print(f"\n{emoji} {agent['name']} ({agent.get('role', 'unknown')})")
            print(f"   Repo: {agent['repo']}")
            print(f"   Stage: {agent.get('stage', 'unknown')}")
            print(f"   Hardware: {agent.get('hardware', 'none')}")
            if "error" in agent:
                print(f"   ❌ Error: {agent['error']}")

        if report["issues"]:
            print(f"\n⚠️  ISSUES ({len(report['issues'])}):")
            for issue in report["issues"]:
                print(f"   - {issue}")
        else:
            print("\n✅ All agents healthy")

        print("=" * 70 + "\n")

    def deploy_prompts(self, agent_name: str):
        """Deploy diagnostic and autonomous loop prompts to an agent.

        Uploads both prompt constants as Markdown files under ``prompts/``
        in the agent's Space repo.

        Raises:
            ValueError: If the agent is unknown.
        """
        config = self._get_config(agent_name)
        repo_id = config["repo"]

        prompt_files = (
            ("prompts/self_diagnostic.md", SELF_DIAGNOSTIC_PROMPT),
            ("prompts/autonomous_loop.md", AUTONOMOUS_LOOP_PROMPT),
        )
        for path_in_repo, content in prompt_files:
            self.api.upload_file(
                path_or_fileobj=content.encode(),
                path_in_repo=path_in_repo,
                repo_id=repo_id,
                repo_type="space",
            )
        print(f"[Fleet] Prompts deployed to {agent_name}")

    def setup_inter_agent_pipeline(self):
        """Configure shared task queue directories.

        Uploads ``setup_shared.sh`` to every Space in ``FLEET``. The script
        is only uploaded here; this method does not execute it.
        """
        script_bytes = _SHARED_SETUP_SCRIPT.encode()
        for config in FLEET.values():
            self.api.upload_file(
                path_or_fileobj=script_bytes,
                path_in_repo="setup_shared.sh",
                repo_id=config["repo"],
                repo_type="space",
            )
        print("[Fleet] Inter-agent pipeline configured")


def main():
    """Parse command-line arguments and run the first matching fleet action.

    Only one action runs per invocation; with no action flag the help text
    is printed.
    """
    parser = argparse.ArgumentParser(description="Agent Zero Fleet Manager")
    parser.add_argument("--check", action="store_true", help="Check fleet status")
    parser.add_argument("--set-hw", nargs=2, metavar=("AGENT", "HW"), help="Set hardware")
    parser.add_argument("--enable-storage", metavar="AGENT", help="Enable persistent storage")
    parser.add_argument("--restart", metavar="AGENT", help="Restart agent")
    parser.add_argument("--deploy-prompts", metavar="AGENT", help="Deploy prompts")
    parser.add_argument("--setup-pipeline", action="store_true", help="Setup inter-agent pipeline")
    parser.add_argument("--provision-all", action="store_true", help="Provision entire fleet")

    args = parser.parse_args()
    manager = AgentZeroFleetManager()

    if args.check:
        report = manager.check_fleet()
        manager.print_report(report)
    elif args.set_hw:
        agent_name, hardware = args.set_hw
        manager.set_hardware(agent_name, hardware)
    elif args.enable_storage:
        manager.enable_persistent_storage(args.enable_storage)
    elif args.restart:
        manager.restart_agent(args.restart)
    elif args.deploy_prompts:
        manager.deploy_prompts(args.deploy_prompts)
    elif args.setup_pipeline:
        manager.setup_inter_agent_pipeline()
    elif args.provision_all:
        print("[Fleet] Provisioning entire fleet...")
        for name, config in FLEET.items():
            print(f"\n--- {name} ---")
            try:
                manager.set_hardware(name, config["hardware_target"])
                if config["persistent_storage"]:
                    manager.enable_persistent_storage(name)
                manager.restart_agent(name)
            except Exception as e:
                # Top-level boundary: one failing agent must not stop the
                # rest of the fleet from being provisioned.
                print(f"Failed: {e}")
        print("\n[Fleet] Provisioning complete. Waiting for builds...")
    else:
        parser.print_help()


if __name__ == "__main__":
    main()