Spaces:
Building
Building
"""
Query Expander & Rewriter

Improves query quality by:
- Expanding short/vague queries
- Fixing typos
- Adding context
- Clarifying ambiguous queries

Uses the LLM only for short queries (<4 words) to minimize latency.
"""
| import logging | |
| import re | |
| from typing import Dict, Any, Optional | |
| from dataclasses import dataclass | |
| logger = logging.getLogger(__name__) | |
@dataclass
class ExpandedQuery:
    """Result of query expansion.

    Declared as a dataclass so it can be built with keyword arguments
    (``ExpandedQuery(original=..., expanded=..., ...)``) and compared by value.
    """

    # The user's query after surrounding whitespace was stripped.
    original: str
    # The query to search with: typo-fixed and, when applicable, LLM-expanded.
    expanded: str
    # True only when the LLM produced a rewritten query.
    was_expanded: bool
    # Short human-readable explanation of the outcome.
    expansion_reason: str
    # Heuristic score attached by the expander to this result.
    confidence: float
class QueryExpander:
    """
    Expands and rewrites queries for better search results.

    Strategy:
    1. Check if expansion needed (short, vague, typos)
    2. Use LLM to expand (only for queries that need it)
    3. Cache expansions to avoid repeated LLM calls
    """

    # Queries that are too vague and need expansion
    VAGUE_PATTERNS = [
        r"^news$",
        r"^today'?s?\s+news$",
        r"^latest$",
        r"^breaking$",
        r"^updates?$",
        r"^ethiopia$",
        r"^africa$",
    ]

    # Common typos to fix (keys are lowercase with punctuation removed)
    TYPO_FIXES = {
        "ethopia": "ethiopia",
        "etiopia": "ethiopia",
        "ethiopa": "ethiopia",
        "todays": "today's",
        "whats": "what's",
        "wheres": "where's",
        "hows": "how's",
        "breakin": "breaking",
        "lates": "latest",
        "updat": "update",
    }

    # Words whose presence means a short query already carries enough context
    CONTEXT_KEYWORDS = (
        "news", "latest", "today", "breaking",
        "what", "when", "where", "who", "how", "why",
    )

    # Bounds applied to the LLM output; the upper bound matches the prompt's
    # "max 15 words" rule.
    MAX_EXPANDED_WORDS = 15
    MIN_EXPANDED_WORDS = 3

    # Lifetime of a cached expansion, in seconds (1 hour)
    CACHE_TTL_SECONDS = 3600

    def __init__(self, llm_adapter=None, cache=None):
        """
        Initialize query expander.

        Args:
            llm_adapter: LLM adapter for query expansion; must provide
                ``generate(prompt, max_tokens=...)``. May be None, in which
                case queries are only typo-fixed.
            cache: Cache adapter for storing expansions; must provide
                ``get(key)`` and ``set(key, value, expiration=...)``.
                May be None to disable caching.
        """
        self.llm = llm_adapter
        self.cache = cache

    def expand(self, query: str) -> "ExpandedQuery":
        """
        Expand query if needed.

        Args:
            query: Original user query

        Returns:
            ExpandedQuery with original and expanded versions.
            ``was_expanded`` is True only when the LLM actually returned a
            rewritten query.
        """
        original = query.strip()

        # Nothing to fix or expand; never spend an LLM call on an empty query.
        if not original:
            return ExpandedQuery(
                original=original,
                expanded=original,
                was_expanded=False,
                expansion_reason="Empty query",
                confidence=0.0,
            )

        # Step 1: Check cache first
        if self.cache:
            cached = self.cache.get(self._cache_key(original))
            if cached:
                logger.debug("Query expansion cache hit: %s", original)
                return ExpandedQuery(
                    original=original,
                    expanded=cached["expanded"],
                    was_expanded=cached["was_expanded"],
                    expansion_reason=cached["reason"],
                    confidence=cached["confidence"],
                )

        # Step 2: Fix typos first
        fixed_query = self._fix_typos(original)
        if fixed_query != original:
            logger.info("Fixed typos: '%s' → '%s'", original, fixed_query)

        # Step 3: Check if expansion needed
        needs_expansion, reason = self._needs_expansion(fixed_query)
        if not needs_expansion:
            result = ExpandedQuery(
                original=original,
                expanded=fixed_query,
                was_expanded=False,
                expansion_reason="No expansion needed",
                confidence=1.0,
            )
            self._cache_result(original, result)
            return result

        # Step 4: Expand using LLM
        if self.llm:
            try:
                expanded = self._expand_with_llm(fixed_query, reason)
            except Exception as e:
                logger.error("Query expansion failed: %s", e)
            else:
                # _expand_with_llm hands the input back unchanged when the
                # call failed or the output was unusable; that is not an
                # expansion and must not be reported (or cached) as one.
                if expanded != fixed_query:
                    result = ExpandedQuery(
                        original=original,
                        expanded=expanded,
                        was_expanded=True,
                        expansion_reason=reason,
                        confidence=0.85,
                    )
                    logger.info("Expanded query: '%s' → '%s'", original, expanded)
                    self._cache_result(original, result)
                    return result

        # Step 5: Fallback - use fixed query.
        # Deliberately not cached, so a transient LLM failure does not pin
        # the degraded result for the whole cache lifetime.
        return ExpandedQuery(
            original=original,
            expanded=fixed_query,
            was_expanded=False,
            expansion_reason="LLM expansion failed",
            confidence=0.7,
        )

    def _fix_typos(self, query: str) -> str:
        """
        Fix common typos in query.

        Only words listed in TYPO_FIXES are rewritten. Every other word keeps
        its original casing (the proper-noun heuristic relies on it), and
        punctuation around a corrected word is preserved. Runs of whitespace
        collapse to single spaces.
        """
        fixed_words = []
        for word in query.split():
            # Separate leading/trailing punctuation from the word itself.
            prefix, core, suffix = re.match(r"^(\W*)(.*?)(\W*)$", word, re.DOTALL).groups()
            # Drop inner punctuation for matching, so "today's" -> "todays".
            lookup = re.sub(r"[^\w\s]", "", core).lower()
            replacement = self.TYPO_FIXES.get(lookup)
            if replacement is None:
                fixed_words.append(word)
                continue
            # Carry the casing of the misspelled word over to the correction.
            if len(core) > 1 and core.isupper():
                replacement = replacement.upper()
            elif core[0].isupper():
                replacement = replacement[0].upper() + replacement[1:]
            fixed_words.append(prefix + replacement + suffix)
        return " ".join(fixed_words)

    def _needs_expansion(self, query: str) -> tuple[bool, str]:
        """
        Check if query needs expansion.

        Returns:
            (needs_expansion, reason)
        """
        query_lower = query.lower().strip()
        word_count = len(query.split())

        # Check if too vague
        if any(re.match(pattern, query_lower, re.IGNORECASE) for pattern in self.VAGUE_PATTERNS):
            return True, "Vague query"

        # Check if too short (but not a proper noun)
        if word_count <= 2 and not self._is_proper_noun(query):
            return True, "Too short"

        # Check if missing context. Keywords are matched as whole words so
        # that e.g. "show" does not count as "how".
        if word_count <= 3:
            keyword_re = r"\b(?:" + "|".join(re.escape(kw) for kw in self.CONTEXT_KEYWORDS) + r")\b"
            if not re.search(keyword_re, query_lower):
                return True, "Missing context"

        return False, "No expansion needed"

    def _is_proper_noun(self, query: str) -> bool:
        """
        Check if query is a proper noun (location, name, etc.).

        Simple heuristic: every word starts with a capital letter.
        An empty query is not a proper noun.
        """
        words = query.split()
        return bool(words) and all(word[0].isupper() for word in words)

    def _expand_with_llm(self, query: str, reason: str) -> str:
        """
        Expand query using LLM.

        Args:
            query: Query to expand
            reason: Reason for expansion

        Returns:
            Expanded query, truncated to MAX_EXPANDED_WORDS words. The input
            query is returned unchanged when the LLM call raises or yields
            fewer than MIN_EXPANDED_WORDS words.
        """
        prompt = (
            'You are a query expansion assistant for a news search system.\n'
            'Task: Expand this short/vague query into a clear, specific news search query.\n'
            'Rules:\n'
            '1. Keep it concise (max 15 words)\n'
            '2. Add context about what news the user wants\n'
            '3. Preserve the original intent\n'
            '4. Add temporal context if missing (e.g., "latest", "today")\n'
            '5. Make it a natural question or statement\n'
            f'Original query: "{query}"\n'
            f'Reason for expansion: {reason}\n'
            'Expanded query:'
        )
        try:
            expanded = self.llm.generate(prompt, max_tokens=50).strip()
        except Exception as e:
            logger.error("LLM expansion error: %s", e)
            return query

        # Clean up the response (models often wrap the answer in quotes)
        expanded = expanded.strip('"\'')

        # Validate expansion
        words = expanded.split()
        if len(words) > self.MAX_EXPANDED_WORDS:
            # Too long, truncate
            words = words[:self.MAX_EXPANDED_WORDS]
            expanded = " ".join(words)
        if len(words) < self.MIN_EXPANDED_WORDS:
            # Expansion failed, return original
            return query
        return expanded

    @staticmethod
    def _cache_key(original: str) -> str:
        """Build the cache key for a query (case-insensitive)."""
        return f"query_expansion:{original.lower()}"

    def _cache_result(self, original: str, result: "ExpandedQuery"):
        """Cache expansion result under the query's key; no-op without a cache."""
        if not self.cache:
            return
        self.cache.set(
            self._cache_key(original),
            {
                "expanded": result.expanded,
                "was_expanded": result.was_expanded,
                "reason": result.expansion_reason,
                "confidence": result.confidence,
            },
            expiration=self.CACHE_TTL_SECONDS,
        )
# ═══════════════════════════════════════════════════════════════════════════
# SINGLETON INSTANCE
# ═══════════════════════════════════════════════════════════════════════════
# Will be initialized with dependencies in main.py
# Shared module-level instance; remains None until initialize_query_expander() runs.
query_expander: Optional[QueryExpander] = None


def initialize_query_expander(llm_adapter, cache=None):
    """
    Create the module-level QueryExpander and store it in ``query_expander``.

    Args:
        llm_adapter: LLM adapter handed to the QueryExpander constructor.
        cache: Optional cache adapter handed to the QueryExpander constructor.
    """
    global query_expander
    query_expander = QueryExpander(llm_adapter=llm_adapter, cache=cache)
    logger.info("Query expander initialized")