mm1 / src /agents /small_agent.py
TheRealHubertus's picture
Upload 49 files
82372e5 verified
Raw
History Blame Contribute Delete
6.96 kB
from __future__ import annotations
import json
from pathlib import Path
from src.agents.codex_big_agent import CodexBigAgent
from src.agents.direct_search_agent import DirectSearchAgent
from src.memory.extractor import extract_memory_candidates
from src.memory.store import MemoryStore
from src.ranking import rank_results
from src.router import decide_route
from src.sanitizer import sanitize_task
from src.schemas import AgentTrace, ExternalAgentResult
from src.schemas import UserMemory
class SmallPersonalAgent:
    """Handle one user message end to end.

    For each message the agent records memory candidates, sanitizes the task,
    picks a route (local answer, direct search, or the Codex big agent), ranks
    whatever came back, composes the reply text, and persists a trace plus an
    engineering log entry under ``base_dir``.
    """

    # Lower-cased prefix of a profile fact whose text carries the user's name.
    _NAME_FACT_PREFIX = "user name:"

    # Lower-cased phrases that are treated as "what is my name?" questions.
    _NAME_QUESTIONS = (
        "what's my name",
        "what is my name",
        "whats my name",
        "do you know my name",
        "who am i",
    )

    # Labels stripped, in this order, from the start of an external summary.
    _SUMMARY_LABELS = (
        "Direct search summary:",
        "Hosted under-32B model result:",
    )

    def __init__(self, base_dir: str | Path = "data") -> None:
        """Create the agent and its collaborators.

        Args:
            base_dir: Directory handed to the memory store and the Codex
                agent; run traces and the engineering log are written under it.
        """
        self.base_dir = Path(base_dir)
        self.store = MemoryStore(self.base_dir)
        self.direct_search = DirectSearchAgent()
        self.codex = CodexBigAgent(self.base_dir)

    def handle_message(
        self,
        message: str,
        auto_accept: bool = True,
    ) -> tuple[str, AgentTrace, ExternalAgentResult]:
        """Process ``message`` and return the reply with its audit data.

        Args:
            message: Raw user message.
            auto_accept: Forwarded to ``MemoryStore.add_candidates``.

        Returns:
            A ``(final_text, trace, external_result)`` tuple.
        """
        # NOTE(review): this first load is overwritten below before it is ever
        # read. The call is kept because load() may have side effects that are
        # not visible from this file -- confirm before removing it.
        memory = self.store.load()
        extracted = extract_memory_candidates(message)
        added = self.store.add_candidates(extracted.candidates, auto_accept=auto_accept)
        # Reload so the facts learned from this very message are included.
        memory = self.store.load()

        task = sanitize_task(message, memory)
        decision = decide_route(message)

        if decision.route == "direct_search":
            external = self.direct_search.run(task)
        elif decision.route == "codex_big_agent":
            external = self.codex.run(task)
        else:
            # Any other route is answered locally; nothing leaves the machine.
            external = ExternalAgentResult(
                route="local_answer",
                summary=self._local_answer(message, memory, decision.reasons),
                sources=[],
                artifacts=[],
                errors=[],
            )

        ranked, ranking_summary = rank_results(external, memory)
        final = self._compose_final(external, ranked, added, decision.route, memory)

        if decision.route != "local_answer":
            outbound_preview = task.sanitized_query
        else:
            outbound_preview = "(nothing sent out)"

        trace = AgentTrace(
            run_id=task.run_id,
            route_decision=decision,
            private_memory_used=[fact.text for fact in memory.accepted()],
            sanitized_task=task,
            outbound_payload_preview=outbound_preview,
            external_result_summary=external.summary,
            final_reasoning_summary=ranking_summary,
        )
        self._save_trace(trace)
        self._append_engineering_log(trace, added, external)
        return final, trace, external

    def _local_answer(self, message: str, memory: UserMemory, decision_reasons: list[str]) -> str:
        """Build the reply body for a message that is answered locally.

        Checks, in order: a name question, a routing reason starting with
        ``needs_clarification``, a mention of "mm1" or "privacy", and finally
        falls back to a generic prompt that depends on whether any accepted
        memory exists.
        """
        lower = message.lower()

        if any(term in lower for term in self._NAME_QUESTIONS):
            name = self._display_name(memory)
            if name:
                return f"Your name is {name}."
            return "I do not know your name yet."

        if any(reason.startswith("needs_clarification") for reason in decision_reasons):
            return "Do you want me to answer from what I already know, or should I look it up live?"

        if "mm1" in lower or "privacy" in lower:
            return (
                "I keep what I learn about you here on this machine. When something needs the internet or Codex, "
                "I strip out private details first, send only the task, then use your accepted memory locally to shape the answer."
            )

        if memory.accepted():
            return "I am here. Tell me what you want to figure out, build, remember, or search for."
        return "I am here. Tell me a little about you, or ask me to help with something."

    def _compose_final(
        self,
        external: ExternalAgentResult,
        ranked: list,
        added: list,
        route: str,
        memory: UserMemory,
    ) -> str:
        """Turn the route outcome into the text shown to the user.

        Args:
            external: Result of the chosen route.
            ranked: Ranked items; the titles of the first three may be listed.
            added: Facts learned from the current message.
            route: Name of the route that was taken.
            memory: Current user memory, used for the greeting name.

        Returns:
            The final reply text.
        """
        name = self._display_name(memory)
        greeting = f"{name}, " if name else ""

        if route == "local_answer":
            if added:
                # Something new was learned: acknowledge it instead of answering.
                learned_name = next(
                    (found for found in map(self._name_from_fact, added) if found),
                    None,
                )
                # A name fact with an empty value falls through to the generic
                # acknowledgement rather than greeting an empty name.
                if learned_name:
                    return f"Nice to meet you, {learned_name}. I’ll remember that."
                return f"{greeting}Got it. I’ll keep that in mind."
            # Name answers already address the user; do not prefix the greeting.
            if external.summary.startswith(("Your name is", "I do not know your name")):
                return external.summary
            return greeting + external.summary

        # Errors with nothing usable to show: report the failure only.
        if external.errors and not external.sources and not external.artifacts:
            return f"{greeting}I could not get a live result for that right now. Try again in a moment, or ask it a different way."

        if route == "direct_search":
            lines = [f"{greeting}I looked that up for you."]
        elif route == "codex_big_agent":
            lines = [f"{greeting}I worked through that as a bigger task."]
        else:
            lines = [greeting + "Here is what I found."]

        if external.summary:
            lines.extend(["", self._clean_external_summary(external.summary)])

        # Starting points are listed for every route except direct search.
        if route != "direct_search" and ranked and external.sources:
            lines.append("")
            lines.append("The most useful starting points are:")
            lines.extend(f"- {item.title}" for item in ranked[:3])

        if external.errors:
            lines.append("")
            lines.append("One caveat: I had trouble with part of the live lookup.")

        return "\n".join(lines)

    @classmethod
    def _name_from_fact(cls, fact) -> str | None:
        """Return the name stored in a profile name fact.

        Returns ``None`` when ``fact`` is not a ``profile`` fact, its text does
        not start with the name prefix (case-insensitive), or the value after
        the first colon is blank.
        """
        if fact.kind != "profile":
            return None
        if not fact.text.lower().startswith(cls._NAME_FACT_PREFIX):
            return None
        return fact.text.split(":", 1)[1].strip() or None

    def _display_name(self, memory: UserMemory) -> str | None:
        """Return the first non-empty user name among accepted facts, else None."""
        candidates = (self._name_from_fact(fact) for fact in memory.accepted())
        return next((found for found in candidates if found), None)

    def _clean_external_summary(self, summary: str) -> str:
        """Strip the known route labels and surrounding whitespace from ``summary``."""
        for label in self._SUMMARY_LABELS:
            summary = summary.removeprefix(label).strip()
        return summary

    def _save_trace(self, trace: AgentTrace) -> None:
        """Write ``trace`` as JSON to ``<base_dir>/runs/<run_id>/trace.json``."""
        run_dir = self.base_dir / "runs" / trace.run_id
        run_dir.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(trace.to_dict(), indent=2)
        (run_dir / "trace.json").write_text(payload, encoding="utf-8")

    def _append_engineering_log(self, trace: AgentTrace, added: list, external: ExternalAgentResult) -> None:
        """Append one JSON line describing this run to ``engineering_events.jsonl``."""
        log_path = self.base_dir / "engineering_events.jsonl"
        event = {
            "run_id": trace.run_id,
            "route": trace.route_decision.route,
            "learned_facts": [fact.to_dict() for fact in added],
            "private_memory_used": trace.private_memory_used,
            "outbound_payload_preview": trace.outbound_payload_preview,
            "external_errors": external.errors,
            "source_count": len(external.sources),
            "artifact_count": len(external.artifacts),
        }
        with log_path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(event) + "\n")