| """ |
| core_agent.py — Enhanced Computer Agent Brain |
| ============================================= |
| Hierarchical Planner + Verifier + Multi-Model Router + Long-Term Memory |
| """ |
|
|
| import os |
| import json |
| import time |
| import uuid |
| from datetime import datetime |
| from typing import Any, Dict, List, Optional, Tuple |
| from dataclasses import dataclass, field |
|
|
| import numpy as np |
| from PIL import Image, ImageDraw, ImageFont |
|
|
| |
| from smolagents import CodeAgent, tool |
| from smolagents.agent_types import AgentImage |
| from smolagents.memory import ActionStep, TaskStep |
| from smolagents.models import ChatMessage, Model, HfApiModel |
| from smolagents.monitoring import LogLevel |
|
|
| |
| from huggingface_hub import InferenceClient |
|
|
| |
| try: |
| import chromadb |
| from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction |
| HAS_CHROMA = True |
| except ImportError: |
| HAS_CHROMA = False |
|
|
| |
| try: |
| from sentence_transformers import SentenceTransformer |
| HAS_ST = True |
| except ImportError: |
| HAS_ST = False |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class Subtask:
    """A single unit of work inside a plan, together with its bookkeeping."""

    # Required fields.
    id: str
    description: str

    # Optional fields; every instance gets its own ``depends_on`` list.
    status: str = "pending"
    strategy: str = "auto"
    depends_on: List[str] = field(default_factory=list)
    result: Any = None

    # Retry bookkeeping (presumably updated by whichever component executes
    # the subtask; nothing in this class changes it).
    retries: int = 0
    max_retries: int = 2
|
|
|
|
@dataclass
class Plan:
    """A goal paired with the subtasks generated for it."""

    # The goal text the plan was built for.
    goal: str
    # Subtasks that make up the plan.
    subtasks: List[Subtask]
    # Defaults to ``time.time()`` evaluated when the instance is created.
    created_at: float = field(default_factory=time.time)
|
|
|
|
@dataclass
class ModelCall:
    """Accounting record for one model invocation."""

    # Identifier of the model that was called.
    model_id: str

    # Usage figures; all default to zero.
    tokens_in: int = 0
    tokens_out: int = 0
    latency_ms: float = 0.0
    cost_usd: float = 0.0

    # Defaults to ``time.time()`` evaluated when the record is created.
    timestamp: float = field(default_factory=time.time)
|
|
|
|
| |
| |
| |
|
|
# One row per routable model:
# (registry key, model id, modality, price per 1k input tokens,
#  price per 1k output tokens, max tokens requested per call)
_MODEL_ROWS = (
    ("fast_vision", "Qwen/Qwen2.5-VL-7B-Instruct", "vision", 0.0001, 0.0002, 2048),
    ("powerful_vision", "Qwen/Qwen2.5-VL-72B-Instruct", "vision", 0.001, 0.002, 4096),
    ("fast_text", "Qwen/Qwen2.5-32B-Instruct", "text", 0.0002, 0.0004, 4096),
    ("powerful_text", "Qwen/Qwen3-235B-A22B", "text", 0.0015, 0.003, 8192),
)

# Registry keyed by routing name. "endpoint" is None for every entry.
MODEL_REGISTRY = {
    key: {
        "model_id": model_id,
        "endpoint": None,
        "type": modality,
        "cost_per_1k_in": cost_in,
        "cost_per_1k_out": cost_out,
        "max_tokens": max_tokens,
    }
    for key, model_id, modality, cost_in, cost_out, max_tokens in _MODEL_ROWS
}
|
|
|
|
class IntelligenceRouter(Model):
    """Routes tasks to the optimal model based on complexity, modality, and cost.

    Each call picks a key from ``MODEL_REGISTRY`` with :meth:`select_model`,
    sends the chat request through a cached ``InferenceClient`` and records an
    *estimated* token count and cost in ``call_history`` / ``cost_so_far_usd``.
    """

    def __init__(
        self,
        hf_token: Optional[str] = None,
        default_vision: str = "powerful_vision",
        default_text: str = "fast_text",
        cost_budget_usd: float = 1.0,
    ):
        """Store routing defaults and start with empty accounting state.

        Args:
            hf_token: API token. When omitted, ``HF_TOKEN`` and then
                ``HUGGINGFACE_API_KEY`` are read from the environment.
            default_vision: Registry key used for complex image tasks and as
                the fallback model for image requests.
            default_text: Registry key used for ordinary text tasks and as the
                fallback model for text requests.
            cost_budget_usd: Budget the estimated spend is compared against.
        """
        super().__init__()
        self.hf_token = hf_token or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_API_KEY")
        self.default_vision = default_vision
        self.default_text = default_text
        self.cost_budget_usd = cost_budget_usd
        self.cost_so_far_usd = 0.0
        self.call_history: List[ModelCall] = []
        self._clients: Dict[str, InferenceClient] = {}

    def _get_client(self, model_key: str) -> InferenceClient:
        """Return the client for ``model_key``, creating and caching it on first use."""
        if model_key not in self._clients:
            cfg = MODEL_REGISTRY[model_key]
            self._clients[model_key] = InferenceClient(
                model=cfg["model_id"],
                token=self.hf_token,
            )
        return self._clients[model_key]

    def select_model(
        self,
        task_type: str = "vision",
        complexity: str = "medium",
        has_images: bool = False,
    ) -> str:
        """Select the best model for a given task.

        Once the estimated spend reaches 90% of the budget only the ``fast_*``
        models are returned. Otherwise image/vision work goes to
        ``default_vision`` for high/complex/spatial complexity and to
        ``fast_vision`` for everything else; text work goes to
        ``powerful_text`` for high/complex/reasoning complexity and to
        ``default_text`` for everything else.

        Returns:
            A key of ``MODEL_REGISTRY``.
        """
        if self.cost_so_far_usd >= self.cost_budget_usd * 0.9:
            # Budget nearly exhausted: stick to the cheap models.
            return "fast_vision" if has_images else "fast_text"

        if has_images or task_type == "vision":
            if complexity in ("high", "complex", "spatial"):
                return self.default_vision
            return "fast_vision"

        if complexity in ("high", "complex", "reasoning"):
            return "powerful_text"
        return self.default_text

    def __call__(
        self,
        messages: List[Dict[str, Any]],
        stop_sequences: Optional[List[str]] = None,
        task_type: str = "vision",
        complexity: str = "medium",
        has_images: bool = False,
        **kwargs,
    ) -> ChatMessage:
        """Run a chat completion on the routed model, falling back once on failure.

        Args:
            messages: Chat messages passed to the inference client unchanged.
            stop_sequences: Optional stop strings for the completion.
            task_type: Routing hint, see :meth:`select_model`.
            complexity: Routing hint, see :meth:`select_model`.
            has_images: Whether the request carries images.
            **kwargs: Accepted for interface compatibility; not forwarded.

        Returns:
            The assistant reply as a ``ChatMessage``.

        Raises:
            Exception: Whatever the routed model raised when it already was the
                fallback model, or whatever the fallback model raised.
        """
        model_key = self.select_model(task_type, complexity, has_images)
        try:
            return self._invoke(model_key, messages, stop_sequences)
        except Exception as e:
            fallback = self.default_vision if has_images else self.default_text
            if model_key == fallback:
                raise
            print(f"[{model_key}] failed: {e}. Falling back to {fallback}")
            # Call the fallback model directly. Re-entering __call__ with the
            # same arguments would route to the same failing model again and
            # recurse until the interpreter's recursion limit is hit.
            return self._invoke(fallback, messages, stop_sequences)

    def _invoke(
        self,
        model_key: str,
        messages: List[Dict[str, Any]],
        stop_sequences: Optional[List[str]],
    ) -> ChatMessage:
        """Send one request to ``model_key`` and record its estimated usage."""
        cfg = MODEL_REGISTRY[model_key]
        client = self._get_client(model_key)

        start = time.time()
        response = client.chat_completion(
            messages=messages,
            max_tokens=cfg["max_tokens"],
            stop=stop_sequences,
        )
        latency = (time.time() - start) * 1000

        content = response.choices[0].message.content or ""
        # Token counts are heuristics, not provider-reported usage.
        tok_in = self._estimate_tokens(messages)
        tok_out = len(content.split()) * 1.3
        cost = (tok_in / 1000) * cfg["cost_per_1k_in"] + (tok_out / 1000) * cfg["cost_per_1k_out"]
        self.cost_so_far_usd += cost

        self.call_history.append(ModelCall(
            model_id=cfg["model_id"],
            tokens_in=int(tok_in),
            tokens_out=int(tok_out),
            latency_ms=latency,
            cost_usd=cost,
        ))

        return ChatMessage(role="assistant", content=content)

    def _estimate_tokens(self, messages: List[Dict[str, Any]]) -> int:
        """Estimate prompt tokens as one token per four characters of text (minimum 1).

        Only string contents and ``"text"`` entries of list contents are
        counted; any other parts contribute nothing.
        """
        total = 0
        for msg in messages:
            content = msg.get("content", "")
            if isinstance(content, str):
                total += len(content) // 4
            elif isinstance(content, list):
                for item in content:
                    if isinstance(item, dict) and "text" in item:
                        total += len(item["text"]) // 4
        return max(total, 1)

    def get_cost_report(self) -> Dict[str, Any]:
        """Return budget, estimated spend, remaining budget, call count and per-model totals."""
        return {
            "budget_usd": self.cost_budget_usd,
            "spent_usd": round(self.cost_so_far_usd, 6),
            "remaining_usd": round(self.cost_budget_usd - self.cost_so_far_usd, 6),
            "calls": len(self.call_history),
            "by_model": self._aggregate_by_model(),
        }

    def _aggregate_by_model(self) -> Dict[str, Dict[str, float]]:
        """Sum calls, tokens and cost of ``call_history`` per model id."""
        agg: Dict[str, Dict[str, float]] = {}
        for c in self.call_history:
            entry = agg.setdefault(
                c.model_id, {"calls": 0, "tokens_in": 0, "tokens_out": 0, "cost": 0.0}
            )
            entry["calls"] += 1
            entry["tokens_in"] += c.tokens_in
            entry["tokens_out"] += c.tokens_out
            entry["cost"] += c.cost_usd
        return agg
|
|
|
|
| |
| |
| |
|
|
# System prompt for the planning model. It asks for a bare JSON list (no
# markdown fences) of subtasks, each with "description", "strategy" and
# "depends_on" fields, and includes one worked example.
PLANNER_SYSTEM_PROMPT = """You are a Task Planner for a computer automation agent.
Given a user's high-level goal, break it into a JSON list of subtasks.
Each subtask must have:
- description: concise action description
- strategy: one of [browser, desktop, code, vision]
- depends_on: list of subtask indices (0-based) that must finish before this one

Rules:
1. Use "browser" for web navigation, "desktop" for OS-level GUI actions,
"code" for writing/running scripts, "vision" for visual reasoning.
2. Keep subtasks atomic (1-3 actions each).
3. Start with gathering info, then acting, then verifying.
4. Output ONLY valid JSON. No markdown fences.

Example input: "Find Hugging Face HQ in Paris using Google Maps"
Example output:
[
{"description": "Open Google Maps in browser", "strategy": "browser", "depends_on": []},
{"description": "Search for 'Hugging Face Paris'", "strategy": "browser", "depends_on": [0]},
{"description": "Extract the address from the result card", "strategy": "vision", "depends_on": [1]},
{"description": "Verify the address contains 'Paris'", "strategy": "code", "depends_on": [2]}
]
"""
|
|
|
|
class HierarchicalPlanner:
    """Breaks a user goal into a DAG of subtasks using a cheap text model."""

    def __init__(self, router: "IntelligenceRouter"):
        """Keep the router used to reach the planning model."""
        self.router = router

    def plan(self, goal: str, context: str = "") -> "Plan":
        """Ask the model for a subtask list and turn it into a ``Plan``.

        Args:
            goal: The user's high-level goal.
            context: Optional extra text included in the prompt.

        Returns:
            A plan whose subtasks are numbered ``st_000``, ``st_001``, ... in
            the order the model listed them. When the reply cannot be used,
            the plan holds one subtask that restates the goal.
        """
        messages = [
            {"role": "system", "content": PLANNER_SYSTEM_PROMPT},
            {"role": "user", "content": f"Goal: {goal}\nContext: {context}\n\nGenerate the subtask JSON list."},
        ]
        response = self.router(
            messages,
            task_type="text",
            complexity="medium",
            has_images=False,
        )
        items = self._parse_items(response.content or "", goal)
        # NOTE(review): depends_on keeps whatever the model returned (the
        # prompt asks for 0-based indices) while ids are "st_NNN" strings;
        # confirm which form the consumer of Plan expects.
        subtasks = [
            Subtask(
                id=f"st_{i:03d}",
                description=item["description"],
                strategy=item["strategy"],
                depends_on=item["depends_on"],
            )
            for i, item in enumerate(items)
        ]
        return Plan(goal=goal, subtasks=subtasks)

    @staticmethod
    def _strip_code_fence(raw: str) -> str:
        """Return ``raw`` without a surrounding markdown code fence, if it has one."""
        text = raw.strip()
        if not text.startswith("```"):
            return text
        body = text[3:]
        # Cut at the closing fence when present. Taking the last piece of a
        # split on the fence marker would return the (empty) text after it.
        closing = body.find("```")
        if closing != -1:
            body = body[:closing]
        if body.startswith("json"):
            body = body[4:]
        return body.strip()

    @staticmethod
    def _parse_items(raw: str, goal: str) -> List[Dict[str, Any]]:
        """Parse the model reply into dicts with description/strategy/depends_on.

        Falls back to a single item restating ``goal`` when the reply is not
        valid JSON or is not a non-empty list.
        """
        fallback = [{"description": goal, "strategy": "auto", "depends_on": []}]
        try:
            data = json.loads(HierarchicalPlanner._strip_code_fence(raw))
        except json.JSONDecodeError:
            return fallback
        if not isinstance(data, list) or not data:
            return fallback

        items: List[Dict[str, Any]] = []
        for entry in data:
            if not isinstance(entry, dict):
                # e.g. a plain string instead of an object: use it as the description.
                items.append({"description": str(entry), "strategy": "auto", "depends_on": []})
                continue
            deps = entry.get("depends_on", [])
            items.append({
                "description": entry.get("description", str(entry)),
                "strategy": entry.get("strategy", "auto"),
                "depends_on": deps if isinstance(deps, list) else [],
            })
        return items
|
|
|
|
| |
| |
| |
|
|
# System prompt for the verifier model. It asks for a single JSON object with
# the keys "success", "reason" and "next_action".
VERIFIER_SYSTEM_PROMPT = """You are a Verifier agent. Given a subtask description, the agent's action trace, and a screenshot, determine if the subtask was completed successfully.

Respond with ONLY a JSON object:
{"success": true/false, "reason": "short explanation", "next_action": "continue|retry|alternative"}

Rules:
- success=true if the intended outcome is clearly visible in the screenshot or trace.
- next_action=retry if the agent seems close but missed a click.
- next_action=alternative if the approach is fundamentally wrong.
"""
|
|
|
|
class VerifierAgent:
    """Checks if a subtask succeeded and suggests recovery."""

    def __init__(self, router: "IntelligenceRouter"):
        """Keep the router used to reach the verifier model."""
        self.router = router

    def verify(
        self,
        subtask: "Subtask",
        action_trace: List[str],
        screenshot: "Optional[Image.Image]" = None,
    ) -> Dict[str, Any]:
        """Ask the model whether ``subtask`` was completed.

        Args:
            subtask: The subtask being checked; only its description is sent.
            action_trace: Trace lines; the last ten are included in the prompt.
            screenshot: Optional screenshot. Its presence switches routing to
                a vision model.

        Returns:
            The verdict object parsed from the reply. When no JSON object can
            be parsed, a verdict with ``success=True`` and
            ``next_action="continue"`` is returned.
        """
        has_screenshot = screenshot is not None
        trace_text = "\n".join(action_trace[-10:])
        content = [
            {"type": "text", "text": f"Subtask: {subtask.description}\nAction trace:\n{trace_text}\n\nWas this completed successfully?"},
        ]
        if has_screenshot:
            # NOTE(review): only a text placeholder is appended here; the
            # image data itself is not attached to the message.
            content.append({"type": "text", "text": "[Screenshot available — analyze it]"})

        messages = [
            {"role": "system", "content": VERIFIER_SYSTEM_PROMPT},
            {"role": "user", "content": content},
        ]
        response = self.router(
            messages,
            task_type="vision" if has_screenshot else "text",
            complexity="medium",
            has_images=has_screenshot,
        )
        return self._parse_verdict(response.content or "")

    @staticmethod
    def _parse_verdict(raw: str) -> Dict[str, Any]:
        """Extract the verdict dict from a model reply, tolerating fences and prose."""
        text = raw.strip()
        if text.startswith("```"):
            text = text[3:]
            # Cut at the closing fence when present. Taking the last piece of
            # a split on the fence marker would return the (empty) text after it.
            closing = text.find("```")
            if closing != -1:
                text = text[:closing]
            if text.startswith("json"):
                text = text[4:]
            text = text.strip()

        # Try the whole reply first, then the outermost {...} span in case
        # the model wrapped the object in prose.
        candidates = [text]
        start, end = text.find("{"), text.rfind("}")
        if 0 <= start < end:
            candidates.append(text[start:end + 1])

        for candidate in candidates:
            try:
                verdict = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            # Only a JSON object counts as a verdict; lists and scalars do not.
            if isinstance(verdict, dict):
                return verdict
        return {"success": True, "reason": "Parsing failed, assuming success", "next_action": "continue"}
|
|
|
|
| |
| |
| |
|
|
class AgentMemory:
    """Stores and retrieves past task trajectories for few-shot prompting.

    When both chromadb and sentence-transformers are importable, entries live
    in a persistent Chroma collection named ``task_memory``. Otherwise they
    are kept in an in-process list and matched by substring.
    """

    def __init__(self, persist_dir: str = "./memory_db"):
        """Create ``persist_dir`` if needed and pick the storage backend."""
        self.persist_dir = persist_dir
        os.makedirs(persist_dir, exist_ok=True)
        self.collection = None
        # Both attributes exist in every mode so that embed() and
        # get_domain_tips() never hit an AttributeError.
        self.embedder = None
        self._memories: List[Dict] = []
        if HAS_CHROMA and HAS_ST:
            self.client = chromadb.PersistentClient(path=persist_dir)
            self.ef = SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2")
            self.collection = self.client.get_or_create_collection(
                name="task_memory",
                embedding_function=self.ef,
            )
        elif HAS_ST:
            self.embedder = SentenceTransformer("all-MiniLM-L6-v2")

    def embed(self, text: str) -> List[float]:
        """Return the embedding of ``text``, or ``[]`` without sentence-transformers."""
        if not HAS_ST:
            return []
        if self.embedder is None:
            # Chroma mode embeds through its own embedding function, so the
            # standalone encoder is only loaded when embed() is first used.
            self.embedder = SentenceTransformer("all-MiniLM-L6-v2")
        return self.embedder.encode(text).tolist()

    def add_task(
        self,
        task: str,
        strategy_summary: str,
        success: bool,
        final_answer: str = "",
        domain: str = "general",
    ):
        """Store one finished task, stamped with the current ``time.time()``."""
        entry = {
            "task": task,
            "strategy_summary": strategy_summary,
            "success": success,
            "final_answer": final_answer,
            "domain": domain,
            "timestamp": time.time(),
        }
        if self.collection is not None:
            self.collection.add(
                documents=[task],
                metadatas=[entry],
                ids=[str(uuid.uuid4())],
            )
        else:
            self._memories.append(entry)

    def retrieve_similar(
        self,
        query: str,
        n_results: int = 3,
        filter_success: bool = True,
    ) -> List[Dict[str, Any]]:
        """Return up to ``n_results`` stored entries related to ``query``.

        Args:
            query: Task text to match against stored tasks.
            n_results: Maximum number of entries to return.
            filter_success: When true, only entries stored with
                ``success=True`` are returned, in both storage modes.
        """
        if self.collection is not None:
            where = {"success": True} if filter_success else None
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results,
                where=where,
            )
            # One inner list per query text; tolerate a missing or None value.
            metadatas = results.get("metadatas") or [[]]
            return list(metadatas[0])

        # In-memory mode: score by substring matches.
        query_lower = query.lower()
        scored = []
        for m in self._memories:
            if filter_success and not m.get("success", False):
                # Exclude instead of penalising, so failed runs can never pad
                # out the result list.
                continue
            score = 0
            if query_lower in m["task"].lower():
                score += 10
            domain = m.get("domain", "")
            # An empty domain is a substring of every query, so skip it.
            if domain and domain.lower() in query_lower:
                score += 5
            scored.append((score, m))
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [m for _, m in scored[:n_results]]

    def get_domain_tips(self, domain: str) -> List[str]:
        """Return up to five strategy summaries of successful tasks in ``domain``."""
        if self.collection is not None:
            found = self.collection.get(
                where={"$and": [{"domain": {"$eq": domain}}, {"success": {"$eq": True}}]},
                limit=5,
            )
            metadatas = found.get("metadatas") or []
            return [m.get("strategy_summary", "") for m in metadatas][:5]
        tips = [
            m.get("strategy_summary", "")
            for m in self._memories
            if m.get("domain") == domain and m.get("success")
        ]
        return tips[:5]
|
|
|
|
| |
| |
| |
|
|
class SoMPreprocessor:
    """Overlays numbered bounding boxes on UI elements for the agent to reference by ID."""

    def __init__(self, use_icon_detection: bool = False, grid_cols: int = 8, grid_rows: int = 6):
        """Configure the preprocessor.

        Args:
            use_icon_detection: Stored on the instance; not consulted by the
                grid heuristic in this class.
            grid_cols: Number of grid columns used by ``detect_elements``.
            grid_rows: Number of grid rows used by ``detect_elements``.

        Raises:
            ValueError: If ``grid_cols`` or ``grid_rows`` is smaller than 1.
        """
        if grid_cols < 1 or grid_rows < 1:
            raise ValueError("grid_cols and grid_rows must be at least 1")
        self.use_icon_detection = use_icon_detection
        self.grid_cols = grid_cols
        self.grid_rows = grid_rows
        self.element_registry: Dict[int, Tuple[int, int, int, int]] = {}
        self.next_id = 1
        self._font = None  # label font, loaded on first preprocess() call

    def detect_elements(self, image: "Image.Image") -> List[Tuple[int, int, int, int]]:
        """Lightweight heuristic element detection.
        In production, replace with OmniParser or seeclick model.

        Splits the image into a grid and returns the cells row by row as
        ``(x1, y1, x2, y2)`` boxes. The last column and row absorb the pixels
        left over by integer division, so the boxes cover the whole image.
        Images smaller than the grid get fewer, non-empty cells; an image
        with a zero dimension yields ``[]``.
        """
        w, h = image.size
        cols = min(self.grid_cols, w)
        rows = min(self.grid_rows, h)
        if cols < 1 or rows < 1:
            return []

        cell_w, cell_h = w // cols, h // rows
        boxes = []
        for r in range(rows):
            y1 = r * cell_h
            y2 = h if r == rows - 1 else y1 + cell_h
            for c in range(cols):
                x1 = c * cell_w
                x2 = w if c == cols - 1 else x1 + cell_w
                boxes.append((x1, y1, x2, y2))
        return boxes

    def _label_font(self):
        """Return the label font, loading it once and reusing it afterwards."""
        if self._font is None:
            try:
                self._font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 14)
            except Exception:
                # Best effort: any failure to load the TTF falls back to
                # Pillow's built-in font.
                self._font = ImageFont.load_default()
        return self._font

    def preprocess(self, image: "Image.Image") -> "Tuple[Image.Image, Dict[int, Tuple[int, int, int, int]]]":
        """Return annotated image + element registry mapping ID -> bbox.

        The input image is not modified; drawing happens on a copy. IDs start
        at 1 and follow the order of ``detect_elements``. The registry is also
        stored on ``self.element_registry`` for ``get_center``.
        """
        boxes = self.detect_elements(image)
        annotated = image.copy()
        draw = ImageDraw.Draw(annotated)
        font = self._label_font()
        registry = {}

        for i, (x1, y1, x2, y2) in enumerate(boxes, start=1):
            registry[i] = (x1, y1, x2, y2)
            # Box outline.
            draw.rectangle([x1, y1, x2, y2], outline="#00FF00", width=2)
            # Filled label tag with the ID in the box's top-left corner.
            label = str(i)
            bbox = draw.textbbox((0, 0), label, font=font)
            tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
            draw.rectangle([x1, y1, x1 + tw + 4, y1 + th + 4], fill="#00FF00")
            draw.text((x1 + 2, y1 + 2), label, fill="#000000", font=font)

        self.element_registry = registry
        self.next_id = len(registry) + 1
        return annotated, registry

    def get_center(self, element_id: int) -> Tuple[int, int]:
        """Return the integer centre of a registered element.

        Raises:
            KeyError: If ``element_id`` is not in the current registry.
        """
        x1, y1, x2, y2 = self.element_registry[element_id]
        return (x1 + x2) // 2, (y1 + y2) // 2
|
|
|
|
| |
| |
| |
|
|
@dataclass
class SessionFrame:
    """One recorded step of a session; every field is required."""

    # Step number of the frame.
    step: int
    # Path of the saved screenshot, or None when there is none.
    screenshot_path: Optional[str]
    # Action text and observation text for the step.
    action: str
    observation: str
    # Timestamp supplied by whoever creates the frame.
    timestamp: float
|
|
|
|
class SessionRecorder:
    """Records every step for replay, GIF generation, and macro creation."""

    def __init__(self, session_id: str, output_dir: str = "./sessions"):
        """Create ``<output_dir>/<session_id>`` and start with no frames."""
        self.session_id = session_id
        self.output_dir = os.path.join(output_dir, session_id)
        os.makedirs(self.output_dir, exist_ok=True)
        self.frames: List[SessionFrame] = []
        self.start_time = time.time()

    def log_step(
        self,
        step: int,
        screenshot: "Optional[Image.Image]",
        action: str,
        observation: str,
    ):
        """Record one step in memory and append it to ``session.jsonl``.

        Args:
            step: Step number; also used in the screenshot file name.
            screenshot: Image saved as ``step_NNN.png`` in the session
                directory, or ``None`` to record no screenshot.
            action: Action text for the step.
            observation: Observation text for the step.
        """
        path = None
        if screenshot is not None:
            path = os.path.join(self.output_dir, f"step_{step:03d}.png")
            screenshot.save(path)
        frame = SessionFrame(
            step=step,
            screenshot_path=path,
            action=action,
            observation=observation,
            timestamp=time.time(),
        )
        self.frames.append(frame)

        # Append-only log, one JSON object per line.
        record = {
            "step": step,
            "action": action,
            "observation": observation,
            "timestamp": frame.timestamp,
            "screenshot": path,
        }
        with open(os.path.join(self.output_dir, "session.jsonl"), "a", encoding="utf-8") as log_file:
            log_file.write(json.dumps(record) + "\n")

    def save_macro(self, name: str) -> str:
        """Save successful trajectory as a replayable macro.

        Path separators in ``name`` are replaced with underscores so the file
        is always written inside the session directory.

        Returns:
            Path of the written ``macro_<name>.json`` file.
        """
        macro = {
            "name": name,
            "session_id": self.session_id,
            "frames": [
                {"action": frame.action, "observation": frame.observation, "timestamp": frame.timestamp}
                for frame in self.frames
            ],
        }
        safe_name = name.replace("/", "_").replace("\\", "_")
        path = os.path.join(self.output_dir, f"macro_{safe_name}.json")
        with open(path, "w", encoding="utf-8") as macro_file:
            json.dump(macro, macro_file, indent=2)
        return path

    def generate_summary(self) -> Dict[str, Any]:
        """Return session id, elapsed seconds, step count and the list of actions."""
        duration = time.time() - self.start_time
        return {
            "session_id": self.session_id,
            "duration_sec": round(duration, 2),
            "steps": len(self.frames),
            "actions": [frame.action for frame in self.frames],
        }
|
|
|
|
| |
| |
| |
|
|
class HITLCheckpoint:
    """Defines categories of actions that require human approval."""

    # Phrases that mark an action as sensitive. They are matched as plain
    # substrings of the lower-cased action text, in this order.
    SENSITIVE_KEYWORDS = [
        "password", "credit card", "ssn", "social security",
        "payment", "checkout", "buy", "purchase", "subscribe",
        "delete", "remove", "uninstall", "format",
        "send email", "send message", "post to", "tweet",
    ]

    def __init__(self, auto_approve: bool = False):
        """Start with an empty approval queue; ``auto_approve`` disables all checks."""
        self.auto_approve = auto_approve
        self.pending_approvals: List[Dict[str, Any]] = []

    def check_action(self, action: str, context: str = "") -> Tuple[bool, Optional[str]]:
        """Returns (approved, reason). If not approved, reason explains why.

        The first keyword found in the action text decides the reason.
        ``context`` is accepted but not inspected.
        """
        if not self.auto_approve:
            lowered = action.lower()
            hit = next((word for word in self.SENSITIVE_KEYWORDS if word in lowered), None)
            if hit is not None:
                return False, f"Sensitive action detected: '{hit}'. Requires human approval."
        return True, None

    def request_approval(self, action: str, screenshot_path: Optional[str] = None) -> Dict[str, Any]:
        """Queue a pending approval request and return it.

        The same dict object is appended to ``pending_approvals``.
        """
        request: Dict[str, Any] = {
            "id": str(uuid.uuid4()),
            "action": action,
            "screenshot": screenshot_path,
            "status": "pending",
            "requested_at": time.time(),
        }
        self.pending_approvals.append(request)
        return request
|
|
|
|
| |
| |
| |
|
|
class CostTracker:
    """Tracks per-task and cumulative costs across all model calls."""

    def __init__(self):
        """Start with no tasks registered."""
        self.tasks: Dict[str, List[ModelCall]] = {}

    def start_task(self, task_id: str):
        """Give ``task_id`` an empty call list, replacing any existing one."""
        self.tasks[task_id] = []

    def log_call(self, task_id: str, call: ModelCall):
        """Append ``call`` to the task's list, creating the list when missing."""
        recorded = self.tasks.setdefault(task_id, [])
        recorded.append(call)

    def get_task_report(self, task_id: str) -> Dict[str, Any]:
        """Summarise the calls of one task.

        An unknown ``task_id`` is reported with zero calls. The average
        latency divides by at least 1, so it is 0 for an empty task.
        """
        calls = self.tasks.get(task_id, [])
        call_count = len(calls)
        cost_sum = sum(call.cost_usd for call in calls)
        token_sum = sum(call.tokens_in + call.tokens_out for call in calls)
        latency_sum = sum(call.latency_ms for call in calls)
        report: Dict[str, Any] = {"task_id": task_id}
        report["calls"] = call_count
        report["total_cost_usd"] = round(cost_sum, 6)
        report["total_tokens"] = token_sum
        report["avg_latency_ms"] = round(latency_sum / max(call_count, 1), 2)
        report["by_model"] = self._aggregate(calls)
        return report

    def _aggregate(self, calls: List[ModelCall]) -> Dict[str, Dict[str, float]]:
        """Total call count, cost and tokens per model id."""
        per_model = {}
        for call in calls:
            if call.model_id not in per_model:
                per_model[call.model_id] = {"calls": 0, "cost": 0.0, "tokens": 0}
            bucket = per_model[call.model_id]
            bucket["calls"] += 1
            bucket["cost"] += call.cost_usd
            bucket["tokens"] += call.tokens_in + call.tokens_out
        return per_model
|
|
|
|
| |
| |
| |
|
|
@dataclass
class AgentConfig:
    """Configuration values for the agent; every field has a default."""

    # Credentials and spending limit.
    hf_token: Optional[str] = None
    cost_budget_usd: float = 2.0

    # Feature switches, all on by default (presumably read by the component
    # that assembles the agent; verify against the consumer).
    use_planner: bool = True
    use_verifier: bool = True
    use_memory: bool = True
    use_som: bool = True
    use_hitl: bool = True
    use_recorder: bool = True

    # Memory directory and approval setting.
    memory_dir: str = "./memory_db"
    auto_approve: bool = False
|
|