# NOTE: removed non-source page chrome ("Spaces: / Running / Running") that was
# captured by the extraction tool; it is not part of this module.
"""CSV Enrichment — missing-cell filler for mutual fund statistics.

Fill pipeline (in order):
0. SCHEME CODE RESOLUTION — fuzzy-match missing scheme codes via mfapi
1. TRIAGE — classify every missing cell
2. NAV ENGINE — compute trailing-3Y metrics from NAV history
3. WEB SEARCH (Firecrawl) — scrape trusted sites for remaining gaps
4. CATEGORY MEDIAN — last-resort statistical imputation

Fixes vs original:
• Phase-label typo in log (Phase 4 → Phase 5 for imputation step)
• Unknown launch date → is_young = False (attempt search, don't silently impute)
• _normalize_fund_name uses re.sub to handle multi-hyphen sequences
• scheme code resolution runs BEFORE triage so NAV engine fires for more funds
• Standard Deviation now included in NAV-computable metrics
"""
| from __future__ import annotations | |
| import csv | |
| import os | |
| import re | |
| import threading | |
| import time | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from pathlib import Path | |
| from statistics import median | |
| from typing import Any | |
| import requests | |
| from src.nav_metrics_engine import NavEngineCache, compute_nav_metrics_for_scheme | |
| from src.scheme_resolver import ( | |
| resolve_missing_scheme_codes, | |
| resolve_scheme_code_for_fund_name, | |
| ) | |
# Fund names attempted by NAV engine this session; consulted later so the
# drawdown retry pass can skip funds whose Maximum Drawdown was already filled.
_NAV_ATTEMPTED_FUNDS: set[str] = set()
# ── Tavily global rate-limit state ───────────────────────────────────────────
# Single lock: only one Tavily call dispatched at a time across all workers,
# preventing the burst that triggers 429/432.
_TAVILY_LOCK = threading.Lock()
_TAVILY_LAST_CALL_TS = 0.0  # monotonic seconds of last successful dispatch
_TAVILY_BLOCKED_UNTIL = 0.0  # monotonic seconds until allowed to call again
# Per-run abort flag: new enrich_csv() stamps a new run-id; background threads
# from stale Streamlit reruns check this and exit early.
# NOTE(review): no code in this visible chunk reads or bumps it — confirm use
# elsewhere in the file.
_CURRENT_RUN_ID: list[int] = [0]
# ── Constants ────────────────────────────────────────────────────────────────
# Tokens treated as "no data" wherever a cell value is inspected.
# NOTE(review): the "β" entry looks like a mojibake'd dash ("–") — it is a
# runtime value, so it is preserved as-is; confirm against the source data.
MISSING_TOKENS = {"", "-", "na", "n/a", "n/a*", "nan", "none", "null", "β"}
# Columns the enrichment pipeline attempts to fill.
TARGET_COLUMNS = (
    "Alpha",
    "Beta",
    "Standard Deviation",
    "Sharpe Ratio",
    "Volatility",
    "Mean",
    "Sortino Ratio",
    "Up Market Capture\nRatio",
    "Down Market Capture\nRatio",
    "Maximum Drawdown",
    "R-Squared",
    "Information Ratio",
    "P/E Ratio",
    "P/B Ratio",
)
# For all of these risk/ratio metrics, a literal numeric 0 is usually a
# data-quality gap rather than a meaningful "zero risk" value. We therefore
# treat 0 as missing so that enrichment (NAV engine + web search) can attempt
# to backfill real numbers.
ZERO_AS_MISSING_COLUMNS = set(TARGET_COLUMNS)
# ALL metrics that are equity-specific and should NOT be attempted
# via NAV engine or web search for debt/liquid/overnight funds.
# Sharpe, Sortino, Volatility etc. ARE computed by NAV engine for equity
# but for debt funds they either don't exist or are meaningless.
# NOTE(review): not referenced anywhere in this visible chunk — confirm it is
# used (or intended to gate triage) elsewhere in the file.
EQUITY_ONLY_METRICS = {
    "Alpha",
    "Beta",
    "Standard Deviation",
    "Sharpe Ratio",
    "Volatility",
    "Mean",
    "Sortino Ratio",
    "Up Market Capture\nRatio",
    "Down Market Capture\nRatio",
    "Maximum Drawdown",
    "R-Squared",
    "Information Ratio",
    "P/E Ratio",
    "P/B Ratio",
}
# Category prefixes (case-insensitive) that mark a scheme as debt/liquid-like.
DEBT_CATEGORIES_PREFIXES = (
    "Debt:", "Liquid", "Overnight", "Money Market", "Gilt",
    "Fixed Maturity", "Interval Fund", "FMP",
)
# Minimum fund age (years) before trailing risk metrics are expected to exist.
MIN_YEARS_FOR_RISK_METRICS = 3
# Domains whose search results are preferred when parsing metric tables.
TRUSTED_DOMAINS = (
    "valueresearchonline.com",
    "morningstar.in",
    "moneycontrol.com",
    "advisorkhoj.com",
    "amfiindia.com",
    "tickertape.in",
)
# Lowercase label variants recognized per metric when scanning markdown tables.
METRIC_ALIASES: dict[str, list[str]] = {
    "Alpha": ["alpha"],
    "Beta": ["beta"],
    "Standard Deviation": ["standard deviation", "std dev", "std. dev"],
    "Sharpe Ratio": ["sharpe ratio", "sharpe"],
    "Volatility": ["volatility"],
    "Mean": ["mean", "mean return"],
    "Sortino Ratio": ["sortino ratio", "sortino"],
    "Up Market Capture\nRatio": ["upside", "up market capture", "upside capture", "up capture"],
    "Down Market Capture\nRatio": ["downside", "down market capture", "downside capture", "down capture"],
    "Maximum Drawdown": ["maximum drawdown", "max drawdown", "maximum"],
    "R-Squared": ["r-squared", "r squared", "r2", "rΒ²"],
    "Information Ratio": ["information ratio", "info ratio"],
    "P/E Ratio": ["p/e ratio", "p/e", "pe ratio", "pe"],
    "P/B Ratio": ["p/b ratio", "p/b", "pb ratio", "pb"],
}
| # ββ Config & Result ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
@dataclass
class EnrichmentConfig:
    """Tunable knobs for the enrichment pipeline.

    FIX: the ``@dataclass`` decorator was missing. The class is written in
    dataclass style (annotated fields with defaults) and callers construct it
    with keyword arguments; without the decorator those calls raise TypeError.
    """
    enabled: bool = True
    max_cells: int | None = None            # cap on SEARCHABLE cells processed
    min_confidence: float = 0.65
    search_limit: int = 5                   # max results per Tavily search
    impute_unresolved: bool = True
    filter_category: str | None = None      # restrict triage to one category
    target_columns: tuple[str, ...] = TARGET_COLUMNS
    trusted_domains: tuple[str, ...] = TRUSTED_DOMAINS
    enable_nav_engine: bool = True
    resolve_scheme_codes: bool = True       # run pre-triage code resolution
    web_search_pe_pb_only: bool = False     # limit web search to P/E and P/B only
    web_workers: int = 2                    # parallel Tavily workers (keep low!)
    tavily_retries: int = 4                 # retries on 429/432
    tavily_backoff_seconds: float = 15.0    # backoff multiplier per retry
    tavily_min_interval_seconds: float = 4.0  # minimum seconds between Tavily calls
| class EnrichmentResult: | |
| input_csv_path: str | |
| enriched_csv_path: str | |
| scratchpad_path: str | None = None | |
| examined_cells: int = 0 | |
| updated_cells: int = 0 | |
| imputed_cells: int = 0 | |
| skipped_cells: int = 0 | |
| resolved_codes: int = 0 # NEW: how many scheme codes were resolved | |
| # Optional breakdowns used by older callers / UIs | |
| nav_cells: int = 0 # cells filled via NAV engine | |
| web_cells: int = 0 # cells filled via web search | |
| errors: list[str] = field(default_factory=list) | |
# ── Triage labels ────────────────────────────────────────────────────────────
TRIAGE_YOUNG = "YOUNG_FUND"               # fund < MIN_YEARS_FOR_RISK_METRICS old
# NOTE(review): never assigned by the visible triage code — scratchpad counts it
# but it may be vestigial; confirm.
TRIAGE_NOT_APPLICABLE = "NOT_APPLICABLE"
TRIAGE_SEARCHABLE = "SEARCHABLE"          # metric should exist → try NAV/web
| # ββ Helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _load_env() -> None: | |
| env_path = Path(__file__).resolve().parent.parent / ".env" | |
| if not env_path.exists(): | |
| return | |
| for line in env_path.read_text(encoding="utf-8").splitlines(): | |
| raw = line.strip() | |
| if not raw or raw.startswith("#") or "=" not in raw: | |
| continue | |
| k, v = raw.split("=", 1) | |
| os.environ.setdefault(k.strip(), v.strip()) | |
def _is_missing(val: str | None) -> bool:
    """True when *val* is blank or one of the known missing-data placeholders."""
    normalized = ("" if val is None else val).strip().lower()
    return normalized in MISSING_TOKENS
| def _parse_launch_date(val: str | None) -> datetime | None: | |
| if not val: | |
| return None | |
| for fmt in ("%d-%m-%Y", "%Y-%m-%d", "%d/%m/%Y"): | |
| try: | |
| return datetime.strptime(val.strip(), fmt) | |
| except ValueError: | |
| continue | |
| return None | |
def _is_debt_category(category: str) -> bool:
    """True when *category* denotes a debt/liquid-style scheme (prefix match)."""
    normalized = (category or "").strip().lower()
    return any(
        normalized.startswith(prefix.lower())
        for prefix in DEBT_CATEGORIES_PREFIXES
    )
| def _normalize_fund_name(name: str) -> str: | |
| # FIX: use re.sub so multi-hyphen runs collapse to a single space | |
| return re.sub(r"-+", " ", name).strip() | |
| def _looks_like_category_label(fund_name: str, category: str) -> bool: | |
| """Return True for header/category rows that aren't real fund entries.""" | |
| fund = (fund_name or "").strip().lower() | |
| cat = (category or "").strip().lower() | |
| if not fund: | |
| return True | |
| if cat and fund == cat: | |
| return True | |
| # Patterns like "Equity: Contra" or "Debt: Short Duration" β no hyphen, no "fund" | |
| if ":" in fund and "-" not in fund and not any( | |
| tok in fund for tok in (" fund", " plan", " option", " scheme") | |
| ): | |
| return True | |
| return False | |
def _build_category_medians(
    rows: list[dict[str, str]], columns: tuple[str, ...]
) -> dict[str, dict[str, float]]:
    """Returns {category: {column: median_value}}.

    Values are cleaned of '%' and thousands separators; missing tokens and
    unparseable cells are skipped. Columns with no numeric data are omitted.
    """
    buckets: dict[str, dict[str, list[float]]] = {}
    for row in rows:
        cat = row.get("Category", "")
        if not cat:
            continue
        col_lists = buckets.setdefault(cat, {c: [] for c in columns})
        for col in columns:
            cleaned = (row.get(col) or "").strip().replace("%", "").replace(",", "")
            if cleaned.lower() in MISSING_TOKENS:
                continue
            try:
                col_lists[col].append(float(cleaned))
            except ValueError:
                continue
    return {
        cat: {
            col: round(median(vals), 4)
            for col, vals in col_map.items()
            if vals
        }
        for cat, col_map in buckets.items()
    }
| # ββ Triage βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
@dataclass
class TriagedCell:
    """One missing cell plus the triage verdict explaining how to fill it.

    FIX: the ``@dataclass`` decorator was missing; ``_triage_missing_cells``
    constructs instances with keyword arguments, which fails on a plain class.
    """
    row_idx: int        # index into the in-memory rows list
    fund_name: str
    category: str
    column: str         # one of TARGET_COLUMNS
    current_value: str  # raw cell content at triage time
    label: str          # TRIAGE_YOUNG / TRIAGE_NOT_APPLICABLE / TRIAGE_SEARCHABLE
    reason: str         # human-readable justification for the label
def _triage_missing_cells(
    rows: list[dict[str, str]],
    config: EnrichmentConfig,
) -> list[TriagedCell]:
    """Classify every missing target cell as YOUNG_FUND or SEARCHABLE.

    A cell is "missing" when its value is a MISSING_TOKENS placeholder, or a
    literal 0 for any ZERO_AS_MISSING_COLUMNS column (0 is treated as a data
    gap, not a real value). Funds younger than MIN_YEARS_FOR_RISK_METRICS get
    TRIAGE_YOUNG (auto-impute later), except for P/E and P/B which are
    point-in-time portfolio metrics and always stay SEARCHABLE.

    FIX: removed the dead local ``is_debt = _is_debt_category(cat)`` — it was
    computed per row but never read (debt funds deliberately flow through the
    NAV/web phases too, per the NAV-phase comment in enrich_csv).
    """
    today = datetime.now()
    cells: list[TriagedCell] = []
    for idx, row in enumerate(rows):
        fund = row.get("Fund", "")
        cat = row.get("Category", "")
        launch_str = row.get("Launch Date", "")
        if config.filter_category and cat != config.filter_category:
            continue
        launch_dt = _parse_launch_date(launch_str)
        if launch_dt is not None:
            age_years = (today - launch_dt).days / 365.25
            is_young = age_years < MIN_YEARS_FOR_RISK_METRICS
        else:
            # FIX: unknown date → do NOT silently mark as young; attempt search
            is_young = False
        for col in config.target_columns:
            raw = (row.get(col) or "").strip()
            # Base missing check (blank, "-", "N/A", etc.)
            is_missing_val = _is_missing(raw)
            # Additionally, for all ZERO_AS_MISSING_COLUMNS, treat an exact
            # numeric 0 as "missing" so enrichment will try to fill it.
            if not is_missing_val and col in ZERO_AS_MISSING_COLUMNS:
                norm = raw.replace("%", "").replace(",", "").strip()
                try:
                    if float(norm) == 0.0:
                        is_missing_val = True
                except ValueError:
                    pass
            if not is_missing_val:
                continue
            # PE/PB are point-in-time portfolio metrics β fund age is irrelevant.
            # Always attempt to fetch them regardless of how young the fund is.
            age_exempt = col in ("P/E Ratio", "P/B Ratio")
            if is_young and not age_exempt:
                cells.append(TriagedCell(
                    row_idx=idx, fund_name=fund, category=cat, column=col,
                    current_value=raw, label=TRIAGE_YOUNG,
                    reason=(f"Fund launched {launch_str or '(unknown)'}, "
                            f"<{MIN_YEARS_FOR_RISK_METRICS}yr history β metric not computed yet"),
                ))
            else:
                cells.append(TriagedCell(
                    row_idx=idx, fund_name=fund, category=cat, column=col,
                    current_value=raw, label=TRIAGE_SEARCHABLE,
                    reason=(f"Fund launched {launch_str or '(unknown date)'}, "
                            f"category '{cat}' β metric should exist, attempting NAV/web"),
                ))
    return cells
| # ββ Markdown table parser ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _extract_number(text: str) -> float | None:
    """Pull the first signed decimal number out of *text*.

    Returns None for missing-data placeholders (including the literal "β"
    token, presumably a mojibake'd dash from the source data) or when no
    numeric substring is present.
    """
    cleaned = text.strip().replace(",", "")
    if cleaned.lower() in MISSING_TOKENS or cleaned == "β":
        return None
    match = re.search(r"-?\d+\.?\d*", cleaned)
    if match is None:
        return None
    try:
        return float(match.group())
    except ValueError:
        return None
def _parse_table_row(markdown: str, alias: str) -> float | None:
    """Extract the first numeric cell after the label in a markdown table row.

    For a row like: | Alpha | 1.59 | -0.56 | 8.25 |
    returns 1.59 — the fund's own column (leftmost numeric value after the
    label). This is intentional: sites like Morningstar show
    Fund | Category | Index and we want the fund value, not the category or
    index value.
    """
    row_pattern = re.compile(
        r"\|\s*" + re.escape(alias) + r"\s*\|(.+?)(?:\n|$)",
        re.IGNORECASE,
    )
    for match in row_pattern.finditer(markdown):
        for raw_cell in match.group(1).split("|"):
            value = _extract_number(raw_cell.strip())
            if value is not None:
                return value
    return None
def _parse_metrics_from_markdown(
    markdown: str, wanted_metrics: list[str]
) -> dict[str, float | None]:
    """Map each wanted metric to its first parsed table value (None if absent).

    Aliases are tried in METRIC_ALIASES order; the first alias that both
    appears in the text and yields a numeric table cell wins.
    """
    lowered = markdown.lower()
    parsed: dict[str, float | None] = {}
    for metric in wanted_metrics:
        parsed[metric] = None
        for alias in METRIC_ALIASES.get(metric, [metric.lower()]):
            if alias.lower() not in lowered:
                continue
            value = _parse_table_row(markdown, alias)
            if value is not None:
                parsed[metric] = value
                break
    return parsed
| # ββ Web search (Firecrawl) βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _post_tavily(
    endpoint: str,
    payload: dict,
    *,
    retries: int,
    backoff: float,
    min_interval: float,
) -> "requests.Response | None":
    """Single-threaded Tavily dispatcher with shared cooldown and retry logic.

    All workers funnel through _TAVILY_LOCK so only one request is in flight
    at a time; the shared timestamps enforce a minimum inter-call interval and
    a global "blocked until" window after a 429/432 response.

    Returns the successful Response, or None on connection error, HTTP error,
    or retry exhaustion. Never raises.

    NOTE(review): indentation was reconstructed from a mangled source — the
    POST is assumed to sit inside the lock (matching the "only one Tavily call
    dispatched at a time" module comment); confirm against the original file.
    """
    global _TAVILY_LAST_CALL_TS, _TAVILY_BLOCKED_UNTIL
    url = f"https://api.tavily.com/{endpoint}"
    for attempt in range(retries + 1):
        with _TAVILY_LOCK:
            now = time.monotonic()
            # Wait for whichever is later: min spacing since the last call,
            # or the global rate-limit cooldown (clamped at zero).
            wait = max(
                (_TAVILY_LAST_CALL_TS + min_interval) - now,
                _TAVILY_BLOCKED_UNTIL - now,
                0.0,
            )
            if wait > 0:
                time.sleep(wait)
            try:
                resp = requests.post(
                    url,
                    headers={"Content-Type": "application/json"},
                    json=payload,
                    timeout=30,
                )
                # Stamp AFTER the request completes so spacing is request-to-request.
                _TAVILY_LAST_CALL_TS = time.monotonic()
            except Exception as exc:
                # Network-level failure: give up immediately (no retry).
                print(f" [tavily] {endpoint} connection error: {exc}")
                return None
        if resp.status_code in {429, 432}:
            # Rate limited: extend the shared cooldown linearly per attempt
            # so every worker backs off, then retry if budget remains.
            wait_retry = backoff * (attempt + 1)
            print(f" [tavily] {endpoint} rate-limited ({resp.status_code}); "
                  f"waiting {wait_retry:.1f}s before retry")
            with _TAVILY_LOCK:
                _TAVILY_BLOCKED_UNTIL = max(
                    _TAVILY_BLOCKED_UNTIL, time.monotonic() + wait_retry
                )
            if attempt < retries:
                continue
            return None
        try:
            resp.raise_for_status()
        except Exception as exc:
            print(f" [tavily] {endpoint} HTTP error: {exc}")
            return None
        return resp
    return None
def _call_tavily_search(
    query: str,
    api_key: str,
    limit: int = 5,
    *,
    retries: int = 4,
    backoff_seconds: float = 15.0,
    min_interval_seconds: float = 4.0,
) -> list[dict]:
    """Search using Tavily API. Returns list of dicts with 'url' and 'markdown' keys."""
    resp = _post_tavily(
        "search",
        {"api_key": api_key, "query": query, "max_results": limit,
         "include_raw_content": True, "search_depth": "advanced"},
        retries=retries, backoff=backoff_seconds, min_interval=min_interval_seconds,
    )
    if resp is None:
        return []
    # Prefer the full raw page content; fall back to the snippet.
    return [
        {
            "url": item.get("url", ""),
            "markdown": item.get("raw_content") or item.get("content", ""),
        }
        for item in resp.json().get("results", [])
    ]
# Backwards-compat alias: the rest of the module (e.g. _search_fund_metrics)
# still calls _call_firecrawl_search even though the backend is now Tavily.
_call_firecrawl_search = _call_tavily_search
def _scrape_url(
    url: str,
    api_key: str,
    *,
    retries: int = 4,
    backoff_seconds: float = 15.0,
    min_interval_seconds: float = 4.0,
) -> str:
    """Fetch page content using Tavily extract API.

    Returns the extracted raw content, or "" on any failure.
    """
    resp = _post_tavily(
        "extract",
        {"api_key": api_key, "urls": [url]},
        retries=retries, backoff=backoff_seconds, min_interval=min_interval_seconds,
    )
    if resp is None:
        return ""
    extracted = resp.json().get("results", [])
    if not extracted:
        return ""
    return extracted[0].get("raw_content", "")
| def _derive_morningstar_risk_url(any_ms_url: str) -> str | None: | |
| if "morningstar.in/mutualfunds/" not in any_ms_url: | |
| return None | |
| for suffix in ("fund-factsheet.aspx", "overview.aspx", "portfolio.aspx", | |
| "performance.aspx", "detailed-portfolio.aspx"): | |
| if suffix in any_ms_url: | |
| return any_ms_url.replace(suffix, "risk-ratings.aspx") | |
| if "risk-ratings.aspx" in any_ms_url: | |
| return any_ms_url | |
| return None | |
| def _derive_morningstar_portfolio_url(any_ms_url: str) -> str | None: | |
| """Derive the Morningstar portfolio page (for P/E and P/B).""" | |
| if "morningstar.in/mutualfunds/" not in any_ms_url: | |
| return None | |
| return re.sub( | |
| r"(fund-factsheet|overview|risk-ratings|performance|detailed-portfolio)\.aspx", | |
| "portfolio.aspx", | |
| any_ms_url, | |
| ) | |
def _search_fund_metrics(
    fund_name: str,
    missing_metrics: list[str],
    config: EnrichmentConfig,
    firecrawl_key: str,
) -> tuple[dict[str, float | None], list[str]]:
    """Web-search fallback: scrape trusted sites for a fund's missing metrics.

    Flow: Tavily search → prefer trusted domains (top 3 results) → concatenate
    their markdown → optionally scrape the derived Morningstar risk-ratings
    page (and the portfolio page when P/E or P/B is needed) → if still no
    content, one retry targeted at valueresearchonline → parse the combined
    markdown into {metric: value-or-None}.

    Returns (parsed metrics dict, list of source URLs consulted).
    """
    from urllib.parse import urlparse
    readable = _normalize_fund_name(fund_name)
    query = f"{readable} risk rating alpha beta sharpe morningstar"
    print(f" [search] query: {query[:80]}")
    results = _call_firecrawl_search(
        query, firecrawl_key, config.search_limit,
        retries=config.tavily_retries,
        backoff_seconds=config.tavily_backoff_seconds,
        min_interval_seconds=config.tavily_min_interval_seconds,
    )
    if not results:
        print(" [search] no results")
        return {m: None for m in missing_metrics}, []
    # Partition results: trusted domains first, everything else after.
    trusted, other = [], []
    for r in results:
        url = r.get("url", "")
        domain = urlparse(url).netloc.lower().replace("www.", "")
        (trusted if any(td in domain for td in config.trusted_domains) else other).append(r)
    use = (trusted + other)[:3]
    source_urls = [r.get("url", "") for r in use]
    print(f" [search] using {len(use)} sources: {[urlparse(u).netloc for u in source_urls]}")
    # Concatenate all markdown, each chunk tagged with its source URL.
    combined = ""
    for r in use:
        md = r.get("markdown", "")
        if md:
            combined += f"\n\n--- {r.get('url', '')} ---\n{md}"
    # Morningstar: scrape risk-ratings page if not already in results
    ms_risk_url = None
    for r in use:
        ms_risk_url = _derive_morningstar_risk_url(r.get("url", ""))
        if ms_risk_url:
            break
    if ms_risk_url and "risk-ratings" not in " ".join(source_urls):
        print(f" [scrape] Morningstar risk page: {ms_risk_url}")
        risk_md = _scrape_url(ms_risk_url, firecrawl_key,
            retries=config.tavily_retries,
            backoff_seconds=config.tavily_backoff_seconds,
            min_interval_seconds=config.tavily_min_interval_seconds,
        )
        if risk_md:
            combined += f"\n\n--- {ms_risk_url} ---\n{risk_md}"
            source_urls.append(ms_risk_url)
    # Morningstar: scrape portfolio page for P/E and P/B
    pe_pb_needed = {"P/E Ratio", "P/B Ratio"} & set(missing_metrics)
    if pe_pb_needed and ms_risk_url:
        ms_port_url = _derive_morningstar_portfolio_url(ms_risk_url)
        if ms_port_url and ms_port_url not in source_urls:
            print(f" [scrape] Morningstar portfolio page: {ms_port_url}")
            port_md = _scrape_url(ms_port_url, firecrawl_key,
                retries=config.tavily_retries,
                backoff_seconds=config.tavily_backoff_seconds,
                min_interval_seconds=config.tavily_min_interval_seconds,
            )
            if port_md:
                combined += f"\n\n--- {ms_port_url} ---\n{port_md}"
                source_urls.append(ms_port_url)
    # If we still have no markdown content, do a second pass focused on
    # ValueResearch (only VR-domain results are kept from that retry).
    if not combined.strip():
        print(" [search] no markdown from initial sources; retrying via valueresearchonlineβ¦")
        vr_query = f"{readable} {' '.join(missing_metrics)} valueresearchonline"
        vr_results = _call_firecrawl_search(
            vr_query, firecrawl_key, config.search_limit,
            retries=config.tavily_retries,
            backoff_seconds=config.tavily_backoff_seconds,
            min_interval_seconds=config.tavily_min_interval_seconds,
        )
        if vr_results:
            vr_combined = ""
            for r in vr_results:
                url = r.get("url", "")
                domain = urlparse(url).netloc.lower().replace("www.", "")
                if "valueresearchonline.com" not in domain:
                    continue
                md = r.get("markdown", "")
                if md:
                    vr_combined += f"\n\n--- {url} ---\n{md}"
                    source_urls.append(url)
            combined = vr_combined
    if not combined.strip():
        print(" [search] no markdown content after ValueResearch retry")
        return {m: None for m in missing_metrics}, source_urls
    found = _parse_metrics_from_markdown(combined, missing_metrics)
    for m, v in found.items():
        print(f" [parsed] {m} = {v if v is not None else 'NOT FOUND'}")
    return found, source_urls
| # ββ Scratchpad βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _write_scratchpad(
    path: Path,
    triaged: list[TriagedCell],
    resolved_codes: dict[str, str],
    nav_results: dict[str, dict[str, float | None]],
    web_results: dict[str, dict[str, float | None]],
    web_sources: dict[str, list[str]],
    medians_used: list[tuple[str, str, float]],
    nav_filled: list[tuple[str, str, float]],
    web_filled: list[tuple[str, str, float]],
) -> None:
    """Write a human-readable audit trail of the whole enrichment run to *path*.

    Sections (in order): resolved scheme codes, triage summary + per-cell
    decisions, NAV engine results, web search results (with sources), then
    the three lists of actually-filled values. Overwrites any existing file.
    """
    lines = [
        "=" * 70,
        "ENRICHMENT SCRATCHPAD",
        f"Generated: {datetime.now().isoformat()}",
        "=" * 70, "",
    ]
    if resolved_codes:
        lines += ["-" * 70, f"SCHEME CODES RESOLVED ({len(resolved_codes)})", "-" * 70]
        for fund, code in resolved_codes.items():
            lines.append(f" {fund[:60]:60s} β {code}")
        lines.append("")
    # Triage counts per label (NOT_APPLICABLE is counted even though the
    # visible triage code never assigns it).
    young = [c for c in triaged if c.label == TRIAGE_YOUNG]
    na = [c for c in triaged if c.label == TRIAGE_NOT_APPLICABLE]
    searchable = [c for c in triaged if c.label == TRIAGE_SEARCHABLE]
    lines += [
        f"TOTAL MISSING CELLS: {len(triaged)}",
        f" YOUNG_FUND (auto-impute): {len(young)}",
        f" NOT_APPLICABLE (auto-impute): {len(na)}",
        f" SEARCHABLE (nav/web): {len(searchable)}",
        "",
        "-" * 70, "TRIAGE DECISIONS", "-" * 70,
    ]
    for c in triaged:
        lines.append(f" [{c.label:16s}] {c.fund_name} :: {c.column}")
        lines.append(f" Reason: {c.reason}")
    lines.append("")
    if nav_results:
        lines += ["-" * 70, "NAV ENGINE RESULTS (TRAILING 3Y)", "-" * 70]
        for fund_key, metrics in nav_results.items():
            lines.append(f" Fund: {fund_key}")
            for metric, val in metrics.items():
                lines.append(f" {metric}: {'FOUND = ' + str(val) if val is not None else 'NOT_FOUND'}")
        lines.append("")
    if web_results:
        lines += ["-" * 70, "WEB SEARCH RESULTS", "-" * 70]
        for fund_key, metrics in web_results.items():
            lines.append(f" Fund: {fund_key}")
            for s in web_sources.get(fund_key, []):
                lines.append(f" Source: {s}")
            for metric, val in metrics.items():
                lines.append(f" {metric}: {'FOUND = ' + str(val) if val is not None else 'NOT_FOUND'}")
        lines.append("")
    # Fill summaries, one section per provenance.
    for section_label, items in [
        (f"NAV-FILLED VALUES ({len(nav_filled)})", nav_filled),
        (f"WEB-FILLED VALUES ({len(web_filled)})", web_filled),
        (f"CATEGORY-MEDIAN IMPUTED ({len(medians_used)})", medians_used),
    ]:
        if items:
            lines += ["-" * 70, section_label, "-" * 70]
            for fund, col, val in items:
                lines.append(f" {fund} :: {col} = {val}")
            lines.append("")
    lines += ["=" * 70, "END OF SCRATCHPAD", "=" * 70]
    path.write_text("\n".join(lines), encoding="utf-8")
| # ββ Main entry point βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def enrich_csv( | |
| csv_path: str, | |
| config: EnrichmentConfig | None = None, | |
| ) -> EnrichmentResult: | |
| """Parse CSV β resolve codes β triage β NAV engine β web fallback β median impute β write. | |
| (Previously named enrich_csv_with_firecrawl_and_kimi; renamed for clarity.) | |
| """ | |
| if config is None: | |
| config = EnrichmentConfig() | |
| _load_env() | |
| src = Path(csv_path) | |
| result = EnrichmentResult(input_csv_path=csv_path, enriched_csv_path=csv_path) | |
| if not config.enabled or not src.exists(): | |
| return result | |
| with open(src, encoding="utf-8-sig", newline="") as f: | |
| reader = csv.DictReader(f) | |
| fieldnames = list(reader.fieldnames or []) | |
| rows = list(reader) | |
| if not rows: | |
| return result | |
| # ββ Phase 0: Scheme Code Resolution βββββββββββββββββββββββββββββββββ | |
| resolved_codes: dict[str, str] = {} | |
| if config.resolve_scheme_codes: | |
| print("[enrichment] Phase 0: Resolving missing scheme codesβ¦") | |
| rows, resolved_codes = resolve_missing_scheme_codes(rows, verbose=True) | |
| result.resolved_codes = len(resolved_codes) | |
| # ββ Phase 1: Triage ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("[enrichment] Phase 1: Triage β classifying missing cellsβ¦") | |
| triaged = _triage_missing_cells(rows, config) | |
| result.examined_cells = len(triaged) | |
| if not triaged: | |
| print("[enrichment] No missing cells found.") | |
| _write_output(src, rows, fieldnames, result) | |
| return result | |
| searchable = [c for c in triaged if c.label == TRIAGE_SEARCHABLE] | |
| imputable = [c for c in triaged if c.label != TRIAGE_SEARCHABLE] | |
| print(f"[enrichment] {len(triaged)} missing cells: " | |
| f"{len(searchable)} SEARCHABLE, {len(imputable)} auto-impute") | |
| if config.max_cells is not None: | |
| searchable = searchable[:config.max_cells] | |
| # ββ Phase 2: Category medians ββββββββββββββββββββββββββββββββββββββββ | |
| print("[enrichment] Phase 2: Computing category mediansβ¦") | |
| cat_medians = _build_category_medians(rows, config.target_columns) | |
| # ββ Phase 3: NAV engine ββββββββββββββββββββββββββββββββββββββββββββββ | |
| nav_results: dict[str, dict[str, float | None]] = {} | |
| nav_filled: list[tuple[str, str, float]] = [] | |
| if searchable and config.enable_nav_engine: | |
| print("[enrichment] Phase 3: NAV engine β computing trailing 3Y metricsβ¦") | |
| nav_cache = NavEngineCache() | |
| # All funds with missing cells go through NAV engine β including debt/liquid. | |
| # Debt funds can have valid Sharpe, Mean, Volatility etc. from their NAV history. | |
| searchable_for_nav = searchable | |
| row_groups: dict[int, list[TriagedCell]] = {} | |
| for cell in searchable_for_nav: | |
| row_groups.setdefault(cell.row_idx, []).append(cell) | |
| total_rows = len(row_groups) | |
| processed_count = 0 | |
| nav_lock = __import__("threading").Lock() | |
| NAV_WORKERS = 20 # mfapi is stateless REST β scales well beyond 12 | |
| # ββ Pre-warm: bulk load NAV + benchmarks before workers touch network ββ | |
| # Step 1: Pull all valid scheme codes and unique benchmarks from rows | |
| from src.nav_metrics_engine import _bulk_preload_cache, _prewarm_benchmarks | |
| _scheme_codes = [ | |
| (rows[ri].get("Scheme Code") or "").strip() | |
| for ri in row_groups | |
| if (rows[ri].get("Scheme Code") or "").strip().isdigit() | |
| ] | |
| _bench_tickers_raw = [ | |
| rows[ri].get("Benchmark Type", "") for ri in row_groups | |
| ] | |
| # Step 2: Resolve benchmark type β ticker (same logic as nav engine) | |
| from src.nav_metrics_engine import resolve_benchmark_ticker | |
| _bench_tickers = list(dict.fromkeys( | |
| resolve_benchmark_ticker(b) for b in _bench_tickers_raw if b | |
| )) | |
| # Step 3: Bulk load from Neon in 1 SQL query (nav + bench keys) | |
| _bulk_preload_cache(_scheme_codes, _bench_tickers) | |
| # Step 4: Download any cold benchmark tickers in parallel NOW, | |
| # before workers start β eliminates yfinance contention | |
| _prewarm_benchmarks(_bench_tickers) | |
| def _process_one_fund(args): | |
| row_idx, cells = args | |
| row = rows[row_idx] | |
| fund_name = row.get("Fund", "") | |
| scheme_code = (row.get("Scheme Code") or "").strip() | |
| benchmark_type = row.get("Benchmark Type", "") | |
| needed_metrics = [c.column for c in cells] | |
| if not scheme_code: | |
| return fund_name, {}, cells | |
| metrics, skip = compute_nav_metrics_for_scheme( | |
| scheme_code=scheme_code, | |
| benchmark_type=benchmark_type, | |
| needed_metrics=needed_metrics, | |
| cache=nav_cache, | |
| ) | |
| joined_reasons = " | ".join(skip.values()).lower() | |
| should_refresh_code = ( | |
| "returned no nav history" in joined_reasons | |
| or "nav history is stale" in joined_reasons | |
| ) | |
| if should_refresh_code: | |
| refreshed_code, _ = resolve_scheme_code_for_fund_name(fund_name) | |
| if refreshed_code and refreshed_code != scheme_code: | |
| row["Scheme Code"] = refreshed_code | |
| metrics, skip = compute_nav_metrics_for_scheme( | |
| scheme_code=refreshed_code, | |
| benchmark_type=benchmark_type, | |
| needed_metrics=needed_metrics, | |
| cache=nav_cache, | |
| ) | |
| return fund_name, metrics, cells | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| work_items = list(row_groups.items()) | |
| with ThreadPoolExecutor(max_workers=NAV_WORKERS) as executor: | |
| futures = {executor.submit(_process_one_fund, item): item for item in work_items} | |
| for fut in as_completed(futures): | |
| fund_name, metrics, cells = fut.result() | |
| with nav_lock: | |
| processed_count += 1 | |
| nav_results[fund_name] = metrics | |
| for cell in cells: | |
| val = metrics.get(cell.column) | |
| if val is not None: | |
| rows[cell.row_idx][cell.column] = str(round(float(val), 4)) | |
| result.updated_cells += 1 | |
| nav_filled.append((fund_name, cell.column, float(val))) | |
| # Only mark as attempted if MDD was actually filled β | |
| # drawdown_zero_fix should still retry funds where MDD came back None | |
| if metrics.get("Maximum Drawdown") is not None: | |
| _NAV_ATTEMPTED_FUNDS.add(fund_name) | |
| if processed_count % 20 == 0 or processed_count == total_rows: | |
| print(f" [nav] {processed_count}/{total_rows} funds processedβ¦") | |
| # Keep only still-missing searchable cells for web phase | |
| searchable = [c for c in searchable if _is_missing(rows[c.row_idx].get(c.column, ""))] | |
| print(f"[enrichment] NAV phase resolved {len(nav_filled)} cells; " | |
| f"{len(searchable)} remain for web search") | |
| result.nav_cells = len(nav_filled) | |
| # ββ Phase 4: Web search ββββββββββββββββββββββββββββββββββββββββββββββ | |
| web_results: dict[str, dict[str, float | None]] = {} | |
| web_sources: dict[str, list[str]] = {} | |
| web_filled: list[tuple[str, str, float]] = [] | |
| firecrawl_key = os.environ.get("TAVILY_API_KEY", "") | |
| # ββ Phase 4a: Screener.in PE/PB (no API key needed) ββββββββββββββββββββββ | |
| # Weighted average from holdings; results Neon-cached 30 days. | |
| pe_pb_searchable = [c for c in searchable | |
| if c.column in ("P/E Ratio", "P/B Ratio")] | |
| pe_pb_funds: dict[str, list[TriagedCell]] = {} | |
| for cell in pe_pb_searchable: | |
| pe_pb_funds.setdefault(cell.fund_name, []).append(cell) | |
| if pe_pb_funds: | |
| try: | |
| from src.pe_pb_engine import fetch_pe_pb as _fetch_pe_pb | |
| except ImportError: | |
| try: | |
| from pe_pb_engine import fetch_pe_pb as _fetch_pe_pb | |
| except ImportError: | |
| _fetch_pe_pb = None # type: ignore | |
| if _fetch_pe_pb is not None: | |
| print(f"[enrichment] Phase 4a: NSE index PE/PB β {len(pe_pb_funds)} funds") | |
| # Warm the index cache once before spawning threads | |
| _fetch_pe_pb.__module__ and __import__( | |
| _fetch_pe_pb.__module__.split(".")[0] | |
| ) # ensure module loaded | |
| try: | |
| from src.pe_pb_engine import warm_index_cache as _warm | |
| except ImportError: | |
| from pe_pb_engine import warm_index_cache as _warm | |
| _warm() | |
| pepb_lock = threading.Lock() | |
| pepb_done = [0] | |
def _screener_one(args):
    """Thread-pool worker: fetch index-level P/E and P/B for one fund.

    ``args`` is a ``(fund_name, cells)`` item from ``pe_pb_funds``.
    Closure over ``rows`` and ``_fetch_pe_pb`` from the enclosing scope.
    Returns ``(fund_name, cells, found)`` where ``found`` maps column name
    ("P/E Ratio" / "P/B Ratio") to the fetched value, omitting misses.
    """
    fund_name, cells = args
    # Get benchmark_type from the row (not scheme_code)
    bm = ""
    # Scan this fund's cells for the first row carrying a non-empty
    # "Benchmark Type" — all cells of one fund share the same row data.
    for cell in cells:
        row = rows[cell.row_idx]
        if row:
            bm = str(row.get("Benchmark Type") or "").strip()
            if bm:
                break
    # Without a benchmark label there is nothing to look up — skip the fetch.
    pe, pb = _fetch_pe_pb(benchmark_type=bm, fund_name=fund_name) if bm else (None, None)
    found: dict[str, float] = {}
    if pe is not None:
        found["P/E Ratio"] = pe
    if pb is not None:
        found["P/B Ratio"] = pb
    return fund_name, cells, found
| from concurrent.futures import ThreadPoolExecutor as _PePbTPE, as_completed as _pepb_done_iter | |
| with _PePbTPE(max_workers=5) as pepb_ex: | |
| pepb_futs = { | |
| pepb_ex.submit(_screener_one, item): item | |
| for item in pe_pb_funds.items() | |
| } | |
| for fut in _pepb_done_iter(pepb_futs): | |
| fund_name, cells, found = fut.result() | |
| with pepb_lock: | |
| pepb_done[0] += 1 | |
| web_results.setdefault(fund_name, {}).update(found) | |
| web_sources.setdefault(fund_name, []).append("nse-index-pe") | |
| for cell in cells: | |
| val = found.get(cell.column) | |
| if val is not None: | |
| rows[cell.row_idx][cell.column] = str(val) | |
| result.updated_cells += 1 | |
| web_filled.append((fund_name, cell.column, val)) | |
| print(f" [{pepb_done[0]}/{len(pe_pb_funds)}] " | |
| f"{fund_name[:45]} -> {cell.column} = {val} (screener)") | |
| # Re-filter: drop cells already filled by Screener | |
| searchable = [c for c in searchable if _is_missing(rows[c.row_idx].get(c.column, "") | |
| if rows[c.row_idx] else "")] | |
| # ββ Phase 4b: Tavily web search for remaining cells βββββββββββββββββββββββ | |
| if searchable and firecrawl_key: | |
| # Stamp run-id so stale background threads from prior runs can bail | |
| _CURRENT_RUN_ID[0] += 1 | |
| my_run_id = _CURRENT_RUN_ID[0] | |
| fund_groups: dict[str, list[TriagedCell]] = {} | |
| for cell in searchable: | |
| fund_groups.setdefault(cell.fund_name, []).append(cell) | |
| print(f"[enrichment] Phase 4b: Tavily web search β {len(searchable)} cells " | |
| f"across {len(fund_groups)} funds") | |
| web_search_groups: dict[str, list[TriagedCell]] = {} | |
| for fund_name, cells in fund_groups.items(): | |
| # Skip category header rows (e.g. "Equity: Contra") β not real funds | |
| category = cells[0].category if cells else "" | |
| if _looks_like_category_label(fund_name, category): | |
| for cell in cells: | |
| med = cat_medians.get(cell.category, {}).get(cell.column) | |
| if med is not None and config.impute_unresolved: | |
| rows[cell.row_idx][cell.column] = str(med) | |
| result.updated_cells += 1 | |
| result.imputed_cells += 1 | |
| else: | |
| result.skipped_cells += 1 | |
| continue | |
| if config.web_search_pe_pb_only: | |
| cells_to_impute = [c for c in cells if c.column not in ("P/E Ratio", "P/B Ratio")] | |
| for cell in cells_to_impute: | |
| med = cat_medians.get(cell.category, {}).get(cell.column) | |
| if med is not None and config.impute_unresolved: | |
| rows[cell.row_idx][cell.column] = str(med) | |
| result.updated_cells += 1 | |
| result.imputed_cells += 1 | |
| cells_for_web = [c for c in cells if c.column in ("P/E Ratio", "P/B Ratio")] | |
| else: | |
| cells_for_web = cells | |
| if cells_for_web: | |
| web_search_groups[fund_name] = cells_for_web | |
| WEB_WORKERS = max(1, config.web_workers) | |
| web_lock = threading.Lock() | |
| web_done = [0] | |
| total_web = len(web_search_groups) | |
def _search_one_fund(args):
    """Thread-pool worker: web-search all missing metrics for one fund.

    ``args`` is a ``(fund_name, cells)`` item from ``web_search_groups``.
    Closure over ``my_run_id``, ``config`` and ``firecrawl_key`` from the
    enclosing scope. Returns ``(fund_name, cells, found, sources)``;
    ``found``/``sources`` are empty when a newer run superseded this one.
    """
    fund_name, cells = args
    # Abort if a newer run has started (Streamlit rerun safety)
    if _CURRENT_RUN_ID[0] != my_run_id:
        return fund_name, cells, {}, []
    missing_metrics = [c.column for c in cells]
    found, sources = _search_fund_metrics(
        fund_name, missing_metrics, config, firecrawl_key)
    return fund_name, cells, found, sources
| from concurrent.futures import ThreadPoolExecutor as _WebTPE, as_completed as _web_as_completed | |
| with _WebTPE(max_workers=WEB_WORKERS) as web_executor: | |
| futures = { | |
| web_executor.submit(_search_one_fund, item): item | |
| for item in web_search_groups.items() | |
| } | |
| for fut in _web_as_completed(futures): | |
| fund_name, cells, found, sources = fut.result() | |
| with web_lock: | |
| web_done[0] += 1 | |
| web_results.setdefault(fund_name, {}).update(found) | |
| web_sources.setdefault(fund_name, []).extend(sources) | |
| print(f"\n[{web_done[0]}/{total_web}] {fund_name}") | |
| for cell in cells: | |
| val = found.get(cell.column) | |
| if val is not None: | |
| rows[cell.row_idx][cell.column] = str(val) | |
| result.updated_cells += 1 | |
| web_filled.append((fund_name, cell.column, val)) | |
| print(f" -> {cell.column} = {val} (web)") | |
| else: | |
| med = cat_medians.get(cell.category, {}).get(cell.column) | |
| if med is not None and config.impute_unresolved: | |
| rows[cell.row_idx][cell.column] = str(med) | |
| result.imputed_cells += 1 | |
| print(f" ~> {cell.column} = {med} (median)") | |
| else: | |
| result.skipped_cells += 1 | |
| print(f" x> {cell.column} β not found, no median") | |
| elif searchable and not firecrawl_key: | |
| print("[enrichment] No TAVILY_API_KEY β PE/PB via Screener only; remaining β medians") | |
| for cell in searchable: | |
| med = cat_medians.get(cell.category, {}).get(cell.column) | |
| if med is not None and config.impute_unresolved: | |
| rows[cell.row_idx][cell.column] = str(med) | |
| result.imputed_cells += 1 | |
| else: | |
| result.skipped_cells += 1 | |
| # ββ Phase 5: Impute non-searchable (YOUNG / NOT_APPLICABLE) cells ββββ | |
| # FIX: was incorrectly labelled "Phase 4" in log | |
| medians_used: list[tuple[str, str, float]] = [] | |
| if imputable and config.impute_unresolved: | |
| print(f"\n[enrichment] Phase 5: Imputing {len(imputable)} non-searchable cellsβ¦") | |
| for cell in imputable: | |
| med = cat_medians.get(cell.category, {}).get(cell.column) | |
| if med is not None: | |
| rows[cell.row_idx][cell.column] = str(med) | |
| result.imputed_cells += 1 | |
| medians_used.append((cell.fund_name, cell.column, med)) | |
| else: | |
| result.skipped_cells += 1 | |
| elif imputable: | |
| result.skipped_cells += len(imputable) | |
| # Record how many cells came from web search | |
| result.web_cells = len(web_filled) | |
| # ββ Phase 6: Write enriched CSV ββββββββββββββββββββββββββββββββββββββ | |
| _write_output(src, rows, fieldnames, result) | |
| # ββ Phase 7: Scratchpad ββββββββββββββββββββββββββββββββββββββββββββββ | |
| scratch_dir = Path("scratchpad") | |
| scratch_dir.mkdir(exist_ok=True) | |
| stamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| scratch_path = scratch_dir / f"enrichment_{stamp}.txt" | |
| _write_scratchpad( | |
| scratch_path, triaged, resolved_codes, | |
| nav_results, web_results, web_sources, | |
| medians_used, nav_filled, web_filled, | |
| ) | |
| result.scratchpad_path = str(scratch_path) | |
| print(f"\n[enrichment] DONE β nav_filled={len(nav_filled)} web_filled={len(web_filled)} " | |
| f"imputed={result.imputed_cells} skipped={result.skipped_cells}") | |
| print(f"[enrichment] Enriched CSV : {result.enriched_csv_path}") | |
| print(f"[enrichment] Scratchpad : {scratch_path}") | |
| return result | |
| def _write_output( | |
| src: Path, | |
| rows: list[dict[str, str]], | |
| fieldnames: list[str], | |
| result: EnrichmentResult, | |
| ) -> None: | |
| out_dir = src.parent / "enriched" | |
| out_dir.mkdir(exist_ok=True) | |
| out_path = out_dir / f"enriched_{src.name}" | |
| with open(out_path, "w", encoding="utf-8-sig", newline="") as f: | |
| writer = csv.DictWriter(f, fieldnames=fieldnames) | |
| writer.writeheader() | |
| writer.writerows(rows) | |
| result.enriched_csv_path = str(out_path) | |
# Backward-compat alias: older callers (streamlit_app and
# run_enrichment_pipeline) import the pipeline under its historical name.
enrich_csv_with_firecrawl_and_kimi = enrich_csv
| # ββ Single metric lookup (for Streamlit UI) ββββββββββββββββββββββββββββββββββ | |
def lookup_fund_metric_value(
    fund_name: str,
    column_name: str,
    scheme_code: str = "",
    config: EnrichmentConfig | None = None,
) -> dict[str, Any]:
    """Resolve a single metric for one fund via the web-search backend.

    Intended for ad-hoc lookups from the Streamlit UI. ``scheme_code`` is
    accepted for interface compatibility but not consulted in this path.
    Returns a status dict: ``"error"`` when TAVILY_API_KEY is unset,
    otherwise ``"found"``/``"not_found"`` with the value and its sources.
    """
    _load_env()
    cfg = config if config is not None else EnrichmentConfig()
    api_key = os.environ.get("TAVILY_API_KEY", "")
    if not api_key:
        return {"status": "error", "message": "TAVILY_API_KEY not set"}
    found, sources = _search_fund_metrics(fund_name, [column_name], cfg, api_key)
    value = found.get(column_name)
    return {
        "status": "found" if value is not None else "not_found",
        "fund": fund_name,
        "metric": column_name,
        "value": value,
        "sources": sources,
    }