Spaces:
Sleeping
Sleeping
| """Pre-built statistical / quantitative models for financial analysis. | |
| Every function fetches its own data via yfinance and returns a | |
| structured dict with: | |
| - ticker, model, period | |
| - metrics : key-value pairs for KPIs | |
| - signals : list of actionable alerts | |
| - chart_data: list of dicts ready for Recharts | |
| - interpretation: 1-3 sentence plain-English summary | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import math | |
| from typing import Any | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
| # ------------------------------------------------------------------ # | |
| # Helpers # | |
| # ------------------------------------------------------------------ # | |
def _get_history(ticker: str, period: str = "1y", interval: str = "1d"):
    """Fetch price history for *ticker* through yfinance.

    ``period`` and ``interval`` are forwarded unchanged to
    ``Ticker.history``; whatever that call returns is handed back as-is.
    """
    # Imported inside the function, so importing this module alone does not
    # import yfinance.
    import yfinance as yf

    handle = yf.Ticker(ticker)
    return handle.history(period=period, interval=interval)
def _dates(df) -> list[str]:
    """Return every entry of ``df.index`` formatted as a ``YYYY-MM-DD`` string."""
    stamps = []
    for stamp in df.index:
        stamps.append(stamp.strftime("%Y-%m-%d"))
    return stamps
def _safe(v):
    """Coerce *v* into a JSON-friendly value.

    * ``None`` and non-finite floats (NaN / +-inf) become ``None``.
    * Python floats and NumPy floating/integer scalars become a Python
      ``float`` rounded to 4 decimal places.
    * Anything else is returned unchanged.
    """
    if v is None:
        return None
    if isinstance(v, np.integer):
        # NumPy integers are reported as rounded floats.
        return round(float(v), 4)
    if isinstance(v, np.floating):
        # Convert first: np.float32 / np.float16 are not ``float`` subclasses,
        # so checking finiteness before converting would let their NaN/inf
        # values slip through.
        v = float(v)
    if isinstance(v, float):
        if math.isnan(v) or math.isinf(v):
            return None
        return round(v, 4)
    return v
| # ------------------------------------------------------------------ # | |
| # 1. Moving Averages & Crossovers # | |
| # ------------------------------------------------------------------ # | |
def moving_averages(ticker: str, period: str = "1y") -> dict[str, Any]:
    """Compute moving averages for *ticker* and derive crossover signals.

    Fetches closes over *period*, computes SMA-20/50/200 and EMA-12/26,
    flags an SMA-20/SMA-50 crossover that occurred on the latest bar, and
    reports whether the latest close sits above or below SMA-200.

    Returns a dict with ``ticker``, ``model``, ``period``, ``metrics``,
    ``signals``, ``chart_data`` (the last 60 bars) and ``interpretation``,
    or ``{"error": ..., "model": "moving_averages"}`` when no data is
    returned.
    """
    df = _get_history(ticker, period)
    if df.empty:
        return {"error": f"No data for {ticker}", "model": "moving_averages"}

    closes = df["Close"].values
    dates = _dates(df)

    def _sma(arr, w):
        # Trailing mean of the last ``w`` values; NaN until ``w`` values exist.
        out = np.full(len(arr), np.nan)
        for i in range(w - 1, len(arr)):
            out[i] = np.mean(arr[i - w + 1 : i + 1])
        return out

    def _ema(arr, span):
        # Recursive EMA with alpha = 2 / (span + 1), seeded with the first value.
        alpha = 2 / (span + 1)
        out = np.empty(len(arr))
        out[0] = arr[0]
        for i in range(1, len(arr)):
            out[i] = alpha * arr[i] + (1 - alpha) * out[i - 1]
        return out

    def _usd(value):
        # An average without enough history is NaN; show "n/a" rather than "$nan".
        return "n/a" if np.isnan(value) else f"${value:.2f}"

    sma20 = _sma(closes, 20)
    sma50 = _sma(closes, 50)
    sma200 = _sma(closes, 200)
    ema12 = _ema(closes, 12)
    ema26 = _ema(closes, 26)

    signals = []
    # The length guard also protects the [-2] lookups on very short histories.
    if len(closes) >= 50:
        if sma20[-1] > sma50[-1] and sma20[-2] <= sma50[-2]:
            signals.append("Golden crossover: SMA-20 just crossed above SMA-50 (bullish)")
        if sma20[-1] < sma50[-1] and sma20[-2] >= sma50[-2]:
            signals.append("Death cross: SMA-20 just crossed below SMA-50 (bearish)")

    has_sma200 = not np.isnan(sma200[-1])
    if has_sma200 and closes[-1] > sma200[-1]:
        signals.append("Price is above SMA-200 (long-term bullish)")
    elif has_sma200:
        signals.append("Price is below SMA-200 (long-term bearish)")

    if not signals:
        signals.append("No crossover signals detected in the current period")

    chart = [
        {"date": day, "close": _safe(close), "SMA20": _safe(fast), "SMA50": _safe(slow)}
        for day, close, fast, slow in zip(dates, closes, sma20, sma50)
    ]

    return {
        "ticker": ticker, "model": "moving_averages", "period": period,
        "metrics": {
            "current_price": _safe(closes[-1]),
            "sma_20": _safe(sma20[-1]),
            "sma_50": _safe(sma50[-1]),
            # _safe maps the NaN of an undefined SMA-200 to None.
            "sma_200": _safe(sma200[-1]),
            "ema_12": _safe(ema12[-1]),
            "ema_26": _safe(ema26[-1]),
        },
        "signals": signals,
        "chart_data": chart[-60:],
        "interpretation": (
            f"{ticker} is trading at {_usd(closes[-1])}. "
            f"SMA-20={_usd(sma20[-1])}, SMA-50={_usd(sma50[-1])}. "
            + signals[0]
        ),
    }
| # ------------------------------------------------------------------ # | |
| # 2. RSI (Relative Strength Index) # | |
| # ------------------------------------------------------------------ # | |
def rsi_analysis(ticker: str, period: str = "6mo", rsi_period: int = 14) -> dict[str, Any]:
    """Compute the Relative Strength Index of *ticker* with Wilder smoothing.

    The first RSI value is seeded from the simple average of the first
    ``rsi_period`` gains and losses; later values use the recursive
    ``(prev * (n - 1) + current) / n`` smoothing.

    Returns a dict with ``ticker``, ``model``, ``period``, ``metrics``,
    ``signals``, ``chart_data`` (the last 60 bars) and ``interpretation``,
    or ``{"error": ..., "model": "rsi"}`` when ``rsi_period`` is invalid or
    no data is returned.
    """
    if rsi_period < 1:
        # A zero look-back would divide by zero in the smoothing step.
        return {"error": "rsi_period must be >= 1", "model": "rsi"}

    df = _get_history(ticker, period)
    if df.empty:
        return {"error": f"No data for {ticker}", "model": "rsi"}

    closes = df["Close"].values
    dates = _dates(df)
    deltas = np.diff(closes)
    gains = np.where(deltas > 0, deltas, 0.0)
    losses = np.where(deltas < 0, -deltas, 0.0)

    def _rsi(avg_gain, avg_loss):
        # With no losses in the window RS is unbounded; RSI is 100 by definition.
        if avg_loss == 0:
            return 100.0
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))

    rsi_vals = np.full(len(closes), np.nan)
    if len(gains) >= rsi_period:
        avg_gain = np.mean(gains[:rsi_period])
        avg_loss = np.mean(losses[:rsi_period])
        # deltas[k] belongs to closes[k + 1], so the seed average over the
        # first ``rsi_period`` deltas yields the RSI at closes[rsi_period].
        rsi_vals[rsi_period] = _rsi(avg_gain, avg_loss)
        for i in range(rsi_period, len(deltas)):
            avg_gain = (avg_gain * (rsi_period - 1) + gains[i]) / rsi_period
            avg_loss = (avg_loss * (rsi_period - 1) + losses[i]) / rsi_period
            rsi_vals[i + 1] = _rsi(avg_gain, avg_loss)

    current_rsi = _safe(rsi_vals[-1])

    signals = []
    if current_rsi is None:
        signals.append(f"Not enough data to compute RSI({rsi_period})")
        interpretation = f"{ticker} RSI({rsi_period}) unavailable. {signals[0]}"
    else:
        if current_rsi > 70:
            signals.append(f"RSI at {current_rsi:.1f} — OVERBOUGHT territory (>70)")
        elif current_rsi < 30:
            signals.append(f"RSI at {current_rsi:.1f} — OVERSOLD territory (<30)")
        else:
            signals.append(f"RSI at {current_rsi:.1f} — neutral range")
        interpretation = f"{ticker} RSI({rsi_period}) = {current_rsi}. {signals[0]}"

    chart = [
        {"date": day, "close": _safe(close), "RSI": _safe(rsi)}
        for day, close, rsi in zip(dates, closes, rsi_vals)
    ]

    return {
        "ticker": ticker, "model": "rsi", "period": period,
        "metrics": {"rsi": current_rsi, "rsi_period": rsi_period},
        "signals": signals,
        "chart_data": chart[-60:],
        "interpretation": interpretation,
    }
| # ------------------------------------------------------------------ # | |
| # 3. MACD # | |
| # ------------------------------------------------------------------ # | |
def macd_analysis(ticker: str, period: str = "1y") -> dict[str, Any]:
    """Compute MACD (EMA-12 minus EMA-26), its 9-span signal line and histogram.

    Flags a MACD/signal crossover that happened on the latest bar and
    reports whether the MACD line is currently positive or negative.

    Returns a dict with ``ticker``, ``model``, ``period``, ``metrics``,
    ``signals``, ``chart_data`` (the last 60 bars) and ``interpretation``,
    or ``{"error": ..., "model": "macd"}`` when there is no usable data.
    """
    df = _get_history(ticker, period)
    if df.empty:
        return {"error": f"No data for {ticker}", "model": "macd"}

    closes = df["Close"].values
    if len(closes) < 2:
        # Crossover detection compares the last bar with the one before it.
        return {"error": f"Insufficient data for {ticker}", "model": "macd"}
    dates = _dates(df)

    def _ema(arr, span):
        # Recursive EMA with alpha = 2 / (span + 1), seeded with the first value.
        alpha = 2 / (span + 1)
        out = np.empty(len(arr))
        out[0] = arr[0]
        for i in range(1, len(arr)):
            out[i] = alpha * arr[i] + (1 - alpha) * out[i - 1]
        return out

    ema12 = _ema(closes, 12)
    ema26 = _ema(closes, 26)
    macd_line = ema12 - ema26
    signal_line = _ema(macd_line, 9)
    histogram = macd_line - signal_line

    signals = []
    if macd_line[-1] > signal_line[-1] and macd_line[-2] <= signal_line[-2]:
        signals.append("MACD bullish crossover — buy signal")
    elif macd_line[-1] < signal_line[-1] and macd_line[-2] >= signal_line[-2]:
        signals.append("MACD bearish crossover — sell signal")
    if macd_line[-1] > 0:
        signals.append("MACD is positive — bullish momentum")
    else:
        signals.append("MACD is negative — bearish momentum")

    chart = [
        {"date": day, "MACD": _safe(macd), "Signal": _safe(sig), "Histogram": _safe(hist)}
        for day, macd, sig, hist in zip(dates, macd_line, signal_line, histogram)
    ]

    return {
        "ticker": ticker, "model": "macd", "period": period,
        "metrics": {
            "macd": _safe(macd_line[-1]),
            "signal": _safe(signal_line[-1]),
            "histogram": _safe(histogram[-1]),
        },
        "signals": signals,
        "chart_data": chart[-60:],
        "interpretation": (
            f"{ticker} MACD={macd_line[-1]:.4f}, Signal={signal_line[-1]:.4f}. "
            + signals[0]
        ),
    }
| # ------------------------------------------------------------------ # | |
| # 4. Bollinger Bands # | |
| # ------------------------------------------------------------------ # | |
def bollinger_bands(ticker: str, period: str = "6mo", window: int = 20, num_std: float = 2) -> dict[str, Any]:
    """Compute Bollinger Bands for *ticker*.

    The middle band is the trailing ``window``-bar mean of the close; the
    upper and lower bands sit ``num_std`` sample standard deviations
    (``ddof=1``) above and below it. Bandwidth is the band spread as a
    percentage of the middle band; a bandwidth under 5 is reported as a
    squeeze.

    Returns a dict with ``ticker``, ``model``, ``period``, ``metrics``,
    ``signals``, ``chart_data`` (the last 60 bars) and ``interpretation``,
    or ``{"error": ..., "model": "bollinger_bands"}`` when ``window`` is
    invalid or no data is returned.
    """
    if window < 2:
        # A sample standard deviation (ddof=1) needs at least two observations.
        return {"error": "window must be >= 2", "model": "bollinger_bands"}

    df = _get_history(ticker, period)
    if df.empty:
        return {"error": f"No data for {ticker}", "model": "bollinger_bands"}

    closes = df["Close"].values
    dates = _dates(df)
    mid = np.full(len(closes), np.nan)
    upper = np.full(len(closes), np.nan)
    lower = np.full(len(closes), np.nan)
    bw = np.full(len(closes), np.nan)

    for i in range(window - 1, len(closes)):
        seg = closes[i - window + 1 : i + 1]
        m = np.mean(seg)
        s = np.std(seg, ddof=1)
        mid[i] = m
        upper[i] = m + num_std * s
        lower[i] = m - num_std * s
        bw[i] = (upper[i] - lower[i]) / m * 100 if m != 0 else 0

    # Bands stay NaN when the history is shorter than ``window`` (or holds NaNs).
    have_bands = not (np.isnan(upper[-1]) or np.isnan(lower[-1]))

    signals = []
    if not have_bands:
        signals.append(
            f"Bollinger Bands unavailable — not enough valid data for a {window}-bar window"
        )
    elif closes[-1] > upper[-1]:
        signals.append("Price ABOVE upper band — potential overbought / breakout")
    elif closes[-1] < lower[-1]:
        signals.append("Price BELOW lower band — potential oversold / breakdown")
    else:
        signals.append("Price within Bollinger Bands — no extreme signal")
    if not np.isnan(bw[-1]) and bw[-1] < 5:
        signals.append("Bollinger squeeze detected — low volatility, expect breakout")

    chart = [
        {"date": day, "close": _safe(close), "upper": _safe(hi), "mid": _safe(center), "lower": _safe(lo)}
        for day, close, hi, center, lo in zip(dates, closes, upper, mid, lower)
    ]

    band_text = f"${lower[-1]:.2f}-${upper[-1]:.2f}" if have_bands else "n/a"

    return {
        "ticker": ticker, "model": "bollinger_bands", "period": period,
        "metrics": {
            "current_price": _safe(closes[-1]),
            "upper_band": _safe(upper[-1]),
            "middle_band": _safe(mid[-1]),
            "lower_band": _safe(lower[-1]),
            "bandwidth_pct": _safe(bw[-1]),
        },
        "signals": signals,
        "chart_data": chart[-60:],
        "interpretation": f"{ticker} BB({window},{num_std}): Price ${closes[-1]:.2f}, bands {band_text}. {signals[0]}",
    }
| # ------------------------------------------------------------------ # | |
| # 5. Monte Carlo Price Forecast # | |
| # ------------------------------------------------------------------ # | |
def monte_carlo_forecast(ticker: str, days: int = 30, simulations: int = 1000, period: str = "1y",
                         drift_adjustment: float = 0.0) -> dict[str, Any]:
    """Simulate future prices of *ticker* from its historical log returns.

    Each daily step multiplies the price by ``exp(mu + sigma * z)`` where
    ``mu`` is the mean daily log return plus ``drift_adjustment``, ``sigma``
    is the sample standard deviation of daily log returns and ``z`` is a
    standard-normal draw. The generator is seeded with 42, so repeated calls
    on the same data give the same paths.

    Returns a dict with ``ticker``, ``model``, ``period``, ``metrics``,
    ``signals``, ``chart_data`` (one row of percentiles per forecast day)
    and ``interpretation``, or ``{"error": ..., "model": "monte_carlo"}``
    when the arguments are invalid or fewer than 30 bars are available.
    """
    if days < 1 or simulations < 1:
        # Empty simulation arrays would fail in the percentile / [-1] lookups.
        return {"error": "days and simulations must be >= 1", "model": "monte_carlo"}

    df = _get_history(ticker, period)
    if df.empty or len(df) < 30:
        return {"error": f"Insufficient data for {ticker}", "model": "monte_carlo"}

    closes = df["Close"].values
    log_returns = np.diff(np.log(closes))
    mu = np.mean(log_returns) + drift_adjustment
    sigma = np.std(log_returns, ddof=1)
    last_price = closes[-1]

    rng = np.random.default_rng(42)
    # One draw per (simulation, day), generated in a single call instead of a
    # nested Python loop.
    shocks = rng.standard_normal((simulations, days))
    growth = np.exp(mu + sigma * shocks)
    # Fold the starting price into the first step so the running product
    # yields price paths directly.
    growth[:, 0] *= last_price
    sims = np.cumprod(growth, axis=1)

    p5 = np.percentile(sims, 5, axis=0)
    p25 = np.percentile(sims, 25, axis=0)
    median = np.median(sims, axis=0)
    p75 = np.percentile(sims, 75, axis=0)
    p95 = np.percentile(sims, 95, axis=0)

    chart = [
        {"day": i + 1, "p5": _safe(p5[i]), "p25": _safe(p25[i]),
         "median": _safe(median[i]), "p75": _safe(p75[i]), "p95": _safe(p95[i])}
        for i in range(days)
    ]

    final = sims[:, -1]
    prob_up = float(np.mean(final > last_price) * 100)

    return {
        "ticker": ticker, "model": "monte_carlo", "period": period,
        "metrics": {
            "current_price": _safe(last_price),
            "forecast_median": _safe(median[-1]),
            "forecast_p5": _safe(p5[-1]),
            "forecast_p95": _safe(p95[-1]),
            "probability_up_pct": round(prob_up, 1),
            "daily_mu": _safe(mu),
            "daily_sigma": _safe(sigma),
            "simulations": simulations,
            "horizon_days": days,
        },
        "signals": [
            f"{prob_up:.0f}% probability price rises over {days} days",
            f"Median forecast: ${median[-1]:.2f} (range ${p5[-1]:.2f} – ${p95[-1]:.2f})",
        ],
        "chart_data": chart,
        "interpretation": (
            f"{ticker} Monte Carlo ({simulations} sims, {days}d): "
            f"median ${median[-1]:.2f}, 90% range ${p5[-1]:.2f}–${p95[-1]:.2f}. "
            f"{prob_up:.0f}% chance of finishing higher."
        ),
    }
| # ------------------------------------------------------------------ # | |
| # 6. DCF Valuation (simplified) # | |
| # ------------------------------------------------------------------ # | |
def dcf_valuation(ticker: str, growth_rate: float = 0.08, discount_rate: float = 0.10,
                  terminal_growth: float = 0.03, projection_years: int = 5,
                  period: str | None = None) -> dict[str, Any]:
    """Estimate a per-share intrinsic value with a simplified DCF.

    The latest positive free cash flow is grown at ``growth_rate`` for
    ``projection_years`` years and discounted at ``discount_rate``; a
    Gordon-growth terminal value using ``terminal_growth`` is added. The
    total is divided by shares outstanding and compared with the market
    price.

    ``period`` is accepted but not used by this model.

    Returns a dict with ``ticker``, ``model``, ``period``, ``metrics``,
    ``signals``, ``chart_data`` (projected cash flows) and
    ``interpretation``, or ``{"error": ..., "model": "dcf"}`` when the
    inputs are invalid or required data is missing.
    """
    if discount_rate <= terminal_growth:
        # The terminal value divides by (discount_rate - terminal_growth):
        # equality divides by zero, a smaller discount rate goes negative.
        return {"error": "discount_rate must be greater than terminal_growth", "model": "dcf"}

    try:
        import yfinance as yf
        stock = yf.Ticker(ticker)
        info = stock.info
        cf = stock.cashflow
    except Exception as exc:
        return {"error": str(exc), "model": "dcf"}
    info = info or {}

    # Prefer the cash-flow statement; fall back to the summary info field.
    fcf = None
    if cf is not None and not cf.empty:
        for label in ["Free Cash Flow", "FreeCashFlow"]:
            if label in cf.index:
                vals = cf.loc[label].dropna()
                if not vals.empty:
                    fcf = float(vals.iloc[0])
                    break
    if fcf is None:
        fcf = info.get("freeCashflow")
    if fcf is None or fcf <= 0:
        return {"error": f"No positive FCF found for {ticker}", "model": "dcf"}

    shares = info.get("sharesOutstanding")
    if not shares or shares <= 0:
        # Without a share count a per-share value cannot be computed.
        return {"error": f"Shares outstanding unavailable for {ticker}", "model": "dcf"}
    current_price = info.get("currentPrice") or info.get("regularMarketPrice")

    projected = []
    cf_val = fcf
    pv_total = 0.0
    for yr in range(1, projection_years + 1):
        cf_val *= (1 + growth_rate)
        pv = cf_val / (1 + discount_rate) ** yr
        pv_total += pv
        projected.append({"year": yr, "fcf": _safe(cf_val), "pv": _safe(pv)})

    terminal_value = cf_val * (1 + terminal_growth) / (discount_rate - terminal_growth)
    pv_terminal = terminal_value / (1 + discount_rate) ** projection_years
    enterprise_value = pv_total + pv_terminal
    intrinsic_per_share = enterprise_value / shares

    signals = []
    if current_price:
        upside = ((intrinsic_per_share / current_price) - 1) * 100
        if upside > 20:
            signals.append(f"Stock appears UNDERVALUED by {upside:.1f}% — potential buy")
        elif upside < -20:
            signals.append(f"Stock appears OVERVALUED by {abs(upside):.1f}% — caution")
        else:
            signals.append(f"Stock is FAIRLY VALUED (upside/downside {upside:+.1f}%)")
        interpretation = (
            f"{ticker} DCF intrinsic value: ${intrinsic_per_share:.2f} vs market ${current_price:.2f} "
            f"({upside:+.1f}%). {signals[0]}"
        )
    else:
        # No market price to compare against, so no valuation verdict is given.
        upside = None
        signals.append("Market price unavailable — cannot compare with intrinsic value")
        interpretation = (
            f"{ticker} DCF intrinsic value: ${intrinsic_per_share:.2f}. {signals[0]}"
        )

    return {
        "ticker": ticker, "model": "dcf", "period": f"{projection_years}y projection",
        "metrics": {
            "base_fcf": _safe(fcf),
            "intrinsic_value_per_share": _safe(intrinsic_per_share),
            "current_price": _safe(current_price),
            "upside_pct": _safe(upside),
            "enterprise_value": _safe(enterprise_value),
            "pv_terminal": _safe(pv_terminal),
            "growth_rate": growth_rate,
            "discount_rate": discount_rate,
        },
        "signals": signals,
        "chart_data": projected,
        "interpretation": interpretation,
    }
| # ------------------------------------------------------------------ # | |
| # 7. Beta & CAPM # | |
| # ------------------------------------------------------------------ # | |
def beta_capm(ticker: str, benchmark: str = "SPY", period: str = "2y",
              risk_free_rate: float = 0.05) -> dict[str, Any]:
    """Estimate beta against *benchmark* and the CAPM expected return.

    Simple returns of both series are computed on their shared dates.
    Beta is cov(stock, benchmark) / var(benchmark), falling back to 1.0
    when the benchmark variance is zero. Mean returns are annualised by
    multiplying by 252, and alpha is the stock's annualised return minus
    the CAPM expectation.

    Returns a dict with ``ticker``, ``model``, ``period``, ``metrics``,
    ``signals``, ``chart_data`` (a thinned sample of paired returns in
    percent) and ``interpretation``, or ``{"error": ..., "model":
    "beta_capm"}`` when fetching fails or fewer than 60 shared dates exist.
    """
    model_name = "beta_capm"

    try:
        import yfinance as yf
        stock_df = yf.Ticker(ticker).history(period=period)
        bench_df = yf.Ticker(benchmark).history(period=period)
    except Exception as exc:
        return {"error": str(exc), "model": model_name}

    if stock_df.empty or bench_df.empty:
        return {"error": "Insufficient data", "model": model_name}

    shared_dates = stock_df.index.intersection(bench_df.index)
    if len(shared_dates) < 60:
        return {"error": "Not enough overlapping dates", "model": model_name}

    stock_px = stock_df.loc[shared_dates, "Close"].values
    bench_px = bench_df.loc[shared_dates, "Close"].values
    stock_ret = np.diff(stock_px) / stock_px[:-1]
    bench_ret = np.diff(bench_px) / bench_px[:-1]

    covariance = np.cov(stock_ret, bench_ret)
    bench_var = covariance[1, 1]
    if bench_var != 0:
        beta = covariance[0, 1] / bench_var
    else:
        beta = 1.0

    market_return = float(np.mean(bench_ret)) * 252
    expected_return = risk_free_rate + beta * (market_return - risk_free_rate)
    alpha = (float(np.mean(stock_ret)) * 252) - expected_return

    if beta > 1.2:
        beta_note = f"High beta ({beta:.2f}) — more volatile than market"
    elif beta < 0.8:
        beta_note = f"Low beta ({beta:.2f}) — defensive / less volatile"
    else:
        beta_note = f"Beta ({beta:.2f}) — moves roughly with the market"
    alpha_note = (
        f"Positive alpha ({alpha:.2%}) — outperforming CAPM expectation"
        if alpha > 0
        else f"Negative alpha ({alpha:.2%}) — underperforming CAPM expectation"
    )
    signals = [beta_note, alpha_note]

    # Keep every ``stride``-th observation so the chart stays small.
    stride = max(1, len(stock_ret) // 60)
    chart = []
    for pos in range(0, len(stock_ret), stride):
        chart.append({
            # Return ``pos`` is realised on the later of its two dates.
            "date": shared_dates[pos + 1].strftime("%Y-%m-%d"),
            "stock_return": _safe(stock_ret[pos] * 100),
            "benchmark_return": _safe(bench_ret[pos] * 100),
        })

    metrics = {
        "beta": _safe(beta),
        "alpha_annual": _safe(alpha),
        "capm_expected_return": _safe(expected_return),
        "market_return_annual": _safe(market_return),
        "risk_free_rate": risk_free_rate,
        "benchmark": benchmark,
    }
    summary = f"{ticker} beta={beta:.2f} vs {benchmark}. CAPM expected return {expected_return:.2%}. {signals[0]}"

    return {
        "ticker": ticker, "model": model_name, "period": period,
        "metrics": metrics,
        "signals": signals,
        "chart_data": chart,
        "interpretation": summary,
    }
| # ------------------------------------------------------------------ # | |
| # 8. Sharpe / Risk Metrics # | |
| # ------------------------------------------------------------------ # | |
def sharpe_risk_metrics(ticker: str, period: str = "1y", risk_free_rate: float = 0.05) -> dict[str, Any]:
    """Compute Sharpe, Sortino, drawdown and tail-risk metrics for *ticker*.

    Daily simple returns are annualised with 252 (mean) and sqrt(252)
    (volatility). Sortino uses the sample standard deviation of the
    negative daily returns, falling back to total volatility when fewer
    than two negative days exist. VaR is the return at the 5% position of
    the sorted daily returns; CVaR is the mean of the returns up to and
    including that position.

    Returns a dict with ``ticker``, ``model``, ``period``, ``metrics``,
    ``signals``, ``chart_data`` (the last 60 bars) and ``interpretation``,
    or ``{"error": ..., "model": "risk_metrics"}`` when fewer than 30 bars
    are available.
    """
    df = _get_history(ticker, period)
    if df.empty or len(df) < 30:
        return {"error": f"Insufficient data for {ticker}", "model": "risk_metrics"}

    closes = df["Close"].values
    dates = _dates(df)
    daily_returns = np.diff(closes) / closes[:-1]

    ann_return = float(np.mean(daily_returns)) * 252
    ann_vol = float(np.std(daily_returns, ddof=1)) * np.sqrt(252)
    sharpe = (ann_return - risk_free_rate) / ann_vol if ann_vol != 0 else 0

    downside = daily_returns[daily_returns < 0]
    downside_vol = float(np.std(downside, ddof=1)) * np.sqrt(252) if len(downside) > 1 else ann_vol
    sortino = (ann_return - risk_free_rate) / downside_vol if downside_vol != 0 else 0

    cumulative = np.cumprod(1 + daily_returns)
    # ``cumulative`` starts after the first return, so the starting value
    # (1.0) must be included as a candidate peak; otherwise a decline from
    # the very first close is not counted as drawdown.
    running_max = np.maximum(np.maximum.accumulate(cumulative), 1.0)
    drawdowns = (cumulative - running_max) / running_max
    max_dd = float(np.min(drawdowns)) * 100

    sorted_ret = np.sort(daily_returns)
    tail_idx = int(0.05 * len(sorted_ret))
    var_95 = float(sorted_ret[tail_idx]) * 100
    cvar_95 = float(np.mean(sorted_ret[: tail_idx + 1])) * 100

    signals = []
    if sharpe > 1:
        signals.append(f"Strong risk-adjusted returns (Sharpe {sharpe:.2f})")
    elif sharpe > 0:
        signals.append(f"Positive but modest risk-adjusted returns (Sharpe {sharpe:.2f})")
    else:
        signals.append(f"Negative risk-adjusted returns (Sharpe {sharpe:.2f})")
    if max_dd < -20:
        signals.append(f"Significant drawdown risk: max drawdown {max_dd:.1f}%")

    # Return i is realised on dates[i + 1], hence the one-bar offset.
    chart = [
        {"date": day, "cumulative_return": _safe((growth - 1) * 100), "drawdown": _safe(dd * 100)}
        for day, growth, dd in zip(dates[1:], cumulative, drawdowns)
    ]

    return {
        "ticker": ticker, "model": "risk_metrics", "period": period,
        "metrics": {
            "annual_return": _safe(ann_return),
            "annual_volatility": _safe(ann_vol),
            "sharpe_ratio": _safe(sharpe),
            "sortino_ratio": _safe(sortino),
            "max_drawdown_pct": _safe(max_dd),
            "var_95_daily_pct": _safe(var_95),
            "cvar_95_daily_pct": _safe(cvar_95),
        },
        "signals": signals,
        "chart_data": chart[-60:],
        "interpretation": (
            f"{ticker} Sharpe={sharpe:.2f}, Sortino={sortino:.2f}, "
            f"annual vol {ann_vol:.1%}, max drawdown {max_dd:.1f}%. {signals[0]}"
        ),
    }
| # ------------------------------------------------------------------ # | |
| # 9. Correlation Matrix # | |
| # ------------------------------------------------------------------ # | |
def correlation_matrix(tickers: list[str], period: str = "1y") -> dict[str, Any]:
    """Compute pairwise correlations of daily returns for up to 8 tickers.

    Only the first 8 entries of *tickers* are used. Tickers whose download
    fails or returns fewer than 30 bars are skipped. Returns are computed
    on the calendar dates shared by every remaining ticker, so each row of
    the return matrix refers to the same day for all of them.

    Returns a dict with ``tickers``, ``model``, ``period``, ``metrics``
    (one entry per unordered pair), ``signals``, ``chart_data`` (one entry
    per ordered pair), ``table`` and ``interpretation``, or
    ``{"error": ..., "model": "correlation"}`` when fewer than two tickers
    are usable or they share fewer than 30 dates.
    """
    import yfinance as yf

    if len(tickers) < 2:
        return {"error": "Need at least 2 tickers", "model": "correlation"}
    tickers = tickers[:8]

    # ticker -> {"YYYY-MM-DD": close}; keying by date string lets series with
    # different calendars be aligned by date.
    closes_by_date: dict[str, dict[str, Any]] = {}
    for tkr in tickers:
        try:
            df = yf.Ticker(tkr).history(period=period)
            if df.empty or len(df) < 30:
                continue
            closes_by_date[tkr] = dict(zip(_dates(df), df["Close"].values))
        except Exception as exc:
            # Best effort: skip the ticker, but leave a trace of why.
            logger.warning("Skipping %s in correlation matrix: %s", tkr, exc)
            continue

    valid_tickers = list(closes_by_date)
    if len(valid_tickers) < 2:
        return {"error": "Could not fetch data for enough tickers", "model": "correlation"}

    # ISO date strings sort chronologically.
    common_dates = sorted(set.intersection(*(set(series) for series in closes_by_date.values())))
    if len(common_dates) < 30:
        return {"error": "Not enough overlapping dates", "model": "correlation"}

    prices = np.column_stack(
        [[closes_by_date[t][day] for day in common_dates] for t in valid_tickers]
    )
    matrix_data = np.diff(prices, axis=0) / prices[:-1]
    corr = np.corrcoef(matrix_data, rowvar=False)

    count = len(valid_tickers)
    chart = [
        {"ticker1": t1, "ticker2": t2, "correlation": _safe(corr[i, j])}
        for i, t1 in enumerate(valid_tickers)
        for j, t2 in enumerate(valid_tickers)
    ]

    table_rows = [
        [t1] + [str(_safe(corr[i, j])) for j in range(count)]
        for i, t1 in enumerate(valid_tickers)
    ]

    signals = []
    for i in range(count):
        for j in range(i + 1, count):
            c = corr[i, j]
            if c > 0.8:
                signals.append(f"{valid_tickers[i]}/{valid_tickers[j]} highly correlated ({c:.2f})")
            elif c < -0.3:
                signals.append(f"{valid_tickers[i]}/{valid_tickers[j]} negatively correlated ({c:.2f}) — diversification benefit")
    if not signals:
        signals.append("No extreme correlations detected")

    return {
        "tickers": valid_tickers, "model": "correlation", "period": period,
        "metrics": {f"{valid_tickers[i]}_{valid_tickers[j]}": _safe(corr[i, j])
                    for i in range(count) for j in range(i + 1, count)},
        "signals": signals,
        "chart_data": chart,
        "table": {"columns": [""] + valid_tickers, "rows": table_rows},
        "interpretation": f"Correlation matrix for {', '.join(valid_tickers)} over {period}. {signals[0]}",
    }
| # ------------------------------------------------------------------ # | |
| # 10. Linear Regression Trend # | |
| # ------------------------------------------------------------------ # | |
def linear_regression_trend(ticker: str, period: str = "1y", forecast_days: int = 30) -> dict[str, Any]:
    """Fit an ordinary-least-squares line to the closes of *ticker*.

    The bar number (0, 1, 2, ...) is the regressor. The fitted line is
    extended ``forecast_days`` bars past the last observation, and R² is
    reported as a measure of fit.

    Returns a dict with ``ticker``, ``model``, ``period``, ``metrics``,
    ``signals``, ``chart_data`` (recent history followed by forecast rows
    labelled ``D+1``, ``D+2``, ...) and ``interpretation``, or
    ``{"error": ..., "model": "regression"}`` when ``forecast_days`` is
    invalid or fewer than 30 bars are available.
    """
    if forecast_days < 1:
        # The summary reads the last forecast point, so one must exist.
        return {"error": "forecast_days must be >= 1", "model": "regression"}

    df = _get_history(ticker, period)
    if df.empty or len(df) < 30:
        return {"error": f"Insufficient data for {ticker}", "model": "regression"}

    closes = df["Close"].values
    dates = _dates(df)
    n = len(closes)
    x = np.arange(n, dtype=float)
    y = closes.astype(float)

    x_mean, y_mean = np.mean(x), np.mean(y)
    ss_xy = np.sum((x - x_mean) * (y - y_mean))
    ss_xx = np.sum((x - x_mean) ** 2)
    slope = ss_xy / ss_xx if ss_xx != 0 else 0
    intercept = y_mean - slope * x_mean

    y_pred = slope * x + intercept
    ss_res = np.sum((y - y_pred) ** 2)
    ss_tot = np.sum((y - y_mean) ** 2)
    r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else 0

    forecast_x = np.arange(n, n + forecast_days)
    forecast_y = slope * forecast_x + intercept
    daily_pct = (slope / closes[-1]) * 100 if closes[-1] != 0 else 0

    signals = []
    if slope > 0:
        signals.append(f"Upward trend: +${slope:.4f}/day ({daily_pct:.3f}%/day)")
    else:
        signals.append(f"Downward trend: ${slope:.4f}/day ({daily_pct:.3f}%/day)")
    if r_squared > 0.7:
        signals.append(f"Strong trend fit (R²={r_squared:.3f})")
    elif r_squared > 0.4:
        signals.append(f"Moderate trend fit (R²={r_squared:.3f})")
    else:
        signals.append(f"Weak trend fit (R²={r_squared:.3f}) — price is choppy")

    chart = [
        {"date": day, "close": _safe(close), "trend": _safe(fit)}
        for day, close, fit in zip(dates, closes, y_pred)
    ]
    chart.extend(
        {"date": f"D+{step}", "close": None, "trend": _safe(value)}
        for step, value in enumerate(forecast_y, start=1)
    )

    # 90 rows as before for horizons up to 30 days; longer horizons still
    # keep 60 rows of history instead of showing forecast rows only.
    chart_window = max(90, 60 + forecast_days)

    return {
        "ticker": ticker, "model": "regression", "period": period,
        "metrics": {
            "slope_per_day": _safe(slope),
            "daily_pct_change": _safe(daily_pct),
            "intercept": _safe(intercept),
            "r_squared": _safe(r_squared),
            "forecast_price": _safe(forecast_y[-1]),
            "forecast_days": forecast_days,
        },
        "signals": signals,
        "chart_data": chart[-chart_window:],
        "interpretation": (
            f"{ticker} linear trend: slope ${slope:.4f}/day, R²={r_squared:.3f}. "
            f"{forecast_days}-day forecast: ${forecast_y[-1]:.2f}. {signals[0]}"
        ),
    }
| # ------------------------------------------------------------------ # | |
| # Registry # | |
| # ------------------------------------------------------------------ # | |
# Maps each public model name to the function implementing it. ``run_model``
# looks names up here and lists the keys, in this order, in its
# "Unknown model" error message.
ALL_MODELS = {
    "moving_averages": moving_averages,
    "rsi": rsi_analysis,
    "macd": macd_analysis,
    "bollinger_bands": bollinger_bands,
    "monte_carlo": monte_carlo_forecast,
    "dcf": dcf_valuation,
    "beta_capm": beta_capm,
    "risk_metrics": sharpe_risk_metrics,
    "correlation": correlation_matrix,
    "regression": linear_regression_trend,
}
def run_model(model_name: str, ticker: str, **kwargs) -> dict[str, Any]:
    """Run the model registered under *model_name* for *ticker*.

    Extra keyword arguments are forwarded to the model function. The
    ``"correlation"`` model takes a list of tickers instead of one: it is
    read from the ``tickers`` keyword, defaulting to ``[ticker]``.

    Never raises for model failures: an unknown name or any exception
    raised by the model is reported as a dict with an ``"error"`` key.
    """
    fn = ALL_MODELS.get(model_name)
    if fn is None:
        return {"error": f"Unknown model: {model_name}. Available: {', '.join(ALL_MODELS)}", "model": model_name}
    try:
        if model_name == "correlation":
            tickers = kwargs.pop("tickers", [ticker])
            return fn(tickers, **kwargs)
        return fn(ticker, **kwargs)
    except Exception as exc:
        # exc_info keeps the traceback in the log; the caller only gets the message.
        logger.error("Model %s failed for %s: %s", model_name, ticker, exc, exc_info=True)
        return {"error": str(exc), "model": model_name, "ticker": ticker}