Spaces:
Running
Running
| """ | |
pe_pb_engine.py — P/E and P/B for Indian mutual funds.

Two-track approach:
    ACTIVE funds → AMFI monthly portfolio holdings + NSE/yfinance stock PE/PB
        Weighted average: Portfolio PE = Σ (weight% × stock PE)
        This is identical to what Groww shows (same AMFI source).
    INDEX funds → NSE allIndices API (benchmark index PE/PB)
        Fast, real-time, already accurate since fund mirrors index.

Active vs Index detection:
    Category contains "Index Fund", "ETF", "Exchange Traded" → INDEX track
    Everything else → ACTIVE track

AMFI holdings URL pattern:
    https://portal.amfiindia.com/spages/am{mon}{year}repo.xls
    e.g. amfeb2026repo.xls (February 2026 data)

Caching:
    - AMFI XLS    : 30 days in Neon/SQLite (monthly data — no point refreshing sooner)
    - Stock PE/PB : 1 day in Neon/SQLite (NSE stock data changes daily)
    - Index PE/PB : 1 day in Neon/SQLite (existing behaviour)

Usage:
    from src.pe_pb_engine import fetch_pe_pb, warm_index_cache
    pe, pb = fetch_pe_pb(
        benchmark_type="NIFTY 100 TRI",
        fund_name="Mirae Asset Large Cap Fund",
        category="Equity: Large Cap",
        scheme_isin="INF769K01036",  # optional — improves AMFI matching
    )
| """ | |
| from __future__ import annotations | |
| import io | |
| import json | |
| import os | |
| import re | |
| import threading | |
| import time | |
| from datetime import datetime | |
| from typing import Optional | |
| import pandas as pd | |
| import requests | |
| import yfinance as yf | |
# ── TTLs ──────────────────────────────────────────────────────────────────────
# Cache lifetimes, all expressed in seconds.
_INDEX_PE_TTL = 86_400  # 1 day — index PE/PB
_STOCK_PE_TTL = 86_400  # 1 day — individual stock PE/PB
_AMFI_XLS_TTL = 30 * 86_400  # 30 days — AMFI monthly holdings XLS
# ── Index fund category detection ─────────────────────────────────────────────
# Upper-case substrings that mark a fund category as passively managed.
_INDEX_FUND_TOKENS = {
    "INDEX FUND",
    "ETF",
    "EXCHANGE TRADED",
    "INDEX - DOMESTIC",
    "INDEX - INTERNATIONAL",
    "OTHER ETFS",
    "GOLD ETF",
    "SILVER ETF",
    "FUND OF FUNDS",
}


def _is_index_fund(category: str) -> bool:
    """Return True when *category* contains any index-fund marker.

    The comparison is case-insensitive; a falsy category (``None`` or ``""``)
    is treated as "not an index fund".
    """
    normalized = category.upper() if category else ""
    for marker in _INDEX_FUND_TOKENS:
        if marker in normalized:
            return True
    return False
# ── No-PE benchmark tokens (debt/liquid/hybrid) ───────────────────────────────
# Upper-case substrings identifying benchmarks for which PE/PB is not reported.
_NO_PE_TOKENS = {
    "CRISIL", "G-SEC", "G SEC", "GSEC", "SDL", "GILT",
    "LIQUID", "OVERNIGHT", "1D RATE", "ARBITRAGE",
    "S&P BSE LIQUID", "MSCI", "S&P GLOBAL", "4-8 YR",
    "AK HYBRID", "AK EQUITY SAVINGS", "AK MULTI ASSET",
    "COM.ADVISORKHOJ",
}


def _is_no_pe_benchmark(bm: str) -> bool:
    """Return True when benchmark name *bm* contains a no-PE token.

    Matching is a case-insensitive substring test against ``_NO_PE_TOKENS``.
    A missing benchmark (``None`` or ``""``) returns False instead of raising,
    mirroring how ``_is_index_fund`` treats a missing category.
    """
    bm_upper = (bm or "").upper()
    return any(token in bm_upper for token in _NO_PE_TOKENS)
# ── NSE index benchmark map ───────────────────────────────────────────────────
# Index names that map to themselves.
_NSE_INDEX_NAMES = (
    "NIFTY 50",
    "NIFTY 100",
    "NIFTY 200",
    "NIFTY 500",
    "NIFTY NEXT 50",
    "NIFTY TOTAL MARKET",
    "NIFTY MIDCAP 50",
    "NIFTY MIDCAP 100",
    "NIFTY MIDCAP 150",
    "NIFTY SMALLCAP 50",
    "NIFTY SMALLCAP 100",
    "NIFTY SMALLCAP 250",
    "NIFTY MIDSMALLCAP 400",
    "NIFTY LARGEMIDCAP 250",
    "NIFTY500 MULTICAP 50:25:25",
    "NIFTY BANK",
    "NIFTY FINANCIAL SERVICES",
    "NIFTY IT",
    "NIFTY FMCG",
    "NIFTY PHARMA",
    "NIFTY HEALTHCARE INDEX",
    "NIFTY AUTO",
    "NIFTY METAL",
    "NIFTY REALTY",
    "NIFTY INFRASTRUCTURE",
    "NIFTY COMMODITIES",
    "NIFTY ENERGY",
    "NIFTY OIL & GAS",
    "NIFTY MNC",
    "NIFTY CPSE",
    "NIFTY PSE",
    "NIFTY INDIA CONSUMPTION",
    "NIFTY INDIA MANUFACTURING",
    "NIFTY INDIA DEFENCE",
    "NIFTY HOUSING",
    "NIFTY100 LOW VOLATILITY 30",
    "NIFTY200 MOMENTUM 30",
)

# Alternative spellings and the index name each one resolves to.
_BENCHMARK_ALIASES = {
    "NIFTY LARGE MIDCAP 250": "NIFTY LARGEMIDCAP 250",
    "NIFTY LARGE - MIDCAP 250": "NIFTY LARGEMIDCAP 250",
    "NIFTY HEALTHCARE": "NIFTY HEALTHCARE INDEX",
    "NIFTY 100 LOW VOLATILITY 30": "NIFTY100 LOW VOLATILITY 30",
    "NIFTY 200 MOMENTUM 30": "NIFTY200 MOMENTUM 30",
}

# Normalised benchmark name -> index name used as key in the allIndices data.
_BENCHMARK_MAP: dict[str, str] = {
    **{index_name: index_name for index_name in _NSE_INDEX_NAMES},
    **_BENCHMARK_ALIASES,
}
def _normalize_benchmark(bm: str) -> str:
    """Canonicalise a benchmark label so it can be looked up in ``_BENCHMARK_MAP``.

    Steps, in order: strip a trailing ``TRI`` / ``TRI.`` / ``(TRI)`` marker,
    collapse whitespace and parentheses to single spaces, upper-case, then
    apply a few fixed spelling substitutions.
    """
    name = bm.strip()
    # Trailing total-return markers: first the bare form, then the bracketed one.
    for suffix_pattern in (r'\s+TRI\.?\s*$', r'\s*\(TRI\)\s*$'):
        name = re.sub(suffix_pattern, '', name, flags=re.IGNORECASE)
    name = re.sub(r'[\s\(\)]+', ' ', name).strip().upper()
    # Order matters: the hyphenated form must be rewritten before the plain one.
    spelling_fixes = (
        ("LARGE - MIDCAP", "LARGEMIDCAP"),
        ("LARGE MIDCAP", "LARGEMIDCAP"),
        ("SMALL CAP", "SMALLCAP"),
    )
    for old, new in spelling_fixes:
        name = name.replace(old, new)
    # Only a name *ending* in HEALTHCARE gains the " INDEX" suffix.
    name = re.sub(r'HEALTHCARE$', 'HEALTHCARE INDEX', name)
    return name.replace("FINANCIAL SERVICES INDEX", "FINANCIAL SERVICES")
# ── DB cache (SQLite local / Neon postgres production) ────────────────────────
_DATABASE_URL = os.environ.get("DATABASE_URL", "")
_USE_POSTGRES = bool(_DATABASE_URL)

import threading as _threading

# Per-thread connection holder: each thread reuses its own connection.
_tls = _threading.local()


def _get_conn():
    """Return ``(connection, is_postgres)`` for the calling thread.

    Postgres is used when the ``DATABASE_URL`` environment variable was set at
    import time; otherwise a SQLite file in the user's home directory is used.

    Both backends keep one connection per thread in ``_tls`` and reuse it
    after a cheap ``SELECT 1`` liveness probe.  Previously the SQLite branch
    opened a brand-new connection on every call and nothing ever closed it,
    and the Postgres probe left its cursor open and its implicit transaction
    pending.  A connection that fails the probe is closed (best effort) and
    replaced.

    Callers must not close the returned connection.
    """
    if _USE_POSTGRES:
        import psycopg2

        conn = getattr(_tls, "pg_conn", None)
        if conn is not None and not conn.closed:
            try:
                with conn.cursor() as cur:
                    cur.execute("SELECT 1")
                # The probe opened an implicit transaction; end it so the
                # session is not left idle-in-transaction and any previously
                # aborted transaction is cleared.
                conn.rollback()
                return conn, True
            except Exception:
                try:
                    conn.close()
                except Exception:
                    pass
        conn = psycopg2.connect(_DATABASE_URL, connect_timeout=10)
        _tls.pg_conn = conn
        return conn, True

    import sqlite3
    from pathlib import Path

    conn = getattr(_tls, "sqlite_conn", None)
    if conn is not None:
        try:
            conn.execute("SELECT 1").fetchone()
            return conn, False
        except sqlite3.Error:
            # Closed or otherwise unusable: discard and reopen below.
            try:
                conn.close()
            except sqlite3.Error:
                pass
    db_path = Path.home() / ".mf_nav_cache.db"
    conn = sqlite3.connect(str(db_path))
    _tls.sqlite_conn = conn
    return conn, False
def _cache_get(key: str, ttl: float) -> Optional[str]:
    """Return the cached string stored under *key*, or None.

    None is returned when the key is absent, when the entry is *ttl* seconds
    old or older, or when any database error occurs (the cache is best-effort
    and never raises).

    On Postgres the read opens an implicit transaction.  It is always rolled
    back afterwards so the per-thread connection is not left
    idle-in-transaction, and so a failed statement does not leave the
    connection in an aborted state for the next caller.
    """
    conn = None
    is_pg = False
    try:
        conn, is_pg = _get_conn()
        ph = "%s" if is_pg else "?"
        sql = f"SELECT data, ts FROM nav_cache WHERE key = {ph}"
        if is_pg:
            with conn.cursor() as cur:
                cur.execute(sql, (key,))
                row = cur.fetchone()
        else:
            with conn:
                row = conn.execute(sql, (key,)).fetchone()
        if row and (time.time() - row[1]) < ttl:
            return row[0]
    except Exception:
        # Best-effort cache: a failed read is treated as a miss.
        pass
    finally:
        if is_pg and conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass
    return None
def _cache_set(key: str, value: str) -> None:
    """Insert or overwrite the cache entry *key* with *value*, timestamped now.

    Best-effort: database errors are swallowed and the function never raises.
    On Postgres a failed write is rolled back so the per-thread connection is
    not left in an aborted transaction.
    """
    conn = None
    is_pg = False
    try:
        conn, is_pg = _get_conn()
        params = (key, value, time.time())
        if is_pg:
            sql = (
                "INSERT INTO nav_cache (key, data, ts) VALUES (%s,%s,%s) "
                "ON CONFLICT (key) DO UPDATE SET data=EXCLUDED.data, ts=EXCLUDED.ts"
            )
            with conn.cursor() as cur:
                cur.execute(sql, params)
            conn.commit()
        else:
            sql = "INSERT OR REPLACE INTO nav_cache (key, data, ts) VALUES (?,?,?)"
            # The sqlite3 connection context manager commits on success and
            # rolls back on error.
            with conn:
                conn.execute(sql, params)
    except Exception:
        if is_pg and conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass
def _init_cache_db() -> None:
    """Create the ``nav_cache`` table if it does not exist yet.

    Best-effort: errors are swallowed so that importing this module never
    fails because the cache database is unavailable.  On Postgres a failed
    statement is rolled back so the per-thread connection stays usable.
    """
    conn = None
    is_pg = False
    try:
        conn, is_pg = _get_conn()
        sql = """CREATE TABLE IF NOT EXISTS nav_cache (
            key TEXT PRIMARY KEY,
            data TEXT NOT NULL,
            ts DOUBLE PRECISION NOT NULL
        )"""
        if is_pg:
            with conn.cursor() as cur:
                cur.execute(sql)
            conn.commit()
        else:
            with conn:
                conn.execute(sql)
    except Exception:
        if is_pg and conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass


# Ensure the cache table exists as soon as the module is imported.
try:
    _init_cache_db()
except Exception:
    pass
# ── In-process caches ─────────────────────────────────────────────────────────
# index name -> (pe, pb)
_INDEX_PE_CACHE: dict[str, tuple[float, float]] = dict()
# stock ISIN -> (pe, pb); either element may be None
_STOCK_PE_CACHE: dict[str, tuple[float | None, float | None]] = dict()
# scheme_isin/name -> holdings df
_AMFI_HOLD_CACHE: dict[str, pd.DataFrame] = dict()
_CACHE_LOCK = threading.Lock()
# ── NSE session ───────────────────────────────────────────────────────────────
_NSE_SESSION: Optional[requests.Session] = None
_NSE_SESSION_TS = 0.0
_NSE_LOCK = threading.Lock()


def _get_nse_session() -> requests.Session:
    """Return the shared ``requests.Session`` used for nseindia.com calls.

    The session is rebuilt when none exists yet or when the current one is
    more than 300 seconds old.  A new session gets browser-like headers and
    performs one GET on the NSE home page; a failure of that request is
    ignored.  Creation is serialised with ``_NSE_LOCK``.
    """
    global _NSE_SESSION, _NSE_SESSION_TS
    with _NSE_LOCK:
        still_fresh = (
            _NSE_SESSION is not None
            and (time.time() - _NSE_SESSION_TS) <= 300
        )
        if still_fresh:
            return _NSE_SESSION

        fresh = requests.Session()
        user_agent = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36"
        )
        fresh.headers.update({
            "User-Agent": user_agent,
            "Accept": "application/json, */*",
            "Referer": "https://www.nseindia.com/",
        })
        try:
            # Hit the home page first, then pause briefly before API calls.
            fresh.get("https://www.nseindia.com/", timeout=10)
            time.sleep(0.3)
        except Exception:
            pass
        _NSE_SESSION = fresh
        _NSE_SESSION_TS = time.time()
        return fresh
# ══════════════════════════════════════════════════════════════════════════════
# TRACK 1 — INDEX funds: NSE allIndices benchmark PE/PB
# ══════════════════════════════════════════════════════════════════════════════
def _fetch_all_index_pe() -> dict[str, tuple[float, float]]:
    """Fetch PE/PB for all NSE indices in one API call. Cached 1 day.

    Returns ``{index_name: (pe, pb)}``.  The DB cache is tried first; on a
    miss the NSE ``allIndices`` endpoint is queried and a non-empty result is
    written back to the cache.  Returns ``{}`` when the request fails.

    Hardening over the previous version:
      * an unreadable cache payload is ignored (and re-fetched) instead of
        raising out of ``json.loads``;
      * a ``null`` ``data`` field or non-dict rows no longer crash the loop;
      * a zero PE is skipped however it is spelled (``"0"``, ``"0.00"``, ``0``),
        not only the exact string ``"0"``.
    """
    cache_key = "nse_index_pe_pb_v2"
    cached = _cache_get(cache_key, _INDEX_PE_TTL)
    if cached:
        try:
            data = json.loads(cached)
            return {k: tuple(v) for k, v in data.items()}
        except (ValueError, TypeError, AttributeError):
            print(" [pe_pb] Cached index PE/PB unreadable, refetching")

    print(" [pe_pb] Fetching NSE allIndices...")
    try:
        r = _get_nse_session().get(
            "https://www.nseindia.com/api/allIndices", timeout=15
        )
        r.raise_for_status()
        indices = r.json().get("data") or []
    except Exception as e:
        print(f" [pe_pb] NSE allIndices failed: {e}")
        return {}

    result: dict[str, tuple[float, float]] = {}
    for idx in indices:
        if not isinstance(idx, dict):
            continue
        name = idx.get("index", "")
        try:
            # Placeholders such as "-", "" or None fail float() and are skipped.
            pe = float(str(idx.get("pe", "-")).replace(",", ""))
            pb = float(str(idx.get("pb", "-")).replace(",", ""))
        except (ValueError, TypeError):
            continue
        if pe == 0:
            continue
        result[name] = (pe, pb)

    print(f" [pe_pb] Got PE/PB for {len(result)} NSE indices")
    if result:
        _cache_set(cache_key, json.dumps(result))
    return result
# Unix time at which _INDEX_PE_CACHE was last (re)validated; 0.0 = never.
_INDEX_PE_CACHE_TS = 0.0
# Seconds to wait before retrying a refresh that failed while older data exists.
_INDEX_PE_RETRY_INTERVAL = 10 * 60


def warm_index_cache() -> dict[str, tuple[float, float]]:
    """Return the in-process ``{index_name: (pe, pb)}`` map, loading it if needed.

    The map is (re)loaded through ``_fetch_all_index_pe`` when it is empty or
    when it is older than ``_INDEX_PE_TTL``.  Previously it was loaded once
    and then kept for the life of the process, so a long-running process
    served the first day's values indefinitely.

    If a refresh fails while older data is available, the older data is kept
    and the next refresh is attempted after ``_INDEX_PE_RETRY_INTERVAL``
    seconds.  An empty map is retried on every call, as before.
    """
    global _INDEX_PE_CACHE, _INDEX_PE_CACHE_TS
    with _CACHE_LOCK:
        now = time.time()
        expired = (now - _INDEX_PE_CACHE_TS) >= _INDEX_PE_TTL
        if not _INDEX_PE_CACHE or expired:
            fresh = _fetch_all_index_pe()
            if fresh:
                _INDEX_PE_CACHE = fresh
                _INDEX_PE_CACHE_TS = now
            elif _INDEX_PE_CACHE:
                # Keep serving the stale snapshot; look "expired" again only
                # after the retry interval has elapsed.
                _INDEX_PE_CACHE_TS = now - _INDEX_PE_TTL + _INDEX_PE_RETRY_INTERVAL
        return _INDEX_PE_CACHE
def _fetch_index_pe_pb(benchmark_type: str) -> tuple[Optional[float], Optional[float]]:
    """Return PE/PB for a fund via its benchmark index (INDEX fund track).

    Resolution order after normalising the benchmark name:
      1. ``_BENCHMARK_MAP`` lookup (if the mapped index has no data the result
         is ``(None, None)``; no fuzzy fallback is attempted);
      2. case-insensitive exact match against the fetched index names;
      3. whole-word containment in either direction, choosing the index name
         whose length is closest to the benchmark name.

    Step 3 used to be a raw substring test that returned the first hit, so
    "NIFTY 50" matched "NIFTY 500 ..." and an empty index name matched
    everything.  Matching on space-padded names requires word boundaries.

    Returns ``(None, None)`` for a missing or no-PE benchmark, when no index
    data is available, or when nothing matches.
    """
    if not benchmark_type or _is_no_pe_benchmark(benchmark_type):
        return None, None
    index_map = warm_index_cache()
    if not index_map:
        return None, None

    norm = _normalize_benchmark(benchmark_type)
    if not norm:
        return None, None

    mapped = _BENCHMARK_MAP.get(norm)
    if mapped:
        return index_map.get(mapped, (None, None))

    # Exact, case-insensitive match (norm is already upper-case).
    for idx_name in index_map:
        if idx_name.upper() == norm:
            return index_map[idx_name]

    # Word-boundary containment; keep the closest-length candidate.
    padded_norm = f" {norm} "
    best_name = None
    best_gap = None
    for idx_name in index_map:
        idx_norm = " ".join(idx_name.upper().split())
        if not idx_norm:
            continue
        padded_idx = f" {idx_norm} "
        if padded_norm in padded_idx or padded_idx in padded_norm:
            gap = abs(len(idx_norm) - len(norm))
            if best_gap is None or gap < best_gap:
                best_name, best_gap = idx_name, gap

    if best_name is None:
        return None, None
    return index_map[best_name]
# ══════════════════════════════════════════════════════════════════════════════
# TRACK 2 — ACTIVE funds: AMFI holdings + stock PE/PB
# ══════════════════════════════════════════════════════════════════════════════
def _amfi_xls_url(year: int | None = None, month: int | None = None) -> str:
    """Build the AMFI monthly portfolio XLS URL.

    When both *year* and *month* are given they are used as-is.  If either is
    missing, both default to the most recently published month, on the
    assumption stated by the original author that AMFI publishes by the 10th
    of the following month:
      - today >= 10th: use last month
      - today <  10th: use the month before last

    Raises:
        KeyError: if *month* is not in 1..12.
    """
    if year is None or month is None:
        now = datetime.now()
        months_back = 1 if now.day >= 10 else 2
        # Count months since year 0 so the subtraction rolls over year ends.
        month_index = now.year * 12 + (now.month - 1) - months_back
        year, month0 = divmod(month_index, 12)
        month = month0 + 1
    month_abbr = {
        1: "jan", 2: "feb", 3: "mar", 4: "apr",
        5: "may", 6: "jun", 7: "jul", 8: "aug",
        9: "sep", 10: "oct", 11: "nov", 12: "dec",
    }[month]
    return f"https://portal.amfiindia.com/spages/am{month_abbr}{year}repo.xls"
def _download_amfi_xls() -> Optional[bytes]:
    """Return the raw bytes of the AMFI monthly portfolio XLS, or None on failure.

    The DB cache is consulted first; the payload is stored hex-encoded under
    a key derived from the URL.  On a miss the file is downloaded, cached and
    returned.  Any download error is printed and turned into None.
    """
    url = _amfi_xls_url()
    file_name = url.split('/')[-1]
    cache_key = f"amfi_xls:{url}"

    hex_payload = _cache_get(cache_key, _AMFI_XLS_TTL)
    if hex_payload:
        print(f" [amfi] XLS loaded from cache ({file_name})")
        return bytes.fromhex(hex_payload)

    print(f" [amfi] Downloading {file_name}...")
    browser_headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36"
        ),
        "Referer": "https://www.amfiindia.com/",
    }
    try:
        response = requests.get(url, headers=browser_headers, timeout=60)
        response.raise_for_status()
        payload = response.content
        print(f" [amfi] Downloaded {len(payload):,} bytes")
        _cache_set(cache_key, payload.hex())
    except Exception as e:
        print(f" [amfi] Download failed: {e}")
        return None
    return payload
def _parse_amfi_xls(raw: bytes) -> dict[str, pd.DataFrame]:
    """Parse the AMFI monthly portfolio XLS into per-scheme holdings.

    Layout expected by the parser (repeating for each scheme):
        Row N:     scheme name line (e.g. "HDFC Large Cap Fund - Growth")
        Row N+1:   column headers (... | ISIN | ... | % to NAV)
        Row N+2..: holdings rows
        (a blank row ends the scheme)

    Returns ``{SCHEME NAME (upper-cased): DataFrame[isin, weight_pct]}``.
    Only rows with a 12-character ISIN starting with "IN" and a positive
    weight are kept.  Returns ``{}`` when the workbook cannot be read.
    """
    try:
        sheet = pd.read_excel(io.BytesIO(raw), header=None, dtype=str)
    except Exception as e:
        print(f" [amfi] XLS parse failed: {e}")
        return {}

    parsed: dict[str, pd.DataFrame] = {}
    # State of the scheme currently being read.
    scheme_name = None
    in_table = None
    isin_idx = None
    weight_idx = None
    rows: list[dict] = []

    def _finish_scheme():
        """Store the scheme read so far (if it has rows) and reset the state."""
        nonlocal scheme_name, in_table, isin_idx, weight_idx, rows
        if scheme_name and rows:
            parsed[scheme_name.upper()] = pd.DataFrame(rows)
        scheme_name = None
        in_table = None
        isin_idx = None
        weight_idx = None
        rows = []

    for values in sheet.itertuples(index=False, name=None):
        cells = ["" if pd.isna(v) else str(v).strip() for v in values]

        # A completely blank row terminates the current scheme.
        if not any(cells):
            _finish_scheme()
            continue

        # Column header row: has an exact "ISIN" cell and a "% to NAV" cell.
        upper_cells = [c.upper() for c in cells]
        weight_hits = [i for i, c in enumerate(upper_cells) if "% TO NAV" in c]
        if "ISIN" in upper_cells and weight_hits:
            isin_idx = upper_cells.index("ISIN")
            weight_idx = weight_hits[0]
            in_table = True
            continue

        # Once the header has been seen, every non-blank row is a data row.
        if in_table and isin_idx is not None and weight_idx is not None:
            isin = cells[isin_idx] if isin_idx < len(cells) else ""
            weight_text = cells[weight_idx] if weight_idx < len(cells) else ""
            if re.match(r'^IN[A-Z0-9]{10}$', isin):
                try:
                    weight = float(str(weight_text).replace(",", ""))
                except (ValueError, TypeError):
                    pass
                else:
                    if weight > 0:
                        rows.append({"isin": isin, "weight_pct": weight})
            continue

        # Otherwise a sufficiently long first cell starts a new scheme, but
        # only while no scheme is open.
        first = cells[0] if cells else ""
        if (
            scheme_name is None
            and in_table is None
            and len(first) > 15
            and not first.startswith(("Scheme", "Fund"))
            and "%" not in first
        ):
            scheme_name = first

    _finish_scheme()  # the last scheme has no trailing blank row
    print(f" [amfi] Parsed {len(parsed)} schemes from XLS")
    return parsed
# ── AMFI holdings cache (process-level) ───────────────────────────────────────
_AMFI_SCHEMES: dict[str, pd.DataFrame] = {}  # upper scheme name -> df
_AMFI_SCHEMES_LOCK = threading.Lock()
_AMFI_LOADED = False  # True once a non-empty set of schemes has been parsed
_AMFI_LAST_ATTEMPT = 0.0  # Unix time of the last load attempt; 0.0 = never
_AMFI_RETRY_INTERVAL = 15 * 60  # seconds between attempts while nothing is loaded
_AMFI_REFRESH_INTERVAL = 24 * 3600  # seconds between reloads once data is loaded


def _ensure_amfi_loaded() -> dict[str, pd.DataFrame]:
    """Return the parsed AMFI holdings, loading or refreshing them when due.

    Previously ``_AMFI_LOADED`` was set to True after the first attempt even
    when the download failed, so one transient failure left the holdings
    empty for the rest of the process, and successfully loaded data was never
    refreshed.

    Now:
      * a failed or empty load is retried, at most once per
        ``_AMFI_RETRY_INTERVAL`` seconds;
      * loaded data is re-read once per ``_AMFI_REFRESH_INTERVAL`` seconds so
        a long-running process picks up a newer monthly file;
      * if a refresh yields nothing, the previously loaded data is kept.
    """
    global _AMFI_SCHEMES, _AMFI_LOADED, _AMFI_LAST_ATTEMPT
    with _AMFI_SCHEMES_LOCK:
        now = time.time()
        elapsed = now - _AMFI_LAST_ATTEMPT
        if _AMFI_LOADED:
            due = elapsed >= _AMFI_REFRESH_INTERVAL
        else:
            due = _AMFI_LAST_ATTEMPT == 0.0 or elapsed >= _AMFI_RETRY_INTERVAL
        if due:
            _AMFI_LAST_ATTEMPT = now
            raw = _download_amfi_xls()
            schemes = _parse_amfi_xls(raw) if raw else {}
            if schemes:
                _AMFI_SCHEMES = schemes
                _AMFI_LOADED = True
        return _AMFI_SCHEMES
# Plan/option words that are dropped before comparing scheme names.
_SCHEME_NOISE_WORDS = frozenset({
    "direct", "regular", "plan", "growth", "option",
    "idcw", "dividend", "payout", "reinvestment",
})
# Minimum difflib similarity ratio for a fuzzy name match to be accepted.
_SCHEME_MATCH_MIN_RATIO = 0.85


def _scheme_name_key(name: str) -> str:
    """Lower-case *name*, keep alphanumeric words, drop plan/option noise words."""
    words = re.findall(r"[a-z0-9]+", (name or "").lower())
    return " ".join(w for w in words if w not in _SCHEME_NOISE_WORDS)


def _best_scheme_match(fund_name: str, schemes: dict) -> Optional[pd.DataFrame]:
    """Return the value in *schemes* whose key best matches *fund_name*, or None.

    An exact match on the noise-stripped name wins immediately.  Otherwise the
    candidate with the highest ``difflib.SequenceMatcher`` ratio is returned,
    provided that ratio exceeds ``_SCHEME_MATCH_MIN_RATIO``.
    """
    from difflib import SequenceMatcher

    target = _scheme_name_key(fund_name)
    if not target:
        return None

    best_match = None
    best_score = _SCHEME_MATCH_MIN_RATIO
    for scheme_key, df in schemes.items():
        candidate = _scheme_name_key(scheme_key)
        if not candidate:
            continue
        if candidate == target:
            return df
        score = SequenceMatcher(None, target, candidate).ratio()
        if score > best_score:
            best_score = score
            best_match = df
    return best_match


def _find_scheme_holdings(fund_name: str, scheme_isin: str = "") -> Optional[pd.DataFrame]:
    """Look up holdings for a fund from the AMFI XLS by fuzzy name match.

    *scheme_isin* is accepted for interface compatibility but is not used:
    the parsed holdings are keyed by scheme name only.

    The previous scoring counted how many characters of the fund name occur
    anywhere in the scheme name, which is close to 1.0 for almost any two
    fund names and therefore matched unrelated schemes.  Names are now
    compared as ordered text after removing plan/option words.

    Returns None when no holdings are loaded or nothing matches well enough.
    """
    schemes = _ensure_amfi_loaded()
    if not schemes:
        return None
    return _best_scheme_match(fund_name, schemes)
# ── Stock PE/PB fetcher ───────────────────────────────────────────────────────
def _isin_to_yf_ticker(isin: str) -> str:
    """Resolve an Indian stock ISIN to a Yahoo Finance ticker ("<SYMBOL>.NS").

    The DB cache is checked first (entries are valid for 7 days).  On a miss
    the NSE autocomplete endpoint is queried with the ISIN and the first
    non-empty symbol in the response is used and cached.  Returns "" when
    nothing is found or the request fails.
    """
    cache_key = f"isin_ticker:{isin}"
    known = _cache_get(cache_key, 7 * 24 * 3600)
    if known:
        return known

    try:
        response = _get_nse_session().get(
            f"https://www.nseindia.com/api/search/autocomplete?q={isin}",
            timeout=10,
        )
        response.raise_for_status()
        candidates = (
            item.get("symbol", "") for item in response.json().get("symbols", [])
        )
        first_symbol = next((sym for sym in candidates if sym), "")
        if first_symbol:
            ticker = f"{first_symbol}.NS"
            _cache_set(cache_key, ticker)
            return ticker
    except Exception:
        pass
    return ""
def _finite_or_none(value) -> Optional[float]:
    """Coerce *value* to float; return None if missing, unparseable or non-finite."""
    import math

    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if math.isfinite(number) else None


def _fetch_stock_pe_pb(isin: str) -> tuple[Optional[float], Optional[float]]:
    """Fetch PE and PB for a single stock ISIN via yfinance.

    Lookup order: in-process cache, DB cache, then yfinance using the ticker
    resolved by ``_isin_to_yf_ticker``.  PE is the trailing PE, falling back
    to the forward PE when the trailing value is missing or zero.

    Values that cannot be parsed or are not finite (infinity, NaN) are now
    returned as None.  Before, an infinite PE flowed into the caller's
    weighted average and made the whole portfolio PE infinite, and a single
    unparseable field discarded the other field as well.

    Returns ``(pe, pb)``; either element may be None.  Never raises for
    network or lookup failures.
    """
    global _STOCK_PE_CACHE
    if isin in _STOCK_PE_CACHE:
        return _STOCK_PE_CACHE[isin]

    cache_key = f"stock_pe:{isin}"
    cached = _cache_get(cache_key, _STOCK_PE_TTL)
    if cached:
        data = json.loads(cached)
        # Re-validate: json.loads accepts "Infinity"/"NaN" written earlier.
        result = (_finite_or_none(data.get("pe")), _finite_or_none(data.get("pb")))
        _STOCK_PE_CACHE[isin] = result
        return result

    ticker_sym = _isin_to_yf_ticker(isin)
    if not ticker_sym:
        _STOCK_PE_CACHE[isin] = (None, None)
        return None, None

    try:
        info = yf.Ticker(ticker_sym).info
        pe = _finite_or_none(info.get("trailingPE")) or _finite_or_none(info.get("forwardPE"))
        pb = _finite_or_none(info.get("priceToBook"))
        result = (pe, pb)
        _cache_set(cache_key, json.dumps({"pe": pe, "pb": pb}))
        _STOCK_PE_CACHE[isin] = result
        return result
    except Exception:
        _STOCK_PE_CACHE[isin] = (None, None)
        return None, None
def _compute_active_fund_pe_pb(
    fund_name: str,
    scheme_isin: str = "",
) -> tuple[Optional[float], Optional[float]]:
    """Compute portfolio-weighted PE/PB for an active fund using AMFI holdings.

        Portfolio PE = Σ (weight_i × PE_i) / Σ weight_i   (stocks with PE > 0 only)
        Portfolio PB = Σ (weight_i × PB_i) / Σ weight_i   (stocks with PB > 0 only)

    Stock ratios are fetched concurrently (up to 10 worker threads).  Each
    result is rounded to 2 decimals, or None when no holding contributed.
    Returns ``(None, None)`` when no holdings are found for the fund.

    The progress message previously contained a mis-encoded non-ASCII
    character; it now uses plain ASCII "->".
    """
    holdings = _find_scheme_holdings(fund_name, scheme_isin)
    if holdings is None or holdings.empty:
        print(f" [amfi] No holdings found for: {fund_name[:50]}")
        return None, None

    print(f" [amfi] {fund_name[:45]}: {len(holdings)} holdings -> fetching stock PE/PB...")

    from concurrent.futures import ThreadPoolExecutor, as_completed

    weighted_pe_sum = 0.0
    weighted_pb_sum = 0.0
    weight_pe_total = 0.0
    weight_pb_total = 0.0

    with ThreadPoolExecutor(max_workers=10) as ex:
        futures = {
            ex.submit(_fetch_stock_pe_pb, isin): float(weight)
            for isin, weight in zip(holdings["isin"], holdings["weight_pct"])
        }
        for fut in as_completed(futures):
            weight = futures[fut]
            try:
                pe, pb = fut.result()
            except Exception:
                pe, pb = None, None
            # PE and PB are accumulated independently: a stock may have one
            # ratio but not the other.
            if pe is not None and pe > 0:
                weighted_pe_sum += weight * pe
                weight_pe_total += weight
            if pb is not None and pb > 0:
                weighted_pb_sum += weight * pb
                weight_pb_total += weight

    portfolio_pe = round(weighted_pe_sum / weight_pe_total, 2) if weight_pe_total > 0 else None
    portfolio_pb = round(weighted_pb_sum / weight_pb_total, 2) if weight_pb_total > 0 else None

    # Sum of the weights that had a usable PE.
    coverage_pct = round(weight_pe_total, 1)
    print(
        f" [amfi] {fund_name[:40]}: "
        f"PE={portfolio_pe} PB={portfolio_pb} "
        f"(coverage {coverage_pct}% of NAV)"
    )
    return portfolio_pe, portfolio_pb
# ══════════════════════════════════════════════════════════════════════════════
# PUBLIC API
# ══════════════════════════════════════════════════════════════════════════════
def fetch_pe_pb(
    benchmark_type: str,
    scheme_code: str = "",  # unused, kept for backward compat
    fund_name: str = "",
    category: str = "",
    scheme_isin: str = "",
) -> tuple[Optional[float], Optional[float]]:
    """Return ``(pe, pb)`` for a fund; either element may be None.

    Routing:
      - Debt/liquid (benchmark contains CRISIL/GSEC/etc.)        -> (None, None)
      - Index fund (category contains "Index Fund"/"ETF"/etc.)   -> benchmark index PE/PB
      - Everything else (active fund)                            -> AMFI holdings
          falling back to the benchmark index PE/PB when the holdings-based
          calculation yields nothing

    A missing benchmark (``None`` or ``""``) no longer raises: it is treated
    as "not a no-PE benchmark", so an active fund can still be valued from
    its holdings.  The fallback log message, which contained a mis-encoded
    character, now uses plain ASCII "->".
    """
    # Debt / liquid: no PE applicable.
    if benchmark_type and _is_no_pe_benchmark(benchmark_type):
        return None, None

    # Index funds: use the benchmark index PE/PB.
    if _is_index_fund(category):
        return _fetch_index_pe_pb(benchmark_type)

    # Active funds: AMFI holdings-based PE/PB.
    if fund_name:
        pe, pb = _compute_active_fund_pe_pb(fund_name, scheme_isin)
        if pe is not None or pb is not None:
            return pe, pb
        # Holdings lookup produced nothing: use the index as a proxy.
        print(f" [pe_pb] AMFI fallback -> index PE/PB for: {fund_name[:50]}")

    return _fetch_index_pe_pb(benchmark_type)
def batch_fetch_pe_pb(
    fund_benchmarks: dict[str, str],
    fund_categories: dict[str, str] | None = None,
    fund_isins: dict[str, str] | None = None,
) -> dict[str, tuple[Optional[float], Optional[float]]]:
    """Resolve PE/PB for many funds: ``{fund_name: benchmark_type}`` -> ``{fund_name: (pe, pb)}``.

    Optional inputs, both keyed by fund name:
        fund_categories: {fund_name: category}
        fund_isins:      {fund_name: scheme_isin}

    Funds missing from an optional mapping get "" for that field.  The AMFI
    holdings and the index map are loaded once up front; the funds are then
    processed one after another in the order of *fund_benchmarks*.
    """
    # Load the shared data sets once before the per-fund calls.
    _ensure_amfi_loaded()
    warm_index_cache()

    categories = fund_categories or {}
    isins = fund_isins or {}
    return {
        name: fetch_pe_pb(
            benchmark_type=bm,
            fund_name=name,
            category=categories.get(name, ""),
            scheme_isin=isins.get(name, ""),
        )
        for name, bm in fund_benchmarks.items()
    }