#!/usr/bin/env python3
"""
Agent Zero Sovereign - Gradio Wrapper for HuggingFace Spaces

Autonomous orchestration agent with workspace isolation, self-healing
model connections, and persistent task queue integration.
"""

import json
import os
import sqlite3
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

import gradio as gr

# ============================================================================
# Configuration & Environment
# ============================================================================

HF_TOKEN = os.environ.get("HF_TOKEN", "")
if not HF_TOKEN:
    print("WARNING: HF_TOKEN not set. Model connections will fail.")

WORKSPACE_DIR = Path(os.environ.get("WORKSPACE_DIR", "/app/workspace/projects/sovereign"))
SHARED_DIR = Path(os.environ.get("SHARED_DIR", "/app/workspace/shared"))
TASK_QUEUE_DIR = SHARED_DIR / "task_queue"
# LOG_FILE points at a file; we only need its directory to exist.
LOG_DIR = Path(os.environ.get("LOG_FILE", "/app/workspace/logs/sovereign.log")).parent
MEMORY_DB = Path(os.environ.get("MEMORY_DB_PATH", "/app/workspace/memory/sovereign_memory.db"))

# Ensure directories exist before any component touches them.
for d in [WORKSPACE_DIR, SHARED_DIR, TASK_QUEUE_DIR, LOG_DIR, MEMORY_DB.parent]:
    d.mkdir(parents=True, exist_ok=True)

# Agent identity
AGENT_NAME = os.environ.get("AGENT_NAME", "Sovereign")
AGENT_ROLE = os.environ.get("AGENT_ROLE", "sovereign_orchestrator")
MODEL_NAME = os.environ.get("MODEL_NAME", "ScottzillaSystems/Gemma-4-31B-it-abliterated")


# ============================================================================
# Memory System (SQLite)
# ============================================================================

class AgentMemory:
    """Persistent conversation memory with text + vector search capabilities."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self) -> None:
        """Create tables and indexes if they do not already exist (idempotent)."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS conversations (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    thread_id TEXT NOT NULL,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    metadata TEXT DEFAULT '{}',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS tasks (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_id TEXT UNIQUE NOT NULL,
                    status TEXT DEFAULT 'pending',
                    description TEXT,
                    assigned_to TEXT,
                    result TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    completed_at TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_thread ON conversations(thread_id);
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
            """)
            conn.commit()

    def save_message(self, role: str, content: str, thread_id: Optional[str] = None,
                     metadata: Optional[dict] = None) -> str:
        """Persist a single message; returns the thread id it was stored under.

        When ``thread_id`` is None a fresh timestamped thread id is generated,
        so callers that want messages grouped must reuse the returned id.
        """
        if thread_id is None:
            thread_id = f"autonomous-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO conversations (thread_id, role, content, metadata) VALUES (?, ?, ?, ?)",
                (thread_id, role, content, json.dumps(metadata or {}))
            )
            conn.commit()
        return thread_id

    def search(self, query: str, limit: int = 10):
        """Substring search over message content, newest first.

        Returns rows of (thread_id, role, content, created_at).
        """
        with sqlite3.connect(self.db_path) as conn:
            results = conn.execute(
                "SELECT thread_id, role, content, created_at FROM conversations "
                "WHERE content LIKE ? ORDER BY created_at DESC LIMIT ?",
                (f"%{query}%", limit)
            ).fetchall()
        return results

    def get_thread(self, thread_id: str):
        """Return all (role, content, created_at) rows of a thread in order."""
        with sqlite3.connect(self.db_path) as conn:
            return conn.execute(
                "SELECT role, content, created_at FROM conversations "
                "WHERE thread_id = ? ORDER BY created_at",
                (thread_id,)
            ).fetchall()

    def list_threads(self):
        """Return (thread_id, started, message_count) per thread, newest first."""
        with sqlite3.connect(self.db_path) as conn:
            return conn.execute(
                "SELECT DISTINCT thread_id, MIN(created_at) as started, COUNT(*) as messages "
                "FROM conversations GROUP BY thread_id ORDER BY started DESC"
            ).fetchall()


# ============================================================================
# Task Queue Integration
# ============================================================================

class TaskQueue:
    """File-based task queue for inter-agent communication."""

    def __init__(self, queue_dir: Path):
        self.queue_dir = queue_dir
        self.queue_dir.mkdir(parents=True, exist_ok=True)

    def post_task(self, task_id: str, description: str, assigned_to: str = "pentesting") -> str:
        """Post a new task for another agent to pick up.

        The task file is named after ``task_id``, so callers must use a
        unique id per task or earlier task files will be overwritten.
        """
        task_file = self.queue_dir / f"{task_id}.json"
        task = {
            "task_id": task_id,
            "description": description,
            "assigned_to": assigned_to,
            "status": "pending",
            "created_at": datetime.now().isoformat(),
            "created_by": AGENT_NAME
        }
        task_file.write_text(json.dumps(task, indent=2))
        return task_id

    def check_results(self, task_id: Optional[str] = None):
        """Check for completed task results.

        Without ``task_id``: return a list of all completed task dicts.
        With ``task_id``: return that task dict if completed, else None.
        """
        results = []
        for f in self.queue_dir.glob("*.json"):
            try:
                task = json.loads(f.read_text())
            except (json.JSONDecodeError, OSError):
                # Another agent may be mid-write, or the file may be corrupt.
                # Skip it rather than crash the autonomous loop. (FIX: the
                # original let json.loads raise straight through.)
                continue
            if task.get("status") == "completed":
                results.append(task)
                if task_id and task.get("task_id") == task_id:
                    return task
        return results if not task_id else None


# ============================================================================
# Model Connection (Self-Healing)
# ============================================================================

class ModelConnection:
    """Self-healing model connection using HuggingFace Inference API."""

    def __init__(self, model_name: str):
        self.model_name = model_name
        self.token = HF_TOKEN
        self.max_retries = int(os.environ.get("MAX_RETRIES", "3"))
        # Lazy import keeps module import working even if requests is absent
        # until a query is actually attempted.
        import requests as r
        self.requests = r

    def query(self, prompt: str, max_tokens: int = 4096, temperature: float = 0.8) -> str:
        """Query the model with retry logic.

        Retries on 503 (model cold-start) with linear backoff, and on any
        transport error. Returns the generated text, or an "[ERROR] ..."
        string after ``max_retries`` failures — callers check the prefix.
        """
        api_url = f"https://api-inference.huggingface.co/models/{self.model_name}"
        headers = {"Authorization": f"Bearer {self.token}"}

        for attempt in range(self.max_retries):
            try:
                response = self.requests.post(
                    api_url,
                    headers=headers,
                    json={
                        "inputs": prompt,
                        "parameters": {
                            "max_new_tokens": max_tokens,
                            "temperature": temperature,
                            "return_full_text": False
                        }
                    },
                    timeout=120
                )
                if response.status_code == 200:
                    result = response.json()
                    if isinstance(result, list) and len(result) > 0:
                        return result[0].get("generated_text", "")
                    return str(result)
                elif response.status_code == 503:
                    # Model loading - wait and retry with growing backoff.
                    wait_time = (attempt + 1) * 10
                    print(f"Model loading (attempt {attempt+1}/{self.max_retries}), waiting {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    print(f"API error {response.status_code}: {response.text[:200]}")
                    time.sleep(5)
            except Exception as e:
                # Broad catch is deliberate: any transport failure should be
                # retried, not propagated into the autonomous loop.
                print(f"Connection error (attempt {attempt+1}): {e}")
                time.sleep(5)

        return f"[ERROR] Failed to connect to model after {self.max_retries} attempts."


# ============================================================================
# Autonomous Agent Loop
# ============================================================================

class AutonomousAgent:
    """Main agent loop that runs continuously, checking tasks and executing work."""

    def __init__(self):
        self.memory = AgentMemory(str(MEMORY_DB))
        self.task_queue = TaskQueue(TASK_QUEUE_DIR)
        self.model = ModelConnection(MODEL_NAME)
        self.running = False
        self.thread_id = f"autonomous-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        # Task ids already logged to memory, so a completed task that stays
        # in the queue is not re-recorded on every loop iteration.
        # (FIX: the original re-saved every completed task each cycle.)
        self._processed_tasks: set = set()

    def _compose_system_prompt(self) -> str:
        """Build the orchestrator system prompt, embedding live workspace/queue state."""
        return f"""You are {AGENT_NAME}, the {AGENT_ROLE} of a multi-agent AI system running on HuggingFace Spaces. 
Your capabilities: - You have access to a local workspace at {WORKSPACE_DIR} - You can post tasks to specialized agents via the task queue at {TASK_QUEUE_DIR} - All your conversations are stored in a SQLite memory database - You operate autonomously in a continuous loop Your agents: - agent-zero-pentesting: Security testing, red team operations, vulnerability analysis - agent-zero-code-architect: Software architecture, code generation, system design - agent-zero-research: Literature review, data analysis, experiment design Current workspace contents: {self._list_workspace()} Recent task queue: {self._list_tasks()} You are to: 1. Check for new tasks in the queue 2. Execute your own tasks 3. Delegate to other agents when appropriate 4. Document all decisions and results 5. Continuously improve your processes You are operating 24/7 without human intervention. Be thorough, be creative, be relentless."""

    def _list_workspace(self) -> str:
        """Return up to 20 workspace paths as an indented bullet list."""
        try:
            files = list(WORKSPACE_DIR.rglob("*"))[:20]
            return "\n".join(f" - {f.relative_to(WORKSPACE_DIR)}" for f in files) if files else " (empty)"
        except OSError:
            # FIX: was a bare `except:` which also swallowed KeyboardInterrupt.
            return " (unable to list)"

    def _list_tasks(self) -> str:
        """Summarize up to 10 queued task files as one line each."""
        tasks = list(self.task_queue.queue_dir.glob("*.json"))[:10]
        if not tasks:
            return " (no tasks)"
        lines = []
        for t in tasks:
            data = json.loads(t.read_text())
            lines.append(f" - [{data.get('status','?')}] {data.get('task_id','?')}: {data.get('description','?')[:80]}")
        return "\n".join(lines)

    def autonomous_loop(self):
        """Main autonomous loop - runs indefinitely until ``self.running`` is cleared."""
        print(f"[{AGENT_NAME}] Starting autonomous loop...")
        self.running = True
        system_prompt = self._compose_system_prompt()
        iteration = 0

        while self.running:
            iteration += 1
            print(f"[{AGENT_NAME}] Loop iteration {iteration}")

            # Record newly completed tasks in memory (once each).
            for task in self.task_queue.check_results():
                tid = task.get("task_id")
                if tid in self._processed_tasks:
                    continue
                self._processed_tasks.add(tid)
                self.memory.save_message(
                    "system",
                    f"Task completed: {task['task_id']} - {task.get('result', 'No result')[:500]}",
                    self.thread_id
                )

            # Generate next action
            prompt = f"{system_prompt}\n\nIteration {iteration}. What should you do next? Be specific about actions, delegations, and expected outcomes."
            response = self.model.query(prompt)

            if not response.startswith("[ERROR]"):
                self.memory.save_message("assistant", response, self.thread_id)

                # Auto-delegate if the response mentions a specialist agent.
                # FIX: the original used the same task id for every agent in
                # the same iteration, so posting to a second agent overwrote
                # the first agent's task file. The id now embeds the target.
                response_lower = response.lower()
                for agent_key in ("pentesting", "code-architect", "research"):
                    if agent_key in response_lower:
                        self.task_queue.post_task(
                            f"auto-task-{iteration}-{agent_key}",
                            f"Autonomous delegation from {AGENT_NAME}: {response[:200]}",
                            agent_key
                        )
            else:
                print(f"[{AGENT_NAME}] Model connection failed. Retrying next cycle.")

            # Self-diagnostic every 10 iterations
            if iteration % 10 == 0:
                self._self_diagnostic()

            # Wait before next iteration (env read each cycle so the
            # interval can be tuned without restart in some deployments).
            interval = int(os.environ.get("LOOP_INTERVAL_SECONDS", "60"))
            time.sleep(interval)

    def _self_diagnostic(self) -> str:
        """Run self-diagnostic checks; log the report to memory and return it."""
        checks = []

        # Check model connection
        test_response = self.model.query("Hello, respond with 'OK' if you can read this.", max_tokens=10)
        checks.append(("Model Connection", "OK" in test_response, test_response[:50]))

        # Check workspace
        checks.append(("Workspace", WORKSPACE_DIR.exists(), str(WORKSPACE_DIR)))

        # Check memory (guard stat(): DB may not exist if init failed)
        db_detail = f"{MEMORY_DB.stat().st_size} bytes" if MEMORY_DB.exists() else "missing"
        checks.append(("Memory DB", MEMORY_DB.exists(), db_detail))

        # Check task queue
        checks.append(("Task Queue", TASK_QUEUE_DIR.exists(),
                       f"{len(list(TASK_QUEUE_DIR.glob('*.json')))} tasks"))

        report = "\n".join(f"[{'PASS' if ok else 'FAIL'}] {name}: {detail}" for name, ok, detail in checks)
        self.memory.save_message("system", f"Self-diagnostic:\n{report}", self.thread_id)
        print(f"[{AGENT_NAME}] Self-diagnostic:\n{report}")
        return report


# ============================================================================
# Gradio Interface
# ============================================================================

def create_ui():
    """Create the Gradio chat interface and start the background agent loop."""
    agent = AutonomousAgent()

    # Start autonomous loop in background; daemon so it dies with the server.
    loop_thread = threading.Thread(target=agent.autonomous_loop, daemon=True)
    loop_thread.start()

    with gr.Blocks(title=f"Agent Zero - {AGENT_NAME}") as demo:
        gr.Markdown(f"""# 🤖 Agent Zero: {AGENT_NAME} **Role:** {AGENT_ROLE} | **Model:** {MODEL_NAME} This agent operates **autonomously 24/7**. The chat interface allows you to observe its actions and inject tasks. The autonomous loop runs every {os.environ.get('LOOP_INTERVAL_SECONDS', '60')} seconds. """)

        with gr.Tabs():
            with gr.TabItem("💬 Chat"):
                chatbot = gr.Chatbot(height=500)
                msg = gr.Textbox(label="Inject task or message", placeholder="Type a task for the agent...")
                send = gr.Button("Send")

                def respond(message, history):
                    response = agent.model.query(message)
                    # FIX: save both halves of the exchange under ONE thread.
                    # The original passed no thread_id for either message, so
                    # save_message generated a different auto thread for each.
                    tid = agent.memory.save_message("user", message)
                    agent.memory.save_message("assistant", response, tid)
                    history = history or []
                    history.append((message, response))
                    return "", history

                send.click(respond, [msg, chatbot], [msg, chatbot])

            with gr.TabItem("📋 Task Queue"):
                task_list = gr.JSON(label="Current Tasks")
                refresh_btn = gr.Button("Refresh")

                def refresh_tasks():
                    tasks = []
                    for f in TASK_QUEUE_DIR.glob("*.json"):
                        tasks.append(json.loads(f.read_text()))
                    return tasks

                refresh_btn.click(refresh_tasks, None, task_list)

            with gr.TabItem("🧠 Memory Search"):
                search_query = gr.Textbox(label="Search conversations")
                search_results = gr.Dataframe(
                    headers=["Thread", "Role", "Content", "Time"],
                    label="Results"
                )
                search_btn = gr.Button("Search")

                def do_search(query):
                    results = agent.memory.search(query, limit=20)
                    return [[r[0], r[1], r[2][:200], r[3]] for r in results]

                search_btn.click(do_search, search_query, search_results)

            with gr.TabItem("🔧 Diagnostics"):
                diag_output = gr.Textbox(label="System Status", lines=10)
                diag_btn = gr.Button("Run Diagnostic")

                def run_diag():
                    return agent._self_diagnostic()

                diag_btn.click(run_diag, None, diag_output)

        # Auto-refresh task queue
        demo.load(refresh_tasks, None, task_list)  # periodic refresh disabled for Gradio 6.0 compat

    return demo


# ============================================================================
# Main Entry Point
# ============================================================================

if __name__ == "__main__":
    print(f"""
╔══════════════════════════════════════════════════════════╗
║  Agent Zero: {AGENT_NAME:<30}              ║
║  Role: {AGENT_ROLE:<38}            ║
║  Model: {MODEL_NAME:<37}            ║
║  Workspace: {str(WORKSPACE_DIR):<33}            ║
╚══════════════════════════════════════════════════════════╝
""")
    demo = create_ui()
    demo.queue(max_size=50)
    demo.launch(server_name="0.0.0.0", server_port=7860)