""" P&L Attribution Engine. Two attribution methodologies: 1. Brinson Attribution — allocation effect + selection effect vs benchmark 2. Factor-Based Attribution — decompose returns into factor contributions Provides waterfall chart data for visualizing P&L breakdown. """ from __future__ import annotations import logging from typing import Any, Dict, List import numpy as np import pandas as pd from app.config import get_settings from app.services.data_ingestion.yahoo import yahoo_adapter logger = logging.getLogger(__name__) _settings = get_settings() TRADING_DAYS = _settings.trading_days_per_year async def brinson_attribution( holdings: List[Dict[str, Any]], total_value: float, benchmark: str = "SPY", period: str = "1y", ) -> Dict[str, Any]: """ Brinson-Fachler attribution: decompose portfolio P&L into allocation effect, selection effect, and interaction effect. """ if not holdings or total_value <= 0: return {"error": "No holdings for attribution"} # Group holdings by sector/asset_class sector_map: Dict[str, List[Dict]] = {} for h in holdings: sector = h.get("asset_class", h.get("sector", "equity")) if sector not in sector_map: sector_map[sector] = [] sector_map[sector].append(h) # Compute benchmark returns bench_df = await yahoo_adapter.get_price_dataframe(benchmark, period=period) if bench_df.empty: return {"error": "Could not fetch benchmark data"} bench_returns = bench_df["Close"].pct_change().dropna() bench_total_return = float((bench_df["Close"].iloc[-1] / bench_df["Close"].iloc[0]) - 1) # Compute per-sector portfolio weights and returns sectors = [] total_port_return = 0.0 for sector, sector_holdings in sector_map.items(): sector_value = sum(h.get("market_value", 0) for h in sector_holdings) port_weight = sector_value / total_value sector_pnl = sum(h.get("pnl", 0) for h in sector_holdings) sector_cost = sum(h.get("cost_basis", h.get("avg_price", 0) * h.get("quantity", 0)) for h in sector_holdings) sector_return = (sector_pnl / sector_cost) if sector_cost > 0 else 0 # Approximate benchmark weight (equal for simplicity) bench_weight = 1.0 / max(len(sector_map), 1) # Brinson decomposition allocation_effect = (port_weight - bench_weight) * bench_total_return selection_effect = bench_weight * (sector_return - bench_total_return) interaction_effect = (port_weight - bench_weight) * (sector_return - bench_total_return) total_effect = allocation_effect + selection_effect + interaction_effect total_port_return += port_weight * sector_return sectors.append({ "sector": sector, "portfolio_weight": round(port_weight * 100, 2), "benchmark_weight": round(bench_weight * 100, 2), "portfolio_return": round(sector_return * 100, 2), "benchmark_return": round(bench_total_return * 100, 2), "allocation_effect": round(allocation_effect * 100, 4), "selection_effect": round(selection_effect * 100, 4), "interaction_effect": round(interaction_effect * 100, 4), "total_effect": round(total_effect * 100, 4), "pnl": round(sector_pnl, 2), }) # Sort by total effect sectors.sort(key=lambda x: x["total_effect"], reverse=True) total_alpha = total_port_return - bench_total_return # Waterfall chart data waterfall = [{"label": "Benchmark", "value": round(bench_total_return * 100, 2), "type": "start"}] for s in sectors: waterfall.append({ "label": f"{s['sector']} (Alloc)", "value": s["allocation_effect"], "type": "positive" if s["allocation_effect"] >= 0 else "negative", }) waterfall.append({ "label": f"{s['sector']} (Select)", "value": s["selection_effect"], "type": "positive" if s["selection_effect"] >= 0 else "negative", }) waterfall.append({"label": "Portfolio Return", "value": round(total_port_return * 100, 2), "type": "total"}) return { "method": "brinson", "benchmark": benchmark, "benchmark_return": round(bench_total_return * 100, 2), "portfolio_return": round(total_port_return * 100, 2), "alpha": round(total_alpha * 100, 2), "sectors": sectors, "waterfall": waterfall, } async def factor_attribution( holdings: List[Dict[str, Any]], total_value: float, period: str = "1y", ) -> Dict[str, Any]: """ Factor-based attribution: decompose returns into contributions from systematic factors (market, size, value, momentum, volatility). """ if not holdings or total_value <= 0: return {"error": "No holdings for factor attribution"} tickers = list(set(h.get("ticker", "") for h in holdings if h.get("market_value", 0) > 0)) # Fetch returns returns_data = {} for ticker in tickers: try: df = await yahoo_adapter.get_price_dataframe(ticker, period=period) if not df.empty and len(df) > 20: returns_data[ticker] = df["Close"].pct_change().dropna() except Exception: continue if not returns_data: return {"error": "Could not fetch return data"} # Align aligned = pd.DataFrame(returns_data).dropna() if aligned.empty: return {"error": "Insufficient overlapping data"} # Compute portfolio returns weights = np.array([ sum(h.get("market_value", 0) for h in holdings if h.get("ticker") == t) / total_value for t in aligned.columns ]) port_returns = aligned.values @ weights # Fetch factor proxies factor_tickers = { "market": "SPY", "size": "IWM", "value": "IVE", "momentum": "MTUM", "low_vol": "USMV", } factor_returns: Dict[str, pd.Series] = {} for fname, fticker in factor_tickers.items(): try: df = await yahoo_adapter.get_price_dataframe(fticker, period=period) if not df.empty and len(df) > 20: factor_returns[fname] = df["Close"].pct_change().dropna() except Exception: pass if not factor_returns: return {"error": "Could not fetch factor data"} # Simple OLS regression for factor loadings factor_df = pd.DataFrame(factor_returns).dropna() min_len = min(len(port_returns), len(factor_df)) if min_len < 30: return {"error": "Insufficient data for factor attribution"} y = port_returns[:min_len] X = factor_df.iloc[:min_len].values # Add intercept X_with_intercept = np.column_stack([np.ones(min_len), X]) try: # Least squares betas, _, _, _ = np.linalg.lstsq(X_with_intercept, y, rcond=None) alpha_daily = betas[0] factor_betas = betas[1:] except Exception: return {"error": "Regression failed"} # Attribution factor_names = list(factor_df.columns) contributions = [] total_explained = 0.0 for i, fname in enumerate(factor_names): avg_factor_ret = float(factor_df[fname].mean() * TRADING_DAYS) contribution = float(factor_betas[i] * avg_factor_ret * 100) total_explained += contribution contributions.append({ "factor": fname, "beta": round(float(factor_betas[i]), 4), "factor_return": round(avg_factor_ret * 100, 2), "contribution": round(contribution, 4), }) alpha_contribution = round(float(alpha_daily * TRADING_DAYS * 100), 4) total_return = round(float(np.mean(y) * TRADING_DAYS * 100), 2) contributions.sort(key=lambda x: abs(x["contribution"]), reverse=True) return { "method": "factor_based", "total_return": total_return, "alpha": alpha_contribution, "factor_contributions": contributions, "total_explained": round(total_explained, 4), "residual": round(total_return - total_explained - alpha_contribution, 4), }