# dhruv575
# Minimum Volume Filter
# 09368f6
import os
import json
import time
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
import pandas as pd
import numpy as np
from simulation_utils import (
load_and_filter_data,
run_single_fund_simulation,
run_multi_fund_simulation,
run_kelly_simulation
)
# Initialize FastAPI app
app = FastAPI(title="Safe Choices Simulation API")

# Enable CORS for any origin, method and header.
# Credentials are deliberately disabled: FastAPI's CORS documentation states
# that allow_origins/allow_methods/allow_headers may not be ["*"] when
# allow_credentials is True. The routes visible in this module use no cookies
# or Authorization headers, so wildcard access without credentials keeps the
# API reachable from any page while staying within that rule.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global variable to cache loaded data (stays None until a load assigns it)
market_data = None
# CSV file the cache is loaded from, as a relative path
DATA_PATH = "data/all_filtered_markets_full_2025.csv"
# Load market data on startup
@app.on_event("startup")
async def load_data():
    """Fill the module-level ``market_data`` cache from ``DATA_PATH``.

    When the file does not exist a warning is printed and ``market_data``
    is left untouched.
    """
    global market_data

    # Guard clause: nothing to load without the CSV, so warn and bail out.
    if not os.path.exists(DATA_PATH):
        print(f"Warning: Data file {DATA_PATH} not found!")
        return

    print(f"Loading market data from {DATA_PATH}...")
    market_data = load_and_filter_data(DATA_PATH, start_date='2025-01-01')
    print(f"Loaded {len(market_data):,} markets")
# Request models
class SimulationParams(BaseModel):
    # Request body accepted by POST /api/simulate.

    # Selects the simulator; any value other than 'multi' or 'kelly' is run
    # through the single-fund simulator.
    simType: str  # 'single', 'threshold', 'multi', or 'kelly'

    # Forwarded to the simulators as starting_capital.
    startingCapital: float
    # Number of independent runs; the endpoint accepts 1 through 100.
    numSimulations: int
    # Forwarded as start_date.
    startDate: str
    # Forwarded as max_duration_days.
    maxDuration: int
    # Forwarded as min_prob_7d / min_prob_current.
    minProb7d: float
    minProbCurrent: float
    # Forwarded as days_before.
    daysBefore: int
    # Forwarded as investment_probability.
    investmentProbability: float

    # Forwarded as min_volume.
    minVolume: Optional[float] = 1_000_000  # Default 1 million
    # Forwarded as target_return to the single-fund and multi-fund simulators.
    targetReturn: Optional[float] = None
    # Only checked for simType 'multi', where it must be at least 2.
    numFunds: Optional[int] = None
    # Only used for simType 'kelly'; the endpoint substitutes 0.5 when unset.
    kellyFraction: Optional[float] = None
    # Only used for simType 'kelly'; the endpoint substitutes 'historical'.
    edgeEstimate: Optional[str] = None
# API Routes
@app.get("/")
async def read_root():
    """Serve the frontend index.html"""
    # Response headers asking clients not to cache or reuse the page.
    no_cache_headers = {
        "Cache-Control": "no-cache, no-store, must-revalidate",
        "Pragma": "no-cache",
        "Expires": "0",
    }

    page = FileResponse("static/index.html")
    for header_name, header_value in no_cache_headers.items():
        page.headers[header_name] = header_value
    return page
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    # Report zero markets while the cache is still unset.
    loaded_count = 0 if market_data is None else len(market_data)
    return {"status": "healthy", "markets_loaded": loaded_count}
def _capital_history(result):
    """Return a run's ``daily_capital`` records as JSON-serialisable points."""
    return [
        {"day": int(point['day']), "capital": float(point['capital'])}
        for point in result['daily_capital']
    ]


def _simulate_multi(params, seed):
    """Run one multi-fund simulation and shape the result for the frontend."""
    result = run_multi_fund_simulation(
        df=market_data,
        n_funds=params.numFunds,
        starting_capital=params.startingCapital,
        start_date=params.startDate,
        max_duration_days=params.maxDuration,
        days_before=params.daysBefore,
        min_prob_7d=params.minProb7d,
        min_prob_current=params.minProbCurrent,
        investment_probability=params.investmentProbability,
        target_return=params.targetReturn,
        min_volume=params.minVolume,
        random_seed=seed,
    )
    fund_results = result['fund_results']
    return {
        "finalCapital": float(result['portfolio_final_capital']),
        "totalReturn": float(result['portfolio_total_return']),
        "numTrades": int(result['total_trades']),
        "wentBust": bool(result['surviving_funds'] == 0),
        "reachedTarget": bool(result.get('funds_reached_target', 0) > 0),
        # Longest-running fund decides the portfolio duration; 0 with no funds.
        "simulationDays": int(
            max((f['simulation_days'] for f in fund_results), default=0)
        ),
        "survivingFunds": int(result['surviving_funds']),
        "fundResults": [
            {
                "finalCapital": float(f['final_capital']),
                "totalReturn": float(f['total_return']),
                "numTrades": int(f['num_trades']),
                "wentBust": bool(f['went_bust']),
                "reachedTarget": bool(f.get('reached_target', False)),
            }
            for f in fund_results
        ],
        "capitalHistory": [],  # Simplified for API
    }


def _simulate_kelly(params, seed):
    """Run one Kelly-criterion simulation and shape the result for the frontend."""
    result = run_kelly_simulation(
        df=market_data,
        starting_capital=params.startingCapital,
        start_date=params.startDate,
        max_duration_days=params.maxDuration,
        days_before=params.daysBefore,
        min_prob_7d=params.minProb7d,
        min_prob_current=params.minProbCurrent,
        investment_probability=params.investmentProbability,
        # Falsy values (None or 0 / empty string) fall back to the defaults.
        kelly_fraction=params.kellyFraction or 0.5,
        edge_estimate=params.edgeEstimate or 'historical',
        min_volume=params.minVolume,
        random_seed=seed,
    )
    kelly_stats = result.get('kelly_stats', {})
    return {
        "finalCapital": float(result['final_capital']),
        "totalReturn": float(result['total_return']),
        "numTrades": int(result['num_trades']),
        "wentBust": bool(result['went_bust']),
        "reachedTarget": False,
        "simulationDays": int(result['simulation_days']),
        "capitalHistory": _capital_history(result),
        "kellyStats": {
            "avgBetSize": float(kelly_stats.get('avg_bet_size', 0)),
            "avgEdge": float(kelly_stats.get('avg_edge', 0)),
            "betsSkipped": int(kelly_stats.get('bets_skipped', 0)),
            "totalOpportunities": int(kelly_stats.get('total_opportunities', 0)),
        },
    }


def _simulate_single(params, seed):
    """Run one single-fund (or threshold) simulation for the frontend."""
    result = run_single_fund_simulation(
        df=market_data,
        starting_capital=params.startingCapital,
        start_date=params.startDate,
        max_duration_days=params.maxDuration,
        days_before=params.daysBefore,
        min_prob_7d=params.minProb7d,
        min_prob_current=params.minProbCurrent,
        investment_probability=params.investmentProbability,
        target_return=params.targetReturn,
        min_volume=params.minVolume,
        random_seed=seed,
    )
    return {
        "finalCapital": float(result['final_capital']),
        "totalReturn": float(result['total_return']),
        "numTrades": int(result['num_trades']),
        "wentBust": bool(result['went_bust']),
        "reachedTarget": bool(result.get('reached_target', False)),
        "simulationDays": int(result['simulation_days']),
        "capitalHistory": _capital_history(result),
    }


def _summarize_runs(runs, params):
    """Aggregate per-run results into the ``summary`` section of the response.

    ``runs`` must be non-empty; the endpoint guarantees this by rejecting
    ``numSimulations < 1`` before any simulation is executed.
    """
    run_count = len(runs)
    returns = [r['totalReturn'] for r in runs]
    capitals = [r['finalCapital'] for r in runs]
    trades = [r['numTrades'] for r in runs]

    summary = {
        "avgReturn": float(np.mean(returns)),
        "medianReturn": float(np.median(returns)),
        "returnVolatility": float(np.std(returns)),
        "avgFinalCapital": float(np.mean(capitals)),
        "medianFinalCapital": float(np.median(capitals)),
        "bustRate": sum(1 for r in runs if r['wentBust']) / run_count,
        "positiveReturnRate": sum(1 for ret in returns if ret > 0) / run_count,
        "avgTrades": float(np.mean(trades)),
        "return5th": float(np.percentile(returns, 5)),
        "return95th": float(np.percentile(returns, 95)),
        # Magnitude of the worst run's loss. Clamped at 0 so that a batch in
        # which every run was profitable does not report its smallest gain
        # as a drawdown (abs(min(returns)) did exactly that).
        "maxDrawdown": float(max(0.0, -min(returns))),
    }

    # Type-specific stats
    if params.simType == 'threshold' and params.targetReturn:
        days_to_target = [r['simulationDays'] for r in runs if r['reachedTarget']]
        summary["targetReachedRate"] = len(days_to_target) / run_count
        summary["avgTimeToTarget"] = (
            float(np.mean(days_to_target)) if days_to_target else 0.0
        )

    if params.simType == 'multi':
        surviving_funds = [r['survivingFunds'] for r in runs]
        summary["avgSurvivingFunds"] = float(np.mean(surviving_funds))
        summary["survivorshipRate"] = float(np.mean(surviving_funds) / params.numFunds)
        summary["portfolioBustRate"] = (
            sum(1 for s in surviving_funds if s == 0) / len(surviving_funds)
        )

    if params.simType == 'kelly':
        kelly_runs = [r for r in runs if 'kellyStats' in r]
        if kelly_runs:
            summary["avgBetSize"] = float(
                np.mean([r['kellyStats']['avgBetSize'] for r in kelly_runs])
            )
            summary["avgEdge"] = float(
                np.mean([r['kellyStats']['avgEdge'] for r in kelly_runs])
            )
            summary["betsSkipped"] = float(
                np.mean([r['kellyStats']['betsSkipped'] for r in kelly_runs])
            )

    return summary


@app.post("/api/simulate")
async def run_simulation(params: SimulationParams):
    """Run ``numSimulations`` simulations and return per-run and summary results.

    The simulator is chosen by ``params.simType``: ``'multi'`` and ``'kelly'``
    use their dedicated simulators; every other value uses the single-fund
    simulator.

    Returns a dict with ``simType``, ``parameters`` (the request echoed back),
    ``runs`` (one entry per simulation) and ``summary`` (aggregate statistics).

    Raises:
        HTTPException 500: market data has not been loaded.
        HTTPException 400: ``numSimulations`` is outside 1..100, or
            ``simType`` is ``'multi'`` and ``numFunds`` is missing or below 2.
    """
    if market_data is None:
        raise HTTPException(status_code=500, detail="Market data not loaded")
    if params.numSimulations > 100:
        raise HTTPException(status_code=400, detail="Maximum 100 simulations allowed")
    if params.numSimulations < 1:
        raise HTTPException(status_code=400, detail="At least 1 simulation required")

    # Pick the simulator once, validating type-specific input before any
    # simulation work starts, instead of re-checking on every iteration.
    if params.simType == 'multi':
        if params.numFunds is None or params.numFunds < 2:
            raise HTTPException(status_code=400, detail="numFunds required for multi simulation")
        simulate = _simulate_multi
    elif params.simType == 'kelly':
        simulate = _simulate_kelly
    else:
        # Single fund or threshold
        simulate = _simulate_single

    # Generate a unique seed offset based on current time (makes each run different)
    seed_offset = int(time.time() * 1000) % 1000000

    # NOTE(review): the simulators are called synchronously inside this
    # coroutine, so nothing is awaited until every run has finished.
    runs = [simulate(params, seed_offset + i) for i in range(params.numSimulations)]

    return {
        "simType": params.simType,
        "parameters": params.dict(),
        "runs": runs,
        "summary": _summarize_runs(runs, params),
    }
# Mount static files with no-cache headers
from starlette.responses import Response
class NoCacheStaticFiles(StaticFiles):
    """``StaticFiles`` variant that forces no-cache headers on each response.

    Construction is inherited unchanged from ``StaticFiles``; only the ASGI
    call is wrapped so the response-start message carries the headers below.
    """

    # Header pairs written onto every response start, as (name, value) bytes.
    _NO_CACHE_HEADERS = (
        (b"cache-control", b"no-cache, no-store, must-revalidate"),
        (b"pragma", b"no-cache"),
        (b"expires", b"0"),
    )

    async def __call__(self, scope, receive, send):
        """Delegate to ``StaticFiles`` while rewriting outgoing headers."""
        forced_names = {name for name, _ in self._NO_CACHE_HEADERS}

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                # Drop only headers we are about to override. Filtering the
                # pair list (rather than round-tripping through a dict) keeps
                # any header that legitimately appears more than once.
                kept = [
                    (name, value)
                    for name, value in message.get("headers", [])
                    if name.lower() not in forced_names
                ]
                # Build a new message instead of mutating the caller's dict.
                message = {
                    **message,
                    "headers": kept + list(self._NO_CACHE_HEADERS),
                }
            await send(message)

        await super().__call__(scope, receive, send_wrapper)
# Serve the "static" directory under /static through the no-cache subclass.
app.mount(
    "/static",
    NoCacheStaticFiles(directory="static"),
    name="static",
)

if __name__ == "__main__":
    # uvicorn is imported here so it is only needed when run as a script.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)