"""CSV Enrichment — missing-cell filler for mutual fund statistics.

Fill pipeline (in order):
  0. SCHEME CODE RESOLUTION — fuzzy-match missing scheme codes via mfapi
  1. TRIAGE                 — classify every missing cell
  2. NAV ENGINE             — compute trailing-3Y metrics from NAV history
  3. WEB SEARCH (Tavily)    — search/extract trusted sites for remaining gaps
  4. CATEGORY MEDIAN        — last-resort statistical imputation

Fixes vs original:
  • Phase-label typo in log (Phase 4 → Phase 5 for imputation step)
  • Unknown launch date → is_young = False (attempt search, don't silently impute)
  • _normalize_fund_name uses re.sub to handle multi-hyphen sequences
  • scheme code resolution runs BEFORE triage so NAV engine fires for more funds
  • Standard Deviation now included in NAV-computable metrics
"""

from __future__ import annotations

import csv
import os
import re
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from statistics import median
from typing import Any

import requests

from src.nav_metrics_engine import NavEngineCache, compute_nav_metrics_for_scheme
from src.scheme_resolver import (
    resolve_missing_scheme_codes,
    resolve_scheme_code_for_fund_name,
)

# Fund names attempted by NAV engine this session
_NAV_ATTEMPTED_FUNDS: set[str] = set()

# ── Tavily global rate-limit state ───────────────────────────────────────────
# Single lock: only one Tavily call dispatched at a time across all workers,
# preventing the burst that triggers 429/432.
_TAVILY_LOCK = threading.Lock()
_TAVILY_LAST_CALL_TS = 0.0  # monotonic seconds of last successful dispatch
_TAVILY_BLOCKED_UNTIL = 0.0  # monotonic seconds until allowed to call again

# Per-run abort flag: new enrich_csv() stamps a new run-id; background threads
# from stale Streamlit reruns check this and exit early.
_CURRENT_RUN_ID: list[int] = [0]  # one-element list so closures can read/bump it

# ── Constants ────────────────────────────────────────────────────────────────

# Lower-cased cell contents that count as "no value".
MISSING_TOKENS = {"", "-", "na", "n/a", "n/a*", "nan", "none", "null", "—"}

TARGET_COLUMNS = (
    "Alpha",
    "Beta",
    "Standard Deviation",
    "Sharpe Ratio",
    "Volatility",
    "Mean",
    "Sortino Ratio",
    "Up Market Capture\nRatio",
    "Down Market Capture\nRatio",
    "Maximum Drawdown",
    "R-Squared",
    "Information Ratio",
    "P/E Ratio",
    "P/B Ratio",
)

# For all of these risk/ratio metrics, a literal numeric 0 is usually a
# data-quality gap rather than a meaningful "zero risk" value. We therefore
# treat 0 as missing so that enrichment (NAV engine + web search) can attempt
# to backfill real numbers.
ZERO_AS_MISSING_COLUMNS = set(TARGET_COLUMNS)

# ALL metrics that are equity-specific and should NOT be attempted
# via NAV engine or web search for debt/liquid/overnight funds.
# Sharpe, Sortino, Volatility etc. ARE computed by NAV engine for equity
# but for debt funds they either don't exist or are meaningless.
# NOTE(review): nothing in this module reads this set, and the NAV phase below
# deliberately processes debt funds too — confirm whether it is still needed.
EQUITY_ONLY_METRICS = {
    "Alpha",
    "Beta",
    "Standard Deviation",
    "Sharpe Ratio",
    "Volatility",
    "Mean",
    "Sortino Ratio",
    "Up Market Capture\nRatio",
    "Down Market Capture\nRatio",
    "Maximum Drawdown",
    "R-Squared",
    "Information Ratio",
    "P/E Ratio",
    "P/B Ratio",
}

DEBT_CATEGORIES_PREFIXES = (
    "Debt:",
    "Liquid",
    "Overnight",
    "Money Market",
    "Gilt",
    "Fixed Maturity",
    "Interval Fund",
    "FMP",
)

MIN_YEARS_FOR_RISK_METRICS = 3

TRUSTED_DOMAINS = (
    "valueresearchonline.com",
    "morningstar.in",
    "moneycontrol.com",
    "advisorkhoj.com",
    "amfiindia.com",
    "tickertape.in",
)

# Row labels (lower-case) under which each metric may appear in scraped tables.
METRIC_ALIASES: dict[str, list[str]] = {
    "Alpha": ["alpha"],
    "Beta": ["beta"],
    "Standard Deviation": ["standard deviation", "std dev", "std. dev"],
    "Sharpe Ratio": ["sharpe ratio", "sharpe"],
    "Volatility": ["volatility"],
    "Mean": ["mean", "mean return"],
    "Sortino Ratio": ["sortino ratio", "sortino"],
    "Up Market Capture\nRatio": ["upside", "up market capture", "upside capture", "up capture"],
    "Down Market Capture\nRatio": ["downside", "down market capture", "downside capture", "down capture"],
    "Maximum Drawdown": ["maximum drawdown", "max drawdown", "maximum"],
    "R-Squared": ["r-squared", "r squared", "r2", "r²"],
    "Information Ratio": ["information ratio", "info ratio"],
    "P/E Ratio": ["p/e ratio", "p/e", "pe ratio", "pe"],
    "P/B Ratio": ["p/b ratio", "p/b", "pb ratio", "pb"],
}

_PE_PB_COLUMNS = ("P/E Ratio", "P/B Ratio")

# Morningstar page names that can be swapped for "risk-ratings.aspx".
# "detailed-portfolio" must precede "portfolio" so the longer name wins.
_MS_RISK_SOURCE_PAGE_RE = re.compile(
    r"(?:fund-factsheet|overview|detailed-portfolio|portfolio|performance)\.aspx"
)


# ── Config & Result ──────────────────────────────────────────────────────────

@dataclass
class EnrichmentConfig:
    """Tunable switches for one enrichment run."""

    enabled: bool = True
    max_cells: int | None = None  # cap on SEARCHABLE cells processed
    min_confidence: float = 0.65
    search_limit: int = 5  # max results requested per Tavily search
    impute_unresolved: bool = True  # fall back to category medians
    filter_category: str | None = None  # only triage rows of this category
    target_columns: tuple[str, ...] = TARGET_COLUMNS
    trusted_domains: tuple[str, ...] = TRUSTED_DOMAINS
    enable_nav_engine: bool = True
    resolve_scheme_codes: bool = True  # run pre-triage code resolution
    web_search_pe_pb_only: bool = False  # limit web search to P/E and P/B only
    web_workers: int = 2  # parallel Tavily workers (keep low!)
    tavily_retries: int = 4  # retries on 429/432
    tavily_backoff_seconds: float = 15.0  # backoff multiplier per retry
    tavily_min_interval_seconds: float = 4.0  # minimum seconds between Tavily calls


@dataclass
class EnrichmentResult:
    """Summary of one enrichment run, returned by :func:`enrich_csv`."""

    input_csv_path: str
    enriched_csv_path: str
    scratchpad_path: str | None = None
    examined_cells: int = 0
    updated_cells: int = 0
    imputed_cells: int = 0
    skipped_cells: int = 0
    resolved_codes: int = 0  # NEW: how many scheme codes were resolved
    # Optional breakdowns used by older callers / UIs
    nav_cells: int = 0  # cells filled via NAV engine
    web_cells: int = 0  # cells filled via web search
    errors: list[str] = field(default_factory=list)  # per-fund worker failures


# ── Triage labels ────────────────────────────────────────────────────────────

TRIAGE_YOUNG = "YOUNG_FUND"
TRIAGE_NOT_APPLICABLE = "NOT_APPLICABLE"
TRIAGE_SEARCHABLE = "SEARCHABLE"


# ── Helpers ──────────────────────────────────────────────────────────────────

def _load_env() -> None:
    """Load ``KEY=VALUE`` pairs from the project-root ``.env`` into ``os.environ``.

    Existing environment variables win (``setdefault``); blank lines, comment
    lines and lines without ``=`` are ignored. A missing file is a no-op.
    """
    env_path = Path(__file__).resolve().parent.parent / ".env"
    if not env_path.exists():
        return
    for line in env_path.read_text(encoding="utf-8").splitlines():
        raw = line.strip()
        if not raw or raw.startswith("#") or "=" not in raw:
            continue
        k, v = raw.split("=", 1)
        os.environ.setdefault(k.strip(), v.strip())


def _is_missing(val: str | None) -> bool:
    """Return True when *val* is ``None``, blank, or a known placeholder token."""
    return (val or "").strip().lower() in MISSING_TOKENS


def _is_zero_placeholder(val: str | None, column: str) -> bool:
    """Return True when *val* is a numeric zero in a zero-means-missing column.

    ``%`` signs and thousands separators are stripped before parsing, so
    ``"0.00%"`` counts as zero. Non-numeric text is never a zero placeholder.
    """
    if column not in ZERO_AS_MISSING_COLUMNS:
        return False
    norm = (val or "").replace("%", "").replace(",", "").strip()
    try:
        return float(norm) == 0.0
    except ValueError:
        return False


def _parse_launch_date(val: str | None) -> datetime | None:
    """Parse a launch date in ``DD-MM-YYYY``, ``YYYY-MM-DD`` or ``DD/MM/YYYY`` form.

    Returns ``None`` for empty input or when no format matches.
    """
    if not val:
        return None
    for fmt in ("%d-%m-%Y", "%Y-%m-%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(val.strip(), fmt)
        except ValueError:
            continue
    return None


def _is_debt_category(category: str) -> bool:
    """Return True when *category* starts with a debt-style prefix (case-insensitive)."""
    cat = (category or "").strip().lower()
    for prefix in DEBT_CATEGORIES_PREFIXES:
        if cat.startswith(prefix.lower()):
            return True
    return False


def _normalize_fund_name(name: str) -> str:
    """Turn every run of hyphens in *name* into a single space and trim the ends."""
    # FIX: use re.sub so multi-hyphen runs collapse to a single space
    return re.sub(r"-+", " ", name).strip()


def _looks_like_category_label(fund_name: str, category: str) -> bool:
    """Return True for header/category rows that aren't real fund entries."""
    fund = (fund_name or "").strip().lower()
    cat = (category or "").strip().lower()
    if not fund:
        return True
    if cat and fund == cat:
        return True
    # Patterns like "Equity: Contra" or "Debt: Short Duration" — no hyphen, no "fund"
    if ":" in fund and "-" not in fund and not any(
        tok in fund for tok in (" fund", " plan", " option", " scheme")
    ):
        return True
    return False


def _build_category_medians(
    rows: list[dict[str, str]],
    columns: tuple[str, ...],
) -> dict[str, dict[str, float]]:
    """Returns {category: {column: median_value}}.

    Only cells that parse as numbers contribute. Placeholder tokens are
    skipped, and so are zero placeholders in ``ZERO_AS_MISSING_COLUMNS``:
    those are data gaps, and counting them would drag the median towards a
    value that is itself treated as missing. Columns with no usable value are
    omitted from the category's mapping. Medians are rounded to 4 decimals.
    """
    buckets: dict[str, dict[str, list[float]]] = {}
    for row in rows:
        cat = row.get("Category", "")
        if not cat:
            continue
        if cat not in buckets:
            buckets[cat] = {c: [] for c in columns}
        for col in columns:
            raw = (row.get(col) or "").strip().replace("%", "").replace(",", "")
            if raw.lower() in MISSING_TOKENS:
                continue
            try:
                value = float(raw)
            except ValueError:
                continue
            if value == 0.0 and col in ZERO_AS_MISSING_COLUMNS:
                continue
            buckets[cat][col].append(value)

    result: dict[str, dict[str, float]] = {}
    for cat, col_map in buckets.items():
        result[cat] = {}
        for col, vals in col_map.items():
            if vals:
                result[cat][col] = round(median(vals), 4)
    return result


# ── Triage ───────────────────────────────────────────────────────────────────

@dataclass
class TriagedCell:
    """One missing cell plus the triage decision taken for it."""

    row_idx: int
    fund_name: str
    category: str
    column: str
    current_value: str
    label: str
    reason: str


def _unfilled_cells(
    cells: list[TriagedCell],
    filled: set[tuple[int, str]],
) -> list[TriagedCell]:
    """Return the cells whose ``(row_idx, column)`` key is not in *filled*.

    Used between phases instead of re-reading the CSV value: a cell triaged as
    a zero placeholder still holds ``"0"`` when unresolved, which a plain
    ``_is_missing`` check would mistake for "already filled".
    """
    return [c for c in cells if (c.row_idx, c.column) not in filled]


def _triage_missing_cells(
    rows: list[dict[str, str]],
    config: EnrichmentConfig,
) -> list[TriagedCell]:
    """Classify every missing cell with reasoning.

    A cell is missing when it holds a placeholder token or a zero placeholder.
    Funds younger than ``MIN_YEARS_FOR_RISK_METRICS`` get ``TRIAGE_YOUNG`` for
    every column except P/E and P/B; everything else is ``TRIAGE_SEARCHABLE``.
    Rows outside ``config.filter_category`` (when set) are ignored.
    """
    today = datetime.now()
    cells: list[TriagedCell] = []

    for idx, row in enumerate(rows):
        # DictReader yields None for short rows; normalise to "" so downstream
        # string handling (slicing, regex) cannot fail on None.
        fund = row.get("Fund") or ""
        cat = row.get("Category") or ""
        launch_str = row.get("Launch Date", "")

        if config.filter_category and cat != config.filter_category:
            continue

        launch_dt = _parse_launch_date(launch_str)
        if launch_dt is not None:
            age_years = (today - launch_dt).days / 365.25
            is_young = age_years < MIN_YEARS_FOR_RISK_METRICS
        else:
            # FIX: unknown date → do NOT silently mark as young; attempt search
            is_young = False

        for col in config.target_columns:
            raw = (row.get(col) or "").strip()
            # Blank / "-" / "N/A" etc., or an exact numeric 0 in a
            # zero-means-missing column.
            if not (_is_missing(raw) or _is_zero_placeholder(raw, col)):
                continue

            # PE/PB are point-in-time portfolio metrics — fund age is irrelevant.
            # Always attempt to fetch them regardless of how young the fund is.
            age_exempt = col in _PE_PB_COLUMNS

            if is_young and not age_exempt:
                cells.append(TriagedCell(
                    row_idx=idx,
                    fund_name=fund,
                    category=cat,
                    column=col,
                    current_value=raw,
                    label=TRIAGE_YOUNG,
                    reason=(f"Fund launched {launch_str or '(unknown)'}, "
                            f"<{MIN_YEARS_FOR_RISK_METRICS}yr history — metric not computed yet"),
                ))
            else:
                cells.append(TriagedCell(
                    row_idx=idx,
                    fund_name=fund,
                    category=cat,
                    column=col,
                    current_value=raw,
                    label=TRIAGE_SEARCHABLE,
                    reason=(f"Fund launched {launch_str or '(unknown date)'}, "
                            f"category '{cat}' — metric should exist, attempting NAV/web"),
                ))
    return cells


# ── Markdown table parser ────────────────────────────────────────────────────

def _extract_number(text: str) -> float | None:
    """Return the first number found in *text*, or ``None``.

    Thousands separators are removed first; placeholder tokens (including the
    em dash) yield ``None``.
    """
    text = text.strip().replace(",", "")
    if text.lower() in MISSING_TOKENS:
        return None
    match = re.search(r"-?\d+\.?\d*", text)
    if match:
        try:
            return float(match.group())
        except ValueError:
            return None
    return None


def _parse_table_row(markdown: str, alias: str) -> float | None:
    """Extract the first numeric cell after the label in a markdown table row.

    For a row like:
        | Alpha | 1.59 | -0.56 | 8.25 |
    Returns 1.59 — the fund's own column (leftmost numeric value after the label).
    This is intentional: sites like Morningstar show Fund | Category | Index
    and we want the fund value, not the category or index value.
    """
    pattern = re.compile(
        r"\|\s*" + re.escape(alias) + r"\s*\|(.+?)(?:\n|$)",
        re.IGNORECASE,
    )
    for m in pattern.finditer(markdown):
        for cell in m.group(1).split("|"):
            val = _extract_number(cell.strip())
            if val is not None:
                return val
    return None


def _parse_metrics_from_markdown(
    markdown: str,
    wanted_metrics: list[str],
) -> dict[str, float | None]:
    """Look up each wanted metric in *markdown* via its aliases.

    Returns ``{metric: value}`` with ``None`` for metrics that were not found.
    Aliases are tried in ``METRIC_ALIASES`` order; the first that yields a
    number wins.
    """
    haystack = markdown.lower()  # hoisted: cheap pre-check before running the regex
    found: dict[str, float | None] = {}
    for metric in wanted_metrics:
        best_val: float | None = None
        for alias in METRIC_ALIASES.get(metric, [metric.lower()]):
            if alias.lower() not in haystack:
                continue
            val = _parse_table_row(markdown, alias)
            if val is not None:
                best_val = val
                break
        found[metric] = best_val
    return found


# ── Web search (Tavily) ──────────────────────────────────────────────────────

def _post_tavily(
    endpoint: str,
    payload: dict,
    *,
    retries: int,
    backoff: float,
    min_interval: float,
) -> "requests.Response | None":
    """Single-threaded Tavily dispatcher with shared cooldown and retry logic.

    The cooldown wait and the HTTP request both happen while holding
    ``_TAVILY_LOCK``, so at most one request is in flight process-wide. On
    429/432 the shared "blocked until" timestamp is pushed out by
    ``backoff * (attempt + 1)`` seconds and the call is retried up to
    *retries* times. Returns ``None`` on connection error, other HTTP error,
    or when retries are exhausted.
    """
    global _TAVILY_LAST_CALL_TS, _TAVILY_BLOCKED_UNTIL

    url = f"https://api.tavily.com/{endpoint}"
    for attempt in range(retries + 1):
        with _TAVILY_LOCK:
            now = time.monotonic()
            wait = max(
                (_TAVILY_LAST_CALL_TS + min_interval) - now,
                _TAVILY_BLOCKED_UNTIL - now,
                0.0,
            )
            if wait > 0:
                time.sleep(wait)
            try:
                resp = requests.post(
                    url,
                    headers={"Content-Type": "application/json"},
                    json=payload,
                    timeout=30,
                )
                _TAVILY_LAST_CALL_TS = time.monotonic()
            except Exception as exc:
                print(f" [tavily] {endpoint} connection error: {exc}")
                return None

        if resp.status_code in {429, 432}:
            wait_retry = backoff * (attempt + 1)
            print(f" [tavily] {endpoint} rate-limited ({resp.status_code}); "
                  f"waiting {wait_retry:.1f}s before retry")
            # The lock was released above; threading.Lock is not re-entrant.
            with _TAVILY_LOCK:
                _TAVILY_BLOCKED_UNTIL = max(
                    _TAVILY_BLOCKED_UNTIL, time.monotonic() + wait_retry
                )
            if attempt < retries:
                continue
            return None

        try:
            resp.raise_for_status()
        except Exception as exc:
            print(f" [tavily] {endpoint} HTTP error: {exc}")
            return None
        return resp
    return None


def _call_tavily_search(
    query: str,
    api_key: str,
    limit: int = 5,
    *,
    retries: int = 4,
    backoff_seconds: float = 15.0,
    min_interval_seconds: float = 4.0,
) -> list[dict]:
    """Search using Tavily API. Returns list of dicts with 'url' and 'markdown' keys.

    ``markdown`` is the result's ``raw_content`` when present, otherwise its
    ``content`` snippet. An empty list is returned when the request fails.
    """
    resp = _post_tavily(
        "search",
        {"api_key": api_key, "query": query, "max_results": limit,
         "include_raw_content": True, "search_depth": "advanced"},
        retries=retries,
        backoff=backoff_seconds,
        min_interval=min_interval_seconds,
    )
    if resp is None:
        return []
    return [
        {
            "url": r.get("url", ""),
            "markdown": r.get("raw_content") or r.get("content", ""),
        }
        for r in resp.json().get("results", [])
    ]


# Keep firecrawl as alias name so _search_fund_metrics calls work unchanged
_call_firecrawl_search = _call_tavily_search


def _scrape_url(
    url: str,
    api_key: str,
    *,
    retries: int = 4,
    backoff_seconds: float = 15.0,
    min_interval_seconds: float = 4.0,
) -> str:
    """Fetch page content using Tavily extract API.

    Returns ``""`` when the request fails or no content comes back.
    """
    resp = _post_tavily(
        "extract",
        {"api_key": api_key, "urls": [url]},
        retries=retries,
        backoff=backoff_seconds,
        min_interval=min_interval_seconds,
    )
    if resp is None:
        return ""
    results = resp.json().get("results", [])
    # `or ""` keeps the declared str return type if raw_content is null.
    return (results[0].get("raw_content", "") or "") if results else ""


def _derive_morningstar_risk_url(any_ms_url: str) -> str | None:
    """Map any Morningstar India fund page URL to its ``risk-ratings.aspx`` page.

    Returns the URL unchanged when it already is the risk page, and ``None``
    for non-Morningstar URLs or unrecognised page names.
    """
    if "morningstar.in/mutualfunds/" not in any_ms_url:
        return None
    # Ordered regex: the old substring test matched "portfolio.aspx" inside
    # "detailed-portfolio.aspx" and produced ".../detailed-risk-ratings.aspx".
    risk_url, replaced = _MS_RISK_SOURCE_PAGE_RE.subn("risk-ratings.aspx", any_ms_url)
    if replaced:
        return risk_url
    if "risk-ratings.aspx" in any_ms_url:
        return any_ms_url
    return None


def _derive_morningstar_portfolio_url(any_ms_url: str) -> str | None:
    """Derive the Morningstar portfolio page (for P/E and P/B).

    Returns ``None`` for non-Morningstar URLs. A Morningstar URL with an
    unrecognised page name is returned unchanged.
    """
    if "morningstar.in/mutualfunds/" not in any_ms_url:
        return None
    return re.sub(
        r"(fund-factsheet|overview|risk-ratings|performance|detailed-portfolio)\.aspx",
        "portfolio.aspx",
        any_ms_url,
    )


def _search_fund_metrics(
    fund_name: str,
    missing_metrics: list[str],
    config: EnrichmentConfig,
    firecrawl_key: str,
) -> tuple[dict[str, float | None], list[str]]:
    """Search the web for *fund_name* and parse the requested metrics.

    Steps: one Tavily search, keep up to three results (trusted domains
    first), optionally extract the Morningstar risk and portfolio pages, and
    retry with a ValueResearch-focused query if nothing had content.

    Returns ``(found, source_urls)`` where ``found`` maps every requested
    metric to a float or ``None``.
    """
    from urllib.parse import urlparse

    tavily_kwargs = {
        "retries": config.tavily_retries,
        "backoff_seconds": config.tavily_backoff_seconds,
        "min_interval_seconds": config.tavily_min_interval_seconds,
    }

    readable = _normalize_fund_name(fund_name)
    query = f"{readable} risk rating alpha beta sharpe morningstar"
    print(f" [search] query: {query[:80]}")

    results = _call_firecrawl_search(
        query, firecrawl_key, config.search_limit, **tavily_kwargs
    )
    if not results:
        print(" [search] no results")
        return {m: None for m in missing_metrics}, []

    trusted, other = [], []
    for r in results:
        url = r.get("url", "")
        domain = urlparse(url).netloc.lower().replace("www.", "")
        (trusted if any(td in domain for td in config.trusted_domains) else other).append(r)

    use = (trusted + other)[:3]
    source_urls = [r.get("url", "") for r in use]
    print(f" [search] using {len(use)} sources: {[urlparse(u).netloc for u in source_urls]}")

    combined = ""
    for r in use:
        md = r.get("markdown", "")
        if md:
            combined += f"\n\n--- {r.get('url', '')} ---\n{md}"

    # Morningstar: scrape risk-ratings page if not already in results
    ms_risk_url = None
    for r in use:
        ms_risk_url = _derive_morningstar_risk_url(r.get("url", ""))
        if ms_risk_url:
            break

    if ms_risk_url and "risk-ratings" not in " ".join(source_urls):
        print(f" [scrape] Morningstar risk page: {ms_risk_url}")
        risk_md = _scrape_url(ms_risk_url, firecrawl_key, **tavily_kwargs)
        if risk_md:
            combined += f"\n\n--- {ms_risk_url} ---\n{risk_md}"
            source_urls.append(ms_risk_url)

    # Morningstar: scrape portfolio page for P/E and P/B
    pe_pb_needed = {"P/E Ratio", "P/B Ratio"} & set(missing_metrics)
    if pe_pb_needed and ms_risk_url:
        ms_port_url = _derive_morningstar_portfolio_url(ms_risk_url)
        if ms_port_url and ms_port_url not in source_urls:
            print(f" [scrape] Morningstar portfolio page: {ms_port_url}")
            port_md = _scrape_url(ms_port_url, firecrawl_key, **tavily_kwargs)
            if port_md:
                combined += f"\n\n--- {ms_port_url} ---\n{port_md}"
                source_urls.append(ms_port_url)

    # If the initial sources produced no content at all, do a second pass
    # focused on ValueResearch.
    if not combined.strip():
        print(" [search] no markdown from initial sources; retrying via valueresearchonline…")
        vr_query = f"{readable} {' '.join(missing_metrics)} valueresearchonline"
        vr_results = _call_firecrawl_search(
            vr_query, firecrawl_key, config.search_limit, **tavily_kwargs
        )
        if vr_results:
            vr_combined = ""
            for r in vr_results:
                url = r.get("url", "")
                domain = urlparse(url).netloc.lower().replace("www.", "")
                if "valueresearchonline.com" not in domain:
                    continue
                md = r.get("markdown", "")
                if md:
                    vr_combined += f"\n\n--- {url} ---\n{md}"
                    source_urls.append(url)
            combined = vr_combined

        if not combined.strip():
            print(" [search] no markdown content after ValueResearch retry")
            return {m: None for m in missing_metrics}, source_urls

    found = _parse_metrics_from_markdown(combined, missing_metrics)
    for m, v in found.items():
        print(f" [parsed] {m} = {v if v is not None else 'NOT FOUND'}")
    return found, source_urls


# ── Scratchpad ───────────────────────────────────────────────────────────────

def _write_scratchpad(
    path: Path,
    triaged: list[TriagedCell],
    resolved_codes: dict[str, str],
    nav_results: dict[str, dict[str, float | None]],
    web_results: dict[str, dict[str, float | None]],
    web_sources: dict[str, list[str]],
    medians_used: list[tuple[str, str, float]],
    nav_filled: list[tuple[str, str, float]],
    web_filled: list[tuple[str, str, float]],
) -> None:
    """Write a human-readable audit log of one enrichment run to *path*.

    Sections, in order: resolved scheme codes, triage totals and decisions,
    NAV engine results, web search results (with sources), then the lists of
    NAV-filled, web-filled and median-imputed values. Empty sections are
    omitted.
    """
    lines = [
        "=" * 70,
        "ENRICHMENT SCRATCHPAD",
        f"Generated: {datetime.now().isoformat()}",
        "=" * 70,
        "",
    ]

    if resolved_codes:
        lines += ["-" * 70, f"SCHEME CODES RESOLVED ({len(resolved_codes)})", "-" * 70]
        for fund, code in resolved_codes.items():
            lines.append(f" {fund[:60]:60s} → {code}")
        lines.append("")

    young = [c for c in triaged if c.label == TRIAGE_YOUNG]
    na = [c for c in triaged if c.label == TRIAGE_NOT_APPLICABLE]
    searchable = [c for c in triaged if c.label == TRIAGE_SEARCHABLE]

    lines += [
        f"TOTAL MISSING CELLS: {len(triaged)}",
        f" YOUNG_FUND (auto-impute): {len(young)}",
        f" NOT_APPLICABLE (auto-impute): {len(na)}",
        f" SEARCHABLE (nav/web): {len(searchable)}",
        "",
        "-" * 70,
        "TRIAGE DECISIONS",
        "-" * 70,
    ]
    for c in triaged:
        lines.append(f" [{c.label:16s}] {c.fund_name} :: {c.column}")
        lines.append(f" Reason: {c.reason}")
    lines.append("")

    if nav_results:
        lines += ["-" * 70, "NAV ENGINE RESULTS (TRAILING 3Y)", "-" * 70]
        for fund_key, metrics in nav_results.items():
            lines.append(f" Fund: {fund_key}")
            for metric, val in metrics.items():
                lines.append(f" {metric}: {'FOUND = ' + str(val) if val is not None else 'NOT_FOUND'}")
            lines.append("")

    if web_results:
        lines += ["-" * 70, "WEB SEARCH RESULTS", "-" * 70]
        for fund_key, metrics in web_results.items():
            lines.append(f" Fund: {fund_key}")
            for s in web_sources.get(fund_key, []):
                lines.append(f" Source: {s}")
            for metric, val in metrics.items():
                lines.append(f" {metric}: {'FOUND = ' + str(val) if val is not None else 'NOT_FOUND'}")
            lines.append("")

    for section_label, items in [
        (f"NAV-FILLED VALUES ({len(nav_filled)})", nav_filled),
        (f"WEB-FILLED VALUES ({len(web_filled)})", web_filled),
        (f"CATEGORY-MEDIAN IMPUTED ({len(medians_used)})", medians_used),
    ]:
        if items:
            lines += ["-" * 70, section_label, "-" * 70]
            for fund, col, val in items:
                lines.append(f" {fund} :: {col} = {val}")
            lines.append("")

    lines += ["=" * 70, "END OF SCRATCHPAD", "=" * 70]
    path.write_text("\n".join(lines), encoding="utf-8")


# ── Main entry point ─────────────────────────────────────────────────────────

def enrich_csv(
    csv_path: str,
    config: EnrichmentConfig | None = None,
) -> EnrichmentResult:
    """Parse CSV → resolve codes → triage → NAV engine → web fallback → median impute → write.

    (Previously named enrich_csv_with_firecrawl_and_kimi; renamed for clarity.)

    Args:
        csv_path: Path of the CSV to enrich. The input file is not modified;
            output goes to ``<dir>/enriched/enriched_<name>``.
        config: Run options; defaults to ``EnrichmentConfig()``.

    Returns:
        An ``EnrichmentResult``. When enrichment is disabled, the file does
        not exist, or it has no data rows, the result is returned untouched
        (``enriched_csv_path`` equals ``csv_path``). Failures of individual
        per-fund workers are recorded in ``result.errors`` and the affected
        fund falls through to the next phase instead of aborting the run.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    if config is None:
        config = EnrichmentConfig()
    _load_env()

    src = Path(csv_path)
    result = EnrichmentResult(input_csv_path=csv_path, enriched_csv_path=csv_path)
    if not config.enabled or not src.exists():
        return result

    with open(src, encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)
        fieldnames = list(reader.fieldnames or [])
        rows = list(reader)
    if not rows:
        return result

    # ── Phase 0: Scheme Code Resolution ─────────────────────────────────
    resolved_codes: dict[str, str] = {}
    if config.resolve_scheme_codes:
        print("[enrichment] Phase 0: Resolving missing scheme codes…")
        rows, resolved_codes = resolve_missing_scheme_codes(rows, verbose=True)
        result.resolved_codes = len(resolved_codes)

    # ── Phase 1: Triage ──────────────────────────────────────────────────
    print("[enrichment] Phase 1: Triage — classifying missing cells…")
    triaged = _triage_missing_cells(rows, config)
    result.examined_cells = len(triaged)
    if not triaged:
        print("[enrichment] No missing cells found.")
        _write_output(src, rows, fieldnames, result)
        return result

    searchable = [c for c in triaged if c.label == TRIAGE_SEARCHABLE]
    imputable = [c for c in triaged if c.label != TRIAGE_SEARCHABLE]
    print(f"[enrichment] {len(triaged)} missing cells: "
          f"{len(searchable)} SEARCHABLE, {len(imputable)} auto-impute")

    if config.max_cells is not None:
        searchable = searchable[:config.max_cells]

    # ── Phase 2: Category medians ────────────────────────────────────────
    print("[enrichment] Phase 2: Computing category medians…")
    cat_medians = _build_category_medians(rows, config.target_columns)

    # (row_idx, column) of every cell given a real value by NAV / PE-PB.
    filled: set[tuple[int, str]] = set()
    # Every median imputation, whichever phase made it (for the scratchpad).
    medians_used: list[tuple[str, str, float]] = []

    def _impute(cell: TriagedCell) -> float | None:
        """Write the category median into *cell*; return it, or None if not possible."""
        if not config.impute_unresolved:
            return None
        med = cat_medians.get(cell.category, {}).get(cell.column)
        if med is None:
            return None
        rows[cell.row_idx][cell.column] = str(med)
        result.imputed_cells += 1
        medians_used.append((cell.fund_name, cell.column, med))
        return med

    # NOTE: all result folding below runs in this (calling) thread inside the
    # as_completed loops, so the shared counters and dicts need no locking.

    # ── Phase 3: NAV engine ──────────────────────────────────────────────
    nav_results: dict[str, dict[str, float | None]] = {}
    nav_filled: list[tuple[str, str, float]] = []

    if searchable and config.enable_nav_engine:
        print("[enrichment] Phase 3: NAV engine — computing trailing 3Y metrics…")
        nav_cache = NavEngineCache()

        # All funds with missing cells go through NAV engine — including debt/liquid.
        # Debt funds can have valid Sharpe, Mean, Volatility etc. from their NAV history.
        row_groups: dict[int, list[TriagedCell]] = {}
        for cell in searchable:
            row_groups.setdefault(cell.row_idx, []).append(cell)

        total_rows = len(row_groups)
        processed_count = 0
        NAV_WORKERS = 20  # mfapi is stateless REST — scales well beyond 12

        # ── Pre-warm: bulk load NAV + benchmarks before workers touch network ──
        # Step 1: Pull all valid scheme codes and unique benchmarks from rows
        from src.nav_metrics_engine import _bulk_preload_cache, _prewarm_benchmarks

        _scheme_codes = [
            (rows[ri].get("Scheme Code") or "").strip()
            for ri in row_groups
            if (rows[ri].get("Scheme Code") or "").strip().isdigit()
        ]
        _bench_tickers_raw = [
            rows[ri].get("Benchmark Type", "") for ri in row_groups
        ]
        # Step 2: Resolve benchmark type → ticker (same logic as nav engine)
        from src.nav_metrics_engine import resolve_benchmark_ticker

        _bench_tickers = list(dict.fromkeys(
            resolve_benchmark_ticker(b) for b in _bench_tickers_raw if b
        ))
        # Step 3: Bulk load from Neon in 1 SQL query (nav + bench keys)
        _bulk_preload_cache(_scheme_codes, _bench_tickers)
        # Step 4: Download any cold benchmark tickers in parallel NOW,
        # before workers start — eliminates yfinance contention
        _prewarm_benchmarks(_bench_tickers)

        def _process_one_fund(args):
            """Compute NAV metrics for one row; retry once with a re-resolved code."""
            row_idx, cells = args
            row = rows[row_idx]
            fund_name = row.get("Fund") or ""
            scheme_code = (row.get("Scheme Code") or "").strip()
            benchmark_type = row.get("Benchmark Type", "")
            needed_metrics = [c.column for c in cells]

            if not scheme_code:
                return fund_name, {}, cells

            metrics, skip = compute_nav_metrics_for_scheme(
                scheme_code=scheme_code,
                benchmark_type=benchmark_type,
                needed_metrics=needed_metrics,
                cache=nav_cache,
            )

            joined_reasons = " | ".join(skip.values()).lower()
            should_refresh_code = (
                "returned no nav history" in joined_reasons
                or "nav history is stale" in joined_reasons
            )
            if should_refresh_code:
                refreshed_code, _ = resolve_scheme_code_for_fund_name(fund_name)
                if refreshed_code and refreshed_code != scheme_code:
                    row["Scheme Code"] = refreshed_code
                    metrics, skip = compute_nav_metrics_for_scheme(
                        scheme_code=refreshed_code,
                        benchmark_type=benchmark_type,
                        needed_metrics=needed_metrics,
                        cache=nav_cache,
                    )
            return fund_name, metrics, cells

        with ThreadPoolExecutor(max_workers=NAV_WORKERS) as executor:
            futures = {
                executor.submit(_process_one_fund, item): item
                for item in row_groups.items()
            }
            for fut in as_completed(futures):
                row_idx, cells = futures[fut]
                try:
                    fund_name, metrics, cells = fut.result()
                except Exception as exc:
                    # One bad fund must not abort the run; leave its cells for
                    # the web/median phases and record what happened.
                    fund_name = rows[row_idx].get("Fund") or ""
                    metrics = {}
                    result.errors.append(f"NAV engine failed for {fund_name!r}: {exc}")

                processed_count += 1
                nav_results[fund_name] = metrics
                for cell in cells:
                    val = metrics.get(cell.column)
                    if val is not None:
                        rows[cell.row_idx][cell.column] = str(round(float(val), 4))
                        filled.add((cell.row_idx, cell.column))
                        result.updated_cells += 1
                        nav_filled.append((fund_name, cell.column, float(val)))
                # Only mark as attempted if MDD was actually filled —
                # drawdown_zero_fix should still retry funds where MDD came back None
                if metrics.get("Maximum Drawdown") is not None:
                    _NAV_ATTEMPTED_FUNDS.add(fund_name)
                if processed_count % 20 == 0 or processed_count == total_rows:
                    print(f" [nav] {processed_count}/{total_rows} funds processed…")

        # Keep only still-unfilled searchable cells for web phase. Tracking
        # filled cells (rather than re-testing the value) keeps zero
        # placeholders, which still read "0", in the queue.
        searchable = _unfilled_cells(searchable, filled)
        print(f"[enrichment] NAV phase resolved {len(nav_filled)} cells; "
              f"{len(searchable)} remain for web search")

    result.nav_cells = len(nav_filled)

    # ── Phase 4: Web search ──────────────────────────────────────────────
    web_results: dict[str, dict[str, float | None]] = {}
    web_sources: dict[str, list[str]] = {}
    web_filled: list[tuple[str, str, float]] = []
    firecrawl_key = os.environ.get("TAVILY_API_KEY", "")

    # ── Phase 4a: Screener.in PE/PB (no API key needed) ──────────────────────
    # Weighted average from holdings; results Neon-cached 30 days.
    pe_pb_funds: dict[str, list[TriagedCell]] = {}
    for cell in searchable:
        if cell.column in _PE_PB_COLUMNS:
            pe_pb_funds.setdefault(cell.fund_name, []).append(cell)

    if pe_pb_funds:
        try:
            from src.pe_pb_engine import fetch_pe_pb as _fetch_pe_pb
        except ImportError:
            try:
                from pe_pb_engine import fetch_pe_pb as _fetch_pe_pb
            except ImportError:
                _fetch_pe_pb = None  # type: ignore

        if _fetch_pe_pb is not None:
            print(f"[enrichment] Phase 4a: NSE index PE/PB — {len(pe_pb_funds)} funds")
            # Warm the index cache once before spawning threads
            try:
                from src.pe_pb_engine import warm_index_cache as _warm
            except ImportError:
                from pe_pb_engine import warm_index_cache as _warm
            _warm()

            pepb_done = 0

            def _screener_one(args):
                """Fetch P/E and P/B for one fund via its benchmark type."""
                fund_name, cells = args
                # Get benchmark_type from the row (not scheme_code)
                bm = ""
                for cell in cells:
                    row = rows[cell.row_idx]
                    if row:
                        bm = str(row.get("Benchmark Type") or "").strip()
                    if bm:
                        break
                pe, pb = (
                    _fetch_pe_pb(benchmark_type=bm, fund_name=fund_name)
                    if bm else (None, None)
                )
                found: dict[str, float] = {}
                if pe is not None:
                    found["P/E Ratio"] = pe
                if pb is not None:
                    found["P/B Ratio"] = pb
                return fund_name, cells, found

            with ThreadPoolExecutor(max_workers=5) as pepb_ex:
                pepb_futs = {
                    pepb_ex.submit(_screener_one, item): item
                    for item in pe_pb_funds.items()
                }
                for fut in as_completed(pepb_futs):
                    fund_name, cells = pepb_futs[fut]
                    try:
                        fund_name, cells, found = fut.result()
                    except Exception as exc:
                        found = {}
                        result.errors.append(
                            f"PE/PB lookup failed for {fund_name!r}: {exc}"
                        )

                    pepb_done += 1
                    web_results.setdefault(fund_name, {}).update(found)
                    web_sources.setdefault(fund_name, []).append("nse-index-pe")
                    for cell in cells:
                        val = found.get(cell.column)
                        if val is not None:
                            rows[cell.row_idx][cell.column] = str(val)
                            filled.add((cell.row_idx, cell.column))
                            result.updated_cells += 1
                            web_filled.append((fund_name, cell.column, val))
                            print(f" [{pepb_done}/{len(pe_pb_funds)}] "
                                  f"{fund_name[:45]} -> {cell.column} = {val} (screener)")

            # Re-filter: drop cells already filled by Screener
            searchable = _unfilled_cells(searchable, filled)

    # ── Phase 4b: Tavily web search for remaining cells ──────────────────────
    # NOTE(review): the two pre-search imputation paths below bump
    # updated_cells as well as imputed_cells, while every other median
    # fallback bumps only imputed_cells. Kept as-is so reported numbers do not
    # change; confirm the intended meaning of updated_cells.
    if searchable and firecrawl_key:
        # Stamp run-id so stale background threads from prior runs can bail
        _CURRENT_RUN_ID[0] += 1
        my_run_id = _CURRENT_RUN_ID[0]

        fund_groups: dict[str, list[TriagedCell]] = {}
        for cell in searchable:
            fund_groups.setdefault(cell.fund_name, []).append(cell)

        print(f"[enrichment] Phase 4b: Tavily web search — {len(searchable)} cells "
              f"across {len(fund_groups)} funds")

        web_search_groups: dict[str, list[TriagedCell]] = {}
        for fund_name, cells in fund_groups.items():
            # Skip category header rows (e.g. "Equity: Contra") — not real funds
            category = cells[0].category if cells else ""
            if _looks_like_category_label(fund_name, category):
                for cell in cells:
                    if _impute(cell) is not None:
                        result.updated_cells += 1
                    else:
                        result.skipped_cells += 1
                continue

            if config.web_search_pe_pb_only:
                for cell in cells:
                    if cell.column in _PE_PB_COLUMNS:
                        continue
                    if _impute(cell) is not None:
                        result.updated_cells += 1
                cells_for_web = [c for c in cells if c.column in _PE_PB_COLUMNS]
            else:
                cells_for_web = cells

            if cells_for_web:
                web_search_groups[fund_name] = cells_for_web

        WEB_WORKERS = max(1, config.web_workers)
        web_done = 0
        total_web = len(web_search_groups)

        def _search_one_fund(args):
            """Web-search one fund unless a newer run has superseded this one."""
            fund_name, cells = args
            # Abort if a newer run has started (Streamlit rerun safety)
            if _CURRENT_RUN_ID[0] != my_run_id:
                return fund_name, cells, {}, []
            missing_metrics = [c.column for c in cells]
            found, sources = _search_fund_metrics(
                fund_name, missing_metrics, config, firecrawl_key)
            return fund_name, cells, found, sources

        with ThreadPoolExecutor(max_workers=WEB_WORKERS) as web_executor:
            futures = {
                web_executor.submit(_search_one_fund, item): item
                for item in web_search_groups.items()
            }
            for fut in as_completed(futures):
                fund_name, cells = futures[fut]
                try:
                    fund_name, cells, found, sources = fut.result()
                except Exception as exc:
                    # Treat as "nothing found" so the median fallback applies.
                    found, sources = {}, []
                    result.errors.append(
                        f"Web search failed for {fund_name!r}: {exc}"
                    )

                web_done += 1
                web_results.setdefault(fund_name, {}).update(found)
                web_sources.setdefault(fund_name, []).extend(sources)
                print(f"\n[{web_done}/{total_web}] {fund_name}")
                for cell in cells:
                    val = found.get(cell.column)
                    if val is not None:
                        rows[cell.row_idx][cell.column] = str(val)
                        result.updated_cells += 1
                        web_filled.append((fund_name, cell.column, val))
                        print(f" -> {cell.column} = {val} (web)")
                        continue
                    med = _impute(cell)
                    if med is not None:
                        print(f" ~> {cell.column} = {med} (median)")
                    else:
                        result.skipped_cells += 1
                        print(f" x> {cell.column} — not found, no median")

    elif searchable and not firecrawl_key:
        print("[enrichment] No TAVILY_API_KEY — PE/PB via Screener only; remaining → medians")
        for cell in searchable:
            if _impute(cell) is None:
                result.skipped_cells += 1

    # ── Phase 5: Impute non-searchable (YOUNG / NOT_APPLICABLE) cells ────
    # FIX: was incorrectly labelled "Phase 4" in log
    if imputable and config.impute_unresolved:
        print(f"\n[enrichment] Phase 5: Imputing {len(imputable)} non-searchable cells…")
        for cell in imputable:
            if _impute(cell) is None:
                result.skipped_cells += 1
    elif imputable:
        result.skipped_cells += len(imputable)

    # Record how many cells came from web search
    result.web_cells = len(web_filled)

    # ── Phase 6: Write enriched CSV ──────────────────────────────────────
    _write_output(src, rows, fieldnames, result)

    # ── Phase 7: Scratchpad ──────────────────────────────────────────────
    scratch_dir = Path("scratchpad")
    scratch_dir.mkdir(exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    scratch_path = scratch_dir / f"enrichment_{stamp}.txt"
    _write_scratchpad(
        scratch_path,
        triaged,
        resolved_codes,
        nav_results,
        web_results,
        web_sources,
        medians_used,
        nav_filled,
        web_filled,
    )
    result.scratchpad_path = str(scratch_path)

    print(f"\n[enrichment] DONE — nav_filled={len(nav_filled)} web_filled={len(web_filled)} "
          f"imputed={result.imputed_cells} skipped={result.skipped_cells}")
    print(f"[enrichment] Enriched CSV : {result.enriched_csv_path}")
    print(f"[enrichment] Scratchpad : {scratch_path}")
    return result


def _write_output(
    src: Path,
    rows: list[dict[str, str]],
    fieldnames: list[str],
    result: EnrichmentResult,
) -> None:
    """Write *rows* to ``<src dir>/enriched/enriched_<src name>``.

    The output directory is created if needed and the resulting path is
    stored on ``result.enriched_csv_path``. The file is written as UTF-8 with
    a BOM, matching the encoding used to read the input.
    """
    out_dir = src.parent / "enriched"
    out_dir.mkdir(exist_ok=True)
    out_path = out_dir / f"enriched_{src.name}"
    with open(out_path, "w", encoding="utf-8-sig", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    result.enriched_csv_path = str(out_path)


# Backward-compat alias (old name used in streamlit_app and run_enrichment_pipeline)
enrich_csv_with_firecrawl_and_kimi = enrich_csv


# ── Single metric lookup (for Streamlit UI) ──────────────────────────────────

def lookup_fund_metric_value(
    fund_name: str,
    column_name: str,
    scheme_code: str = "",
    config: EnrichmentConfig | None = None,
) -> dict[str, Any]:
    """Web-search a single metric for a single fund.

    Args:
        fund_name: Fund to search for.
        column_name: Metric/column name, e.g. ``"Alpha"``.
        scheme_code: Accepted for interface compatibility; not used here.
        config: Search options; defaults to ``EnrichmentConfig()``.

    Returns:
        A dict whose ``status`` is ``"found"``, ``"not_found"`` or ``"error"``
        (the latter when ``TAVILY_API_KEY`` is not set). Found / not-found
        results also carry ``fund``, ``metric``, ``value`` and ``sources``.
    """
    _load_env()
    if config is None:
        config = EnrichmentConfig()
    firecrawl_key = os.environ.get("TAVILY_API_KEY", "")
    if not firecrawl_key:
        return {"status": "error", "message": "TAVILY_API_KEY not set"}

    found, sources = _search_fund_metrics(fund_name, [column_name], config, firecrawl_key)
    val = found.get(column_name)
    if val is not None:
        return {"status": "found", "fund": fund_name, "metric": column_name,
                "value": val, "sources": sources}
    return {"status": "not_found", "fund": fund_name, "metric": column_name,
            "value": None, "sources": sources}