# opt/app.py
# (repo: dhruv575 — commit e7b1c28 "Potential fixes")
import gradio as gr
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg') # Use Agg backend for non-interactive plotting
import json
import os
from fastapi import FastAPI, Request, Response, Body
from fastapi.responses import JSONResponse, HTMLResponse, FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pathlib import Path
import numpy as np
import datetime
import math
from utils import load_data, filter_data, load_factor_data, filter_factors
from optimization import run_ogd, plot_optimization_results, compute_sharpe, compute_max_drawdown, compute_alpha
# Import benchmark functions and metrics
from benchmarks import (
run_equal_weight,
run_random_portfolio,
calculate_cumulative_returns,
calculate_performance_metrics
)
# --- Custom JSON Encoder to handle inf and NaN values ---
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that emits strictly valid JSON for non-finite floats.

    NaN becomes null; +/-infinity becomes +/-1.0e308 (a huge finite float).

    FIX: ``JSONEncoder.default()`` is only called for objects the serializer
    cannot handle, and it handles floats natively — so the original
    float-sanitizing ``default()`` was dead code and ``NaN``/``Infinity``
    literals still leaked into the output. The sanitizing now happens in
    ``iterencode()``, which every dumps() call goes through.
    """

    @staticmethod
    def _sanitize(obj):
        """Recursively replace non-finite floats inside dicts/lists/tuples."""
        if isinstance(obj, float):
            if math.isnan(obj):
                return None  # Convert NaN to null
            if math.isinf(obj):
                # Very large finite number instead of +/-infinity.
                return 1.0e308 if obj > 0 else -1.0e308
            return obj
        if isinstance(obj, dict):
            return {k: CustomJSONEncoder._sanitize(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [CustomJSONEncoder._sanitize(v) for v in obj]
        return obj

    def iterencode(self, o, _one_shot=False):
        # Sanitize the whole tree before the (possibly C-accelerated)
        # serializer sees it.
        return super().iterencode(self._sanitize(o), _one_shot)

    def default(self, obj):
        # Kept for backward compatibility with the original class, and for
        # any non-float object a subclass might route here.
        if isinstance(obj, float):
            if math.isnan(obj):
                return None
            if math.isinf(obj):
                return 1.0e308 if obj > 0 else -1.0e308
        return super().default(obj)
# --- Function to safely convert values for JSON ---
def safe_json_value(value):
    """Return *value* with non-finite floats (NaN/inf) replaced by None.

    FIX: the original implementation fell off the end of the function for
    every non-float argument and so implicitly returned None, silently
    discarding ints, strings, and any other JSON-safe value. Non-float
    inputs are now passed through unchanged (None stays None, which matches
    how every in-file caller behaved before).
    """
    if isinstance(value, float) and (math.isnan(value) or math.isinf(value)):
        return None
    return value
# --- Load Data Globally ---
# Loaded once at import time so every API request and Gradio run reuses the
# same in-memory frames.
stock_data_df, rf_data_df = load_data()
factor_data_df = load_factor_data()
# Stock returns and the risk-free series are mandatory; factor data is
# optional and is re-checked downstream before factor analysis runs.
if stock_data_df is None or rf_data_df is None:
    print("Exiting application due to data loading error.")
    exit()
# --- Create FastAPI app ---
app = FastAPI()
# --- Enable CORS for all origins ---
# NOTE(review): wide-open CORS combined with allow_credentials=True — confirm
# this is intentional for the deployment (e.g. embedding in an iframe).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # Allow all origins
    allow_credentials=True,
    allow_methods=["*"], # Allow all methods
    allow_headers=["*"], # Allow all headers
)
# --- Setup Static Files ---
# Create the static directory on first run so the mounts below don't fail.
static_dir = Path(__file__).parent / "assets" / "static"
if not static_dir.exists():
    static_dir.mkdir(parents=True, exist_ok=True)
# Mount static files with a name that's more unique to avoid conflicts
app.mount("/portfolio_static", StaticFiles(directory=static_dir), name="portfolio_static")
# For backward compatibility, also mount at /static
app.mount("/static", StaticFiles(directory=static_dir), name="static")
# --- Main Optimization Pipeline Function ---
def _series_to_points(series):
    """Convert a date-indexed Series into [{"date": "YYYY-MM-DD", "value": v}, ...].

    Non-finite values (NaN/inf) are dropped so the payload stays JSON-safe.
    """
    points = []
    for date, value in series.items():
        safe_value = safe_json_value(float(value))
        if safe_value is not None:  # Only include valid values
            points.append({"date": date.strftime("%Y-%m-%d"), "value": safe_value})
    return points


def _metrics_summary(metrics):
    """Map a calculate_performance_metrics() dict onto JSON-safe API keys."""
    return {
        "sharpe": safe_json_value(float(metrics["Annualized Sharpe Ratio"])),
        "max_drawdown": safe_json_value(float(metrics["Max Drawdown"])),
        "cumulative_return": safe_json_value(float(metrics["Cumulative Return"]))
    }


def run_optimization_pipeline(
    start_date, end_date, tickers_str,
    window_size, learning_rate,
    alpha_sortino, alpha_max_drawdown, alpha_turnover, alpha_concentration=0.25,
    enp_min=5.0, enp_max=20.0
):
    """Runs the full pipeline: filter data -> run OGD & benchmarks -> calculate metrics & plots -> return results.

    Parameters mirror the API/Gradio inputs; numeric values may arrive as
    strings and are coerced below.

    Returns:
        dict with "success": True plus cumulative_returns / weights /
        concentration / metrics (and optional factor_metrics), or
        {"error": <message>} on any failure.
    """
    # 1. Parse inputs. An empty ticker string means "use all tickers".
    if not tickers_str:
        tickers = None
    else:
        # FIX: drop empty entries so inputs like "AAPL,,MSFT" or a trailing
        # comma don't produce '' tickers (which match nothing downstream).
        tickers = [t.strip().upper() for t in tickers_str.split(',') if t.strip()]
        if not tickers:
            tickers = None
    try:
        window_size = int(window_size)
        learning_rate = float(learning_rate)
        alphas = [
            float(alpha_sortino),
            float(alpha_max_drawdown),
            float(alpha_turnover),
            float(alpha_concentration)
        ]
        enp_min = float(enp_min)
        enp_max = float(enp_max)
    except ValueError as e:
        return {"error": f"Invalid hyperparameter input. Details: {e}"}
    print(f"Filtering data: Start={start_date}, End={end_date}, Tickers={tickers}")
    # 2. Filter Data
    filtered_df = filter_data(stock_data_df, rf_data_df, start_date, end_date, tickers)
    filtered_factors = filter_factors(factor_data_df, start_date, end_date)
    if filtered_df is None or filtered_df.empty:
        return {"error": "Filtering resulted in empty data. Cannot run."}
    # Extract risk-free series for metric calculation
    rf_series = filtered_df['rf']
    print(f"Running OGD: Window={window_size}, LR={learning_rate}, Alphas={alphas}, ENP Range={enp_min}-{enp_max}")
    # 3. Run OGD with enhanced parameters
    try:
        ogd_weights_df, ogd_returns_series = run_ogd(
            filtered_df, window_size=window_size, learning_rate=learning_rate,
            alphas=alphas, enp_min=enp_min, enp_max=enp_max, use_tqdm=True,
            factor_data=filtered_factors
        )
        if ogd_weights_df.empty or ogd_returns_series.empty:
            return {"error": "OGD failed or returned empty results."}
    except Exception as e:
        return {"error": f"OGD optimization failed: {str(e)}"}
    print("Running Benchmarks...")
    # 4. Run Benchmarks
    try:
        equal_weight_returns = run_equal_weight(filtered_df)
        random_portfolio_returns = run_random_portfolio(filtered_df)
    except Exception as e:
        return {"error": f"Benchmark calculation failed: {str(e)}"}
    # 5. Calculate Metrics & Cumulative Returns
    try:
        ogd_metrics = calculate_performance_metrics(ogd_returns_series, rf_series)
        ew_metrics = calculate_performance_metrics(equal_weight_returns, rf_series)
        rp_metrics = calculate_performance_metrics(random_portfolio_returns, rf_series)
    except Exception as e:
        return {"error": f"Performance metrics calculation failed: {str(e)}"}
    # 6. Factor-model alphas (CAPM / FF3), only when factor data is usable.
    strategy_returns = {
        "ogd": ogd_returns_series,
        "equal_weight": equal_weight_returns,
        "random": random_portfolio_returns,
    }
    factor_metrics = {}
    if filtered_factors is not None and not filtered_factors.empty:
        try:
            # Align dates between returns and factors
            common_dates = filtered_factors.index.intersection(ogd_returns_series.index)
            if len(common_dates) > 30:  # Ensure enough data points for regression
                # Same CAPM/FF3 regression for each strategy (previously
                # six near-identical copy-pasted call sites).
                for key, returns in strategy_returns.items():
                    capm_alpha, _ = compute_alpha(
                        returns.loc[common_dates],
                        rf_series.loc[common_dates],
                        filtered_factors.loc[common_dates],
                        model="CAPM"
                    )
                    ff3_alpha, _ = compute_alpha(
                        returns.loc[common_dates],
                        rf_series.loc[common_dates],
                        filtered_factors.loc[common_dates],
                        model="FF3"
                    )
                    # Annualize daily alphas (x252 trading days).
                    factor_metrics[key] = {
                        "capm_alpha": safe_json_value(float(capm_alpha * 252)),
                        "ff3_alpha": safe_json_value(float(ff3_alpha * 252))
                    }
            else:
                print(f"Warning: Not enough common dates between returns and factors. Factor analysis skipped.")
                factor_metrics = None
        except Exception as e:
            # Factor analysis is best-effort: log and continue without it.
            print(f"Factor metrics calculation failed: {str(e)}")
            factor_metrics = None
    else:
        print("Factor data not available. Factor analysis skipped.")
        factor_metrics = None
    # 7. Cumulative returns for the charts.
    try:
        ogd_cumulative = calculate_cumulative_returns(ogd_returns_series)
        ew_cumulative = calculate_cumulative_returns(equal_weight_returns)
        rp_cumulative = calculate_cumulative_returns(random_portfolio_returns)
    except Exception as e:
        return {"error": f"Cumulative returns calculation failed: {str(e)}"}
    # Convert cumulative returns to chart-friendly format with safe values.
    try:
        ogd_returns_data = _series_to_points(ogd_cumulative)
        ew_returns_data = _series_to_points(ew_cumulative)
        rp_returns_data = _series_to_points(rp_cumulative)
    except Exception as e:
        return {"error": f"Error formatting return data: {str(e)}"}
    # 8. Daily weights, keeping only significant (>1%) JSON-safe entries.
    weights_data = []
    try:
        for date, row in ogd_weights_df.iterrows():
            significant_weights = {}
            for ticker, weight in row.items():
                if weight > 0.01:  # Only include weights > 1%
                    safe_weight = safe_json_value(float(weight))
                    if safe_weight is not None:
                        significant_weights[ticker] = safe_weight
            weights_data.append({
                "date": date.strftime("%Y-%m-%d"),
                "weights": significant_weights
            })
    except Exception as e:
        return {"error": f"Error formatting weight data: {str(e)}"}
    # 9. Concentration over time: HHI = sum(w^2), ENP = 1 / HHI.
    hhi_data = []
    enp_data = []
    try:
        hhi_values = [(ogd_weights_df.loc[date] ** 2).sum() for date in ogd_weights_df.index]
        enp_values = [1.0 / hhi if hhi > 0 else None for hhi in hhi_values]
        for date, value in zip(ogd_weights_df.index, hhi_values):
            safe_value = safe_json_value(float(value))
            if safe_value is not None:
                hhi_data.append({"date": date.strftime("%Y-%m-%d"), "value": safe_value})
        for date, value in zip(ogd_weights_df.index, enp_values):
            safe_value = safe_json_value(float(value) if value is not None else None)
            if safe_value is not None:
                enp_data.append({"date": date.strftime("%Y-%m-%d"), "value": safe_value})
    except Exception as e:
        return {"error": f"Error calculating concentration metrics: {str(e)}"}
    # 10. Assemble the JSON-safe result payload.
    try:
        result = {
            "success": True,
            "cumulative_returns": {
                "ogd": ogd_returns_data,
                "equal_weight": ew_returns_data,
                "random": rp_returns_data
            },
            "weights": weights_data,
            "concentration": {
                "hhi": hhi_data,
                "enp": enp_data
            },
            "metrics": {
                "ogd": _metrics_summary(ogd_metrics),
                "equal_weight": _metrics_summary(ew_metrics),
                "random": _metrics_summary(rp_metrics)
            }
        }
    except Exception as e:
        return {"error": f"Error creating result data structure: {str(e)}"}
    # Add factor metrics if available
    if factor_metrics:
        result["factor_metrics"] = factor_metrics
    return result
# --- API Endpoints ---
@app.get("/")
async def serve_frontend():
"""Serve the custom frontend HTML."""
html_path = Path(__file__).parent / "assets" / "static" / "index.html"
if html_path.exists():
with open(html_path, 'r', encoding='utf-8') as f:
content = f.read()
# Replace static paths to use the new mount point
content = content.replace('href="/static/', 'href="/portfolio_static/')
content = content.replace('src="/static/', 'src="/portfolio_static/')
return HTMLResponse(content=content)
else:
return {"error": "Frontend HTML file not found"}
@app.get("/index.html")
async def serve_index():
"""Serve the index page HTML."""
return await serve_frontend()
@app.get("/education.html")
async def serve_education():
"""Serve the education page HTML."""
html_path = Path(__file__).parent / "assets" / "static" / "education.html"
if html_path.exists():
with open(html_path, 'r', encoding='utf-8') as f:
content = f.read()
# Replace static paths to use the new mount point
content = content.replace('href="/static/', 'href="/portfolio_static/')
content = content.replace('src="/static/', 'src="/portfolio_static/')
return HTMLResponse(content=content)
else:
return {"error": "Education HTML file not found"}
# Add a direct full-page access endpoint for HuggingFace Spaces
@app.get("/fullpage")
async def serve_fullpage():
    """Serve the fullpage HTML for standalone view."""
    html_path = Path(__file__).parent / "assets" / "static" / "fullpage.html"
    # Fall back to the main app when the standalone page is absent.
    if not html_path.exists():
        return RedirectResponse(url="/")
    return HTMLResponse(content=html_path.read_text(encoding='utf-8'))
@app.get("/fullpage/education")
async def serve_fullpage_education():
"""Serve the fullpage HTML for education view."""
html_path = Path(__file__).parent / "assets" / "static" / "fullpage.html"
if html_path.exists():
with open(html_path, 'r', encoding='utf-8') as f:
content = f.read()
return HTMLResponse(content=content)
else:
# Fallback to redirecting to the education page
return RedirectResponse(url="/education.html")
@app.get("/api/tickers_by_sector")
async def get_tickers_by_sector():
"""Return the tickers grouped by sector."""
json_path = Path(__file__).parent / "data" / "tickers_by_sector.json"
if json_path.exists():
with open(json_path) as f:
return json.load(f)
else:
# Fallback to generating sectors from available tickers
tickers = stock_data_df.columns.tolist()
if 'rf' in tickers:
tickers.remove('rf')
return [{"sector": "All Available Tickers", "tickers": tickers}]
@app.post("/api/run_optimization")
async def api_run_optimization(data: dict = Body(...)):
"""API endpoint for running the optimization pipeline."""
try:
result = run_optimization_pipeline(
start_date=data.get('start_date'),
end_date=data.get('end_date'),
tickers_str=data.get('tickers', ''),
window_size=data.get('window_size', 20),
learning_rate=data.get('learning_rate', 0.1),
alpha_sortino=data.get('alpha_sortino', 1.0),
alpha_max_drawdown=data.get('alpha_max_drawdown', 1.0),
alpha_turnover=data.get('alpha_turnover', 0.1),
alpha_concentration=data.get('alpha_concentration', 0.25),
enp_min=data.get('enp_min', 5.0),
enp_max=data.get('enp_max', 20.0)
)
# Check if there was an error
if "error" in result:
return result
# Handle NaN and Infinity values in metrics
if 'metrics' in result:
for strategy in result['metrics']:
for metric, value in result['metrics'][strategy].items():
result['metrics'][strategy][metric] = safe_json_value(value)
# Check if factor metrics exist and merge them into the metrics
if 'factor_metrics' in result:
# Add factor metrics to each strategy's metrics with safe values
for strategy in result['factor_metrics']:
for metric, value in result['factor_metrics'][strategy].items():
safe_value = safe_json_value(value)
result['metrics'][strategy][metric] = safe_value
# Remove the separate factor_metrics key since we've merged it
del result['factor_metrics']
# Use custom JSON encoder by pre-encoding the content
content = json.dumps(result, cls=CustomJSONEncoder)
return Response(content=content, media_type="application/json")
except Exception as e:
return {"error": f"An error occurred: {str(e)}"}
# --- Gradio Interface ---
# Create a custom dark theme for Gradio
# Monochrome base with indigo accents, Inter font to roughly match the
# custom frontend's look.
dark_theme = gr.themes.Monochrome(
    primary_hue="indigo",
    secondary_hue="slate",
    neutral_hue="slate",
    radius_size=gr.themes.sizes.radius_sm,
    font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui", "sans-serif"]
)
with gr.Blocks(theme=dark_theme) as demo:
    gr.Markdown("""# Portfolio Optimization with OGD
*Optimize your portfolio using Online Gradient Descent and compare against benchmarks.*""")
    # Add a link to the custom frontend
    gr.Markdown("""
## View Enhanced UI
Try our enhanced, modern UI with interactive charts and stock selection:
* [Open Modern Interface](/)
Below is the basic Gradio interface for quick testing:
""")
    with gr.Row():
        with gr.Column(scale=1):  # Input Column
            gr.Markdown("### Configure Simulation")
            with gr.Accordion("Data Selection", open=True):  # Group data inputs
                start_date_input = gr.Textbox(label="Start Date (YYYY-MM-DD)", placeholder="Default: Earliest", info="Leave blank for earliest available date.")
                end_date_input = gr.Textbox(label="End Date (YYYY-MM-DD)", placeholder="Default: Latest", info="Leave blank for latest available date.")
                tickers_input = gr.Textbox(
                    label="Tickers (comma-separated)",
                    placeholder="e.g., AAPL, MSFT, GOOGL",
                    info="Leave blank to use all available tickers in the date range."
                )
            with gr.Accordion("OGD Hyperparameters", open=True):  # Group hyperparameters
                window_size_input = gr.Number(label="Lookback Window (days)", value=20, minimum=5, step=1, info="Days of past returns used for optimization.")
                learning_rate_input = gr.Number(label="Learning Rate", value=0.1, minimum=0.001, info="Step size for gradient updates.")
                gr.Markdown("##### Objective Function Weights (Alphas)")
                alpha_sortino_input = gr.Number(label="Sortino Ratio Weight", value=1.0, minimum=0, info="Emphasis on maximizing risk-adjusted returns (downside risk).")
                alpha_max_drawdown_input = gr.Number(label="Max Drawdown Weight", value=1.0, minimum=0, info="Emphasis on minimizing the largest peak-to-trough decline.")
                alpha_turnover_input = gr.Number(label="Turnover Weight", value=0.1, minimum=0, info="Emphasis on minimizing trading frequency/costs.")
                alpha_concentration_input = gr.Number(label="Concentration Weight", value=0.25, minimum=0, info="Emphasis on controlling portfolio concentration.")
            with gr.Accordion("Advanced Settings", open=False):
                enp_min_input = gr.Number(label="Min Effective Positions", value=5.0, minimum=1.0, info="Minimum target for effective number of positions.")
                enp_max_input = gr.Number(label="Max Effective Positions", value=20.0, minimum=1.0, info="Maximum target for effective number of positions.")
            run_button = gr.Button("Run Optimization", variant="primary", scale=1)  # Full width within column
        with gr.Column(scale=3):  # Output Column
            gr.Markdown("### Results")
            # Output components:
            run_status_text = gr.Textbox(label="Run Status", interactive=False, lines=1)
            metrics_output_df = gr.DataFrame(label="Performance Metrics Summary", interactive=False)
            plot_output = gr.Plot(label="Cumulative Returns Comparison")
            weights_output_df = gr.DataFrame(label="OGD Portfolio Weights (Daily)", interactive=False)
            # FIX: create the ENP plot as a real layout component here instead
            # of instantiating it inline in the outputs= list below, which left
            # the component outside the rendered column.
            enp_plot_output = gr.Plot(label="Effective Number of Positions")

    def gradio_run_optimization(
        start_date, end_date, tickers_str,
        window_size, learning_rate,
        alpha_sortino, alpha_max_drawdown, alpha_turnover, alpha_concentration,
        enp_min, enp_max
    ):
        """Run the pipeline and map its result onto the five Gradio outputs.

        Returns (status message, metrics DataFrame, returns figure,
        weights DataFrame, ENP figure-or-None).
        """
        result = run_optimization_pipeline(
            start_date, end_date, tickers_str,
            window_size, learning_rate,
            alpha_sortino, alpha_max_drawdown, alpha_turnover, alpha_concentration,
            enp_min, enp_max
        )
        if "error" in result:
            return result["error"], None, None, None, None
        # Create metrics dataframe with standard metrics
        metrics_data = {
            'OGD Portfolio': {
                'Annualized Sharpe Ratio': result['metrics']['ogd']['sharpe'],
                'Max Drawdown': result['metrics']['ogd']['max_drawdown'],
                'Cumulative Return': result['metrics']['ogd']['cumulative_return']
            },
            'Equal Weight': {
                'Annualized Sharpe Ratio': result['metrics']['equal_weight']['sharpe'],
                'Max Drawdown': result['metrics']['equal_weight']['max_drawdown'],
                'Cumulative Return': result['metrics']['equal_weight']['cumulative_return']
            },
            'Random Portfolio': {
                'Annualized Sharpe Ratio': result['metrics']['random']['sharpe'],
                'Max Drawdown': result['metrics']['random']['max_drawdown'],
                'Cumulative Return': result['metrics']['random']['cumulative_return']
            }
        }
        # Add factor metrics if available
        if "factor_metrics" in result:
            metrics_data['OGD Portfolio']['CAPM Alpha (Ann.)'] = result['factor_metrics']['ogd']['capm_alpha']
            metrics_data['OGD Portfolio']['FF3 Alpha (Ann.)'] = result['factor_metrics']['ogd']['ff3_alpha']
            metrics_data['Equal Weight']['CAPM Alpha (Ann.)'] = result['factor_metrics']['equal_weight']['capm_alpha']
            metrics_data['Equal Weight']['FF3 Alpha (Ann.)'] = result['factor_metrics']['equal_weight']['ff3_alpha']
            metrics_data['Random Portfolio']['CAPM Alpha (Ann.)'] = result['factor_metrics']['random']['capm_alpha']
            metrics_data['Random Portfolio']['FF3 Alpha (Ann.)'] = result['factor_metrics']['random']['ff3_alpha']
        metrics_df = pd.DataFrame(metrics_data).T
        # Cumulative-returns figure: rebuild date-indexed series from the
        # API-friendly point lists.
        fig, ax = plt.subplots(figsize=(10, 6))
        ogd_data = pd.Series({datetime.datetime.strptime(d['date'], '%Y-%m-%d').date(): d['value']
                              for d in result['cumulative_returns']['ogd']})
        ew_data = pd.Series({datetime.datetime.strptime(d['date'], '%Y-%m-%d').date(): d['value']
                             for d in result['cumulative_returns']['equal_weight']})
        rp_data = pd.Series({datetime.datetime.strptime(d['date'], '%Y-%m-%d').date(): d['value']
                             for d in result['cumulative_returns']['random']})
        ogd_data.plot(ax=ax, label='OGD Portfolio')
        ew_data.plot(ax=ax, label='Equal Weight')
        rp_data.plot(ax=ax, label='Random Portfolio')
        ax.set_title('Cumulative Portfolio Returns')
        ax.set_ylabel('Cumulative Return')
        ax.set_xlabel('Date')
        ax.legend()
        ax.grid(True)
        plt.tight_layout()
        # FIX: build the weights table as rows=dates, columns=tickers in one
        # shot. The previous pd.concat-of-Series loop stacked every day's
        # weights into a single long column instead of a date x ticker table.
        weights_df = pd.DataFrame.from_dict(
            {datetime.datetime.strptime(day['date'], '%Y-%m-%d').date(): day['weights']
             for day in result['weights']},
            orient='index'
        )
        # ENP figure; stays None if the pipeline returned no concentration data.
        fig2 = None
        if 'concentration' in result:
            enp_series = pd.Series({datetime.datetime.strptime(d['date'], '%Y-%m-%d').date(): d['value']
                                    for d in result['concentration']['enp']})
            fig2, ax2 = plt.subplots(figsize=(10, 3))
            enp_series.plot(ax=ax2)
            ax2.set_title('Effective Number of Positions')
            ax2.set_ylabel('ENP')
            ax2.grid(True)
            ax2.axhline(y=enp_min, color='r', linestyle='--', alpha=0.5, label=f'Min Target ({enp_min})')
            ax2.axhline(y=enp_max, color='g', linestyle='--', alpha=0.5, label=f'Max Target ({enp_max})')
            ax2.legend()
            plt.tight_layout()
        # FIX: always return five values so the length matches outputs= below;
        # the old code returned only four when 'concentration' was absent,
        # which made Gradio raise on the mismatched output count.
        return "Run successful!", metrics_df, fig, weights_df, fig2

    run_button.click(
        gradio_run_optimization,
        inputs=[
            start_date_input, end_date_input, tickers_input,
            window_size_input, learning_rate_input,
            alpha_sortino_input, alpha_max_drawdown_input, alpha_turnover_input, alpha_concentration_input,
            enp_min_input, enp_max_input
        ],
        outputs=[
            run_status_text,
            metrics_output_df,
            plot_output,
            weights_output_df,
            enp_plot_output
        ]
    )
# --- Mount Gradio app to FastAPI ---
# The Gradio UI lives under /gradio; the custom frontend is served at /.
app = gr.mount_gradio_app(app, demo, path="/gradio")
# Run the application
if __name__ == "__main__":
    import uvicorn
    # Bind on all interfaces (containerized / Spaces deployment).
    uvicorn.run(app, host="0.0.0.0", port=8000)