Spaces:
Sleeping
Sleeping
| import os | |
| import pandas as pd | |
| try: | |
| from huggingface_hub import InferenceClient | |
| except ImportError as exc: | |
| raise ImportError( | |
| "huggingface_hub is required for AI assistant support. " | |
| "Install it with `pip install huggingface_hub`." | |
| ) from exc | |
| HF_MODEL_ID = os.environ.get("HF_MODEL_ID", "Qwen/Qwen2.5-7B-Instruct") | |
| HF_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN") | |
| HF_MAX_TOKENS = int(os.environ.get("HF_MAX_TOKENS", 512)) | |
| HF_TEMPERATURE = float(os.environ.get("HF_TEMPERATURE", 0.3)) | |
| def _detect_date_column(df: pd.DataFrame): | |
| candidates = [ | |
| "reporting_month", | |
| "observation_date", | |
| "observation_month", | |
| "obs_date", | |
| "date", | |
| "calendar_month", | |
| "month", | |
| "report_date" | |
| ] | |
| for c in candidates: | |
| if c in df.columns: | |
| return c | |
| return None | |
| def _filter_by_month(df: pd.DataFrame, as_of_month: str): | |
| if not as_of_month: | |
| return df.copy() | |
| date_col = _detect_date_column(df) | |
| if date_col is None: | |
| return df.copy() | |
| ser = pd.to_datetime(df[date_col], errors="coerce").dt.to_period("M").astype(str) | |
| return df[ser == as_of_month].copy() | |
| def _fmt_pct(value): | |
| try: | |
| return f"{round(float(value), 2)}%" | |
| except Exception: | |
| return "N/A" | |
| def build_portfolio_context(df: pd.DataFrame, as_of_month: str = None, segment: str = None): | |
| df = _filter_by_month(df, as_of_month) | |
| total_accounts = int(df["account_id"].nunique()) if "account_id" in df.columns else 0 | |
| open_accounts = int(df.loc[df["balance"] > 0, "account_id"].nunique()) if "balance" in df.columns else total_accounts | |
| bad_accounts = int(df.loc[df["dpd"].fillna(0) >= 30, "account_id"].nunique()) if "dpd" in df.columns else 0 | |
| total_balance = float(df["balance"].sum(skipna=True)) if "balance" in df.columns else 0.0 | |
| ncl_cols = [c for c in df.columns if "ncl" in c.lower()] | |
| if len(ncl_cols) > 0 and total_balance > 0: | |
| overall_ncl_rate = df[ncl_cols[0]].sum(skipna=True) / total_balance * 100 | |
| elif total_balance > 0 and "dpd" in df.columns: | |
| bad_balance = float(df.loc[df["dpd"].fillna(0) >= 30, "balance"].sum()) | |
| overall_ncl_rate = bad_balance / total_balance * 100 | |
| else: | |
| overall_ncl_rate = None | |
| if overall_ncl_rate is None: | |
| overall_ncl_rate_text = "N/A" | |
| else: | |
| overall_ncl_rate_text = _fmt_pct(overall_ncl_rate) | |
| if "fico_score" in df.columns: | |
| avg_fico = round(df["fico_score"].dropna().mean(), 1) | |
| elif "fico_band" in df.columns: | |
| def band_mid(val): | |
| try: | |
| lo, hi = val.split("-") | |
| return (int(lo) + int(hi)) / 2 | |
| except Exception: | |
| return None | |
| mid_vals = df["fico_band"].dropna().apply(band_mid).dropna() | |
| avg_fico = round(mid_vals.mean(), 1) if not mid_vals.empty else None | |
| else: | |
| avg_fico = None | |
| lines = [ | |
| f"As of month: {as_of_month or 'latest available'}", | |
| f"Total accounts: {total_accounts}", | |
| f"Open accounts: {open_accounts}", | |
| f"Bad accounts (dpd>=30): {bad_accounts}", | |
| f"Overall NCL rate: {overall_ncl_rate_text}", | |
| f"Average FICO: {avg_fico if avg_fico is not None else 'N/A'}" | |
| ] | |
| if segment and segment in df.columns: | |
| segment_summary = ( | |
| df.groupby(segment) | |
| .agg( | |
| accounts=("account_id", "nunique"), | |
| balance=("balance", "sum"), | |
| bad_balance=("balance", lambda x: x[df.loc[x.index, "dpd"].fillna(0) >= 30].sum() if "dpd" in df.columns else 0) | |
| ) | |
| .reset_index() | |
| ) | |
| if "balance" in df.columns: | |
| segment_summary["ncl_rate"] = (segment_summary["bad_balance"] / segment_summary["balance"] * 100).round(2).fillna(0) | |
| else: | |
| segment_summary["ncl_rate"] = 0 | |
| lines.append(f"Segment breakdown by {segment}:") | |
| for _, row in segment_summary.sort_values("ncl_rate", ascending=False).head(5).iterrows(): | |
| lines.append( | |
| f" - {row[segment]}: accounts={int(row['accounts'])}, balance={row['balance']:.0f}, ncl={_fmt_pct(row['ncl_rate'])}" | |
| ) | |
| return "\n".join(lines) | |
| def _get_inference_client(): | |
| if not HF_TOKEN: | |
| raise RuntimeError( | |
| "HUGGINGFACE_API_TOKEN is required for AI assistant. " | |
| "Set it in your environment before running the app." | |
| ) | |
| else: | |
| print("Inference set up successful.") | |
| return InferenceClient(token=HF_TOKEN) | |
| def generate_ai_answer(question: str, df: pd.DataFrame, as_of_month: str = None, segment: str = None): | |
| summary = build_portfolio_context(df, as_of_month=as_of_month, segment=segment) | |
| prompt = ( | |
| "You are a senior risk manager assistant responding to portfolio analytics questions. " | |
| "Use the risk context below and answer the user clearly, describing what is happening, why it is happening, and what actions should be considered. " | |
| "If you cannot answer from the data, say so clearly." + | |
| "Context:\n" + summary + "\n\n" + | |
| "Question: " + question + "\n\n" + | |
| "Answer as a risk manager with practical, concise guidance." | |
| ) | |
| client = _get_inference_client() | |
| print("Inference called successfully") | |
| # 1. Format your prompt into OpenAI message style | |
| messages = [{"role": "user", "content": prompt}] | |
| print(messages) | |
| response = client.chat.completions.create( | |
| model=HF_MODEL_ID, | |
| messages=messages, # Changed from prompt | |
| max_tokens=HF_MAX_TOKENS, # Changed from max_new_tokens | |
| temperature=HF_TEMPERATURE, | |
| top_p=0.95 | |
| ) | |
| #response = client.chat.completions.create( | |
| # model=HF_MODEL_ID, | |
| # prompt = prompt, | |
| # max_new_tokens= HF_MAX_TOKENS, | |
| # temperature= HF_TEMPERATURE, | |
| # top_k= 50, | |
| # top_p= 0.95 | |
| #) | |
| print(response.choices[0].message.content) | |
| output_text = response.choices[0].message.content if hasattr(response, 'choices') else (response[0].get('generated_text') if isinstance(response, list) else response) | |
| return (output_text) | |
| #if isinstance(response, dict): | |
| # return response.get("generated_text") or response.get("text") or str(response) | |
| #if isinstance(response, list) and len(response) > 0: | |
| # return response[0].get("generated_text", str(response[0])) | |
| #return str(response) | |