"""Streamlit front-end for the MF Scoring Engine advisor demo.

Flow: advisor uploads a fund-universe CSV → Phase 1 enrichment
(scheme-code resolution → NAV engine → PE/PB → category-median fallback)
→ Phase 2 scoring engine → downloadable Excel workbook.

Every run is mirrored to a per-session log file under
``logs/streamlit_sessions/`` so failures can be diagnosed after the fact.
"""

from __future__ import annotations

import io
import sys
import tempfile
import time
import traceback
from contextlib import redirect_stderr, redirect_stdout
from datetime import datetime
from pathlib import Path
from typing import Any

import streamlit as st

from src.csv_enrichment import (
    TARGET_COLUMNS,
    EnrichmentConfig,
    enrich_csv,
)
from src.data_engine import run_data_engine


# ── Session logging ───────────────────────────────────────────────────────────


def _init_session_log() -> Path:
    """Create (once per Streamlit session) and return the session log path.

    The path is memoized in ``st.session_state`` so reruns of the script
    keep appending to the same file instead of creating a new one.
    """
    if "session_log_path" not in st.session_state:
        log_dir = Path("logs") / "streamlit_sessions"
        log_dir.mkdir(parents=True, exist_ok=True)
        # %f (microseconds) makes the filename unique even for sessions
        # started within the same second.
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        log_path = log_dir / f"session_{stamp}.log"
        log_path.write_text(
            f"[{datetime.now().isoformat()}] session_started\n",
            encoding="utf-8",
        )
        st.session_state["session_log_path"] = str(log_path)
    return Path(st.session_state["session_log_path"])


def _log_session_event(message: str) -> None:
    """Append a single timestamped line to the session log.

    Logging is strictly best-effort: any failure (disk full, permissions)
    is swallowed so it can never break the UI.
    """
    try:
        log_path = _init_session_log()
        with log_path.open("a", encoding="utf-8") as f:
            f.write(f"[{datetime.now().isoformat()}] {message}\n")
    except Exception:
        pass


def _log_session_block(title: str, content: str) -> None:
    """Append a delimited multi-line block (e.g. captured subprocess output).

    Empty/whitespace-only content is recorded as ``(no output)`` so the
    start/end markers always bracket something. Best-effort, like
    :func:`_log_session_event`.
    """
    try:
        log_path = _init_session_log()
        with log_path.open("a", encoding="utf-8") as f:
            f.write(f"[{datetime.now().isoformat()}] --- {title} (start) ---\n")
            f.write((content.rstrip() + "\n") if content.strip() else "(no output)\n")
            f.write(f"[{datetime.now().isoformat()}] --- {title} (end) ---\n")
    except Exception:
        pass


# ── Captured output runner ────────────────────────────────────────────────────


def _run_with_captured_output(func: Any, *args: Any, **kwargs: Any) -> tuple[Any, str]:
    """Run *func*, mirroring its stdout/stderr to the real terminal while
    also capturing it for UI display.

    Accepts an optional ``live_callback`` keyword (popped before calling
    *func*): a callable invoked with each chunk of text as it is written,
    used to stream progress into the Streamlit page.

    Returns ``(result, combined_output)`` where *combined_output* is the
    captured stdout followed by the captured stderr.
    """

    class _TeeCapture(io.TextIOBase):
        """Text stream that buffers writes, mirrors them to another stream,
        and optionally forwards each chunk to a callback."""

        def __init__(self, mirror: Any, on_write: Any = None) -> None:
            self._mirror = mirror
            self._buffer = io.StringIO()
            self._on_write = on_write

        def write(self, s: str) -> int:
            text = str(s)
            self._buffer.write(text)
            # Mirror to the real terminal; never let a broken terminal
            # stream abort the pipeline.
            try:
                self._mirror.write(text)
                self._mirror.flush()
            except Exception:
                pass
            # Forward to the live UI sink; likewise best-effort.
            if self._on_write is not None:
                try:
                    self._on_write(text)
                except Exception:
                    pass
            return len(text)

        def flush(self) -> None:
            try:
                self._mirror.flush()
            except Exception:
                pass

        def getvalue(self) -> str:
            return self._buffer.getvalue()

    live_callback = kwargs.pop("live_callback", None)
    # sys.__stdout__/__stderr__ are the original process streams, untouched
    # by Streamlit's own redirection.
    out_tee = _TeeCapture(sys.__stdout__, live_callback)
    err_tee = _TeeCapture(sys.__stderr__, live_callback)
    with redirect_stdout(out_tee), redirect_stderr(err_tee):
        result = func(*args, **kwargs)
    return result, out_tee.getvalue() + err_tee.getvalue()


# ── CSS ───────────────────────────────────────────────────────────────────────


def _inject_custom_css() -> None:
    """Inject the app's custom CSS into the page.

    NOTE(review): the original <style> block was lost when this file was
    extracted (HTML was stripped from the string literal) — restore the
    real stylesheet here. TODO: recover from version control.
    """
    st.markdown(
        """ """,
        unsafe_allow_html=True,
    )


# ── Main ──────────────────────────────────────────────────────────────────────


def main() -> None:
    """Render the app: upload tab (run pipeline, download Excel) + about tab."""
    st.set_page_config(
        page_title="MF Scoring Engine · Advisor Demo",
        page_icon="📈",
        layout="centered",
    )
    _inject_custom_css()
    _init_session_log()
    _log_session_event("app_rendered")

    # NOTE(review): the HTML wrapper markup in the hero section below was
    # stripped during extraction; the tags here are a plausible
    # reconstruction preserving the original visible text — confirm
    # against version control.
    st.markdown('<div class="hero">', unsafe_allow_html=True)
    st.markdown(
        """
        <p class="hero-badge">Advisor tool</p>
        <h1>Score your mutual fund list in Excel.</h1>
        <p>Upload your mutual fund CSV. The app runs enrichment (NAV engine → web fallback → median), scores every fund, and gives you a ready-to-share Excel workbook.</p>
        """,
        unsafe_allow_html=True,
    )
    st.markdown('</div>', unsafe_allow_html=True)

    tab_run, tab_about = st.tabs(["Run analysis", "How scoring works"])

    with tab_run:
        st.markdown("### Upload CSV & generate workbook")
        # NOTE(review): wrapper tags reconstructed (original HTML stripped).
        st.markdown(
            """
            <div class="info-card">
            Upload your standard fund universe CSV (Fund, Benchmark Type, CAGR columns, etc.).
            P/E and P/B are computed from AMFI monthly holdings (active funds) or NSE index API (index funds) — all risk metrics (Alpha, Sharpe, Sortino, etc.) are computed directly from NAV history.
            </div>
            """,
            unsafe_allow_html=True,
        )

        uploaded_file = st.file_uploader(
            "Step 1 · Upload fund universe CSV",
            type=["csv"],
            help="Same CSV you feed into the offline data engine.",
        )

        if uploaded_file is not None:
            st.caption(
                f"Selected: **{uploaded_file.name}** · "
                f"{(len(uploaded_file.getbuffer()) / 1024):.1f} KB"
            )
            _log_session_event(
                f"uploaded_file name={uploaded_file.name} "
                f"size_kb={(len(uploaded_file.getbuffer())/1024):.1f}"
            )
            st.info(
                "Pipeline: **Scheme code resolution → NAV engine (parallel) "
                "→ PE/PB via AMFI holdings + NSE API → category median fallback → scoring engine**"
            )

        # NOTE(review): original HTML content lost here (stripped) —
        # likely a spacer/divider. TODO: restore.
        st.markdown(
            """ """,
            unsafe_allow_html=True,
        )

        run_clicked = st.button(
            "Step 2 · Run analysis",
            type="primary",
            use_container_width=True,
            disabled=uploaded_file is None,
        )

        # ── State carried across rerun ─────────────────────────────────────
        generated_bytes: io.BytesIO | None = None
        generated_filename: str | None = None
        funds_count: int | None = None
        categories_count: int | None = None
        enrichment_summary: str | None = None
        timing_html: str | None = None

        if run_clicked:
            _log_session_event("run_analysis_clicked")
            if uploaded_file is None:
                st.warning("Please upload a CSV file first.")
                _log_session_event("run_aborted_no_upload")
            else:
                # Timestamped stem keeps concurrent/repeat runs from
                # clobbering each other's output workbook.
                base_stem = Path(uploaded_file.name).stem
                stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                input_stem = f"{base_stem}_{stamp}"

                # Persist the upload to disk: the pipeline takes a path.
                with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
                    tmp.write(uploaded_file.getbuffer())
                    input_path = Path(tmp.name)

                out_dir = Path("output")
                out_dir.mkdir(exist_ok=True)
                generated_path = out_dir / f"fund_analysis_{input_stem}.xlsx"

                t_total_start = time.perf_counter()
                try:
                    with st.status("Processing…", expanded=True) as status:
                        live_lines: list[str] = []
                        live_box = st.empty()

                        # Noise patterns to suppress from the live log box
                        _SUPPRESS = (
                            "missing ScriptRunContext",
                            "FutureWarning",
                            "Passing literal json",
                            "To read from a literal string",
                            "return pd.read_json",
                        )

                        def _live_sink(chunk: str) -> None:
                            # Stream pipeline output into the UI, keeping
                            # only the last 50 non-noise lines.
                            clean = chunk.replace("\r", "")
                            new = [
                                ln
                                for ln in clean.split("\n")
                                if ln.strip() and not any(s in ln for s in _SUPPRESS)
                            ]
                            if not new:
                                return
                            live_lines.extend(new)
                            if len(live_lines) > 50:
                                del live_lines[:-50]
                            live_box.code("\n".join(live_lines), language="text")

                        # ── Phase 1: Enrichment ────────────────────────────
                        st.write("**1/2 Enrichment** — scheme codes → NAV engine → PE/PB → medians…")
                        t_enrich_start = time.perf_counter()
                        enrichment, enrich_output = _run_with_captured_output(
                            enrich_csv,
                            str(input_path),
                            config=EnrichmentConfig(
                                enabled=True,
                                max_cells=None,
                                resolve_scheme_codes=True,
                                enable_nav_engine=True,
                                impute_unresolved=True,
                            ),
                            live_callback=_live_sink,
                        )
                        t_enrich_end = time.perf_counter()
                        enrich_secs = t_enrich_end - t_enrich_start
                        _log_session_block("enrichment_output", enrich_output)
                        _log_session_event(
                            f"enrichment_done "
                            f"checked={enrichment.examined_cells} "
                            f"nav={enrichment.nav_cells} "
                            f"imputed={enrichment.imputed_cells} "
                            f"skipped={enrichment.skipped_cells} "
                            f"codes={enrichment.resolved_codes} "
                            f"secs={enrich_secs:.1f}"
                        )
                        st.write(
                            f" ✅ Enrichment done in **{enrich_secs:.0f}s** — "
                            f"checked {enrichment.examined_cells} cells, "
                            f"NAV filled {enrichment.nav_cells}, "
                            f"imputed {enrichment.imputed_cells}"
                        )
                        pipeline_input_path = Path(enrichment.enriched_csv_path)

                        # ── Phase 2: Scoring + Excel ───────────────────────
                        st.write("**2/2 Scoring engine** — computing scores, ranking, generating Excel…")
                        t_engine_start = time.perf_counter()
                        funds, engine_output = _run_with_captured_output(
                            run_data_engine,
                            csv_path=str(pipeline_input_path),
                            output_path=str(generated_path),
                            use_comprehensive_scoring=True,
                            live_callback=_live_sink,
                        )
                        t_engine_end = time.perf_counter()
                        engine_secs = t_engine_end - t_engine_start
                        total_secs = time.perf_counter() - t_total_start
                        _log_session_block("engine_output", engine_output)
                        _log_session_event(
                            f"engine_done funds={len(funds)} "
                            f"secs={engine_secs:.1f} total={total_secs:.1f}"
                        )
                        st.write(
                            f" ✅ Scoring done in **{engine_secs:.0f}s** — "
                            f"{len(funds)} funds scored"
                        )
                        status.update(
                            label=f"✅ Complete — {total_secs:.0f}s total",
                            state="complete",
                            expanded=False,
                        )
                except Exception as exc:
                    err_text = "".join(traceback.format_exception(exc))
                    _log_session_block("run_failure", err_text)
                    _log_session_event(f"run_failed error={exc}")
                    st.error("Run failed. See terminal for traceback.")
                    st.code(err_text, language="text")
                    return

                # ── Summary ────────────────────────────────────────────────
                if enrichment.errors:
                    st.warning("Enrichment completed with warnings — check scratchpad for details.")
                if enrichment.scratchpad_path:
                    st.caption(f"Scratchpad: `{enrichment.scratchpad_path}`")

                enrichment_summary = (
                    f"Enrichment: {enrichment.examined_cells} cells checked — "
                    f"NAV filled {enrichment.nav_cells}, "
                    f"imputed {enrichment.imputed_cells}, "
                    f"skipped {enrichment.skipped_cells}."
                )
                # NOTE(review): wrapper tags reconstructed (original HTML
                # stripped); inner text preserved verbatim.
                timing_html = (
                    f'<div class="timing-banner">'
                    f'⏱ Enrichment: {enrich_secs:.0f}s  |  '
                    f'Scoring: {engine_secs:.0f}s  |  '
                    f'Total: {total_secs:.0f}s ({total_secs/60:.1f} min)'
                    f"{'  🎯 Under 3 min!' if total_secs < 180 else ''}"
                    f'</div>'
                )

                # Buffer the workbook so the download survives reruns.
                with generated_path.open("rb") as f:
                    generated_bytes = io.BytesIO(f.read())
                generated_filename = generated_path.name
                funds_count = len(funds)
                categories_count = len({f.category for f in funds})

                st.success("Step 3 · Excel ready — download below.")
                if enrichment_summary:
                    st.info(enrichment_summary)

        # ── Download area (persists after rerun) ──────────────────────────
        if generated_bytes and generated_filename:
            if timing_html:
                st.markdown(timing_html, unsafe_allow_html=True)
            # NOTE(review): metric-card markup reconstructed (original HTML
            # stripped); labels and .format placeholders preserved. Avoid
            # literal braces here — the template goes through str.format.
            st.markdown(
                """
                <div class="metrics-row">
                  <div class="metric"><span class="metric-label">Schemes scored</span>
                  <span class="metric-value">{funds_count}</span></div>
                  <div class="metric"><span class="metric-label">Categories</span>
                  <span class="metric-value">{categories_count}</span></div>
                  <div class="metric"><span class="metric-label">Output format</span>
                  <span class="metric-value">Excel (.xlsx)</span></div>
                </div>
                """.format(
                    funds_count=funds_count or 0,
                    categories_count=categories_count or 0,
                ),
                unsafe_allow_html=True,
            )
            st.markdown(
                '<p class="download-note">Download the scored workbook:</p>',
                unsafe_allow_html=True,
            )
            st.download_button(
                label="⬇️ Download processed Excel",
                data=generated_bytes.getvalue(),
                file_name=generated_filename,
                mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                use_container_width=True,
            )

    with tab_about:
        st.markdown("### What the pipeline does")
        st.markdown(
            """
            | Phase | What happens |
            |---|---|
            | **0 — Scheme resolution** | Parallel fuzzy-match of missing AMFI scheme codes (8 threads) |
            | **1 — NAV engine** | Trailing 3Y risk metrics computed from mfapi NAV history (12 threads) |
            | **2 — PE/PB engine** | Active funds: AMFI monthly holdings weighted PE/PB (same as Groww). Index funds: NSE index API |
            | **3 — Median impute** | Category median fills remaining gaps for ≥3Y funds. Young funds (<3Y) marked NA |
            | **4 — Scoring** | Top/Bottom 10 per category, 10-point weighted model |
            | **5 — Excel export** | Conditional formatting, quartile bands, benchmark rows |

            **Cache**: NAV history is cached in Neon (production) or SQLite (local) with a 7-day TTL.
            Second runs are near-instant for cached funds.
            """
        )

    # NOTE(review): two trailing markdown calls whose HTML content was lost
    # during extraction (likely footer/spacer markup). TODO: restore.
    st.markdown(" ", unsafe_allow_html=True)
    st.markdown(" ", unsafe_allow_html=True)


if __name__ == "__main__":
    main()