# app.py - Hugging Face Space source by Rogerleong (verified commit 650810f, "Update app.py").
import os
import re
import textwrap
import traceback
import pandas as pd
import gradio as gr
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
# =========================
# Files expected in repo root
# =========================
# Directory that holds this script; every data file is resolved against it.
BASE_DIR = os.path.dirname(__file__)
# Customer CSV and the two policy PDFs, all expected next to this script.
CSV_FILE, RISK_PDF, INT_PDF = (
    os.path.join(BASE_DIR, file_name)
    for file_name in ("Customer records.csv", "Risk_Policy.pdf", "Interest_Rate_Policy.pdf")
)
# =========================
# Helpers: formatting
# =========================
def one_sentence_per_line(text: str, width: int = 110) -> str:
    """Reflow ``text`` so every sentence starts on its own wrapped line.

    Each input line is handled independently. A leading list marker
    (``-``, ``*``, a bullet character, or ``1.``-style numbering) is detected
    *before* the line is split into sentences, so a numbered item such as
    ``"1. Do this."`` is no longer torn apart at the marker's period. All
    continuation lines of a list item (wrapped text and follow-up sentences)
    are indented to line up under the item's text.

    Args:
        text: Text to reflow. ``None`` yields an empty string; any other
            value is converted with ``str()``.
        width: Maximum line width. For list items the marker width is
            subtracted, with a floor of 20 columns for the item text.

    Returns:
        The reflowed text joined with newlines. Blank input lines are kept
        as empty lines; leading/trailing whitespace of each line is dropped.
    """
    if text is None:
        return ""

    list_marker = re.compile(r"^((?:[-*]|\d+\.|\u2022)\s+)(.*)$")
    sentence_gap = re.compile(r"(?<=[.!?])\s+")

    out_lines = []
    for raw_line in str(text).splitlines():
        line = raw_line.strip()
        if not line:
            out_lines.append("")
            continue

        marker = list_marker.match(line)
        if marker:
            prefix, body = marker.group(1), marker.group(2)
            wrap_width = max(20, width - len(prefix))
        else:
            prefix, body = "", line
            wrap_width = width

        # The marker is emitted once; every later output line of the same
        # item gets spaces of the same width (hanging indent).
        lead = prefix
        hanging = " " * len(prefix)
        for sentence in sentence_gap.split(body):
            sentence = sentence.strip()
            if not sentence:
                continue
            for piece in textwrap.wrap(sentence, width=wrap_width) or [""]:
                out_lines.append(lead + piece)
                lead = hanging
    return "\n".join(out_lines)
def enforce_third_person(text: str, customer_name: str) -> str:
    """Rewrite second-person wording so the text refers to the customer by name.

    ``you`` becomes the name, ``your`` becomes the possessive form, and
    ``you're`` / ``you are`` become ``"<name> is"``. Only the capitalisations
    ``You...`` and ``you...`` are rewritten; words such as ``yours`` are left
    untouched.

    The substitution is done in a single pass with a callable replacement so
    that (a) the inserted name is never re-scanned (a name such as
    ``"You Wei"`` would otherwise be rewritten again) and (b) backslashes in
    the name are inserted literally instead of being parsed as regex
    template escapes.

    Args:
        text: Text to rewrite. Falsy input yields an empty string.
        customer_name: Name to substitute; falls back to ``"The customer"``
            when empty or ``None``.

    Returns:
        The rewritten text.
    """
    if not text:
        return ""
    name = customer_name or "The customer"

    def _to_third_person(match):
        token = match.group(0).lower()
        if token == "you":
            return name
        if token == "your":
            return f"{name}'s"
        # Remaining alternatives are "you're" / "you are" variants.
        return f"{name} is"

    # Longer alternatives come first so "you're" and "your" are not consumed
    # as a bare "you".
    return re.sub(
        r"\b(?:[Yy]ou['\u2019]re|[Yy]ou[ \t]+are|[Yy]our|[Yy]ou)\b",
        _to_third_person,
        text,
    )
def format_customer_profile(profile: dict) -> str:
    """Render a customer profile as ``key: value`` lines.

    A fixed set of well-known fields is listed first in a fixed order; any
    other keys follow in sorted order. ``PR_Status`` is omitted when the
    nationality is "singaporean" (compared case-insensitively after
    stripping). The input mapping is not modified.

    Args:
        profile: Mapping of field name to value. Falsy input yields "".

    Returns:
        Newline-joined ``key: value`` lines.
    """
    if not profile:
        return ""

    fields = dict(profile)
    if str(fields.get("Nationality", "")).strip().lower() == "singaporean":
        fields.pop("PR_Status", None)

    leading = ("ID", "Name", "Email", "Credit_Score", "Nationality", "Account_Status", "PR_Status")
    ordered_keys = [key for key in leading if key in fields]
    ordered_keys += [key for key in sorted(fields) if key not in leading]
    return "\n".join(f"{key}: {fields.get(key)}" for key in ordered_keys)
# =========================
# Load CSV + build mock systems
# =========================
def load_customer_csv(csv_path: str) -> pd.DataFrame:
    """Read the customer CSV and normalise the columns used by the app.

    Column names are stripped of surrounding whitespace. ``ID`` and
    ``Nationality`` become stripped strings, ``Account Status`` a stripped
    lower-case string, and ``Credit Score`` a nullable ``Int64`` column in
    which unparseable values become missing. A ``PR_Status`` column is added:
    parsed from ``PR Status`` when that column exists, otherwise all missing.

    Args:
        csv_path: Anything ``pandas.read_csv`` accepts as its first argument.

    Returns:
        The normalised DataFrame.
    """
    truthy = ("true", "t", "yes", "y", "1")
    falsy = ("false", "f", "no", "n", "0")

    def _to_pr_flag(raw):
        # Missing or unrecognised values stay missing rather than defaulting.
        if pd.isna(raw):
            return pd.NA
        token = str(raw).strip().lower()
        if token in truthy:
            return True
        if token in falsy:
            return False
        return pd.NA

    frame = pd.read_csv(csv_path)
    frame.columns = [column.strip() for column in frame.columns]

    frame["ID"] = frame["ID"].astype(str).str.strip()
    numeric_scores = pd.to_numeric(frame["Credit Score"], errors="coerce")
    frame["Credit Score"] = numeric_scores.astype("Int64")
    status_text = frame["Account Status"].astype(str)
    frame["Account Status"] = status_text.str.strip().str.lower()
    frame["Nationality"] = frame["Nationality"].astype(str).str.strip()

    frame["PR_Status"] = (
        frame["PR Status"].apply(_to_pr_flag) if "PR Status" in frame.columns else pd.NA
    )
    return frame
def build_mock_systems(df_all: pd.DataFrame):
    """Split the customer table into three independent per-system tables.

    Args:
        df_all: Table with at least the columns ``ID``, ``Name``, ``Email``,
            ``Credit Score``, ``Nationality``, ``Account Status`` and
            ``PR_Status``.

    Returns:
        A ``(credit, account, gov)`` tuple of new DataFrames:
        credit has ``ID, Name, Email, Credit_Score``; account has
        ``ID, Name, Nationality, Account_Status``; gov has
        ``ID, Name, PR_Status`` and only the rows whose ``PR_Status`` is not
        missing.
    """
    credit_view = df_all[["ID", "Name", "Email", "Credit Score"]].rename(
        columns={"Credit Score": "Credit_Score"}
    )
    account_view = df_all[["ID", "Name", "Nationality", "Account Status"]].rename(
        columns={"Account Status": "Account_Status"}
    )
    has_pr_value = df_all["PR_Status"].notna()
    gov_view = df_all.loc[has_pr_value, ["ID", "Name", "PR_Status"]].copy()
    return credit_view, account_view, gov_view
def get_customer_profile(customer_id: str):
    """Assemble one customer's profile from the credit, account and gov tables.

    Args:
        customer_id: Customer ID; converted to a stripped string before lookup.

    Returns:
        ``None`` when the ID is not present in ``DF_CREDIT``; otherwise a dict
        with keys ``ID, Name, Email, Credit_Score, Nationality,
        Account_Status, PR_Status``. ``Nationality`` and ``Account_Status``
        are ``None`` when no account record exists. ``PR_Status`` is looked up
        in ``DF_GOV`` for every customer whose nationality is not
        "singaporean" (the same test ``is_non_singaporean_no_pr`` and
        ``format_customer_profile`` use) and is ``None`` when no record is
        found or the customer is Singaporean.

    Raises:
        ValueError: The customer exists but the credit score is missing
            (previously this surfaced as an opaque ``TypeError`` from
            ``int()`` on a missing value).
    """
    customer_id = str(customer_id).strip()

    credit_rec = DF_CREDIT[DF_CREDIT["ID"].astype(str) == customer_id]
    if credit_rec.empty:
        return None
    credit_row = credit_rec.iloc[0]

    raw_score = credit_row["Credit_Score"]
    if pd.isna(raw_score):
        raise ValueError(f"Customer {customer_id} has no valid credit score on record.")
    credit_score = int(raw_score)

    acct_rec = DF_ACCOUNT[DF_ACCOUNT["ID"].astype(str) == customer_id]
    if acct_rec.empty:
        nationality = None
        account_status = None
    else:
        nationality = acct_rec.iloc[0]["Nationality"]
        account_status = acct_rec.iloc[0]["Account_Status"]

    pr_status = None
    if nationality and str(nationality).strip().lower() != "singaporean":
        gov_rec = DF_GOV[DF_GOV["ID"].astype(str) == customer_id]
        if not gov_rec.empty:
            pr_status = bool(gov_rec.iloc[0]["PR_Status"])

    return {
        "ID": customer_id,
        "Name": credit_row["Name"],
        "Email": credit_row["Email"],
        "Credit_Score": credit_score,
        "Nationality": nationality,
        "Account_Status": account_status,
        "PR_Status": pr_status,
    }
# =========================
# PDF ingest + parse policies
# =========================
def extract_pdf_text(pdf_path: str) -> str:
    """Return the text of every page of a PDF, joined with newlines.

    Pages for which text extraction yields nothing contribute an empty
    string. ``pypdf`` is imported inside the function, as in the original.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The concatenated page texts.
    """
    from pypdf import PdfReader

    document = PdfReader(pdf_path)
    return "\n".join(page.extract_text() or "" for page in document.pages)
def parse_policies(risk_text: str, interest_text: str):
    """Extract the risk table and the interest-rate table from policy text.

    Args:
        risk_text: Text containing rows of the form
            ``<low> [dash] <high> <status> <risk>``, where the bounds are
            three-digit scores, the optional dash may be a hyphen, en dash or
            em dash, status is Delinquent/Closed/Good-standing and risk is
            High/Medium/Low (all case-insensitive).
        interest_text: Text containing ``<risk> <rate>`` pairs. A rate is a
            decimal number (``3.5``), optionally followed by ``%``, or a whole
            number immediately followed by ``%`` (``5%``). Bare whole numbers
            are ignored so unrelated integers are not mistaken for rates.

    Returns:
        ``(risk_mapping, interest_rates)`` where ``risk_mapping`` maps
        ``((low, high), status_lowercase)`` to a capitalised risk level and
        ``interest_rates`` maps a capitalised risk level to a float. Later
        duplicates overwrite earlier ones.
    """
    # Interest rates
    rate_matches = re.findall(
        r"\b(Low|Medium|High)\b\s+([0-9]+\.[0-9]+|[0-9]+(?=\s*%))\s*%?",
        interest_text,
        flags=re.IGNORECASE,
    )
    interest_rates = {level.capitalize(): float(value) for level, value in rate_matches}

    # Risk table rows. The dash alternatives are written as escapes
    # (\u2013 en dash, \u2014 em dash) so they cannot be damaged by a
    # mis-decoded source file.
    risk_rows = re.findall(
        r"(\d{3})\s*(?:-|\u2013|\u2014)?\s*(\d{3})\s+(Delinquent|Closed|Good-standing)\s+(High|Medium|Low)",
        risk_text,
        flags=re.IGNORECASE,
    )
    risk_mapping = {
        ((int(lo), int(hi)), status.strip().lower()): risk.capitalize()
        for lo, hi, status, risk in risk_rows
    }
    return risk_mapping, interest_rates
def _score_band(score: int):
    """Map a credit score to its ``(low, high)`` band.

    Scores inside one of the three fixed bands return that band. Scores
    below 300 are placed in the lowest band; anything else that matches no
    band is placed in the highest band.
    """
    for low, high in ((300, 674), (675, 749), (750, 850)):
        if low <= score <= high:
            return (low, high)
    return (300, 674) if score < 300 else (750, 850)
def determine_overall_risk(score: int, account_status: str) -> str:
    """Look up the overall risk level for a score and account status.

    The score is reduced to its band via ``_score_band`` and the status is
    stripped and lower-cased; the pair is looked up in ``RISK_MAPPING``.
    Combinations that are not in the mapping default to "High".
    """
    band = _score_band(score)
    normalized_status = str(account_status).strip().lower()
    lookup_key = (band, normalized_status)
    return RISK_MAPPING.get(lookup_key, "High")
def determine_interest_rate(overall_risk: str) -> float:
    """Return the interest rate stored in ``INTEREST_RATES`` for a risk level.

    Raises:
        KeyError: ``overall_risk`` is not a key of ``INTEREST_RATES``.
    """
    configured_rate = INTEREST_RATES[overall_risk]
    return float(configured_rate)
def is_non_singaporean_no_pr(customer_id: str) -> bool:
    """Tell whether the mandatory-rejection exception applies to a customer.

    Returns ``True`` when the nationality found in ``DF_ACCOUNT`` is anything
    other than "singaporean" (case-insensitive, stripped; a missing account
    record also counts) and ``DF_GOV`` holds either no record for the ID or a
    falsy ``PR_Status``.
    """
    wanted_id = str(customer_id).strip()

    account_hits = DF_ACCOUNT[DF_ACCOUNT["ID"].astype(str).str.strip() == wanted_id]
    nationality = None if account_hits.empty else account_hits.iloc[0]["Nationality"]

    gov_hits = DF_GOV[DF_GOV["ID"].astype(str).str.strip() == wanted_id]
    # No government record is treated the same as "not a PR".
    has_pr = False if gov_hits.empty else bool(gov_hits.iloc[0]["PR_Status"])

    if str(nationality).strip().lower() == "singaporean":
        return False
    return not has_pr
def apply_mandatory_exception_to_report(report_text: str, customer_id: str) -> str:
    """Force a generated report to respect the Non-Singaporean/no-PR rule.

    When ``is_non_singaporean_no_pr(customer_id)`` is false the report is
    returned unchanged. Otherwise:

    * an "Approve"/"Approved" verdict following ``Loan Approval:`` or
      ``Loan Approval Decision:`` is replaced with "Reject"/"Rejected". Up to
      two ``*`` (markdown bold) may sit between the colon and the verdict,
      but they are optional, so plain-text reports are covered as well;
      the original spacing around the verdict is preserved;
    * a conclusion paragraph is appended unless the text already both
      mentions a rejection and cites the Non-Singaporean / PR reason.

    Args:
        report_text: Report produced by the model; ``None`` is treated as "".
        customer_id: ID used to evaluate the exception.

    Returns:
        The (possibly amended) report text.
    """
    text = "" if report_text is None else str(report_text)
    if not is_non_singaporean_no_pr(customer_id):
        return text

    def _flip_verdict(match):
        # Keep the tense of the original verdict word.
        approved = match.group(2).lower().endswith("d")
        return match.group(1) + ("Rejected" if approved else "Reject")

    text = re.sub(
        r"(Loan\s+Approval(?:\s+Decision)?:\s*\*{0,2}\s*)(Approved?)\b",
        _flip_verdict,
        text,
        flags=re.IGNORECASE,
    )

    mentions_rejection = re.search(r"not\s+recommend|reject", text, flags=re.IGNORECASE)
    # \bPR\b: a bare "PR" would also match inside words such as "approve".
    cites_reason = re.search(
        r"Non[-\s]?Singaporean.*\bPR\b", text, flags=re.IGNORECASE | re.DOTALL
    )
    if not (mentions_rejection and cites_reason):
        # The rule applies at every risk level, so the wording must not claim
        # that the risk is low.
        text += (
            "\n\nConclusion: Not recommended regardless of risk level, "
            "because the customer is Non-Singaporean and PR status is false."
        )
    return text
# =========================
# Unstructured resolver (Colab-style)
# =========================
def resolve_customer_id(unstructured_text: str):
    """Resolve free-form user input to a customer ID.

    Resolution order:

    1. The first run of three or more digits in the input is taken as an ID
       and checked with ``get_customer_profile``.
    2. Otherwise the whole input is used as a case-insensitive *literal*
       substring search over ``DF_CREDIT["Name"]`` (``regex=False``, so
       characters such as ``(`` or ``+`` in the input cannot raise or match
       unintentionally).

    Args:
        unstructured_text: Raw text typed by the user; ``None`` is treated
            as empty.

    Returns:
        ``(customer_id, message)``. ``customer_id`` is ``None`` when the input
        is empty, nothing matches, or several names match; ``message`` is a
        user-facing status line in every case.
    """
    query = (unstructured_text or "").strip()
    if not query:
        return None, "❌ Please enter Applicant Name or ID."

    # 1) Extract an ID from any sentence
    id_match = re.search(r"\b(\d{3,})\b", query)
    if id_match:
        cid = id_match.group(1)
        prof = get_customer_profile(cid)
        if prof:
            return cid, f"✅ Found ID {cid}: {prof['Name']}"
        return None, f"❌ No such customer ID: {cid}"

    # 2) Otherwise treat as name search (contains)
    names = DF_CREDIT["Name"].astype(str)
    results = DF_CREDIT[names.str.contains(query, case=False, na=False, regex=False)]
    if results.empty:
        return None, f"❌ No such customer: '{query}'"
    if len(results) == 1:
        cid = str(results.iloc[0]["ID"])
        nm = str(results.iloc[0]["Name"])
        return cid, f"✅ Found Name '{nm}' -> ID {cid}"

    # Multiple matches: no dropdown, just tell user to type ID
    opts = ", ".join(f"{row['Name']} (ID {row['ID']})" for _, row in results.iterrows())
    return None, f"⚠️ Multiple customers match '{query}'. Please enter ID. Matches: {opts}"
# =========================
# Prompts / Chains
# =========================
# Prompt for plain question answering: the model is told to answer strictly
# from the {customer_data} and {policy_rules} placeholders; {question} carries
# the user's question.
QA_PROMPT = PromptTemplate(
    input_variables=["customer_data", "policy_rules", "question"],
    template="""
You are a helpful banking assistant. Answer the user's question based strictly on the provided Customer Data and Policy Rules.
CUSTOMER DATA:
{customer_data}
POLICY RULES:
{policy_rules}
USER QUESTION:
{question}
ANSWER:
"""
)
# Prompt for advice/recommendation output: asks for third-person wording,
# 3-5 advice points and an explicit final recommendation, and states the
# Non-Singaporean / PR_Status False rejection rule. Same three placeholders
# as the question-answering prompt.
ADVICE_PROMPT = PromptTemplate(
    input_variables=["customer_data", "policy_rules", "question"],
    template="""You are a helpful loan officer assistant.
Write in THIRD PERSON about the customer. Always use the customer's Name and possessive.
Never address the reader as 'you' or 'your'.
You must provide ADVICE/RECOMMENDATION (not just restating risk and rate).
Use ONLY the provided Customer Data and Policy Rules.
REQUIREMENTS:
- Provide 3-5 actionable advice points (short sentences).
- Include a clear final recommendation: APPROVE or NOT RECOMMEND / REJECT.
- If customer is Non-Singaporean and PR_Status is False, you MUST recommend NOT RECOMMEND / REJECT regardless of risk level.
- Keep it concise.
CUSTOMER DATA:
{customer_data}
POLICY RULES:
{policy_rules}
USER QUESTION:
{question}
ANSWER:
"""
)
# Prompt for the full loan assessment report. It takes only {customer_data}
# and {policy_rules}; there is no question placeholder.
REPORT_PROMPT = PromptTemplate(
    input_variables=["customer_data", "policy_rules"],
    template="""
You are a senior loan officer. Generate a comprehensive loan assessment report based on the provided customer data and banking policies.
Analyze the customer's profile, determine overall risk, calculate interest rate, and provide a clear recommendation.
Follow any mandatory exceptions.
CUSTOMER DATA:
{customer_data}
POLICY RULES:
{policy_rules}
REPORT:
"""
)
# =========================
# Global init (Spaces safe)
# =========================
# Module-level state filled in once at import time. Any failure during
# initialisation is recorded in INIT_ERROR instead of being raised, so the
# module still imports and the UI can display the problem.
RETRIEVER = None
POLICY_FULL = ""
LLM = None
INIT_ERROR = None
try:
    # Check files
    missing = [p for p in [CSV_FILE, RISK_PDF, INT_PDF] if not os.path.exists(p)]
    if missing:
        raise RuntimeError("Missing required files in repo root:\n" + "\n".join(missing))

    # Clean key (fixes illegal header newline)
    api_key = os.getenv("OPENAI_API_KEY", "")
    api_key = api_key.strip()
    os.environ["OPENAI_API_KEY"] = api_key
    if not api_key:
        raise RuntimeError("Missing OPENAI_API_KEY. Set it in Space Secrets.")

    # Load CSV
    DF_ALL = load_customer_csv(CSV_FILE)
    DF_CREDIT, DF_ACCOUNT, DF_GOV = build_mock_systems(DF_ALL)

    # Load PDFs
    RISK_TEXT = extract_pdf_text(RISK_PDF)
    INT_TEXT = extract_pdf_text(INT_PDF)
    POLICY_FULL = RISK_TEXT + "\n\n" + INT_TEXT

    # Parse policies
    RISK_MAPPING, INTEREST_RATES = parse_policies(RISK_TEXT, INT_TEXT)

    # LLM
    model_name = os.getenv("OPENAI_MODEL", "gpt-4o")
    LLM = ChatOpenAI(model=model_name, temperature=0, openai_api_key=api_key)

    # Chains (LCEL)
    QA_CHAIN = QA_PROMPT | LLM
    ADVICE_CHAIN = ADVICE_PROMPT | LLM
    REPORT_CHAIN = REPORT_PROMPT | LLM
except Exception as e:
    # Prefix the exception type: str(e) alone can be an empty string, which
    # is falsy and would make the `if INIT_ERROR` guards treat a failed
    # initialisation as a success.
    INIT_ERROR = f"{type(e).__name__}: {e}"
def build_retriever_if_needed():
    """Return the shared FAISS retriever, creating it on first use.

    The retriever is built from two documents (the risk-policy text and the
    interest-rate-policy text) and stored in the module-level ``RETRIEVER``
    so later calls reuse it.

    Returns:
        The retriever, or ``None`` when ``OPENAI_API_KEY`` is unset or blank
        (nothing is stored in that case).
    """
    global RETRIEVER
    if RETRIEVER is None:
        api_key = os.getenv("OPENAI_API_KEY", "").strip()
        if not api_key:
            return None
        policy_docs = [
            Document(page_content=RISK_TEXT, metadata={"source": "Risk_Policy.pdf"}),
            Document(page_content=INT_TEXT, metadata={"source": "Interest_Rate_Policy.pdf"}),
        ]
        embedder = OpenAIEmbeddings(openai_api_key=api_key)
        index = FAISS.from_documents(policy_docs, embedder)
        RETRIEVER = index.as_retriever()
    return RETRIEVER
def get_policy_context(use_rag: bool) -> str:
    """Return the policy text to hand to the model.

    Without RAG the full policy text is returned. With RAG the retriever is
    queried with a fixed query string and the retrieved page contents are
    joined; if retrieval is unavailable, fails, or yields nothing, the full
    policy text is returned instead (with an explanatory note appended in
    the unavailable and failure cases).
    """
    if use_rag:
        try:
            retriever = build_retriever_if_needed()
            if retriever is None:
                return POLICY_FULL + "\n\n[Note] RAG unavailable (missing API key). Using full policy text."
            hits = retriever.invoke("risk level interest rate PR status")
            retrieved = "\n\n".join(hit.page_content for hit in hits).strip()
            return retrieved or POLICY_FULL
        except Exception as e:
            return POLICY_FULL + f"\n\n[Note] RAG failed, using full policy text: {e}"
    return POLICY_FULL
# =========================
# Main Run (Colab-style)
# =========================
def run_action(user_input: str, action: str, use_rag: bool):
    """Handle one UI request and return the text to display.

    Steps: resolve the input to a customer, build the profile and policy
    context, compute the deterministic risk / rate / mandatory-exception
    summary, then run the chain that matches ``action``. Any action other
    than the first two produces the full report.

    Args:
        user_input: Free-form applicant name, ID or sentence.
        action: One of the radio-button labels defined in the UI.
        use_rag: Whether to build the policy context through the retriever.

    Returns:
        The output text. Initialisation errors, resolution failures and
        unexpected exceptions (as a traceback) are returned as text rather
        than raised.
    """
    if INIT_ERROR:
        return f"❌ Initialization error:\n\n{INIT_ERROR}"

    try:
        customer_id, msg = resolve_customer_id(user_input)
        if not customer_id:
            return msg

        profile = get_customer_profile(customer_id)
        if not profile:
            return f"❌ No such customer ID: {customer_id}"

        profile_text = format_customer_profile(profile)
        policy_context = get_policy_context(use_rag)

        # Deterministic summary (optional but helps demo)
        overall_risk = determine_overall_risk(profile["Credit_Score"], profile["Account_Status"])
        rate = determine_interest_rate(overall_risk)
        must_reject = is_non_singaporean_no_pr(customer_id)

        summary_lines = [
            msg,
            "",
            "Deterministic (Policy-based):",
            f"- Overall risk: {overall_risk}",
            f"- Interest rate: {rate:.3f}%",
        ]
        if must_reject:
            summary_lines.append(
                "- Mandatory exception: NOT RECOMMEND / REJECT (Non-Singaporean without PR)."
            )
        det_text = "\n".join(summary_lines)

        if action == "1) Check Risk & Interest":
            qa_inputs = {
                "customer_data": profile_text,
                "policy_rules": policy_context,
                "question": f"What are the risk level and applicable interest rate for the customer {customer_id}?",
            }
            answer = QA_CHAIN.invoke(qa_inputs)
            return det_text + "\n\nAI Output:\n" + one_sentence_per_line(answer.content)

        if action == "2) Advice / Recommendation":
            advice_inputs = {
                "customer_data": profile_text,
                "policy_rules": policy_context,
                "question": f"What interest rate advice can be recommended for customer with Id {customer_id}?",
            }
            advice = ADVICE_CHAIN.invoke(advice_inputs)
            third_person = enforce_third_person(advice.content, profile.get("Name", "Customer"))
            return det_text + "\n\nAI Output:\n" + one_sentence_per_line(third_person)

        # 3) FULL report
        report_customer_data = profile_text
        if must_reject:
            report_customer_data += (
                "\n\nMANDATORY EXCEPTION (must follow): Non-Singaporean with PR_Status = False => NOT RECOMMENDED / REJECTED."
            )
        full_report = REPORT_CHAIN.invoke({
            "customer_data": report_customer_data,
            "policy_rules": policy_context,
        })
        final_text = apply_mandatory_exception_to_report(full_report.content, customer_id)
        return det_text + "\n\nFull Report:\n" + one_sentence_per_line(final_text)
    except Exception:
        return "❌ Run failed:\n\n" + traceback.format_exc()
# =========================
# Gradio UI (NO dropdown)
# =========================
# UI layout: a free-text applicant box, an action selector, a RAG toggle, a
# run button and a read-only output box. Both the button click and pressing
# Enter in the text box call run_action with the same inputs.
with gr.Blocks(title="Bank Loan Officer System") as demo:
    gr.Markdown("# 🏦 Bank Loan Officer System (Unstructured Input Search)")
    gr.Markdown("Type Applicant **Name / ID / sentence**. The system resolves and responds like your Colab notebook.")
    # Surface a start-up failure at the top of the page.
    if INIT_ERROR:
        gr.Markdown(f"## ❌ Initialization error\n\n```\n{INIT_ERROR}\n```")
    user_input = gr.Textbox(
        label="Applicant Name or ID (unstructured)",
        placeholder="e.g. 3333 OR Hilda OR 'please check loan for 3333'"
    )
    action = gr.Radio(
        label="Action",
        choices=[
            "1) Check Risk & Interest",
            "2) Advice / Recommendation",
            "3) FULL Formal Loan Report"
        ],
        value="1) Check Risk & Interest"
    )
    use_rag = gr.Checkbox(
        label="Use RAG (FAISS embeddings). If it fails, auto fallback.",
        value=False
    )
    # The label previously contained a mis-decoded (mojibake) rocket emoji.
    run_btn = gr.Button("🚀 Run")
    output = gr.Textbox(label="Output", lines=28)
    run_btn.click(fn=run_action, inputs=[user_input, action, use_rag], outputs=[output])
    user_input.submit(fn=run_action, inputs=[user_input, action, use_rag], outputs=[output])
# =========================
# HF Spaces launch (the add-on at bottom)
# =========================
# Port is taken from the PORT environment variable, defaulting to 7860.
PORT = int(os.environ.get("PORT", 7860))
# Enable the request queue and start the server at import time.
# NOTE(review): prevent_thread_lock=False presumably keeps this call blocking
# for the life of the server - confirm against the Gradio launch() docs.
demo.queue().launch(
    server_name="0.0.0.0",
    server_port=PORT,
    ssr_mode=False,
    prevent_thread_lock=False
)