Peterase committed on
Commit
2c1e73f
·
1 Parent(s): daf250b

feat: detailed step-by-step pipeline logging

Browse files

- LiveSearch: logs each step (NewsAPI/DDG source, article titles, Jina extraction per-URL)
- JinaReranker: logs input docs, per-rank scores with source/title
- RAG pipeline: logs hybrid search start/end, merged results with scores
- Entity extraction: logs to INFO instead of print()
- Quality filter: logs doc count after blocked-source filter

src/core/orchestrator/query_orchestrator.py CHANGED
@@ -447,93 +447,97 @@ class QueryOrchestrator:
447
 
448
  async def _execute_live_search(self, query: str) -> List[Dict[str, Any]]:
449
  """
450
- Execute live search with Jina Reader enhancement.
451
-
452
- Strategy:
453
- 1. Try NewsAPI first (if available and temporal query)
454
- 2. Fallback to DuckDuckGo
455
- 3. Extract full articles using Jina Reader (parallel)
456
- 4. Replace snippets with full content (14,000+ chars)
457
- 5. Fallback to snippets if extraction fails
458
-
459
- Args:
460
- query: Search query (English)
461
-
462
- Returns:
463
- List of enhanced live search results with full articles
464
  """
465
  results = []
466
-
467
- # Try NewsAPI first (best for temporal queries)
 
468
  if self.newsapi and self.newsapi.is_available():
469
  try:
470
- logger.info(f"Live search: trying NewsAPI first for '{query}'")
471
  newsapi_results = await self.newsapi.search(query)
472
-
473
  if newsapi_results:
474
- logger.info(f"NewsAPI: {len(newsapi_results)} results")
 
 
475
  results.extend(newsapi_results)
 
476
  else:
477
- logger.info("NewsAPI: no results, falling back to DuckDuckGo")
478
  except Exception as e:
479
- logger.warning(f"NewsAPI failed: {e}, falling back to DuckDuckGo")
480
-
481
- # Fallback to DuckDuckGo (or primary if NewsAPI not available)
 
 
482
  if not results:
483
  try:
484
- logger.info(f"Live search: using DuckDuckGo for '{query}'")
485
  results = await self.live_search.search(query)
486
- logger.info(f"DuckDuckGo: {len(results)} results")
 
 
 
 
 
 
487
  except Exception as e:
488
- logger.error(f"DuckDuckGo search error: {e}")
489
  return []
490
-
491
  if not results:
492
- logger.warning("No live search results from any source")
493
  return results
494
-
495
- # Step 2: Check if Jina Reader is enabled
 
 
496
  from src.core.config import settings
497
-
498
  if not settings.ENABLE_JINA_READER:
499
- logger.info("Jina Reader disabled - using snippets only")
500
  return results
501
-
502
- # Step 3: Try to enhance with Jina Reader
503
  try:
504
  from src.infrastructure.adapters.jina_reader_adapter import get_jina_reader_adapter
505
-
506
  jina = get_jina_reader_adapter(
507
  timeout=settings.JINA_READER_TIMEOUT,
508
  max_concurrent=settings.JINA_READER_MAX_CONCURRENT
509
  )
510
-
511
- # Step 4: Extract full articles (replaces snippets)
 
 
 
 
512
  enhanced_results = await jina.enhance_search_results(
513
  results,
514
- fallback_to_snippet=True # Keep snippet if Jina fails
515
  )
516
-
517
- # Log enhancement stats
518
  full_articles = sum(1 for r in enhanced_results if r.get("full_article"))
519
  snippets = len(enhanced_results) - full_articles
520
- total_chars = sum(
521
- r.get("content_length", 0)
522
- for r in enhanced_results
523
- if r.get("full_article")
524
- )
525
-
526
  logger.info(
527
- f"Jina enhancement: {full_articles} full articles ({total_chars:,} chars), "
528
- f"{snippets} snippets (fallback)"
 
 
529
  )
530
-
 
 
 
 
531
  return enhanced_results
532
-
533
  except ImportError:
534
- logger.warning("Jina Reader not available - using snippets only")
535
  return results
536
-
537
  except Exception as e:
538
- logger.warning(f"Jina Reader enhancement failed: {e} - using snippets")
539
  return results
 
447
 
448
  async def _execute_live_search(self, query: str) -> List[Dict[str, Any]]:
449
  """
450
+ Execute live search: NewsAPI → DuckDuckGo fallback → Jina Reader enhancement.
 
 
 
 
 
 
 
 
 
 
 
 
 
451
  """
452
  results = []
453
+ source_used = None
454
+
455
+ # ── Step 1: Try NewsAPI first ─────────────────────────────────────────
456
  if self.newsapi and self.newsapi.is_available():
457
  try:
458
+ logger.info(f"[LiveSearch] Step 1/3 β€” NewsAPI search: '{query[:60]}'")
459
  newsapi_results = await self.newsapi.search(query)
 
460
  if newsapi_results:
461
+ logger.info(f"[LiveSearch] NewsAPI returned {len(newsapi_results)} articles")
462
+ for i, r in enumerate(newsapi_results[:3], 1):
463
+ logger.info(f"[LiveSearch] NewsAPI #{i}: [{r.get('published_at','?')[:10]}] {r.get('title','?')[:70]}")
464
  results.extend(newsapi_results)
465
+ source_used = "newsapi"
466
  else:
467
+ logger.info("[LiveSearch] NewsAPI returned 0 results β†’ falling back to DuckDuckGo")
468
  except Exception as e:
469
+ logger.warning(f"[LiveSearch] NewsAPI failed: {e} β†’ falling back to DuckDuckGo")
470
+ else:
471
+ logger.info("[LiveSearch] NewsAPI not available β†’ using DuckDuckGo directly")
472
+
473
+ # ── Step 2: DuckDuckGo fallback ───────────────────────────────────────
474
  if not results:
475
  try:
476
+ logger.info(f"[LiveSearch] Step 1/3 β€” DuckDuckGo search: '{query[:60]}'")
477
  results = await self.live_search.search(query)
478
+ if results:
479
+ logger.info(f"[LiveSearch] DuckDuckGo returned {len(results)} articles")
480
+ for i, r in enumerate(results[:3], 1):
481
+ logger.info(f"[LiveSearch] DDG #{i}: [{r.get('published_at','?')[:10]}] {r.get('title','?')[:70]}")
482
+ source_used = "duckduckgo"
483
+ else:
484
+ logger.warning("[LiveSearch] DuckDuckGo returned 0 results")
485
  except Exception as e:
486
+ logger.error(f"[LiveSearch] DuckDuckGo search error: {e}")
487
  return []
488
+
489
  if not results:
490
+ logger.warning("[LiveSearch] No live search results from any source")
491
  return results
492
+
493
+ logger.info(f"[LiveSearch] Step 2/3 β€” Got {len(results)} raw results from {source_used}")
494
+
495
+ # ── Step 3: Jina Reader article extraction ────────────────────────────
496
  from src.core.config import settings
497
+
498
  if not settings.ENABLE_JINA_READER:
499
+ logger.info("[LiveSearch] Step 3/3 β€” Jina Reader disabled, using snippets only")
500
  return results
501
+
 
502
  try:
503
  from src.infrastructure.adapters.jina_reader_adapter import get_jina_reader_adapter
504
+
505
  jina = get_jina_reader_adapter(
506
  timeout=settings.JINA_READER_TIMEOUT,
507
  max_concurrent=settings.JINA_READER_MAX_CONCURRENT
508
  )
509
+
510
+ logger.info(f"[LiveSearch] Step 3/3 β€” Jina Reader extracting full articles from {len(results)} URLs")
511
+ urls_to_extract = [r.get("url", "?")[:80] for r in results[:5]]
512
+ for i, url in enumerate(urls_to_extract, 1):
513
+ logger.info(f"[LiveSearch] Extracting #{i}: {url}")
514
+
515
  enhanced_results = await jina.enhance_search_results(
516
  results,
517
+ fallback_to_snippet=True
518
  )
519
+
 
520
  full_articles = sum(1 for r in enhanced_results if r.get("full_article"))
521
  snippets = len(enhanced_results) - full_articles
522
+ total_chars = sum(r.get("content_length", 0) for r in enhanced_results if r.get("full_article"))
523
+ avg_chars = total_chars // full_articles if full_articles else 0
524
+
 
 
 
525
  logger.info(
526
+ f"[LiveSearch] Jina extraction complete: "
527
+ f"{full_articles}/{len(enhanced_results)} full articles, "
528
+ f"{snippets} snippets, "
529
+ f"{total_chars:,} total chars (avg {avg_chars:,}/article)"
530
  )
531
+ for i, r in enumerate(enhanced_results[:5], 1):
532
+ status = "FULL" if r.get("full_article") else f"SNIPPET({r.get('jina_error','?')})"
533
+ chars = r.get("content_length", len(r.get("content", "")))
534
+ logger.info(f"[LiveSearch] #{i} [{status}] {chars:,} chars β€” {r.get('title','?')[:60]}")
535
+
536
  return enhanced_results
537
+
538
  except ImportError:
539
+ logger.warning("[LiveSearch] Jina Reader not available β€” using snippets only")
540
  return results
 
541
  except Exception as e:
542
+ logger.warning(f"[LiveSearch] Jina Reader failed: {e} β€” using snippets")
543
  return results
src/core/use_cases/rag_chat_use_case.py CHANGED
@@ -359,11 +359,11 @@ JSON:"""
359
  from src.infrastructure.adapters.entity_extractor import entity_extractor
360
  if entity_extractor:
361
  entities = entity_extractor.extract(query)
362
- logger.info(f"[RAG] Extracted entities:")
363
- print(f" - Locations: {entities.locations}")
364
- print(f" - Organizations: {entities.organizations}")
365
- print(f" - Temporal keywords: {entities.temporal_keywords}")
366
-
367
  # Auto-detect source filter if not provided
368
  if not source_filter:
369
  auto_source = entity_extractor.get_source_filter(entities)
@@ -481,7 +481,11 @@ JSON:"""
481
 
482
  # ── HYBRID SEARCH EXECUTION ────────────────────────────────────────────
483
  if use_hybrid and strategy and (strategy.use_live or strategy.use_db):
484
- logger.info(f"[RAG] Executing hybrid search...")
 
 
 
 
485
 
486
  # Execute hybrid search (parallel live + DB)
487
  try:
@@ -495,21 +499,31 @@ JSON:"""
495
  top_k=per_lang_limit
496
  )
497
 
498
- logger.info(f"[RAG] Hybrid search returned {len(db_results)} DB + {len(live_results)} live results")
 
 
 
499
 
500
  # Merge and rank results
 
501
  all_docs = self.hybrid_ranker.merge_and_rank(
502
  db_results=db_results,
503
  live_results=live_results,
504
  strategy=strategy,
505
  query=expanded_query,
506
- final_top_n=top_k * 3 # Get more candidates for quality filtering
507
  )
508
 
509
- logger.info(f"[RAG] After hybrid ranking: {len(all_docs)} results")
 
 
 
 
 
 
510
 
511
  except Exception as e:
512
- logger.info(f"[RAG] Hybrid search failed: {e} - falling back to traditional pipeline")
513
  use_hybrid = False
514
  all_docs = []
515
 
@@ -616,6 +630,8 @@ JSON:"""
616
  if (d.get("metadata", {}).get("source") or "").lower().replace(" ", "") not in _BLOCKED_SOURCES
617
  ] or all_docs
618
 
 
 
619
  # ── Relevance threshold — drop docs the reranker scored too low ───────
620
  # Raised from 0.15 → 0.25 based on live testing.
621
  # The airport article (bbc_swahili) was scoring ~0.18 on GERD queries
 
359
  from src.infrastructure.adapters.entity_extractor import entity_extractor
360
  if entity_extractor:
361
  entities = entity_extractor.extract(query)
362
+ logger.info(
363
+ f"[RAG] Entities β€” locations={entities.locations} "
364
+ f"orgs={entities.organizations} "
365
+ f"temporal={entities.temporal_keywords}"
366
+ )
367
  # Auto-detect source filter if not provided
368
  if not source_filter:
369
  auto_source = entity_extractor.get_source_filter(entities)
 
481
 
482
  # ── HYBRID SEARCH EXECUTION ────────────────────────────────────────────
483
  if use_hybrid and strategy and (strategy.use_live or strategy.use_db):
484
+ logger.info(
485
+ f"[RAG] ═══ HYBRID SEARCH START ═══ "
486
+ f"live={strategy.use_live} db={strategy.use_db} "
487
+ f"weights={strategy.live_weight:.1f}/{strategy.db_weight:.1f}"
488
+ )
489
 
490
  # Execute hybrid search (parallel live + DB)
491
  try:
 
499
  top_k=per_lang_limit
500
  )
501
 
502
+ logger.info(
503
+ f"[RAG] Hybrid search raw results: "
504
+ f"{len(db_results)} DB docs + {len(live_results)} live docs"
505
+ )
506
 
507
  # Merge and rank results
508
+ logger.info(f"[RAG] Merging and ranking {len(db_results)+len(live_results)} total candidates...")
509
  all_docs = self.hybrid_ranker.merge_and_rank(
510
  db_results=db_results,
511
  live_results=live_results,
512
  strategy=strategy,
513
  query=expanded_query,
514
+ final_top_n=top_k * 3
515
  )
516
 
517
+ logger.info(f"[RAG] After hybrid ranking: {len(all_docs)} docs")
518
+ for i, doc in enumerate(all_docs[:5], 1):
519
+ score = doc.get("rerank_score") or doc.get("score", 0)
520
+ src = doc.get("source") or doc.get("metadata", {}).get("source", "?")
521
+ stype = doc.get("source_type", "db")
522
+ title = doc.get("title") or doc.get("content", "")[:60]
523
+ logger.info(f"[RAG] Merged #{i}: score={score:.3f} [{stype}] src={src} β€” {title[:60]}")
524
 
525
  except Exception as e:
526
+ logger.warning(f"[RAG] Hybrid search failed: {e} β€” falling back to traditional pipeline")
527
  use_hybrid = False
528
  all_docs = []
529
 
 
630
  if (d.get("metadata", {}).get("source") or "").lower().replace(" ", "") not in _BLOCKED_SOURCES
631
  ] or all_docs
632
 
633
+ logger.info(f"[RAG] After blocked-source filter: {len(quality_docs)} docs")
634
+
635
  # ── Relevance threshold — drop docs the reranker scored too low ───────
636
  # Raised from 0.15 → 0.25 based on live testing.
637
  # The airport article (bbc_swahili) was scoring ~0.18 on GERD queries
src/infrastructure/adapters/jina_reranker_adapter.py CHANGED
@@ -75,7 +75,6 @@ class JinaRerankerAPIAdapter(RerankerPort):
75
  ) -> List[Dict[str, Any]]:
76
  """
77
  Rerank documents using Jina API.
78
-
79
  Sends all docs in one request — Jina returns them sorted by relevance.
80
  Falls back to vector score ordering if API unavailable.
81
  """
@@ -83,10 +82,9 @@ class JinaRerankerAPIAdapter(RerankerPort):
83
  return []
84
 
85
  if not self.api_key:
86
- logger.warning("Jina Reranker API disabled β€” falling back to score ordering")
87
  return sorted(docs, key=lambda x: x.get("score", 0), reverse=True)[:top_n]
88
 
89
- # Extract text content — truncate to 2048 chars (Jina handles tokenization)
90
  MAX_CHARS = 2048
91
  valid_docs = []
92
  doc_texts = []
@@ -99,6 +97,14 @@ class JinaRerankerAPIAdapter(RerankerPort):
99
  if not doc_texts:
100
  return []
101
 
 
 
 
 
 
 
 
 
102
  t0 = time.time()
103
  try:
104
  response = self._get_client().post(
@@ -107,8 +113,8 @@ class JinaRerankerAPIAdapter(RerankerPort):
107
  "model": self.model,
108
  "query": query,
109
  "documents": doc_texts,
110
- "top_n": len(doc_texts), # Get all scores, we'll slice ourselves
111
- "return_documents": False, # Save tokens — we already have the docs
112
  }
113
  )
114
 
@@ -119,42 +125,40 @@ class JinaRerankerAPIAdapter(RerankerPort):
119
  results = data.get("results", [])
120
  usage = data.get("usage", {})
121
 
122
- # results = [{"index": int, "relevance_score": float}, ...]
123
- # Restore scores to original docs
124
  for r in results:
125
  idx = r["index"]
126
  if idx < len(valid_docs):
127
  valid_docs[idx]["rerank_score"] = float(r["relevance_score"])
128
 
129
- # Sort by rerank_score descending
130
  valid_docs.sort(key=lambda x: x.get("rerank_score", 0), reverse=True)
131
 
132
  logger.info(
133
- f"[JinaReranker] {len(valid_docs)} docs β†’ top {top_n} "
134
- f"in {elapsed_ms:.0f}ms "
135
- f"(tokens={usage.get('total_tokens', '?')}, "
136
- f"top_score={valid_docs[0].get('rerank_score', 0):.3f})"
137
  )
 
 
 
 
 
 
138
  return valid_docs[:top_n]
139
 
140
  elif response.status_code == 401:
141
- logger.error("Jina Reranker API: Invalid API key")
142
  elif response.status_code == 429:
143
- logger.warning("Jina Reranker API: Rate limit exceeded")
144
  elif response.status_code == 402:
145
- logger.warning("Jina Reranker API: Insufficient tokens β€” top up at jina.ai")
146
  else:
147
- logger.warning(
148
- f"Jina Reranker API: HTTP {response.status_code} β€” {response.text[:200]}"
149
- )
150
 
151
  except httpx.TimeoutException:
152
- logger.warning(f"Jina Reranker API: timeout ({self.timeout}s)")
153
  except Exception as e:
154
- logger.error(f"Jina Reranker API error: {e}")
155
 
156
- # Fallback: sort by vector score
157
- logger.warning("Jina Reranker API failed β€” falling back to vector score ordering")
158
  return sorted(docs, key=lambda x: x.get("score", 0), reverse=True)[:top_n]
159
 
160
  def is_available(self) -> bool:
 
75
  ) -> List[Dict[str, Any]]:
76
  """
77
  Rerank documents using Jina API.
 
78
  Sends all docs in one request — Jina returns them sorted by relevance.
79
  Falls back to vector score ordering if API unavailable.
80
  """
 
82
  return []
83
 
84
  if not self.api_key:
85
+ logger.warning("[JinaReranker] API disabled β€” falling back to score ordering")
86
  return sorted(docs, key=lambda x: x.get("score", 0), reverse=True)[:top_n]
87
 
 
88
  MAX_CHARS = 2048
89
  valid_docs = []
90
  doc_texts = []
 
97
  if not doc_texts:
98
  return []
99
 
100
+ logger.info(
101
+ f"[JinaReranker] Reranking {len(valid_docs)} docs for query: '{query[:60]}'"
102
+ )
103
+ for i, doc in enumerate(valid_docs[:5], 1):
104
+ chars = len(doc.get("content", ""))
105
+ src = doc.get("source") or doc.get("metadata", {}).get("source", "?")
106
+ logger.info(f"[JinaReranker] Input #{i}: {chars:,} chars β€” src={src} β€” {doc.get('title', doc.get('content',''))[:60]}")
107
+
108
  t0 = time.time()
109
  try:
110
  response = self._get_client().post(
 
113
  "model": self.model,
114
  "query": query,
115
  "documents": doc_texts,
116
+ "top_n": len(doc_texts),
117
+ "return_documents": False,
118
  }
119
  )
120
 
 
125
  results = data.get("results", [])
126
  usage = data.get("usage", {})
127
 
 
 
128
  for r in results:
129
  idx = r["index"]
130
  if idx < len(valid_docs):
131
  valid_docs[idx]["rerank_score"] = float(r["relevance_score"])
132
 
 
133
  valid_docs.sort(key=lambda x: x.get("rerank_score", 0), reverse=True)
134
 
135
  logger.info(
136
+ f"[JinaReranker] Done in {elapsed_ms:.0f}ms β€” "
137
+ f"{len(valid_docs)} docs ranked, tokens={usage.get('total_tokens', '?')}"
 
 
138
  )
139
+ for i, doc in enumerate(valid_docs[:top_n], 1):
140
+ score = doc.get("rerank_score", 0)
141
+ src = doc.get("source") or doc.get("metadata", {}).get("source", "?")
142
+ title = doc.get("title") or doc.get("content", "")[:60]
143
+ logger.info(f"[JinaReranker] Rank #{i}: score={score:.4f} src={src} β€” {title[:60]}")
144
+
145
  return valid_docs[:top_n]
146
 
147
  elif response.status_code == 401:
148
+ logger.error("[JinaReranker] Invalid API key")
149
  elif response.status_code == 429:
150
+ logger.warning("[JinaReranker] Rate limit exceeded")
151
  elif response.status_code == 402:
152
+ logger.warning("[JinaReranker] Insufficient tokens β€” top up at jina.ai")
153
  else:
154
+ logger.warning(f"[JinaReranker] HTTP {response.status_code} β€” {response.text[:200]}")
 
 
155
 
156
  except httpx.TimeoutException:
157
+ logger.warning(f"[JinaReranker] Timeout ({self.timeout}s)")
158
  except Exception as e:
159
+ logger.error(f"[JinaReranker] Error: {e}")
160
 
161
+ logger.warning("[JinaReranker] API failed β€” falling back to vector score ordering")
 
162
  return sorted(docs, key=lambda x: x.get("score", 0), reverse=True)[:top_n]
163
 
164
  def is_available(self) -> bool: