"""Persistent episodic memory for the LLM detective agent.
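Stores:
- Reflections: short lessons the agent generates after each episode
- Best trajectories: the full action log of the single highest-reward episode per task (used as few-shot examples)
- Win history: per-episode win/loss outcomes, used to compute a recent win rate
- Alpha: the persisted α (LLM trust weight) for each task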
All data is written to disk so learning persists across container restarts
when the memory/ directory is mounted as a Docker volume.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
# Default storage location: a "memory" directory beside this file's parent directory.
MEMORY_DIR = Path(__file__).parent.parent / "memory"


class AgentMemory:
    """Disk-backed memory for reflections and successful trajectories.

    Each task gets its own files inside ``memory_dir``:

    - ``reflections_<task>.jsonl``   -- one JSON object per reflection
    - ``best_trajectory_<task>.json`` -- the single best run seen so far
    - ``wins_<task>.jsonl``           -- one JSON object per episode outcome
    - ``alpha_<task>.json``           -- the persisted α value

    Readers are tolerant: missing files, undecodable lines and records of
    an unexpected shape are skipped instead of raising, so a partially
    corrupted memory directory never crashes the agent.
    """

    # Task names reported by ``summary()`` when no explicit list is given.
    DEFAULT_TASKS = ("easy", "medium", "hard")

    def __init__(self, memory_dir: Path = MEMORY_DIR) -> None:
        """Create the memory directory (and parents) if it does not exist.

        Args:
            memory_dir: Directory holding all memory files. A plain string
                is accepted and converted to ``Path``.
        """
        self.memory_dir = Path(memory_dir)
        self.memory_dir.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Low-level file helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values, excluding bool."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _read_jsonl(path: Path) -> List[Dict[str, Any]]:
        """Return every line of *path* that decodes to a JSON object.

        Blank lines, undecodable lines and non-object values are skipped.
        A missing file yields an empty list.
        """
        try:
            raw = path.read_text(encoding="utf-8")
        except FileNotFoundError:
            return []
        records: List[Dict[str, Any]] = []
        for line in raw.splitlines():
            if not line.strip():
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(record, dict):
                records.append(record)
        return records

    @staticmethod
    def _append_jsonl(path: Path, entry: Dict[str, Any]) -> None:
        """Append *entry* to *path* as one JSON line."""
        with open(path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")

    @staticmethod
    def _read_json_object(path: Path) -> Optional[Dict[str, Any]]:
        """Return the JSON object stored in *path*.

        Returns None when the file is missing, is not valid JSON, or holds
        something other than a JSON object.
        """
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (FileNotFoundError, json.JSONDecodeError):
            return None
        return data if isinstance(data, dict) else None

    @staticmethod
    def _write_json(path: Path, payload: Dict[str, Any], **dump_kwargs: Any) -> None:
        """Write *payload* to *path* via a sibling temp file plus rename.

        Writing to a temp file first means an interrupted write cannot
        leave a truncated file at *path*.
        """
        tmp_path = path.with_name(path.name + ".tmp")
        tmp_path.write_text(json.dumps(payload, **dump_kwargs), encoding="utf-8")
        tmp_path.replace(path)

    # ------------------------------------------------------------------
    # Reflections (one JSONL file per task)
    # ------------------------------------------------------------------
    def _reflections_path(self, task: str) -> Path:
        """Return the path of the reflections file for *task*."""
        return self.memory_dir / f"reflections_{task}.jsonl"

    def add_reflection(
        self,
        task: str,
        text: str,
        episode_num: int,
        reward: float,
    ) -> None:
        """Append one reflection for *task*.

        The text is stripped of surrounding whitespace and the reward is
        rounded to three decimals before being written.
        """
        entry = {
            "episode": episode_num,
            "reward": round(reward, 3),
            "reflection": text.strip(),
        }
        self._append_jsonl(self._reflections_path(task), entry)

    def get_reflections(self, task: str, n: int = 4) -> List[str]:
        """Return the n most recent reflection texts for a task.

        Returns an empty list when ``n <= 0`` or nothing is stored.
        """
        # Guard explicitly: ``items[-0:]`` would return the whole list.
        if n <= 0:
            return []
        texts = [
            record["reflection"]
            for record in self._read_jsonl(self._reflections_path(task))
            if "reflection" in record
        ]
        return texts[-n:]

    def reflection_count(self, task: str) -> int:
        """Return how many readable reflections are stored for *task*.

        Counts exactly the records ``get_reflections`` is able to return.
        """
        return sum(
            1
            for record in self._read_jsonl(self._reflections_path(task))
            if "reflection" in record
        )

    # ------------------------------------------------------------------
    # Best trajectory (one JSON file per task — stores single best run)
    # ------------------------------------------------------------------
    def _trajectory_path(self, task: str) -> Path:
        """Return the path of the best-trajectory file for *task*."""
        return self.memory_dir / f"best_trajectory_{task}.json"

    def add_trajectory(
        self,
        task: str,
        action_log: List[str],
        final_message: str,
        reward: float,
        episode_num: int,
    ) -> bool:
        """Save trajectory if it's better than the current best. Returns True if saved.

        The comparison uses the reward rounded to three decimals, i.e. the
        same value that is persisted, so a run that merely ties the stored
        best after rounding does not overwrite it. A missing, unreadable
        or non-numeric stored reward counts as "no best yet".
        """
        path = self._trajectory_path(task)
        rounded_reward = round(reward, 3)

        stored = self._read_json_object(path)
        best_so_far = stored.get("reward") if stored is not None else None
        if not self._is_number(best_so_far):
            best_so_far = float("-inf")

        if not rounded_reward > best_so_far:
            return False

        data = {
            "task": task,
            "episode": episode_num,
            "reward": rounded_reward,
            "action_log": action_log,
            "final_message": final_message,
        }
        self._write_json(path, data, indent=2)
        return True

    def get_best_trajectory(self, task: str) -> Optional[Dict[str, Any]]:
        """Return best saved trajectory for task, or None.

        None is also returned when the stored file is not a JSON object.
        """
        return self._read_json_object(self._trajectory_path(task))

    # ------------------------------------------------------------------
    # Win history + alpha persistence
    # ------------------------------------------------------------------
    def _wins_path(self, task: str) -> Path:
        """Return the path of the win-history file for *task*."""
        return self.memory_dir / f"wins_{task}.jsonl"

    def _alpha_path(self, task: str) -> Path:
        """Return the path of the persisted-α file for *task*."""
        return self.memory_dir / f"alpha_{task}.json"

    def record_win(self, task: str, won: bool, episode_num: int) -> None:
        """Append an episode outcome to the win history for this task."""
        self._append_jsonl(self._wins_path(task), {"episode": episode_num, "won": won})

    def recent_win_rate(self, task: str, n: int = 10) -> float:
        """Return win rate over the last n episodes for this task.

        Returns 0.0 when ``n <= 0`` or no outcomes are recorded. Records
        without a ``"won"`` field are ignored.
        """
        # Guard explicitly: ``items[-0:]`` would select the whole history.
        if n <= 0:
            return 0.0
        outcomes = [
            record["won"]
            for record in self._read_jsonl(self._wins_path(task))
            if "won" in record
        ]
        window = outcomes[-n:]
        if not window:
            return 0.0
        return sum(1 for won in window if won) / len(window)

    def save_alpha(self, task: str, alpha: float) -> None:
        """Persist the current α (LLM trust weight) for a task."""
        self._write_json(self._alpha_path(task), {"alpha": round(alpha, 3)})

    def load_alpha(self, task: str, default: float = 0.20) -> float:
        """Load persisted α, or return default if not saved yet.

        The default is also returned when the stored file is unreadable
        or the stored value is not a number.
        """
        stored = self._read_json_object(self._alpha_path(task))
        if stored is None or "alpha" not in stored:
            return default
        alpha = stored["alpha"]
        if not self._is_number(alpha):
            return default
        return float(alpha)

    # ------------------------------------------------------------------
    # Summary
    # ------------------------------------------------------------------
    def summary(self, tasks: Optional[List[str]] = None) -> str:
        """Return a multi-line, human-readable overview of stored memory.

        Args:
            tasks: Task names to report, in order. Defaults to
                ``DEFAULT_TASKS`` ("easy", "medium", "hard").

        Returns:
            A header line followed by one line per task showing the
            reflection count, best reward, α and the win rate over the
            last 10 episodes.
        """
        task_names = self.DEFAULT_TASKS if tasks is None else tasks
        lines = ["=== Agent Memory ==="]
        for task in task_names:
            n_ref = self.reflection_count(task)
            best = self.get_best_trajectory(task)
            best_reward = best.get("reward") if best else None
            # A trajectory file without a numeric reward is shown as "none".
            best_r = f"{best_reward:+.2f}" if self._is_number(best_reward) else "none"
            alpha = self.load_alpha(task)
            wr = self.recent_win_rate(task, n=10)
            lines.append(
                f" {task:6s}: {n_ref:3d} reflections | best reward: {best_r} "
                f"| α={alpha:.2f} | wr(last10)={wr:.0%}"
            )
        return "\n".join(lines)
|