quantmacro-india / src /api /main.py
kingbros919's picture
Upload folder using huggingface_hub
292b308 verified
Raw
History Blame Contribute Delete
12.5 kB
import os
import sys
import logging
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
class IngestResponse(BaseModel):
status: str
message: str
class SectorInfo(BaseModel):
name: str
price_key: str
news_key: str
class PredictionMetrics(BaseModel):
accuracy: float
precision: float
recall: float
f1: float
rmse: float
class PredictionDetails(BaseModel):
trend: str
target_trend_code: int
predicted_price: float
confidence: float
metrics: PredictionMetrics
class PredictionResponse(BaseModel):
trained: bool
prediction: Optional[PredictionDetails] = None
message: Optional[str] = None
class BacktestMetricDetail(BaseModel):
total_return: float = Field(alias="Total Return")
cagr: float = Field(alias="CAGR")
annualized_volatility: float = Field(alias="Annualized Volatility")
sharpe_ratio: float = Field(alias="Sharpe Ratio")
sortino_ratio: float = Field(alias="Sortino Ratio")
calmar_ratio: float = Field(alias="Calmar Ratio")
max_drawdown: float = Field(alias="Max Drawdown")
var_95: float = Field(alias="VaR_95")
cvar_95: float = Field(alias="CVaR_95")
win_rate: float = Field(alias="Win Rate")
ci_lower_daily: float = Field(alias="CI_Lower_Daily")
ci_upper_daily: float = Field(alias="CI_Upper_Daily")
model_config = {
"populate_by_name": True
}
class BacktestResponse(BaseModel):
metrics: Dict[str, BacktestMetricDetail]
curves: List[Dict[str, Any]]
# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
from src.database.connection import get_connection
from src.database.queries import get_latest_prices, get_latest_news_for_sector, get_market_pulse
from src.ingestion.news_fetcher import run_ingestion
from src.ingestion.fetch_bse_data import main as run_price_ingestion
from src.models.predictor import PricePredictor
from src.backtesting.engine import BacktestEngine
from src.insights.engine import generate_insights
from src.insights.llm import explain_market_condition
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("api_backend")
app = FastAPI(
title="QuantMacro India API",
description="Production-grade API backend providing institutional market intelligence, sentiment analytics, machine learning forecasts, and backtesting metrics for Indian equities.",
version="3.0.0"
)
# Enable CORS for frontend integration
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
SECTOR_MAP = {
"Banking": {"price": "BANKING_SECTOR", "news": "BSE_BANKEX"},
"IT": {"price": "IT_SECTOR", "news": "BSE_IT"},
"Energy": {"price": "ENERGY_SECTOR", "news": "BSE_ENERGY"},
"Market (Sensex)": {"price": "BSE_SENSEX", "news": "BSE_SENSEX"},
}
class IngestResponse(BaseModel):
status: str
message: str
class SectorInfo(BaseModel):
name: str
price_key: str
news_key: str
@app.get("/")
def read_root():
return {
"title": "Indian Sector Market Intelligence Platform API",
"version": "2.0.0",
"endpoints": [
"/health",
"/api/sectors",
"/api/prices/{sector}",
"/api/sentiment/{sector}",
"/api/predict/{sector}",
"/api/backtest/{sector}",
"/api/insights/{sector}",
"/api/ingest"
]
}
@app.get("/health")
def health_check():
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()
conn.close()
return {"status": "healthy", "database": "connected"}
except Exception as e:
return {"status": "unhealthy", "error": str(e)}
@app.get("/api/sectors", response_model=List[SectorInfo])
def get_sectors():
return [
SectorInfo(name=k, price_key=v["price"], news_key=v["news"])
for k, v in SECTOR_MAP.items()
]
@app.get("/api/prices/{sector}")
def get_prices(sector: str):
if sector not in SECTOR_MAP:
raise HTTPException(status_code=404, detail="Sector not found")
price_key = SECTOR_MAP[sector]["price"]
conn = get_connection()
try:
df = get_latest_prices(conn)
df_sector = df[df["sector_index"] == price_key].copy()
if df_sector.empty:
return []
# Prepare data with technical indicators
predictor = PricePredictor()
df_processed, _ = predictor.prepare_data(df_sector, SECTOR_MAP[sector]["news"])
# Convert date to string
df_processed['date'] = df_processed['date'].dt.strftime('%Y-%m-%d')
# Convert NaN values to None for JSON compliance
import numpy as np
df_clean = df_processed.replace({np.nan: None})
return df_clean.to_dict(orient="records")
except Exception as e:
logger.error(f"Error fetching prices for {sector}: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
conn.close()
@app.get("/api/sentiment/{sector}")
def get_sentiment(sector: str):
if sector not in SECTOR_MAP:
raise HTTPException(status_code=404, detail="Sector not found")
news_key = SECTOR_MAP[sector]["news"]
conn = get_connection()
try:
df_news = get_latest_news_for_sector(news_key, limit=50, conn=conn)
# Get overall market sentiment pulse
pulse = get_market_pulse(conn)
return {
"news": df_news.to_dict(orient="records") if not df_news.empty else [],
"avg_sentiment": float(pulse.get(news_key, 0.0))
}
except Exception as e:
logger.error(f"Error fetching sentiment for {sector}: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
conn.close()
@app.get("/api/predict/{sector}", response_model=PredictionResponse)
def get_predictions(sector: str):
if sector not in SECTOR_MAP:
raise HTTPException(status_code=404, detail="Sector not found")
price_key = SECTOR_MAP[sector]["price"]
news_key = SECTOR_MAP[sector]["news"]
conn = get_connection()
try:
df = get_latest_prices(conn)
df_sector = df[df["sector_index"] == price_key].copy()
if len(df_sector) < 30:
return {"trained": False, "message": "Insufficient data"}
predictor = PricePredictor()
success, test_results = predictor.train_and_evaluate(df_sector, news_key)
if not success:
return {"trained": False, "message": "Failed to train model"}
pred_trend, pred_price, confidence = predictor.predict_next_day(df_sector, news_key)
return {
"trained": True,
"prediction": {
"trend": "UP" if pred_trend == 1 else "DOWN",
"target_trend_code": int(pred_trend),
"predicted_price": float(pred_price),
"confidence": float(confidence),
"metrics": predictor.metrics
}
}
except Exception as e:
logger.error(f"Prediction error for {sector}: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
conn.close()
@app.get("/api/backtest/{sector}", response_model=BacktestResponse)
def get_backtest(sector: str):
if sector not in SECTOR_MAP:
raise HTTPException(status_code=404, detail="Sector not found")
price_key = SECTOR_MAP[sector]["price"]
news_key = SECTOR_MAP[sector]["news"]
conn = get_connection()
try:
df = get_latest_prices(conn)
df_sector = df[df["sector_index"] == price_key].copy()
if len(df_sector) < 30:
raise HTTPException(status_code=400, detail="Insufficient data to backtest")
# Train model
predictor = PricePredictor()
success, test_results = predictor.train_and_evaluate(df_sector, news_key)
if not success:
raise HTTPException(status_code=500, detail="Failed to train model for backtest")
# Run Backtester
engine = BacktestEngine(transaction_cost=0.001, slippage=0.0005)
backtest_results = engine.run_backtest(test_results, test_results['predicted_trend'])
# Serialize curves
curves_df = backtest_results["curves"]
curves_df['date'] = curves_df['date'].astype(str)
import numpy as np
curves_clean = curves_df.replace({np.nan: None})
return {
"metrics": backtest_results["metrics"],
"curves": curves_clean.to_dict(orient="records")
}
except Exception as e:
logger.error(f"Backtesting error for {sector}: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
conn.close()
@app.get("/api/insights/{sector}")
def get_insights(sector: str):
if sector not in SECTOR_MAP:
raise HTTPException(status_code=404, detail="Sector not found")
price_key = SECTOR_MAP[sector]["price"]
news_key = SECTOR_MAP[sector]["news"]
conn = get_connection()
try:
df = get_latest_prices(conn)
df_sector = df[df["sector_index"] == price_key].copy()
df_news = get_latest_news_for_sector(news_key, limit=20, conn=conn)
if df_sector.empty:
return {"insights": [], "explanation": "No price data found."}
insights = generate_insights(df_sector, df_news)
# Get model confidence to pass to explanation generator
confidence = None
try:
predictor = PricePredictor()
success, _ = predictor.train_and_evaluate(df_sector, news_key)
if success:
_, _, confidence = predictor.predict_next_day(df_sector, news_key)
except Exception:
pass
headlines = df_news['headline'].tolist() if not df_news.empty else []
explanation = explain_market_condition(sector, insights, headlines, confidence)
return {
"insights": insights,
"explanation": explanation
}
except Exception as e:
logger.error(f"Insights generation error for {sector}: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
conn.close()
def async_ingestion_task():
logger.info("Executing async background ingestion task...")
try:
# Run price and news ingestions sequentially
run_price_ingestion()
run_ingestion()
logger.info("Background ingestion completed successfully.")
except Exception as e:
logger.error(f"Error in background ingestion: {e}")
@app.post("/api/ingest", response_model=IngestResponse)
def trigger_ingestion(background_tasks: BackgroundTasks):
background_tasks.add_task(async_ingestion_task)
return IngestResponse(
status="accepted",
message="Background ingestion pipeline triggered successfully."
)
from src.agents.graph import run_analysis
from pydantic import BaseModel
class AgentQueryRequest(BaseModel):
question: str
sector: str = ""
class AgentQueryResponse(BaseModel):
answer: str
sources: list
confidence: str
ml_direction: str
ml_probability: float
news_sentiment: float
error: str = ""
@app.post("/agent/query", response_model=AgentQueryResponse)
async def agent_query(request: AgentQueryRequest):
try:
result = run_analysis(query=request.question, sector=request.sector)
return AgentQueryResponse(**result)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/agent/health")
async def agent_health():
return {"status": "ok", "agents": ["retriever", "quant", "analyst"], "llm": "gemini-2.5-flash"}