File size: 8,286 Bytes
dbb04e4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 | """
RippleContext – Phase 4.5: External Memory Environment
=======================================================
Implements the "Ripple" concept from MIT's Recursive Language Models paper.
Instead of loading all memory content into an LLM's context window (causing
"Context Rot"), RippleContext holds arbitrarily large text as an external
environment. The AI can programmatically search and slice it, fetching only
the relevant portions.
This is the MnemoCore equivalent of the RLM "REPL environment" — our tiered
storage (Redis/Qdrant/FileSystem) is the Ripple, and this class provides the
tool interface to search it without loading everything.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from collections import Counter
import math
from loguru import logger
@dataclass
class RippleChunk:
"""A single chunk of text from the Ripple environment."""
index: int
text: str
start_char: int
end_char: int
# Simple TF index for keyword search
term_freq: Dict[str, int] = field(default_factory=dict)
def __post_init__(self):
if not self.term_freq:
self.term_freq = self._build_tf(self.text)
@staticmethod
def _build_tf(text: str) -> Dict[str, int]:
"""Build term frequency index for this chunk."""
tokens = re.findall(r'\b[a-zA-ZåäöÅÄÖ]{2,}\b', text.lower())
return dict(Counter(tokens))
def score_query(self, query_terms: List[str]) -> float:
"""BM25-inspired relevance score for a list of query terms."""
if not query_terms or not self.term_freq:
return 0.0
total_terms = sum(self.term_freq.values()) or 1
score = 0.0
for term in query_terms:
tf = self.term_freq.get(term, 0)
if tf > 0:
# Normalized TF with saturation (BM25-style)
k1 = 1.5
norm_tf = (tf * (k1 + 1)) / (tf + k1 * (total_terms / 100))
score += norm_tf
return score
class RippleContext:
"""
External memory environment for Phase 4.5 Recursive Synthesis.
Holds large text corpora outside the LLM context window. The AI
interacts with it via search() and slice() — never loading everything
at once.
Usage:
ctx = RippleContext(large_text, chunk_size=500)
snippets = ctx.search("quantum computing", top_k=3)
raw = ctx.slice(0, 1000)
"""
def __init__(
self,
text: str,
chunk_size: int = 500,
chunk_overlap: int = 50,
source_label: str = "external",
):
"""
Args:
text: The large text to hold as external context.
chunk_size: Characters per chunk (default 500).
chunk_overlap: Overlap between adjacent chunks (default 50).
source_label: Label for logging/tracing.
"""
self.text = text
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.source_label = source_label
self.chunks: List[RippleChunk] = []
self._build_index()
logger.debug(
f"RippleContext '{source_label}': {len(self.text)} chars, "
f"{len(self.chunks)} chunks (size={chunk_size}, overlap={chunk_overlap})"
)
def _build_index(self) -> None:
"""Chunk the text and build the search index."""
text = self.text
step = max(1, self.chunk_size - self.chunk_overlap)
idx = 0
pos = 0
while pos < len(text):
end = min(pos + self.chunk_size, len(text))
chunk_text = text[pos:end]
self.chunks.append(RippleChunk(
index=idx,
text=chunk_text,
start_char=pos,
end_char=end,
))
idx += 1
pos += step
def search(self, query: str, top_k: int = 5) -> List[str]:
"""
Search the external context for relevant snippets.
Uses BM25-inspired keyword scoring. Returns the top_k most
relevant text chunks.
Args:
query: The search query.
top_k: Number of chunks to return.
Returns:
List of relevant text snippets (strings).
"""
if not self.chunks:
return []
query_terms = re.findall(r'\b[a-zA-ZåäöÅÄÖ]{2,}\b', query.lower())
if not query_terms:
# Fallback: return first top_k chunks
return [c.text for c in self.chunks[:top_k]]
scored = [
(chunk, chunk.score_query(query_terms))
for chunk in self.chunks
]
scored.sort(key=lambda x: x[1], reverse=True)
results = [chunk.text for chunk, score in scored[:top_k] if score > 0]
if not results:
# No keyword matches — return first chunks as fallback
results = [c.text for c in self.chunks[:top_k]]
logger.debug(
f"RippleContext.search('{query[:40]}...'): "
f"top score={scored[0][1]:.2f}, returned {len(results)} chunks"
)
return results
def slice(self, start_char: int, end_char: int) -> str:
"""
Extract a raw slice of the external context by character position.
Args:
start_char: Start character index (inclusive).
end_char: End character index (exclusive).
Returns:
The text slice.
"""
start_char = max(0, start_char)
end_char = min(len(self.text), end_char)
return self.text[start_char:end_char]
def get_chunk_by_index(self, index: int) -> Optional[RippleChunk]:
"""Get a specific chunk by its index."""
if 0 <= index < len(self.chunks):
return self.chunks[index]
return None
def get_stats(self) -> Dict[str, Any]:
"""Return statistics about this context."""
return {
"source": self.source_label,
"total_chars": len(self.text),
"total_chunks": len(self.chunks),
"chunk_size": self.chunk_size,
"chunk_overlap": self.chunk_overlap,
"approx_tokens": len(self.text) // 4, # rough estimate
}
@classmethod
def from_file(cls, path: str, **kwargs) -> "RippleContext":
"""Load a RippleContext from a text file."""
with open(path, "r", encoding="utf-8") as f:
text = f.read()
return cls(text=text, source_label=path, **kwargs)
@classmethod
def from_memory_jsonl(cls, path: str, **kwargs) -> "RippleContext":
"""
Load a RippleContext from MnemoCore's memory.jsonl (Cold tier).
Concatenates all memory content fields into a searchable corpus.
"""
import json
lines = []
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
content = obj.get("content", "")
mem_id = obj.get("id", "?")
if content:
lines.append(f"[{mem_id}] {content}")
except json.JSONDecodeError:
continue
except FileNotFoundError:
logger.warning(f"memory.jsonl not found at {path}, creating empty context")
return cls(text="", source_label=path, **kwargs)
text = "\n".join(lines)
logger.info(f"RippleContext loaded {len(lines)} memories from {path}")
return cls(text=text, source_label=path, **kwargs)
def __len__(self) -> int:
return len(self.text)
def __repr__(self) -> str:
return (
f"RippleContext(source='{self.source_label}', "
f"chars={len(self.text)}, chunks={len(self.chunks)})"
)
|