# stockproject/brain/quant/cross_sectional_cache.py
# Hosting-page residue kept for provenance: "harshisageek's picture";
# commit 1cd56b6 ("deploy: clean history for HuggingFace").
"""
Cross-Sectional Ranking Cache

Caches the daily universe ranking in memory to avoid re-downloading all 98 stocks on every ticker lookup.
TTL: 4 hours of wall-clock time since the last refresh. Expiry is purely time-based; the cache does not
check whether the market is open or closed.
"""
import time
import logging
import pandas as pd
import numpy as np
import yfinance as yf
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The validated 98-stock universe the cross-sectional ranking is computed over.
UNIVERSE = [
    "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "BRK-B", "JPM", "V", "JNJ", "WMT",
    "PG", "MA", "UNH", "HD", "DIS", "BAC", "XOM", "PFE", "KO", "PEP",
    "CSCO", "INTC", "NFLX", "ADBE", "CRM", "AMD", "CVX", "CAT", "BA", "IBM",
    "T", "VZ", "CMCSA", "PNC", "USB", "C", "WFC", "GS", "MS", "AXP",
    "MCD", "SBUX", "NKE", "TGT", "COST", "TJX", "LOW", "MO", "PM", "CL",
    "KMB", "GIS", "SYK", "MDT", "ISRG", "ABT", "TMO", "DHR", "BMY", "AMGN",
    "GILD", "VRTX", "REGN", "COP", "SLB", "HAL", "EOG", "OXY", "LMT", "RTX",
    "GD", "NOC", "UPS", "FDX", "UNP", "CSX", "NSC", "HON", "GE", "MMM",
    "EMR", "ITW", "APD", "LIN", "ECL", "SHW", "PPG", "NEM", "NEE", "DUK",
    "SO", "AEP", "D", "EXC", "XEL", "SRE", "PEG", "WEC",
]

# Weight of the volatility-adjusted momentum Z-score in the composite Alpha
# Score; the remaining (1 - W_VAM) goes to the 1-month momentum Z-score.
# Validated: 80% Mom_1M, 20% Vol_Adj_Mom
W_VAM = 0.2

# Process-wide, in-memory cache shared by every function in this module.
_cache = {
    # ticker -> {"alpha_score": float, "rank": int, "total": int,
    #            "mom_1m": float, "z_mom_1m": float, "z_vam": float}
    "rankings": {},
    # {"Mom_1M": (mean, std), "Vol_Adj_Mom": (mean, std)}; used to Z-score
    # tickers that are not part of UNIVERSE.
    "stats": {},
    # time.time() value recorded at the last refresh; 0 makes the very first
    # lookup look expired.
    "timestamp": 0,
    # Maximum cache age in seconds (4 hours).
    "ttl": 4 * 60 * 60,
}
def _compute_rankings():
    """Download universe prices and build the cross-sectional Alpha Score ranking.

    Factors are taken from the most recent row of a 6-month price history:

    * ``Mom_1M``: price change over 21 rows.
    * ``Vol_Adj_Mom``: price change over 61 rows divided by the annualised
      standard deviation of the last 21 daily returns.

    Each factor is Z-scored across the universe and blended as
    ``W_VAM * Z(Vol_Adj_Mom) + (1 - W_VAM) * Z(Mom_1M)``.

    Returns:
        A ``(rankings, stats)`` tuple. ``rankings`` maps ticker to a dict with
        ``alpha_score``, ``rank`` (1 = highest score), ``total``, ``mom_1m``
        (in percent), ``z_mom_1m`` and ``z_vam``. ``stats`` maps each factor
        name to its universe ``(mean, std)``. Both are empty dicts when the
        download fails, returns no rows, or no ticker has usable factors.
    """
    logger.info("[CS Cache] Computing cross-sectional rankings for %d stocks...", len(UNIVERSE))
    try:
        raw = yf.download(UNIVERSE, period="6mo", progress=False)
        if isinstance(raw.columns, pd.MultiIndex):
            fields = raw.columns.get_level_values(0)
            df_close = raw["Adj Close"] if "Adj Close" in fields else raw["Close"]
        else:
            df_close = raw
        df_close = df_close.ffill().dropna(how="all")
    except Exception as e:
        logger.error("[CS Cache] Download failed: %s", e)
        return {}, {}

    # A frame can have columns yet zero rows; `.iloc[-1]` below would then
    # raise IndexError outside the try block above.
    if df_close.empty:
        logger.error("[CS Cache] Download returned no price rows.")
        return {}, {}

    # Compute factors
    daily_ret = df_close.pct_change()
    mom_1m = (df_close / df_close.shift(21)) - 1
    mom_3m = (df_close / df_close.shift(61)) - 1
    vol_1m = daily_ret.rolling(window=21).std() * np.sqrt(252)
    # Mask (near-)zero volatility so the ratio becomes NaN instead of +/-inf.
    # Same 1e-8 threshold that _score_out_of_universe applies.
    vol_adj_mom = mom_3m / vol_1m.where(vol_1m >= 1e-8)

    # Build cross-section from latest values
    latest_m1 = mom_1m.iloc[-1]
    latest_vam = vol_adj_mom.iloc[-1]
    factors = []
    for ticker in UNIVERSE:
        if ticker not in df_close.columns:
            continue
        m1 = latest_m1[ticker]
        vam = latest_vam[ticker]
        # isfinite rejects NaN and +/-inf; a single infinite value would turn
        # the universe mean into inf and the std into NaN for every ticker.
        if not (np.isfinite(m1) and np.isfinite(vam)):
            continue
        factors.append({"Ticker": ticker, "Mom_1M": m1, "Vol_Adj_Mom": vam})

    if not factors:
        return {}, {}

    cs = pd.DataFrame(factors)

    # Store universe distribution stats for out-of-universe Z-scoring
    stats = {}
    for f in ["Mom_1M", "Vol_Adj_Mom"]:
        m, s = cs[f].mean(), cs[f].std()
        stats[f] = (float(m), float(s))
        # A degenerate spread (or NaN std from a single row) yields Z = 0.
        cs[f"Z_{f}"] = (cs[f] - m) / s if s > 1e-8 else 0.0

    # Composite Alpha Score
    cs["Alpha_Score"] = (W_VAM * cs["Z_Vol_Adj_Mom"]) + ((1 - W_VAM) * cs["Z_Mom_1M"])
    cs = cs.sort_values("Alpha_Score", ascending=False).reset_index(drop=True)

    total = len(cs)
    rankings = {}
    for rank, row in enumerate(cs.to_dict("records"), start=1):
        rankings[row["Ticker"]] = {
            "alpha_score": round(float(row["Alpha_Score"]), 4),
            "rank": rank,
            "total": total,
            "mom_1m": round(float(row["Mom_1M"]) * 100, 2),
            "z_mom_1m": round(float(row["Z_Mom_1M"]), 4),
            "z_vam": round(float(row["Z_Vol_Adj_Mom"]), 4),
        }

    logger.info("[CS Cache] Rankings computed: %d stocks ranked.", total)
    return rankings, stats
def _score_out_of_universe(ticker: str) -> dict:
    """Score a ticker that is NOT in the universe against the cached distribution.

    Downloads six months of prices for ``ticker``, computes the same two
    factors as the universe ranking, and Z-scores them with the
    ``(mean, std)`` pairs stored in ``_cache["stats"]``. A successful result
    is stored in ``_cache["rankings"]`` so the ticker is not downloaded again
    until the cache is replaced.

    Args:
        ticker: Symbol to score.

    Returns:
        A dict with ``alpha_score``, ``rank`` (always 0: the ticker is not
        formally ranked), ``total``, ``mom_1m`` (in percent), ``z_mom_1m`` and
        ``z_vam``. An all-zero placeholder is returned when no stats are
        cached, the history is too short, or anything fails; when only the
        volatility or the final score is unusable, the placeholder still
        carries the computed ``mom_1m``.
    """
    unscored = {"alpha_score": 0.0, "rank": 0, "total": 0, "mom_1m": 0.0, "z_mom_1m": 0.0, "z_vam": 0.0}

    stats = _cache.get("stats", {})
    if not stats:
        return unscored

    try:
        raw = yf.download(ticker, period="6mo", progress=False)
        if raw is None or raw.empty or len(raw) < 62:
            return unscored

        close = raw["Adj Close"] if "Adj Close" in raw.columns else raw["Close"]
        # squeeze() collapses a single-column frame to a Series; ffill/dropna
        # mirror the gap handling applied to the universe prices.
        close = close.squeeze().ffill().dropna()
        if len(close) < 62:
            return unscored

        # The universe factors use shift(21) / shift(61), i.e. the price 21 / 61
        # rows before the last one: iloc[-22] / iloc[-62]. Using iloc[-21] /
        # iloc[-61] would Z-score a 20/60-row return against a 21/61-row
        # distribution.
        mom_1m_val = float((close.iloc[-1] / close.iloc[-22]) - 1)
        mom_3m_val = float((close.iloc[-1] / close.iloc[-62]) - 1)
        daily_ret = close.pct_change()
        vol_1m_val = float(daily_ret.iloc[-21:].std() * np.sqrt(252))

        if not (np.isfinite(mom_1m_val) and np.isfinite(mom_3m_val)):
            return unscored

        mom_1m_pct = round(mom_1m_val * 100, 2)
        if not np.isfinite(vol_1m_val) or vol_1m_val < 1e-8:
            return {**unscored, "mom_1m": mom_1m_pct}

        vam_val = mom_3m_val / vol_1m_val

        # Z-score against universe distribution
        m1_mean, m1_std = stats.get("Mom_1M", (0, 1))
        vam_mean, vam_std = stats.get("Vol_Adj_Mom", (0, 1))
        z_m1 = (mom_1m_val - m1_mean) / m1_std if m1_std > 1e-8 else 0.0
        z_vam = (vam_val - vam_mean) / vam_std if vam_std > 1e-8 else 0.0
        alpha = (W_VAM * z_vam) + ((1 - W_VAM) * z_m1)

        # Never cache a NaN/inf score (possible if the cached stats themselves
        # are not finite).
        if not np.isfinite(alpha):
            return {**unscored, "mom_1m": mom_1m_pct}

        # Every cached entry carries the same universe size; read it off the
        # first one, or 0 when nothing is cached.
        total = next(iter(_cache["rankings"].values()), {}).get("total", 0)

        result = {
            "alpha_score": round(alpha, 4),
            "rank": 0,  # Not formally ranked (out of universe)
            "total": total,
            "mom_1m": mom_1m_pct,
            "z_mom_1m": round(z_m1, 4),
            "z_vam": round(z_vam, 4),
        }
        # Cache it so we don't re-download
        _cache["rankings"][ticker] = result
        logger.info("[CS Cache] Scored out-of-universe ticker %s: alpha=%.4f", ticker, alpha)
        return result
    except Exception as e:
        logger.error("[CS Cache] Failed to score %s: %s", ticker, e)
        return unscored
def _load_from_supabase():
    """Try to hydrate rankings from the nightly worker's Supabase cache.

    Reads the cache entry keyed ``"screener"`` through
    ``NewsDatabase.get_cache`` and turns its ``"stocks"`` list into the same
    per-ticker dict shape the in-memory cache uses. Only ``alpha_score`` is
    taken from each entry; ``mom_1m``, ``z_mom_1m`` and ``z_vam`` are filled
    with 0.0.

    Returns:
        A ticker -> ranking dict, or ``None`` when the entry is missing, has
        no ``"stocks"`` key, or any step raises (the error is logged as a
        warning).
    """
    try:
        # Imported lazily, inside the try, so an unavailable backend degrades
        # to the warning below.
        from backend.database import NewsDatabase

        payload = NewsDatabase().get_cache("screener")
        if not payload or "stocks" not in payload:
            return None

        entries = payload["stocks"]
        count = len(entries)
        hydrated = {}
        # Rank is the 1-based list position.
        # NOTE(review): assumes the stored list is already ordered best-first - confirm.
        for position, entry in enumerate(entries, start=1):
            record = {
                "alpha_score": entry.get("alpha_score", 0.0),
                "rank": position,
                "total": count,
                "mom_1m": 0.0,
                "z_mom_1m": 0.0,
                "z_vam": 0.0,
            }
            hydrated[entry["ticker"]] = record

        logger.info("[CS Cache] Pre-warmed %d rankings from Supabase (instant).", count)
        return hydrated
    except Exception as exc:
        logger.warning("[CS Cache] Supabase pre-warm failed: %s. Falling back to yfinance.", exc)
    return None
def get_ticker_ranking(ticker: str) -> dict:
    """
    Get a ticker's Cross-Sectional Alpha Score and Rank.

    Priority: in-memory cache → Supabase pre-warm → live yfinance download.
    For out-of-universe tickers, computes on the fly using cached distribution stats.

    Refresh policy:
      * The cache is refreshed when it is older than ``_cache["ttl"]`` seconds
        or holds no rankings.
      * If the live download yields nothing, previously cached rankings and
        stats are kept rather than wiped; the timestamp is not advanced, so
        the next call tries again.
      * A Supabase pre-warm carries no distribution stats. When an
        out-of-universe ticker is requested and no stats computation has been
        attempted within the TTL, the stats are computed once from a live
        download. ``_cache["stats_timestamp"]`` records that attempt.

    Args:
        ticker: Symbol to look up; surrounding whitespace and case are ignored.

    Returns:
        A dict with ``alpha_score``, ``rank``, ``total``, ``mom_1m``,
        ``z_mom_1m`` and ``z_vam``. All values are zero when the ticker is
        empty or cannot be scored.
    """
    ticker = ticker.strip().upper()
    if not ticker:
        # Nothing to look up; avoid triggering any download for a blank symbol.
        return {"alpha_score": 0.0, "rank": 0, "total": 0, "mom_1m": 0.0, "z_mom_1m": 0.0, "z_vam": 0.0}

    now = time.time()
    ttl = _cache["ttl"]

    # Check cache freshness
    if now - _cache["timestamp"] > ttl or not _cache["rankings"]:
        # Try Supabase first (instant, from nightly worker)
        supabase_rankings = _load_from_supabase()
        if supabase_rankings:
            _cache["rankings"] = supabase_rankings
            _cache["timestamp"] = now
        else:
            # Fall back to live yfinance download (~20s)
            rankings, stats = _compute_rankings()
            _cache["stats_timestamp"] = now
            if rankings:
                _cache["rankings"] = rankings
                _cache["stats"] = stats
                _cache["timestamp"] = now
            else:
                # Keep whatever is cached: stale rankings beat an empty cache.
                logger.warning(
                    "[CS Cache] Refresh produced no rankings; keeping %d cached entries.",
                    len(_cache["rankings"]),
                )

    # If ticker is in cache, return it
    if ticker in _cache["rankings"]:
        return _cache["rankings"][ticker]

    # Out-of-universe scoring needs distribution stats. Compute them at most
    # once per TTL window so a failing data source is not hit on every lookup.
    if now - _cache.get("stats_timestamp", 0) > ttl:
        _cache["stats_timestamp"] = now
        _, stats = _compute_rankings()
        if stats:
            _cache["stats"] = stats

    # Out-of-universe: compute on the fly
    return _score_out_of_universe(ticker)