"""Append-only memory pool persisted as JSON lines, with keyword-overlap search."""

from __future__ import annotations

import json
import re
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

# Compiled once at import time; both helpers below are called per item on
# every search.
_WHITESPACE_RE = re.compile(r"\s+")
_TOKEN_RE = re.compile(r"[a-z0-9_]+")

# Weight applied to an item's reward when it is added to the overlap score.
_REWARD_WEIGHT = 0.25
# Maximum number of characters of an item's text returned by ``search``.
_MAX_TEXT_CHARS = 400


def _normalize(text: str) -> str:
    """Collapse each whitespace run in *text* to one space and trim the ends."""
    return _WHITESPACE_RE.sub(" ", text or "").strip()


def _tokens(text: str) -> List[str]:
    """Return the lowercase ``[a-z0-9_]+`` tokens of *text*, in order."""
    return _TOKEN_RE.findall((text or "").lower())


@dataclass
class MemoryItem:
    """One stored memory record."""

    timestamp: float  # ``time.time()`` at insertion (0.0 if absent on load)
    query: str  # query the memory was recorded for
    text: str  # memory content
    source: str  # free-form label naming where the memory came from
    reward: float  # contributes ``reward * 0.25`` to the search score
    tags: List[str]  # extra keywords; tokenized together with query/text
    metadata: Dict[str, Any]  # persisted with the record, not used by search


def _as_text(value: Any) -> str:
    """Return *value* as a string, mapping ``None`` to ``""`` (not ``"None"``)."""
    return "" if value is None else str(value)


def _as_float(value: Any) -> float:
    """Return *value* as a float, or ``0.0`` when it is falsy or unparsable."""
    try:
        return float(value or 0.0)
    except (TypeError, ValueError, OverflowError):
        return 0.0


def _as_tags(value: Any) -> List[str]:
    """Return *value* as a list of tag strings; unusable shapes give ``[]``."""
    if isinstance(value, str):
        # A bare string is one tag, not a sequence of characters.
        return [value] if value else []
    if isinstance(value, (list, tuple)):
        return [str(tag) for tag in value]
    return []


def _item_from_payload(payload: Any) -> Optional[MemoryItem]:
    """Build a ``MemoryItem`` from one decoded JSON line.

    Returns ``None`` when *payload* is not a JSON object. Missing or
    wrongly-typed fields fall back to empty/zero defaults so that one odd
    record cannot abort loading the whole file.
    """
    if not isinstance(payload, dict):
        return None
    metadata = payload.get("metadata")
    return MemoryItem(
        timestamp=_as_float(payload.get("timestamp")),
        query=_as_text(payload.get("query")),
        text=_as_text(payload.get("text")),
        source=_as_text(payload.get("source")),
        reward=_as_float(payload.get("reward")),
        tags=_as_tags(payload.get("tags")),
        metadata=dict(metadata) if isinstance(metadata, dict) else {},
    )


class PersistentMemoryPool:
    """In-memory list of ``MemoryItem`` records mirrored to a JSON-lines file.

    Attributes:
        path: File that records are loaded from and appended to.
        items: Records in file order, followed by records added afterwards.
    """

    def __init__(self, path: str | Path):
        """Create the parent directory of *path* if needed and load records."""
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.items: List[MemoryItem] = []
        self._load()

    def _load(self) -> None:
        """Replace ``self.items`` with the records stored at ``self.path``.

        A missing file yields an empty pool. Blank lines, lines that are not
        valid JSON, and lines whose JSON value is not an object are skipped.
        """
        self.items = []
        try:
            handle = self.path.open("r", encoding="utf-8")
        except FileNotFoundError:
            return
        with handle:
            # Iterate the handle rather than str.splitlines(): splitlines()
            # also breaks on U+2028/U+2029/U+0085, which json.dumps(...,
            # ensure_ascii=False) writes unescaped inside tags and metadata,
            # so such records would be cut in half and dropped.
            for raw_line in handle:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    payload = json.loads(line)
                except json.JSONDecodeError:
                    continue
                item = _item_from_payload(payload)
                if item is not None:
                    self.items.append(item)

    def add(
        self,
        *,
        query: str,
        text: str,
        source: str,
        reward: float = 0.0,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Record a new memory in ``self.items`` and append it to the file.

        Args:
            query: Query the memory belongs to; whitespace is normalized.
            text: Memory content; whitespace is normalized.
            source: Origin label; whitespace is normalized.
            reward: Stored as a float and used as a ranking bonus in search.
            tags: Optional keywords; each is converted with ``str``.
            metadata: Optional mapping; shallow-copied and must be JSON
                serializable.
        """
        item = MemoryItem(
            timestamp=time.time(),
            query=_normalize(query),
            text=_normalize(text),
            source=_normalize(source),
            reward=float(reward),
            tags=[str(tag) for tag in (tags or [])],
            metadata=dict(metadata or {}),
        )
        self.items.append(item)
        record = {
            "timestamp": item.timestamp,
            "query": item.query,
            "text": item.text,
            "source": item.source,
            "reward": item.reward,
            "tags": item.tags,
            "metadata": item.metadata,
        }
        with self.path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(record, ensure_ascii=False) + "\n")

    def search(self, query: str, max_results: int = 5) -> List[Dict[str, Any]]:
        """Return up to *max_results* records ranked by keyword overlap.

        An item's score is the number of distinct query tokens found in its
        query, text and tags, plus ``reward * 0.25``. Items sharing no token
        with a non-empty query are excluded; a query with no tokens matches
        every item. Ties are broken by newest timestamp first.

        Returns:
            Dicts with keys ``score`` (rounded to 4 places), ``query``,
            ``text`` (first 400 characters), ``source``, ``reward`` and
            ``tags``. Empty when *max_results* is zero or negative.
        """
        if max_results <= 0:
            # A negative bound would otherwise slice from the end and return
            # "all but the last k" items.
            return []

        query_terms = set(_tokens(query))
        ranked: List[tuple[float, MemoryItem]] = []
        for item in self.items:
            haystack = " ".join([item.query, item.text, *item.tags])
            overlap = len(query_terms.intersection(_tokens(haystack)))
            if query_terms and not overlap:
                continue
            ranked.append((overlap + item.reward * _REWARD_WEIGHT, item))

        ranked.sort(key=lambda pair: (pair[0], pair[1].timestamp), reverse=True)
        return [
            {
                "score": round(score, 4),
                "query": item.query,
                "text": item.text[:_MAX_TEXT_CHARS],
                "source": item.source,
                "reward": item.reward,
                # Copy so callers cannot mutate the stored item via a result.
                "tags": list(item.tags),
            }
            for score, item in ranked[:max_results]
        ]

    def build_context(
        self, query: str, max_results: int = 5, max_chars: int = 1200
    ) -> str:
        """Format the best matches for *query* as a bulleted text block.

        Each match becomes ``- [source] text``. Lines are taken in rank order
        and collection stops at the first line that would push the result,
        including the newline separators, past *max_chars*.

        Returns:
            The newline-joined lines, or ``""`` when nothing fits or matches.
        """
        lines: List[str] = []
        used = 0
        for entry in self.search(query, max_results=max_results):
            line = f"- [{entry['source']}] {entry['text']}"
            # Every line after the first costs one extra character for "\n".
            cost = len(line) + (1 if lines else 0)
            if used + cost > max_chars:
                break
            lines.append(line)
            used += cost
        return "\n".join(lines).strip()