# app.py
"""Streamlit app: segment-level Expected Credit Loss (ECL) summary with
Gemini-assisted decisions persisted to a local SQLite database."""

import hmac
import json
import os
import re
import time
from datetime import datetime, timezone

import google.generativeai as genai
import matplotlib.pyplot as plt
import pandas as pd
import streamlit as st
from sqlalchemy import Column, DateTime, Float, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# ========== CONFIG ==========
# set_page_config must be the first Streamlit command executed by the script.
st.set_page_config(page_title="ECL Decision Assistant", layout="wide")

GEN_API_KEY = os.getenv("GEMINI_API_KEY")
if GEN_API_KEY:
    genai.configure(api_key=GEN_API_KEY)
else:
    st.warning("GEMINI_API_KEY not found in env. Set it in HF Space secrets to enable AI decisions.")

# Simple credential store (replace with secure store in production)
USERS = {
    "analyst": {"password": "analyst123", "role": "analyst"},
    "cro": {"password": "cro123", "role": "cro"},
}

# Closed set of actions a decision may carry (mirrors the prompt schema).
ALLOWED_ACTIONS = ("increase_interest", "reduce_disbursement", "maintain")

# Columns every uploaded CSV needs, in addition to the chosen segment column.
REQUIRED_COLUMNS = ("credit_score", "loan_amnt", "loan_status")

# SQLite DB for persisting reports
DB_FILE = "reports.db"
engine = create_engine(f"sqlite:///{DB_FILE}", connect_args={"check_same_thread": False})
Base = declarative_base()
SessionLocal = sessionmaker(bind=engine)


class Report(Base):
    """One persisted decision: the segment metrics plus the recommended action."""

    __tablename__ = "reports"

    id = Column(Integer, primary_key=True, index=True)
    segment = Column(String)
    pd = Column(Float)
    lgd = Column(Float)
    ead = Column(Float)
    ecl = Column(Float)
    action = Column(String)
    rationale = Column(String)
    confidence = Column(Float)
    generated_by = Column(String)  # username of the signed-in user
    created_at = Column(DateTime)  # stored as naive UTC


Base.metadata.create_all(bind=engine)


# ========== UTILITIES ==========
@st.cache_data
def process_loan_data(df: pd.DataFrame, segment_col: str = "loan_intent"):
    """Compute PD, LGD, EAD, ECL by segment column.

    Rows with a missing value in the segment column or in any of
    ``credit_score``, ``loan_amnt``, ``loan_status`` are dropped first.

    Per segment:
      * PD  = mean of ``loan_status``
      * LGD = 1 - mean(``credit_score``) / 850, floored at 0
      * EAD = sum of ``loan_amnt``
      * ECL = PD * LGD * EAD

    Returns a DataFrame with columns ``segment, PD, LGD, EAD, ECL``.

    Raises:
        KeyError: if a required column is absent from ``df``.
        ValueError: if a required column cannot be converted to a number.
    """
    required = [segment_col, "credit_score", "loan_amnt", "loan_status"]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise KeyError(f"Missing required column(s): {', '.join(map(str, missing))}")

    # Work on an explicit copy: assigning into the result of dropna() can hit
    # pandas' chained-assignment path, and the caller's frame must not change.
    clean = df.dropna(subset=required).copy()

    # ensure types
    clean["loan_status"] = clean["loan_status"].astype(int)
    clean["credit_score"] = clean["credit_score"].astype(float)
    clean["loan_amnt"] = clean["loan_amnt"].astype(float)

    group = clean.groupby(segment_col)
    pd_seg = group["loan_status"].mean()
    lgd_seg = (1 - group["credit_score"].mean() / 850).clip(lower=0.0)
    ead_seg = group["loan_amnt"].sum()
    ecl_seg = pd_seg * lgd_seg * ead_seg

    ecl_df = pd.concat([pd_seg, lgd_seg, ead_seg, ecl_seg], axis=1)
    ecl_df.columns = ["PD", "LGD", "EAD", "ECL"]
    ecl_df = ecl_df.reset_index().rename(columns={segment_col: "segment"})
    return ecl_df


def sanitize_parse_json(text: str):
    """Extract first JSON object in text and parse it.

    Markdown code fences are stripped, then each ``{`` is tried in order as
    the start of a JSON value; the first one that decodes is returned, and
    anything after it is ignored. If no ``{`` yields valid JSON, the whole
    cleaned text is handed to ``json.loads``.

    Raises:
        ValueError: if ``text`` is empty, or (as ``json.JSONDecodeError``)
            if nothing parseable is found.
    """
    if not text:
        raise ValueError("Empty response")

    # remove common markdown fences
    text = text.strip()
    text = re.sub(r"^```json\s*", "", text, flags=re.IGNORECASE)
    text = re.sub(r"^```\s*", "", text)
    text = re.sub(r"```$", "", text)

    # raw_decode stops at the end of the first complete value, so trailing
    # prose that happens to contain braces no longer breaks parsing.
    decoder = json.JSONDecoder()
    for brace in re.finditer(r"\{", text):
        try:
            obj, _end = decoder.raw_decode(text, brace.start())
        except json.JSONDecodeError:
            continue
        return obj

    # No decodable object found: let json.loads parse or raise on the text.
    return json.loads(text)


def _rule_based_decision(pd_val):
    """Return the deterministic decision used when the model output is unusable."""
    if pd_val > 0.25:
        return {"action": "increase_interest", "rationale": "PD > 0.25 (deterministic fallback)", "confidence": 0.8}
    if 0.20 <= pd_val <= 0.25:
        return {"action": "reduce_disbursement", "rationale": "PD in 0.20-0.25 (deterministic fallback)", "confidence": 0.7}
    return {"action": "maintain", "rationale": "Fallback - parse or API error", "confidence": 0.0}


def _validate_decision(data):
    """Check the parsed model output against the schema; return it normalized.

    Raises ValueError/TypeError when the payload is not a dict, lacks a key,
    names an unknown action, or carries a non-numeric confidence.
    """
    if not isinstance(data, dict):
        raise ValueError("Response is not a JSON object")
    for k in ("action", "rationale", "confidence"):
        if k not in data:
            raise ValueError(f"Missing key: {k}")
    if data["action"] not in ALLOWED_ACTIONS:
        raise ValueError(f"Unknown action: {data['action']!r}")
    # Coerce here so a bad value triggers the fallback instead of crashing
    # later when the report is written to the database.
    data["confidence"] = float(data["confidence"])
    return data


def get_gemini_decision_single(segment, pd_val, lgd_val, ead_val, ecl_val):
    """Single Gemini call per selected segment. Robust cleaning. Returns dict.

    The returned dict always has the keys ``action``, ``rationale`` and
    ``confidence``. No exception escapes: on a quota error a "maintain"
    placeholder is returned, and on any other API, parse or validation
    failure the deterministic PD rule is applied instead.
    """
    # If API key missing, return deterministic fallback
    if not GEN_API_KEY:
        return {"action": "maintain", "rationale": "No API key configured", "confidence": 0.0}

    # NOTE(review): the rules below do not say what to do for
    # 0.15 <= PD < 0.20; the deterministic fallback treats that band as
    # "maintain". Confirm the intended policy before changing the prompt.
    prompt = f"""
You are a financial risk advisor. Return ONLY one valid JSON object with this schema:
{{"action":"increase_interest"|"reduce_disbursement"|"maintain","rationale":"string","confidence":float}}

Segment: {segment}
PD: {pd_val:.3f}
LGD: {lgd_val:.3f}
EAD: {ead_val:,.0f}
ECL: {ecl_val:,.0f}

Rules:
- PD > 0.25 => increase_interest
- 0.20 <= PD <= 0.25 => reduce_disbursement
- PD < 0.15 => maintain

Respond with a single JSON object and nothing else.
"""
    # Use model.generate_content with single prompt string (compat for HF)
    try:
        model = genai.GenerativeModel("gemini-2.5-flash-lite")
        resp = model.generate_content(prompt, generation_config={"temperature": 0.05})
        raw = resp.text if hasattr(resp, "text") else str(resp)
        return _validate_decision(sanitize_parse_json(raw))
    except Exception as e:
        # Deliberately broad: this is the boundary with a network client, and
        # the UI must always receive a decision dict rather than a traceback.
        msg = str(e)
        # handle rate limits explicitly
        if "429" in msg or "Resource exhausted" in msg:
            return {"action": "maintain", "rationale": "API quota exhausted - retry later", "confidence": 0.0}
        # fallback deterministic rule as final fallback
        return _rule_based_decision(pd_val)


def save_report_to_db(row, decision, username):
    """Persist one decision for ``row`` and return the new record id.

    ``row`` must provide ``segment``, ``PD``, ``LGD``, ``EAD`` and ``ECL``;
    ``decision`` is the dict produced by ``get_gemini_decision_single``.
    The session is rolled back on failure and always closed.
    """
    session = SessionLocal()
    try:
        record = Report(
            segment=row["segment"],
            pd=float(row["PD"]),
            lgd=float(row["LGD"]),
            ead=float(row["EAD"]),
            ecl=float(row["ECL"]),
            action=decision.get("action"),
            rationale=decision.get("rationale"),
            # `or 0.0` also covers an explicit None confidence.
            confidence=float(decision.get("confidence") or 0.0),
            generated_by=username,
            # Naive UTC, same value datetime.utcnow() produced (now deprecated).
            created_at=datetime.now(timezone.utc).replace(tzinfo=None),
        )
        session.add(record)
        session.commit()
        session.refresh(record)
        # Read the id while the instance is still attached to the session.
        return record.id
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def load_reports_from_db(username, role):
    """Return saved reports, newest first, as a DataFrame.

    The ``cro`` role sees every report; any other role sees only the rows
    whose ``generated_by`` equals ``username``.
    """
    columns = [
        "id", "segment", "pd", "lgd", "ead", "ecl",
        "action", "rationale", "confidence", "generated_by", "created_at",
    ]
    session = SessionLocal()
    try:
        query = session.query(Report)
        if role != "cro":
            query = query.filter(Report.generated_by == username)
        rows = query.order_by(Report.created_at.desc()).all()
        # Materialize plain dicts before the session is closed.
        records = [{col: getattr(r, col) for col in columns} for r in rows]
    finally:
        session.close()
    return pd.DataFrame(records, columns=columns)


def _passwords_match(expected: str, supplied: str) -> bool:
    """Compare two passwords in constant time."""
    return hmac.compare_digest(expected.encode("utf-8"), supplied.encode("utf-8"))


def _plot_segment_bars(ecl_df, value_col, color=None):
    """Render a bar chart of ``value_col`` per segment, then free the figure."""
    fig, ax = plt.subplots(figsize=(8, 3))
    ax.bar(ecl_df["segment"], ecl_df[value_col], color=color)
    ax.set_xlabel("Segment")
    ax.set_ylabel(value_col)
    plt.xticks(rotation=45)
    st.pyplot(fig)
    # Streamlit reruns the script on every interaction; unclosed figures
    # would otherwise pile up in pyplot's global registry.
    plt.close(fig)


def _render_segment_analysis(df: pd.DataFrame) -> None:
    """Render the ECL summary, charts and Gemini request UI for an uploaded frame."""
    st.write("Sample rows:")
    st.dataframe(df.head(), width='stretch')

    missing = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing:
        st.error(f"Uploaded CSV is missing required column(s): {', '.join(missing)}")
        return

    # allow user to choose segmentation column
    seg_options = [c for c in df.columns if df[c].dtype == object]
    if not seg_options:
        st.error("Uploaded CSV has no text column to segment by.")
        return
    seg_col = st.selectbox("Segment by column", options=seg_options, index=0)

    try:
        ecl_df = process_loan_data(df, segment_col=seg_col)
    except (KeyError, ValueError, TypeError) as exc:
        st.error(f"Could not compute ECL from the uploaded data: {exc}")
        return
    if ecl_df.empty:
        st.warning("No usable rows remain after dropping records with missing values.")
        return

    st.subheader("Segment-level ECL Summary")
    st.dataframe(ecl_df, width='stretch')

    # Plots
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("ECL by Segment")
        _plot_segment_bars(ecl_df, "ECL")
    with col2:
        st.subheader("PD by Segment")
        _plot_segment_bars(ecl_df, "PD", color="gray")

    # Select single segment for Gemini
    st.subheader("Analyze one segment (single API call)")
    selected = st.selectbox("Choose a segment to analyze", ecl_df["segment"].tolist())
    row = ecl_df[ecl_df["segment"] == selected].iloc[0]
    st.write(f"PD: {row.PD:.3f} | LGD: {row.LGD:.3f} | EAD: {row.EAD:,.0f} | ECL: {row.ECL:,.0f}")

    # Optionally show top segments only to reduce API usage
    max_n = len(ecl_df)
    default_n = min(5, max_n)
    top_n = st.number_input(
        "Show top N segments by ECL (for reference)",
        min_value=1,
        max_value=max_n,
        value=default_n,
    )
    st.write(ecl_df.sort_values("ECL", ascending=False).head(int(top_n)))

    if st.button("Request Gemini decision for selected segment"):
        with st.spinner("Querying Gemini (single call)..."):
            decision = get_gemini_decision_single(
                row["segment"], row["PD"], row["LGD"], row["EAD"], row["ECL"]
            )
        # save
        rec_id = save_report_to_db(row, decision, st.session_state.username)
        st.success("Decision recorded")
        st.json({"record_id": rec_id, "segment": row["segment"], "decision": decision})


# ========== UI - AUTH ==========
st.sidebar.title("Login")
username = st.sidebar.text_input("Username")
password = st.sidebar.text_input("Password", type="password")

if "auth_ok" not in st.session_state:
    st.session_state.auth_ok = False

if st.sidebar.button("Sign in"):
    user = USERS.get(username)
    if user and _passwords_match(user["password"], password):
        st.session_state.auth_ok = True
        st.session_state.username = username
        st.session_state.role = user["role"]
        st.sidebar.success(f"Signed in as {username} ({user['role']})")
    else:
        st.sidebar.error("Invalid credentials")

if not st.session_state.auth_ok:
    st.stop()

# ========== MAIN ==========
st.header("ECL Decision Assistant")
st.write(f"Signed in as **{st.session_state.username}** ({st.session_state.role})")

# Upload CSV
uploaded = st.file_uploader(
    "Upload loan CSV (must contain loan_intent, credit_score, loan_amnt, loan_status)",
    type=["csv"],
)
if uploaded:
    _render_segment_analysis(pd.read_csv(uploaded))

# Historical reports section
st.subheader("Past Reports")
reports_df = load_reports_from_db(st.session_state.username, st.session_state.role)
if not reports_df.empty:
    st.dataframe(reports_df, width='stretch')
    # allow filtering by action
    action_filter = st.selectbox(
        "Filter by action (All / increase_interest / reduce_disbursement / maintain)",
        ["All", "increase_interest", "reduce_disbursement", "maintain"],
    )
    if action_filter != "All":
        st.dataframe(reports_df[reports_df["action"] == action_filter], width='stretch')
    # Rendered directly: a download_button nested inside st.button vanishes on
    # the rerun triggered by the outer click and needs a second click.
    st.download_button(
        "Download reports CSV",
        reports_df.to_csv(index=False).encode("utf-8"),
        file_name="reports.csv",
        mime="text/csv",
    )
else:
    st.info("No reports recorded yet (use 'Request Gemini decision' to create one).")