# CrisesLensApp / logic.py
# techatcreated's picture
# Upload 4 files
# 5b3602b verified
# ---------------- LOGIC ----------------
# This file contains all the data processing and simulation logic.
import pandas as pd
import numpy as np
import yfinance as yf
import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import warnings
from datetime import timedelta
# Import configuration variables
from config import (
CRISIS_PERIODS,
BENCHMARK_TICKER,
BENCHMARK_NAME,
RECOVERY_DAYS,
CRISIS_SUMMARY,
CRISIS_INSIGHTS,
GEMINI_MODEL_NAME,
GEMINI_SYSTEM_PROMPT,
GEMINI_USER_PROMPT_TEMPLATE,
)
warnings.filterwarnings("ignore")
try:
import google.generativeai as genai
except ImportError:
genai = None
# ---------------- UTILS ----------------
def _ensure_ns_suffix(t):
"""Ensures a ticker has the .NS suffix for Indian stocks."""
t = t.strip().upper()
if t.startswith("^") or "." in t:
return t
return t + ".NS"
def _fetch_prices(tickers, start, end):
    """Fetch historical closing prices from yfinance.

    Args:
        tickers: a ticker symbol or a list of ticker symbols, passed
            straight to ``yf.download``.
        start: start of the download range (forwarded to ``yf.download``).
        end: end of the download range (forwarded to ``yf.download``).

    Returns:
        A DataFrame with one column per ticker that has at least one
        price. Flat column labels are upper-cased. The frame is empty when
        nothing could be downloaded.
    """
    raw = yf.download(tickers, start=start, end=end, progress=False, auto_adjust=True)
    if raw is None:
        return pd.DataFrame()

    # NOTE(review): with auto_adjust=True the adjusted prices are expected
    # under "Close"; the "Adj Close" branch is kept in case that column is
    # still present -- confirm against the installed yfinance version.
    if "Adj Close" in raw:
        df = raw["Adj Close"]
    elif "Close" in raw:
        df = raw["Close"]
    else:
        df = raw

    if isinstance(df, pd.Series):
        # A Series means the download had flat (single-ticker) columns.
        # Label the column with the ticker itself; a bare ``to_frame()``
        # would name it "Close" and callers could never find the ticker.
        if isinstance(tickers, str):
            label = tickers
        else:
            requested = list(tickers)
            label = requested[0] if len(requested) == 1 else df.name
        df = df.to_frame(name=label)

    # Handle downloads that do not have multi-index columns.
    if not isinstance(df.columns, pd.MultiIndex):
        df.columns = [str(c).upper() for c in df.columns]

    # A column without a single price carries no information; dropping it
    # lets callers detect the ticker as missing by column name.
    return df.dropna(axis=1, how="all")
def calc_metrics(series, benchmark_returns=None):
    """Calculate key performance metrics for a price/value time series.

    Args:
        series: pandas Series of prices or index levels. The CAGR step
            subtracts the first index label from the last and reads
            ``.days``, so the index is expected to be datetime-like.
        benchmark_returns: optional Series of benchmark period returns.
            When given, beta is estimated on the dates shared with the
            returns of ``series``.

    Returns:
        dict with keys ``total_return``, ``volatility`` (std of period
        returns scaled by sqrt(252)), ``VaR_95`` (5% quantile of period
        returns), ``CAGR``, ``max_drawdown`` and ``beta``. ``beta`` is
        ``None`` when no benchmark is given or when 10 or fewer
        overlapping observations exist, and NaN when the benchmark
        variance is zero. If ``series`` yields no returns at all, every
        numeric metric is 0 and ``beta`` is ``None``.
    """
    returns = series.pct_change().dropna()
    if returns.empty:
        return {
            "total_return": 0,
            "volatility": 0,
            "VaR_95": 0,
            "CAGR": 0,
            "max_drawdown": 0,
            "beta": None,
        }

    growth = series.iloc[-1] / series.iloc[0]
    total_return = growth - 1
    vol = returns.std() * np.sqrt(252)
    VaR_95 = returns.quantile(0.05)

    days = (series.index[-1] - series.index[0]).days
    # Floor the horizon at one day so a same-day series cannot divide by zero.
    years = max(days / 365.25, 1 / 365.25)
    CAGR = growth ** (1 / years) - 1

    drawdown = (series / series.cummax()) - 1
    max_dd = drawdown.min()

    beta = None
    if benchmark_returns is not None:
        # Drop missing benchmark observations (e.g. the leading NaN that
        # pct_change() produces) so they cannot turn the covariance into NaN.
        rr, br = returns.align(benchmark_returns.dropna(), join="inner")
        if len(rr) > 10:
            cov = np.cov(rr, br)[0, 1]
            # np.cov uses ddof=1 by default; the variance must use the same
            # normalisation, otherwise beta is inflated by n / (n - 1).
            varb = br.var(ddof=1)
            beta = cov / varb if varb != 0 else np.nan

    return {
        "total_return": total_return,
        "volatility": vol,
        "VaR_95": VaR_95,
        "CAGR": CAGR,
        "max_drawdown": max_dd,
        "beta": beta,
    }
def sector_from_ticker(t):
    """Look up the sector and industry of a ticker via yfinance.

    Args:
        t: ticker symbol passed to ``yf.Ticker``.

    Returns:
        A ``(sector, industry)`` tuple of strings. Either element is
        "Unknown" when the field is absent, ``None`` or empty, or when the
        lookup raises for any reason (the lookup is best-effort).
    """
    try:
        info = yf.Ticker(t).info or {}
        # ``.get(key, default)`` only covers a missing key. A key that is
        # present with a None/empty value would leak through, and a pandas
        # groupby on the sector label drops None keys by default.
        sector = info.get("sector") or "Unknown"
        industry = info.get("industry") or "Unknown"
        return sector, industry
    except Exception:
        # Deliberate best-effort: any lookup failure degrades to "Unknown".
        return "Unknown", "Unknown"
def format_pct(x):
    """Render a fractional value as a percentage string with two decimals.

    ``None`` and float NaN are rendered as "N/A"; anything else is
    multiplied by 100 and formatted as ``"<value>%"``.
    """
    missing = x is None
    if not missing and isinstance(x, float):
        missing = bool(np.isnan(x))
    return "N/A" if missing else f"{x * 100:.2f}%"
# ---------------- GEMINI AI HELPER ----------------
def generate_gemini_insights(
    api_key: str,
    crisis_name: str,
    metrics_md: str,
    extra_instructions: str = "",
) -> str:
    """Ask Gemini for commentary on the simulation metrics.

    The user prompt is built from ``GEMINI_USER_PROMPT_TEMPLATE`` with the
    crisis name, the metrics markdown and the extra instructions ("None."
    when empty). The model is created with ``GEMINI_SYSTEM_PROMPT`` as the
    system instruction and a 256-token output cap.

    Returns:
        The stripped response text, or a human-readable notice when the
        API key is missing, the ``google-generativeai`` package is not
        installed, the response is empty, or the call raises.
    """
    if not api_key:
        return "ℹ️ To see AI-generated insights, please paste a valid Gemini API key."
    if genai is None:
        return "⚠️ google-generativeai is not installed. Run `pip install google-generativeai` and retry."

    prompt_fields = {
        "crisis_name": crisis_name,
        "metrics_text": metrics_md,
        "extra_instructions": extra_instructions if extra_instructions else "None.",
    }
    user_prompt = GEMINI_USER_PROMPT_TEMPLATE.format(**prompt_fields)

    try:
        genai.configure(api_key=api_key)
        model_options = {
            "system_instruction": GEMINI_SYSTEM_PROMPT.strip(),
            "generation_config": {"max_output_tokens": 256},
        }
        model = genai.GenerativeModel(GEMINI_MODEL_NAME, **model_options)
        response = model.generate_content(user_prompt)
        # A response without a usable ``text`` attribute counts as empty.
        reply = (getattr(response, "text", "") or "").strip()
    except Exception as e:
        return f"⚠️ Gemini call failed: {e}"

    if reply:
        return reply
    return "⚠️ Gemini did not return any text. Please check your API key, quota, or try again."
# ---------------- SIMULATION ----------------
def _simulation_error(message, ai_note):
    """Build the early-exit result tuple; only the two message slots are filled."""
    return (None, message, None, None, None, "", ai_note)


def _parse_portfolio(uploaded, tickers_str, weights_str):
    """Read the portfolio from the CSV upload or from the two text fields.

    Returns ``(frame, None)`` on success, where ``frame`` has a string
    "Ticker" column and a finite float "Weight" column, or
    ``(None, error_result)`` where ``error_result`` can be returned as-is
    by ``run_crisis_simulation``.
    """
    weights_error = _simulation_error(
        "Error: Weights must be numbers.", "No AI insights (weights error)."
    )
    invalid_error = _simulation_error(
        "Error: Invalid portfolio. Check inputs.", "No AI insights (invalid portfolio)."
    )

    if uploaded is not None:
        try:
            df = pd.read_csv(uploaded.name if hasattr(uploaded, "name") else uploaded)
        except Exception as e:
            return None, _simulation_error(
                f"Error reading CSV: {e}", "No AI insights (CSV error)."
            )
    else:
        try:
            # ``or ""`` keeps a None text field from raising AttributeError.
            tickers = [t.strip() for t in (tickers_str or "").split(",") if t.strip()]
            weights = [float(w) for w in (weights_str or "").split(",") if w.strip()]
        except ValueError:
            return None, weights_error
        if not tickers or not weights or len(tickers) != len(weights):
            return None, _simulation_error(
                "Error: Mismatch between tickers and weights, or fields are empty.",
                "No AI insights (input mismatch).",
            )
        df = pd.DataFrame({"Ticker": tickers, "Weight": weights})

    if df.empty or "Ticker" not in df or "Weight" not in df:
        return None, invalid_error

    df = df[["Ticker", "Weight"]].copy()
    if df["Ticker"].isna().any():
        return None, invalid_error
    # CSV weights may arrive as text; reject anything that is not a finite
    # number (this also rejects "nan"/"inf" typed into the text field).
    numeric = pd.to_numeric(df["Weight"], errors="coerce")
    if not np.isfinite(numeric).all():
        return None, weights_error
    df["Ticker"] = df["Ticker"].astype(str)
    df["Weight"] = numeric.astype(float)
    return df, None


def run_crisis_simulation(
    crisis,
    uploaded,
    tickers_str,
    weights_str,
    include_etf,
    gemini_api_key="",
    gemini_extra_prompt="",
):
    """
    The main simulation function.
    Takes user inputs, processes the portfolio, fetches data,
    and returns all outputs for the Gradio interface.

    Args:
        crisis: key into ``CRISIS_PERIODS`` selecting the (start, end) window.
        uploaded: optional CSV source (a path, or an object with a ``name``
            attribute) containing "Ticker" and "Weight" columns. When it is
            ``None`` the two text fields are used instead.
        tickers_str: comma-separated ticker symbols.
        weights_str: comma-separated numeric weights, one per ticker.
        include_etf: when truthy, the portfolio is scaled to 95% and a 5%
            "NIFTYBEES.NS" position is added.
        gemini_api_key: forwarded to ``generate_gemini_insights``.
        gemini_extra_prompt: forwarded to ``generate_gemini_insights``.

    Returns:
        A 7-tuple ``(performance_fig, metrics_md, sector_fig, insights_md,
        pie_fig, log_message, gemini_insights)``. On invalid input or
        missing data the figures and insights are ``None``, the second
        element carries the error message, the log message is "" and the
        last element is a short "No AI insights (...)" note.

    Notes:
        Weights must be non-negative and sum to a positive value. Duplicate
        tickers are merged by summing their weights. The crisis window is
        trimmed to the dates on which every required ticker has a price, so
        normalisation always starts from real prices.
    """
    # --- 1. Parse Portfolio ---
    df, error = _parse_portfolio(uploaded, tickers_str, weights_str)
    if error is not None:
        return error

    # --- 2. Normalize Weights (with ETF logic) ---
    # pandas division never raises ZeroDivisionError (it yields NaN/inf),
    # so the weights are validated explicitly before dividing.
    if (df["Weight"] < 0).any():
        return _simulation_error(
            "Error: Portfolio weights must not be negative.",
            "No AI insights (negative weights).",
        )
    total_weight = df["Weight"].sum()
    if total_weight <= 0:
        return _simulation_error(
            "Error: Portfolio weights sum to zero.", "No AI insights (weights zero)."
        )

    df["Ticker"] = df["Ticker"].apply(_ensure_ns_suffix).str.upper()
    df["Weight"] = df["Weight"] / total_weight
    if include_etf:
        # Scale the user's portfolio to 95% and add the 5% ETF.
        df["Weight"] = df["Weight"] * 0.95
        etf_row = pd.DataFrame([{"Ticker": "NIFTYBEES.NS", "Weight": 0.05}])
        df = pd.concat([df, etf_row], ignore_index=True)
    # Merge repeated tickers (including a user-supplied NIFTYBEES.NS plus the
    # added ETF row) so every price column is matched by exactly one weight.
    df = df.groupby("Ticker", as_index=False, sort=False)["Weight"].sum()

    # --- 3. Fetch Data ---
    try:
        start, end = CRISIS_PERIODS[crisis]
    except KeyError:
        return _simulation_error(
            f"Error: Unknown crisis '{crisis}'.", "No AI insights (unknown crisis)."
        )
    recovery_end = pd.to_datetime(end) + pd.Timedelta(days=RECOVERY_DAYS)

    portfolio_cols = list(df["Ticker"])
    bench_col = BENCHMARK_TICKER.upper()
    # dict.fromkeys de-duplicates while keeping order (the benchmark may
    # also be a portfolio holding).
    required = list(dict.fromkeys(portfolio_cols + [bench_col]))
    download_list = list(dict.fromkeys(portfolio_cols + [BENCHMARK_TICKER]))

    prices = _fetch_prices(download_list, start, recovery_end)
    if prices.empty:
        return _simulation_error(
            "No data found. Some tickers may not exist historically.",
            "No AI insights (no data).",
        )

    # Ensure all required tickers were fetched.
    fetched = {str(c).upper() for c in prices.columns}
    missing = [t for t in required if t not in fetched]
    if missing:
        return _simulation_error(
            f"Error: Could not fetch data for: {', '.join(missing)}",
            "No AI insights (missing tickers).",
        )

    prices = prices.ffill()
    crisis_window = prices.loc[start:end]
    if crisis_window.empty:
        return _simulation_error(
            "No data found. Some tickers may not exist historically.",
            "No AI insights (no data).",
        )
    crisis_window = crisis_window[required]

    # A column can exist yet hold no prices inside the crisis window.
    no_data = [t for t in required if crisis_window[t].isna().all()]
    if bench_col in no_data:
        return _simulation_error(
            f"Error: Could not fetch benchmark {BENCHMARK_NAME} data for this period.",
            "No AI insights (benchmark error).",
        )
    if no_data:
        return _simulation_error(
            f"Error: Could not fetch data for: {', '.join(no_data)}",
            "No AI insights (missing tickers).",
        )
    # After the forward fill only leading rows can still contain NaN;
    # dropping them starts the window on the first date where every ticker
    # has a price, so the base-100 normalisation never divides by NaN.
    crisis_window = crisis_window.dropna(how="any")
    bench = crisis_window[bench_col]

    # --- 4. Calculate Portfolio Performance ---
    weights = df.set_index("Ticker")["Weight"]
    portfolio_prices = crisis_window[portfolio_cols]
    norm = (portfolio_prices / portfolio_prices.iloc[0]) * 100
    weighted = (norm * weights).sum(axis=1)
    weighted.name = "Portfolio"
    bench_norm = (bench / bench.iloc[0]) * 100
    port_m = calc_metrics(weighted, bench.pct_change())
    bench_m = calc_metrics(bench_norm)

    # --- 5. Generate Outputs (Metrics Table) ---
    beta_val = port_m["beta"]
    if beta_val is None or (isinstance(beta_val, float) and np.isnan(beta_val)):
        beta_str = "N/A"
    else:
        beta_str = f"{beta_val:.2f}"

    def _row(label, key):
        return f"| {label} | {format_pct(port_m[key])} | {format_pct(bench_m[key])} |"

    metrics_lines = [
        f"### Simulation: {crisis}",
        f"| Metric | Portfolio | {BENCHMARK_NAME} |",
        "|:---|---:|---:|",
        f"| **Total Return** | **{format_pct(port_m['total_return'])}** "
        f"| **{format_pct(bench_m['total_return'])}** |",
        _row("Max Drawdown", "max_drawdown"),
        _row("Volatility (Ann.)", "volatility"),
        _row("CAGR", "CAGR"),
        f"| Beta | {beta_str} | - |",
        _row("VaR (95%, Daily)", "VaR_95"),
    ]
    metrics_md = "\n".join(metrics_lines) + "\n"

    # --- 6. Generate Outputs (Performance Plot) ---
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=weighted.index,
            y=weighted.values,
            name="Portfolio",
            mode="lines",
            line=dict(width=3, color="#1E88E5"),
        )
    )
    fig.add_trace(
        go.Scatter(
            x=bench_norm.index,
            y=bench_norm.values,
            name=BENCHMARK_NAME,
            mode="lines",
            line=dict(width=2, color="#FFC107", dash="dot"),
        )
    )
    fig.update_layout(
        title=f"<b>{crisis}</b>: Portfolio vs Benchmark Performance",
        template="plotly_white",
        xaxis_title="Date",
        yaxis_title="Normalized Value (Base 100)",
        height=450,
        legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01),
    )

    # --- 7. Generate Outputs (Sector Analysis) ---
    sector_info = [sector_from_ticker(t) for t in df["Ticker"]]
    # Guard against None/empty labels: groupby drops None keys by default.
    df["Sector"] = [sector or "Unknown" for sector, _ in sector_info]
    df["Industry"] = [industry or "Unknown" for _, industry in sector_info]
    ticker_drawdown = (portfolio_prices / portfolio_prices.cummax() - 1).min()
    df["Drawdown"] = ticker_drawdown.reindex(df["Ticker"]).to_numpy()

    # Aggregate weighted drawdown by sector (weighted mean within a sector).
    sector_weight = df.groupby("Sector")["Weight"].sum()
    sector_weighted_dd = (df["Drawdown"] * df["Weight"]).groupby(df["Sector"]).sum()
    sec_agg = (sector_weighted_dd / sector_weight).sort_values()
    sec_fig = px.bar(
        # Plot percentages: the axis is labelled "%", so the fractional
        # drawdowns are scaled by 100.
        x=sec_agg.values * 100,
        y=list(sec_agg.index),
        orientation="h",
        title="Weighted Max Drawdown by Sector",
        labels={"x": "Max Drawdown (%)", "y": "Sector"},
    )
    sec_fig.update_layout(
        template="plotly_white",
        yaxis={"categoryorder": "total ascending"},
    )

    # --- 8. Generate Outputs (Insights & Pie Chart) ---
    ins = [
        f"### Insights for: {crisis}",
        f"_{CRISIS_SUMMARY.get(crisis, 'No summary available.')}_",
    ]
    if crisis in CRISIS_INSIGHTS:
        for s, txt in CRISIS_INSIGHTS[crisis].items():
            ins.append(f"- **{s}**: {txt}")
    insights_md = "\n".join(ins)

    # --- Pie chart: final portfolio weights (including ETF if added) ---
    # Weights were validated (non-negative, positive sum) and normalised above.
    fig_pie, ax = plt.subplots(figsize=(4, 4))
    ax.pie(
        df["Weight"].values,
        labels=df["Ticker"].astype(str).values,
        autopct="%1.1f%%",
        startangle=90,
    )
    ax.set_title("Final Portfolio Allocation")
    ax.axis("equal")
    # Unregister the figure from pyplot so repeated runs do not accumulate
    # open figures; the Figure object itself is still returned to the caller.
    plt.close(fig_pie)

    # --- 9. Logs & AI Insights ---
    # The log echoes the raw weights text field, even when a CSV was used.
    log_message = f"✅ Simulation Complete. Received weights: '{weights_str}'"
    gemini_insights = generate_gemini_insights(
        api_key=gemini_api_key or "",
        crisis_name=crisis,
        metrics_md=metrics_md,
        extra_instructions=gemini_extra_prompt or "",
    )
    return fig, metrics_md, sec_fig, insights_md, fig_pie, log_message, gemini_insights