# backend/app/pipeline/nodes/cache.py
# Semantic cache lookup node. Checks the in-memory SemanticCache before
# any downstream LLM or retrieval calls. On a hit, the pipeline short-circuits
# directly to log_eval — no Qdrant or Groq calls made.
#
# The computed query embedding is stored in state so the retrieve node can
# reuse it directly — avoiding a second identical HTTP call to the embedder.
#
# Issue 5 (cache bypass narrowing):
# The previous design bypassed the cache unconditionally for any multi-turn
# session. This prevented caching self-contained follow-up queries like
# "what programming languages does Darshan know?" even when they appear after
# prior turns. The corrected behaviour: check for unresolved reference tokens
# (pronouns, demonstratives) BEFORE the cache lookup. Only queries that
# contain such tokens AND have conversation history are cache-bypassed. All
# other queries in multi-turn sessions go through cache normally.

import string
from typing import Callable

import numpy as np
from langgraph.config import get_stream_writer

from app.models.pipeline import PipelineState
from app.services.semantic_cache import SemanticCache

# Tokens that indicate the query cannot be understood without prior context:
# pronouns and demonstratives that refer to something the user said earlier.
# "his" and "he" are excluded — they almost always refer to Darshan, not a
# prior turn, and excluding them would bypass cache on most portfolio queries.
_REFERENCE_TOKENS: frozenset[str] = frozenset({
    "that", "it", "its", "they", "their", "those", "this", "these",
    "them", "there", "then",
})


def _has_unresolved_reference(query: str) -> bool:
    """
    True when the query contains a pronoun or demonstrative that likely
    refers to something in the prior conversation turn rather than to
    Darshan or the portfolio content.

    Tokens are lowercased and stripped of surrounding punctuation before
    matching, so "tell me more about that?" correctly matches "that".
    A bare ``str.split()`` alone left trailing "?"/"." attached to the
    token, silently defeating the history-bypass check for most naturally
    punctuated follow-up queries.
    """
    tokens = {
        token.strip(string.punctuation)
        for token in query.lower().split()
    }
    return not tokens.isdisjoint(_REFERENCE_TOKENS)


def make_cache_node(cache: SemanticCache, embedder) -> Callable[[PipelineState], dict]:
    """
    Build the LangGraph cache node.

    Parameters:
        cache: SemanticCache instance consulted before any downstream calls.
        embedder: async embedding client exposing ``embed_one(text, is_query=...)``.

    Returns:
        An async node callable taking PipelineState and returning a partial
        state update dict. On a cache hit the dict carries the full answer
        plus ``path="cache_hit"`` so the graph can short-circuit; on a miss
        (or deliberate bypass) it carries ``cached=False`` and the query
        embedding for reuse by the retrieve node.
    """

    async def cache_node(state: PipelineState) -> dict:
        writer = get_stream_writer()
        writer({"type": "status", "label": "Looking up in memory..."})

        query = state["query"]
        has_history = bool(state.get("conversation_history"))

        # If the query contains a reference token AND the session has history,
        # the query is a genuine follow-up that cannot be resolved without
        # context. Skip the cache so the pipeline injects history into
        # downstream nodes.
        if has_history and _has_unresolved_reference(query):
            embedding = await embedder.embed_one(query, is_query=True)
            return {"cached": False, "query_embedding": embedding}

        # is_query=True: prepend BGE asymmetric instruction so query embedding
        # lands in the retrieval-optimised neighbourhood of the vector space.
        # Document embeddings at ingestion time use is_query=False (default).
        embedding = await embedder.embed_one(query, is_query=True)
        query_embedding = np.array(embedding)

        cached = await cache.get(query_embedding)
        if cached:
            writer({"type": "status", "label": "Found a recent answer, loading..."})
            # Emit the full cached answer as a single token event — the cache
            # returns complete text, not a stream, so one event is correct.
            writer({"type": "token", "text": cached})
            return {
                "answer": cached,
                "cached": True,
                "query_embedding": embedding,
                "path": "cache_hit",
            }

        # Store embedding in state so retrieve_node doesn't re-embed the same query.
        return {"cached": False, "query_embedding": embedding}

    return cache_node