# AnalyzrAI / apps / copilot / financial_models.py
# thejagstudio's picture
# Upload 92 files
# 0310410 verified
"""Pre-built statistical / quantitative models for financial analysis.
Every function fetches its own data via yfinance and returns a
structured dict with:
- ticker, model, period
- metrics : key-value pairs for KPIs
- signals : list of actionable alerts
- chart_data: list of dicts ready for Recharts
- interpretation: 1-3 sentence plain-English summary
"""
from __future__ import annotations
import logging
import math
from typing import Any
import numpy as np
logger = logging.getLogger(__name__)
# ------------------------------------------------------------------ #
# Helpers #
# ------------------------------------------------------------------ #
def _get_history(ticker: str, period: str = "1y", interval: str = "1d"):
    """Fetch price history for ``ticker`` through yfinance.

    yfinance is imported inside the function, so merely importing this module
    does not require the package.

    Args:
        ticker: Symbol handed to ``yfinance.Ticker``.
        period: Look-back window forwarded to ``Ticker.history``.
        interval: Bar size forwarded to ``Ticker.history``.

    Returns:
        Whatever ``Ticker.history`` returns; callers in this module use it as
        a DataFrame with a ``"Close"`` column and a date-like index.
    """
    import yfinance as yf

    instrument = yf.Ticker(ticker)
    return instrument.history(period=period, interval=interval)
def _dates(df) -> list[str]:
    """Return every entry of ``df.index`` formatted as ``YYYY-MM-DD``."""
    labels = []
    for stamp in df.index:
        labels.append(stamp.strftime("%Y-%m-%d"))
    return labels
def _safe(v):
    """Convert ``v`` into a value that is safe to serialise.

    * ``None`` and non-finite numbers (NaN, +inf, -inf) become ``None``.
    * Python floats and NumPy floating/integer scalars become a Python
      ``float`` rounded to 4 decimal places.
    * Every other value is returned unchanged.
    """
    if v is None:
        return None
    if isinstance(v, (float, np.floating, np.integer)):
        # Convert before the finite check: NumPy scalars such as np.float32
        # are not ``float`` subclasses, so a plain isinstance(v, float) test
        # would let their NaN/inf values through.
        number = float(v)
        if not math.isfinite(number):
            return None
        return round(number, 4)
    return v
# ------------------------------------------------------------------ #
# 1. Moving Averages & Crossovers #
# ------------------------------------------------------------------ #
def moving_averages(ticker: str, period: str = "1y") -> dict[str, Any]:
    """Compute SMA/EMA indicators and crossover signals for ``ticker``.

    Args:
        ticker: Symbol passed to ``_get_history``.
        period: History window passed to ``_get_history``.

    Returns:
        A dict with ``ticker``, ``model``, ``period``, ``metrics`` (latest
        close, SMA-20/50/200 and EMA-12/26; an SMA is ``None`` until its
        window is filled), ``signals``, ``chart_data`` (last 60 rows of
        close/SMA20/SMA50) and ``interpretation``. When no history comes
        back, a dict with only ``error`` and ``model``.
    """
    df = _get_history(ticker, period)
    if df.empty:
        return {"error": f"No data for {ticker}", "model": "moving_averages"}

    closes = df["Close"].values
    dates = _dates(df)

    def _sma(arr, w):
        # Simple moving average; positions without a full window stay NaN.
        out = np.full(len(arr), np.nan)
        for i in range(w - 1, len(arr)):
            out[i] = np.mean(arr[i - w + 1 : i + 1])
        return out

    def _ema(arr, span):
        # Exponential moving average seeded with the first observation.
        alpha = 2 / (span + 1)
        out = np.empty(len(arr))
        out[0] = arr[0]
        for i in range(1, len(arr)):
            out[i] = alpha * arr[i] + (1 - alpha) * out[i - 1]
        return out

    def _usd(value):
        # NaN means the averaging window is not filled yet; never print "$nan".
        return "n/a" if np.isnan(value) else f"${value:.2f}"

    sma20 = _sma(closes, 20)
    sma50 = _sma(closes, 50)
    sma200 = _sma(closes, 200)
    ema12 = _ema(closes, 12)
    ema26 = _ema(closes, 26)

    signals = []
    # With exactly 50 closes sma50[-2] is still NaN, so both comparisons are
    # False and no crossover is reported.
    if len(closes) >= 50:
        if sma20[-1] > sma50[-1] and sma20[-2] <= sma50[-2]:
            signals.append("Golden crossover: SMA-20 just crossed above SMA-50 (bullish)")
        elif sma20[-1] < sma50[-1] and sma20[-2] >= sma50[-2]:
            signals.append("Death cross: SMA-20 just crossed below SMA-50 (bearish)")
    # Only compare against SMA-200 when it could actually be computed.
    if not np.isnan(sma200[-1]):
        if closes[-1] > sma200[-1]:
            signals.append("Price is above SMA-200 (long-term bullish)")
        else:
            signals.append("Price is below SMA-200 (long-term bearish)")
    if not signals:
        signals.append("No crossover signals detected in the current period")

    chart = [
        {"date": day, "close": _safe(price), "SMA20": _safe(fast), "SMA50": _safe(slow)}
        for day, price, fast, slow in zip(dates, closes, sma20, sma50)
    ]

    return {
        "ticker": ticker, "model": "moving_averages", "period": period,
        "metrics": {
            "current_price": _safe(closes[-1]),
            "sma_20": _safe(sma20[-1]),
            "sma_50": _safe(sma50[-1]),
            # _safe maps the NaN of an unfilled window to None.
            "sma_200": _safe(sma200[-1]),
            "ema_12": _safe(ema12[-1]),
            "ema_26": _safe(ema26[-1]),
        },
        "signals": signals,
        "chart_data": chart[-60:],
        "interpretation": (
            f"{ticker} is trading at {_usd(closes[-1])}. "
            f"SMA-20={_usd(sma20[-1])}, SMA-50={_usd(sma50[-1])}. "
            + signals[0]
        ),
    }
# ------------------------------------------------------------------ #
# 2. RSI (Relative Strength Index) #
# ------------------------------------------------------------------ #
def rsi_analysis(ticker: str, period: str = "6mo", rsi_period: int = 14) -> dict[str, Any]:
    """Compute the Relative Strength Index with Wilder-style smoothing.

    Args:
        ticker: Symbol passed to ``_get_history``.
        period: History window passed to ``_get_history``.
        rsi_period: Look-back length for the average gain/loss; must be >= 1.

    Returns:
        A dict with ``ticker``, ``model``, ``period``, ``metrics`` (``rsi`` is
        ``None`` when there are fewer than ``rsi_period`` price changes),
        ``signals``, ``chart_data`` (last 60 rows) and ``interpretation``.
        On invalid ``rsi_period`` or empty history, a dict with ``error`` and
        ``model``.
    """
    if rsi_period < 1:
        return {"error": "rsi_period must be a positive integer", "model": "rsi"}
    df = _get_history(ticker, period)
    if df.empty:
        return {"error": f"No data for {ticker}", "model": "rsi"}

    closes = df["Close"].values
    dates = _dates(df)
    deltas = np.diff(closes)
    gains = np.where(deltas > 0, deltas, 0.0)
    losses = np.where(deltas < 0, -deltas, 0.0)

    def _rsi(avg_gain, avg_loss):
        if avg_loss == 0:
            # No losses in the window: RS is unbounded, so RSI is 100.
            # A completely flat window (no gains either) is treated as neutral.
            return 100.0 if avg_gain > 0 else 50.0
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))

    rsi_vals = np.full(len(closes), np.nan)
    if len(deltas) >= rsi_period:
        avg_gain = np.mean(gains[:rsi_period])
        avg_loss = np.mean(losses[:rsi_period])
        # The seed averages cover deltas[0:rsi_period], i.e. closes up to
        # index rsi_period, so the first RSI value belongs to that close.
        rsi_vals[rsi_period] = _rsi(avg_gain, avg_loss)
        for i in range(rsi_period, len(deltas)):
            avg_gain = (avg_gain * (rsi_period - 1) + gains[i]) / rsi_period
            avg_loss = (avg_loss * (rsi_period - 1) + losses[i]) / rsi_period
            # deltas[i] is the change into closes[i + 1].
            rsi_vals[i + 1] = _rsi(avg_gain, avg_loss)

    current_rsi = _safe(rsi_vals[-1])
    signals = []
    if current_rsi is None:
        signals.append(f"Not enough data to compute RSI({rsi_period})")
        interpretation = f"{ticker}: {signals[0]}"
    else:
        if current_rsi > 70:
            signals.append(f"RSI at {current_rsi:.1f} — OVERBOUGHT territory (>70)")
        elif current_rsi < 30:
            signals.append(f"RSI at {current_rsi:.1f} — OVERSOLD territory (<30)")
        else:
            signals.append(f"RSI at {current_rsi:.1f} — neutral range")
        interpretation = f"{ticker} RSI({rsi_period}) = {current_rsi}. {signals[0]}"

    chart = [
        {"date": day, "close": _safe(price), "RSI": _safe(value)}
        for day, price, value in zip(dates, closes, rsi_vals)
    ]
    return {
        "ticker": ticker, "model": "rsi", "period": period,
        "metrics": {"rsi": current_rsi, "rsi_period": rsi_period},
        "signals": signals,
        "chart_data": chart[-60:],
        "interpretation": interpretation,
    }
# ------------------------------------------------------------------ #
# 3. MACD #
# ------------------------------------------------------------------ #
def macd_analysis(ticker: str, period: str = "1y") -> dict[str, Any]:
    """Compute MACD (EMA-12 minus EMA-26), its 9-span signal line and histogram.

    Args:
        ticker: Symbol passed to ``_get_history``.
        period: History window passed to ``_get_history``.

    Returns:
        A dict with ``ticker``, ``model``, ``period``, ``metrics`` (latest
        MACD, signal and histogram), ``signals``, ``chart_data`` (last 60
        rows) and ``interpretation``. On empty history, or fewer than two
        rows, a dict with ``error`` and ``model``.
    """
    df = _get_history(ticker, period)
    if df.empty:
        return {"error": f"No data for {ticker}", "model": "macd"}

    closes = df["Close"].values
    # Crossover detection compares the last two points of each line.
    if len(closes) < 2:
        return {"error": f"Insufficient data for {ticker}", "model": "macd"}
    dates = _dates(df)

    def _ema(arr, span):
        # Exponential moving average seeded with the first observation.
        alpha = 2 / (span + 1)
        out = np.empty(len(arr))
        out[0] = arr[0]
        for i in range(1, len(arr)):
            out[i] = alpha * arr[i] + (1 - alpha) * out[i - 1]
        return out

    ema12 = _ema(closes, 12)
    ema26 = _ema(closes, 26)
    macd_line = ema12 - ema26
    signal_line = _ema(macd_line, 9)
    histogram = macd_line - signal_line

    signals = []
    if macd_line[-1] > signal_line[-1] and macd_line[-2] <= signal_line[-2]:
        signals.append("MACD bullish crossover — buy signal")
    elif macd_line[-1] < signal_line[-1] and macd_line[-2] >= signal_line[-2]:
        signals.append("MACD bearish crossover — sell signal")
    if macd_line[-1] > 0:
        signals.append("MACD is positive — bullish momentum")
    else:
        signals.append("MACD is negative — bearish momentum")

    chart = [
        {"date": day, "MACD": _safe(m), "Signal": _safe(s), "Histogram": _safe(h)}
        for day, m, s, h in zip(dates, macd_line, signal_line, histogram)
    ]
    return {
        "ticker": ticker, "model": "macd", "period": period,
        "metrics": {
            "macd": _safe(macd_line[-1]),
            "signal": _safe(signal_line[-1]),
            "histogram": _safe(histogram[-1]),
        },
        "signals": signals,
        "chart_data": chart[-60:],
        "interpretation": (
            f"{ticker} MACD={macd_line[-1]:.4f}, Signal={signal_line[-1]:.4f}. "
            + signals[0]
        ),
    }
# ------------------------------------------------------------------ #
# 4. Bollinger Bands #
# ------------------------------------------------------------------ #
def bollinger_bands(ticker: str, period: str = "6mo", window: int = 20, num_std: float = 2) -> dict[str, Any]:
    """Compute Bollinger Bands: rolling mean +/- ``num_std`` sample std devs.

    Args:
        ticker: Symbol passed to ``_get_history``.
        period: History window passed to ``_get_history``.
        window: Rolling window length; must be >= 2 because the sample
            standard deviation (``ddof=1``) is undefined for one point.
        num_std: Band half-width in standard deviations.

    Returns:
        A dict with ``ticker``, ``model``, ``period``, ``metrics`` (price,
        upper/middle/lower band and bandwidth in percent of the middle band;
        band values are ``None`` when fewer than ``window`` closes exist),
        ``signals``, ``chart_data`` (last 60 rows) and ``interpretation``.
        On invalid ``window`` or empty history, a dict with ``error`` and
        ``model``.
    """
    if window < 2:
        return {"error": "window must be at least 2", "model": "bollinger_bands"}
    df = _get_history(ticker, period)
    if df.empty:
        return {"error": f"No data for {ticker}", "model": "bollinger_bands"}

    closes = df["Close"].values
    dates = _dates(df)
    mid = np.full(len(closes), np.nan)
    upper = np.full(len(closes), np.nan)
    lower = np.full(len(closes), np.nan)
    bw = np.full(len(closes), np.nan)
    for i in range(window - 1, len(closes)):
        seg = closes[i - window + 1 : i + 1]
        m = np.mean(seg)
        s = np.std(seg, ddof=1)
        mid[i] = m
        upper[i] = m + num_std * s
        lower[i] = m - num_std * s
        bw[i] = (upper[i] - lower[i]) / m * 100 if m != 0 else 0

    bands_ready = not (np.isnan(upper[-1]) or np.isnan(lower[-1]))
    signals = []
    if not bands_ready:
        # Do not claim the price is "within the bands" when none exist.
        signals.append(f"Not enough data to compute Bollinger Bands (need {window} closes)")
    elif closes[-1] > upper[-1]:
        signals.append("Price ABOVE upper band — potential overbought / breakout")
    elif closes[-1] < lower[-1]:
        signals.append("Price BELOW lower band — potential oversold / breakdown")
    else:
        signals.append("Price within Bollinger Bands — no extreme signal")
    if not np.isnan(bw[-1]) and bw[-1] < 5:
        signals.append("Bollinger squeeze detected — low volatility, expect breakout")

    if bands_ready:
        band_text = f"bands ${lower[-1]:.2f}-${upper[-1]:.2f}"
    else:
        band_text = "bands unavailable"

    chart = [
        {"date": day, "close": _safe(price), "upper": _safe(hi), "mid": _safe(center), "lower": _safe(lo)}
        for day, price, hi, center, lo in zip(dates, closes, upper, mid, lower)
    ]
    return {
        "ticker": ticker, "model": "bollinger_bands", "period": period,
        "metrics": {
            "current_price": _safe(closes[-1]),
            "upper_band": _safe(upper[-1]),
            "middle_band": _safe(mid[-1]),
            "lower_band": _safe(lower[-1]),
            "bandwidth_pct": _safe(bw[-1]),
        },
        "signals": signals,
        "chart_data": chart[-60:],
        "interpretation": f"{ticker} BB({window},{num_std}): Price ${closes[-1]:.2f}, {band_text}. {signals[0]}",
    }
# ------------------------------------------------------------------ #
# 5. Monte Carlo Price Forecast #
# ------------------------------------------------------------------ #
def monte_carlo_forecast(ticker: str, days: int = 30, simulations: int = 1000, period: str = "1y",
                         drift_adjustment: float = 0.0) -> dict[str, Any]:
    """Simulate future price paths from historical daily log returns.

    Each step multiplies the price by ``exp(mu + sigma * z)`` where ``z`` is a
    standard normal draw, ``mu`` is the mean daily log return plus
    ``drift_adjustment`` and ``sigma`` is the sample standard deviation of
    daily log returns. The generator is seeded with 42, so results are
    reproducible for identical inputs.

    Args:
        ticker: Symbol passed to ``_get_history``.
        days: Number of simulated steps per path; must be >= 1.
        simulations: Number of paths; must be >= 1.
        period: History window passed to ``_get_history``.
        drift_adjustment: Value added to the mean daily log return.

    Returns:
        A dict with ``ticker``, ``model``, ``period``, ``metrics``,
        ``signals``, ``chart_data`` (per-day 5/25/50/75/95th percentiles) and
        ``interpretation``. On invalid arguments or fewer than 30 usable
        closes, a dict with ``error`` and ``model``.
    """
    if days < 1 or simulations < 1:
        return {"error": "days and simulations must both be at least 1", "model": "monte_carlo"}
    df = _get_history(ticker, period)
    if df.empty or len(df) < 30:
        return {"error": f"Insufficient data for {ticker}", "model": "monte_carlo"}

    closes = df["Close"].values
    # Log returns are only defined for finite, strictly positive prices.
    closes = closes[np.isfinite(closes) & (closes > 0)]
    if len(closes) < 30:
        return {"error": f"Insufficient data for {ticker}", "model": "monte_carlo"}

    log_returns = np.diff(np.log(closes))
    mu = np.mean(log_returns) + drift_adjustment
    sigma = np.std(log_returns, ddof=1)
    last_price = closes[-1]

    rng = np.random.default_rng(42)
    # Draw all shocks at once; the array is filled row by row, i.e. path by
    # path and day by day, the same order a nested loop would consume them.
    growth = np.exp(mu + sigma * rng.standard_normal((simulations, days)))
    # Fold the starting price into the first step so the running product
    # yields price levels directly.
    growth[:, 0] *= last_price
    sims = np.cumprod(growth, axis=1)

    p5 = np.percentile(sims, 5, axis=0)
    p25 = np.percentile(sims, 25, axis=0)
    median = np.median(sims, axis=0)
    p75 = np.percentile(sims, 75, axis=0)
    p95 = np.percentile(sims, 95, axis=0)
    chart = [
        {"day": i + 1, "p5": _safe(p5[i]), "p25": _safe(p25[i]),
         "median": _safe(median[i]), "p75": _safe(p75[i]), "p95": _safe(p95[i])}
        for i in range(days)
    ]

    final = sims[:, -1]
    prob_up = float(np.mean(final > last_price) * 100)
    return {
        "ticker": ticker, "model": "monte_carlo", "period": period,
        "metrics": {
            "current_price": _safe(last_price),
            "forecast_median": _safe(median[-1]),
            "forecast_p5": _safe(p5[-1]),
            "forecast_p95": _safe(p95[-1]),
            "probability_up_pct": round(prob_up, 1),
            "daily_mu": _safe(mu),
            "daily_sigma": _safe(sigma),
            "simulations": simulations,
            "horizon_days": days,
        },
        "signals": [
            f"{prob_up:.0f}% probability price rises over {days} days",
            f"Median forecast: ${median[-1]:.2f} (range ${p5[-1]:.2f} – ${p95[-1]:.2f})",
        ],
        "chart_data": chart,
        "interpretation": (
            f"{ticker} Monte Carlo ({simulations} sims, {days}d): "
            f"median ${median[-1]:.2f}, 90% range ${p5[-1]:.2f}–${p95[-1]:.2f}. "
            f"{prob_up:.0f}% chance of finishing higher."
        ),
    }
# ------------------------------------------------------------------ #
# 6. DCF Valuation (simplified) #
# ------------------------------------------------------------------ #
def dcf_valuation(ticker: str, growth_rate: float = 0.08, discount_rate: float = 0.10,
                  terminal_growth: float = 0.03, projection_years: int = 5,
                  period: str | None = None) -> dict[str, Any]:
    """Run a simplified discounted-cash-flow valuation.

    Free cash flow is grown at ``growth_rate`` for ``projection_years``, each
    year discounted at ``discount_rate``; a Gordon-growth terminal value is
    added and the total is divided by shares outstanding.

    Args:
        ticker: Symbol passed to ``yfinance.Ticker``.
        growth_rate: Annual FCF growth applied during the projection.
        discount_rate: Annual discount rate; must exceed ``terminal_growth``.
        terminal_growth: Perpetual growth rate after the projection.
        projection_years: Number of explicitly projected years (>= 0).
        period: Unused; accepted so the signature matches the other models.

    Returns:
        A dict with ``ticker``, ``model``, ``period``, ``metrics``,
        ``signals``, ``chart_data`` (projected FCF and present value per
        year) and ``interpretation``. ``current_price`` and ``upside_pct``
        are ``None`` when no market price is available. On invalid
        parameters, fetch failure, missing/non-positive FCF or missing share
        count, a dict with ``error`` and ``model``.
    """
    # The terminal value divides by (discount_rate - terminal_growth); a
    # non-positive spread gives a division by zero or a negative value.
    if discount_rate <= terminal_growth:
        return {"error": "discount_rate must be greater than terminal_growth", "model": "dcf"}
    if projection_years < 0:
        return {"error": "projection_years must not be negative", "model": "dcf"}

    try:
        import yfinance as yf
        stock = yf.Ticker(ticker)
        info = stock.info
        cf = stock.cashflow
    except Exception as exc:
        return {"error": str(exc), "model": "dcf"}

    fcf = None
    if cf is not None and not cf.empty:
        for label in ("Free Cash Flow", "FreeCashFlow"):
            if label in cf.index:
                vals = cf.loc[label].dropna()
                if not vals.empty:
                    # First non-null entry of the row (presumably the most
                    # recent period - verify against yfinance's column order).
                    fcf = float(vals.iloc[0])
                    break
    if fcf is None:
        fcf = info.get("freeCashflow")
    if fcf is None or fcf <= 0:
        return {"error": f"No positive FCF found for {ticker}", "model": "dcf"}

    shares = info.get("sharesOutstanding")
    if not shares:
        # Falling back to one share would report the whole firm value as the
        # per-share value.
        return {"error": f"Shares outstanding unavailable for {ticker}", "model": "dcf"}
    current_price = info.get("currentPrice") or info.get("regularMarketPrice") or None

    projected = []
    cf_val = fcf
    pv_total = 0.0
    for yr in range(1, projection_years + 1):
        cf_val *= (1 + growth_rate)
        pv = cf_val / (1 + discount_rate) ** yr
        pv_total += pv
        projected.append({"year": yr, "fcf": _safe(cf_val), "pv": _safe(pv)})

    terminal_value = cf_val * (1 + terminal_growth) / (discount_rate - terminal_growth)
    pv_terminal = terminal_value / (1 + discount_rate) ** projection_years
    enterprise_value = pv_total + pv_terminal
    intrinsic_per_share = enterprise_value / shares

    signals = []
    if current_price:
        upside = ((intrinsic_per_share / current_price) - 1) * 100
        if upside > 20:
            signals.append(f"Stock appears UNDERVALUED by {upside:.1f}% — potential buy")
        elif upside < -20:
            signals.append(f"Stock appears OVERVALUED by {abs(upside):.1f}% — caution")
        else:
            signals.append(f"Stock is FAIRLY VALUED (upside/downside {upside:+.1f}%)")
        interpretation = (
            f"{ticker} DCF intrinsic value: ${intrinsic_per_share:.2f} vs market ${current_price:.2f} "
            f"({upside:+.1f}%). {signals[0]}"
        )
    else:
        # Without a market price there is nothing to compare against; do not
        # report a "fairly valued" verdict.
        upside = None
        signals.append("Market price unavailable — cannot compare intrinsic value to price")
        interpretation = f"{ticker} DCF intrinsic value: ${intrinsic_per_share:.2f}. {signals[0]}"

    return {
        "ticker": ticker, "model": "dcf", "period": f"{projection_years}y projection",
        "metrics": {
            "base_fcf": _safe(fcf),
            "intrinsic_value_per_share": _safe(intrinsic_per_share),
            "current_price": _safe(current_price),
            "upside_pct": _safe(upside),
            "enterprise_value": _safe(enterprise_value),
            "pv_terminal": _safe(pv_terminal),
            "growth_rate": growth_rate,
            "discount_rate": discount_rate,
        },
        "signals": signals,
        "chart_data": projected,
        "interpretation": interpretation,
    }
# ------------------------------------------------------------------ #
# 7. Beta & CAPM #
# ------------------------------------------------------------------ #
def beta_capm(ticker: str, benchmark: str = "SPY", period: str = "2y",
              risk_free_rate: float = 0.05) -> dict[str, Any]:
    """Estimate beta against ``benchmark`` and the CAPM expected return.

    Beta is the covariance of daily simple returns divided by the benchmark's
    variance (1.0 if that variance is zero). Mean daily returns are
    annualised by multiplying by 252.

    Args:
        ticker: Symbol of the asset.
        benchmark: Symbol of the market proxy.
        period: History window passed to ``_get_history`` for both symbols.
        risk_free_rate: Annual risk-free rate used in the CAPM formula.

    Returns:
        A dict with ``ticker``, ``model``, ``period``, ``metrics``,
        ``signals``, ``chart_data`` (about 60 sampled daily return pairs, in
        percent) and ``interpretation``. On fetch failure, empty history or
        fewer than 60 usable common dates, a dict with ``error`` and
        ``model``.
    """
    try:
        stock_df = _get_history(ticker, period)
        bench_df = _get_history(benchmark, period)
    except Exception as exc:
        return {"error": str(exc), "model": "beta_capm"}
    if stock_df.empty or bench_df.empty:
        return {"error": "Insufficient data", "model": "beta_capm"}

    common_dates = stock_df.index.intersection(bench_df.index)
    if len(common_dates) < 60:
        return {"error": "Not enough overlapping dates", "model": "beta_capm"}
    s = stock_df.loc[common_dates, "Close"].values
    b = bench_df.loc[common_dates, "Close"].values

    # A NaN or zero close would turn the covariance, and therefore beta,
    # into NaN/inf; drop such dates from both series together.
    usable = np.isfinite(s) & np.isfinite(b) & (s != 0) & (b != 0)
    if not usable.all():
        s, b, common_dates = s[usable], b[usable], common_dates[usable]
        if len(s) < 60:
            return {"error": "Not enough overlapping dates", "model": "beta_capm"}

    sr = np.diff(s) / s[:-1]
    br = np.diff(b) / b[:-1]
    cov = np.cov(sr, br)
    beta = cov[0, 1] / cov[1, 1] if cov[1, 1] != 0 else 1.0
    market_return = float(np.mean(br)) * 252
    expected_return = risk_free_rate + beta * (market_return - risk_free_rate)
    alpha = (float(np.mean(sr)) * 252) - expected_return

    signals = []
    if beta > 1.2:
        signals.append(f"High beta ({beta:.2f}) — more volatile than market")
    elif beta < 0.8:
        signals.append(f"Low beta ({beta:.2f}) — defensive / less volatile")
    else:
        signals.append(f"Beta ({beta:.2f}) — moves roughly with the market")
    if alpha > 0:
        signals.append(f"Positive alpha ({alpha:.2%}) — outperforming CAPM expectation")
    else:
        signals.append(f"Negative alpha ({alpha:.2%}) — underperforming CAPM expectation")

    # Sample roughly 60 evenly spaced points; return i belongs to date i + 1.
    step = max(1, len(sr) // 60)
    chart = [
        {"date": common_dates[i + 1].strftime("%Y-%m-%d"),
         "stock_return": _safe(sr[i] * 100), "benchmark_return": _safe(br[i] * 100)}
        for i in range(0, len(sr), step)
    ]
    return {
        "ticker": ticker, "model": "beta_capm", "period": period,
        "metrics": {
            "beta": _safe(beta),
            "alpha_annual": _safe(alpha),
            "capm_expected_return": _safe(expected_return),
            "market_return_annual": _safe(market_return),
            "risk_free_rate": risk_free_rate,
            "benchmark": benchmark,
        },
        "signals": signals,
        "chart_data": chart,
        "interpretation": f"{ticker} beta={beta:.2f} vs {benchmark}. CAPM expected return {expected_return:.2%}. {signals[0]}",
    }
# ------------------------------------------------------------------ #
# 8. Sharpe / Risk Metrics #
# ------------------------------------------------------------------ #
def sharpe_risk_metrics(ticker: str, period: str = "1y", risk_free_rate: float = 0.05) -> dict[str, Any]:
    """Compute Sharpe, Sortino, max drawdown and daily VaR/CVaR.

    Daily simple returns are annualised with 252 trading days. The Sortino
    denominator is the sample standard deviation of the negative daily
    returns only (falling back to total volatility when fewer than two
    exist). VaR/CVaR are historical, taken from the sorted daily returns at
    the 5% level.

    Args:
        ticker: Symbol passed to ``_get_history``.
        period: History window passed to ``_get_history``.
        risk_free_rate: Annual risk-free rate subtracted from the return.

    Returns:
        A dict with ``ticker``, ``model``, ``period``, ``metrics``,
        ``signals``, ``chart_data`` (last 60 rows of cumulative return and
        drawdown, in percent) and ``interpretation``. With fewer than 30
        rows, a dict with ``error`` and ``model``.
    """
    df = _get_history(ticker, period)
    if df.empty or len(df) < 30:
        return {"error": f"Insufficient data for {ticker}", "model": "risk_metrics"}

    closes = df["Close"].values
    dates = _dates(df)
    daily_returns = np.diff(closes) / closes[:-1]

    ann_return = float(np.mean(daily_returns)) * 252
    ann_vol = float(np.std(daily_returns, ddof=1)) * np.sqrt(252)
    sharpe = (ann_return - risk_free_rate) / ann_vol if ann_vol != 0 else 0

    downside = daily_returns[daily_returns < 0]
    downside_vol = float(np.std(downside, ddof=1)) * np.sqrt(252) if len(downside) > 1 else ann_vol
    sortino = (ann_return - risk_free_rate) / downside_vol if downside_vol != 0 else 0

    # Growth of 1 unit including the starting point, so the running peak
    # begins at the initial price and a loss on the very first day counts as
    # a drawdown.
    wealth = closes / closes[0]
    running_max = np.maximum.accumulate(wealth)
    drawdown_path = wealth / running_max - 1
    max_dd = float(np.min(drawdown_path)) * 100
    # Drop the starting point so both series line up with dates[1:].
    cumulative = wealth[1:]
    drawdowns = drawdown_path[1:]

    sorted_ret = np.sort(daily_returns)
    tail_idx = int(0.05 * len(sorted_ret))
    var_95 = float(sorted_ret[tail_idx]) * 100
    cvar_95 = float(np.mean(sorted_ret[: tail_idx + 1])) * 100

    signals = []
    if sharpe > 1:
        signals.append(f"Strong risk-adjusted returns (Sharpe {sharpe:.2f})")
    elif sharpe > 0:
        signals.append(f"Positive but modest risk-adjusted returns (Sharpe {sharpe:.2f})")
    else:
        signals.append(f"Negative risk-adjusted returns (Sharpe {sharpe:.2f})")
    if max_dd < -20:
        signals.append(f"Significant drawdown risk: max drawdown {max_dd:.1f}%")

    chart = [
        {"date": day, "cumulative_return": _safe((level - 1) * 100), "drawdown": _safe(dd * 100)}
        for day, level, dd in zip(dates[1:], cumulative, drawdowns)
    ]
    return {
        "ticker": ticker, "model": "risk_metrics", "period": period,
        "metrics": {
            "annual_return": _safe(ann_return),
            "annual_volatility": _safe(ann_vol),
            "sharpe_ratio": _safe(sharpe),
            "sortino_ratio": _safe(sortino),
            "max_drawdown_pct": _safe(max_dd),
            "var_95_daily_pct": _safe(var_95),
            "cvar_95_daily_pct": _safe(cvar_95),
        },
        "signals": signals,
        "chart_data": chart[-60:],
        "interpretation": (
            f"{ticker} Sharpe={sharpe:.2f}, Sortino={sortino:.2f}, "
            f"annual vol {ann_vol:.1%}, max drawdown {max_dd:.1f}%. {signals[0]}"
        ),
    }
# ------------------------------------------------------------------ #
# 9. Correlation Matrix #
# ------------------------------------------------------------------ #
def correlation_matrix(tickers: list[str], period: str = "1y") -> dict[str, Any]:
    """Compute pairwise correlations of daily returns for up to 8 tickers.

    Closes are aligned on calendar date (``YYYY-MM-DD``) before returns are
    computed, so every row of the return matrix refers to the same day for
    all tickers. Tickers whose fetch fails or that have fewer than 30 rows
    are skipped.

    Args:
        tickers: Symbols to compare; only the first 8 are used.
        period: History window passed to ``Ticker.history``.

    Returns:
        A dict with ``tickers`` (those actually used), ``model``, ``period``,
        ``metrics`` (one entry per unordered pair), ``signals``,
        ``chart_data`` (one entry per ordered pair), ``table`` and
        ``interpretation``. With fewer than 2 usable tickers or fewer than 30
        shared dates, a dict with ``error`` and ``model``.
    """
    import yfinance as yf

    if len(tickers) < 2:
        return {"error": "Need at least 2 tickers", "model": "correlation"}
    tickers = tickers[:8]

    closes_by_date: dict[str, dict[str, float]] = {}
    for tkr in tickers:
        try:
            df = yf.Ticker(tkr).history(period=period)
            if df.empty or len(df) < 30:
                continue
            closes_by_date[tkr] = dict(zip(_dates(df), df["Close"].values))
        except Exception as exc:
            # Best effort: one bad symbol must not abort the whole matrix,
            # but leave a trace of why it was dropped.
            logger.warning("Skipping %s in correlation matrix: %s", tkr, exc)
            continue

    valid_tickers = list(closes_by_date)
    if len(valid_tickers) < 2:
        return {"error": "Could not fetch data for enough tickers", "model": "correlation"}

    # ISO date strings sort chronologically.
    shared_dates = sorted(set.intersection(*(set(series) for series in closes_by_date.values())))
    prices = np.array(
        [[closes_by_date[t][day] for t in valid_tickers] for day in shared_dates],
        dtype=float,
    ).reshape(len(shared_dates), len(valid_tickers))
    # Drop days on which any ticker has a missing or unusable close.
    prices = prices[(np.isfinite(prices) & (prices != 0)).all(axis=1)]
    if len(prices) < 30:
        return {"error": "Not enough overlapping dates", "model": "correlation"}

    matrix_data = np.diff(prices, axis=0) / prices[:-1]
    corr = np.corrcoef(matrix_data, rowvar=False)

    chart = [
        {"ticker1": t1, "ticker2": t2, "correlation": _safe(corr[i, j])}
        for i, t1 in enumerate(valid_tickers)
        for j, t2 in enumerate(valid_tickers)
    ]
    table_rows = [
        [t1] + [str(_safe(corr[i, j])) for j in range(len(valid_tickers))]
        for i, t1 in enumerate(valid_tickers)
    ]

    signals = []
    for i in range(len(valid_tickers)):
        for j in range(i + 1, len(valid_tickers)):
            c = corr[i, j]
            if c > 0.8:
                signals.append(f"{valid_tickers[i]}/{valid_tickers[j]} highly correlated ({c:.2f})")
            elif c < -0.3:
                signals.append(f"{valid_tickers[i]}/{valid_tickers[j]} negatively correlated ({c:.2f}) — diversification benefit")
    if not signals:
        signals.append("No extreme correlations detected")

    return {
        "tickers": valid_tickers, "model": "correlation", "period": period,
        "metrics": {f"{valid_tickers[i]}_{valid_tickers[j]}": _safe(corr[i, j])
                    for i in range(len(valid_tickers)) for j in range(i + 1, len(valid_tickers))},
        "signals": signals,
        "chart_data": chart,
        "table": {"columns": [""] + valid_tickers, "rows": table_rows},
        "interpretation": f"Correlation matrix for {', '.join(valid_tickers)} over {period}. {signals[0]}",
    }
# ------------------------------------------------------------------ #
# 10. Linear Regression Trend #
# ------------------------------------------------------------------ #
def linear_regression_trend(ticker: str, period: str = "1y", forecast_days: int = 30) -> dict[str, Any]:
    """Fit an ordinary-least-squares line to closes and extrapolate it.

    The regressor is the row number (0, 1, 2, ...), so the slope is in price
    units per row of history.

    Args:
        ticker: Symbol passed to ``_get_history``.
        period: History window passed to ``_get_history``.
        forecast_days: Number of steps to extrapolate; must be >= 1.

    Returns:
        A dict with ``ticker``, ``model``, ``period``, ``metrics`` (slope,
        slope as a percentage of the last close, intercept, R-squared and the
        final extrapolated price), ``signals``, ``chart_data`` (up to the
        last 60 historical rows followed by every forecast row, labelled
        ``D+1`` ...) and ``interpretation``. On invalid ``forecast_days`` or
        fewer than 30 rows, a dict with ``error`` and ``model``.
    """
    if forecast_days < 1:
        return {"error": "forecast_days must be at least 1", "model": "regression"}
    df = _get_history(ticker, period)
    if df.empty or len(df) < 30:
        return {"error": f"Insufficient data for {ticker}", "model": "regression"}

    closes = df["Close"].values
    dates = _dates(df)
    n = len(closes)
    x = np.arange(n, dtype=float)
    y = closes.astype(float)

    x_mean, y_mean = np.mean(x), np.mean(y)
    ss_xy = np.sum((x - x_mean) * (y - y_mean))
    ss_xx = np.sum((x - x_mean) ** 2)
    slope = ss_xy / ss_xx if ss_xx != 0 else 0
    intercept = y_mean - slope * x_mean
    y_pred = slope * x + intercept

    ss_res = np.sum((y - y_pred) ** 2)
    ss_tot = np.sum((y - y_mean) ** 2)
    r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else 0

    forecast_x = np.arange(n, n + forecast_days)
    forecast_y = slope * forecast_x + intercept
    daily_pct = (slope / closes[-1]) * 100 if closes[-1] != 0 else 0

    signals = []
    if slope > 0:
        signals.append(f"Upward trend: +${slope:.4f}/day ({daily_pct:.3f}%/day)")
    elif slope < 0:
        signals.append(f"Downward trend: ${slope:.4f}/day ({daily_pct:.3f}%/day)")
    else:
        signals.append(f"Flat trend: ${slope:.4f}/day ({daily_pct:.3f}%/day)")
    if r_squared > 0.7:
        signals.append(f"Strong trend fit (R²={r_squared:.3f})")
    elif r_squared > 0.4:
        signals.append(f"Moderate trend fit (R²={r_squared:.3f})")
    else:
        signals.append(f"Weak trend fit (R²={r_squared:.3f}) — price is choppy")

    history = [
        {"date": day, "close": _safe(price), "trend": _safe(fit)}
        for day, price, fit in zip(dates, closes, y_pred)
    ]
    projection = [
        {"date": f"D+{step}", "close": None, "trend": _safe(value)}
        for step, value in enumerate(forecast_y, start=1)
    ]
    return {
        "ticker": ticker, "model": "regression", "period": period,
        "metrics": {
            "slope_per_day": _safe(slope),
            "daily_pct_change": _safe(daily_pct),
            "intercept": _safe(intercept),
            "r_squared": _safe(r_squared),
            "forecast_price": _safe(forecast_y[-1]),
            "forecast_days": forecast_days,
        },
        "signals": signals,
        # Trim only the history so a long forecast never pushes the actual
        # prices out of the chart.
        "chart_data": history[-60:] + projection,
        "interpretation": (
            f"{ticker} linear trend: slope ${slope:.4f}/day, R²={r_squared:.3f}. "
            f"{forecast_days}-day forecast: ${forecast_y[-1]:.2f}. {signals[0]}"
        ),
    }
# ------------------------------------------------------------------ #
# Registry #
# ------------------------------------------------------------------ #
# Public model name -> implementing function. ``run_model`` looks names up
# here; every entry takes a single ticker first except "correlation", which
# takes a list of tickers.
ALL_MODELS = {
    "moving_averages": moving_averages,
    "rsi": rsi_analysis,
    "macd": macd_analysis,
    "bollinger_bands": bollinger_bands,
    "monte_carlo": monte_carlo_forecast,
    "dcf": dcf_valuation,
    "beta_capm": beta_capm,
    "risk_metrics": sharpe_risk_metrics,
    "correlation": correlation_matrix,
    "regression": linear_regression_trend,
}
def run_model(model_name: str, ticker: str, **kwargs) -> dict[str, Any]:
    """Run the model registered under ``model_name`` in ``ALL_MODELS``.

    Args:
        model_name: Key into ``ALL_MODELS``.
        ticker: Symbol passed as the first argument to the model. For
            ``"correlation"`` it is only the fallback when no ``tickers``
            keyword is supplied.
        **kwargs: Forwarded to the model. For ``"correlation"`` the
            ``tickers`` entry may be a list or a comma-separated string.

    Returns:
        The model's result dict. For an unknown model, or when the model
        raises, a dict with ``error`` and ``model`` (plus ``ticker`` in the
        latter case); exceptions are logged with their traceback, never
        propagated.
    """
    fn = ALL_MODELS.get(model_name)
    if fn is None:
        return {"error": f"Unknown model: {model_name}. Available: {', '.join(ALL_MODELS)}", "model": model_name}
    try:
        if model_name == "correlation":
            # An explicit None or empty value falls back to the single ticker.
            tickers = kwargs.pop("tickers", None) or [ticker]
            if isinstance(tickers, str):
                # A bare string would otherwise be sliced and iterated
                # character by character.
                tickers = [part.strip() for part in tickers.split(",") if part.strip()]
            return fn(tickers, **kwargs)
        return fn(ticker, **kwargs)
    except Exception as exc:
        # Top-level boundary: keep the traceback in the log and hand the
        # caller a structured error instead of raising.
        logger.exception("Model %s failed for %s", model_name, ticker)
        return {"error": str(exc), "model": model_name, "ticker": ticker}