# MF / src /portfolio_engine.py
# Parthiban97's picture
# Upload 15 files
# b0e15c1 verified
"""
Portfolio Engine: -p mode
Loads a client CSV, matches holdings to the fund universe,
computes portfolio metrics, exposure checks, and wealth projection.
"""
import csv
import numpy as np
from pathlib import Path
from typing import List, Optional, Dict
from src.models import Fund, Client, ClientHolding, Advisor, PortfolioReport
# ─── Client CSV Loader ───────────────────────────────────────────────────────
def _safe_float(value) -> Optional[float]:
    """Parse *value* as a float, ignoring thousands separators and padding.

    Returns None when the value cannot be interpreted as a number.
    """
    try:
        return float(str(value).replace(',', '').strip())
    except (ValueError, TypeError):
        return None


def _float_at(row: List[str], index: int) -> Optional[float]:
    """Parse the cell at *index* as a float; None if the cell is absent or non-numeric."""
    return _safe_float(row[index]) if len(row) > index else None


def load_client_csv(csv_path: str) -> tuple[Client, List[ClientHolding]]:
    """
    Load client data from CSV.

    Expected CSV format:
        Line 1:  Name, Age, Email, Mobile[, PAN]
        Line 2+: Scheme Name, Current Value, Invested Amount, SIP Amount, SIP Frequency

    Example:
        Parthiban,45,parthiban@gmail.com,9876543210,ABCDE1234F
        Nippon India Small Cap Fund,280923,200000,5000,Monthly
        HDFC Mid Cap Fund,134562,120000,3000,Monthly

    Rows whose cells are all blank are ignored, as are holding rows whose
    first cell is blank or looks like a column header ("scheme name",
    "fund", "scheme"). Missing trailing columns are tolerated: a missing or
    non-numeric current value becomes 0.0, other missing fields become None.

    Returns:
        A ``(client, holdings)`` tuple.

    Raises:
        FileNotFoundError: if *csv_path* does not exist.
        ValueError: if the file contains no non-blank rows.
    """
    csv_path = Path(csv_path)
    if not csv_path.exists():
        raise FileNotFoundError(f"Client CSV not found: {csv_path}")

    # newline='' is what the csv module expects so that quoted fields with
    # embedded line breaks are parsed correctly.
    with open(csv_path, encoding='utf-8-sig', errors='replace', newline='') as f:
        rows = [r for r in csv.reader(f) if any(c.strip() for c in r)]
    if not rows:
        raise ValueError("Client CSV is empty")

    # Parse client info from first row
    info = rows[0]
    client = Client(
        name=info[0].strip() if len(info) > 0 else "Unknown",
        age=int(info[1]) if len(info) > 1 and info[1].strip().isdigit() else None,
        email=info[2].strip() if len(info) > 2 else None,
        mobile=info[3].strip() if len(info) > 3 else None,
        pan=info[4].strip() if len(info) > 4 else None,
    )

    # Parse holdings from remaining rows
    header_labels = ('scheme name', 'fund', 'scheme')
    holdings: List[ClientHolding] = []
    for row in rows[1:]:
        scheme_name = row[0].strip() if row else ""
        if not scheme_name:
            continue
        # Skip header-like rows
        if scheme_name.lower() in header_labels:
            continue
        holdings.append(
            ClientHolding(
                scheme_name=scheme_name,
                # A row with only a scheme name used to raise IndexError here.
                current_value=_float_at(row, 1) or 0.0,
                invested_amount=_float_at(row, 2),
                sip_amount=_float_at(row, 3),
                sip_frequency=row[4].strip() if len(row) > 4 else None,
            )
        )
    return client, holdings
# ─── Fund Matcher ────────────────────────────────────────────────────────────
def match_holdings_to_funds(holdings: List[ClientHolding], funds: List[Fund]) -> List[ClientHolding]:
    """
    Attach the best-matching universe fund to each client holding.

    Names are lowercased, hyphens are treated as spaces, and generic words
    (plan/option labels such as "fund", "growth", "direct") are discarded.
    The remaining word sets are compared with the Jaccard index
    (shared words / all distinct words). The first fund with the highest
    score wins, and it is assigned to ``holding.fund`` only when the score
    exceeds 0.15. Holdings are modified in place and the same list is
    returned.
    """
    noise_words = {'fund', 'regular', 'plan', 'growth', 'option', 'direct',
                   'idcw', 'div', 'dividend', '-', 'the', 'india', 'of'}

    def significant_words(label: str) -> set:
        words = label.lower().replace('-', ' ').split()
        return {word for word in words if word not in noise_words}

    def similarity(left: set, right: set) -> float:
        combined = len(left | right)
        return len(left & right) / combined if combined else 0

    # Funds whose names reduce to nothing can never match, so drop them once.
    candidates = [
        (fund, words)
        for fund, words in ((f, significant_words(f.name)) for f in funds)
        if words
    ]

    for item in holdings:
        wanted = significant_words(item.scheme_name)
        if not wanted:
            continue
        top_fund, top_score = None, 0
        for candidate, words in candidates:
            score = similarity(wanted, words)
            # Strict comparison keeps the earliest fund on ties.
            if score > top_score:
                top_fund, top_score = candidate, score
        if top_score > 0.15:  # minimum match threshold
            item.fund = top_fund
    return holdings
# ─── Portfolio Analysis ──────────────────────────────────────────────────────
def compute_allocation(holdings: List[ClientHolding]) -> List[ClientHolding]:
    """
    Set ``allocation_pct`` on every holding to its share of the portfolio.

    The share is the holding's current value as a percentage of the summed
    current value, rounded to two decimals. When the summed value is zero
    the holdings are returned untouched. The input list is modified in
    place and returned.
    """
    portfolio_value = sum(item.current_value for item in holdings)
    if portfolio_value != 0:
        for item in holdings:
            share = (item.current_value / portfolio_value) * 100
            item.allocation_pct = round(share, 2)
    return holdings
def check_exposure(holdings: List[ClientHolding],
                   threshold: float = 20.0) -> tuple[Dict, Dict, List[str]]:
    """
    Check AMC and scheme-level exposure.

    Each holding contributes its ``allocation_pct``; if that is None the
    percentage is derived from ``current_value`` instead. Contributions are
    summed per scheme name and per AMC, so a scheme that appears on several
    rows is reported with its combined exposure. The AMC is taken as the
    text before the first "-" in the scheme name (the whole name when there
    is no "-").

    Args:
        holdings: Client holdings to inspect.
        threshold: Percentage above which an exposure produces a warning.

    Returns:
        ``(amc_exposure, scheme_exposure, warnings)``. All three are empty
        when the total current value is zero. AMC warnings precede scheme
        warnings.
    """
    total = sum(h.current_value for h in holdings)
    if total == 0:
        return {}, {}, []

    amc_exposure: Dict[str, float] = {}
    scheme_exposure: Dict[str, float] = {}
    for h in holdings:
        pct = h.allocation_pct
        if pct is None:
            # Allocation not computed yet: derive it rather than fail on None.
            pct = round((h.current_value / total) * 100, 2)
        # Accumulate so repeated scheme names are not silently overwritten.
        scheme_exposure[h.scheme_name] = scheme_exposure.get(h.scheme_name, 0) + pct
        # Extract AMC name (first word(s) before "-")
        amc = h.scheme_name.partition('-')[0].strip()
        amc_exposure[amc] = amc_exposure.get(amc, 0) + pct

    warnings: List[str] = [
        f"⚠️ AMC Exposure Alert: {amc} = {pct:.1f}% (>{threshold}% threshold)"
        for amc, pct in amc_exposure.items()
        if pct > threshold
    ]
    warnings.extend(
        f"⚠️ Scheme Exposure Alert: {scheme} = {pct:.1f}% (>{threshold}% threshold)"
        for scheme, pct in scheme_exposure.items()
        if pct > threshold
    )
    return amc_exposure, scheme_exposure, warnings
def compute_portfolio_metrics(holdings: List[ClientHolding]) -> Dict:
    """
    Compute portfolio-level weighted average risk metrics.

    For each of sharpe, alpha, beta and std_dev, the result is the average
    of the matched funds' values weighted by each holding's current value.
    Holdings with no matched fund, or whose fund lacks a given metric, are
    left out of that metric's average instead of being counted as zero, so
    partial coverage does not drag the figure toward 0.

    Returns:
        A dict with keys "sharpe", "alpha", "beta" and "std_dev", each
        rounded to four decimals (0.0 when no holding supplies the metric),
        or an empty dict when the total current value is zero.
    """
    total = sum(h.current_value for h in holdings)
    if total == 0:
        return {}

    metric_names = ("sharpe", "alpha", "beta", "std_dev")
    weighted_sum = dict.fromkeys(metric_names, 0.0)
    covered_value = dict.fromkeys(metric_names, 0.0)

    for h in holdings:
        if not h.fund:
            continue
        for name in metric_names:
            value = getattr(h.fund, name, None)
            if value is None:
                continue
            weighted_sum[name] += h.current_value * value
            covered_value[name] += h.current_value

    # Normalise by the value that actually carries data for each metric.
    return {
        name: round(weighted_sum[name] / covered_value[name], 4) if covered_value[name] else 0.0
        for name in metric_names
    }
def flag_underperformers(holdings: List[ClientHolding]) -> List[ClientHolding]:
    """
    Mark holdings whose matched fund lags its benchmark or its category.

    For each of the 1Y, 3Y and 5Y horizons where the fund's own CAGR is
    available, the fund scores a benchmark miss when its CAGR is strictly
    below the benchmark CAGR, and a category miss when it is strictly below
    the category-average CAGR. Missing comparison values are not counted
    as misses.

    A holding gets ``is_underperforming = True`` when either miss count
    reaches two. Holdings without a matched fund are skipped, and the flag
    is never reset to False here. The input list is modified in place and
    returned.
    """
    horizons = ("1y", "3y", "5y")

    for item in holdings:
        scheme = item.fund
        if not scheme:
            continue

        benchmark_misses = 0
        category_misses = 0
        for horizon in horizons:
            own_return = getattr(scheme, f"cagr_{horizon}", None)
            benchmark_return = getattr(scheme, f"cagr_{horizon}_bm", None)
            category_return = getattr(scheme, f"cagr_{horizon}_cat", None)
            if own_return is None:
                continue
            if benchmark_return is not None and own_return < benchmark_return:
                benchmark_misses += 1
            if category_return is not None and own_return < category_return:
                category_misses += 1

        # Two misses already imply at least two horizons were evaluated.
        if max(benchmark_misses, category_misses) >= 2:
            item.is_underperforming = True
    return holdings
def compute_wealth_projection(total_value: float, years_list: Optional[list] = None,
                              rate: float = 0.12) -> Dict:
    """
    Project portfolio value at a fixed annual return rate.

    Args:
        total_value: Present portfolio value.
        years_list: Horizons (in years) to project; defaults to
            5, 10, 15 and 20 years when omitted or None.
        rate: Annual compounding rate as a fraction (0.12 means 12%).

    Returns:
        A dict mapping each horizon to ``total_value * (1 + rate) ** years``
        rounded to two decimals.
    """
    # A None sentinel avoids sharing one mutable default list across calls.
    if years_list is None:
        years_list = [5, 10, 15, 20]
    growth = 1 + rate
    return {yr: round(total_value * (growth ** yr), 2) for yr in years_list}
# ─── Main entry ──────────────────────────────────────────────────────────────
def run_portfolio_engine(
    client_csv: str,
    fund_universe: List[Fund],
    advisor: Optional[Advisor] = None,
) -> PortfolioReport:
    """
    Full pipeline: load client → match funds → analyse → build report object.

    Steps, in order: load the client CSV, match holdings against
    *fund_universe*, compute allocations, run exposure checks, flag
    underperformers, compute weighted risk metrics, and project future
    value from the current total. Progress and any exposure warnings are
    printed to stdout.

    Args:
        client_csv: Path to the client CSV file.
        fund_universe: Funds that holdings are matched against.
        advisor: Advisor to attach to the report; a default ``Advisor()``
            is created when omitted.

    Returns:
        The assembled ``PortfolioReport``. Holdings with no invested amount
        count as 0 toward ``total_invested``, which feeds the unrealized
        gain figure.
    """
    if advisor is None:
        advisor = Advisor()

    # The status icons below were mojibake in the source; restored to the intended emoji.
    print(f"📂 Loading client data from: {client_csv}")
    client, holdings = load_client_csv(client_csv)
    print(f" Client: {client.name} | Holdings: {len(holdings)}")

    print("🔗 Matching holdings to fund universe...")
    holdings = match_holdings_to_funds(holdings, fund_universe)
    matched = sum(1 for h in holdings if h.fund is not None)
    print(f" Matched {matched}/{len(holdings)} holdings")

    # Allocation must be computed before the exposure check, which reads allocation_pct.
    holdings = compute_allocation(holdings)
    amc_exp, scheme_exp, warnings = check_exposure(holdings)
    holdings = flag_underperformers(holdings)
    metrics = compute_portfolio_metrics(holdings)

    total_current = sum(h.current_value for h in holdings)
    total_invested = sum(h.invested_amount or 0 for h in holdings)
    wealth_projection = compute_wealth_projection(total_current)

    report = PortfolioReport(
        client=client,
        advisor=advisor,
        holdings=holdings,
        total_current_value=total_current,
        total_invested=total_invested,
        unrealized_gain=total_current - total_invested,
        # metrics is empty when the portfolio has no value, so these may be None.
        sharpe=metrics.get("sharpe"),
        alpha=metrics.get("alpha"),
        beta=metrics.get("beta"),
        std_dev=metrics.get("std_dev"),
        amc_exposure=amc_exp,
        scheme_exposure=scheme_exp,
        exposure_warnings=warnings,
        wealth_projection=wealth_projection,
    )
    if warnings:
        print("\n".join(warnings))
    return report