XQ commited on
Commit
c44bb5c
·
1 Parent(s): 6a54ecb

Add Pipeline visualization

Browse files
Files changed (5) hide show
  1. src/agent/router.py +21 -2
  2. src/api/routes.py +47 -0
  3. src/models.py +28 -1
  4. src/retrieval/hybrid.py +34 -1
  5. src/ui/app.py +105 -0
src/agent/router.py CHANGED
@@ -5,7 +5,7 @@ import math
5
 
6
  from langchain_core.runnables import Runnable
7
 
8
- from src.models import IntentType, GenerationResponse
9
  from src.agent.intent_classifier import IntentClassifier
10
  from src.retrieval.hybrid import HybridRetriever
11
  from src.retrieval.reranker import Reranker
@@ -104,6 +104,7 @@ class QueryRouter:
104
 
105
  # Detect language and translate to Danish for retrieval if needed
106
  retrieval_query, user_language = self._detect_and_translate_query(query)
 
107
 
108
  intent = self._intent_classifier.classify(query)
109
  logger.info("Classified intent: %s", intent.value)
@@ -112,11 +113,28 @@ class QueryRouter:
112
  should_retrieve = intent != IntentType.UNKNOWN
113
  logger.debug("[DEBUG] Retrieval executed: %s (intent=%s)", should_retrieve, intent.value)
114
 
115
- results = self._hybrid_retriever.search(retrieval_query, top_k=top_k) if should_retrieve else []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
116
  logger.info("Retrieved %d results from hybrid search", len(results))
117
  logger.debug("[DEBUG] Retrieval returned %d results", len(results))
118
 
119
  reranked = self._reranker.rerank(retrieval_query, results, top_k=top_k) if results else []
 
120
  logger.info("Reranked to %d results", len(reranked))
121
 
122
  if reranked and intent == IntentType.FACTUAL:
@@ -143,6 +161,7 @@ class QueryRouter:
143
  sources=reranked,
144
  intent=intent,
145
  confidence=confidence,
 
146
  )
147
 
148
  def _build_prompt(
 
5
 
6
  from langchain_core.runnables import Runnable
7
 
8
+ from src.models import IntentType, GenerationResponse, PipelineDetails
9
  from src.agent.intent_classifier import IntentClassifier
10
  from src.retrieval.hybrid import HybridRetriever
11
  from src.retrieval.reranker import Reranker
 
104
 
105
  # Detect language and translate to Danish for retrieval if needed
106
  retrieval_query, user_language = self._detect_and_translate_query(query)
107
+ translated = retrieval_query != query
108
 
109
  intent = self._intent_classifier.classify(query)
110
  logger.info("Classified intent: %s", intent.value)
 
113
  should_retrieve = intent != IntentType.UNKNOWN
114
  logger.debug("[DEBUG] Retrieval executed: %s (intent=%s)", should_retrieve, intent.value)
115
 
116
+ # Use detailed search to capture intermediate results
117
+ pipeline = PipelineDetails(
118
+ original_query=query,
119
+ retrieval_query=retrieval_query,
120
+ detected_language=user_language,
121
+ translated=translated,
122
+ )
123
+
124
+ if should_retrieve:
125
+ hybrid_result = self._hybrid_retriever.search_detailed(retrieval_query, top_k=top_k)
126
+ pipeline.dense_results = hybrid_result.dense_results
127
+ pipeline.sparse_results = hybrid_result.sparse_results
128
+ pipeline.fused_results = hybrid_result.fused_results
129
+ results = hybrid_result.fused_results
130
+ else:
131
+ results = []
132
+
133
  logger.info("Retrieved %d results from hybrid search", len(results))
134
  logger.debug("[DEBUG] Retrieval returned %d results", len(results))
135
 
136
  reranked = self._reranker.rerank(retrieval_query, results, top_k=top_k) if results else []
137
+ pipeline.reranked_results = reranked
138
  logger.info("Reranked to %d results", len(reranked))
139
 
140
  if reranked and intent == IntentType.FACTUAL:
 
161
  sources=reranked,
162
  intent=intent,
163
  confidence=confidence,
164
+ pipeline_details=pipeline,
165
  )
166
 
167
  def _build_prompt(
src/api/routes.py CHANGED
@@ -62,6 +62,28 @@ class QueryRequest(BaseModel):
62
  strategy: str = "recursive"
63
 
64
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65
  class QueryResponse(BaseModel):
66
  """Response body for the query endpoint."""
67
 
@@ -69,6 +91,7 @@ class QueryResponse(BaseModel):
69
  sources: list[dict[str, str | float]]
70
  intent: str
71
  confidence: float
 
72
 
73
 
74
  class IngestRequest(BaseModel):
@@ -154,11 +177,35 @@ async def query_documents(request: QueryRequest) -> QueryResponse:
154
  for result in response.sources
155
  ]
156
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
157
  return QueryResponse(
158
  answer=response.answer,
159
  sources=sources,
160
  intent=response.intent.value,
161
  confidence=response.confidence,
 
162
  )
163
 
164
 
 
62
  strategy: str = "recursive"
63
 
64
 
65
+ class PipelineResultItem(BaseModel):
66
+ """A single result item in pipeline details."""
67
+
68
+ document_id: str
69
+ chunk_id: str
70
+ score: float
71
+ source: str
72
+
73
+
74
+ class PipelineDetailsResponse(BaseModel):
75
+ """Intermediate pipeline data for the query response."""
76
+
77
+ original_query: str = ""
78
+ retrieval_query: str = ""
79
+ detected_language: str = ""
80
+ translated: bool = False
81
+ dense_results: list[PipelineResultItem] = []
82
+ sparse_results: list[PipelineResultItem] = []
83
+ fused_results: list[PipelineResultItem] = []
84
+ reranked_results: list[PipelineResultItem] = []
85
+
86
+
87
  class QueryResponse(BaseModel):
88
  """Response body for the query endpoint."""
89
 
 
91
  sources: list[dict[str, str | float]]
92
  intent: str
93
  confidence: float
94
+ pipeline_details: PipelineDetailsResponse = PipelineDetailsResponse()
95
 
96
 
97
  class IngestRequest(BaseModel):
 
177
  for result in response.sources
178
  ]
179
 
180
+ def _to_pipeline_items(results: list) -> list[PipelineResultItem]:
181
+ return [
182
+ PipelineResultItem(
183
+ document_id=r.chunk.document_id,
184
+ chunk_id=r.chunk.chunk_id,
185
+ score=r.score,
186
+ source=r.source,
187
+ )
188
+ for r in results
189
+ ]
190
+
191
+ pd = response.pipeline_details
192
+ pipeline_details = PipelineDetailsResponse(
193
+ original_query=pd.original_query,
194
+ retrieval_query=pd.retrieval_query,
195
+ detected_language=pd.detected_language,
196
+ translated=pd.translated,
197
+ dense_results=_to_pipeline_items(pd.dense_results),
198
+ sparse_results=_to_pipeline_items(pd.sparse_results),
199
+ fused_results=_to_pipeline_items(pd.fused_results),
200
+ reranked_results=_to_pipeline_items(pd.reranked_results),
201
+ )
202
+
203
  return QueryResponse(
204
  answer=response.answer,
205
  sources=sources,
206
  intent=response.intent.value,
207
  confidence=response.confidence,
208
+ pipeline_details=pipeline_details,
209
  )
210
 
211
 
src/models.py CHANGED
@@ -57,6 +57,31 @@ class QueryResult:
57
  source: str
58
 
59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
60
  @dataclass
61
  class GenerationResponse:
62
  """Structured response from the generation pipeline.
@@ -66,9 +91,11 @@ class GenerationResponse:
66
  sources: List of source chunks used for generation.
67
  intent: Classified intent of the original query.
68
  confidence: Model confidence in the answer (0.0-1.0).
 
69
  """
70
 
71
  answer: str
72
  sources: list[QueryResult]
73
  intent: IntentType
74
- confidence: float
 
 
57
  source: str
58
 
59
 
60
+ @dataclass
61
+ class PipelineDetails:
62
+ """Intermediate pipeline data for debugging and transparency.
63
+
64
+ Attributes:
65
+ original_query: The user's original query text.
66
+ retrieval_query: The query used for retrieval (may be translated).
67
+ detected_language: Detected language of the original query.
68
+ translated: Whether the query was translated for retrieval.
69
+ dense_results: Results from dense (vector) retrieval.
70
+ sparse_results: Results from sparse (BM25) retrieval.
71
+ fused_results: Results after reciprocal rank fusion.
72
+ reranked_results: Results after cross-encoder reranking.
73
+ """
74
+
75
+ original_query: str = ""
76
+ retrieval_query: str = ""
77
+ detected_language: str = ""
78
+ translated: bool = False
79
+ dense_results: list[QueryResult] = field(default_factory=list)
80
+ sparse_results: list[QueryResult] = field(default_factory=list)
81
+ fused_results: list[QueryResult] = field(default_factory=list)
82
+ reranked_results: list[QueryResult] = field(default_factory=list)
83
+
84
+
85
  @dataclass
86
  class GenerationResponse:
87
  """Structured response from the generation pipeline.
 
91
  sources: List of source chunks used for generation.
92
  intent: Classified intent of the original query.
93
  confidence: Model confidence in the answer (0.0-1.0).
94
+ pipeline_details: Optional intermediate pipeline data.
95
  """
96
 
97
  answer: str
98
  sources: list[QueryResult]
99
  intent: IntentType
100
+ confidence: float
101
+ pipeline_details: PipelineDetails = field(default_factory=PipelineDetails)
src/retrieval/hybrid.py CHANGED
@@ -1,6 +1,7 @@
1
  """Hybrid search combining dense and sparse retrieval with reciprocal rank fusion."""
2
 
3
  import logging
 
4
 
5
  from src.models import QueryResult
6
  from src.retrieval.bm25_search import BM25Search
@@ -10,6 +11,21 @@ from src.retrieval.vector_store import VectorStore
10
  logger = logging.getLogger(__name__)
11
 
12
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  class HybridRetriever:
14
  """Combines dense (vector) and sparse (BM25) retrieval using rank fusion."""
15
 
@@ -46,6 +62,19 @@ class HybridRetriever:
46
  Returns:
47
  List of QueryResult objects sorted by fused score.
48
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
49
  query_embedding = self._embedder.embed_text(query)
50
  dense_results = self._vector_store.search(query_embedding, top_k)
51
  sparse_results = self._bm25_search.search(query, top_k)
@@ -57,7 +86,11 @@ class HybridRetriever:
57
  )
58
 
59
  fused = self.reciprocal_rank_fusion(dense_results, sparse_results, k=60)
60
- return fused[:top_k]
 
 
 
 
61
 
62
  def reciprocal_rank_fusion(
63
  self,
 
1
  """Hybrid search combining dense and sparse retrieval with reciprocal rank fusion."""
2
 
3
  import logging
4
+ from dataclasses import dataclass
5
 
6
  from src.models import QueryResult
7
  from src.retrieval.bm25_search import BM25Search
 
11
  logger = logging.getLogger(__name__)
12
 
13
 
14
+ @dataclass
15
+ class HybridSearchResult:
16
+ """Container for hybrid search results including intermediate stages.
17
+
18
+ Attributes:
19
+ dense_results: Results from dense (vector) retrieval.
20
+ sparse_results: Results from sparse (BM25) retrieval.
21
+ fused_results: Results after reciprocal rank fusion.
22
+ """
23
+
24
+ dense_results: list[QueryResult]
25
+ sparse_results: list[QueryResult]
26
+ fused_results: list[QueryResult]
27
+
28
+
29
  class HybridRetriever:
30
  """Combines dense (vector) and sparse (BM25) retrieval using rank fusion."""
31
 
 
62
  Returns:
63
  List of QueryResult objects sorted by fused score.
64
  """
65
+ result = self.search_detailed(query, top_k)
66
+ return result.fused_results
67
+
68
+ def search_detailed(self, query: str, top_k: int) -> HybridSearchResult:
69
+ """Execute hybrid search and return all intermediate results.
70
+
71
+ Args:
72
+ query: The search query string.
73
+ top_k: Number of top results to return after fusion.
74
+
75
+ Returns:
76
+ HybridSearchResult containing dense, sparse, and fused results.
77
+ """
78
  query_embedding = self._embedder.embed_text(query)
79
  dense_results = self._vector_store.search(query_embedding, top_k)
80
  sparse_results = self._bm25_search.search(query, top_k)
 
86
  )
87
 
88
  fused = self.reciprocal_rank_fusion(dense_results, sparse_results, k=60)
89
+ return HybridSearchResult(
90
+ dense_results=dense_results,
91
+ sparse_results=sparse_results,
92
+ fused_results=fused[:top_k],
93
+ )
94
 
95
  def reciprocal_rank_fusion(
96
  self,
src/ui/app.py CHANGED
@@ -70,6 +70,21 @@ TEXTS: Dict[str, Dict[str, str]] = {
70
  "model_llm": "LLM",
71
  "model_embedding": "Embedding",
72
  "model_unavailable": "Kunne ikke hente modelinfo.",
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  },
74
  "en": {
75
  "page_title": "Document Assistant",
@@ -125,6 +140,21 @@ TEXTS: Dict[str, Dict[str, str]] = {
125
  "model_llm": "LLM",
126
  "model_embedding": "Embedding",
127
  "model_unavailable": "Could not fetch model info.",
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
128
  },
129
  }
130
 
@@ -449,5 +479,80 @@ if search_clicked and question.strip():
449
  else:
450
  st.info(t["no_sources"])
451
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
452
  elif search_clicked:
453
  st.warning(t["empty_warning"])
 
70
  "model_llm": "LLM",
71
  "model_embedding": "Embedding",
72
  "model_unavailable": "Kunne ikke hente modelinfo.",
73
+ "pipeline_heading": "Pipeline-detaljer",
74
+ "pipeline_translation": "Oversaettelse",
75
+ "pipeline_original": "Original foresporgsel",
76
+ "pipeline_translated": "Oversat til dansk",
77
+ "pipeline_lang": "Sprog registreret",
78
+ "pipeline_no_translation": "Ingen oversaettelse (foresporgsel allerede paa dansk)",
79
+ "pipeline_bm25": "BM25-resultater (leksikalsk soegning)",
80
+ "pipeline_dense": "Vektorsoegning (semantisk)",
81
+ "pipeline_fused": "RRF-fusioneret raekkefoelge",
82
+ "pipeline_reranked": "Reranking (endelig raekkefoelge)",
83
+ "pipeline_doc": "Dokument",
84
+ "pipeline_score": "Score",
85
+ "pipeline_rank": "#",
86
+ "pipeline_no_results": "Ingen resultater",
87
+ "pipeline_score_change": "Score-aendring",
88
  },
89
  "en": {
90
  "page_title": "Document Assistant",
 
140
  "model_llm": "LLM",
141
  "model_embedding": "Embedding",
142
  "model_unavailable": "Could not fetch model info.",
143
+ "pipeline_heading": "Pipeline Details",
144
+ "pipeline_translation": "Query Translation",
145
+ "pipeline_original": "Original query",
146
+ "pipeline_translated": "Translated to Danish",
147
+ "pipeline_lang": "Detected language",
148
+ "pipeline_no_translation": "No translation (query already in Danish)",
149
+ "pipeline_bm25": "BM25 Results (lexical search)",
150
+ "pipeline_dense": "Vector Search (semantic)",
151
+ "pipeline_fused": "RRF Fused Ranking",
152
+ "pipeline_reranked": "Reranked (final ranking)",
153
+ "pipeline_doc": "Document",
154
+ "pipeline_score": "Score",
155
+ "pipeline_rank": "#",
156
+ "pipeline_no_results": "No results",
157
+ "pipeline_score_change": "Score change",
158
  },
159
  }
160
 
 
479
  else:
480
  st.info(t["no_sources"])
481
 
482
+ # -- Pipeline Details --
483
+ pd = data.get("pipeline_details", {})
484
+ if pd:
485
+ with st.expander(t["pipeline_heading"], expanded=False):
486
+ # 1) Query translation
487
+ st.markdown(f'**{t["pipeline_translation"]}**')
488
+ if pd.get("translated"):
489
+ st.markdown(
490
+ f'- {t["pipeline_lang"]}: **{pd.get("detected_language", "")}**\n'
491
+ f'- {t["pipeline_original"]}: {pd.get("original_query", "")}\n'
492
+ f'- {t["pipeline_translated"]}: {pd.get("retrieval_query", "")}'
493
+ )
494
+ else:
495
+ st.markdown(f'_{t["pipeline_no_translation"]}_')
496
+
497
+ st.markdown("---")
498
+
499
+ def _render_result_table(results: list[dict], label: str) -> None:
500
+ """Render a ranked results table."""
501
+ st.markdown(f"**{label}**")
502
+ if not results:
503
+ st.caption(t["pipeline_no_results"])
504
+ return
505
+ header = f'| {t["pipeline_rank"]} | {t["pipeline_doc"]} | {t["pipeline_score"]} |\n|---|---|---|'
506
+ rows = "\n".join(
507
+ f'| {i + 1} | {r.get("document_id", "")} | {r.get("score", 0):.4f} |'
508
+ for i, r in enumerate(results)
509
+ )
510
+ st.markdown(f"{header}\n{rows}")
511
+
512
+ # 2) BM25 results
513
+ _render_result_table(pd.get("sparse_results", []), t["pipeline_bm25"])
514
+
515
+ st.markdown("---")
516
+
517
+ # 3) Vector search results
518
+ _render_result_table(pd.get("dense_results", []), t["pipeline_dense"])
519
+
520
+ st.markdown("---")
521
+
522
+ # 4) RRF fused ranking
523
+ _render_result_table(pd.get("fused_results", []), t["pipeline_fused"])
524
+
525
+ st.markdown("---")
526
+
527
+ # 5) Reranked results with score change
528
+ reranked = pd.get("reranked_results", [])
529
+ st.markdown(f'**{t["pipeline_reranked"]}**')
530
+ if reranked:
531
+ # Build a map from chunk_id -> fused score for comparison
532
+ fused_scores: dict[str, float] = {
533
+ r.get("chunk_id", ""): r.get("score", 0.0)
534
+ for r in pd.get("fused_results", [])
535
+ }
536
+ header = (
537
+ f'| {t["pipeline_rank"]} | {t["pipeline_doc"]} | '
538
+ f'{t["pipeline_score"]} | {t["pipeline_score_change"]} |\n'
539
+ f"|---|---|---|---|"
540
+ )
541
+ rows_list = []
542
+ for i, r in enumerate(reranked):
543
+ cid = r.get("chunk_id", "")
544
+ new_score = r.get("score", 0.0)
545
+ old_score = fused_scores.get(cid)
546
+ if old_score is not None:
547
+ change = f"RRF {old_score:.4f} -> {new_score:.4f}"
548
+ else:
549
+ change = "-"
550
+ rows_list.append(
551
+ f'| {i + 1} | {r.get("document_id", "")} | {new_score:.4f} | {change} |'
552
+ )
553
+ st.markdown(f"{header}\n" + "\n".join(rows_list))
554
+ else:
555
+ st.caption(t["pipeline_no_results"])
556
+
557
  elif search_clicked:
558
  st.warning(t["empty_warning"])