GitHub Actions commited on
Commit
4ef165a
Β·
1 Parent(s): f0e94ef

Deploy 583b552

Browse files
app/api/chat.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import json
2
  import time
3
  from fastapi import APIRouter, Request, Depends
@@ -76,6 +77,32 @@ async def _generate_follow_ups(
76
  return []
77
 
78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  @router.post("")
80
  @chat_rate_limit()
81
  async def chat_endpoint(
@@ -104,10 +131,36 @@ async def chat_endpoint(
104
  session_id = request_data.session_id
105
 
106
  conversation_history = conv_store.get_recent(session_id)
 
107
  criticism = _is_criticism(request_data.message)
108
  if criticism and conversation_history:
109
  conv_store.mark_last_negative(session_id)
110
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
111
  initial_state: PipelineState = { # type: ignore[assignment]
112
  "query": request_data.message,
113
  "session_id": request_data.session_id,
@@ -131,6 +184,16 @@ async def chat_endpoint(
131
  "follow_ups": [],
132
  "path": None,
133
  "query_topic": None,
 
 
 
 
 
 
 
 
 
 
134
  }
135
 
136
  async def sse_generator():
@@ -194,6 +257,24 @@ async def chat_endpoint(
194
  if follow_ups:
195
  yield f"event: follow_ups\ndata: {json.dumps({'questions': follow_ups})}\n\n"
196
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
197
  except Exception as exc:
198
  yield f"data: {json.dumps({'error': str(exc) or 'Generation failed'})}\n\n"
199
 
 
1
+ import asyncio
2
  import json
3
  import time
4
  from fastapi import APIRouter, Request, Depends
 
77
  return []
78
 
79
 
80
+ async def _update_summary_async(
81
+ conv_store,
82
+ gemini_client,
83
+ session_id: str,
84
+ previous_summary: str | None,
85
+ query: str,
86
+ answer: str,
87
+ processing_api_key: str | None,
88
+ ) -> None:
89
+ """
90
+ Triggered post-response to update the rolling conversation summary.
91
+ Failures are silently swallowed β€” summary is best-effort context, not critical.
92
+ """
93
+ try:
94
+ new_summary = await gemini_client.update_conversation_summary(
95
+ previous_summary=previous_summary or "",
96
+ new_turn_q=query,
97
+ new_turn_a=answer[:600], # cap answer chars sent to Gemini
98
+ processing_api_key=processing_api_key,
99
+ )
100
+ if new_summary:
101
+ conv_store.set_summary(session_id, new_summary)
102
+ except Exception:
103
+ pass
104
+
105
+
106
  @router.post("")
107
  @chat_rate_limit()
108
  async def chat_endpoint(
 
131
  session_id = request_data.session_id
132
 
133
  conversation_history = conv_store.get_recent(session_id)
134
+ conversation_summary = conv_store.get_summary(session_id)
135
  criticism = _is_criticism(request_data.message)
136
  if criticism and conversation_history:
137
  conv_store.mark_last_negative(session_id)
138
 
139
+ # Stage 2: decontextualize the query concurrently with Guard when we have a
140
+ # rolling summary. Reference-heavy queries like "tell me more about that project"
141
+ # embed poorly; a self-contained rewrite fixes retrieval without added latency
142
+ # because Gemini Flash runs while Guard is classifying the query.
143
+ gemini_client = getattr(request.app.state, "gemini_client", None)
144
+ decontextualized_query: str | None = None
145
+ decontext_task: asyncio.Task | None = None
146
+ if conversation_summary and gemini_client and gemini_client.is_configured:
147
+ decontext_task = asyncio.create_task(
148
+ gemini_client.decontextualize_query(request_data.message, conversation_summary)
149
+ )
150
+
151
+ # Await decontextualization result before the pipeline begins (retrieve node
152
+ # will use it if present; Guard runs first so the latency is masked).
153
+ if decontext_task is not None:
154
+ try:
155
+ result = await asyncio.wait_for(decontext_task, timeout=3.0)
156
+ # Only use the rewritten form when it differs non-trivially from the raw
157
+ # query to avoid polluting retrieval with identical-but-slightly-rephrased
158
+ # versions that waste embedding budget.
159
+ if result and result.strip().lower() != request_data.message.strip().lower():
160
+ decontextualized_query = result.strip()
161
+ except Exception:
162
+ pass # Decontextualization is best-effort; fall back to raw query.
163
+
164
  initial_state: PipelineState = { # type: ignore[assignment]
165
  "query": request_data.message,
166
  "session_id": request_data.session_id,
 
184
  "follow_ups": [],
185
  "path": None,
186
  "query_topic": None,
187
+ # Stage 1: follow-up bypass for Gemini fast-path
188
+ "is_followup": request_data.is_followup,
189
+ # Stage 2: progressive history summarisation
190
+ "conversation_summary": conversation_summary or None,
191
+ "decontextualized_query": decontextualized_query,
192
+ # Stage 3: SELF-RAG critic scores (populated by generate node)
193
+ "critic_groundedness": None,
194
+ "critic_completeness": None,
195
+ "critic_specificity": None,
196
+ "critic_quality": None,
197
  }
198
 
199
  async def sse_generator():
 
257
  if follow_ups:
258
  yield f"event: follow_ups\ndata: {json.dumps({'questions': follow_ups})}\n\n"
259
 
260
+ # Stage 2: update rolling summary asynchronously β€” fired after the
261
+ # response is fully delivered so it adds zero latency to the turn.
262
+ if final_answer and gemini_client and gemini_client.is_configured:
263
+ processing_key = getattr(
264
+ request.app.state, "gemini_processing_api_key", None
265
+ )
266
+ asyncio.create_task(
267
+ _update_summary_async(
268
+ conv_store=conv_store,
269
+ gemini_client=gemini_client,
270
+ session_id=session_id,
271
+ previous_summary=conversation_summary,
272
+ query=request_data.message,
273
+ answer=final_answer,
274
+ processing_api_key=processing_key,
275
+ )
276
+ )
277
+
278
  except Exception as exc:
279
  yield f"data: {json.dumps({'error': str(exc) or 'Generation failed'})}\n\n"
280
 
app/models/chat.py CHANGED
@@ -17,6 +17,10 @@ class ChatRequest(BaseModel):
17
  max_length=64,
18
  pattern=r"^[a-zA-Z0-9_-]+$",
19
  )
 
 
 
 
20
 
21
 
22
  class ChatResponse(BaseModel):
 
17
  max_length=64,
18
  pattern=r"^[a-zA-Z0-9_-]+$",
19
  )
20
+ # True when the query was submitted via a follow-up pill button.
21
+ # Bypasses the Gemini fast-path unconditionally so pill follow-ups
22
+ # always produce cited, chunk-grounded answers rather than TOON summaries.
23
+ is_followup: bool = False
24
 
25
 
26
  class ChatResponse(BaseModel):
app/models/pipeline.py CHANGED
@@ -1,17 +1,34 @@
1
  import operator
2
- from typing import Annotated, Literal, Optional, TypedDict
3
 
4
  from app.models.chat import SourceRef
5
 
6
 
7
- class ChunkMetadata(TypedDict):
 
 
 
 
 
 
 
 
8
  doc_id: str
9
  source_title: str
10
  source_url: str
11
  section: str
12
- source_type: Literal["blog", "project", "github", "bio", "cv"]
13
  date: str
14
  tags: list[str]
 
 
 
 
 
 
 
 
 
15
 
16
 
17
  class Chunk(TypedDict):
@@ -34,33 +51,34 @@ class PipelineState(TypedDict):
34
  guard_passed: bool
35
  thinking: bool # True while Gemini has signalled RAG is needed
36
  # Last N Q/A pairs for this session β€” injected into prompts for follow-up context.
37
- # List of {"q": str, "a": str} dicts, oldest first, answers truncated to 120 chars.
38
  conversation_history: list
 
 
 
 
 
 
 
39
  # True when the current query explicitly criticises the previous answer.
40
- # Triggers automatic negative feedback on the prior interaction and forces
41
- # Gemini editorial reformat regardless of the low-trust heuristic score.
42
  is_criticism: bool
 
 
 
43
  latency_ms: int
44
  error: Optional[str]
45
  interaction_id: Optional[int]
46
  # CRAG: counts retrieve node invocations; 2 = one retry was attempted.
47
- # Starts at 0 in initial state; retrieve increments it each call.
48
  retrieval_attempts: int
49
- # Set by the rewrite_query node when CRAG triggers; None otherwise.
50
  rewritten_query: Optional[str]
51
- # Top cross-encoder score from the last retrieve call.
52
- # Used by route_retrieve_result to trigger a CRAG rewrite on low-confidence
53
- # retrieval (non-empty but weak matches) in addition to the empty-chunk case.
54
  top_rerank_score: Optional[float]
55
- # Follow-up question suggestions generated after the main answer.
56
- # 3 short questions specific to content in the answer.
57
  follow_ups: list[str]
58
  # Which pipeline branch produced the final answer.
59
- # Values: "cache_hit", "gemini_fast", "rag", "blocked".
60
- # Set by cache, gemini_fast, and generate nodes respectively.
61
- # data_prep.py filters to path=="rag" when building reranker triplets because
62
- # only RAG interactions have chunk associations that form valid training pairs.
63
  path: Optional[str]
64
- # 1–3 word topic extracted from the query by the guard node (extract_topic).
65
- # Stored in state so retrieve_node can reuse it without recomputing.
66
  query_topic: Optional[str]
 
 
 
 
 
 
1
  import operator
2
+ from typing import Annotated, Optional, TypedDict
3
 
4
  from app.models.chat import SourceRef
5
 
6
 
7
+ class ChunkMetadata(TypedDict, total=False):
8
+ """
9
+ Per-chunk payload stored in Qdrant.
10
+
11
+ All fields have total=False so new optional fields (raptor_level, parent_id,
12
+ linked_chunks) can coexist with existing points that don't have them.
13
+ Required fields (doc_id, source_title, source_url, section, source_type) are
14
+ always present in practice; callers should .get() with a default for safety.
15
+ """
16
  doc_id: str
17
  source_title: str
18
  source_url: str
19
  section: str
20
+ source_type: str # "blog" | "project" | "github" | "bio" | "cv"
21
  date: str
22
  tags: list[str]
23
+ # RAPTOR hierarchical indexing (Stage 4).
24
+ # 0 = leaf chunk (original content), 1 = cluster summary, 2 = document summary.
25
+ # Absent on pre-RAPTOR points β€” treat as 0.
26
+ raptor_level: int
27
+ # Qdrant point ID of the parent RAPTOR summary node. None for top-level nodes.
28
+ parent_id: str
29
+ # Stage 5: fingerprints (doc_id::section) of semantically linked chunks within
30
+ # the same RAPTOR cluster (cosine similarity > 0.85). Used for context expansion.
31
+ linked_chunks: list[str]
32
 
33
 
34
  class Chunk(TypedDict):
 
51
  guard_passed: bool
52
  thinking: bool # True while Gemini has signalled RAG is needed
53
  # Last N Q/A pairs for this session β€” injected into prompts for follow-up context.
 
54
  conversation_history: list
55
+ # Stage 2: rolling conversation summary (single paragraph, ≀150 tokens).
56
+ # Injected into generate/gemini_fast instead of raw turn list when present.
57
+ conversation_summary: Optional[str]
58
+ # Stage 2: self-contained query rewritten before retrieval when the original
59
+ # contains unresolved pronouns/references. Used for embedding; original query
60
+ # is used for display and system prompt.
61
+ decontextualized_query: Optional[str]
62
  # True when the current query explicitly criticises the previous answer.
 
 
63
  is_criticism: bool
64
+ # Stage 1: True when submitted via a follow-up pill button.
65
+ # Bypasses Gemini fast-path so pill follow-ups always produce cited RAG answers.
66
+ is_followup: bool
67
  latency_ms: int
68
  error: Optional[str]
69
  interaction_id: Optional[int]
70
  # CRAG: counts retrieve node invocations; 2 = one retry was attempted.
 
71
  retrieval_attempts: int
 
72
  rewritten_query: Optional[str]
73
+ # Top cross-encoder score from the last retrieve call. Used by CRAG routing.
 
 
74
  top_rerank_score: Optional[float]
 
 
75
  follow_ups: list[str]
76
  # Which pipeline branch produced the final answer.
 
 
 
 
77
  path: Optional[str]
78
+ # 1–3 word topic extracted from the query by the guard node.
 
79
  query_topic: Optional[str]
80
+ # Stage 3: SELF-RAG critic scores (1–3 each). Logged to SQLite for training.
81
+ critic_groundedness: Optional[int] # all claims supported by a specific chunk
82
+ critic_completeness: Optional[int] # answer uses all relevant available chunks
83
+ critic_specificity: Optional[int] # answer contains specific names/numbers
84
+ critic_quality: Optional[str] # "high" | "medium" | "low"
app/pipeline/nodes/gemini_fast.py CHANGED
@@ -32,13 +32,15 @@ from app.core.quality import is_low_trust
32
  logger = logging.getLogger(__name__)
33
 
34
  # Words that reliably indicate the visitor wants a deep, cited answer.
35
- # Kept intentionally small: false negatives route to Gemini first, then RAG
36
- # on a tool call. False positives here add one Gemini RTT unnecessarily.
37
  _COMPLEX_SIGNALS: frozenset[str] = frozenset({
38
  "how", "why", "explain", "implement", "architecture", "deep",
39
  "detail", "technical", "compare", "difference", "algorithm",
40
  "code", "example", "breakdown", "analysis", "source", "cite",
41
  "reference", "proof", "derive", "calculate", "optimise", "optimize",
 
 
 
 
42
  })
43
 
44
  # Minimum token count for a query to be classified as complex.
@@ -76,6 +78,21 @@ def make_gemini_fast_node(gemini_client: GeminiClient) -> Any:
76
  writer({"type": "status", "label": "Thinking about your question directly..."})
77
 
78
  query = state["query"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  complexity = "complex" if _is_complex(query) else "simple"
80
 
81
  # When Gemini is not configured (GEMINI_API_KEY not set), route all
 
32
  logger = logging.getLogger(__name__)
33
 
34
  # Words that reliably indicate the visitor wants a deep, cited answer.
 
 
35
  _COMPLEX_SIGNALS: frozenset[str] = frozenset({
36
  "how", "why", "explain", "implement", "architecture", "deep",
37
  "detail", "technical", "compare", "difference", "algorithm",
38
  "code", "example", "breakdown", "analysis", "source", "cite",
39
  "reference", "proof", "derive", "calculate", "optimise", "optimize",
40
+ # Follow-up depth signals β€” these phrases appear in pill-generated questions
41
+ # and always indicate the user wants a cited, retrieved answer not a summary.
42
+ "tell me more", "more detail", "more about", "what about",
43
+ "explain that", "go deeper", "expand", "elaborate", "dig into",
44
  })
45
 
46
  # Minimum token count for a query to be classified as complex.
 
78
  writer({"type": "status", "label": "Thinking about your question directly..."})
79
 
80
  query = state["query"]
81
+
82
+ # Stage 1: Follow-up pill submissions bypass Gemini entirely.
83
+ # is_followup=True means the query came from a pill button β€” it is a
84
+ # reference-heavy string like "What technologies did he use for that?" that
85
+ # will produce a TOON summary answer from Gemini. Always route to RAG so
86
+ # the response is cited and chunk-grounded.
87
+ if state.get("is_followup", False):
88
+ logger.debug("is_followup=True β€” forcing RAG, skipping Gemini fast-path.")
89
+ writer({"type": "status", "label": "Needs deep retrieval, checking portfolio..."})
90
+ return {
91
+ "query_complexity": "complex", # force 70B for quality
92
+ "expanded_queries": [query],
93
+ "thinking": False,
94
+ }
95
+
96
  complexity = "complex" if _is_complex(query) else "simple"
97
 
98
  # When Gemini is not configured (GEMINI_API_KEY not set), route all
app/pipeline/nodes/generate.py CHANGED
@@ -96,13 +96,22 @@ No apologies, no padding, vary your phrasing.
96
  """.format(topics=_TOPIC_SUGGESTIONS)
97
 
98
 
99
- def _format_history(history: list[dict]) -> str:
100
  """
101
- Render prior turns as a compact prefix block.
102
- Each turn is one line: "[Tn] Q: ... | A: ..."
103
- Returns empty string when there is no history (first message in session).
104
- Token cost: ~20-35 tokens per turn; max 3 turns β†’ <110 tokens overhead.
 
 
 
 
105
  """
 
 
 
 
 
106
  if not history:
107
  return ""
108
  lines = [
@@ -129,7 +138,7 @@ def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[P
129
  # ── Not-found path ─────────────────────────────────────────────────
130
  if not reranked_chunks:
131
  writer({"type": "status", "label": "Could not find specific information, responding carefully..."})
132
- history_prefix = _format_history(state.get("conversation_history") or [])
133
  stream = llm_client.complete_with_complexity(
134
  prompt=f"{history_prefix}Visitor question: {query}",
135
  system=_NOT_FOUND_SYSTEM,
@@ -167,7 +176,7 @@ def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[P
167
 
168
  context_block = "\n\n".join(context_parts)
169
 
170
- history_prefix = _format_history(state.get("conversation_history") or [])
171
  is_criticism = state.get("is_criticism", False)
172
  criticism_note = (
173
  "NOTE: The visitor says the previous answer was wrong. "
@@ -265,10 +274,46 @@ def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[P
265
  cited_indices = {int(m) for m in re.findall(r"\[(\d+)\]", full_answer)}
266
  cited_sources = [sr for i, sr in enumerate(source_refs, start=1) if i in cited_indices]
267
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
268
  return {
269
  "answer": full_answer,
270
  "sources": cited_sources if cited_sources else source_refs[:2],
271
  "path": "rag",
 
272
  }
273
 
274
  return generate_node
 
96
  """.format(topics=_TOPIC_SUGGESTIONS)
97
 
98
 
99
+ def _format_history(state: "PipelineState") -> str:
100
  """
101
+ Render conversation context as a compact prefix block.
102
+
103
+ Stage 2 β€” progressive history summarisation:
104
+ If a rolling `conversation_summary` is present in state, inject that
105
+ single paragraph instead of the raw 3-turn transcript. The summary is
106
+ ~150 tokens; the raw transcript costs 20-35 tokens per turn but degrades
107
+ at turn 4+ due to pronoun ambiguity and stale context. We keep the raw
108
+ turns as fallback when Gemini hasn't produced a summary yet.
109
  """
110
+ summary = state.get("conversation_summary")
111
+ if summary:
112
+ return f"Running conversation context:\n{summary}\n\n"
113
+
114
+ history = state.get("conversation_history") or []
115
  if not history:
116
  return ""
117
  lines = [
 
138
  # ── Not-found path ─────────────────────────────────────────────────
139
  if not reranked_chunks:
140
  writer({"type": "status", "label": "Could not find specific information, responding carefully..."})
141
+ history_prefix = _format_history(state)
142
  stream = llm_client.complete_with_complexity(
143
  prompt=f"{history_prefix}Visitor question: {query}",
144
  system=_NOT_FOUND_SYSTEM,
 
176
 
177
  context_block = "\n\n".join(context_parts)
178
 
179
+ history_prefix = _format_history(state)
180
  is_criticism = state.get("is_criticism", False)
181
  criticism_note = (
182
  "NOTE: The visitor says the previous answer was wrong. "
 
274
  cited_indices = {int(m) for m in re.findall(r"\[(\d+)\]", full_answer)}
275
  cited_sources = [sr for i, sr in enumerate(source_refs, start=1) if i in cited_indices]
276
 
277
+ # ── Stage 3: SELF-RAG critic ──────────────────────────────────────────
278
+ # Runs after answer is fully streamed β€” zero latency impact on first token.
279
+ # Scores groundedness (stays in passages), completeness (covers the query),
280
+ # and specificity (concrete names/numbers vs vague language) on 1-3 each.
281
+ # Scores are stored in state for log_eval and downstream quality analysis.
282
+ critic_scores: dict[str, int | None] = {
283
+ "critic_groundedness": None,
284
+ "critic_completeness": None,
285
+ "critic_specificity": None,
286
+ "critic_quality": None,
287
+ }
288
+ if gemini_client is not None and full_answer and reranked_chunks:
289
+ try:
290
+ scores = await gemini_client.critique_rag_answer(
291
+ query=query,
292
+ context_block=context_block,
293
+ answer=full_answer,
294
+ decontextualized_query=state.get("decontextualized_query"),
295
+ )
296
+ g = scores.get("groundedness", 3)
297
+ c = scores.get("completeness", 3)
298
+ s = scores.get("specificity", 3)
299
+ # Composite quality label for quick log filtering:
300
+ # 'high' if average >= 2.5, 'medium' if >= 1.5, else 'low'
301
+ avg = (g + c + s) / 3.0
302
+ quality = "high" if avg >= 2.5 else ("medium" if avg >= 1.5 else "low")
303
+ critic_scores = {
304
+ "critic_groundedness": g,
305
+ "critic_completeness": c,
306
+ "critic_specificity": s,
307
+ "critic_quality": quality,
308
+ }
309
+ except Exception as exc:
310
+ logger.debug("SELF-RAG critic failed (non-critical): %s", exc)
311
+
312
  return {
313
  "answer": full_answer,
314
  "sources": cited_sources if cited_sources else source_refs[:2],
315
  "path": "rag",
316
+ **critic_scores,
317
  }
318
 
319
  return generate_node
app/pipeline/nodes/log_eval.py CHANGED
@@ -61,7 +61,11 @@ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState
61
  latency_ms INTEGER,
62
  cached BOOLEAN,
63
  feedback INTEGER DEFAULT 0,
64
- path TEXT DEFAULT 'rag'
 
 
 
 
65
  )
66
  """
67
  )
@@ -72,6 +76,11 @@ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState
72
  ("session_id", "TEXT DEFAULT ''"),
73
  # path column: old rows default to "rag" β€” they were all RAG interactions.
74
  ("path", "TEXT DEFAULT 'rag'"),
 
 
 
 
 
75
  ]:
76
  try:
77
  conn.execute(f"ALTER TABLE interactions ADD COLUMN {col} {definition}")
@@ -81,8 +90,10 @@ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState
81
  cursor = conn.execute(
82
  """
83
  INSERT INTO interactions
84
- (timestamp, session_id, query, answer, chunks_used, rerank_scores, reranked_chunks_json, latency_ms, cached, path)
85
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 
 
86
  """,
87
  (
88
  datetime.now(tz=timezone.utc).isoformat(),
@@ -95,6 +106,10 @@ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState
95
  state.get("latency_ms", 0),
96
  state.get("cached", False),
97
  path,
 
 
 
 
98
  ),
99
  )
100
  return cursor.lastrowid # type: ignore[return-value]
@@ -126,6 +141,10 @@ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState
126
  "cached": state.get("cached", False),
127
  "feedback": 0,
128
  "path": path,
 
 
 
 
129
  }
130
  github_log.append(record)
131
 
 
61
  latency_ms INTEGER,
62
  cached BOOLEAN,
63
  feedback INTEGER DEFAULT 0,
64
+ path TEXT DEFAULT 'rag',
65
+ critic_groundedness INTEGER,
66
+ critic_completeness INTEGER,
67
+ critic_specificity INTEGER,
68
+ critic_quality TEXT
69
  )
70
  """
71
  )
 
76
  ("session_id", "TEXT DEFAULT ''"),
77
  # path column: old rows default to "rag" β€” they were all RAG interactions.
78
  ("path", "TEXT DEFAULT 'rag'"),
79
+ # Stage 3 SELF-RAG critic scores
80
+ ("critic_groundedness", "INTEGER"),
81
+ ("critic_completeness", "INTEGER"),
82
+ ("critic_specificity", "INTEGER"),
83
+ ("critic_quality", "TEXT"),
84
  ]:
85
  try:
86
  conn.execute(f"ALTER TABLE interactions ADD COLUMN {col} {definition}")
 
90
  cursor = conn.execute(
91
  """
92
  INSERT INTO interactions
93
+ (timestamp, session_id, query, answer, chunks_used, rerank_scores,
94
+ reranked_chunks_json, latency_ms, cached, path,
95
+ critic_groundedness, critic_completeness, critic_specificity, critic_quality)
96
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
97
  """,
98
  (
99
  datetime.now(tz=timezone.utc).isoformat(),
 
106
  state.get("latency_ms", 0),
107
  state.get("cached", False),
108
  path,
109
+ state.get("critic_groundedness"),
110
+ state.get("critic_completeness"),
111
+ state.get("critic_specificity"),
112
+ state.get("critic_quality"),
113
  ),
114
  )
115
  return cursor.lastrowid # type: ignore[return-value]
 
141
  "cached": state.get("cached", False),
142
  "feedback": 0,
143
  "path": path,
144
+ "critic_groundedness": state.get("critic_groundedness"),
145
+ "critic_completeness": state.get("critic_completeness"),
146
+ "critic_specificity": state.get("critic_specificity"),
147
+ "critic_quality": state.get("critic_quality"),
148
  }
149
  github_log.append(record)
150
 
app/pipeline/nodes/retrieve.py CHANGED
@@ -1,8 +1,11 @@
1
  import asyncio
 
2
  from typing import Callable
3
 
4
  from langgraph.config import get_stream_writer
5
 
 
 
6
  from app.models.pipeline import PipelineState, Chunk
7
  from app.services.vector_store import VectorStore
8
  from app.services.embedder import Embedder
@@ -39,6 +42,20 @@ _SIBLING_EXPAND_TOP_N: int = 5 # expand from the top-N RRF-ranked unique chunk
39
  _SIBLING_FETCH_LIMIT: int = 5 # fetch up to N siblings per document
40
  _SIBLING_TOTAL_CAP: int = 8 # max additional chunks added via sibling expansion
41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
  # Keywords that imply the visitor wants depth from a specific source type.
43
  # Values are the source_type values set by ingest (ChunkMetadata.source_type).
44
  _FOCUS_KEYWORDS: dict[frozenset[str], str] = {
@@ -118,6 +135,11 @@ def make_retrieve_node(
118
 
119
  attempts = state.get("retrieval_attempts", 0)
120
  query = state["query"]
 
 
 
 
 
121
 
122
  # Reuse the topic computed by the guard node β€” no recomputation needed.
123
  topic = state.get("query_topic") or ""
@@ -136,9 +158,12 @@ def make_retrieve_node(
136
  # Second attempt: re-embed the rewritten query with is_query=True.
137
  cached_embedding = None
138
 
139
- expanded = [query] # gemini_fast may fill expanded_queries on first attempt
140
  if attempts == 0:
141
- expanded = state.get("expanded_queries", [query])
 
 
 
142
 
143
  # Embed all query variants in one batched call (is_query=True for asymmetric BGE).
144
  if cached_embedding is not None and len(expanded) == 1:
@@ -152,12 +177,36 @@ def make_retrieve_node(
152
  chunks = vector_store.search(query_vector=vec, top_k=20)
153
  dense_results.append(chunks)
154
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
155
  # ── Sparse (BM25) search (primary query only) ──────────────────────��──────
156
  # Runs concurrently with dense search isn't possible here since dense
157
  # is synchronous Qdrant calls, but we parallelise encode + sparse search.
158
  sparse_results: list[Chunk] = []
159
  if _sparse_encoder.available:
160
- indices, values = _sparse_encoder.encode_one(query)
161
  sparse_results = vector_store.search_sparse(indices, values, top_k=20)
162
 
163
  # ── Reciprocal Rank Fusion ─────────────────────────────────────────────
@@ -222,7 +271,38 @@ def make_retrieve_node(
222
  if sibling_count >= _SIBLING_TOTAL_CAP:
223
  break
224
 
225
- reranked = await reranker.rerank(query, unique_chunks, top_k=7)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
226
 
227
  # ── Relevance gate ─────────────────────────────────────────────────────
228
  top_score = reranked[0]["metadata"].get("rerank_score", 0.0) if reranked else None
 
1
  import asyncio
2
+ import logging
3
  from typing import Callable
4
 
5
  from langgraph.config import get_stream_writer
6
 
7
+ logger = logging.getLogger(__name__)
8
+
9
  from app.models.pipeline import PipelineState, Chunk
10
  from app.services.vector_store import VectorStore
11
  from app.services.embedder import Embedder
 
42
  _SIBLING_FETCH_LIMIT: int = 5 # fetch up to N siblings per document
43
  _SIBLING_TOTAL_CAP: int = 8 # max additional chunks added via sibling expansion
44
 
45
+ # RAPTOR (Stage 4): top-N cluster summary hits to expand into child leaf chunks.
46
+ # When a level-1 cluster node appears in the top-3 results, its linked leaf
47
+ # children are fetched from Qdrant and added to the candidate pool before reranking.
48
+ _RAPTOR_CLUSTER_TOP_K: int = 5 # how many cluster nodes to search for
49
+ _RAPTOR_EXPAND_TOP_N: int = 3 # expand children for top-N cluster hits only
50
+ _RAPTOR_CHILD_FETCH_LIMIT: int = 6 # max leaf children fetched per cluster hit
51
+
52
+ # Stage 5 linked_chunks expansion cap. After reranking, each top chunk may link
53
+ # to near-duplicate passages (same skill in resume vs. project README). We expand
54
+ # up to this many additional candidates before the context cap is applied.
55
+ _LINKED_CHUNKS_EXPAND_TOP_N: int = 5 # expand links from top-N reranked chunks
56
+ _LINKED_CHUNKS_PER_CHUNK: int = 2 # max linked neighbours per source chunk
57
+ _LINKED_CHUNKS_TOTAL_CAP: int = 4 # total linked-chunk additions across all seeds
58
+
59
  # Keywords that imply the visitor wants depth from a specific source type.
60
  # Values are the source_type values set by ingest (ChunkMetadata.source_type).
61
  _FOCUS_KEYWORDS: dict[frozenset[str], str] = {
 
135
 
136
  attempts = state.get("retrieval_attempts", 0)
137
  query = state["query"]
138
+ # Stage 2: use the self-contained decontextualized rewrite for embedding
139
+ # when one was produced. "Tell me more about that ML project" has terrible
140
+ # cosine similarity against "PersonaBot RAG pipeline" passages; the rewrite
141
+ # "What ML projects has Darshan built?" dramatically improves recall.
142
+ retrieval_query = state.get("decontextualized_query") or query
143
 
144
  # Reuse the topic computed by the guard node β€” no recomputation needed.
145
  topic = state.get("query_topic") or ""
 
158
  # Second attempt: re-embed the rewritten query with is_query=True.
159
  cached_embedding = None
160
 
161
+ expanded = [retrieval_query] # gemini_fast may fill expanded_queries on first attempt
162
  if attempts == 0:
163
+ expanded = state.get("expanded_queries") or [retrieval_query]
164
+ # Ensure decontextualized form is the primary search query if present.
165
+ if retrieval_query != query and retrieval_query not in expanded:
166
+ expanded = [retrieval_query] + expanded
167
 
168
  # Embed all query variants in one batched call (is_query=True for asymmetric BGE).
169
  if cached_embedding is not None and len(expanded) == 1:
 
177
  chunks = vector_store.search(query_vector=vec, top_k=20)
178
  dense_results.append(chunks)
179
 
180
+ # ── Stage 4: RAPTOR cluster search (parallel to dense leaf search) ──────────
181
+ # Query the level-1 RAPTOR cluster nodes with the primary query vector.
182
+ # If a cluster node scores in the top-_RAPTOR_EXPAND_TOP_N results, we
183
+ # fetch its child leaf chunks (via linked_chunks payload) and add them to
184
+ # the candidate pool before RRF fusion. This gives the retriever a
185
+ # "zoomed-out" structural view that pure cosine over leaves misses.
186
+ primary_vec = query_vectors[0]
187
+ raptor_cluster_hits = vector_store.search_by_raptor_level(
188
+ query_vector=primary_vec, level=1, top_k=_RAPTOR_CLUSTER_TOP_K
189
+ )
190
+ raptor_leaf_expansions: list[Chunk] = []
191
+ for cluster_chunk in raptor_cluster_hits[:_RAPTOR_EXPAND_TOP_N]:
192
+ # linked_chunks stores "doc_id::section" fingerprints of child leaves.
193
+ linked_fps: list[str] = cluster_chunk["metadata"].get("linked_chunks") or []
194
+ for fp in linked_fps[:_RAPTOR_CHILD_FETCH_LIMIT]:
195
+ if "::" not in fp:
196
+ continue
197
+ child_doc_id, _ = fp.split("::", 1)
198
+ siblings = vector_store.fetch_by_doc_id(child_doc_id, limit=3)
199
+ raptor_leaf_expansions.extend(siblings)
200
+ if raptor_leaf_expansions:
201
+ logger.debug("RAPTOR: added %d child leaf candidates.", len(raptor_leaf_expansions))
202
+ dense_results.append(raptor_leaf_expansions)
203
+
204
  # ── Sparse (BM25) search (primary query only) ──────────────────────��──────
205
  # Runs concurrently with dense search isn't possible here since dense
206
  # is synchronous Qdrant calls, but we parallelise encode + sparse search.
207
  sparse_results: list[Chunk] = []
208
  if _sparse_encoder.available:
209
+ indices, values = _sparse_encoder.encode_one(retrieval_query)
210
  sparse_results = vector_store.search_sparse(indices, values, top_k=20)
211
 
212
  # ── Reciprocal Rank Fusion ─────────────────────────────────────────────
 
271
  if sibling_count >= _SIBLING_TOTAL_CAP:
272
  break
273
 
274
+ reranked = await reranker.rerank(retrieval_query, unique_chunks, top_k=7)
275
+
276
+ # ── Stage 5: linked_chunks expansion ─────────────────────────────────────
277
+ # After reranking, inspect the top-N chunks for linked_chunks edges set by
278
+ # RaptorBuilder. These link near-duplicate passages from different source
279
+ # documents (e.g. the same skill mentioned in CV and a project README).
280
+ # Expanding them ensures the LLM can cross-cite both authoritative sources.
281
+ if reranked:
282
+ linked_fps_seen: set[str] = {
283
+ f"{c['metadata']['doc_id']}::{c['metadata']['section']}" for c in reranked
284
+ }
285
+ linked_added = 0
286
+ for seed in reranked[:_LINKED_CHUNKS_EXPAND_TOP_N]:
287
+ if linked_added >= _LINKED_CHUNKS_TOTAL_CAP:
288
+ break
289
+ linked_fps: list[str] = seed["metadata"].get("linked_chunks") or []
290
+ for fp in linked_fps[:_LINKED_CHUNKS_PER_CHUNK]:
291
+ if fp in linked_fps_seen or "::" not in fp:
292
+ continue
293
+ linked_fps_seen.add(fp)
294
+ child_doc_id, _ = fp.split("::", 1)
295
+ siblings = vector_store.fetch_by_doc_id(child_doc_id, limit=2)
296
+ for sib in siblings:
297
+ sib_fp = f"{sib['metadata']['doc_id']}::{sib['metadata']['section']}"
298
+ if sib_fp not in linked_fps_seen:
299
+ linked_fps_seen.add(sib_fp)
300
+ reranked.append(sib)
301
+ linked_added += 1
302
+ if linked_added >= _LINKED_CHUNKS_TOTAL_CAP:
303
+ break
304
+ if linked_added >= _LINKED_CHUNKS_TOTAL_CAP:
305
+ break
306
 
307
  # ── Relevance gate ─────────────────────────────────────────────────────
308
  top_score = reranked[0]["metadata"].get("rerank_score", 0.0) if reranked else None
app/services/conversation_store.py CHANGED
@@ -1,22 +1,17 @@
1
  """
2
  backend/app/services/conversation_store.py
3
 
4
- SQLite-backed per-session conversation history.
5
-
6
- Reads the last N completed turns for a session from the existing `interactions`
7
- table so the LLM has conversational context without a separate store.
8
- Answers are truncated to 120 chars before injection β€” enough context for
9
- referential follow-ups ("tell me more", "what else?", "that's wrong") without
10
- wasting significant token budget on verbatim prior answers.
11
-
12
- All reads/writes are synchronous sqlite3 (<3ms on SSD) β€” acceptable because:
13
- 1. The call happens once at request start, outside the model call path.
14
- 2. SQLite WAL mode allows concurrent readers and one writer without blocking.
15
-
16
- Issue 1: mark_last_negative() now also fires github_log.append_feedback() so
17
- negative labels persist across HF Space restarts. Without this, negative
18
- examples accumulated during a session are lost on the next restart, and
19
- data_prep.py cannot produce accurate hard-negative training triplets.
20
  """
21
  from __future__ import annotations
22
 
@@ -27,32 +22,75 @@ from datetime import datetime, timezone
27
 
28
  logger = logging.getLogger(__name__)
29
 
30
- # Visible answer length per turn injected into context.
31
- # 120 chars β‰ˆ 25 tokens β€” plenty to resolve pronouns and follow-up references.
32
  _ANSWER_PREVIEW_LEN = 120
33
-
34
- # Default number of prior turns to surface. Three covers the typical "yes,
35
- # but what about X?", "and Y?", "ok fix the previous answer" pattern.
36
  _DEFAULT_MAX_TURNS = 3
37
 
38
 
39
  class ConversationStore:
40
  """
41
- Thin read/write layer over the `interactions` SQLite table for session history.
42
  One instance is created at startup and shared across all requests via app.state.
43
  """
44
 
45
  def __init__(self, db_path: str, github_log=None) -> None:
46
  self._db_path = db_path
47
  self._github_log = github_log
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
 
49
  def get_recent(self, session_id: str, max_turns: int = _DEFAULT_MAX_TURNS) -> list[dict]:
50
  """
51
  Return the last `max_turns` completed Q/A pairs for `session_id`,
52
- oldest first (so LLMs read them in chronological order).
53
-
54
- Returns an empty list if there is no history or the table doesn't exist yet.
55
- Each entry: {"q": str, "a": str} β€” `a` is truncated to _ANSWER_PREVIEW_LEN.
56
  """
57
  try:
58
  with sqlite3.connect(self._db_path) as conn:
@@ -66,30 +104,21 @@ class ConversationStore:
66
  (session_id, max_turns),
67
  ).fetchall()
68
  except sqlite3.OperationalError:
69
- # Table doesn't exist yet (first ever request) β€” not an error.
70
  return []
71
  except Exception as exc:
72
  logger.warning("ConversationStore.get_recent failed: %s", exc)
73
  return []
74
 
75
- # Reverse so oldest is first (chronological order for the LLM).
76
  turns = []
77
  for query, answer in reversed(rows):
78
  a_preview = answer[:_ANSWER_PREVIEW_LEN]
79
  if len(answer) > _ANSWER_PREVIEW_LEN:
80
- a_preview += "…"
81
  turns.append({"q": query, "a": a_preview})
82
  return turns
83
 
84
  def mark_last_negative(self, session_id: str) -> None:
85
- """
86
- Set feedback=-1 on the most recent interaction for `session_id` in SQLite,
87
- then durably record the correction in the GitHub JSONL log so the negative
88
- label survives a HF Space restart.
89
-
90
- data_prep.py reads {type:"feedback", feedback:-1} correction records from
91
- the durable log and applies them when building reranker training triplets.
92
- """
93
  try:
94
  with sqlite3.connect(self._db_path) as conn:
95
  conn.execute(
@@ -107,18 +136,13 @@ class ConversationStore:
107
  except Exception as exc:
108
  logger.warning("ConversationStore.mark_last_negative SQLite failed: %s", exc)
109
 
110
- # Durable correction record β€” survives Space restart; not in SQLite only.
111
  if self._github_log is not None:
112
  self._github_log.append_feedback(session_id, feedback=-1)
113
 
114
  def populate_from_records(self, records: list[dict]) -> None:
115
  """
116
  Replay interaction records from the durable GitHub log into SQLite.
117
- Called at startup when SQLite is empty after a Space restart so conversation
118
- history is available without requiring a full log replay on every request.
119
-
120
- Only inserts rows for path='rag'|'gemini_fast'|'cache_hit' interactions;
121
- skips feedback correction records (type='feedback') which are not interactions.
122
  """
123
  import os
124
  db_dir = os.path.dirname(self._db_path)
@@ -152,8 +176,6 @@ class ConversationStore:
152
  )
153
  """
154
  )
155
- # Apply feedback corrections: build a map session_id -> feedback
156
- # so they can be applied when inserting the matching interactions.
157
  feedback_corrections: dict[str, int] = {}
158
  for r in records:
159
  if r.get("type") == "feedback":
 
1
  """
2
  backend/app/services/conversation_store.py
3
 
4
+ SQLite-backed per-session conversation history with progressive summarisation.
5
+
6
+ Stage 2 additions:
7
+ - A `conversation_summaries` table stores one rolling summary paragraph per
8
+ session. After each completed turn, GeminiClient.update_conversation_summary()
9
+ is called asynchronously and the result is persisted here.
10
+ - get_recent() is unchanged (raw turns still available for the 3-turn fallback).
11
+ - get_summary() / set_summary() are thin wrappers on the new table.
12
+
13
+ The raw `interactions` table is still the source of truth for reranker training.
14
+ Summaries are only for live context injection and have no training significance.
 
 
 
 
 
15
  """
16
  from __future__ import annotations
17
 
 
22
 
23
  logger = logging.getLogger(__name__)
24
 
 
 
25
  _ANSWER_PREVIEW_LEN = 120
 
 
 
26
  _DEFAULT_MAX_TURNS = 3
27
 
28
 
29
  class ConversationStore:
30
  """
31
+ Thin read/write layer over SQLite for session history and rolling summaries.
32
  One instance is created at startup and shared across all requests via app.state.
33
  """
34
 
35
    def __init__(self, db_path: str, github_log=None) -> None:
        """
        Bind the SQLite database path and an optional durable log sink.

        Parameters:
            db_path: filesystem path of the SQLite database file.
            github_log: optional durable feedback log; when set,
                mark_last_negative() mirrors corrections into it.
        """
        self._db_path = db_path
        self._github_log = github_log
        # Eagerly create the summaries table so later get_summary()/set_summary()
        # calls never race table creation.
        self._ensure_summary_table()
39
+
40
+ def _ensure_summary_table(self) -> None:
41
+ """Create the conversation_summaries table idempotently at startup."""
42
+ import os
43
+ db_dir = os.path.dirname(self._db_path)
44
+ if db_dir:
45
+ os.makedirs(db_dir, exist_ok=True)
46
+ try:
47
+ with sqlite3.connect(self._db_path) as conn:
48
+ conn.execute(
49
+ """
50
+ CREATE TABLE IF NOT EXISTS conversation_summaries (
51
+ session_id TEXT PRIMARY KEY,
52
+ summary TEXT NOT NULL DEFAULT '',
53
+ updated_at TEXT NOT NULL
54
+ )
55
+ """
56
+ )
57
+ except Exception as exc:
58
+ logger.warning("Could not create conversation_summaries table: %s", exc)
59
+
60
+ def get_summary(self, session_id: str) -> str:
61
+ """Return the rolling summary for this session, or '' if none exists."""
62
+ try:
63
+ with sqlite3.connect(self._db_path) as conn:
64
+ row = conn.execute(
65
+ "SELECT summary FROM conversation_summaries WHERE session_id = ?",
66
+ (session_id,),
67
+ ).fetchone()
68
+ return row[0] if row else ""
69
+ except Exception as exc:
70
+ logger.warning("get_summary failed: %s", exc)
71
+ return ""
72
+
73
+ def set_summary(self, session_id: str, summary: str) -> None:
74
+ """Upsert the rolling summary for this session."""
75
+ try:
76
+ with sqlite3.connect(self._db_path) as conn:
77
+ conn.execute(
78
+ """
79
+ INSERT INTO conversation_summaries (session_id, summary, updated_at)
80
+ VALUES (?, ?, ?)
81
+ ON CONFLICT(session_id) DO UPDATE SET
82
+ summary = excluded.summary,
83
+ updated_at = excluded.updated_at
84
+ """,
85
+ (session_id, summary, datetime.now(tz=timezone.utc).isoformat()),
86
+ )
87
+ except Exception as exc:
88
+ logger.warning("set_summary failed: %s", exc)
89
 
90
  def get_recent(self, session_id: str, max_turns: int = _DEFAULT_MAX_TURNS) -> list[dict]:
91
  """
92
  Return the last `max_turns` completed Q/A pairs for `session_id`,
93
+ oldest first. Each entry: {"q": str, "a": str}.
 
 
 
94
  """
95
  try:
96
  with sqlite3.connect(self._db_path) as conn:
 
104
  (session_id, max_turns),
105
  ).fetchall()
106
  except sqlite3.OperationalError:
 
107
  return []
108
  except Exception as exc:
109
  logger.warning("ConversationStore.get_recent failed: %s", exc)
110
  return []
111
 
 
112
  turns = []
113
  for query, answer in reversed(rows):
114
  a_preview = answer[:_ANSWER_PREVIEW_LEN]
115
  if len(answer) > _ANSWER_PREVIEW_LEN:
116
+ a_preview += "\u2026"
117
  turns.append({"q": query, "a": a_preview})
118
  return turns
119
 
120
  def mark_last_negative(self, session_id: str) -> None:
121
+ """Set feedback=-1 on the most recent interaction for this session."""
 
 
 
 
 
 
 
122
  try:
123
  with sqlite3.connect(self._db_path) as conn:
124
  conn.execute(
 
136
  except Exception as exc:
137
  logger.warning("ConversationStore.mark_last_negative SQLite failed: %s", exc)
138
 
 
139
  if self._github_log is not None:
140
  self._github_log.append_feedback(session_id, feedback=-1)
141
 
142
  def populate_from_records(self, records: list[dict]) -> None:
143
  """
144
  Replay interaction records from the durable GitHub log into SQLite.
145
+ Called at startup when SQLite is empty after a Space restart.
 
 
 
 
146
  """
147
  import os
148
  db_dir = os.path.dirname(self._db_path)
 
176
  )
177
  """
178
  )
 
 
179
  feedback_corrections: dict[str, int] = {}
180
  for r in records:
181
  if r.get("type") == "feedback":
app/services/gemini_client.py CHANGED
@@ -145,6 +145,171 @@ class GeminiClient:
145
  logger.warning("Gemini reformat failed (%s); keeping Groq draft.", exc)
146
  return None
147
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
148
  @property
149
  def is_configured(self) -> bool:
150
  return self._client is not None
 
145
  logger.warning("Gemini reformat failed (%s); keeping Groq draft.", exc)
146
  return None
147
 
148
+ async def decontextualize_query(
149
+ self,
150
+ query: str,
151
+ summary: str,
152
+ ) -> str:
153
+ """
154
+ Rewrite a reference-heavy follow-up query into a self-contained question.
155
+
156
+ Called on the live request path (runs concurrently with Guard) when the
157
+ session has a rolling summary and the query contains pronouns/references.
158
+ Returns the rewritten query, or the original if Gemini is unavailable or
159
+ the call fails.
160
+
161
+ Example:
162
+ query: "What about his caching approach?"
163
+ summary: "Discussed Darshan's RAG system using Qdrant and semantic cache."
164
+ output: "What caching strategy does Darshan use in his RAG system?"
165
+ """
166
+ if not self._client:
167
+ return query
168
+
169
+ prompt = (
170
+ f"Conversation so far:\n{summary}\n\n"
171
+ f"Current question: {query}\n\n"
172
+ "Rewrite the current question as a fully self-contained question that "
173
+ "can be understood without any prior context. Replace all pronouns and "
174
+ "references ('it', 'that', 'this', 'the same', 'his', etc.) with the "
175
+ "specific subject they refer to. Output ONLY the rewritten question β€” "
176
+ "no explanation, no quotes, one sentence."
177
+ )
178
+ try:
179
+ from google.genai import types # noqa: PLC0415
180
+ response = await self._client.aio.models.generate_content( # type: ignore[attr-defined]
181
+ model=self._model,
182
+ contents=prompt,
183
+ config=types.GenerateContentConfig(temperature=0.1, max_output_tokens=80),
184
+ )
185
+ rewritten = (response.candidates[0].content.parts[0].text or "").strip().strip('"').strip("'")
186
+ if rewritten and rewritten != query:
187
+ logger.debug("Decontextualized %r β†’ %r", query[:50], rewritten[:60])
188
+ return rewritten
189
+ except Exception as exc:
190
+ logger.warning("decontextualize_query failed (%s); using original.", exc)
191
+ return query
192
+
193
+ async def update_conversation_summary(
194
+ self,
195
+ previous_summary: str,
196
+ new_turn_q: str,
197
+ new_turn_a: str,
198
+ processing_api_key: str = "",
199
+ ) -> str:
200
+ """
201
+ Progressive summary update β€” called AFTER the response is delivered
202
+ so it adds zero perceived latency.
203
+
204
+ Takes the previous rolling summary (initially empty) and one new Q/A turn
205
+ and asks Gemini Flash to produce an updated single-paragraph summary of
206
+ the entire conversation, capped at 150 tokens.
207
+
208
+ Uses the GEMINI_PROCESSING_API_KEY when provided so this offline step
209
+ does not consume live API quota. Falls back to the instance's own client
210
+ if no processing key is set.
211
+ """
212
+ if not self._client and not processing_api_key:
213
+ return previous_summary
214
+
215
+ prior_block = (
216
+ f"Previous summary:\n{previous_summary}\n\n" if previous_summary else ""
217
+ )
218
+ prompt = (
219
+ f"{prior_block}"
220
+ f"New turn:\nQ: {new_turn_q}\nA: {new_turn_a[:300]}\n\n"
221
+ "Write an updated summary of the whole conversation in ONE paragraph "
222
+ "of at most 150 tokens. Mention the topics discussed and key facts "
223
+ "established. Be specific (include names, project names, technologies). "
224
+ "Output ONLY the summary paragraph."
225
+ )
226
+ try:
227
+ from google.genai import types # noqa: PLC0415
228
+
229
+ # Use a separate client with the processing key when provided.
230
+ if processing_api_key:
231
+ from google import genai as _genai # noqa: PLC0415
232
+ proc_client = _genai.Client(api_key=processing_api_key)
233
+ client_to_use = proc_client.aio
234
+ else:
235
+ client_to_use = self._client.aio # type: ignore[attr-defined]
236
+
237
+ response = await client_to_use.models.generate_content(
238
+ model=self._model,
239
+ contents=prompt,
240
+ config=types.GenerateContentConfig(temperature=0.0, max_output_tokens=180),
241
+ )
242
+ text = (response.candidates[0].content.parts[0].text or "").strip()
243
+ if text:
244
+ logger.debug("Conversation summary updated (%d chars).", len(text))
245
+ return text
246
+ except Exception as exc:
247
+ logger.warning("update_conversation_summary failed (%s); keeping previous.", exc)
248
+ return previous_summary
249
+
250
+ async def critique_rag_answer(
251
+ self,
252
+ query: str,
253
+ context_block: str,
254
+ answer: str,
255
+ decontextualized_query: str = "",
256
+ ) -> dict[str, int]:
257
+ """
258
+ SELF-RAG critic: score Groq's generated answer on three dimensions (1–3).
259
+
260
+ Dimension 1 β€” Groundedness: Are all factual claims supported by a chunk?
261
+ Dimension 2 β€” Completeness: Does the answer use all relevant available chunks?
262
+ Dimension 3 β€” Specificity: Does the answer give names/numbers/details?
263
+
264
+ Returns {"groundedness": int, "completeness": int, "specificity": int}.
265
+ Defaults to {"groundedness": 3, "completeness": 3, "specificity": 3} when
266
+ Gemini is unavailable (treat as high quality to avoid unnecessary retries).
267
+ """
268
+ _default = {"groundedness": 3, "completeness": 3, "specificity": 3}
269
+ if not self._client:
270
+ return _default
271
+
272
+ display_query = decontextualized_query or query
273
+ prompt = (
274
+ f"Original question: {query}\n"
275
+ + (f"Interpreted as: {decontextualized_query}\n" if decontextualized_query and decontextualized_query != query else "")
276
+ + f"\nRetrieved passages:\n{context_block[:3000]}\n\n"
277
+ f"Generated answer:\n{answer[:1500]}\n\n"
278
+ "Score the answer on three dimensions. Output ONLY three lines in this exact format:\n"
279
+ "groundedness: <1|2|3>\n"
280
+ "completeness: <1|2|3>\n"
281
+ "specificity: <1|2|3>\n\n"
282
+ "Scoring guide:\n"
283
+ "groundedness β€” 3: every claim comes from a passage. 2: most do. 1: claims not in passages.\n"
284
+ "completeness β€” 3: all relevant passages used. 2: partially used. 1: relevant passages ignored.\n"
285
+ "specificity β€” 3: specific details (names, numbers, examples). 2: mixed. 1: entirely generic.\n"
286
+ )
287
+ try:
288
+ from google.genai import types # noqa: PLC0415
289
+ response = await self._client.aio.models.generate_content( # type: ignore[attr-defined]
290
+ model=self._model,
291
+ contents=prompt,
292
+ config=types.GenerateContentConfig(temperature=0.0, max_output_tokens=30),
293
+ )
294
+ text = (response.candidates[0].content.parts[0].text or "").strip()
295
+ scores: dict[str, int] = {}
296
+ for line in text.splitlines():
297
+ if ":" in line:
298
+ k, _, v = line.partition(":")
299
+ k = k.strip().lower()
300
+ try:
301
+ val = int(v.strip())
302
+ if k in ("groundedness", "completeness", "specificity") and 1 <= val <= 3:
303
+ scores[k] = val
304
+ except ValueError:
305
+ pass
306
+ if len(scores) == 3:
307
+ logger.debug("SELF-RAG critic: %s", scores)
308
+ return scores
309
+ except Exception as exc:
310
+ logger.warning("critique_rag_answer failed (%s); defaulting to high quality.", exc)
311
+ return _default
312
+
313
    @property
    def is_configured(self) -> bool:
        """True when a Gemini client instance is present (API key accepted)."""
        return self._client is not None
app/services/vector_store.py CHANGED
@@ -234,3 +234,43 @@ class VectorStore:
234
  except Exception as exc:
235
  logger.warning("fetch_by_doc_id failed for %r: %s", doc_id, exc)
236
  return []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
234
  except Exception as exc:
235
  logger.warning("fetch_by_doc_id failed for %r: %s", doc_id, exc)
236
  return []
237
+
238
+ def search_by_raptor_level(
239
+ self,
240
+ query_vector: list[float],
241
+ level: int,
242
+ top_k: int = 5,
243
+ ) -> list[Chunk]:
244
+ """
245
+ Dense vector search restricted to chunks at a specific RAPTOR hierarchy level.
246
+
247
+ level=0 β†’ leaf chunks (normal passage-level chunks).
248
+ level=1 β†’ cluster summary nodes generated by RaptorBuilder.
249
+ level=2 β†’ reserved for document-level summaries.
250
+
251
+ Filter is applied via Qdrant payload filter on metadata.raptor_level.
252
+ Old chunks that pre-date RAPTOR indexing lack the field and are excluded,
253
+ which is the correct behaviour (they are effectively level-0 leaves already
254
+ returned by the main dense search in retrieve.py).
255
+ """
256
+ try:
257
+ results = self.client.search(
258
+ collection_name=self.collection,
259
+ query_vector=NamedVector(name=_DENSE_VEC, vector=query_vector),
260
+ limit=top_k,
261
+ query_filter=Filter(
262
+ must=[
263
+ FieldCondition(
264
+ key="metadata.raptor_level",
265
+ match=MatchValue(value=level),
266
+ )
267
+ ]
268
+ ),
269
+ with_payload=True,
270
+ )
271
+ return [Chunk(**hit.payload) for hit in results if hit.payload]
272
+ except Exception as exc:
273
+ logger.warning(
274
+ "search_by_raptor_level(level=%d) failed: %s β€” skipping RAPTOR results.", level, exc
275
+ )
276
+ return []