Spaces:
Running
Running
| """ | |
| Portfolio Engine: -p mode | |
| Loads a client CSV, matches holdings to the fund universe, | |
| computes portfolio metrics, exposure checks, and wealth projection. | |
| """ | |
| import csv | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import List, Optional, Dict | |
| from src.models import Fund, Client, ClientHolding, Advisor, PortfolioReport | |
# ─── Client CSV Loader ───────────────────────────────────────────────────────
def _safe_float(value) -> Optional[float]:
    """Parse a numeric CSV cell, ignoring thousands separators; None if not a number."""
    try:
        return float(str(value).replace(',', '').strip())
    except (ValueError, TypeError):
        return None


def _cell(row: List[str], index: int) -> Optional[str]:
    """Return the stripped cell at ``index``, or None when the row is too short."""
    return row[index].strip() if len(row) > index else None


def load_client_csv(csv_path: str) -> tuple[Client, List[ClientHolding]]:
    """
    Load client data from CSV.

    Expected CSV format:
        Line 1:  Name, Age, Email, Mobile[, PAN]
        Line 2+: Scheme Name, Current Value, Invested Amount, SIP Amount, SIP Frequency

    Example:
        Parthiban,45,parthiban@gmail.com,9876543210,ABCDE1234F
        Nippon India Small Cap Fund,280923,200000,5000,Monthly
        HDFC Mid Cap Fund,134562,120000,3000,Monthly

    Rows that are entirely blank are ignored, as are holding rows whose first
    cell is empty or looks like a column header ("scheme name", "fund",
    "scheme"). Trailing columns are optional: a missing or non-numeric current
    value becomes 0.0, other missing cells become None.

    Args:
        csv_path: Path to the client CSV file.

    Returns:
        A ``(client, holdings)`` tuple built from the first row and the
        remaining rows respectively.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        ValueError: If the file contains no non-blank rows.
    """
    csv_path = Path(csv_path)
    if not csv_path.exists():
        raise FileNotFoundError(f"Client CSV not found: {csv_path}")

    # newline='' lets the csv module handle line endings itself, so quoted
    # fields containing embedded newlines are parsed correctly.
    with open(csv_path, encoding='utf-8-sig', errors='replace', newline='') as f:
        rows = [r for r in csv.reader(f) if any(c.strip() for c in r)]

    if not rows:
        raise ValueError("Client CSV is empty")

    # Parse client info from first row
    info = rows[0]
    age_text = _cell(info, 1)
    client = Client(
        name=info[0].strip() if len(info) > 0 else "Unknown",
        # isdecimal() only accepts characters that int() can actually parse
        # (isdigit() also accepts e.g. superscript digits, which int() rejects).
        age=int(age_text) if age_text is not None and age_text.isdecimal() else None,
        email=_cell(info, 2),
        mobile=_cell(info, 3),
        pan=_cell(info, 4),
    )

    # Parse holdings from remaining rows
    holdings: List[ClientHolding] = []
    for row in rows[1:]:
        scheme_name = row[0].strip() if row else ''
        if not scheme_name:
            continue
        # Skip header-like rows
        if scheme_name.lower() in ('scheme name', 'fund', 'scheme'):
            continue
        # A row holding only a scheme name must not crash the loader:
        # treat the absent value column like an unparseable one.
        current_value = _safe_float(row[1]) if len(row) > 1 else None
        holdings.append(
            ClientHolding(
                scheme_name=scheme_name,
                current_value=current_value or 0.0,
                invested_amount=_safe_float(row[2]) if len(row) > 2 else None,
                sip_amount=_safe_float(row[3]) if len(row) > 3 else None,
                sip_frequency=_cell(row, 4),
            )
        )
    return client, holdings
# ─── Fund Matcher ────────────────────────────────────────────────────────────
def match_holdings_to_funds(holdings: List[ClientHolding], funds: List[Fund]) -> List[ClientHolding]:
    """
    Attach the best-matching universe fund to each client holding.

    Names are lowercased, hyphens are treated as spaces, and generic words
    (plan/option/etc.) are discarded. Similarity is the Jaccard index of the
    remaining word sets. A holding's ``fund`` attribute is only assigned when
    the best score is strictly above 0.15; otherwise it is left as it was.
    On equal scores the fund that appears first in ``funds`` wins.

    Returns the same ``holdings`` list, mutated in place.
    """
    noise_words = {'fund', 'regular', 'plan', 'growth', 'option', 'direct',
                   'idcw', 'div', 'dividend', '-', 'the', 'india', 'of'}

    def significant_words(text: str) -> set:
        words = text.lower().replace('-', ' ').split()
        return {word for word in words if word not in noise_words}

    # Tokenise the universe once; funds whose names reduce to nothing can
    # never match and are skipped during scoring.
    universe = [(candidate, significant_words(candidate.name)) for candidate in funds]

    for holding in holdings:
        wanted = significant_words(holding.scheme_name)
        if not wanted:
            continue

        top_fund, top_score = None, 0
        for candidate, candidate_words in universe:
            if not candidate_words:
                continue
            # Both sets are non-empty here, so the union is never empty.
            score = len(wanted & candidate_words) / len(wanted | candidate_words)
            if score > top_score:
                top_fund, top_score = candidate, score

        if top_score > 0.15:  # minimum match threshold
            holding.fund = top_fund

    return holdings
# ─── Portfolio Analysis ──────────────────────────────────────────────────────
def compute_allocation(holdings: List[ClientHolding]) -> List[ClientHolding]:
    """
    Set ``allocation_pct`` on every holding as its share of the portfolio.

    The share is ``current_value`` divided by the sum of all current values,
    expressed as a percentage rounded to two decimals. When the portfolio
    total is zero the holdings are returned untouched.

    Returns the same ``holdings`` list, mutated in place.
    """
    portfolio_value = sum(item.current_value for item in holdings)
    if portfolio_value != 0:
        for item in holdings:
            share = item.current_value / portfolio_value
            item.allocation_pct = round(share * 100, 2)
    return holdings
def check_exposure(holdings: List[ClientHolding],
                   threshold: float = 20.0) -> tuple[Dict, Dict, List[str]]:
    """
    Check AMC and scheme-level exposure.

    Each holding contributes its ``allocation_pct``; when that attribute is
    missing or None the percentage is derived from ``current_value`` and the
    portfolio total (rounded to two decimals). Rows that share a scheme name
    are summed, so a scheme split across several rows is reported once with
    its combined weight.

    The AMC key is the part of the scheme name before the first '-'.
    NOTE(review): for scheme names without a '-', the whole name becomes the
    AMC key, so AMC and scheme exposure coincide for such rows. Confirm
    whether a better AMC source exists on the matched fund.

    Args:
        holdings: Client holdings to inspect.
        threshold: Percentage above which an exposure produces a warning.

    Returns:
        ``(amc_exposure, scheme_exposure, warnings)``. Both exposure dicts
        map a name to a percentage. Warnings list AMC alerts first, then
        scheme alerts. All three are empty when the portfolio total is zero.
    """
    total = sum(h.current_value for h in holdings)
    if total == 0:
        return {}, {}, []

    amc_exposure: Dict[str, float] = {}
    scheme_exposure: Dict[str, float] = {}

    for h in holdings:
        pct = getattr(h, 'allocation_pct', None)
        if pct is None:
            # Allocation has not been computed for this holding yet.
            pct = round((h.current_value / total) * 100, 2)
        # Accumulate rather than overwrite so duplicate scheme rows add up.
        scheme_exposure[h.scheme_name] = scheme_exposure.get(h.scheme_name, 0) + pct
        # Extract AMC name (first word(s) before "-")
        amc = h.scheme_name.split('-')[0].strip()
        amc_exposure[amc] = amc_exposure.get(amc, 0) + pct

    # WARNING SIGN + emoji variation selector, written as escapes so the
    # prefix survives a file re-encoding.
    alert_icon = "\u26a0\ufe0f"
    warnings: List[str] = [
        f"{alert_icon} AMC Exposure Alert: {amc} = {pct:.1f}% (>{threshold}% threshold)"
        for amc, pct in amc_exposure.items()
        if pct > threshold
    ]
    warnings.extend(
        f"{alert_icon} Scheme Exposure Alert: {scheme} = {pct:.1f}% (>{threshold}% threshold)"
        for scheme, pct in scheme_exposure.items()
        if pct > threshold
    )
    return amc_exposure, scheme_exposure, warnings
def compute_portfolio_metrics(holdings: List[ClientHolding]) -> Dict:
    """
    Aggregate fund risk metrics into value-weighted portfolio figures.

    Each holding is weighted by ``current_value`` over the total of all
    holdings. For every holding with a matched fund, the fund's ``sharpe``,
    ``alpha``, ``beta`` and ``std_dev`` are added in proportion to that
    weight; a metric that is None on the fund is skipped.

    Weights are NOT renormalised: holdings without a matched fund (or with a
    missing metric) still count towards the total, so they pull the result
    towards zero.

    Returns a dict with the four metric names rounded to four decimals, or an
    empty dict when the portfolio total is zero.
    """
    portfolio_value = sum(item.current_value for item in holdings)
    if portfolio_value == 0:
        return {}

    metric_names = ("sharpe", "alpha", "beta", "std_dev")
    weighted = dict.fromkeys(metric_names, 0.0)

    for item in holdings:
        weight = item.current_value / portfolio_value
        fund = item.fund
        if not fund:
            continue
        for metric in metric_names:
            reading = getattr(fund, metric)
            if reading is not None:
                weighted[metric] += weight * reading

    return {metric: round(value, 4) for metric, value in weighted.items()}
def flag_underperformers(holdings: List[ClientHolding]) -> List[ClientHolding]:
    """
    Mark holdings whose matched fund lags its benchmark or its category.

    For each of the 1Y, 3Y and 5Y horizons the fund's CAGR (``cagr_<h>``) is
    compared with the benchmark CAGR (``cagr_<h>_bm``) and the category
    average (``cagr_<h>_cat``). A horizon is skipped when the fund's own CAGR
    is missing; a comparison is skipped when its reference value is missing.

    A holding gets ``is_underperforming = True`` when the fund is strictly
    below the benchmark on two or more horizons, OR strictly below the
    category average on two or more horizons. Holdings without a matched fund
    and holdings that do not meet the rule are left untouched (the flag is
    never reset to False here).

    Risk-metric quality, the third leg of the advisor framework, is not
    evaluated by this function.

    Returns the same ``holdings`` list, mutated in place.
    """
    horizons = ("1y", "3y", "5y")

    for holding in holdings:
        fund = holding.fund
        if not fund:
            continue

        behind_benchmark = 0
        behind_category = 0
        for horizon in horizons:
            own_return = getattr(fund, f"cagr_{horizon}", None)
            benchmark_return = getattr(fund, f"cagr_{horizon}_bm", None)
            category_return = getattr(fund, f"cagr_{horizon}_cat", None)
            if own_return is None:
                continue
            if benchmark_return is not None and own_return < benchmark_return:
                behind_benchmark += 1
            if category_return is not None and own_return < category_return:
                behind_category += 1

        # Two misses on either yardstick is enough to flag the holding.
        if behind_benchmark >= 2 or behind_category >= 2:
            holding.is_underperforming = True

    return holdings
def compute_wealth_projection(total_value: float, years_list: Optional[list] = None,
                              rate: float = 0.12) -> Dict:
    """
    Project portfolio value at a fixed annual return rate.

    Applies annual compounding: ``total_value * (1 + rate) ** years``.

    Args:
        total_value: Current portfolio value.
        years_list: Horizons, in years, to project for. Defaults to
            5, 10, 15 and 20 years when omitted or None.
        rate: Assumed annual return as a fraction (0.12 means 12%).

    Returns:
        A dict mapping each horizon to the projected value rounded to two
        decimals, in the order the horizons were given.
    """
    # None sentinel instead of a list literal default: a default list would
    # be a single shared object across all calls.
    horizons = [5, 10, 15, 20] if years_list is None else years_list
    return {
        yr: round(total_value * ((1 + rate) ** yr), 2)
        for yr in horizons
    }
# ─── Main entry ──────────────────────────────────────────────────────────────
def run_portfolio_engine(
    client_csv: str,
    fund_universe: List[Fund],
    advisor: Optional[Advisor] = None,
) -> PortfolioReport:
    """
    Full pipeline: load client -> match funds -> analyse -> build report object.

    Steps, in order:
        1. Load the client and holdings from ``client_csv``.
        2. Match each holding against ``fund_universe``.
        3. Compute allocation percentages, exposure checks, underperformer
           flags and weighted risk metrics.
        4. Project the current total value forward with the default horizons
           and rate of ``compute_wealth_projection``.
        5. Assemble everything into a ``PortfolioReport``.

    Progress lines and any exposure warnings are printed to stdout.

    Args:
        client_csv: Path to the client CSV file.
        fund_universe: Funds that holdings may be matched against.
        advisor: Advisor to attach to the report; a default ``Advisor()`` is
            created when omitted.

    Returns:
        The populated ``PortfolioReport``.
    """
    if advisor is None:
        advisor = Advisor()

    # The progress messages used to start with a pictograph that had been
    # corrupted into a stray character; they are plain text now.
    print(f"Loading client data from: {client_csv}")
    client, holdings = load_client_csv(client_csv)
    print(f" Client: {client.name} | Holdings: {len(holdings)}")

    print("Matching holdings to fund universe...")
    holdings = match_holdings_to_funds(holdings, fund_universe)
    matched = sum(1 for h in holdings if h.fund is not None)
    print(f" Matched {matched}/{len(holdings)} holdings")

    # Allocation must be computed before the exposure check reads it.
    holdings = compute_allocation(holdings)
    amc_exp, scheme_exp, warnings = check_exposure(holdings)
    holdings = flag_underperformers(holdings)
    metrics = compute_portfolio_metrics(holdings)

    total_current = sum(h.current_value for h in holdings)
    # Holdings with no invested amount count as zero invested.
    total_invested = sum(h.invested_amount or 0 for h in holdings)
    wealth_projection = compute_wealth_projection(total_current)

    report = PortfolioReport(
        client=client,
        advisor=advisor,
        holdings=holdings,
        total_current_value=total_current,
        total_invested=total_invested,
        unrealized_gain=total_current - total_invested,
        # metrics is empty when the portfolio total is zero, hence .get().
        sharpe=metrics.get("sharpe"),
        alpha=metrics.get("alpha"),
        beta=metrics.get("beta"),
        std_dev=metrics.get("std_dev"),
        amc_exposure=amc_exp,
        scheme_exposure=scheme_exp,
        exposure_warnings=warnings,
        wealth_projection=wealth_projection,
    )

    if warnings:
        print("\n".join(warnings))

    return report