# MF/src/csv_enrichment.py
# (Hugging Face page residue removed: "Parthiban97 — Upload 2 files — 94dbd6f verified".
#  Those lines were web-page text, not Python, and made the module unimportable.)
"""CSV Enrichment — missing-cell filler for mutual fund statistics.
Fill pipeline (in order):
0. SCHEME CODE RESOLUTION — fuzzy-match missing scheme codes via mfapi
1. TRIAGE — classify every missing cell
2. NAV ENGINE — compute trailing-3Y metrics from NAV history
3. WEB SEARCH (Tavily; legacy Firecrawl names kept as aliases) — scrape trusted sites for remaining gaps
4. CATEGORY MEDIAN — last-resort statistical imputation
Fixes vs original:
• Phase-label typo in log (Phase 4 → Phase 5 for imputation step)
• Unknown launch date → is_young = False (attempt search, don't silently impute)
• _normalize_fund_name uses re.sub to handle multi-hyphen sequences
• scheme code resolution runs BEFORE triage so NAV engine fires for more funds
• Standard Deviation now included in NAV-computable metrics
"""
from __future__ import annotations
import csv
import os
import re
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from statistics import median
from typing import Any
import requests
from src.nav_metrics_engine import NavEngineCache, compute_nav_metrics_for_scheme
from src.scheme_resolver import (
resolve_missing_scheme_codes,
resolve_scheme_code_for_fund_name,
)
# Fund names already attempted by the NAV engine in this process/session.
_NAV_ATTEMPTED_FUNDS: set[str] = set()
# ── Tavily global rate-limit state ───────────────────────────────────────────
# Single lock: only one Tavily call dispatched at a time across all workers,
# preventing the burst that triggers 429/432.
_TAVILY_LOCK = threading.Lock()
_TAVILY_LAST_CALL_TS = 0.0  # monotonic seconds of last successful dispatch
_TAVILY_BLOCKED_UNTIL = 0.0  # monotonic seconds until allowed to call again
# Per-run abort flag: new enrich_csv() stamps a new run-id; background threads
# from stale Streamlit reruns check this and exit early.
_CURRENT_RUN_ID: list[int] = [0]
# ── Constants ────────────────────────────────────────────────────────────────
# Cell values treated as "no data". NOTE: the final token is the mojibake
# em-dash exactly as it appears in the source CSVs — keep it byte-identical.
MISSING_TOKENS = {"", "-", "na", "n/a", "n/a*", "nan", "none", "null", "β€”"}
# Metric columns the pipeline fills; the embedded "\n" in the capture-ratio
# names matches the CSV header layout exactly.
TARGET_COLUMNS = (
    "Alpha",
    "Beta",
    "Standard Deviation",
    "Sharpe Ratio",
    "Volatility",
    "Mean",
    "Sortino Ratio",
    "Up Market Capture\nRatio",
    "Down Market Capture\nRatio",
    "Maximum Drawdown",
    "R-Squared",
    "Information Ratio",
    "P/E Ratio",
    "P/B Ratio",
)
# For all of these risk/ratio metrics, a literal numeric 0 is usually a
# data-quality gap rather than a meaningful "zero risk" value. We therefore
# treat 0 as missing so that enrichment (NAV engine + web search) can attempt
# to backfill real numbers.
ZERO_AS_MISSING_COLUMNS = set(TARGET_COLUMNS)
# ALL metrics that are equity-specific and should NOT be attempted
# via NAV engine or web search for debt/liquid/overnight funds.
# Sharpe, Sortino, Volatility etc. ARE computed by NAV engine for equity
# but for debt funds they either don't exist or are meaningless.
# NOTE(review): this set is not referenced anywhere else in this module —
# confirm whether the debt-fund skip logic was meant to consult it.
EQUITY_ONLY_METRICS = {
    "Alpha",
    "Beta",
    "Standard Deviation",
    "Sharpe Ratio",
    "Volatility",
    "Mean",
    "Sortino Ratio",
    "Up Market Capture\nRatio",
    "Down Market Capture\nRatio",
    "Maximum Drawdown",
    "R-Squared",
    "Information Ratio",
    "P/E Ratio",
    "P/B Ratio",
}
# Category-string prefixes identifying debt/cash-like funds
# (case-insensitive startswith match in _is_debt_category).
DEBT_CATEGORIES_PREFIXES = (
    "Debt:", "Liquid", "Overnight", "Money Market", "Gilt",
    "Fixed Maturity", "Interval Fund", "FMP",
)
# Funds younger than this many years are triaged as YOUNG (risk stats absent).
MIN_YEARS_FOR_RISK_METRICS = 3
# Domains whose search results are ranked ahead of everything else.
TRUSTED_DOMAINS = (
    "valueresearchonline.com",
    "morningstar.in",
    "moneycontrol.com",
    "advisorkhoj.com",
    "amfiindia.com",
    "tickertape.in",
)
# CSV column name → lowercase labels to look for in scraped markdown tables.
METRIC_ALIASES: dict[str, list[str]] = {
    "Alpha": ["alpha"],
    "Beta": ["beta"],
    "Standard Deviation": ["standard deviation", "std dev", "std. dev"],
    "Sharpe Ratio": ["sharpe ratio", "sharpe"],
    "Volatility": ["volatility"],
    "Mean": ["mean", "mean return"],
    "Sortino Ratio": ["sortino ratio", "sortino"],
    "Up Market Capture\nRatio": ["upside", "up market capture", "upside capture", "up capture"],
    "Down Market Capture\nRatio": ["downside", "down market capture", "downside capture", "down capture"],
    "Maximum Drawdown": ["maximum drawdown", "max drawdown", "maximum"],
    "R-Squared": ["r-squared", "r squared", "r2", "rΒ²"],
    "Information Ratio": ["information ratio", "info ratio"],
    "P/E Ratio": ["p/e ratio", "p/e", "pe ratio", "pe"],
    "P/B Ratio": ["p/b ratio", "p/b", "pb ratio", "pb"],
}
# ── Config & Result ──────────────────────────────────────────────────────────
@dataclass
class EnrichmentConfig:
    """Knobs controlling a single enrichment run."""
    enabled: bool = True  # master switch; False makes enrich_csv a pass-through
    max_cells: int | None = None  # cap on SEARCHABLE cells processed (None = unlimited)
    min_confidence: float = 0.65  # NOTE(review): not read anywhere in this module — confirm use
    search_limit: int = 5  # max results requested per Tavily search
    impute_unresolved: bool = True  # fall back to category medians for unresolved cells
    filter_category: str | None = None  # restrict the run to one exact Category value
    target_columns: tuple[str, ...] = TARGET_COLUMNS
    trusted_domains: tuple[str, ...] = TRUSTED_DOMAINS
    enable_nav_engine: bool = True  # Phase 3 (NAV-history metrics) on/off
    resolve_scheme_codes: bool = True  # run pre-triage code resolution
    web_search_pe_pb_only: bool = False  # limit web search to P/E and P/B only
    web_workers: int = 2  # parallel Tavily workers (keep low!)
    tavily_retries: int = 4  # retries on 429/432
    tavily_backoff_seconds: float = 15.0  # backoff multiplier per retry
    tavily_min_interval_seconds: float = 4.0  # minimum seconds between Tavily calls
@dataclass
class EnrichmentResult:
    """Counters and output paths summarizing one enrich_csv() run."""
    input_csv_path: str
    enriched_csv_path: str  # rewritten by _write_output once the CSV is saved
    scratchpad_path: str | None = None  # audit-trail text file, set at end of run
    examined_cells: int = 0  # total missing cells found by triage
    updated_cells: int = 0  # cells filled with a real (NAV/web) value
    imputed_cells: int = 0  # cells filled with a category median
    skipped_cells: int = 0  # cells left empty (no value found, no median available)
    resolved_codes: int = 0  # NEW: how many scheme codes were resolved
    # Optional breakdowns used by older callers / UIs
    nav_cells: int = 0  # cells filled via NAV engine
    web_cells: int = 0  # cells filled via web search
    errors: list[str] = field(default_factory=list)
# ── Triage labels ────────────────────────────────────────────────────────────
TRIAGE_YOUNG = "YOUNG_FUND"  # fund too young for trailing risk statistics
TRIAGE_NOT_APPLICABLE = "NOT_APPLICABLE"  # NOTE(review): never assigned by _triage_missing_cells — confirm intent
TRIAGE_SEARCHABLE = "SEARCHABLE"  # worth attempting NAV engine / web search
# ── Helpers ──────────────────────────────────────────────────────────────────
def _load_env() -> None:
env_path = Path(__file__).resolve().parent.parent / ".env"
if not env_path.exists():
return
for line in env_path.read_text(encoding="utf-8").splitlines():
raw = line.strip()
if not raw or raw.startswith("#") or "=" not in raw:
continue
k, v = raw.split("=", 1)
os.environ.setdefault(k.strip(), v.strip())
def _is_missing(val: str | None) -> bool:
    """True when *val* is None, blank, or one of the known missing-data tokens."""
    token = "" if val is None else val
    return token.strip().lower() in MISSING_TOKENS
def _parse_launch_date(val: str | None) -> datetime | None:
if not val:
return None
for fmt in ("%d-%m-%Y", "%Y-%m-%d", "%d/%m/%Y"):
try:
return datetime.strptime(val.strip(), fmt)
except ValueError:
continue
return None
def _is_debt_category(category: str) -> bool:
    """True when the category string starts with any debt/cash-like prefix."""
    cat = (category or "").strip().lower()
    return any(cat.startswith(prefix.lower()) for prefix in DEBT_CATEGORIES_PREFIXES)
def _normalize_fund_name(name: str) -> str:
# FIX: use re.sub so multi-hyphen runs collapse to a single space
return re.sub(r"-+", " ", name).strip()
def _looks_like_category_label(fund_name: str, category: str) -> bool:
"""Return True for header/category rows that aren't real fund entries."""
fund = (fund_name or "").strip().lower()
cat = (category or "").strip().lower()
if not fund:
return True
if cat and fund == cat:
return True
# Patterns like "Equity: Contra" or "Debt: Short Duration" β€” no hyphen, no "fund"
if ":" in fund and "-" not in fund and not any(
tok in fund for tok in (" fund", " plan", " option", " scheme")
):
return True
return False
def _build_category_medians(
    rows: list[dict[str, str]], columns: tuple[str, ...]
) -> dict[str, dict[str, float]]:
    """Returns {category: {column: median_value}}.

    Only numeric, non-missing cells contribute; "%" and "," are stripped
    before parsing. Columns with no parseable values are omitted.
    """
    buckets: dict[str, dict[str, list[float]]] = {}
    for row in rows:
        cat = row.get("Category", "")
        if not cat:
            continue
        col_lists = buckets.setdefault(cat, {c: [] for c in columns})
        for col in columns:
            raw = (row.get(col) or "").strip().replace("%", "").replace(",", "")
            if raw.lower() in MISSING_TOKENS:
                continue
            try:
                col_lists[col].append(float(raw))
            except ValueError:
                pass
    return {
        cat: {col: round(median(vals), 4) for col, vals in col_map.items() if vals}
        for cat, col_map in buckets.items()
    }
# ── Triage ───────────────────────────────────────────────────────────────────
@dataclass
class TriagedCell:
    """One missing cell plus the triage verdict explaining how to fill it."""
    row_idx: int  # index into the parsed CSV rows list
    fund_name: str  # row's "Fund" value
    category: str  # row's "Category" value
    column: str  # target metric column name
    current_value: str  # raw cell content as found (may be "", "-", "0", ...)
    label: str  # TRIAGE_YOUNG / TRIAGE_NOT_APPLICABLE / TRIAGE_SEARCHABLE
    reason: str  # human-readable justification, written to the scratchpad
def _triage_missing_cells(
    rows: list[dict[str, str]],
    config: EnrichmentConfig,
) -> list[TriagedCell]:
    """Classify every missing cell with reasoning.

    A cell counts as missing when it holds a blank/placeholder token, or —
    for the columns in ZERO_AS_MISSING_COLUMNS — a literal numeric 0.
    Cells of funds younger than MIN_YEARS_FOR_RISK_METRICS are labelled
    TRIAGE_YOUNG (except point-in-time P/E and P/B); everything else is
    TRIAGE_SEARCHABLE. Rows outside config.filter_category are skipped.
    """
    today = datetime.now()
    cells: list[TriagedCell] = []
    for idx, row in enumerate(rows):
        fund = row.get("Fund", "")
        cat = row.get("Category", "")
        launch_str = row.get("Launch Date", "")
        if config.filter_category and cat != config.filter_category:
            continue
        launch_dt = _parse_launch_date(launch_str)
        if launch_dt is not None:
            age_years = (today - launch_dt).days / 365.25
            is_young = age_years < MIN_YEARS_FOR_RISK_METRICS
        else:
            # Unknown date β†’ do NOT silently mark as young; attempt search
            is_young = False
        # FIX: removed dead code β€” an `is_debt = _is_debt_category(cat)` local
        # was computed here on every row but never read.
        for col in config.target_columns:
            raw = (row.get(col) or "").strip()
            # Base missing check (blank, "-", "N/A", etc.)
            is_missing_val = _is_missing(raw)
            # Additionally, for all ZERO_AS_MISSING_COLUMNS, treat an exact
            # numeric 0 as "missing" so enrichment will try to fill it.
            if not is_missing_val and col in ZERO_AS_MISSING_COLUMNS:
                norm = raw.replace("%", "").replace(",", "").strip()
                try:
                    if float(norm) == 0.0:
                        is_missing_val = True
                except ValueError:
                    pass
            if not is_missing_val:
                continue
            # PE/PB are point-in-time portfolio metrics β€” fund age is irrelevant.
            # Always attempt to fetch them regardless of how young the fund is.
            age_exempt = col in ("P/E Ratio", "P/B Ratio")
            if is_young and not age_exempt:
                cells.append(TriagedCell(
                    row_idx=idx, fund_name=fund, category=cat, column=col,
                    current_value=raw, label=TRIAGE_YOUNG,
                    reason=(f"Fund launched {launch_str or '(unknown)'}, "
                            f"<{MIN_YEARS_FOR_RISK_METRICS}yr history β€” metric not computed yet"),
                ))
            else:
                cells.append(TriagedCell(
                    row_idx=idx, fund_name=fund, category=cat, column=col,
                    current_value=raw, label=TRIAGE_SEARCHABLE,
                    reason=(f"Fund launched {launch_str or '(unknown date)'}, "
                            f"category '{cat}' β€” metric should exist, attempting NAV/web"),
                ))
    return cells
# ── Markdown table parser ────────────────────────────────────────────────────
def _extract_number(text: str) -> float | None:
    """Pull the first signed decimal number out of *text*, or None.

    Missing-data tokens (and the literal mojibake em-dash) yield None.
    """
    cleaned = text.strip().replace(",", "")
    if cleaned.lower() in MISSING_TOKENS or cleaned == "β€”":
        return None
    hit = re.search(r"-?\d+\.?\d*", cleaned)
    if hit is None:
        return None
    try:
        return float(hit.group())
    except ValueError:
        return None
def _parse_table_row(markdown: str, alias: str) -> float | None:
    """Extract the first numeric cell after the label in a markdown table row.
    For a row like: | Alpha | 1.59 | -0.56 | 8.25 |
    Returns 1.59 β€” the fund's own column (leftmost numeric value after the label).
    This is intentional: sites like Morningstar show Fund | Category | Index
    and we want the fund value, not the category or index value.
    """
    row_re = re.compile(
        r"\|\s*" + re.escape(alias) + r"\s*\|(.+?)(?:\n|$)",
        re.IGNORECASE,
    )
    for hit in row_re.finditer(markdown):
        for raw_cell in hit.group(1).split("|"):
            number = _extract_number(raw_cell.strip())
            if number is not None:
                return number
    return None
def _parse_metrics_from_markdown(
    markdown: str, wanted_metrics: list[str]
) -> dict[str, float | None]:
    """For each wanted metric, try its aliases against the markdown tables.

    Returns {metric: value-or-None}. The first alias that both appears in the
    text and yields a numeric table cell wins.
    """
    found: dict[str, float | None] = {}
    # FIX: hoist the lowercasing of the (potentially large) scraped markdown
    # out of the loop β€” it was recomputed for every alias of every metric.
    md_lower = markdown.lower()
    for metric in wanted_metrics:
        aliases = METRIC_ALIASES.get(metric, [metric.lower()])
        best_val: float | None = None
        for alias in aliases:
            # Cheap substring pre-check before running the regex scan.
            if alias.lower() not in md_lower:
                continue
            val = _parse_table_row(markdown, alias)
            if val is not None:
                best_val = val
                break
        found[metric] = best_val
    return found
# ── Web search (Firecrawl) ───────────────────────────────────────────────────
def _post_tavily(
    endpoint: str,
    payload: dict,
    *,
    retries: int,
    backoff: float,
    min_interval: float,
) -> "requests.Response | None":
    """Single-threaded Tavily dispatcher with shared cooldown and retry logic.

    The POST itself happens while holding _TAVILY_LOCK, so at most one
    request is in flight process-wide; the pre-call sleep enforces both the
    per-call minimum interval and any 429/432 cooldown stamped by a previous
    caller. Returns the successful Response, or None on connection error,
    exhausted retries, or a non-retryable HTTP error.
    """
    global _TAVILY_LAST_CALL_TS, _TAVILY_BLOCKED_UNTIL
    url = f"https://api.tavily.com/{endpoint}"
    for attempt in range(retries + 1):
        with _TAVILY_LOCK:
            now = time.monotonic()
            # Wait out whichever is later: min-interval since last dispatch,
            # or a global cooldown set after a rate-limit response.
            wait = max(
                (_TAVILY_LAST_CALL_TS + min_interval) - now,
                _TAVILY_BLOCKED_UNTIL - now,
                0.0,
            )
            if wait > 0:
                time.sleep(wait)
            try:
                resp = requests.post(
                    url,
                    headers={"Content-Type": "application/json"},
                    json=payload,
                    timeout=30,
                )
                _TAVILY_LAST_CALL_TS = time.monotonic()
            except Exception as exc:
                # Connection-level failure: give up immediately (no retry).
                print(f" [tavily] {endpoint} connection error: {exc}")
                return None
        # Rate-limit handling happens OUTSIDE the lock (the lock is
        # non-reentrant and is re-acquired below to stamp the cooldown).
        if resp.status_code in {429, 432}:
            wait_retry = backoff * (attempt + 1)  # linear backoff per attempt
            print(f" [tavily] {endpoint} rate-limited ({resp.status_code}); "
                  f"waiting {wait_retry:.1f}s before retry")
            with _TAVILY_LOCK:
                _TAVILY_BLOCKED_UNTIL = max(
                    _TAVILY_BLOCKED_UNTIL, time.monotonic() + wait_retry
                )
            if attempt < retries:
                continue
            return None
        try:
            resp.raise_for_status()
        except Exception as exc:
            print(f" [tavily] {endpoint} HTTP error: {exc}")
            return None
        return resp
    return None
def _call_tavily_search(
    query: str,
    api_key: str,
    limit: int = 5,
    *,
    retries: int = 4,
    backoff_seconds: float = 15.0,
    min_interval_seconds: float = 4.0,
) -> list[dict]:
    """Search using Tavily API. Returns list of dicts with 'url' and 'markdown' keys."""
    response = _post_tavily(
        "search",
        {"api_key": api_key, "query": query, "max_results": limit,
         "include_raw_content": True, "search_depth": "advanced"},
        retries=retries, backoff=backoff_seconds, min_interval=min_interval_seconds,
    )
    if response is None:
        return []
    # Prefer the full page text; fall back to the snippet when absent.
    return [
        {
            "url": item.get("url", ""),
            "markdown": item.get("raw_content") or item.get("content", ""),
        }
        for item in response.json().get("results", [])
    ]
# Backward-compat alias: _search_fund_metrics (and any external callers) still
# use the Firecrawl-era name, but search is actually served by Tavily now.
_call_firecrawl_search = _call_tavily_search
def _scrape_url(
    url: str,
    api_key: str,
    *,
    retries: int = 4,
    backoff_seconds: float = 15.0,
    min_interval_seconds: float = 4.0,
) -> str:
    """Fetch page content using Tavily extract API."""
    response = _post_tavily(
        "extract",
        {"api_key": api_key, "urls": [url]},
        retries=retries, backoff=backoff_seconds, min_interval=min_interval_seconds,
    )
    if response is None:
        return ""
    extracted = response.json().get("results", [])
    if not extracted:
        return ""
    return extracted[0].get("raw_content", "")
def _derive_morningstar_risk_url(any_ms_url: str) -> str | None:
if "morningstar.in/mutualfunds/" not in any_ms_url:
return None
for suffix in ("fund-factsheet.aspx", "overview.aspx", "portfolio.aspx",
"performance.aspx", "detailed-portfolio.aspx"):
if suffix in any_ms_url:
return any_ms_url.replace(suffix, "risk-ratings.aspx")
if "risk-ratings.aspx" in any_ms_url:
return any_ms_url
return None
def _derive_morningstar_portfolio_url(any_ms_url: str) -> str | None:
"""Derive the Morningstar portfolio page (for P/E and P/B)."""
if "morningstar.in/mutualfunds/" not in any_ms_url:
return None
return re.sub(
r"(fund-factsheet|overview|risk-ratings|performance|detailed-portfolio)\.aspx",
"portfolio.aspx",
any_ms_url,
)
def _search_fund_metrics(
    fund_name: str,
    missing_metrics: list[str],
    config: EnrichmentConfig,
    firecrawl_key: str,
) -> tuple[dict[str, float | None], list[str]]:
    """Web-search *missing_metrics* for one fund and parse the scraped tables.

    Flow: Tavily search → rank trusted domains first, keep top 3 → concatenate
    their markdown → opportunistically scrape Morningstar risk-ratings and
    (when P/E or P/B is needed) portfolio pages → if still no content, retry
    once restricted to valueresearchonline.com → parse metric tables.
    Returns ({metric: value-or-None}, list of source URLs consulted).
    Note: *firecrawl_key* is actually a Tavily API key (legacy parameter name).
    """
    from urllib.parse import urlparse
    readable = _normalize_fund_name(fund_name)
    query = f"{readable} risk rating alpha beta sharpe morningstar"
    print(f" [search] query: {query[:80]}")
    results = _call_firecrawl_search(
        query, firecrawl_key, config.search_limit,
        retries=config.tavily_retries,
        backoff_seconds=config.tavily_backoff_seconds,
        min_interval_seconds=config.tavily_min_interval_seconds,
    )
    if not results:
        print(" [search] no results")
        return {m: None for m in missing_metrics}, []
    # Partition results so trusted domains are consumed before the rest.
    trusted, other = [], []
    for r in results:
        url = r.get("url", "")
        domain = urlparse(url).netloc.lower().replace("www.", "")
        (trusted if any(td in domain for td in config.trusted_domains) else other).append(r)
    use = (trusted + other)[:3]
    source_urls = [r.get("url", "") for r in use]
    print(f" [search] using {len(use)} sources: {[urlparse(u).netloc for u in source_urls]}")
    combined = ""
    for r in use:
        md = r.get("markdown", "")
        if md:
            combined += f"\n\n--- {r.get('url', '')} ---\n{md}"
    # Morningstar: scrape risk-ratings page if not already in results
    ms_risk_url = None
    for r in use:
        ms_risk_url = _derive_morningstar_risk_url(r.get("url", ""))
        if ms_risk_url:
            break
    if ms_risk_url and "risk-ratings" not in " ".join(source_urls):
        print(f" [scrape] Morningstar risk page: {ms_risk_url}")
        risk_md = _scrape_url(ms_risk_url, firecrawl_key,
            retries=config.tavily_retries,
            backoff_seconds=config.tavily_backoff_seconds,
            min_interval_seconds=config.tavily_min_interval_seconds,
        )
        if risk_md:
            combined += f"\n\n--- {ms_risk_url} ---\n{risk_md}"
            source_urls.append(ms_risk_url)
    # Morningstar: scrape portfolio page for P/E and P/B
    pe_pb_needed = {"P/E Ratio", "P/B Ratio"} & set(missing_metrics)
    if pe_pb_needed and ms_risk_url:
        ms_port_url = _derive_morningstar_portfolio_url(ms_risk_url)
        if ms_port_url and ms_port_url not in source_urls:
            print(f" [scrape] Morningstar portfolio page: {ms_port_url}")
            port_md = _scrape_url(ms_port_url, firecrawl_key,
                retries=config.tavily_retries,
                backoff_seconds=config.tavily_backoff_seconds,
                min_interval_seconds=config.tavily_min_interval_seconds,
            )
            if port_md:
                combined += f"\n\n--- {ms_port_url} ---\n{port_md}"
                source_urls.append(ms_port_url)
    # If we still have no markdown content, do a second pass focused on
    # ValueResearch (only its domain is accepted from this retry).
    if not combined.strip():
        print(" [search] no markdown from initial sources; retrying via valueresearchonline…")
        vr_query = f"{readable} {' '.join(missing_metrics)} valueresearchonline"
        vr_results = _call_firecrawl_search(
            vr_query, firecrawl_key, config.search_limit,
            retries=config.tavily_retries,
            backoff_seconds=config.tavily_backoff_seconds,
            min_interval_seconds=config.tavily_min_interval_seconds,
        )
        if vr_results:
            vr_combined = ""
            for r in vr_results:
                url = r.get("url", "")
                domain = urlparse(url).netloc.lower().replace("www.", "")
                if "valueresearchonline.com" not in domain:
                    continue
                md = r.get("markdown", "")
                if md:
                    vr_combined += f"\n\n--- {url} ---\n{md}"
                    source_urls.append(url)
            combined = vr_combined
    if not combined.strip():
        print(" [search] no markdown content after ValueResearch retry")
        return {m: None for m in missing_metrics}, source_urls
    found = _parse_metrics_from_markdown(combined, missing_metrics)
    for m, v in found.items():
        print(f" [parsed] {m} = {v if v is not None else 'NOT FOUND'}")
    return found, source_urls
# ── Scratchpad ───────────────────────────────────────────────────────────────
def _write_scratchpad(
    path: Path,
    triaged: list[TriagedCell],
    resolved_codes: dict[str, str],
    nav_results: dict[str, dict[str, float | None]],
    web_results: dict[str, dict[str, float | None]],
    web_sources: dict[str, list[str]],
    medians_used: list[tuple[str, str, float]],
    nav_filled: list[tuple[str, str, float]],
    web_filled: list[tuple[str, str, float]],
) -> None:
    """Write a human-readable audit trail of every enrichment decision to *path*.

    Sections: resolved scheme codes, triage summary + per-cell decisions,
    NAV engine results, web search results (with sources), and the three
    fill breakdowns (NAV / web / category-median).
    """
    lines = [
        "=" * 70,
        "ENRICHMENT SCRATCHPAD",
        f"Generated: {datetime.now().isoformat()}",
        "=" * 70, "",
    ]
    if resolved_codes:
        lines += ["-" * 70, f"SCHEME CODES RESOLVED ({len(resolved_codes)})", "-" * 70]
        for fund, code in resolved_codes.items():
            lines.append(f" {fund[:60]:60s} β†’ {code}")
        lines.append("")
    young = [c for c in triaged if c.label == TRIAGE_YOUNG]
    na = [c for c in triaged if c.label == TRIAGE_NOT_APPLICABLE]
    searchable = [c for c in triaged if c.label == TRIAGE_SEARCHABLE]
    lines += [
        f"TOTAL MISSING CELLS: {len(triaged)}",
        f" YOUNG_FUND (auto-impute): {len(young)}",
        f" NOT_APPLICABLE (auto-impute): {len(na)}",
        f" SEARCHABLE (nav/web): {len(searchable)}",
        "",
        "-" * 70, "TRIAGE DECISIONS", "-" * 70,
    ]
    for c in triaged:
        lines.append(f" [{c.label:16s}] {c.fund_name} :: {c.column}")
        lines.append(f" Reason: {c.reason}")
        lines.append("")  # blank line between decisions
    if nav_results:
        lines += ["-" * 70, "NAV ENGINE RESULTS (TRAILING 3Y)", "-" * 70]
        for fund_key, metrics in nav_results.items():
            lines.append(f" Fund: {fund_key}")
            for metric, val in metrics.items():
                lines.append(f" {metric}: {'FOUND = ' + str(val) if val is not None else 'NOT_FOUND'}")
            lines.append("")  # blank line between funds
    if web_results:
        lines += ["-" * 70, "WEB SEARCH RESULTS", "-" * 70]
        for fund_key, metrics in web_results.items():
            lines.append(f" Fund: {fund_key}")
            for s in web_sources.get(fund_key, []):
                lines.append(f" Source: {s}")
            for metric, val in metrics.items():
                lines.append(f" {metric}: {'FOUND = ' + str(val) if val is not None else 'NOT_FOUND'}")
            lines.append("")  # blank line between funds
    for section_label, items in [
        (f"NAV-FILLED VALUES ({len(nav_filled)})", nav_filled),
        (f"WEB-FILLED VALUES ({len(web_filled)})", web_filled),
        (f"CATEGORY-MEDIAN IMPUTED ({len(medians_used)})", medians_used),
    ]:
        if items:
            lines += ["-" * 70, section_label, "-" * 70]
            for fund, col, val in items:
                lines.append(f" {fund} :: {col} = {val}")
            lines.append("")
    lines += ["=" * 70, "END OF SCRATCHPAD", "=" * 70]
    path.write_text("\n".join(lines), encoding="utf-8")
# ── Main entry point ─────────────────────────────────────────────────────────
def enrich_csv(
    csv_path: str,
    config: EnrichmentConfig | None = None,
) -> EnrichmentResult:
    """Run the full enrichment pipeline over one CSV.

    Parse CSV → resolve scheme codes → triage → NAV engine → Screener/NSE
    P/E-P/B → Tavily web fallback → category-median impute → write the
    enriched CSV plus a scratchpad audit file.
    (Previously named enrich_csv_with_firecrawl_and_kimi; renamed for clarity.)
    """
    if config is None:
        config = EnrichmentConfig()
    _load_env()
    src = Path(csv_path)
    result = EnrichmentResult(input_csv_path=csv_path, enriched_csv_path=csv_path)
    if not config.enabled or not src.exists():
        return result
    with open(src, encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)
        fieldnames = list(reader.fieldnames or [])
        rows = list(reader)
    if not rows:
        return result
    # ── Phase 0: Scheme Code Resolution ─────────────────────────────────
    resolved_codes: dict[str, str] = {}
    if config.resolve_scheme_codes:
        print("[enrichment] Phase 0: Resolving missing scheme codes…")
        rows, resolved_codes = resolve_missing_scheme_codes(rows, verbose=True)
        result.resolved_codes = len(resolved_codes)
    # ── Phase 1: Triage ──────────────────────────────────────────────────
    print("[enrichment] Phase 1: Triage β€” classifying missing cells…")
    triaged = _triage_missing_cells(rows, config)
    result.examined_cells = len(triaged)
    if not triaged:
        print("[enrichment] No missing cells found.")
        _write_output(src, rows, fieldnames, result)
        return result
    searchable = [c for c in triaged if c.label == TRIAGE_SEARCHABLE]
    imputable = [c for c in triaged if c.label != TRIAGE_SEARCHABLE]
    print(f"[enrichment] {len(triaged)} missing cells: "
          f"{len(searchable)} SEARCHABLE, {len(imputable)} auto-impute")
    if config.max_cells is not None:
        searchable = searchable[:config.max_cells]
    # ── Phase 2: Category medians ────────────────────────────────────────
    print("[enrichment] Phase 2: Computing category medians…")
    cat_medians = _build_category_medians(rows, config.target_columns)
    # ── Phase 3: NAV engine ──────────────────────────────────────────────
    nav_results: dict[str, dict[str, float | None]] = {}
    nav_filled: list[tuple[str, str, float]] = []
    if searchable and config.enable_nav_engine:
        print("[enrichment] Phase 3: NAV engine β€” computing trailing 3Y metrics…")
        nav_cache = NavEngineCache()
        # All funds with missing cells go through NAV engine β€” including debt/liquid.
        # Debt funds can have valid Sharpe, Mean, Volatility etc. from their NAV history.
        searchable_for_nav = searchable
        row_groups: dict[int, list[TriagedCell]] = {}
        for cell in searchable_for_nav:
            row_groups.setdefault(cell.row_idx, []).append(cell)
        total_rows = len(row_groups)
        processed_count = 0
        nav_lock = __import__("threading").Lock()
        NAV_WORKERS = 20  # mfapi is stateless REST β€” scales well beyond 12
        # ── Pre-warm: bulk load NAV + benchmarks before workers touch network ──
        # Step 1: Pull all valid scheme codes and unique benchmarks from rows
        from src.nav_metrics_engine import _bulk_preload_cache, _prewarm_benchmarks
        _scheme_codes = [
            (rows[ri].get("Scheme Code") or "").strip()
            for ri in row_groups
            if (rows[ri].get("Scheme Code") or "").strip().isdigit()
        ]
        _bench_tickers_raw = [
            rows[ri].get("Benchmark Type", "") for ri in row_groups
        ]
        # Step 2: Resolve benchmark type β†’ ticker (same logic as nav engine)
        from src.nav_metrics_engine import resolve_benchmark_ticker
        _bench_tickers = list(dict.fromkeys(
            resolve_benchmark_ticker(b) for b in _bench_tickers_raw if b
        ))
        # Step 3: Bulk load from Neon in 1 SQL query (nav + bench keys)
        _bulk_preload_cache(_scheme_codes, _bench_tickers)
        # Step 4: Download any cold benchmark tickers in parallel NOW,
        # before workers start β€” eliminates yfinance contention
        _prewarm_benchmarks(_bench_tickers)

        def _process_one_fund(args):
            # Worker: compute needed metrics for one CSV row; on a stale/empty
            # NAV history, re-resolve the scheme code once and retry.
            row_idx, cells = args
            row = rows[row_idx]
            fund_name = row.get("Fund", "")
            scheme_code = (row.get("Scheme Code") or "").strip()
            benchmark_type = row.get("Benchmark Type", "")
            needed_metrics = [c.column for c in cells]
            if not scheme_code:
                return fund_name, {}, cells
            metrics, skip = compute_nav_metrics_for_scheme(
                scheme_code=scheme_code,
                benchmark_type=benchmark_type,
                needed_metrics=needed_metrics,
                cache=nav_cache,
            )
            joined_reasons = " | ".join(skip.values()).lower()
            should_refresh_code = (
                "returned no nav history" in joined_reasons
                or "nav history is stale" in joined_reasons
            )
            if should_refresh_code:
                refreshed_code, _ = resolve_scheme_code_for_fund_name(fund_name)
                if refreshed_code and refreshed_code != scheme_code:
                    row["Scheme Code"] = refreshed_code
                    metrics, skip = compute_nav_metrics_for_scheme(
                        scheme_code=refreshed_code,
                        benchmark_type=benchmark_type,
                        needed_metrics=needed_metrics,
                        cache=nav_cache,
                    )
            return fund_name, metrics, cells

        from concurrent.futures import ThreadPoolExecutor, as_completed
        work_items = list(row_groups.items())
        with ThreadPoolExecutor(max_workers=NAV_WORKERS) as executor:
            futures = {executor.submit(_process_one_fund, item): item for item in work_items}
            for fut in as_completed(futures):
                fund_name, metrics, cells = fut.result()
                with nav_lock:  # serialize all shared-state mutation
                    processed_count += 1
                    nav_results[fund_name] = metrics
                    for cell in cells:
                        val = metrics.get(cell.column)
                        if val is not None:
                            rows[cell.row_idx][cell.column] = str(round(float(val), 4))
                            result.updated_cells += 1
                            nav_filled.append((fund_name, cell.column, float(val)))
                    # Only mark as attempted if MDD was actually filled β€”
                    # drawdown_zero_fix should still retry funds where MDD came back None
                    if metrics.get("Maximum Drawdown") is not None:
                        _NAV_ATTEMPTED_FUNDS.add(fund_name)
                    if processed_count % 20 == 0 or processed_count == total_rows:
                        print(f" [nav] {processed_count}/{total_rows} funds processed…")
        # Keep only still-missing searchable cells for web phase
        # NOTE(review): _is_missing does not treat "0" as missing, so cells
        # triaged via the zero-as-missing rule but still holding "0" are
        # silently dropped here and never reach web search β€” confirm intent.
        searchable = [c for c in searchable if _is_missing(rows[c.row_idx].get(c.column, ""))]
        print(f"[enrichment] NAV phase resolved {len(nav_filled)} cells; "
              f"{len(searchable)} remain for web search")
        result.nav_cells = len(nav_filled)
    # ── Phase 4: Web search ──────────────────────────────────────────────
    web_results: dict[str, dict[str, float | None]] = {}
    web_sources: dict[str, list[str]] = {}
    web_filled: list[tuple[str, str, float]] = []
    # Legacy variable name β€” this is actually the Tavily API key.
    firecrawl_key = os.environ.get("TAVILY_API_KEY", "")
    # ── Phase 4a: Screener.in PE/PB (no API key needed) ──────────────────────
    # Weighted average from holdings; results Neon-cached 30 days.
    pe_pb_searchable = [c for c in searchable
                        if c.column in ("P/E Ratio", "P/B Ratio")]
    pe_pb_funds: dict[str, list[TriagedCell]] = {}
    for cell in pe_pb_searchable:
        pe_pb_funds.setdefault(cell.fund_name, []).append(cell)
    if pe_pb_funds:
        # The engine may live under src/ or at top level depending on deployment.
        try:
            from src.pe_pb_engine import fetch_pe_pb as _fetch_pe_pb
        except ImportError:
            try:
                from pe_pb_engine import fetch_pe_pb as _fetch_pe_pb
            except ImportError:
                _fetch_pe_pb = None  # type: ignore
        if _fetch_pe_pb is not None:
            print(f"[enrichment] Phase 4a: NSE index PE/PB β€” {len(pe_pb_funds)} funds")
            # Warm the index cache once before spawning threads
            _fetch_pe_pb.__module__ and __import__(
                _fetch_pe_pb.__module__.split(".")[0]
            )  # ensure module loaded
            try:
                from src.pe_pb_engine import warm_index_cache as _warm
            except ImportError:
                from pe_pb_engine import warm_index_cache as _warm
            _warm()
            pepb_lock = threading.Lock()
            pepb_done = [0]  # mutable counter shared with worker callbacks

            def _screener_one(args):
                # Worker: fetch index-level P/E and P/B for one fund.
                fund_name, cells = args
                # Get benchmark_type from the row (not scheme_code)
                bm = ""
                for cell in cells:
                    row = rows[cell.row_idx]
                    if row:
                        bm = str(row.get("Benchmark Type") or "").strip()
                        if bm:
                            break
                pe, pb = _fetch_pe_pb(benchmark_type=bm, fund_name=fund_name) if bm else (None, None)
                found: dict[str, float] = {}
                if pe is not None:
                    found["P/E Ratio"] = pe
                if pb is not None:
                    found["P/B Ratio"] = pb
                return fund_name, cells, found

            from concurrent.futures import ThreadPoolExecutor as _PePbTPE, as_completed as _pepb_done_iter
            with _PePbTPE(max_workers=5) as pepb_ex:
                pepb_futs = {
                    pepb_ex.submit(_screener_one, item): item
                    for item in pe_pb_funds.items()
                }
                for fut in _pepb_done_iter(pepb_futs):
                    fund_name, cells, found = fut.result()
                    with pepb_lock:  # serialize shared-state mutation
                        pepb_done[0] += 1
                        web_results.setdefault(fund_name, {}).update(found)
                        web_sources.setdefault(fund_name, []).append("nse-index-pe")
                        for cell in cells:
                            val = found.get(cell.column)
                            if val is not None:
                                rows[cell.row_idx][cell.column] = str(val)
                                result.updated_cells += 1
                                web_filled.append((fund_name, cell.column, val))
                                print(f" [{pepb_done[0]}/{len(pe_pb_funds)}] "
                                      f"{fund_name[:45]} -> {cell.column} = {val} (screener)")
    # Re-filter: drop cells already filled by Screener
    searchable = [c for c in searchable if _is_missing(rows[c.row_idx].get(c.column, "")
                                                       if rows[c.row_idx] else "")]
    # ── Phase 4b: Tavily web search for remaining cells ───────────────────────
    if searchable and firecrawl_key:
        # Stamp run-id so stale background threads from prior runs can bail
        _CURRENT_RUN_ID[0] += 1
        my_run_id = _CURRENT_RUN_ID[0]
        fund_groups: dict[str, list[TriagedCell]] = {}
        for cell in searchable:
            fund_groups.setdefault(cell.fund_name, []).append(cell)
        print(f"[enrichment] Phase 4b: Tavily web search β€” {len(searchable)} cells "
              f"across {len(fund_groups)} funds")
        web_search_groups: dict[str, list[TriagedCell]] = {}
        for fund_name, cells in fund_groups.items():
            # Skip category header rows (e.g. "Equity: Contra") β€” not real funds
            category = cells[0].category if cells else ""
            if _looks_like_category_label(fund_name, category):
                for cell in cells:
                    med = cat_medians.get(cell.category, {}).get(cell.column)
                    if med is not None and config.impute_unresolved:
                        rows[cell.row_idx][cell.column] = str(med)
                        result.updated_cells += 1
                        result.imputed_cells += 1
                    else:
                        result.skipped_cells += 1
                continue
            if config.web_search_pe_pb_only:
                # Impute everything except P/E and P/B, which still go to web.
                cells_to_impute = [c for c in cells if c.column not in ("P/E Ratio", "P/B Ratio")]
                for cell in cells_to_impute:
                    med = cat_medians.get(cell.category, {}).get(cell.column)
                    if med is not None and config.impute_unresolved:
                        rows[cell.row_idx][cell.column] = str(med)
                        result.updated_cells += 1
                        result.imputed_cells += 1
                cells_for_web = [c for c in cells if c.column in ("P/E Ratio", "P/B Ratio")]
            else:
                cells_for_web = cells
            if cells_for_web:
                web_search_groups[fund_name] = cells_for_web
        WEB_WORKERS = max(1, config.web_workers)
        web_lock = threading.Lock()
        web_done = [0]  # mutable counter shared with worker callbacks
        total_web = len(web_search_groups)

        def _search_one_fund(args):
            # Worker: one web search per fund, covering all its missing metrics.
            fund_name, cells = args
            # Abort if a newer run has started (Streamlit rerun safety)
            if _CURRENT_RUN_ID[0] != my_run_id:
                return fund_name, cells, {}, []
            missing_metrics = [c.column for c in cells]
            found, sources = _search_fund_metrics(
                fund_name, missing_metrics, config, firecrawl_key)
            return fund_name, cells, found, sources

        from concurrent.futures import ThreadPoolExecutor as _WebTPE, as_completed as _web_as_completed
        with _WebTPE(max_workers=WEB_WORKERS) as web_executor:
            futures = {
                web_executor.submit(_search_one_fund, item): item
                for item in web_search_groups.items()
            }
            for fut in _web_as_completed(futures):
                fund_name, cells, found, sources = fut.result()
                with web_lock:  # serialize shared-state mutation
                    web_done[0] += 1
                    web_results.setdefault(fund_name, {}).update(found)
                    web_sources.setdefault(fund_name, []).extend(sources)
                    print(f"\n[{web_done[0]}/{total_web}] {fund_name}")
                    for cell in cells:
                        val = found.get(cell.column)
                        if val is not None:
                            rows[cell.row_idx][cell.column] = str(val)
                            result.updated_cells += 1
                            web_filled.append((fund_name, cell.column, val))
                            print(f" -> {cell.column} = {val} (web)")
                        else:
                            med = cat_medians.get(cell.category, {}).get(cell.column)
                            if med is not None and config.impute_unresolved:
                                rows[cell.row_idx][cell.column] = str(med)
                                result.imputed_cells += 1
                                print(f" ~> {cell.column} = {med} (median)")
                            else:
                                result.skipped_cells += 1
                                print(f" x> {cell.column} β€” not found, no median")
    elif searchable and not firecrawl_key:
        print("[enrichment] No TAVILY_API_KEY β€” PE/PB via Screener only; remaining β†’ medians")
        for cell in searchable:
            med = cat_medians.get(cell.category, {}).get(cell.column)
            if med is not None and config.impute_unresolved:
                rows[cell.row_idx][cell.column] = str(med)
                result.imputed_cells += 1
            else:
                result.skipped_cells += 1
    # ── Phase 5: Impute non-searchable (YOUNG / NOT_APPLICABLE) cells ────
    # FIX: was incorrectly labelled "Phase 4" in log
    medians_used: list[tuple[str, str, float]] = []
    if imputable and config.impute_unresolved:
        print(f"\n[enrichment] Phase 5: Imputing {len(imputable)} non-searchable cells…")
        for cell in imputable:
            med = cat_medians.get(cell.category, {}).get(cell.column)
            if med is not None:
                rows[cell.row_idx][cell.column] = str(med)
                result.imputed_cells += 1
                medians_used.append((cell.fund_name, cell.column, med))
            else:
                result.skipped_cells += 1
    elif imputable:
        result.skipped_cells += len(imputable)
    # Record how many cells came from web search
    result.web_cells = len(web_filled)
    # ── Phase 6: Write enriched CSV ──────────────────────────────────────
    _write_output(src, rows, fieldnames, result)
    # ── Phase 7: Scratchpad ──────────────────────────────────────────────
    scratch_dir = Path("scratchpad")
    scratch_dir.mkdir(exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    scratch_path = scratch_dir / f"enrichment_{stamp}.txt"
    _write_scratchpad(
        scratch_path, triaged, resolved_codes,
        nav_results, web_results, web_sources,
        medians_used, nav_filled, web_filled,
    )
    result.scratchpad_path = str(scratch_path)
    print(f"\n[enrichment] DONE β€” nav_filled={len(nav_filled)} web_filled={len(web_filled)} "
          f"imputed={result.imputed_cells} skipped={result.skipped_cells}")
    print(f"[enrichment] Enriched CSV : {result.enriched_csv_path}")
    print(f"[enrichment] Scratchpad : {scratch_path}")
    return result
def _write_output(
src: Path,
rows: list[dict[str, str]],
fieldnames: list[str],
result: EnrichmentResult,
) -> None:
out_dir = src.parent / "enriched"
out_dir.mkdir(exist_ok=True)
out_path = out_dir / f"enriched_{src.name}"
with open(out_path, "w", encoding="utf-8-sig", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
result.enriched_csv_path = str(out_path)
# Backward-compat alias (old name used in streamlit_app and run_enrichment_pipeline)
enrich_csv_with_firecrawl_and_kimi = enrich_csv
# ── Single metric lookup (for Streamlit UI) ──────────────────────────────────
def lookup_fund_metric_value(
    fund_name: str,
    column_name: str,
    scheme_code: str = "",
    config: EnrichmentConfig | None = None,
) -> dict[str, Any]:
    """Single-metric on-demand web lookup used by the Streamlit UI.

    Runs one web search for *column_name* of *fund_name* and reports the
    outcome as a status dict ("found" / "not_found" / "error").
    *scheme_code* is accepted for interface compatibility; this path only
    uses web search.
    """
    _load_env()
    if config is None:
        config = EnrichmentConfig()
    api_key = os.environ.get("TAVILY_API_KEY", "")
    if not api_key:
        return {"status": "error", "message": "TAVILY_API_KEY not set"}
    found, sources = _search_fund_metrics(fund_name, [column_name], config, api_key)
    value = found.get(column_name)
    status = "found" if value is not None else "not_found"
    return {"status": status, "fund": fund_name, "metric": column_name,
            "value": value, "sources": sources}