Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import textwrap | |
| import traceback | |
| import pandas as pd | |
| import gradio as gr | |
| from langchain_openai import ChatOpenAI, OpenAIEmbeddings | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_core.documents import Document | |
| from langchain_core.prompts import PromptTemplate | |
| # ========================= | |
| # Files expected in repo root | |
| # ========================= | |
| BASE_DIR = os.path.dirname(__file__) | |
| CSV_FILE = os.path.join(BASE_DIR, "Customer records.csv") | |
| RISK_PDF = os.path.join(BASE_DIR, "Risk_Policy.pdf") | |
| INT_PDF = os.path.join(BASE_DIR, "Interest_Rate_Policy.pdf") | |
| # ========================= | |
| # Helpers: formatting | |
| # ========================= | |
| def one_sentence_per_line(text: str, width: int = 110) -> str: | |
| if text is None: | |
| return "" | |
| def _wrap_line(line: str): | |
| prefix_match = re.match(r"^(\s*(?:[-*]|\d+\.|\u2022)\s+)(.*)$", line) | |
| if prefix_match: | |
| prefix, body = prefix_match.group(1), prefix_match.group(2) | |
| wrapped = textwrap.wrap(body, width=max(20, width - len(prefix))) or [""] | |
| return [prefix + wrapped[0]] + [(" " * len(prefix)) + w for w in wrapped[1:]] | |
| return textwrap.wrap(line, width=width) or [""] | |
| out_lines = [] | |
| for raw_line in str(text).splitlines(): | |
| line = raw_line.rstrip() | |
| if not line: | |
| out_lines.append("") | |
| continue | |
| parts = re.split(r"(?<=[.!?])\s+", line) | |
| for part in parts: | |
| part = part.strip() | |
| if part: | |
| out_lines.extend(_wrap_line(part)) | |
| return "\n".join(out_lines) | |
| def enforce_third_person(text: str, customer_name: str) -> str: | |
| if not text: | |
| return "" | |
| name = customer_name or "The customer" | |
| text = re.sub(r"\bYou're\b", f"{name} is", text) | |
| text = re.sub(r"\byou're\b", f"{name} is", text) | |
| text = re.sub(r"\bYou\b", name, text) | |
| text = re.sub(r"\byou\b", name, text) | |
| text = re.sub(r"\bYour\b", f"{name}'s", text) | |
| text = re.sub(r"\byour\b", f"{name}'s", text) | |
| return text | |
| def format_customer_profile(profile: dict) -> str: | |
| if not profile: | |
| return "" | |
| d = dict(profile) | |
| preferred = ["ID", "Name", "Email", "Credit_Score", "Nationality", "Account_Status", "PR_Status"] | |
| nat = str(d.get("Nationality", "")).strip().lower() | |
| if nat == "singaporean": | |
| d.pop("PR_Status", None) | |
| lines = [] | |
| for k in preferred: | |
| if k in d: | |
| lines.append(f"{k}: {d.get(k)}") | |
| for k in sorted(d.keys()): | |
| if k not in preferred: | |
| lines.append(f"{k}: {d.get(k)}") | |
| return "\n".join(lines) | |
| # ========================= | |
| # Load CSV + build mock systems | |
| # ========================= | |
| def load_customer_csv(csv_path: str) -> pd.DataFrame: | |
| df_all = pd.read_csv(csv_path) | |
| df_all.columns = [c.strip() for c in df_all.columns] | |
| df_all["ID"] = df_all["ID"].astype(str).str.strip() | |
| df_all["Credit Score"] = pd.to_numeric(df_all["Credit Score"], errors="coerce").astype("Int64") | |
| df_all["Account Status"] = df_all["Account Status"].astype(str).str.strip().str.lower() | |
| df_all["Nationality"] = df_all["Nationality"].astype(str).str.strip() | |
| def _parse_pr(x): | |
| if pd.isna(x): | |
| return pd.NA | |
| s = str(x).strip().lower() | |
| if s in ("true", "t", "yes", "y", "1"): | |
| return True | |
| if s in ("false", "f", "no", "n", "0"): | |
| return False | |
| return pd.NA | |
| if "PR Status" in df_all.columns: | |
| df_all["PR_Status"] = df_all["PR Status"].apply(_parse_pr) | |
| else: | |
| df_all["PR_Status"] = pd.NA | |
| return df_all | |
| def build_mock_systems(df_all: pd.DataFrame): | |
| df_credit = df_all[["ID", "Name", "Email", "Credit Score"]].copy() | |
| df_credit = df_credit.rename(columns={"Credit Score": "Credit_Score"}) | |
| df_account = df_all[["ID", "Name", "Nationality", "Account Status"]].copy() | |
| df_account = df_account.rename(columns={"Account Status": "Account_Status"}) | |
| df_pr = df_all.dropna(subset=["PR_Status"]).copy() | |
| df_gov = df_pr[["ID", "Name", "PR_Status"]].copy() | |
| return df_credit, df_account, df_gov | |
| def get_customer_profile(customer_id: str): | |
| customer_id = str(customer_id).strip() | |
| credit_rec = DF_CREDIT[DF_CREDIT["ID"].astype(str) == customer_id] | |
| if credit_rec.empty: | |
| return None | |
| name = credit_rec.iloc[0]["Name"] | |
| email = credit_rec.iloc[0]["Email"] | |
| credit_score = int(credit_rec.iloc[0]["Credit_Score"]) | |
| acct_rec = DF_ACCOUNT[DF_ACCOUNT["ID"].astype(str) == customer_id] | |
| nationality = acct_rec.iloc[0]["Nationality"] if not acct_rec.empty else None | |
| account_status = acct_rec.iloc[0]["Account_Status"] if not acct_rec.empty else None | |
| pr_status = None | |
| if nationality and str(nationality).strip().lower() == "non-singaporean": | |
| gov_rec = DF_GOV[DF_GOV["ID"].astype(str) == customer_id] | |
| pr_status = bool(gov_rec.iloc[0]["PR_Status"]) if not gov_rec.empty else None | |
| return { | |
| "ID": customer_id, | |
| "Name": name, | |
| "Email": email, | |
| "Credit_Score": credit_score, | |
| "Nationality": nationality, | |
| "Account_Status": account_status, | |
| "PR_Status": pr_status, | |
| } | |
| # ========================= | |
| # PDF ingest + parse policies | |
| # ========================= | |
| def extract_pdf_text(pdf_path: str) -> str: | |
| from pypdf import PdfReader | |
| reader = PdfReader(pdf_path) | |
| pages = [] | |
| for p in reader.pages: | |
| pages.append(p.extract_text() or "") | |
| return "\n".join(pages) | |
| def parse_policies(risk_text: str, interest_text: str): | |
| # Interest rates | |
| rate_matches = re.findall( | |
| r"\b(Low|Medium|High)\b\s+([0-9]+\.[0-9]+)\s*%?", | |
| interest_text, | |
| flags=re.IGNORECASE | |
| ) | |
| interest_rates = {k.capitalize(): float(v) for k, v in rate_matches} | |
| # Risk table rows | |
| risk_rows = re.findall( | |
| r"(\d{3})\s*(?:-|β|β)?\s*(\d{3})\s+(Delinquent|Closed|Good-standing)\s+(High|Medium|Low)", | |
| risk_text, | |
| flags=re.IGNORECASE, | |
| ) | |
| risk_mapping = {} | |
| for lo, hi, status, risk in risk_rows: | |
| band = (int(lo), int(hi)) | |
| risk_mapping[(band, status.strip().lower())] = risk.capitalize() | |
| return risk_mapping, interest_rates | |
| def _score_band(score: int): | |
| if 300 <= score <= 674: return (300, 674) | |
| if 675 <= score <= 749: return (675, 749) | |
| if 750 <= score <= 850: return (750, 850) | |
| if score < 300: return (300, 674) | |
| return (750, 850) | |
| def determine_overall_risk(score: int, account_status: str) -> str: | |
| band = _score_band(score) | |
| status = str(account_status).strip().lower() | |
| return RISK_MAPPING.get((band, status), "High") | |
| def determine_interest_rate(overall_risk: str) -> float: | |
| return float(INTEREST_RATES[overall_risk]) | |
| def is_non_singaporean_no_pr(customer_id: str) -> bool: | |
| cid = str(customer_id).strip() | |
| nat_row = DF_ACCOUNT[DF_ACCOUNT["ID"].astype(str).str.strip() == cid] | |
| nationality = nat_row.iloc[0]["Nationality"] if not nat_row.empty else None | |
| pr_row = DF_GOV[DF_GOV["ID"].astype(str).str.strip() == cid] | |
| pr_status = bool(pr_row.iloc[0]["PR_Status"]) if not pr_row.empty else False | |
| return (str(nationality).strip().lower() != "singaporean") and (pr_status is False) | |
| def apply_mandatory_exception_to_report(report_text: str, customer_id: str) -> str: | |
| text = "" if report_text is None else str(report_text) | |
| if not is_non_singaporean_no_pr(customer_id): | |
| return text | |
| text = re.sub( | |
| r"(Loan\s+Approval\s+Decision:\s*\*\*?)\s*Approve\b", | |
| r"\1 Reject", | |
| text, | |
| flags=re.IGNORECASE | |
| ) | |
| text = re.sub( | |
| r"(Loan\s+Approval:\s*\*\*?)\s*Approve\b", | |
| r"\1 Reject", | |
| text, | |
| flags=re.IGNORECASE | |
| ) | |
| if not re.search(r"not\s+recommend|reject", text, flags=re.IGNORECASE): | |
| text += "\n\nConclusion: Not recommend although risk is low, because Non-Singaporean and PR status is false." | |
| elif not re.search(r"Non[-\s]?Singaporean.*PR", text, flags=re.IGNORECASE | re.DOTALL): | |
| text += "\n\nConclusion: Not recommend although risk is low, because Non-Singaporean and PR status is false." | |
| return text | |
| # ========================= | |
| # Unstructured resolver (Colab-style) | |
| # ========================= | |
| def resolve_customer_id(unstructured_text: str): | |
| s = (unstructured_text or "").strip() | |
| if not s: | |
| return None, "β Please enter Applicant Name or ID." | |
| # 1) Extract an ID from any sentence | |
| m = re.search(r"\b(\d{3,})\b", s) | |
| if m: | |
| cid = m.group(1) | |
| prof = get_customer_profile(cid) | |
| if prof: | |
| return cid, f"β Found ID {cid}: {prof['Name']}" | |
| return None, f"β No such customer ID: {cid}" | |
| # 2) Otherwise treat as name search (contains) | |
| results = DF_CREDIT[DF_CREDIT["Name"].astype(str).str.contains(s, case=False, na=False)] | |
| if results.empty: | |
| return None, f"β No such customer: '{s}'" | |
| if len(results) == 1: | |
| cid = str(results.iloc[0]["ID"]) | |
| nm = str(results.iloc[0]["Name"]) | |
| return cid, f"β Found Name '{nm}' -> ID {cid}" | |
| # Multiple matches: no dropdown, just tell user to type ID | |
| opts = ", ".join([f"{r['Name']} (ID {r['ID']})" for _, r in results.iterrows()]) | |
| return None, f"β οΈ Multiple customers match '{s}'. Please enter ID. Matches: {opts}" | |
| # ========================= | |
| # Prompts / Chains | |
| # ========================= | |
| QA_PROMPT = PromptTemplate( | |
| input_variables=["customer_data", "policy_rules", "question"], | |
| template=""" | |
| You are a helpful banking assistant. Answer the user's question based strictly on the provided Customer Data and Policy Rules. | |
| CUSTOMER DATA: | |
| {customer_data} | |
| POLICY RULES: | |
| {policy_rules} | |
| USER QUESTION: | |
| {question} | |
| ANSWER: | |
| """ | |
| ) | |
| ADVICE_PROMPT = PromptTemplate( | |
| input_variables=["customer_data", "policy_rules", "question"], | |
| template="""You are a helpful loan officer assistant. | |
| Write in THIRD PERSON about the customer. Always use the customer's Name and possessive. | |
| Never address the reader as 'you' or 'your'. | |
| You must provide ADVICE/RECOMMENDATION (not just restating risk and rate). | |
| Use ONLY the provided Customer Data and Policy Rules. | |
| REQUIREMENTS: | |
| - Provide 3-5 actionable advice points (short sentences). | |
| - Include a clear final recommendation: APPROVE or NOT RECOMMEND / REJECT. | |
| - If customer is Non-Singaporean and PR_Status is False, you MUST recommend NOT RECOMMEND / REJECT regardless of risk level. | |
| - Keep it concise. | |
| CUSTOMER DATA: | |
| {customer_data} | |
| POLICY RULES: | |
| {policy_rules} | |
| USER QUESTION: | |
| {question} | |
| ANSWER: | |
| """ | |
| ) | |
| REPORT_PROMPT = PromptTemplate( | |
| input_variables=["customer_data", "policy_rules"], | |
| template=""" | |
| You are a senior loan officer. Generate a comprehensive loan assessment report based on the provided customer data and banking policies. | |
| Analyze the customer's profile, determine overall risk, calculate interest rate, and provide a clear recommendation. | |
| Follow any mandatory exceptions. | |
| CUSTOMER DATA: | |
| {customer_data} | |
| POLICY RULES: | |
| {policy_rules} | |
| REPORT: | |
| """ | |
| ) | |
| # ========================= | |
| # Global init (Spaces safe) | |
| # ========================= | |
| RETRIEVER = None | |
| POLICY_FULL = "" | |
| LLM = None | |
| INIT_ERROR = None | |
| try: | |
| # Check files | |
| missing = [p for p in [CSV_FILE, RISK_PDF, INT_PDF] if not os.path.exists(p)] | |
| if missing: | |
| raise RuntimeError("Missing required files in repo root:\n" + "\n".join(missing)) | |
| # Clean key (fixes illegal header newline) | |
| api_key = os.getenv("OPENAI_API_KEY", "") | |
| api_key = api_key.strip() | |
| os.environ["OPENAI_API_KEY"] = api_key | |
| if not api_key: | |
| raise RuntimeError("Missing OPENAI_API_KEY. Set it in Space Secrets.") | |
| # Load CSV | |
| DF_ALL = load_customer_csv(CSV_FILE) | |
| DF_CREDIT, DF_ACCOUNT, DF_GOV = build_mock_systems(DF_ALL) | |
| # Load PDFs | |
| RISK_TEXT = extract_pdf_text(RISK_PDF) | |
| INT_TEXT = extract_pdf_text(INT_PDF) | |
| POLICY_FULL = RISK_TEXT + "\n\n" + INT_TEXT | |
| # Parse policies | |
| RISK_MAPPING, INTEREST_RATES = parse_policies(RISK_TEXT, INT_TEXT) | |
| # LLM | |
| model_name = os.getenv("OPENAI_MODEL", "gpt-4o") | |
| LLM = ChatOpenAI(model=model_name, temperature=0, openai_api_key=api_key) | |
| # Chains (LCEL) | |
| QA_CHAIN = QA_PROMPT | LLM | |
| ADVICE_CHAIN = ADVICE_PROMPT | LLM | |
| REPORT_CHAIN = REPORT_PROMPT | LLM | |
| except Exception as e: | |
| INIT_ERROR = str(e) | |
| def build_retriever_if_needed(): | |
| global RETRIEVER | |
| if RETRIEVER is not None: | |
| return RETRIEVER | |
| api_key = os.getenv("OPENAI_API_KEY", "").strip() | |
| if not api_key: | |
| return None | |
| docs = [ | |
| Document(page_content=RISK_TEXT, metadata={"source": "Risk_Policy.pdf"}), | |
| Document(page_content=INT_TEXT, metadata={"source": "Interest_Rate_Policy.pdf"}), | |
| ] | |
| embeddings = OpenAIEmbeddings(openai_api_key=api_key) | |
| db = FAISS.from_documents(docs, embeddings) | |
| RETRIEVER = db.as_retriever() | |
| return RETRIEVER | |
| def get_policy_context(use_rag: bool) -> str: | |
| if not use_rag: | |
| return POLICY_FULL | |
| try: | |
| retriever = build_retriever_if_needed() | |
| if retriever is None: | |
| return POLICY_FULL + "\n\n[Note] RAG unavailable (missing API key). Using full policy text." | |
| docs = retriever.invoke("risk level interest rate PR status") | |
| ctx = "\n\n".join([d.page_content for d in docs]).strip() | |
| return ctx if ctx else POLICY_FULL | |
| except Exception as e: | |
| return POLICY_FULL + f"\n\n[Note] RAG failed, using full policy text: {e}" | |
| # ========================= | |
| # Main Run (Colab-style) | |
| # ========================= | |
| def run_action(user_input: str, action: str, use_rag: bool): | |
| if INIT_ERROR: | |
| return f"β Initialization error:\n\n{INIT_ERROR}" | |
| try: | |
| customer_id, msg = resolve_customer_id(user_input) | |
| if not customer_id: | |
| return msg | |
| profile = get_customer_profile(customer_id) | |
| if not profile: | |
| return f"β No such customer ID: {customer_id}" | |
| profile_text = format_customer_profile(profile) | |
| policy_context = get_policy_context(use_rag) | |
| # Deterministic summary (optional but helps demo) | |
| overall_risk = determine_overall_risk(profile["Credit_Score"], profile["Account_Status"]) | |
| rate = determine_interest_rate(overall_risk) | |
| must_reject = is_non_singaporean_no_pr(customer_id) | |
| det = [ | |
| msg, | |
| "", | |
| "Deterministic (Policy-based):", | |
| f"- Overall risk: {overall_risk}", | |
| f"- Interest rate: {rate:.3f}%", | |
| ] | |
| if must_reject: | |
| det.append("- Mandatory exception: NOT RECOMMEND / REJECT (Non-Singaporean without PR).") | |
| det_text = "\n".join(det) | |
| if action == "1) Check Risk & Interest": | |
| question = f"What are the risk level and applicable interest rate for the customer {customer_id}?" | |
| resp = QA_CHAIN.invoke({ | |
| "customer_data": profile_text, | |
| "policy_rules": policy_context, | |
| "question": question | |
| }) | |
| return det_text + "\n\nAI Output:\n" + one_sentence_per_line(resp.content) | |
| if action == "2) Advice / Recommendation": | |
| question = f"What interest rate advice can be recommended for customer with Id {customer_id}?" | |
| resp = ADVICE_CHAIN.invoke({ | |
| "customer_data": profile_text, | |
| "policy_rules": policy_context, | |
| "question": question | |
| }) | |
| return det_text + "\n\nAI Output:\n" + one_sentence_per_line(enforce_third_person(resp.content, profile.get("Name", "Customer"))) | |
| # 3) FULL report | |
| customer_data_with_exception = profile_text + ( | |
| "\n\nMANDATORY EXCEPTION (must follow): Non-Singaporean with PR_Status = False => NOT RECOMMENDED / REJECTED." | |
| if must_reject else "" | |
| ) | |
| full_report = REPORT_CHAIN.invoke({ | |
| "customer_data": customer_data_with_exception, | |
| "policy_rules": policy_context | |
| }) | |
| final_text = apply_mandatory_exception_to_report(full_report.content, customer_id) | |
| return det_text + "\n\nFull Report:\n" + one_sentence_per_line(final_text) | |
| except Exception: | |
| return "β Run failed:\n\n" + traceback.format_exc() | |
| # ========================= | |
| # Gradio UI (NO dropdown) | |
| # ========================= | |
| with gr.Blocks(title="Bank Loan Officer System") as demo: | |
| gr.Markdown("# π¦ Bank Loan Officer System (Unstructured Input Search)") | |
| gr.Markdown("Type Applicant **Name / ID / sentence**. The system resolves and responds like your Colab notebook.") | |
| if INIT_ERROR: | |
| gr.Markdown(f"## β Initialization error\n\n```\n{INIT_ERROR}\n```") | |
| user_input = gr.Textbox( | |
| label="Applicant Name or ID (unstructured)", | |
| placeholder="e.g. 3333 OR Hilda OR 'please check loan for 3333'" | |
| ) | |
| action = gr.Radio( | |
| label="Action", | |
| choices=[ | |
| "1) Check Risk & Interest", | |
| "2) Advice / Recommendation", | |
| "3) FULL Formal Loan Report" | |
| ], | |
| value="1) Check Risk & Interest" | |
| ) | |
| use_rag = gr.Checkbox( | |
| label="Use RAG (FAISS embeddings). If it fails, auto fallback.", | |
| value=False | |
| ) | |
| run_btn = gr.Button("π Run") | |
| output = gr.Textbox(label="Output", lines=28) | |
| run_btn.click(fn=run_action, inputs=[user_input, action, use_rag], outputs=[output]) | |
| user_input.submit(fn=run_action, inputs=[user_input, action, use_rag], outputs=[output]) | |
| # ========================= | |
| # HF Spaces launch (the add-on at bottom) | |
| # ========================= | |
| PORT = int(os.environ.get("PORT", 7860)) | |
| demo.queue().launch( | |
| server_name="0.0.0.0", | |
| server_port=PORT, | |
| ssr_mode=False, | |
| prevent_thread_lock=False | |
| ) | |