""" Correlation Analysis Engine. Computes pairwise correlation matrices between portfolio holdings using historical return data from Yahoo Finance. Detects hidden concentration risk and flags highly correlated pairs. """ from __future__ import annotations import logging from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) async def compute_correlation_matrix( tickers: List[str], period: str = "6mo", ) -> Dict[str, Any]: """ Compute pairwise correlation matrix for a list of tickers. Returns: Dict with matrix data, highly correlated pairs, and sector clustering. """ if len(tickers) < 2: return {"matrix": [], "tickers": tickers, "pairs": [], "risk_flags": []} try: import numpy as np from app.services.data_ingestion.yahoo import yahoo_adapter # Use the adapter (which uses curl_cffi on cloud) data_dict = await yahoo_adapter.fetch_multiple_tickers(tickers[:15], period=period, interval="1d") if not data_dict: return {"matrix": [], "tickers": tickers, "pairs": [], "risk_flags": []} import pandas as pd # Build a DataFrame of close prices close_frames = {} for t, df in data_dict.items(): if "Close" in df.columns and len(df) > 0: close_frames[t] = df["Close"] if len(close_frames) < 2: return {"matrix": [], "tickers": list(close_frames.keys()), "pairs": [], "risk_flags": []} close = pd.DataFrame(close_frames).dropna() available = list(close.columns) # Compute daily returns returns = close.pct_change().dropna() if len(returns) < 10: return {"matrix": [], "tickers": available, "pairs": [], "risk_flags": []} # Correlation matrix corr = returns.corr() # Convert to serializable format matrix = [] for i, t1 in enumerate(available): row = [] for j, t2 in enumerate(available): val = float(corr.iloc[i, j]) row.append(round(val, 3)) matrix.append(row) # Find highly correlated pairs (|corr| > 0.7, excluding self) pairs = [] risk_flags = [] for i in range(len(available)): for j in range(i + 1, len(available)): c = float(corr.iloc[i, j]) pair_data = { "ticker1": available[i], "ticker2": available[j], "correlation": round(c, 3), "strength": "strong" if abs(c) > 0.8 else "moderate" if abs(c) > 0.6 else "weak", "direction": "positive" if c > 0 else "negative", } pairs.append(pair_data) # Flag hidden concentration if abs(c) > 0.75: risk_flags.append({ "type": "high_correlation", "severity": "high" if abs(c) > 0.85 else "medium", "message": f"{available[i]} and {available[j]} are {abs(c):.0%} correlated — hidden concentration risk", "tickers": [available[i], available[j]], "correlation": round(c, 3), }) # Sort pairs by |correlation| descending pairs.sort(key=lambda p: abs(p["correlation"]), reverse=True) return { "matrix": matrix, "tickers": available, "pairs": pairs, "risk_flags": risk_flags, "period": period, "data_points": len(returns), } except Exception as e: logger.error("Correlation computation failed: %s", e) return {"matrix": [], "tickers": tickers, "pairs": [], "risk_flags": [], "error": str(e)}