File size: 3,846 Bytes
9d29748
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
624c105
9d29748
624c105
 
9d29748
624c105
9d29748
 
624c105
 
 
 
 
 
9d29748
624c105
 
9d29748
624c105
 
9d29748
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
Correlation Analysis Engine.

Computes pairwise correlation matrices between portfolio holdings
using historical return data from Yahoo Finance.
Detects hidden concentration risk and flags highly correlated pairs.
"""

from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


# Upper bound on the number of symbols requested from the data adapter per call.
_MAX_TICKERS = 15
# Fewer daily-return observations than this is treated as insufficient data.
_MIN_RETURN_ROWS = 10


def _empty_correlation_result(
    tickers: List[str],
    error: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the payload returned when no correlation matrix can be produced."""
    result: Dict[str, Any] = {
        "matrix": [],
        "tickers": tickers,
        "pairs": [],
        "risk_flags": [],
    }
    if error is not None:
        result["error"] = error
    return result


async def compute_correlation_matrix(
    tickers: List[str],
    period: str = "6mo",
) -> Dict[str, Any]:
    """
    Compute the pairwise correlation matrix of daily returns for ``tickers``.

    Close prices are fetched through ``yahoo_adapter`` at a daily interval,
    aligned on dates where every ticker has a price, converted to percentage
    returns and correlated with ``DataFrame.corr``.

    Args:
        tickers: Symbols to analyse. Only the first ``_MAX_TICKERS`` entries
            are requested from the adapter; a warning is logged when the
            input is longer than that.
        period: History window forwarded unchanged to the adapter.

    Returns:
        A dict that always contains:

        * ``matrix``: square list of rows, values rounded to 3 decimals, in
          the order of ``tickers``. A cell is ``None`` when the correlation
          is undefined (NaN/inf, e.g. a series with zero variance) so the
          payload stays valid JSON.
        * ``tickers``: the symbols actually present in the matrix.
        * ``pairs``: one entry per unordered pair with a defined correlation,
          sorted by absolute correlation, strongest first.
        * ``risk_flags``: entries for pairs whose absolute correlation
          exceeds 0.75.

        On success ``period`` and ``data_points`` (number of return rows) are
        included as well. When fewer than two tickers are usable, or fewer
        than ``_MIN_RETURN_ROWS`` return observations remain, the lists are
        empty. If anything raises, the same empty payload is returned with an
        extra ``error`` key holding the exception message; the exception is
        logged with its traceback and never propagated.
    """
    if len(tickers) < 2:
        return _empty_correlation_result(tickers)

    try:
        # Imports stay inside the ``try`` so a missing dependency is reported
        # through the ``error`` key instead of raising to the caller.
        import math
        from itertools import combinations

        import pandas as pd
        from app.services.data_ingestion.yahoo import yahoo_adapter

        if len(tickers) > _MAX_TICKERS:
            logger.warning(
                "Correlation analysis limited to the first %d of %d tickers",
                _MAX_TICKERS,
                len(tickers),
            )

        # Use the adapter (which uses curl_cffi on cloud)
        data_dict = await yahoo_adapter.fetch_multiple_tickers(
            tickers[:_MAX_TICKERS], period=period, interval="1d"
        )
        if not data_dict:
            return _empty_correlation_result(tickers)

        # Keep only tickers that came back with a non-empty "Close" column.
        # NOTE(review): the ``None`` guard assumes the adapter may return a
        # missing frame for a failed symbol -- confirm against the adapter.
        close_frames = {
            ticker: frame["Close"]
            for ticker, frame in data_dict.items()
            if frame is not None and "Close" in frame.columns and len(frame) > 0
        }
        if len(close_frames) < 2:
            return _empty_correlation_result(list(close_frames))

        # Align on dates where every ticker has a price.
        close = pd.DataFrame(close_frames).dropna()
        available = list(close.columns)

        # Daily percentage returns; the first row is NaN and is dropped.
        returns = close.pct_change().dropna()
        if len(returns) < _MIN_RETURN_ROWS:
            return _empty_correlation_result(available)

        # Plain nested lists of Python floats, indexed like ``available``.
        corr_rows = returns.corr().values.tolist()

        # NaN/inf are not valid JSON, so undefined cells are reported as None.
        matrix = [
            [round(value, 3) if math.isfinite(value) else None for value in row]
            for row in corr_rows
        ]

        # Describe every unordered pair and flag the strongly correlated ones.
        pairs = []
        risk_flags = []
        for (i, first), (j, second) in combinations(enumerate(available), 2):
            c = corr_rows[i][j]
            if not math.isfinite(c):
                # An undefined correlation cannot be classified or ranked.
                continue

            magnitude = abs(c)
            pairs.append({
                "ticker1": first,
                "ticker2": second,
                "correlation": round(c, 3),
                "strength": "strong" if magnitude > 0.8 else "moderate" if magnitude > 0.6 else "weak",
                "direction": "positive" if c > 0 else "negative",
            })

            # Flag hidden concentration
            if magnitude > 0.75:
                risk_flags.append({
                    "type": "high_correlation",
                    "severity": "high" if magnitude > 0.85 else "medium",
                    "message": f"{first} and {second} are {magnitude:.0%} correlated — hidden concentration risk",
                    "tickers": [first, second],
                    "correlation": round(c, 3),
                })

        # Strongest relationships first, regardless of sign.
        pairs.sort(key=lambda p: abs(p["correlation"]), reverse=True)

        return {
            "matrix": matrix,
            "tickers": available,
            "pairs": pairs,
            "risk_flags": risk_flags,
            "period": period,
            "data_points": len(returns),
        }

    except Exception as e:
        # Best-effort API: report the failure in the payload, keep the traceback in logs.
        logger.exception("Correlation computation failed: %s", e)
        return _empty_correlation_result(tickers, error=str(e))