""" Tool: correlate_assets Computes: - Pairwise correlation matrix between multiple tickers. - Rolling correlation over a configurable window. - Beta of each ticker vs. a benchmark (default SPY). All calculations use daily log returns for statistical robustness. """ from __future__ import annotations import logging from typing import Any import numpy as np import pandas as pd from tools.base import BaseTool, ToolResult logger = logging.getLogger(__name__) def compute_correlation_matrix( returns_df: pd.DataFrame, ) -> dict[str, Any]: """Pearson correlation matrix of log returns. Parameters ---------- returns_df: DataFrame where each column is a ticker's daily log returns. Returns ------- dict with ``tickers``, ``matrix`` (2-D list), and ``pairs`` (list of {ticker_a, ticker_b, correlation}). """ corr = returns_df.corr() tickers = list(corr.columns) matrix = [[round(float(corr.iloc[i, j]), 4) for j in range(len(tickers))] for i in range(len(tickers))] pairs = [] for i in range(len(tickers)): for j in range(i + 1, len(tickers)): pairs.append({ "ticker_a": tickers[i], "ticker_b": tickers[j], "correlation": round(float(corr.iloc[i, j]), 4), "interpretation": _interpret_correlation(float(corr.iloc[i, j])), }) return {"tickers": tickers, "matrix": matrix, "pairs": pairs} def compute_rolling_correlation( returns_a: pd.Series, returns_b: pd.Series, window: int = 30, ) -> dict[str, Any]: """Rolling Pearson correlation between two return series.""" rolling = returns_a.rolling(window).corr(returns_b) dates = [d.strftime("%Y-%m-%d") if hasattr(d, "strftime") else str(d) for d in rolling.index] values = [None if np.isnan(v) else round(float(v), 4) for v in rolling.values] current = values[-1] if values else None return { "window": window, "dates": dates, "values": values, "current": current, } def compute_beta( asset_returns: pd.Series, benchmark_returns: pd.Series, ) -> dict[str, float]: """Compute beta and alpha of an asset vs. a benchmark. beta = Cov(asset, benchmark) / Var(benchmark) alpha = mean(asset) - beta * mean(benchmark) (annualized) """ aligned = pd.DataFrame({"asset": asset_returns, "bench": benchmark_returns}).dropna() if len(aligned) < 10: return {"beta": float("nan"), "alpha_annualized": float("nan")} cov = np.cov(aligned["asset"], aligned["bench"]) beta = float(cov[0, 1] / cov[1, 1]) if cov[1, 1] != 0 else float("nan") alpha_daily = float(aligned["asset"].mean() - beta * aligned["bench"].mean()) alpha_annual = alpha_daily * 252 return { "beta": round(beta, 4), "alpha_annualized": round(alpha_annual, 4), "interpretation": _interpret_beta(beta), } def _interpret_correlation(r: float) -> str: abs_r = abs(r) if abs_r >= 0.8: strength = "very strong" elif abs_r >= 0.6: strength = "strong" elif abs_r >= 0.4: strength = "moderate" elif abs_r >= 0.2: strength = "weak" else: strength = "very weak / negligible" direction = "positive" if r >= 0 else "negative" return f"{strength.capitalize()} {direction} correlation" def _interpret_beta(beta: float) -> str: if np.isnan(beta): return "Insufficient data" if beta > 1.5: return "Highly aggressive -- amplifies market moves significantly" if beta > 1.0: return "Aggressive -- moves more than the market" if beta > 0.8: return "Roughly market-neutral" if beta > 0.5: return "Defensive -- less volatile than the market" if beta > 0: return "Very defensive / low correlation with the market" return "Negative beta -- tends to move opposite to the market" class CorrelateAssetsTool(BaseTool): name = "correlate_assets" description = ( "Compute correlation matrix, rolling correlation, and beta for a " "list of tickers. Useful for portfolio diversification analysis " "and understanding co-movement." ) parameters = { "type": "object", "properties": { "tickers": { "type": "array", "description": "List of ticker symbols (at least 2).", }, "period": { "type": "string", "description": "Lookback period.", "default": "6mo", }, "rolling_window": { "type": "integer", "description": "Window for rolling correlation (in days).", "default": 30, }, "benchmark": { "type": "string", "description": "Benchmark ticker for beta calculation.", "default": "SPY", }, }, "required": ["tickers"], } async def execute(self, **kwargs: Any) -> ToolResult: tickers: list[str] = [t.upper().strip() for t in kwargs["tickers"]] period: str = kwargs.get("period", "6mo") rolling_window: int = kwargs.get("rolling_window", 30) benchmark: str = kwargs.get("benchmark", "SPY").upper() if len(tickers) < 2: return ToolResult( success=False, error="At least 2 tickers are required for correlation analysis.", ) # Fetch data for all tickers (+ benchmark if not already included). all_tickers = list(set(tickers + [benchmark])) from tools.market_data import FetchMarketDataTool md_tool = FetchMarketDataTool() close_data: dict[str, pd.Series] = {} for t in all_tickers: result = await md_tool.execute(ticker=t, interval="1d", period=period) if not result.success: return ToolResult( success=False, error=f"Could not fetch data for {t}: {result.error}", ) dates = pd.to_datetime(result.data["dates"]) close_data[t] = pd.Series(result.data["close"], index=dates, name=t) # Build returns DataFrame. price_df = pd.DataFrame(close_data).dropna() if len(price_df) < rolling_window: return ToolResult( success=False, error=( f"Only {len(price_df)} overlapping bars -- need at least " f"{rolling_window} for rolling correlation." ), ) returns_df = np.log(price_df / price_df.shift(1)).dropna() # Correlation matrix (only requested tickers). corr_result = compute_correlation_matrix(returns_df[tickers]) # Rolling correlation for each pair. rolling_results = [] for i in range(len(tickers)): for j in range(i + 1, len(tickers)): rc = compute_rolling_correlation( returns_df[tickers[i]], returns_df[tickers[j]], window=rolling_window, ) rc["ticker_a"] = tickers[i] rc["ticker_b"] = tickers[j] rolling_results.append(rc) # Beta vs. benchmark. betas = {} for t in tickers: if t == benchmark: betas[t] = {"beta": 1.0, "alpha_annualized": 0.0, "interpretation": "Benchmark itself"} continue betas[t] = compute_beta(returns_df[t], returns_df[benchmark]) return ToolResult( success=True, data={ "tickers": tickers, "benchmark": benchmark, "period": period, "correlation_matrix": corr_result, "rolling_correlations": rolling_results, "betas": betas, }, )