# AgentAZAll / llm_bridge.py
# (HF Spaces file-browser residue converted to comments: uploader "ghmk",
#  commit 773b021 "Rebuild: dual-agent live demo with filesystem browser")
"""LLM <-> AgentAZAll bridge for the HuggingFace Spaces demo.
Dual-model setup: Qwen2.5-3B-Instruct (Agent Alpha) and SmolLM2-1.7B-Instruct
(Agent Beta). Models are loaded at module level for ZeroGPU caching.
"""
import os
import re
import sys
from pathlib import Path
# Ensure src/ is on the import path
sys.path.insert(0, str(Path(__file__).parent / "src"))
from agentazall.config import INBOX, NOTES, REMEMBER, SENT
from agentazall.helpers import (
agent_base,
agent_day,
ensure_dirs,
sanitize,
today_str,
)
from agentazall.index import build_index, build_remember_index
from agentazall.messages import format_message, parse_headers_only, parse_message
from seed_data import make_demo_config, MAILBOXES
# Model IDs (HuggingFace hub identifiers for the two demo agents)
ALPHA_MODEL_ID = "Qwen/Qwen2.5-3B-Instruct"
BETA_MODEL_ID = "HuggingFaceTB/SmolLM2-1.7B-Instruct"
# Regex for tool calls: [TOOL: command | arg1 | arg2 | ...]
# group(1) = command word; group(2) = the pipe-separated args (may be absent).
TOOL_PATTERN = re.compile(r"\[TOOL:\s*(\w+)(?:\s*\|\s*(.*?))?\]")
# ---------------------------------------------------------------------------
# Tool implementations (direct filesystem, no subprocess)
# ---------------------------------------------------------------------------
def _tool_remember(cfg: dict, args: list[str]) -> str:
    """Store a persistent memory under today's remember/ directory.

    args: [text, optional title]. The title is sanitized and given a .txt
    suffix. Returns a confirmation string, or an error if no text was given.
    """
    if not args:
        return "Error: need text to remember."
    text = args[0].strip()
    title = sanitize(args[1].strip()) if len(args) > 1 and args[1].strip() else "memory"
    if not title.endswith(".txt"):
        title += ".txt"
    d = today_str()
    ensure_dirs(cfg, d)
    mem_dir = agent_day(cfg, d) / REMEMBER
    mem_dir.mkdir(parents=True, exist_ok=True)
    # Avoid overwriting: append an incrementing counter until a free name is
    # found. (The previous bounded 2..99 scan silently overwrote the original
    # file once all candidates were taken.)
    path = mem_dir / title
    stem = path.stem
    counter = 2
    while path.exists():
        path = mem_dir / f"{stem}-{counter}.txt"
        counter += 1
    path.write_text(text, encoding="utf-8")
    # Refresh the searchable memory index after every write.
    build_remember_index(cfg)
    return f"Memory stored: {path.stem}"
def _tool_recall(cfg: dict, args: list[str]) -> str:
    """Search/display agent memories (newest date first, capped at 20).

    args: [optional search query]; an empty query returns all memories.
    Matches case-insensitively against memory content and file stem.
    """
    query = args[0].strip().lower() if args and args[0].strip() else ""
    base = agent_base(cfg)
    results: list[str] = []
    # Walk all date directories looking for remember/ folders
    if base.exists():
        for date_dir in sorted(base.iterdir(), reverse=True):
            rem_dir = date_dir / REMEMBER
            if not rem_dir.is_dir():
                continue
            for f in sorted(rem_dir.iterdir()):
                if not f.is_file() or f.suffix != ".txt":
                    continue
                content = f.read_text(encoding="utf-8").strip()
                if not query or query in content.lower() or query in f.stem.lower():
                    results.append(f"[{date_dir.name}] {f.stem}: {content[:200]}")
                if len(results) >= 20:
                    break
            # Bug fix: the inner break alone let each further date dir add one
            # more result past the cap — stop the outer walk as well.
            if len(results) >= 20:
                break
    if not results:
        return "No memories found." + (f" (searched for: '{query}')" if query else "")
    return f"Found {len(results)} memories:\n" + "\n".join(results)
def _tool_whoami(cfg: dict, args: list[str]) -> str:
    """Return the agent's identity text for today, or a placeholder if unset."""
    identity_file = agent_day(cfg, today_str()) / "who_am_i" / "identity.txt"
    if not identity_file.exists():
        return "Identity not set."
    return identity_file.read_text(encoding="utf-8").strip()
def _tool_doing(cfg: dict, args: list[str]) -> str:
    """Read the current task list, or overwrite it when args[0] is non-empty."""
    day = today_str()
    ensure_dirs(cfg, day)
    tasks_file = agent_day(cfg, day) / "what_am_i_doing" / "tasks.txt"
    new_status = args[0].strip() if args else ""
    if new_status:
        # Write mode: replace today's task file with the new status.
        tasks_file.parent.mkdir(parents=True, exist_ok=True)
        tasks_file.write_text(new_status, encoding="utf-8")
        return f"Tasks updated: {new_status[:100]}"
    # Read mode.
    if tasks_file.exists():
        return tasks_file.read_text(encoding="utf-8").strip()
    return "No current tasks set."
def _tool_note(cfg: dict, args: list[str]) -> str:
    """Read a named note, or write it when a second non-empty argument is given."""
    if not args or not args[0].strip():
        return "Error: need note name."
    raw_name = args[0].strip()
    filename = sanitize(raw_name)
    if not filename.endswith(".txt"):
        filename = f"{filename}.txt"
    day = today_str()
    ensure_dirs(cfg, day)
    note_path = agent_day(cfg, day) / NOTES / filename
    body = args[1].strip() if len(args) > 1 else ""
    if body:
        # Write mode.
        note_path.parent.mkdir(parents=True, exist_ok=True)
        note_path.write_text(body, encoding="utf-8")
        return f"Note '{raw_name}' saved."
    # Read mode.
    if note_path.exists():
        return note_path.read_text(encoding="utf-8").strip()
    return f"Note '{raw_name}' not found."
def _tool_send(cfg: dict, args: list[str]) -> str:
    """Send a message to another agent.

    Writes the same message file into three places: the sender's outbox, the
    recipient's inbox (direct local delivery), and the sender's sent folder.
    """
    if len(args) < 3:
        return "Error: need [to | subject | body]."
    to_agent, subject, body = (a.strip() for a in args[:3])
    if not (to_agent and subject and body):
        return "Error: to, subject, and body are all required."
    content, msg_id = format_message(cfg["agent_name"], to_agent, subject, body)
    day = today_str()
    ensure_dirs(cfg, day)
    filename = f"{msg_id}.txt"
    # Queue in sender's outbox; deliver straight into the recipient's inbox
    # (local demo, no transport needed); keep a copy in sender's sent.
    destinations = (
        agent_day(cfg, day) / "outbox",
        Path(cfg["mailbox_dir"]) / to_agent / day / INBOX,
        agent_day(cfg, day) / SENT,
    )
    for target_dir in destinations:
        target_dir.mkdir(parents=True, exist_ok=True)
        (target_dir / filename).write_text(content, encoding="utf-8")
    return f"Message sent to {to_agent}: '{subject}' (ID: {msg_id})"
def _tool_inbox(cfg: dict, args: list[str]) -> str:
    """List today's inbox messages (newest first by filename)."""
    inbox_dir = agent_day(cfg, today_str()) / INBOX
    if not inbox_dir.exists():
        return "Inbox is empty."
    lines = []
    for msg_file in sorted(inbox_dir.iterdir(), reverse=True):
        if not msg_file.is_file() or msg_file.suffix != ".txt":
            continue
        headers = parse_headers_only(msg_file)
        if not headers:
            continue
        sender = headers.get("From", "?")
        subject = headers.get("Subject", "(no subject)")
        lines.append(f" [{msg_file.stem}] From: {sender} | Subject: {subject}")
    if not lines:
        return "Inbox is empty."
    return f"Inbox ({len(lines)} messages):\n" + "\n".join(lines)
def _tool_directory(cfg: dict, args: list[str]) -> str:
"""List all agents in the network."""
mb = Path(cfg["mailbox_dir"])
if not mb.exists():
return "No agents found."
agents = []
for agent_dir in sorted(mb.iterdir()):
if not agent_dir.is_dir() or agent_dir.name.startswith("."):
continue
name = agent_dir.name
# Find latest date dir with identity
identity = "?"
doing = "?"
for date_dir in sorted(agent_dir.iterdir(), reverse=True):
if not date_dir.is_dir() or not re.match(r"\d{4}-\d{2}-\d{2}$", date_dir.name):
continue
id_file = date_dir / "who_am_i" / "identity.txt"
if id_file.exists():
identity = id_file.read_text(encoding="utf-8").strip()[:120]
task_file = date_dir / "what_am_i_doing" / "tasks.txt"
if task_file.exists():
doing = task_file.read_text(encoding="utf-8").strip()[:120]
break
agents.append(f" {name}\n Identity: {identity}\n Doing: {doing}")
if not agents:
return "No agents found."
return f"Agent Directory ({len(agents)} agents):\n\n" + "\n\n".join(agents)
# Tool dispatch table: maps the lowercase command parsed from "[TOOL: cmd|...]"
# to its implementation. Every handler takes (cfg, args) and returns a string.
TOOL_MAP = {
    "remember": _tool_remember,
    "recall": _tool_recall,
    "whoami": _tool_whoami,
    "doing": _tool_doing,
    "note": _tool_note,
    "send": _tool_send,
    "inbox": _tool_inbox,
    "directory": _tool_directory,
}
# ---------------------------------------------------------------------------
# System prompt & context builder
# ---------------------------------------------------------------------------
# System-prompt skeleton; build_system_prompt() fills the {placeholders} with
# live filesystem state before each generation. The template text itself is
# runtime data sent to the model — do not edit it casually.
SYSTEM_PROMPT_TEMPLATE = """\
You are "{agent_name}", an AI assistant with persistent memory powered by AgentAZAll.
You can use tools by writing them on their own line in this exact format:
[TOOL: command | arg1 | arg2]
Available tools:
- [TOOL: remember | text to remember | optional-title] -- Store a persistent memory
- [TOOL: recall | optional search query] -- Search your memories (blank = show all)
- [TOOL: whoami] -- Check your identity
- [TOOL: doing | optional new status] -- Check or update your current task
- [TOOL: note | name | optional text to save] -- Read or write a named note
- [TOOL: send | recipient@localhost | subject | body] -- Send a message to another agent
- [TOOL: inbox] -- Check your inbox for messages
- [TOOL: directory] -- List all agents in the network
When you use a tool, write ONLY the [TOOL: ...] line. The system will execute it
and show you the result. Then continue your response naturally.
You ARE a persistent agent. Information you [TOOL: remember] will survive across
conversations. You can message other agents and they can message you back.
This is not a simulation -- these are real file operations on the filesystem.
YOUR CURRENT STATE:
Identity: {identity}
Current task: {doing}
YOUR MEMORIES:
{memories}
YOUR INBOX:
{inbox}
AGENTS IN NETWORK:
{directory}
Respond naturally and helpfully. Use tools when relevant -- remember observations,
check your inbox, send messages to collaborators. Keep responses concise.\
"""
def build_system_prompt(cfg: dict) -> str:
    """Assemble the system prompt with live context from the agent's state.

    Each placeholder is filled by invoking the corresponding read-only tool.
    """
    context = {
        "agent_name": cfg["agent_name"],
        "identity": _tool_whoami(cfg, []),
        "doing": _tool_doing(cfg, []),
        "memories": _tool_recall(cfg, []),
        "inbox": _tool_inbox(cfg, []),
        "directory": _tool_directory(cfg, []),
    }
    return SYSTEM_PROMPT_TEMPLATE.format(**context)
def parse_tool_calls(text: str) -> list[tuple[str, list[str]]]:
    """Extract [TOOL: cmd | arg1 | arg2] patterns from LLM output.

    Commands not present in TOOL_MAP are silently ignored; arguments are
    pipe-separated and whitespace-stripped.
    """
    calls: list[tuple[str, list[str]]] = []
    for m in TOOL_PATTERN.finditer(text):
        command = m.group(1).lower().strip()
        if command not in TOOL_MAP:
            continue
        raw = m.group(2) or ""
        arg_list = [piece.strip() for piece in raw.split("|")] if raw.strip() else []
        calls.append((command, arg_list))
    return calls
def execute_tools(tool_calls: list[tuple[str, list[str]]], cfg: dict) -> str:
    """Execute parsed tool calls and return their results as markdown text."""
    outputs = []
    for name, call_args in tool_calls:
        handler = TOOL_MAP.get(name)
        if handler is None:
            continue
        try:
            outcome = handler(cfg, call_args)
        except Exception as exc:
            # Surface tool failures to the model instead of crashing the demo.
            outcome = f"Error executing {name}: {exc}"
        outputs.append(f"**[{name}]** {outcome}")
    return "\n\n".join(outputs)
# ---------------------------------------------------------------------------
# Model loading — module-level for ZeroGPU caching
# ---------------------------------------------------------------------------
def _is_on_hf_spaces() -> bool:
"""Detect if running on Hugging Face Spaces."""
return "SPACE_ID" in os.environ
# Lazy model holders — loaded on first use, then cached for the process
# lifetime. Keys populated by _ensure_models(): "alpha", "beta" (models)
# and "alpha_tokenizer", "beta_tokenizer".
_models = {}
def _ensure_models():
    """Load both models into the module-level cache if not already loaded.

    Idempotent: returns immediately once "alpha" is cached. torch/transformers
    are imported lazily so the module can be imported without them. The two
    model loads were duplicated; they now share one `_load` helper.
    """
    if "alpha" in _models:
        return
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer

    device = "cuda" if torch.cuda.is_available() else "cpu"
    dtype = torch.bfloat16 if device == "cuda" else torch.float32

    def _load(key: str, model_id: str) -> None:
        # Cache tokenizer + model under `{key}_tokenizer` / `{key}`.
        _models[f"{key}_tokenizer"] = AutoTokenizer.from_pretrained(model_id)
        model = AutoModelForCausalLM.from_pretrained(
            model_id, torch_dtype=dtype,
            device_map="auto" if device == "cuda" else None,
        )
        if device != "cuda":
            # device_map is None on CPU, so move the model explicitly.
            model = model.to(device)
        _models[key] = model

    _load("alpha", ALPHA_MODEL_ID)  # Qwen2.5-3B-Instruct
    _load("beta", BETA_MODEL_ID)    # SmolLM2-1.7B-Instruct
# ---------------------------------------------------------------------------
# Unified generate function
# ---------------------------------------------------------------------------
def generate_response(agent_id: str, message: str, history: list, cfg: dict) -> str:
    """Generate a response using the appropriate model with AgentAZAll tools.

    agent_id: "alpha" or "beta"
    On HF Spaces this runs on ZeroGPU. Locally it runs on CPU.
    """
    import torch

    _ensure_models()
    model = _models[agent_id]
    tokenizer = _models[f"{agent_id}_tokenizer"]
    device = next(model.parameters()).device

    # Assemble the chat: live-state system prompt, prior turns, new message.
    chat = [{"role": "system", "content": build_system_prompt(cfg)}]
    for turn in history:
        if isinstance(turn, (list, tuple)) and len(turn) == 2:
            user_text, assistant_text = turn
            chat.append({"role": "user", "content": str(user_text)})
            chat.append({"role": "assistant", "content": str(assistant_text)})
        elif isinstance(turn, dict):
            chat.append(turn)
    chat.append({"role": "user", "content": message})

    # Render through the model's chat template, then generate.
    prompt = tokenizer.apply_chat_template(
        chat, tokenize=False, add_generation_prompt=True
    )
    encoded = tokenizer(prompt, return_tensors="pt").to(device)
    with torch.no_grad():
        generated = model.generate(
            **encoded,
            max_new_tokens=384,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            repetition_penalty=1.1,
        )
    # Decode only the newly generated tail, not the echoed prompt.
    prompt_len = encoded.input_ids.shape[1]
    response = tokenizer.decode(generated[0][prompt_len:], skip_special_tokens=True)

    # Run any [TOOL: ...] calls the model emitted and append their results.
    tool_calls = parse_tool_calls(response)
    if not tool_calls:
        return response
    tool_results = execute_tools(tool_calls, cfg)
    clean_response = TOOL_PATTERN.sub("", response).strip()
    if clean_response:
        return f"{clean_response}\n\n---\n*Tool results:*\n{tool_results}"
    return f"*Tool results:*\n{tool_results}"
# Apply @spaces.GPU decorator only on HF Spaces: wraps generate_response so
# ZeroGPU allocates a GPU for up to 120 s per call. Locally the `spaces`
# package may be absent, so the ImportError is deliberately ignored.
if _is_on_hf_spaces():
    try:
        import spaces
        generate_response = spaces.GPU(duration=120)(generate_response)
    except ImportError:
        pass