File size: 3,815 Bytes
bbe01fe
 
 
 
661c2d6
 
 
3d134a6
 
 
 
 
 
 
 
 
bbe01fe
 
 
 
efdd22e
bbe01fe
 
 
 
3d134a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bbe01fe
 
 
efdd22e
 
 
3d134a6
 
 
 
 
 
 
 
 
 
e7c9ee6
 
 
3d134a6
661c2d6
bbe01fe
 
 
efdd22e
 
 
 
3d134a6
 
 
 
 
 
bbe01fe
661c2d6
 
bbe01fe
 
3d134a6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# backend/app/pipeline/nodes/cache.py
# Semantic cache lookup node. Checks the in-memory SemanticCache before
# any downstream LLM or retrieval calls. On a hit, the pipeline short-circuits
# directly to log_eval — no Qdrant or Groq calls made.
#
# The computed query embedding is stored in state so the retrieve node can
# reuse it directly — avoiding a second identical HTTP call to the embedder.
#
# Issue 5 (cache bypass narrowing):
# The previous design bypassed the cache unconditionally for any multi-turn
# session. This prevented caching self-contained follow-up queries like
# "what programming languages does Darshan know?" even when they appear after
# prior turns. The corrected behaviour: check for unresolved reference tokens
# (pronouns, demonstratives) BEFORE the cache lookup. Only queries that
# contain such tokens AND have conversation history are cache-bypassed. All
# other queries in multi-turn sessions go through cache normally.

from typing import Callable

import numpy as np
from langgraph.config import get_stream_writer

from app.models.pipeline import PipelineState
from app.services.semantic_cache import SemanticCache

# Tokens that indicate the query cannot be understood without prior context:
# pronouns and demonstratives that refer to something the user said earlier.
# "his" and "he" are excluded — they almost always refer to Darshan, not a
# prior turn, and excluding them would bypass cache on most portfolio queries.
_REFERENCE_TOKENS: frozenset[str] = frozenset({
    "that", "it", "its", "they", "their", "those",
    "this", "these", "them", "there", "then",
})


def _has_unresolved_reference(query: str) -> bool:
    """
    True when the query contains a pronoun or demonstrative that likely refers
    to something in the prior conversation turn rather than to Darshan or the
    portfolio content.
    """
    tokens = frozenset(query.lower().split())
    return bool(tokens & _REFERENCE_TOKENS)


def make_cache_node(cache: SemanticCache, embedder) -> Callable[[PipelineState], dict]:
    """
    Build the semantic-cache lookup node for the pipeline graph.

    The returned async node embeds the incoming query once, consults the
    semantic cache, and either short-circuits with the cached answer
    (path="cache_hit") or passes the embedding along in state so retrieve
    does not have to re-embed the identical query.

    Args:
        cache: in-memory SemanticCache queried with the numpy embedding.
        embedder: service exposing ``embed_one(text, is_query=...)``.

    Returns:
        An async node callable taking PipelineState and returning a state dict.
    """
    async def cache_node(state: PipelineState) -> dict:
        writer = get_stream_writer()
        writer({"type": "status", "label": "Looking up in memory..."})

        query = state["query"]

        # is_query=True prepends the BGE asymmetric instruction so the query
        # embedding lands in the retrieval-optimised neighbourhood of the
        # vector space; ingestion-time document embeddings use is_query=False.
        # Embedded unconditionally: every exit path stores it in state.
        embedding = await embedder.embed_one(query, is_query=True)

        # A genuine follow-up — one that contains an unresolved reference
        # token AND arrives in a session with prior turns — cannot be answered
        # without that context, so the cache lookup is skipped entirely and
        # downstream nodes receive the conversation history. Self-contained
        # queries in multi-turn sessions still get a normal cache check.
        is_followup = bool(state.get("conversation_history")) and _has_unresolved_reference(query)

        if not is_followup:
            cached = await cache.get(np.array(embedding))
            if cached:
                writer({"type": "status", "label": "Found a recent answer, loading..."})
                # The cache holds complete answer text, not a stream, so the
                # whole answer is emitted as one token event.
                writer({"type": "token", "text": cached})
                return {
                    "answer": cached,
                    "cached": True,
                    "query_embedding": embedding,
                    "path": "cache_hit",
                }

        # Miss or bypass: hand the embedding to retrieve_node via state so the
        # same query is never embedded twice.
        return {"cached": False, "query_embedding": embedding}

    return cache_node