meta-r2 / agent /memory.py
github-actions[bot]
Deploy Space snapshot
ddbc1ba
import os
import chromadb
from sentence_transformers import SentenceTransformer
import uuid
import math
from datetime import datetime
from collections import defaultdict
from typing import Optional
class LifeStackMemory:
def __init__(self, silent: bool = False, path: str = "./lifestack_memory"):
self.silent = silent
try:
self.client = chromadb.PersistentClient(path=path)
self.collection = self.client.get_or_create_collection(name='decisions')
self.traj_collection = self.client.get_or_create_collection(name='trajectories')
self.feedback_collection = self.client.get_or_create_collection(name='feedback')
except Exception as e:
if not self.silent:
print(f"⚠️ Memory persistence failed ({e}). Falling back to episodic (in-memory) mode.")
self.client = chromadb.Client()
self.collection = self.client.get_or_create_collection(name='decisions')
self.traj_collection = self.client.get_or_create_collection(name='trajectories')
self.feedback_collection = self.client.get_or_create_collection(name='feedback')
self.encoder = self._load_encoder()
if not self.silent:
print("Memory system initialized")
# Auto-hydrate if empty
if self.collection.count() == 0:
self._hydrate_from_preseeded()
def _hydrate_from_preseeded(self):
import json
sources = ["./data/preseeded_memory.json", "./data/preseeded_memory_p1.json", "./data/preseeded_memory_p2.json"]
if not self.silent:
print(f"🧬 Empty memory detected. Hydrating from partitioned volumes...")
total_decisions = 0
for path in sources:
if not os.path.exists(path):
continue
try:
with open(path, 'r') as f:
data = json.load(f)
# Hydrate decisions
d = data.get("decisions", {})
if d.get("ids"):
self.collection.add(
ids=d["ids"],
documents=d["documents"],
metadatas=d["metadatas"],
embeddings=d["embeddings"]
)
total_decisions += len(d["ids"])
except Exception as e:
if not self.silent:
print(f"⚠️ Hydration failed for {path}: {e}")
if not self.silent:
print(f"✅ Hydration complete: {total_decisions} memories restored.")
def _load_encoder(self):
try:
return SentenceTransformer('all-MiniLM-L6-v2', local_files_only=True)
except Exception as exc:
if not self.silent:
print(f"Falling back to local hash embeddings: {exc}")
return None
def _embed_text(self, text: str) -> list[float]:
if self.encoder is not None:
return self.encoder.encode(text).tolist()
import zlib
buckets = [0.0] * 384
for token in text.lower().split():
idx = zlib.adler32(token.encode()) % len(buckets)
buckets[idx] += 1.0
norm = math.sqrt(sum(v * v for v in buckets)) or 1.0
return [v / norm for v in buckets]
def store_decision(
self,
conflict_title: str,
action_type: str,
target_domain: str,
reward: float,
metrics_snapshot: dict,
reasoning: str,
trajectory: list[dict] = None,
route_outcome: str = None,
episode_id: str = None,
personality_type: str = None
) -> None:
"""Stores individual decision for longitudinal tracking."""
text = f"{conflict_title} Action: {action_type} Domain: {target_domain} Reward: {reward:.2f} {reasoning[:100]}"
embedding = self._embed_text(text)
doc_id = str(uuid.uuid4())
self.collection.add(
ids=[doc_id],
embeddings=[embedding],
documents=[text],
metadatas=[{
"conflict_title": conflict_title,
"action_type": action_type,
"target_domain": target_domain,
"reward": float(reward),
"reasoning": reasoning,
"route_outcome": route_outcome or "",
"episode_id": episode_id or "",
"personality_type": personality_type or "unknown",
"timestamp": datetime.now().isoformat()
}]
)
def store_trajectory(
self,
conflict_title: str = None,
route_taken: str = None,
total_reward: float = 0.0,
metrics_diff_str: str = None,
reasoning: str = None,
task_id: str = None,
trajectory_summary: dict = None
) -> None:
"""Stores a full trajectory summary."""
if trajectory_summary is not None and task_id is not None:
import json
text = f"Task: {task_id} Route: {route_taken} Reward: {total_reward:.2f} Hits: {len(trajectory_summary.get('milestones_hit', []))}"
embedding = self._embed_text(text)
doc_id = str(uuid.uuid4())
self.traj_collection.add(
ids=[doc_id],
embeddings=[embedding],
documents=[text],
metadatas=[{
"task_id": task_id,
"route_taken": route_taken,
"reward": total_reward,
"summary": json.dumps(trajectory_summary),
"timestamp": datetime.now().isoformat()
}]
)
if not self.silent:
print(f"Stored task trajectory: {route_taken} (reward: {total_reward:.2f})")
return
# Fallback to older signature logic
text = f"{conflict_title} Route: {route_taken} Diff: {metrics_diff_str} {reasoning[:100]}"
embedding = self._embed_text(text)
doc_id = str(uuid.uuid4())
self.collection.add(
ids=[doc_id],
embeddings=[embedding],
documents=[text],
metadatas=[{
"conflict_title": conflict_title,
"route_taken": route_taken,
"metrics_diff": metrics_diff_str,
"reward": total_reward,
"reasoning": reasoning,
"timestamp": datetime.now().isoformat()
}]
)
if not self.silent:
print(f"Stored trajectory fallback: {route_taken} (reward: {total_reward:.2f})")
def store_feedback(self, feedback) -> None:
"""Stores OutcomeFeedback linked to a specific episode."""
import json
text = f"Episode: {feedback.episode_id} Effectiveness: {feedback.overall_effectiveness} Resolution: {feedback.resolution_time_hours}h"
embedding = self._embed_text(text)
doc_id = f"fb_{feedback.episode_id}"
self.feedback_collection.add(
ids=[doc_id],
embeddings=[embedding],
documents=[text],
metadatas=[{
"episode_id": feedback.episode_id,
"effectiveness": feedback.overall_effectiveness,
"domains_improved": json.dumps(feedback.domains_improved),
"domains_worsened": json.dumps(feedback.domains_worsened),
"unexpected_effects": feedback.unexpected_effects,
"resolution_time": feedback.resolution_time_hours,
"timestamp": feedback.submitted_at.isoformat()
}]
)
if not self.silent:
print(f"Stored human feedback for episode {feedback.episode_id}")
def retrieve_feedback(self, episode_id: str) -> Optional[dict]:
"""Retrieves feedback for a specific episode."""
import json
doc_id = f"fb_{episode_id}"
results = self.feedback_collection.get(ids=[doc_id])
if not results['metadatas']:
return None
meta = results['metadatas'][0]
# Deserialize lists
meta["domains_improved"] = json.loads(meta["domains_improved"])
meta["domains_worsened"] = json.loads(meta["domains_worsened"])
return meta
def retrieve_similar_trajectories(self, task_domain: str, current_world: dict, n: int = 3) -> list[dict]:
"""Retrieve similar trajectories based on task domain and current world state."""
import json
if self.traj_collection.count() == 0:
return []
sorted_metrics = sorted(current_world.items(), key=lambda x: x[1] if isinstance(x[1], (int, float)) else 0)
top_stressed = " ".join(f"{k}:{v}" for k, v in sorted_metrics[:3])
query_text = f"TaskDomain: {task_domain} {top_stressed}"
query_embedding = self._embed_text(query_text)
results = self.traj_collection.query(
query_embeddings=[query_embedding],
n_results=min(n, self.traj_collection.count())
)
output = []
for i, meta in enumerate(results['metadatas'][0]):
output.append({
"task_id": meta.get("task_id", ""),
"route_taken": meta.get("route_taken", ""),
"reward": meta.get("reward", 0.0),
"summary": json.loads(meta.get("summary", "{}")),
})
return output
def retrieve_similar(self, conflict_title: str, current_metrics: dict, n: int = 3,
personality_type: str = None) -> list[dict]:
"""Retrieves the n most similar past high-reward decisions using semantic search.
When personality_type is provided, tries personality-matched results first and
fills remaining slots from the global pool so we always return n results.
"""
if self.collection.count() == 0:
return []
sorted_metrics = sorted(current_metrics.items(), key=lambda x: x[1])
top_stressed = " ".join(f"{k}:{v:.0f}" for k, v in sorted_metrics[:3])
query_text = f"{conflict_title} {top_stressed}"
query_embedding = self._embed_text(query_text)
fetch = min(n * 4, self.collection.count())
def _parse_results(results) -> list[dict]:
output = []
for i, meta in enumerate(results['metadatas'][0]):
if meta.get("reward", 0.0) < 0.05:
continue
distance = results['distances'][0][i]
output.append({
"route_taken": meta.get("route_taken", ""),
"action_type": meta.get("action_type", ""),
"target_domain": meta.get("target_domain", ""),
"metrics_diff": meta.get("metrics_diff", ""),
"reward": meta.get("reward", 0.0),
"reasoning": meta.get("reasoning", ""),
"episode_id": meta.get("episode_id", ""),
"personality_type": meta.get("personality_type", "unknown"),
"similarity_score": round(1.0 / (1.0 + distance), 4),
})
return output
# --- personality-matched pass ---
matched: list[dict] = []
if personality_type and personality_type != "unknown":
try:
r = self.collection.query(
query_embeddings=[query_embedding],
n_results=fetch,
where={"personality_type": {"$eq": personality_type}},
)
matched = _parse_results(r)[:n]
except Exception:
pass # personality filter unsupported or empty — fall through
# --- global fill-up pass ---
if len(matched) < n:
r = self.collection.query(
query_embeddings=[query_embedding],
n_results=fetch,
)
seen_episodes = {m["episode_id"] for m in matched}
for entry in _parse_results(r):
if len(matched) >= n:
break
if entry["episode_id"] not in seen_episodes:
matched.append(entry)
seen_episodes.add(entry["episode_id"])
return matched[:n]
def build_few_shot_prompt(self, conflict_title: str, current_metrics: dict,
personality_type: str = None) -> str:
"""Formats retrieved memories into a few-shot prompt block for the LLM."""
memories = self.retrieve_similar(conflict_title, current_metrics, personality_type=personality_type)
if not memories:
return ""
lines = ["--- PAST EXPERIENCE & HUMAN VERIFICATION ---"]
for m in memories:
episode_id = m.get("episode_id")
fb_context = ""
if episode_id:
fb = self.retrieve_feedback(episode_id)
if fb:
fb_context = f"\n HUMAN FEEDBACK: Rated {fb['effectiveness']}/10. Notes: {fb['unexpected_effects']}"
p_tag = f" [{m['personality_type']}]" if m.get("personality_type", "unknown") != "unknown" else ""
short_reason = m['reasoning'][:120]
line = (f"- Action Taken: [{m['action_type'].upper()}] on {m['target_domain'].upper()}{p_tag}\n"
f" Agent's Initial Reasoning: {short_reason}{fb_context}")
lines.append(line)
return "\n".join(lines)
def get_stats(self) -> dict:
"""Returns memory stats: total count, average reward, and route details."""
if self.collection.count() == 0:
return {"total_memories": 0, "average_reward": 0.0, "by_action_type": {}}
all_records = self.collection.get(include=["metadatas"])
metadatas = all_records["metadatas"]
total = len(metadatas)
avg_reward = sum(m.get("reward", 0.0) for m in metadatas) / total
by_action_type = defaultdict(int)
for m in metadatas:
action_type = m.get("action_type")
if action_type:
by_action_type[action_type] += 1
return {
"total_memories": total,
"average_reward": round(avg_reward, 3),
"by_action_type": dict(by_action_type)
}
def main():
memory = LifeStackMemory()
# --- Synthetic Decisions: mix of high and low reward ---
synthetic = [
{
"conflict_title": "Friday 6PM",
"action_type": "negotiate",
"target_domain": "career",
"reward": 0.72,
"metrics_snapshot": {"career.workload": 100, "mental_wellbeing.stress_level": 95},
"reasoning": "Negotiating the deadline directly reduced workload pressure quickly."
},
{
"conflict_title": "Friday 6PM",
"action_type": "rest",
"target_domain": "mental_wellbeing",
"reward": 0.61,
"metrics_snapshot": {"mental_wellbeing.stress_level": 95, "physical_health.energy": 40},
"reasoning": "A short rest during peak stress restored energy before tackling logistics."
},
{
"conflict_title": "The Perfect Storm",
"action_type": "communicate",
"target_domain": "relationships",
"reward": 0.58,
"metrics_snapshot": {"relationships.romantic": 45, "mental_wellbeing.emotional_stability": 50},
"reasoning": "A quick reassuring call prevented relationship collapse under crisis."
},
{
"conflict_title": "The Perfect Storm",
"action_type": "delegate",
"target_domain": "career",
"reward": 0.38, # Below threshold — should NOT be stored
"metrics_snapshot": {"career.workload": 90, "career.stability": 55},
"reasoning": "Attempted to delegate but the neurotic profile made it ineffective."
},
{
"conflict_title": "Health Scare",
"action_type": "rest",
"target_domain": "physical_health",
"reward": 0.80,
"metrics_snapshot": {"physical_health.energy": 20, "mental_wellbeing.stress_level": 90},
"reasoning": "Aggressive rest protocol dramatically recovered energy and clarity."
},
{
"conflict_title": "Check Engine Light",
"action_type": "spend",
"target_domain": "finances",
"reward": 0.33, # Below threshold — should NOT be stored
"metrics_snapshot": {"finances.liquidity": 40, "time.commute_burden": 80},
"reasoning": "Overspent on premium repair, draining liquidity buffer dangerously."
},
]
print("\n--- STORING SYNTHETIC DECISIONS ---")
for d in synthetic:
memory.store_decision(**d)
# --- Retrieve similar decisions ---
print("\n--- RETRIEVING SIMILAR DECISIONS ---")
test_metrics = {
"career.workload": 95,
"mental_wellbeing.stress_level": 90,
"finances.liquidity": 35,
"physical_health.energy": 50,
"relationships.romantic": 70
}
similar = memory.retrieve_similar("Friday 6PM", test_metrics, n=3)
for s in similar:
print(f" [{s['action_type']}] → {s['target_domain']} | reward: {s['reward']:.2f} | similarity: {s['similarity_score']:.4f}")
print(f" Reasoning: {s['reasoning'][:80]}...")
# --- Few-shot prompt ---
print("\n--- FEW-SHOT PROMPT OUTPUT ---")
prompt = memory.build_few_shot_prompt("Friday 6PM", test_metrics)
print(prompt if prompt else "(No relevant memories found)")
# --- Stats ---
print("\n--- MEMORY STATS ---")
stats = memory.get_stats()
print(f"Total Memories : {stats['total_memories']}")
print(f"Average Reward : {stats['average_reward']}")
print(f"By Action Type : {stats.get('by_action_type', stats.get('by_route_start'))}")
if __name__ == "__main__":
main()