Spaces:
Sleeping
Sleeping
| """ | |
| core_agent.py — Enhanced Computer Agent Brain | |
| ============================================= | |
| Hierarchical Planner + Verifier + Multi-Model Router + Long-Term Memory | |
| """ | |
| import os | |
| import json | |
| import time | |
| import uuid | |
| from datetime import datetime | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from dataclasses import dataclass, field | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| # Smolagents | |
| from smolagents import CodeAgent, tool | |
| from smolagents.agent_types import AgentImage | |
| from smolagents.memory import ActionStep, TaskStep | |
| from smolagents.models import ChatMessage, Model, HfApiModel | |
| from smolagents.monitoring import LogLevel | |
| # Local model fallback | |
| from huggingface_hub import InferenceClient | |
| # Try ChromaDB for memory | |
| try: | |
| import chromadb | |
| from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction | |
| HAS_CHROMA = True | |
| except ImportError: | |
| HAS_CHROMA = False | |
| # Try sentence-transformers for embeddings | |
| try: | |
| from sentence_transformers import SentenceTransformer | |
| HAS_ST = True | |
| except ImportError: | |
| HAS_ST = False | |
| # --------------------------------------------------------------------------- | |
| # Data models | |
| # --------------------------------------------------------------------------- | |
@dataclass
class Subtask:
    """One node of a plan: a single unit of work the agent should carry out.

    The class must be a dataclass: it is constructed with keyword arguments
    and ``depends_on`` relies on ``field(default_factory=list)`` so that each
    instance gets its own list.
    """

    id: str
    description: str
    status: str = "pending"  # pending | running | completed | failed
    strategy: str = "auto"  # browser | desktop | code | vision
    # Identifiers of subtasks that must finish before this one starts.
    depends_on: List[str] = field(default_factory=list)
    result: Any = None
    retries: int = 0
    max_retries: int = 2
@dataclass
class Plan:
    """A user goal together with the ordered subtasks derived from it.

    Declared as a dataclass so it can be built with ``Plan(goal=...,
    subtasks=...)`` and so ``created_at`` is stamped per instance.
    """

    goal: str
    subtasks: List["Subtask"]
    # Wall-clock creation time in seconds since the epoch (time.time()).
    created_at: float = field(default_factory=time.time)
@dataclass
class ModelCall:
    """Accounting record for a single model invocation.

    Declared as a dataclass so it can be built with keyword arguments and so
    ``timestamp`` is taken when the record is created.
    """

    model_id: str
    tokens_in: int = 0
    tokens_out: int = 0
    latency_ms: float = 0.0
    cost_usd: float = 0.0
    # Wall-clock creation time in seconds since the epoch (time.time()).
    timestamp: float = field(default_factory=time.time)
| # --------------------------------------------------------------------------- | |
| # Multi-Model Intelligence Router | |
| # --------------------------------------------------------------------------- | |
# Models available to the router, keyed by routing tier.  Each entry carries
# the model id, its modality ("vision" or "text"), per-1000-token prices for
# input and output, and the max_tokens value sent with each request.
MODEL_REGISTRY = {
    "fast_vision": dict(
        model_id="Qwen/Qwen2.5-VL-7B-Instruct",
        endpoint=None,  # Use HF Inference API
        type="vision",
        cost_per_1k_in=0.0001,
        cost_per_1k_out=0.0002,
        max_tokens=2048,
    ),
    "powerful_vision": dict(
        model_id="Qwen/Qwen2.5-VL-72B-Instruct",
        endpoint=None,
        type="vision",
        cost_per_1k_in=0.001,
        cost_per_1k_out=0.002,
        max_tokens=4096,
    ),
    "fast_text": dict(
        model_id="Qwen/Qwen2.5-32B-Instruct",
        endpoint=None,
        type="text",
        cost_per_1k_in=0.0002,
        cost_per_1k_out=0.0004,
        max_tokens=4096,
    ),
    "powerful_text": dict(
        model_id="Qwen/Qwen3-235B-A22B",
        endpoint=None,
        type="text",
        cost_per_1k_in=0.0015,
        cost_per_1k_out=0.003,
        max_tokens=8192,
    ),
}
class IntelligenceRouter(Model):
    """Routes tasks to the optimal model based on complexity, modality, and cost.

    Models are looked up in ``MODEL_REGISTRY``.  Every successful call is
    recorded in ``call_history`` with an estimated token count and cost, and
    the running total is kept in ``cost_so_far_usd``.
    """

    def __init__(
        self,
        hf_token: Optional[str] = None,
        default_vision: str = "powerful_vision",
        default_text: str = "fast_text",
        cost_budget_usd: float = 1.0,
    ):
        """Create a router.

        Args:
            hf_token: API token; when omitted, ``HF_TOKEN`` and then
                ``HUGGINGFACE_API_KEY`` are read from the environment.
            default_vision: Registry key used for complex vision requests
                and as the fallback for requests that carry images.
            default_text: Registry key used for ordinary text requests and
                as the fallback for requests without images.
            cost_budget_usd: Spending limit; once 90% of it is used the
                router only selects the "fast" models.
        """
        super().__init__()
        self.hf_token = hf_token or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_API_KEY")
        self.default_vision = default_vision
        self.default_text = default_text
        self.cost_budget_usd = cost_budget_usd
        self.cost_so_far_usd = 0.0
        self.call_history: List[ModelCall] = []
        # One InferenceClient per registry key, created on first use.
        self._clients: Dict[str, InferenceClient] = {}

    def _get_client(self, model_key: str) -> InferenceClient:
        """Return the cached client for *model_key*, creating it if needed."""
        if model_key not in self._clients:
            cfg = MODEL_REGISTRY[model_key]
            self._clients[model_key] = InferenceClient(
                model=cfg["model_id"],
                token=self.hf_token,
            )
        return self._clients[model_key]

    def select_model(
        self,
        task_type: str = "vision",
        complexity: str = "medium",
        has_images: bool = False,
    ) -> str:
        """Select the best model for a given task.

        Returns:
            A key of ``MODEL_REGISTRY``.
        """
        if self.cost_so_far_usd >= self.cost_budget_usd * 0.9:
            # Budget nearly exhausted — use cheapest
            return "fast_vision" if has_images else "fast_text"
        if has_images or task_type == "vision":
            if complexity in ("high", "complex", "spatial"):
                return self.default_vision
            return "fast_vision"
        if complexity in ("high", "complex", "reasoning"):
            return "powerful_text"
        return self.default_text

    def __call__(
        self,
        messages: List[Dict[str, Any]],
        stop_sequences: Optional[List[str]] = None,
        task_type: str = "vision",
        complexity: str = "medium",
        has_images: bool = False,
        **kwargs,
    ) -> ChatMessage:
        """Run a chat completion on the selected model, with one fallback.

        If the selected model fails and differs from the default model for
        the modality, the request is retried once on that default model.
        If the selected model already is the default, or the fallback fails
        too, the exception propagates.

        Args:
            messages: Chat messages passed to ``chat_completion``.
            stop_sequences: Optional stop strings.
            task_type: "vision" or "text"; used for model selection.
            complexity: Complexity label; used for model selection.
            has_images: Whether the request carries image content.
            **kwargs: Accepted for interface compatibility; not forwarded.

        Returns:
            The assistant reply as a ``ChatMessage``.
        """
        model_key = self.select_model(task_type, complexity, has_images)
        try:
            return self._invoke(model_key, messages, stop_sequences)
        except Exception as e:
            # Fallback to default vision/text
            fallback = self.default_vision if has_images else self.default_text
            if model_key == fallback:
                raise
            print(f"[{model_key}] failed: {e}. Falling back to {fallback}")
            # Call the fallback model directly.  Re-entering __call__ would
            # re-run select_model with the same inputs, pick the same failing
            # model again and recurse without end.
            return self._invoke(fallback, messages, stop_sequences)

    def _invoke(
        self,
        model_key: str,
        messages: List[Dict[str, Any]],
        stop_sequences: Optional[List[str]],
    ) -> ChatMessage:
        """Call one specific registry model and record its estimated cost."""
        cfg = MODEL_REGISTRY[model_key]
        client = self._get_client(model_key)
        start = time.time()
        # HF InferenceClient chat_completion
        response = client.chat_completion(
            messages=messages,
            max_tokens=cfg["max_tokens"],
            stop=stop_sequences,
        )
        latency = (time.time() - start) * 1000
        # Estimate cost (rough token counting)
        content = response.choices[0].message.content or ""
        tok_in = self._estimate_tokens(messages)
        tok_out = len(content.split()) * 1.3  # rough
        cost = (tok_in / 1000) * cfg["cost_per_1k_in"] + (tok_out / 1000) * cfg["cost_per_1k_out"]
        self.cost_so_far_usd += cost
        self.call_history.append(ModelCall(
            model_id=cfg["model_id"],
            tokens_in=int(tok_in),
            tokens_out=int(tok_out),
            latency_ms=latency,
            cost_usd=cost,
        ))
        return ChatMessage(role="assistant", content=content)

    def _estimate_tokens(self, messages: List[Dict[str, Any]]) -> int:
        """Estimate prompt tokens; only textual content is counted."""
        # Very rough estimate: 4 chars ~= 1 token
        total = 0
        for msg in messages:
            content = msg.get("content", "")
            if isinstance(content, str):
                total += len(content) // 4
            elif isinstance(content, list):
                for item in content:
                    if isinstance(item, dict) and "text" in item:
                        total += len(item["text"]) // 4
        return max(total, 1)

    def get_cost_report(self) -> Dict[str, Any]:
        """Return budget, spend, remaining budget, call count and per-model totals."""
        return {
            "budget_usd": self.cost_budget_usd,
            "spent_usd": round(self.cost_so_far_usd, 6),
            "remaining_usd": round(self.cost_budget_usd - self.cost_so_far_usd, 6),
            "calls": len(self.call_history),
            "by_model": self._aggregate_by_model(),
        }

    def _aggregate_by_model(self) -> Dict[str, Dict[str, float]]:
        """Sum calls, tokens and cost of ``call_history`` per model id."""
        agg = {}
        for c in self.call_history:
            agg.setdefault(c.model_id, {"calls": 0, "tokens_in": 0, "tokens_out": 0, "cost": 0.0})
            agg[c.model_id]["calls"] += 1
            agg[c.model_id]["tokens_in"] += c.tokens_in
            agg[c.model_id]["tokens_out"] += c.tokens_out
            agg[c.model_id]["cost"] += c.cost_usd
        return agg
| # --------------------------------------------------------------------------- | |
| # Hierarchical Planner | |
| # --------------------------------------------------------------------------- | |
# System prompt for the planner model, assembled line by line.  The trailing
# empty element makes the joined text end with a newline.
PLANNER_SYSTEM_PROMPT = "\n".join((
    "You are a Task Planner for a computer automation agent.",
    "Given a user's high-level goal, break it into a JSON list of subtasks.",
    "Each subtask must have:",
    "- description: concise action description",
    "- strategy: one of [browser, desktop, code, vision]",
    "- depends_on: list of subtask indices (0-based) that must finish before this one",
    "Rules:",
    '1. Use "browser" for web navigation, "desktop" for OS-level GUI actions,',
    '"code" for writing/running scripts, "vision" for visual reasoning.',
    "2. Keep subtasks atomic (1-3 actions each).",
    "3. Start with gathering info, then acting, then verifying.",
    "4. Output ONLY valid JSON. No markdown fences.",
    'Example input: "Find Hugging Face HQ in Paris using Google Maps"',
    "Example output:",
    "[",
    '{"description": "Open Google Maps in browser", "strategy": "browser", "depends_on": []},',
    '{"description": "Search for \'Hugging Face Paris\'", "strategy": "browser", "depends_on": [0]},',
    '{"description": "Extract the address from the result card", "strategy": "vision", "depends_on": [1]},',
    '{"description": "Verify the address contains \'Paris\'", "strategy": "code", "depends_on": [2]}',
    "]",
    "",
))
class HierarchicalPlanner:
    """Breaks a user goal into a DAG of subtasks using a cheap text model."""

    def __init__(self, router: "IntelligenceRouter"):
        """Store the router used to reach the planning model."""
        self.router = router

    @staticmethod
    def _strip_fences(raw: str) -> str:
        """Return *raw* without a surrounding Markdown code fence.

        Handles a fenced reply with or without the closing fence and with or
        without a leading ``json`` language tag.
        """
        text = raw.strip()
        if not text.startswith("```"):
            return text
        # Splitting "```json\n[...]\n```" yields ["", "json\n[...]\n", ""]:
        # the payload is element 1.  Taking the last element would return the
        # (empty) text after the closing fence.
        body = text.split("```", 2)[1]
        if body.startswith("json"):
            body = body[4:]
        return body.strip()

    @staticmethod
    def _parse_items(raw: str, goal: str) -> List[Dict[str, Any]]:
        """Turn the model reply into a list of subtask dicts.

        Falls back to a single subtask covering the whole *goal* when the
        reply is not valid JSON or is neither a list nor an object.
        """
        fallback = [{"description": goal, "strategy": "auto", "depends_on": []}]
        try:
            data = json.loads(HierarchicalPlanner._strip_fences(raw))
        except json.JSONDecodeError:
            return fallback
        if isinstance(data, dict):
            # Accept {"subtasks": [...]} as well as a lone subtask object.
            nested = data.get("subtasks")
            data = nested if isinstance(nested, list) else [data]
        if not isinstance(data, list):
            return fallback
        # Non-object entries (e.g. bare strings) become description-only items.
        return [
            item if isinstance(item, dict) else {"description": str(item)}
            for item in data
        ]

    def plan(self, goal: str, context: str = "") -> "Plan":
        """Ask the text model for a subtask list and wrap it in a ``Plan``.

        Args:
            goal: The user's high-level goal.
            context: Optional extra context included in the prompt.

        Returns:
            A ``Plan`` whose subtasks have ids ``st_000``, ``st_001``, ...
            in the order the model listed them.
        """
        messages = [
            {"role": "system", "content": PLANNER_SYSTEM_PROMPT},
            {"role": "user", "content": f"Goal: {goal}\nContext: {context}\n\nGenerate the subtask JSON list."},
        ]
        response = self.router(
            messages,
            task_type="text",
            complexity="medium",
            has_images=False,
        )
        items = self._parse_items(response.content or "", goal)
        subtasks = []
        for i, item in enumerate(items):
            # NOTE(review): depends_on is stored exactly as the model returned
            # it (the prompt asks for 0-based indices), not as "st_NNN" ids —
            # confirm what the executor expects.
            subtasks.append(Subtask(
                id=f"st_{i:03d}",
                description=item.get("description", str(item)),
                strategy=item.get("strategy", "auto"),
                depends_on=item.get("depends_on", []),
            ))
        return Plan(goal=goal, subtasks=subtasks)
| # --------------------------------------------------------------------------- | |
| # Verifier & Recovery | |
| # --------------------------------------------------------------------------- | |
# System prompt for the verifier model, assembled line by line.  The trailing
# empty element makes the joined text end with a newline.
VERIFIER_SYSTEM_PROMPT = "\n".join((
    "You are a Verifier agent. Given a subtask description, the agent's action trace, and a screenshot, determine if the subtask was completed successfully.",
    "Respond with ONLY a JSON object:",
    '{"success": true/false, "reason": "short explanation", "next_action": "continue|retry|alternative"}',
    "Rules:",
    "- success=true if the intended outcome is clearly visible in the screenshot or trace.",
    "- next_action=retry if the agent seems close but missed a click.",
    "- next_action=alternative if the approach is fundamentally wrong.",
    "",
))
class VerifierAgent:
    """Checks if a subtask succeeded and suggests recovery."""

    def __init__(self, router: "IntelligenceRouter"):
        """Store the router used to reach the verifier model."""
        self.router = router

    @staticmethod
    def _parse_verdict(raw: str) -> Dict[str, Any]:
        """Parse the model reply into a verdict dict.

        A surrounding Markdown fence is removed first.  If the reply is not
        valid JSON, or is valid JSON but not an object, the permissive
        default verdict (success, continue) is returned.
        """
        text = raw.strip()
        if text.startswith("```"):
            # The payload sits between the fences, i.e. element 1 of the
            # split; the last element is whatever follows the closing fence.
            text = text.split("```", 2)[1]
            if text.startswith("json"):
                text = text[4:]
            text = text.strip()
        try:
            verdict = json.loads(text)
        except json.JSONDecodeError:
            verdict = None
        if not isinstance(verdict, dict):
            return {"success": True, "reason": "Parsing failed, assuming success", "next_action": "continue"}
        return verdict

    def verify(
        self,
        subtask: "Subtask",
        action_trace: List[str],
        screenshot: Optional["Image.Image"] = None,
    ) -> Dict[str, Any]:
        """Ask the model whether *subtask* was completed.

        Args:
            subtask: The subtask being checked; only its description is sent.
            action_trace: Actions taken so far; the last 10 are sent.
            screenshot: Optional screenshot.  Its presence selects the vision
                route, but only a text placeholder is sent, not the image.

        Returns:
            The verdict dict produced by the model, or the permissive default
            when the reply cannot be parsed.
        """
        has_screenshot = screenshot is not None
        trace_text = "\n".join(action_trace[-10:])  # last 10 actions
        content = [
            {"type": "text", "text": f"Subtask: {subtask.description}\nAction trace:\n{trace_text}\n\nWas this completed successfully?"},
        ]
        if has_screenshot:
            # In a real implementation we'd base64 encode the image
            content.append({"type": "text", "text": "[Screenshot available — analyze it]"})
        messages = [
            {"role": "system", "content": VERIFIER_SYSTEM_PROMPT},
            {"role": "user", "content": content},
        ]
        response = self.router(
            messages,
            task_type="vision" if has_screenshot else "text",
            complexity="medium",
            has_images=has_screenshot,
        )
        return self._parse_verdict(response.content or "")
| # --------------------------------------------------------------------------- | |
| # Long-Term Memory (ChromaDB) | |
| # --------------------------------------------------------------------------- | |
class AgentMemory:
    """Stores and retrieves past task trajectories for few-shot prompting.

    When both ChromaDB and sentence-transformers are importable, entries are
    kept in a persistent Chroma collection; otherwise they are kept in the
    in-process list ``_memories``.
    """

    def __init__(self, persist_dir: str = "./memory_db"):
        """Create the storage directory and pick the available backend."""
        self.persist_dir = persist_dir
        os.makedirs(persist_dir, exist_ok=True)
        self.collection = None
        # Always defined so every method works regardless of the backend.
        self.embedder = None
        self._memories: List[Dict] = []
        if HAS_CHROMA and HAS_ST:
            self.client = chromadb.PersistentClient(path=persist_dir)
            self.ef = SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2")
            self.collection = self.client.get_or_create_collection(
                name="task_memory",
                embedding_function=self.ef,
            )
        elif HAS_ST:
            # Fallback: in-memory similarity with numpy
            self.embedder = SentenceTransformer("all-MiniLM-L6-v2")

    def embed(self, text: str) -> List[float]:
        """Return the embedding of *text*, or ``[]`` without sentence-transformers."""
        if not HAS_ST:
            return []
        if self.embedder is None:
            # In Chroma mode no standalone embedder is created up front;
            # load it on first use instead of failing on a missing attribute.
            self.embedder = SentenceTransformer("all-MiniLM-L6-v2")
        return self.embedder.encode(text).tolist()

    def add_task(
        self,
        task: str,
        strategy_summary: str,
        success: bool,
        final_answer: str = "",
        domain: str = "general",
    ):
        """Record the outcome of a finished task."""
        entry = {
            "task": task,
            "strategy_summary": strategy_summary,
            "success": success,
            "final_answer": final_answer,
            "domain": domain,
            "timestamp": time.time(),
        }
        if self.collection is not None:
            self.collection.add(
                documents=[task],
                metadatas=[entry],
                ids=[str(uuid.uuid4())],
            )
        else:
            self._memories.append(entry)

    def retrieve_similar(
        self,
        query: str,
        n_results: int = 3,
        filter_success: bool = True,
    ) -> List[Dict[str, Any]]:
        """Return up to *n_results* stored entries related to *query*.

        Args:
            query: Task text to match against.
            n_results: Maximum number of entries to return.
            filter_success: When true, only successful entries are returned.

        Returns:
            Entry dicts in the shape written by ``add_task``, best match first.
        """
        if self.collection is not None:
            where = {"success": True} if filter_success else None
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results,
                where=where,
            )
            metadatas = results.get("metadatas") or [[]]
            return list(metadatas[0])

        # Simple exact/contains match fallback
        query_lower = query.lower()
        scored = []
        for m in self._memories:
            # Exclude failures outright so this path agrees with the Chroma
            # "where" filter instead of merely ranking failures last.
            if filter_success and not m.get("success", False):
                continue
            score = 0
            if query_lower in m["task"].lower():
                score += 10
            domain = m.get("domain", "")
            # An empty domain is a substring of every query; don't reward it.
            if domain and domain in query_lower:
                score += 5
            scored.append((score, m))
        scored.sort(key=lambda x: x[0], reverse=True)
        return [x[1] for x in scored[:n_results]]

    def get_domain_tips(self, domain: str) -> List[str]:
        """Return up to five strategy summaries of successful tasks in *domain*."""
        if self.collection is not None:
            found = self.collection.get(
                where={"$and": [{"domain": {"$eq": domain}}, {"success": {"$eq": True}}]},
                limit=5,
            )
            metadatas = found.get("metadatas") or []
            return [m.get("strategy_summary", "") for m in metadatas][:5]
        tips = [
            m.get("strategy_summary", "")
            for m in self._memories
            if m.get("domain") == domain and m.get("success")
        ]
        return tips[:5]
| # --------------------------------------------------------------------------- | |
| # Set-of-Marks (SoM) Preprocessor | |
| # --------------------------------------------------------------------------- | |
class SoMPreprocessor:
    """Overlays numbered bounding boxes on UI elements for the agent to reference by ID."""

    def __init__(
        self,
        use_icon_detection: bool = False,
        grid_cols: int = 8,
        grid_rows: int = 6,
    ):
        """Create a preprocessor.

        Args:
            use_icon_detection: Stored on the instance; not used by the
                grid heuristic.
            grid_cols: Number of grid columns produced by ``detect_elements``.
            grid_rows: Number of grid rows produced by ``detect_elements``.
        """
        self.use_icon_detection = use_icon_detection
        self.grid_cols = grid_cols
        self.grid_rows = grid_rows
        self.element_registry: Dict[int, Tuple[int, int, int, int]] = {}
        self.next_id = 1
        self._font = None  # label font, loaded on first preprocess() call

    def detect_elements(self, image: "Image.Image") -> List[Tuple[int, int, int, int]]:
        """Lightweight heuristic element detection.

        In production, replace with OmniParser or seeclick model.

        Returns:
            ``(x1, y1, x2, y2)`` boxes of a coarse grid, row by row.  The
            last column and row are stretched to the image edge so that the
            remainder of the integer division is still covered by a cell.
        """
        # This is a placeholder — real implementation would use a vision model.
        # For now, divide screen into a coarse grid and let agent pick grid cells.
        w, h = image.size
        cols, rows = self.grid_cols, self.grid_rows
        cell_w, cell_h = w // cols, h // rows
        boxes = []
        for r in range(rows):
            y1 = r * cell_h
            y2 = h if r == rows - 1 else y1 + cell_h
            for c in range(cols):
                x1 = c * cell_w
                x2 = w if c == cols - 1 else x1 + cell_w
                boxes.append((x1, y1, x2, y2))
        return boxes

    def _load_font(self):
        """Return the label font, loading it once and caching it."""
        if self._font is None:
            try:
                self._font = ImageFont.truetype(
                    "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 14
                )
            except (OSError, ImportError):
                # Font file missing or FreeType unavailable: use PIL's default.
                self._font = ImageFont.load_default()
        return self._font

    def preprocess(self, image: "Image.Image") -> Tuple["Image.Image", Dict[int, Tuple[int, int, int, int]]]:
        """Return annotated image + element registry mapping ID -> bbox."""
        boxes = self.detect_elements(image)
        annotated = image.copy()
        draw = ImageDraw.Draw(annotated)
        font = self._load_font()
        registry = {}
        for i, (x1, y1, x2, y2) in enumerate(boxes, start=1):
            registry[i] = (x1, y1, x2, y2)
            # Draw bounding box
            draw.rectangle([x1, y1, x2, y2], outline="#00FF00", width=2)
            # Draw label background sized to the rendered label text
            label = str(i)
            bbox = draw.textbbox((0, 0), label, font=font)
            tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
            draw.rectangle([x1, y1, x1 + tw + 4, y1 + th + 4], fill="#00FF00")
            draw.text((x1 + 2, y1 + 2), label, fill="#000000", font=font)
        self.element_registry = registry
        self.next_id = len(registry) + 1
        return annotated, registry

    def get_center(self, element_id: int) -> Tuple[int, int]:
        """Return the centre pixel of a registered element.

        Raises:
            KeyError: If *element_id* is not in the current registry.
        """
        x1, y1, x2, y2 = self.element_registry[element_id]
        return (x1 + x2) // 2, (y1 + y2) // 2
| # --------------------------------------------------------------------------- | |
| # Session Recorder & Macro Saver | |
| # --------------------------------------------------------------------------- | |
@dataclass
class SessionFrame:
    """One recorded step of a session.

    Declared as a dataclass because frames are created with keyword
    arguments for every field.
    """

    step: int
    screenshot_path: Optional[str]  # None when no screenshot was saved
    action: str
    observation: str
    timestamp: float
class SessionRecorder:
    """Keeps a per-session log of steps for replay, GIF generation and macros.

    All artefacts are written below ``<output_dir>/<session_id>``.
    """

    def __init__(self, session_id: str, output_dir: str = "./sessions"):
        """Create the session directory and start the session clock."""
        self.session_id = session_id
        self.output_dir = os.path.join(output_dir, session_id)
        os.makedirs(self.output_dir, exist_ok=True)
        self.frames: List[SessionFrame] = []
        self.start_time = time.time()

    def log_step(
        self,
        step: int,
        screenshot: Optional[Image.Image],
        action: str,
        observation: str,
    ):
        """Store one step in memory and append it to ``session.jsonl``.

        When a screenshot is given it is saved as ``step_NNN.png`` in the
        session directory and its path is recorded with the step.
        """
        image_path = None
        if screenshot:
            image_path = os.path.join(self.output_dir, f"step_{step:03d}.png")
            screenshot.save(image_path)

        recorded = SessionFrame(
            step=step,
            screenshot_path=image_path,
            action=action,
            observation=observation,
            timestamp=time.time(),
        )
        self.frames.append(recorded)

        # Mirror the frame as one JSON line on disk.
        log_path = os.path.join(self.output_dir, "session.jsonl")
        with open(log_path, "a") as log_file:
            record = {
                "step": step,
                "action": action,
                "observation": observation,
                "timestamp": recorded.timestamp,
                "screenshot": image_path,
            }
            log_file.write(json.dumps(record) + "\n")

    def save_macro(self, name: str) -> str:
        """Write the recorded trajectory to ``macro_<name>.json``; return its path."""
        replay_steps = []
        for frame in self.frames:
            replay_steps.append({
                "action": frame.action,
                "observation": frame.observation,
                "timestamp": frame.timestamp,
            })
        macro = {
            "name": name,
            "session_id": self.session_id,
            "frames": replay_steps,
        }
        path = os.path.join(self.output_dir, f"macro_{name}.json")
        with open(path, "w") as out:
            json.dump(macro, out, indent=2)
        return path

    def generate_summary(self) -> Dict[str, Any]:
        """Return session id, elapsed seconds, step count and the action list."""
        elapsed = time.time() - self.start_time
        summary: Dict[str, Any] = {"session_id": self.session_id}
        summary["duration_sec"] = round(elapsed, 2)
        summary["steps"] = len(self.frames)
        summary["actions"] = [frame.action for frame in self.frames]
        return summary
| # --------------------------------------------------------------------------- | |
| # HITL (Human-in-the-Loop) Checkpoint | |
| # --------------------------------------------------------------------------- | |
class HITLCheckpoint:
    """Gate that flags actions needing a human decision before they run."""

    # An action is flagged when any of these appears as a substring of its
    # lower-cased text.
    SENSITIVE_KEYWORDS = [
        "password", "credit card", "ssn", "social security",
        "payment", "checkout", "buy", "purchase", "subscribe",
        "delete", "remove", "uninstall", "format",
        "send email", "send message", "post to", "tweet",
    ]

    def __init__(self, auto_approve: bool = False):
        """Create a checkpoint; ``auto_approve=True`` approves every action."""
        self.auto_approve = auto_approve
        self.pending_approvals: List[Dict[str, Any]] = []

    def check_action(self, action: str, context: str = "") -> Tuple[bool, Optional[str]]:
        """Return ``(approved, reason)``; *reason* is set only on rejection.

        Only *action* is inspected; *context* is accepted but not examined.
        """
        if not self.auto_approve:
            lowered = action.lower()
            hit = next(
                (word for word in self.SENSITIVE_KEYWORDS if word in lowered),
                None,
            )
            if hit is not None:
                return False, f"Sensitive action detected: '{hit}'. Requires human approval."
        return True, None

    def request_approval(self, action: str, screenshot_path: Optional[str] = None) -> Dict[str, Any]:
        """Queue a pending approval request and return it."""
        request: Dict[str, Any] = {}
        request["id"] = str(uuid.uuid4())
        request["action"] = action
        request["screenshot"] = screenshot_path
        request["status"] = "pending"
        request["requested_at"] = time.time()
        self.pending_approvals.append(request)
        return request
| # --------------------------------------------------------------------------- | |
| # Cost Tracker | |
| # --------------------------------------------------------------------------- | |
class CostTracker:
    """Collects model calls per task id and summarises their cost."""

    def __init__(self):
        """Start with no tasks recorded."""
        self.tasks: Dict[str, List[ModelCall]] = {}

    def start_task(self, task_id: str):
        """Begin (or reset) the call list for *task_id*."""
        self.tasks[task_id] = []

    def log_call(self, task_id: str, call: ModelCall):
        """Append *call* to *task_id*, creating the task entry if absent."""
        if task_id not in self.tasks:
            self.tasks[task_id] = []
        self.tasks[task_id].append(call)

    def get_task_report(self, task_id: str) -> Dict[str, Any]:
        """Return totals, average latency and a per-model breakdown.

        An unknown *task_id* yields a report with zero calls.
        """
        calls = self.tasks.get(task_id, [])
        n_calls = len(calls)
        cost_total = sum([c.cost_usd for c in calls])
        token_total = sum([c.tokens_in + c.tokens_out for c in calls])
        latency_total = sum([c.latency_ms for c in calls])
        # Divide by at least 1 so an empty task reports 0 instead of failing.
        mean_latency = latency_total / max(n_calls, 1)
        return {
            "task_id": task_id,
            "calls": n_calls,
            "total_cost_usd": round(cost_total, 6),
            "total_tokens": token_total,
            "avg_latency_ms": round(mean_latency, 2),
            "by_model": self._aggregate(calls),
        }

    def _aggregate(self, calls: List[ModelCall]) -> Dict[str, Dict[str, float]]:
        """Group *calls* by model id, summing call count, cost and tokens."""
        per_model = {}
        for call in calls:
            bucket = per_model.get(call.model_id)
            if bucket is None:
                bucket = {"calls": 0, "cost": 0.0, "tokens": 0}
                per_model[call.model_id] = bucket
            bucket["calls"] += 1
            bucket["cost"] += call.cost_usd
            bucket["tokens"] += call.tokens_in + call.tokens_out
        return per_model
| # --------------------------------------------------------------------------- | |
| # Convenience: Compose everything into an AgentConfig | |
| # --------------------------------------------------------------------------- | |
@dataclass
class AgentConfig:
    """Feature switches and settings used to assemble an agent.

    Declared as a dataclass so individual settings can be overridden per
    instance through the constructor, e.g. ``AgentConfig(use_hitl=False)``.
    """

    hf_token: Optional[str] = None
    cost_budget_usd: float = 2.0
    use_planner: bool = True
    use_verifier: bool = True
    use_memory: bool = True
    use_som: bool = True
    use_hitl: bool = True
    use_recorder: bool = True
    memory_dir: str = "./memory_db"
    auto_approve: bool = False