# ---------------- LOGIC ----------------
# This file contains all the data processing and simulation logic.
import pandas as pd
import numpy as np
import yfinance as yf
import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import warnings
from datetime import timedelta

# Import configuration variables
from config import (
    CRISIS_PERIODS,
    BENCHMARK_TICKER,
    BENCHMARK_NAME,
    RECOVERY_DAYS,
    CRISIS_SUMMARY,
    CRISIS_INSIGHTS,
    GEMINI_MODEL_NAME,
    GEMINI_SYSTEM_PROMPT,
    GEMINI_USER_PROMPT_TEMPLATE,
)

warnings.filterwarnings("ignore")

try:
    import google.generativeai as genai
except ImportError:
    genai = None


# ---------------- UTILS ----------------
def _ensure_ns_suffix(t):
    """Ensures a ticker has the .NS suffix for Indian stocks.

    Indices (e.g. ``^NSEI``) and tickers that already carry an exchange
    suffix (contain a ``.``) pass through unchanged.
    """
    # str() guards against non-string cells coming from an uploaded CSV.
    t = str(t).strip().upper()
    if t.startswith("^") or "." in t:
        return t
    return t + ".NS"


def _fetch_prices(tickers, start, end):
    """Fetches historical price data from yfinance.

    Returns a DataFrame of (auto-adjusted) close prices with upper-cased,
    single-level columns — one column per ticker.
    """
    raw = yf.download(tickers, start=start, end=end, progress=False, auto_adjust=True)
    if "Adj Close" in raw:
        df = raw["Adj Close"]
    elif "Close" in raw:
        df = raw["Close"]
    else:
        df = raw
    if isinstance(df, pd.Series):
        df = df.to_frame()
    # Handle single ticker download which doesn't have multi-index cols
    if not isinstance(df.columns, pd.MultiIndex):
        df.columns = [c.upper() for c in df.columns]
    return df


def calc_metrics(series, benchmark_returns=None):
    """Calculates key performance metrics for a price time series.

    Args:
        series: Price series indexed by date.
        benchmark_returns: Optional benchmark daily-return series; when
            given (and overlap > 10 points), beta is estimated.

    Returns:
        Dict with total_return, volatility (annualized), VaR_95 (daily),
        CAGR, max_drawdown, and beta (None/NaN when unavailable).
    """
    returns = series.pct_change().dropna()
    if returns.empty:
        return {
            "total_return": 0,
            "volatility": 0,
            "VaR_95": 0,
            "CAGR": 0,
            "max_drawdown": 0,
            "beta": None,
        }
    total_return = (series.iloc[-1] / series.iloc[0]) - 1
    vol = returns.std() * np.sqrt(252)  # annualize daily vol (252 trading days)
    VaR_95 = returns.quantile(0.05)
    days = (series.index[-1] - series.index[0]).days
    # Floor at one day so a single-session window cannot divide by zero.
    years = max(days / 365.25, 1 / 365.25)
    CAGR = (series.iloc[-1] / series.iloc[0]) ** (1 / years) - 1
    drawdown = (series / series.cummax()) - 1
    max_dd = drawdown.min()
    beta = None
    if benchmark_returns is not None:
        rr, br = returns.align(benchmark_returns, join="inner")
        if len(rr) > 10:
            # np.cov defaults to ddof=1; use ddof=1 for the variance as well.
            # (The original used np.var's default ddof=0, biasing beta by n/(n-1).)
            cov = np.cov(rr, br)[0, 1]
            varb = np.var(br, ddof=1)
            beta = cov / varb if varb != 0 else np.nan
    return {
        "total_return": total_return,
        "volatility": vol,
        "VaR_95": VaR_95,
        "CAGR": CAGR,
        "max_drawdown": max_dd,
        "beta": beta,
    }


def sector_from_ticker(t):
    """Fetches (sector, industry) info for a ticker; ("Unknown", "Unknown") on failure."""
    try:
        info = yf.Ticker(t).info
        return info.get("sector", "Unknown"), info.get("industry", "Unknown")
    except Exception:
        # Network/lookup failures are expected for delisted or odd tickers;
        # degrade gracefully rather than abort the whole simulation.
        return "Unknown", "Unknown"


def format_pct(x):
    """Formats a float as a percentage string; 'N/A' for None/NaN."""
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return "N/A"
    return f"{x * 100:.2f}%"


# ---------------- GEMINI AI HELPER ----------------
def generate_gemini_insights(
    api_key: str,
    crisis_name: str,
    metrics_md: str,
    extra_instructions: str = "",
) -> str:
    """Call Gemini to get AI-generated insights based on metrics.

    Returns a user-facing string in every case: the model's text on
    success, or an informational/warning message on any failure.
    """
    if not api_key:
        return "ℹ️ To see AI-generated insights, please paste a valid Gemini API key."
    if genai is None:
        return "⚠️ google-generativeai is not installed. Run `pip install google-generativeai` and retry."
    user_prompt = GEMINI_USER_PROMPT_TEMPLATE.format(
        crisis_name=crisis_name,
        metrics_text=metrics_md,
        extra_instructions=extra_instructions or "None.",
    )
    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel(
            GEMINI_MODEL_NAME,
            system_instruction=GEMINI_SYSTEM_PROMPT.strip(),
            generation_config={"max_output_tokens": 256},
        )
        response = model.generate_content(user_prompt)
        text = getattr(response, "text", "") or ""
        if not text.strip():
            return "⚠️ Gemini did not return any text. Please check your API key, quota, or try again."
        return text.strip()
    except Exception as e:
        # The UI surfaces this string directly; never let the call crash the app.
        return f"⚠️ Gemini call failed: {e}"


# ---------------- SIMULATION ----------------
def _error_outputs(message, ai_note):
    """Builds the standard 7-tuple returned to the Gradio UI on failure."""
    return (None, message, None, None, None, "", ai_note)


def run_crisis_simulation(
    crisis,
    uploaded,
    tickers_str,
    weights_str,
    include_etf,
    gemini_api_key="",
    gemini_extra_prompt="",
):
    """
    The main simulation function.

    Takes user inputs, processes the portfolio, fetches data, and returns
    all outputs for the Gradio interface as a 7-tuple:
    (performance figure, metrics markdown, sector figure, insights markdown,
     allocation pie figure, log message, AI insights text).
    On any failure a matching 7-tuple with an error message is returned.
    """
    # --- 1. Parse Portfolio ---
    if uploaded is not None:
        try:
            df = pd.read_csv(uploaded.name if hasattr(uploaded, "name") else uploaded)
        except Exception as e:
            return _error_outputs(f"Error reading CSV: {e}", "No AI insights (CSV error).")
    else:
        try:
            tickers = [t.strip() for t in tickers_str.split(",") if t.strip()]
            weights = [float(w) for w in weights_str.split(",") if w.strip()]
            if not tickers or not weights or len(tickers) != len(weights):
                return _error_outputs(
                    "Error: Mismatch between tickers and weights, or fields are empty.",
                    "No AI insights (input mismatch).",
                )
            df = pd.DataFrame({"Ticker": tickers, "Weight": weights})
        except ValueError:
            return _error_outputs("Error: Weights must be numbers.", "No AI insights (weights error).")

    if df.empty or "Ticker" not in df or "Weight" not in df:
        return _error_outputs("Error: Invalid portfolio. Check inputs.", "No AI insights (invalid portfolio).")

    df["Ticker"] = df["Ticker"].apply(_ensure_ns_suffix)

    # --- 2. Normalize Weights (with ETF logic) ---
    # NOTE: dividing a pandas Series by a zero sum yields inf, not
    # ZeroDivisionError, so the zero case must be checked explicitly
    # (the original `except ZeroDivisionError` could never fire).
    total_w = df["Weight"].astype(float).sum()
    if total_w == 0:
        return _error_outputs("Error: Portfolio weights sum to zero.", "No AI insights (weights zero).")
    if include_etf:
        # Scale user's portfolio to 95% and add a 5% NIFTYBEES ETF slice.
        df["Weight"] = (df["Weight"].astype(float) / total_w) * 0.95
        etf_row = pd.DataFrame([{"Ticker": "NIFTYBEES.NS", "Weight": 0.05}])
        df = pd.concat([df, etf_row], ignore_index=True)
    else:
        # Normalize user's portfolio to 100%
        df["Weight"] = df["Weight"].astype(float) / total_w

    # --- 3. Fetch Data ---
    start, end = CRISIS_PERIODS[crisis]
    recovery_end = pd.to_datetime(end) + pd.Timedelta(days=RECOVERY_DAYS)
    # dict.fromkeys dedupes while preserving order, in case the benchmark
    # (or the injected ETF) is already held in the portfolio.
    fetch_tickers = list(dict.fromkeys(list(df["Ticker"]) + [BENCHMARK_TICKER]))
    prices = _fetch_prices(fetch_tickers, start, recovery_end)
    if prices.empty:
        return _error_outputs(
            "No data found. Some tickers may not exist historically.",
            "No AI insights (no data).",
        )

    # Ensure all required tickers were fetched
    fetched_tickers = [c.upper() for c in prices.columns]
    required_tickers = [t.upper() for t in df["Ticker"]] + [BENCHMARK_TICKER.upper()]
    missing = [t for t in required_tickers if t not in fetched_tickers]
    if missing:
        return _error_outputs(
            f"Error: Could not fetch data for: {', '.join(missing)}",
            "No AI insights (missing tickers).",
        )

    prices = prices.ffill()
    crisis_window = prices.loc[start:end]
    if BENCHMARK_TICKER not in crisis_window.columns:
        return _error_outputs(
            f"Error: Could not fetch benchmark {BENCHMARK_NAME} data for this period.",
            "No AI insights (benchmark error).",
        )
    bench = crisis_window[BENCHMARK_TICKER]

    # --- 4. Calculate Portfolio Performance ---
    df_aligned = df.set_index("Ticker")
    df_aligned.index = df_aligned.index.str.upper()
    # Filter price columns to only those in our portfolio
    portfolio_prices = crisis_window[df_aligned.index]
    norm = (portfolio_prices / portfolio_prices.iloc[0]) * 100  # base-100 index
    weighted = (norm * df_aligned["Weight"]).sum(axis=1)
    weighted.name = "Portfolio"
    bench_norm = (bench / bench.iloc[0]) * 100
    port_m = calc_metrics(weighted, bench.pct_change())
    bench_m = calc_metrics(bench_norm)

    # --- 5. Generate Outputs (Metrics Table) ---
    beta_val = port_m["beta"]
    if beta_val is None or (isinstance(beta_val, float) and np.isnan(beta_val)):
        beta_str = "N/A"
    else:
        beta_str = f"{beta_val:.2f}"
    metrics_md = f"""### Simulation: {crisis}

| Metric | Portfolio | {BENCHMARK_NAME} |
|:---|---:|---:|
| **Total Return** | **{format_pct(port_m['total_return'])}** | **{format_pct(bench_m['total_return'])}** |
| Max Drawdown | {format_pct(port_m['max_drawdown'])} | {format_pct(bench_m['max_drawdown'])} |
| Volatility (Ann.) | {format_pct(port_m['volatility'])} | {format_pct(bench_m['volatility'])} |
| CAGR | {format_pct(port_m['CAGR'])} | {format_pct(bench_m['CAGR'])} |
| Beta | {beta_str} | - |
| VaR (95%, Daily) | {format_pct(port_m['VaR_95'])} | {format_pct(bench_m['VaR_95'])} |
"""

    # --- 6. Generate Outputs (Performance Plot) ---
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=weighted.index,
            y=weighted.values,
            name="Portfolio",
            mode="lines",
            line=dict(width=3, color="#1E88E5"),
        )
    )
    fig.add_trace(
        go.Scatter(
            x=bench_norm.index,
            y=bench_norm.values,
            name=BENCHMARK_NAME,
            mode="lines",
            line=dict(width=2, color="#FFC107", dash="dot"),
        )
    )
    fig.update_layout(
        title=f"{crisis}: Portfolio vs Benchmark Performance",
        template="plotly_white",
        xaxis_title="Date",
        yaxis_title="Normalized Value (Base 100)",
        height=450,
        legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01),
    )

    # --- 7. Generate Outputs (Sector Analysis) ---
    df["Sector"], df["Industry"] = zip(*df["Ticker"].map(sector_from_ticker))
    sector_dd = []
    for t in df.Ticker:
        if t.upper() in crisis_window.columns:
            ser = crisis_window[t.upper()]
            dd = (ser / ser.cummax() - 1).min()
            sector_dd.append(dd)
        else:
            sector_dd.append(0)  # Ticker wasn't in crisis window
    df["Drawdown"] = sector_dd
    # Aggregate weighted drawdown by sector
    sec_agg = df.groupby("Sector").apply(
        lambda d: np.average(d["Drawdown"], weights=d["Weight"] / d["Weight"].sum())
    )
    sec_agg = sec_agg.sort_values()
    # Plot percentages, not raw fractions (the original passed `sec_agg * 100`
    # as the ignored data_frame argument while plotting the unscaled values).
    sec_fig = px.bar(
        x=(sec_agg * 100).values,
        y=sec_agg.index,
        orientation="h",
        title="Weighted Max Drawdown by Sector",
        labels={"x": "Max Drawdown (%)", "y": "Sector"},
    )
    sec_fig.update_layout(
        template="plotly_white",
        yaxis={"categoryorder": "total ascending"},
    )

    # --- 8. Generate Outputs (Insights & Pie Chart) ---
    ins = [
        f"### Insights for: {crisis}",
        f"_{CRISIS_SUMMARY.get(crisis, 'No summary available.')}_",
    ]
    if crisis in CRISIS_INSIGHTS:
        for s, txt in CRISIS_INSIGHTS[crisis].items():
            ins.append(f"- **{s}**: {txt}")
    insights_md = "\n".join(ins)

    # --- Pie chart: final portfolio weights (including ETF if added) ---
    pie_df = df[["Ticker", "Weight"]].copy()
    pie_df["Ticker"] = pie_df["Ticker"].astype(str)
    pie_df["Weight"] = pd.to_numeric(pie_df["Weight"], errors="raise")
    wsum = pie_df["Weight"].sum()
    if wsum <= 0:
        # Defensive: return the standard error tuple instead of raising, so
        # the UI shows a message rather than crashing.
        return _error_outputs(
            f"Pie chart error: portfolio weights sum to {wsum}.",
            "No AI insights (weights error).",
        )
    pie_df["Weight"] = pie_df["Weight"] / wsum

    # Matplotlib pie chart
    fig_pie, ax = plt.subplots(figsize=(4, 4))
    ax.pie(
        pie_df["Weight"].values,
        labels=pie_df["Ticker"].values,
        autopct="%1.1f%%",
        startangle=90,
    )
    ax.set_title("Final Portfolio Allocation")
    ax.axis("equal")

    # --- 9. Logs & AI Insights ---
    log_message = f"✅ Simulation Complete. Received weights: '{weights_str}'"
    gemini_insights = generate_gemini_insights(
        api_key=gemini_api_key or "",
        crisis_name=crisis,
        metrics_md=metrics_md,
        extra_instructions=gemini_extra_prompt or "",
    )

    return fig, metrics_md, sec_fig, insights_md, fig_pie, log_message, gemini_insights