Spaces:
Sleeping
Sleeping
| """ | |
| Tool: correlate_assets | |
| Computes: | |
| - Pairwise correlation matrix between multiple tickers. | |
| - Rolling correlation over a configurable window. | |
| - Beta of each ticker vs. a benchmark (default SPY). | |
| All calculations use daily log returns for statistical robustness. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from typing import Any | |
| import numpy as np | |
| import pandas as pd | |
| from tools.base import BaseTool, ToolResult | |
| logger = logging.getLogger(__name__) | |
def compute_correlation_matrix(
    returns_df: pd.DataFrame,
) -> dict[str, Any]:
    """Build the Pearson correlation matrix for a set of return series.

    Parameters
    ----------
    returns_df:
        DataFrame with one column of returns per ticker.

    Returns
    -------
    dict with three keys:
        ``tickers`` -- column labels, in the order used by ``matrix``.
        ``matrix`` -- square 2-D list of coefficients rounded to 4 decimals.
        ``pairs`` -- one entry per unordered ticker pair (upper triangle
        only) holding ``ticker_a``, ``ticker_b``, ``correlation`` (rounded
        to 4 decimals) and ``interpretation`` (text label derived from the
        unrounded coefficient).
    """
    corr_frame = returns_df.corr()
    symbols = list(corr_frame.columns)
    coefficients = corr_frame.to_numpy()

    matrix = [[round(float(cell), 4) for cell in row] for row in coefficients]

    pairs = []
    for row_idx, first in enumerate(symbols):
        # Start past the diagonal so each unordered pair is listed once.
        for col_idx in range(row_idx + 1, len(symbols)):
            coefficient = float(coefficients[row_idx, col_idx])
            pairs.append({
                "ticker_a": first,
                "ticker_b": symbols[col_idx],
                "correlation": round(coefficient, 4),
                "interpretation": _interpret_correlation(coefficient),
            })

    return {"tickers": symbols, "matrix": matrix, "pairs": pairs}
def compute_rolling_correlation(
    returns_a: pd.Series,
    returns_b: pd.Series,
    window: int = 30,
) -> dict[str, Any]:
    """Compute the rolling Pearson correlation of two return series.

    Parameters
    ----------
    returns_a, returns_b:
        Return series to correlate.
    window:
        Number of observations in each rolling window.

    Returns
    -------
    dict with ``window``, ``dates`` (index labels as strings; labels that
    have ``strftime`` are formatted ``YYYY-MM-DD``), ``values`` (coefficients
    rounded to 4 decimals, ``None`` where the result is NaN) and ``current``
    (the last entry of ``values``, or ``None`` when there are no entries).
    """
    rolling_corr = returns_a.rolling(window).corr(returns_b)

    dates = []
    for label in rolling_corr.index:
        if hasattr(label, "strftime"):
            dates.append(label.strftime("%Y-%m-%d"))
        else:
            dates.append(str(label))

    values = []
    for raw in rolling_corr.to_numpy():
        # NaN becomes None so the output has an explicit "no value" marker.
        if np.isnan(raw):
            values.append(None)
        else:
            values.append(round(float(raw), 4))

    if values:
        current = values[-1]
    else:
        current = None

    return {
        "window": window,
        "dates": dates,
        "values": values,
        "current": current,
    }
def compute_beta(
    asset_returns: pd.Series,
    benchmark_returns: pd.Series,
) -> dict[str, Any]:
    """Compute beta and annualized alpha of an asset vs. a benchmark.

    beta  = Cov(asset, benchmark) / Var(benchmark)
    alpha = (mean(asset) - beta * mean(benchmark)) * 252

    Parameters
    ----------
    asset_returns:
        Return series of the asset.
    benchmark_returns:
        Return series of the benchmark; aligned with the asset on index.

    Returns
    -------
    dict with ``beta`` and ``alpha_annualized`` (both rounded to 4 decimals)
    and ``interpretation`` (text label). All three keys are always present.
    ``beta`` and ``alpha_annualized`` are NaN when fewer than 10 aligned
    observations remain or when the benchmark variance is zero.
    """
    # Keep only rows where both series have a value.
    aligned = pd.DataFrame(
        {"asset": asset_returns, "bench": benchmark_returns}
    ).dropna()

    if len(aligned) < 10:
        # Same keys as the normal path so callers can always read
        # "interpretation"; the label matches what _interpret_beta gives
        # for a NaN beta.
        return {
            "beta": float("nan"),
            "alpha_annualized": float("nan"),
            "interpretation": "Insufficient data",
        }

    cov = np.cov(aligned["asset"], aligned["bench"])
    bench_var = cov[1, 1]
    # A constant benchmark has zero variance; beta is undefined there.
    beta = float(cov[0, 1] / bench_var) if bench_var != 0 else float("nan")

    alpha_daily = float(aligned["asset"].mean() - beta * aligned["bench"].mean())
    alpha_annual = alpha_daily * 252  # scale the per-period alpha by 252 periods

    return {
        "beta": round(beta, 4),
        "alpha_annualized": round(alpha_annual, 4),
        "interpretation": _interpret_beta(beta),
    }
| def _interpret_correlation(r: float) -> str: | |
| abs_r = abs(r) | |
| if abs_r >= 0.8: | |
| strength = "very strong" | |
| elif abs_r >= 0.6: | |
| strength = "strong" | |
| elif abs_r >= 0.4: | |
| strength = "moderate" | |
| elif abs_r >= 0.2: | |
| strength = "weak" | |
| else: | |
| strength = "very weak / negligible" | |
| direction = "positive" if r >= 0 else "negative" | |
| return f"{strength.capitalize()} {direction} correlation" | |
| def _interpret_beta(beta: float) -> str: | |
| if np.isnan(beta): | |
| return "Insufficient data" | |
| if beta > 1.5: | |
| return "Highly aggressive -- amplifies market moves significantly" | |
| if beta > 1.0: | |
| return "Aggressive -- moves more than the market" | |
| if beta > 0.8: | |
| return "Roughly market-neutral" | |
| if beta > 0.5: | |
| return "Defensive -- less volatile than the market" | |
| if beta > 0: | |
| return "Very defensive / low correlation with the market" | |
| return "Negative beta -- tends to move opposite to the market" | |
class CorrelateAssetsTool(BaseTool):
    """Tool that reports co-movement statistics for a list of tickers.

    For the requested tickers it fetches daily closes, converts them to log
    returns, and returns a correlation matrix, a rolling correlation for
    every ticker pair, and each ticker's beta against a benchmark.
    """

    name = "correlate_assets"
    description = (
        "Compute correlation matrix, rolling correlation, and beta for a "
        "list of tickers. Useful for portfolio diversification analysis "
        "and understanding co-movement."
    )
    parameters = {
        "type": "object",
        "properties": {
            "tickers": {
                "type": "array",
                # Declare the element type so the array schema is complete.
                "items": {"type": "string"},
                "description": "List of ticker symbols (at least 2).",
            },
            "period": {
                "type": "string",
                "description": "Lookback period.",
                "default": "6mo",
            },
            "rolling_window": {
                "type": "integer",
                "description": "Window for rolling correlation (in days).",
                "default": 30,
            },
            "benchmark": {
                "type": "string",
                "description": "Benchmark ticker for beta calculation.",
                "default": "SPY",
            },
        },
        "required": ["tickers"],
    }

    async def execute(self, **kwargs: Any) -> ToolResult:
        """Run the correlation analysis.

        Keyword arguments mirror ``parameters``: ``tickers`` (required),
        ``period``, ``rolling_window`` and ``benchmark``.

        Returns a failed ``ToolResult`` when fewer than two distinct
        tickers are given, when ``rolling_window`` is below 2, when any
        market-data fetch fails, or when too few overlapping bars exist.
        On success ``data`` holds ``tickers``, ``benchmark``, ``period``,
        ``correlation_matrix``, ``rolling_correlations`` and ``betas``.
        """
        # Normalize, drop blanks, and dedupe while preserving request order;
        # duplicates would otherwise create self-pairs and duplicate columns.
        cleaned = [t.upper().strip() for t in kwargs["tickers"]]
        tickers: list[str] = list(dict.fromkeys(s for s in cleaned if s))
        period: str = kwargs.get("period", "6mo")
        rolling_window: int = kwargs.get("rolling_window", 30)
        # Strip as well as upper-case, matching how tickers are normalized.
        benchmark: str = kwargs.get("benchmark", "SPY").upper().strip()

        if len(tickers) < 2:
            return ToolResult(
                success=False,
                error="At least 2 tickers are required for correlation analysis.",
            )
        if rolling_window < 2:
            # A correlation needs at least two observations per window.
            return ToolResult(
                success=False,
                error="rolling_window must be an integer >= 2.",
            )

        # Fetch data for all tickers (+ benchmark if not already included).
        # Ordered dedupe keeps the fetch order, and therefore which failure
        # is reported first, deterministic.
        all_tickers = list(dict.fromkeys(tickers + [benchmark]))

        # Function-local import kept from the original.
        # NOTE(review): presumably avoids an import cycle -- confirm.
        from tools.market_data import FetchMarketDataTool

        md_tool = FetchMarketDataTool()
        close_data: dict[str, pd.Series] = {}
        for t in all_tickers:
            result = await md_tool.execute(ticker=t, interval="1d", period=period)
            if not result.success:
                return ToolResult(
                    success=False,
                    error=f"Could not fetch data for {t}: {result.error}",
                )
            dates = pd.to_datetime(result.data["dates"])
            close_data[t] = pd.Series(result.data["close"], index=dates, name=t)

        # Build returns DataFrame; keep only dates present for every symbol.
        price_df = pd.DataFrame(close_data).dropna()
        # N price bars yield N - 1 returns, so one extra bar is needed for
        # at least one complete rolling window.
        min_bars = rolling_window + 1
        if len(price_df) < min_bars:
            return ToolResult(
                success=False,
                error=(
                    f"Only {len(price_df)} overlapping bars -- need at least "
                    f"{min_bars} for rolling correlation."
                ),
            )
        returns_df = np.log(price_df / price_df.shift(1)).dropna()

        # Correlation matrix (only requested tickers).
        corr_result = compute_correlation_matrix(returns_df[tickers])

        # Rolling correlation for each unordered pair.
        rolling_results = []
        for i, ticker_a in enumerate(tickers):
            for ticker_b in tickers[i + 1:]:
                rc = compute_rolling_correlation(
                    returns_df[ticker_a],
                    returns_df[ticker_b],
                    window=rolling_window,
                )
                rc["ticker_a"] = ticker_a
                rc["ticker_b"] = ticker_b
                rolling_results.append(rc)

        # Beta vs. benchmark.
        betas = {}
        for t in tickers:
            if t == benchmark:
                # The benchmark against itself is reported as beta 1, alpha 0.
                betas[t] = {"beta": 1.0, "alpha_annualized": 0.0, "interpretation": "Benchmark itself"}
                continue
            betas[t] = compute_beta(returns_df[t], returns_df[benchmark])

        return ToolResult(
            success=True,
            data={
                "tickers": tickers,
                "benchmark": benchmark,
                "period": period,
                "correlation_matrix": corr_result,
                "rolling_correlations": rolling_results,
                "betas": betas,
            },
        )