"""
Cross-Sectional Ranking Cache

Caches the daily universe ranking to avoid re-downloading 98 stocks on every
ticker lookup.

TTL: 4 hours (refreshes during market hours, persists after close).
"""
import time
import logging
import pandas as pd
import numpy as np
import yfinance as yf

logger = logging.getLogger(__name__)

# The validated 98-stock universe
UNIVERSE = [
    "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "BRK-B", "JPM", "V", "JNJ", "WMT",
    "PG", "MA", "UNH", "HD", "DIS", "BAC", "XOM", "PFE", "KO", "PEP",
    "CSCO", "INTC", "NFLX", "ADBE", "CRM", "AMD", "CVX", "CAT", "BA", "IBM",
    "T", "VZ", "CMCSA", "PNC", "USB", "C", "WFC", "GS", "MS", "AXP",
    "MCD", "SBUX", "NKE", "TGT", "COST", "TJX", "LOW", "MO", "PM", "CL",
    "KMB", "GIS", "SYK", "MDT", "ISRG", "ABT", "TMO", "DHR", "BMY", "AMGN",
    "GILD", "VRTX", "REGN", "COP", "SLB", "HAL", "EOG", "OXY", "LMT", "RTX",
    "GD", "NOC", "UPS", "FDX", "UNP", "CSX", "NSC", "HON", "GE", "MMM",
    "EMR", "ITW", "APD", "LIN", "ECL", "SHW", "PPG", "NEM", "NEE", "DUK",
    "SO", "AEP", "D", "EXC", "XEL", "SRE", "PEG", "WEC"
]

W_VAM = 0.2  # Validated: 80% Mom_1M, 20% Vol_Adj_Mom

# Lookbacks (in rows of the price frame) shared by the universe computation
# and the out-of-universe scorer so both measure the same horizons.
_LOOKBACK_1M = 21
_LOOKBACK_3M = 61
_MIN_STD = 1e-8  # below this a standard deviation is treated as zero

_cache = {
    "rankings": {},  # ticker -> {"alpha_score": float, "rank": int, "total": int, "mom_1m": float}
    "stats": {},     # {"Mom_1M": (mean, std), "Vol_Adj_Mom": (mean, std)} for Z-scoring out-of-universe
    "timestamp": 0,
    "ttl": 4 * 3600,  # 4 hours
}


def _empty_result(mom_1m=0.0):
    """Return a fresh neutral ranking record (a new dict on every call)."""
    return {
        "alpha_score": 0.0,
        "rank": 0,
        "total": 0,
        "mom_1m": mom_1m,
        "z_mom_1m": 0.0,
        "z_vam": 0.0,
    }


def _compute_rankings():
    """Download universe data and compute cross-sectional Z-scored Alpha Scores.

    Returns:
        A ``(rankings, stats)`` pair. ``rankings`` maps ticker -> record with
        keys ``alpha_score``, ``rank``, ``total``, ``mom_1m`` (percent),
        ``z_mom_1m`` and ``z_vam``. ``stats`` maps factor name -> ``(mean, std)``
        of the universe. Both are empty dicts when the download fails or no
        ticker yields usable factor values.
    """
    logger.info("[CS Cache] Computing cross-sectional rankings for %d stocks...", len(UNIVERSE))
    try:
        raw = yf.download(UNIVERSE, period="6mo", progress=False)
        if isinstance(raw.columns, pd.MultiIndex):
            if "Adj Close" in raw.columns.get_level_values(0):
                df_close = raw["Adj Close"]
            else:
                df_close = raw["Close"]
        else:
            df_close = raw
        df_close = df_close.ffill().dropna(how='all')
    except Exception as e:
        logger.error("[CS Cache] Download failed: %s", e)
        return {}, {}

    # An empty frame would make every `.iloc[-1]` below raise IndexError.
    if df_close.empty:
        logger.error("[CS Cache] Download failed: %s", "no price data returned")
        return {}, {}

    # Compute factors
    daily_ret = df_close.pct_change()
    mom_1m = (df_close / df_close.shift(_LOOKBACK_1M)) - 1
    mom_3m = (df_close / df_close.shift(_LOOKBACK_3M)) - 1
    vol_1m = daily_ret.rolling(window=_LOOKBACK_1M).std() * np.sqrt(252)
    vol_adj_mom = mom_3m / vol_1m

    # Build cross-section from latest values
    factors = []
    for ticker in UNIVERSE:
        if ticker not in df_close.columns:
            continue
        m1 = mom_1m[ticker].iloc[-1]
        vam = vol_adj_mom[ticker].iloc[-1]
        # isfinite rejects NaN *and* +/-inf (zero volatility), either of which
        # would poison the universe mean/std.
        if not (np.isfinite(m1) and np.isfinite(vam)):
            continue
        factors.append({"Ticker": ticker, "Mom_1M": m1, "Vol_Adj_Mom": vam})

    if not factors:
        return {}, {}

    cs = pd.DataFrame(factors)

    # Store universe distribution stats for out-of-universe Z-scoring
    stats = {}
    for f in ["Mom_1M", "Vol_Adj_Mom"]:
        m, s = cs[f].mean(), cs[f].std()
        stats[f] = (float(m), float(s))
        # A NaN std (single-row cross-section) fails the comparison -> 0.0.
        cs[f"Z_{f}"] = (cs[f] - m) / s if s > _MIN_STD else 0.0

    # Composite Alpha Score
    cs["Alpha_Score"] = (W_VAM * cs["Z_Vol_Adj_Mom"]) + ((1 - W_VAM) * cs["Z_Mom_1M"])
    cs = cs.sort_values("Alpha_Score", ascending=False).reset_index(drop=True)

    total = len(cs)
    rankings = {}
    for rank, row in enumerate(cs.to_dict("records"), start=1):
        rankings[row["Ticker"]] = {
            "alpha_score": round(float(row["Alpha_Score"]), 4),
            "rank": rank,
            "total": total,
            "mom_1m": round(float(row["Mom_1M"]) * 100, 2),
            "z_mom_1m": round(float(row["Z_Mom_1M"]), 4),
            "z_vam": round(float(row["Z_Vol_Adj_Mom"]), 4),
        }

    logger.info("[CS Cache] Rankings computed: %d stocks ranked.", total)
    return rankings, stats


def _score_out_of_universe(ticker: str) -> dict:
    """Score a ticker NOT in the universe by Z-scoring against the cached distribution.

    Uses the same lookbacks as ``_compute_rankings`` so the values are
    comparable with the universe statistics in ``_cache["stats"]``.

    Args:
        ticker: Symbol to download and score.

    Returns:
        A ranking record with ``rank`` fixed at 0 (not formally ranked). A
        neutral all-zero record is returned when no universe stats are cached,
        when there is too little price history, or on any download/compute
        error. Successful results are stored in ``_cache["rankings"]``.
    """
    stats = _cache.get("stats", {})
    if not stats:
        return _empty_result()

    min_rows = _LOOKBACK_3M + 1
    try:
        raw = yf.download(ticker, period="6mo", progress=False)
        if raw is None or raw.empty or len(raw) < min_rows:
            return _empty_result()

        close = raw["Adj Close"] if "Adj Close" in raw.columns else raw["Close"]
        # Mirror the universe pipeline: forward-fill gaps, drop leading NaNs.
        close = close.squeeze().ffill().dropna()
        if len(close) < min_rows:
            return _empty_result()

        # `shift(k)` on the universe frame compares the last row with the row
        # k positions earlier, i.e. iloc[-(k + 1)].
        mom_1m_val = float((close.iloc[-1] / close.iloc[-(_LOOKBACK_1M + 1)]) - 1)
        mom_3m_val = float((close.iloc[-1] / close.iloc[-(_LOOKBACK_3M + 1)]) - 1)
        daily_ret = close.pct_change()
        vol_1m_val = float(daily_ret.iloc[-_LOOKBACK_1M:].std() * np.sqrt(252))

        if not (np.isfinite(mom_1m_val) and np.isfinite(mom_3m_val)):
            return _empty_result()
        if not np.isfinite(vol_1m_val) or vol_1m_val < _MIN_STD:
            return _empty_result(mom_1m=round(mom_1m_val * 100, 2))

        vam_val = mom_3m_val / vol_1m_val

        # Z-score against universe distribution
        m1_mean, m1_std = stats.get("Mom_1M", (0, 1))
        vam_mean, vam_std = stats.get("Vol_Adj_Mom", (0, 1))

        z_m1 = (mom_1m_val - m1_mean) / m1_std if m1_std > _MIN_STD else 0.0
        z_vam = (vam_val - vam_mean) / vam_std if vam_std > _MIN_STD else 0.0
        alpha = (W_VAM * z_vam) + ((1 - W_VAM) * z_m1)
        if not np.isfinite(alpha):
            # Never cache a NaN/inf score.
            return _empty_result(mom_1m=round(mom_1m_val * 100, 2))

        # Every ranked record carries the universe size; read it from the first.
        first_record = next(iter(_cache["rankings"].values()), None)
        total = first_record.get("total", 0) if first_record else 0

        result = {
            "alpha_score": round(alpha, 4),
            "rank": 0,  # Not formally ranked (out of universe)
            "total": total,
            "mom_1m": round(mom_1m_val * 100, 2),
            "z_mom_1m": round(z_m1, 4),
            "z_vam": round(z_vam, 4),
        }

        # Cache it so we don't re-download
        _cache["rankings"][ticker] = result
        logger.info("[CS Cache] Scored out-of-universe ticker %s: alpha=%.4f", ticker, alpha)
        return result

    except Exception as e:
        logger.error("[CS Cache] Failed to score %s: %s", ticker, e)
        return _empty_result()


def _load_from_supabase():
    """Try to hydrate rankings from the nightly worker's Supabase cache.

    Returns:
        A ticker -> record dict built from the ``"stocks"`` list of the
        ``"screener"`` cache entry (rank follows list order; momentum and
        Z-score fields are zero because the entry is only read for
        ``alpha_score``), or ``None`` when the entry is missing or loading fails.
    """
    try:
        # Imported lazily so this module still loads without the backend package.
        from backend.database import NewsDatabase
        db = NewsDatabase()
        data = db.get_cache("screener")
        if data and "stocks" in data:
            stocks = data["stocks"]
            total = len(stocks)
            rankings = {}
            for position, stock in enumerate(stocks, start=1):
                rankings[stock["ticker"]] = {
                    "alpha_score": stock.get("alpha_score", 0.0),
                    "rank": position,
                    "total": total,
                    "mom_1m": 0.0,
                    "z_mom_1m": 0.0,
                    "z_vam": 0.0,
                }
            logger.info("[CS Cache] Pre-warmed %d rankings from Supabase (instant).", total)
            return rankings
    except Exception as e:
        logger.warning("[CS Cache] Supabase pre-warm failed: %s. Falling back to yfinance.", e)
    return None


def get_ticker_ranking(ticker: str) -> dict:
    """
    Get a ticker's Cross-Sectional Alpha Score and Rank.

    Priority: in-memory cache → Supabase pre-warm → live yfinance download.
    For out-of-universe tickers, computes on the fly using cached distribution stats.

    Args:
        ticker: Symbol to look up (case-insensitive).

    Returns:
        A record with keys ``alpha_score``, ``rank``, ``total``, ``mom_1m``,
        ``z_mom_1m`` and ``z_vam``. Out-of-universe tickers have ``rank`` 0.
    """
    ticker = ticker.upper()
    now = time.time()

    # Check cache freshness
    if now - _cache["timestamp"] > _cache["ttl"] or not _cache["rankings"]:
        # Try Supabase first (instant, from nightly worker)
        supabase_rankings = _load_from_supabase()
        if supabase_rankings:
            _cache["rankings"] = supabase_rankings
            _cache["timestamp"] = now
        else:
            # Fall back to live yfinance download (~20s)
            rankings, stats = _compute_rankings()
            _cache["rankings"] = rankings
            _cache["stats"] = stats
            _cache["timestamp"] = now

    # If ticker is in cache, return it
    if ticker in _cache["rankings"]:
        return _cache["rankings"][ticker]

    # The Supabase pre-warm carries no distribution stats, so without this an
    # out-of-universe ticker would always score zero. Compute them once, on
    # the first out-of-universe request, and keep the pre-warmed rankings.
    if not _cache["stats"]:
        _, stats = _compute_rankings()
        if stats:
            _cache["stats"] = stats

    # Out-of-universe: compute on the fly
    return _score_out_of_universe(ticker)