"""
Portfolio Engine: -p mode

Loads a client CSV, matches holdings to the fund universe,
computes portfolio metrics, exposure checks, and wealth projection.
"""

import csv
from pathlib import Path
from typing import List, Optional, Dict

import numpy as np

from src.models import Fund, Client, ClientHolding, Advisor, PortfolioReport


# First-column values that mark a header row inside the holdings section.
_HEADER_LABELS = frozenset({'scheme name', 'fund', 'scheme'})

# Generic words ignored when comparing scheme names; they carry no identity.
_NAME_STOPWORDS = frozenset({
    'fund', 'regular', 'plan', 'growth', 'option', 'direct',
    'idcw', 'div', 'dividend', 'the', 'india', 'of',
})

# Exposure (in percent of portfolio) above which a warning is raised.
_EXPOSURE_THRESHOLD = 20.0

# Default projection horizons, in years (kept immutable on purpose).
_DEFAULT_PROJECTION_YEARS = (5, 10, 15, 20)


# ─── Client CSV Loader ───────────────────────────────────────────────────────

def _safe_float(value) -> Optional[float]:
    """Parse a numeric cell, tolerating thousands separators; None if invalid."""
    try:
        return float(str(value).replace(',', '').strip())
    except (ValueError, TypeError):
        return None


def _parse_age(raw: str) -> Optional[int]:
    """Return the age as an int, or None when the cell is not a plain number."""
    text = raw.strip()
    if not text.isdigit():
        return None
    try:
        return int(text)
    except ValueError:
        # str.isdigit() accepts characters such as '²' that int() rejects.
        return None


def load_client_csv(csv_path: str) -> tuple[Client, List[ClientHolding]]:
    """
    Load client data from CSV.

    Expected CSV format:
        Line 1:  Name, Age, Email, Mobile[, PAN]
        Line 2+: Scheme Name, Current Value, Invested Amount, SIP Amount, SIP Frequency

    Example:
        Parthiban,45,parthiban@gmail.com,9876543210,ABCDE1234F
        Nippon India Small Cap Fund,280923,200000,5000,Monthly
        HDFC Mid Cap Fund,134562,120000,3000,Monthly

    Blank rows are ignored, as are holding rows whose first cell is empty or
    is a header label ('scheme name', 'fund', 'scheme'). Missing trailing
    columns are tolerated: current value falls back to 0.0, the other
    optional fields to None.

    Returns:
        (client, holdings) parsed from the file.

    Raises:
        FileNotFoundError: if csv_path does not exist.
        ValueError: if the file contains no non-blank rows.
    """
    csv_path = Path(csv_path)
    if not csv_path.exists():
        raise FileNotFoundError(f"Client CSV not found: {csv_path}")

    # newline='' lets the csv module handle line endings itself, so quoted
    # fields containing newlines are parsed correctly.
    with open(csv_path, encoding='utf-8-sig', errors='replace', newline='') as f:
        rows = [r for r in csv.reader(f) if any(c.strip() for c in r)]

    if not rows:
        raise ValueError("Client CSV is empty")

    # Parse client info from first row
    info = rows[0]
    client = Client(
        name=info[0].strip() if len(info) > 0 else "Unknown",
        age=_parse_age(info[1]) if len(info) > 1 else None,
        email=info[2].strip() if len(info) > 2 else None,
        mobile=info[3].strip() if len(info) > 3 else None,
        pan=info[4].strip() if len(info) > 4 else None,
    )

    # Parse holdings from remaining rows
    holdings: List[ClientHolding] = []
    for row in rows[1:]:
        if not row:
            continue
        scheme_name = row[0].strip()
        if not scheme_name:
            continue
        # Skip header-like rows
        if scheme_name.lower() in _HEADER_LABELS:
            continue

        # A row may carry only the scheme name; guard every optional column.
        current_value = _safe_float(row[1]) if len(row) > 1 else None
        holdings.append(ClientHolding(
            scheme_name=scheme_name,
            current_value=current_value or 0.0,
            invested_amount=_safe_float(row[2]) if len(row) > 2 else None,
            sip_amount=_safe_float(row[3]) if len(row) > 3 else None,
            sip_frequency=row[4].strip() if len(row) > 4 else None,
        ))

    return client, holdings


# ─── Fund Matcher ────────────────────────────────────────────────────────────

def _tokenize_name(name: str) -> set:
    """Lowercase a scheme name, split on whitespace/hyphens, drop stopwords."""
    return set(name.lower().replace('-', ' ').split()) - _NAME_STOPWORDS


def match_holdings_to_funds(holdings: List[ClientHolding],
                            funds: List[Fund]) -> List[ClientHolding]:
    """
    Fuzzy-match each client holding to a fund in the universe.

    Uses token overlap (Jaccard similarity) on lowercased fund names after
    removing generic stopwords. The best-scoring fund is assigned to
    ``holding.fund`` only when its score exceeds 0.15; on ties the fund that
    appears first in ``funds`` wins. Holdings are modified in place and the
    same list is returned.
    """
    # Tokenize the universe once; funds whose names reduce to nothing can
    # never match and are dropped up front.
    candidates = [(fund, tokens)
                  for fund, tokens in ((f, _tokenize_name(f.name)) for f in funds)
                  if tokens]

    for holding in holdings:
        h_tokens = _tokenize_name(holding.scheme_name)
        if not h_tokens:
            continue

        best_fund = None
        best_score = 0
        for fund, f_tokens in candidates:
            # Both sets are non-empty here, so the union is never empty.
            jaccard = len(h_tokens & f_tokens) / len(h_tokens | f_tokens)
            if jaccard > best_score:
                best_score = jaccard
                best_fund = fund

        if best_score > 0.15:  # minimum match threshold
            holding.fund = best_fund

    return holdings


# ─── Portfolio Analysis ──────────────────────────────────────────────────────

def compute_allocation(holdings: List[ClientHolding]) -> List[ClientHolding]:
    """
    Compute each holding's % allocation of total portfolio.

    Sets ``allocation_pct`` (rounded to 2 decimals) on every holding in place.
    When the total current value is 0 the holdings are returned untouched.
    """
    total = sum(h.current_value for h in holdings)
    if total == 0:
        return holdings
    for h in holdings:
        h.allocation_pct = round((h.current_value / total) * 100, 2)
    return holdings


def check_exposure(holdings: List[ClientHolding]) -> tuple[Dict, Dict, List[str]]:
    """
    Check AMC and scheme-level exposure.

    Exposure is taken from each holding's ``allocation_pct``; if that has not
    been populated it is derived from ``current_value`` the same way
    ``compute_allocation`` does. Repeated rows for the same scheme are summed.

    Returns (amc_exposure, scheme_exposure, warnings), where the two dicts map
    name -> percent of portfolio and ``warnings`` has one message per AMC or
    scheme above the 20% threshold. All three are empty when the portfolio
    total is 0.
    """
    total = sum(h.current_value for h in holdings)
    if total == 0:
        return {}, {}, []

    amc_exposure: Dict[str, float] = {}
    scheme_exposure: Dict[str, float] = {}
    warnings: List[str] = []

    for h in holdings:
        pct = getattr(h, "allocation_pct", None)
        if pct is None:
            # compute_allocation() was not run on this holding.
            pct = round((h.current_value / total) * 100, 2)

        # Accumulate so duplicate rows of one scheme are not overwritten.
        scheme_exposure[h.scheme_name] = scheme_exposure.get(h.scheme_name, 0) + pct

        # Extract AMC name (text before the first "-").
        # NOTE(review): a scheme name without "-" yields the whole name as
        # the AMC key, so AMC exposure then mirrors scheme exposure.
        amc = h.scheme_name.split('-')[0].strip()
        amc_exposure[amc] = amc_exposure.get(amc, 0) + pct

    THRESHOLD = _EXPOSURE_THRESHOLD
    for amc, pct in amc_exposure.items():
        if pct > THRESHOLD:
            warnings.append(f"⚠️ AMC Exposure Alert: {amc} = {pct:.1f}% (>{THRESHOLD}% threshold)")
    for scheme, pct in scheme_exposure.items():
        if pct > THRESHOLD:
            warnings.append(f"⚠️ Scheme Exposure Alert: {scheme} = {pct:.1f}% (>{THRESHOLD}% threshold)")

    return amc_exposure, scheme_exposure, warnings


def compute_portfolio_metrics(holdings: List[ClientHolding]) -> Dict:
    """
    Compute portfolio-level weighted average risk metrics.

    Each holding is weighted by its share of total current value. Returns a
    dict with keys "sharpe", "alpha", "beta" and "std_dev" (rounded to 4
    decimals), or an empty dict when the portfolio total is 0.

    NOTE: weights are not renormalised. Holdings without a matched fund, or
    whose fund lacks a given metric, contribute 0 for that metric and thus
    pull the result towards 0.
    """
    total = sum(h.current_value for h in holdings)
    if total == 0:
        return {}

    metrics = {"sharpe": 0.0, "alpha": 0.0, "beta": 0.0, "std_dev": 0.0}
    for h in holdings:
        if not h.fund:
            continue
        weight = h.current_value / total
        for key in metrics:
            value = getattr(h.fund, key)
            if value is not None:
                metrics[key] += weight * value

    return {k: round(v, 4) for k, v in metrics.items()}


def flag_underperformers(holdings: List[ClientHolding]) -> List[ClientHolding]:
    """
    Flag a holding as underperforming if its fund's CAGR fails to outperform
    EITHER the BM Index OR the Category Average across multiple time periods.

    Rule (from senior advisor's framework):
      A fund's CAGR should:
        1. Outperform the BM Index across time periods (1Y, 3Y, 5Y)
        2. Outperform the category average across time periods
        3. Have superior risk metrics (handled separately via score)

    A fund is flagged if it underperforms on 2+ out of 3 periods
    on EITHER benchmark OR category average.

    Periods where the fund's own CAGR is missing are skipped, and a missing
    benchmark/category figure never counts as a failure. Holdings without a
    matched fund are left untouched. ``is_underperforming`` is only ever set
    to True here; it is never reset to False.
    """
    # (fund CAGR attr, benchmark CAGR attr, category-average CAGR attr)
    PERIODS = [
        ("cagr_1y", "cagr_1y_bm", "cagr_1y_cat"),
        ("cagr_3y", "cagr_3y_bm", "cagr_3y_cat"),
        ("cagr_5y", "cagr_5y_bm", "cagr_5y_cat"),
    ]

    for h in holdings:
        f = h.fund
        if not f:
            continue

        bm_fails = 0
        cat_fails = 0
        for cagr_attr, bm_attr, cat_attr in PERIODS:
            fund_cagr = getattr(f, cagr_attr, None)
            if fund_cagr is None:
                continue
            bm_cagr = getattr(f, bm_attr, None)
            cat_cagr = getattr(f, cat_attr, None)
            if bm_cagr is not None and fund_cagr < bm_cagr:
                bm_fails += 1
            if cat_cagr is not None and fund_cagr < cat_cagr:
                cat_fails += 1

        # Flag if underperforms BM on 2+ periods OR underperforms category on 2+ periods
        if bm_fails >= 2 or cat_fails >= 2:
            h.is_underperforming = True

    return holdings


def compute_wealth_projection(total_value: float,
                              years_list: Optional[List[int]] = None,
                              rate: float = 0.12) -> Dict:
    """
    Project portfolio value at a fixed annual return rate.

    Args:
        total_value: starting portfolio value.
        years_list: horizons in years; defaults to 5, 10, 15 and 20.
        rate: annual compounding rate as a fraction (0.12 == 12%).

    Returns:
        Dict mapping each horizon to the projected value, rounded to 2 decimals.
    """
    # None sentinel instead of a list literal: no mutable default is shared
    # between calls.
    if years_list is None:
        years_list = _DEFAULT_PROJECTION_YEARS
    return {
        yr: round(total_value * ((1 + rate) ** yr), 2)
        for yr in years_list
    }


# ─── Main entry ──────────────────────────────────────────────────────────────

def run_portfolio_engine(
    client_csv: str,
    fund_universe: List[Fund],
    advisor: Optional[Advisor] = None,
) -> PortfolioReport:
    """
    Full pipeline: load client → match funds → analyse → build report object.

    Progress and any exposure warnings are printed to stdout. A default
    ``Advisor()`` is used when none is supplied.

    NOTE: holdings without an invested amount count as 0 towards
    ``total_invested``, so ``unrealized_gain`` includes their full current
    value.
    """
    if advisor is None:
        advisor = Advisor()

    print(f"📂 Loading client data from: {client_csv}")
    client, holdings = load_client_csv(client_csv)
    print(f" Client: {client.name} | Holdings: {len(holdings)}")

    print("🔗 Matching holdings to fund universe...")
    holdings = match_holdings_to_funds(holdings, fund_universe)
    matched = sum(1 for h in holdings if h.fund is not None)
    print(f" Matched {matched}/{len(holdings)} holdings")

    # Allocation must be computed before exposure, which reads allocation_pct.
    holdings = compute_allocation(holdings)
    amc_exp, scheme_exp, warnings = check_exposure(holdings)
    holdings = flag_underperformers(holdings)
    metrics = compute_portfolio_metrics(holdings)

    total_current = sum(h.current_value for h in holdings)
    total_invested = sum(h.invested_amount or 0 for h in holdings)
    wealth_projection = compute_wealth_projection(total_current)

    report = PortfolioReport(
        client=client,
        advisor=advisor,
        holdings=holdings,
        total_current_value=total_current,
        total_invested=total_invested,
        unrealized_gain=total_current - total_invested,
        sharpe=metrics.get("sharpe"),
        alpha=metrics.get("alpha"),
        beta=metrics.get("beta"),
        std_dev=metrics.get("std_dev"),
        amc_exposure=amc_exp,
        scheme_exposure=scheme_exp,
        exposure_warnings=warnings,
        wealth_projection=wealth_projection,
    )

    if warnings:
        print("\n".join(warnings))

    return report