# Risk_Manager / analytics/ai_assistantv2.py
# Renamed from analytics/ai_assistant.py (commit e1e9406); author per hosting page: GenAICoder.
import os
import pandas as pd
try:
from huggingface_hub import InferenceClient
except ImportError as exc:
raise ImportError(
"huggingface_hub is required for AI assistant support. "
"Install it with `pip install huggingface_hub`."
) from exc
# Model and generation settings; each one can be overridden through the
# environment variable of the same name (the token uses HUGGINGFACE_API_TOKEN).
HF_MODEL_ID = os.environ.get("HF_MODEL_ID", "Qwen/Qwen2.5-7B-Instruct")
# None when the variable is unset; _get_inference_client() raises in that case.
HF_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN")
# NOTE: int()/float() run at import time, so a non-numeric override raises
# ValueError as soon as this module is imported.
HF_MAX_TOKENS = int(os.environ.get("HF_MAX_TOKENS", 512))
HF_TEMPERATURE = float(os.environ.get("HF_TEMPERATURE", 0.3))
# Metric names handed to generate_metric_view() as `metric_name` by the
# calendar and vintage context builders.
RISK_METRICS = ["30+@3", "30+@6", "60+@6", "Yr1 NCL"]
def _detect_date_column(df: pd.DataFrame):
    """Return the name of the first recognised date column in ``df``.

    The known names are tried in priority order; ``None`` is returned when
    the frame contains none of them.
    """
    known_names = (
        "reporting_month",
        "observation_date",
        "observation_month",
        "obs_date",
        "date",
        "calendar_month",
        "month",
        "report_date",
    )
    available = df.columns
    return next((name for name in known_names if name in available), None)
def _filter_by_month(df: pd.DataFrame, as_of_month: str | None):
if not as_of_month or as_of_month == "All":
return df.copy()
date_col = _detect_date_column(df)
if date_col is None:
return df.copy()
ser = pd.to_datetime(df[date_col], errors="coerce").dt.to_period("M").astype(str)
return df[ser == as_of_month].copy()
def _fmt_pct(value):
    """Format ``value`` as a percentage string rounded to two decimals.

    Returns ``"N/A"`` when ``value`` cannot be converted to a float or when
    the result is NaN or infinite (e.g. a rate computed from a zero
    denominator), so placeholders such as ``"nan%"`` never reach the prompt.
    """
    import math

    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return "N/A"
    if not math.isfinite(number):
        return "N/A"
    return f"{round(number, 2)}%"
def _fico_band_midpoint(band):
    """Return the midpoint of a ``"lo-hi"`` FICO band string, or None if malformed."""
    try:
        lo, hi = band.split("-")
        return (int(lo) + int(hi)) / 2
    except (AttributeError, TypeError, ValueError):
        return None


def _segment_breakdown_lines(df: pd.DataFrame, segment: str, bad_mask, top_n: int = 5):
    """Return text lines for the ``top_n`` segments ranked by delinquent-balance share.

    ``bad_mask`` is a boolean Series aligned with ``df`` marking dpd>=30 rows,
    or None when the frame has no ``dpd`` column. Missing ``account_id`` or
    ``balance`` columns are reported as zeros instead of raising.
    """
    keys = df[segment]
    summary = pd.DataFrame(index=df.groupby(segment).size().index)

    if "account_id" in df.columns:
        summary["accounts"] = df["account_id"].groupby(keys).nunique()
    else:
        summary["accounts"] = 0

    if "balance" in df.columns:
        balance = df["balance"]
        summary["balance"] = balance.groupby(keys).sum()
        if bad_mask is not None:
            # Zero out non-delinquent rows so a plain group sum gives the
            # delinquent balance without re-indexing the outer frame per group.
            summary["bad_balance"] = balance.where(bad_mask, 0).groupby(keys).sum()
        else:
            summary["bad_balance"] = 0.0
    else:
        summary["balance"] = 0.0
        summary["bad_balance"] = 0.0

    # Mask zero balances before dividing so the rate falls back to 0 rather
    # than becoming inf.
    nonzero_balance = summary["balance"].where(summary["balance"] != 0)
    summary["ncl_rate"] = (summary["bad_balance"] / nonzero_balance * 100).round(2).fillna(0)

    lines = [f"Segment breakdown by {segment}:"]
    ranked = summary.sort_values("ncl_rate", ascending=False).head(top_n)
    for seg_value, row in ranked.iterrows():
        lines.append(
            f" - {seg_value}: accounts={int(row['accounts'])}, balance={row['balance']:.0f}, ncl={_fmt_pct(row['ncl_rate'])}"
        )
    return lines


def build_portfolio_context(df: pd.DataFrame, as_of_month: str | None = None, segment: str | None = None):
    """Build a plain-text portfolio summary used as context for the assistant.

    Args:
        df: Account-level data. The columns ``account_id``, ``balance``,
            ``dpd``, ``fico_score`` / ``fico_band`` and any column whose name
            contains "ncl" are used when present; each is optional.
        as_of_month: Month label passed to ``_filter_by_month``; ``None`` or
            ``"All"`` keeps every row.
        segment: Optional column name; when present in ``df`` a breakdown of
            the five segments with the highest delinquent-balance share is
            appended.

    Returns:
        A newline-joined string of summary lines.
    """
    df = _filter_by_month(df, as_of_month)
    has_accounts = "account_id" in df.columns
    has_balance = "balance" in df.columns
    has_dpd = "dpd" in df.columns

    # Rows at 30+ days past due; a missing dpd value counts as 0.
    bad_mask = (df["dpd"].fillna(0) >= 30) if has_dpd else None

    total_accounts = int(df["account_id"].nunique()) if has_accounts else 0
    if has_accounts and has_balance:
        open_accounts = int(df.loc[df["balance"] > 0, "account_id"].nunique())
    else:
        open_accounts = total_accounts
    if has_accounts and has_dpd:
        bad_accounts = int(df.loc[bad_mask, "account_id"].nunique())
    else:
        bad_accounts = 0
    total_balance = float(df["balance"].sum(skipna=True)) if has_balance else 0.0

    # Prefer an explicit NCL column (first match); otherwise approximate the
    # rate by the share of balance on dpd>=30 rows.
    ncl_cols = [c for c in df.columns if isinstance(c, str) and "ncl" in c.lower()]
    if ncl_cols and total_balance > 0:
        overall_ncl_rate = df[ncl_cols[0]].sum(skipna=True) / total_balance * 100
    elif total_balance > 0 and has_dpd:
        bad_balance = float(df.loc[bad_mask, "balance"].sum())
        overall_ncl_rate = bad_balance / total_balance * 100
    else:
        overall_ncl_rate = None
    overall_ncl_rate_text = "N/A" if overall_ncl_rate is None else _fmt_pct(overall_ncl_rate)

    if "fico_score" in df.columns:
        scores = df["fico_score"].dropna()
        # An all-missing column reports N/A rather than "nan".
        avg_fico = round(scores.mean(), 1) if not scores.empty else None
    elif "fico_band" in df.columns:
        mid_vals = df["fico_band"].dropna().apply(_fico_band_midpoint).dropna()
        avg_fico = round(mid_vals.mean(), 1) if not mid_vals.empty else None
    else:
        avg_fico = None

    as_of_month_text = (
        "all data" if as_of_month == "All" else (as_of_month or "latest available")
    )
    lines = [
        f"As of month: {as_of_month_text}",
        f"Total accounts: {total_accounts}",
        f"Open accounts: {open_accounts}",
        f"Bad accounts (dpd>=30): {bad_accounts}",
        f"Overall NCL rate: {overall_ncl_rate_text}",
        f"Average FICO: {avg_fico if avg_fico is not None else 'N/A'}",
    ]
    if segment and segment in df.columns:
        lines.extend(_segment_breakdown_lines(df, segment, bad_mask))
    return "\n".join(lines)
def build_calendar_performance_context(df: pd.DataFrame, as_of_month: str | None = None):
if not as_of_month or as_of_month == "All":
return ""
month_df = _filter_by_month(df, as_of_month)
if month_df.empty:
return f"No data found for {as_of_month}."
lines = [f"Calendar month snapshot for {as_of_month}:"]
if "balance" in month_df.columns:
lines.append(f" - Total balance: {month_df['balance'].sum(skipna=True):.0f}")
lines.append(f" - Total accounts: {month_df['account_id'].nunique() if 'account_id' in month_df.columns else 0}")
if "dpd" in month_df.columns:
lines.append(f" - Accounts with dpd>=30: {month_df.loc[month_df['dpd'].fillna(0) >= 30, 'account_id'].nunique() if 'account_id' in month_df.columns else 0}")
for metric in RISK_METRICS:
try:
view = generate_metric_view(month_df, metric_name=metric, group_col=None)
rate_col = [c for c in view.columns if "rate" in c.lower()][0]
metric_rate = view.loc[0, rate_col]
lines.append(f" - {metric} rate: {_fmt_pct(metric_rate)}")
except Exception:
continue
return "\n".join(lines)
def build_vintage_performance_context(df: pd.DataFrame):
    """Summarise, per risk metric, the highest- and lowest-rate booking vintage.

    Returns ``""`` when ``df`` has no ``booking_vintage`` column. Otherwise
    returns a header line followed by one line per metric in ``RISK_METRICS``
    that could be computed; metrics that fail are skipped (and logged).
    """
    import logging

    if "booking_vintage" not in df.columns:
        return ""

    logger = logging.getLogger(__name__)
    lines = ["Vintage performance summary:"]

    # NOTE(review): generate_metric_view is not defined or imported in the
    # visible part of this module; failures (including a NameError for a
    # missing helper) are logged rather than silently ignored.
    for metric in RISK_METRICS:
        try:
            view = generate_metric_view(df, metric_name=metric, group_col="booking_vintage")
            if view.empty:
                continue
            rate_cols = [c for c in view.columns if "rate" in c.lower()]
            if not rate_cols:
                continue
            rate_col = rate_cols[0]
            # Two sorts on purpose: sort_values places NaN rates last in both
            # directions, so neither extreme is a NaN row when real rates exist.
            worst = view.sort_values(rate_col, ascending=False).iloc[0]
            best = view.sort_values(rate_col, ascending=True).iloc[0]
            lines.append(
                f" - {metric}: highest risk vintage {worst['booking_vintage']} at {_fmt_pct(worst[rate_col])}, "
                f"lowest risk vintage {best['booking_vintage']} at {_fmt_pct(best[rate_col])}."
            )
        except Exception:
            logger.warning("Skipping metric %r in vintage context", metric, exc_info=True)
            continue
    return "\n".join(lines)
def build_context_for_question(df: pd.DataFrame, as_of_month: str | None = None, segment: str | None = None, question: str | None = None):
    """Assemble the context sections relevant to ``question``.

    The portfolio summary (including the segment breakdown when ``segment``
    is a column of ``df``) is always included. The calendar-month snapshot is
    added when the question mentions "month", "calendar" or "as of"; the
    vintage summary when it mentions "vintage", "trend" or "compare".

    Returns:
        The non-empty sections joined by blank lines.
    """
    # The portfolio section already carries the segment breakdown, so it is
    # built exactly once; appending it a second time only duplicated the
    # same text in the prompt.
    sections = [build_portfolio_context(df, as_of_month=as_of_month, segment=segment)]

    q = (question or "").lower()
    if any(keyword in q for keyword in ("month", "calendar", "as of")):
        sections.append(build_calendar_performance_context(df, as_of_month=as_of_month))
    # "booking vintage" is covered by the "vintage" keyword.
    if any(keyword in q for keyword in ("vintage", "trend", "compare")):
        sections.append(build_vintage_performance_context(df))

    return "\n\n".join(section for section in sections if section)
def _get_inference_client():
    """Create an ``InferenceClient`` authenticated with ``HF_TOKEN``.

    Raises:
        RuntimeError: if ``HF_TOKEN`` is empty or unset.
    """
    if HF_TOKEN:
        print("Inference set up successful.")
        return InferenceClient(token=HF_TOKEN)
    raise RuntimeError(
        "HUGGINGFACE_API_TOKEN is required for AI assistant. "
        "Set it in your environment before running the app."
    )
def generate_ai_answer(question: str, df: pd.DataFrame, as_of_month: str | None = None, segment: str | None = None):
    """Answer a portfolio question with the configured chat model.

    Builds the data context for ``question``, wraps it in a risk-manager
    prompt and sends it as a single user message to ``HF_MODEL_ID``.

    Args:
        question: The user's question; ``None`` is treated as an empty string.
        df: Account-level data used to build the context.
        as_of_month: Optional month filter forwarded to the context builders.
        segment: Optional segment column forwarded to the context builders.

    Returns:
        The text content of the first choice in the model response.

    Raises:
        RuntimeError: if no API token is configured (raised by
            ``_get_inference_client``) or the response contains no choices.
    """
    import logging

    logger = logging.getLogger(__name__)
    question_text = question or ""
    summary = build_context_for_question(df, as_of_month=as_of_month, segment=segment, question=question_text)
    prompt = (
        "You are a senior risk manager assistant responding to portfolio analytics questions. "
        "Use the risk context below and answer the user clearly, describing what is happening, why it is happening, and what actions should be considered. "
        # Blank line so the instructions do not run straight into "Context:".
        "If you cannot answer from the data, say so clearly.\n\n"
        "Context:\n" + summary + "\n\n"
        "Question: " + question_text + "\n\n"
        "Answer as a risk manager with practical, concise guidance."
    )

    client = _get_inference_client()
    messages = [{"role": "user", "content": prompt}]
    # Log only the prompt size: the prompt embeds portfolio data and should
    # not be dumped to stdout.
    logger.debug("Requesting completion from %s (prompt length %d)", HF_MODEL_ID, len(prompt))
    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=messages,
        max_tokens=HF_MAX_TOKENS,
        temperature=HF_TEMPERATURE,
        top_p=0.95,
    )

    choices = getattr(response, "choices", None)
    if not choices:
        raise RuntimeError("The model returned no choices for the request.")
    return choices[0].message.content