"""Portfolio-risk context builders and a Hugging Face chat wrapper.

The ``build_*_context`` functions turn a portfolio DataFrame into short
plain-text summaries; ``generate_ai_answer`` embeds those summaries in a
prompt and sends it to the chat model named by ``HF_MODEL_ID``.

Columns are looked up by name (``account_id``, ``balance``, ``dpd``,
``fico_score``, ``fico_band``, ``booking_vintage`` and any column whose name
contains ``ncl``); each one is optional and missing columns degrade the
summary instead of raising.
"""

import logging
import math
import os

import pandas as pd

try:
    from huggingface_hub import InferenceClient
except ImportError as exc:
    raise ImportError(
        "huggingface_hub is required for AI assistant support. "
        "Install it with `pip install huggingface_hub`."
    ) from exc

logger = logging.getLogger(__name__)

# Runtime configuration, read once from the environment at import time.
HF_MODEL_ID = os.environ.get("HF_MODEL_ID", "Qwen/Qwen2.5-7B-Instruct")
HF_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN")
HF_MAX_TOKENS = int(os.environ.get("HF_MAX_TOKENS", 512))
HF_TEMPERATURE = float(os.environ.get("HF_TEMPERATURE", 0.3))

# Metric names passed to generate_metric_view(metric_name=...).
RISK_METRICS = ["30+@3", "30+@6", "60+@6", "Yr1 NCL"]

# Column names tried, in priority order, when looking for the date column.
_DATE_COLUMN_CANDIDATES = (
    "reporting_month",
    "observation_date",
    "observation_month",
    "obs_date",
    "date",
    "calendar_month",
    "month",
    "report_date",
)

# Rows with dpd at or above this value are counted as "bad".
_BAD_DPD_THRESHOLD = 30


def _detect_date_column(df: pd.DataFrame) -> str | None:
    """Return the first known date-column name present in *df*, else None."""
    return next((c for c in _DATE_COLUMN_CANDIDATES if c in df.columns), None)


def _filter_by_month(df: pd.DataFrame, as_of_month: str | None) -> pd.DataFrame:
    """Return a copy of *df* restricted to the rows of *as_of_month*.

    Args:
        df: Source frame.
        as_of_month: Month string compared against the ``YYYY-MM`` rendering
            of the detected date column. ``None``, ``""`` and ``"All"``
            disable filtering.

    Returns:
        A copy of the matching rows, or a copy of the whole frame when no
        filter is requested or no date column is found.
    """
    if not as_of_month or as_of_month == "All":
        return df.copy()

    date_col = _detect_date_column(df)
    if date_col is None:
        return df.copy()

    # Unparseable dates become NaT, which renders as "NaT" and never matches.
    months = pd.to_datetime(df[date_col], errors="coerce").dt.to_period("M").astype(str)
    return df[months == as_of_month].copy()


def _fmt_pct(value) -> str:
    """Format *value* as a percentage rounded to 2 decimals, or ``"N/A"``.

    ``"N/A"`` is returned for anything that cannot be converted to a finite
    float (None, non-numeric text, NaN, infinity).
    """
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return "N/A"
    if not math.isfinite(number):
        return "N/A"
    return f"{round(number, 2)}%"


def _bad_mask(df: pd.DataFrame) -> pd.Series:
    """Return a boolean Series marking rows whose dpd is >= 30.

    Missing dpd values count as 0; a frame without a ``dpd`` column yields an
    all-False mask.
    """
    if "dpd" not in df.columns:
        return pd.Series(False, index=df.index)
    return df["dpd"].fillna(0) >= _BAD_DPD_THRESHOLD


def _fico_band_midpoint(band) -> float | None:
    """Return the midpoint of a ``"lo-hi"`` band string, or None if malformed."""
    try:
        lo, hi = band.split("-")
        return (int(lo) + int(hi)) / 2
    except (AttributeError, TypeError, ValueError):
        return None


def _average_fico(df: pd.DataFrame) -> float | None:
    """Return the mean FICO rounded to 1 decimal, or None when unavailable.

    Uses ``fico_score`` when present, otherwise the midpoints of parseable
    ``fico_band`` values.
    """
    if "fico_score" in df.columns:
        mean = df["fico_score"].dropna().mean()
    elif "fico_band" in df.columns:
        midpoints = pd.to_numeric(
            df["fico_band"].dropna().apply(_fico_band_midpoint), errors="coerce"
        ).dropna()
        mean = midpoints.mean() if not midpoints.empty else None
    else:
        return None

    # The mean of an all-missing column is NaN; report it as unavailable.
    if mean is None or pd.isna(mean):
        return None
    return round(mean, 1)


def _segment_breakdown_lines(df: pd.DataFrame, segment: str, top_n: int = 5) -> list[str]:
    """Return one summary line per segment value, worst NCL rate first.

    The rate is bad balance (dpd >= 30) over total balance, in percent.
    Missing ``account_id`` / ``balance`` / ``dpd`` columns yield zero
    counts, balances and rates rather than raising.
    """
    has_balance = "balance" in df.columns
    # Work on fixed column names so the segment name can never collide with
    # the aggregate names.
    work = pd.DataFrame(
        {
            "segment": df[segment],
            "account": df["account_id"] if "account_id" in df.columns else float("nan"),
            "balance": df["balance"] if has_balance else 0.0,
        }
    )
    work["bad_balance"] = work["balance"].where(_bad_mask(df), 0.0)

    summary = work.groupby("segment").agg(
        accounts=("account", "nunique"),
        balance=("balance", "sum"),
        bad_balance=("bad_balance", "sum"),
    )
    rate = summary["bad_balance"] / summary["balance"] * 100
    # A zero balance total gives NaN or +/-inf; both are reported as 0.
    summary["ncl_rate"] = (
        rate.replace([float("inf"), float("-inf")], float("nan")).round(2).fillna(0)
    )

    worst = summary.sort_values("ncl_rate", ascending=False).head(top_n)
    return [
        f" - {row.Index}: accounts={int(row.accounts)}, "
        f"balance={row.balance:.0f}, ncl={_fmt_pct(row.ncl_rate)}"
        for row in worst.itertuples()
    ]


def build_portfolio_context(
    df: pd.DataFrame,
    as_of_month: str | None = None,
    segment: str | None = None,
) -> str:
    """Summarise headline portfolio figures as newline-separated text.

    Args:
        df: Portfolio frame.
        as_of_month: Optional month filter (see ``_filter_by_month``).
        segment: Optional column name; when it exists in *df*, the five
            segment values with the highest NCL rate are appended.

    Returns:
        The summary text.
    """
    df = _filter_by_month(df, as_of_month)

    has_accounts = "account_id" in df.columns
    has_balance = "balance" in df.columns
    has_dpd = "dpd" in df.columns
    is_bad = _bad_mask(df)

    total_accounts = int(df["account_id"].nunique()) if has_accounts else 0
    if has_accounts and has_balance:
        open_accounts = int(df.loc[df["balance"] > 0, "account_id"].nunique())
    else:
        open_accounts = total_accounts
    bad_accounts = (
        int(df.loc[is_bad, "account_id"].nunique()) if has_accounts and has_dpd else 0
    )
    total_balance = float(df["balance"].sum(skipna=True)) if has_balance else 0.0

    # Prefer an explicit NCL column (first whose name contains "ncl");
    # otherwise fall back to the share of balance sitting on bad rows.
    ncl_cols = [c for c in df.columns if "ncl" in str(c).lower()]
    if ncl_cols and total_balance > 0:
        overall_ncl_rate = df[ncl_cols[0]].sum(skipna=True) / total_balance * 100
    elif has_dpd and total_balance > 0:
        bad_balance = float(df.loc[is_bad, "balance"].sum())
        overall_ncl_rate = bad_balance / total_balance * 100
    else:
        overall_ncl_rate = None
    overall_ncl_rate_text = "N/A" if overall_ncl_rate is None else _fmt_pct(overall_ncl_rate)

    avg_fico = _average_fico(df)

    # NOTE(review): with as_of_month=None no month filter is applied, yet the
    # label reads "latest available" - confirm that callers pre-filter.
    as_of_month_text = (
        "all data" if as_of_month == "All" else (as_of_month or "latest available")
    )

    lines = [
        f"As of month: {as_of_month_text}",
        f"Total accounts: {total_accounts}",
        f"Open accounts: {open_accounts}",
        f"Bad accounts (dpd>=30): {bad_accounts}",
        f"Overall NCL rate: {overall_ncl_rate_text}",
        f"Average FICO: {avg_fico if avg_fico is not None else 'N/A'}",
    ]

    if segment and segment in df.columns:
        lines.append(f"Segment breakdown by {segment}:")
        lines.extend(_segment_breakdown_lines(df, segment))

    return "\n".join(lines)


def build_calendar_performance_context(
    df: pd.DataFrame,
    as_of_month: str | None = None,
) -> str:
    """Summarise a single calendar month of *df*.

    Returns:
        ``""`` when no specific month is requested, a "No data found"
        message when the month has no rows, otherwise the snapshot text.
    """
    if not as_of_month or as_of_month == "All":
        return ""

    month_df = _filter_by_month(df, as_of_month)
    if month_df.empty:
        return f"No data found for {as_of_month}."

    has_accounts = "account_id" in month_df.columns
    lines = [f"Calendar month snapshot for {as_of_month}:"]

    if "balance" in month_df.columns:
        total_balance = month_df["balance"].sum(skipna=True)
        lines.append(f" - Total balance: {total_balance:.0f}")

    account_count = month_df["account_id"].nunique() if has_accounts else 0
    lines.append(f" - Total accounts: {account_count}")

    if "dpd" in month_df.columns:
        bad_count = (
            month_df.loc[_bad_mask(month_df), "account_id"].nunique()
            if has_accounts
            else 0
        )
        lines.append(f" - Accounts with dpd>=30: {bad_count}")

    for metric in RISK_METRICS:
        # NOTE(review): generate_metric_view is neither defined nor imported
        # in this module as far as the visible source shows - confirm where
        # it should come from. Until then each metric is skipped.
        try:
            view = generate_metric_view(month_df, metric_name=metric, group_col=None)
            rate_col = [c for c in view.columns if "rate" in c.lower()][0]
            metric_rate = view.loc[0, rate_col]
        except Exception as exc:
            # Best effort: one failing metric must not hide the others.
            logger.warning(
                "Skipping metric %r in calendar snapshot: %r", metric, exc
            )
            continue
        lines.append(f" - {metric} rate: {_fmt_pct(metric_rate)}")

    return "\n".join(lines)


def build_vintage_performance_context(df: pd.DataFrame) -> str:
    """Summarise the highest and lowest rate booking vintage per risk metric.

    Returns:
        ``""`` when *df* has no ``booking_vintage`` column or no metric
        could be computed, otherwise the summary text.
    """
    if "booking_vintage" not in df.columns:
        return ""

    lines = ["Vintage performance summary:"]
    for metric in RISK_METRICS:
        # NOTE(review): generate_metric_view is neither defined nor imported
        # in this module as far as the visible source shows - confirm where
        # it should come from. Until then each metric is skipped.
        try:
            view = generate_metric_view(
                df, metric_name=metric, group_col="booking_vintage"
            )
            rate_col = [c for c in view.columns if "rate" in c.lower()][0]
            if view.empty:
                continue
            worst = view.sort_values(rate_col, ascending=False).iloc[0]
            best = view.sort_values(rate_col, ascending=True).iloc[0]
            worst_text = (
                f"highest risk vintage {worst['booking_vintage']} "
                f"at {_fmt_pct(worst[rate_col])}"
            )
            best_text = (
                f"lowest risk vintage {best['booking_vintage']} "
                f"at {_fmt_pct(best[rate_col])}"
            )
        except Exception as exc:
            # Best effort: one failing metric must not hide the others.
            logger.warning(
                "Skipping metric %r in vintage summary: %r", metric, exc
            )
            continue
        lines.append(f" - {metric}: {worst_text}, {best_text}.")

    # A header with nothing under it carries no information for the model.
    return "\n".join(lines) if len(lines) > 1 else ""


def build_context_for_question(
    df: pd.DataFrame,
    as_of_month: str | None = None,
    segment: str | None = None,
    question: str | None = None,
) -> str:
    """Assemble the context sections relevant to *question*.

    The portfolio summary (with the segment breakdown when *segment* is a
    column of *df*) is always included. The calendar snapshot is added when
    the question mentions "month", "calendar" or "as of"; the vintage
    summary when it mentions "vintage", "trend" or "compare".

    Returns:
        The non-empty sections joined by blank lines.
    """
    # The segment breakdown is already part of this section, so it must not
    # be appended a second time further down.
    sections = [build_portfolio_context(df, as_of_month=as_of_month, segment=segment)]
    q = (question or "").lower()

    if any(keyword in q for keyword in ("month", "calendar", "as of")):
        sections.append(build_calendar_performance_context(df, as_of_month=as_of_month))

    if any(keyword in q for keyword in ("vintage", "trend", "compare")):
        sections.append(build_vintage_performance_context(df))

    return "\n\n".join(section for section in sections if section)


def _get_inference_client() -> InferenceClient:
    """Return an ``InferenceClient`` authenticated with ``HF_TOKEN``.

    Raises:
        RuntimeError: If ``HUGGINGFACE_API_TOKEN`` was not set at import time.
    """
    if not HF_TOKEN:
        raise RuntimeError(
            "HUGGINGFACE_API_TOKEN is required for AI assistant. "
            "Set it in your environment before running the app."
        )
    logger.debug("Creating Hugging Face inference client.")
    return InferenceClient(token=HF_TOKEN)


def generate_ai_answer(
    question: str,
    df: pd.DataFrame,
    as_of_month: str | None = None,
    segment: str | None = None,
):
    """Answer *question* about the portfolio in *df* using the chat model.

    Args:
        question: The user's question.
        df: Portfolio frame used to build the context.
        as_of_month: Optional month filter for the context.
        segment: Optional segment column for the context breakdown.

    Returns:
        The ``content`` of the first choice in the chat-completion response.

    Raises:
        RuntimeError: If no API token is configured.
    """
    summary = build_context_for_question(
        df, as_of_month=as_of_month, segment=segment, question=question
    )
    prompt = (
        "You are a senior risk manager assistant responding to portfolio analytics questions. "
        "Use the risk context below and answer the user clearly, describing what is happening, "
        "why it is happening, and what actions should be considered. "
        "If you cannot answer from the data, say so clearly.\n\n"
        + "Context:\n"
        + summary
        + "\n\n"
        + "Question: "
        + question
        + "\n\n"
        + "Answer as a risk manager with practical, concise guidance."
    )

    client = _get_inference_client()

    # The chat-completions endpoint takes OpenAI-style messages.
    messages = [{"role": "user", "content": prompt}]
    # Debug level only: the prompt embeds portfolio figures.
    logger.debug("Sending chat completion request to %s: %s", HF_MODEL_ID, messages)

    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=messages,
        max_tokens=HF_MAX_TOKENS,
        temperature=HF_TEMPERATURE,
        top_p=0.95,
    )

    output_text = response.choices[0].message.content
    logger.debug("Chat completion response: %s", output_text)
    return output_text