Spaces:
Running
Running
| # backend/app/pipeline/nodes/cache.py | |
| # Semantic cache lookup node. Checks the in-memory SemanticCache before | |
| # any downstream LLM or retrieval calls. On a hit, the pipeline short-circuits | |
# directly to log_eval — no Qdrant or Groq calls made.
| # | |
| # The computed query embedding is stored in state so the retrieve node can | |
# reuse it directly — avoiding a second identical HTTP call to the embedder.
| # | |
| # Issue 5 (cache bypass narrowing): | |
| # The previous design bypassed the cache unconditionally for any multi-turn | |
| # session. This prevented caching self-contained follow-up queries like | |
| # "what programming languages does Darshan know?" even when they appear after | |
| # prior turns. The corrected behaviour: check for unresolved reference tokens | |
| # (pronouns, demonstratives) BEFORE the cache lookup. Only queries that | |
| # contain such tokens AND have conversation history are cache-bypassed. All | |
| # other queries in multi-turn sessions go through cache normally. | |
| from typing import Callable | |
| import numpy as np | |
| from langgraph.config import get_stream_writer | |
| from app.models.pipeline import PipelineState | |
| from app.services.semantic_cache import SemanticCache | |
| # Tokens that indicate the query cannot be understood without prior context: | |
| # pronouns and demonstratives that refer to something the user said earlier. | |
| # "his" and "he" are excluded β they almost always refer to Darshan, not a | |
| # prior turn, and excluding them would bypass cache on most portfolio queries. | |
| _REFERENCE_TOKENS: frozenset[str] = frozenset({ | |
| "that", "it", "its", "they", "their", "those", | |
| "this", "these", "them", "there", "then", | |
| }) | |
| def _has_unresolved_reference(query: str) -> bool: | |
| """ | |
| True when the query contains a pronoun or demonstrative that likely refers | |
| to something in the prior conversation turn rather than to Darshan or the | |
| portfolio content. | |
| """ | |
| tokens = frozenset(query.lower().split()) | |
| return bool(tokens & _REFERENCE_TOKENS) | |
def make_cache_node(cache: SemanticCache, embedder) -> Callable[[PipelineState], dict]:
    """
    Build the semantic-cache lookup node for the pipeline graph.

    The returned async node:
      1. Embeds the incoming query exactly once and stores the embedding in
         state so retrieve_node does not repeat the identical HTTP call.
      2. Bypasses the cache when the query contains an unresolved reference
         token AND the session has conversation history — such follow-ups
         cannot be answered without the prior turn's context.
      3. Otherwise looks the embedding up in the semantic cache; on a hit it
         emits the cached answer as a single token event and short-circuits
         the pipeline with path="cache_hit".

    Args:
        cache: SemanticCache queried with the numpy-converted query embedding.
        embedder: async embedder exposing embed_one(text, is_query=...).

    Returns:
        The async cache_node callable consumed by the pipeline graph.
    """
    async def cache_node(state: PipelineState) -> dict:
        writer = get_stream_writer()
        writer({"type": "status", "label": "Looking up in memory..."})

        query = state["query"]

        # Embed once, up front: both the bypass path and the normal path need
        # the embedding in state, so hoisting removes the duplicated call the
        # previous version made in each branch. is_query=True prepends the BGE
        # asymmetric instruction so the query embedding lands in the
        # retrieval-optimised neighbourhood of the vector space; document
        # embeddings at ingestion time use is_query=False (default).
        embedding = await embedder.embed_one(query, is_query=True)

        # If the query contains a reference token AND the session has history,
        # it is a genuine follow-up that cannot be resolved without context.
        # Skip the cache so the pipeline injects history into downstream nodes.
        has_history = bool(state.get("conversation_history"))
        if has_history and _has_unresolved_reference(query):
            return {"cached": False, "query_embedding": embedding}

        cached = await cache.get(np.array(embedding))
        if cached:
            writer({"type": "status", "label": "Found a recent answer, loading..."})
            # The cache returns complete text, not a stream, so emitting the
            # full answer as one token event is correct.
            writer({"type": "token", "text": cached})
            return {
                "answer": cached,
                "cached": True,
                "query_embedding": embedding,
                "path": "cache_hit",
            }

        # Miss: store the embedding in state so retrieve_node doesn't re-embed.
        return {"cached": False, "query_embedding": embedding}

    return cache_node