from __future__ import annotations
import io
import sys
import tempfile
import time
import traceback
from contextlib import redirect_stderr, redirect_stdout
from datetime import datetime
from pathlib import Path
from typing import Any
import streamlit as st
from src.csv_enrichment import (
TARGET_COLUMNS,
EnrichmentConfig,
enrich_csv,
)
from src.data_engine import run_data_engine
# ── Session logging ───────────────────────────────────────────────────────────
def _init_session_log() -> Path:
    """Return the path of this session's log file, creating it on first use.

    The path is memoized in ``st.session_state`` so every Streamlit rerun
    of the same browser session appends to one file.
    """
    key = "session_log_path"
    if key not in st.session_state:
        # First call for this session: create the log directory and a
        # microsecond-stamped file, then remember where it lives.
        directory = Path("logs") / "streamlit_sessions"
        directory.mkdir(parents=True, exist_ok=True)
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        path = directory / f"session_{stamp}.log"
        path.write_text(
            f"[{datetime.now().isoformat()}] session_started\n",
            encoding="utf-8",
        )
        st.session_state[key] = str(path)
    return Path(st.session_state[key])
def _log_session_event(message: str) -> None:
    """Append one timestamped line to the session log (best effort)."""
    try:
        path = _init_session_log()
        line = f"[{datetime.now().isoformat()}] {message}\n"
        with path.open("a", encoding="utf-8") as handle:
            handle.write(line)
    except Exception:
        # Logging must never take down the UI; swallow filesystem errors.
        pass
def _log_session_block(title: str, content: str) -> None:
    """Append a delimited multi-line block to the session log (best effort)."""
    try:
        path = _init_session_log()
        # Normalize the payload: trailing whitespace trimmed, blank payloads
        # recorded explicitly so the log shows the phase produced nothing.
        body = (content.rstrip() + "\n") if content.strip() else "(no output)\n"
        with path.open("a", encoding="utf-8") as handle:
            handle.write(f"[{datetime.now().isoformat()}] --- {title} (start) ---\n")
            handle.write(body)
            handle.write(f"[{datetime.now().isoformat()}] --- {title} (end) ---\n")
    except Exception:
        # Best-effort logging: never propagate I/O failures to the app.
        pass
# ── Captured output runner ────────────────────────────────────────────────────
def _run_with_captured_output(func: Any, *args: Any, **kwargs: Any) -> tuple[Any, str]:
"""Run function, mirror prints to terminal, capture for UI display."""
class _TeeCapture(io.TextIOBase):
def __init__(self, mirror: Any, on_write: Any = None) -> None:
self._mirror = mirror
self._buffer = io.StringIO()
self._on_write = on_write
def write(self, s: str) -> int:
text = str(s)
self._buffer.write(text)
try:
self._mirror.write(text)
self._mirror.flush()
except Exception:
pass
if self._on_write is not None:
try:
self._on_write(text)
except Exception:
pass
return len(text)
def flush(self) -> None:
try:
self._mirror.flush()
except Exception:
pass
def getvalue(self) -> str:
return self._buffer.getvalue()
live_callback = kwargs.pop("live_callback", None)
out_tee = _TeeCapture(sys.__stdout__, live_callback)
err_tee = _TeeCapture(sys.__stderr__, live_callback)
with redirect_stdout(out_tee), redirect_stderr(err_tee):
result = func(*args, **kwargs)
return result, out_tee.getvalue() + err_tee.getvalue()
# ── CSS ───────────────────────────────────────────────────────────────────────
def _inject_custom_css() -> None:
    """Inject the app's custom CSS/HTML via a raw-HTML markdown call.

    NOTE(review): the payload below is effectively empty — the original
    <style> block appears to have been stripped from this copy of the file.
    Confirm against version control before shipping.
    """
    st.markdown(
        """
    """,
        unsafe_allow_html=True,
    )
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """Top-level Streamlit page: upload a fund CSV, run enrichment then the
    scoring engine, and offer the generated Excel workbook for download.

    NOTE(review): several single-quoted ``st.markdown`` payloads in this
    function are unterminated — their HTML content looks stripped, so the
    file does not currently parse. Restore those literals from version
    control; they are flagged inline below. Indentation in this copy was
    also lost and has been reconstructed — verify nesting against VCS.
    """
    st.set_page_config(
        page_title="MF Scoring Engine · Advisor Demo",
        page_icon="📈",
        layout="centered",
    )
    _inject_custom_css()
    # Ensure the per-session log file exists before the first event.
    _init_session_log()
    _log_session_event("app_rendered")
    # NOTE(review): mangled literal below — original HTML payload missing.
    st.markdown('
', unsafe_allow_html=True)
    st.markdown(
        """
Advisor tool
Score your mutual fund list in Excel.
Upload your mutual fund CSV. The app runs enrichment (NAV engine → web fallback → median),
scores every fund, and gives you a ready-to-share Excel workbook.
""",
        unsafe_allow_html=True,
    )
    # NOTE(review): mangled literal below — original HTML payload missing.
    st.markdown('
', unsafe_allow_html=True)
    tab_run, tab_about = st.tabs(["Run analysis", "How scoring works"])
    with tab_run:
        st.markdown("### Upload CSV & generate workbook")
        st.markdown(
            """
Upload your standard fund universe CSV
(Fund, Benchmark Type, CAGR columns, etc.).
P/E and P/B are computed from AMFI monthly holdings (active funds) or NSE index API (index funds) —
all risk metrics (Alpha, Sharpe, Sortino, etc.) are computed directly from NAV history.
""",
            unsafe_allow_html=True,
        )
        uploaded_file = st.file_uploader(
            "Step 1 · Upload fund universe CSV",
            type=["csv"],
            help="Same CSV you feed into the offline data engine.",
        )
        if uploaded_file is not None:
            # Show name/size feedback and record the upload in the session log.
            st.caption(
                f"Selected: **{uploaded_file.name}** · "
                f"{(len(uploaded_file.getbuffer()) / 1024):.1f} KB"
            )
            _log_session_event(
                f"uploaded_file name={uploaded_file.name} "
                f"size_kb={(len(uploaded_file.getbuffer())/1024):.1f}"
            )
        st.info(
            "Pipeline: **Scheme code resolution → NAV engine (parallel) "
            "→ PE/PB via AMFI holdings + NSE API → category median fallback → scoring engine**"
        )
        st.markdown(
            """
- 1 — Upload your latest CSV export.
- 2 — Click Run analysis and watch live logs.
- 3 — Download the scored Excel when complete.
""",
            unsafe_allow_html=True,
        )
        run_clicked = st.button(
            "Step 2 · Run analysis",
            type="primary",
            use_container_width=True,
            disabled=uploaded_file is None,
        )
        # ── State carried across rerun ─────────────────────────────────────
        # Repopulated by a successful run; drives the download UI below.
        generated_bytes: io.BytesIO | None = None
        generated_filename: str | None = None
        funds_count: int | None = None
        categories_count: int | None = None
        enrichment_summary: str | None = None
        timing_html: str | None = None
        if run_clicked:
            _log_session_event("run_analysis_clicked")
            if uploaded_file is None:
                # Defensive: the button is disabled without an upload, but
                # guard anyway in case of a stale rerun.
                st.warning("Please upload a CSV file first.")
                _log_session_event("run_aborted_no_upload")
            else:
                # Persist the upload to a temp CSV the pipeline can read,
                # and derive a timestamped output workbook path.
                base_stem = Path(uploaded_file.name).stem
                stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                input_stem = f"{base_stem}_{stamp}"
                with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
                    tmp.write(uploaded_file.getbuffer())
                    input_path = Path(tmp.name)
                out_dir = Path("output")
                out_dir.mkdir(exist_ok=True)
                generated_path = out_dir / f"fund_analysis_{input_stem}.xlsx"
                t_total_start = time.perf_counter()
                try:
                    with st.status("Processing…", expanded=True) as status:
                        live_lines: list[str] = []
                        live_box = st.empty()
                        # Noise patterns to suppress from the live log box
                        _SUPPRESS = (
                            "missing ScriptRunContext",
                            "FutureWarning",
                            "Passing literal json",
                            "To read from a literal string",
                            "return pd.read_json",
                        )
                        def _live_sink(chunk: str) -> None:
                            # Feed captured stdout/stderr into the rolling
                            # on-page log, keeping only the last 50 lines.
                            clean = chunk.replace("\r", "")
                            new = [
                                ln for ln in clean.split("\n")
                                if ln.strip()
                                and not any(s in ln for s in _SUPPRESS)
                            ]
                            if not new:
                                return
                            live_lines.extend(new)
                            if len(live_lines) > 50:
                                # Drop everything except the newest 50 lines.
                                del live_lines[:-50]
                            live_box.code("\n".join(live_lines), language="text")
                        # ── Phase 1: Enrichment ────────────────────────────
                        st.write("**1/2 Enrichment** — scheme codes → NAV engine → PE/PB → medians…")
                        t_enrich_start = time.perf_counter()
                        enrichment, enrich_output = _run_with_captured_output(
                            enrich_csv,
                            str(input_path),
                            config=EnrichmentConfig(
                                enabled=True,
                                max_cells=None,
                                resolve_scheme_codes=True,
                                enable_nav_engine=True,
                                impute_unresolved=True,
                            ),
                            live_callback=_live_sink,
                        )
                        t_enrich_end = time.perf_counter()
                        enrich_secs = t_enrich_end - t_enrich_start
                        _log_session_block("enrichment_output", enrich_output)
                        _log_session_event(
                            f"enrichment_done "
                            f"checked={enrichment.examined_cells} "
                            f"nav={enrichment.nav_cells} "
                            f"imputed={enrichment.imputed_cells} "
                            f"skipped={enrichment.skipped_cells} "
                            f"codes={enrichment.resolved_codes} "
                            f"secs={enrich_secs:.1f}"
                        )
                        st.write(
                            f" ✅ Enrichment done in **{enrich_secs:.0f}s** — "
                            f"checked {enrichment.examined_cells} cells, "
                            f"NAV filled {enrichment.nav_cells}, "
                            f"imputed {enrichment.imputed_cells}"
                        )
                        # Phase 2 consumes the enriched CSV, not the upload.
                        pipeline_input_path = Path(enrichment.enriched_csv_path)
                        # ── Phase 2: Scoring + Excel ───────────────────────
                        st.write("**2/2 Scoring engine** — computing scores, ranking, generating Excel…")
                        t_engine_start = time.perf_counter()
                        funds, engine_output = _run_with_captured_output(
                            run_data_engine,
                            csv_path=str(pipeline_input_path),
                            output_path=str(generated_path),
                            use_comprehensive_scoring=True,
                            live_callback=_live_sink,
                        )
                        t_engine_end = time.perf_counter()
                        engine_secs = t_engine_end - t_engine_start
                        total_secs = time.perf_counter() - t_total_start
                        _log_session_block("engine_output", engine_output)
                        _log_session_event(
                            f"engine_done funds={len(funds)} "
                            f"secs={engine_secs:.1f} total={total_secs:.1f}"
                        )
                        st.write(
                            f" ✅ Scoring done in **{engine_secs:.0f}s** — "
                            f"{len(funds)} funds scored"
                        )
                        status.update(
                            label=f"✅ Complete — {total_secs:.0f}s total",
                            state="complete",
                            expanded=False,
                        )
                except Exception as exc:
                    # Any phase failure: log the full traceback, surface it
                    # in the UI, and stop rendering this run.
                    err_text = "".join(traceback.format_exception(exc))
                    _log_session_block("run_failure", err_text)
                    _log_session_event(f"run_failed error={exc}")
                    st.error("Run failed. See terminal for traceback.")
                    st.code(err_text, language="text")
                    return
                # ── Summary ────────────────────────────────────────────────
                if enrichment.errors:
                    st.warning("Enrichment completed with warnings — check scratchpad for details.")
                    if enrichment.scratchpad_path:
                        st.caption(f"Scratchpad: `{enrichment.scratchpad_path}`")
                enrichment_summary = (
                    f"Enrichment: {enrichment.examined_cells} cells checked — "
                    f"NAV filled {enrichment.nav_cells}, "
                    f"imputed {enrichment.imputed_cells}, "
                    f"skipped {enrichment.skipped_cells}."
                )
                # NOTE(review): mangled f-string at the end of this literal
                # (unterminated quote) — original HTML wrapper missing.
                timing_html = (
                    f''
                    f'⏱ Enrichment: {enrich_secs:.0f}s | '
                    f'Scoring: {engine_secs:.0f}s | '
                    f'Total: {total_secs:.0f}s ({total_secs/60:.1f} min)'
                    f"{' 🎯 Under 3 min!' if total_secs < 180 else ''}"
                    f'
'
                )
                # Load the workbook into memory so it survives the rerun
                # triggered by the download button.
                with generated_path.open("rb") as f:
                    generated_bytes = io.BytesIO(f.read())
                generated_filename = generated_path.name
                funds_count = len(funds)
                categories_count = len({f.category for f in funds})
                st.success("Step 3 · Excel ready — download below.")
                if enrichment_summary:
                    st.info(enrichment_summary)
        # ── Download area (persists after rerun) ──────────────────────────
        if generated_bytes and generated_filename:
            if timing_html:
                st.markdown(timing_html, unsafe_allow_html=True)
            st.markdown(
                """
Schemes scored
{funds_count}
Categories
{categories_count}
Output format
Excel (.xlsx)
""".format(
                    funds_count=funds_count or 0,
                    categories_count=categories_count or 0,
                ),
                unsafe_allow_html=True,
            )
            # NOTE(review): mangled literal below — unterminated quote.
            st.markdown(
                'Download the scored workbook:
',
                unsafe_allow_html=True,
            )
            st.download_button(
                label="⬇️ Download processed Excel",
                data=generated_bytes.getvalue(),
                file_name=generated_filename,
                mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                use_container_width=True,
            )
    with tab_about:
        st.markdown("### What the pipeline does")
        st.markdown(
            """
| Phase | What happens |
|---|---|
| **0 — Scheme resolution** | Parallel fuzzy-match of missing AMFI scheme codes (8 threads) |
| **1 — NAV engine** | Trailing 3Y risk metrics computed from mfapi NAV history (12 threads) |
| **2 — PE/PB engine** | Active funds: AMFI monthly holdings weighted PE/PB (same as Groww). Index funds: NSE index API |
| **3 — Median impute** | Category median fills remaining gaps for ≥3Y funds. Young funds (<3Y) marked NA |
| **4 — Scoring** | Top/Bottom 10 per category, 10-point weighted model |
| **5 — Excel export** | Conditional formatting, quartile bands, benchmark rows |
**Cache**: NAV history is cached in Neon (production) or SQLite (local) with a 7-day TTL.
Second runs are near-instant for cached funds.
"""
        )
    # NOTE(review): footer placement inferred (indentation was lost) —
    # presumably rendered after both tabs; confirm against VCS.
    st.markdown("", unsafe_allow_html=True)
    # NOTE(review): mangled literal below — original HTML footer missing.
    st.markdown("
", unsafe_allow_html=True)
# Script entry point: render the Streamlit page when executed directly.
if __name__ == "__main__":
    main()