# scripts/app.py import gradio as gr import pandas as pd import numpy as np import plotly.graph_objects as go import plotly.express as px from datetime import datetime, timedelta import os import sys import json import torch from fetch_market_data import fetch_market_data, ASSETS, FRED_IDS from llm_analysis_rag import analyze_agent_decision, analyze_historical_segment from stable_baselines3 import SAC from environment import PortfolioEnv from evaluate_baselines import buy_and_hold, equally_weighted_rebalanced # --- Configuration --- MODEL_PATH = os.path.join("checkpoints", "sac_portfolio_model.zip") WINDOW_SIZE = 30 MACRO_COLS = list(FRED_IDS.values()) DASHBOARD_DATA_PATH = os.path.join("data", "historical_dashboard_data.csv") TRAIN_START_DATE = "2015-01-01" TRAIN_END_DATE = "2020-12-31" # Global variable for dashboard data needed for Tabs 3 & 4 DASHBOARD_DATA_DF = None # Define Time Period mappings for the dropdown TIME_PERIODS = { "6 Months": 180, "1 Year": 365, "2 Years": 730, "5 Years": 1825, "Max Available": 9999 # Sentinel value for max } # ========================================= # Initialization Functions # ========================================= def initialize_dashboard_data(): """Fetches and loads historical data at startup for Tabs 3 & 4.""" global DASHBOARD_DATA_DF print("--- Initializing Historical Data for Analyst/Simulation Tabs ---") # Fetching last 6 years to support longer analysis periods and simulation end_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') start_date = (datetime.now() - timedelta(days=365*6)).strftime('%Y-%m-%d') print(f"Fetching historical data from {start_date} to {end_date}...") # This might take a minute on first run fetch_market_data(start_date, end_date, DASHBOARD_DATA_PATH) if os.path.exists(DASHBOARD_DATA_PATH): DASHBOARD_DATA_DF = pd.read_csv(DASHBOARD_DATA_PATH, index_col=0, parse_dates=True) # Basic cleaning DASHBOARD_DATA_DF.dropna(how='all', inplace=True) # Calculate equal weight return for dashboard metrics asset_cols = [c for c in ASSETS if c in DASHBOARD_DATA_DF.columns] if asset_cols: DASHBOARD_DATA_DF['Daily_Ret_Eq'] = DASHBOARD_DATA_DF[asset_cols].pct_change().mean(axis=1) print(f"Data loaded successfully. Shape: {DASHBOARD_DATA_DF.shape}") print(f"Data range: {DASHBOARD_DATA_DF.index.min().date()} to {DASHBOARD_DATA_DF.index.max().date()}") else: print("❌ Failed to initialize historical data.") # Initialize data at startup try: initialize_dashboard_data() except Exception as e: print(f"Warning: Data initialization failed. Error: {e}") # ========================================= # Professional Metrics & Evaluation Functions # ========================================= def evaluate_agent_pro(env, model): """ Runs the trained agent on the environment and returns portfolio values. """ obs, info = env.reset() terminated, truncated = False, False portfolio_values = [env.initial_balance] while not (terminated or truncated): action, _states = model.predict(obs, deterministic=True) obs, reward, terminated, truncated, info = env.step(action) portfolio_values.append(info['portfolio_value']) # Align index with the actual steps taken valid_dates = env.df.index[env.window_size-1:] return pd.Series(portfolio_values, index=valid_dates[:len(portfolio_values)]) def calculate_metrics_pro(portfolio_values, freq=252, rf=0.0): """ Calculates key professional performance metrics from a series of portfolio values. """ if len(portfolio_values) < 2: return {k: "N/A" for k in ["Total Return", "CAGR", "Sharpe Ratio", "Sortino Ratio", "Volatility", "Max Drawdown", "Calmar Ratio"]} returns = portfolio_values.pct_change().dropna() if returns.empty: return {k: "0.00%" if "%" in k else "0.00" for k in ["Total Return", "CAGR", "Sharpe Ratio", "Sortino Ratio", "Volatility", "Max Drawdown", "Calmar Ratio"]} total_return = (portfolio_values.iloc[-1] / portfolio_values.iloc[0]) - 1 num_years = (len(portfolio_values) - 1) / freq cagr = (portfolio_values.iloc[-1] / portfolio_values.iloc[0]) ** (1/num_years) - 1 if num_years > 0 else 0.0 sharpe_ratio = np.sqrt(freq) * (returns.mean() - rf) / returns.std() if returns.std() > 0 else np.nan downside_returns = returns[returns < 0] downside_std = downside_returns.std() sortino_ratio = np.sqrt(freq) * (returns.mean() - rf) / downside_std if downside_std > 0 else np.nan volatility = returns.std() * np.sqrt(freq) rolling_max = portfolio_values.cummax() drawdown = portfolio_values / rolling_max - 1.0 max_drawdown = drawdown.min() calmar_ratio = cagr / abs(max_drawdown) if max_drawdown != 0 and cagr != 0 else np.nan return { "Total Return": total_return, "CAGR": cagr, "Sharpe Ratio": sharpe_ratio, "Sortino Ratio": sortino_ratio, "Volatility": volatility, "Max Drawdown": max_drawdown, "Calmar Ratio": calmar_ratio } # ========================================= # XAI: Feature Importance Function # ========================================= def calculate_feature_importance(model, obs): obs_tensor = torch.as_tensor(obs, dtype=torch.float32, device=model.device) if obs_tensor.dim() == 1: obs_tensor = obs_tensor.unsqueeze(0) obs_tensor.requires_grad_() actor = model.policy.actor baseline = torch.zeros_like(obs_tensor) steps = 50 scaled_inputs = [baseline + (float(i) / steps) * (obs_tensor - baseline) for i in range(steps + 1)] grads = [] for scaled_input in scaled_inputs: action_mean = actor(scaled_input) target_output = action_mean.sum() grad = torch.autograd.grad(outputs=target_output, inputs=scaled_input)[0] grads.append(grad) # --- Stack gradients first, then perform arithmetic --- stacked_grads = torch.stack(grads) avg_grads = (stacked_grads[:-1] + stacked_grads[1:]) / 2.0 avg_grads = avg_grads.mean(dim=0) # ----------------------------------------------------------- integrated_grads = (obs_tensor - baseline) * avg_grads importance_scores = integrated_grads.detach().cpu().numpy().flatten() feature_names = [] for i in range(WINDOW_SIZE): for asset in ASSETS: feature_names.append(f"{asset}_t-{WINDOW_SIZE-1-i}") for i in range(WINDOW_SIZE): for macro in MACRO_COLS: feature_names.append(f"{macro}_t-{WINDOW_SIZE-1-i}") feature_importance_dict = dict(zip(feature_names, importance_scores)) aggregated_importance = {} for base_feature in ASSETS + MACRO_COLS: total_imp = sum(abs(val) for key, val in feature_importance_dict.items() if key.startswith(base_feature)) aggregated_importance[base_feature] = total_imp top_features = dict(sorted(aggregated_importance.items(), key=lambda item: item[1], reverse=True)[:8]) fig = px.bar(x=list(top_features.values()), y=list(top_features.keys()), orientation='h', title="Top Influential Features (XAI)", labels={'x': 'Importance', 'y': 'Feature'}, color=list(top_features.values()), color_continuous_scale=px.colors.sequential.Viridis) fig.update_layout(template="plotly_dark", paper_bgcolor='rgba(0,0,0,0)', plot_bgcolor='rgba(0,0,0,0)', yaxis={'categoryorder':'total ascending'}, coloraxis_showscale=False, margin=dict(l=10, r=10, t=40, b=10), height=300, hoverlabel=dict(bgcolor="white", font_size=14, font_family="Roboto", font_color="black")) return fig # ========================================= # Tab 4 Logic: Historical Simulation # ========================================= def run_historical_simulation(start_date_str, end_date_str): """ Runs the RL agent on historical data and compares to baselines using professional metrics. """ if DASHBOARD_DATA_DF is None: return go.Figure(), "Data not initialized. Please restart app.", gr.update(visible=False) status_msg = "Preparing simulation..." yield go.Figure(), status_msg, gr.update(visible=False) try: # 1. Validate and Slice Data try: start_date = pd.to_datetime(start_date_str) end_date = pd.to_datetime(end_date_str) except ValueError: yield go.Figure(), "Error: Invalid date format. Use YYYY-MM-DD.", gr.update(visible=False) return if start_date < DASHBOARD_DATA_DF.index.min() or end_date > DASHBOARD_DATA_DF.index.max(): avail_start = DASHBOARD_DATA_DF.index.min().date() avail_end = DASHBOARD_DATA_DF.index.max().date() yield go.Figure(), f"Error: Selected dates outside available range ({avail_start} to {avail_end}).", gr.update(visible=False) return df_slice = DASHBOARD_DATA_DF.loc[start_date:end_date].copy() asset_cols_only = [c for c in ASSETS if c in df_slice.columns] if len(df_slice) < WINDOW_SIZE + 10: yield go.Figure(), "Error: Time period too short for simulation.", gr.update(visible=False) return # 2. Setup Environment and Agent status_msg = "Running RL Agent simulation..." yield go.Figure(), status_msg, gr.update(visible=False) env = PortfolioEnv(df_slice, WINDOW_SIZE, initial_balance=10000) if not os.path.exists(MODEL_PATH): raise FileNotFoundError(f"Model not found: {MODEL_PATH}") model = SAC.load(MODEL_PATH) # 3. Run Simulation Loop & Get Values using Pro Function rl_portfolio_series = evaluate_agent_pro(env, model) # 4. Calculate Baselines using Pro Functions status_msg = "Calculating baselines and metrics..." yield go.Figure(), status_msg, gr.update(visible=False) # Pass only asset columns to baseline functions bnh_portfolio_series = buy_and_hold(df_slice[asset_cols_only], initial_balance=10000) # Realign B&H index to match RL agent's start date bnh_portfolio_series = bnh_portfolio_series.loc[rl_portfolio_series.index[0]:] # Normalize B&H starting value to match RL agent's start bnh_portfolio_series = bnh_portfolio_series / bnh_portfolio_series.iloc[0] * 10000 eq_portfolio_series = equally_weighted_rebalanced(df_slice[asset_cols_only], initial_balance=10000) eq_portfolio_series = eq_portfolio_series.loc[rl_portfolio_series.index[0]:] eq_portfolio_series = eq_portfolio_series / eq_portfolio_series.iloc[0] * 10000 # 5. Generate Plot fig = go.Figure() fig.add_trace(go.Scatter(x=rl_portfolio_series.index, y=rl_portfolio_series, mode='lines', name='RL Agent (SAC)', line=dict(color='#10b981', width=3))) fig.add_trace(go.Scatter(x=bnh_portfolio_series.index, y=bnh_portfolio_series, mode='lines', name='Buy & Hold (SPY)', line=dict(color='#6b7280', dash='dash'))) fig.add_trace(go.Scatter(x=eq_portfolio_series.index, y=eq_portfolio_series, mode='lines', name='Equal Weighted', line=dict(color='#a855f7', dash='dot'))) fig.update_layout( title="Simulation: Strategy Performance Comparison ($10k Start)", xaxis_title="Date", yaxis_title="Portfolio Value ($)", template="plotly_dark", paper_bgcolor='rgba(0,0,0,0)', plot_bgcolor='rgba(0,0,0,0)', hovermode="x unified", legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1) ) # 6. Calculate Professional Metrics Table rl_m = calculate_metrics_pro(rl_portfolio_series) bnh_m = calculate_metrics_pro(bnh_portfolio_series) eq_m = calculate_metrics_pro(eq_portfolio_series) # Helper to format based on metric type def fmt(val, is_pct=True): if pd.isna(val): return "N/A" return f"{val:.2%}" if is_pct else f"{val:.2f}" metrics_data = { "Metric": ["Total Return", "CAGR", "Sharpe Ratio", "Sortino Ratio", "Volatility (Ann.)", "Max Drawdown", "Calmar Ratio"], "RL Agent (SAC)": [fmt(rl_m["Total Return"]), fmt(rl_m["CAGR"]), fmt(rl_m["Sharpe Ratio"], False), fmt(rl_m["Sortino Ratio"], False), fmt(rl_m["Volatility"]), fmt(rl_m["Max Drawdown"]), fmt(rl_m["Calmar Ratio"], False)], "Buy & Hold (SPY)": [fmt(bnh_m["Total Return"]), fmt(bnh_m["CAGR"]), fmt(bnh_m["Sharpe Ratio"], False), fmt(bnh_m["Sortino Ratio"], False), fmt(bnh_m["Volatility"]), fmt(bnh_m["Max Drawdown"]), fmt(bnh_m["Calmar Ratio"], False)], "Equal Weighted": [fmt(eq_m["Total Return"]), fmt(eq_m["CAGR"]), fmt(eq_m["Sharpe Ratio"], False), fmt(eq_m["Sortino Ratio"], False), fmt(eq_m["Volatility"]), fmt(eq_m["Max Drawdown"]), fmt(eq_m["Calmar Ratio"], False)], } metrics_df = pd.DataFrame(metrics_data) # Format the dataframe as a markdown table for cleaner display metrics_md = metrics_df.to_markdown(index=False) final_metrics_display = f"### 📊 Professional Performance Metrics\n\n{metrics_md}" yield fig, "Simulation Complete.", final_metrics_display except Exception as e: import traceback traceback.print_exc() yield go.Figure(), f"Error during simulation: {str(e)}", gr.update(visible=False) # ========================================= # Tab 3 Logic: Historical Data Analyst # ========================================= def run_historical_analysis(selected_assets, period_name): """Backend for Tab 3.""" if DASHBOARD_DATA_DF is None or not selected_assets: return go.Figure(), "Please wait for data initialization or select assets." status_html = """
Strategy: {strat}
Risk: {risk}
Reason: {just}
Conf: {conf}/10