# Source-page residue kept for provenance (not part of the program):
#   SiddharthVenba's picture
#   Initial commit for HF Space
#   75d9b3c
#   Raw
#   History Blame Contribute Delete
#   4.14 kB
import logging
import os
from pathlib import Path
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
import uvicorn
from pydantic import BaseModel
# Import our quant engine modules
from backend.agents.scan_pipeline import run_full_scan
from backend.agents.supervisor_agent import invoke_supervisor
from backend.data.store import get_store
# Logging: configure the root logger once at import time (INFO and above,
# "timestamp [LEVEL] message" lines) and expose a module-level logger.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Initialize FastAPI App
app = FastAPI(title="Swing Quant Engine API", version="2.0.0")

# CORS: the API accepts requests from any origin, method and header.
# Credentials are deliberately disabled: the CORS protocol does not allow a
# wildcard origin to be combined with credentialed requests, and enabling both
# would effectively let any site send cookie-bearing requests to this API.
# If cookies/auth are ever needed cross-origin, list the allowed origins
# explicitly and re-enable allow_credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Paths: BASE_DIR is the parent of the directory that holds this file; the
# frontend folder is created on import if it is missing.
BASE_DIR = Path(__file__).parent.parent
FRONTEND_DIR = BASE_DIR / "frontend"
FRONTEND_DIR.mkdir(exist_ok=True)

# Static assets: make sure each asset folder exists, then serve it under the
# URL prefix of the same name ("/css", "/js").
for _asset in ("css", "js"):
    _asset_dir = FRONTEND_DIR / _asset
    _asset_dir.mkdir(exist_ok=True)
    app.mount(f"/{_asset}", StaticFiles(directory=str(_asset_dir)), name=_asset)
# ── In-memory cache for last scan results ──
# Holds the full result dict of the most recent scan that trigger_scan
# accepted (None until one has run). get_dashboard_data reads it to fill in
# pipeline stages, regimes and stats. As a module-level variable it lives only
# in this process and is gone after a restart.
_last_scan_result: dict | None = None
class ChatRequest(BaseModel):
    # Request body for POST /api/chat.
    # `query` is required and is handed unchanged to invoke_supervisor().
    query: str
class ScanRequest(BaseModel):
    # Request body for POST /api/scan; every field has a default, so an empty
    # JSON object is a valid request.

    # Passed through as run_full_scan(market=...).
    market: str = "both"

    # Passed through as run_full_scan(top_n=...).
    top_n: int = 10
@app.get("/", response_class=HTMLResponse)
async def serve_index():
    """Serve the frontend entry page.

    Returns:
        The text of ``frontend/index.html`` (decoded as UTF-8), or a short
        placeholder message when that file does not exist.
    """
    index_path = FRONTEND_DIR / "index.html"
    try:
        # Decode explicitly as UTF-8 instead of relying on the platform's
        # locale encoding, and read directly rather than checking exists()
        # first so a file removed in between cannot cause an unhandled error.
        return index_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return "Frontend not built yet. Create frontend/index.html"
@app.get("/api/dashboard")
async def get_dashboard_data():
    """Return the latest saved signals and stats for the dashboard.

    The most recent scan-history row and up to 10 active signals come from the
    store. Pipeline stages, regimes and stats are taken from the in-process
    cache of the last scan when one is available; otherwise stages and regimes
    are empty and stats are derived from the history row.

    Returns:
        A ``no_data`` JSON response when the store has no scan history,
        otherwise a dict with ``status``, ``last_scan_date``, ``stats``,
        ``signals``, ``pipeline_stages`` and ``regimes``.
    """
    store = get_store()
    recent_scans = store.get_scan_history(limit=1)

    # Guard clause: nothing has been scanned yet.
    if not recent_scans:
        return JSONResponse({"status": "no_data", "signals": [], "stats": {}, "pipeline_stages": [], "regimes": {}})

    latest_scan = recent_scans[0]
    active_signals = store.get_active_signals(limit=10)

    # Stats reconstructed from the store, used when the cache has none.
    stored_stats = {
        "universe_size": latest_scan["universe_size"],
        "raw_signals": latest_scan["signals_found"],
        "final_signals": len(active_signals),
    }

    # A missing or empty cache behaves like an empty dict, so every lookup
    # below falls back to its default.
    cached = _last_scan_result if _last_scan_result else {}

    return {
        "status": "success",
        # NOTE(review): presumably converts a seconds timestamp to
        # milliseconds for the frontend; confirm against the store schema.
        "last_scan_date": latest_scan["created_at"] * 1000,
        "stats": cached.get("stats", stored_stats),
        "signals": active_signals,
        "pipeline_stages": cached.get("pipeline_stages", []),
        "regimes": cached.get("regimes", {}),
    }
@app.post("/api/scan")
async def trigger_scan(req: ScanRequest):
    """Trigger a new full scan with step-by-step pipeline tracking.

    Args:
        req: Scan options; ``market`` and ``top_n`` are forwarded to
            ``run_full_scan``.

    Returns:
        On success, a dict with ``status``, ``stats``, ``pipeline_stages``,
        ``regimes`` and ``signals``. A JSON error response with status 500 is
        returned when the scan raises, or when it reports an ``error`` without
        any pipeline stages.
    """
    global _last_scan_result
    logger.info("Triggering manual scan for market: %s", req.market)

    # NOTE(review): run_full_scan is a plain synchronous call inside this
    # coroutine, so the event loop is busy until the scan returns. Consider
    # offloading it (e.g. to a worker thread) once the pipeline and store are
    # confirmed to be thread-safe.
    try:
        results = run_full_scan(market=req.market, top_n=req.top_n)
    except Exception as e:
        # Endpoint boundary: report failures in the same JSON error shape the
        # API uses elsewhere instead of letting a bare 500 escape, and keep
        # the traceback in the logs.
        logger.exception("Scan error: %s", e)
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)

    # An error with no pipeline stages is a hard failure; an error that still
    # carries stages is returned (and cached) like any other result.
    if "error" in results and not results.get("pipeline_stages"):
        return JSONResponse({"status": "error", "message": results["error"]}, status_code=500)

    # Cache the full result for dashboard
    _last_scan_result = results
    return {
        "status": "success",
        "stats": results.get("stats"),
        "pipeline_stages": results.get("pipeline_stages", []),
        "regimes": results.get("regimes", {}),
        "signals": results.get("signals", []),
    }
@app.post("/api/chat")
async def chat_with_supervisor(req: ChatRequest):
    """Chat with the LangGraph supervisor.

    Args:
        req: Chat request whose ``query`` is passed to ``invoke_supervisor``.

    Returns:
        ``{"status": "success", "response": ...}`` on success, or a JSON error
        response with status 500 carrying the exception message on failure.
    """
    try:
        response = invoke_supervisor(req.query)
    except Exception as e:
        # Endpoint boundary: any supervisor failure becomes a JSON 500.
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Chat error: %s", e)
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
    return {"status": "success", "response": response}
if __name__ == "__main__":
    # Direct-run entry point: start uvicorn on all interfaces, port 8001,
    # with auto-reload enabled. The app is given as an import string.
    uvicorn.run(
        "backend.server:app",
        host="0.0.0.0",
        port=8001,
        reload=True,
    )