Madras1 committed
Commit 7fbd9ac · verified · 1 Parent(s): 9d1dc1d

Upload 36 files

app/agents/llm_client.py CHANGED
@@ -5,12 +5,24 @@ Supports Groq and OpenRouter for LLM inference.
 
 import httpx
 import json
-from typing import Optional
+from typing import Optional, AsyncIterator
 import asyncio
 
+from tenacity import (
+    retry,
+    stop_after_attempt,
+    wait_exponential,
+    retry_if_exception_type,
+)
+
 from app.config import get_settings
 
 
+class RetryableError(Exception):
+    """Error that should trigger a retry."""
+    pass
+
+
 async def generate_completion(
     messages: list[dict],
     model: Optional[str] = None,
@@ -30,45 +42,65 @@ async def generate_completion(
         raise ValueError(f"Unknown LLM provider: {provider}")
 
 
+@retry(
+    stop=stop_after_attempt(3),
+    wait=wait_exponential(multiplier=1, min=2, max=10),
+    retry=retry_if_exception_type(RetryableError),
+    reraise=True,
+)
 async def _call_groq(
     messages: list[dict],
     model: str,
     temperature: float,
     max_tokens: int,
 ) -> str:
-    """Call Groq API."""
+    """Call Groq API with retry logic."""
     settings = get_settings()
 
     if not settings.groq_api_key:
         raise ValueError("GROQ_API_KEY not configured")
 
-    async with httpx.AsyncClient(timeout=60.0) as client:
-        response = await client.post(
-            "https://api.groq.com/openai/v1/chat/completions",
-            headers={
-                "Authorization": f"Bearer {settings.groq_api_key}",
-                "Content-Type": "application/json",
-            },
-            json={
-                "model": model,
-                "messages": messages,
-                "temperature": temperature,
-                "max_tokens": max_tokens,
-            },
-        )
-        response.raise_for_status()
-        data = response.json()
-
-        return data["choices"][0]["message"]["content"]
+    try:
+        async with httpx.AsyncClient(timeout=60.0) as client:
+            response = await client.post(
+                "https://api.groq.com/openai/v1/chat/completions",
+                headers={
+                    "Authorization": f"Bearer {settings.groq_api_key}",
+                    "Content-Type": "application/json",
+                },
+                json={
+                    "model": model,
+                    "messages": messages,
+                    "temperature": temperature,
+                    "max_tokens": max_tokens,
+                },
+            )
+
+            # Retry on rate limit or server errors
+            if response.status_code in (429, 502, 503, 504):
+                raise RetryableError(f"Groq error {response.status_code}")
+
+            response.raise_for_status()
+            data = response.json()
+
+            return data["choices"][0]["message"]["content"]
+    except httpx.TimeoutException as e:
+        raise RetryableError(f"Groq timeout: {e}")
 
 
+@retry(
+    stop=stop_after_attempt(3),
+    wait=wait_exponential(multiplier=1, min=2, max=10),
+    retry=retry_if_exception_type(RetryableError),
+    reraise=True,
+)
 async def _call_openrouter(
     messages: list[dict],
     model: str,
     temperature: float,
     max_tokens: int,
 ) -> str:
-    """Call OpenRouter API - following official docs exactly."""
+    """Call OpenRouter API with retry logic."""
    settings = get_settings()
 
     if not settings.openrouter_api_key:
@@ -81,22 +113,82 @@ async def _call_openrouter(
         "X-Title": "Lancer Search API",
     }
 
-    # Payload exactly like official docs
     payload = {
         "model": model,
         "messages": messages,
     }
 
+    try:
+        async with httpx.AsyncClient(timeout=120.0) as client:
+            response = await client.post(
+                "https://openrouter.ai/api/v1/chat/completions",
+                headers=headers,
+                content=json.dumps(payload),
+            )
+
+            # Retry on rate limit or server errors
+            if response.status_code in (429, 502, 503, 504):
+                raise RetryableError(f"OpenRouter error {response.status_code}")
+
+            if response.status_code != 200:
+                error_text = response.text
+                raise ValueError(f"OpenRouter error {response.status_code}: {error_text}")
+
+            data = response.json()
+            return data["choices"][0]["message"]["content"]
+    except httpx.TimeoutException as e:
+        raise RetryableError(f"OpenRouter timeout: {e}")
+
+
+async def generate_completion_stream(
+    messages: list[dict],
+    model: Optional[str] = None,
+    temperature: float = 0.3,
+    max_tokens: int = 2048,
+) -> AsyncIterator[str]:
+    """Generate a streaming completion using OpenRouter."""
+    settings = get_settings()
+    model = model or settings.llm_model
+
+    if not settings.openrouter_api_key:
+        raise ValueError("OPENROUTER_API_KEY not configured")
+
+    headers = {
+        "Authorization": f"Bearer {settings.openrouter_api_key}",
+        "Content-Type": "application/json",
+        "HTTP-Referer": "https://madras1-lancer.hf.space",
+        "X-Title": "Lancer Search API",
+    }
+
+    payload = {
+        "model": model,
+        "messages": messages,
+        "temperature": temperature,
+        "max_tokens": max_tokens,
+        "stream": True,
+    }
+
     async with httpx.AsyncClient(timeout=120.0) as client:
-        response = await client.post(
+        async with client.stream(
+            "POST",
             "https://openrouter.ai/api/v1/chat/completions",
             headers=headers,
-            content=json.dumps(payload),  # Using content= with json.dumps like official docs
-        )
-
-        if response.status_code != 200:
-            error_text = response.text
-            raise ValueError(f"OpenRouter error {response.status_code}: {error_text}")
-
-        data = response.json()
-        return data["choices"][0]["message"]["content"]
+            content=json.dumps(payload),
+        ) as response:
+            if response.status_code != 200:
+                error_text = await response.aread()
+                raise ValueError(f"OpenRouter streaming error {response.status_code}: {error_text}")
+
+            async for line in response.aiter_lines():
+                if line.startswith("data: "):
+                    data_str = line[6:]
+                    if data_str.strip() == "[DONE]":
+                        break
+                    try:
+                        data = json.loads(data_str)
+                        delta = data.get("choices", [{}])[0].get("delta", {})
+                        content = delta.get("content", "")
+                        if content:
+                            yield content
+                    except json.JSONDecodeError:
+                        continue
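
For a quick sanity check, here is a minimal driver sketch for the client above; it assumes the app package is importable and the relevant API keys are configured, and the prompt text is purely illustrative.

import asyncio

from app.agents.llm_client import generate_completion, generate_completion_stream


async def main() -> None:
    messages = [{"role": "user", "content": "Say hello in one short sentence."}]

    # Non-streaming path: tenacity retries up to 3 times on 429/5xx or timeouts,
    # with exponential backoff between 2 and 10 seconds.
    answer = await generate_completion(messages, temperature=0.3)
    print(answer)

    # Streaming path: chunks arrive as they are decoded from the SSE "data:" lines.
    async for chunk in generate_completion_stream(messages):
        print(chunk, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(main())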
app/agents/synthesizer.py CHANGED
@@ -4,10 +4,10 @@ Generates a coherent answer from search results with citations.
 """
 
 from datetime import datetime
-from typing import Optional
+from typing import Optional, AsyncIterator
 
 from app.api.schemas import SearchResult, TemporalContext, Citation
-from app.agents.llm_client import generate_completion
+from app.agents.llm_client import generate_completion, generate_completion_stream
 
 
 SYNTHESIS_PROMPT = """You are a research assistant that synthesizes information from search results.
@@ -54,6 +54,57 @@ async def synthesize_answer(
     if not results:
         return "No results found to synthesize an answer.", []
 
+    messages = _build_messages(query, results, temporal_context)
+
+    try:
+        answer = await generate_completion(messages, temperature=0.3)
+    except Exception as e:
+        # Fallback: return a simple summary without LLM
+        answer = f"Error generating synthesis: {e}. Please review the search results directly."
+
+    # Build citations list
+    citations = _build_citations(results)
+
+    return answer, citations
+
+
+async def synthesize_answer_stream(
+    query: str,
+    results: list[SearchResult],
+    temporal_context: Optional[TemporalContext] = None,
+) -> AsyncIterator[str]:
+    """
+    Synthesize an answer with streaming output.
+
+    Yields chunks of the answer as they are generated.
+
+    Args:
+        query: Original search query
+        results: List of search results to synthesize from
+        temporal_context: Temporal analysis context
+
+    Yields:
+        Chunks of the answer text
+    """
+    if not results:
+        yield "No results found to synthesize an answer."
+        return
+
+    messages = _build_messages(query, results, temporal_context)
+
+    try:
+        async for chunk in generate_completion_stream(messages, temperature=0.3):
+            yield chunk
+    except Exception as e:
+        yield f"Error generating synthesis: {e}. Please review the search results directly."
+
+
+def _build_messages(
+    query: str,
+    results: list[SearchResult],
+    temporal_context: Optional[TemporalContext] = None,
+) -> list[dict]:
+    """Build messages for LLM prompt."""
     # Format results for the prompt
     formatted_results = format_results_for_prompt(results[:10])  # Top 10 only
 
@@ -83,18 +134,14 @@
         formatted_results=formatted_results,
     )
 
-    messages = [
+    return [
         {"role": "system", "content": "You are a helpful research assistant."},
         {"role": "user", "content": prompt},
     ]
-
-    try:
-        answer = await generate_completion(messages, temperature=0.3)
-    except Exception as e:
-        # Fallback: return a simple summary without LLM
-        answer = f"Error generating synthesis: {e}. Please review the search results directly."
-
-    # Build citations list
+
+
+def _build_citations(results: list[SearchResult]) -> list[Citation]:
+    """Build citations list from results."""
     citations = []
     for i, result in enumerate(results[:10], 1):
         citations.append(
@@ -104,8 +151,7 @@
                 title=result.title,
             )
         )
-
-    return answer, citations
+    return citations
 
 
 def format_results_for_prompt(results: list[SearchResult]) -> str:
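
A hedged usage sketch for the refactored synthesizer; the SearchResult field values below are invented, and the field names are taken from the routes further down.

import asyncio

from app.api.schemas import SearchResult
from app.agents.synthesizer import synthesize_answer, synthesize_answer_stream


async def main() -> None:
    results = [
        SearchResult(
            title="Example page",
            url="https://example.com",
            content="Example body text.",
            score=0.8,
            published_date=None,
            freshness_score=0.5,
            authority_score=0.5,
        )
    ]

    # Blocking variant: full answer plus Citation objects from the top 10 results.
    answer, citations = await synthesize_answer("example query", results)
    print(answer, len(citations))

    # Streaming variant: yields text chunks; errors degrade to a single
    # error-message chunk instead of raising.
    async for chunk in synthesize_answer_stream("example query", results):
        print(chunk, end="", flush=True)


asyncio.run(main())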
app/api/routes/search.py CHANGED
@@ -1,9 +1,11 @@
 """Search API routes."""
 
+import json
 import time
 from datetime import datetime
 
 from fastapi import APIRouter, HTTPException
+from fastapi.responses import StreamingResponse
 
 from app.api.schemas import (
     SearchRequest,
@@ -19,7 +21,7 @@ from app.temporal.freshness_scorer import calculate_freshness_score
 from app.sources.tavily import search_tavily
 from app.sources.duckduckgo import search_duckduckgo
 from app.reranking.pipeline import rerank_results
-from app.agents.synthesizer import synthesize_answer
+from app.agents.synthesizer import synthesize_answer, synthesize_answer_stream
 
 router = APIRouter()
 
@@ -144,3 +146,109 @@ async def search_raw(request: SearchRequest) -> SearchResponse:
     """Fast search without answer synthesis."""
     request.include_answer = False
     return await search(request)
+
+
+@router.post(
+    "/search/stream",
+    summary="Search with streaming synthesis",
+    description="Perform a search and stream the AI-synthesized answer in real-time using SSE.",
+)
+async def search_stream(request: SearchRequest):
+    """
+    Streaming search with Server-Sent Events.
+
+    Returns results first, then streams the answer as it's generated.
+    """
+    settings = get_settings()
+
+    async def event_generator():
+        try:
+            # Step 1: Analyze temporal intent
+            temporal_intent, temporal_urgency = detect_temporal_intent(request.query)
+
+            temporal_context = TemporalContext(
+                query_temporal_intent=temporal_intent,
+                temporal_urgency=temporal_urgency,
+                current_date=datetime.now().strftime("%Y-%m-%d"),
+            )
+
+            # Step 2: Search sources
+            raw_results = []
+
+            if settings.tavily_api_key:
+                tavily_results = await search_tavily(
+                    query=request.query,
+                    max_results=settings.max_search_results,
+                    freshness=request.freshness,
+                    include_domains=request.include_domains,
+                    exclude_domains=request.exclude_domains,
+                )
+                raw_results.extend(tavily_results)
+
+            if not raw_results:
+                ddg_results = await search_duckduckgo(
+                    query=request.query,
+                    max_results=settings.max_search_results,
+                )
+                raw_results.extend(ddg_results)
+
+            if not raw_results:
+                yield f"data: {json.dumps({'type': 'error', 'content': 'No results found'})}\n\n"
+                return
+
+            # Step 3: Rerank
+            ranked_results = await rerank_results(
+                query=request.query,
+                results=raw_results,
+                temporal_urgency=temporal_urgency,
+                max_results=request.max_results,
+            )
+
+            # Step 4: Convert to SearchResult models
+            search_results = []
+            for result in ranked_results:
+                freshness = calculate_freshness_score(result.get("published_date"))
+                search_results.append(
+                    SearchResult(
+                        title=result.get("title", ""),
+                        url=result.get("url", ""),
+                        content=result.get("content", ""),
+                        score=result.get("score", 0.5),
+                        published_date=result.get("published_date"),
+                        freshness_score=freshness,
+                        authority_score=result.get("authority_score", 0.5),
+                    )
+                )
+
+            # Send results first
+            results_data = {
+                "type": "results",
+                "results": [r.model_dump(mode="json") for r in search_results],
+                "temporal_context": temporal_context.model_dump(),
+            }
+            yield f"data: {json.dumps(results_data)}\n\n"
+
+            # Step 5: Stream answer
+            yield f"data: {json.dumps({'type': 'answer_start'})}\n\n"
+
+            async for chunk in synthesize_answer_stream(
+                query=request.query,
+                results=search_results,
+                temporal_context=temporal_context,
+            ):
+                yield f"data: {json.dumps({'type': 'answer_chunk', 'content': chunk})}\n\n"
+
+            yield f"data: {json.dumps({'type': 'done'})}\n\n"
+
+        except Exception as e:
+            yield f"data: {json.dumps({'type': 'error', 'content': str(e)})}\n\n"
+
+    return StreamingResponse(
+        event_generator(),
+        media_type="text/event-stream",
+        headers={
+            "Cache-Control": "no-cache",
+            "Connection": "keep-alive",
+            "X-Accel-Buffering": "no",
+        },
+    )
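
On the consumer side, a sketch of reading this SSE stream with httpx; it assumes the service runs locally on port 8000 with the router mounted at the root, and that SearchRequest validates with just a "query" field. Event framing follows the "type" field emitted above.

import asyncio
import json

import httpx


async def main() -> None:
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/search/stream",
            json={"query": "latest FastAPI release"},
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                event = json.loads(line[6:])
                if event["type"] == "results":
                    print(f"got {len(event['results'])} results")
                elif event["type"] == "answer_chunk":
                    print(event["content"], end="", flush=True)
                elif event["type"] in ("done", "error"):
                    break


asyncio.run(main())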
app/reranking/embeddings.py ADDED
@@ -0,0 +1,102 @@
+"""Embedding-based reranking using sentence-transformers.
+
+Provides bi-encoder and cross-encoder reranking for better relevance scoring.
+"""
+
+from functools import lru_cache
+from typing import Optional
+
+import numpy as np
+
+from app.config import get_settings
+
+
+@lru_cache(maxsize=1)
+def get_bi_encoder():
+    """Load and cache the bi-encoder model."""
+    from sentence_transformers import SentenceTransformer
+    settings = get_settings()
+    return SentenceTransformer(settings.bi_encoder_model)
+
+
+@lru_cache(maxsize=1)
+def get_cross_encoder():
+    """Load and cache the cross-encoder model."""
+    from sentence_transformers import CrossEncoder
+    settings = get_settings()
+    return CrossEncoder(settings.cross_encoder_model)
+
+
+def compute_bi_encoder_scores(
+    query: str,
+    documents: list[str],
+) -> list[float]:
+    """
+    Compute semantic similarity scores using bi-encoder.
+
+    Fast but less accurate than cross-encoder.
+    Good for initial filtering of large result sets.
+
+    Args:
+        query: Search query
+        documents: List of document texts
+
+    Returns:
+        List of similarity scores (0-1)
+    """
+    if not documents:
+        return []
+
+    model = get_bi_encoder()
+
+    # Encode query and documents
+    query_embedding = model.encode(query, normalize_embeddings=True)
+    doc_embeddings = model.encode(documents, normalize_embeddings=True)
+
+    # Compute cosine similarities (embeddings are normalized, so dot product = cosine)
+    similarities = np.dot(doc_embeddings, query_embedding)
+
+    # Convert to list and ensure values are in [0, 1]
+    scores = [(float(s) + 1) / 2 for s in similarities]  # Map from [-1, 1] to [0, 1]
+
+    return scores
+
+
+def compute_cross_encoder_scores(
+    query: str,
+    documents: list[str],
+) -> list[float]:
+    """
+    Compute relevance scores using cross-encoder.
+
+    More accurate than bi-encoder but slower.
+    Use after initial filtering for precise ranking.
+
+    Args:
+        query: Search query
+        documents: List of document texts
+
+    Returns:
+        List of relevance scores (0-1)
+    """
+    if not documents:
+        return []
+
+    model = get_cross_encoder()
+
+    # Create query-document pairs
+    pairs = [[query, doc] for doc in documents]
+
+    # Get scores
+    scores = model.predict(pairs)
+
+    # Normalize to [0, 1] with min-max scaling
+    min_score = float(np.min(scores))
+    max_score = float(np.max(scores))
+
+    if max_score > min_score:
+        normalized = [(float(s) - min_score) / (max_score - min_score) for s in scores]
+    else:
+        normalized = [0.5] * len(scores)
+
+    return normalized
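
A small sanity-check sketch for the two scorers; the query and documents are illustrative, and the actual model names come from bi_encoder_model / cross_encoder_model in app.config, whose defaults are not shown in this commit. The first call downloads the models.

from app.reranking.embeddings import (
    compute_bi_encoder_scores,
    compute_cross_encoder_scores,
)

query = "python async http client"
documents = [
    "httpx is a fully featured HTTP client for Python with async support.",
    "A recipe for sourdough bread.",
]

bi = compute_bi_encoder_scores(query, documents)        # fast, coarse
cross = compute_cross_encoder_scores(query, documents)  # slower, precise

# The httpx document should outscore the bread recipe in both lists.
print(bi, cross)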
app/reranking/pipeline.py CHANGED
@@ -1,38 +1,44 @@
 """Multi-stage reranking pipeline.
 
 Implements a 3-stage reranking approach:
-1. Bi-Encoder: Fast semantic similarity (optional, for large result sets)
+1. Bi-Encoder: Fast semantic similarity (for large result sets)
 2. Cross-Encoder: Accurate relevance scoring
 3. Temporal + Authority: Freshness and domain trust weighting
 """
 
+import logging
 from typing import Optional
 
 from app.temporal.freshness_scorer import calculate_freshness_score, adjust_score_by_freshness
 from app.reranking.authority_scorer import calculate_authority_score
 
+logger = logging.getLogger(__name__)
+
+# Flag to enable/disable embedding-based reranking
+ENABLE_EMBEDDING_RERANKING = True
+
 
 async def rerank_results(
     query: str,
     results: list[dict],
     temporal_urgency: float = 0.5,
     max_results: int = 10,
+    use_embeddings: bool = True,
 ) -> list[dict]:
     """
     Apply multi-stage reranking to search results.
 
-    For MVP, we use a simplified pipeline:
-    - Calculate freshness scores
-    - Calculate authority scores
-    - Combine with original relevance scores
-
-    Full pipeline with embeddings can be enabled later.
+    Pipeline:
+    1. Bi-encoder: Quick semantic filtering (if results > 15)
+    2. Cross-encoder: Precise relevance scoring (top candidates)
+    3. Temporal + Authority: Freshness and trust weighting
+
 
     Args:
         query: Original search query
         results: Raw search results
         temporal_urgency: How important freshness is (0-1)
         max_results: Maximum results to return
+        use_embeddings: Whether to use embedding models
 
     Returns:
         Reranked results with updated scores
@@ -40,16 +46,19 @@ async def rerank_results(
     if not results:
         return []
 
-    # Stage 1: Skip bi-encoder for now (MVP)
-    # In production, use sentence-transformers for initial filtering of 100+ results
+    scored_results = results.copy()
 
-    # Stage 2: Skip cross-encoder for now (MVP)
-    # In production, use BGE-reranker for precise scoring
+    # Stage 1 & 2: Embedding-based reranking
+    if use_embeddings and ENABLE_EMBEDDING_RERANKING:
+        try:
+            scored_results = await _apply_embedding_reranking(query, scored_results)
+            logger.info(f"Applied embedding reranking to {len(scored_results)} results")
+        except Exception as e:
+            logger.warning(f"Embedding reranking failed, using fallback: {e}")
+            # Fall through to basic scoring
 
     # Stage 3: Apply temporal + authority scoring
-    scored_results = []
-
-    for result in results:
+    for result in scored_results:
         # Calculate freshness score
         freshness = calculate_freshness_score(result.get("published_date"))
         result["freshness_score"] = freshness
@@ -58,7 +67,7 @@ async def rerank_results(
         authority = calculate_authority_score(result.get("url", ""))
         result["authority_score"] = authority
 
-        # Get base score (from search source)
+        # Get base score (from search source or embedding)
         base_score = result.get("score", 0.5)
 
         # Adjust for freshness based on temporal urgency
@@ -71,8 +80,6 @@ async def rerank_results(
         # Also factor in authority (10% weight)
         final_score = (adjusted_score * 0.9) + (authority * 0.1)
         result["score"] = final_score
-
-        scored_results.append(result)
 
     # Sort by final score (descending)
     scored_results.sort(key=lambda x: x["score"], reverse=True)
@@ -80,20 +87,41 @@ async def rerank_results(
     return scored_results[:max_results]
 
 
-async def rerank_with_embeddings(
+async def _apply_embedding_reranking(
     query: str,
     results: list[dict],
-    max_results: int = 10,
 ) -> list[dict]:
-    """
-    Full reranking with embedding models.
+    """Apply bi-encoder and cross-encoder reranking."""
+    from app.reranking.embeddings import compute_bi_encoder_scores, compute_cross_encoder_scores
 
-    TODO: Implement when adding sentence-transformers support:
-    1. Use bi-encoder for fast filtering
-    2. Use cross-encoder for precise scoring
+    # Extract document contents for embedding
+    documents = [
+        f"{r.get('title', '')}. {r.get('content', '')[:500]}"
+        for r in results
+    ]
 
-    This is a placeholder for the full implementation.
-    """
-    # For now, just return sorted by original score
-    sorted_results = sorted(results, key=lambda x: x.get("score", 0), reverse=True)
-    return sorted_results[:max_results]
+    # Stage 1: Bi-encoder for initial scoring (fast)
+    if len(results) > 15:
+        bi_scores = compute_bi_encoder_scores(query, documents)
+        for i, result in enumerate(results):
+            result["bi_encoder_score"] = bi_scores[i]
+
+        # Sort result/document pairs together by bi-encoder score; keep top 15 for cross-encoder
+        pairs = sorted(zip(results, documents), key=lambda p: p[0]["bi_encoder_score"], reverse=True)[:15]
+        results = [r for r, _ in pairs]
+        documents = [d for _, d in pairs]
+
+    # Stage 2: Cross-encoder for precise scoring (slower but accurate)
+    cross_scores = compute_cross_encoder_scores(query, documents)
+
+    for i, result in enumerate(results):
+        # Blend cross-encoder score with original source score
+        original_score = result.get("score", 0.5)
+        cross_score = cross_scores[i]
+
+        # Cross-encoder gets 70% weight, original 30%
+        result["score"] = (cross_score * 0.7) + (original_score * 0.3)
+        result["cross_encoder_score"] = cross_score
+
+    return results
+
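
To make the blending concrete, a worked example of the stage 2 and stage 3 arithmetic (all input scores are illustrative):

# Stage 2: blend cross-encoder relevance with the source's own score.
cross_score, original_score = 0.9, 0.5
blended = (cross_score * 0.7) + (original_score * 0.3)  # 0.78

# Stage 3: fold in authority at 10% weight (assume the freshness
# adjustment left the score unchanged here).
adjusted_score, authority = blended, 0.8
final_score = (adjusted_score * 0.9) + (authority * 0.1)  # 0.782
print(final_score)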
app/temporal/intent_detector.py CHANGED
@@ -5,15 +5,23 @@ or if historical information is acceptable.
 """
 
 import re
+from datetime import datetime
 from typing import Literal
 
+
+def _get_dynamic_years() -> set[str]:
+    """Get current and previous year dynamically."""
+    current_year = datetime.now().year
+    return {str(current_year), str(current_year - 1)}
+
+
 # Keywords that strongly indicate need for current information
 FRESHNESS_KEYWORDS = {
     # English
     "latest", "newest", "recent", "current", "today", "now",
     "this week", "this month", "this year", "breaking",
     "update", "updates", "new", "just", "announced",
-    "2024", "2025",  # Current years
+    *_get_dynamic_years(),  # Dynamic years
     # Portuguese
     "último", "últimos", "recente", "atual", "hoje", "agora",
     "essa semana", "esse mês", "esse ano", "novidade",
  "essa semana", "esse mês", "esse ano", "novidade",