# AgentOS-Core-V2 / agentos_core_v2.py
# Tpayne101's picture
# Create agentos_core_v2.py
# dc80564 verified
import re
from memory import MemoryManager
from context_graph import ContextGraph
from telemetry import Telemetry
from identity_core import create_agent_identity
class AgentCore:
    """Keyword-driven agent that stores user statements and recalls them by category.

    On construction the agent obtains an identity from ``create_agent_identity``
    and wires up a ``Telemetry`` logger, a ``MemoryManager`` and a
    ``ContextGraph``.  Those collaborators are project modules whose behavior is
    not visible from this file; this class only calls ``telemetry.log``,
    ``memory.save``, ``context.link_context`` and ``context.query_context``.

    Attributes:
        agent_id: Value returned by ``create_agent_identity()``.
        telemetry: ``Telemetry`` instance built from ``agent_id``.
        memory: ``MemoryManager`` instance built from ``agent_id``.
        context: ``ContextGraph`` instance used to link and query statements.
        model: Model name; stored and printed only, never used for inference here.
    """

    # Ordered (category, keywords) table.  The first category with any keyword
    # occurring as a *substring* of the lower-cased prompt wins, so order matters.
    _CATEGORY_KEYWORDS = (
        ("goals", ("goal", "want", "plan")),
        ("people", ("friend", "person", "met")),
        ("preferences", ("favorite", "like")),
        ("personal", ("city", "food", "color")),
    )

    # A prompt that opens with "my ", "i " or "i'm " (straight or curly
    # apostrophe) is treated as a fact to store.  "i am " and "i like" are
    # already covered by the "i " alternative.
    _FACT_PREFIX = re.compile(r"(?:my|i|i['’]m) ")

    def __init__(self, model: str = "gpt-4o-mini") -> None:
        """Create the agent identity and its telemetry, memory and context helpers.

        Args:
            model: Model name recorded on the instance and echoed in the init banner.
        """
        self.agent_id = create_agent_identity()
        self.telemetry = Telemetry(self.agent_id)
        self.memory = MemoryManager(self.agent_id)
        self.context = ContextGraph()
        self.model = model
        print(f"[INIT] Agent {self.agent_id} initialized with model {self.model}")

    def categorize(self, prompt: str) -> str:
        """Return the category bucket for *prompt*.

        Matching is case-insensitive and substring-based, so a keyword embedded
        in a longer word also counts (e.g. "something" contains "met" and is
        categorized as "people").

        Args:
            prompt: Raw user text.

        Returns:
            One of "goals", "people", "preferences", "personal", or "general"
            when no keyword is found.
        """
        lowered = prompt.lower()
        for category, keywords in self._CATEGORY_KEYWORDS:
            if any(keyword in lowered for keyword in keywords):
                return category
        return "general"

    def _is_fact_statement(self, prompt: str) -> bool:
        """Return True when *prompt* starts like a first-person statement to store."""
        # Leading whitespace is ignored so that "  I like tea" is still stored
        # rather than being misread as a recall query.
        return self._FACT_PREFIX.match(prompt.lstrip().lower()) is not None

    def run(self, prompt: str) -> str:
        """Store *prompt* as a fact, or answer it from previously stored context.

        First-person statements are linked into the context graph under their
        category and saved to memory; anything else is answered by querying the
        context graph for that category.  Start, success and failure are each
        reported through ``telemetry.log``.

        Args:
            prompt: Raw user text.

        Returns:
            The response string (also printed with a "[RUN]" prefix).

        Raises:
            Exception: Any error raised while processing is logged as
                "run_failed" and then re-raised unchanged.
        """
        self.telemetry.log("run_start", "in_progress", {"prompt": prompt})
        try:
            category = self.categorize(prompt)
            if self._is_fact_statement(prompt):
                response = f"Got it — I'll remember that about your {category}."
                self.context.link_context(self.agent_id, category, prompt, response)
                self.memory.save({"prompt": prompt, "response": response})
            else:
                recall = self.context.query_context(self.agent_id, category)
                # NOTE(review): query_context is assumed to return an iterable of
                # recalled items (possibly empty or None) — confirm against
                # ContextGraph.  Items are stringified so non-str entries
                # cannot break the join.
                recalled = [str(item) for item in (recall or ())]
                if recalled:
                    recall_text = "; ".join(recalled)
                    response = f"Here’s what I know from your {category}: {recall_text}"
                else:
                    response = f"I have nothing stored for your {category} yet."
            self.telemetry.log("run_complete", "success", {"response": response})
            print(f"[RUN] {response}")
            return response
        except Exception as e:
            self.telemetry.log("run_failed", "error", {"error": str(e)})
            # Bare raise re-raises the active exception with its traceback intact.
            raise