# NOTE(review): the three lines below were a stray paste from a deployment
# log ("GitHub Actions / Deploy c8a8192 / efdd22e") and were not valid
# Python; preserved here as a comment so the module parses.
# backend/app/pipeline/nodes/cache.py
# Semantic cache lookup node. Checks the in-memory SemanticCache before
# any downstream LLM or retrieval calls. On a hit, the pipeline short-circuits
# directly to log_eval β€” no Qdrant or Groq calls made.
#
# The computed query embedding is stored in state so the retrieve node can
# reuse it directly β€” avoiding a second identical HTTP call to the embedder.
#
# Issue 5 (cache bypass narrowing):
# The previous design bypassed the cache unconditionally for any multi-turn
# session. This prevented caching self-contained follow-up queries like
# "what programming languages does Darshan know?" even when they appear after
# prior turns. The corrected behaviour: check for unresolved reference tokens
# (pronouns, demonstratives) BEFORE the cache lookup. Only queries that
# contain such tokens AND have conversation history are cache-bypassed. All
# other queries in multi-turn sessions go through cache normally.
from typing import Callable
import numpy as np
from langgraph.config import get_stream_writer
from app.models.pipeline import PipelineState
from app.services.semantic_cache import SemanticCache
# Tokens that indicate the query cannot be understood without prior context:
# pronouns and demonstratives that refer to something the user said earlier.
# "his" and "he" are excluded β€” they almost always refer to Darshan, not a
# prior turn, and excluding them would bypass cache on most portfolio queries.
_REFERENCE_TOKENS: frozenset[str] = frozenset({
"that", "it", "its", "they", "their", "those",
"this", "these", "them", "there", "then",
})
def _has_unresolved_reference(query: str) -> bool:
"""
True when the query contains a pronoun or demonstrative that likely refers
to something in the prior conversation turn rather than to Darshan or the
portfolio content.
"""
tokens = frozenset(query.lower().split())
return bool(tokens & _REFERENCE_TOKENS)
def make_cache_node(cache: SemanticCache, embedder) -> Callable[[PipelineState], dict]:
    """
    Build the semantic-cache lookup node.

    The returned async node checks the in-memory SemanticCache before any
    downstream retrieval/LLM calls. On a hit it emits the cached answer and
    short-circuits (path="cache_hit"). On a miss — or when the query is a
    genuine follow-up that needs conversation history — it returns
    cached=False. In every path the computed query embedding is stored in
    state so retrieve_node does not re-embed the same query.
    """
    async def cache_node(state: PipelineState) -> dict:
        writer = get_stream_writer()
        writer({"type": "status", "label": "Looking up in memory..."})

        query = state["query"]
        has_history = bool(state.get("conversation_history"))

        # Embed exactly once, before branching: both the bypass path and the
        # cache path need the embedding, and duplicating the call (as the
        # previous version did) risks the two call sites silently diverging.
        # is_query=True prepends the BGE asymmetric instruction so the query
        # embedding lands in the retrieval-optimised neighbourhood of the
        # vector space; document embeddings at ingestion use is_query=False.
        embedding = await embedder.embed_one(query, is_query=True)

        # A reference token (pronoun/demonstrative) AND existing history means
        # the query cannot be resolved without prior context. Skip the cache
        # so the pipeline injects history into downstream nodes.
        if has_history and _has_unresolved_reference(query):
            return {"cached": False, "query_embedding": embedding}

        cached = await cache.get(np.array(embedding))
        if cached:
            writer({"type": "status", "label": "Found a recent answer, loading..."})
            # The cache returns complete text, not a stream, so the full
            # answer goes out as a single token event.
            writer({"type": "token", "text": cached})
            return {
                "answer": cached,
                "cached": True,
                "query_embedding": embedding,
                "path": "cache_hit",
            }

        # Miss: keep the embedding in state so retrieve_node reuses it
        # instead of making a second identical HTTP call to the embedder.
        return {"cached": False, "query_embedding": embedding}

    return cache_node