# agent-zero-sovereign / agent_zero_wrapper.py
# Author: ScottzillaSystems
# Last commit: [CHIMERA] Fix Gradio 6.0 compat (theme, every) and switch to working model (e83194b, verified)
#!/usr/bin/env python3
"""
Agent Zero Sovereign - Gradio Wrapper for HuggingFace Spaces
Autonomous orchestration agent with workspace isolation, self-healing model connections,
and persistent task queue integration.
"""
import os
import sys
import json
import time
import threading
import subprocess
import sqlite3
from pathlib import Path
from datetime import datetime
import gradio as gr
# ============================================================================
# Configuration & Environment
# ============================================================================
# HuggingFace API token; inference requests are unauthenticated (and will
# be rejected) without it.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
if not HF_TOKEN:
    print("WARNING: HF_TOKEN not set. Model connections will fail.")
# Isolated working directory for this agent's own files.
WORKSPACE_DIR = Path(os.environ.get("WORKSPACE_DIR", "/app/workspace/projects/sovereign"))
# Directory shared with the other agents in the system.
SHARED_DIR = Path(os.environ.get("SHARED_DIR", "/app/workspace/shared"))
# File-based inter-agent task queue lives inside the shared directory.
TASK_QUEUE_DIR = SHARED_DIR / "task_queue"
# LOG_FILE names a file; only its parent directory is needed for mkdir below.
LOG_DIR = Path(os.environ.get("LOG_FILE", "/app/workspace/logs/sovereign.log")).parent
# SQLite database backing AgentMemory.
MEMORY_DB = Path(os.environ.get("MEMORY_DB_PATH", "/app/workspace/memory/sovereign_memory.db"))
# Ensure directories exist
for d in [WORKSPACE_DIR, SHARED_DIR, TASK_QUEUE_DIR, LOG_DIR, MEMORY_DB.parent]:
    d.mkdir(parents=True, exist_ok=True)
# Agent identity
AGENT_NAME = os.environ.get("AGENT_NAME", "Sovereign")
AGENT_ROLE = os.environ.get("AGENT_ROLE", "sovereign_orchestrator")
MODEL_NAME = os.environ.get("MODEL_NAME", "ScottzillaSystems/Gemma-4-31B-it-abliterated")
# ============================================================================
# Memory System (SQLite)
# ============================================================================
class AgentMemory:
    """Persistent conversation memory with text search, backed by SQLite.

    Messages are grouped into threads; a separate ``tasks`` table exists to
    mirror the file-based task queue.  Every method opens its own short-lived
    connection (and now closes it: sqlite3's connection context manager only
    manages the transaction, it does NOT close the connection), so instances
    can safely be shared across threads.
    """

    def __init__(self, db_path: str):
        # Filesystem path to the SQLite database; parent dir must exist.
        self.db_path = db_path
        self._init_db()

    def _connect(self) -> sqlite3.Connection:
        """Open a fresh connection to the backing database."""
        return sqlite3.connect(self.db_path)

    def _init_db(self):
        """Create tables and indexes if they do not exist yet (idempotent)."""
        conn = self._connect()
        try:
            with conn:  # transaction scope: commits on success, rolls back on error
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS conversations (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        thread_id TEXT NOT NULL,
                        role TEXT NOT NULL,
                        content TEXT NOT NULL,
                        metadata TEXT DEFAULT '{}',
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS tasks (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        task_id TEXT UNIQUE NOT NULL,
                        status TEXT DEFAULT 'pending',
                        description TEXT,
                        assigned_to TEXT,
                        result TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        completed_at TIMESTAMP
                    )
                """)
                conn.execute("CREATE INDEX IF NOT EXISTS idx_thread ON conversations(thread_id)")
                conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)")
        finally:
            # BUG FIX: previously the connection was never closed, leaking a
            # file handle per call.
            conn.close()

    def save_message(self, role: str, content: str, thread_id: str = None, metadata: dict = None):
        """Append a message; auto-generate a thread id when none is given.

        Returns the thread id the message was stored under.
        """
        if thread_id is None:
            thread_id = f"autonomous-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        conn = self._connect()
        try:
            with conn:
                conn.execute(
                    "INSERT INTO conversations (thread_id, role, content, metadata) VALUES (?, ?, ?, ?)",
                    (thread_id, role, content, json.dumps(metadata or {})),
                )
        finally:
            conn.close()
        return thread_id

    def search(self, query: str, limit: int = 10):
        """Substring search over message content, newest first.

        NOTE(review): LIKE wildcards (% and _) inside *query* are not
        escaped, so they act as wildcards rather than literals.
        """
        conn = self._connect()
        try:
            # Tie-break on id so rows created in the same second come back
            # in a deterministic (reverse-insertion) order.
            return conn.execute(
                "SELECT thread_id, role, content, created_at FROM conversations "
                "WHERE content LIKE ? ORDER BY created_at DESC, id DESC LIMIT ?",
                (f"%{query}%", limit),
            ).fetchall()
        finally:
            conn.close()

    def get_thread(self, thread_id: str):
        """Return (role, content, created_at) rows for one thread, oldest first."""
        conn = self._connect()
        try:
            # id tie-break keeps same-second messages in insertion order.
            return conn.execute(
                "SELECT role, content, created_at FROM conversations "
                "WHERE thread_id = ? ORDER BY created_at, id",
                (thread_id,),
            ).fetchall()
        finally:
            conn.close()

    def list_threads(self):
        """Return (thread_id, first-message time, message count), newest thread first."""
        conn = self._connect()
        try:
            # DISTINCT dropped: GROUP BY thread_id already yields unique rows.
            return conn.execute(
                "SELECT thread_id, MIN(created_at) as started, COUNT(*) as messages "
                "FROM conversations GROUP BY thread_id ORDER BY started DESC"
            ).fetchall()
        finally:
            conn.close()
# ============================================================================
# Task Queue Integration
# ============================================================================
class TaskQueue:
    """File-based task queue for inter-agent communication.

    One JSON file per task, named ``<task_id>.json``, stored in a directory
    shared between agents.
    """

    def __init__(self, queue_dir: Path):
        self.queue_dir = queue_dir
        self.queue_dir.mkdir(parents=True, exist_ok=True)

    def post_task(self, task_id: str, description: str, assigned_to: str = "pentesting"):
        """Post a new task for another agent to pick up.

        Overwrites any existing file with the same task_id.  Returns task_id.
        """
        task_file = self.queue_dir / f"{task_id}.json"
        task = {
            "task_id": task_id,
            "description": description,
            "assigned_to": assigned_to,
            "status": "pending",
            "created_at": datetime.now().isoformat(),
            "created_by": AGENT_NAME
        }
        task_file.write_text(json.dumps(task, indent=2))
        return task_id

    def check_results(self, task_id: str = None):
        """Check for completed task results.

        With *task_id*: return that completed task dict, or None if it is
        not (yet) completed.  Without: return a list of all completed tasks.
        """
        results = []
        for f in self.queue_dir.glob("*.json"):
            try:
                task = json.loads(f.read_text())
            except (json.JSONDecodeError, OSError):
                # BUG FIX: another agent may be mid-write (or the file may be
                # corrupt); previously this raised and killed the caller's loop.
                continue
            if task.get("status") != "completed":
                continue
            if task_id is not None:
                # .get avoids a KeyError on malformed tasks missing task_id.
                if task.get("task_id") == task_id:
                    return task
            else:
                results.append(task)
        return None if task_id is not None else results
# ============================================================================
# Model Connection (Self-Healing)
# ============================================================================
class ModelConnection:
    """Self-healing connection to the HuggingFace Inference API.

    Retries transient failures (model cold-starts, network errors, non-200
    responses) up to MAX_RETRIES times before giving up with an error string.
    """

    def __init__(self, model_name: str):
        self.model_name = model_name
        self.token = HF_TOKEN
        self.max_retries = int(os.environ.get("MAX_RETRIES", "3"))
        # requests is imported lazily and stashed on the instance, matching
        # the original design (module import stays dependency-free).
        import requests as r
        self.requests = r

    def query(self, prompt: str, max_tokens: int = 4096, temperature: float = 0.8) -> str:
        """Send *prompt* to the model; return generated text or an "[ERROR]" string."""
        endpoint = f"https://api-inference.huggingface.co/models/{self.model_name}"
        auth_headers = {"Authorization": f"Bearer {self.token}"}
        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": max_tokens,
                "temperature": temperature,
                "return_full_text": False
            }
        }
        # 1-based attempt counter so log messages read naturally.
        for attempt in range(1, self.max_retries + 1):
            try:
                resp = self.requests.post(endpoint, headers=auth_headers, json=payload, timeout=120)
                code = resp.status_code
                if code == 200:
                    body = resp.json()
                    # Text-generation endpoints return a list of candidates.
                    if isinstance(body, list) and len(body) > 0:
                        return body[0].get("generated_text", "")
                    return str(body)
                elif code == 503:
                    # Model still loading: back off progressively, then retry.
                    wait_time = attempt * 10
                    print(f"Model loading (attempt {attempt}/{self.max_retries}), waiting {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    print(f"API error {code}: {resp.text[:200]}")
                    time.sleep(5)
            except Exception as e:
                print(f"Connection error (attempt {attempt}): {e}")
                time.sleep(5)
        return f"[ERROR] Failed to connect to model after {self.max_retries} attempts."
# ============================================================================
# Autonomous Agent Loop
# ============================================================================
class AutonomousAgent:
    """Main agent loop that runs continuously, checking tasks and executing work.

    Each iteration: record completed queue tasks into memory, ask the model
    for a next action, and auto-delegate to specialist agents whose name
    appears in the response.
    """

    # Specialist agents this orchestrator can delegate to via the task queue.
    DELEGATE_AGENTS = ("pentesting", "code-architect", "research")

    def __init__(self):
        self.memory = AgentMemory(str(MEMORY_DB))
        self.task_queue = TaskQueue(TASK_QUEUE_DIR)
        self.model = ModelConnection(MODEL_NAME)
        self.running = False
        # One conversation thread per process lifetime.
        self.thread_id = f"autonomous-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

    def _compose_system_prompt(self) -> str:
        """Render the live system prompt (identity, workspace and queue state)."""
        return f"""You are {AGENT_NAME}, the {AGENT_ROLE} of a multi-agent AI system running on HuggingFace Spaces.
Your capabilities:
- You have access to a local workspace at {WORKSPACE_DIR}
- You can post tasks to specialized agents via the task queue at {TASK_QUEUE_DIR}
- All your conversations are stored in a SQLite memory database
- You operate autonomously in a continuous loop
Your agents:
- agent-zero-pentesting: Security testing, red team operations, vulnerability analysis
- agent-zero-code-architect: Software architecture, code generation, system design
- agent-zero-research: Literature review, data analysis, experiment design
Current workspace contents:
{self._list_workspace()}
Recent task queue:
{self._list_tasks()}
You are to:
1. Check for new tasks in the queue
2. Execute your own tasks
3. Delegate to other agents when appropriate
4. Document all decisions and results
5. Continuously improve your processes
You are operating 24/7 without human intervention. Be thorough, be creative, be relentless."""

    def _list_workspace(self) -> str:
        """Return up to 20 workspace paths as an indented bullet list."""
        try:
            files = list(WORKSPACE_DIR.rglob("*"))[:20]
            return "\n".join(f" - {f.relative_to(WORKSPACE_DIR)}" for f in files) if files else " (empty)"
        except (OSError, ValueError):
            # BUG FIX: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit. OSError covers filesystem errors;
            # ValueError covers relative_to failures.
            return " (unable to list)"

    def _list_tasks(self) -> str:
        """Return up to 10 queued tasks as a one-line-per-task summary."""
        tasks = list(self.task_queue.queue_dir.glob("*.json"))[:10]
        if not tasks:
            return " (no tasks)"
        lines = []
        for t in tasks:
            try:
                data = json.loads(t.read_text())
            except (json.JSONDecodeError, OSError):
                # BUG FIX: a half-written task file previously crashed
                # prompt composition; report it instead.
                lines.append(f" - [unreadable] {t.name}")
                continue
            lines.append(f" - [{data.get('status','?')}] {data.get('task_id','?')}: {data.get('description','?')[:80]}")
        return "\n".join(lines)

    def autonomous_loop(self):
        """Main autonomous loop - runs until self.running is cleared."""
        print(f"[{AGENT_NAME}] Starting autonomous loop...")
        self.running = True
        system_prompt = self._compose_system_prompt()
        iteration = 0
        while self.running:
            iteration += 1
            print(f"[{AGENT_NAME}] Loop iteration {iteration}")
            # Record any completed tasks into memory. check_results() with no
            # argument always returns a list.
            for task in self.task_queue.check_results() or []:
                self.memory.save_message(
                    "system",
                    f"Task completed: {task.get('task_id', '?')} - {task.get('result', 'No result')[:500]}",
                    self.thread_id
                )
            # Generate next action
            prompt = f"{system_prompt}\n\nIteration {iteration}. What should you do next? Be specific about actions, delegations, and expected outcomes."
            response = self.model.query(prompt)
            if response.startswith("[ERROR]"):
                print(f"[{AGENT_NAME}] Model connection failed. Retrying next cycle.")
            else:
                self.memory.save_message("assistant", response, self.thread_id)
                # Auto-delegate when the response mentions a specialist agent.
                lowered = response.lower()
                for agent in self.DELEGATE_AGENTS:
                    if agent in lowered:
                        # BUG FIX: the task id now includes the agent name.
                        # Previously all delegations in one iteration shared
                        # "auto-task-{iteration}", so later post_task calls
                        # overwrote earlier task files.
                        self.task_queue.post_task(
                            f"auto-task-{iteration}-{agent}",
                            f"Autonomous delegation from {AGENT_NAME}: {response[:200]}",
                            agent
                        )
            # Self-diagnostic every 10 iterations
            if iteration % 10 == 0:
                self._self_diagnostic()
            # Wait before next iteration (re-read so tests/ops can tune it).
            interval = int(os.environ.get("LOOP_INTERVAL_SECONDS", "60"))
            time.sleep(interval)

    def _self_diagnostic(self):
        """Run self-diagnostic checks; persist and return the report string."""
        checks = []
        # Check model connection
        test_response = self.model.query("Hello, respond with 'OK' if you can read this.", max_tokens=10)
        checks.append(("Model Connection", "OK" in test_response, test_response[:50]))
        # Check workspace
        checks.append(("Workspace", WORKSPACE_DIR.exists(), str(WORKSPACE_DIR)))
        # Check memory. BUG FIX: stat() was called unconditionally and raised
        # FileNotFoundError when the DB file was missing.
        db_detail = f"{MEMORY_DB.stat().st_size} bytes" if MEMORY_DB.exists() else "missing"
        checks.append(("Memory DB", MEMORY_DB.exists(), db_detail))
        # Check task queue
        checks.append(("Task Queue", TASK_QUEUE_DIR.exists(), f"{len(list(TASK_QUEUE_DIR.glob('*.json')))} tasks"))
        report = "\n".join(f"[{'PASS' if ok else 'FAIL'}] {name}: {detail}" for name, ok, detail in checks)
        self.memory.save_message("system", f"Self-diagnostic:\n{report}", self.thread_id)
        print(f"[{AGENT_NAME}] Self-diagnostic:\n{report}")
        return report
# ============================================================================
# Gradio Interface
# ============================================================================
def create_ui():
    """Build the Gradio Blocks app and start the background agent.

    Side effects: constructs an AutonomousAgent (which opens the memory DB)
    and starts its autonomous loop on a daemon thread before the UI exists.
    Returns the gr.Blocks instance (not yet launched).

    Fix: the emoji in the header and tab labels were mojibake-corrupted
    ("πŸ€–" etc.) and are restored to the intended characters.
    """
    agent = AutonomousAgent()
    # Start autonomous loop in background; daemon=True so the thread dies
    # with the process instead of blocking shutdown.
    loop_thread = threading.Thread(target=agent.autonomous_loop, daemon=True)
    loop_thread.start()
    with gr.Blocks(title=f"Agent Zero - {AGENT_NAME}") as demo:
        gr.Markdown(f"""# 🤖 Agent Zero: {AGENT_NAME}
**Role:** {AGENT_ROLE} | **Model:** {MODEL_NAME}
This agent operates **autonomously 24/7**. The chat interface allows you to observe its actions
and inject tasks. The autonomous loop runs every {os.environ.get('LOOP_INTERVAL_SECONDS', '60')} seconds.
""")
        with gr.Tabs():
            with gr.TabItem("💬 Chat"):
                chatbot = gr.Chatbot(height=500)
                msg = gr.Textbox(label="Inject task or message", placeholder="Type a task for the agent...")
                send = gr.Button("Send")
                def respond(message, history):
                    # Synchronous model call: may block while retrying.
                    response = agent.model.query(message)
                    # NOTE(review): no thread_id is passed, so user and
                    # assistant messages land in separate auto-generated
                    # threads -- confirm this is intended.
                    agent.memory.save_message("user", message)
                    agent.memory.save_message("assistant", response)
                    history = history or []
                    history.append((message, response))
                    return "", history
                send.click(respond, [msg, chatbot], [msg, chatbot])
            with gr.TabItem("📋 Task Queue"):
                task_list = gr.JSON(label="Current Tasks")
                refresh_btn = gr.Button("Refresh")
                def refresh_tasks():
                    # Re-reads every task file on each click.
                    tasks = []
                    for f in TASK_QUEUE_DIR.glob("*.json"):
                        tasks.append(json.loads(f.read_text()))
                    return tasks
                refresh_btn.click(refresh_tasks, None, task_list)
            with gr.TabItem("🧠 Memory Search"):
                search_query = gr.Textbox(label="Search conversations")
                search_results = gr.Dataframe(
                    headers=["Thread", "Role", "Content", "Time"],
                    label="Results"
                )
                search_btn = gr.Button("Search")
                def do_search(query):
                    # Content is truncated to 200 chars for display.
                    results = agent.memory.search(query, limit=20)
                    return [[r[0], r[1], r[2][:200], r[3]] for r in results]
                search_btn.click(do_search, search_query, search_results)
            with gr.TabItem("🔧 Diagnostics"):
                diag_output = gr.Textbox(label="System Status", lines=10)
                diag_btn = gr.Button("Run Diagnostic")
                def run_diag():
                    return agent._self_diagnostic()
                diag_btn.click(run_diag, None, diag_output)
        # Auto-refresh task queue on page load. `with` blocks do not create
        # a new scope, so refresh_tasks/task_list defined above are visible.
        demo.load(refresh_tasks, None, task_list)  # periodic refresh disabled for Gradio 6.0 compat
    return demo
# ============================================================================
# Main Entry Point
# ============================================================================
if __name__ == "__main__":
    # Startup banner. The box-drawing characters in the original were
    # mojibake-corrupted ("β•‘" etc.); rebuilt here from proper U+2550-range
    # characters with consistent field widths so the borders line up.
    border = "═" * 58
    print(f"""
╔{border}╗
║ Agent Zero:  {AGENT_NAME:<43} ║
║ Role:        {AGENT_ROLE:<43} ║
║ Model:       {MODEL_NAME:<43} ║
║ Workspace:   {str(WORKSPACE_DIR):<43} ║
╚{border}╝
""")
    demo = create_ui()  # also starts the autonomous loop on a daemon thread
    demo.queue(max_size=50)  # bound the number of pending UI requests
    # Bind on all interfaces at the port HuggingFace Spaces expects.
    demo.launch(server_name="0.0.0.0", server_port=7860)