MF / src /index_fund_ingest.py
Parthiban97's picture
Upload 15 files
b0e15c1 verified
"""
Index Fund Ingest — capture index funds the same way as raw CSV (mftool/AMFI).
Two sources:
- mftool (default): Same as raw CSV under PS — AMFI category 5, sub-category 38 (Index Funds/ETFs).
Returns only the schemes AMFI lists under that sub-category, with ETFs filtered out (curated,
~same count as your fund-stats CSV Index Fund section). Output format matches PS: Category
"Index Fund", hyphenated fund names.
- mfapi: Search mfapi.in and filter by index; use when you need more schemes.
Usage:
python -m src.index_fund_ingest [--output index_funds.csv] # default: mftool
python -m src.index_fund_ingest --source mfapi [--limit 100] # mfapi search
Then: enrich the output CSV, merge into main fund CSV, run data_engine as usual.
"""
from __future__ import annotations
import argparse
import csv
import re
import time
from datetime import datetime, timedelta
from pathlib import Path
import requests
# AMFI fund-performance gateway; per the original note this is the same endpoint
# mftool uses in get_open_ended_other_scheme_performance.
AMFI_FUND_PERFORMANCE_URL = "https://www.amfiindia.com/gateway/pollingsebi/api/amfi/fundperformance"
# Sent as "category" / "subCategory" in the AMFI request payload.
AMFI_CATEGORY_OTHER = 5
AMFI_SUBCATEGORY_INDEX_FUNDS = 38 # AMFI sub-category "Index Funds/ETFs"
# mfapi.in endpoints. MFAPI_LIST is not referenced by the code visible in this file
# (presumably the full scheme listing — confirm before relying on it).
MFAPI_LIST = "https://api.mfapi.in/mf"
# Name search; queried with a "q" parameter.
MFAPI_SEARCH = "https://api.mfapi.in/mf/search"
# Per-scheme NAV endpoint; "{scheme_code}" is filled in with str.format.
MFAPI_NAV = "https://api.mfapi.in/mf/{scheme_code}"
SLEEP = 0.3 # seconds passed to time.sleep before each NAV-meta request (polite delay)
# CSV headers matching project fund-stats CSV (must match data_engine / csv_enrichment).
# Used as the DictWriter field names and as the key set of every row built here.
# This module only fills "Fund", "Category", "Scheme Code" and "Benchmark Type";
# every other column is written blank.
# Note: the two capture-ratio headers contain an embedded newline ("\n").
FUND_CSV_HEADERS = [
"Fund", "Category", "Scheme Code", "Launch Date", "Total Assets (in Cr)",
"TER", "Turn over (%)", "CAGR Since Inception",
"1 Year CAGR", "1 Year Category CAGR", "1 Year Benchmark CAGR",
"3 Years CAGR", "3 Years Category CAGR", "3 Years Benchmark CAGR",
"5 Years CAGR", "5 Years Category CAGR", "5 Years Benchmark CAGR",
"10 Years CAGR", "10 Years Category CAGR", "10 Years Benchmark CAGR",
"Benchmark Type", "NAV", "Alpha", "Beta", "Standard Deviation",
"Sharpe Ratio", "Volatility", "Mean", "Sortino Ratio",
"Up Market Capture\nRatio", "Down Market Capture\nRatio",
"Maximum Drawdown", "R-Squared", "Information Ratio", "P/E Ratio", "P/B Ratio",
]
# Raw CSV under PS uses "Index Fund" (no "Equity:" prefix) for this category.
# Assigned as the category of every scheme returned by the AMFI (mftool) path.
INDEX_FUND_CATEGORY_PS = "Index Fund"
# mfapi scheme_category (from NAV meta) -> our Category label.
# Keys are lowercase fragments; _normalize_category matches them as substrings
# of the lowercased, stripped category text.
CATEGORY_MAP = {
"index fund": "Equity: Index Fund",
"index funds": "Equity: Index Fund",
"equity scheme - index fund": "Equity: Index Fund",
"equity scheme - index funds": "Equity: Index Fund",
}
def _to_hyphenated(name: str) -> str:
"""Convert scheme name to hyphenated form like raw CSV under PS (e.g. DSP-Nifty-50-Index-Fund-Regular-Plan-Growth)."""
if not name:
return ""
# Replace spaces and multiple hyphens with single hyphen, strip
s = re.sub(r"[\s_]+", "-", name.strip())
return re.sub(r"-+", "-", s).strip("-")
def _get_amfi_report_date() -> str:
"""DD-MMM-YYYY for AMFI API. Use last weekday (API returns empty for weekend dates)."""
today = datetime.now().date()
d = today
for _ in range(7):
if d.weekday() < 5: # Mon=0 .. Fri=4
break
d -= timedelta(days=1)
return d.strftime("%d-%b-%Y")
# Scheme name fragments -> Benchmark Type (for nav_metrics_engine).
# Each entry is (regex, label). _infer_benchmark runs re.search with each regex
# against the lowercased scheme name and returns the label of the first match.
# Order matters: more specific (e.g. Nifty 500) before generic (Nifty 50),
# because the "nifty 50" pattern would also match inside "nifty 500".
BENCHMARK_INFER = [
(r"nifty\s*500|nifty500", "Nifty 500"),
(r"nifty\s*200|nifty200", "Nifty 200"),
(r"nifty\s*100|nifty100", "Nifty 100"),
(r"nifty\s*next\s*50|nifty\s*junior|niftyjr", "Nifty Next 50"),
(r"nifty\s*50|nifty50", "Nifty 50"),
(r"nifty\s*midcap\s*150|midcap\s*150", "Nifty Midcap 150"),
(r"nifty\s*smallcap\s*250|smallcap\s*250", "Nifty Smallcap 250"),
(r"sensex|bse\s*sensex", "BSE Sensex"),
(r"bse\s*100", "BSE 100"),
(r"bse\s*500", "BSE 500"),
]
def _normalize_category(meta_category: str | None) -> str:
if not meta_category:
return "Equity: Index Fund"
key = meta_category.strip().lower()
for k, v in CATEGORY_MAP.items():
if k in key:
return v
if "index" in key:
return "Equity: Index Fund"
return meta_category.strip()
def _infer_benchmark(scheme_name: str) -> str:
    """Guess the Benchmark Type label from a scheme name.

    The lowercased name is tested against each BENCHMARK_INFER regex in order;
    the label of the first match is returned. With no match (or a falsy name)
    the result is "Nifty 50", used as a safe default for index funds.
    """
    lowered = scheme_name.lower() if scheme_name else ""
    matching_labels = (
        label for regex, label in BENCHMARK_INFER if re.search(regex, lowered)
    )
    return next(matching_labels, "Nifty 50")
def _search_mfapi(query: str, limit: int = 200) -> list[dict]:
    """Query the mfapi search endpoint and return at most *limit* results.

    The response is expected to be a JSON list (items carrying schemeCode and
    schemeName, per the original docstring); any other JSON shape gives [].
    Best-effort: any exception is printed and converted into an empty list.
    """
    try:
        response = requests.get(MFAPI_SEARCH, params={"q": query}, timeout=15)
        response.raise_for_status()
        payload = response.json()
        return payload[:limit] if isinstance(payload, list) else []
    except Exception as e:
        # Deliberate catch-all: one failed search must not abort the whole ingest.
        print(f" [search] error for '{query}': {e}")
    return []
def _fetch_nav_meta(scheme_code: str) -> dict | None:
    """Fetch the NAV endpoint for *scheme_code* and return only its meta fields.

    Returns a dict with keys ``scheme_name``, ``scheme_category`` and
    ``fund_house`` (each "" when absent), or None if the request or parsing
    fails; the failure is printed rather than raised.
    """
    endpoint = MFAPI_NAV.format(scheme_code=scheme_code)
    wanted_fields = ("scheme_name", "scheme_category", "fund_house")
    try:
        # limit=1 is sent so the request asks for a minimal NAV history.
        response = requests.get(endpoint, params={"limit": 1}, timeout=15)
        response.raise_for_status()
        meta = response.json().get("meta") or {}
        return {field: meta.get(field) or "" for field in wanted_fields}
    except Exception as e:
        print(f" [nav meta] {scheme_code}: {e}")
    return None
def get_index_funds_via_mftool(verbose: bool = True) -> list[dict]:
    """Fetch index funds from the AMFI fund-performance API (category 5, subCategory 38).

    Starting from today's local date and going back up to 7 days, each weekday
    is requested in turn until one returns at least one usable scheme. Weekend
    dates are skipped. ETFs (names containing " ETF") and unnamed rows are
    dropped.

    Args:
        verbose: When True, print progress, the first request/parse failure,
            and a note if no date yielded any scheme.

    Returns:
        A list of dicts with keys ``scheme_name``, ``benchmark_type`` (falls
        back to "Nifty 50" when AMFI gives none), ``scheme_code`` (always "",
        the AMFI response is not used for codes; enrichment resolves them) and
        ``category`` (INDEX_FUND_CATEGORY_PS). Empty list if nothing was found.
    """
    today = datetime.now().date()
    failure_reported = False
    for day_back in range(8):  # try up to 8 days back to get a date with data
        report_day = today - timedelta(days=day_back)
        if report_day.weekday() >= 5:  # skip weekend
            continue
        report_date = report_day.strftime("%d-%b-%Y")
        payload = {
            "maturityType": 1,
            "category": AMFI_CATEGORY_OTHER,
            "subCategory": AMFI_SUBCATEGORY_INDEX_FUNDS,
            "mfid": 0,
            "reportDate": report_date,
        }
        try:
            resp = requests.post(
                AMFI_FUND_PERFORMANCE_URL,
                headers={"User-Agent": "Mozilla/5.0"},
                json=payload,
                timeout=25,
            )
            resp.raise_for_status()
            raw_list = resp.json().get("data") or []
            # Parse into a per-date list so a failure halfway through one date
            # cannot leak partial rows into the result of a later date.
            schemes = _parse_amfi_index_schemes(raw_list)
        except Exception as e:
            # Best-effort: report only the first failure (whichever weekday it
            # happens on) to avoid repeating the same error for every date.
            if verbose and not failure_reported:
                print(f"[mftool] AMFI request failed for {report_date}: {e}")
                failure_reported = True
            continue
        if schemes:
            if verbose:
                print(f"[mftool] AMFI category 38 (Index Funds/ETFs): {len(schemes)} schemes (report date {report_date})")
            return schemes
    if verbose:
        print("[mftool] No schemes returned (tried several weekdays). Check AMFI API.")
    return []


def _parse_amfi_index_schemes(raw_list: list) -> list[dict]:
    """Convert one AMFI ``data`` list into scheme dicts, skipping blank names and ETFs."""
    schemes: list[dict] = []
    for item in raw_list:
        name = (item.get("schemeName") or "").strip()
        if not name:
            continue
        # Exclude ETFs so we match raw CSV (Index Fund section has open-ended funds only).
        # The substring test also covers names that end in " ETF".
        if " ETF" in name:
            continue
        schemes.append({
            "scheme_name": name,
            "benchmark_type": (item.get("benchmark") or "").strip() or "Nifty 50",
            "scheme_code": "",  # AMFI API doesn't return code; enrichment resolves
            "category": INDEX_FUND_CATEGORY_PS,
        })
    return schemes
def _is_index_scheme(meta_category: str, scheme_name: str) -> bool:
"""True if this scheme should be treated as index fund."""
cat = (meta_category or "").lower()
name = (scheme_name or "").lower()
if "index" in cat:
return True
if "index" in name and ("fund" in name or "etf" not in name):
return True
# Explicit index benchmarks in name
if re.search(r"nifty\s*50|nifty\s*next\s*50|sensex|nifty\s*100|nifty\s*500", name):
return True
return False
def discover_index_schemes(
    search_queries: list[str] | None = None,
    limit_per_query: int = 150,
    require_index_category: bool = True,
    verbose: bool = True,
) -> list[dict]:
    """Discover index fund schemes via mfapi search and NAV meta.

    Each query is searched on mfapi; every new scheme code gets one NAV-meta
    lookup (preceded by a SLEEP pause) and is then kept or rejected.

    Args:
        search_queries: Search terms; None uses the built-in default list.
        limit_per_query: Maximum number of search results taken per query.
        require_index_category: When True, keep only schemes accepted by
            _is_index_scheme (checked with the meta category and the
            search-result name). When False, keep every scheme with meta.
        verbose: When True, print each query and each accepted scheme.

    Returns:
        List of dicts with keys ``scheme_code`` (str), ``scheme_name``,
        ``category`` and ``benchmark_type``, in discovery order, one per code.
    """
    if search_queries is None:
        search_queries = ["Index", "Index Fund", "Nifty 50", "Nifty Next 50", "Sensex"]
    # Codes whose meta was fetched successfully, whether accepted or rejected.
    # Recording rejected codes too avoids re-sleeping and re-fetching the same
    # scheme when overlapping queries (e.g. "Index" / "Index Fund") return it again.
    examined_codes: set = set()
    out: list[dict] = []
    for q in search_queries:
        if verbose:
            print(f"[discover] search q={q!r} …")
        for item in _search_mfapi(q, limit=limit_per_query):
            if not isinstance(item, dict):
                continue  # malformed search entry; nothing to look up
            code = item.get("schemeCode")
            if code is None or code in examined_codes:
                continue
            search_name = item.get("schemeName") or ""
            time.sleep(SLEEP)
            meta = _fetch_nav_meta(str(code))
            if not meta:
                # Lookup failed: leave the code unmarked so a later query may retry it.
                continue
            examined_codes.add(code)
            cat = meta.get("scheme_category") or ""
            if require_index_category and not _is_index_scheme(cat, search_name):
                continue
            # Prefer the name from NAV meta; fall back to the search-result name
            # when meta carries an empty one (also used for the progress line).
            scheme_name = meta.get("scheme_name") or search_name
            category = _normalize_category(cat)
            benchmark = _infer_benchmark(scheme_name)
            out.append({
                "scheme_code": str(code),
                "scheme_name": scheme_name,
                "category": category,
                "benchmark_type": benchmark,
            })
            if verbose:
                print(f" + {scheme_name[:55]} | {category} | {benchmark}")
    return out
def write_fund_csv(rows: list[dict], path: str | Path) -> None:
    """Write *rows* to *path* as a CSV whose columns are FUND_CSV_HEADERS.

    Parent directories are created if missing. Keys absent from a row are
    written as "" and keys outside FUND_CSV_HEADERS are ignored. The file is
    encoded as UTF-8 with a BOM ("utf-8-sig").
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with target.open("w", encoding="utf-8-sig", newline="") as handle:
        writer = csv.DictWriter(
            handle,
            fieldnames=FUND_CSV_HEADERS,
            restval="",
            extrasaction="ignore",
        )
        writer.writeheader()
        writer.writerows(rows)
def build_csv_rows(schemes: list[dict], use_ps_format: bool = False) -> list[dict]:
    """Turn discovered scheme dicts into CSV row dicts with all metrics blank.

    Every row has all FUND_CSV_HEADERS keys; only "Fund", "Category",
    "Scheme Code" and "Benchmark Type" are populated.

    use_ps_format: when True, Fund is the hyphenated name and the fallback
    Category is 'Index Fund' (matches raw CSV under PS); when False, commas in
    the name are replaced by spaces and the fallback is 'Equity: Index Fund'.
    """
    fallback_category = "Index Fund" if use_ps_format else "Equity: Index Fund"
    rows = []
    for scheme in schemes:
        raw_name = scheme.get("scheme_name") or ""
        if use_ps_format:
            fund_label = _to_hyphenated(raw_name)
        else:
            fund_label = raw_name.replace(",", " ")
        row = dict.fromkeys(FUND_CSV_HEADERS, "")
        row.update({
            "Fund": fund_label,
            "Category": scheme.get("category") or fallback_category,
            "Scheme Code": scheme.get("scheme_code") or "",
            "Benchmark Type": scheme.get("benchmark_type") or "Nifty 50",
        })
        rows.append(row)
    return rows
def run_ingest(
    output_path: str | Path = "index_funds.csv",
    source: str = "mftool",
    search_queries: list[str] | None = None,
    limit_per_query: int = 150,
    verbose: bool = True,
) -> tuple[list[dict], Path]:
    """Discover index schemes, convert them to CSV rows and write the CSV.

    source: "mftool" (case-insensitive) uses the AMFI category-38 list and the
    PS output format; any other value uses the mfapi search path, to which
    search_queries and limit_per_query are forwarded.

    Returns (list of scheme dicts, output path as a Path).
    """
    # The PS layout (hyphenated names, "Index Fund") goes hand in hand with the AMFI source.
    use_ps_format = source.lower() == "mftool"
    if use_ps_format:
        schemes = get_index_funds_via_mftool(verbose=verbose)
    else:
        schemes = discover_index_schemes(
            search_queries=search_queries,
            limit_per_query=limit_per_query,
            require_index_category=True,
            verbose=verbose,
        )

    out = Path(output_path)
    rows = build_csv_rows(schemes, use_ps_format=use_ps_format)
    write_fund_csv(rows, out)

    if verbose:
        print(f"\n[ingest] Wrote {len(rows)} rows to {out.absolute()} (source={source})")
        print(" Next: run CSV enrichment on this file, then merge into main fund CSV.")
    return schemes, out
def main() -> None:
    """Command-line entry point: parse arguments and hand them to run_ingest."""
    parser = argparse.ArgumentParser(
        description="Index fund ingest — same list as raw CSV (mftool/AMFI) or mfapi search"
    )
    parser.add_argument(
        "--output", "-o",
        default="index_funds.csv",
        help="Output CSV path",
    )
    parser.add_argument(
        "--source",
        choices=("mftool", "mfapi"),
        default="mftool",
        help="mftool = AMFI category 38 (same as raw CSV under PS). mfapi = search (more schemes).",
    )
    parser.add_argument(
        "--search", "-s",
        action="append",
        default=None,
        help="[mfapi only] Search query (repeatable). Default: Index, Index Fund, ...",
    )
    parser.add_argument(
        "--limit", "-n",
        type=int,
        default=150,
        help="[mfapi only] Max schemes per search query",
    )
    parser.add_argument("--quiet", "-q", action="store_true", help="Less output")
    options = parser.parse_args()

    # --quiet is the inverse of run_ingest's verbose flag.
    ingest_kwargs = {
        "output_path": options.output,
        "source": options.source,
        "search_queries": options.search,
        "limit_per_query": options.limit,
        "verbose": not options.quiet,
    }
    run_ingest(**ingest_kwargs)


if __name__ == "__main__":
    main()