Spaces:
Running
Running
File size: 9,405 Bytes
24f95f0 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 | """
New service: MemoryManager
Makes Janus genuinely self-improving by maintaining structured memory across sessions:
1. Case deduplication — prevents learning from near-identical queries
2. Topic clustering — groups cases by domain/intent
3. Pattern extraction — identifies what kinds of queries work well vs. poorly
4. Knowledge summarization — periodically summarizes case clusters into compact knowledge
5. Semantic similarity check — uses simple TF-IDF (no GPU needed) to find similar past cases
This replaces the raw JSON scan in adaptive_intelligence.py that caused the O(n)
discovery loop — MemoryManager maintains a persisted index, so case metadata lookups by id
are O(1) (similarity search still scans the indexed cases, not the case files).
"""
from __future__ import annotations
import json
import math
import logging
import pathlib
import re
import time
from collections import defaultdict
from typing import Any, Optional
logger = logging.getLogger(__name__)

# Prefer the application's configured data directory; when the app package is
# not importable, fall back to a "data" folder two levels above this file.
try:
    from app.config import DATA_DIR
except ImportError:
    DATA_DIR = pathlib.Path(__file__).parent.parent.joinpath("data")

# Directory scanned for raw case JSON files when the index is bootstrapped.
MEMORY_DIR = pathlib.Path(DATA_DIR, "memory")
# JSON file holding the persisted index (metadata, term frequencies, IDF).
INDEX_FILE = pathlib.Path(DATA_DIR, "adaptive", "memory_index.json")
class MemoryManager:
    """
    Indexed case memory with similarity search.

    Zero external dependencies — similarity is cosine similarity over
    TF-IDF weighted term vectors built from each case's query text.

    Per-instance state (persisted to ``INDEX_FILE`` as JSON):
      * ``_index`` — case_id -> metadata (id, query[:200], domain, quality, ts)
      * ``_tfidf`` — case_id -> normalised term frequencies of the query
      * ``_idf``   — term -> smoothed inverse document frequency
    """

    # A new query scoring above this against an indexed case is a near-duplicate.
    _DUPLICATE_THRESHOLD = 0.92
    # Similarity matches at or below this score are not reported.
    _MIN_SIMILARITY = 0.05
    # Maximum number of case files read when bootstrapping the index.
    _BOOTSTRAP_LIMIT = 500

    def __init__(self):
        """Create the storage directories and load (or bootstrap) the index."""
        MEMORY_DIR.mkdir(parents=True, exist_ok=True)
        INDEX_FILE.parent.mkdir(parents=True, exist_ok=True)
        self._index: dict[str, dict] = {}   # case_id -> metadata
        self._tfidf: dict[str, dict] = {}   # case_id -> term_freq
        self._idf: dict[str, float] = {}    # term -> idf score
        self._load_index()

    # ── Public API ─────────────────────────────────────────────────────────

    def add_case(self, case: dict) -> str:
        """
        Index a new case and persist the index.

        Args:
            case: Case record. Reads ``id``/``case_id``, ``query``/``user_input``,
                ``domain`` (or ``route.domain``) and ``quality_score``.

        Returns:
            The id the case is stored under, or the id of the already indexed
            case when the query is a near-duplicate (nothing is stored then).
        """
        case_id = case.get("id") or case.get("case_id") or _make_id()
        query = case.get("query") or case.get("user_input") or ""
        domain = self._case_domain(case)
        quality = case.get("quality_score", 0.5)

        # Skip near-duplicates so the memory is not flooded with rephrasings.
        similar = self.find_similar(query, top_k=1)
        if similar and similar[0]["score"] > self._DUPLICATE_THRESHOLD:
            logger.debug("MemoryManager: skipping near-duplicate case (%.2f similar to %s)",
                         similar[0]["score"], similar[0]["case_id"])
            return similar[0]["case_id"]

        self._index[case_id] = {
            "id": case_id,
            "query": query[:200],
            "domain": domain,
            "quality": quality,
            "ts": time.time(),
        }
        # Term frequencies use the full query, not the truncated preview.
        self._tfidf[case_id] = _term_freq(_tokenise(query))
        self._recompute_idf()
        self._save_index()
        return case_id

    def find_similar(self, query: str, top_k: int = 5) -> list[dict]:
        """
        Return up to ``top_k`` most similar past cases, best match first.

        Each result holds ``case_id``, ``score`` (cosine similarity rounded to
        four decimals) and the case's stored metadata. Matches scoring at or
        below ``_MIN_SIMILARITY`` are dropped; a non-positive ``top_k`` yields
        an empty list.
        """
        if not self._tfidf or top_k <= 0:
            return []
        query_tf = _term_freq(_tokenise(query))
        if not query_tf:
            # No usable terms: every similarity would be 0.0.
            return []

        scored = []
        for cid, doc_tf in self._tfidf.items():
            score = _cosine(query_tf, doc_tf, self._idf)
            if score > self._MIN_SIMILARITY:
                scored.append((score, cid))
        scored.sort(reverse=True)

        return [
            {"case_id": cid, "score": round(score, 4), **self._index.get(cid, {})}
            for score, cid in scored[:top_k]
        ]

    def get_domain_stats(self) -> dict[str, Any]:
        """Return per-domain case counts and average quality (3 decimals)."""
        by_domain: dict[str, list] = defaultdict(list)
        for meta in self._index.values():
            by_domain[meta.get("domain", "general")].append(meta.get("quality", 0.5))
        return {
            domain: {
                "count": len(scores),
                "avg_quality": round(sum(scores) / len(scores), 3) if scores else 0,
            }
            for domain, scores in by_domain.items()
        }

    def get_frequent_patterns(self, min_freq: int = 3) -> list[dict]:
        """
        Extract query terms that appear in ≥ ``min_freq`` indexed cases.
        Used to build skills / prompt specialisations.

        Only terms of at least four characters are considered. A term repeated
        inside one query counts once, so ``count`` is the number of distinct
        cases and ``case_ids`` (capped at 10) never repeats an id.

        Returns:
            Dicts with ``term``, ``count`` and ``case_ids``, most frequent first.
        """
        term_cases: dict[str, list] = defaultdict(list)
        for cid, meta in self._index.items():
            # dict.fromkeys de-duplicates while keeping first-seen order.
            for token in dict.fromkeys(_tokenise(meta.get("query", ""))):
                if len(token) >= 4:
                    term_cases[token].append(cid)

        patterns = [
            {"term": term, "count": len(cids), "case_ids": cids[:10]}
            for term, cids in term_cases.items()
            if len(cids) >= min_freq
        ]
        patterns.sort(key=lambda p: -p["count"])
        return patterns

    def total_cases(self) -> int:
        """Return the number of indexed cases."""
        return len(self._index)

    # ── Internal ───────────────────────────────────────────────────────────

    @staticmethod
    def _case_domain(case: dict) -> str:
        """Resolve a case's domain: ``domain``, then ``route.domain``, then "general"."""
        route = case.get("route")
        # ``route`` may be missing, null, or not a mapping at all.
        routed = route.get("domain") if isinstance(route, dict) else None
        return case.get("domain") or routed or "general"

    def _recompute_idf(self):
        """Rebuild ``_idf`` from scratch as ``log((N + 1) / (df + 1)) + 1``."""
        n_docs = max(len(self._tfidf), 1)
        doc_freq: dict[str, int] = defaultdict(int)
        for doc_tf in self._tfidf.values():
            for term in doc_tf:
                doc_freq[term] += 1
        self._idf = {
            term: math.log((n_docs + 1) / (count + 1)) + 1
            for term, count in doc_freq.items()
        }

    def _save_index(self):
        """Persist the index to ``INDEX_FILE``; failures are logged, not raised."""
        tmp_file = INDEX_FILE.with_name(INDEX_FILE.name + ".tmp")
        try:
            payload = json.dumps({
                "index": self._index,
                "tfidf": self._tfidf,
                "idf": self._idf,
                "saved": time.time(),
            }, indent=2)
            # Write to a sibling file and rename over the target so an
            # interrupted write cannot leave a truncated index behind.
            tmp_file.write_text(payload, encoding="utf-8")
            tmp_file.replace(INDEX_FILE)
        except Exception as exc:  # best-effort persistence: never crash callers
            logger.warning("MemoryManager: failed to save index: %s", exc)

    def _load_index(self):
        """Load the persisted index, rebuilding from case files if it is missing or invalid."""
        if not INDEX_FILE.exists():
            # Bootstrap from existing case files
            self._bootstrap_from_cases()
            return
        try:
            data = json.loads(INDEX_FILE.read_text(encoding="utf-8"))
            index = data.get("index", {})
            tfidf = data.get("tfidf", {})
            idf = data.get("idf", {})
            if not all(isinstance(part, dict) for part in (index, tfidf, idf)):
                raise ValueError("index file has unexpected structure")
        except Exception as exc:
            logger.warning("MemoryManager: index load failed (%s), rebuilding", exc)
            # Start the rebuild from a clean, well-typed state.
            self._index, self._tfidf, self._idf = {}, {}, {}
            self._bootstrap_from_cases()
            return

        self._index, self._tfidf, self._idf = index, tfidf, idf
        if self._tfidf and not self._idf:
            # Term vectors without IDF weights: derive the weights again.
            self._recompute_idf()
        logger.info("MemoryManager: loaded %d indexed cases", len(self._index))

    def _bootstrap_from_cases(self):
        """Build index from existing case JSON files on first run."""
        if not MEMORY_DIR.exists():
            return
        # Sorted so the capped selection is the same on every run.
        case_files = sorted(MEMORY_DIR.glob("*.json"))[:self._BOOTSTRAP_LIMIT]
        for path in case_files:
            try:
                case = json.loads(path.read_text(encoding="utf-8"))
                if not isinstance(case, dict):
                    raise ValueError("case file is not a JSON object")
                case_id = case.get("id") or case.get("case_id") or path.stem
                query = case.get("query") or case.get("user_input") or ""
                term_freq = _term_freq(_tokenise(query))
                meta = {
                    "id": case_id,
                    "query": query[:200],
                    "domain": self._case_domain(case),
                    "quality": case.get("quality_score", 0.5),
                    "ts": case.get("timestamp", time.time()),
                }
            except Exception as exc:  # one bad file must not abort the bootstrap
                logger.debug("MemoryManager: skipping case file %s: %s", path.name, exc)
                continue
            # Store both halves together so the index never holds a case
            # without its term vector.
            self._index[case_id] = meta
            self._tfidf[case_id] = term_freq
        if self._tfidf:
            self._recompute_idf()
            self._save_index()
            logger.info("MemoryManager: bootstrapped %d cases from disk", len(self._index))
# ── TF-IDF helpers ────────────────────────────────────────────────────────────
_STOPWORDS = {
"the","a","an","is","in","of","for","to","and","or","it","its",
"this","that","with","on","at","by","from","be","are","was","were",
"what","how","why","when","who","which","can","will","do","does",
"i","me","my","we","you","your","he","she","they","them","their",
"about","tell","explain","show","get","give","make","take","find",
}
def _tokenise(text: str) -> list[str]:
text = (text or "").lower()
tokens = re.findall(r'[a-z]{3,}', text)
return [t for t in tokens if t not in _STOPWORDS]
def _term_freq(tokens: list[str]) -> dict[str, float]:
if not tokens:
return {}
counts: dict[str, int] = defaultdict(int)
for t in tokens:
counts[t] += 1
total = len(tokens)
return {t: c / total for t, c in counts.items()}
def _cosine(a: dict[str, float], b: dict[str, float], idf: dict[str, float]) -> float:
common = set(a) & set(b)
if not common:
return 0.0
dot = sum(a[t] * b[t] * idf.get(t, 1.0) ** 2 for t in common)
mag_a = math.sqrt(sum((v * idf.get(t, 1.0)) ** 2 for t, v in a.items()))
mag_b = math.sqrt(sum((v * idf.get(t, 1.0)) ** 2 for t, v in b.items()))
if mag_a == 0 or mag_b == 0:
return 0.0
return dot / (mag_a * mag_b)
def _make_id() -> str:
import hashlib, uuid
return hashlib.md5(uuid.uuid4().bytes).hexdigest()[:12]
# Shared module-level instance (nothing prevents callers from creating more).
# Constructing it here runs MemoryManager.__init__ at import time, which
# creates the memory/index directories and loads or bootstraps the index.
memory_manager = MemoryManager()
|