from __future__ import annotations import json import re from collections import Counter from dataclasses import dataclass from pathlib import Path from typing import Any, Callable, Dict, Iterable, List, Optional from memory_pool import PersistentMemoryPool from browser_tools import run_browser_agentic_sequence from web_search import build_retrieved_context from workspace import Workspace def _compact(text: str, limit: int = 1200) -> str: text = re.sub(r"\s+", " ", (text or "")).strip() return text[:limit] def _normalize_name(name: str) -> str: return re.sub(r"[^a-z0-9]+", "_", (name or "").strip().lower()).strip("_") @dataclass class ToolSpec: name: str purpose: str handler: Callable[..., Dict[str, Any]] class PlanningLatch: def run(self, prompt: str, workspace: Optional[Workspace] = None) -> Dict[str, Any]: lines = [line.strip(" -") for line in (prompt or "").splitlines() if line.strip()] summary = _compact(lines[0] if lines else prompt, 220) plan = [ "Identify the request and required deliverable.", "Select the minimum tool set needed for retrieval, validation, and synthesis.", "Draft the answer, then run a final consistency/self-heal pass.", ] if workspace is not None: objective = workspace.read_objective().strip() if objective: plan.insert(1, f"Respect workspace objective: {_compact(objective, 180)}") return { "tool": "planning_latch", "summary": summary, "plan": plan, "status": "latched", } class Validator: def run(self, text: str, required_tools: Optional[Iterable[str]] = None) -> Dict[str, Any]: issues: List[str] = [] repaired = (text or "").strip() if not repaired: issues.append("empty_output") if repaired and repaired[-1] not in ".!?": repaired += "." if "I don't know" in repaired: issues.append("low_confidence_phrase") repaired = repaired.replace("I don't know", "I do not know yet") required = [_normalize_name(tool) for tool in (required_tools or []) if tool] mentioned = {_normalize_name(token) for token in re.findall(r"[A-Za-z_][A-Za-z0-9_]+", repaired)} missing = [tool for tool in required if tool not in mentioned] if missing: issues.append(f"missing_tool_mentions:{','.join(missing)}") repaired += " Tools considered: " + ", ".join(missing) + "." return { "tool": "validator", "issues": issues, "output": repaired, "status": "repaired" if issues else "clean", } class HarmonyEngine: def run(self, inputs: Iterable[str]) -> Dict[str, Any]: cleaned = [_compact(str(item), 240) for item in inputs if str(item).strip()] return { "tool": "harmony_engine", "status": "synthesized", "output": " | ".join(cleaned[:6]), } class FeelingQualia: def run(self, prompt: str) -> Dict[str, Any]: lower = (prompt or "").lower() tone = "neutral" if any(token in lower for token in ("urgent", "immediately", "asap")): tone = "urgent" elif any(token in lower for token in ("carefully", "safely", "compliance")): tone = "careful" return { "tool": "feeling_qualia", "tone": tone, "status": "classified", } class ToolRegistry: def __init__( self, workspace_root: Optional[str] = None, *, browser_tool: Optional[Any] = None, ): self.workspace = Workspace(Path(workspace_root or ".").resolve()) self.memory_pool = PersistentMemoryPool(self.workspace.root / "state" / "memory_pool.jsonl") self.browser_tool = browser_tool self._planning = PlanningLatch() self._validator = Validator() self._harmony = HarmonyEngine() self._feeling = FeelingQualia() self._tools: Dict[str, ToolSpec] = {} self._register_defaults() def _register(self, name: str, purpose: str, handler: Callable[..., Dict[str, Any]]) -> None: self._tools[_normalize_name(name)] = ToolSpec( name=_normalize_name(name), purpose=purpose, handler=handler, ) def _register_defaults(self) -> None: self._register("phillnet2", "system identity and orchestration alias", self._tool_system_alias) self._register("system_core", "system orchestration alias", self._tool_system_alias) self._register("agent_node", "swarm node orchestration alias", self._tool_system_alias) self._register("search", "alias for compact external retrieval", self._tool_search_alias) self._register("planning_latch", "lock a concise internal plan and keep execution aligned", self._tool_planning_latch) self._register("validator", "self-heal malformed, weak, or drifting drafts before finalization", self._tool_validator) self._register("workspace_search", "search local workspace, files, plans, and code context", self._tool_workspace_search) self._register("web_search", "retrieve compact external verification and current information", self._tool_web_search) self._register("memory_recall", "recover compact autobiographical or workspace memory context", self._tool_memory_recall) self._register("memory_pool", "store and retrieve persistent learned memory with lightweight rag lookup", self._tool_memory_pool) self._register("harmony_engine", "perform final coherence synthesis when multiple ideas need alignment", self._tool_harmony_engine) self._register("feeling_qualia", "classify user tone and response pressure before drafting", self._tool_feeling_qualia) self._register("browser_observe", "observe the active browser page before acting", self._tool_browser_observe) self._register("browser_navigate", "open or search for a browser target", self._tool_browser_navigate) self._register("browser_click", "move cursor to a target and click it", self._tool_browser_click) self._register("browser_scroll", "move cursor into the viewport and scroll deliberately", self._tool_browser_scroll) self._register("browser_type", "focus a browser input and type text", self._tool_browser_type) self._register("browser_verify", "verify the browser page state after an action", self._tool_browser_verify) self._register("browser_mode", "browse live pages with structured browser observations", self._tool_browser_mode) self._register("vision_mode", "inspect the current browser page with OCR and structural cues", self._tool_vision_mode) def list_tools(self) -> List[Dict[str, str]]: return [{"name": spec.name, "purpose": spec.purpose} for spec in self._tools.values()] def has_tool(self, name: str) -> bool: return _normalize_name(name) in self._tools def audit_dataset_tools(self, rows: Iterable[Dict[str, Any]]) -> Dict[str, Any]: missing = set() observed = Counter() total_rows = 0 for row in rows: total_rows += 1 for item in row.get("tool_surface") or []: if isinstance(item, dict) and item.get("name"): name = _normalize_name(str(item["name"])) observed[name] += 1 if not self.has_tool(name): missing.add(name) for name in row.get("recommended_tools") or []: normalized = _normalize_name(str(name)) observed[normalized] += 1 if not self.has_tool(normalized): missing.add(normalized) return { "rows": total_rows, "observed_tools": dict(sorted(observed.items())), "missing_tools": sorted(missing), "status": "ok" if not missing else "missing_tools", } def audit_embedded_tool_calls(self, rows: Iterable[Dict[str, Any]]) -> Dict[str, Any]: observed = Counter() missing = set() total_rows = 0 for row in rows: total_rows += 1 blobs = [str(row.get(key, "")) for key in ("prompt", "response", "source_prompt", "browser_observation")] text = "\n".join(blobs) for match in re.finditer(r'"name"\s*:\s*"([^"]+)"', text): name = _normalize_name(match.group(1)) observed[name] += 1 if not self.has_tool(name): missing.add(name) return { "rows": total_rows, "observed_embedded_calls": dict(sorted(observed.items())), "missing_embedded_calls": sorted(missing), "status": "ok" if not missing else "missing_embedded_calls", } def get_tool_surface_text(self) -> str: return "\n".join(f"- {spec.name}: {spec.purpose}" for spec in self._tools.values()) def resolve(self, name: str) -> ToolSpec: normalized = _normalize_name(name) if normalized not in self._tools: raise KeyError(f"Unknown tool: {name}") return self._tools[normalized] def call_tool(self, name: str, **kwargs: Any) -> Dict[str, Any]: return self.resolve(name).handler(**kwargs) def scaffold_prompt( self, prompt: str, *, recommended_tools: Optional[Iterable[str]] = None, tool_surface: Optional[Iterable[Dict[str, Any]]] = None, ) -> str: tools = [_normalize_name(tool) for tool in (recommended_tools or []) if tool] tool_desc = [] for item in tool_surface or self.list_tools(): if isinstance(item, dict): tool_desc.append(f"- {_normalize_name(str(item.get('name', '')))}: {item.get('purpose', '')}".rstrip()) sections = [ "[SYSTEM TOOLS]", "\n".join(tool_desc) or self.get_tool_surface_text(), "", "[RECOMMENDED TOOLS]", ", ".join(tools) if tools else "planning_latch, validator", "", "[USER PROMPT]", (prompt or "").strip(), ] return "\n".join(sections).strip() def self_heal_response(self, prompt: str, draft: str, recommended_tools: Optional[Iterable[str]] = None) -> Dict[str, Any]: plan = self._tool_planning_latch(prompt=prompt) feeling = self._tool_feeling_qualia(prompt=prompt) validated = self._tool_validator(text=draft, required_tools=recommended_tools) harmony = self._tool_harmony_engine(inputs=[ json.dumps(plan, ensure_ascii=False), json.dumps(feeling, ensure_ascii=False), validated.get("output", ""), ]) output = validated.get("output", "") if harmony.get("output"): output = f"{output}\n\n[coherence] {harmony['output']}" return { "plan": plan, "feeling": feeling, "validation": validated, "harmony": harmony, "output": output.strip(), } def _tool_planning_latch(self, prompt: str = "", **_: Any) -> Dict[str, Any]: return self._planning.run(prompt=prompt, workspace=self.workspace) def _tool_validator(self, text: str = "", required_tools: Optional[Iterable[str]] = None, **_: Any) -> Dict[str, Any]: return self._validator.run(text=text, required_tools=required_tools) def _tool_workspace_search(self, query: str = "", max_results: int = 5, **_: Any) -> Dict[str, Any]: results = self.workspace.get_relevant_files(query or "", top_k=max_results) compact_results = [] for item in results: if isinstance(item, dict): compact_results.append( { "file": item.get("file"), "score": item.get("score"), "matches": item.get("matches", [])[:6], } ) else: compact_results.append({"file": str(item), "score": 1.0, "matches": []}) return { "tool": "workspace_search", "query": query, "results": compact_results, "status": "ok", } def _tool_web_search(self, query: str = "", max_results: int = 5, **_: Any) -> Dict[str, Any]: context, results = build_retrieved_context( query, max_results=max_results, use_browser=self.browser_tool is not None, browser_tool=self.browser_tool, ) return { "tool": "web_search", "query": query, "results": results, "context": context, "status": "ok", } def _tool_search_alias(self, query: str = "", arguments: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]: payload = dict(arguments or {}) if query and "query" not in payload: payload["query"] = query payload.update({k: v for k, v in kwargs.items() if k not in payload}) result = self._tool_web_search(query=str(payload.get("query", "")), max_results=int(payload.get("max_results", 5))) result["tool"] = "search" result["aliased_to"] = "web_search" return result def _tool_system_alias(self, prompt: str = "", query: str = "", arguments: Optional[Dict[str, Any]] = None, **_: Any) -> Dict[str, Any]: text = prompt or query or str(arguments or "") return { "tool": "system_alias", "status": "ok", "summary": _compact(text, 240), } def _tool_memory_recall(self, query: str = "", user_prompt: str = "", **_: Any) -> Dict[str, Any]: recall = self.workspace.get_recall_context(query=query, user_prompt=user_prompt or query, require_objective_match=False) pooled = self.memory_pool.build_context(query or user_prompt, max_results=4, max_chars=800) combined = recall.strip() if pooled: combined = f"{combined}\n\n[PERSISTENT MEMORY]\n{pooled}".strip() return { "tool": "memory_recall", "query": query, "context": combined, "status": "ok", } def _tool_memory_pool( self, query: str = "", text: str = "", source: str = "", action: str = "search", reward: float = 0.0, tags: Optional[Iterable[str]] = None, **_: Any, ) -> Dict[str, Any]: if action == "add": self.memory_pool.add( query=query, text=text, source=source or "tool_registry", reward=float(reward), tags=[str(tag) for tag in (tags or [])], ) return { "tool": "memory_pool", "action": "add", "status": "ok", "count": len(self.memory_pool.items), } results = self.memory_pool.search(query=query, max_results=5) return { "tool": "memory_pool", "action": "search", "query": query, "results": results, "status": "ok", } def _tool_harmony_engine(self, inputs: Optional[Iterable[str]] = None, **_: Any) -> Dict[str, Any]: return self._harmony.run(inputs or []) def _tool_feeling_qualia(self, prompt: str = "", **_: Any) -> Dict[str, Any]: return self._feeling.run(prompt) def _browser_action(self, action: str, *, prompt: str = "", query: str = "", **kwargs: Any) -> Dict[str, Any]: if self.browser_tool is None: return { "tool": f"browser_{action}", "status": "unavailable", "error": "browser tool not configured", } text = query or prompt payload = self.browser_tool.run(action=action, query=text, text=text, include_ocr=True, **kwargs) return { "tool": f"browser_{action}", "status": "ok", "payload": payload, } def _tool_browser_observe(self, prompt: str = "", query: str = "", **kwargs: Any) -> Dict[str, Any]: return self._browser_action("analyze", prompt=prompt, query=query, **kwargs) def _tool_browser_navigate(self, prompt: str = "", query: str = "", **kwargs: Any) -> Dict[str, Any]: text = query or prompt lowered = text.lower() action = "open" if re.search(r"https?://|(?:[A-Za-z0-9-]+\.)+[A-Za-z]{2,}", text) else "search" if action == "open": match = re.search(r"https?://\S+|(?:[A-Za-z0-9-]+\.)+[A-Za-z]{2,}", text) kwargs.setdefault("url", match.group(0).rstrip(".,;:!?") if match else text) text = "" return self._browser_action(action, prompt=prompt, query=text, **kwargs) def _tool_browser_click(self, prompt: str = "", query: str = "", **kwargs: Any) -> Dict[str, Any]: text = query or prompt kwargs.setdefault("text_target", text) return self._browser_action("click", prompt=prompt, query=text, **kwargs) def _tool_browser_scroll(self, prompt: str = "", query: str = "", direction: str = "down", **kwargs: Any) -> Dict[str, Any]: lowered = (query or prompt or "").lower() if any(token in lowered for token in ("up", "top", "previous")): direction = "up" return self._browser_action("scroll", prompt=prompt, query=query, direction=direction, **kwargs) def _tool_browser_type(self, prompt: str = "", query: str = "", text: str = "", **kwargs: Any) -> Dict[str, Any]: text = text or query or prompt return self._browser_action("type", prompt=prompt, query=query, text=text, **kwargs) def _tool_browser_verify(self, prompt: str = "", query: str = "", **kwargs: Any) -> Dict[str, Any]: return self._browser_action("analyze", prompt=prompt, query=query, **kwargs) def _tool_browser_mode(self, prompt: str = "", **_: Any) -> Dict[str, Any]: if self.browser_tool is None: return { "tool": "browser_mode", "status": "unavailable", "error": "browser tool not configured", } prompt_lower = (prompt or "").lower() wants_deeper_pass = any( token in prompt_lower for token in ("click", "scroll", "navigate", "open", "visit", "form", "playwright", "browser", "search") ) payload = run_browser_agentic_sequence( prompt, self.browser_tool.run, max_steps=5 if wants_deeper_pass else 3, observation_chars=1500 if wants_deeper_pass else 1200, ) return { "tool": "browser_mode", "status": "ok", "payload": payload, } def _tool_vision_mode(self, **_: Any) -> Dict[str, Any]: if self.browser_tool is None: return { "tool": "vision_mode", "status": "unavailable", "error": "browser tool not configured", } payload = self.browser_tool.run(action="vision") return { "tool": "vision_mode", "status": "ok", "payload": payload, }