agentic-market-analyzer / tools /correlation.py
WolfDavid's picture
Upload folder using huggingface_hub
75418e4 verified
"""
Tool: correlate_assets
Computes:
- Pairwise correlation matrix between multiple tickers.
- Rolling correlation over a configurable window.
- Beta of each ticker vs. a benchmark (default SPY).
All calculations use daily log returns for statistical robustness.
"""
from __future__ import annotations
import logging
from typing import Any
import numpy as np
import pandas as pd
from tools.base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
def compute_correlation_matrix(
    returns_df: pd.DataFrame,
) -> dict[str, Any]:
    """Build the Pearson correlation matrix for a set of return series.

    Parameters
    ----------
    returns_df:
        DataFrame with one column of daily log returns per ticker.

    Returns
    -------
    dict with ``tickers`` (column labels in matrix order), ``matrix``
    (square 2-D list of coefficients rounded to 4 decimals) and ``pairs``
    (one entry per unordered ticker pair holding ``ticker_a``,
    ``ticker_b``, ``correlation`` and a textual ``interpretation``).
    """
    corr_frame = returns_df.corr()
    labels = list(corr_frame.columns)
    size = len(labels)

    def cell(row: int, col: int) -> float:
        # Plain Python float so the result is free of numpy scalar types.
        return float(corr_frame.iloc[row, col])

    matrix = []
    for row in range(size):
        matrix.append([round(cell(row, col), 4) for col in range(size)])

    # Walk the strict upper triangle so every pair is reported exactly once.
    pairs = []
    for row, first in enumerate(labels):
        for col in range(row + 1, size):
            coefficient = cell(row, col)
            pairs.append(
                {
                    "ticker_a": first,
                    "ticker_b": labels[col],
                    "correlation": round(coefficient, 4),
                    "interpretation": _interpret_correlation(coefficient),
                }
            )

    return {"tickers": labels, "matrix": matrix, "pairs": pairs}
def compute_rolling_correlation(
    returns_a: pd.Series,
    returns_b: pd.Series,
    window: int = 30,
) -> dict[str, Any]:
    """Compute the rolling Pearson correlation of two return series.

    Returns a dict with the ``window`` used, ``dates`` (index labels
    formatted as ``YYYY-MM-DD`` when they support ``strftime``, otherwise
    ``str()`` of the label), ``values`` (coefficients rounded to 4
    decimals, ``None`` wherever the rolling result is NaN) and ``current``
    (the last entry of ``values``, or ``None`` when there are no entries).
    """
    series = returns_a.rolling(window).corr(returns_b)

    dates = []
    for stamp in series.index:
        if hasattr(stamp, "strftime"):
            dates.append(stamp.strftime("%Y-%m-%d"))
        else:
            dates.append(str(stamp))

    # NaN marks positions without a defined correlation (e.g. the warm-up
    # rows before a full window is available); expose those as None.
    values = []
    for raw in series.values:
        if np.isnan(raw):
            values.append(None)
        else:
            values.append(round(float(raw), 4))

    latest = None
    if values:
        latest = values[-1]

    return {
        "window": window,
        "dates": dates,
        "values": values,
        "current": latest,
    }
def compute_beta(
    asset_returns: pd.Series,
    benchmark_returns: pd.Series,
) -> dict[str, Any]:
    """Compute beta and annualized alpha of an asset vs. a benchmark.

    beta  = Cov(asset, benchmark) / Var(benchmark)
    alpha = mean(asset) - beta * mean(benchmark), scaled by 252 periods

    Parameters
    ----------
    asset_returns:
        Return series of the asset.
    benchmark_returns:
        Return series of the benchmark; the two series are aligned on
        their index and rows with a missing value on either side are
        dropped.

    Returns
    -------
    dict with ``beta`` and ``alpha_annualized`` (both rounded to 4
    decimals) and a textual ``interpretation``. When fewer than 10
    aligned observations remain, or the benchmark variance is zero, the
    numeric fields are NaN. All three keys are present in every case so
    callers can rely on a single result shape.
    """
    aligned = pd.DataFrame({"asset": asset_returns, "bench": benchmark_returns}).dropna()

    if len(aligned) < 10:
        nan = float("nan")
        return {
            "beta": nan,
            "alpha_annualized": nan,
            "interpretation": _interpret_beta(nan),
        }

    # np.cov returns the 2x2 sample covariance matrix:
    # [0, 1] is Cov(asset, bench) and [1, 1] is Var(bench).
    cov = np.cov(aligned["asset"], aligned["bench"])
    bench_var = cov[1, 1]
    beta = float(cov[0, 1] / bench_var) if bench_var != 0 else float("nan")

    alpha_daily = float(aligned["asset"].mean() - beta * aligned["bench"].mean())
    alpha_annual = alpha_daily * 252

    return {
        "beta": round(beta, 4),
        "alpha_annualized": round(alpha_annual, 4),
        "interpretation": _interpret_beta(beta),
    }
def _interpret_correlation(r: float) -> str:
abs_r = abs(r)
if abs_r >= 0.8:
strength = "very strong"
elif abs_r >= 0.6:
strength = "strong"
elif abs_r >= 0.4:
strength = "moderate"
elif abs_r >= 0.2:
strength = "weak"
else:
strength = "very weak / negligible"
direction = "positive" if r >= 0 else "negative"
return f"{strength.capitalize()} {direction} correlation"
def _interpret_beta(beta: float) -> str:
if np.isnan(beta):
return "Insufficient data"
if beta > 1.5:
return "Highly aggressive -- amplifies market moves significantly"
if beta > 1.0:
return "Aggressive -- moves more than the market"
if beta > 0.8:
return "Roughly market-neutral"
if beta > 0.5:
return "Defensive -- less volatile than the market"
if beta > 0:
return "Very defensive / low correlation with the market"
return "Negative beta -- tends to move opposite to the market"
class CorrelateAssetsTool(BaseTool):
    """Tool reporting co-movement statistics for a list of tickers.

    For the requested tickers it fetches daily closes, converts them to
    log returns over the dates all series share, and returns the
    pairwise correlation matrix, a rolling correlation per pair, and
    each ticker's beta against a benchmark.
    """

    name = "correlate_assets"
    description = (
        "Compute correlation matrix, rolling correlation, and beta for a "
        "list of tickers. Useful for portfolio diversification analysis "
        "and understanding co-movement."
    )
    parameters = {
        "type": "object",
        "properties": {
            "tickers": {
                "type": "array",
                # Element type declared so the array schema is complete.
                "items": {"type": "string"},
                "description": "List of ticker symbols (at least 2).",
            },
            "period": {
                "type": "string",
                "description": "Lookback period.",
                "default": "6mo",
            },
            "rolling_window": {
                "type": "integer",
                "description": "Window for rolling correlation (in days).",
                "default": 30,
            },
            "benchmark": {
                "type": "string",
                "description": "Benchmark ticker for beta calculation.",
                "default": "SPY",
            },
        },
        "required": ["tickers"],
    }

    async def execute(self, **kwargs: Any) -> ToolResult:
        """Run the correlation analysis.

        Keyword arguments
        -----------------
        tickers:
            Ticker symbols; upper-cased, stripped, blank entries dropped
            and duplicates removed (first occurrence wins). At least two
            distinct symbols are required.
        period:
            Lookback period passed to the market-data tool
            (default ``"6mo"``).
        rolling_window:
            Number of return observations per rolling window
            (default 30, minimum 2).
        benchmark:
            Benchmark ticker for the beta calculation (default ``"SPY"``).

        Returns
        -------
        ToolResult
            On success ``data`` holds ``tickers``, ``benchmark``,
            ``period``, ``correlation_matrix``, ``rolling_correlations``
            and ``betas``. On failure ``success`` is False and ``error``
            describes the problem (bad arguments, a failed fetch, or too
            few overlapping bars).
        """
        raw_tickers = kwargs.get("tickers") or []
        normalized = (t.upper().strip() for t in raw_tickers)
        # dict.fromkeys de-duplicates while keeping the caller's order, so
        # no ticker is paired with itself and output order is stable.
        tickers: list[str] = list(dict.fromkeys(t for t in normalized if t))
        period: str = kwargs.get("period", "6mo")
        benchmark: str = (kwargs.get("benchmark") or "SPY").upper().strip()

        if len(tickers) < 2:
            return ToolResult(
                success=False,
                error="At least 2 tickers are required for correlation analysis.",
            )

        try:
            rolling_window = int(kwargs.get("rolling_window", 30))
        except (TypeError, ValueError):
            return ToolResult(
                success=False,
                error="rolling_window must be an integer.",
            )
        if rolling_window < 2:
            # A correlation needs at least two observations per window.
            return ToolResult(
                success=False,
                error="rolling_window must be at least 2.",
            )

        # Fetch data for all tickers (+ benchmark if not already included),
        # in a deterministic order.
        all_tickers = list(dict.fromkeys([*tickers, benchmark]))

        # NOTE(review): imported inside the method rather than at module
        # top -- presumably to avoid a circular import; confirm before
        # moving it.
        from tools.market_data import FetchMarketDataTool

        md_tool = FetchMarketDataTool()
        close_data: dict[str, pd.Series] = {}
        for t in all_tickers:
            result = await md_tool.execute(ticker=t, interval="1d", period=period)
            if not result.success:
                return ToolResult(
                    success=False,
                    error=f"Could not fetch data for {t}: {result.error}",
                )
            dates = pd.to_datetime(result.data["dates"])
            close_data[t] = pd.Series(result.data["close"], index=dates, name=t)

        # Keep only the dates on which every series has a close.
        price_df = pd.DataFrame(close_data).dropna()

        # N price bars yield N - 1 returns, so one extra bar is needed for
        # the rolling window to produce at least one value.
        min_bars = rolling_window + 1
        if len(price_df) < min_bars:
            return ToolResult(
                success=False,
                error=(
                    f"Only {len(price_df)} overlapping bars -- need at least "
                    f"{min_bars} for rolling correlation."
                ),
            )

        # Daily log returns; the first row is NaN after the shift and is dropped.
        returns_df = np.log(price_df / price_df.shift(1)).dropna()

        # Correlation matrix (only requested tickers).
        corr_result = compute_correlation_matrix(returns_df[tickers])

        # Rolling correlation for each unordered pair of requested tickers.
        rolling_results = []
        for i, ticker_a in enumerate(tickers):
            for ticker_b in tickers[i + 1:]:
                rc = compute_rolling_correlation(
                    returns_df[ticker_a],
                    returns_df[ticker_b],
                    window=rolling_window,
                )
                rc["ticker_a"] = ticker_a
                rc["ticker_b"] = ticker_b
                rolling_results.append(rc)

        # Beta vs. benchmark; the benchmark itself gets fixed values.
        betas = {}
        for t in tickers:
            if t == benchmark:
                betas[t] = {"beta": 1.0, "alpha_annualized": 0.0, "interpretation": "Benchmark itself"}
                continue
            betas[t] = compute_beta(returns_df[t], returns_df[benchmark])

        return ToolResult(
            success=True,
            data={
                "tickers": tickers,
                "benchmark": benchmark,
                "period": period,
                "correlation_matrix": corr_result,
                "rolling_correlations": rolling_results,
                "betas": betas,
            },
        )