"""Persistent query cache.

Stores (query_text, retrieved_chunks, timestamp) and indexes the queries by
their BGE embedding. New queries similar to past successful queries return the
cached chunks immediately (Tier 1 of the retrieval pipeline).

This is the device-side counterpart of the future central-server PageRank
curation layer: as users issue queries, successful (query, chunks) pairs
accumulate locally, and can later be uploaded for collective curation.
"""
from typing import List, Optional, Tuple
import time

import numpy as np

from rag import BGERetriever


class QueryCache:
    """Cache of past queries and the chunks that were retrieved for them.

    Invariant: ``entries[i]`` describes the query whose embedding is stored in
    row ``i`` of ``q_embs``. ``lookup`` relies on this alignment to map the
    best-scoring row back to its cached chunks.
    """

    def __init__(self, retriever: BGERetriever, sim_threshold: float = 0.85):
        """Create an empty cache.

        Args:
            retriever: Object used to embed queries (``_encode``) and to size
                the embedding matrix (``_dim``).
            sim_threshold: Minimum similarity score a cached query must reach
                for ``lookup`` to report a hit.
        """
        self.retriever = retriever
        self.sim_threshold = sim_threshold
        # One (query, chunks, unix_timestamp) tuple per cached query.
        self.entries: List[Tuple[str, List[str], float]] = []
        # One embedding row per cached query, in the same order as `entries`.
        self.q_embs: np.ndarray = np.zeros(
            (0, retriever._dim()), dtype=np.float32
        )

    def __len__(self) -> int:
        """Return the number of cached queries."""
        return len(self.entries)

    def add(self, query: str, chunks: List[str]) -> None:
        """Record ``query`` together with the chunks retrieved for it.

        Args:
            query: The query text to cache.
            chunks: Chunks retrieved for the query. A shallow copy is stored,
                so later mutation of the caller's list does not affect the
                cache.
        """
        emb = self.retriever._encode([query], is_query=True)
        entry = (query, list(chunks), time.time())

        # Build the grown matrix before touching any state: if stacking raises
        # (e.g. an embedding of unexpected width), neither `entries` nor
        # `q_embs` changes, so the row <-> entry alignment cannot be broken.
        if len(self.q_embs) == 0:
            grown = emb
        else:
            grown = np.vstack([self.q_embs, emb])

        self.q_embs = grown
        self.entries.append(entry)

    def lookup(self, query: str) -> Optional[Tuple[List[str], float, str]]:
        """Return the cached result of the most similar past query, if any.

        The score is the dot product between the stored query embeddings and
        the embedding of ``query``.
        NOTE(review): this equals cosine similarity only if ``_encode``
        returns L2-normalised vectors -- confirm against the retriever.

        Args:
            query: The new query text.

        Returns:
            ``(chunks, sim, matched_query)`` when the best score is at least
            ``sim_threshold``; otherwise ``None`` (also when the cache is
            empty). ``chunks`` is a copy of the cached list, so callers may
            modify it without corrupting the cache.
        """
        if not self.entries:
            return None

        emb = self.retriever._encode([query], is_query=True)[0]
        sims = self.q_embs @ emb
        idx = int(sims.argmax())
        if sims[idx] >= self.sim_threshold:
            matched_query, chunks, _ts = self.entries[idx]
            # Hand out a copy: returning the stored list would let callers
            # mutate the cache entry by accident.
            return list(chunks), float(sims[idx]), matched_query
        return None