Spaces:
Running
Running
File size: 5,624 Bytes
86a0172 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | """
Episodic Memory — Past Tasks & Events
=======================================
Stores discrete events / task completions as Markdown files
under memory/events/*.md
Each event has a timestamp, outcome, and optional linked entities.
Supports keyword search and time-range queries.
"""
from __future__ import annotations

import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from .models import MemoryEntry, MemoryTier
class EpisodicMemory:
    """File-backed store for task / event memories.

    Every entry is written to ``<base_dir>/<entry id>.md`` via
    ``MemoryEntry.to_markdown()`` and mirrored in an in-memory
    ``id -> MemoryEntry`` index that is rebuilt from disk on construction.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, base_dir: str = "memory/events"):
        """Create the storage directory if needed and load existing entries.

        Args:
            base_dir: Directory holding one ``*.md`` file per entry.
        """
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        # id → MemoryEntry (in-memory index)
        self._index: Dict[str, MemoryEntry] = {}
        self._load_from_disk()

    # ── CRUD ─────────────────────────────────────────────────

    def create(
        self,
        content: str,
        title: str = "",
        tags: Optional[List[str]] = None,
        importance: float = 0.5,
        metadata: Optional[Dict[str, Any]] = None,
        source: str = "",
    ) -> MemoryEntry:
        """Create, index and persist a new episodic entry.

        Args:
            content: Body text of the event.
            title: Optional title; when empty one is derived from the first
                line of ``content``.
            tags: Optional list of tags (defaults to an empty list).
            importance: Score later used to rank search results.
            metadata: Optional free-form mapping (defaults to an empty dict).
            source: Optional origin label for the event.

        Returns:
            The newly created entry.
        """
        # One timestamp for both fields so a fresh entry has
        # created_at == updated_at exactly.
        now = self._now()
        entry = MemoryEntry(
            content=content,
            title=title or self._auto_title(content),
            tier=MemoryTier.EPISODIC,
            tags=tags or [],
            importance=importance,
            metadata=metadata or {},
            source=source,
            created_at=now,
            updated_at=now,
        )
        self._index[entry.id] = entry
        self._persist(entry)
        return entry

    def read(self, entry_id: str) -> Optional[MemoryEntry]:
        """Return the entry with ``entry_id``, or ``None`` if unknown.

        A successful read bumps ``access_count``, refreshes ``updated_at``
        and re-persists the entry.
        """
        entry = self._index.get(entry_id)
        if entry is None:
            return None
        entry.access_count += 1
        entry.updated_at = self._now()
        self._persist(entry)
        return entry

    def update(self, entry_id: str, **kwargs) -> Optional[MemoryEntry]:
        """Set attributes on an existing entry and persist it.

        Keyword arguments naming attributes the entry does not have are
        ignored, as are the protected fields ``id``, ``tier`` and
        ``created_at``. ``updated_at`` is always refreshed afterwards.

        Returns:
            The updated entry, or ``None`` if ``entry_id`` is unknown.
        """
        entry = self._index.get(entry_id)
        if entry is None:
            return None
        protected = ("id", "tier", "created_at")
        for key, value in kwargs.items():
            if hasattr(entry, key) and key not in protected:
                setattr(entry, key, value)
        entry.updated_at = self._now()
        self._persist(entry)
        return entry

    def delete(self, entry_id: str) -> bool:
        """Remove an entry from the index and delete its file.

        Returns:
            ``True`` if the entry was indexed (even when its file was
            already gone), ``False`` if ``entry_id`` is unknown.
        """
        if entry_id not in self._index:
            return False
        del self._index[entry_id]
        try:
            # EAFP: avoids the exists()/unlink() race of a look-before-leap check.
            self._entry_path(entry_id).unlink()
        except FileNotFoundError:
            pass
        return True

    def list_entries(
        self,
        tag: Optional[str] = None,
        since: Optional[str] = None,
        until: Optional[str] = None,
        limit: int = 50,
    ) -> List[MemoryEntry]:
        """List events, optionally filtered by tag and/or time range.

        Args:
            tag: Keep only entries whose ``tags`` contain this value.
            since: Inclusive lower bound, compared as a string against
                ``created_at``.
            until: Inclusive upper bound, compared as a string against
                ``created_at``.
            limit: Maximum number of entries returned.

        Returns:
            Matching entries, newest first.
        """
        entries = list(self._index.values())
        if tag:
            entries = [e for e in entries if tag in e.tags]
        if since:
            entries = [e for e in entries if e.created_at >= since]
        if until:
            entries = [e for e in entries if e.created_at <= until]
        # newest first
        entries.sort(key=lambda e: e.created_at, reverse=True)
        return entries[:limit]

    def search(self, query: str, limit: int = 10) -> List[MemoryEntry]:
        """Keyword search across episodic memories.

        ``query`` is matched case-insensitively as a substring of the
        entry's title, content and tags.

        Returns:
            Up to ``limit`` matches ordered by importance (highest first),
            with ties broken by recency (newest first).
        """
        needle = query.lower()
        hits: List[MemoryEntry] = []
        for entry in self._index.values():
            haystack = f"{entry.title} {entry.content} {' '.join(entry.tags)}".lower()
            if needle in haystack:
                hits.append(entry)
        # rudimentary relevance: importance first, then recency, so the order
        # of equally important hits does not depend on index insertion order.
        hits.sort(key=lambda e: (e.importance, e.created_at), reverse=True)
        return hits[:limit]

    def count(self) -> int:
        """Return the number of indexed entries."""
        return len(self._index)

    # ── timeline helpers ─────────────────────────────────────

    def recent(self, n: int = 10) -> List[MemoryEntry]:
        """Get the N most recent events (by ``created_at``, newest first)."""
        entries = sorted(self._index.values(), key=lambda e: e.created_at, reverse=True)
        return entries[:n]

    def by_tag(self, tag: str) -> List[MemoryEntry]:
        """Return every entry whose ``tags`` contain ``tag``."""
        return [e for e in self._index.values() if tag in e.tags]

    # ── persistence ──────────────────────────────────────────

    def _entry_path(self, entry_id: str) -> Path:
        """Return the Markdown file path used for ``entry_id``."""
        return self.base_dir / f"{entry_id}.md"

    def _persist(self, entry: MemoryEntry):
        """Write ``entry`` to its Markdown file, replacing any previous content."""
        path = self._entry_path(entry.id)
        path.write_text(entry.to_markdown(), encoding="utf-8")

    def _load_from_disk(self):
        """Populate the index from every ``*.md`` file in ``base_dir``.

        Loading is best-effort: a file that cannot be read or parsed is
        skipped with a logged warning instead of being dropped silently.
        """
        # sorted() makes the index order independent of directory listing order.
        for md_file in sorted(self.base_dir.glob("*.md")):
            try:
                text = md_file.read_text(encoding="utf-8")
                entry = MemoryEntry.from_markdown(text)
                entry.tier = MemoryTier.EPISODIC
                self._index[entry.id] = entry
            except Exception:
                # Broad on purpose: the failure modes of from_markdown() are
                # not visible here, and one bad file must not block the rest.
                self._log.warning(
                    "Skipping unreadable memory file %s", md_file, exc_info=True
                )

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        NOTE(review): ``datetime.utcnow()`` is deprecated since Python 3.12,
        but switching to an aware datetime would change the stored string
        format and break the string comparisons against existing entries.
        """
        return datetime.utcnow().isoformat()

    @staticmethod
    def _auto_title(content: str) -> str:
        """Generate a short title from content.

        Uses the first line of the stripped content, truncated to 80
        characters, or ``"Untitled Event"`` when that is empty.
        """
        first_line = content.strip().split("\n")[0][:80]
        return first_line if first_line else "Untitled Event"
|