MnemoCore / src /mnemocore /core /ripple_context.py
Granis87's picture
Initial upload of MnemoCore
dbb04e4 verified
"""
RippleContext – Phase 4.5: External Memory Environment
=======================================================
Implements the "Ripple" concept from MIT's Recursive Language Models paper.
Instead of loading all memory content into an LLM's context window (causing
"Context Rot"), RippleContext holds arbitrarily large text as an external
environment. The AI can programmatically search and slice it, fetching only
the relevant portions.
This is the MnemoCore equivalent of the RLM "REPL environment" — our tiered
storage (Redis/Qdrant/FileSystem) is the Ripple, and this class provides the
tool interface to search it without loading everything.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from collections import Counter
import math
from loguru import logger
@dataclass
class RippleChunk:
"""A single chunk of text from the Ripple environment."""
index: int
text: str
start_char: int
end_char: int
# Simple TF index for keyword search
term_freq: Dict[str, int] = field(default_factory=dict)
def __post_init__(self):
if not self.term_freq:
self.term_freq = self._build_tf(self.text)
@staticmethod
def _build_tf(text: str) -> Dict[str, int]:
"""Build term frequency index for this chunk."""
tokens = re.findall(r'\b[a-zA-ZåäöÅÄÖ]{2,}\b', text.lower())
return dict(Counter(tokens))
def score_query(self, query_terms: List[str]) -> float:
"""BM25-inspired relevance score for a list of query terms."""
if not query_terms or not self.term_freq:
return 0.0
total_terms = sum(self.term_freq.values()) or 1
score = 0.0
for term in query_terms:
tf = self.term_freq.get(term, 0)
if tf > 0:
# Normalized TF with saturation (BM25-style)
k1 = 1.5
norm_tf = (tf * (k1 + 1)) / (tf + k1 * (total_terms / 100))
score += norm_tf
return score
class RippleContext:
"""
External memory environment for Phase 4.5 Recursive Synthesis.
Holds large text corpora outside the LLM context window. The AI
interacts with it via search() and slice() — never loading everything
at once.
Usage:
ctx = RippleContext(large_text, chunk_size=500)
snippets = ctx.search("quantum computing", top_k=3)
raw = ctx.slice(0, 1000)
"""
def __init__(
self,
text: str,
chunk_size: int = 500,
chunk_overlap: int = 50,
source_label: str = "external",
):
"""
Args:
text: The large text to hold as external context.
chunk_size: Characters per chunk (default 500).
chunk_overlap: Overlap between adjacent chunks (default 50).
source_label: Label for logging/tracing.
"""
self.text = text
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.source_label = source_label
self.chunks: List[RippleChunk] = []
self._build_index()
logger.debug(
f"RippleContext '{source_label}': {len(self.text)} chars, "
f"{len(self.chunks)} chunks (size={chunk_size}, overlap={chunk_overlap})"
)
def _build_index(self) -> None:
"""Chunk the text and build the search index."""
text = self.text
step = max(1, self.chunk_size - self.chunk_overlap)
idx = 0
pos = 0
while pos < len(text):
end = min(pos + self.chunk_size, len(text))
chunk_text = text[pos:end]
self.chunks.append(RippleChunk(
index=idx,
text=chunk_text,
start_char=pos,
end_char=end,
))
idx += 1
pos += step
def search(self, query: str, top_k: int = 5) -> List[str]:
"""
Search the external context for relevant snippets.
Uses BM25-inspired keyword scoring. Returns the top_k most
relevant text chunks.
Args:
query: The search query.
top_k: Number of chunks to return.
Returns:
List of relevant text snippets (strings).
"""
if not self.chunks:
return []
query_terms = re.findall(r'\b[a-zA-ZåäöÅÄÖ]{2,}\b', query.lower())
if not query_terms:
# Fallback: return first top_k chunks
return [c.text for c in self.chunks[:top_k]]
scored = [
(chunk, chunk.score_query(query_terms))
for chunk in self.chunks
]
scored.sort(key=lambda x: x[1], reverse=True)
results = [chunk.text for chunk, score in scored[:top_k] if score > 0]
if not results:
# No keyword matches — return first chunks as fallback
results = [c.text for c in self.chunks[:top_k]]
logger.debug(
f"RippleContext.search('{query[:40]}...'): "
f"top score={scored[0][1]:.2f}, returned {len(results)} chunks"
)
return results
def slice(self, start_char: int, end_char: int) -> str:
"""
Extract a raw slice of the external context by character position.
Args:
start_char: Start character index (inclusive).
end_char: End character index (exclusive).
Returns:
The text slice.
"""
start_char = max(0, start_char)
end_char = min(len(self.text), end_char)
return self.text[start_char:end_char]
def get_chunk_by_index(self, index: int) -> Optional[RippleChunk]:
"""Get a specific chunk by its index."""
if 0 <= index < len(self.chunks):
return self.chunks[index]
return None
def get_stats(self) -> Dict[str, Any]:
"""Return statistics about this context."""
return {
"source": self.source_label,
"total_chars": len(self.text),
"total_chunks": len(self.chunks),
"chunk_size": self.chunk_size,
"chunk_overlap": self.chunk_overlap,
"approx_tokens": len(self.text) // 4, # rough estimate
}
@classmethod
def from_file(cls, path: str, **kwargs) -> "RippleContext":
"""Load a RippleContext from a text file."""
with open(path, "r", encoding="utf-8") as f:
text = f.read()
return cls(text=text, source_label=path, **kwargs)
@classmethod
def from_memory_jsonl(cls, path: str, **kwargs) -> "RippleContext":
"""
Load a RippleContext from MnemoCore's memory.jsonl (Cold tier).
Concatenates all memory content fields into a searchable corpus.
"""
import json
lines = []
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
content = obj.get("content", "")
mem_id = obj.get("id", "?")
if content:
lines.append(f"[{mem_id}] {content}")
except json.JSONDecodeError:
continue
except FileNotFoundError:
logger.warning(f"memory.jsonl not found at {path}, creating empty context")
return cls(text="", source_label=path, **kwargs)
text = "\n".join(lines)
logger.info(f"RippleContext loaded {len(lines)} memories from {path}")
return cls(text=text, source_label=path, **kwargs)
def __len__(self) -> int:
return len(self.text)
def __repr__(self) -> str:
return (
f"RippleContext(source='{self.source_label}', "
f"chars={len(self.text)}, chunks={len(self.chunks)})"
)