Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from src.agents.codex_big_agent import CodexBigAgent | |
| from src.agents.direct_search_agent import DirectSearchAgent | |
| from src.memory.extractor import extract_memory_candidates | |
| from src.memory.store import MemoryStore | |
| from src.ranking import rank_results | |
| from src.router import decide_route | |
| from src.sanitizer import sanitize_task | |
| from src.schemas import AgentTrace, ExternalAgentResult | |
| from src.schemas import UserMemory | |
class SmallPersonalAgent:
    """Personal agent that keeps memory locally and delegates lookups outward.

    For each message the agent extracts memory candidates, stores them,
    builds a sanitized task, picks a route, runs the matching agent (or
    answers locally), composes the reply, and writes a trace plus an
    engineering log entry under ``base_dir``.
    """

    # Routes that hand the sanitized task to an external agent. Any other
    # route value is answered locally by ``handle_message``.
    _EXTERNAL_ROUTES = ("direct_search", "codex_big_agent")

    # Lower-cased prefix that marks a profile fact carrying the user's name.
    _NAME_FACT_PREFIX = "user name:"

    # Lower-cased phrases treated as "tell me my name" questions.
    _NAME_QUESTIONS = (
        "what's my name",
        "what is my name",
        "whats my name",
        "do you know my name",
        "who am i",
    )

    def __init__(self, base_dir: str | Path = "data") -> None:
        """Create the memory store and both external agents.

        Args:
            base_dir: Directory used for the memory store, run traces, and
                the engineering log.
        """
        self.base_dir = Path(base_dir)
        self.store = MemoryStore(self.base_dir)
        self.direct_search = DirectSearchAgent()
        self.codex = CodexBigAgent(self.base_dir)

    def handle_message(
        self, message: str, auto_accept: bool = True
    ) -> tuple[str, AgentTrace, ExternalAgentResult]:
        """Process one user message end to end.

        Args:
            message: Raw user message.
            auto_accept: Forwarded to ``MemoryStore.add_candidates``.

        Returns:
            A ``(final_text, trace, external_result)`` tuple.
        """
        # NOTE(review): this first load's result is overwritten below; it is
        # kept in case load() initializes the store on first use -- confirm.
        memory = self.store.load()
        extracted = extract_memory_candidates(message)
        added = self.store.add_candidates(extracted.candidates, auto_accept=auto_accept)
        # Reload so the freshly added facts are visible to everything below.
        memory = self.store.load()

        task = sanitize_task(message, memory)
        decision = decide_route(message)

        if decision.route == "direct_search":
            external = self.direct_search.run(task)
        elif decision.route == "codex_big_agent":
            external = self.codex.run(task)
        else:
            external = ExternalAgentResult(
                route="local_answer",
                summary=self._local_answer(message, memory, decision.reasons),
                sources=[],
                artifacts=[],
                errors=[],
            )

        # Any route that is not an external one was answered locally above,
        # so describe it as local when composing the reply and the trace;
        # otherwise the trace would claim a payload left the machine.
        if decision.route in self._EXTERNAL_ROUTES:
            route = decision.route
        else:
            route = "local_answer"

        ranked, ranking_summary = rank_results(external, memory)
        final = self._compose_final(external, ranked, added, route, memory)

        if route == "local_answer":
            outbound_preview = "(nothing sent out)"
        else:
            outbound_preview = task.sanitized_query

        trace = AgentTrace(
            run_id=task.run_id,
            route_decision=decision,
            private_memory_used=[fact.text for fact in memory.accepted()],
            sanitized_task=task,
            outbound_payload_preview=outbound_preview,
            external_result_summary=external.summary,
            final_reasoning_summary=ranking_summary,
        )
        self._save_trace(trace)
        self._append_engineering_log(trace, added, external)
        return final, trace, external

    def _local_answer(self, message: str, memory: UserMemory, decision_reasons: list[str]) -> str:
        """Return the reply text for a message that is answered locally.

        Checks, in order: a question about the user's name, a routing reason
        starting with ``needs_clarification``, a mention of "mm1" or
        "privacy", and finally a generic prompt that depends on whether any
        accepted memory exists.
        """
        # Treat the typographic apostrophe like the ASCII one so that
        # "what’s my name" matches the same phrases as "what's my name".
        lower = message.lower().replace("\u2019", "'")
        name = self._display_name(memory)

        if any(term in lower for term in self._NAME_QUESTIONS):
            if name:
                return f"Your name is {name}."
            return "I do not know your name yet."

        if any(reason.startswith("needs_clarification") for reason in decision_reasons):
            return "Do you want me to answer from what I already know, or should I look it up live?"

        if "mm1" in lower or "privacy" in lower:
            return (
                "I keep what I learn about you here on this machine. When something needs the internet or Codex, "
                "I strip out private details first, send only the task, then use your accepted memory locally to shape the answer."
            )

        if memory.accepted():
            return "I am here. Tell me what you want to figure out, build, remember, or search for."
        return "I am here. Tell me a little about you, or ask me to help with something."

    def _compose_final(
        self,
        external: ExternalAgentResult,
        ranked: list,
        added: list,
        route: str,
        memory: UserMemory,
    ) -> str:
        """Build the user-facing reply text.

        Args:
            external: Result of the route that was run.
            ranked: Ranked items; the titles of the first three may be listed.
            added: Facts stored from the current message.
            route: Route label used to pick the wording.
            memory: Current user memory, used for the greeting name.

        Returns:
            The reply; multi-line replies are joined with newlines.
        """
        name = self._display_name(memory)
        greeting = f"{name}, " if name else ""

        if route == "local_answer" and added:
            # Only greet by a learned name that is non-empty; a blank name
            # fact falls through to the generic acknowledgement.
            learned_name = next(
                (parsed for fact in added if (parsed := self._name_from_fact(fact))),
                None,
            )
            if learned_name:
                return f"Nice to meet you, {learned_name}. I’ll remember that."
            return f"{greeting}Got it. I’ll keep that in mind."

        if route == "local_answer":
            # Name answers already address the user; do not prefix a greeting.
            if external.summary.startswith(("Your name is", "I do not know your name")):
                return external.summary
            return greeting + external.summary

        # Errors with nothing usable returned: report the failure only.
        if external.errors and not external.sources and not external.artifacts:
            return f"{greeting}I could not get a live result for that right now. Try again in a moment, or ask it a different way."

        if route == "direct_search":
            lines = [f"{greeting}I looked that up for you."]
        elif route == "codex_big_agent":
            lines = [f"{greeting}I worked through that as a bigger task."]
        else:
            lines = [greeting + "Here is what I found."]

        if external.summary:
            lines.extend(["", self._clean_external_summary(external.summary)])

        if route != "direct_search" and ranked and external.sources:
            lines.append("")
            lines.append("The most useful starting points are:")
            lines.extend(f"- {item.title}" for item in ranked[:3])

        if external.errors:
            lines.append("")
            lines.append("One caveat: I had trouble with part of the live lookup.")

        return "\n".join(lines)

    @classmethod
    def _name_from_fact(cls, fact) -> str | None:
        """Return the name held by a ``user name:`` profile fact, else None.

        None is returned when the fact is not a profile fact, does not start
        with the name prefix (case-insensitive), or carries a blank name.
        """
        if fact.kind != "profile":
            return None
        if not fact.text.lower().startswith(cls._NAME_FACT_PREFIX):
            return None
        return fact.text.split(":", 1)[1].strip() or None

    def _display_name(self, memory: UserMemory) -> str | None:
        """Return the first non-empty user name among accepted facts, if any."""
        for fact in memory.accepted():
            name = self._name_from_fact(fact)
            if name:
                return name
        return None

    def _clean_external_summary(self, summary: str) -> str:
        """Strip the known agent-specific prefixes and surrounding whitespace."""
        for prefix in ("Direct search summary:", "Hosted under-32B model result:"):
            summary = summary.removeprefix(prefix).strip()
        return summary

    def _save_trace(self, trace: AgentTrace) -> None:
        """Write the trace as indented JSON to ``runs/<run_id>/trace.json``."""
        run_dir = self.base_dir / "runs" / trace.run_id
        run_dir.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(trace.to_dict(), indent=2)
        (run_dir / "trace.json").write_text(payload, encoding="utf-8")

    def _append_engineering_log(self, trace: AgentTrace, added: list, external: ExternalAgentResult) -> None:
        """Append one JSON line describing this run to ``engineering_events.jsonl``."""
        log_path = self.base_dir / "engineering_events.jsonl"
        # Do not rely on _save_trace having created base_dir beforehand.
        log_path.parent.mkdir(parents=True, exist_ok=True)
        event = {
            "run_id": trace.run_id,
            "route": trace.route_decision.route,
            "learned_facts": [fact.to_dict() for fact in added],
            "private_memory_used": trace.private_memory_used,
            "outbound_payload_preview": trace.outbound_payload_preview,
            "external_errors": external.errors,
            "source_count": len(external.sources),
            "artifact_count": len(external.artifacts),
        }
        with log_path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(event) + "\n")