jashdoshi77's picture
fix: eliminate all remaining yfinance/XGBoost/calendar errors on cloud
624c105
"""
Correlation Analysis Engine.
Computes pairwise correlation matrices between portfolio holdings
using historical return data from Yahoo Finance.
Detects hidden concentration risk and flags highly correlated pairs.
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
# Module-level logger; compute_correlation_matrix reports failures here.
logger = logging.getLogger(__name__)
# |correlation| thresholds for labelling a pair's strength.
_STRONG_CORRELATION = 0.8
_MODERATE_CORRELATION = 0.6
# Positive-correlation thresholds for raising a concentration-risk flag.
_FLAG_CORRELATION = 0.75
_HIGH_SEVERITY_CORRELATION = 0.85


def _empty_result(tickers: List[str], **extra: Any) -> Dict[str, Any]:
    """Return the empty payload used by every early-exit path (plus optional extra keys)."""
    return {"matrix": [], "tickers": tickers, "pairs": [], "risk_flags": [], **extra}


def _pair_entry(ticker1: str, ticker2: str, c: float) -> Dict[str, Any]:
    """Describe one ticker pair: rounded correlation, strength bucket and sign."""
    magnitude = abs(c)
    if magnitude > _STRONG_CORRELATION:
        strength = "strong"
    elif magnitude > _MODERATE_CORRELATION:
        strength = "moderate"
    else:
        strength = "weak"
    return {
        "ticker1": ticker1,
        "ticker2": ticker2,
        "correlation": round(c, 3),
        "strength": strength,
        "direction": "positive" if c > 0 else "negative",
    }


def _concentration_flag(ticker1: str, ticker2: str, c: float) -> Optional[Dict[str, Any]]:
    """Return a risk flag when two holdings move together strongly, else ``None``.

    Only *positive* correlation concentrates risk; strongly negatively
    correlated holdings offset each other, so they are not flagged.
    """
    # Written as ``not (c > x)`` so a NaN correlation is never flagged.
    if not (c > _FLAG_CORRELATION):
        return None
    return {
        "type": "high_correlation",
        "severity": "high" if c > _HIGH_SEVERITY_CORRELATION else "medium",
        "message": f"{ticker1} and {ticker2} are {c:.0%} correlated — hidden concentration risk",
        "tickers": [ticker1, ticker2],
        "correlation": round(c, 3),
    }


async def compute_correlation_matrix(
    tickers: List[str],
    period: str = "6mo",
    *,
    max_tickers: int = 15,
    min_observations: int = 10,
) -> Dict[str, Any]:
    """Compute the pairwise daily-return correlation matrix for a list of tickers.

    Daily close prices are fetched through ``yahoo_adapter`` for the first
    ``max_tickers`` symbols and aligned on the dates all of them share. The
    prices are then converted to daily percentage returns and correlated with
    pandas ``DataFrame.corr`` (Pearson by default).

    Args:
        tickers: Ticker symbols to analyse. Fewer than two short-circuits to an
            empty result without fetching anything.
        period: History window passed straight to the adapter (e.g. ``"6mo"``).
        max_tickers: Only the first ``max_tickers`` symbols are fetched.
        min_observations: Minimum number of daily returns required after
            alignment. With fewer, an empty result is returned.

    Returns:
        A dict with these keys:

        - ``matrix``: row-major correlations rounded to 3 decimals, with
          ``None`` where the correlation is undefined (e.g. a flat price
          series).
        - ``tickers``: the row/column labels.
        - ``pairs``: every ticker pair with a defined correlation, sorted by
          |correlation| descending.
        - ``risk_flags``: pairs whose correlation exceeds +0.75.
        - ``period``: the requested history window.
        - ``data_points``: the number of daily returns used.

        Early exits return only the first four keys, all empty except
        ``tickers``. If any exception is raised, an ``error`` key carries its
        message instead of the exception propagating.
    """
    if len(tickers) < 2:
        return _empty_result(tickers)
    try:
        import itertools
        import math

        import pandas as pd

        from app.services.data_ingestion.yahoo import yahoo_adapter

        # Use the adapter (which uses curl_cffi on cloud)
        data_dict = await yahoo_adapter.fetch_multiple_tickers(
            tickers[:max_tickers], period=period, interval="1d"
        )
        if not data_dict:
            return _empty_result(tickers)

        # Keep only tickers that actually returned close prices.
        close_frames = {
            t: df["Close"]
            for t, df in data_dict.items()
            if df is not None and "Close" in df.columns and len(df) > 0
        }
        if len(close_frames) < 2:
            return _empty_result(list(close_frames))

        # Align on dates shared by every ticker so returns are comparable.
        close = pd.DataFrame(close_frames).dropna()
        available = list(close.columns)

        # No gaps remain after dropna(), so no forward-filling is wanted.
        returns = close.pct_change(fill_method=None).dropna()
        if len(returns) < min_observations:
            return _empty_result(available)

        # Rows/columns of corr follow `available` (the column order of `returns`).
        values = returns.corr().to_numpy(dtype=float).tolist()

        # NaN/inf (e.g. zero-variance series) become None: NaN is not valid JSON.
        matrix = [
            [round(v, 3) if math.isfinite(v) else None for v in row]
            for row in values
        ]

        # Summarise every unordered pair once, in upper-triangle order.
        pairs = []
        risk_flags = []
        for (i, t1), (j, t2) in itertools.combinations(enumerate(available), 2):
            c = values[i][j]
            if not math.isfinite(c):
                continue
            pairs.append(_pair_entry(t1, t2, c))
            flag = _concentration_flag(t1, t2, c)
            if flag is not None:
                risk_flags.append(flag)

        # Sort pairs by |correlation| descending (stable for ties)
        pairs.sort(key=lambda p: abs(p["correlation"]), reverse=True)

        return {
            "matrix": matrix,
            "tickers": available,
            "pairs": pairs,
            "risk_flags": risk_flags,
            "period": period,
            "data_points": len(returns),
        }
    except Exception as e:
        # Top-level boundary: log with traceback, return an empty payload plus the error.
        logger.exception("Correlation computation failed: %s", e)
        return _empty_result(tickers, error=str(e))