quanthedge / backend /app /services /analytics /attribution.py
jashdoshi77's picture
addes more section , improvments , scalability
c81e8a5
"""
P&L Attribution Engine.
Two attribution methodologies:
1. Brinson Attribution — allocation effect + selection effect vs benchmark
2. Factor-Based Attribution — decompose returns into factor contributions
Provides waterfall chart data for visualizing P&L breakdown.
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List
import numpy as np
import pandas as pd
from app.config import get_settings
from app.services.data_ingestion.yahoo import yahoo_adapter
logger = logging.getLogger(__name__)
_settings = get_settings()
TRADING_DAYS = _settings.trading_days_per_year
async def brinson_attribution(
holdings: List[Dict[str, Any]],
total_value: float,
benchmark: str = "SPY",
period: str = "1y",
) -> Dict[str, Any]:
"""
Brinson-Fachler attribution: decompose portfolio P&L into
allocation effect, selection effect, and interaction effect.
"""
if not holdings or total_value <= 0:
return {"error": "No holdings for attribution"}
# Group holdings by sector/asset_class
sector_map: Dict[str, List[Dict]] = {}
for h in holdings:
sector = h.get("asset_class", h.get("sector", "equity"))
if sector not in sector_map:
sector_map[sector] = []
sector_map[sector].append(h)
# Compute benchmark returns
bench_df = await yahoo_adapter.get_price_dataframe(benchmark, period=period)
if bench_df.empty:
return {"error": "Could not fetch benchmark data"}
bench_returns = bench_df["Close"].pct_change().dropna()
bench_total_return = float((bench_df["Close"].iloc[-1] / bench_df["Close"].iloc[0]) - 1)
# Compute per-sector portfolio weights and returns
sectors = []
total_port_return = 0.0
for sector, sector_holdings in sector_map.items():
sector_value = sum(h.get("market_value", 0) for h in sector_holdings)
port_weight = sector_value / total_value
sector_pnl = sum(h.get("pnl", 0) for h in sector_holdings)
sector_cost = sum(h.get("cost_basis", h.get("avg_price", 0) * h.get("quantity", 0)) for h in sector_holdings)
sector_return = (sector_pnl / sector_cost) if sector_cost > 0 else 0
# Approximate benchmark weight (equal for simplicity)
bench_weight = 1.0 / max(len(sector_map), 1)
# Brinson decomposition
allocation_effect = (port_weight - bench_weight) * bench_total_return
selection_effect = bench_weight * (sector_return - bench_total_return)
interaction_effect = (port_weight - bench_weight) * (sector_return - bench_total_return)
total_effect = allocation_effect + selection_effect + interaction_effect
total_port_return += port_weight * sector_return
sectors.append({
"sector": sector,
"portfolio_weight": round(port_weight * 100, 2),
"benchmark_weight": round(bench_weight * 100, 2),
"portfolio_return": round(sector_return * 100, 2),
"benchmark_return": round(bench_total_return * 100, 2),
"allocation_effect": round(allocation_effect * 100, 4),
"selection_effect": round(selection_effect * 100, 4),
"interaction_effect": round(interaction_effect * 100, 4),
"total_effect": round(total_effect * 100, 4),
"pnl": round(sector_pnl, 2),
})
# Sort by total effect
sectors.sort(key=lambda x: x["total_effect"], reverse=True)
total_alpha = total_port_return - bench_total_return
# Waterfall chart data
waterfall = [{"label": "Benchmark", "value": round(bench_total_return * 100, 2), "type": "start"}]
for s in sectors:
waterfall.append({
"label": f"{s['sector']} (Alloc)",
"value": s["allocation_effect"],
"type": "positive" if s["allocation_effect"] >= 0 else "negative",
})
waterfall.append({
"label": f"{s['sector']} (Select)",
"value": s["selection_effect"],
"type": "positive" if s["selection_effect"] >= 0 else "negative",
})
waterfall.append({"label": "Portfolio Return", "value": round(total_port_return * 100, 2), "type": "total"})
return {
"method": "brinson",
"benchmark": benchmark,
"benchmark_return": round(bench_total_return * 100, 2),
"portfolio_return": round(total_port_return * 100, 2),
"alpha": round(total_alpha * 100, 2),
"sectors": sectors,
"waterfall": waterfall,
}
async def factor_attribution(
holdings: List[Dict[str, Any]],
total_value: float,
period: str = "1y",
) -> Dict[str, Any]:
"""
Factor-based attribution: decompose returns into contributions from
systematic factors (market, size, value, momentum, volatility).
"""
if not holdings or total_value <= 0:
return {"error": "No holdings for factor attribution"}
tickers = list(set(h.get("ticker", "") for h in holdings if h.get("market_value", 0) > 0))
# Fetch returns
returns_data = {}
for ticker in tickers:
try:
df = await yahoo_adapter.get_price_dataframe(ticker, period=period)
if not df.empty and len(df) > 20:
returns_data[ticker] = df["Close"].pct_change().dropna()
except Exception:
continue
if not returns_data:
return {"error": "Could not fetch return data"}
# Align
aligned = pd.DataFrame(returns_data).dropna()
if aligned.empty:
return {"error": "Insufficient overlapping data"}
# Compute portfolio returns
weights = np.array([
sum(h.get("market_value", 0) for h in holdings if h.get("ticker") == t) / total_value
for t in aligned.columns
])
port_returns = aligned.values @ weights
# Fetch factor proxies
factor_tickers = {
"market": "SPY",
"size": "IWM",
"value": "IVE",
"momentum": "MTUM",
"low_vol": "USMV",
}
factor_returns: Dict[str, pd.Series] = {}
for fname, fticker in factor_tickers.items():
try:
df = await yahoo_adapter.get_price_dataframe(fticker, period=period)
if not df.empty and len(df) > 20:
factor_returns[fname] = df["Close"].pct_change().dropna()
except Exception:
pass
if not factor_returns:
return {"error": "Could not fetch factor data"}
# Simple OLS regression for factor loadings
factor_df = pd.DataFrame(factor_returns).dropna()
min_len = min(len(port_returns), len(factor_df))
if min_len < 30:
return {"error": "Insufficient data for factor attribution"}
y = port_returns[:min_len]
X = factor_df.iloc[:min_len].values
# Add intercept
X_with_intercept = np.column_stack([np.ones(min_len), X])
try:
# Least squares
betas, _, _, _ = np.linalg.lstsq(X_with_intercept, y, rcond=None)
alpha_daily = betas[0]
factor_betas = betas[1:]
except Exception:
return {"error": "Regression failed"}
# Attribution
factor_names = list(factor_df.columns)
contributions = []
total_explained = 0.0
for i, fname in enumerate(factor_names):
avg_factor_ret = float(factor_df[fname].mean() * TRADING_DAYS)
contribution = float(factor_betas[i] * avg_factor_ret * 100)
total_explained += contribution
contributions.append({
"factor": fname,
"beta": round(float(factor_betas[i]), 4),
"factor_return": round(avg_factor_ret * 100, 2),
"contribution": round(contribution, 4),
})
alpha_contribution = round(float(alpha_daily * TRADING_DAYS * 100), 4)
total_return = round(float(np.mean(y) * TRADING_DAYS * 100), 2)
contributions.sort(key=lambda x: abs(x["contribution"]), reverse=True)
return {
"method": "factor_based",
"total_return": total_return,
"alpha": alpha_contribution,
"factor_contributions": contributions,
"total_explained": round(total_explained, 4),
"residual": round(total_return - total_explained - alpha_contribution, 4),
}