""" Episodic Memory – Past Tasks & Events ======================================= Stores discrete events / task completions as Markdown files under memory/events/*.md Each event has a timestamp, outcome, and optional linked entities. Supports keyword search and time-range queries. """ from __future__ import annotations import os from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional from .models import MemoryEntry, MemoryTier class EpisodicMemory: """File-backed store for task / event memories.""" def __init__(self, base_dir: str = "memory/events"): self.base_dir = Path(base_dir) self.base_dir.mkdir(parents=True, exist_ok=True) # id → MemoryEntry (in-memory index) self._index: Dict[str, MemoryEntry] = {} self._load_from_disk() # ── CRUD ───────────────────────────────────────────────── def create( self, content: str, title: str = "", tags: Optional[List[str]] = None, importance: float = 0.5, metadata: Optional[Dict[str, Any]] = None, source: str = "", ) -> MemoryEntry: entry = MemoryEntry( content=content, title=title or self._auto_title(content), tier=MemoryTier.EPISODIC, tags=tags or [], importance=importance, metadata=metadata or {}, source=source, created_at=datetime.utcnow().isoformat(), updated_at=datetime.utcnow().isoformat(), ) self._index[entry.id] = entry self._persist(entry) return entry def read(self, entry_id: str) -> Optional[MemoryEntry]: entry = self._index.get(entry_id) if entry: entry.access_count += 1 entry.updated_at = datetime.utcnow().isoformat() self._persist(entry) return entry def update(self, entry_id: str, **kwargs) -> Optional[MemoryEntry]: entry = self._index.get(entry_id) if not entry: return None for k, v in kwargs.items(): if hasattr(entry, k) and k not in ("id", "tier", "created_at"): setattr(entry, k, v) entry.updated_at = datetime.utcnow().isoformat() self._persist(entry) return entry def delete(self, entry_id: str) -> bool: if entry_id not in self._index: return False del self._index[entry_id] path = self._entry_path(entry_id) if path.exists(): path.unlink() return True def list_entries( self, tag: Optional[str] = None, since: Optional[str] = None, until: Optional[str] = None, limit: int = 50, ) -> List[MemoryEntry]: """List events, optionally filtered by tag and/or time range.""" entries = list(self._index.values()) if tag: entries = [e for e in entries if tag in e.tags] if since: entries = [e for e in entries if e.created_at >= since] if until: entries = [e for e in entries if e.created_at <= until] # newest first entries.sort(key=lambda e: e.created_at, reverse=True) return entries[:limit] def search(self, query: str, limit: int = 10) -> List[MemoryEntry]: """Keyword search across episodic memories.""" q = query.lower() scored: List[tuple] = [] for entry in self._index.values(): text = f"{entry.title} {entry.content} {' '.join(entry.tags)}".lower() if q in text: # rudimentary relevance: importance + recency scored.append((entry, entry.importance)) scored.sort(key=lambda x: x[1], reverse=True) return [e for e, _ in scored[:limit]] def count(self) -> int: return len(self._index) # ── timeline helpers ───────────────────────────────────── def recent(self, n: int = 10) -> List[MemoryEntry]: """Get the N most recent events.""" entries = sorted(self._index.values(), key=lambda e: e.created_at, reverse=True) return entries[:n] def by_tag(self, tag: str) -> List[MemoryEntry]: return [e for e in self._index.values() if tag in e.tags] # ── persistence ────────────────────────────────────────── def _entry_path(self, entry_id: str) -> Path: return self.base_dir / f"{entry_id}.md" def _persist(self, entry: MemoryEntry): path = self._entry_path(entry.id) path.write_text(entry.to_markdown(), encoding="utf-8") def _load_from_disk(self): for md_file in self.base_dir.glob("*.md"): try: text = md_file.read_text(encoding="utf-8") entry = MemoryEntry.from_markdown(text) entry.tier = MemoryTier.EPISODIC self._index[entry.id] = entry except Exception: pass @staticmethod def _auto_title(content: str) -> str: """Generate a short title from content.""" first_line = content.strip().split("\n")[0][:80] return first_line if first_line else "Untitled Event"