File size: 10,791 Bytes
b0e15c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
"""

Portfolio Engine: -p mode



Loads a client CSV, matches holdings to the fund universe,

computes portfolio metrics, exposure checks, and wealth projection.

"""

import csv
import numpy as np
from pathlib import Path
from typing import List, Optional, Dict
from src.models import Fund, Client, ClientHolding, Advisor, PortfolioReport


# ─── Client CSV Loader ───────────────────────────────────────────────────────

def load_client_csv(csv_path: str) -> tuple[Client, List[ClientHolding]]:
    """
    Load a client profile and the client's holdings from a CSV file.

    Expected CSV format:
        Line 1:  Name, Age, Email, Mobile[, PAN]
        Line 2+: Scheme Name, Current Value, Invested Amount, SIP Amount, SIP Frequency

    Example:
        Parthiban,45,parthiban@gmail.com,9876543210,ABCDE1234F
        Nippon India Small Cap Fund,280923,200000,5000,Monthly
        HDFC Mid Cap Fund,134562,120000,3000,Monthly

    Rows whose cells are all blank are ignored. Holding rows are skipped when
    their first cell is blank or is a header label ("scheme name", "fund",
    "scheme"). Trailing columns may be omitted from any row: a missing text
    or numeric column becomes None, except the current value, which falls
    back to 0.0 when it is missing or not numeric.

    Args:
        csv_path: Path to the client CSV file.

    Returns:
        A ``(client, holdings)`` tuple.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        ValueError: If the file contains no non-blank rows.
    """
    path = Path(csv_path)
    if not path.exists():
        raise FileNotFoundError(f"Client CSV not found: {path}")

    header_labels = {'scheme name', 'fund', 'scheme'}

    def text_at(row, index):
        # Stripped cell text, or None when the row is too short to have that column.
        return row[index].strip() if index < len(row) else None

    def number_at(row, index):
        # Cell parsed as a float (thousands separators removed), or None when the
        # column is missing or its text is not a valid number.
        if index >= len(row):
            return None
        try:
            return float(str(row[index]).replace(',', '').strip())
        except (ValueError, TypeError):
            return None

    # newline='' lets the csv module handle line endings itself, so quoted
    # fields containing embedded newlines are parsed correctly.
    with path.open(encoding='utf-8-sig', errors='replace', newline='') as f:
        rows = [r for r in csv.reader(f) if any(c.strip() for c in r)]

    if not rows:
        raise ValueError("Client CSV is empty")

    # The first non-blank row describes the client.
    info = rows[0]
    name = text_at(info, 0)
    age_text = text_at(info, 1)
    client = Client(
        name=name if name is not None else "Unknown",
        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, which would make int() raise.
        age=int(age_text) if age_text and age_text.isdecimal() else None,
        email=text_at(info, 2),
        mobile=text_at(info, 3),
        pan=text_at(info, 4),
    )

    # Every remaining row is a holding, unless it is blank-named or header-like.
    holdings: List[ClientHolding] = []
    for row in rows[1:]:
        scheme_name = text_at(row, 0)
        if not scheme_name or scheme_name.lower() in header_labels:
            continue

        holdings.append(ClientHolding(
            scheme_name=scheme_name,
            # A row carrying only a scheme name is valued at 0.0 rather than
            # aborting the whole load with an IndexError.
            current_value=number_at(row, 1) or 0.0,
            invested_amount=number_at(row, 2),
            sip_amount=number_at(row, 3),
            sip_frequency=text_at(row, 4),
        ))

    return client, holdings


# ─── Fund Matcher ────────────────────────────────────────────────────────────

def match_holdings_to_funds(holdings: List[ClientHolding], funds: List[Fund]) -> List[ClientHolding]:
    """
    Attach the best-matching universe fund to each client holding.

    Names are compared by Jaccard similarity over their lowercased word sets,
    after hyphens are turned into spaces and generic words (plan/option
    labels and similar) are removed. The highest-scoring fund is assigned to
    ``holding.fund`` only when its score is above 0.15; on a tie the fund
    that appears first in ``funds`` wins. Holdings or funds whose names
    reduce to no words are never matched.

    The holdings are updated in place and the same list is returned.
    """
    noise_words = {'fund', 'regular', 'plan', 'growth', 'option', 'direct',
                   'idcw', 'div', 'dividend', '-', 'the', 'india', 'of'}

    def significant_words(label: str) -> set:
        words = label.lower().replace('-', ' ').split()
        return set(words) - noise_words

    # Tokenise the universe once up front, then drop funds with nothing left to compare.
    tokenised = [(candidate, significant_words(candidate.name)) for candidate in funds]
    candidates = [(candidate, words) for candidate, words in tokenised if words]

    for holding in holdings:
        wanted = significant_words(holding.scheme_name)
        if not wanted:
            continue

        ranked = [
            (len(wanted & words) / len(wanted | words), candidate)
            for candidate, words in candidates
        ]
        if not ranked:
            continue

        # max() returns the first of several equal maxima, i.e. the earliest fund.
        top_score, top_fund = max(ranked, key=lambda pair: pair[0])
        if top_score > 0.15:  # minimum match threshold
            holding.fund = top_fund

    return holdings


# ─── Portfolio Analysis ──────────────────────────────────────────────────────

def compute_allocation(holdings: List[ClientHolding]) -> List[ClientHolding]:
    """
    Set ``allocation_pct`` on every holding to its share of the portfolio.

    The share is the holding's current value as a percentage of the summed
    current values, rounded to two decimals. When the portfolio total is
    zero no holding is modified. The same list is returned.
    """
    portfolio_value = sum(item.current_value for item in holdings)
    if portfolio_value != 0:
        for item in holdings:
            share = item.current_value / portfolio_value
            item.allocation_pct = round(share * 100, 2)
    return holdings


def check_exposure(holdings: List[ClientHolding]) -> tuple[Dict, Dict, List[str]]:
    """
    Check AMC-level and scheme-level concentration in the portfolio.

    Each holding's percentage is derived here from its current value (rounded
    to two decimals), so the result does not depend on ``allocation_pct``
    having been populated beforehand. Percentages are summed per scheme name
    and per AMC, so a scheme that appears on several rows is reported with
    its combined exposure.

    Args:
        holdings: The client's holdings.

    Returns:
        ``(amc_exposure, scheme_exposure, warnings)`` where the two dicts map
        a name to its percentage of the portfolio and ``warnings`` lists one
        message per AMC, then per scheme, whose exposure exceeds 20%. All
        three are empty when the portfolio's total value is zero.
    """
    total = sum(h.current_value for h in holdings)
    if total == 0:
        return {}, {}, []

    THRESHOLD = 20.0

    amc_exposure: Dict[str, float] = {}
    scheme_exposure: Dict[str, float] = {}

    for h in holdings:
        pct = round((h.current_value / total) * 100, 2)

        # Accumulate rather than overwrite: the same scheme can occupy more
        # than one row, and its exposure is the sum of those rows.
        scheme_exposure[h.scheme_name] = scheme_exposure.get(h.scheme_name, 0) + pct

        # The AMC is taken as the text before the first "-" in the scheme name.
        # NOTE(review): a name without "-" yields the full scheme name as its
        # AMC, so such holdings are not grouped by fund house -- confirm the
        # naming convention of the input data.
        amc = h.scheme_name.split('-')[0].strip()
        amc_exposure[amc] = amc_exposure.get(amc, 0) + pct

    warnings: List[str] = [
        f"⚠️  AMC Exposure Alert: {amc} = {pct:.1f}% (>{THRESHOLD}% threshold)"
        for amc, pct in amc_exposure.items()
        if pct > THRESHOLD
    ]
    warnings.extend(
        f"⚠️  Scheme Exposure Alert: {scheme} = {pct:.1f}% (>{THRESHOLD}% threshold)"
        for scheme, pct in scheme_exposure.items()
        if pct > THRESHOLD
    )

    return amc_exposure, scheme_exposure, warnings


def compute_portfolio_metrics(holdings: List[ClientHolding]) -> Dict:
    """
    Compute value-weighted portfolio risk metrics.

    Returns a dict with the keys "sharpe", "alpha", "beta" and "std_dev",
    each rounded to four decimals, or an empty dict when the portfolio's
    total current value is zero.

    Each holding's weight is its share of the total portfolio value. A
    holding with no matched fund, or a fund whose metric is None, adds
    nothing to that metric's sum; the weights are not re-normalised over the
    holdings that do have data.
    """
    portfolio_value = sum(item.current_value for item in holdings)
    if portfolio_value == 0:
        return {}

    metric_names = ("sharpe", "alpha", "beta", "std_dev")
    weighted_sums = dict.fromkeys(metric_names, 0.0)

    for item in holdings:
        weight = item.current_value / portfolio_value
        fund = item.fund
        if not fund:
            continue
        for metric in metric_names:
            reading = getattr(fund, metric)
            if reading is not None:
                weighted_sums[metric] += weight * reading

    return {metric: round(acc, 4) for metric, acc in weighted_sums.items()}


def flag_underperformers(holdings: List[ClientHolding]) -> List[ClientHolding]:
    """
    Mark holdings whose matched fund lags its benchmark or its category.

    Rule (from senior advisor's framework):
      A fund's CAGR should:
        1. Outperform the BM Index across time periods (1Y, 3Y, 5Y)
        2. Outperform the category average across time periods
        3. Have superior risk metrics (handled separately via score)

    For each of the 1Y, 3Y and 5Y periods the fund's CAGR is compared with
    the benchmark CAGR and with the category-average CAGR. A period with no
    fund CAGR is skipped, and a comparison whose reference value is missing
    does not count against the fund. ``is_underperforming`` is set to True
    when the fund trails the benchmark in at least two periods OR trails the
    category average in at least two periods. Holdings without a matched
    fund are left untouched, and the flag is never reset here.

    The holdings are updated in place and the same list is returned.
    """
    horizons = ("1y", "3y", "5y")

    for holding in holdings:
        fund = holding.fund
        if not fund:
            continue

        behind_benchmark = 0
        behind_category = 0

        for horizon in horizons:
            own_cagr = getattr(fund, f"cagr_{horizon}", None)
            benchmark_cagr = getattr(fund, f"cagr_{horizon}_bm", None)
            category_cagr = getattr(fund, f"cagr_{horizon}_cat", None)

            if own_cagr is None:
                continue
            if benchmark_cagr is not None and own_cagr < benchmark_cagr:
                behind_benchmark += 1
            if category_cagr is not None and own_cagr < category_cagr:
                behind_category += 1

        # Two or more misses can only occur when at least two periods had a
        # fund CAGR, so no separate "periods checked" guard is needed.
        if behind_benchmark >= 2 or behind_category >= 2:
            holding.is_underperforming = True

    return holdings


def compute_wealth_projection(total_value: float, years_list: Optional[list] = None,
                              rate: float = 0.12) -> Dict:
    """
    Project portfolio value at a fixed annual return rate.

    Args:
        total_value: Present portfolio value.
        years_list: Horizons, in years, to project for. Defaults to
            5, 10, 15 and 20 years when omitted or None; an explicitly
            empty list yields an empty result.
        rate: Annual growth rate as a fraction (0.12 means 12%).

    Returns:
        A dict mapping each horizon to ``total_value * (1 + rate) ** years``,
        rounded to two decimals.
    """
    # Use None as the default instead of a list literal so that no mutable
    # object is shared between calls.
    if years_list is None:
        years_list = [5, 10, 15, 20]

    return {
        yr: round(total_value * ((1 + rate) ** yr), 2)
        for yr in years_list
    }


# ─── Main entry ──────────────────────────────────────────────────────────────

def run_portfolio_engine(
    client_csv: str,
    fund_universe: List[Fund],
    advisor: Optional[Advisor] = None,
) -> PortfolioReport:
    """
    Full pipeline: load client → match funds → analyse → build report object.

    Progress lines and any exposure warnings are printed to stdout.

    Args:
        client_csv: Path to the client CSV file.
        fund_universe: Funds that client holdings are matched against.
        advisor: Advisor to attach to the report; a default ``Advisor()`` is
            created when omitted.

    Returns:
        The assembled ``PortfolioReport``.
    """
    if advisor is None:
        advisor = Advisor()

    print(f"📂 Loading client data from: {client_csv}")
    client, holdings = load_client_csv(client_csv)
    print(f"   Client: {client.name} | Holdings: {len(holdings)}")

    print("🔗 Matching holdings to fund universe...")
    holdings = match_holdings_to_funds(holdings, fund_universe)
    matched = sum(1 for h in holdings if h.fund is not None)
    print(f"   Matched {matched}/{len(holdings)} holdings")

    # Allocation runs before the exposure check and before the report is built.
    holdings = compute_allocation(holdings)
    amc_exp, scheme_exp, warnings = check_exposure(holdings)
    holdings = flag_underperformers(holdings)
    metrics = compute_portfolio_metrics(holdings)

    total_current = sum(h.current_value for h in holdings)
    # A holding with no invested amount counts as 0 invested, so its entire
    # current value shows up in the unrealized gain below.
    total_invested = sum(h.invested_amount or 0 for h in holdings)

    wealth_projection = compute_wealth_projection(total_current)

    report = PortfolioReport(
        client=client,
        advisor=advisor,
        holdings=holdings,
        total_current_value=total_current,
        total_invested=total_invested,
        unrealized_gain=total_current - total_invested,
        # metrics is empty when the portfolio value is zero, so these become None.
        sharpe=metrics.get("sharpe"),
        alpha=metrics.get("alpha"),
        beta=metrics.get("beta"),
        std_dev=metrics.get("std_dev"),
        amc_exposure=amc_exp,
        scheme_exposure=scheme_exp,
        exposure_warnings=warnings,
        wealth_projection=wealth_projection,
    )

    if warnings:
        print("\n".join(warnings))

    return report