GitHub Actions committed on
Commit
3d134a6
·
1 Parent(s): e7c9ee6

Deploy 85f07db

Browse files
app/core/config.py CHANGED
@@ -53,6 +53,14 @@ class Settings(BaseSettings):
53
  GEMINI_MODEL: str = "gemini-2.5-flash-lite"
54
  GEMINI_CONTEXT_PATH: str = "backend/app/services/gemini_context.toon"
55
 
 
 
 
 
 
 
 
 
56
  # HuggingFace Space model servers.
57
  # In local env, embedder/reranker run in-process (these URLs are ignored).
58
  # In prod, the API Space calls the HF embedder/reranker Spaces via HTTP.
 
53
  GEMINI_MODEL: str = "gemini-2.5-flash-lite"
54
  GEMINI_CONTEXT_PATH: str = "backend/app/services/gemini_context.toon"
55
 
56
+ # Durable GitHub interaction log — survives HF Space restarts.
57
+ # PERSONABOT_WRITE_TOKEN: fine-grained PAT with read+write Contents access
58
+ # on the PersonaBot repo. When set, every interaction is appended to
59
+ # data/interactions.jsonl in the repo so training signals persist.
60
+ # Leave unset in local dev (interactions stay in SQLite only).
61
+ PERSONABOT_WRITE_TOKEN: Optional[str] = None
62
+ PERSONABOT_REPO: str = "1337Xcode/PersonaBot"
63
+
64
  # HuggingFace Space model servers.
65
  # In local env, embedder/reranker run in-process (these URLs are ignored).
66
  # In prod, the API Space calls the HF embedder/reranker Spaces via HTTP.
app/core/quality.py ADDED
@@ -0,0 +1,53 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ backend/app/core/quality.py
3
+
4
+ Shared quality-gate logic used by both the generate node (Groq responses) and
5
+ the gemini_fast node (Gemini fast-path responses).
6
+
7
+ Centralised here — rather than in generate.py — so the same hedge-detection and
8
+ trust scoring logic runs on every answer regardless of which pipeline branch produced
9
+ it. Duplicating the list of hedge phrases across two modules was the root cause of
10
+ Bug A (Issue 2): Gemini fast-path answers were never checked for hedge phrases.
11
+ """
12
+ from __future__ import annotations
13
+
14
+ import re
15
+
16
+ # Phrases that indicate the model hedged despite having been told not to.
17
+ # Applies to both Groq (generate node) and Gemini (gemini_fast node) outputs.
18
+ _HEDGE_PHRASES: tuple[str, ...] = (
19
+ "unfortunately",
20
+ "limited information",
21
+ "passages only",
22
+ "passages do not",
23
+ "passages don't",
24
+ "you may need to",
25
+ "you may want to",
26
+ "i don't have",
27
+ "i cannot provide",
28
+ "not able to provide",
29
+ "does not provide",
30
+ "does not offer",
31
+ "no detailed information",
32
+ )
33
+
34
+
35
+ def is_low_trust(answer: str, chunks: list, complexity: str) -> bool:
36
+ """
37
+ Return True when the answer is likely poor quality and should be reformatted
38
+ or rerouted to the full RAG pipeline.
39
+
40
+ Three signals, checked in order of cost (cheapest first):
41
+ 1. A hedge phrase survived the system-prompt prohibition.
42
+ 2. Chunks were retrieved but the model cited nothing (no [N] markers).
43
+ Not applicable to Gemini fast-path answers (chunks is always empty there).
44
+ 3. Answer is suspiciously short for a complex query (< 30 words).
45
+ """
46
+ lowered = answer.lower()
47
+ if any(phrase in lowered for phrase in _HEDGE_PHRASES):
48
+ return True
49
+ if chunks and not re.search(r"\[\d+\]", answer):
50
+ return True
51
+ if complexity == "complex" and len(answer.split()) < 30:
52
+ return True
53
+ return False
app/main.py CHANGED
@@ -1,5 +1,6 @@
1
  from contextlib import asynccontextmanager
2
  import os
 
3
 
4
  from fastapi import FastAPI, Request
5
  from fastapi.middleware.cors import CORSMiddleware
@@ -17,6 +18,8 @@ from app.pipeline.graph import build_pipeline
17
  from app.security.rate_limiter import limiter, custom_rate_limit_handler
18
  from app.services.embedder import Embedder
19
  from app.services.gemini_client import GeminiClient
 
 
20
  from app.services.reranker import Reranker
21
  from app.services.semantic_cache import SemanticCache
22
  from app.services.conversation_store import ConversationStore
@@ -25,18 +28,47 @@ from qdrant_client import QdrantClient
25
  logger = get_logger(__name__)
26
 
27
 
 
 
 
 
 
 
 
 
 
 
 
28
  @asynccontextmanager
29
  async def lifespan(app: FastAPI):
30
  settings = get_settings()
31
  logger.info("Starting PersonaBot API | env=%s", settings.ENVIRONMENT)
32
 
 
 
 
 
 
 
 
 
 
33
  # Attach the in-memory semantic cache. No external service required.
34
  app.state.semantic_cache = SemanticCache(
35
  max_size=settings.SEMANTIC_CACHE_SIZE,
36
  ttl_seconds=settings.SEMANTIC_CACHE_TTL_SECONDS,
37
  similarity_threshold=settings.SEMANTIC_CACHE_SIMILARITY_THRESHOLD,
38
  )
39
- app.state.conversation_store = ConversationStore(settings.DB_PATH)
 
 
 
 
 
 
 
 
 
40
 
41
  # DagsHub/MLflow experiment tracking — optional, only active when token is set.
42
  # In prod with DAGSHUB_TOKEN set, experiments are tracked at dagshub.com.
@@ -61,7 +93,6 @@ async def lifespan(app: FastAPI):
61
  )
62
  app.state.gemini_client = gemini_client
63
 
64
- from app.services.llm_client import get_llm_client
65
  from app.services.vector_store import VectorStore
66
  from app.security.guard_classifier import GuardClassifier
67
 
@@ -76,7 +107,11 @@ async def lifespan(app: FastAPI):
76
  # ingest run doesn't crash every search with "collection not found".
77
  vector_store.ensure_collection()
78
 
79
- llm_client = get_llm_client(settings)
 
 
 
 
80
  # Expose llm_client on app state so chat.py can use it for follow-up
81
  # question generation without re-constructing the client per request.
82
  app.state.llm_client = llm_client
@@ -90,6 +125,7 @@ async def lifespan(app: FastAPI):
90
  "vector_store": vector_store,
91
  "reranker": reranker,
92
  "db_path": settings.DB_PATH,
 
93
  })
94
  app.state.settings = settings
95
  app.state.qdrant = qdrant
 
1
  from contextlib import asynccontextmanager
2
  import os
3
+ import sqlite3
4
 
5
  from fastapi import FastAPI, Request
6
  from fastapi.middleware.cors import CORSMiddleware
 
18
  from app.security.rate_limiter import limiter, custom_rate_limit_handler
19
  from app.services.embedder import Embedder
20
  from app.services.gemini_client import GeminiClient
21
+ from app.services.github_log import GithubLog
22
+ from app.services.llm_client import get_llm_client, TpmBucket
23
  from app.services.reranker import Reranker
24
  from app.services.semantic_cache import SemanticCache
25
  from app.services.conversation_store import ConversationStore
 
28
  logger = get_logger(__name__)
29
 
30
 
31
+ def _sqlite_row_count(db_path: str) -> int:
32
+ """Return the current interactions row count, or 0 if the table doesn't exist."""
33
+ try:
34
+ with sqlite3.connect(db_path) as conn:
35
+ return conn.execute("SELECT COUNT(*) FROM interactions").fetchone()[0]
36
+ except sqlite3.OperationalError:
37
+ return 0
38
+ except Exception:
39
+ return 0
40
+
41
+
42
  @asynccontextmanager
43
  async def lifespan(app: FastAPI):
44
  settings = get_settings()
45
  logger.info("Starting PersonaBot API | env=%s", settings.ENVIRONMENT)
46
 
47
+ # Durable GitHub interaction log — survives HF Space restarts.
48
+ # When PERSONABOT_WRITE_TOKEN is not set (local dev), GithubLog.enabled=False
49
+ # and all append calls are silent no-ops.
50
+ github_log = GithubLog(
51
+ write_token=settings.PERSONABOT_WRITE_TOKEN or "",
52
+ repo=settings.PERSONABOT_REPO,
53
+ )
54
+ app.state.github_log = github_log
55
+
56
  # Attach the in-memory semantic cache. No external service required.
57
  app.state.semantic_cache = SemanticCache(
58
  max_size=settings.SEMANTIC_CACHE_SIZE,
59
  ttl_seconds=settings.SEMANTIC_CACHE_TTL_SECONDS,
60
  similarity_threshold=settings.SEMANTIC_CACHE_SIMILARITY_THRESHOLD,
61
  )
62
+ app.state.conversation_store = ConversationStore(settings.DB_PATH, github_log=github_log)
63
+
64
+ # Issue 1: reconstruct SQLite conversation history from the durable GitHub log
65
+ # after an ephemeral HF Space restart. Only triggers when SQLite is empty
66
+ # (<10 rows) so a healthy Space with accumulated data is never overwritten.
67
+ if github_log.enabled and _sqlite_row_count(settings.DB_PATH) < 10:
68
+ logger.info("SQLite appears empty — attempting reconstruction from durable log.")
69
+ recent = await github_log.load_recent(500)
70
+ if recent:
71
+ app.state.conversation_store.populate_from_records(recent)
72
 
73
  # DagsHub/MLflow experiment tracking — optional, only active when token is set.
74
  # In prod with DAGSHUB_TOKEN set, experiments are tracked at dagshub.com.
 
93
  )
94
  app.state.gemini_client = gemini_client
95
 
 
96
  from app.services.vector_store import VectorStore
97
  from app.security.guard_classifier import GuardClassifier
98
 
 
107
  # ingest run doesn't crash every search with "collection not found".
108
  vector_store.ensure_collection()
109
 
110
+ # Issue 7: shared TPM bucket tracks token consumption across the current 60s
111
+ # window. Injected into GroqClient so it can downgrade 70B → 8B automatically
112
+ # when the bucket is above 12,000 tokens, preventing hard rate-limit failures.
113
+ tpm_bucket = TpmBucket()
114
+ llm_client = get_llm_client(settings, tpm_bucket=tpm_bucket)
115
  # Expose llm_client on app state so chat.py can use it for follow-up
116
  # question generation without re-constructing the client per request.
117
  app.state.llm_client = llm_client
 
125
  "vector_store": vector_store,
126
  "reranker": reranker,
127
  "db_path": settings.DB_PATH,
128
+ "github_log": github_log,
129
  })
130
  app.state.settings = settings
131
  app.state.qdrant = qdrant
app/models/pipeline.py CHANGED
@@ -51,3 +51,9 @@ class PipelineState(TypedDict):
51
  # Follow-up question suggestions generated after the main answer.
52
  # 3 short questions specific to content in the answer.
53
  follow_ups: list[str]
 
 
 
 
 
 
 
51
  # Follow-up question suggestions generated after the main answer.
52
  # 3 short questions specific to content in the answer.
53
  follow_ups: list[str]
54
+ # Which pipeline branch produced the final answer.
55
+ # Values: "cache_hit", "gemini_fast", "rag", "blocked".
56
+ # Set by cache, gemini_fast, and generate nodes respectively.
57
+ # data_prep.py filters to path=="rag" when building reranker triplets because
58
+ # only RAG interactions have chunk associations that form valid training pairs.
59
+ path: Optional[str]
app/pipeline/graph.py CHANGED
@@ -72,7 +72,7 @@ def build_pipeline(services: dict) -> CompiledStateGraph:
72
  # CRAG: one query rewrite on failed retrieval — then retrieve runs a second time.
73
  graph.add_node("rewrite_query", make_rewrite_query_node(services["gemini"]))
74
  graph.add_node("generate", make_generate_node(services["llm"], services["gemini"]))
75
- graph.add_node("log_eval", make_log_eval_node(services["db_path"]))
76
 
77
  graph.set_entry_point("guard")
78
 
 
72
  # CRAG: one query rewrite on failed retrieval — then retrieve runs a second time.
73
  graph.add_node("rewrite_query", make_rewrite_query_node(services["gemini"]))
74
  graph.add_node("generate", make_generate_node(services["llm"], services["gemini"]))
75
+ graph.add_node("log_eval", make_log_eval_node(services["db_path"], services.get("github_log")))
76
 
77
  graph.set_entry_point("guard")
78
 
app/pipeline/nodes/cache.py CHANGED
@@ -5,6 +5,15 @@
5
  #
6
  # The computed query embedding is stored in state so the retrieve node can
7
  # reuse it directly — avoiding a second identical HTTP call to the embedder.
 
 
 
 
 
 
 
 
 
8
 
9
  from typing import Callable
10
 
@@ -13,20 +22,55 @@ import numpy as np
13
  from app.models.pipeline import PipelineState
14
  from app.services.semantic_cache import SemanticCache
15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
 
17
  def make_cache_node(cache: SemanticCache, embedder) -> Callable[[PipelineState], dict]:
18
  async def cache_node(state: PipelineState) -> dict:
 
 
 
 
 
 
 
 
 
 
19
  # is_query=True: prepend BGE asymmetric instruction so query embedding
20
  # lands in the retrieval-optimised neighbourhood of the vector space.
21
  # Document embeddings at ingestion time use is_query=False (default).
22
- embedding = await embedder.embed_one(state["query"], is_query=True)
23
  query_embedding = np.array(embedding)
24
 
25
  cached = await cache.get(query_embedding)
26
  if cached:
27
- return {"answer": cached, "cached": True, "query_embedding": embedding}
 
 
 
 
 
28
 
29
  # Store embedding in state so retrieve_node doesn't re-embed the same query.
30
  return {"cached": False, "query_embedding": embedding}
31
 
32
  return cache_node
 
 
5
  #
6
  # The computed query embedding is stored in state so the retrieve node can
7
  # reuse it directly — avoiding a second identical HTTP call to the embedder.
8
+ #
9
+ # Issue 5 (cache bypass narrowing):
10
+ # The previous design bypassed the cache unconditionally for any multi-turn
11
+ # session. This prevented caching self-contained follow-up queries like
12
+ # "what programming languages does Darshan know?" even when they appear after
13
+ # prior turns. The corrected behaviour: check for unresolved reference tokens
14
+ # (pronouns, demonstratives) BEFORE the cache lookup. Only queries that
15
+ # contain such tokens AND have conversation history are cache-bypassed. All
16
+ # other queries in multi-turn sessions go through cache normally.
17
 
18
  from typing import Callable
19
 
 
22
  from app.models.pipeline import PipelineState
23
  from app.services.semantic_cache import SemanticCache
24
 
25
+ # Tokens that indicate the query cannot be understood without prior context:
26
+ # pronouns and demonstratives that refer to something the user said earlier.
27
+ # "his" and "he" are excluded — they almost always refer to Darshan, not a
28
+ # prior turn, and excluding them would bypass cache on most portfolio queries.
29
+ _REFERENCE_TOKENS: frozenset[str] = frozenset({
30
+ "that", "it", "its", "they", "their", "those",
31
+ "this", "these", "them", "there", "then",
32
+ })
33
+
34
+
35
+ def _has_unresolved_reference(query: str) -> bool:
36
+ """
37
+ True when the query contains a pronoun or demonstrative that likely refers
38
+ to something in the prior conversation turn rather than to Darshan or the
39
+ portfolio content.
40
+ """
41
+ tokens = frozenset(query.lower().split())
42
+ return bool(tokens & _REFERENCE_TOKENS)
43
+
44
 
45
def make_cache_node(cache: SemanticCache, embedder) -> Callable[[PipelineState], dict]:
    """
    Build the semantic-cache pipeline node.

    The node embeds the query once, bypasses the cache for genuine follow-up
    queries (unresolved pronoun/demonstrative + existing conversation history),
    and otherwise returns a cached answer when a similar query was seen before.
    """

    async def cache_node(state: PipelineState) -> dict:
        query = state["query"]

        # is_query=True: prepend BGE asymmetric instruction so query embedding
        # lands in the retrieval-optimised neighbourhood of the vector space.
        # Document embeddings at ingestion time use is_query=False (default).
        # Embedded once, up front: both the bypass branch and the cache-lookup
        # branch need it, so this removes the previously duplicated embed call.
        embedding = await embedder.embed_one(query, is_query=True)

        # If the query contains a reference token AND the session has history,
        # the query is a genuine follow-up that cannot be resolved without context.
        # Skip the cache so the pipeline injects history into downstream nodes.
        if state.get("conversation_history") and _has_unresolved_reference(query):
            return {"cached": False, "query_embedding": embedding}

        cached = await cache.get(np.array(embedding))
        if cached:
            return {
                "answer": cached,
                "cached": True,
                "query_embedding": embedding,
                "path": "cache_hit",
            }

        # Store embedding in state so retrieve_node doesn't re-embed the same query.
        return {"cached": False, "query_embedding": embedding}

    return cache_node
76
+
app/pipeline/nodes/gemini_fast.py CHANGED
@@ -9,8 +9,14 @@ Decision logic:
9
  - Gemini calls search_knowledge_base() → state.thinking=True, pipeline
10
  goes to retrieve+generate so the user gets a cited answer.
11
 
12
- The `expand` node is no longer part of the graph; this node carries the
13
- complexity classification it depended on (O(1) heuristic, no LLM call).
 
 
 
 
 
 
14
  """
15
  from __future__ import annotations
16
 
@@ -19,6 +25,7 @@ from typing import Any
19
 
20
  from app.models.pipeline import PipelineState
21
  from app.services.gemini_client import GeminiClient
 
22
 
23
  logger = logging.getLogger(__name__)
24
 
@@ -32,13 +39,28 @@ _COMPLEX_SIGNALS: frozenset[str] = frozenset({
32
  "reference", "proof", "derive", "calculate", "optimise", "optimize",
33
  })
34
 
 
 
 
 
 
 
 
35
 
36
  def _is_complex(query: str) -> bool:
37
- """O(1) heuristic — true when the query signals a need for a cited answer."""
 
 
 
 
 
 
 
38
  tokens = set(query.lower().split())
39
- if len(tokens) > 20:
 
40
  return True
41
- return bool(tokens & _COMPLEX_SIGNALS)
42
 
43
 
44
  def make_gemini_fast_node(gemini_client: GeminiClient) -> Any:
@@ -67,13 +89,28 @@ def make_gemini_fast_node(gemini_client: GeminiClient) -> Any:
67
  )
68
 
69
  if answer is not None:
70
- # Gemini answered from context no RAG needed.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
  logger.debug("Gemini fast-path answered query (len=%d)", len(answer))
72
  return {
73
  "query_complexity": complexity,
74
  "answer": answer,
75
  "sources": [],
76
  "thinking": False,
 
77
  }
78
 
79
  # Gemini called search_knowledge_base() — signal RAG via thinking=True.
@@ -86,3 +123,4 @@ def make_gemini_fast_node(gemini_client: GeminiClient) -> Any:
86
  }
87
 
88
  return gemini_fast
 
 
9
  - Gemini calls search_knowledge_base() → state.thinking=True, pipeline
10
  goes to retrieve+generate so the user gets a cited answer.
11
 
12
+ Bug A fix (Issue 2): Gemini fast-path answers now run through the same
13
+ is_low_trust() quality gate as Groq answers. If the gate fires (hedge phrase
14
+ detected, or suspiciously short complex answer), the answer is discarded
15
+ and the pipeline routes to full RAG instead of returning a low-quality answer.
16
+
17
+ Issue 7 fix: _is_complex() now requires BOTH a keyword match AND query length
18
+ > 8 words, eliminating false-positive complex classifications for short
19
+ conversational queries like "How?" or "How many projects?".
20
  """
21
  from __future__ import annotations
22
 
 
25
 
26
  from app.models.pipeline import PipelineState
27
  from app.services.gemini_client import GeminiClient
28
+ from app.core.quality import is_low_trust
29
 
30
  logger = logging.getLogger(__name__)
31
 
 
39
  "reference", "proof", "derive", "calculate", "optimise", "optimize",
40
  })
41
 
42
+ # Minimum token count for a query to be classified as complex.
43
+ # Queries shorter than this are almost always conversational or simple
44
+ # biographical lookups regardless of vocabulary. "How?" alone currently
45
+ # triggers 70B without this gate; "How many projects?" should not.
46
+ # Documented in copilot-instructions.md — do not lower without profiling.
47
+ _COMPLEX_MIN_WORDS: int = 8
48
+
49
 
50
  def _is_complex(query: str) -> bool:
51
+ """
52
+ O(1) heuristic — true when the query signals a need for a cited answer.
53
+
54
+ A query is complex only when BOTH conditions hold:
55
+ 1. It contains a complexity-signal keyword (architecture, explain, etc.)
56
+ 2. Its length exceeds _COMPLEX_MIN_WORDS (eliminates "How?" false positives)
57
+ OR it is extremely long (>20 tokens, reliably indicates detailed request).
58
+ """
59
  tokens = set(query.lower().split())
60
+ token_count = len(tokens)
61
+ if token_count > 20:
62
  return True
63
+ return bool(tokens & _COMPLEX_SIGNALS) and token_count > _COMPLEX_MIN_WORDS
64
 
65
 
66
  def make_gemini_fast_node(gemini_client: GeminiClient) -> Any:
 
89
  )
90
 
91
  if answer is not None:
92
+ # Run the same quality gate that guards Groq answers.
93
+ # Gemini fast-path has no retrieved chunks, so only the hedge-phrase
94
+ # and short-complex-answer signals apply (chunks argument is []).
95
+ if is_low_trust(answer, [], complexity):
96
+ logger.debug(
97
+ "Gemini fast-path answer failed quality gate — routing to RAG."
98
+ )
99
+ # Clear the answer so route_gemini() sends us to RAG.
100
+ return {
101
+ "query_complexity": complexity,
102
+ "expanded_queries": [query],
103
+ "thinking": True,
104
+ }
105
+
106
+ # Gemini answered from context and passed quality gate.
107
  logger.debug("Gemini fast-path answered query (len=%d)", len(answer))
108
  return {
109
  "query_complexity": complexity,
110
  "answer": answer,
111
  "sources": [],
112
  "thinking": False,
113
+ "path": "gemini_fast",
114
  }
115
 
116
  # Gemini called search_knowledge_base() — signal RAG via thinking=True.
 
123
  }
124
 
125
  return gemini_fast
126
+
app/pipeline/nodes/generate.py CHANGED
@@ -5,6 +5,7 @@ from typing import Callable
5
  from app.models.chat import SourceRef
6
  from app.models.pipeline import PipelineState
7
  from app.services.llm_client import LLMClient
 
8
 
9
  logger = logging.getLogger(__name__)
10
 
@@ -135,42 +136,6 @@ def _format_history(history: list[dict]) -> str:
135
  return "Prior conversation (oldest first):\n" + "\n".join(lines) + "\n\n"
136
 
137
 
138
- # Phrases that indicate the model hedged despite having source passages.
139
- # Gemini reformat is triggered when any of these appear in the Groq draft.
140
- _HEDGE_PHRASES: tuple[str, ...] = (
141
- "unfortunately",
142
- "limited information",
143
- "passages only",
144
- "passages do not",
145
- "passages don't",
146
- "you may need to",
147
- "you may want to",
148
- "i don't have",
149
- "i cannot provide",
150
- "not able to provide",
151
- "does not provide",
152
- "does not offer",
153
- "no detailed information",
154
- )
155
-
156
-
157
- def _is_low_trust(answer: str, chunks: list, complexity: str) -> bool:
158
- """
159
- True when the Groq draft is likely poor quality and Gemini should rewrite it.
160
-
161
- Three signals:
162
- 1. Hedging phrase survived the system-prompt prohibition.
163
- 2. Chunks were retrieved but the model cited nothing (no [N] markers).
164
- 3. Answer is suspiciously short for a complex query (< 30 words).
165
- """
166
- lowered = answer.lower()
167
- if any(phrase in lowered for phrase in _HEDGE_PHRASES):
168
- return True
169
- if chunks and not re.search(r"\[\d+\]", answer):
170
- return True
171
- if complexity == "complex" and len(answer.split()) < 30:
172
- return True
173
- return False
174
 
175
 
176
  def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[PipelineState], dict]: # noqa: ANN001
@@ -194,7 +159,7 @@ def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[P
194
  full_answer = ""
195
  async for token in stream:
196
  full_answer += token
197
- return {"answer": full_answer, "sources": []}
198
 
199
  # ── Pre-LLM coherence shortcut ──────────────────────────────────────
200
  # Check that at least one meaningful query token appears somewhere in
@@ -215,7 +180,7 @@ def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[P
215
  full_answer = ""
216
  async for token in stream:
217
  full_answer += token
218
- return {"answer": full_answer, "sources": []}
219
 
220
  # ── Build numbered context block ────────────────────────────────────
221
  context_parts: list[str] = []
@@ -273,7 +238,7 @@ def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[P
273
  # Fires when: (a) criticism was detected — always reformat to be safe, or
274
  # (b) low-trust heuristic flags the draft (hedging / no citations / too short).
275
  # Zero extra cost on good responses; ~200-400ms only when genuinely needed.
276
- if gemini_client is not None and (is_criticism or _is_low_trust(full_answer, reranked_chunks, complexity)):
277
  logger.debug("Triggering Gemini reformat (criticism=%s).", is_criticism)
278
  reformatted = await gemini_client.reformat_rag_answer(query, context_block, full_answer)
279
  if reformatted:
@@ -287,6 +252,9 @@ def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[P
287
  return {
288
  "answer": full_answer,
289
  "sources": cited_sources if cited_sources else source_refs[:2],
 
 
 
290
  }
291
 
292
  return generate_node
 
5
  from app.models.chat import SourceRef
6
  from app.models.pipeline import PipelineState
7
  from app.services.llm_client import LLMClient
8
+ from app.core.quality import is_low_trust
9
 
10
  logger = logging.getLogger(__name__)
11
 
 
136
  return "Prior conversation (oldest first):\n" + "\n".join(lines) + "\n\n"
137
 
138
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
139
 
140
 
141
  def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[PipelineState], dict]: # noqa: ANN001
 
159
  full_answer = ""
160
  async for token in stream:
161
  full_answer += token
162
+ return {"answer": full_answer, "sources": [], "path": "rag"}
163
 
164
  # ── Pre-LLM coherence shortcut ──────────────────────────────────────
165
  # Check that at least one meaningful query token appears somewhere in
 
180
  full_answer = ""
181
  async for token in stream:
182
  full_answer += token
183
+ return {"answer": full_answer, "sources": [], "path": "rag"}
184
 
185
  # ── Build numbered context block ────────────────────────────────────
186
  context_parts: list[str] = []
 
238
  # Fires when: (a) criticism was detected — always reformat to be safe, or
239
  # (b) low-trust heuristic flags the draft (hedging / no citations / too short).
240
  # Zero extra cost on good responses; ~200-400ms only when genuinely needed.
241
+ if gemini_client is not None and (is_criticism or is_low_trust(full_answer, reranked_chunks, complexity)):
242
  logger.debug("Triggering Gemini reformat (criticism=%s).", is_criticism)
243
  reformatted = await gemini_client.reformat_rag_answer(query, context_block, full_answer)
244
  if reformatted:
 
252
  return {
253
  "answer": full_answer,
254
  "sources": cited_sources if cited_sources else source_refs[:2],
255
+ # Tag this interaction so data_prep.py can filter to RAG-path only
256
+ # when building reranker triplets (only RAG has chunk associations).
257
+ "path": "rag",
258
  }
259
 
260
  return generate_node
app/pipeline/nodes/guard.py CHANGED
@@ -22,7 +22,8 @@ def make_guard_node(classifier: GuardClassifier) -> Callable[[PipelineState], di
22
  return {
23
  "query": clean_query,
24
  "guard_passed": False,
25
- "answer": "I can only answer questions about Darshan's work, projects, and background."
 
26
  }
27
 
28
  # 3. Classify (Scope evaluation)
@@ -32,7 +33,8 @@ def make_guard_node(classifier: GuardClassifier) -> Callable[[PipelineState], di
32
  return {
33
  "query": clean_query,
34
  "guard_passed": False,
35
- "answer": "I can only answer questions about Darshan's work, projects, and background."
 
36
  }
37
 
38
  return {
 
22
  return {
23
  "query": clean_query,
24
  "guard_passed": False,
25
+ "answer": "I can only answer questions about Darshan's work, projects, and background.",
26
+ "path": "blocked",
27
  }
28
 
29
  # 3. Classify (Scope evaluation)
 
33
  return {
34
  "query": clean_query,
35
  "guard_passed": False,
36
+ "answer": "I can only answer questions about Darshan's work, projects, and background.",
37
+ "path": "blocked",
38
  }
39
 
40
  return {
app/pipeline/nodes/log_eval.py CHANGED
@@ -2,7 +2,7 @@ import json
2
  import logging
3
  import sqlite3
4
  import os
5
- from datetime import datetime
6
  from typing import Callable
7
 
8
  from app.models.pipeline import PipelineState
@@ -11,12 +11,20 @@ from app.core.config import get_settings
11
  logger = logging.getLogger(__name__)
12
 
13
 
14
- def make_log_eval_node(db_path: str) -> Callable[[PipelineState], dict]:
15
  """
16
  Writes interaction to SQLite synchronously (<5ms) inside the request lifespan.
17
- Returns the row ID as interaction_id so the API can expose it for feedback.
18
- RAGAS evaluation runs separately in the GitHub Actions eval workflow against
19
- the accumulated SQLite data — not in the request path.
 
 
 
 
 
 
 
 
20
  """
21
 
22
  def _write_to_sqlite(state: PipelineState) -> int:
@@ -36,6 +44,7 @@ def make_log_eval_node(db_path: str) -> Callable[[PipelineState], dict]:
36
  [{"text": c["text"], "doc_id": c["metadata"]["doc_id"]}
37
  for c in state.get("reranked_chunks", [])]
38
  )
 
39
 
40
  with sqlite3.connect(db_path) as conn:
41
  conn.execute(
@@ -51,7 +60,8 @@ def make_log_eval_node(db_path: str) -> Callable[[PipelineState], dict]:
51
  reranked_chunks_json TEXT,
52
  latency_ms INTEGER,
53
  cached BOOLEAN,
54
- feedback INTEGER DEFAULT 0
 
55
  )
56
  """
57
  )
@@ -60,6 +70,8 @@ def make_log_eval_node(db_path: str) -> Callable[[PipelineState], dict]:
60
  ("reranked_chunks_json", "TEXT DEFAULT '[]'"),
61
  ("feedback", "INTEGER DEFAULT 0"),
62
  ("session_id", "TEXT DEFAULT ''"),
 
 
63
  ]:
64
  try:
65
  conn.execute(f"ALTER TABLE interactions ADD COLUMN {col} {definition}")
@@ -69,11 +81,11 @@ def make_log_eval_node(db_path: str) -> Callable[[PipelineState], dict]:
69
  cursor = conn.execute(
70
  """
71
  INSERT INTO interactions
72
- (timestamp, session_id, query, answer, chunks_used, rerank_scores, reranked_chunks_json, latency_ms, cached)
73
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
74
  """,
75
  (
76
- datetime.utcnow().isoformat() + "Z",
77
  state.get("session_id", ""),
78
  state.get("query", ""),
79
  state.get("answer", ""),
@@ -82,6 +94,7 @@ def make_log_eval_node(db_path: str) -> Callable[[PipelineState], dict]:
82
  reranked_chunks_json,
83
  state.get("latency_ms", 0),
84
  state.get("cached", False),
 
85
  ),
86
  )
87
  return cursor.lastrowid # type: ignore[return-value]
@@ -89,6 +102,33 @@ def make_log_eval_node(db_path: str) -> Callable[[PipelineState], dict]:
89
  async def log_eval_node(state: PipelineState) -> dict:
90
  try:
91
  row_id = _write_to_sqlite(state)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
  return {"interaction_id": row_id}
93
  except Exception as e:
94
  # Log but never surface to user — this node is a sink.
 
2
  import logging
3
  import sqlite3
4
  import os
5
+ from datetime import datetime, timezone
6
  from typing import Callable
7
 
8
  from app.models.pipeline import PipelineState
 
11
  logger = logging.getLogger(__name__)
12
 
13
 
14
+ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState], dict]:
15
  """
16
  Writes interaction to SQLite synchronously (<5ms) inside the request lifespan.
17
+ Also appends to the durable GitHub JSONL log (fire-and-forget background task)
18
+ so training signals survive HuggingFace Space restarts.
19
+
20
+ The `path` field tags which pipeline branch produced the answer:
21
+ "cache_hit" — served from semantic cache, no LLM called.
22
+ "gemini_fast" — Gemini answered directly from context summary.
23
+ "rag" — full retrieve + rerank + Groq path.
24
+ "blocked" — guard rejected the query.
25
+
26
+ data_prep.py filters to path=="rag" when building reranker triplets because
27
+ only RAG interactions have chunk associations for valid training pairs.
28
  """
29
 
30
  def _write_to_sqlite(state: PipelineState) -> int:
 
44
  [{"text": c["text"], "doc_id": c["metadata"]["doc_id"]}
45
  for c in state.get("reranked_chunks", [])]
46
  )
47
+ path = state.get("path") or "rag"
48
 
49
  with sqlite3.connect(db_path) as conn:
50
  conn.execute(
 
60
  reranked_chunks_json TEXT,
61
  latency_ms INTEGER,
62
  cached BOOLEAN,
63
+ feedback INTEGER DEFAULT 0,
64
+ path TEXT DEFAULT 'rag'
65
  )
66
  """
67
  )
 
70
  ("reranked_chunks_json", "TEXT DEFAULT '[]'"),
71
  ("feedback", "INTEGER DEFAULT 0"),
72
  ("session_id", "TEXT DEFAULT ''"),
73
+ # path column: old rows default to "rag" — they were all RAG interactions.
74
+ ("path", "TEXT DEFAULT 'rag'"),
75
  ]:
76
  try:
77
  conn.execute(f"ALTER TABLE interactions ADD COLUMN {col} {definition}")
 
81
  cursor = conn.execute(
82
  """
83
  INSERT INTO interactions
84
+ (timestamp, session_id, query, answer, chunks_used, rerank_scores, reranked_chunks_json, latency_ms, cached, path)
85
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
86
  """,
87
  (
88
+ datetime.now(tz=timezone.utc).isoformat(),
89
  state.get("session_id", ""),
90
  state.get("query", ""),
91
  state.get("answer", ""),
 
94
  reranked_chunks_json,
95
  state.get("latency_ms", 0),
96
  state.get("cached", False),
97
+ path,
98
  ),
99
  )
100
  return cursor.lastrowid # type: ignore[return-value]
 
102
  async def log_eval_node(state: PipelineState) -> dict:
103
  try:
104
  row_id = _write_to_sqlite(state)
105
+
106
+ # Append to durable GitHub log (fire-and-forget — never blocks the response).
107
+ if github_log is not None and github_log.enabled:
108
+ path = state.get("path") or "rag"
109
+ record = {
110
+ "timestamp": datetime.now(tz=timezone.utc).isoformat(),
111
+ "session_id": state.get("session_id", ""),
112
+ "query": state.get("query", ""),
113
+ "answer": state.get("answer", ""),
114
+ "chunks_used": json.loads(
115
+ json.dumps([c["metadata"]["doc_id"] for c in state.get("reranked_chunks", [])])
116
+ ),
117
+ "reranked_chunks_json": [
118
+ {"text": c["text"], "doc_id": c["metadata"]["doc_id"]}
119
+ for c in state.get("reranked_chunks", [])
120
+ ],
121
+ "rerank_scores": [
122
+ c["metadata"].get("rerank_score", 0.0)
123
+ for c in state.get("reranked_chunks", [])
124
+ ],
125
+ "latency_ms": state.get("latency_ms", 0),
126
+ "cached": state.get("cached", False),
127
+ "feedback": 0,
128
+ "path": path,
129
+ }
130
+ github_log.append(record)
131
+
132
  return {"interaction_id": row_id}
133
  except Exception as e:
134
  # Log but never surface to user — this node is a sink.
app/pipeline/nodes/retrieve.py CHANGED
@@ -17,10 +17,27 @@ from app.services.sparse_encoder import SparseEncoder
17
  # passages that answer tech-stack or experience questions.
18
  _MIN_TOP_SCORE: float = -3.5
19
 
20
- # Cap the number of chunks taken from any single source document after reranking.
21
  # Without this, a verbose doc can crowd out all 5 context slots, hiding other
22
  # relevant sources and making the answer look one-dimensional.
23
- _MAX_CHUNKS_PER_DOC: int = 2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24
 
25
  # RRF rank fusion constant. k=60 is the original Cormack et al. default.
26
  # Higher k reduces the influence of top-1 rank advantage.
@@ -30,6 +47,22 @@ _RRF_K: int = 60
30
  _sparse_encoder = SparseEncoder()
31
 
32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
  def _rrf_merge(ranked_lists: list[list[Chunk]]) -> list[Chunk]:
34
  """
35
  Reciprocal Rank Fusion across multiple ranked chunk lists.
@@ -124,12 +157,25 @@ def make_retrieve_node(
124
  "retrieval_attempts": attempts + 1,
125
  }
126
 
127
- # ── Source diversity cap ───────────────────────────────────────────────
 
 
 
 
 
 
128
  doc_counts: dict[str, int] = {}
129
  diverse_chunks: list[Chunk] = []
130
  for chunk in reranked:
131
  doc_id = chunk["metadata"]["doc_id"]
132
- if doc_counts.get(doc_id, 0) < _MAX_CHUNKS_PER_DOC:
 
 
 
 
 
 
 
133
  diverse_chunks.append(chunk)
134
  doc_counts[doc_id] = doc_counts.get(doc_id, 0) + 1
135
 
 
17
  # passages that answer tech-stack or experience questions.
18
  _MIN_TOP_SCORE: float = -3.5
19
 
20
+ # Default cap: max chunks per source document for BROAD queries.
21
  # Without this, a verbose doc can crowd out all 5 context slots, hiding other
22
  # relevant sources and making the answer look one-dimensional.
23
+ _MAX_CHUNKS_PER_DOC_BROAD: int = 2
24
+
25
+ # For FOCUSED queries, the matching source is allowed more depth (4 slots)
26
+ # while all other sources are capped at 1. This prevents the cap from
27
+ # removing the 3rd-most-relevant resume section on an experience question.
28
+ _MAX_CHUNKS_PER_DOC_FOCUSED: int = 4
29
+ _MAX_CHUNKS_OTHER_FOCUSED: int = 1
30
+
31
+ # Keywords that imply the visitor wants depth from a specific source type.
32
+ # Values are the source_type values set by ingest (ChunkMetadata.source_type).
33
+ _FOCUS_KEYWORDS: dict[frozenset[str], str] = {
34
+ frozenset({"experience", "work", "job", "role", "career", "internship",
35
+ "skills", "skill", "education", "degree", "university",
36
+ "certification", "certifications", "qualified", "resume", "cv"}): "cv",
37
+ frozenset({"project", "built", "build", "developed", "architecture",
38
+ "system", "platform", "app", "application"}): "project",
39
+ frozenset({"blog", "post", "article", "wrote", "writing", "published"}): "blog",
40
+ }
41
 
42
  # RRF rank fusion constant. k=60 is the original Cormack et al. default.
43
  # Higher k reduces the influence of top-1 rank advantage.
 
47
  _sparse_encoder = SparseEncoder()
48
 
49
 
50
def _focused_source_type(query: str) -> str | None:
    """
    Return the source_type that the query is focused on, or None for broad queries.

    A query is focused when it contains at least one keyword that strongly implies
    a specific content source (resume, project pages, blog posts). Broad queries
    that don't match any category retain the 2-per-doc default cap so no single
    source dominates the 5 context slots.

    Fix: the previous tokenisation was a bare ``query.lower().split()``, so any
    trailing punctuation defeated the match — "tell me about your skills?" yielded
    the token "skills?" which never intersects the keyword sets. Tokens are now
    stripped of surrounding punctuation before the set intersection.
    """
    tokens = frozenset(
        token.strip(".,!?;:'\"()[]") for token in query.lower().split()
    )
    for keyword_set, source_type in _FOCUS_KEYWORDS.items():
        if tokens & keyword_set:
            return source_type
    return None
64
+
65
+
66
  def _rrf_merge(ranked_lists: list[list[Chunk]]) -> list[Chunk]:
67
  """
68
  Reciprocal Rank Fusion across multiple ranked chunk lists.
 
157
  "retrieval_attempts": attempts + 1,
158
  }
159
 
160
+ # ── Source diversity cap (query-aware) ─────────────────────────────────
161
+ # Broad queries: max 2 chunks per source document (anti-resume-monopoly).
162
+ # Focused queries (experience, skills, project, blog): raise the cap for
163
+ # the matching source type to 4, cap everything else at 1. This lets
164
+ # the resume fill appropriately on "what is Darshan's work experience?"
165
+ # without harming answer quality on broad queries.
166
+ focused_type = _focused_source_type(query)
167
  doc_counts: dict[str, int] = {}
168
  diverse_chunks: list[Chunk] = []
169
  for chunk in reranked:
170
  doc_id = chunk["metadata"]["doc_id"]
171
+ src_type = chunk["metadata"].get("source_type", "")
172
+ if focused_type and src_type == focused_type:
173
+ cap = _MAX_CHUNKS_PER_DOC_FOCUSED
174
+ elif focused_type:
175
+ cap = _MAX_CHUNKS_OTHER_FOCUSED
176
+ else:
177
+ cap = _MAX_CHUNKS_PER_DOC_BROAD
178
+ if doc_counts.get(doc_id, 0) < cap:
179
  diverse_chunks.append(chunk)
180
  doc_counts[doc_id] = doc_counts.get(doc_id, 0) + 1
181
 
app/security/sanitizer.py CHANGED
@@ -1,12 +1,25 @@
1
- import re
2
- from typing import Optional
 
 
 
 
3
 
4
- try:
5
- from presidio_analyzer import AnalyzerEngine
6
- except ImportError:
7
- AnalyzerEngine = None
 
 
 
 
8
 
9
- _analyzer = None
 
 
 
 
 
10
 
11
  # LLM token delimiters that attackers embed in queries to escape the system prompt
12
  # or inject new instructions. Strip them before any further processing.
@@ -20,15 +33,23 @@ _RE_INJECT_TOKENS = re.compile(
20
  re.IGNORECASE,
21
  )
22
 
23
-
24
- def get_analyzer() -> Optional["AnalyzerEngine"]:
25
- global _analyzer
26
- if _analyzer is None and AnalyzerEngine is not None:
27
- try:
28
- _analyzer = AnalyzerEngine()
29
- except Exception:
30
- _analyzer = None
31
- return _analyzer
 
 
 
 
 
 
 
 
32
 
33
 
34
  def sanitize_input(text: str) -> str:
@@ -54,40 +75,19 @@ def sanitize_input(text: str) -> str:
54
 
55
  def redact_pii(text: str) -> str:
56
  """
57
- Use presidio_analyzer.AnalyzerEngine with language="en".
58
- Detect EMAIL_ADDRESS, PHONE_NUMBER, UK_NHS, IBAN_CODE, PERSON.
59
- Replace detected spans with [REDACTED].
 
 
 
 
 
60
  """
61
  if not text:
62
  return text
63
 
64
- analyzer = get_analyzer()
65
- if not analyzer:
66
- # Failsafe if Presidio isn't installed/working
67
- return text
68
-
69
- # PERSON is intentionally excluded: visitors are expected to name Darshan Chheda
70
- # in their queries. Redacting that breaks retrieval and confuses the LLM.
71
- # We only protect against visitor PII that could leak into logs (e-mail, phone, etc.).
72
- entities = ["EMAIL_ADDRESS", "PHONE_NUMBER", "UK_NHS", "IBAN_CODE"]
73
-
74
- try:
75
- results = analyzer.analyze(text=text, entities=entities, language='en')
76
-
77
- if not results:
78
- return text
79
-
80
- # Sort results by start index in reverse order to comfortably replace without shifting
81
- # the remaining string indices.
82
- results.sort(key=lambda x: x.start, reverse=True)
83
-
84
- redacted_text = text
85
- for result in results:
86
- start = result.start
87
- end = result.end
88
- redacted_text = redacted_text[:start] + "[REDACTED]" + redacted_text[end:]
89
-
90
- return redacted_text
91
- except Exception:
92
- # Failsafe fallback
93
- return text
 
1
+ """
2
+ backend/app/security/sanitizer.py
3
+
4
+ Input sanitisation and lightweight PII redaction for user queries.
5
+
6
+ Issue 4 resolution: Presidio was replaced with six compiled regex patterns.
7
 
8
+ WHY Presidio was removed
9
+ ─────────────────────────
10
+ Presidio uses spaCy-based NLP internally: named entity recognition, pattern
11
+ matching, and context analysis. This added 50-100ms to every request before
12
+ any business logic ran. For a personal portfolio chatbot, the realistic PII
13
+ risk is near zero — no legitimate user submits their credit card number or SSN
14
+ to a developer's portfolio assistant. The threat model does not justify the
15
+ latency cost or the large spaCy model in the Docker image.
16
 
17
+ Six regex patterns cover every plausible PII type for this use case and run
18
+ in microseconds, not milliseconds. If Presidio is ever reconsidered, the
19
+ latency cost must be measured and documented before reintroduction.
20
+ DO NOT reintroduce Presidio or spaCy without explicit justification.
21
+ """
22
+ import re
23
 
24
  # LLM token delimiters that attackers embed in queries to escape the system prompt
25
  # or inject new instructions. Strip them before any further processing.
 
33
  re.IGNORECASE,
34
  )
35
 
36
# Six compiled patterns covering plausible PII in portfolio chatbot input.
# None of these patterns use capturing groups — each full match is replaced
# wholesale with "[REDACTED]" by redact_pii() below.
# Patterns are ordered cheapest-first (no backtracking before complex ones).
_PII_PATTERNS: tuple[re.Pattern, ...] = (
    # Email address
    re.compile(r"\b[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}\b"),
    # IPv4 address (before phone to avoid 4-octet false positives in phone patterns)
    re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"),
    # UK phone: 07xxx xxxxxx, +44 7xxx xxxxxx, 01xxx xxxxxx, etc.
    re.compile(r"\b(?:\+44\s?|0)(?:\d\s?){9,10}\b"),
    # UK National Insurance number: two letters, six digits, one letter (A–D)
    re.compile(r"\b[A-CEGHJ-PR-TW-Z]{2}\d{6}[A-D]\b", re.IGNORECASE),
    # UK sort code: xx-xx-xx or xxxxxx (6 digits).
    # NOTE(review): this also matches any bare 6-digit number (e.g. "123456"
    # in ordinary text) — accepted false-positive risk for this use case.
    re.compile(r"\b\d{2}[-\s]?\d{2}[-\s]?\d{2}\b"),
    # Credit card: 13–19 digit sequences with optional spaces/dashes
    re.compile(r"\b(?:\d[ \-]?){13,19}\b"),
)
53
 
54
 
55
  def sanitize_input(text: str) -> str:
 
75
 
76
def redact_pii(text: str) -> str:
    """
    Replace every PII match in ``text`` with the literal "[REDACTED]".

    Applies the six compiled patterns in _PII_PATTERNS in order: email address,
    IPv4 address, UK phone number, UK National Insurance number, UK sort code,
    and credit card number. Pure regex — no NLP model, no spaCy, no network
    calls — so it runs in microseconds per query.

    PERSON entities are intentionally not redacted: visitors are expected to
    name Darshan Chheda in their queries. Redacting that breaks retrieval.
    """
    if not text:
        return text

    redacted = text
    for rx in _PII_PATTERNS:
        redacted = rx.sub("[REDACTED]", redacted)
    return redacted
93
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/services/conversation_store.py CHANGED
@@ -12,11 +12,18 @@ wasting significant token budget on verbatim prior answers.
12
  All reads/writes are synchronous sqlite3 (<3ms on SSD) — acceptable because:
13
  1. The call happens once at request start, outside the model call path.
14
  2. SQLite WAL mode allows concurrent readers and one writer without blocking.
 
 
 
 
 
15
  """
16
  from __future__ import annotations
17
 
 
18
  import logging
19
  import sqlite3
 
20
 
21
  logger = logging.getLogger(__name__)
22
 
@@ -35,8 +42,9 @@ class ConversationStore:
35
  One instance is created at startup and shared across all requests via app.state.
36
  """
37
 
38
- def __init__(self, db_path: str) -> None:
39
  self._db_path = db_path
 
40
 
41
  def get_recent(self, session_id: str, max_turns: int = _DEFAULT_MAX_TURNS) -> list[dict]:
42
  """
@@ -75,9 +83,12 @@ class ConversationStore:
75
 
76
  def mark_last_negative(self, session_id: str) -> None:
77
  """
78
- Set feedback=-1 on the most recent interaction for `session_id`.
79
- Called when the current user message clearly criticises the previous answer.
80
- This feeds the self-improvement loop in data_prep.py / purge_bad_chunks.py.
 
 
 
81
  """
82
  try:
83
  with sqlite3.connect(self._db_path) as conn:
@@ -94,4 +105,88 @@ class ConversationStore:
94
  (session_id,),
95
  )
96
  except Exception as exc:
97
- logger.warning("ConversationStore.mark_last_negative failed: %s", exc)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
  All reads/writes are synchronous sqlite3 (<3ms on SSD) — acceptable because:
13
  1. The call happens once at request start, outside the model call path.
14
  2. SQLite WAL mode allows concurrent readers and one writer without blocking.
15
+
16
+ Issue 1: mark_last_negative() now also fires github_log.append_feedback() so
17
+ negative labels persist across HF Space restarts. Without this, negative
18
+ examples accumulated during a session are lost on the next restart, and
19
+ data_prep.py cannot produce accurate hard-negative training triplets.
20
  """
21
  from __future__ import annotations
22
 
23
+ import json
24
  import logging
25
  import sqlite3
26
+ from datetime import datetime, timezone
27
 
28
  logger = logging.getLogger(__name__)
29
 
 
42
  One instance is created at startup and shared across all requests via app.state.
43
  """
44
 
45
    def __init__(self, db_path: str, github_log=None) -> None:
        """
        Args:
            db_path: Filesystem path to the SQLite database file.
            github_log: Optional durable GitHub JSONL logger. Must expose an
                ``append_feedback(session_id, feedback)`` method (used by
                mark_last_negative). None in local dev/tests, where
                interactions stay in SQLite only.
        """
        self._db_path = db_path
        # May be None — callers check before use.
        self._github_log = github_log
48
 
49
  def get_recent(self, session_id: str, max_turns: int = _DEFAULT_MAX_TURNS) -> list[dict]:
50
  """
 
83
 
84
  def mark_last_negative(self, session_id: str) -> None:
85
  """
86
+ Set feedback=-1 on the most recent interaction for `session_id` in SQLite,
87
+ then durably record the correction in the GitHub JSONL log so the negative
88
+ label survives a HF Space restart.
89
+
90
+ data_prep.py reads {type:"feedback", feedback:-1} correction records from
91
+ the durable log and applies them when building reranker training triplets.
92
  """
93
  try:
94
  with sqlite3.connect(self._db_path) as conn:
 
105
  (session_id,),
106
  )
107
  except Exception as exc:
108
+ logger.warning("ConversationStore.mark_last_negative SQLite failed: %s", exc)
109
+
110
+ # Durable correction record — survives Space restart; not in SQLite only.
111
+ if self._github_log is not None:
112
+ self._github_log.append_feedback(session_id, feedback=-1)
113
+
114
+ def populate_from_records(self, records: list[dict]) -> None:
115
+ """
116
+ Replay interaction records from the durable GitHub log into SQLite.
117
+ Called at startup when SQLite is empty after a Space restart so conversation
118
+ history is available without requiring a full log replay on every request.
119
+
120
+ Only inserts rows for path='rag'|'gemini_fast'|'cache_hit' interactions;
121
+ skips feedback correction records (type='feedback') which are not interactions.
122
+ """
123
+ import os
124
+ db_dir = os.path.dirname(self._db_path)
125
+ if db_dir:
126
+ os.makedirs(db_dir, exist_ok=True)
127
+
128
+ interaction_records = [
129
+ r for r in records
130
+ if r.get("type") != "feedback" and r.get("query")
131
+ ]
132
+ if not interaction_records:
133
+ return
134
+
135
+ try:
136
+ with sqlite3.connect(self._db_path) as conn:
137
+ conn.execute(
138
+ """
139
+ CREATE TABLE IF NOT EXISTS interactions (
140
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
141
+ timestamp TEXT,
142
+ session_id TEXT,
143
+ query TEXT,
144
+ answer TEXT,
145
+ chunks_used TEXT,
146
+ rerank_scores TEXT,
147
+ reranked_chunks_json TEXT,
148
+ latency_ms INTEGER,
149
+ cached BOOLEAN,
150
+ feedback INTEGER DEFAULT 0,
151
+ path TEXT DEFAULT 'rag'
152
+ )
153
+ """
154
+ )
155
+ # Apply feedback corrections: build a map session_id -> feedback
156
+ # so they can be applied when inserting the matching interactions.
157
+ feedback_corrections: dict[str, int] = {}
158
+ for r in records:
159
+ if r.get("type") == "feedback":
160
+ feedback_corrections[r["session_id"]] = r.get("feedback", 0)
161
+
162
+ for r in interaction_records:
163
+ sid = r.get("session_id", "")
164
+ feedback = feedback_corrections.get(sid, r.get("feedback", 0))
165
+ conn.execute(
166
+ """
167
+ INSERT INTO interactions
168
+ (timestamp, session_id, query, answer, chunks_used,
169
+ rerank_scores, reranked_chunks_json, latency_ms, cached, feedback, path)
170
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
171
+ """,
172
+ (
173
+ r.get("timestamp", datetime.now(tz=timezone.utc).isoformat()),
174
+ sid,
175
+ r.get("query", ""),
176
+ r.get("answer", ""),
177
+ json.dumps(r.get("chunks_used", [])),
178
+ json.dumps(r.get("rerank_scores", [])),
179
+ json.dumps(r.get("reranked_chunks_json", [])),
180
+ r.get("latency_ms", 0),
181
+ r.get("cached", False),
182
+ feedback,
183
+ r.get("path", "rag"),
184
+ ),
185
+ )
186
+ logger.info(
187
+ "Reconstructed %d interactions from durable GitHub log into SQLite.",
188
+ len(interaction_records),
189
+ )
190
+ except Exception as exc:
191
+ logger.warning("ConversationStore.populate_from_records failed: %s", exc)
192
+
app/services/github_log.py ADDED
@@ -0,0 +1,165 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ backend/app/services/github_log.py
3
+
4
+ Durable interaction log backed by a JSONL file in the PersonaBot GitHub repo.
5
+
6
+ HuggingFace Spaces free tier destroys in-Space storage (SQLite, /data/) on every
7
+ restart, maintenance window, and idle reclamation. Every interaction written only
8
+ to SQLite is silently reset to zero — the self-improvement loop accumulates nothing
9
+ across restarts.
10
+
11
+ This service appends each interaction as a single JSON line to a committed file in
12
+ the PersonaBot repo via the GitHub Contents API, using PERSONABOT_WRITE_TOKEN. The
13
+ file survives Space restarts because it lives in Git, not on the Space filesystem.
14
+
15
+ On Space startup, if SQLite is empty (< 10 rows), the last 500 lines are fetched from
16
+ this file and replayed into SQLite so conversation history and training signals are
17
+ available immediately without a full log replay on every request.
18
+
19
+ Negative feedback (mark_last_negative) is durably recorded by appending a correction
20
+ record {type:"feedback", feedback:-1, session_id:...} that data_prep.py interprets when
21
+ building training triplets.
22
+
23
+ Failure modes
24
+ ─────────────
25
+ If the GitHub API call fails (rate limit, network error, 409 SHA conflict), the error
26
+ is logged at WARNING level and the interaction is NOT lost — it is always written to
27
+ SQLite first. The durable log is a best-effort durability layer, not a primary store.
28
+ """
29
+ from __future__ import annotations
30
+
31
+ import asyncio
32
+ import base64
33
+ import json
34
+ import logging
35
+ from datetime import datetime, timezone
36
+
37
+ import httpx
38
+
39
+ logger = logging.getLogger(__name__)
40
+
41
# Fixed path inside the PersonaBot repository. The retrain workflow reads this
# file directly from the repo checkout — no admin endpoint download required.
_LOG_PATH = "data/interactions.jsonl"
# Per-call timeout in seconds for GitHub Contents API requests (httpx timeout).
_API_TIMEOUT = 20


class GithubLog:
    """
    Append-only JSONL log backed by the PersonaBot GitHub repo.

    All writes are fire-and-forget background tasks so they never add latency
    to the SSE stream. This object is created once at startup and shared
    across all requests via app.state.github_log.

    Fix: tasks created with asyncio.create_task() are now held in
    ``self._bg_tasks`` until they finish. The event loop keeps only weak
    references to tasks, so a task whose result is discarded can be
    garbage-collected mid-flight — silently dropping the append. The asyncio
    docs explicitly require saving a reference to the created task.
    """

    def __init__(self, write_token: str, repo: str) -> None:
        """
        Args:
            write_token: Fine-grained PAT with read+write Contents access on
                `repo`. Empty string disables durable logging (see `enabled`).
            repo: "owner/name" slug of the PersonaBot repository.
        """
        self._token = write_token
        self._repo = repo
        self._api_url = f"https://api.github.com/repos/{repo}/contents/{_LOG_PATH}"
        self._headers = {
            "Authorization": f"Bearer {write_token}",
            "Accept": "application/vnd.github+json",
        }
        # Strong references to in-flight background tasks (see class docstring).
        self._bg_tasks: set = set()

    @property
    def enabled(self) -> bool:
        # Durable logging is active only when a write token was configured.
        return bool(self._token)

    def _schedule(self, record: dict) -> None:
        """Create a background append task and hold a strong ref until done."""
        task = asyncio.create_task(self._append_bg(record))
        self._bg_tasks.add(task)
        task.add_done_callback(self._bg_tasks.discard)

    def append(self, record: dict) -> None:
        """
        Schedule a background task to append `record` to the durable JSONL log.
        Returns immediately — never blocks the request path.
        """
        if not self.enabled:
            return
        # asyncio.create_task requires a running event loop; log_eval is async so this is safe.
        self._schedule(record)

    async def _append_bg(self, record: dict) -> None:
        """
        Read-modify-write append of one JSON line via the GitHub Contents API.
        Failures are logged at WARNING and swallowed — the interaction is
        already safe in SQLite; this layer is best-effort durability only.
        """
        try:
            async with httpx.AsyncClient(timeout=_API_TIMEOUT) as client:
                get_r = await client.get(self._api_url, headers=self._headers)
                if get_r.status_code == 200:
                    data = get_r.json()
                    sha: str | None = data["sha"]
                    # NOTE(review): the Contents API omits inline content for
                    # files over ~1 MB — confirm rotation happens before that.
                    current = base64.b64decode(
                        data["content"].replace("\n", "")
                    ).decode("utf-8")
                elif get_r.status_code == 404:
                    # First-ever append: file does not exist yet.
                    sha = None
                    current = ""
                else:
                    logger.warning(
                        "GithubLog GET failed (%d) — interaction not logged durably.",
                        get_r.status_code,
                    )
                    return

                new_content = current.rstrip("\n") + "\n" + json.dumps(record) + "\n"
                encoded = base64.b64encode(new_content.encode("utf-8")).decode("ascii")
                payload: dict = {
                    "message": "log: append interaction [skip ci]",
                    "content": encoded,
                }
                if sha:
                    payload["sha"] = sha

                put_r = await client.put(
                    self._api_url, headers=self._headers, json=payload
                )
                if put_r.status_code not in (200, 201):
                    # 409 = SHA conflict (two concurrent appends) — rare for a portfolio bot.
                    # The interaction is safe in SQLite; this is a best-effort durability layer.
                    logger.warning(
                        "GithubLog PUT failed (%d) — interaction not logged durably.",
                        put_r.status_code,
                    )
        except Exception as exc:
            logger.warning("GithubLog.append error: %s", exc)

    async def load_recent(self, n: int = 500) -> list[dict]:
        """
        Fetch the last `n` interaction records from the durable log.
        Used at Space startup to reconstruct SQLite after an ephemeral restart.
        Returns [] if the file doesn't exist or if the token is not configured.
        """
        if not self.enabled:
            return []
        try:
            async with httpx.AsyncClient(timeout=_API_TIMEOUT) as client:
                r = await client.get(self._api_url, headers=self._headers)
                if r.status_code == 404:
                    return []
                if r.status_code != 200:
                    logger.warning("GithubLog.load_recent GET failed (%d).", r.status_code)
                    return []
                content = base64.b64decode(
                    r.json()["content"].replace("\n", "")
                ).decode("utf-8")
                lines = [ln.strip() for ln in content.splitlines() if ln.strip()]
                records: list[dict] = []
                for line in lines[-n:]:
                    try:
                        records.append(json.loads(line))
                    except json.JSONDecodeError:
                        # Skip corrupt lines rather than abort the whole replay.
                        pass
                return records
        except Exception as exc:
            logger.warning("GithubLog.load_recent error: %s", exc)
            return []

    def append_feedback(self, session_id: str, feedback: int) -> None:
        """
        Durably record a feedback update without rewriting an existing line.
        data_prep.py applies these correction records when building triplets.
        """
        if not self.enabled:
            return
        record = {
            "type": "feedback",
            "session_id": session_id,
            "feedback": feedback,
            "timestamp": datetime.now(tz=timezone.utc).isoformat(),
        }
        self._schedule(record)
app/services/llm_client.py CHANGED
@@ -1,5 +1,6 @@
1
  import json
2
- from typing import AsyncIterator, Literal, Protocol
 
3
 
4
  import httpx
5
  from groq import AsyncGroq
@@ -9,6 +10,41 @@ from app.core.config import Settings
9
  from app.core.exceptions import GenerationError
10
 
11
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
  class LLMClient(Protocol):
13
  async def complete(self, prompt: str, system: str, stream: bool) -> AsyncIterator[str]:
14
  ...
@@ -21,7 +57,7 @@ class LLMClient(Protocol):
21
 
22
 
23
  class GroqClient:
24
- def __init__(self, api_key: str, model_default: str, model_large: str):
25
  if not api_key or api_key == "gsk_placeholder":
26
  # We might be initialized in a test context without a real key
27
  self.client = None
@@ -30,6 +66,8 @@ class GroqClient:
30
 
31
  self.model_default = model_default
32
  self.model_large = model_large
 
 
33
 
34
  @retry(stop=stop_after_attempt(2), wait=wait_fixed(1.0), retry=retry_if_exception_type((httpx.RequestError, httpx.TimeoutException)))
35
  async def classify_complexity(self, query: str) -> Literal["simple", "complex"]:
@@ -86,12 +124,22 @@ class GroqClient:
86
 
87
 
88
  async def complete_with_complexity(self, prompt: str, system: str, stream: bool, complexity: str) -> AsyncIterator[str]:
89
- # Helper to allow pipeline nodes to pass the pre-classified complexity
 
 
90
  if not self.client:
91
  raise GenerationError("GroqClient not configured with an API Key.")
92
-
93
- model = self.model_large if complexity == "complex" else self.model_default
94
-
 
 
 
 
 
 
 
 
95
  try:
96
  stream_response = await self.client.chat.completions.create(
97
  messages=[
@@ -99,16 +147,22 @@ class GroqClient:
99
  {"role": "user", "content": prompt}
100
  ],
101
  model=model,
102
- stream=stream # Instruct strictly said stream=True yields token chunks.
103
  )
104
 
105
  if stream:
106
  async for chunk in stream_response:
107
  content = chunk.choices[0].delta.content
108
  if content:
 
 
 
109
  yield content
110
  else:
111
- yield stream_response.choices[0].message.content
 
 
 
112
 
113
  except Exception as e:
114
  raise GenerationError("Groq completion failed", context={"error": str(e)}) from e
@@ -186,7 +240,7 @@ class OllamaClient:
186
  yield token
187
 
188
 
189
- def get_llm_client(settings: Settings) -> LLMClient:
190
  if settings.LLM_PROVIDER == "ollama":
191
  if not settings.OLLAMA_BASE_URL or not settings.OLLAMA_MODEL:
192
  raise ValueError("OLLAMA_BASE_URL and OLLAMA_MODEL must be explicitly set when LLM_PROVIDER is 'ollama'")
@@ -199,5 +253,6 @@ def get_llm_client(settings: Settings) -> LLMClient:
199
  return GroqClient(
200
  api_key=settings.GROQ_API_KEY or "",
201
  model_default=settings.GROQ_MODEL_DEFAULT,
202
- model_large=settings.GROQ_MODEL_LARGE
 
203
  )
 
1
  import json
2
+ import time
3
+ from typing import AsyncIterator, Literal, Optional, Protocol
4
 
5
  import httpx
6
  from groq import AsyncGroq
 
10
  from app.core.exceptions import GenerationError
11
 
12
 
13
class TpmBucket:
    """
    Shared token-consumption tracker over a rolling 60-second window.

    Issue 7: once more than 12,000 estimated tokens have been recorded inside
    the current minute window, complete_with_complexity() silently downgrades
    70B calls to 8B. That leaves 2,400 TPM of headroom and prevents hard
    rate-limit failures (HTTP 429) from degrading the service under load.

    Token counts are coarse estimates (prompt_chars / 4) — good enough for
    load shedding, which is the bucket's only job; this is not accounting.
    """

    _WINDOW_SECONDS: int = 60
    _DOWNGRADE_THRESHOLD: int = 12_000

    def __init__(self) -> None:
        self._count: int = 0
        self._window_start: float = time.monotonic()

    def _window_expired(self, now: float) -> bool:
        """True when the current minute window has fully elapsed."""
        return now - self._window_start >= self._WINDOW_SECONDS

    def add(self, estimated_tokens: int) -> None:
        """Record `estimated_tokens` consumed now, rolling the window if stale."""
        now = time.monotonic()
        if self._window_expired(now):
            # Stale window: start a fresh minute before accumulating.
            self._count = 0
            self._window_start = now
        self._count += estimated_tokens

    @property
    def should_downgrade(self) -> bool:
        """True when the live window already holds more than the threshold."""
        if self._window_expired(time.monotonic()):
            return False
        return self._count > self._DOWNGRADE_THRESHOLD
46
+
47
+
48
  class LLMClient(Protocol):
49
  async def complete(self, prompt: str, system: str, stream: bool) -> AsyncIterator[str]:
50
  ...
 
57
 
58
 
59
  class GroqClient:
60
+ def __init__(self, api_key: str, model_default: str, model_large: str, tpm_bucket: Optional[TpmBucket] = None):
61
  if not api_key or api_key == "gsk_placeholder":
62
  # We might be initialized in a test context without a real key
63
  self.client = None
 
66
 
67
  self.model_default = model_default
68
  self.model_large = model_large
69
+ # Shared TPM bucket — injected at startup, None in test contexts.
70
+ self._tpm_bucket = tpm_bucket
71
 
72
  @retry(stop=stop_after_attempt(2), wait=wait_fixed(1.0), retry=retry_if_exception_type((httpx.RequestError, httpx.TimeoutException)))
73
  async def classify_complexity(self, query: str) -> Literal["simple", "complex"]:
 
124
 
125
 
126
  async def complete_with_complexity(self, prompt: str, system: str, stream: bool, complexity: str) -> AsyncIterator[str]:
127
+ # Helper to allow pipeline nodes to pass the pre-classified complexity.
128
+ # Issue 7: if the shared TPM bucket is above 12,000 tokens in the current
129
+ # minute window, downgrade 70B to 8B to prevent hard rate-limit failures.
130
  if not self.client:
131
  raise GenerationError("GroqClient not configured with an API Key.")
132
+
133
+ if complexity == "complex" and self._tpm_bucket is not None and self._tpm_bucket.should_downgrade:
134
+ model = self.model_default
135
+ else:
136
+ model = self.model_large if complexity == "complex" else self.model_default
137
+
138
+ # Estimate input tokens before the call so the bucket reflects the full
139
+ # cost even when the response is long. 4 chars ≈ 1 token (rough heuristic).
140
+ if self._tpm_bucket is not None:
141
+ self._tpm_bucket.add((len(prompt) + len(system)) // 4)
142
+
143
  try:
144
  stream_response = await self.client.chat.completions.create(
145
  messages=[
 
147
  {"role": "user", "content": prompt}
148
  ],
149
  model=model,
150
+ stream=stream # Instruct strictly said stream=True yields token chunks.
151
  )
152
 
153
  if stream:
154
  async for chunk in stream_response:
155
  content = chunk.choices[0].delta.content
156
  if content:
157
+ # Accumulate estimated response tokens in the bucket.
158
+ if self._tpm_bucket is not None:
159
+ self._tpm_bucket.add(len(content) // 4 or 1)
160
  yield content
161
  else:
162
+ full = stream_response.choices[0].message.content
163
+ if self._tpm_bucket is not None and full:
164
+ self._tpm_bucket.add(len(full) // 4)
165
+ yield full
166
 
167
  except Exception as e:
168
  raise GenerationError("Groq completion failed", context={"error": str(e)}) from e
 
240
  yield token
241
 
242
 
243
+ def get_llm_client(settings: Settings, tpm_bucket: Optional[TpmBucket] = None) -> LLMClient:
244
  if settings.LLM_PROVIDER == "ollama":
245
  if not settings.OLLAMA_BASE_URL or not settings.OLLAMA_MODEL:
246
  raise ValueError("OLLAMA_BASE_URL and OLLAMA_MODEL must be explicitly set when LLM_PROVIDER is 'ollama'")
 
253
  return GroqClient(
254
  api_key=settings.GROQ_API_KEY or "",
255
  model_default=settings.GROQ_MODEL_DEFAULT,
256
+ model_large=settings.GROQ_MODEL_LARGE,
257
+ tpm_bucket=tpm_bucket,
258
  )
requirements.txt CHANGED
@@ -16,7 +16,9 @@ groq>=0.5.0
16
  httpx>=0.27.0
17
  numpy>=1.26.0
18
  slowapi>=0.1.9
19
- presidio-analyzer>=2.2.354
 
 
20
  tenacity>=8.3.0
21
  python-jose[cryptography]>=3.3.0
22
  google-genai>=1.0.0
 
16
  httpx>=0.27.0
17
  numpy>=1.26.0
18
  slowapi>=0.1.9
19
+ # presidio-analyzer was removed (Issue 4): spaCy-based NLP added 50-100ms to every
20
+ # request for near-zero real-world PII risk. Replaced with six compiled regex
21
+ # patterns in sanitizer.py that run in microseconds. See copilot-instructions.md.
22
  tenacity>=8.3.0
23
  python-jose[cryptography]>=3.3.0
24
  google-genai>=1.0.0