Spaces:
Sleeping
Sleeping
| """ | |
| P&L Attribution Engine. | |
| Two attribution methodologies: | |
| 1. Brinson Attribution — allocation effect + selection effect vs benchmark | |
| 2. Factor-Based Attribution — decompose returns into factor contributions | |
| Provides waterfall chart data for visualizing P&L breakdown. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from typing import Any, Dict, List | |
| import numpy as np | |
| import pandas as pd | |
| from app.config import get_settings | |
| from app.services.data_ingestion.yahoo import yahoo_adapter | |
| # Module-level logger, named after this module. | |
| logger = logging.getLogger(__name__) | |
| # Application settings, read once when this module is imported. | |
| _settings = get_settings() | |
| # Trading days per year from settings; used below to annualize daily mean returns. | |
| TRADING_DAYS = _settings.trading_days_per_year | |
def _numeric_field(holding: Dict[str, Any], key: str) -> Any:
    """Return ``holding[key]``, treating a missing or ``None`` value as 0."""
    value = holding.get(key)
    return 0 if value is None else value


def _holding_cost(holding: Dict[str, Any]) -> Any:
    """Return the holding's ``cost_basis``, else ``avg_price * quantity``."""
    cost_basis = holding.get("cost_basis")
    if cost_basis is not None:
        return cost_basis
    # Only evaluate the fallback when it is needed, so a holding that carries
    # a cost basis does not fail on a missing/None price or quantity.
    return _numeric_field(holding, "avg_price") * _numeric_field(holding, "quantity")


async def brinson_attribution(
    holdings: List[Dict[str, Any]],
    total_value: float,
    benchmark: str = "SPY",
    period: str = "1y",
) -> Dict[str, Any]:
    """
    Brinson-Fachler attribution: decompose portfolio P&L into
    allocation effect, selection effect, and interaction effect.

    Holdings are grouped by ``asset_class`` (falling back to ``sector``, then
    ``"equity"``). Each group's return is ``pnl / cost`` and its weight is
    ``market_value / total_value``. This is a simplified model: every group is
    compared against the return of the whole benchmark, and benchmark weights
    are assumed equal across groups.

    Args:
        holdings: Dicts read for the keys ``asset_class``/``sector``,
            ``market_value``, ``pnl``, and ``cost_basis`` (or ``avg_price``
            and ``quantity``). Missing or ``None`` numbers count as 0.
        total_value: Denominator for portfolio weights; must be positive.
        benchmark: Ticker whose ``Close`` prices define the benchmark return.
        period: History period passed to the price adapter.

    Returns:
        ``{"error": ...}`` when there is nothing to attribute or the benchmark
        prices are unusable. Otherwise a dict with the keys ``method``,
        ``benchmark``, ``benchmark_return``, ``portfolio_return``, ``alpha``,
        ``sectors`` (sorted by total effect, descending) and ``waterfall``.
        All returns and effects are expressed in percent.
    """
    if not holdings or total_value <= 0:
        return {"error": "No holdings for attribution"}

    # Group holdings by sector/asset_class; None or empty labels fall through.
    sector_map: Dict[str, List[Dict[str, Any]]] = {}
    for holding in holdings:
        sector = holding.get("asset_class") or holding.get("sector") or "equity"
        sector_map.setdefault(sector, []).append(holding)

    # Benchmark total return over the period (first to last valid close).
    bench_df = await yahoo_adapter.get_price_dataframe(benchmark, period=period)
    if bench_df is None or bench_df.empty:
        return {"error": "Could not fetch benchmark data"}
    bench_close = bench_df["Close"].dropna()
    if bench_close.empty or bench_close.iloc[0] == 0:
        # A NaN/zero starting price would yield NaN/inf for every effect.
        return {"error": "Could not fetch benchmark data"}
    bench_total_return = float(bench_close.iloc[-1] / bench_close.iloc[0] - 1)

    # Approximate benchmark weight (equal for simplicity); loop-invariant.
    bench_weight = 1.0 / len(sector_map)

    sectors = []
    total_port_return = 0.0
    for sector, sector_holdings in sector_map.items():
        sector_value = sum(_numeric_field(h, "market_value") for h in sector_holdings)
        sector_pnl = sum(_numeric_field(h, "pnl") for h in sector_holdings)
        sector_cost = sum(_holding_cost(h) for h in sector_holdings)

        port_weight = sector_value / total_value
        sector_return = (sector_pnl / sector_cost) if sector_cost > 0 else 0

        # Brinson decomposition. Per sector the three effects add up to
        # port_weight * sector_return - bench_weight * bench_total_return.
        weight_gap = port_weight - bench_weight
        return_gap = sector_return - bench_total_return
        allocation_effect = weight_gap * bench_total_return
        selection_effect = bench_weight * return_gap
        interaction_effect = weight_gap * return_gap
        total_effect = allocation_effect + selection_effect + interaction_effect

        total_port_return += port_weight * sector_return
        sectors.append({
            "sector": sector,
            "portfolio_weight": round(port_weight * 100, 2),
            "benchmark_weight": round(bench_weight * 100, 2),
            "portfolio_return": round(sector_return * 100, 2),
            "benchmark_return": round(bench_total_return * 100, 2),
            "allocation_effect": round(allocation_effect * 100, 4),
            "selection_effect": round(selection_effect * 100, 4),
            "interaction_effect": round(interaction_effect * 100, 4),
            "total_effect": round(total_effect * 100, 4),
            "pnl": round(sector_pnl, 2),
        })

    # Sort by total effect
    sectors.sort(key=lambda row: row["total_effect"], reverse=True)
    total_alpha = total_port_return - bench_total_return

    # Waterfall chart data. The interaction bar is included so that the steps
    # between "Benchmark" and "Portfolio Return" add up to the alpha.
    waterfall_steps = (
        ("Alloc", "allocation_effect"),
        ("Select", "selection_effect"),
        ("Interact", "interaction_effect"),
    )
    waterfall = [
        {"label": "Benchmark", "value": round(bench_total_return * 100, 2), "type": "start"}
    ]
    for row in sectors:
        for suffix, key in waterfall_steps:
            waterfall.append({
                "label": f"{row['sector']} ({suffix})",
                "value": row[key],
                "type": "positive" if row[key] >= 0 else "negative",
            })
    waterfall.append(
        {"label": "Portfolio Return", "value": round(total_port_return * 100, 2), "type": "total"}
    )

    return {
        "method": "brinson",
        "benchmark": benchmark,
        "benchmark_return": round(bench_total_return * 100, 2),
        "portfolio_return": round(total_port_return * 100, 2),
        "alpha": round(total_alpha * 100, 2),
        "sectors": sectors,
        "waterfall": waterfall,
    }
async def _fetch_daily_returns(ticker: str, period: str) -> "pd.Series | None":
    """Return daily ``Close`` pct-change returns for *ticker*, or None if unusable."""
    try:
        df = await yahoo_adapter.get_price_dataframe(ticker, period=period)
        if df is not None and not df.empty and len(df) > 20:
            return df["Close"].pct_change().dropna()
    except Exception:
        # Best-effort: one bad ticker must not abort the whole attribution,
        # but the failure is logged instead of being silently dropped.
        logging.getLogger(__name__).warning(
            "Could not load price history for %s; skipping it", ticker, exc_info=True
        )
    return None


async def factor_attribution(
    holdings: List[Dict[str, Any]],
    total_value: float,
    period: str = "1y",
) -> Dict[str, Any]:
    """
    Factor-based attribution: decompose returns into contributions from
    systematic factors (market, size, value, momentum, volatility).

    Daily portfolio returns (holdings weighted by ``market_value /
    total_value``) are regressed by ordinary least squares, with an intercept,
    on the daily returns of five ETF proxies (SPY, IWM, IVE, MTUM, USMV).
    Portfolio and factor returns are matched by date before the regression.

    Args:
        holdings: Dicts read for the keys ``ticker`` and ``market_value``.
            Only tickers with at least one positive ``market_value`` are used.
        total_value: Denominator for portfolio weights; must be positive.
        period: History period passed to the price adapter.

    Returns:
        ``{"error": ...}`` when inputs or data are insufficient (fewer than 30
        common observations) or the regression fails. Otherwise a dict with
        the keys ``method``, ``total_return``, ``alpha``,
        ``factor_contributions`` (sorted by absolute contribution),
        ``total_explained`` and ``residual``. Returns, alpha and contributions
        are annualized with ``TRADING_DAYS`` and expressed in percent.
    """
    if not holdings or total_value <= 0:
        return {"error": "No holdings for factor attribution"}

    # Total market value per ticker (one pass), and which tickers have at
    # least one positive position. Holdings without a ticker are ignored.
    value_by_ticker: Dict[str, Any] = {}
    active = set()
    for holding in holdings:
        ticker = holding.get("ticker")
        if not ticker:
            continue
        market_value = holding.get("market_value") or 0
        value_by_ticker[ticker] = value_by_ticker.get(ticker, 0) + market_value
        if market_value > 0:
            active.add(ticker)
    tickers = [t for t in value_by_ticker if t in active]

    # Fetch returns
    returns_data = {}
    for ticker in tickers:
        series = await _fetch_daily_returns(ticker, period)
        if series is not None:
            returns_data[ticker] = series
    if not returns_data:
        return {"error": "Could not fetch return data"}

    # Align holdings on common dates
    aligned = pd.DataFrame(returns_data).dropna()
    if aligned.empty:
        return {"error": "Insufficient overlapping data"}

    # Compute portfolio returns, keeping the date index so they can be matched
    # against factor returns by date rather than by row position.
    weights = np.array([value_by_ticker[t] / total_value for t in aligned.columns])
    port_returns = pd.Series(aligned.values @ weights, index=aligned.index, name="portfolio")

    # Fetch factor proxies
    factor_tickers = {
        "market": "SPY",
        "size": "IWM",
        "value": "IVE",
        "momentum": "MTUM",
        "low_vol": "USMV",
    }
    factor_returns: Dict[str, pd.Series] = {}
    for fname, fticker in factor_tickers.items():
        series = await _fetch_daily_returns(fticker, period)
        if series is not None:
            factor_returns[fname] = series
    if not factor_returns:
        return {"error": "Could not fetch factor data"}

    # Regression sample: only dates present for the portfolio AND all factors.
    factor_df = pd.DataFrame(factor_returns).dropna()
    factor_names = list(factor_df.columns)
    sample = factor_df.join(port_returns, how="inner").dropna()
    n_obs = len(sample)
    if n_obs < 30:
        return {"error": "Insufficient data for factor attribution"}

    y = sample["portfolio"].values
    X = sample[factor_names].values
    # Add intercept
    X_with_intercept = np.column_stack([np.ones(n_obs), X])

    # Simple OLS regression for factor loadings
    try:
        betas, _, _, _ = np.linalg.lstsq(X_with_intercept, y, rcond=None)
    except (np.linalg.LinAlgError, ValueError):
        return {"error": "Regression failed"}
    alpha_daily = betas[0]
    factor_betas = betas[1:]

    # Attribution: beta times the annualized mean factor return. Means are
    # taken over the regression sample so that alpha plus the contributions
    # reconcile with the total return.
    contributions = []
    total_explained = 0.0
    for beta, fname in zip(factor_betas, factor_names):
        avg_factor_ret = float(sample[fname].mean() * TRADING_DAYS)
        contribution = float(beta * avg_factor_ret * 100)
        total_explained += contribution
        contributions.append({
            "factor": fname,
            "beta": round(float(beta), 4),
            "factor_return": round(avg_factor_ret * 100, 2),
            "contribution": round(contribution, 4),
        })

    alpha_contribution = round(float(alpha_daily * TRADING_DAYS * 100), 4)
    total_return = round(float(np.mean(y) * TRADING_DAYS * 100), 2)
    contributions.sort(key=lambda item: abs(item["contribution"]), reverse=True)

    return {
        "method": "factor_based",
        "total_return": total_return,
        "alpha": alpha_contribution,
        "factor_contributions": contributions,
        "total_explained": round(total_explained, 4),
        "residual": round(total_return - total_explained - alpha_contribution, 4),
    }