Spaces:
Running
Running
File size: 10,791 Bytes
"""
Portfolio Engine: -p mode
Loads a client CSV, matches holdings to the fund universe,
computes portfolio metrics, exposure checks, and wealth projection.
"""
import csv
import numpy as np
from pathlib import Path
from typing import List, Optional, Dict
from src.models import Fund, Client, ClientHolding, Advisor, PortfolioReport
# ─── Client CSV Loader ───────────────────────────────────────────────────────
def load_client_csv(csv_path: str) -> tuple[Client, List[ClientHolding]]:
    """
    Load a client and their holdings from a CSV file.

    Expected CSV format:
        Line 1:  Name, Age, Email, Mobile[, PAN]
        Line 2+: Scheme Name, Current Value, Invested Amount, SIP Amount, SIP Frequency

    Example:
        Parthiban,45,parthiban@gmail.com,9876543210,ABCDE1234F
        Nippon India Small Cap Fund,280923,200000,5000,Monthly
        HDFC Mid Cap Fund,134562,120000,3000,Monthly

    Rows whose cells are all blank are dropped before parsing. Holding rows
    with an empty scheme name, or whose first cell looks like a header
    ("scheme name", "fund", "scheme"), are skipped. Trailing columns may be
    omitted: a missing or unparseable current value becomes 0.0, the other
    missing fields become None.

    Args:
        csv_path: Path to the client CSV file.

    Returns:
        A ``(client, holdings)`` tuple.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        ValueError: If the file contains no non-blank rows.
    """
    header_names = ('scheme name', 'fund', 'scheme')

    def to_float(raw):
        """Parse a number that may carry thousands separators; None if invalid."""
        try:
            return float(str(raw).replace(',', '').strip())
        except (ValueError, TypeError):
            return None

    def cell(row, index):
        """Return the stripped cell at *index*, or None when the row is too short."""
        return row[index].strip() if len(row) > index else None

    path = Path(csv_path)
    if not path.exists():
        raise FileNotFoundError(f"Client CSV not found: {path}")

    # newline='' is what the csv module expects so that it can interpret
    # line endings inside quoted fields itself.
    with open(path, encoding='utf-8-sig', errors='replace', newline='') as f:
        rows = [r for r in csv.reader(f) if any(c.strip() for c in r)]
    if not rows:
        raise ValueError("Client CSV is empty")

    # First row carries the client's personal details.
    info = rows[0]
    age_text = cell(info, 1)
    client = Client(
        name=info[0].strip() if info else "Unknown",
        # isdecimal() only admits characters int() can actually convert;
        # isdigit() also accepts e.g. superscript digits, which make int() raise.
        age=int(age_text) if age_text is not None and age_text.isdecimal() else None,
        email=cell(info, 2),
        mobile=cell(info, 3),
        pan=cell(info, 4),
    )

    # Every remaining row describes one holding.
    holdings: List[ClientHolding] = []
    for row in rows[1:]:
        scheme_name = row[0].strip() if row else ''
        if not scheme_name or scheme_name.lower() in header_names:
            continue
        # A row may contain nothing but the scheme name; guard the lookup so
        # such a row yields a zero-valued holding instead of an IndexError.
        current_value = to_float(row[1]) if len(row) > 1 else None
        holdings.append(ClientHolding(
            scheme_name=scheme_name,
            current_value=current_value or 0.0,
            invested_amount=to_float(row[2]) if len(row) > 2 else None,
            sip_amount=to_float(row[3]) if len(row) > 3 else None,
            sip_frequency=cell(row, 4),
        ))
    return client, holdings
# ─── Fund Matcher ────────────────────────────────────────────────────────────
def match_holdings_to_funds(holdings: List[ClientHolding], funds: List[Fund],
                            min_score: float = 0.15) -> List[ClientHolding]:
    """
    Fuzzy-match each client holding to a fund in the universe.

    Names are lower-cased, hyphens are treated as spaces, generic words
    (e.g. "fund", "plan", "growth") are dropped, and the remaining token sets
    are compared with the Jaccard index (|A & B| / |A | B|). The best-scoring
    fund is assigned to ``holding.fund`` when its score is strictly greater
    than ``min_score``; on a tie the fund appearing first in ``funds`` wins.
    Holdings that do not reach the threshold are left untouched.

    Args:
        holdings: Holdings to match; modified in place.
        funds: Candidate funds; each must expose a ``name`` attribute.
        min_score: Minimum Jaccard score (exclusive) required for a match.

    Returns:
        The same ``holdings`` list that was passed in.
    """
    # Built once per call rather than once per tokenised name. '-' is not
    # listed because hyphens are replaced by spaces before splitting.
    stopwords = frozenset({'fund', 'regular', 'plan', 'growth', 'option',
                           'direct', 'idcw', 'div', 'dividend', 'the',
                           'india', 'of'})

    def tokenize(name: str) -> set:
        """Return the set of significant lower-cased words in *name*."""
        return set(name.lower().replace('-', ' ').split()) - stopwords

    # Funds whose name consists only of stop words can never match; drop them
    # up front so the inner loop never divides by an empty union.
    candidates = []
    for fund in funds:
        tokens = tokenize(fund.name)
        if tokens:
            candidates.append((fund, tokens))

    for holding in holdings:
        wanted = tokenize(holding.scheme_name)
        if not wanted:
            continue
        best_fund, best_score = None, 0.0
        for fund, tokens in candidates:
            score = len(wanted & tokens) / len(wanted | tokens)
            if score > best_score:
                best_fund, best_score = fund, score
        if best_fund is not None and best_score > min_score:
            holding.fund = best_fund
    return holdings
# ─── Portfolio Analysis ──────────────────────────────────────────────────────
def compute_allocation(holdings: List[ClientHolding]) -> List[ClientHolding]:
    """
    Set ``allocation_pct`` on every holding to its share of the portfolio.

    The share is the holding's ``current_value`` as a percentage of the summed
    ``current_value`` of all holdings, rounded to two decimals. When the
    portfolio total is zero no holding is modified.

    Returns:
        The same ``holdings`` list that was passed in.
    """
    portfolio_value = sum(item.current_value for item in holdings)
    if portfolio_value != 0:
        for item in holdings:
            share = item.current_value / portfolio_value
            item.allocation_pct = round(share * 100, 2)
    return holdings
def check_exposure(holdings: List[ClientHolding],
                   threshold: float = 20.0) -> tuple[Dict, Dict, List[str]]:
    """
    Check AMC-level and scheme-level concentration of a portfolio.

    Each holding contributes its ``allocation_pct``. If that attribute is
    missing or None (allocation not computed yet) the percentage is derived
    from ``current_value`` instead, rounded to two decimals. Several rows
    with the same scheme name are added together rather than overwriting
    one another.

    The AMC name is taken as the text before the first '-' in the scheme name.
    NOTE(review): for scheme names without a '-', the whole scheme name is
    used as the AMC, so AMC exposure then mirrors scheme exposure.

    Args:
        holdings: Holdings to analyse.
        threshold: Percentage above which (exclusive) a warning is emitted.

    Returns:
        ``(amc_exposure, scheme_exposure, warnings)``: two dicts mapping a
        name to its percentage of the portfolio, and a list of warning
        strings (AMC alerts first, then scheme alerts). All three are empty
        when the portfolio total is zero.
    """
    total = sum(h.current_value for h in holdings)
    if total == 0:
        return {}, {}, []

    amc_exposure: Dict[str, float] = {}
    scheme_exposure: Dict[str, float] = {}
    for h in holdings:
        pct = getattr(h, "allocation_pct", None)
        if pct is None:
            # Allocation was never computed for this holding; derive it here
            # instead of failing on None arithmetic below.
            pct = round((h.current_value / total) * 100, 2)
        # Accumulate so that repeated rows of one scheme are summed.
        scheme_exposure[h.scheme_name] = scheme_exposure.get(h.scheme_name, 0) + pct
        amc = h.scheme_name.split('-')[0].strip()
        amc_exposure[amc] = amc_exposure.get(amc, 0) + pct

    # Warning sign (U+26A0 U+FE0F) written as escapes so the literal cannot
    # be corrupted by a wrong source-file encoding.
    mark = "\u26a0\ufe0f"
    warnings: List[str] = [
        f"{mark} AMC Exposure Alert: {amc} = {pct:.1f}% (>{threshold}% threshold)"
        for amc, pct in amc_exposure.items()
        if pct > threshold
    ]
    warnings.extend(
        f"{mark} Scheme Exposure Alert: {scheme} = {pct:.1f}% (>{threshold}% threshold)"
        for scheme, pct in scheme_exposure.items()
        if pct > threshold
    )
    return amc_exposure, scheme_exposure, warnings
def compute_portfolio_metrics(holdings: List[ClientHolding]) -> Dict:
    """
    Compute portfolio-level, value-weighted average risk metrics.

    For each of ``sharpe``, ``alpha``, ``beta`` and ``std_dev`` the result is
    the average of the matched funds' values weighted by each holding's
    ``current_value``. The average is taken only over holdings that actually
    carry that metric, so holdings without a matched fund, or whose fund has
    None for the metric, no longer drag the figure toward zero. When every
    holding has the metric this equals weighting by share of the whole
    portfolio.

    Returns:
        ``{}`` when the portfolio total is zero; otherwise a dict with the
        four metric names mapped to values rounded to four decimals. A metric
        for which no holding has data is reported as 0.0.
    """
    total = sum(h.current_value for h in holdings)
    if total == 0:
        return {}

    metric_names = ("sharpe", "alpha", "beta", "std_dev")
    weighted_sum = dict.fromkeys(metric_names, 0.0)
    covered_value = dict.fromkeys(metric_names, 0.0)
    for h in holdings:
        if not h.fund:
            continue
        for name in metric_names:
            value = getattr(h.fund, name)
            if value is not None:
                weighted_sum[name] += h.current_value * value
                # Track how much of the portfolio has data for this metric so
                # the average is normalised by that amount only.
                covered_value[name] += h.current_value

    return {
        name: round(weighted_sum[name] / covered_value[name], 4) if covered_value[name] else 0.0
        for name in metric_names
    }
def flag_underperformers(holdings: List[ClientHolding]) -> List[ClientHolding]:
    """
    Mark holdings whose matched fund lags its benchmark or its category.

    Rule (from senior advisor's framework):
        A fund's CAGR should
          1. outperform the BM Index across time periods (1Y, 3Y, 5Y),
          2. outperform the category average across time periods,
          3. have superior risk metrics (handled separately via score).

    For every period in which the fund's own CAGR is available, the fund
    scores a benchmark miss when its CAGR is below the benchmark CAGR and a
    category miss when it is below the category CAGR; a missing comparison
    value never counts as a miss. ``is_underperforming`` is set to True when
    either miss count reaches two. Holdings without a matched fund, and
    holdings that pass, are left untouched.

    Returns:
        The same ``holdings`` list that was passed in.
    """
    period_fields = (
        ("cagr_1y", "cagr_1y_bm", "cagr_1y_cat"),
        ("cagr_3y", "cagr_3y_bm", "cagr_3y_cat"),
        ("cagr_5y", "cagr_5y_bm", "cagr_5y_cat"),
    )
    for holding in holdings:
        fund = holding.fund
        if not fund:
            continue
        benchmark_misses = 0
        category_misses = 0
        for own_field, benchmark_field, category_field in period_fields:
            own = getattr(fund, own_field, None)
            benchmark = getattr(fund, benchmark_field, None)
            category = getattr(fund, category_field, None)
            if own is None:
                # Nothing to compare for this period.
                continue
            if benchmark is not None and own < benchmark:
                benchmark_misses += 1
            if category is not None and own < category:
                category_misses += 1
        # Two or more misses against either yardstick flags the holding.
        if max(benchmark_misses, category_misses) >= 2:
            holding.is_underperforming = True
    return holdings
def compute_wealth_projection(total_value: float, years_list: Optional[list] = None,
                              rate: float = 0.12) -> Dict:
    """
    Project portfolio value at a fixed annual return rate.

    Uses annual compounding: ``total_value * (1 + rate) ** years``.

    Args:
        total_value: Present value of the portfolio.
        years_list: Horizons, in years, to project for. Defaults to
            5, 10, 15 and 20 years when omitted or None.
        rate: Annual return as a fraction (0.12 means 12%).

    Returns:
        A dict mapping each horizon to the projected value, rounded to two
        decimals, in the order the horizons were given.
    """
    # None stands in for the default so that no mutable list object is
    # shared between calls; an explicitly passed empty list is respected.
    horizons = (5, 10, 15, 20) if years_list is None else years_list
    growth = 1 + rate
    return {yr: round(total_value * (growth ** yr), 2) for yr in horizons}
# ─── Main entry ──────────────────────────────────────────────────────────────
def run_portfolio_engine(
    client_csv: str,
    fund_universe: List[Fund],
    advisor: Optional[Advisor] = None,
) -> PortfolioReport:
    """
    Full pipeline: load client -> match funds -> analyse -> build report object.

    Progress messages, and any exposure warnings, are printed to stdout.

    Args:
        client_csv: Path of the client CSV handed to ``load_client_csv``.
        fund_universe: Funds the client's holdings are matched against.
        advisor: Advisor recorded on the report; a default ``Advisor()`` is
            created when omitted.

    Returns:
        The assembled ``PortfolioReport``.
    """
    advisor = Advisor() if advisor is None else advisor

    print(f"π Loading client data from: {client_csv}")
    client, holdings = load_client_csv(client_csv)
    print(f" Client: {client.name} | Holdings: {len(holdings)}")

    print("π Matching holdings to fund universe...")
    holdings = match_holdings_to_funds(holdings, fund_universe)
    matched_count = len([h for h in holdings if h.fund is not None])
    print(f" Matched {matched_count}/{len(holdings)} holdings")

    # Analysis passes; allocation runs first because the exposure check
    # reads the allocation percentages it stores on each holding.
    holdings = compute_allocation(holdings)
    amc_exposure, scheme_exposure, exposure_warnings = check_exposure(holdings)
    holdings = flag_underperformers(holdings)
    risk = compute_portfolio_metrics(holdings)

    # Holdings without an invested amount contribute 0 to the invested total.
    current_total = sum(h.current_value for h in holdings)
    invested_total = sum(h.invested_amount or 0 for h in holdings)
    projection = compute_wealth_projection(current_total)

    report = PortfolioReport(
        client=client,
        advisor=advisor,
        holdings=holdings,
        total_current_value=current_total,
        total_invested=invested_total,
        unrealized_gain=current_total - invested_total,
        sharpe=risk.get("sharpe"),
        alpha=risk.get("alpha"),
        beta=risk.get("beta"),
        std_dev=risk.get("std_dev"),
        amc_exposure=amc_exposure,
        scheme_exposure=scheme_exposure,
        exposure_warnings=exposure_warnings,
        wealth_projection=projection,
    )

    if exposure_warnings:
        print("\n".join(exposure_warnings))
    return report
|