Madras1 committed on
Commit
db9bbfd
·
verified ·
1 Parent(s): d70e556

Upload 38 files

Browse files
app/agents/deep_research.py ADDED
@@ -0,0 +1,249 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Deep Research Orchestrator.
2
+
3
+ Coordinates the full deep research pipeline:
4
+ 1. Planning (query decomposition)
5
+ 2. Parallel searching (multiple dimensions)
6
+ 3. Report synthesis
7
+ """
8
+
9
+ import asyncio
10
+ import json
11
+ import time
12
+ from typing import AsyncIterator, Optional
13
+
14
+ from app.agents.planner import create_research_plan, ResearchPlan, ResearchDimension
15
+ from app.agents.llm_client import generate_completion_stream
16
+ from app.config import get_settings
17
+ from app.sources.tavily import search_tavily
18
+ from app.sources.duckduckgo import search_duckduckgo
19
+ from app.reranking.pipeline import rerank_results
20
+ from app.temporal.freshness_scorer import calculate_freshness_score
21
+
22
+
23
class DimensionResult:
    """Outcome container for one researched dimension.

    Carries the dimension that was searched, the ranked result dicts,
    and an error message when the search failed.
    """

    def __init__(self, dimension: ResearchDimension):
        # The dimension this result set belongs to.
        self.dimension = dimension
        # Error message when the search failed; None on success.
        self.error: Optional[str] = None
        # Ranked search results (dicts with title/url/content keys).
        self.results: list[dict] = []
30
+
31
+
32
async def run_deep_research(
    query: str,
    max_dimensions: int = 6,
    max_sources_per_dim: int = 5,
    max_total_searches: int = 20,
) -> AsyncIterator[str]:
    """
    Run a deep research pipeline with streaming progress.

    Pipeline phases:
      1. Planning  - decompose the query into research dimensions.
      2. Searching - research each dimension one after another (with a
         short pause between dimensions to avoid rate limits).
      3. Synthesis - stream a Markdown report built from all results.

    Yields SSE-formatted events as the research progresses.

    Args:
        query: The research query
        max_dimensions: Maximum dimensions to research
        max_sources_per_dim: Max results per dimension
        max_total_searches: Total search API calls allowed

    Yields:
        SSE event strings in format: data: {json}\n\n
    """
    start_time = time.perf_counter()

    try:
        # === PHASE 1: PLANNING ===
        yield _sse_event("status", {"phase": "planning", "message": "Analyzing query..."})

        plan = await create_research_plan(query, max_dimensions)

        yield _sse_event("plan_ready", {
            "refined_query": plan.refined_query,
            "dimensions": [
                {"name": d.name, "description": d.description, "priority": d.priority}
                for d in plan.dimensions
            ],
            "estimated_sources": plan.estimated_sources,
        })

        # === PHASE 2: SEARCHING ===
        yield _sse_event("status", {"phase": "searching", "message": "Researching dimensions..."})

        # Distribute the search budget across dimensions. The inner
        # max(1, ...) guards against an (unexpected) empty plan so the
        # division cannot raise; the outer one guarantees each dimension
        # gets at least one search.
        num_dimensions = len(plan.dimensions)
        searches_per_dim = max(1, max_total_searches // max(1, num_dimensions))

        dimension_results: list[DimensionResult] = []

        # Dimensions are researched sequentially (not in parallel) so
        # progress events arrive in order and upstream search APIs are
        # not hit all at once.
        for i, dimension in enumerate(plan.dimensions):
            yield _sse_event("dimension_start", {
                "index": i + 1,
                "total": num_dimensions,
                "name": dimension.name,
                "query": dimension.search_query,
            })

            # Search this dimension
            result = await _search_dimension(
                dimension=dimension,
                max_results=max_sources_per_dim,
                max_searches=searches_per_dim,
            )
            dimension_results.append(result)

            yield _sse_event("dimension_complete", {
                "index": i + 1,
                "name": dimension.name,
                "results_count": len(result.results),
                "error": result.error,
            })

            # Small delay between dimensions to avoid rate limits.
            await asyncio.sleep(0.1)

        # === PHASE 3: SYNTHESIS ===
        yield _sse_event("status", {"phase": "synthesizing", "message": "Generating report..."})
        yield _sse_event("synthesis_start", {})

        # Stream the report generation chunk by chunk.
        async for chunk in _synthesize_report_stream(query, plan, dimension_results):
            yield _sse_event("report_chunk", {"content": chunk})

        # === COMPLETE ===
        total_time = time.perf_counter() - start_time
        total_sources = sum(len(r.results) for r in dimension_results)

        yield _sse_event("done", {
            "total_sources": total_sources,
            "total_dimensions": num_dimensions,
            "total_time_seconds": round(total_time, 2),
        })

    except Exception as e:
        # Surface any failure to the client as a terminal SSE error event.
        yield _sse_event("error", {"message": str(e)})
126
+
127
+
128
async def _search_dimension(
    dimension: ResearchDimension,
    max_results: int = 5,
    max_searches: int = 2,
) -> DimensionResult:
    """Search a single dimension using available sources.

    Tries Tavily first (when an API key is configured and the search
    budget allows), tops up any shortfall from DuckDuckGo, then applies
    a lightweight rerank. Any exception is captured on the returned
    DimensionResult instead of propagating.
    """
    outcome = DimensionResult(dimension)

    try:
        settings = get_settings()
        collected: list = []

        # Primary source: Tavily, gated on both budget and configuration.
        if max_searches > 0 and settings.tavily_api_key:
            collected.extend(
                await search_tavily(
                    query=dimension.search_query,
                    max_results=max_results,
                    freshness="any",
                )
            )

        # Fallback source: DuckDuckGo fills any remaining slots.
        shortfall = max_results - len(collected)
        if shortfall > 0:
            collected.extend(
                await search_duckduckgo(
                    query=dimension.search_query,
                    max_results=shortfall,
                )
            )

        # Light rerank; embeddings are skipped for speed.
        if collected:
            outcome.results = await rerank_results(
                query=dimension.search_query,
                results=collected,
                temporal_urgency=0.5,
                max_results=max_results,
                use_embeddings=False,
            )

    except Exception as e:
        outcome.error = str(e)

    return outcome
172
+
173
+
174
async def _synthesize_report_stream(
    original_query: str,
    plan: ResearchPlan,
    dimension_results: list[DimensionResult],
) -> AsyncIterator[str]:
    """Stream the synthesis of the final report.

    Builds a globally numbered source context from every dimension's
    results, prompts the LLM to write a Markdown report with [n]
    citations, and streams the report chunks followed by a Sources
    footer whose numbering matches the context.

    Args:
        original_query: The user's original research query
        plan: The research plan (refined query and dimensions)
        dimension_results: Search results gathered per dimension

    Yields:
        Report text chunks, then a "## Sources" footer.
    """
    # Build context from all dimension results. Sources are numbered
    # across dimensions so the LLM's citations match the footer list.
    context_parts = []
    all_sources = []
    source_index = 1

    for dr in dimension_results:
        if not dr.results:
            continue
        context_parts.append(f"\n## {dr.dimension.name}\n")
        for r in dr.results:
            # `or` fallbacks guard against keys that exist with a None
            # value — r.get(key, default) would return None and the
            # [:400] slice below would raise TypeError.
            title = r.get("title") or "Untitled"
            url = r.get("url") or ""
            content = r.get("content") or ""
            context_parts.append(
                f"[{source_index}] {title}\n"
                f" URL: {url}\n"
                f" Content: {content[:400]}...\n"
            )
            # Same title fallback here keeps the Sources footer links
            # consistent with the context (no empty [](...) links).
            all_sources.append({
                "index": source_index,
                "title": title,
                "url": url,
            })
            source_index += 1

    context = "\n".join(context_parts)

    # Build synthesis prompt
    prompt = f"""You are a research analyst. Create a comprehensive research report based on the gathered information.

ORIGINAL QUERY: {original_query}
REFINED QUERY: {plan.refined_query}

RESEARCH DIMENSIONS:
{', '.join(d.name for d in plan.dimensions)}

GATHERED INFORMATION:
{context}

INSTRUCTIONS:
1. Write a comprehensive research report in Markdown format
2. Start with an Executive Summary (2-3 paragraphs)
3. Create a section for each research dimension
4. Use citations [1], [2], etc. to reference sources
5. Include a Conclusion section
6. Be thorough but concise
7. Write in the same language as the query
8. Use headers (##) to organize sections

Generate the report:"""

    messages = [
        {"role": "system", "content": "You are a research analyst creating detailed reports."},
        {"role": "user", "content": prompt},
    ]

    try:
        async for chunk in generate_completion_stream(messages, temperature=0.4):
            yield chunk

        # Append sources at the end
        yield "\n\n---\n\n## Sources\n\n"
        for src in all_sources:
            yield f"[{src['index']}] [{src['title']}]({src['url']})\n"

    except Exception as e:
        yield f"\n\n**Error generating report:** {e}"
244
+
245
+
246
+ def _sse_event(event_type: str, data: dict) -> str:
247
+ """Format an SSE event."""
248
+ payload = {"type": event_type, **data}
249
+ return f"data: {json.dumps(payload)}\n\n"
app/agents/planner.py ADDED
@@ -0,0 +1,133 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Research Planner Agent.
2
+
3
+ Decomposes complex queries into multiple research dimensions.
4
+ """
5
+
6
+ import json
7
+ from typing import Optional
8
+
9
+ from pydantic import BaseModel, Field
10
+
11
+ from app.agents.llm_client import generate_completion
12
+ from app.config import get_settings
13
+
14
+
15
class ResearchDimension(BaseModel):
    """A single dimension/aspect to research."""

    # Short label for the dimension (used as a heading downstream).
    name: str = Field(..., description="Short name for this dimension")
    # One-sentence explanation of what the dimension covers.
    description: str = Field(..., description="What this dimension covers")
    # Query string actually sent to the search backends.
    search_query: str = Field(..., description="Optimized search query for this dimension")
    # Lower number = higher priority; pydantic enforces the 1..3 range.
    priority: int = Field(default=1, ge=1, le=3, description="1=high, 2=medium, 3=low")
22
+
23
+
24
class ResearchPlan(BaseModel):
    """Complete research plan with all dimensions."""

    # The query exactly as the user submitted it.
    original_query: str
    # LLM-clarified restatement of the query (falls back to the original).
    refined_query: str = Field(..., description="Clarified version of the query")
    # Dimensions to investigate; create_research_plan sorts these by
    # ascending priority (highest priority first).
    dimensions: list[ResearchDimension]
    # Rough estimate of how many sources the research will gather.
    estimated_sources: int = Field(default=20)
31
+
32
+
33
# Prompt template for the planner LLM. `{query}` is substituted via
# str.format() in create_research_plan; the doubled braces ({{ / }})
# render as literal braces in the JSON example.
PLANNER_PROMPT = """You are a research planning assistant. Your job is to decompose a complex query into multiple research dimensions.

USER QUERY: {query}

INSTRUCTIONS:
1. Analyze the query and identify 2-6 key dimensions/aspects that need to be researched
2. Each dimension should be distinct and cover a different angle
3. Create an optimized search query for each dimension
4. Assign priority (1=high, 2=medium, 3=low) based on relevance to the main query
5. Respond ONLY with valid JSON, no other text

OUTPUT FORMAT:
{{
  "refined_query": "A clearer version of the user's query",
  "dimensions": [
    {{
      "name": "Short name",
      "description": "What this covers",
      "search_query": "Optimized search query",
      "priority": 1
    }}
  ]
}}

Generate the research plan:"""
58
+
59
+
60
async def create_research_plan(
    query: str,
    max_dimensions: int = 6,
) -> ResearchPlan:
    """
    Create a research plan by decomposing a query into dimensions.

    Asks the LLM for a JSON plan and parses it; whenever the response
    cannot be parsed, fails validation, or yields no usable dimensions,
    a simple two-dimension fallback plan is returned instead.

    Args:
        query: The user's research query
        max_dimensions: Maximum number of dimensions to generate

    Returns:
        ResearchPlan with at least one dimension to investigate
    """
    messages = [
        {"role": "system", "content": "You are a research planning assistant. Always respond with valid JSON only."},
        {"role": "user", "content": PLANNER_PROMPT.format(query=query)},
    ]

    try:
        response = await generate_completion(messages, temperature=0.3)

        # The model sometimes wraps the JSON in extra prose; extract the
        # outermost {...} span before parsing.
        json_start = response.find("{")
        json_end = response.rfind("}") + 1
        if json_start >= 0 and json_end > json_start:
            response = response[json_start:json_end]

        data = json.loads(response)

        # Build dimensions (capped at max_dimensions).
        dimensions = [
            ResearchDimension(
                name=dim_data.get("name", "Unknown"),
                description=dim_data.get("description", ""),
                search_query=dim_data.get("search_query", query),
                priority=dim_data.get("priority", 2),
            )
            for dim_data in data.get("dimensions", [])[:max_dimensions]
        ]

        # An empty plan would leave the pipeline with nothing to search
        # (and break the per-dimension budget split downstream); treat it
        # like a parse failure.
        if not dimensions:
            return _fallback_plan(query)

        # Highest priority (lowest number) first.
        dimensions.sort(key=lambda d: d.priority)

        return ResearchPlan(
            original_query=query,
            refined_query=data.get("refined_query", query),
            dimensions=dimensions,
            estimated_sources=len(dimensions) * 5,
        )

    # ValueError covers json.JSONDecodeError and pydantic's
    # ValidationError (both subclass it); TypeError covers non-dict
    # entries in "dimensions". Any malformed model output degrades to
    # the fallback plan instead of crashing.
    except (ValueError, KeyError, TypeError):
        return _fallback_plan(query)


def _fallback_plan(query: str) -> ResearchPlan:
    """Build a minimal two-dimension plan when LLM planning fails."""
    return ResearchPlan(
        original_query=query,
        refined_query=query,
        dimensions=[
            ResearchDimension(
                name="Main Research",
                description=f"Primary research on: {query}",
                search_query=query,
                priority=1,
            ),
            ResearchDimension(
                name="Background",
                description=f"Background and context for: {query}",
                search_query=f"{query} background overview",
                priority=2,
            ),
        ],
        estimated_sources=10,
    )
app/api/routes/search.py CHANGED
@@ -14,6 +14,7 @@ from app.api.schemas import (
14
  TemporalContext,
15
  Citation,
16
  ErrorResponse,
 
17
  )
18
  from app.config import get_settings
19
  from app.temporal.intent_detector import detect_temporal_intent
@@ -252,3 +253,38 @@ async def search_stream(request: SearchRequest):
252
  "X-Accel-Buffering": "no",
253
  },
254
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
  TemporalContext,
15
  Citation,
16
  ErrorResponse,
17
+ DeepResearchRequest,
18
  )
19
  from app.config import get_settings
20
  from app.temporal.intent_detector import detect_temporal_intent
 
253
  "X-Accel-Buffering": "no",
254
  },
255
  )
256
+
257
+
258
+ # === Deep Research Endpoints ===
259
+
260
+ @router.post(
261
+ "/research/deep",
262
+ summary="Deep research with multi-dimensional analysis",
263
+ description="Decompose a query into dimensions, search each in parallel, and generate a comprehensive report.",
264
+ )
265
+ async def deep_research(request: DeepResearchRequest):
266
+ """
267
+ Run deep research with streaming progress updates.
268
+
269
+ Returns SSE events:
270
+ - plan_ready: Research plan with dimensions
271
+ - dimension_start/complete: Progress per dimension
272
+ - report_chunk: Streaming report content
273
+ - done: Final summary
274
+ """
275
+ from app.agents.deep_research import run_deep_research
276
+
277
+ return StreamingResponse(
278
+ run_deep_research(
279
+ query=request.query,
280
+ max_dimensions=request.max_dimensions,
281
+ max_sources_per_dim=request.max_sources_per_dim,
282
+ max_total_searches=request.max_total_searches,
283
+ ),
284
+ media_type="text/event-stream",
285
+ headers={
286
+ "Cache-Control": "no-cache",
287
+ "Connection": "keep-alive",
288
+ "X-Accel-Buffering": "no",
289
+ },
290
+ )
app/api/schemas.py CHANGED
@@ -110,3 +110,29 @@ class ErrorResponse(BaseModel):
110
 
111
  error: str = Field(..., description="Error message")
112
  detail: str | None = Field(default=None, description="Detailed error information")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
 
111
  error: str = Field(..., description="Error message")
112
  detail: str | None = Field(default=None, description="Detailed error information")
113
+
114
+
115
+ # === Deep Research Models ===
116
+
117
class DeepResearchRequest(BaseModel):
    """Deep research request payload.

    All numeric bounds are enforced by pydantic Field constraints
    (ge/le), so out-of-range values are rejected at request validation.
    """

    # The research question; length bounds enforced by pydantic.
    query: str = Field(..., min_length=1, max_length=2000, description="Research query")
    # How many dimensions the planner may decompose the query into.
    max_dimensions: int = Field(
        default=5,
        ge=2,
        le=8,
        description="Maximum research dimensions to explore"
    )
    # Result cap applied per dimension.
    max_sources_per_dim: int = Field(
        default=5,
        ge=1,
        le=10,
        description="Maximum sources per dimension"
    )
    # Overall budget of search API calls for one research run.
    max_total_searches: int = Field(
        default=20,
        ge=5,
        le=30,
        description="Maximum total API searches"
    )
app/config.py CHANGED
@@ -38,6 +38,11 @@ class Settings(BaseSettings):
38
  max_search_results: int = 20
39
  max_final_results: int = 10
40
 
 
 
 
 
 
41
  @property
42
  def llm_api_key(self) -> str:
43
  """Get the appropriate API key based on provider."""
 
38
  max_search_results: int = 20
39
  max_final_results: int = 10
40
 
41
+ # Deep Research Settings
42
+ max_research_dimensions: int = 6
43
+ max_tavily_calls_per_research: int = 20
44
+ deep_research_model: str | None = None # Use main model if None
45
+
46
  @property
47
  def llm_api_key(self) -> str:
48
  """Get the appropriate API key based on provider."""