Spaces:
Running on Zero
| """LLM <-> AgentAZAll bridge for the HuggingFace Spaces demo. | |
| Dual-model setup: Qwen2.5-3B-Instruct (Agent Alpha) and SmolLM2-1.7B-Instruct | |
| (Agent Beta). Models are loaded at module level for ZeroGPU caching. | |
| """ | |
| import os | |
| import re | |
| import sys | |
| from pathlib import Path | |
| # Ensure src/ is on the import path | |
| sys.path.insert(0, str(Path(__file__).parent / "src")) | |
| from agentazall.config import INBOX, NOTES, REMEMBER, SENT | |
| from agentazall.helpers import ( | |
| agent_base, | |
| agent_day, | |
| ensure_dirs, | |
| sanitize, | |
| today_str, | |
| ) | |
| from agentazall.index import build_index, build_remember_index | |
| from agentazall.messages import format_message, parse_headers_only, parse_message | |
| from seed_data import make_demo_config, MAILBOXES | |
# Model IDs for the two demo agents (loaded lazily in _ensure_models).
ALPHA_MODEL_ID = "Qwen/Qwen2.5-3B-Instruct"  # Agent Alpha backbone
BETA_MODEL_ID = "HuggingFaceTB/SmolLM2-1.7B-Instruct"  # Agent Beta backbone
# Regex for tool calls: [TOOL: command | arg1 | arg2 | ...]
# group(1) = command word; group(2) = raw pipe-separated args (may be None).
TOOL_PATTERN = re.compile(r"\[TOOL:\s*(\w+)(?:\s*\|\s*(.*?))?\]")
| # --------------------------------------------------------------------------- | |
| # Tool implementations (direct filesystem, no subprocess) | |
| # --------------------------------------------------------------------------- | |
def _tool_remember(cfg: dict, args: list[str]) -> str:
    """Store a persistent memory under today's remember/ directory.

    args[0] is the text to store; optional args[1] is a title that is
    sanitized into a filename. Returns a short status string for the chat.
    """
    # Guard against both a missing arg and an empty/whitespace-only text.
    if not args or not args[0].strip():
        return "Error: need text to remember."
    text = args[0].strip()
    title = sanitize(args[1].strip()) if len(args) > 1 and args[1].strip() else "memory"
    # sanitize() may strip every character; fall back to a usable default.
    if not title:
        title = "memory"
    if not title.endswith(".txt"):
        title += ".txt"
    d = today_str()
    ensure_dirs(cfg, d)
    mem_dir = agent_day(cfg, d) / REMEMBER
    mem_dir.mkdir(parents=True, exist_ok=True)
    # Avoid overwriting an existing memory: append an increasing counter.
    # (The previous bounded 2..99 loop silently overwrote when exhausted.)
    path = mem_dir / title
    stem = path.stem
    counter = 2
    while path.exists():
        path = mem_dir / f"{stem}-{counter}.txt"
        counter += 1
    path.write_text(text, encoding="utf-8")
    build_remember_index(cfg)
    return f"Memory stored: {path.stem}"
def _tool_recall(cfg: dict, args: list[str]) -> str:
    """Search/display agent memories, newest date first, capped at 20.

    args[0] (optional) is a case-insensitive substring query matched against
    both memory content and filename stem; blank query returns everything.
    """
    query = args[0].strip().lower() if args and args[0].strip() else ""
    base = agent_base(cfg)
    results: list[str] = []
    limit = 20
    # Walk all date directories looking for remember/ folders.
    if base.exists():
        for date_dir in sorted(base.iterdir(), reverse=True):
            rem_dir = date_dir / REMEMBER
            if not rem_dir.is_dir():
                continue
            for f in sorted(rem_dir.iterdir()):
                if not f.is_file() or f.suffix != ".txt":
                    continue
                content = f.read_text(encoding="utf-8").strip()
                if not query or query in content.lower() or query in f.stem.lower():
                    results.append(f"[{date_dir.name}] {f.stem}: {content[:200]}")
                    if len(results) >= limit:
                        break
            if len(results) >= limit:
                # BUG FIX: previously only the inner loop stopped at the cap,
                # so each additional date dir could push the total past 20.
                break
    if not results:
        return "No memories found." + (f" (searched for: '{query}')" if query else "")
    return f"Found {len(results)} memories:\n" + "\n".join(results)
def _tool_whoami(cfg: dict, args: list[str]) -> str:
    """Return the agent's stored identity text, or a placeholder if unset."""
    identity_file = agent_day(cfg, today_str()) / "who_am_i" / "identity.txt"
    if not identity_file.exists():
        return "Identity not set."
    return identity_file.read_text(encoding="utf-8").strip()
def _tool_doing(cfg: dict, args: list[str]) -> str:
    """Read the current task list, or overwrite it when args[0] is non-empty."""
    day = today_str()
    ensure_dirs(cfg, day)
    tasks_file = agent_day(cfg, day) / "what_am_i_doing" / "tasks.txt"
    new_status = args[0].strip() if args else ""
    if new_status:
        # Write mode: replace the task file with the new status text.
        tasks_file.parent.mkdir(parents=True, exist_ok=True)
        tasks_file.write_text(new_status, encoding="utf-8")
        return f"Tasks updated: {new_status[:100]}"
    # Read mode.
    if tasks_file.exists():
        return tasks_file.read_text(encoding="utf-8").strip()
    return "No current tasks set."
def _tool_note(cfg: dict, args: list[str]) -> str:
    """Read a named note, or write it when a second argument is supplied."""
    if not args or not args[0].strip():
        return "Error: need note name."
    raw_name = args[0].strip()
    filename = sanitize(raw_name)
    if not filename.endswith(".txt"):
        filename += ".txt"
    day = today_str()
    ensure_dirs(cfg, day)
    note_path = agent_day(cfg, day) / NOTES / filename
    if len(args) > 1 and args[1].strip():
        # Write mode.
        note_path.parent.mkdir(parents=True, exist_ok=True)
        note_path.write_text(args[1].strip(), encoding="utf-8")
        return f"Note '{raw_name}' saved."
    # Read mode.
    if note_path.exists():
        return note_path.read_text(encoding="utf-8").strip()
    return f"Note '{raw_name}' not found."
def _tool_send(cfg: dict, args: list[str]) -> str:
    """Send a message to another agent.

    args: [recipient, subject, body...]. Because parse_tool_calls splits the
    raw argument string on every "|", a body that itself contains pipes
    arrives spread across args[2:]; rejoin those pieces into one body.
    """
    if len(args) < 3:
        return "Error: need [to | subject | body]."
    to_agent = args[0].strip()
    subject = args[1].strip()
    # BUG FIX: previously only args[2] was used, silently dropping any body
    # text after the first "|".
    body = " | ".join(a.strip() for a in args[2:]).strip()
    if not to_agent or not subject or not body:
        return "Error: to, subject, and body are all required."
    # The recipient name becomes a filesystem path component below; refuse
    # traversal attempts from untrusted LLM output.
    if "/" in to_agent or "\\" in to_agent or ".." in to_agent:
        return "Error: invalid recipient name."
    content, msg_id = format_message(cfg["agent_name"], to_agent, subject, body)
    d = today_str()
    ensure_dirs(cfg, d)
    # Queue in sender's outbox
    outbox = agent_day(cfg, d) / "outbox"
    outbox.mkdir(parents=True, exist_ok=True)
    (outbox / f"{msg_id}.txt").write_text(content, encoding="utf-8")
    # Direct delivery to recipient's inbox (local demo, no transport needed)
    recipient_inbox = Path(cfg["mailbox_dir"]) / to_agent / d / INBOX
    recipient_inbox.mkdir(parents=True, exist_ok=True)
    (recipient_inbox / f"{msg_id}.txt").write_text(content, encoding="utf-8")
    # Copy to sender's sent
    sent = agent_day(cfg, d) / SENT
    sent.mkdir(parents=True, exist_ok=True)
    (sent / f"{msg_id}.txt").write_text(content, encoding="utf-8")
    return f"Message sent to {to_agent}: '{subject}' (ID: {msg_id})"
def _tool_inbox(cfg: dict, args: list[str]) -> str:
    """List today's inbox messages (reverse filename order = newest first)."""
    inbox_dir = agent_day(cfg, today_str()) / INBOX
    if not inbox_dir.exists():
        return "Inbox is empty."
    lines = []
    for msg_file in sorted(inbox_dir.iterdir(), reverse=True):
        if not msg_file.is_file() or msg_file.suffix != ".txt":
            continue
        headers = parse_headers_only(msg_file)
        if not headers:
            continue  # unparseable file: skip silently
        sender = headers.get("From", "?")
        subject = headers.get("Subject", "(no subject)")
        lines.append(f" [{msg_file.stem}] From: {sender} | Subject: {subject}")
    if not lines:
        return "Inbox is empty."
    return f"Inbox ({len(lines)} messages):\n" + "\n".join(lines)
def _tool_directory(cfg: dict, args: list[str]) -> str:
    """List every agent under the mailbox root with its latest identity/task."""
    mailbox_root = Path(cfg["mailbox_dir"])
    if not mailbox_root.exists():
        return "No agents found."
    entries = []
    for agent_dir in sorted(mailbox_root.iterdir()):
        if not agent_dir.is_dir() or agent_dir.name.startswith("."):
            continue
        identity = "?"
        doing = "?"
        # Scan date dirs newest-first; the most recent one holding an
        # identity file supplies both identity and current task.
        for date_dir in sorted(agent_dir.iterdir(), reverse=True):
            if not date_dir.is_dir() or not re.match(r"\d{4}-\d{2}-\d{2}$", date_dir.name):
                continue
            id_file = date_dir / "who_am_i" / "identity.txt"
            if id_file.exists():
                identity = id_file.read_text(encoding="utf-8").strip()[:120]
                task_file = date_dir / "what_am_i_doing" / "tasks.txt"
                if task_file.exists():
                    doing = task_file.read_text(encoding="utf-8").strip()[:120]
                break
        entries.append(f" {agent_dir.name}\n Identity: {identity}\n Doing: {doing}")
    if not entries:
        return "No agents found."
    return f"Agent Directory ({len(entries)} agents):\n\n" + "\n\n".join(entries)
# Tool dispatch table: maps the command word captured from a [TOOL: ...]
# directive to its handler. Every handler has the signature
# (cfg: dict, args: list[str]) -> str.
TOOL_MAP = {
    "remember": _tool_remember,
    "recall": _tool_recall,
    "whoami": _tool_whoami,
    "doing": _tool_doing,
    "note": _tool_note,
    "send": _tool_send,
    "inbox": _tool_inbox,
    "directory": _tool_directory,
}
| # --------------------------------------------------------------------------- | |
| # System prompt & context builder | |
| # --------------------------------------------------------------------------- | |
| SYSTEM_PROMPT_TEMPLATE = """\ | |
| You are "{agent_name}", an AI assistant with persistent memory powered by AgentAZAll. | |
| You can use tools by writing them on their own line in this exact format: | |
| [TOOL: command | arg1 | arg2] | |
| Available tools: | |
| - [TOOL: remember | text to remember | optional-title] -- Store a persistent memory | |
| - [TOOL: recall | optional search query] -- Search your memories (blank = show all) | |
| - [TOOL: whoami] -- Check your identity | |
| - [TOOL: doing | optional new status] -- Check or update your current task | |
| - [TOOL: note | name | optional text to save] -- Read or write a named note | |
| - [TOOL: send | recipient@localhost | subject | body] -- Send a message to another agent | |
| - [TOOL: inbox] -- Check your inbox for messages | |
| - [TOOL: directory] -- List all agents in the network | |
| When you use a tool, write ONLY the [TOOL: ...] line. The system will execute it | |
| and show you the result. Then continue your response naturally. | |
| You ARE a persistent agent. Information you [TOOL: remember] will survive across | |
| conversations. You can message other agents and they can message you back. | |
| This is not a simulation -- these are real file operations on the filesystem. | |
| YOUR CURRENT STATE: | |
| Identity: {identity} | |
| Current task: {doing} | |
| YOUR MEMORIES: | |
| {memories} | |
| YOUR INBOX: | |
| {inbox} | |
| AGENTS IN NETWORK: | |
| {directory} | |
| Respond naturally and helpfully. Use tools when relevant -- remember observations, | |
| check your inbox, send messages to collaborators. Keep responses concise.\ | |
| """ | |
def build_system_prompt(cfg: dict) -> str:
    """Assemble the system prompt with live context from the agent's state."""
    # Each helper is called with no arguments, i.e. in read-only mode.
    context = {
        "agent_name": cfg["agent_name"],
        "identity": _tool_whoami(cfg, []),
        "doing": _tool_doing(cfg, []),
        "memories": _tool_recall(cfg, []),
        "inbox": _tool_inbox(cfg, []),
        "directory": _tool_directory(cfg, []),
    }
    return SYSTEM_PROMPT_TEMPLATE.format(**context)
def parse_tool_calls(text: str) -> list[tuple[str, list[str]]]:
    """Extract [TOOL: cmd | arg1 | arg2] patterns from LLM output."""
    calls: list[tuple[str, list[str]]] = []
    for match in TOOL_PATTERN.finditer(text):
        command = match.group(1).lower().strip()
        if command not in TOOL_MAP:
            continue  # unknown command word: silently ignore
        raw = match.group(2) or ""
        arguments = [piece.strip() for piece in raw.split("|")] if raw.strip() else []
        calls.append((command, arguments))
    return calls
def execute_tools(tool_calls: list[tuple[str, list[str]]], cfg: dict) -> str:
    """Execute parsed tool calls and return their results as markdown text."""
    rendered = []
    for command, arguments in tool_calls:
        handler = TOOL_MAP.get(command)
        if not handler:
            continue
        try:
            outcome = handler(cfg, arguments)
        except Exception as exc:
            # Surface tool failures in the transcript instead of crashing.
            outcome = f"Error executing {command}: {exc}"
        rendered.append(f"**[{command}]** {outcome}")
    return "\n\n".join(rendered)
| # --------------------------------------------------------------------------- | |
| # Model loading — module-level for ZeroGPU caching | |
| # --------------------------------------------------------------------------- | |
| def _is_on_hf_spaces() -> bool: | |
| """Detect if running on Hugging Face Spaces.""" | |
| return "SPACE_ID" in os.environ | |
# Lazy model holders — loaded on first use, then cached for the process
# lifetime. Keys: "alpha"/"beta" (models), "alpha_tokenizer"/"beta_tokenizer".
_models = {}
def _ensure_models():
    """Load both models if not already cached.

    Populates the module-level ``_models`` dict. Imports torch/transformers
    lazily so the module can be imported without them installed.
    """
    # "alpha" is written last for its pair, but both pairs are loaded in one
    # call, so its presence is used as the loaded-everything sentinel.
    if "alpha" in _models:
        return
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # bfloat16 on GPU to halve memory; float32 on CPU.
    dtype = torch.bfloat16 if device == "cuda" else torch.float32
    # Alpha: Qwen2.5-3B-Instruct
    _models["alpha_tokenizer"] = AutoTokenizer.from_pretrained(ALPHA_MODEL_ID)
    alpha_model = AutoModelForCausalLM.from_pretrained(
        ALPHA_MODEL_ID, torch_dtype=dtype,
        device_map="auto" if device == "cuda" else None,
    )
    if device != "cuda":
        # device_map was None on CPU, so place the model explicitly.
        alpha_model = alpha_model.to(device)
    _models["alpha"] = alpha_model
    # Beta: SmolLM2-1.7B-Instruct
    _models["beta_tokenizer"] = AutoTokenizer.from_pretrained(BETA_MODEL_ID)
    beta_model = AutoModelForCausalLM.from_pretrained(
        BETA_MODEL_ID, torch_dtype=dtype,
        device_map="auto" if device == "cuda" else None,
    )
    if device != "cuda":
        beta_model = beta_model.to(device)
    _models["beta"] = beta_model
| # --------------------------------------------------------------------------- | |
| # Unified generate function | |
| # --------------------------------------------------------------------------- | |
def generate_response(agent_id: str, message: str, history: list, cfg: dict) -> str:
    """Generate a response using the appropriate model with AgentAZAll tools.

    agent_id: "alpha" or "beta"
    On HF Spaces this runs on ZeroGPU. Locally it runs on CPU.

    Any [TOOL: ...] directives in the model output are executed and their
    results appended to the returned text.
    """
    import torch
    _ensure_models()
    model = _models[agent_id]
    tokenizer = _models[f"{agent_id}_tokenizer"]
    # Take the device from the first parameter (works for device_map="auto").
    device = next(model.parameters()).device
    # Build messages with context
    system_prompt = build_system_prompt(cfg)
    messages = [{"role": "system", "content": system_prompt}]
    # Add conversation history
    for h in history:
        if isinstance(h, (list, tuple)) and len(h) == 2:
            # Pair-style history: (user_text, assistant_text).
            messages.append({"role": "user", "content": str(h[0])})
            messages.append({"role": "assistant", "content": str(h[1])})
        elif isinstance(h, dict):
            # Dict-style history: already {"role": ..., "content": ...}.
            messages.append(h)
    messages.append({"role": "user", "content": message})
    # Tokenize and generate
    input_text = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    inputs = tokenizer(input_text, return_tensors="pt").to(device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=384,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            repetition_penalty=1.1,
        )
    # Decode only the newly generated tokens, skipping the echoed prompt.
    response = tokenizer.decode(
        outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True
    )
    # Parse and execute tool calls
    tool_calls = parse_tool_calls(response)
    if tool_calls:
        tool_results = execute_tools(tool_calls, cfg)
        # Strip the raw [TOOL: ...] directives from the user-visible text.
        clean_response = TOOL_PATTERN.sub("", response).strip()
        if clean_response:
            return f"{clean_response}\n\n---\n*Tool results:*\n{tool_results}"
        return f"*Tool results:*\n{tool_results}"
    return response
# Apply @spaces.GPU decorator only on HF Spaces, where the `spaces` package
# is available; duration=120 requests up to 120 s of ZeroGPU time per call.
if _is_on_hf_spaces():
    try:
        import spaces
        generate_response = spaces.GPU(duration=120)(generate_response)
    except ImportError:
        # `spaces` missing even though SPACE_ID is set — run undecorated.
        pass