"""Bank Loan Officer demo app.

Loads customer records from a CSV, extracts risk / interest-rate policy text
from two PDFs, derives a deterministic risk level and interest rate, and asks
an OpenAI chat model (through LangChain) for a short answer, advice, or a full
loan report.  A Gradio UI exposes the whole flow through one free-text box.
"""

import os
import re
import textwrap
import traceback
from typing import Optional

import gradio as gr
import pandas as pd
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# =========================
# Files expected in repo root
# =========================
# abspath() keeps the paths valid even when __file__ is a bare relative name.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
CSV_FILE = os.path.join(BASE_DIR, "Customer records.csv")
RISK_PDF = os.path.join(BASE_DIR, "Risk_Policy.pdf")
INT_PDF = os.path.join(BASE_DIR, "Interest_Rate_Policy.pdf")

# Action labels shared by the UI radio buttons and run_action().
ACTION_RISK = "1) Check Risk & Interest"
ACTION_ADVICE = "2) Advice / Recommendation"
ACTION_REPORT = "3) FULL Formal Loan Report"


# =========================
# Helpers: formatting
# =========================
_LIST_PREFIX_RE = re.compile(r"^(\s*(?:[-*]|\d+\.|\u2022)\s+)(.*)$")
_SENTENCE_END_RE = re.compile(r"(?<=[.!?])\s+")


def one_sentence_per_line(text: str, width: int = 110) -> str:
    """Reflow *text* so each sentence starts on its own line, wrapped to *width*.

    Blank lines are preserved.  A leading list marker (``-``, ``*``, a bullet
    or ``N.``) stays attached to the first sentence of its line, and every
    following sentence or wrapped continuation is indented beneath it.

    Args:
        text: Text to reflow; ``None`` yields an empty string.
        width: Maximum line width passed to ``textwrap.wrap``.

    Returns:
        The reflowed text joined with newlines.
    """
    if text is None:
        return ""

    out_lines = []
    for raw_line in str(text).splitlines():
        line = raw_line.strip()
        if not line:
            out_lines.append("")
            continue

        # Detect the list marker BEFORE splitting into sentences; otherwise
        # the "." of a numbered marker ("1. ") is treated as a sentence end
        # and the marker is torn off onto a line of its own.
        marker = _LIST_PREFIX_RE.match(line)
        if marker:
            prefix, body = marker.group(1), marker.group(2)
            body_width = max(20, width - len(prefix))
        else:
            prefix, body = "", line
            body_width = width

        lead = prefix
        for sentence in _SENTENCE_END_RE.split(body):
            sentence = sentence.strip()
            if not sentence:
                continue
            for piece in textwrap.wrap(sentence, width=body_width) or [""]:
                out_lines.append(lead + piece)
                # Everything after the first emitted line hangs under the marker.
                lead = " " * len(prefix)

    return "\n".join(out_lines)


def enforce_third_person(text: str, customer_name: str) -> str:
    """Replace second-person pronouns in *text* with the customer's name.

    ``you're`` becomes ``<name> is``, ``your`` becomes ``<name>'s`` and ``you``
    becomes ``<name>`` (both lower-case and capitalised forms).

    The rewrite is a single pass with a callable replacement, so:
      * occurrences of the name itself are left untouched, which matters for
        names that contain a pronoun-like word such as "You";
      * backslashes in the name are never interpreted as regex replacement
        escapes.

    Args:
        text: Model output to rewrite; falsy input yields an empty string.
        customer_name: Name to substitute; falls back to "The customer".

    Returns:
        The rewritten text.
    """
    if not text:
        return ""

    name = str(customer_name) if customer_name else "The customer"
    replacements = {
        "you're": f"{name} is",
        "your": f"{name}'s",
        "you": name,
    }
    # The name is the first alternative so it is consumed verbatim and none of
    # its words can be matched as a pronoun.
    pattern = re.compile(
        r"(?<!\w)(?:"
        + re.escape(name)
        + r"|(?P<pronoun>[Yy]ou['\u2019]re|[Yy]our|[Yy]ou))(?!\w)"
    )

    def _swap(match):
        word = match.group("pronoun")
        if word is None:
            return match.group(0)
        return replacements[word.lower().replace("\u2019", "'")]

    return pattern.sub(_swap, str(text))


def _is_singaporean(nationality) -> bool:
    """Return True when *nationality* reads "singaporean" (case-insensitive)."""
    return str(nationality).strip().lower() == "singaporean"


_PROFILE_FIELD_ORDER = (
    "ID",
    "Name",
    "Email",
    "Credit_Score",
    "Nationality",
    "Account_Status",
    "PR_Status",
)


def format_customer_profile(profile: dict) -> str:
    """Render a customer profile as ``key: value`` lines.

    Known fields come first in a fixed order, any other keys follow
    alphabetically.  ``PR_Status`` is omitted for Singaporean customers.

    Args:
        profile: Mapping as returned by ``get_customer_profile``.

    Returns:
        One ``key: value`` pair per line, or "" for an empty profile.
    """
    if not profile:
        return ""

    fields = dict(profile)  # copy: never mutate the caller's dict
    if _is_singaporean(fields.get("Nationality", "")):
        fields.pop("PR_Status", None)

    ordered = [key for key in _PROFILE_FIELD_ORDER if key in fields]
    ordered += sorted(key for key in fields if key not in _PROFILE_FIELD_ORDER)
    return "\n".join(f"{key}: {fields[key]}" for key in ordered)


# =========================
# Load CSV + build mock systems
# =========================
_TRUE_FLAGS = ("true", "t", "yes", "y", "1")
_FALSE_FLAGS = ("false", "f", "no", "n", "0")


def _parse_pr_flag(value):
    """Map a raw PR-status cell to True / False, or ``pd.NA`` when unknown."""
    if pd.isna(value):
        return pd.NA
    flag = str(value).strip().lower()
    if flag in _TRUE_FLAGS:
        return True
    if flag in _FALSE_FLAGS:
        return False
    return pd.NA


def load_customer_csv(csv_path: str) -> pd.DataFrame:
    """Load and normalise the customer records CSV.

    Every column is read as text so that IDs keep leading zeros and ``1``/``0``
    PR flags are not turned into ``1.0``/``0.0`` by dtype inference when the
    column contains blanks.

    Args:
        csv_path: Path to the CSV file.

    Returns:
        DataFrame with stripped column names, string ``ID``, nullable-integer
        ``Credit Score``, lower-cased ``Account Status``, stripped
        ``Nationality`` and a derived ``PR_Status`` column (True/False/NA).

    Raises:
        KeyError: If a required column is missing.
    """
    df_all = pd.read_csv(csv_path, dtype=str)
    df_all.columns = [str(c).strip() for c in df_all.columns]

    df_all["ID"] = df_all["ID"].astype(str).str.strip()
    # round() so that a stray fractional score does not make the Int64 cast fail.
    df_all["Credit Score"] = (
        pd.to_numeric(df_all["Credit Score"].astype(str).str.strip(), errors="coerce")
        .round()
        .astype("Int64")
    )
    df_all["Account Status"] = df_all["Account Status"].astype(str).str.strip().str.lower()
    df_all["Nationality"] = df_all["Nationality"].astype(str).str.strip()

    if "PR Status" in df_all.columns:
        df_all["PR_Status"] = df_all["PR Status"].apply(_parse_pr_flag)
    else:
        df_all["PR_Status"] = pd.NA

    return df_all


def build_mock_systems(df_all: pd.DataFrame):
    """Split the customer table into three mock source-system tables.

    Args:
        df_all: DataFrame produced by ``load_customer_csv``.

    Returns:
        Tuple ``(df_credit, df_account, df_gov)``:
          * credit:  ID, Name, Email, Credit_Score
          * account: ID, Name, Nationality, Account_Status
          * gov:     ID, Name, PR_Status (only rows with a known PR status)
    """
    df_credit = df_all[["ID", "Name", "Email", "Credit Score"]].rename(
        columns={"Credit Score": "Credit_Score"}
    )
    df_account = df_all[["ID", "Name", "Nationality", "Account Status"]].rename(
        columns={"Account Status": "Account_Status"}
    )
    df_gov = df_all.dropna(subset=["PR_Status"])[["ID", "Name", "PR_Status"]].copy()
    return df_credit, df_account, df_gov


def _first_row_for_id(frame: pd.DataFrame, customer_id: str):
    """Return the first row of *frame* whose ID equals *customer_id*, else None."""
    rows = frame[frame["ID"].astype(str).str.strip() == customer_id]
    return None if rows.empty else rows.iloc[0]


def _lookup_pr_status(customer_id: str) -> Optional[bool]:
    """Return the PR status recorded in DF_GOV, or None when there is no record."""
    row = _first_row_for_id(DF_GOV, customer_id)
    return None if row is None else bool(row["PR_Status"])


def get_customer_profile(customer_id: str):
    """Assemble a customer profile from the three mock systems.

    Args:
        customer_id: Customer ID (converted to a stripped string).

    Returns:
        Dict with keys ID, Name, Email, Credit_Score, Nationality,
        Account_Status and PR_Status, or ``None`` when the ID is unknown to
        the credit system.  ``Credit_Score`` is ``None`` when it is missing in
        the data; ``PR_Status`` is only looked up for customers who are not
        Singaporean (the same test ``is_non_singaporean_no_pr`` uses).
    """
    customer_id = str(customer_id).strip()

    credit = _first_row_for_id(DF_CREDIT, customer_id)
    if credit is None:
        return None

    # The column is a nullable Int64: int(pd.NA) would raise.
    raw_score = credit["Credit_Score"]
    credit_score = None if pd.isna(raw_score) else int(raw_score)

    account = _first_row_for_id(DF_ACCOUNT, customer_id)
    nationality = account["Nationality"] if account is not None else None
    account_status = account["Account_Status"] if account is not None else None

    pr_status = None
    if nationality and not _is_singaporean(nationality):
        pr_status = _lookup_pr_status(customer_id)

    return {
        "ID": customer_id,
        "Name": credit["Name"],
        "Email": credit["Email"],
        "Credit_Score": credit_score,
        "Nationality": nationality,
        "Account_Status": account_status,
        "PR_Status": pr_status,
    }


# =========================
# PDF ingest + parse policies
# =========================
def extract_pdf_text(pdf_path: str) -> str:
    """Return the text of every page of a PDF, joined by newlines.

    ``pypdf`` is imported lazily inside the function.  Pages for which
    ``extract_text()`` returns nothing contribute an empty string.
    """
    from pypdf import PdfReader

    reader = PdfReader(pdf_path)
    return "\n".join(page.extract_text() or "" for page in reader.pages)


_RATE_ROW_RE = re.compile(
    r"\b(Low|Medium|High)\b\s+([0-9]+\.[0-9]+)\s*%?",
    re.IGNORECASE,
)
_RISK_ROW_RE = re.compile(
    r"(\d{3})\s*(?:-|–|—)?\s*(\d{3})\s+(Delinquent|Closed|Good-standing)\s+(High|Medium|Low)",
    re.IGNORECASE,
)


def parse_policies(risk_text: str, interest_text: str):
    """Extract the risk table and interest rates from the policy texts.

    Args:
        risk_text: Text of the risk policy; rows are expected to look like
            ``<lo> - <hi> <status> <risk>``.
        interest_text: Text of the interest-rate policy; rows are expected to
            look like ``<risk> <decimal rate>%``.

    Returns:
        Tuple ``(risk_mapping, interest_rates)`` where ``risk_mapping`` maps
        ``((lo, hi), status_lowercase)`` to a capitalised risk level and
        ``interest_rates`` maps a capitalised risk level to a float rate.
        When a key occurs more than once the last occurrence wins.
    """
    interest_rates = {
        level.capitalize(): float(rate)
        for level, rate in _RATE_ROW_RE.findall(interest_text)
    }

    risk_mapping = {}
    for lo, hi, status, risk in _RISK_ROW_RE.findall(risk_text):
        band = (int(lo), int(hi))
        risk_mapping[(band, status.strip().lower())] = risk.capitalize()

    return risk_mapping, interest_rates


def _score_band(score: int):
    """Map a credit score to its policy band.

    Scores below 300 fall into the lowest band and scores above 850 into the
    highest one.
    """
    if score <= 674:
        return (300, 674)
    if score <= 749:
        return (675, 749)
    return (750, 850)


def determine_overall_risk(score: Optional[int], account_status: str) -> str:
    """Look up the overall risk for a score / account-status pair.

    Returns "High" when the score is missing or when the combination is not
    present in the parsed risk table.
    """
    if pd.isna(score):
        return "High"
    band = _score_band(score)
    status = str(account_status).strip().lower()
    return RISK_MAPPING.get((band, status), "High")


def determine_interest_rate(overall_risk: str) -> float:
    """Return the policy interest rate for *overall_risk*.

    Raises:
        KeyError: If no rate for that risk level was parsed from the policy.
    """
    try:
        return float(INTEREST_RATES[overall_risk])
    except KeyError:
        raise KeyError(
            f"No interest rate for risk level {overall_risk!r} was parsed "
            "from the interest-rate policy"
        ) from None


def is_non_singaporean_no_pr(customer_id: str) -> bool:
    """Return True when the mandatory rejection rule applies.

    The rule applies when the customer's nationality is anything other than
    "Singaporean" (including an unknown nationality) and no PR status of True
    is on record for them.
    """
    cid = str(customer_id).strip()
    account = _first_row_for_id(DF_ACCOUNT, cid)
    nationality = account["Nationality"] if account is not None else None
    if _is_singaporean(nationality):
        return False
    return _lookup_pr_status(cid) is not True


_APPROVAL_LINE_RE = re.compile(
    r"(Loan\s+Approval(?:\s+Decision)?:\s*(?:\*{1,2})?\s*)Approve(d)?\b",
    re.IGNORECASE,
)
_REJECTION_RE = re.compile(r"not\s+recommend|reject", re.IGNORECASE)
_EXCEPTION_REASON_RE = re.compile(r"Non[-\s]?Singaporean.*PR", re.IGNORECASE | re.DOTALL)
_MANDATORY_CONCLUSION = (
    "\n\nConclusion: Not recommended regardless of risk level, "
    "because the customer is Non-Singaporean and PR status is false."
)


def apply_mandatory_exception_to_report(report_text: str, customer_id: str) -> str:
    """Force a rejection into *report_text* when the mandatory rule applies.

    For a Non-Singaporean customer without PR status, any
    ``Loan Approval[ Decision]: Approve[d]`` line (with or without markdown
    asterisks) is rewritten to ``Reject[ed]``, and a conclusion is appended
    unless the report already states both a rejection and the reason.

    Args:
        report_text: Report produced by the model (``None`` is treated as "").
        customer_id: ID of the customer the report is about.

    Returns:
        The report text, unchanged when the rule does not apply.
    """
    text = "" if report_text is None else str(report_text)
    if not is_non_singaporean_no_pr(customer_id):
        return text

    text = _APPROVAL_LINE_RE.sub(
        lambda m: m.group(1) + ("Rejected" if m.group(2) else "Reject"),
        text,
    )

    states_rejection = _REJECTION_RE.search(text) is not None
    states_reason = _EXCEPTION_REASON_RE.search(text) is not None
    if not (states_rejection and states_reason):
        text += _MANDATORY_CONCLUSION

    return text


# =========================
# Unstructured resolver (Colab-style)
# =========================
_ID_IN_TEXT_RE = re.compile(r"\b(\d{3,})\b")


def resolve_customer_id(unstructured_text: str):
    """Resolve free-text input to a customer ID.

    The first run of three or more digits is treated as an ID; otherwise the
    whole input is used as a case-insensitive substring search on names.

    Args:
        unstructured_text: Raw user input (an ID, a name, or a sentence).

    Returns:
        Tuple ``(customer_id, message)``.  ``customer_id`` is ``None`` when
        nothing, or more than one customer, matched; ``message`` is always a
        user-facing status line.
    """
    query = (unstructured_text or "").strip()
    if not query:
        return None, "❌ Please enter Applicant Name or ID."

    # 1) Extract an ID from any sentence
    id_match = _ID_IN_TEXT_RE.search(query)
    if id_match:
        cid = id_match.group(1)
        prof = get_customer_profile(cid)
        if prof:
            return cid, f"✅ Found ID {cid}: {prof['Name']}"
        return None, f"❌ No such customer ID: {cid}"

    # 2) Otherwise treat as name search (contains)
    # regex=False: the query is user input, so characters such as "(" or "*"
    # must be matched literally instead of raising re.error.
    names = DF_CREDIT["Name"].astype(str)
    results = DF_CREDIT[names.str.contains(query, case=False, na=False, regex=False)]

    if results.empty:
        return None, f"❌ No such customer: '{query}'"

    if len(results) == 1:
        cid = str(results.iloc[0]["ID"])
        nm = str(results.iloc[0]["Name"])
        return cid, f"✅ Found Name '{nm}' -> ID {cid}"

    # Multiple matches: no dropdown, just tell user to type ID
    opts = ", ".join(f"{r['Name']} (ID {r['ID']})" for _, r in results.iterrows())
    return None, f"⚠️ Multiple customers match '{query}'. Please enter ID. Matches: {opts}"


# =========================
# Prompts / Chains
# =========================
QA_PROMPT = PromptTemplate(
    input_variables=["customer_data", "policy_rules", "question"],
    template="""
You are a helpful banking assistant.
Answer the user's question based strictly on the provided Customer Data and Policy Rules.

CUSTOMER DATA:
{customer_data}

POLICY RULES:
{policy_rules}

USER QUESTION:
{question}

ANSWER:
""",
)

ADVICE_PROMPT = PromptTemplate(
    input_variables=["customer_data", "policy_rules", "question"],
    template="""You are a helpful loan officer assistant.
Write in THIRD PERSON about the customer. Always use the customer's Name and possessive.
Never address the reader as 'you' or 'your'.

You must provide ADVICE/RECOMMENDATION (not just restating risk and rate).
Use ONLY the provided Customer Data and Policy Rules.

REQUIREMENTS:
- Provide 3-5 actionable advice points (short sentences).
- Include a clear final recommendation: APPROVE or NOT RECOMMEND / REJECT.
- If customer is Non-Singaporean and PR_Status is False, you MUST recommend NOT RECOMMEND / REJECT regardless of risk level.
- Keep it concise.

CUSTOMER DATA:
{customer_data}

POLICY RULES:
{policy_rules}

USER QUESTION:
{question}

ANSWER:
""",
)

REPORT_PROMPT = PromptTemplate(
    input_variables=["customer_data", "policy_rules"],
    template="""
You are a senior loan officer.
Generate a comprehensive loan assessment report based on the provided customer data and banking policies.
Analyze the customer's profile, determine overall risk, calculate interest rate, and provide a clear recommendation.
Follow any mandatory exceptions.

CUSTOMER DATA:
{customer_data}

POLICY RULES:
{policy_rules}

REPORT:
""",
)

# =========================
# Global init (Spaces safe)
# =========================
# Everything is pre-declared so the names exist even when initialisation
# fails part-way; run_action() checks INIT_ERROR before touching them.
RETRIEVER = None
POLICY_FULL = ""
LLM = None
INIT_ERROR = None
DF_ALL = DF_CREDIT = DF_ACCOUNT = DF_GOV = None
RISK_TEXT = ""
INT_TEXT = ""
RISK_MAPPING = {}
INTEREST_RATES = {}
QA_CHAIN = ADVICE_CHAIN = REPORT_CHAIN = None

try:
    # Check files
    missing = [p for p in [CSV_FILE, RISK_PDF, INT_PDF] if not os.path.exists(p)]
    if missing:
        raise RuntimeError("Missing required files in repo root:\n" + "\n".join(missing))

    # Clean key (fixes illegal header newline)
    api_key = os.getenv("OPENAI_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("Missing OPENAI_API_KEY. Set it in Space Secrets.")
    os.environ["OPENAI_API_KEY"] = api_key

    # Load CSV
    DF_ALL = load_customer_csv(CSV_FILE)
    DF_CREDIT, DF_ACCOUNT, DF_GOV = build_mock_systems(DF_ALL)

    # Load PDFs
    RISK_TEXT = extract_pdf_text(RISK_PDF)
    INT_TEXT = extract_pdf_text(INT_PDF)
    POLICY_FULL = RISK_TEXT + "\n\n" + INT_TEXT

    # Parse policies
    RISK_MAPPING, INTEREST_RATES = parse_policies(RISK_TEXT, INT_TEXT)

    # LLM
    model_name = os.getenv("OPENAI_MODEL", "gpt-4o")
    LLM = ChatOpenAI(model=model_name, temperature=0, openai_api_key=api_key)

    # Chains (LCEL)
    QA_CHAIN = QA_PROMPT | LLM
    ADVICE_CHAIN = ADVICE_PROMPT | LLM
    REPORT_CHAIN = REPORT_PROMPT | LLM

except Exception as e:  # top-level boundary: surface the failure in the UI
    # Include the exception type so the message is never an empty (falsy)
    # string, which would let run_action() proceed with a broken setup.
    INIT_ERROR = f"{type(e).__name__}: {e}"


def build_retriever_if_needed():
    """Return the cached FAISS retriever, building it on first use.

    Returns:
        The retriever, or ``None`` when ``OPENAI_API_KEY`` is not set.
    """
    global RETRIEVER
    if RETRIEVER is not None:
        return RETRIEVER

    api_key = os.getenv("OPENAI_API_KEY", "").strip()
    if not api_key:
        return None

    docs = [
        Document(page_content=RISK_TEXT, metadata={"source": "Risk_Policy.pdf"}),
        Document(page_content=INT_TEXT, metadata={"source": "Interest_Rate_Policy.pdf"}),
    ]
    embeddings = OpenAIEmbeddings(openai_api_key=api_key)
    db = FAISS.from_documents(docs, embeddings)
    RETRIEVER = db.as_retriever()
    return RETRIEVER


def get_policy_context(use_rag: bool) -> str:
    """Return the policy text to place in the prompt.

    Without RAG this is the full policy text.  With RAG it is the retrieved
    documents; any failure falls back to the full text plus a note, so a
    broken retriever never blocks the request.
    """
    if not use_rag:
        return POLICY_FULL

    try:
        retriever = build_retriever_if_needed()
        if retriever is None:
            return POLICY_FULL + "\n\n[Note] RAG unavailable (missing API key). Using full policy text."

        docs = retriever.invoke("risk level interest rate PR status")
        ctx = "\n\n".join(d.page_content for d in docs).strip()
        return ctx if ctx else POLICY_FULL
    except Exception as e:  # deliberate best-effort fallback
        return POLICY_FULL + f"\n\n[Note] RAG failed, using full policy text: {e}"


# =========================
# Main Run (Colab-style)
# =========================
def run_action(user_input: str, action: str, use_rag: bool):
    """Resolve the customer, compute the deterministic summary and call the LLM.

    Args:
        user_input: Free-text applicant name, ID or sentence.
        action: One of the action labels offered by the UI; any label other
            than the first two produces the full report.
        use_rag: Whether to build the policy context through the retriever.

    Returns:
        Text for the output box.  Errors are returned as text (including the
        traceback) rather than raised, so the UI always shows something.
    """
    if INIT_ERROR:
        return f"❌ Initialization error:\n\n{INIT_ERROR}"

    try:
        customer_id, msg = resolve_customer_id(user_input)
        if not customer_id:
            return msg

        profile = get_customer_profile(customer_id)
        if not profile:
            return f"❌ No such customer ID: {customer_id}"

        profile_text = format_customer_profile(profile)
        policy_context = get_policy_context(use_rag)

        # Deterministic summary (optional but helps demo)
        overall_risk = determine_overall_risk(profile["Credit_Score"], profile["Account_Status"])
        rate = determine_interest_rate(overall_risk)
        must_reject = is_non_singaporean_no_pr(customer_id)

        det = [
            msg,
            "",
            "Deterministic (Policy-based):",
            f"- Overall risk: {overall_risk}",
            f"- Interest rate: {rate:.3f}%",
        ]
        if must_reject:
            det.append("- Mandatory exception: NOT RECOMMEND / REJECT (Non-Singaporean without PR).")
        det_text = "\n".join(det)

        if action == ACTION_RISK:
            question = f"What are the risk level and applicable interest rate for the customer {customer_id}?"
            resp = QA_CHAIN.invoke({
                "customer_data": profile_text,
                "policy_rules": policy_context,
                "question": question,
            })
            return det_text + "\n\nAI Output:\n" + one_sentence_per_line(resp.content)

        if action == ACTION_ADVICE:
            question = f"What interest rate advice can be recommended for customer with Id {customer_id}?"
            resp = ADVICE_CHAIN.invoke({
                "customer_data": profile_text,
                "policy_rules": policy_context,
                "question": question,
            })
            advice = enforce_third_person(resp.content, profile.get("Name", "Customer"))
            return det_text + "\n\nAI Output:\n" + one_sentence_per_line(advice)

        # 3) FULL report
        customer_data_with_exception = profile_text + (
            "\n\nMANDATORY EXCEPTION (must follow): Non-Singaporean with PR_Status = False => NOT RECOMMENDED / REJECTED."
            if must_reject else ""
        )
        full_report = REPORT_CHAIN.invoke({
            "customer_data": customer_data_with_exception,
            "policy_rules": policy_context,
        })
        final_text = apply_mandatory_exception_to_report(full_report.content, customer_id)
        return det_text + "\n\nFull Report:\n" + one_sentence_per_line(final_text)

    except Exception:  # UI boundary: show the failure instead of crashing
        return "❌ Run failed:\n\n" + traceback.format_exc()


# =========================
# Gradio UI (NO dropdown)
# =========================
with gr.Blocks(title="Bank Loan Officer System") as demo:
    gr.Markdown("# 🏦 Bank Loan Officer System (Unstructured Input Search)")
    gr.Markdown("Type Applicant **Name / ID / sentence**. The system resolves and responds like your Colab notebook.")

    if INIT_ERROR:
        gr.Markdown(f"## ❌ Initialization error\n\n```\n{INIT_ERROR}\n```")

    user_input = gr.Textbox(
        label="Applicant Name or ID (unstructured)",
        placeholder="e.g. 3333 OR Hilda OR 'please check loan for 3333'",
    )

    action = gr.Radio(
        label="Action",
        choices=[ACTION_RISK, ACTION_ADVICE, ACTION_REPORT],
        value=ACTION_RISK,
    )

    use_rag = gr.Checkbox(
        label="Use RAG (FAISS embeddings). If it fails, auto fallback.",
        value=False,
    )

    run_btn = gr.Button("🚀 Run")
    output = gr.Textbox(label="Output", lines=28)

    run_btn.click(fn=run_action, inputs=[user_input, action, use_rag], outputs=[output])
    user_input.submit(fn=run_action, inputs=[user_input, action, use_rag], outputs=[output])

# =========================
# HF Spaces launch (the add-on at bottom)
# =========================
PORT = int(os.environ.get("PORT", 7860))

# Guarded so the module can be imported without starting the server; running
# the file as a script launches exactly as before.
if __name__ == "__main__":
    demo.queue().launch(
        server_name="0.0.0.0",
        server_port=PORT,
        ssr_mode=False,
        prevent_thread_lock=False,
    )