| """
|
| RippleContext – Phase 4.5: External Memory Environment
|
| =======================================================
|
| Implements the "Ripple" concept from MIT's Recursive Language Models paper.
|
|
|
| Instead of loading all memory content into an LLM's context window (causing
|
| "Context Rot"), RippleContext holds arbitrarily large text as an external
|
| environment. The AI can programmatically search and slice it, fetching only
|
| the relevant portions.
|
|
|
| This is the MnemoCore equivalent of the RLM "REPL environment" — our tiered
|
| storage (Redis/Qdrant/FileSystem) is the Ripple, and this class provides the
|
| tool interface to search it without loading everything.
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import re
|
| from dataclasses import dataclass, field
|
| from typing import List, Optional, Dict, Any
|
| from collections import Counter
|
| import math
|
|
|
| from loguru import logger
|
|
|
|
|
@dataclass
class RippleChunk:
    """A single chunk of text from the Ripple environment."""
    index: int
    text: str
    start_char: int
    end_char: int

    term_freq: Dict[str, int] = field(default_factory=dict)

    def __post_init__(self):
        if not self.term_freq:
            self.term_freq = self._build_tf(self.text)

    @staticmethod
    def _build_tf(text: str) -> Dict[str, int]:
        """Build term frequency index for this chunk."""
        tokens = re.findall(r'\b[a-zA-ZåäöÅÄÖ]{2,}\b', text.lower())
        return dict(Counter(tokens))

    def score_query(self, query_terms: List[str]) -> float:
        """BM25-inspired relevance score for a list of query terms."""
        if not query_terms or not self.term_freq:
            return 0.0
        total_terms = sum(self.term_freq.values()) or 1
        score = 0.0
        for term in query_terms:
            tf = self.term_freq.get(term, 0)
            if tf > 0:
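                # BM25-style saturation (illustrative numbers): with k1 = 1.5
                # and a 100-term chunk, tf=1 gives 2.5 / 2.5 = 1.0 while tf=3
                # gives 7.5 / 4.5 ≈ 1.67, so repeated occurrences of a term
                # add diminishing returns rather than scaling linearly.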
                k1 = 1.5
                norm_tf = (tf * (k1 + 1)) / (tf + k1 * (total_terms / 100))
                score += norm_tf
        return score


class RippleContext:
    """
    External memory environment for Phase 4.5 Recursive Synthesis.

    Holds large text corpora outside the LLM context window. The AI
    interacts with it via search() and slice() — never loading everything
    at once.

    Usage:
        ctx = RippleContext(large_text, chunk_size=500)
        snippets = ctx.search("quantum computing", top_k=3)
        raw = ctx.slice(0, 1000)
    """

    def __init__(
        self,
        text: str,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        source_label: str = "external",
    ):
        """
        Args:
            text: The large text to hold as external context.
            chunk_size: Characters per chunk (default 500).
            chunk_overlap: Overlap between adjacent chunks (default 50).
            source_label: Label for logging/tracing.
        """
        self.text = text
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.source_label = source_label
        self.chunks: List[RippleChunk] = []

        self._build_index()
        logger.debug(
            f"RippleContext '{source_label}': {len(self.text)} chars, "
            f"{len(self.chunks)} chunks (size={chunk_size}, overlap={chunk_overlap})"
        )

    def _build_index(self) -> None:
        """Chunk the text and build the search index."""
        text = self.text
        step = max(1, self.chunk_size - self.chunk_overlap)
        idx = 0
        pos = 0

        while pos < len(text):
            end = min(pos + self.chunk_size, len(text))
            chunk_text = text[pos:end]
            self.chunks.append(RippleChunk(
                index=idx,
                text=chunk_text,
                start_char=pos,
                end_char=end,
            ))
            idx += 1
            pos += step

    def search(self, query: str, top_k: int = 5) -> List[str]:
        """
        Search the external context for relevant snippets.

        Uses BM25-inspired keyword scoring. Returns the top_k most
        relevant text chunks.

        Args:
            query: The search query.
            top_k: Number of chunks to return.

        Returns:
            List of relevant text snippets (strings).
        """
        if not self.chunks:
            return []

        query_terms = re.findall(r'\b[a-zA-ZåäöÅÄÖ]{2,}\b', query.lower())
        if not query_terms:
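            # Query contains no indexable terms (only digits, punctuation or
            # single letters); fall back to the leading chunks.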
            return [c.text for c in self.chunks[:top_k]]

        scored = [
            (chunk, chunk.score_query(query_terms))
            for chunk in self.chunks
        ]
        scored.sort(key=lambda x: x[1], reverse=True)

        results = [chunk.text for chunk, score in scored[:top_k] if score > 0]
        if not results:
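            # Nothing scored above zero for this query; fall back to the
            # leading chunks so the caller always gets some context back.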
            results = [c.text for c in self.chunks[:top_k]]

        logger.debug(
            f"RippleContext.search('{query[:40]}...'): "
            f"top score={scored[0][1]:.2f}, returned {len(results)} chunks"
        )
        return results

    def slice(self, start_char: int, end_char: int) -> str:
        """
        Extract a raw slice of the external context by character position.

        Args:
            start_char: Start character index (inclusive).
            end_char: End character index (exclusive).

        Returns:
            The text slice.
        """
        start_char = max(0, start_char)
        end_char = min(len(self.text), end_char)
        return self.text[start_char:end_char]

    def get_chunk_by_index(self, index: int) -> Optional[RippleChunk]:
        """Get a specific chunk by its index."""
        if 0 <= index < len(self.chunks):
            return self.chunks[index]
        return None

    def get_stats(self) -> Dict[str, Any]:
        """Return statistics about this context."""
        return {
            "source": self.source_label,
            "total_chars": len(self.text),
            "total_chunks": len(self.chunks),
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
            "approx_tokens": len(self.text) // 4,
        }

    @classmethod
    def from_file(cls, path: str, **kwargs) -> "RippleContext":
        """Load a RippleContext from a text file."""
        with open(path, "r", encoding="utf-8") as f:
            text = f.read()
        return cls(text=text, source_label=path, **kwargs)

    @classmethod
    def from_memory_jsonl(cls, path: str, **kwargs) -> "RippleContext":
        """
        Load a RippleContext from MnemoCore's memory.jsonl (Cold tier).
        Concatenates all memory content fields into a searchable corpus.
        """
        import json
        lines = []
        try:
            with open(path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        obj = json.loads(line)
                        content = obj.get("content", "")
                        mem_id = obj.get("id", "?")
                        if content:
                            lines.append(f"[{mem_id}] {content}")
                    except json.JSONDecodeError:
                        continue
        except FileNotFoundError:
            logger.warning(f"memory.jsonl not found at {path}, creating empty context")
            return cls(text="", source_label=path, **kwargs)

        text = "\n".join(lines)
        logger.info(f"RippleContext loaded {len(lines)} memories from {path}")
        return cls(text=text, source_label=path, **kwargs)

    def __len__(self) -> int:
        return len(self.text)

    def __repr__(self) -> str:
        return (
            f"RippleContext(source='{self.source_label}', "
            f"chars={len(self.text)}, chunks={len(self.chunks)})"
        )
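

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only): run the module directly to see the
# search/slice interface on a small synthetic corpus. The sample text, query
# and parameters below are made up for demonstration and are not part of the
# API.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample = (
        "Quantum computing uses qubits that can exist in superposition. "
        "Classical computers store information as bits, zeros and ones. "
        "Error correction remains a major obstacle for large-scale quantum hardware. "
    ) * 20  # repeat so the text spans several chunks

    ctx = RippleContext(sample, chunk_size=200, chunk_overlap=20, source_label="demo")
    print(ctx)              # RippleContext(source='demo', chars=..., chunks=...)
    print(ctx.get_stats())  # sizes, chunk counts, rough token estimate

    # Keyword search over the external context: only the top chunks come back.
    for snippet in ctx.search("quantum error correction", top_k=2):
        print("HIT  :", snippet[:80], "...")

    # Raw character slice, independent of chunk boundaries.
    print("SLICE:", ctx.slice(0, 120))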