Spaces:
Sleeping
Sleeping
| # app.py | |
| import streamlit as st | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import google.generativeai as genai | |
| import json, os, re, time | |
| from datetime import datetime | |
| from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime | |
| from sqlalchemy.orm import declarative_base, sessionmaker | |
# ========== CONFIG ==========
st.set_page_config(page_title="ECL Decision Assistant", layout="wide")

# The Gemini key comes from the environment; without it the app still runs
# and the decision helper returns a canned answer instead of calling the API.
GEN_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEN_API_KEY:
    st.warning("GEMINI_API_KEY not found in env. Set it in HF Space secrets to enable AI decisions.")
else:
    genai.configure(api_key=GEN_API_KEY)

# Simple credential store (replace with secure store in production).
# Maps username -> {"password": ..., "role": ...}.
USERS = {
    "analyst": dict(password="analyst123", role="analyst"),
    "cro": dict(password="cro123", role="cro"),
}

# SQLite DB for persisting reports.
DB_FILE = "reports.db"
Base = declarative_base()
engine = create_engine(
    f"sqlite:///{DB_FILE}",
    connect_args={"check_same_thread": False},
)
SessionLocal = sessionmaker(bind=engine)
class Report(Base):
    """One persisted decision for a segment, stored in the ``reports`` table.

    Rows are created by ``save_report_to_db`` and read back by
    ``load_reports_from_db``.
    """

    __tablename__ = "reports"
    # Surrogate primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Value of the segmentation column the metrics were grouped by.
    segment = Column(String)
    # Segment metrics as produced by process_loan_data (PD, LGD, EAD, ECL).
    # NOTE: ``pd`` here is a class attribute; it does not affect the
    # module-level pandas alias outside this class body.
    pd = Column(Float)
    lgd = Column(Float)
    ead = Column(Float)
    ecl = Column(Float)
    # Decision fields taken from the decision dict.
    action = Column(String)
    rationale = Column(String)
    confidence = Column(Float)
    # Username of the signed-in user who requested the decision.
    generated_by = Column(String)
    # Set by save_report_to_db at insert time.
    created_at = Column(DateTime)


# Create the table on first run; existing tables are left as they are.
Base.metadata.create_all(bind=engine)
| # ========== UTILITIES ========== | |
def process_loan_data(df: pd.DataFrame, segment_col: str = "loan_intent"):
    """Compute PD, LGD, EAD and ECL per value of ``segment_col``.

    For each segment:
      * PD  = mean of ``loan_status``
      * LGD = 1 - mean(``credit_score``) / 850, floored at 0
      * EAD = sum of ``loan_amnt``
      * ECL = PD * LGD * EAD

    Rows with a missing value in any required column are dropped first.
    The caller's DataFrame is never modified.

    Args:
        df: Loan-level data containing ``segment_col``, ``credit_score``,
            ``loan_amnt`` and ``loan_status``.
        segment_col: Column to group by.

    Returns:
        DataFrame with columns ``segment``, ``PD``, ``LGD``, ``EAD``, ``ECL``,
        one row per segment.

    Raises:
        KeyError: If a required column is absent from ``df``.
        ValueError: If a required column cannot be cast to a number.
    """
    required = [segment_col, "credit_score", "loan_amnt", "loan_status"]
    missing = [col for col in required if col not in df.columns]
    if missing:
        # Same exception type dropna() would raise, but naming every culprit.
        raise KeyError(f"Missing required column(s): {missing}")

    # Explicit copy: assigning onto the result of dropna() triggers
    # chained-assignment warnings on pandas < 3.
    clean = df.dropna(subset=required).copy()

    # ensure types
    clean["loan_status"] = clean["loan_status"].astype(int)
    clean["credit_score"] = clean["credit_score"].astype(float)
    clean["loan_amnt"] = clean["loan_amnt"].astype(float)

    group = clean.groupby(segment_col)
    pd_seg = group["loan_status"].mean()
    lgd_seg = (1 - group["credit_score"].mean() / 850).clip(lower=0.0)
    ead_seg = group["loan_amnt"].sum()
    ecl_seg = pd_seg * lgd_seg * ead_seg

    ecl_df = pd.concat([pd_seg, lgd_seg, ead_seg, ecl_seg], axis=1)
    ecl_df.columns = ["PD", "LGD", "EAD", "ECL"]
    return ecl_df.reset_index().rename(columns={segment_col: "segment"})
def sanitize_parse_json(text: str):
    """Extract the first JSON object embedded in ``text`` and parse it.

    The text may carry markdown fences, leading prose or trailing chatter;
    anything outside the first decodable ``{...}`` is ignored.

    Args:
        text: Raw model response.

    Returns:
        The decoded object as a ``dict``.

    Raises:
        ValueError: If ``text`` is empty or contains no decodable JSON object.
    """
    if not text or not text.strip():
        raise ValueError("Empty response")

    decoder = json.JSONDecoder()
    # Try each "{" in turn. raw_decode stops at the end of the first complete
    # value, so text after the object (closing fences, a second object,
    # stray braces) cannot break parsing the way a greedy "{.*}" match does.
    start = text.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            start = text.find("{", start + 1)
        else:
            return obj
    raise ValueError("No JSON object found in response")
def get_gemini_decision_single(segment, pd_val, lgd_val, ead_val, ecl_val):
    """Ask Gemini for one decision about a single segment.

    Makes at most one API call. The response is parsed and validated; any
    failure (API error, unparsable text, missing key, unknown action,
    non-numeric confidence) is handled by a fallback, so this function
    always returns a dict and never raises.

    Args:
        segment: Segment label shown to the model.
        pd_val: Probability of default for the segment.
        lgd_val: Loss given default for the segment.
        ead_val: Exposure at default for the segment.
        ecl_val: Expected credit loss for the segment.

    Returns:
        Dict with keys ``action`` (one of ``increase_interest``,
        ``reduce_disbursement``, ``maintain``), ``rationale`` (str) and
        ``confidence`` (float in [0, 1]).
    """
    # If API key missing, return deterministic fallback
    if not GEN_API_KEY:
        return {"action": "maintain", "rationale": "No API key configured", "confidence": 0.0}

    prompt = f"""
You are a financial risk advisor. Return ONLY one valid JSON object with this schema:
{{"action":"increase_interest"|"reduce_disbursement"|"maintain","rationale":"string","confidence":float}}
Segment: {segment}
PD: {pd_val:.3f}
LGD: {lgd_val:.3f}
EAD: {ead_val:,.0f}
ECL: {ecl_val:,.0f}
Rules:
- PD > 0.25 => increase_interest
- 0.20 <= PD <= 0.25 => reduce_disbursement
- PD < 0.15 => maintain
Respond with a single JSON object and nothing else.
"""
    allowed_actions = ("increase_interest", "reduce_disbursement", "maintain")

    # Use model.generate_content with single prompt string (compat for HF)
    try:
        model = genai.GenerativeModel("gemini-2.5-flash-lite")
        resp = model.generate_content(prompt, generation_config={"temperature": 0.05})
        raw = resp.text if hasattr(resp, "text") else str(resp)
        data = sanitize_parse_json(raw)

        # Validate shape and content, not just key presence: the result is
        # stored in typed DB columns and filtered by action in the UI.
        if not isinstance(data, dict):
            raise ValueError("Response is not a JSON object")
        for k in ("action", "rationale", "confidence"):
            if k not in data:
                raise ValueError(f"Missing key: {k}")
        if data["action"] not in allowed_actions:
            raise ValueError(f"Unknown action: {data['action']!r}")
        # float() raises TypeError/ValueError on junk, which lands in the
        # fallback below; clamp so the stored value stays in [0, 1].
        data["confidence"] = min(1.0, max(0.0, float(data["confidence"])))
        data["rationale"] = str(data["rationale"])
        return data
    except Exception as e:  # boundary: the UI must always get a decision
        # handle rate limits explicitly
        msg = str(e)
        if "429" in msg or "Resource exhausted" in msg:
            return {"action": "maintain", "rationale": "API quota exhausted - retry later", "confidence": 0.0}
        # fallback deterministic rule as final fallback
        if pd_val > 0.25:
            return {"action": "increase_interest", "rationale": "PD > 0.25 (deterministic fallback)", "confidence": 0.8}
        if 0.20 <= pd_val <= 0.25:
            return {"action": "reduce_disbursement", "rationale": "PD in 0.20-0.25 (deterministic fallback)", "confidence": 0.7}
        return {"action": "maintain", "rationale": "Fallback - parse or API error", "confidence": 0.0}
def save_report_to_db(row, decision, username):
    """Persist one segment decision and return the new row's id.

    Args:
        row: Mapping with keys ``segment``, ``PD``, ``LGD``, ``EAD``, ``ECL``
            (e.g. one row of the frame returned by ``process_loan_data``).
        decision: Dict with optional keys ``action``, ``rationale``,
            ``confidence``.
        username: Stored in ``generated_by``.

    Returns:
        Primary key of the inserted ``Report``.
    """
    # A missing, null or non-numeric confidence is stored as 0.0 instead of
    # aborting the save.
    try:
        confidence = float(decision.get("confidence", 0.0))
    except (TypeError, ValueError):
        confidence = 0.0

    report = Report(
        segment=row["segment"],
        pd=float(row["PD"]),
        lgd=float(row["LGD"]),
        ead=float(row["EAD"]),
        ecl=float(row["ECL"]),
        action=decision.get("action"),
        rationale=decision.get("rationale"),
        confidence=confidence,
        generated_by=username,
        created_at=datetime.utcnow(),
    )
    # The context manager closes the session even when commit fails, so a
    # failed insert no longer leaks the session.
    with SessionLocal() as session:
        session.add(report)
        session.commit()
        session.refresh(report)
        # Read the id while the instance is still attached to the session.
        new_id = report.id
    return new_id
def load_reports_from_db(username, role):
    """Load stored reports, newest first, as a DataFrame.

    Args:
        username: Signed-in user; used as the filter unless ``role`` is "cro".
        role: ``"cro"`` sees every report; any other role sees only reports
            whose ``generated_by`` equals ``username``.

    Returns:
        DataFrame with one row per report and the columns listed in
        ``columns`` below. It is empty, but still has those columns, when
        nothing matches.
    """
    columns = [
        "id", "segment", "pd", "lgd", "ead", "ecl",
        "action", "rationale", "confidence", "generated_by", "created_at",
    ]
    # The context manager closes the session even if the query raises.
    with SessionLocal() as session:
        query = session.query(Report)
        if role != "cro":
            query = query.filter(Report.generated_by == username)
        rows = query.order_by(Report.created_at.desc()).all()
        # Copy plain values out while the rows are still attached.
        records = [{col: getattr(r, col) for col in columns} for r in rows]
    return pd.DataFrame(records, columns=columns)
# ========== UI - AUTH ==========
st.sidebar.title("Login")
username = st.sidebar.text_input("Username")
password = st.sidebar.text_input("Password", type="password")

# First run of a browser session: start unauthenticated.
if "auth_ok" not in st.session_state:
    st.session_state.auth_ok = False

sign_in_clicked = st.sidebar.button("Sign in")
if sign_in_clicked:
    account = USERS.get(username)
    if not (account and account["password"] == password):
        st.sidebar.error("Invalid credentials")
    else:
        role = account["role"]
        st.session_state.auth_ok = True
        st.session_state.username = username
        st.session_state.role = role
        st.sidebar.success(f"Signed in as {username} ({role})")

# Nothing below this line renders until a sign-in has succeeded.
if not st.session_state.auth_ok:
    st.stop()
# ========== MAIN ==========
def _render_bar_chart(title, segments, values, y_label, color=None):
    """Render one per-segment bar chart and release its figure afterwards."""
    st.subheader(title)
    fig, ax = plt.subplots(figsize=(8, 3))
    ax.bar(segments, values, color=color)
    ax.set_xlabel("Segment")
    ax.set_ylabel(y_label)
    plt.xticks(rotation=45)  # fig was just created, so it is the current figure
    st.pyplot(fig)
    # The script reruns on every interaction; without close() each rerun
    # would leave two more figures alive in pyplot's registry.
    plt.close(fig)


def _render_segment_analysis(df):
    """Show the ECL summary, charts and decision controls for an uploaded frame.

    Returns early with an on-page message when the data cannot be analysed,
    so the rest of the page still renders.
    """
    st.write("Sample rows:")
    st.dataframe(df.head(), width='stretch')

    # allow user to choose segmentation column
    text_columns = [c for c in df.columns if df[c].dtype == object]
    if not text_columns:
        st.error("The uploaded CSV has no text column to segment by.")
        return
    seg_col = st.selectbox("Segment by column", options=text_columns, index=0)

    try:
        ecl_df = process_loan_data(df, segment_col=seg_col)
    except (KeyError, ValueError) as exc:
        # Missing required column or a value that cannot be cast to a number.
        st.error(f"Could not compute ECL from this file: {exc}")
        return
    if ecl_df.empty:
        st.warning("No complete rows remain after dropping missing values.")
        return

    st.subheader("Segment-level ECL Summary")
    st.dataframe(ecl_df, width='stretch')

    # Plots
    col1, col2 = st.columns(2)
    with col1:
        _render_bar_chart("ECL by Segment", ecl_df["segment"], ecl_df["ECL"], "ECL")
    with col2:
        _render_bar_chart("PD by Segment", ecl_df["segment"], ecl_df["PD"], "PD", color="gray")

    # Select single segment for Gemini
    st.subheader("Analyze one segment (single API call)")
    selected = st.selectbox("Choose a segment to analyze", ecl_df["segment"].tolist())
    row = ecl_df[ecl_df["segment"] == selected].iloc[0]
    st.write(f"PD: {row.PD:.3f} | LGD: {row.LGD:.3f} | EAD: {row.EAD:,.0f} | ECL: {row.ECL:,.0f}")

    # Optionally show top segments only to reduce API usage
    max_n = len(ecl_df)
    default_n = min(5, max_n)
    top_n = st.number_input("Show top N segments by ECL (for reference)", min_value=1, max_value=max_n, value=default_n)
    st.write(ecl_df.sort_values("ECL", ascending=False).head(int(top_n)))

    if st.button("Request Gemini decision for selected segment"):
        with st.spinner("Querying Gemini (single call)..."):
            decision = get_gemini_decision_single(row["segment"], row["PD"], row["LGD"], row["EAD"], row["ECL"])
        # save
        rec_id = save_report_to_db(row, decision, st.session_state.username)
        st.success("Decision recorded")
        st.json({"record_id": rec_id, "segment": row["segment"], "decision": decision})


st.header("ECL Decision Assistant")
st.write(f"Signed in as **{st.session_state.username}** ({st.session_state.role})")

# Upload CSV
uploaded = st.file_uploader("Upload loan CSV (must contain loan_intent, credit_score, loan_amnt, loan_status)", type=["csv"])
if uploaded:
    try:
        loans = pd.read_csv(uploaded)
    except ValueError as exc:
        # pandas' empty-file, parser and decoding errors all derive from ValueError.
        st.error(f"Could not read the uploaded CSV: {exc}")
    else:
        _render_segment_analysis(loans)
# Historical reports section
st.subheader("Past Reports")
reports_df = load_reports_from_db(st.session_state.username, st.session_state.role)
if reports_df.empty:
    st.info("No reports recorded yet (use 'Request Gemini decision' to create one).")
else:
    st.dataframe(reports_df, width='stretch')
    # allow filtering by action
    action_filter = st.selectbox("Filter by action (All / increase_interest / reduce_disbursement / maintain)", ["All", "increase_interest", "reduce_disbursement", "maintain"])
    if action_filter != "All":
        st.dataframe(reports_df[reports_df["action"] == action_filter], width='stretch')
    # Render the download button directly. When nested under st.button it
    # only exists for the single rerun after that click, so users needed two
    # clicks and the button vanished on the next interaction.
    st.download_button(
        "Download reports CSV",
        reports_df.to_csv(index=False).encode("utf-8"),
        file_name="reports.csv",
        mime="text/csv",
    )