ECL-Risk-Analyzer / src /streamlit_app.py
AKKI-AFK's picture
Update src/streamlit_app.py
168b158 verified
# app.py
import streamlit as st
import pandas as pd
import matplotlib.pyplot as plt
import google.generativeai as genai
import json, os, re, time
from datetime import datetime
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
# ========== CONFIG ==========
st.set_page_config(page_title="ECL Decision Assistant", layout="wide")
GEN_API_KEY = os.getenv("GEMINI_API_KEY")
if GEN_API_KEY:
genai.configure(api_key=GEN_API_KEY)
else:
st.warning("GEMINI_API_KEY not found in env. Set it in HF Space secrets to enable AI decisions.")
# Simple credential store (replace with secure store in production)
USERS = {
"analyst": {"password": "analyst123", "role": "analyst"},
"cro": {"password": "cro123", "role": "cro"},
}
# SQLite DB for persisting reports
DB_FILE = "reports.db"
engine = create_engine(f"sqlite:///{DB_FILE}", connect_args={"check_same_thread": False})
Base = declarative_base()
SessionLocal = sessionmaker(bind=engine)
class Report(Base):
__tablename__ = "reports"
id = Column(Integer, primary_key=True, index=True)
segment = Column(String)
pd = Column(Float)
lgd = Column(Float)
ead = Column(Float)
ecl = Column(Float)
action = Column(String)
rationale = Column(String)
confidence = Column(Float)
generated_by = Column(String)
created_at = Column(DateTime)
Base.metadata.create_all(bind=engine)
# ========== UTILITIES ==========
@st.cache_data
def process_loan_data(df: pd.DataFrame, segment_col: str = "loan_intent"):
"""Compute PD, LGD, EAD, ECL by segment column."""
required = [segment_col, "credit_score", "loan_amnt", "loan_status"]
df = df.dropna(subset=required)
# ensure types
df["loan_status"] = df["loan_status"].astype(int)
df["credit_score"] = df["credit_score"].astype(float)
df["loan_amnt"] = df["loan_amnt"].astype(float)
group = df.groupby(segment_col)
pd_seg = group["loan_status"].mean()
lgd_seg = (1 - group["credit_score"].mean() / 850).clip(lower=0.0)
ead_seg = group["loan_amnt"].sum()
ecl_seg = pd_seg * lgd_seg * ead_seg
ecl_df = pd.concat([pd_seg, lgd_seg, ead_seg, ecl_seg], axis=1)
ecl_df.columns = ["PD", "LGD", "EAD", "ECL"]
ecl_df = ecl_df.reset_index().rename(columns={segment_col: "segment"})
return ecl_df
def sanitize_parse_json(text: str):
"""Extract first JSON object in text and parse it."""
if not text:
raise ValueError("Empty response")
# remove common markdown fences
text = re.sub(r"^```json\s*", "", text, flags=re.IGNORECASE)
text = re.sub(r"^```\s*", "", text)
text = re.sub(r"```$", "", text)
# find JSON block
m = re.search(r"\{.*\}", text, flags=re.DOTALL)
if m:
text = m.group(0)
# attempt load
return json.loads(text)
def get_gemini_decision_single(segment, pd_val, lgd_val, ead_val, ecl_val):
"""Single Gemini call per selected segment. Robust cleaning. Returns dict."""
# If API key missing, return deterministic fallback
if not GEN_API_KEY:
return {"action": "maintain", "rationale": "No API key configured", "confidence": 0.0}
prompt = f"""
You are a financial risk advisor. Return ONLY one valid JSON object with this schema:
{{"action":"increase_interest"|"reduce_disbursement"|"maintain","rationale":"string","confidence":float}}
Segment: {segment}
PD: {pd_val:.3f}
LGD: {lgd_val:.3f}
EAD: {ead_val:,.0f}
ECL: {ecl_val:,.0f}
Rules:
- PD > 0.25 => increase_interest
- 0.20 <= PD <= 0.25 => reduce_disbursement
- PD < 0.15 => maintain
Respond with a single JSON object and nothing else.
"""
# Use model.generate_content with single prompt string (compat for HF)
try:
model = genai.GenerativeModel("gemini-2.5-flash-lite")
resp = model.generate_content(prompt, generation_config={"temperature": 0.05})
raw = resp.text if hasattr(resp, "text") else str(resp)
# parse
data = sanitize_parse_json(raw)
# validate keys
for k in ("action", "rationale", "confidence"):
if k not in data:
raise ValueError(f"Missing key: {k}")
return data
except Exception as e:
# handle rate limits explicitly
msg = str(e)
if "429" in msg or "Resource exhausted" in msg:
return {"action": "maintain", "rationale": "API quota exhausted - retry later", "confidence": 0.0}
# fallback deterministic rule as final fallback
if pd_val > 0.25:
return {"action": "increase_interest", "rationale": "PD > 0.25 (deterministic fallback)", "confidence": 0.8}
if 0.20 <= pd_val <= 0.25:
return {"action": "reduce_disbursement", "rationale": "PD in 0.20-0.25 (deterministic fallback)", "confidence": 0.7}
return {"action": "maintain", "rationale": "Fallback - parse or API error", "confidence": 0.0}
def save_report_to_db(row, decision, username):
s = SessionLocal()
r = Report(
segment=row["segment"],
pd=float(row["PD"]),
lgd=float(row["LGD"]),
ead=float(row["EAD"]),
ecl=float(row["ECL"]),
action=decision.get("action"),
rationale=decision.get("rationale"),
confidence=float(decision.get("confidence", 0.0)),
generated_by=username,
created_at=datetime.utcnow()
)
s.add(r)
s.commit()
s.refresh(r)
s.close()
return r.id
def load_reports_from_db(username, role):
s = SessionLocal()
if role == "cro":
rows = s.query(Report).order_by(Report.created_at.desc()).all()
else:
rows = s.query(Report).filter(Report.generated_by == username).order_by(Report.created_at.desc()).all()
df = pd.DataFrame([{
"id": r.id,
"segment": r.segment,
"pd": r.pd,
"lgd": r.lgd,
"ead": r.ead,
"ecl": r.ecl,
"action": r.action,
"rationale": r.rationale,
"confidence": r.confidence,
"generated_by": r.generated_by,
"created_at": r.created_at
} for r in rows])
s.close()
return df
# ========== UI - AUTH ==========
st.sidebar.title("Login")
username = st.sidebar.text_input("Username")
password = st.sidebar.text_input("Password", type="password")
if "auth_ok" not in st.session_state:
st.session_state.auth_ok = False
if st.sidebar.button("Sign in"):
user = USERS.get(username)
if user and user["password"] == password:
st.session_state.auth_ok = True
st.session_state.username = username
st.session_state.role = user["role"]
st.sidebar.success(f"Signed in as {username} ({user['role']})")
else:
st.sidebar.error("Invalid credentials")
if not st.session_state.auth_ok:
st.stop()
# ========== MAIN ==========
st.header("ECL Decision Assistant")
st.write(f"Signed in as **{st.session_state.username}** ({st.session_state.role})")
# Upload CSV
uploaded = st.file_uploader("Upload loan CSV (must contain loan_intent, credit_score, loan_amnt, loan_status)", type=["csv"])
if uploaded:
df = pd.read_csv(uploaded)
st.write("Sample rows:")
st.dataframe(df.head(), width='stretch')
# allow user to choose segmentation column
seg_col = st.selectbox("Segment by column", options=[c for c in df.columns if df[c].dtype == object] , index=0)
ecl_df = process_loan_data(df, segment_col=seg_col)
st.subheader("Segment-level ECL Summary")
st.dataframe(ecl_df, width='stretch')
# Plots
col1, col2 = st.columns(2)
with col1:
st.subheader("ECL by Segment")
fig, ax = plt.subplots(figsize=(8, 3))
ax.bar(ecl_df["segment"], ecl_df["ECL"])
ax.set_xlabel("Segment"); ax.set_ylabel("ECL"); plt.xticks(rotation=45)
st.pyplot(fig)
with col2:
st.subheader("PD by Segment")
fig2, ax2 = plt.subplots(figsize=(8, 3))
ax2.bar(ecl_df["segment"], ecl_df["PD"], color="gray")
ax2.set_xlabel("Segment"); ax2.set_ylabel("PD"); plt.xticks(rotation=45)
st.pyplot(fig2)
# Select single segment for Gemini
st.subheader("Analyze one segment (single API call)")
selected = st.selectbox("Choose a segment to analyze", ecl_df["segment"].tolist())
row = ecl_df[ecl_df["segment"] == selected].iloc[0]
st.write(f"PD: {row.PD:.3f} | LGD: {row.LGD:.3f} | EAD: {row.EAD:,.0f} | ECL: {row.ECL:,.0f}")
# Optionally show top segments only to reduce API usage
max_n = len(ecl_df)
default_n = min(5, max_n)
top_n = st.number_input("Show top N segments by ECL (for reference)", min_value=1, max_value=max_n, value=default_n)
st.write(ecl_df.sort_values("ECL", ascending=False).head(top_n))
if st.button("Request Gemini decision for selected segment"):
with st.spinner("Querying Gemini (single call)..."):
decision = get_gemini_decision_single(row["segment"], row["PD"], row["LGD"], row["EAD"], row["ECL"])
# save
rec_id = save_report_to_db(row, decision, st.session_state.username)
st.success("Decision recorded")
st.json({"record_id": rec_id, "segment": row["segment"], "decision": decision})
# Historical reports section
st.subheader("Past Reports")
reports_df = load_reports_from_db(st.session_state.username, st.session_state.role)
if not reports_df.empty:
st.dataframe(reports_df, width='stretch')
# allow filtering by action
action_filter = st.selectbox("Filter by action (All / increase_interest / reduce_disbursement / maintain)", ["All", "increase_interest", "reduce_disbursement", "maintain"])
if action_filter != "All":
st.dataframe(reports_df[reports_df["action"] == action_filter], width='stretch')
if st.button("Download reports CSV"):
st.download_button("Download", reports_df.to_csv(index=False).encode("utf-8"), file_name="reports.csv", mime="text/csv")
else:
st.info("No reports recorded yet (use 'Request Gemini decision' to create one).")