Spaces:
Sleeping
Sleeping
| """ | |
Index Fund Ingest — capture index funds the same way as the raw CSV (mftool/AMFI).

Two sources:
- mftool (default): same as the raw CSV under PS — AMFI category 5, sub-category 38
  (Index Funds/ETFs). Returns only the schemes AMFI lists under that category
  (curated; roughly the same count as the Index Fund section of your fund-stats CSV).
  Output format matches PS: category "Index Fund", hyphenated fund names.
- mfapi: search mfapi.in and filter by index; use when you need more schemes.

Usage:
    python -m src.index_fund_ingest [--output index_funds.csv]     # default: mftool
    python -m src.index_fund_ingest --source mfapi [--limit 100]   # mfapi search

Then: enrich the output CSV, merge it into the main fund CSV, and run data_engine as usual.
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import csv | |
| import re | |
| import time | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| import requests | |
# --- AMFI gateway (the endpoint mftool calls in get_open_ended_other_scheme_performance) ---
AMFI_FUND_PERFORMANCE_URL = "https://www.amfiindia.com/gateway/pollingsebi/api/amfi/fundperformance"
AMFI_CATEGORY_OTHER = 5
AMFI_SUBCATEGORY_INDEX_FUNDS = 38  # "Index Funds/ETFs"

# --- mfapi.in endpoints ---
MFAPI_LIST = "https://api.mfapi.in/mf"
MFAPI_SEARCH = "https://api.mfapi.in/mf/search"
MFAPI_NAV = "https://api.mfapi.in/mf/{scheme_code}"  # fill in with str.format(scheme_code=...)

# Seconds to pause between consecutive API calls (politeness delay).
SLEEP = 0.3
# Column order of the project fund-stats CSV. The names (including the embedded
# newlines in the capture-ratio columns) must stay in sync with data_engine /
# csv_enrichment, so do not rename or reorder entries.
FUND_CSV_HEADERS = [
    # identity
    "Fund", "Category", "Scheme Code", "Launch Date",
    # size / cost
    "Total Assets (in Cr)", "TER", "Turn over (%)",
    # returns
    "CAGR Since Inception",
    "1 Year CAGR", "1 Year Category CAGR", "1 Year Benchmark CAGR",
    "3 Years CAGR", "3 Years Category CAGR", "3 Years Benchmark CAGR",
    "5 Years CAGR", "5 Years Category CAGR", "5 Years Benchmark CAGR",
    "10 Years CAGR", "10 Years Category CAGR", "10 Years Benchmark CAGR",
    # benchmark and price
    "Benchmark Type", "NAV",
    # risk metrics
    "Alpha", "Beta", "Standard Deviation",
    "Sharpe Ratio", "Volatility", "Mean", "Sortino Ratio",
    "Up Market Capture\nRatio", "Down Market Capture\nRatio",
    "Maximum Drawdown", "R-Squared", "Information Ratio",
    # valuation
    "P/E Ratio", "P/B Ratio",
]
# Category label used by the raw CSV under PS for this section (no "Equity:" prefix).
INDEX_FUND_CATEGORY_PS = "Index Fund"

# Lower-cased mfapi scheme_category fragments (taken from the NAV meta) mapped to
# our Category label. Every known spelling collapses to the same label.
CATEGORY_MAP = dict.fromkeys(
    (
        "index fund",
        "index funds",
        "equity scheme - index fund",
        "equity scheme - index funds",
    ),
    "Equity: Index Fund",
)
def _to_hyphenated(name: str) -> str:
    """Return *name* in the hyphenated form used by the raw CSV under PS.

    Example: ``"DSP Nifty 50 Index Fund - Regular Plan - Growth"`` becomes
    ``"DSP-Nifty-50-Index-Fund-Regular-Plan-Growth"``. An empty or falsy name
    yields ``""``.
    """
    if not name:
        return ""
    # Step 1: every run of whitespace/underscores becomes one hyphen.
    dashed = re.sub(r"[\s_]+", "-", name.strip())
    # Step 2: collapse hyphen runs (original " - " separators are now "---").
    collapsed = re.sub(r"-+", "-", dashed)
    # Step 3: no leading/trailing hyphen.
    return collapsed.strip("-")
def _get_amfi_report_date() -> str:
    """Return the most recent weekday (today included) as ``DD-MMM-YYYY``.

    The AMFI API returns an empty result for weekend dates, so Saturday and
    Sunday are rolled back to the preceding Friday.
    """
    report_day = datetime.now().date()
    # weekday(): Mon=0 .. Fri=4, Sat=5, Sun=6 — step back until we leave the weekend.
    while report_day.weekday() >= 5:
        report_day -= timedelta(days=1)
    return report_day.strftime("%d-%b-%Y")
# (regex, Benchmark Type) pairs used to guess a benchmark from a lower-cased scheme
# name (consumed by nav_metrics_engine via the "Benchmark Type" column).
# The list is scanned top to bottom and the first match wins, so a pattern that is
# a prefix of another must come later: "nifty 50" also matches "nifty 500", hence
# Nifty 500 / 200 / 100 / Next 50 are listed before Nifty 50.
BENCHMARK_INFER = [
    # NSE broad-market indices, most specific first
    (r"nifty\s*500|nifty500", "Nifty 500"),
    (r"nifty\s*200|nifty200", "Nifty 200"),
    (r"nifty\s*100|nifty100", "Nifty 100"),
    (r"nifty\s*next\s*50|nifty\s*junior|niftyjr", "Nifty Next 50"),
    (r"nifty\s*50|nifty50", "Nifty 50"),
    # NSE mid / small cap
    (r"nifty\s*midcap\s*150|midcap\s*150", "Nifty Midcap 150"),
    (r"nifty\s*smallcap\s*250|smallcap\s*250", "Nifty Smallcap 250"),
    # BSE indices
    (r"sensex|bse\s*sensex", "BSE Sensex"),
    (r"bse\s*100", "BSE 100"),
    (r"bse\s*500", "BSE 500"),
]
def _normalize_category(meta_category: str | None) -> str:
    """Map an mfapi ``scheme_category`` string to this project's Category label.

    A missing/empty category defaults to ``"Equity: Index Fund"``. Otherwise the
    first CATEGORY_MAP fragment contained in the lower-cased category decides the
    label; any other category mentioning "index" also maps to
    ``"Equity: Index Fund"``; everything else is returned stripped but unchanged.
    """
    if not meta_category:
        return "Equity: Index Fund"

    stripped = meta_category.strip()
    lowered = stripped.lower()

    mapped = next(
        (label for fragment, label in CATEGORY_MAP.items() if fragment in lowered),
        None,
    )
    if mapped is not None:
        return mapped
    return "Equity: Index Fund" if "index" in lowered else stripped
def _infer_benchmark(scheme_name: str) -> str:
    """Guess the Benchmark Type from a scheme name.

    The lower-cased name is tested against BENCHMARK_INFER in list order and the
    label of the first matching pattern is returned. When nothing matches (or the
    name is empty/None) the result is ``"Nifty 50"``, the default for index funds.
    """
    lowered = (scheme_name or "").lower()
    return next(
        (label for pattern, label in BENCHMARK_INFER if re.search(pattern, lowered)),
        "Nifty 50",
    )
def _search_mfapi(query: str, limit: int = 200) -> list[dict]:
    """Query the mfapi search endpoint and return at most *limit* hits.

    Each hit is expected to be a ``{schemeCode, schemeName}`` dict. This is a
    best-effort call: any failure (network, HTTP status, JSON decoding, ...) is
    printed and an empty list is returned; a non-list response also yields ``[]``.
    """
    try:
        response = requests.get(MFAPI_SEARCH, params={"q": query}, timeout=15)
        response.raise_for_status()
        hits = response.json()
        return hits[:limit] if isinstance(hits, list) else []
    except Exception as exc:
        print(f" [search] error for '{query}': {exc}")
        return []
def _fetch_nav_meta(scheme_code: str) -> dict | None:
    """Fetch the mfapi NAV endpoint for *scheme_code* and return only its meta.

    The result has the keys ``scheme_name``, ``scheme_category`` and
    ``fund_house``; a missing or null value becomes ``""``. Any failure is
    printed and ``None`` is returned, so callers can simply skip the scheme.
    """
    wanted_fields = ("scheme_name", "scheme_category", "fund_house")
    try:
        # limit=1 is sent so that only the latest NAV row is requested; just the meta is used.
        response = requests.get(
            MFAPI_NAV.format(scheme_code=scheme_code),
            params={"limit": 1},
            timeout=15,
        )
        response.raise_for_status()
        meta = response.json().get("meta") or {}
        return {field: meta.get(field) or "" for field in wanted_fields}
    except Exception as exc:
        print(f" [nav meta] {scheme_code}: {exc}")
        return None
def get_index_funds_via_mftool(verbose: bool = True) -> list[dict]:
    """Fetch index funds from the AMFI API that mftool uses (category 5, subCategory 38).

    This yields the curated list that appears in the raw CSV under PS, not the
    full scheme universe. Starting from today and walking back up to 7 days,
    each weekday is tried as the report date until one returns at least one
    scheme; weekends are skipped.

    Args:
        verbose: When True, print a summary line on success, the first request
            failure encountered, and a note if no date produced any schemes.

    Returns:
        A list of dicts with the keys ``scheme_name``, ``benchmark_type``
        (``"Nifty 50"`` when the API gives none), ``scheme_code`` (always ``""``,
        resolved later by enrichment) and ``category``. Schemes whose name
        contains ``" ETF"`` are dropped. Returns ``[]`` if every attempt failed
        or came back empty. Never raises for request/parse failures.
    """
    today = datetime.now().date()
    failure_reported = False

    for day_back in range(8):  # today plus up to 7 days back
        report_day = today - timedelta(days=day_back)
        if report_day.weekday() >= 5:  # Sat/Sun are skipped
            continue
        report_date = report_day.strftime("%d-%b-%Y")
        payload = {
            "maturityType": 1,
            "category": AMFI_CATEGORY_OTHER,
            "subCategory": AMFI_SUBCATEGORY_INDEX_FUNDS,
            "mfid": 0,
            "reportDate": report_date,
        }

        # Collect into a per-attempt list: if parsing blows up half-way, the
        # partial rows are discarded instead of being mixed with (and duplicated
        # by) the rows of the next report date.
        schemes: list[dict] = []
        try:
            resp = requests.post(
                AMFI_FUND_PERFORMANCE_URL,
                headers={"User-Agent": "Mozilla/5.0"},
                json=payload,
                timeout=25,
            )
            resp.raise_for_status()
            data = resp.json()
            for item in data.get("data") or []:
                name = (item.get("schemeName") or "").strip()
                if not name:
                    continue
                # Exclude ETFs so we match the raw CSV (its Index Fund section
                # has open-ended funds only).
                if " ETF" in name:
                    continue
                benchmark = (item.get("benchmark") or "").strip() or "Nifty 50"
                schemes.append({
                    "scheme_name": name,
                    "benchmark_type": benchmark,
                    "scheme_code": "",  # AMFI API doesn't return code; enrichment resolves
                    "category": INDEX_FUND_CATEGORY_PS,
                })
        except Exception as e:
            # Best-effort: report only the first failure to keep the output short.
            # (Keying this on day_back == 0 would stay silent whenever today is
            # a weekend, because day 0 is never attempted then.)
            if verbose and not failure_reported:
                print(f"[mftool] AMFI request failed for {report_date}: {e}")
                failure_reported = True
            continue

        if schemes:
            if verbose:
                print(f"[mftool] AMFI category 38 (Index Funds/ETFs): {len(schemes)} schemes (report date {report_date})")
            return schemes

    if verbose:
        print("[mftool] No schemes returned (tried several weekdays). Check AMFI API.")
    return []
def _is_index_scheme(meta_category: str, scheme_name: str) -> bool:
    """Decide whether a scheme should be treated as an index fund.

    Checks are case-insensitive and any one of them is sufficient:
    1. the category mentions "index";
    2. the name mentions "index" and either mentions "fund" or does not
       mention "etf";
    3. the name contains a well-known index (Nifty 50 / Next 50 / 100 / 500,
       Sensex).
    """
    category_text = (meta_category or "").lower()
    name_text = (scheme_name or "").lower()

    if "index" in category_text:
        return True

    names_an_index = "index" in name_text
    fund_or_not_etf = "fund" in name_text or "etf" not in name_text
    if names_an_index and fund_or_not_etf:
        return True

    # Explicit index benchmarks in the name
    known_benchmark = re.search(
        r"nifty\s*50|nifty\s*next\s*50|sensex|nifty\s*100|nifty\s*500", name_text
    )
    return bool(known_benchmark)
def discover_index_schemes(
    search_queries: list[str] | None = None,
    limit_per_query: int = 150,
    require_index_category: bool = True,
    verbose: bool = True,
) -> list[dict]:
    """Discover index fund schemes via mfapi search plus each scheme's NAV meta.

    Args:
        search_queries: Search terms to send to mfapi. Defaults to
            ``["Index", "Index Fund", "Nifty 50", "Nifty Next 50", "Sensex"]``.
        limit_per_query: Maximum number of search hits considered per query.
        require_index_category: When True, keep only schemes accepted by
            ``_is_index_scheme`` (meta category + search-result name).
        verbose: When True, print progress for each query and accepted scheme.

    Returns:
        One dict per unique scheme code with the keys ``scheme_code`` (str),
        ``scheme_name``, ``category`` and ``benchmark_type``, in discovery order.
    """
    if search_queries is None:
        search_queries = ["Index", "Index Fund", "Nifty 50", "Nifty Next 50", "Sensex"]

    # Codes whose meta was fetched successfully (accepted OR rejected).
    seen_codes: set = set()
    out: list[dict] = []

    for q in search_queries:
        if verbose:
            print(f"[discover] search q={q!r} …")
        for item in _search_mfapi(q, limit=limit_per_query):
            code = item.get("schemeCode")
            if code is None or code in seen_codes:
                continue
            name = item.get("schemeName") or ""

            time.sleep(SLEEP)  # polite delay before every NAV-meta request
            meta = _fetch_nav_meta(str(code))
            if not meta:
                # Fetch failed: leave the code unmarked so a later query that
                # returns it again gets another chance.
                continue

            # The verdict for this code is now final. Mark it before filtering so
            # rejected schemes are not re-fetched (and re-slept on) for every
            # overlapping query.
            seen_codes.add(code)

            cat = meta.get("scheme_category") or ""
            if require_index_category and not _is_index_scheme(cat, name):
                continue

            # Prefer the meta name, fall back to the search-result name.
            scheme_name = meta.get("scheme_name") or name
            category = _normalize_category(cat)
            benchmark = _infer_benchmark(scheme_name)
            out.append({
                "scheme_code": str(code),
                "scheme_name": scheme_name,
                "category": category,
                "benchmark_type": benchmark,
            })
            if verbose:
                # Print the same resolved name that was stored in the row.
                print(f" + {scheme_name[:55]} | {category} | {benchmark}")
    return out
def write_fund_csv(rows: list[dict], path: str | Path) -> None:
    """Write *rows* to *path* as a CSV with the FUND_CSV_HEADERS columns.

    Parent directories are created if needed. Keys missing from a row are
    written as ``""`` and keys that are not known headers are ignored. The file
    is encoded as UTF-8 with a BOM (``utf-8-sig``).
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with open(target, "w", encoding="utf-8-sig", newline="") as handle:
        writer = csv.DictWriter(
            handle,
            fieldnames=FUND_CSV_HEADERS,
            restval="",
            extrasaction="ignore",
        )
        writer.writeheader()
        writer.writerows(rows)
def build_csv_rows(schemes: list[dict], use_ps_format: bool = False) -> list[dict]:
    """Turn discovered scheme dicts into CSV row dicts with all metrics blank.

    Only Fund, Category, Scheme Code and Benchmark Type are filled in.

    use_ps_format: when True, Fund is the hyphenated name and the fallback
    Category is 'Index Fund' (matches the raw CSV under PS); otherwise commas in
    the name are replaced by spaces and the fallback is 'Equity: Index Fund'.
    """
    fallback_category = "Index Fund" if use_ps_format else "Equity: Index Fund"

    rows = []
    for scheme in schemes:
        raw_name = scheme.get("scheme_name") or ""
        if use_ps_format:
            fund_label = _to_hyphenated(raw_name)
        else:
            fund_label = raw_name.replace(",", " ")

        # Start from an all-blank record so every header is present, in order.
        record = dict.fromkeys(FUND_CSV_HEADERS, "")
        record["Fund"] = fund_label
        record["Category"] = scheme.get("category") or fallback_category
        record["Scheme Code"] = scheme.get("scheme_code") or ""
        record["Benchmark Type"] = scheme.get("benchmark_type") or "Nifty 50"
        rows.append(record)
    return rows
def run_ingest(
    output_path: str | Path = "index_funds.csv",
    source: str = "mftool",
    search_queries: list[str] | None = None,
    limit_per_query: int = 150,
    verbose: bool = True,
) -> tuple[list[dict], Path]:
    """Discover index schemes, build the CSV rows and write them to *output_path*.

    source: "mftool" (case-insensitive) uses the curated AMFI category-38 list
    and the PS row format; any other value runs the mfapi search, for which
    search_queries and limit_per_query apply.

    Returns (list of scheme dicts, output path).
    """
    use_ps_format = source.lower() == "mftool"
    if use_ps_format:
        schemes = get_index_funds_via_mftool(verbose=verbose)
    else:
        schemes = discover_index_schemes(
            search_queries=search_queries,
            limit_per_query=limit_per_query,
            require_index_category=True,
            verbose=verbose,
        )

    rows = build_csv_rows(schemes, use_ps_format=use_ps_format)
    destination = Path(output_path)
    write_fund_csv(rows, destination)

    if verbose:
        print(f"\n[ingest] Wrote {len(rows)} rows to {destination.absolute()} (source={source})")
        print(" Next: run CSV enrichment on this file, then merge into main fund CSV.")
    return schemes, destination
def main() -> None:
    """Command-line entry point: parse the options and hand them to run_ingest."""
    parser = argparse.ArgumentParser(
        description="Index fund ingest — same list as raw CSV (mftool/AMFI) or mfapi search"
    )
    parser.add_argument("--output", "-o", default="index_funds.csv", help="Output CSV path")
    parser.add_argument(
        "--source",
        choices=("mftool", "mfapi"),
        default="mftool",
        help="mftool = AMFI category 38 (same as raw CSV under PS). mfapi = search (more schemes).",
    )
    parser.add_argument(
        "--search",
        "-s",
        action="append",
        default=None,
        help="[mfapi only] Search query (repeatable). Default: Index, Index Fund, ...",
    )
    parser.add_argument(
        "--limit",
        "-n",
        type=int,
        default=150,
        help="[mfapi only] Max schemes per search query",
    )
    parser.add_argument("--quiet", "-q", action="store_true", help="Less output")
    options = parser.parse_args()

    be_verbose = not options.quiet
    run_ingest(
        output_path=options.output,
        source=options.source,
        search_queries=options.search,
        limit_per_query=options.limit,
        verbose=be_verbose,
    )


if __name__ == "__main__":
    main()