File size: 18,889 Bytes
b81b0cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
"""
FinResearch AI - FastAPI REST API Backend

This module provides a REST API wrapper around the CrewAI research workflow.
Supports asynchronous execution to prevent blocking the server during long-running
agent workflows.

Endpoints:
    POST /api/research - Execute research workflow for a stock ticker
    GET  /api/health   - Health check endpoint
    GET  /api/status/{task_id} - Check status of async research task
    
Production:
    Serves React frontend static assets when built
"""

import asyncio
import logging
import os
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel, Field

# Add parent to path for imports
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))

from src.config.settings import get_settings, setup_logging
from src.crew import FinResearchCrew, SequentialFinResearchCrew, CrewExecutionResult
from src.tools.memory import MemoryTool


# =============================================================================
# Configuration
# =============================================================================

# Module-level logger; handlers/levels are configured by setup_logging() at startup.
logger = logging.getLogger(__name__)

# Thread pool for running synchronous CrewAI in async context.
# max_workers=3 bounds the number of concurrent research workflows.
executor = ThreadPoolExecutor(max_workers=3)

# In-memory task store (production would use Redis/DB).
# Maps task_id (uuid4 string) -> ResearchTask; contents are lost on restart.
task_store: Dict[str, "ResearchTask"] = {}


# =============================================================================
# Pydantic Models - API Schema
# =============================================================================

class ModelProvider(str, Enum):
    """Supported LLM providers.

    Subclasses ``str`` so enum values serialize/deserialize directly as
    JSON strings in request bodies.
    """
    OPENAI = "openai"
    GROQ = "groq"


class ResearchRequest(BaseModel):
    """Request body for POST /api/research and /api/research/async."""

    # Ticker is the only required field; everything else defaults.
    ticker: str = Field(
        ...,
        min_length=1,
        max_length=10,
        description="Stock ticker symbol (e.g., AAPL, TSLA)",
        examples=["AAPL", "TSLA", "GOOGL"],
    )
    company_name: Optional[str] = Field(
        default=None, description="Optional company name for context"
    )
    reset_memory: bool = Field(
        default=False, description="Clear ChromaDB memory before starting"
    )
    model_provider: ModelProvider = Field(
        default=ModelProvider.OPENAI, description="LLM provider to use"
    )
    sequential_mode: bool = Field(
        default=True, description="Use sequential process (recommended for stability)"
    )

    class Config:
        # Example payload rendered in the OpenAPI docs.
        json_schema_extra = {
            "example": {
                "ticker": "AAPL",
                "company_name": "Apple Inc",
                "reset_memory": False,
                "model_provider": "openai",
                "sequential_mode": True,
            }
        }


class TaskStatus(str, Enum):
    """Status of an async research task.

    Lifecycle: PENDING -> RUNNING -> COMPLETED or FAILED.
    """
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


class ResearchTask(BaseModel):
    """Model for tracking async research tasks.

    Instances live in the module-level ``task_store`` and are returned
    verbatim by GET /api/status/{task_id}.
    """
    task_id: str = Field(..., description="Unique task identifier")
    ticker: str = Field(..., description="Stock ticker being researched")
    status: TaskStatus = Field(default=TaskStatus.PENDING)
    created_at: datetime = Field(default_factory=datetime.now)
    started_at: Optional[datetime] = None  # set when execution begins
    completed_at: Optional[datetime] = None  # set on success or failure
    progress_logs: List[str] = Field(default_factory=list)  # human-readable progress lines
    report: Optional[str] = None  # markdown report, populated on COMPLETED
    error_message: Optional[str] = None  # populated on FAILED
    
    class Config:
        arbitrary_types_allowed = True


class ResearchResponse(BaseModel):
    """Response body for POST /api/research (sync mode).

    Returned for both success and failure; ``success`` distinguishes the
    two, and ``error`` is populated only on failure.
    """
    success: bool = Field(..., description="Whether research completed successfully")
    ticker: str = Field(..., description="Stock ticker researched")
    report: Optional[str] = Field(default=None, description="Markdown report content")
    logs: List[str] = Field(default_factory=list, description="Execution logs")
    duration_seconds: Optional[float] = Field(default=None)  # wall-clock duration, when known
    error: Optional[str] = Field(default=None, description="Error message if failed")
    
    class Config:
        # Example payload rendered in the OpenAPI docs.
        json_schema_extra = {
            "example": {
                "success": True,
                "ticker": "AAPL",
                "report": "# Financial Research Report: AAPL\n\n## Executive Summary...",
                "logs": ["Starting research...", "Research complete"],
                "duration_seconds": 45.2,
                "error": None
            }
        }


class AsyncResearchResponse(BaseModel):
    """Response for async research initiation (POST /api/research/async)."""
    task_id: str = Field(..., description="Task ID to poll for results")
    status: TaskStatus  # always PENDING at creation time
    message: str  # human-readable confirmation


class HealthResponse(BaseModel):
    """Response for the GET /api/health endpoint."""
    status: str = "healthy"  # static liveness indicator
    version: str = "1.0.0"
    timestamp: datetime = Field(default_factory=datetime.now)  # server time at response


# =============================================================================
# FastAPI Application Setup
# =============================================================================

def create_app() -> FastAPI:
    """Construct and configure the FastAPI application.

    Routes are attached at module scope after this factory runs.

    Returns:
        Configured FastAPI instance
    """
    # Browser origins allowed during local development (UI served separately).
    dev_origins = [
        "http://localhost:5173",  # Vite dev server
        "http://localhost:3000",  # Alternative React port
        "http://127.0.0.1:5173",
        "http://127.0.0.1:3000",
    ]

    application = FastAPI(
        title="FinResearch AI API",
        description="""
        Multi-Agent Financial Research System API.
        
        This API provides endpoints to execute AI-powered financial research
        workflows using CrewAI agents.
        
        ## Features
        - Async research execution
        - Multiple LLM provider support
        - ChromaDB memory persistence
        - Structured markdown reports
        """,
        version="1.0.0",
        docs_url="/api/docs",
        redoc_url="/api/redoc",
        openapi_url="/api/openapi.json",
    )

    # CORS is only required when the frontend runs on a different origin (dev).
    application.add_middleware(
        CORSMiddleware,
        allow_origins=dev_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    return application


# Module-level ASGI application object (referenced by uvicorn as "src.api:app").
app = create_app()


# =============================================================================
# Helper Functions
# =============================================================================

def reset_memory_store() -> str:
    """Clear the ChromaDB memory so the next run starts without prior context.

    Returns:
        A human-readable status string; this function never raises.
    """
    try:
        tool = MemoryTool()
        # Guard clause: nothing to clear when the collection was never created.
        if tool._collection is None:
            return "Memory not initialized"
        outcome = tool._clear()
        logger.info(f"Memory reset: {outcome}")
        return outcome
    except Exception as e:
        # Best-effort operation: report failure instead of propagating it.
        logger.error(f"Failed to reset memory: {e}")
        return f"Failed: {str(e)}"


def run_research_sync(
    ticker: str,
    company_name: Optional[str],
    sequential: bool,
    verbose: bool = False
) -> tuple[str, CrewExecutionResult]:
    """
    Run the research workflow synchronously (blocking).

    This is the core execution function; it is dispatched to a thread pool
    by the async wrapper so the event loop is not blocked.

    Args:
        ticker: Stock ticker symbol
        company_name: Optional company name (falls back to the ticker)
        sequential: Whether to use the sequential crew implementation
        verbose: Enable verbose agent logging

    Returns:
        Tuple of (report_content, execution_result)
    """
    logger.info(f"Starting research for {ticker} (sequential={sequential})")

    # Both crew classes share the same constructor signature.
    crew_cls = SequentialFinResearchCrew if sequential else FinResearchCrew
    crew = crew_cls(
        ticker=ticker,
        company_name=company_name or ticker,
        verbose=verbose,
    )

    report = crew.run()
    execution = crew.get_execution_result()

    # Persist the markdown report alongside returning it to the caller.
    crew.save_report(report, filename=f"{ticker}_report.md")

    return report, execution


async def run_research_async(
    ticker: str,
    company_name: Optional[str],
    sequential: bool
) -> tuple[str, CrewExecutionResult]:
    """
    Run research workflow asynchronously using the module thread pool.

    Offloads the blocking CrewAI run to ``executor`` so the event loop
    stays responsive while research executes.

    Args:
        ticker: Stock ticker symbol
        company_name: Optional company name
        sequential: Whether to use sequential process

    Returns:
        Tuple of (report_content, execution_result)
    """
    # Fixed: asyncio.get_event_loop() is deprecated inside coroutines since
    # Python 3.10; get_running_loop() is the correct call here.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        executor,
        run_research_sync,
        ticker,
        company_name,
        sequential,
        False  # verbose=False: keep thread-pool runs quiet
    )


async def execute_research_task(task_id: str, request: ResearchRequest):
    """
    Background worker: run the research and record progress on the stored task.

    Args:
        task_id: Unique task identifier (key into ``task_store``)
        request: Research request parameters
    """
    task = task_store.get(task_id)
    if task is None:
        logger.error(f"Task {task_id} not found in store")
        return

    # Bind the append once; every progress message goes through it.
    log = task.progress_logs.append

    try:
        task.status = TaskStatus.RUNNING
        task.started_at = datetime.now()
        log(f"Starting research for {request.ticker}...")

        # Optionally wipe prior ChromaDB context before running.
        if request.reset_memory:
            log("Clearing memory...")
            reset_memory_store()

        log("Executing agent workflow...")
        report, execution_result = await run_research_async(
            ticker=request.ticker.upper(),
            company_name=request.company_name,
            sequential=request.sequential_mode,
        )

        # Success: record the report and completion time on the task.
        task.status = TaskStatus.COMPLETED
        task.completed_at = datetime.now()
        task.report = report
        log("Research completed successfully!")

        if execution_result and execution_result.duration_seconds:
            log(f"Duration: {execution_result.duration_seconds:.1f}s")

    except Exception as e:
        # Failure: surface the error through the task record for pollers.
        logger.exception(f"Research task {task_id} failed")
        task.status = TaskStatus.FAILED
        task.completed_at = datetime.now()
        task.error_message = str(e)
        log(f"Error: {str(e)}")


# =============================================================================
# API Routes
# =============================================================================

@app.get("/api/health", response_model=HealthResponse, tags=["System"])
async def health_check():
    """
    Health check endpoint.

    Returns a fresh HealthResponse with static status/version fields and
    the current server timestamp (set by the model's default factory).
    """
    return HealthResponse()


@app.post("/api/research", response_model=ResearchResponse, tags=["Research"])
async def create_research(request: ResearchRequest):
    """
    Execute a financial research workflow (synchronous).

    Runs the full CrewAI workflow and returns the completed report in one
    request/response cycle. For long-running research, prefer the async
    endpoint `/api/research/async`.

    Args:
        request: Research parameters including ticker and options

    Returns:
        Research response with report content and execution logs; failures
        are reported in-band via ``success=False`` rather than an HTTP error.
    """
    ticker = request.ticker.strip().upper()

    # Fail fast when the server lacks LLM credentials.
    settings = get_settings()
    if not settings.openai_api_key:
        raise HTTPException(
            status_code=500,
            detail="OPENAI_API_KEY not configured on server"
        )

    logs = [f"Starting research for {ticker}..."]

    try:
        # Optionally wipe prior ChromaDB context before running.
        if request.reset_memory:
            logs.append("Resetting memory store...")
            reset_memory_store()

        # Offload the blocking crew run to the thread pool.
        logs.append("Executing agent workflow...")
        report, execution_result = await run_research_async(
            ticker=ticker,
            company_name=request.company_name,
            sequential=request.sequential_mode,
        )
        logs.append("Research completed successfully!")

        duration = execution_result.duration_seconds if execution_result else None
        return ResearchResponse(
            success=True,
            ticker=ticker,
            report=report,
            logs=logs,
            duration_seconds=duration,
            error=None,
        )

    except Exception as e:
        # Report failure in-band so the client still receives the logs.
        logger.exception(f"Research failed for {ticker}")
        logs.append(f"Error: {str(e)}")
        return ResearchResponse(
            success=False,
            ticker=ticker,
            report=None,
            logs=logs,
            error=str(e),
        )


@app.post("/api/research/async", response_model=AsyncResearchResponse, tags=["Research"])
async def create_research_async(
    request: ResearchRequest,
    background_tasks: BackgroundTasks
):
    """
    Initiate an asynchronous research workflow.

    Registers a PENDING task in the in-memory store, schedules the actual
    work as a FastAPI background task, and returns a task ID that can be
    polled via `/api/status/{task_id}`.

    Args:
        request: Research parameters
        background_tasks: FastAPI background task handler

    Returns:
        Task ID and initial status
    """
    # Fail fast when the server lacks LLM credentials.
    settings = get_settings()
    if not settings.openai_api_key:
        raise HTTPException(
            status_code=500,
            detail="OPENAI_API_KEY not configured on server"
        )

    ticker = request.ticker.upper()
    task_id = str(uuid.uuid4())

    # Register the task before scheduling so status polls never miss it.
    task_store[task_id] = ResearchTask(
        task_id=task_id,
        ticker=ticker,
        status=TaskStatus.PENDING
    )
    background_tasks.add_task(execute_research_task, task_id, request)

    return AsyncResearchResponse(
        task_id=task_id,
        status=TaskStatus.PENDING,
        message=f"Research task created for {ticker}"
    )


@app.get("/api/status/{task_id}", response_model=ResearchTask, tags=["Research"])
async def get_task_status(task_id: str):
    """
    Get the status of an async research task.

    Args:
        task_id: Task identifier from `/api/research/async`

    Returns:
        Current task status, logs, and report (if completed)

    Raises:
        HTTPException: 404 when the task ID is unknown.
    """
    try:
        return task_store[task_id]
    except KeyError:
        raise HTTPException(status_code=404, detail=f"Task {task_id} not found")


@app.delete("/api/memory", tags=["System"])
async def clear_memory():
    """
    Clear the ChromaDB memory store.

    Useful for starting fresh research without previous context; the
    underlying reset never raises, so this always returns 200.
    """
    return {"message": reset_memory_store()}


# =============================================================================
# Static File Serving (Production)
# =============================================================================

# Path to React build output - supports both local dev and HF Spaces deployment
# HF Spaces: /app/static (copied during Docker build)
# Local dev: ./frontend/dist (built locally)
HF_STATIC_PATH = Path(__file__).parent.parent / "static"
FRONTEND_BUILD_PATH = Path(__file__).parent.parent / "frontend" / "dist"


def get_static_path() -> Path:
    """Return the static-asset root: the HF Spaces copy when present, otherwise the local Vite build."""
    return HF_STATIC_PATH if HF_STATIC_PATH.exists() else FRONTEND_BUILD_PATH


# Resolved once at import time; routes below reference this constant.
STATIC_PATH = get_static_path()

# Mount the built JS/CSS bundle at /assets so the SPA's index.html can load it.
# Guarded: only mount when the build output actually exists on disk, otherwise
# StaticFiles would raise at import time.
if STATIC_PATH.exists() and (STATIC_PATH / "assets").exists():
    app.mount(
        "/assets",
        StaticFiles(directory=STATIC_PATH / "assets"),
        name="assets"
    )


@app.on_event("startup")
async def startup_event():
    """Configure logging and report frontend availability on startup.

    NOTE(review): ``@app.on_event`` is deprecated in recent FastAPI in
    favor of lifespan handlers; kept here to avoid restructuring
    ``create_app`` — confirm before upgrading FastAPI.
    """
    setup_logging(level="INFO")
    logger.info("FinResearch API starting up...")

    # Log frontend status so deployment problems are visible immediately.
    if STATIC_PATH.exists():
        logger.info(f"Serving frontend from {STATIC_PATH}")
    else:
        # Fixed: was an f-string with no placeholders (ruff F541).
        logger.warning("Frontend build not found")
        logger.warning("Run 'npm run build' in frontend/ to build the UI")


@app.get("/", tags=["Frontend"])
async def serve_frontend():
    """
    Serve the React frontend application.

    In production this returns the built app's index.html; when no build
    exists, a JSON hint pointing at the Vite dev server is returned instead.
    """
    index_file = STATIC_PATH / "index.html"

    if not index_file.exists():
        # Development mode - frontend served by Vite
        return JSONResponse(
            content={
                "message": "Frontend not built. Use Vite dev server at http://localhost:5173",
                "api_docs": "/api/docs"
            }
        )

    return FileResponse(index_file)


@app.get("/vite.svg", tags=["Frontend"])
async def serve_vite_svg():
    """Serve the Vite favicon.

    Probes the HF Spaces static dir first, then the local frontend/public
    dir, returning the first vite.svg found.
    """
    for base_path in [HF_STATIC_PATH, FRONTEND_BUILD_PATH.parent / "public"]:
        # Fixed: the original ternary had two identical branches — the
        # relative path is the same regardless of which base dir we probe.
        svg_path = base_path / "vite.svg"
        if svg_path.exists():
            return FileResponse(svg_path, media_type="image/svg+xml")
    raise HTTPException(status_code=404, detail="Favicon not found")


# Catch-all for SPA routing (must be last)
@app.get("/{full_path:path}", tags=["Frontend"])
async def serve_spa(full_path: str):
    """
    Catch-all route for SPA client-side routing.
    
    All non-API routes serve the React app's index.html.
    """
    # Don't interfere with API routes
    if full_path.startswith("api/"):
        raise HTTPException(status_code=404, detail="API endpoint not found")
    
    index_path = STATIC_PATH / "index.html"
    
    if index_path.exists():
        return FileResponse(index_path)
    else:
        raise HTTPException(
            status_code=404,
            detail="Frontend not built. Run 'npm run build' in frontend/"
        )


# =============================================================================
# Development Entry Point
# =============================================================================

if __name__ == "__main__":
    import uvicorn
    
    # Development entry point: reload=True enables auto-restart on code
    # changes. Production should launch via a process manager instead
    # (e.g. `uvicorn src.api:app --workers N`).
    uvicorn.run(
        "src.api:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info"
    )