# NOTE: removed a "Spaces: Sleeping" Hugging Face page-status banner that was
# captured by the extraction tool; it was not part of the source file.
# ---------------- LOGIC ----------------
# This file contains all the data processing and simulation logic.
import warnings
from datetime import timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import yfinance as yf

# Import configuration variables
from config import (
    CRISIS_PERIODS,
    BENCHMARK_TICKER,
    BENCHMARK_NAME,
    RECOVERY_DAYS,
    CRISIS_SUMMARY,
    CRISIS_INSIGHTS,
    GEMINI_MODEL_NAME,
    GEMINI_SYSTEM_PROMPT,
    GEMINI_USER_PROMPT_TEMPLATE,
)

warnings.filterwarnings("ignore")

# Gemini SDK is optional; generate_gemini_insights degrades gracefully
# when it is not installed.
try:
    import google.generativeai as genai
except ImportError:
    genai = None
| # ---------------- UTILS ---------------- | |
| def _ensure_ns_suffix(t): | |
| """Ensures a ticker has the .NS suffix for Indian stocks.""" | |
| t = t.strip().upper() | |
| if t.startswith("^") or "." in t: | |
| return t | |
| return t + ".NS" | |
def _fetch_prices(tickers, start, end):
    """Download historical prices for *tickers* between *start* and *end*.

    Returns a DataFrame of prices. Column names are upper-cased when the
    download yields single-level columns; a column MultiIndex (seen on some
    yfinance versions for multi-ticker downloads) is returned unchanged.
    """
    raw = yf.download(tickers, start=start, end=end, progress=False, auto_adjust=True)
    # Prefer adjusted prices; with auto_adjust=True yfinance typically
    # exposes only "Close", so fall through accordingly.
    if "Adj Close" in raw:
        prices = raw["Adj Close"]
    elif "Close" in raw:
        prices = raw["Close"]
    else:
        prices = raw
    if isinstance(prices, pd.Series):
        prices = prices.to_frame()
    # Single-ticker downloads come back without a column MultiIndex;
    # normalize those column names to upper case for lookups downstream.
    if not isinstance(prices.columns, pd.MultiIndex):
        prices.columns = [c.upper() for c in prices.columns]
    return prices
def calc_metrics(series, benchmark_returns=None):
    """Calculate key performance metrics for a price time series.

    Parameters
    ----------
    series : pd.Series
        Price (or normalized value) series indexed by date.
    benchmark_returns : pd.Series, optional
        Daily benchmark returns; when at least 11 observations overlap
        with the series' returns, a beta versus the benchmark is computed.

    Returns
    -------
    dict
        Keys: ``total_return``, ``volatility`` (annualized), ``VaR_95``
        (daily 5% quantile), ``CAGR``, ``max_drawdown`` and ``beta``
        (``None`` when not computable, ``nan`` if benchmark variance is 0).
    """
    returns = series.pct_change().dropna()
    if returns.empty:
        # Fewer than two data points: report neutral values.
        return {
            "total_return": 0,
            "volatility": 0,
            "VaR_95": 0,
            "CAGR": 0,
            "max_drawdown": 0,
            "beta": None,
        }
    total_return = (series.iloc[-1] / series.iloc[0]) - 1
    vol = returns.std() * np.sqrt(252)  # annualize daily volatility
    VaR_95 = returns.quantile(0.05)
    days = (series.index[-1] - series.index[0]).days
    years = max(days / 365.25, 1 / 365.25)  # guard against a zero-length span
    CAGR = (series.iloc[-1] / series.iloc[0]) ** (1 / years) - 1
    drawdown = (series / series.cummax()) - 1
    max_dd = drawdown.min()
    beta = None
    if benchmark_returns is not None:
        rr, br = returns.align(benchmark_returns, join="inner")
        if len(rr) > 10:
            # Take both covariance and benchmark variance from the sample
            # covariance matrix so they share the same ddof. (Previously the
            # numerator used np.cov (ddof=1) while the denominator used
            # np.var (ddof=0), inflating beta by n/(n-1).)
            cmat = np.cov(rr, br)
            beta = cmat[0, 1] / cmat[1, 1] if cmat[1, 1] != 0 else np.nan
    return {
        "total_return": total_return,
        "volatility": vol,
        "VaR_95": VaR_95,
        "CAGR": CAGR,
        "max_drawdown": max_dd,
        "beta": beta,
    }
def sector_from_ticker(t):
    """Look up ``(sector, industry)`` for ticker *t* via yfinance.

    Any failure (network error, unknown symbol, malformed response)
    degrades to ``("Unknown", "Unknown")`` so one bad ticker never
    aborts a whole simulation run.
    """
    fallback = ("Unknown", "Unknown")
    try:
        info = yf.Ticker(t).info
        return info.get("sector", fallback[0]), info.get("industry", fallback[1])
    except Exception:
        return fallback
def format_pct(x):
    """Render ratio *x* as a percentage string, e.g. ``0.1234`` -> ``"12.34%"``.

    ``None`` and float NaN render as ``"N/A"``.
    """
    is_missing = x is None or (isinstance(x, float) and np.isnan(x))
    return "N/A" if is_missing else f"{x * 100:.2f}%"
# ---------------- GEMINI AI HELPER ----------------
def generate_gemini_insights(
    api_key: str,
    crisis_name: str,
    metrics_md: str,
    extra_instructions: str = "",
) -> str:
    """Ask Gemini for AI commentary on the simulated metrics.

    Returns the model's text on success, otherwise a human-readable status
    message (missing key, SDK not installed, empty response, API error).
    """
    # Guard clauses for the two recoverable preconditions.
    if not api_key:
        return "ℹ️ To see AI-generated insights, please paste a valid Gemini API key."
    if genai is None:
        return "⚠️ google-generativeai is not installed. Run `pip install google-generativeai` and retry."
    prompt = GEMINI_USER_PROMPT_TEMPLATE.format(
        crisis_name=crisis_name,
        metrics_text=metrics_md,
        extra_instructions=extra_instructions or "None.",
    )
    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel(
            GEMINI_MODEL_NAME,
            system_instruction=GEMINI_SYSTEM_PROMPT.strip(),
            generation_config={"max_output_tokens": 256},
        )
        reply = model.generate_content(prompt)
        body = (getattr(reply, "text", "") or "").strip()
        if not body:
            return "⚠️ Gemini did not return any text. Please check your API key, quota, or try again."
        return body
    except Exception as e:
        # Surface the failure to the UI instead of crashing the simulation.
        return f"⚠️ Gemini call failed: {e}"
# ---------------- SIMULATION ----------------
def _error_outputs(message, ai_note):
    """Build the 7-tuple of Gradio outputs used by every failure path.

    Slots: (performance fig, metrics markdown, sector fig, insights
    markdown, pie fig, log text, AI insights text).
    """
    return (None, message, None, None, None, "", ai_note)


def run_crisis_simulation(
    crisis,
    uploaded,
    tickers_str,
    weights_str,
    include_etf,
    gemini_api_key="",
    gemini_extra_prompt="",
):
    """
    The main simulation function.

    Parses the user's portfolio (CSV upload or ticker/weight strings),
    normalizes weights (optionally blending in a 5% NIFTYBEES.NS ETF),
    fetches prices over the chosen crisis window plus a recovery period,
    and returns all outputs for the Gradio interface:
    (performance fig, metrics markdown, sector fig, insights markdown,
    pie fig, log text, AI insights text).
    """
    # --- 1. Parse Portfolio ---
    if uploaded is not None:
        try:
            df = pd.read_csv(uploaded.name if hasattr(uploaded, "name") else uploaded)
        except Exception as e:
            return _error_outputs(f"Error reading CSV: {e}", "No AI insights (CSV error).")
    else:
        try:
            tickers = [t.strip() for t in tickers_str.split(",") if t.strip()]
            weights = [float(w) for w in weights_str.split(",") if w.strip()]
            if not tickers or not weights or len(tickers) != len(weights):
                return _error_outputs(
                    "Error: Mismatch between tickers and weights, or fields are empty.",
                    "No AI insights (input mismatch).",
                )
            df = pd.DataFrame({"Ticker": tickers, "Weight": weights})
        except ValueError:
            return _error_outputs(
                "Error: Weights must be numbers.", "No AI insights (weights error)."
            )
    if df.empty or "Ticker" not in df or "Weight" not in df:
        return _error_outputs(
            "Error: Invalid portfolio. Check inputs.", "No AI insights (invalid portfolio)."
        )
    df["Ticker"] = df["Ticker"].apply(_ensure_ns_suffix)

    # --- 2. Normalize Weights (with ETF logic) ---
    total_weight = df["Weight"].astype(float).sum()
    # NOTE: pandas division by zero yields inf/NaN rather than raising
    # ZeroDivisionError (the old `except ZeroDivisionError` was dead code
    # and let zero-sum weights slip through as NaN), so check explicitly.
    if total_weight == 0:
        return _error_outputs(
            "Error: Portfolio weights sum to zero.", "No AI insights (weights zero)."
        )
    if include_etf:
        # Scale the user's holdings to 95% and add a 5% NIFTYBEES ETF hedge.
        df["Weight"] = (df["Weight"].astype(float) / total_weight) * 0.95
        etf_row = pd.DataFrame([{"Ticker": "NIFTYBEES.NS", "Weight": 0.05}])
        df = pd.concat([df, etf_row], ignore_index=True)
    else:
        # Normalize user's portfolio to 100%.
        df["Weight"] = df["Weight"].astype(float) / total_weight

    # --- 3. Fetch Data ---
    start, end = CRISIS_PERIODS[crisis]
    # Extend past the crisis end so the recovery phase is captured too.
    recovery_end = pd.to_datetime(end) + pd.Timedelta(days=RECOVERY_DAYS)
    tickers = list(df["Ticker"].unique()) + [BENCHMARK_TICKER]
    prices = _fetch_prices(tickers, start, recovery_end)
    if prices.empty:
        return _error_outputs(
            "No data found. Some tickers may not exist historically.",
            "No AI insights (no data).",
        )
    # Ensure all required tickers were fetched.
    fetched_tickers = [c.upper() for c in prices.columns]
    required_tickers = [t.upper() for t in df["Ticker"]] + [BENCHMARK_TICKER.upper()]
    missing = [t for t in required_tickers if t not in fetched_tickers]
    if missing:
        return _error_outputs(
            f"Error: Could not fetch data for: {', '.join(missing)}",
            "No AI insights (missing tickers).",
        )
    prices.ffill(inplace=True)  # carry forward prices over non-trading days
    crisis_window = prices.loc[start:end]
    if BENCHMARK_TICKER not in crisis_window.columns:
        return _error_outputs(
            f"Error: Could not fetch benchmark {BENCHMARK_NAME} data for this period.",
            "No AI insights (benchmark error).",
        )
    bench = crisis_window[BENCHMARK_TICKER]

    # --- 4. Calculate Portfolio Performance ---
    df_aligned = df.set_index("Ticker")
    df_aligned.index = df_aligned.index.str.upper()
    # Restrict the price matrix to the portfolio's tickers.
    portfolio_prices = crisis_window[df_aligned.index]
    # Rebase every ticker to 100 so weights combine comparable series.
    norm = (portfolio_prices / portfolio_prices.iloc[0]) * 100
    weighted = (norm * df_aligned["Weight"]).sum(axis=1)
    weighted.name = "Portfolio"
    bench_norm = (bench / bench.iloc[0]) * 100
    port_m = calc_metrics(weighted, bench.pct_change())
    bench_m = calc_metrics(bench_norm)

    # --- 5. Generate Outputs (Metrics Table) ---
    beta_val = port_m["beta"]
    if beta_val is None or (isinstance(beta_val, float) and np.isnan(beta_val)):
        beta_str = "N/A"
    else:
        beta_str = f"{beta_val:.2f}"
    metrics_md = f"""### Simulation: {crisis}
| Metric | Portfolio | {BENCHMARK_NAME} |
|:---|---:|---:|
| **Total Return** | **{format_pct(port_m['total_return'])}** | **{format_pct(bench_m['total_return'])}** |
| Max Drawdown | {format_pct(port_m['max_drawdown'])} | {format_pct(bench_m['max_drawdown'])} |
| Volatility (Ann.) | {format_pct(port_m['volatility'])} | {format_pct(bench_m['volatility'])} |
| CAGR | {format_pct(port_m['CAGR'])} | {format_pct(bench_m['CAGR'])} |
| Beta | {beta_str} | - |
| VaR (95%, Daily) | {format_pct(port_m['VaR_95'])} | {format_pct(bench_m['VaR_95'])} |
"""

    # --- 6. Generate Outputs (Performance Plot) ---
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=weighted.index,
            y=weighted.values,
            name="Portfolio",
            mode="lines",
            line=dict(width=3, color="#1E88E5"),
        )
    )
    fig.add_trace(
        go.Scatter(
            x=bench_norm.index,
            y=bench_norm.values,
            name=BENCHMARK_NAME,
            mode="lines",
            line=dict(width=2, color="#FFC107", dash="dot"),
        )
    )
    fig.update_layout(
        title=f"<b>{crisis}</b>: Portfolio vs Benchmark Performance",
        template="plotly_white",
        xaxis_title="Date",
        yaxis_title="Normalized Value (Base 100)",
        height=450,
        legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01),
    )

    # --- 7. Generate Outputs (Sector Analysis) ---
    df["Sector"], df["Industry"] = zip(*df["Ticker"].map(sector_from_ticker))
    sector_dd = []
    for t in df.Ticker:
        if t.upper() in crisis_window.columns:
            ser = crisis_window[t.upper()]
            dd = (ser / ser.cummax() - 1).min()
            sector_dd.append(dd)
        else:
            sector_dd.append(0)  # Ticker wasn't in crisis window
    df["Drawdown"] = sector_dd
    # Aggregate weight-averaged drawdown by sector.
    sec_agg = df.groupby("Sector").apply(
        lambda d: np.average(d["Drawdown"], weights=d["Weight"] / d["Weight"].sum())
    )
    sec_agg = sec_agg.sort_values()
    sec_fig = px.bar(
        sec_agg * 100,
        y=sec_agg.index,
        x=sec_agg.values,
        orientation="h",
        title="Weighted Max Drawdown by Sector",
        labels={"x": "Max Drawdown (%)", "y": "Sector"},
    )
    sec_fig.update_layout(
        template="plotly_white",
        yaxis={"categoryorder": "total ascending"},
    )

    # --- 8. Generate Outputs (Insights & Pie Chart) ---
    # (This section previously appeared four times verbatim; once is enough.)
    ins = [
        f"### Insights for: {crisis}",
        f"_{CRISIS_SUMMARY.get(crisis, 'No summary available.')}_",
    ]
    if crisis in CRISIS_INSIGHTS:
        for s, txt in CRISIS_INSIGHTS[crisis].items():
            ins.append(f"- **{s}**: {txt}")
    insights_md = "\n".join(ins)

    # --- Pie chart: final portfolio weights (including ETF if added) ---
    pie_df = df[["Ticker", "Weight"]].copy()
    pie_df["Ticker"] = pie_df["Ticker"].astype(str)
    pie_df["Weight"] = pd.to_numeric(pie_df["Weight"], errors="raise")
    wsum = pie_df["Weight"].sum()
    if wsum <= 0:
        # Return a graceful error tuple, consistent with every other
        # failure path, instead of raising into the UI.
        return _error_outputs(
            f"Pie chart error: portfolio weights sum to {wsum}.",
            "No AI insights (weights error).",
        )
    pie_df["Weight"] = pie_df["Weight"] / wsum
    # Matplotlib pie chart
    fig_pie, ax = plt.subplots(figsize=(4, 4))
    ax.pie(
        pie_df["Weight"].values,
        labels=pie_df["Ticker"].values,
        autopct="%1.1f%%",
        startangle=90,
    )
    ax.set_title("Final Portfolio Allocation")
    ax.axis("equal")  # keep the pie circular

    # --- 9. Logs & AI Insights ---
    log_message = f"✅ Simulation Complete. Received weights: '{weights_str}'"
    gemini_insights = generate_gemini_insights(
        api_key=gemini_api_key or "",
        crisis_name=crisis,
        metrics_md=metrics_md,
        extra_instructions=gemini_extra_prompt or "",
    )
    return fig, metrics_md, sec_fig, insights_md, fig_pie, log_message, gemini_insights