Spaces:
Running
feat(query-enhancement): upgrade LLM prompt + replace print with logger
Browse files
Query Enhancement Prompt Upgrades:
- Added 'location' field: extracts specific region/city from query
- Added 'query_type' field: conflict|humanitarian|political|economic|social|general
- Added 'search_keywords' field: 3-5 key terms for keyword overlap filter
- Removed NLI DeBERTa fallback (replaced with simple regex fallback)
- Cleaner regex fallback: temporal + source detection without DeBERTa dependency
- Structured log output: [QE] topic=... type=... days_back=... source=... location=...
Logging Improvements:
- Replaced ALL print(f'DEBUG: ...') with logger.info(f'[RAG] ...')
- Fixed broken line: except block return [] was on same line as logger call
- Consistent [RAG] prefix for all pipeline log messages
- Errors use logger.warning() instead of logger.info()
- Server logs now show clean structured output instead of print spam
- src/core/use_cases/rag_chat_use_case.py +124 -117
|
@@ -135,38 +135,52 @@ Document:
|
|
| 135 |
|
| 136 |
def _extract_intents_and_translate(self, query: str) -> Dict[str, Any]:
|
| 137 |
"""
|
| 138 |
-
Single LLM call
|
| 139 |
-
Replaces the previous two-call approach (_extract_intents + _translate_query_to_all_languages).
|
| 140 |
|
| 141 |
Returns:
|
| 142 |
{
|
| 143 |
-
"expanded_query": str,
|
| 144 |
-
"days_back": int | None,
|
| 145 |
-
"source": str | None,
|
| 146 |
-
"
|
| 147 |
-
|
|
|
|
|
|
|
|
|
|
| 148 |
}
|
| 149 |
}
|
| 150 |
Falls back gracefully on any LLM/parse failure.
|
| 151 |
"""
|
| 152 |
import re, json
|
| 153 |
|
| 154 |
-
prompt = f"""You are a search query processor for a multilingual news system.
|
| 155 |
-
|
| 156 |
-
|
| 157 |
-
|
| 158 |
-
|
| 159 |
-
|
| 160 |
-
|
| 161 |
-
|
| 162 |
-
|
| 163 |
-
|
| 164 |
-
|
| 165 |
-
|
| 166 |
-
|
| 167 |
-
|
| 168 |
-
|
| 169 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 170 |
|
| 171 |
Query: "{query}"
|
| 172 |
|
|
@@ -176,6 +190,9 @@ JSON:"""
|
|
| 176 |
"expanded_query": query,
|
| 177 |
"days_back": None,
|
| 178 |
"source": None,
|
|
|
|
|
|
|
|
|
|
| 179 |
"translations": {lang: query for lang in ["ar", "am", "so", "sw", "fr"]},
|
| 180 |
}
|
| 181 |
|
|
@@ -187,68 +204,55 @@ JSON:"""
|
|
| 187 |
|
| 188 |
if isinstance(parsed.get("days_back"), int):
|
| 189 |
result["days_back"] = parsed["days_back"]
|
| 190 |
-
print(f"DEBUG: days_back={parsed['days_back']}")
|
| 191 |
|
| 192 |
if isinstance(parsed.get("source"), str) and parsed["source"]:
|
| 193 |
result["source"] = parsed["source"]
|
| 194 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 195 |
|
| 196 |
if isinstance(parsed.get("topic"), str) and parsed["topic"].strip():
|
| 197 |
result["expanded_query"] = parsed["topic"].strip()
|
| 198 |
-
print(f"DEBUG: expanded_query='{result['expanded_query']}'")
|
| 199 |
|
| 200 |
translations = parsed.get("translations", {})
|
| 201 |
if isinstance(translations, dict):
|
| 202 |
for lang in ["ar", "am", "so", "sw", "fr"]:
|
| 203 |
val = translations.get(lang, "").strip()
|
| 204 |
result["translations"][lang] = val if val else result["expanded_query"]
|
| 205 |
-
print(f"DEBUG: translation [{lang}]: {result['translations'][lang]}")
|
| 206 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 207 |
return result
|
| 208 |
|
| 209 |
except Exception as e:
|
| 210 |
-
|
| 211 |
-
|
| 212 |
-
# ββ
|
| 213 |
-
|
| 214 |
-
|
| 215 |
-
|
| 216 |
-
|
| 217 |
-
|
| 218 |
-
|
| 219 |
-
|
| 220 |
-
|
| 221 |
-
|
| 222 |
-
|
| 223 |
-
|
| 224 |
-
|
| 225 |
-
|
| 226 |
-
"asking about news from this week or past few days",
|
| 227 |
-
"asking about news from this month",
|
| 228 |
-
"asking about news from this year",
|
| 229 |
-
"no specific time period mentioned",
|
| 230 |
-
],
|
| 231 |
-
hypothesis_template="The user is {}.",
|
| 232 |
-
multi_label=False,
|
| 233 |
-
)
|
| 234 |
-
top_label = temporal_result["labels"][0]
|
| 235 |
-
top_score = temporal_result["scores"][0]
|
| 236 |
-
if top_score > 0.4:
|
| 237 |
-
if "today" in top_label or "right now" in top_label:
|
| 238 |
-
result["days_back"] = 1
|
| 239 |
-
elif "yesterday" in top_label:
|
| 240 |
-
result["days_back"] = 2
|
| 241 |
-
elif "week" in top_label or "past few days" in top_label:
|
| 242 |
-
result["days_back"] = 7
|
| 243 |
-
elif "month" in top_label:
|
| 244 |
-
result["days_back"] = 30
|
| 245 |
-
elif "year" in top_label:
|
| 246 |
-
result["days_back"] = 365
|
| 247 |
-
print(f"DEBUG: NLI temporal fallback β days_back={result['days_back']}")
|
| 248 |
-
except Exception as e:
|
| 249 |
-
print(f"DEBUG: NLI fallback also failed: {e}")
|
| 250 |
-
|
| 251 |
-
# ββ Source regex fallback βββββββββββββββββββββββββββββββββββββββββββββ
|
| 252 |
source_match = re.search(
|
| 253 |
r'\b(?:from|on|by|via|source[:\s]+)\s*([A-Z][A-Za-z]+(?:\s[A-Z][A-Za-z]+)?)\b',
|
| 254 |
query
|
|
@@ -256,6 +260,7 @@ JSON:"""
|
|
| 256 |
if source_match:
|
| 257 |
result["source"] = source_match.group(1)
|
| 258 |
|
|
|
|
| 259 |
return result
|
| 260 |
|
| 261 |
def _search_single_language(
|
|
@@ -309,11 +314,11 @@ JSON:"""
|
|
| 309 |
"doc_id": hit.doc_id,
|
| 310 |
})
|
| 311 |
|
| 312 |
-
|
| 313 |
return docs
|
| 314 |
|
| 315 |
except Exception as e:
|
| 316 |
-
|
| 317 |
return []
|
| 318 |
|
| 319 |
async def _build_context(self, query: str, top_k: int, source_filter=None, language_filter=None, days_back=None) -> Tuple[str, List[Dict[str, Any]]]:
|
|
@@ -327,13 +332,13 @@ JSON:"""
|
|
| 327 |
if language_detector:
|
| 328 |
lang_detection = language_detector.detect(query)
|
| 329 |
query_language = lang_detection.language
|
| 330 |
-
|
| 331 |
|
| 332 |
# If query is not in English, we'll handle it in translation step
|
| 333 |
if query_language != "en":
|
| 334 |
-
|
| 335 |
except Exception as e:
|
| 336 |
-
|
| 337 |
|
| 338 |
# Expand query if needed (typo fix, short query expansion)
|
| 339 |
try:
|
|
@@ -341,20 +346,20 @@ JSON:"""
|
|
| 341 |
if query_expander:
|
| 342 |
expansion_result = query_expander.expand(query)
|
| 343 |
if expansion_result.was_expanded:
|
| 344 |
-
|
| 345 |
-
|
| 346 |
query = expansion_result.expanded
|
| 347 |
else:
|
| 348 |
-
|
| 349 |
except Exception as e:
|
| 350 |
-
|
| 351 |
|
| 352 |
# Extract entities for better filtering
|
| 353 |
try:
|
| 354 |
from src.infrastructure.adapters.entity_extractor import entity_extractor
|
| 355 |
if entity_extractor:
|
| 356 |
entities = entity_extractor.extract(query)
|
| 357 |
-
|
| 358 |
print(f" - Locations: {entities.locations}")
|
| 359 |
print(f" - Organizations: {entities.organizations}")
|
| 360 |
print(f" - Temporal keywords: {entities.temporal_keywords}")
|
|
@@ -364,9 +369,9 @@ JSON:"""
|
|
| 364 |
auto_source = entity_extractor.get_source_filter(entities)
|
| 365 |
if auto_source:
|
| 366 |
source_filter = auto_source
|
| 367 |
-
|
| 368 |
except Exception as e:
|
| 369 |
-
|
| 370 |
|
| 371 |
# ββ Step 1: Single LLM call β intent extraction + multilingual translation ββ
|
| 372 |
expanded_query = query
|
|
@@ -380,7 +385,7 @@ JSON:"""
|
|
| 380 |
use_hybrid = self.orchestrator is not None and self.hybrid_ranker is not None
|
| 381 |
|
| 382 |
if use_hybrid:
|
| 383 |
-
|
| 384 |
|
| 385 |
# Classify intent using v2 (production-grade) or v1 (fallback)
|
| 386 |
# Check Redis cache first to avoid 8-11s DeBERTa inference on repeat queries
|
|
@@ -390,7 +395,7 @@ JSON:"""
|
|
| 390 |
if self.cache:
|
| 391 |
cached_intent = self.cache.get(intent_cache_key)
|
| 392 |
if cached_intent:
|
| 393 |
-
|
| 394 |
# Reconstruct a minimal intent result from cache
|
| 395 |
class _CachedIntent:
|
| 396 |
def __init__(self, d):
|
|
@@ -400,17 +405,19 @@ JSON:"""
|
|
| 400 |
self.inference_time_ms = 0.0
|
| 401 |
intent_result = _CachedIntent(cached_intent)
|
| 402 |
intent = "NEWS" if intent_result.intent != "OTHER" else "OTHER"
|
| 403 |
-
|
| 404 |
|
| 405 |
if intent_result is None:
|
| 406 |
if self.use_v2_classifier and self.intent_classifier_v2:
|
| 407 |
intent_result = self.intent_classifier_v2.classify(query)
|
| 408 |
intent = "NEWS" if intent_result.intent != "OTHER" else "OTHER"
|
| 409 |
|
| 410 |
-
|
| 411 |
-
|
| 412 |
-
|
| 413 |
-
|
|
|
|
|
|
|
| 414 |
|
| 415 |
# Cache intent result for 1 hour (same query = same intent)
|
| 416 |
if self.cache:
|
|
@@ -422,27 +429,27 @@ JSON:"""
|
|
| 422 |
else:
|
| 423 |
intent = self.intent_classifier.classify(query)
|
| 424 |
intent_result = None
|
| 425 |
-
|
| 426 |
|
| 427 |
# Decide search strategy (pass full intent_result for v2)
|
| 428 |
strategy = self.orchestrator.decide_search_strategy(query, intent, intent_result)
|
| 429 |
-
|
| 430 |
|
| 431 |
# If intent is OTHER (small talk), skip search entirely
|
| 432 |
if intent == "OTHER":
|
| 433 |
-
|
| 434 |
return "", []
|
| 435 |
else:
|
| 436 |
-
|
| 437 |
use_hybrid = False
|
| 438 |
strategy = None
|
| 439 |
|
| 440 |
if actual_language_filter:
|
| 441 |
# Explicit language override β single-language mode, no translation needed
|
| 442 |
-
|
| 443 |
lang_sparse_queries: Dict[str, str] = {actual_language_filter: expanded_query}
|
| 444 |
else:
|
| 445 |
-
|
| 446 |
combined = self._extract_intents_and_translate(query)
|
| 447 |
|
| 448 |
if combined.get("days_back") and isinstance(combined["days_back"], int):
|
|
@@ -466,7 +473,7 @@ JSON:"""
|
|
| 466 |
|
| 467 |
# ββ HYBRID SEARCH EXECUTION ββββββββββββββββββββββββββββββββββββββββββββ
|
| 468 |
if use_hybrid and strategy and (strategy.use_live or strategy.use_db):
|
| 469 |
-
|
| 470 |
|
| 471 |
# Execute hybrid search (parallel live + DB)
|
| 472 |
try:
|
|
@@ -480,7 +487,7 @@ JSON:"""
|
|
| 480 |
top_k=per_lang_limit
|
| 481 |
)
|
| 482 |
|
| 483 |
-
|
| 484 |
|
| 485 |
# Merge and rank results
|
| 486 |
all_docs = self.hybrid_ranker.merge_and_rank(
|
|
@@ -491,24 +498,24 @@ JSON:"""
|
|
| 491 |
final_top_n=top_k * 3 # Get more candidates for quality filtering
|
| 492 |
)
|
| 493 |
|
| 494 |
-
|
| 495 |
|
| 496 |
except Exception as e:
|
| 497 |
-
|
| 498 |
use_hybrid = False
|
| 499 |
all_docs = []
|
| 500 |
|
| 501 |
# ββ TRADITIONAL PIPELINE (fallback or when hybrid disabled) ββββββββββββ
|
| 502 |
if not use_hybrid or not all_docs:
|
| 503 |
-
|
| 504 |
|
| 505 |
# ββ Step 2: Compute dense vector ONCE from English query ββββββββββββββ
|
| 506 |
# BGE-M3 dense space is language-agnostic β one English dense vector
|
| 507 |
# semantically matches content in all 6 languages.
|
| 508 |
-
|
| 509 |
english_vectors = self.embedder.encode_query(expanded_query)
|
| 510 |
dense_vec: List[float] = english_vectors["dense"]
|
| 511 |
-
|
| 512 |
|
| 513 |
# ββ Step 3: Batch sparse encoding β ONE forward pass for all languages β
|
| 514 |
# BGE-M3 holds the GIL during inference. ThreadPoolExecutor gives zero
|
|
@@ -518,7 +525,7 @@ JSON:"""
|
|
| 518 |
lang_codes = list(lang_sparse_queries.keys())
|
| 519 |
lang_texts = [lang_sparse_queries[lc] for lc in lang_codes]
|
| 520 |
|
| 521 |
-
|
| 522 |
sparse_results = self.embedder.encode_sparse_batch(lang_texts)
|
| 523 |
|
| 524 |
lang_sparse_vecs: Dict[str, Optional[Dict]] = {}
|
|
@@ -526,7 +533,7 @@ JSON:"""
|
|
| 526 |
sparse = result.get("sparse")
|
| 527 |
lang_sparse_vecs[lc] = sparse
|
| 528 |
token_count = len(sparse["indices"]) if sparse else 0
|
| 529 |
-
|
| 530 |
|
| 531 |
# ββ Step 4: Fan out to Qdrant β 6 parallel searches ββββββββββββββββββ
|
| 532 |
# Each lane: shared dense_vec + language-specific sparse_vec + language filter
|
|
@@ -559,14 +566,14 @@ JSON:"""
|
|
| 559 |
seen_doc_ids.add(doc_id)
|
| 560 |
all_docs.append(doc)
|
| 561 |
except Exception as e:
|
| 562 |
-
|
| 563 |
|
| 564 |
-
|
| 565 |
|
| 566 |
# ββ Step 5: Temporal fallback β retry without date filter if zero results ββ
|
| 567 |
self._temporal_fallback_used = False
|
| 568 |
if len(all_docs) == 0 and days_back is not None:
|
| 569 |
-
|
| 570 |
with concurrent.futures.ThreadPoolExecutor(max_workers=len(lang_sparse_queries)) as executor:
|
| 571 |
futures = {
|
| 572 |
executor.submit(_run_search, lc, None): lc
|
|
@@ -583,9 +590,9 @@ JSON:"""
|
|
| 583 |
seen_doc_ids.add(doc_id)
|
| 584 |
all_docs.append(doc)
|
| 585 |
except Exception as e:
|
| 586 |
-
|
| 587 |
self._temporal_fallback_used = True
|
| 588 |
-
|
| 589 |
|
| 590 |
# ββ Step 6: Multilingual reranking ββββββββββββββββββββββββββββββββββββ
|
| 591 |
# bge-reranker-v2-m3 scores (English query, any-language content) natively
|
|
@@ -610,11 +617,11 @@ JSON:"""
|
|
| 610 |
above_threshold = [d for d in quality_docs if d.get("rerank_score", 1.0) >= RERANK_THRESHOLD]
|
| 611 |
if above_threshold:
|
| 612 |
quality_docs = above_threshold
|
| 613 |
-
|
| 614 |
else:
|
| 615 |
# All scores low β keep top 3 anyway rather than returning nothing
|
| 616 |
quality_docs = quality_docs[:3]
|
| 617 |
-
|
| 618 |
|
| 619 |
# ββ Keyword overlap filter β soft filter, keeps docs with ANY query term β
|
| 620 |
# Only drops docs with ZERO overlap AND low rerank score.
|
|
@@ -643,7 +650,7 @@ JSON:"""
|
|
| 643 |
overlapping = [d for d in quality_docs if _has_overlap(d)]
|
| 644 |
if overlapping:
|
| 645 |
quality_docs = overlapping
|
| 646 |
-
|
| 647 |
else:
|
| 648 |
# No overlap at all β keep top 5 by score rather than dropping everything
|
| 649 |
quality_docs = sorted(
|
|
@@ -651,7 +658,7 @@ JSON:"""
|
|
| 651 |
key=lambda d: d.get("rerank_score") or d.get("score", 0),
|
| 652 |
reverse=True
|
| 653 |
)[:5]
|
| 654 |
-
|
| 655 |
|
| 656 |
# Guarantee at least 1 non-English result if available
|
| 657 |
non_english = [d for d in quality_docs if d.get("metadata", {}).get("_search_lang", "en") != "en"]
|
|
@@ -672,7 +679,7 @@ JSON:"""
|
|
| 672 |
deduped_final.append(d)
|
| 673 |
|
| 674 |
langs_in_result = list({d.get("metadata", {}).get("_search_lang", "en") for d in deduped_final})
|
| 675 |
-
|
| 676 |
|
| 677 |
# ββ Step 8: Token limitation ββββββββββββββββββββββββββββββββββββββββββ
|
| 678 |
return self._limit_context(query, deduped_final)
|
|
@@ -703,12 +710,12 @@ JSON:"""
|
|
| 703 |
}
|
| 704 |
|
| 705 |
async def execute_chat(self, request: ChatRequest) -> Dict[str, Any]:
|
| 706 |
-
|
| 707 |
# Generate a unique session ID if none provided β never use a shared fallback
|
| 708 |
if not request.session_id:
|
| 709 |
import uuid
|
| 710 |
request.session_id = str(uuid.uuid4())
|
| 711 |
-
|
| 712 |
session_id = request.session_id
|
| 713 |
|
| 714 |
# ββ Layer 1: Full Response Cache (5 min TTL) ββββββββββββββββββββββββββ
|
|
@@ -869,7 +876,7 @@ Answer:"""
|
|
| 869 |
result,
|
| 870 |
expiration=settings.CACHE_RESPONSE_TTL
|
| 871 |
)
|
| 872 |
-
|
| 873 |
|
| 874 |
return result
|
| 875 |
|
|
@@ -878,7 +885,7 @@ Answer:"""
|
|
| 878 |
if not request.session_id:
|
| 879 |
import uuid
|
| 880 |
request.session_id = str(uuid.uuid4())
|
| 881 |
-
|
| 882 |
session_id = request.session_id
|
| 883 |
history_text = "" if is_guest else self._get_history_text(session_id)
|
| 884 |
context_text, final_sources = await self._build_context(
|
|
|
|
| 135 |
|
| 136 |
def _extract_intents_and_translate(self, query: str) -> Dict[str, Any]:
|
| 137 |
"""
|
| 138 |
+
Single LLM call: query understanding + multilingual translation.
|
|
|
|
| 139 |
|
| 140 |
Returns:
|
| 141 |
{
|
| 142 |
+
"expanded_query": str, # cleaned, specific English topic
|
| 143 |
+
"days_back": int | None, # temporal filter (1=today, 7=week, 30=month, 365=year)
|
| 144 |
+
"source": str | None, # news outlet filter if mentioned
|
| 145 |
+
"location": str | None, # specific location/region if mentioned
|
| 146 |
+
"query_type": str, # "conflict" | "humanitarian" | "political" | "economic" | "social" | "general"
|
| 147 |
+
"search_keywords": list, # 3-5 key terms for keyword overlap filter
|
| 148 |
+
"translations": { # per-language queries for Qdrant
|
| 149 |
+
"ar": str, "am": str, "so": str, "sw": str, "fr": str
|
| 150 |
}
|
| 151 |
}
|
| 152 |
Falls back gracefully on any LLM/parse failure.
|
| 153 |
"""
|
| 154 |
import re, json
|
| 155 |
|
| 156 |
+
prompt = f"""You are a search query processor for a multilingual Ethiopia/Africa news system.
|
| 157 |
+
Analyze the query and output ONLY valid JSON with these exact fields:
|
| 158 |
+
|
| 159 |
+
{{
|
| 160 |
+
"days_back": <integer or null>,
|
| 161 |
+
"source": <string or null>,
|
| 162 |
+
"location": <string or null>,
|
| 163 |
+
"query_type": <string>,
|
| 164 |
+
"topic": <string>,
|
| 165 |
+
"search_keywords": [<string>, ...],
|
| 166 |
+
"translations": {{"ar": <string>, "am": <string>, "so": <string>, "sw": <string>, "fr": <string>}}
|
| 167 |
+
}}
|
| 168 |
+
|
| 169 |
+
Field rules:
|
| 170 |
+
- "days_back": 1=today/now/tonight/breaking, 2=yesterday, 7=this week/recently/past few days, 30=this month, 365=this year, null=no time reference
|
| 171 |
+
- "source": news outlet name ONLY if explicitly mentioned (e.g. "BBC", "Reuters", "Al Jazeera"), else null
|
| 172 |
+
- "location": specific place if mentioned (e.g. "Amhara", "Tigray", "Addis Ababa", "Somalia"), else null
|
| 173 |
+
- "query_type": ONE of: "conflict" | "humanitarian" | "political" | "economic" | "social" | "general"
|
| 174 |
+
* conflict = fighting, clashes, attacks, military, armed groups
|
| 175 |
+
* humanitarian = displaced, refugees, aid, famine, drought, flood
|
| 176 |
+
* political = elections, government, diplomacy, policy, leaders
|
| 177 |
+
* economic = economy, trade, investment, inflation, development
|
| 178 |
+
* social = education, health, culture, religion, sports
|
| 179 |
+
* general = anything else
|
| 180 |
+
- "topic": the SPECIFIC search topic β fix typos, remove time/source words, keep named entities EXACT.
|
| 181 |
+
"Ethiopia peace talks" stays "Ethiopia peace talks". Do NOT broaden or generalize.
|
| 182 |
+
- "search_keywords": 3-5 key terms from the topic for keyword matching (lowercase, no stopwords)
|
| 183 |
+
- "translations": translate "topic" into each language. Keep named entities in local spelling. Use English if uncertain.
|
| 184 |
|
| 185 |
Query: "{query}"
|
| 186 |
|
|
|
|
| 190 |
"expanded_query": query,
|
| 191 |
"days_back": None,
|
| 192 |
"source": None,
|
| 193 |
+
"location": None,
|
| 194 |
+
"query_type": "general",
|
| 195 |
+
"search_keywords": [],
|
| 196 |
"translations": {lang: query for lang in ["ar", "am", "so", "sw", "fr"]},
|
| 197 |
}
|
| 198 |
|
|
|
|
| 204 |
|
| 205 |
if isinstance(parsed.get("days_back"), int):
|
| 206 |
result["days_back"] = parsed["days_back"]
|
|
|
|
| 207 |
|
| 208 |
if isinstance(parsed.get("source"), str) and parsed["source"]:
|
| 209 |
result["source"] = parsed["source"]
|
| 210 |
+
|
| 211 |
+
if isinstance(parsed.get("location"), str) and parsed["location"]:
|
| 212 |
+
result["location"] = parsed["location"]
|
| 213 |
+
|
| 214 |
+
if isinstance(parsed.get("query_type"), str) and parsed["query_type"]:
|
| 215 |
+
result["query_type"] = parsed["query_type"]
|
| 216 |
+
|
| 217 |
+
if isinstance(parsed.get("search_keywords"), list):
|
| 218 |
+
result["search_keywords"] = [k for k in parsed["search_keywords"] if isinstance(k, str)][:5]
|
| 219 |
|
| 220 |
if isinstance(parsed.get("topic"), str) and parsed["topic"].strip():
|
| 221 |
result["expanded_query"] = parsed["topic"].strip()
|
|
|
|
| 222 |
|
| 223 |
translations = parsed.get("translations", {})
|
| 224 |
if isinstance(translations, dict):
|
| 225 |
for lang in ["ar", "am", "so", "sw", "fr"]:
|
| 226 |
val = translations.get(lang, "").strip()
|
| 227 |
result["translations"][lang] = val if val else result["expanded_query"]
|
|
|
|
| 228 |
|
| 229 |
+
logger.info(
|
| 230 |
+
f"[QE] topic='{result['expanded_query']}' "
|
| 231 |
+
f"type={result['query_type']} "
|
| 232 |
+
f"days_back={result['days_back']} "
|
| 233 |
+
f"source={result['source']} "
|
| 234 |
+
f"location={result['location']} "
|
| 235 |
+
f"keywords={result['search_keywords']}"
|
| 236 |
+
)
|
| 237 |
return result
|
| 238 |
|
| 239 |
except Exception as e:
|
| 240 |
+
logger.warning(f"[QE] LLM call failed: {e} β using regex fallback")
|
| 241 |
+
|
| 242 |
+
# ββ Regex fallback for temporal + source βββββββββββββββββββββββββββββ
|
| 243 |
+
query_lower = query.lower()
|
| 244 |
+
|
| 245 |
+
# Temporal fallback
|
| 246 |
+
if any(w in query_lower for w in ("today", "tonight", "now", "breaking", "just")):
|
| 247 |
+
result["days_back"] = 1
|
| 248 |
+
elif "yesterday" in query_lower:
|
| 249 |
+
result["days_back"] = 2
|
| 250 |
+
elif any(w in query_lower for w in ("this week", "recently", "past few days")):
|
| 251 |
+
result["days_back"] = 7
|
| 252 |
+
elif "this month" in query_lower:
|
| 253 |
+
result["days_back"] = 30
|
| 254 |
+
|
| 255 |
+
# Source fallback
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 256 |
source_match = re.search(
|
| 257 |
r'\b(?:from|on|by|via|source[:\s]+)\s*([A-Z][A-Za-z]+(?:\s[A-Z][A-Za-z]+)?)\b',
|
| 258 |
query
|
|
|
|
| 260 |
if source_match:
|
| 261 |
result["source"] = source_match.group(1)
|
| 262 |
|
| 263 |
+
logger.info(f"[QE] Regex fallback: days_back={result['days_back']} source={result['source']}")
|
| 264 |
return result
|
| 265 |
|
| 266 |
def _search_single_language(
|
|
|
|
| 314 |
"doc_id": hit.doc_id,
|
| 315 |
})
|
| 316 |
|
| 317 |
+
logger.info(f"[RAG] [{lang_code}] search returned {len(docs)} results")
|
| 318 |
return docs
|
| 319 |
|
| 320 |
except Exception as e:
|
| 321 |
+
logger.warning(f"[RAG] [{lang_code}] search failed: {e}")
|
| 322 |
return []
|
| 323 |
|
| 324 |
async def _build_context(self, query: str, top_k: int, source_filter=None, language_filter=None, days_back=None) -> Tuple[str, List[Dict[str, Any]]]:
|
|
|
|
| 332 |
if language_detector:
|
| 333 |
lang_detection = language_detector.detect(query)
|
| 334 |
query_language = lang_detection.language
|
| 335 |
+
logger.info(f"[RAG] Detected language: {query_language} (confidence={lang_detection.confidence:.2f}, method={lang_detection.method})")
|
| 336 |
|
| 337 |
# If query is not in English, we'll handle it in translation step
|
| 338 |
if query_language != "en":
|
| 339 |
+
logger.info(f"[RAG] Non-English query detected, will translate to English for processing")
|
| 340 |
except Exception as e:
|
| 341 |
+
logger.info(f"[RAG] Language detection failed: {e}, assuming English")
|
| 342 |
|
| 343 |
# Expand query if needed (typo fix, short query expansion)
|
| 344 |
try:
|
|
|
|
| 346 |
if query_expander:
|
| 347 |
expansion_result = query_expander.expand(query)
|
| 348 |
if expansion_result.was_expanded:
|
| 349 |
+
logger.info(f"[RAG] Query expanded: '{query}' β '{expansion_result.expanded}'")
|
| 350 |
+
logger.info(f"[RAG] Expansion reason: {expansion_result.expansion_reason}")
|
| 351 |
query = expansion_result.expanded
|
| 352 |
else:
|
| 353 |
+
logger.info(f"[RAG] Query not expanded: {expansion_result.expansion_reason}")
|
| 354 |
except Exception as e:
|
| 355 |
+
logger.info(f"[RAG] Query expansion failed: {e}, using original query")
|
| 356 |
|
| 357 |
# Extract entities for better filtering
|
| 358 |
try:
|
| 359 |
from src.infrastructure.adapters.entity_extractor import entity_extractor
|
| 360 |
if entity_extractor:
|
| 361 |
entities = entity_extractor.extract(query)
|
| 362 |
+
logger.info(f"[RAG] Extracted entities:")
|
| 363 |
print(f" - Locations: {entities.locations}")
|
| 364 |
print(f" - Organizations: {entities.organizations}")
|
| 365 |
print(f" - Temporal keywords: {entities.temporal_keywords}")
|
|
|
|
| 369 |
auto_source = entity_extractor.get_source_filter(entities)
|
| 370 |
if auto_source:
|
| 371 |
source_filter = auto_source
|
| 372 |
+
logger.info(f"[RAG] Auto-detected source filter: {source_filter}")
|
| 373 |
except Exception as e:
|
| 374 |
+
logger.info(f"[RAG] Entity extraction failed: {e}")
|
| 375 |
|
| 376 |
# ── Step 1: Single LLM call — intent extraction + multilingual translation ──
|
| 377 |
expanded_query = query
|
|
|
|
| 385 |
use_hybrid = self.orchestrator is not None and self.hybrid_ranker is not None
|
| 386 |
|
| 387 |
if use_hybrid:
|
| 388 |
+
logger.info(f"[RAG] Hybrid search enabled - checking intent and strategy")
|
| 389 |
|
| 390 |
# Classify intent using v2 (production-grade) or v1 (fallback)
|
| 391 |
# Check Redis cache first to avoid 8-11s DeBERTa inference on repeat queries
|
|
|
|
| 395 |
if self.cache:
|
| 396 |
cached_intent = self.cache.get(intent_cache_key)
|
| 397 |
if cached_intent:
|
| 398 |
+
logger.info(f"[RAG] Intent cache HIT β skipping DeBERTa inference")
|
| 399 |
# Reconstruct a minimal intent result from cache
|
| 400 |
class _CachedIntent:
|
| 401 |
def __init__(self, d):
|
|
|
|
| 405 |
self.inference_time_ms = 0.0
|
| 406 |
intent_result = _CachedIntent(cached_intent)
|
| 407 |
intent = "NEWS" if intent_result.intent != "OTHER" else "OTHER"
|
| 408 |
+
logger.info(f"[RAG] Intent (cached): {intent_result.intent} (confidence={intent_result.confidence:.2f})")
|
| 409 |
|
| 410 |
if intent_result is None:
|
| 411 |
if self.use_v2_classifier and self.intent_classifier_v2:
|
| 412 |
intent_result = self.intent_classifier_v2.classify(query)
|
| 413 |
intent = "NEWS" if intent_result.intent != "OTHER" else "OTHER"
|
| 414 |
|
| 415 |
+
logger.info(
|
| 416 |
+
f"[RAG] Intent v2: {intent_result.intent} "
|
| 417 |
+
f"conf={intent_result.confidence:.2f} "
|
| 418 |
+
f"method={intent_result.method} "
|
| 419 |
+
f"time={intent_result.inference_time_ms:.1f}ms"
|
| 420 |
+
)
|
| 421 |
|
| 422 |
# Cache intent result for 1 hour (same query = same intent)
|
| 423 |
if self.cache:
|
|
|
|
| 429 |
else:
|
| 430 |
intent = self.intent_classifier.classify(query)
|
| 431 |
intent_result = None
|
| 432 |
+
logger.info(f"[RAG] Intent classification v1: {intent}")
|
| 433 |
|
| 434 |
# Decide search strategy (pass full intent_result for v2)
|
| 435 |
strategy = self.orchestrator.decide_search_strategy(query, intent, intent_result)
|
| 436 |
+
logger.info(f"[RAG] Search strategy: {strategy}")
|
| 437 |
|
| 438 |
# If intent is OTHER (small talk), skip search entirely
|
| 439 |
if intent == "OTHER":
|
| 440 |
+
logger.info(f"[RAG] Small talk detected - skipping search")
|
| 441 |
return "", []
|
| 442 |
else:
|
| 443 |
+
logger.info(f"[RAG] Hybrid search disabled - using traditional pipeline")
|
| 444 |
use_hybrid = False
|
| 445 |
strategy = None
|
| 446 |
|
| 447 |
if actual_language_filter:
|
| 448 |
# Explicit language override — single-language mode, no translation needed
|
| 449 |
+
logger.info(f"[RAG] Language filter '{actual_language_filter}' β single-language mode")
|
| 450 |
lang_sparse_queries: Dict[str, str] = {actual_language_filter: expanded_query}
|
| 451 |
else:
|
| 452 |
+
logger.info(f"[RAG] Running combined intent extraction + translation...")
|
| 453 |
combined = self._extract_intents_and_translate(query)
|
| 454 |
|
| 455 |
if combined.get("days_back") and isinstance(combined["days_back"], int):
|
|
|
|
| 473 |
|
| 474 |
# ββ HYBRID SEARCH EXECUTION ββββββββββββββββββββββββββββββββββββββββββββ
|
| 475 |
if use_hybrid and strategy and (strategy.use_live or strategy.use_db):
|
| 476 |
+
logger.info(f"[RAG] Executing hybrid search...")
|
| 477 |
|
| 478 |
# Execute hybrid search (parallel live + DB)
|
| 479 |
try:
|
|
|
|
| 487 |
top_k=per_lang_limit
|
| 488 |
)
|
| 489 |
|
| 490 |
+
logger.info(f"[RAG] Hybrid search returned {len(db_results)} DB + {len(live_results)} live results")
|
| 491 |
|
| 492 |
# Merge and rank results
|
| 493 |
all_docs = self.hybrid_ranker.merge_and_rank(
|
|
|
|
| 498 |
final_top_n=top_k * 3 # Get more candidates for quality filtering
|
| 499 |
)
|
| 500 |
|
| 501 |
+
logger.info(f"[RAG] After hybrid ranking: {len(all_docs)} results")
|
| 502 |
|
| 503 |
except Exception as e:
|
| 504 |
+
logger.info(f"[RAG] Hybrid search failed: {e} - falling back to traditional pipeline")
|
| 505 |
use_hybrid = False
|
| 506 |
all_docs = []
|
| 507 |
|
| 508 |
# ββ TRADITIONAL PIPELINE (fallback or when hybrid disabled) ββββββββββββ
|
| 509 |
if not use_hybrid or not all_docs:
|
| 510 |
+
logger.info(f"[RAG] Using traditional multilingual pipeline")
|
| 511 |
|
| 512 |
# ββ Step 2: Compute dense vector ONCE from English query ββββββββββββββ
|
| 513 |
# BGE-M3 dense space is language-agnostic — one English dense vector
|
| 514 |
# semantically matches content in all 6 languages.
|
| 515 |
+
logger.info(f"[RAG] Computing dense vector for: '{expanded_query}'")
|
| 516 |
english_vectors = self.embedder.encode_query(expanded_query)
|
| 517 |
dense_vec: List[float] = english_vectors["dense"]
|
| 518 |
+
logger.info(f"[RAG] Dense vector ready ({len(dense_vec)} dims)")
|
| 519 |
|
| 520 |
# ββ Step 3: Batch sparse encoding β ONE forward pass for all languages β
|
| 521 |
# BGE-M3 holds the GIL during inference. ThreadPoolExecutor gives zero
|
|
|
|
| 525 |
lang_codes = list(lang_sparse_queries.keys())
|
| 526 |
lang_texts = [lang_sparse_queries[lc] for lc in lang_codes]
|
| 527 |
|
| 528 |
+
logger.info(f"[RAG] Batch sparse encoding {len(lang_texts)} language queries...")
|
| 529 |
sparse_results = self.embedder.encode_sparse_batch(lang_texts)
|
| 530 |
|
| 531 |
lang_sparse_vecs: Dict[str, Optional[Dict]] = {}
|
|
|
|
| 533 |
sparse = result.get("sparse")
|
| 534 |
lang_sparse_vecs[lc] = sparse
|
| 535 |
token_count = len(sparse["indices"]) if sparse else 0
|
| 536 |
+
logger.info(f"[RAG] [{lc}] sparse ready β {token_count} tokens")
|
| 537 |
|
| 538 |
# ββ Step 4: Fan out to Qdrant β 6 parallel searches ββββββββββββββββββ
|
| 539 |
# Each lane: shared dense_vec + language-specific sparse_vec + language filter
|
|
|
|
| 566 |
seen_doc_ids.add(doc_id)
|
| 567 |
all_docs.append(doc)
|
| 568 |
except Exception as e:
|
| 569 |
+
logger.info(f"[RAG] [{lc}] future failed: {e}")
|
| 570 |
|
| 571 |
+
logger.info(f"[RAG] Total pooled candidates after dedup: {len(all_docs)}")
|
| 572 |
|
| 573 |
# ── Step 5: Temporal fallback — retry without date filter if zero results ──
|
| 574 |
self._temporal_fallback_used = False
|
| 575 |
if len(all_docs) == 0 and days_back is not None:
|
| 576 |
+
logger.info(f"[RAG] No results with days_back={days_back} β retrying without temporal filter")
|
| 577 |
with concurrent.futures.ThreadPoolExecutor(max_workers=len(lang_sparse_queries)) as executor:
|
| 578 |
futures = {
|
| 579 |
executor.submit(_run_search, lc, None): lc
|
|
|
|
| 590 |
seen_doc_ids.add(doc_id)
|
| 591 |
all_docs.append(doc)
|
| 592 |
except Exception as e:
|
| 593 |
+
logger.info(f"[RAG] [{lc}] fallback future failed: {e}")
|
| 594 |
self._temporal_fallback_used = True
|
| 595 |
+
logger.info(f"[RAG] Fallback returned {len(all_docs)} total candidates")
|
| 596 |
|
| 597 |
# ββ Step 6: Multilingual reranking ββββββββββββββββββββββββββββββββββββ
|
| 598 |
# bge-reranker-v2-m3 scores (English query, any-language content) natively
|
|
|
|
| 617 |
above_threshold = [d for d in quality_docs if d.get("rerank_score", 1.0) >= RERANK_THRESHOLD]
|
| 618 |
if above_threshold:
|
| 619 |
quality_docs = above_threshold
|
| 620 |
+
logger.info(f"[RAG] {len(quality_docs)} docs above rerank threshold {RERANK_THRESHOLD}")
|
| 621 |
else:
|
| 622 |
# All scores low — keep top 3 anyway rather than returning nothing
|
| 623 |
quality_docs = quality_docs[:3]
|
| 624 |
+
logger.info(f"[RAG] All docs below threshold β keeping top 3 by rerank score")
|
| 625 |
|
| 626 |
# ββ Keyword overlap filter β soft filter, keeps docs with ANY query term β
|
| 627 |
# Only drops docs with ZERO overlap AND low rerank score.
|
|
|
|
| 650 |
overlapping = [d for d in quality_docs if _has_overlap(d)]
|
| 651 |
if overlapping:
|
| 652 |
quality_docs = overlapping
|
| 653 |
+
logger.info(f"[RAG] {len(quality_docs)} docs after keyword overlap filter")
|
| 654 |
else:
|
| 655 |
# No overlap at all — keep top 5 by score rather than dropping everything
|
| 656 |
quality_docs = sorted(
|
|
|
|
| 658 |
key=lambda d: d.get("rerank_score") or d.get("score", 0),
|
| 659 |
reverse=True
|
| 660 |
)[:5]
|
| 661 |
+
logger.info(f"[RAG] No keyword overlap β keeping top 5 by score ({len(quality_docs)} docs)")
|
| 662 |
|
| 663 |
# Guarantee at least 1 non-English result if available
|
| 664 |
non_english = [d for d in quality_docs if d.get("metadata", {}).get("_search_lang", "en") != "en"]
|
|
|
|
| 679 |
deduped_final.append(d)
|
| 680 |
|
| 681 |
langs_in_result = list({d.get("metadata", {}).get("_search_lang", "en") for d in deduped_final})
|
| 682 |
+
logger.info(f"[RAG] Final {len(deduped_final)} docs β languages: {langs_in_result}")
|
| 683 |
|
| 684 |
# ββ Step 8: Token limitation ββββββββββββββββββββββββββββββββββββββββββ
|
| 685 |
return self._limit_context(query, deduped_final)
|
|
|
|
| 710 |
}
|
| 711 |
|
| 712 |
async def execute_chat(self, request: ChatRequest) -> Dict[str, Any]:
|
| 713 |
+
logger.info(f"[RAG] execute_chat called with query: {request.query}")
|
| 714 |
# Generate a unique session ID if none provided — never use a shared fallback
|
| 715 |
if not request.session_id:
|
| 716 |
import uuid
|
| 717 |
request.session_id = str(uuid.uuid4())
|
| 718 |
+
logger.info(f"[RAG] Generated new session_id: {request.session_id}")
|
| 719 |
session_id = request.session_id
|
| 720 |
|
| 721 |
# ββ Layer 1: Full Response Cache (5 min TTL) ββββββββββββββββββββββββββ
|
|
|
|
| 876 |
result,
|
| 877 |
expiration=settings.CACHE_RESPONSE_TTL
|
| 878 |
)
|
| 879 |
+
logger.info(f"[RAG] Cached full response (TTL={settings.CACHE_RESPONSE_TTL}s)")
|
| 880 |
|
| 881 |
return result
|
| 882 |
|
|
|
|
| 885 |
if not request.session_id:
|
| 886 |
import uuid
|
| 887 |
request.session_id = str(uuid.uuid4())
|
| 888 |
+
logger.info(f"[RAG] Generated new session_id: {request.session_id}")
|
| 889 |
session_id = request.session_id
|
| 890 |
history_text = "" if is_guest else self._get_history_text(session_id)
|
| 891 |
context_text, final_sources = await self._build_context(
|