Spaces:
Sleeping
Sleeping
| """ | |
| Correlation Analysis Engine. | |
| Computes pairwise correlation matrices between portfolio holdings | |
| using historical return data from Yahoo Finance. | |
| Detects hidden concentration risk and flags highly correlated pairs. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from typing import Any, Dict, List, Optional | |
# Module-scoped logger, named after this module; the correlation routine
# reports its failures through it.
logger = logging.getLogger(name=__name__)
async def compute_correlation_matrix(
    tickers: List[str],
    period: str = "6mo",
    *,
    max_tickers: int = 15,
    fetch: Any = None,
) -> Dict[str, Any]:
    """
    Compute the pairwise correlation matrix of daily returns for ``tickers``.

    Close prices are fetched for at most ``max_tickers`` symbols, aligned on
    dates where every symbol has a price, converted to daily percentage
    returns, and correlated with ``DataFrame.corr`` (Pearson by default).

    Args:
        tickers: Symbols to analyse. Only the first ``max_tickers`` are fetched.
        period: History window passed through to the fetcher (e.g. "6mo").
        max_tickers: Upper bound on the number of symbols fetched
            (default 15, the previously hard-coded limit).
        fetch: Optional async callable invoked as
            ``await fetch(symbols, period=period, interval="1d")`` that returns
            a mapping of symbol -> DataFrame with a "Close" column. Defaults to
            ``yahoo_adapter.fetch_multiple_tickers``.

    Returns:
        Dict with keys:
          - "matrix": row-major list of lists of correlations rounded to 3
            decimals, ordered like "tickers". Undefined correlations (e.g. a
            ticker whose price never moves has zero variance) are ``None`` so
            the payload stays valid JSON.
          - "tickers": symbols present in the matrix.
          - "pairs": every pair with a defined correlation, labelled with
            strength/direction and sorted by |correlation| descending.
          - "risk_flags": pairs with |correlation| > 0.75 (hidden
            concentration risk); severity "high" above 0.85.
          - "period" and "data_points" (number of return observations), on
            success only.
        Fewer than two usable tickers, or fewer than 10 return observations,
        yields empty "matrix"/"pairs"/"risk_flags". Any exception is logged
        with its traceback and reported via an extra "error" key rather than
        raised.
    """
    if len(tickers) < 2:
        return {"matrix": [], "tickers": tickers, "pairs": [], "risk_flags": []}

    try:
        import numpy as np
        import pandas as pd

        if fetch is None:
            from app.services.data_ingestion.yahoo import yahoo_adapter

            # Use the adapter (which uses curl_cffi on cloud)
            fetch = yahoo_adapter.fetch_multiple_tickers

        data_dict = await fetch(tickers[:max_tickers], period=period, interval="1d")
        if not data_dict:
            return {"matrix": [], "tickers": tickers, "pairs": [], "risk_flags": []}

        # One column of close prices per ticker; skip missing or empty frames.
        close_frames = {
            t: df["Close"]
            for t, df in data_dict.items()
            if df is not None and "Close" in df.columns and len(df) > 0
        }
        if len(close_frames) < 2:
            return {"matrix": [], "tickers": list(close_frames), "pairs": [], "risk_flags": []}

        # Keep only dates on which every ticker has a price.
        close = pd.DataFrame(close_frames).dropna()
        available = list(close.columns)

        returns = close.pct_change().dropna()
        if len(returns) < 10:
            return {"matrix": [], "tickers": available, "pairs": [], "risk_flags": []}

        values = returns.corr().to_numpy(dtype=float)
        # A zero-variance series produces NaN correlations. NaN is not valid
        # JSON and compares false against everything (mis-labelling direction
        # and breaking the sort), so track which entries are defined.
        defined = np.isfinite(values)

        matrix = [
            [round(float(v), 3) if ok else None for v, ok in zip(row, row_ok)]
            for row, row_ok in zip(values, defined)
        ]

        pairs = []
        risk_flags = []
        n = len(available)
        # Upper triangle only: each unordered pair once, excluding self-pairs.
        for i in range(n):
            for j in range(i + 1, n):
                if not defined[i, j]:
                    continue
                c = float(values[i, j])
                magnitude = abs(c)
                t1, t2 = available[i], available[j]
                pairs.append({
                    "ticker1": t1,
                    "ticker2": t2,
                    "correlation": round(c, 3),
                    "strength": "strong" if magnitude > 0.8 else "moderate" if magnitude > 0.6 else "weak",
                    "direction": "positive" if c > 0 else "negative",
                })
                # Strongly co-moving holdings behave like one concentrated position.
                if magnitude > 0.75:
                    risk_flags.append({
                        "type": "high_correlation",
                        "severity": "high" if magnitude > 0.85 else "medium",
                        "message": f"{t1} and {t2} are {magnitude:.0%} correlated — hidden concentration risk",
                        "tickers": [t1, t2],
                        "correlation": round(c, 3),
                    })

        pairs.sort(key=lambda p: abs(p["correlation"]), reverse=True)

        return {
            "matrix": matrix,
            "tickers": available,
            "pairs": pairs,
            "risk_flags": risk_flags,
            "period": period,
            "data_points": len(returns),
        }
    except Exception as e:
        # Best-effort boundary: callers get an error payload, logs get the traceback.
        logger.exception("Correlation computation failed: %s", e)
        return {"matrix": [], "tickers": tickers, "pairs": [], "risk_flags": [], "error": str(e)}