File size: 8,885 Bytes
6246bba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
"""
Query Expander & Rewriter

Improves query quality by:
- Expanding short/vague queries
- Fixing typos
- Adding context
- Clarifying ambiguous queries

Uses LLM only for short queries (<4 words) to minimize latency.
"""

import logging
import re
from typing import Dict, Any, Optional
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class ExpandedQuery:
    """Result of query expansion"""
    original: str
    expanded: str
    was_expanded: bool
    expansion_reason: str
    confidence: float


class QueryExpander:
    """
    Expands and rewrites queries for better search results.
    
    Strategy:
    1. Check if expansion needed (short, vague, typos)
    2. Use LLM to expand (only for queries that need it)
    3. Cache expansions to avoid repeated LLM calls
    """
    
    # Queries that are too vague and need expansion
    VAGUE_PATTERNS = [
        r"^news$",
        r"^today'?s?\s+news$",
        r"^latest$",
        r"^breaking$",
        r"^updates?$",
        r"^ethiopia$",
        r"^africa$",
    ]
    
    # Common typos to fix
    TYPO_FIXES = {
        "ethopia": "ethiopia",
        "etiopia": "ethiopia",
        "ethiopa": "ethiopia",
        "todays": "today's",
        "whats": "what's",
        "wheres": "where's",
        "hows": "how's",
        "breakin": "breaking",
        "lates": "latest",
        "updat": "update",
    }
    
    def __init__(self, llm_adapter=None, cache=None):
        """
        Initialize query expander.
        
        Args:
            llm_adapter: LLM adapter for query expansion
            cache: Cache adapter for storing expansions
        """
        self.llm = llm_adapter
        self.cache = cache
    
    def expand(self, query: str) -> ExpandedQuery:
        """
        Expand query if needed.
        
        Args:
            query: Original user query
        
        Returns:
            ExpandedQuery with original and expanded versions
        """
        original = query.strip()
        
        # Step 1: Check cache first
        if self.cache:
            cache_key = f"query_expansion:{original.lower()}"
            cached = self.cache.get(cache_key)
            if cached:
                logger.debug(f"Query expansion cache hit: {original}")
                return ExpandedQuery(
                    original=original,
                    expanded=cached["expanded"],
                    was_expanded=cached["was_expanded"],
                    expansion_reason=cached["reason"],
                    confidence=cached["confidence"]
                )
        
        # Step 2: Fix typos first
        fixed_query = self._fix_typos(original)
        if fixed_query != original:
            logger.info(f"Fixed typos: '{original}' β†’ '{fixed_query}'")
        
        # Step 3: Check if expansion needed
        needs_expansion, reason = self._needs_expansion(fixed_query)
        
        if not needs_expansion:
            result = ExpandedQuery(
                original=original,
                expanded=fixed_query,
                was_expanded=False,
                expansion_reason="No expansion needed",
                confidence=1.0
            )
            self._cache_result(original, result)
            return result
        
        # Step 4: Expand using LLM
        if self.llm:
            try:
                expanded = self._expand_with_llm(fixed_query, reason)
                result = ExpandedQuery(
                    original=original,
                    expanded=expanded,
                    was_expanded=True,
                    expansion_reason=reason,
                    confidence=0.85
                )
                logger.info(f"Expanded query: '{original}' β†’ '{expanded}'")
                self._cache_result(original, result)
                return result
            except Exception as e:
                logger.error(f"Query expansion failed: {e}")
        
        # Step 5: Fallback - use fixed query
        result = ExpandedQuery(
            original=original,
            expanded=fixed_query,
            was_expanded=False,
            expansion_reason="LLM expansion failed",
            confidence=0.7
        )
        self._cache_result(original, result)
        return result
    
    def _fix_typos(self, query: str) -> str:
        """Fix common typos in query"""
        words = query.lower().split()
        fixed_words = []
        
        for word in words:
            # Remove punctuation for matching
            clean_word = re.sub(r'[^\w\s]', '', word)
            if clean_word in self.TYPO_FIXES:
                fixed_words.append(self.TYPO_FIXES[clean_word])
            else:
                fixed_words.append(word)
        
        return ' '.join(fixed_words)
    
    def _needs_expansion(self, query: str) -> tuple[bool, str]:
        """
        Check if query needs expansion.
        
        Returns:
            (needs_expansion, reason)
        """
        query_lower = query.lower().strip()
        word_count = len(query.split())
        
        # Check if too vague
        for pattern in self.VAGUE_PATTERNS:
            if re.match(pattern, query_lower, re.IGNORECASE):
                return True, "Vague query"
        
        # Check if too short (but not a proper noun)
        if word_count <= 2:
            # Don't expand if it's a location or proper noun
            if not self._is_proper_noun(query):
                return True, "Too short"
        
        # Check if missing context
        if word_count <= 3 and not any(
            kw in query_lower 
            for kw in ["news", "latest", "today", "breaking", "what", "when", "where", "who", "how", "why"]
        ):
            return True, "Missing context"
        
        return False, "No expansion needed"
    
    def _is_proper_noun(self, query: str) -> bool:
        """Check if query is a proper noun (location, name, etc.)"""
        # Simple heuristic: starts with capital letter
        words = query.split()
        return all(word[0].isupper() for word in words if word)
    
    def _expand_with_llm(self, query: str, reason: str) -> str:
        """
        Expand query using LLM.
        
        Args:
            query: Query to expand
            reason: Reason for expansion
        
        Returns:
            Expanded query
        """
        prompt = f"""You are a query expansion assistant for a news search system.

Task: Expand this short/vague query into a clear, specific news search query.

Rules:
1. Keep it concise (max 15 words)
2. Add context about what news the user wants
3. Preserve the original intent
4. Add temporal context if missing (e.g., "latest", "today")
5. Make it a natural question or statement

Original query: "{query}"
Reason for expansion: {reason}

Expanded query:"""
        
        try:
            expanded = self.llm.generate(prompt, max_tokens=50).strip()
            
            # Clean up the response
            expanded = expanded.strip('"\'')
            
            # Validate expansion
            if len(expanded.split()) > 20:
                # Too long, truncate
                expanded = ' '.join(expanded.split()[:15])
            
            if len(expanded.split()) < 3:
                # Expansion failed, return original
                return query
            
            return expanded
            
        except Exception as e:
            logger.error(f"LLM expansion error: {e}")
            return query
    
    def _cache_result(self, original: str, result: ExpandedQuery):
        """Cache expansion result"""
        if self.cache:
            cache_key = f"query_expansion:{original.lower()}"
            self.cache.set(
                cache_key,
                {
                    "expanded": result.expanded,
                    "was_expanded": result.was_expanded,
                    "reason": result.expansion_reason,
                    "confidence": result.confidence
                },
                expiration=3600  # 1 hour
            )


# ═══════════════════════════════════════════════════════════════════════════
# SINGLETON INSTANCE
# ═══════════════════════════════════════════════════════════════════════════

# Will be initialized with dependencies in main.py
query_expander: Optional[QueryExpander] = None


def initialize_query_expander(llm_adapter, cache=None):
    """Initialize global query expander instance"""
    global query_expander
    query_expander = QueryExpander(llm_adapter, cache)
    logger.info("Query expander initialized")