File size: 5,863 Bytes
9d29748
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""
Strategy Analytics Engine.

Cross-strategy analysis including:
- Performance comparison tables
- Return correlation matrices
- Diversification scoring
- Strategy clustering (hierarchical)
"""

from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from scipy.cluster.hierarchy import fcluster, linkage
from scipy.spatial.distance import squareform

logger = logging.getLogger(__name__)


class AnalyticsEngine:
    """Cross-strategy analytics and comparison."""

    def compare_strategies(
        self, backtest_results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Compare multiple backtest results side by side.

        Args:
            backtest_results: List of backtest result dicts (from BacktestEngine).

        Returns:
            Comparison table, correlation matrix, diversification score, clusters.
        """
        if not backtest_results:
            return {"comparisons": [], "correlation_matrix": None}

        # 1. Build comparison table
        comparisons = []
        equity_curves: Dict[str, List[float]] = {}

        for i, result in enumerate(backtest_results):
            metrics = result.get("metrics", {})
            strategy_name = result.get("strategy_name", f"Strategy {i + 1}")
            strategy_id = result.get("strategy_id", i)

            comparisons.append({
                "strategy_id": strategy_id,
                "strategy_name": strategy_name,
                "total_return": metrics.get("total_return"),
                "annualized_return": metrics.get("annualized_return"),
                "sharpe_ratio": metrics.get("sharpe_ratio"),
                "sortino_ratio": metrics.get("sortino_ratio"),
                "max_drawdown": metrics.get("max_drawdown"),
                "volatility": metrics.get("volatility"),
                "win_rate": metrics.get("win_rate"),
                "calmar_ratio": metrics.get("calmar_ratio"),
                "total_trades": metrics.get("total_trades"),
                "alpha": metrics.get("alpha"),
                "beta": metrics.get("beta"),
            })

            # Extract daily returns for correlation
            curve = result.get("equity_curve", [])
            if curve:
                daily_rets = [p.get("daily_return", 0) for p in curve]
                equity_curves[strategy_name] = daily_rets

        # 2. Correlation matrix
        corr_matrix = None
        if len(equity_curves) >= 2:
            corr_matrix = self._compute_correlation(equity_curves)

        # 3. Diversification score
        div_score = None
        if corr_matrix:
            div_score = self._diversification_score(corr_matrix)

        # 4. Clustering
        clusters = None
        if corr_matrix and len(equity_curves) >= 3:
            clusters = self._cluster_strategies(corr_matrix)

        return {
            "comparisons": comparisons,
            "correlation_matrix": corr_matrix,
            "diversification_score": div_score,
            "clusters": clusters,
        }

    @staticmethod
    def _compute_correlation(
        return_series: Dict[str, List[float]],
    ) -> Dict[str, Any]:
        """Compute return correlation matrix between strategies."""
        # Align lengths
        min_len = min(len(v) for v in return_series.values())
        aligned = {k: v[:min_len] for k, v in return_series.items()}

        df = pd.DataFrame(aligned)
        corr = df.corr()

        return {
            "strategy_names": list(corr.columns),
            "matrix": [[round(v, 4) for v in row] for row in corr.values.tolist()],
        }

    @staticmethod
    def _diversification_score(corr_data: Dict[str, Any]) -> float:
        """
        Compute diversification score (0-1).
        Lower average correlation = higher diversification.
        """
        matrix = np.array(corr_data["matrix"])
        n = len(matrix)
        if n < 2:
            return 1.0

        # Average off-diagonal correlation
        mask = ~np.eye(n, dtype=bool)
        avg_corr = np.mean(np.abs(matrix[mask]))
        return round(float(1.0 - avg_corr), 4)

    @staticmethod
    def _cluster_strategies(corr_data: Dict[str, Any]) -> Dict[str, List[str]]:
        """Hierarchical clustering of strategies by return correlation."""
        names = corr_data["strategy_names"]
        matrix = np.array(corr_data["matrix"])
        n = len(matrix)

        if n < 3:
            return {"cluster_1": names}

        # Convert correlation to distance
        distance = 1.0 - np.abs(matrix)
        np.fill_diagonal(distance, 0)

        # Ensure symmetry
        distance = (distance + distance.T) / 2

        try:
            condensed = squareform(distance, checks=False)
            Z = linkage(condensed, method="ward")
            max_clusters = min(n, 3)
            labels = fcluster(Z, t=max_clusters, criterion="maxclust")

            clusters: Dict[str, List[str]] = {}
            for name, label in zip(names, labels):
                key = f"cluster_{label}"
                clusters.setdefault(key, []).append(name)

            return clusters
        except Exception as e:
            logger.warning("Clustering failed: %s", e)
            return {"cluster_1": names}

    @staticmethod
    def rank_strategies(
        comparisons: List[Dict[str, Any]],
        sort_by: str = "sharpe_ratio",
        ascending: bool = False,
    ) -> List[Dict[str, Any]]:
        """Rank strategies by a specific metric."""
        valid = [c for c in comparisons if c.get(sort_by) is not None]
        sorted_list = sorted(
            valid, key=lambda x: x.get(sort_by, 0), reverse=not ascending
        )
        for i, item in enumerate(sorted_list, 1):
            item["rank"] = i
        return sorted_list


analytics_engine = AnalyticsEngine()