Spaces:
Running
Running
| """ | |
| Cross-Sectional Ranking Cache | |
| Caches the daily universe ranking to avoid re-downloading 98 stocks on every ticker lookup. | |
| TTL: 4 hours (a fixed window; the first lookup after it expires triggers a refresh). | |
| """ | |
| import time | |
| import logging | |
| import pandas as pd | |
| import numpy as np | |
| import yfinance as yf | |
# Module-level logger; every message in this file is prefixed with "[CS Cache]".
logger = logging.getLogger(__name__)

# The validated 98-stock universe
# Tickers are passed as-is to yf.download(); order here does not affect ranking,
# which is determined by the computed Alpha Score.
UNIVERSE = [
    "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "BRK-B", "JPM", "V", "JNJ", "WMT",
    "PG", "MA", "UNH", "HD", "DIS", "BAC", "XOM", "PFE", "KO", "PEP", "CSCO",
    "INTC", "NFLX", "ADBE", "CRM", "AMD", "CVX", "CAT", "BA", "IBM",
    "T", "VZ", "CMCSA", "PNC", "USB", "C", "WFC", "GS", "MS", "AXP",
    "MCD", "SBUX", "NKE", "TGT", "COST", "TJX", "LOW", "MO", "PM", "CL",
    "KMB", "GIS", "SYK", "MDT", "ISRG", "ABT", "TMO", "DHR",
    "BMY", "AMGN", "GILD", "VRTX", "REGN", "COP", "SLB", "HAL", "EOG", "OXY",
    "LMT", "RTX", "GD", "NOC", "UPS", "FDX", "UNP", "CSX", "NSC", "HON",
    "GE", "MMM", "EMR", "ITW", "APD", "LIN", "ECL", "SHW", "PPG", "NEM",
    "NEE", "DUK", "SO", "AEP", "D", "EXC", "XEL", "SRE", "PEG", "WEC"
]

# Weight of the Vol_Adj_Mom Z-score in the composite Alpha Score; the Mom_1M
# Z-score receives the remaining (1 - W_VAM).
W_VAM = 0.2  # Validated: 80% Mom_1M, 20% Vol_Adj_Mom

# In-memory cache for this module. It is refreshed by get_ticker_ranking() and
# extended by _score_out_of_universe(), which stores out-of-universe results
# under "rankings" as well.
_cache = {
    # ticker -> {"alpha_score": float, "rank": int, "total": int, "mom_1m": float,
    #            "z_mom_1m": float, "z_vam": float}
    "rankings": {},  # ticker -> {"alpha_score": float, "rank": int, "total": int, "mom_1m": float}
    "stats": {},  # {"Mom_1M": (mean, std), "Vol_Adj_Mom": (mean, std)} for Z-scoring out-of-universe
    # time.time() of the last refresh; the initial 0 forces a refresh on the
    # first lookup.
    "timestamp": 0,
    "ttl": 4 * 3600,  # 4 hours
}
def _compute_rankings():
    """Download universe prices and compute cross-sectional Z-scored Alpha Scores.

    Six months of daily closes are fetched for every ticker in ``UNIVERSE``
    ("Adj Close" when available, otherwise "Close"). Two factors are read from
    the latest row:

    * ``Mom_1M``      - latest close versus the close 21 rows earlier.
    * ``Vol_Adj_Mom`` - latest close versus the close 61 rows earlier, divided
      by the annualised standard deviation of the last 21 daily returns.

    Each factor is Z-scored across the tickers that have finite values and the
    two Z-scores are blended as ``W_VAM * Z_Vol_Adj_Mom + (1 - W_VAM) * Z_Mom_1M``.

    Returns:
        A ``(rankings, stats)`` tuple.

        ``rankings`` maps ticker -> dict with keys ``alpha_score``, ``rank``
        (1 = highest Alpha Score), ``total``, ``mom_1m`` (in percent),
        ``z_mom_1m`` and ``z_vam``.

        ``stats`` maps ``"Mom_1M"`` and ``"Vol_Adj_Mom"`` to their universe
        ``(mean, std)``.

        Both dicts are empty when the download fails, returns no rows, or no
        ticker has usable factor values.
    """
    logger.info("[CS Cache] Computing cross-sectional rankings for %d stocks...", len(UNIVERSE))
    try:
        raw = yf.download(UNIVERSE, period="6mo", progress=False)
        if isinstance(raw.columns, pd.MultiIndex):
            if "Adj Close" in raw.columns.get_level_values(0):
                df_close = raw["Adj Close"]
            else:
                df_close = raw["Close"]
        else:
            df_close = raw
        df_close = df_close.ffill().dropna(how='all')
    except Exception as e:
        logger.error("[CS Cache] Download failed: %s", e)
        return {}, {}

    # A download that yields no rows would otherwise make the .iloc[-1]
    # lookups below raise IndexError outside of any handler.
    if df_close.empty:
        logger.error("[CS Cache] Download returned no price data.")
        return {}, {}

    # Compute factors
    daily_ret = df_close.pct_change()
    mom_1m = (df_close / df_close.shift(21)) - 1
    mom_3m = (df_close / df_close.shift(61)) - 1
    vol_1m = daily_ret.rolling(window=21).std() * np.sqrt(252)
    vol_adj_mom = mom_3m / vol_1m

    # Build cross-section from latest values
    latest_mom = mom_1m.iloc[-1]
    latest_vam = vol_adj_mom.iloc[-1]
    factors = []
    for ticker in UNIVERSE:
        if ticker not in df_close.columns:
            continue
        m1 = latest_mom[ticker]
        vam = latest_vam[ticker]
        # Reject NaN (insufficient history) and +/-inf (zero volatility): a
        # single infinite value would poison the universe mean and std.
        if not (np.isfinite(m1) and np.isfinite(vam)):
            continue
        factors.append({"Ticker": ticker, "Mom_1M": m1, "Vol_Adj_Mom": vam})

    if not factors:
        return {}, {}
    cs = pd.DataFrame(factors)

    # Store universe distribution stats for out-of-universe Z-scoring
    stats = {}
    for f in ["Mom_1M", "Vol_Adj_Mom"]:
        m, s = cs[f].mean(), cs[f].std()
        stats[f] = (float(m), float(s))
        # A degenerate (or NaN) spread carries no signal, so the Z-score is 0.
        cs[f"Z_{f}"] = (cs[f] - m) / s if s > 1e-8 else 0.0

    # Composite Alpha Score
    cs["Alpha_Score"] = (W_VAM * cs["Z_Vol_Adj_Mom"]) + ((1 - W_VAM) * cs["Z_Mom_1M"])
    cs = cs.sort_values("Alpha_Score", ascending=False).reset_index(drop=True)

    total = len(cs)
    rankings = {}
    for position, row in enumerate(cs.itertuples(index=False), start=1):
        rankings[row.Ticker] = {
            "alpha_score": round(float(row.Alpha_Score), 4),
            "rank": position,
            "total": total,
            "mom_1m": round(float(row.Mom_1M) * 100, 2),
            "z_mom_1m": round(float(row.Z_Mom_1M), 4),
            "z_vam": round(float(row.Z_Vol_Adj_Mom), 4),
        }
    logger.info("[CS Cache] Rankings computed: %d stocks ranked.", total)
    return rankings, stats
def _score_out_of_universe(ticker: str) -> dict:
    """Score a ticker that is NOT in the universe against the cached distribution.

    The ticker's own six-month price history is downloaded, its ``Mom_1M`` and
    ``Vol_Adj_Mom`` factors are computed with the same lookbacks that
    ``_compute_rankings`` uses (21 and 61 rows back from the latest close,
    volatility over the last 21 daily returns), and both are Z-scored with the
    universe ``(mean, std)`` stored in ``_cache["stats"]``.

    Args:
        ticker: Symbol to score; passed unchanged to ``yf.download``.

    Returns:
        A dict with keys ``alpha_score``, ``rank`` (always 0 because the ticker
        is not formally ranked), ``total``, ``mom_1m`` (in percent),
        ``z_mom_1m`` and ``z_vam``. An all-zero placeholder is returned when no
        universe stats are cached, when fewer than 62 usable closes are
        available, or when anything raises. If the 21-day volatility is
        effectively zero the placeholder carries the real ``mom_1m``.

    Side effects:
        A successfully computed result is stored in ``_cache["rankings"]`` so
        repeated lookups do not re-download.
    """
    neutral = {"alpha_score": 0.0, "rank": 0, "total": 0, "mom_1m": 0.0, "z_mom_1m": 0.0, "z_vam": 0.0}

    stats = _cache.get("stats", {})
    if not stats:
        return neutral

    try:
        raw = yf.download(ticker, period="6mo", progress=False)
        if raw is None or raw.empty or len(raw) < 62:
            return neutral

        close = raw["Adj Close"] if "Adj Close" in raw.columns else raw["Close"]
        # Mirror the universe pipeline (forward-fill gaps) and drop leading
        # NaNs so a missing quote cannot turn the score into NaN.
        close = close.squeeze().ffill().dropna()
        if len(close) < 62:
            return neutral

        # shift(21) / shift(61) in the universe computation compare the latest
        # close with the one 21 / 61 rows earlier, i.e. iloc[-22] / iloc[-62].
        mom_1m_val = float((close.iloc[-1] / close.iloc[-22]) - 1)
        mom_3m_val = float((close.iloc[-1] / close.iloc[-62]) - 1)
        daily_ret = close.pct_change()
        vol_1m_val = float(daily_ret.iloc[-21:].std() * np.sqrt(252))
        if np.isnan(vol_1m_val) or vol_1m_val < 1e-8:
            # No measurable volatility: the ratio is undefined, report momentum only.
            return {**neutral, "mom_1m": round(mom_1m_val * 100, 2)}
        vam_val = mom_3m_val / vol_1m_val

        # Z-score against universe distribution
        m1_mean, m1_std = stats.get("Mom_1M", (0, 1))
        vam_mean, vam_std = stats.get("Vol_Adj_Mom", (0, 1))
        z_m1 = (mom_1m_val - m1_mean) / m1_std if m1_std > 1e-8 else 0.0
        z_vam = (vam_val - vam_mean) / vam_std if vam_std > 1e-8 else 0.0
        alpha = (W_VAM * z_vam) + ((1 - W_VAM) * z_m1)

        # Every cached entry carries the universe size; read it from the first one.
        total = next(iter(_cache["rankings"].values()), {}).get("total", 0)
        result = {
            "alpha_score": round(alpha, 4),
            "rank": 0,  # Not formally ranked (out of universe)
            "total": total,
            "mom_1m": round(mom_1m_val * 100, 2),
            "z_mom_1m": round(z_m1, 4),
            "z_vam": round(z_vam, 4),
        }
        # Cache it so we don't re-download
        _cache["rankings"][ticker] = result
        logger.info("[CS Cache] Scored out-of-universe ticker %s: alpha=%.4f", ticker, alpha)
        return result
    except Exception as e:
        logger.error("[CS Cache] Failed to score %s: %s", ticker, e)
        return neutral
def _load_from_supabase():
    """Try to hydrate rankings from the nightly worker's Supabase cache.

    Reads the "screener" cache entry through ``NewsDatabase.get_cache`` and,
    when it contains a "stocks" list, converts it to the in-memory ranking
    format. Rank is the 1-based position of each stock in that list, so the
    list is presumably already sorted best-first - verify against the worker
    that writes it. Only ``alpha_score`` is taken from the payload; the
    momentum and Z-score fields are filled with 0.0.

    Returns:
        A dict mapping ticker -> ranking dict, or ``None`` when the cache entry
        is missing, has no "stocks" key, or any error occurs.
    """
    try:
        # Imported lazily so this module loads even without the backend package.
        from backend.database import NewsDatabase

        payload = NewsDatabase().get_cache("screener")
        if not payload or "stocks" not in payload:
            return None

        entries = payload["stocks"]
        count = len(entries)
        hydrated = {}
        for position, entry in enumerate(entries, start=1):
            hydrated[entry["ticker"]] = {
                "alpha_score": entry.get("alpha_score", 0.0),
                "rank": position,
                "total": count,
                "mom_1m": 0.0,
                "z_mom_1m": 0.0,
                "z_vam": 0.0,
            }
        logger.info("[CS Cache] Pre-warmed %d rankings from Supabase (instant).", count)
        return hydrated
    except Exception as e:
        logger.warning("[CS Cache] Supabase pre-warm failed: %s. Falling back to yfinance.", e)
        return None
def get_ticker_ranking(ticker: str) -> dict:
    """
    Get a ticker's Cross-Sectional Alpha Score and Rank.

    Priority: in-memory cache → Supabase pre-warm → live yfinance download.
    For out-of-universe tickers, computes on the fly using cached distribution stats.

    The cache is refreshed when it is older than ``_cache["ttl"]`` seconds or
    holds no rankings. A Supabase pre-warm supplies rankings only, not the
    universe ``(mean, std)`` stats, so when an out-of-universe ticker is
    requested and no stats are cached they are computed once from a live
    download; otherwise every such ticker would receive the all-zero
    placeholder.

    Args:
        ticker: Symbol to look up; upper-cased before the lookup.

    Returns:
        A dict with keys ``alpha_score``, ``rank``, ``total``, ``mom_1m``,
        ``z_mom_1m`` and ``z_vam``.
    """
    ticker = ticker.upper()
    now = time.time()
    live_attempted = False

    # Check cache freshness
    expired = now - _cache["timestamp"] > _cache["ttl"]
    if expired or not _cache["rankings"]:
        # Try Supabase first (instant, from nightly worker)
        supabase_rankings = _load_from_supabase()
        if supabase_rankings:
            _cache["rankings"] = supabase_rankings
            _cache["timestamp"] = now
        else:
            # Fall back to live yfinance download (~20s)
            rankings, stats = _compute_rankings()
            live_attempted = True
            _cache["rankings"] = rankings
            _cache["stats"] = stats
            _cache["timestamp"] = now

    # If ticker is in cache, return it
    if ticker in _cache["rankings"]:
        return _cache["rankings"][ticker]

    # Out-of-universe scoring needs the universe distribution. Fetch it lazily
    # when missing, but do not repeat a live download that just failed above.
    if not _cache["stats"] and not live_attempted:
        _, stats = _compute_rankings()
        if stats:
            _cache["stats"] = stats

    # Out-of-universe: compute on the fly
    return _score_out_of_universe(ticker)