Spaces:
Running
Running
File size: 3,846 Bytes
9d29748 624c105 9d29748 624c105 9d29748 624c105 9d29748 624c105 9d29748 624c105 9d29748 624c105 9d29748 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | """
Correlation Analysis Engine.
Computes pairwise correlation matrices between portfolio holdings
using historical return data from Yahoo Finance.
Detects hidden concentration risk and flags highly correlated pairs.
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
async def compute_correlation_matrix(
    tickers: List[str],
    period: str = "6mo",
    *,
    max_tickers: int = 15,
    min_observations: int = 10,
    price_data: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Compute the pairwise correlation matrix of daily returns for ``tickers``.

    Args:
        tickers: Symbols to analyse; only the first ``max_tickers`` are used.
        period: History window forwarded to the Yahoo adapter (default "6mo").
        max_tickers: Maximum number of tickers fetched and analysed.
        min_observations: Minimum number of aligned daily returns required
            before correlations are reported.
        price_data: Optional pre-fetched mapping of ticker -> DataFrame with a
            "Close" column. When given, the Yahoo adapter is not called and
            only entries for the requested tickers are used.

    Returns:
        Dict with:
          - "matrix": row-major correlations rounded to 3 decimals, in the
            order of "tickers"; ``None`` where the correlation is undefined
            (e.g. a flat price series with zero return variance).
          - "tickers": tickers actually present in the matrix.
          - "pairs": every distinct pair with a defined correlation, sorted by
            absolute correlation, strongest first.
          - "risk_flags": pairs whose correlation exceeds +0.75.
          - "period" and "data_points": present only on a successful run.
        On failure the collections are empty and an "error" message is added.
    """
    if len(tickers) < 2:
        return {"matrix": [], "tickers": list(tickers), "pairs": [], "risk_flags": []}
    try:
        from itertools import combinations

        import pandas as pd

        requested = tickers[:max_tickers]
        if len(tickers) > max_tickers:
            logger.warning(
                "Correlation limited to the first %d of %d tickers",
                max_tickers,
                len(tickers),
            )

        if price_data is None:
            from app.services.data_ingestion.yahoo import yahoo_adapter

            # Use the adapter (which uses curl_cffi on cloud)
            data_dict = await yahoo_adapter.fetch_multiple_tickers(
                requested, period=period, interval="1d"
            )
        else:
            data_dict = {t: price_data[t] for t in requested if t in price_data}
        if not data_dict:
            return {"matrix": [], "tickers": list(tickers), "pairs": [], "risk_flags": []}

        # One close-price series per ticker. NaNs are dropped per series first so
        # that a ticker with no usable prices is excluded instead of wiping out
        # every row in the row-wise dropna() below.
        close_frames = {}
        for ticker, frame in data_dict.items():
            if "Close" not in frame.columns:
                continue
            series = frame["Close"].dropna()
            if not series.empty:
                close_frames[ticker] = series
        if len(close_frames) < 2:
            return {"matrix": [], "tickers": list(close_frames), "pairs": [], "risk_flags": []}

        # Keep only dates on which every remaining ticker has a close.
        close = pd.DataFrame(close_frames).dropna()
        available = list(close.columns)

        # Daily simple returns; the first row has no predecessor and is dropped.
        returns = close.pct_change().dropna()
        if len(returns) < min_observations:
            return {"matrix": [], "tickers": available, "pairs": [], "risk_flags": []}

        # corr() keeps the column order of `returns`, i.e. the order of `available`.
        values = returns.corr().to_numpy()

        # Zero-variance series yield NaN correlations; NaN is not valid JSON,
        # so those cells are reported as None.
        matrix = [
            [None if pd.isna(v) else round(float(v), 3) for v in row]
            for row in values
        ]

        # Every distinct pair (upper triangle, self-pairs excluded).
        pairs = []
        risk_flags = []
        for (i, t1), (j, t2) in combinations(enumerate(available), 2):
            c = float(values[i, j])
            if pd.isna(c):
                continue  # undefined correlation: nothing meaningful to report
            magnitude = abs(c)
            pairs.append({
                "ticker1": t1,
                "ticker2": t2,
                "correlation": round(c, 3),
                "strength": "strong" if magnitude > 0.8 else "moderate" if magnitude > 0.6 else "weak",
                "direction": "positive" if c > 0 else "negative",
            })
            # Only strong *positive* co-movement concentrates risk; strongly
            # negatively correlated holdings offset each other.
            if c > 0.75:
                risk_flags.append({
                    "type": "high_correlation",
                    "severity": "high" if c > 0.85 else "medium",
                    "message": f"{t1} and {t2} are {c:.0%} correlated — hidden concentration risk",
                    "tickers": [t1, t2],
                    "correlation": round(c, 3),
                })

        # Strongest relationships (either sign) first.
        pairs.sort(key=lambda p: abs(p["correlation"]), reverse=True)
        return {
            "matrix": matrix,
            "tickers": available,
            "pairs": pairs,
            "risk_flags": risk_flags,
            "period": period,
            "data_points": len(returns),
        }
    except Exception as e:
        # Best-effort: log with traceback and return an empty result carrying
        # the error message instead of raising to the caller.
        logger.exception("Correlation computation failed: %s", e)
        return {"matrix": [], "tickers": list(tickers), "pairs": [], "risk_flags": [], "error": str(e)}
|