Spaces:
Sleeping
Sleeping
| import os | |
| import pandas as pd | |
| try: | |
| from huggingface_hub import InferenceClient | |
| except ImportError as exc: | |
| raise ImportError( | |
| "huggingface_hub is required for AI assistant support. " | |
| "Install it with `pip install huggingface_hub`." | |
| ) from exc | |
| HF_MODEL_ID = os.environ.get("HF_MODEL_ID", "Qwen/Qwen2.5-7B-Instruct") | |
| HF_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN") | |
| HF_MAX_TOKENS = int(os.environ.get("HF_MAX_TOKENS", 512)) | |
| HF_TEMPERATURE = float(os.environ.get("HF_TEMPERATURE", 0.3)) | |
| RISK_METRICS = ["30+@3", "30+@6", "60+@6", "Yr1 NCL"] | |
| def _detect_date_column(df: pd.DataFrame): | |
| candidates = [ | |
| "reporting_month", | |
| "observation_date", | |
| "observation_month", | |
| "obs_date", | |
| "date", | |
| "calendar_month", | |
| "month", | |
| "report_date" | |
| ] | |
| for c in candidates: | |
| if c in df.columns: | |
| return c | |
| return None | |
| def _filter_by_month(df: pd.DataFrame, as_of_month: str | None): | |
| if not as_of_month or as_of_month == "All": | |
| return df.copy() | |
| date_col = _detect_date_column(df) | |
| if date_col is None: | |
| return df.copy() | |
| ser = pd.to_datetime(df[date_col], errors="coerce").dt.to_period("M").astype(str) | |
| return df[ser == as_of_month].copy() | |
| def _fmt_pct(value): | |
| try: | |
| return f"{round(float(value), 2)}%" | |
| except Exception: | |
| return "N/A" | |
| def build_portfolio_context(df: pd.DataFrame, as_of_month: str | None = None, segment: str | None = None): | |
| df = _filter_by_month(df, as_of_month) | |
| total_accounts = int(df["account_id"].nunique()) if "account_id" in df.columns else 0 | |
| open_accounts = int(df.loc[df["balance"] > 0, "account_id"].nunique()) if "balance" in df.columns else total_accounts | |
| bad_accounts = int(df.loc[df["dpd"].fillna(0) >= 30, "account_id"].nunique()) if "dpd" in df.columns else 0 | |
| total_balance = float(df["balance"].sum(skipna=True)) if "balance" in df.columns else 0.0 | |
| ncl_cols = [c for c in df.columns if "ncl" in c.lower()] | |
| if len(ncl_cols) > 0 and total_balance > 0: | |
| overall_ncl_rate = df[ncl_cols[0]].sum(skipna=True) / total_balance * 100 | |
| elif total_balance > 0 and "dpd" in df.columns: | |
| bad_balance = float(df.loc[df["dpd"].fillna(0) >= 30, "balance"].sum()) | |
| overall_ncl_rate = bad_balance / total_balance * 100 | |
| else: | |
| overall_ncl_rate = None | |
| if overall_ncl_rate is None: | |
| overall_ncl_rate_text = "N/A" | |
| else: | |
| overall_ncl_rate_text = _fmt_pct(overall_ncl_rate) | |
| if "fico_score" in df.columns: | |
| avg_fico = round(df["fico_score"].dropna().mean(), 1) | |
| elif "fico_band" in df.columns: | |
| def band_mid(val): | |
| try: | |
| lo, hi = val.split("-") | |
| return (int(lo) + int(hi)) / 2 | |
| except Exception: | |
| return None | |
| mid_vals = df["fico_band"].dropna().apply(band_mid).dropna() | |
| avg_fico = round(mid_vals.mean(), 1) if not mid_vals.empty else None | |
| else: | |
| avg_fico = None | |
| as_of_month_text = ( | |
| "all data" if as_of_month == "All" else (as_of_month or "latest available") | |
| ) | |
| lines = [ | |
| f"As of month: {as_of_month_text}", | |
| f"Total accounts: {total_accounts}", | |
| f"Open accounts: {open_accounts}", | |
| f"Bad accounts (dpd>=30): {bad_accounts}", | |
| f"Overall NCL rate: {overall_ncl_rate_text}", | |
| f"Average FICO: {avg_fico if avg_fico is not None else 'N/A'}" | |
| ] | |
| if segment and segment in df.columns: | |
| segment_summary = ( | |
| df.groupby(segment) | |
| .agg( | |
| accounts=("account_id", "nunique"), | |
| balance=("balance", "sum"), | |
| bad_balance=("balance", lambda x: x[df.loc[x.index, "dpd"].fillna(0) >= 30].sum() if "dpd" in df.columns else 0) | |
| ) | |
| .reset_index() | |
| ) | |
| if "balance" in df.columns: | |
| segment_summary["ncl_rate"] = (segment_summary["bad_balance"] / segment_summary["balance"] * 100).round(2).fillna(0) | |
| else: | |
| segment_summary["ncl_rate"] = 0 | |
| lines.append(f"Segment breakdown by {segment}:") | |
| for _, row in segment_summary.sort_values("ncl_rate", ascending=False).head(5).iterrows(): | |
| lines.append( | |
| f" - {row[segment]}: accounts={int(row['accounts'])}, balance={row['balance']:.0f}, ncl={_fmt_pct(row['ncl_rate'])}" | |
| ) | |
| return "\n".join(lines) | |
| def build_calendar_performance_context(df: pd.DataFrame, as_of_month: str | None = None): | |
| if not as_of_month or as_of_month == "All": | |
| return "" | |
| month_df = _filter_by_month(df, as_of_month) | |
| if month_df.empty: | |
| return f"No data found for {as_of_month}." | |
| lines = [f"Calendar month snapshot for {as_of_month}:"] | |
| if "balance" in month_df.columns: | |
| lines.append(f" - Total balance: {month_df['balance'].sum(skipna=True):.0f}") | |
| lines.append(f" - Total accounts: {month_df['account_id'].nunique() if 'account_id' in month_df.columns else 0}") | |
| if "dpd" in month_df.columns: | |
| lines.append(f" - Accounts with dpd>=30: {month_df.loc[month_df['dpd'].fillna(0) >= 30, 'account_id'].nunique() if 'account_id' in month_df.columns else 0}") | |
| for metric in RISK_METRICS: | |
| try: | |
| view = generate_metric_view(month_df, metric_name=metric, group_col=None) | |
| rate_col = [c for c in view.columns if "rate" in c.lower()][0] | |
| metric_rate = view.loc[0, rate_col] | |
| lines.append(f" - {metric} rate: {_fmt_pct(metric_rate)}") | |
| except Exception: | |
| continue | |
| return "\n".join(lines) | |
| def build_vintage_performance_context(df: pd.DataFrame): | |
| if "booking_vintage" not in df.columns: | |
| return "" | |
| lines = ["Vintage performance summary:"] | |
| for metric in RISK_METRICS: | |
| try: | |
| view = generate_metric_view(df, metric_name=metric, group_col="booking_vintage") | |
| rate_col = [c for c in view.columns if "rate" in c.lower()][0] | |
| if view.empty: | |
| continue | |
| top = view.sort_values(rate_col, ascending=False).head(2) | |
| bottom = view.sort_values(rate_col, ascending=True).head(1) | |
| lines.append( | |
| f" - {metric}: highest risk vintage {top.iloc[0]['booking_vintage']} at {_fmt_pct(top.iloc[0][rate_col])}, " | |
| f"lowest risk vintage {bottom.iloc[0]['booking_vintage']} at {_fmt_pct(bottom.iloc[0][rate_col])}." | |
| ) | |
| except Exception: | |
| continue | |
| return "\n".join(lines) | |
| def build_context_for_question(df: pd.DataFrame, as_of_month: str | None = None, segment: str | None = None, question: str | None = None): | |
| sections = [build_portfolio_context(df, as_of_month=as_of_month, segment=segment)] | |
| q = (question or "").lower() | |
| if "month" in q or "calendar" in q or "as of" in q: | |
| month_context = build_calendar_performance_context(df, as_of_month=as_of_month) | |
| if month_context: | |
| sections.append(month_context) | |
| if "vintage" in q or "trend" in q or "booking vintage" in q or "compare" in q: | |
| vintage_context = build_vintage_performance_context(df) | |
| if vintage_context: | |
| sections.append(vintage_context) | |
| if segment and segment in df.columns: | |
| segment_context = build_portfolio_context(df, as_of_month=as_of_month, segment=segment) | |
| if segment_context: | |
| sections.append(segment_context) | |
| return "\n\n".join(section for section in sections if section) | |
| def _get_inference_client(): | |
| if not HF_TOKEN: | |
| raise RuntimeError( | |
| "HUGGINGFACE_API_TOKEN is required for AI assistant. " | |
| "Set it in your environment before running the app." | |
| ) | |
| else: | |
| print("Inference set up successful.") | |
| return InferenceClient(token=HF_TOKEN) | |
| def generate_ai_answer(question: str, df: pd.DataFrame, as_of_month: str | None = None, segment: str | None = None): | |
| summary = build_context_for_question(df, as_of_month=as_of_month, segment=segment, question=question) | |
| prompt = ( | |
| "You are a senior risk manager assistant responding to portfolio analytics questions. " | |
| "Use the risk context below and answer the user clearly, describing what is happening, why it is happening, and what actions should be considered. " | |
| "If you cannot answer from the data, say so clearly." + | |
| "Context:\n" + summary + "\n\n" + | |
| "Question: " + question + "\n\n" + | |
| "Answer as a risk manager with practical, concise guidance." | |
| ) | |
| client = _get_inference_client() | |
| print("Inference called successfully") | |
| # 1. Format your prompt into OpenAI message style | |
| messages = [{"role": "user", "content": prompt}] | |
| print(messages) | |
| response = client.chat.completions.create( | |
| model=HF_MODEL_ID, | |
| messages=messages, # Changed from prompt | |
| max_tokens=HF_MAX_TOKENS, # Changed from max_new_tokens | |
| temperature=HF_TEMPERATURE, | |
| top_p=0.95 | |
| ) | |
| #response = client.chat.completions.create( | |
| # model=HF_MODEL_ID, | |
| # prompt = prompt, | |
| # max_new_tokens= HF_MAX_TOKENS, | |
| # temperature= HF_TEMPERATURE, | |
| # top_k= 50, | |
| # top_p= 0.95 | |
| #) | |
| print(response.choices[0].message.content) | |
| output_text = response.choices[0].message.content if hasattr(response, 'choices') else (response[0].get('generated_text') if isinstance(response, list) else response) | |
| return (output_text) | |
| #if isinstance(response, dict): | |
| # return response.get("generated_text") or response.get("text") or str(response) | |
| #if isinstance(response, list) and len(response) > 0: | |
| # return response[0].get("generated_text", str(response[0])) | |
| #return str(response) |