Spaces:
Running
Running
| import os | |
| import sys | |
| import logging | |
| from fastapi import FastAPI, BackgroundTasks, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| from typing import List, Dict, Any, Optional | |
| class IngestResponse(BaseModel): | |
| status: str | |
| message: str | |
| class SectorInfo(BaseModel): | |
| name: str | |
| price_key: str | |
| news_key: str | |
| class PredictionMetrics(BaseModel): | |
| accuracy: float | |
| precision: float | |
| recall: float | |
| f1: float | |
| rmse: float | |
| class PredictionDetails(BaseModel): | |
| trend: str | |
| target_trend_code: int | |
| predicted_price: float | |
| confidence: float | |
| metrics: PredictionMetrics | |
| class PredictionResponse(BaseModel): | |
| trained: bool | |
| prediction: Optional[PredictionDetails] = None | |
| message: Optional[str] = None | |
| class BacktestMetricDetail(BaseModel): | |
| total_return: float = Field(alias="Total Return") | |
| cagr: float = Field(alias="CAGR") | |
| annualized_volatility: float = Field(alias="Annualized Volatility") | |
| sharpe_ratio: float = Field(alias="Sharpe Ratio") | |
| sortino_ratio: float = Field(alias="Sortino Ratio") | |
| calmar_ratio: float = Field(alias="Calmar Ratio") | |
| max_drawdown: float = Field(alias="Max Drawdown") | |
| var_95: float = Field(alias="VaR_95") | |
| cvar_95: float = Field(alias="CVaR_95") | |
| win_rate: float = Field(alias="Win Rate") | |
| ci_lower_daily: float = Field(alias="CI_Lower_Daily") | |
| ci_upper_daily: float = Field(alias="CI_Upper_Daily") | |
| model_config = { | |
| "populate_by_name": True | |
| } | |
| class BacktestResponse(BaseModel): | |
| metrics: Dict[str, BacktestMetricDetail] | |
| curves: List[Dict[str, Any]] | |
| # Ensure project root is in path | |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))) | |
| from src.database.connection import get_connection | |
| from src.database.queries import get_latest_prices, get_latest_news_for_sector, get_market_pulse | |
| from src.ingestion.news_fetcher import run_ingestion | |
| from src.ingestion.fetch_bse_data import main as run_price_ingestion | |
| from src.models.predictor import PricePredictor | |
| from src.backtesting.engine import BacktestEngine | |
| from src.insights.engine import generate_insights | |
| from src.insights.llm import explain_market_condition | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("api_backend") | |
| app = FastAPI( | |
| title="QuantMacro India API", | |
| description="Production-grade API backend providing institutional market intelligence, sentiment analytics, machine learning forecasts, and backtesting metrics for Indian equities.", | |
| version="3.0.0" | |
| ) | |
| # Enable CORS for frontend integration | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| SECTOR_MAP = { | |
| "Banking": {"price": "BANKING_SECTOR", "news": "BSE_BANKEX"}, | |
| "IT": {"price": "IT_SECTOR", "news": "BSE_IT"}, | |
| "Energy": {"price": "ENERGY_SECTOR", "news": "BSE_ENERGY"}, | |
| "Market (Sensex)": {"price": "BSE_SENSEX", "news": "BSE_SENSEX"}, | |
| } | |
| class IngestResponse(BaseModel): | |
| status: str | |
| message: str | |
| class SectorInfo(BaseModel): | |
| name: str | |
| price_key: str | |
| news_key: str | |
| def read_root(): | |
| return { | |
| "title": "Indian Sector Market Intelligence Platform API", | |
| "version": "2.0.0", | |
| "endpoints": [ | |
| "/health", | |
| "/api/sectors", | |
| "/api/prices/{sector}", | |
| "/api/sentiment/{sector}", | |
| "/api/predict/{sector}", | |
| "/api/backtest/{sector}", | |
| "/api/insights/{sector}", | |
| "/api/ingest" | |
| ] | |
| } | |
| def health_check(): | |
| try: | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute("SELECT 1") | |
| cursor.fetchone() | |
| conn.close() | |
| return {"status": "healthy", "database": "connected"} | |
| except Exception as e: | |
| return {"status": "unhealthy", "error": str(e)} | |
| def get_sectors(): | |
| return [ | |
| SectorInfo(name=k, price_key=v["price"], news_key=v["news"]) | |
| for k, v in SECTOR_MAP.items() | |
| ] | |
| def get_prices(sector: str): | |
| if sector not in SECTOR_MAP: | |
| raise HTTPException(status_code=404, detail="Sector not found") | |
| price_key = SECTOR_MAP[sector]["price"] | |
| conn = get_connection() | |
| try: | |
| df = get_latest_prices(conn) | |
| df_sector = df[df["sector_index"] == price_key].copy() | |
| if df_sector.empty: | |
| return [] | |
| # Prepare data with technical indicators | |
| predictor = PricePredictor() | |
| df_processed, _ = predictor.prepare_data(df_sector, SECTOR_MAP[sector]["news"]) | |
| # Convert date to string | |
| df_processed['date'] = df_processed['date'].dt.strftime('%Y-%m-%d') | |
| # Convert NaN values to None for JSON compliance | |
| import numpy as np | |
| df_clean = df_processed.replace({np.nan: None}) | |
| return df_clean.to_dict(orient="records") | |
| except Exception as e: | |
| logger.error(f"Error fetching prices for {sector}: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| finally: | |
| conn.close() | |
| def get_sentiment(sector: str): | |
| if sector not in SECTOR_MAP: | |
| raise HTTPException(status_code=404, detail="Sector not found") | |
| news_key = SECTOR_MAP[sector]["news"] | |
| conn = get_connection() | |
| try: | |
| df_news = get_latest_news_for_sector(news_key, limit=50, conn=conn) | |
| # Get overall market sentiment pulse | |
| pulse = get_market_pulse(conn) | |
| return { | |
| "news": df_news.to_dict(orient="records") if not df_news.empty else [], | |
| "avg_sentiment": float(pulse.get(news_key, 0.0)) | |
| } | |
| except Exception as e: | |
| logger.error(f"Error fetching sentiment for {sector}: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| finally: | |
| conn.close() | |
| def get_predictions(sector: str): | |
| if sector not in SECTOR_MAP: | |
| raise HTTPException(status_code=404, detail="Sector not found") | |
| price_key = SECTOR_MAP[sector]["price"] | |
| news_key = SECTOR_MAP[sector]["news"] | |
| conn = get_connection() | |
| try: | |
| df = get_latest_prices(conn) | |
| df_sector = df[df["sector_index"] == price_key].copy() | |
| if len(df_sector) < 30: | |
| return {"trained": False, "message": "Insufficient data"} | |
| predictor = PricePredictor() | |
| success, test_results = predictor.train_and_evaluate(df_sector, news_key) | |
| if not success: | |
| return {"trained": False, "message": "Failed to train model"} | |
| pred_trend, pred_price, confidence = predictor.predict_next_day(df_sector, news_key) | |
| return { | |
| "trained": True, | |
| "prediction": { | |
| "trend": "UP" if pred_trend == 1 else "DOWN", | |
| "target_trend_code": int(pred_trend), | |
| "predicted_price": float(pred_price), | |
| "confidence": float(confidence), | |
| "metrics": predictor.metrics | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"Prediction error for {sector}: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| finally: | |
| conn.close() | |
| def get_backtest(sector: str): | |
| if sector not in SECTOR_MAP: | |
| raise HTTPException(status_code=404, detail="Sector not found") | |
| price_key = SECTOR_MAP[sector]["price"] | |
| news_key = SECTOR_MAP[sector]["news"] | |
| conn = get_connection() | |
| try: | |
| df = get_latest_prices(conn) | |
| df_sector = df[df["sector_index"] == price_key].copy() | |
| if len(df_sector) < 30: | |
| raise HTTPException(status_code=400, detail="Insufficient data to backtest") | |
| # Train model | |
| predictor = PricePredictor() | |
| success, test_results = predictor.train_and_evaluate(df_sector, news_key) | |
| if not success: | |
| raise HTTPException(status_code=500, detail="Failed to train model for backtest") | |
| # Run Backtester | |
| engine = BacktestEngine(transaction_cost=0.001, slippage=0.0005) | |
| backtest_results = engine.run_backtest(test_results, test_results['predicted_trend']) | |
| # Serialize curves | |
| curves_df = backtest_results["curves"] | |
| curves_df['date'] = curves_df['date'].astype(str) | |
| import numpy as np | |
| curves_clean = curves_df.replace({np.nan: None}) | |
| return { | |
| "metrics": backtest_results["metrics"], | |
| "curves": curves_clean.to_dict(orient="records") | |
| } | |
| except Exception as e: | |
| logger.error(f"Backtesting error for {sector}: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| finally: | |
| conn.close() | |
| def get_insights(sector: str): | |
| if sector not in SECTOR_MAP: | |
| raise HTTPException(status_code=404, detail="Sector not found") | |
| price_key = SECTOR_MAP[sector]["price"] | |
| news_key = SECTOR_MAP[sector]["news"] | |
| conn = get_connection() | |
| try: | |
| df = get_latest_prices(conn) | |
| df_sector = df[df["sector_index"] == price_key].copy() | |
| df_news = get_latest_news_for_sector(news_key, limit=20, conn=conn) | |
| if df_sector.empty: | |
| return {"insights": [], "explanation": "No price data found."} | |
| insights = generate_insights(df_sector, df_news) | |
| # Get model confidence to pass to explanation generator | |
| confidence = None | |
| try: | |
| predictor = PricePredictor() | |
| success, _ = predictor.train_and_evaluate(df_sector, news_key) | |
| if success: | |
| _, _, confidence = predictor.predict_next_day(df_sector, news_key) | |
| except Exception: | |
| pass | |
| headlines = df_news['headline'].tolist() if not df_news.empty else [] | |
| explanation = explain_market_condition(sector, insights, headlines, confidence) | |
| return { | |
| "insights": insights, | |
| "explanation": explanation | |
| } | |
| except Exception as e: | |
| logger.error(f"Insights generation error for {sector}: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| finally: | |
| conn.close() | |
| def async_ingestion_task(): | |
| logger.info("Executing async background ingestion task...") | |
| try: | |
| # Run price and news ingestions sequentially | |
| run_price_ingestion() | |
| run_ingestion() | |
| logger.info("Background ingestion completed successfully.") | |
| except Exception as e: | |
| logger.error(f"Error in background ingestion: {e}") | |
| def trigger_ingestion(background_tasks: BackgroundTasks): | |
| background_tasks.add_task(async_ingestion_task) | |
| return IngestResponse( | |
| status="accepted", | |
| message="Background ingestion pipeline triggered successfully." | |
| ) | |
| from src.agents.graph import run_analysis | |
| from pydantic import BaseModel | |
| class AgentQueryRequest(BaseModel): | |
| question: str | |
| sector: str = "" | |
| class AgentQueryResponse(BaseModel): | |
| answer: str | |
| sources: list | |
| confidence: str | |
| ml_direction: str | |
| ml_probability: float | |
| news_sentiment: float | |
| error: str = "" | |
| async def agent_query(request: AgentQueryRequest): | |
| try: | |
| result = run_analysis(query=request.question, sector=request.sector) | |
| return AgentQueryResponse(**result) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def agent_health(): | |
| return {"status": "ok", "agents": ["retriever", "quant", "analyst"], "llm": "gemini-2.5-flash"} | |