Spaces:
Sleeping
Sleeping
import json
import logging
import math
import os
import re

import pandas as pd

try:
    from huggingface_hub import InferenceClient
except ImportError as exc:
    raise ImportError(
        "huggingface_hub is required for AI assistant support. "
        "Install it with `pip install huggingface_hub`."
    ) from exc

from analytics.performance_analysis import generate_metric_view
# Hugging Face inference settings; each one can be overridden through the
# environment variable of the same name (the token uses HUGGINGFACE_API_TOKEN).
HF_MODEL_ID = os.getenv("HF_MODEL_ID", "Qwen/Qwen2.5-7B-Instruct")
HF_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")
HF_MAX_TOKENS = int(os.getenv("HF_MAX_TOKENS", 1024))
HF_TEMPERATURE = float(os.getenv("HF_TEMPERATURE", 0.3))

# Metric names understood by the vintage context builder.
RISK_METRICS = ["30+@3", "30+@6", "60+@6", "Yr1 NCL"]

# Keyword found in a question -> DataFrame column used for grouping.
# Insertion order matters: the first keyword that matches wins.
GROUPING_MAP = {
    "fico": "fico_band",
    "fico_band": "fico_band",
    "channel": "sourcing_channel",
    "sourcing_channel": "sourcing_channel",
    "city": "city_tier",
    "city_tier": "city_tier",
    "occupation": "occupation_type",
    "occupation_type": "occupation_type",
}
def _detect_date_column(df: pd.DataFrame):
    """Return the name of the first known date-like column present in *df*.

    Candidate names are checked in priority order; ``None`` is returned when
    the frame contains none of them.
    """
    known_names = (
        "reporting_month",
        "observation_date",
        "observation_month",
        "obs_date",
        "date",
        "calendar_month",
        "month",
        "report_date",
    )
    available = df.columns
    return next((name for name in known_names if name in available), None)
def _filter_by_month(df: pd.DataFrame, as_of_month: str | None):
    """Return a copy of *df* restricted to the rows of one calendar month.

    The whole frame is copied back unchanged when no month is requested
    (``None``, empty or ``"All"``) or when no date column can be detected.
    Otherwise the detected date column is converted to monthly periods and
    compared, as text, with *as_of_month*.
    """
    wants_everything = not as_of_month or as_of_month == "All"
    date_col = None if wants_everything else _detect_date_column(df)
    if date_col is None:
        return df.copy()

    # Unparseable dates are coerced to NaT and therefore never match.
    parsed = pd.to_datetime(df[date_col], errors="coerce")
    month_labels = parsed.dt.to_period("M").astype(str)
    in_month = month_labels == as_of_month
    return df[in_month].copy()
def _fmt_pct(value):
    """Format *value* as a percentage string rounded to two decimals.

    Returns ``"N/A"`` when *value* cannot be converted to a float or when the
    result is NaN or infinite, so that placeholders such as ``"nan%"`` never
    reach the text handed to the language model.
    """
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return "N/A"
    if not math.isfinite(number):
        return "N/A"
    return f"{round(number, 2)}%"
def build_portfolio_context(df: pd.DataFrame, as_of_month: str | None = None, segment: str | None = None):
    """Summarise headline portfolio statistics as plain text for the LLM prompt.

    Args:
        df: Account-level data. The columns ``account_id``, ``balance``,
            ``dpd``, ``fico_score`` / ``fico_band`` and any column whose name
            contains ``"ncl"`` are used when present; each is optional.
        as_of_month: Month to restrict the data to (compared as text with the
            monthly period of the detected date column). ``None`` or ``"All"``
            means no filtering.
        segment: Optional column name; when it exists in *df*, the five
            segments with the highest bad-balance rate are appended.

    Returns:
        A newline-joined text block.
    """
    df = _filter_by_month(df, as_of_month)

    has_accounts = "account_id" in df.columns
    has_balance = "balance" in df.columns
    has_dpd = "dpd" in df.columns
    # Rows at 30+ days past due; a missing dpd value counts as current.
    is_bad = df["dpd"].fillna(0) >= 30 if has_dpd else None

    total_accounts = int(df["account_id"].nunique()) if has_accounts else 0
    if has_accounts and has_balance:
        open_accounts = int(df.loc[df["balance"] > 0, "account_id"].nunique())
    else:
        open_accounts = total_accounts
    bad_accounts = int(df.loc[is_bad, "account_id"].nunique()) if has_accounts and has_dpd else 0
    total_balance = float(df["balance"].sum(skipna=True)) if has_balance else 0.0

    # Prefer an explicit loss column (the first one whose name contains "ncl");
    # otherwise fall back to the share of balance that is 30+ days past due.
    ncl_cols = [c for c in df.columns if "ncl" in str(c).lower()]
    overall_ncl_rate = None
    if total_balance > 0:
        if ncl_cols:
            overall_ncl_rate = df[ncl_cols[0]].sum(skipna=True) / total_balance * 100
        elif has_dpd:
            bad_balance = float(df.loc[is_bad, "balance"].sum())
            overall_ncl_rate = bad_balance / total_balance * 100
    overall_ncl_rate_text = "N/A" if overall_ncl_rate is None else _fmt_pct(overall_ncl_rate)

    avg_fico = None
    if "fico_score" in df.columns:
        scores = df["fico_score"].dropna()
        # An empty selection would average to NaN; report N/A instead.
        if not scores.empty:
            avg_fico = round(scores.mean(), 1)
    elif "fico_band" in df.columns:
        def band_mid(val):
            # "660-699" -> 679.5; open-ended or malformed bands are skipped.
            try:
                lo, hi = val.split("-")
                return (int(lo) + int(hi)) / 2
            except (AttributeError, ValueError):
                return None

        mid_vals = df["fico_band"].dropna().apply(band_mid).dropna()
        if not mid_vals.empty:
            avg_fico = round(mid_vals.mean(), 1)

    # No month (or "All") means the figures cover every row, so label them
    # that way rather than as the "latest available" month.
    unfiltered = not as_of_month or as_of_month == "All"
    as_of_month_text = "all data" if unfiltered else as_of_month

    lines = [
        f"As of month: {as_of_month_text}",
        f"Total accounts: {total_accounts}",
        f"Open accounts: {open_accounts}",
        f"Bad accounts (dpd>=30): {bad_accounts}",
        f"Overall NCL rate: {overall_ncl_rate_text}",
        f"Average FICO: {avg_fico if avg_fico is not None else 'N/A'}",
    ]

    # isinstance guard: an unhashable value (e.g. a list) would raise in the
    # column membership test.
    if isinstance(segment, str) and segment in df.columns:
        lines.extend(_segment_breakdown_lines(df, segment))

    return "\n".join(lines)


def _segment_breakdown_lines(df: pd.DataFrame, segment: str, top_n: int = 5):
    """Return text lines ranking the *top_n* segments by bad-balance rate."""
    work = df[[segment]].copy()
    if "balance" in df.columns:
        work["_balance"] = df["balance"]
    else:
        work["_balance"] = 0.0
    if "dpd" in df.columns:
        # Keep the balance only on rows that are 30+ days past due.
        work["_bad_balance"] = work["_balance"].where(df["dpd"].fillna(0) >= 30, 0)
    else:
        work["_bad_balance"] = 0.0

    grouped = work.groupby(segment)
    summary = grouped[["_balance", "_bad_balance"]].sum()
    if "account_id" in df.columns:
        work["_account"] = df["account_id"]
        summary["accounts"] = work.groupby(segment)["_account"].nunique()
    else:
        summary["accounts"] = 0

    # Mask zero balances so empty segments come out as 0 rather than inf/NaN.
    nonzero_balance = summary["_balance"].where(summary["_balance"] != 0)
    summary["ncl_rate"] = (summary["_bad_balance"] / nonzero_balance * 100).round(2).fillna(0)

    lines = [f"Segment breakdown by {segment}:"]
    worst = summary.sort_values("ncl_rate", ascending=False).head(top_n)
    for seg_value, row in worst.iterrows():
        lines.append(
            f" - {seg_value}: accounts={int(row['accounts'])}, balance={row['_balance']:.0f}, ncl={_fmt_pct(row['ncl_rate'])}"
        )
    return lines
def _extract_json(text: str):
    """Extract the first JSON object embedded in *text*.

    Parsing starts at the first ``{`` and stops at the end of that object, so
    trailing commentary (even if it contains braces) does not break it. If
    that fails, a best-effort repair is attempted on the span from the first
    ``{`` to the last ``}``: newlines are flattened and bare or single-quoted
    keys are re-quoted.

    Returns:
        The decoded ``dict``, or ``None`` when *text* is not a string or no
        object can be recovered.
    """
    if not isinstance(text, str):
        return None
    start = text.find("{")
    if start == -1:
        return None

    try:
        parsed, _ = json.JSONDecoder().raw_decode(text, start)
        return parsed
    except json.JSONDecodeError:
        pass

    match = re.search(r"\{.*\}", text, re.S)
    if not match:
        return None
    cleaned = re.sub(r"[\n\r]+", " ", match.group(0))
    # Re-quote keys such as `key:` or `'key':`. This is heuristic and can also
    # touch colon-bearing values, hence it is only the fallback path.
    cleaned = re.sub(r"(['\"])?([a-zA-Z0-9_]+)(['\"])?\s*:\s*", r'"\2": ', cleaned)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        return None
def _extract_grouping(question: str, segment: str | None = None):
    """Pick a grouping column from keywords mentioned in *question*.

    Keywords from ``GROUPING_MAP`` are tried in insertion order and must not
    be preceded by a letter, so that "capacity" or "velocity" no longer
    trigger the "city" keyword. Suffixes are still allowed ("channels").

    Returns:
        The mapped column name, else *segment* if it is truthy, else ``None``.
    """
    lower = (question or "").lower()
    for keyword, group_col in GROUPING_MAP.items():
        if re.search(r"(?<![a-z])" + re.escape(keyword), lower):
            return group_col
    return segment if segment else None
def _extract_metrics(question: str):
    """List the risk metrics named in *question* (case-insensitive).

    The result follows the order of ``RISK_METRICS``; ``["All"]`` is returned
    when the question names none of them.
    """
    haystack = question.lower()
    mentioned = [name for name in RISK_METRICS if name.lower() in haystack]
    return mentioned or ["All"]
def _infer_data_request(question: str, as_of_month: str | None = None, segment: str | None = None):
    """Ask the model to turn *question* into a structured data request.

    The model reply is parsed as JSON; when that fails, a request is derived
    from keywords in the question. Every field is then normalised, because
    the reply is untrusted text:

    * ``data_req``: list drawn from vintage / calendar / portfolio /
      classification; keyword heuristics apply when nothing valid is given.
    * ``grouping``: a single column name from ``GROUPING_MAP`` (the first
      valid entry if the model returned a list) or the keyword fallback.
    * ``metrics``: ``["All"]`` or a list of ``RISK_METRICS`` names.
    * ``as_of_month``: ``"All"`` or ``YYYY-MM``; anything else falls back to
      the UI month.
    * ``segment``: a non-empty string, else the UI segment, else ``None``.

    Args:
        question: The user's natural-language question.
        as_of_month: Month currently selected in the UI, if any.
        segment: Segment currently selected in the UI, if any.

    Returns:
        The request ``dict`` with the keys listed above (plus any extra keys
        the model produced).

    Raises:
        RuntimeError: If no Hugging Face token is configured.
    """
    client = _get_inference_client()
    prompt = (
        "You are a data planning assistant for a risk analytics system. "
        "Translate the user's natural language question into a structured data request. "
        "Return only valid JSON with keys: data_req, grouping, metrics, as_of_month, segment. "
        "data_req should be a list containing one or more of: [\"vintage\", \"calendar\", \"portfolio\", \"classification\"]. "
        "grouping should be one of [\"fico_band\", \"sourcing_channel\", \"city_tier\", \"occupation_type\"] or null. "
        "metrics should be [\"All\"] or a list of one or more of: [\"30+@3\", \"30+@6\", \"60+@6\", \"Yr1 NCL\"]. "
        "as_of_month should be the requested month in YYYY-MM format, or \"All\" if the query wants full data or no explicit month. "
        "segment should be the selected UI segment value or null.\n\n"
        "Example: If the query is: Analyse which vintages,the fico bands and occupation have the worst performance as of 2024-11. "
        "The output will be a list with data_req having vintage,grouping key will have fico_band and occupation_type,metrics will have 30+@3, 30+@6, 60+@6, Yr1 NCL,as_of_month will have 2024-11.\n\n"
        "Question: " + question + "\n"
        "Selected UI month: " + (as_of_month or "All") + "\n"
        "Selected UI segment: " + (segment or "None") + "\n"
    )
    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=[
            {"role": "system", "content": "You are a system that returns only JSON structured data requests."},
            {"role": "user", "content": prompt},
        ],
        max_tokens=200,
        temperature=0.0,
        top_p=0.95,
    )
    text = response.choices[0].message.content if hasattr(response, "choices") else str(response)
    logging.getLogger(__name__).debug("Data request planner output: %s", text)

    # The message content may be None; the parser expects a string.
    spec = _extract_json(text or "")
    if not isinstance(spec, dict) or not spec:
        spec = {
            "data_req": [],
            "grouping": _extract_grouping(question, segment),
            "metrics": _extract_metrics(question),
            "as_of_month": as_of_month or "All",
            "segment": segment or None,
        }

    def as_list(value):
        # Accept a bare string where a list was requested; drop other junk.
        if isinstance(value, str):
            return [value]
        if isinstance(value, (list, tuple)):
            return list(value)
        return []

    known_requests = ("vintage", "calendar", "portfolio", "classification")
    reqs = []
    for item in as_list(spec.get("data_req")):
        name = str(item).strip().lower()
        if name in known_requests and name not in reqs:
            reqs.append(name)
    if not reqs:
        q_lower = question.lower()
        if any(token in q_lower for token in ["vintage", "booking", "trend", "performance"]):
            reqs.append("vintage")
        if any(token in q_lower for token in ["month", "calendar", "snapshot", "as of"]):
            reqs.append("calendar")
        if any(token in q_lower for token in ["segment", "classification", "ranking", "category", "group"]):
            reqs.append("classification")
        if not reqs:
            reqs.append("portfolio")
    spec["data_req"] = reqs

    month = spec.get("as_of_month")
    month = month.strip() if isinstance(month, str) else None
    if month and month.lower() == "all":
        month = "All"
    elif not (month and re.fullmatch(r"\d{4}-\d{2}", month)):
        month = as_of_month or "All"
    spec["as_of_month"] = month

    # Downstream code groups by one column, so keep the first valid entry.
    grouping = None
    for item in as_list(spec.get("grouping")):
        grouping = GROUPING_MAP.get(str(item).strip().lower())
        if grouping:
            break
    spec["grouping"] = grouping or _extract_grouping(question, segment)

    canonical = {name.lower(): name for name in RISK_METRICS}
    requested = [str(item).strip().lower() for item in as_list(spec.get("metrics"))]
    metrics = list(dict.fromkeys(canonical[m] for m in requested if m in canonical))
    spec["metrics"] = ["All"] if "all" in requested or not metrics else metrics

    chosen_segment = spec.get("segment")
    if not isinstance(chosen_segment, str) or chosen_segment.strip().lower() in ("", "none", "null"):
        chosen_segment = segment or None
    spec["segment"] = chosen_segment
    return spec
def _build_vintage_context(df: pd.DataFrame, grouping: str | None = None, metrics: list[str] | None = None):
    """Describe the best and worst booking vintages for each requested metric.

    For every metric, ``generate_metric_view`` is called and the first column
    whose name contains "rate" is ranked. With a *grouping* the three highest
    vintage/group rows are listed; without one, the highest and lowest
    vintage. Rows without a rate are ignored. A metric that fails is logged
    and skipped so the remaining metrics still contribute.

    Args:
        df: Account-level data passed through to ``generate_metric_view``.
        grouping: Optional column to split vintages by.
        metrics: Metric names; ``None``, empty or anything containing
            ``"All"`` expands to every entry of ``RISK_METRICS``.

    Returns:
        The text section, or ``""`` when no metric produced any line, so that
        callers can skip an empty section.
    """
    logger = logging.getLogger(__name__)
    if isinstance(metrics, str):
        # A bare string would otherwise be iterated character by character.
        metrics = [metrics]
    metrics = list(metrics) if metrics else ["All"]
    if "All" in metrics:
        metrics = RISK_METRICS

    lines = []
    for metric in metrics:
        try:
            view = generate_metric_view(df, metric_name=metric, group_col=grouping)
            if view.empty:
                continue
            rate_col = next((c for c in view.columns if "rate" in str(c).lower()), None)
            if rate_col is None:
                logger.warning("No rate column in metric view for %r; skipping.", metric)
                continue
            ranked = view.dropna(subset=[rate_col]).sort_values(rate_col, ascending=False)
            if ranked.empty:
                continue

            # Collect per-metric lines first so a failure half-way through
            # does not leave a dangling header in the output.
            metric_lines = []
            if grouping:
                metric_lines.append(f"{metric} top vintage/group combinations:")
                for _, row in ranked.head(3).iterrows():
                    metric_lines.append(
                        f" - vintage {row['booking_vintage']}, {grouping}={row[grouping]}, rate={_fmt_pct(row[rate_col])}"
                    )
            else:
                highest = ranked.iloc[0]
                lowest = ranked.iloc[-1]
                metric_lines.append(
                    f"{metric}: highest vintage {highest['booking_vintage']} at {_fmt_pct(highest[rate_col])}, "
                    f"lowest vintage {lowest['booking_vintage']} at {_fmt_pct(lowest[rate_col])}."
                )
            lines.extend(metric_lines)
        except Exception:
            # Deliberately best-effort: the metric helper lives outside this
            # module, so log whatever it raises and move on.
            logger.warning("Skipping metric %r in vintage context.", metric, exc_info=True)
            continue

    if not lines:
        return ""
    return "\n".join(["Vintage performance context:", *lines])
def _build_calendar_context(df: pd.DataFrame, as_of_month: str | None):
    """Summarise a single calendar month of *df* as text.

    Returns ``""`` when no specific month is requested (``None``, empty or
    ``"All"``) and a "No data found" sentence when the month has no rows.
    Otherwise reports the number of accounts, the total balance (if a
    ``balance`` column exists) and the number of accounts at 30+ days past
    due (if a ``dpd`` column exists).
    """
    if not as_of_month or as_of_month == "All":
        return ""

    snapshot = _filter_by_month(df, as_of_month)
    if snapshot.empty:
        return f"No data found for {as_of_month}."

    columns = snapshot.columns
    has_accounts = "account_id" in columns

    account_count = snapshot["account_id"].nunique() if has_accounts else 0
    report = [
        f"Calendar month performance context for {as_of_month}:",
        f" - Total accounts: {account_count}",
    ]

    if "balance" in columns:
        balance_total = snapshot["balance"].sum(skipna=True)
        report.append(f" - Total balance: {balance_total:.0f}")

    if "dpd" in columns:
        if has_accounts:
            past_due = snapshot["dpd"].fillna(0) >= 30
            delinquent = snapshot.loc[past_due, "account_id"].nunique()
        else:
            delinquent = 0
        report.append(f" - Accounts with dpd>=30: {delinquent}")

    return "\n".join(report)
def _build_context_from_spec(df: pd.DataFrame, spec: dict):
    """Assemble the text context for the sections listed in ``spec["data_req"]``.

    Sections are emitted in the order portfolio, calendar, vintage,
    classification and joined by blank lines. Empty and duplicate sections
    are dropped. If nothing was produced, the portfolio summary is used.

    The classification section breaks the portfolio down by the UI segment
    or, when none is set, by the requested grouping column. Without this
    fallback it would contain no breakdown at all when no UI segment is
    selected.

    Args:
        df: Account-level data.
        spec: Data request with the optional keys ``data_req``, ``grouping``,
            ``metrics``, ``as_of_month`` and ``segment``.

    Returns:
        The combined context text.
    """
    requested = spec.get("data_req") or []
    if isinstance(requested, str):
        requested = [requested]
    as_of_month = spec.get("as_of_month")
    segment = spec.get("segment")
    grouping = spec.get("grouping")

    sections = []

    def add(section):
        # Skip empty text and avoid feeding the model the same block twice.
        if section and section not in sections:
            sections.append(section)

    if "portfolio" in requested:
        add(build_portfolio_context(df, as_of_month=as_of_month, segment=segment))
    if "calendar" in requested:
        add(_build_calendar_context(df, as_of_month))
    if "vintage" in requested:
        add(_build_vintage_context(df, grouping=grouping, metrics=spec.get("metrics")))
    if "classification" in requested:
        breakdown = segment or (grouping if isinstance(grouping, str) else None)
        add(build_portfolio_context(df, as_of_month=as_of_month, segment=breakdown))
    if not sections:
        add(build_portfolio_context(df, as_of_month=as_of_month, segment=segment))
    return "\n\n".join(sections)
def _get_inference_client():
    """Create an ``InferenceClient`` authenticated with ``HF_TOKEN``.

    Raises:
        RuntimeError: If ``HF_TOKEN`` is empty or unset.
    """
    if HF_TOKEN:
        return InferenceClient(token=HF_TOKEN)
    raise RuntimeError(
        "HUGGINGFACE_API_TOKEN is required for AI assistant. "
        "Set it in your environment before running the app."
    )
# System prompt for the answering call; kept at module level so the function
# body stays readable.
_ANALYST_SYSTEM_PROMPT = """You are a senior retail credit risk analyst reviewing portfolio monitoring outputs for a credit card business.
Your role is to interpret portfolio risk trends using the provided analytical context and respond like an experienced risk analyst preparing insights for a portfolio review discussion.
Focus on:
- identifying the most important deterioration patterns
- comparing segments and vintages
- highlighting concentration of risk
- assessing severity and consistency of deterioration
- identifying unusual or contradictory patterns
- suggesting plausible drivers or hypotheses
- recommending focused next-step investigations or actions
Do NOT give generic business advice.
Do NOT simply restate the numbers.
Do NOT hallucinate missing data.
Do NOT assume causality unless supported by the context.
You MAY provide cautious hypotheses when patterns strongly suggest possible drivers, but explicitly label them as hypotheses rather than facts.
Your response should prioritize analytical insight over description.
Structure the response into the following sections:
1. Key Risk Observations
- Highlight the most important portfolio signals
- Focus on deterioration severity, segment concentration, and vintage behavior
- Compare across groups where relevant
2. Risk Interpretation
- Explain what the observed patterns may imply for portfolio health
- Identify whether deterioration appears broad-based or concentrated
- Mention whether the issue appears temporary, structural, seasoning-related, or segment-driven
3. Potential Drivers / Hypotheses
- Suggest possible explanations cautiously
- Mention underwriting, channel mix, customer quality, macro effects, or operational factors only if supported by the data pattern
4. Recommended Investigations / Actions
- Suggest practical next analyses or monitoring actions
- Examples:
- deeper drilldown by channel
- compare booking mix shifts
- investigate policy changes
- monitor recent vintages closely
- review underwriting in affected segments
5. Caveats / Data Limitations
- Mention important limitations if the data is insufficient for strong conclusions
Important behavioral rules:
- Prioritize signal over coverage
- Focus on the highest-risk combinations first
- Treat sharp vintage spikes as important
- Treat repeated deterioration across metrics as more serious
- If high-FICO segments are deteriorating unusually, explicitly call that out
- Distinguish between isolated anomalies and broad deterioration
- Use concise but analytical language
- Sound like an experienced portfolio analyst, not a generic chatbot"""


def generate_ai_answer(question: str, df: pd.DataFrame, as_of_month: str | None = None, segment: str | None = None):
    """Answer a portfolio-risk question with the configured chat model.

    The question is first translated into a data request, the matching text
    context is built from *df*, and both are sent to the model together with
    the analyst system prompt. The context and the reply are logged at DEBUG
    level instead of being printed.

    Args:
        question: The user's natural-language question.
        df: Account-level data used to build the context.
        as_of_month: Month currently selected in the UI, if any.
        segment: Segment currently selected in the UI, if any.

    Returns:
        The text of the model reply.

    Raises:
        RuntimeError: If no Hugging Face token is configured.
    """
    logger = logging.getLogger(__name__)
    request_spec = _infer_data_request(question, as_of_month=as_of_month, segment=segment)
    context = _build_context_from_spec(df, request_spec)
    logger.debug("AI assistant context:\n%s", context)

    prompt = (
        "Provide a detailed analytical interpretation based only on the provided context.\n\n"
        "Data request spec:\n" + json.dumps(request_spec, indent=2) + "\n\n"
        "Context:\n" + context + "\n\n"
        "Question: " + question + "\n\n"
        "Answer as a risk manager with practical, concise guidance."
    )
    messages = [
        {"role": "system", "content": _ANALYST_SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]

    client = _get_inference_client()
    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=messages,
        max_tokens=HF_MAX_TOKENS,
        temperature=HF_TEMPERATURE,
        top_p=0.95,
    )

    # Check the response shape before dereferencing it, so the fallbacks for
    # list-style or plain responses are actually reachable.
    if hasattr(response, "choices"):
        output_text = response.choices[0].message.content
    elif isinstance(response, list):
        output_text = response[0].get("generated_text")
    else:
        output_text = str(response)
    logger.debug("AI assistant answer:\n%s", output_text)
    return output_text