File size: 10,322 Bytes
f871fed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
"""
Research API Router

Provides endpoints for the multi-agent research pipeline.
"""

from typing import List, Optional
from datetime import datetime
import uuid

from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from loguru import logger

from open_notebook.graphs.research import run_research, research_graph


# Every endpoint in this module is mounted under the /research path prefix and
# grouped under the "research" tag in the generated OpenAPI docs.
router = APIRouter(
    tags=["research"],
    prefix="/research",
)


# ============================================================================
# Request/Response Models
# ============================================================================

class ResearchRequest(BaseModel):
    """Request to start a research task.

    Accepted both by the background ``/start`` endpoint and by the synchronous
    ``/quick`` endpoint.
    """
    # Free-text question/topic; forwarded as the first argument of run_research().
    query: str = Field(..., description="The research question or topic")
    # NOTE(review): accepted but never read by the endpoints in this module —
    # only `query` and `llm_config` reach run_research(). Confirm intent.
    source_ids: Optional[List[str]] = Field(default=None, description="Specific source IDs to use")
    # NOTE(review): likewise not forwarded to run_research() in this module.
    research_type: Optional[str] = Field(default=None, description="Override research type detection")
    # Passed to run_research() as its config; None is replaced with {} by the endpoints.
    llm_config: Optional[dict] = Field(default=None, description="Model configuration overrides")


class ResearchProgress(BaseModel):
    """Progress update for a research task.

    One instance per background task is stored in ``_research_tasks``. It is
    created by ``start_research``, and its fields are reassigned in place as
    ``execute_research`` advances.
    """
    task_id: str  # str(uuid.uuid4()) assigned by start_research
    status: str  # pending, routing, researching, fact_checking, synthesizing, reporting, completed, error
    # NOTE(review): this module only ever sets pending, routing, completed and
    # error; the intermediate stages listed above are not emitted here — confirm.
    current_step: str  # short description of the current pipeline stage
    progress_percent: int  # this module sets 0, 10 or 100
    message: str  # human-readable status; on failure carries the error text
    started_at: datetime  # set from datetime.now() (naive local time) when queued
    updated_at: datetime  # refreshed whenever execute_research changes the status


class Citation(BaseModel):
    """A citation from the research"""
    source_id: str  # "" when the pipeline supplied no source id
    title: str  # "Untitled" when the pipeline supplied no title
    quote: Optional[str] = None  # not populated by the endpoints in this module


class ResearchResult(BaseModel):
    """The result of a research task.

    Assembled from the dict returned by ``run_research``. It is served by
    ``/result/{task_id}`` for background tasks and returned directly by ``/quick``.
    """
    task_id: str
    query: str  # the request's query, echoed back
    research_type: str  # "deep_dive" when the pipeline output omits it
    scholar_findings: str
    fact_check_results: str
    synthesis: str
    final_report: str
    citations: List[Citation]
    metadata: dict  # free-form pipeline metadata, passed through as-is
    created_at: datetime  # task start time (background) or request time (/quick)
    completed_at: Optional[datetime] = None


class ResearchSummary(BaseModel):
    """Summary of a research result for listing"""
    task_id: str
    query: str  # "Unknown" until a result has been stored for the task
    research_type: str  # "unknown" until a result has been stored for the task
    status: str  # copied from the task's ResearchProgress
    created_at: datetime  # the task's ResearchProgress.started_at
    completed_at: Optional[datetime] = None  # None until a result has been stored


# ============================================================================
# In-memory storage (replace with database in production)
# ============================================================================

# task_id -> ResearchProgress; entries are mutated in place by the background task.
_research_tasks: dict = dict()
# task_id -> ResearchResult; filled in once a background task succeeds.
_research_results: dict = dict()


# ============================================================================
# Endpoints
# ============================================================================

@router.post("/start", response_model=ResearchProgress)
async def start_research(request: ResearchRequest, background_tasks: BackgroundTasks):
    """
    Queue a research task and immediately return its initial progress record.

    The pipeline itself runs in ``execute_research`` as a FastAPI background
    task. Poll ``/status/{task_id}`` for progress and ``/result/{task_id}`` for
    the final output.
    """
    new_id = str(uuid.uuid4())
    queued_at = datetime.now()

    initial = ResearchProgress(
        task_id=new_id,
        status="pending",
        current_step="Initializing research pipeline",
        progress_percent=0,
        message="Research task queued",
        started_at=queued_at,
        updated_at=queued_at,
    )

    # Register the task before scheduling the pipeline so that the status
    # endpoint can find it as soon as this response has been sent.
    _research_tasks[new_id] = initial
    background_tasks.add_task(execute_research, new_id, request)

    logger.info(f"Started research task {new_id} for query: {request.query[:100]}...")
    return initial


async def execute_research(task_id: str, request: ResearchRequest, timeout: float = 300.0):
    """
    Run the research pipeline for a task queued by ``start_research``.

    Progress is written to the task's ``ResearchProgress`` entry in
    ``_research_tasks``. On success a ``ResearchResult`` is stored in
    ``_research_results``. Timeouts and failures are recorded on the progress
    entry (status ``"error"``) rather than raised, because nothing awaits a
    background task's outcome.

    Args:
        task_id: Key of the progress entry created by ``start_research``.
        request: The original request. Only ``query`` and ``llm_config`` are
            forwarded to ``run_research``.
        timeout: Seconds to wait for ``run_research`` before giving up.
    """
    import asyncio

    # Hold a direct reference to the progress object. If the task is deleted
    # mid-run, later updates land on this detached object instead of raising
    # KeyError from a dict lookup (which previously also broke the error handler).
    progress = _research_tasks.get(task_id)
    if progress is None:
        logger.warning(f"Research task {task_id} no longer exists; skipping")
        return

    def _update(**changes) -> None:
        """Assign the given fields on the progress entry and bump updated_at."""
        for field_name, value in changes.items():
            setattr(progress, field_name, value)
        progress.updated_at = datetime.now()

    try:
        _update(
            status="routing",
            current_step="Analyzing query and determining research approach",
            progress_percent=10,
        )

        config = request.llm_config or {}

        try:
            result = await asyncio.wait_for(
                run_research(request.query, config),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            logger.error(f"Research task {task_id} timed out after {timeout:g} seconds")
            _update(
                status="error",
                message="Research timed out. Please try a more specific query.",
            )
            return

        # `or` fallbacks also cover keys present with a None value, which would
        # otherwise break iteration or fail ResearchResult validation.
        citations = [
            Citation(
                source_id=c.get("source_id") or "",
                title=c.get("title") or "Untitled",
            )
            for c in (result.get("citations") or [])
        ]

        research_result = ResearchResult(
            task_id=task_id,
            query=request.query,
            research_type=result.get("research_type") or "deep_dive",
            scholar_findings=result.get("scholar_findings") or "",
            fact_check_results=result.get("fact_check_results") or "",
            synthesis=result.get("synthesis") or "",
            final_report=result.get("final_report") or "",
            citations=citations,
            metadata=result.get("metadata") or {},
            created_at=progress.started_at,
            completed_at=datetime.now(),
        )

        # Don't publish a result for a task deleted while the pipeline ran;
        # it would be orphaned in _research_results forever.
        if _research_tasks.get(task_id) is not progress:
            logger.info(f"Research task {task_id} was deleted before completion; discarding result")
            return

        # Store the result BEFORE flipping the status, so any client that
        # observes "completed" can always fetch /result/{task_id}.
        _research_results[task_id] = research_result
        _update(
            status="completed",
            current_step="Research complete",
            progress_percent=100,
            message="Research completed successfully",
        )

        logger.info(f"Research task {task_id} completed successfully")

    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"Research task {task_id} failed: {e}")
        _update(status="error", message=f"Research failed: {str(e)}")


@router.get("/status/{task_id}", response_model=ResearchProgress)
async def get_research_status(task_id: str):
    """Return the live progress record for ``task_id``, or 404 if it is unknown."""
    progress = _research_tasks.get(task_id)
    if progress is None:
        raise HTTPException(status_code=404, detail="Research task not found")
    return progress


@router.get("/result/{task_id}", response_model=ResearchResult)
async def get_research_result(task_id: str):
    """
    Return the stored result of a finished research task.

    Raises:
        HTTPException(202): the task exists and is still running.
        HTTPException(500): the task ended with status ``"error"``; the detail
            carries the task's failure message (mirrors the 500 of ``/quick``).
        HTTPException(404): the task is unknown or has no stored result.
    """
    stored = _research_results.get(task_id)
    if stored is not None:
        return stored

    progress = _research_tasks.get(task_id)
    if progress is not None:
        # A failed task will never produce a result. Reporting it as
        # "still in progress" (202) would make clients poll forever.
        if progress.status == "error":
            raise HTTPException(status_code=500, detail=progress.message)
        if progress.status != "completed":
            # 202 is delivered via HTTPException so the body carries the status.
            raise HTTPException(
                status_code=202,
                detail=f"Research still in progress. Current status: {progress.status}"
            )
    raise HTTPException(status_code=404, detail="Research result not found")


@router.get("/history", response_model=List[ResearchSummary])
async def get_research_history(limit: int = 20, offset: int = 0):
    """
    List research tasks in the order they were registered, one page at a time.

    Args:
        limit: Maximum number of summaries to return; negative values are treated as 0.
        offset: Number of tasks to skip; negative values are treated as 0.

    ``query`` and ``research_type`` are only known once a result has been
    stored, so running or failed tasks show "Unknown"/"unknown" placeholders.
    """
    from itertools import islice

    # Clamp so negative values can't produce Python's from-the-end slice
    # semantics (e.g. limit=-1 used to return all but the last task).
    # islice also rejects negative arguments outright.
    offset = max(offset, 0)
    limit = max(limit, 0)

    summaries = []
    # islice walks only the requested page instead of copying every task into a list.
    for task_id, progress in islice(_research_tasks.items(), offset, offset + limit):
        result = _research_results.get(task_id)
        summaries.append(ResearchSummary(
            task_id=task_id,
            query=result.query if result else "Unknown",
            research_type=result.research_type if result else "unknown",
            status=progress.status,
            created_at=progress.started_at,
            completed_at=result.completed_at if result else None
        ))

    return summaries


@router.post("/quick", response_model=ResearchResult)
async def quick_research(request: ResearchRequest):
    """
    Run the research pipeline inline and return its result in the response.

    Nothing is stored in the task registries, so the returned ``task_id``
    cannot be used with the status/result endpoints. Use this for shorter
    queries where waiting is acceptable.

    Raises:
        HTTPException(408): the pipeline did not finish within 120 seconds.
        HTTPException(500): the pipeline or result assembly raised.
    """
    import asyncio

    task_id = str(uuid.uuid4())
    now = datetime.now()

    logger.info(f"Running quick research for query: {request.query[:100]}...")

    try:
        config = request.llm_config or {}

        # Timeout protection (2 minutes for quick research).
        try:
            result = await asyncio.wait_for(
                run_research(request.query, config),
                timeout=120.0
            )
        except asyncio.TimeoutError:
            logger.error("Quick research timed out after 120 seconds")
            # NOTE(review): 408 is kept for client compatibility, although 504
            # describes a server-side timeout more accurately.
            raise HTTPException(
                status_code=408,
                detail="Research took too long to complete. Please try a more specific query or use the async endpoint."
            ) from None

        # `or` fallbacks also cover keys present with a None value, which would
        # otherwise break iteration or fail ResearchResult validation.
        citations = [
            Citation(
                source_id=c.get("source_id") or "",
                title=c.get("title") or "Untitled"
            )
            for c in (result.get("citations") or [])
        ]

        return ResearchResult(
            task_id=task_id,
            query=request.query,
            research_type=result.get("research_type") or "deep_dive",
            scholar_findings=result.get("scholar_findings") or "",
            fact_check_results=result.get("fact_check_results") or "",
            synthesis=result.get("synthesis") or "",
            final_report=result.get("final_report") or "",
            citations=citations,
            metadata=result.get("metadata") or {},
            created_at=now,
            completed_at=datetime.now()
        )
    except HTTPException:
        # Already shaped for the client (the 408 above); don't wrap it in a 500.
        raise
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"Quick research failed: {e}")
        raise HTTPException(status_code=500, detail=f"Research failed: {str(e)}") from e


@router.delete("/{task_id}")
async def delete_research(task_id: str):
    """
    Forget a research task: drop its progress entry and any stored result.

    Returns 404 when the task is unknown. NOTE: nothing here stops a task whose
    pipeline is still running.
    """
    try:
        del _research_tasks[task_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Research task not found") from None

    # A result exists only for tasks whose pipeline finished successfully.
    _research_results.pop(task_id, None)

    return {"status": "deleted", "task_id": task_id}