File size: 5,624 Bytes
86a0172
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""

Episodic Memory – Past Tasks & Events
=======================================

Stores discrete events / task completions as Markdown files
under memory/events/*.md

Each event has a timestamp, outcome, and optional linked entities.
Supports keyword search and time-range queries.

"""

from __future__ import annotations

import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from .models import MemoryEntry, MemoryTier


class EpisodicMemory:
    """File-backed store for task / event memories.

    Each entry is written as one Markdown file ``<base_dir>/<entry_id>.md``
    and mirrored in an in-memory ``id -> MemoryEntry`` index that is rebuilt
    from disk when the store is constructed.

    Timestamps written by this class are naive-UTC ISO-8601 strings, which
    is why ordering and range filtering use plain string comparison.
    """

    # Attributes that ``update()`` must never overwrite.
    _PROTECTED_FIELDS = frozenset({"id", "tier", "created_at"})

    def __init__(self, base_dir: str = "memory/events"):
        """Create the storage directory if needed and load existing entries.

        Args:
            base_dir: Directory that holds the ``*.md`` event files.
        """
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        # id -> MemoryEntry (in-memory index)
        self._index: Dict[str, MemoryEntry] = {}
        self._load_from_disk()

    # ── CRUD ─────────────────────────────────────────────────

    def create(
        self,
        content: str,
        title: str = "",
        tags: Optional[List[str]] = None,
        importance: float = 0.5,
        metadata: Optional[Dict[str, Any]] = None,
        source: str = "",
    ) -> MemoryEntry:
        """Create a new episodic entry, index it and write it to disk.

        Args:
            content: Body text of the event.
            title: Explicit title; when empty one is derived from ``content``.
            tags: Optional tags (copied, so later caller-side mutation of the
                list does not leak into the stored entry).
            importance: Relevance weight used to rank search results.
            metadata: Optional free-form metadata (shallow-copied).
            source: Optional origin label for the event.

        Returns:
            The newly created entry.
        """
        # One timestamp for both fields so a fresh entry never looks modified.
        now = self._now_iso()
        entry = MemoryEntry(
            content=content,
            title=title or self._auto_title(content),
            tier=MemoryTier.EPISODIC,
            tags=list(tags) if tags else [],
            importance=importance,
            metadata=dict(metadata) if metadata else {},
            source=source,
            created_at=now,
            updated_at=now,
        )
        self._index[entry.id] = entry
        self._persist(entry)
        return entry

    def read(self, entry_id: str) -> Optional[MemoryEntry]:
        """Return the entry with ``entry_id``, or ``None`` if it is unknown.

        A successful read increments ``access_count``, refreshes
        ``updated_at`` and re-persists the entry.
        """
        entry = self._index.get(entry_id)
        if entry:
            entry.access_count += 1
            entry.updated_at = self._now_iso()
            self._persist(entry)
        return entry

    def update(self, entry_id: str, **kwargs) -> Optional[MemoryEntry]:
        """Overwrite attributes of an existing entry and persist it.

        Keyword names that are not attributes of the entry, and the protected
        fields ``id``, ``tier`` and ``created_at``, are silently ignored.

        Returns:
            The updated entry, or ``None`` if ``entry_id`` is unknown.
        """
        entry = self._index.get(entry_id)
        if not entry:
            return None
        for name, value in kwargs.items():
            if name not in self._PROTECTED_FIELDS and hasattr(entry, name):
                setattr(entry, name, value)
        entry.updated_at = self._now_iso()
        self._persist(entry)
        return entry

    def delete(self, entry_id: str) -> bool:
        """Remove an entry from the index and delete its file.

        Returns:
            ``True`` if the entry was indexed, ``False`` otherwise.
        """
        if entry_id not in self._index:
            return False
        del self._index[entry_id]
        try:
            self._entry_path(entry_id).unlink()
        except FileNotFoundError:
            # The file was already removed; the index entry is gone, so the
            # delete still succeeded from the caller's point of view.
            pass
        return True

    def list_entries(
        self,
        tag: Optional[str] = None,
        since: Optional[str] = None,
        until: Optional[str] = None,
        limit: int = 50,
    ) -> List[MemoryEntry]:
        """List events, optionally filtered by tag and/or time range.

        Args:
            tag: Keep only entries carrying this tag.
            since: Inclusive lower bound, compared as a string against
                ``created_at``.
            until: Inclusive upper bound, compared as a string against
                ``created_at``.
            limit: Maximum number of entries to return.

        Returns:
            Matching entries, newest first.
        """
        entries = list(self._index.values())

        if tag:
            entries = [e for e in entries if tag in e.tags]
        if since:
            entries = [e for e in entries if e.created_at >= since]
        if until:
            entries = [e for e in entries if e.created_at <= until]

        # newest first
        entries.sort(key=lambda e: e.created_at, reverse=True)
        return entries[:limit]

    def search(self, query: str, limit: int = 10) -> List[MemoryEntry]:
        """Keyword search across episodic memories.

        The query is matched case-insensitively as a substring of the
        entry's title, content and tags. An empty query matches every entry.

        Returns:
            Up to ``limit`` matches ordered by importance (highest first),
            with the more recently created entry winning ties.
        """
        needle = query.lower()
        hits = [
            entry
            for entry in self._index.values()
            if needle
            in f"{entry.title} {entry.content} {' '.join(entry.tags)}".lower()
        ]
        # rudimentary relevance: importance first, recency as the tie-breaker
        hits.sort(key=lambda e: (e.importance, e.created_at), reverse=True)
        return hits[:limit]

    def count(self) -> int:
        """Return the number of indexed entries."""
        return len(self._index)

    # ── timeline helpers ─────────────────────────────────────

    def recent(self, n: int = 10) -> List[MemoryEntry]:
        """Get the N most recent events."""
        return self.list_entries(limit=n)

    def by_tag(self, tag: str) -> List[MemoryEntry]:
        """Return every entry carrying ``tag``, in index order."""
        return [e for e in self._index.values() if tag in e.tags]

    # ── persistence ──────────────────────────────────────────

    def _entry_path(self, entry_id: str) -> Path:
        """Return the Markdown file path used for ``entry_id``."""
        return self.base_dir / f"{entry_id}.md"

    def _persist(self, entry: MemoryEntry):
        """Write ``entry`` to its Markdown file.

        The text goes to a sibling ``*.md.tmp`` file first and is then moved
        into place, so an interrupted write cannot leave a truncated entry
        behind. The temporary name does not match the ``*.md`` pattern used
        when loading.
        """
        path = self._entry_path(entry.id)
        tmp_path = path.with_name(f"{path.name}.tmp")
        tmp_path.write_text(entry.to_markdown(), encoding="utf-8")
        os.replace(tmp_path, path)

    def _load_from_disk(self):
        """Populate the index from every ``*.md`` file in ``base_dir``.

        Loading is best-effort: a file that cannot be read or parsed is
        skipped with a logged warning instead of aborting start-up. Files are
        visited in sorted order so the index order is deterministic.
        """
        import logging

        log = logging.getLogger(__name__)
        for md_file in sorted(self.base_dir.glob("*.md")):
            try:
                text = md_file.read_text(encoding="utf-8")
                entry = MemoryEntry.from_markdown(text)
                entry.tier = MemoryTier.EPISODIC
                self._index[entry.id] = entry
            except Exception as exc:
                log.warning("Skipping unreadable event file %s: %s", md_file, exc)

    @staticmethod
    def _now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated
        ``datetime.utcnow().isoformat()`` so stored timestamps stay
        comparable with ones written earlier.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _auto_title(content: str) -> str:
        """Generate a short title from content.

        Uses the first line of the stripped content, cut to 80 characters,
        with surrounding whitespace (including a stray ``\\r`` from CRLF
        text) removed. Falls back to ``"Untitled Event"`` for blank content.
        """
        lines = content.strip().splitlines()
        first_line = lines[0][:80].strip() if lines else ""
        return first_line or "Untitled Event"