from __future__ import annotations import json from pathlib import Path from src.agents.codex_big_agent import CodexBigAgent from src.agents.direct_search_agent import DirectSearchAgent from src.memory.extractor import extract_memory_candidates from src.memory.store import MemoryStore from src.ranking import rank_results from src.router import decide_route from src.sanitizer import sanitize_task from src.schemas import AgentTrace, ExternalAgentResult from src.schemas import UserMemory class SmallPersonalAgent: def __init__(self, base_dir: str | Path = "data") -> None: self.base_dir = Path(base_dir) self.store = MemoryStore(self.base_dir) self.direct_search = DirectSearchAgent() self.codex = CodexBigAgent(self.base_dir) def handle_message(self, message: str, auto_accept: bool = True) -> tuple[str, AgentTrace, ExternalAgentResult]: memory = self.store.load() extracted = extract_memory_candidates(message) added = self.store.add_candidates(extracted.candidates, auto_accept=auto_accept) memory = self.store.load() task = sanitize_task(message, memory) decision = decide_route(message) if decision.route == "direct_search": external = self.direct_search.run(task) elif decision.route == "codex_big_agent": external = self.codex.run(task) else: external = ExternalAgentResult( route="local_answer", summary=self._local_answer(message, memory, decision.reasons), sources=[], artifacts=[], errors=[], ) ranked, ranking_summary = rank_results(external, memory) final = self._compose_final(external, ranked, added, decision.route, memory) trace = AgentTrace( run_id=task.run_id, route_decision=decision, private_memory_used=[fact.text for fact in memory.accepted()], sanitized_task=task, outbound_payload_preview=task.sanitized_query if decision.route != "local_answer" else "(nothing sent out)", external_result_summary=external.summary, final_reasoning_summary=ranking_summary, ) self._save_trace(trace) self._append_engineering_log(trace, added, external) return final, trace, external def _local_answer(self, message: str, memory: UserMemory, decision_reasons: list[str]) -> str: lower = message.lower() name = self._display_name(memory) if any(term in lower for term in ("what's my name", "what is my name", "whats my name", "do you know my name", "who am i")): if name: return f"Your name is {name}." return "I do not know your name yet." if any(reason.startswith("needs_clarification") for reason in decision_reasons): return "Do you want me to answer from what I already know, or should I look it up live?" if "mm1" in lower or "privacy" in lower: return ( "I keep what I learn about you here on this machine. When something needs the internet or Codex, " "I strip out private details first, send only the task, then use your accepted memory locally to shape the answer." ) if memory.accepted(): return "I am here. Tell me what you want to figure out, build, remember, or search for." return "I am here. Tell me a little about you, or ask me to help with something." def _compose_final(self, external: ExternalAgentResult, ranked: list, added: list, route: str, memory: UserMemory) -> str: name = self._display_name(memory) greeting = f"{name}, " if name else "" if route == "local_answer" and added: name_fact = next((fact for fact in added if fact.kind == "profile" and fact.text.lower().startswith("user name:")), None) if name_fact: learned_name = name_fact.text.split(":", 1)[1].strip() return f"Nice to meet you, {learned_name}. I’ll remember that." return f"{greeting}Got it. I’ll keep that in mind." if route == "local_answer": if external.summary.startswith(("Your name is", "I do not know your name")): return external.summary return greeting + external.summary if external.errors and not external.sources and not external.artifacts: return f"{greeting}I could not get a live result for that right now. Try again in a moment, or ask it a different way." if route == "direct_search": lines = [f"{greeting}I looked that up for you."] elif route == "codex_big_agent": lines = [f"{greeting}I worked through that as a bigger task."] else: lines = [greeting + "Here is what I found."] if external.summary: lines.extend(["", self._clean_external_summary(external.summary)]) if route != "direct_search" and ranked and external.sources: lines.append("") lines.append("The most useful starting points are:") for item in ranked[:3]: lines.append(f"- {item.title}") if external.errors: lines.append("") lines.append("One caveat: I had trouble with part of the live lookup.") return "\n".join(lines) def _display_name(self, memory: UserMemory) -> str | None: for fact in memory.accepted(): if fact.kind == "profile" and fact.text.lower().startswith("user name:"): return fact.text.split(":", 1)[1].strip() or None return None def _clean_external_summary(self, summary: str) -> str: summary = summary.removeprefix("Direct search summary:").strip() summary = summary.removeprefix("Hosted under-32B model result:").strip() return summary def _save_trace(self, trace: AgentTrace) -> None: run_dir = self.base_dir / "runs" / trace.run_id run_dir.mkdir(parents=True, exist_ok=True) (run_dir / "trace.json").write_text(json.dumps(trace.to_dict(), indent=2), encoding="utf-8") def _append_engineering_log(self, trace: AgentTrace, added: list, external: ExternalAgentResult) -> None: log_path = self.base_dir / "engineering_events.jsonl" event = { "run_id": trace.run_id, "route": trace.route_decision.route, "learned_facts": [fact.to_dict() for fact in added], "private_memory_used": trace.private_memory_used, "outbound_payload_preview": trace.outbound_payload_preview, "external_errors": external.errors, "source_count": len(external.sources), "artifact_count": len(external.artifacts), } with log_path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(event) + "\n")