Risk_Manager / analytics /deep_dive_agenticv2.py
GenAICoder's picture
Rename analytics/deep_dive_agentic.py to analytics/deep_dive_agenticv2.py
ae3693a verified
Raw
History Blame Contribute Delete
18.6 kB
# deep_dive_agenticv2.py
"""
Agentic analytical code generation + execution engine using Hugging Face
FLOW:
User Question
LLM generates pandas code
Python executes code safely
LLM interprets results
Return code + interpretation
Environment:
export HUGGINGFACE_API_TOKEN=...
FIXES APPLIED (v2):
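- FIX 1: exec() now uses a single merged namespace dict so names assigned by
the generated code can be read back reliably and are visible to functions and
comprehensions it defines (with separate globals/locals, exec() runs the code
as if it were a class body; this is documented behaviour, not a Python bug).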
- FIX 2: Smart result detection — scans namespace for any new DataFrame/Series
instead of relying on hardcoded variable names (result_1, final_result).
- FIX 3: _fix_pandas_compatibility() is now actually called before exec().
"""
# ---------------------------------------------------
# IMPORTS
# ---------------------------------------------------
import pandas as pd
import json
import os
import re
try:
from huggingface_hub import InferenceClient
except ImportError as exc:
raise ImportError(
"huggingface_hub is required. Install with `pip install huggingface-hub`."
) from exc
from analytics.performance_analysis import generate_metric_view
# ---------------------------------------------------
# HF CONFIG
# ---------------------------------------------------
# Model id passed to the chat-completion calls below; override it with the
# HF_MODEL_ID environment variable, otherwise the default here is used.
HF_MODEL_ID = os.environ.get("HF_MODEL_ID", "Qwen/Qwen2.5-7B-Instruct")
# API token, read once at import time. It is None when HUGGINGFACE_API_TOKEN is
# unset, in which case _get_hf_client() raises RuntimeError.
HF_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN")
# ---------------------------------------------------
# HELPER: GET INFERENCE CLIENT
# ---------------------------------------------------
def _get_hf_client():
    """
    Build an InferenceClient authenticated with the module-level HF_TOKEN.

    Raises:
        RuntimeError: if HF_TOKEN is empty or was never set.
    """
    if HF_TOKEN:
        return InferenceClient(token=HF_TOKEN)
    raise RuntimeError(
        "HUGGINGFACE_API_TOKEN is required. Set it in your environment."
    )
# ---------------------------------------------------
# HELPER: EXTRACT JSON FROM LLM RESPONSE
# ---------------------------------------------------
def _extract_json(text: str):
    """
    Pull a JSON object out of free-form LLM output.

    Strategies are tried in order and the first one that parses wins:
      1. Strict parse of the span from the first "{" to the last "}".
      2. Lenient repair of that span: newlines are collapsed to spaces and
         bare or single-quoted keys are re-quoted with double quotes.
      3. A scan over each "{" for the first position from which a complete
         JSON object decodes. This recovers responses where a valid object
         is followed by extra text that itself contains braces.

    Steps 1 and 2 are unchanged from the previous implementation, so any
    input that used to parse still yields the same value; step 3 only runs
    for inputs that previously produced None.

    Args:
        text: Raw model response. None, non-string and empty values are
            tolerated and yield None instead of raising.

    Returns:
        The decoded JSON object, or None when nothing could be parsed.
    """
    if not isinstance(text, str) or not text:
        return None

    match = re.search(r"\{.*\}", text, re.S)
    if not match:
        return None
    payload = match.group(0)

    # 1. Strict parse of the widest brace-delimited span.
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        pass

    # 2. Lenient repair for almost-JSON (unquoted / single-quoted keys).
    cleaned = re.sub(r"[\n\r]+", " ", payload)
    cleaned = re.sub(r"(['\"])?([a-zA-Z0-9_]+)(['\"])?\s*:\s*", r'"\2": ', cleaned)
    try:
        return json.loads(cleaned)
    except (ValueError, RecursionError):
        pass

    # 3. The greedy span may have swallowed trailing junk; look for the first
    #    "{" from which a complete object can be decoded on its own.
    decoder = json.JSONDecoder()
    start = payload.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(payload, start)
        except (ValueError, RecursionError):
            start = payload.find("{", start + 1)
            continue
        return obj
    return None
# ---------------------------------------------------
# HELPER: FIX COMMON PANDAS COMPATIBILITY ISSUES
# ---------------------------------------------------
def _fix_pandas_compatibility(code: str) -> str:
    """
    Patch common pandas API mismatches in generated code before execution.

    Two textual rewrites are applied:

    * ``.reset_index(name=X)`` becomes a ``.pipe(...)`` call that picks the
      right keyword at runtime. ``name=`` is only valid on a Series and
      ``names=`` only on a DataFrame, and the receiver's type cannot be known
      from the source text. Unconditionally rewriting to ``names=[X]`` (the
      previous behaviour) broke valid Series code such as
      ``df.groupby(...).size().reset_index(name="n")``. The DataFrame branch
      keeps the previous ``names=[X]`` rewrite.
    * ``a.append(b, ignore_index=True)`` becomes
      ``pd.concat([a, b], ignore_index=True)``. Dotted names are captured
      whole, so ``self.df.append(...)`` is no longer mangled into
      ``self.pd.concat([df, ...])``.

    The rewritten code refers to ``pd``; the execution namespace must bind
    that name to pandas.

    Args:
        code: Python source text produced by the LLM.

    Returns:
        The source text with the rewrites applied; text that matches neither
        pattern is returned unchanged.
    """

    def _dispatch_reset_index(match):
        # X is either a quoted literal or a bare identifier; it is re-emitted
        # verbatim in both branches of the runtime type check.
        name_expr = match.group(1)
        return (
            f".pipe(lambda _ri_obj: _ri_obj.reset_index(name={name_expr}) "
            f"if isinstance(_ri_obj, pd.Series) "
            f"else _ri_obj.reset_index(names=[{name_expr}]))"
        )

    code = re.sub(
        r"\.reset_index\(name=((['\"])[^'\"]+\2|[a-zA-Z_][a-zA-Z0-9_]*)\)",
        _dispatch_reset_index,
        code,
    )

    # The lookbehind anchors the receiver at the start of a (possibly dotted)
    # name so an attribute chain is moved into pd.concat as a whole.
    code = re.sub(
        r"(?<![\w.])(\w+(?:\.\w+)*)\.append\((\w+(?:\.\w+)*),\s*ignore_index=True\)",
        r"pd.concat([\1, \2], ignore_index=True)",
        code,
    )
    return code
# ---------------------------------------------------
# STEP 1: CODE GENERATION
# ---------------------------------------------------
def generate_analysis_requirements(
    question: str,
    acq: pd.DataFrame,
    perf: pd.DataFrame,
    master_df: pd.DataFrame
):
    """
    Ask the LLM to break a question into 1-3 structured analytics requirements.

    Each requirement is expected to carry a title, a description and
    executable pandas code. The column names and descriptions sent to the
    model are the fixed dictionaries defined below; ``acq``, ``perf`` and
    ``master_df`` are accepted for signature parity with the other pipeline
    steps but are not read here.

    Args:
        question: The user's analytical question.
        acq: Acquisition data (not read by this function).
        perf: Performance data (not read by this function).
        master_df: Merged acquisition + performance data (not read here).

    Returns:
        A dict with keys ``success`` (bool), ``requirements`` (list of dicts)
        and ``error`` (str or None). Failure dicts also carry
        ``raw_response``. An empty completion, unparseable JSON, or a
        ``requirements`` value that holds no usable entries is reported as a
        failure instead of raising.

    Raises:
        RuntimeError: from _get_hf_client() when no API token is configured.
        Exceptions raised by the chat-completion call itself are propagated.
    """
    client = _get_hf_client()

    # Build detailed column descriptions
    acq_cols = {
        "account_id": "unique account identifier",
        "booking_date": "when account was originated",
        "booking_vintage": "year-month of origination (YYYY-MM)",
        "fico_band": "FICO score bracket (e.g., 700-750, 750-800)",
        "sourcing_channel": "acquisition channel (e.g., Online, Branch, Broker)",
        "city_tier": "city classification (Tier-1, Tier-2, Tier-3)",
        "occupation_type": "borrower occupation category",
        "credit_limit": "approved credit line amount"
    }
    perf_cols = {
        "account_id": "unique account identifier",
        "reporting_month": "month of performance observation (YYYY-MM)",
        "mob": "months on books (age of account in months)",
        "dpd": "days past due (0, 30, 60, 90+)",
        "balance": "current outstanding balance",
        "ncl_amount": "net charge-off amount (dollars)",
        "payment": "payment amount in period"
    }

    prompt = (
        "You are a senior retail credit risk analyst with 15+ years of portfolio management experience.\n\n"
        "Your task:\n"
        "1. Analyze the user's analytical question deeply\n"
        "2. Determine 1-3 specific analytics requirements needed to fully answer the question\n"
        "3. For EACH requirement, generate executable pandas code\n"
        "4. Return ONLY valid JSON, no other text\n\n"
        "AVAILABLE DATA:\n"
        "- acq: acquisition data with columns: " + ", ".join(acq_cols.keys()) + "\n"
        "- perf: performance data with columns: " + ", ".join(perf_cols.keys()) + "\n"
        "- master_df: merged acq+perf, includes all above columns\n\n"
        "COLUMN DESCRIPTIONS:\n"
        "Acquisition (acq):\n"
        + "\n".join([f" - {k}: {v}" for k, v in acq_cols.items()]) + "\n\n"
        "Performance (perf):\n"
        + "\n".join([f" - {k}: {v}" for k, v in perf_cols.items()]) + "\n\n"
        "Available Risk Metrics via generate_metric_view(df, metric_name, group_col):\n"
        " - 30+@3 (30+ dpd at 3 months)\n"
        " - 30+@6 (30+ dpd at 6 months)\n"
        " - 60+@6 (60+ dpd at 6 months)\n"
        " - Yr1 NCL (Year 1 net charge-off rate)\n\n"
        "CODE GENERATION RULES:\n"
        "- Generate pandas code ONLY\n"
        "- IMPORTANT: Always store your final result in a variable named exactly 'result_1', 'result_2', or 'result_3' matching the sequence number\n"
        "- Use meaningful intermediate variable names (e.g., vintage_analysis, segment_summary)\n"
        "- Focus on GROUP BY aggregations for insights\n"
        "- Calculate rates as dollars/total (percentage)\n"
        "- Sort by risk metrics (descending) to identify worst segments\n"
        "- Add brief comments for clarity\n"
        "- NO markdown, NO explanations outside JSON\n\n"
        "JSON STRUCTURE:\n"
        "{\n"
        ' "requirements": [\n'
        ' {\n'
        ' "sequence": 1,\n'
        ' "title": "Analysis title",\n'
        ' "description": "What this code does and why it matters",\n'
        ' "code": "pandas code here — must assign final result to result_1"\n'
        " }\n"
        " ]\n"
        "}\n\n"
        "User Question:\n" + question
    )

    messages = [
        {
            "role": "system",
            "content": (
                "You are a senior credit risk analyst who generates pandas code for portfolio analytics. "
                "Return ONLY valid JSON. Always store the final result in result_1, result_2, or result_3."
            )
        },
        {"role": "user", "content": prompt}
    ]

    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=messages,
        max_tokens=2048,
        temperature=0.1,
        top_p=0.95
    )

    if hasattr(response, "choices"):
        # An empty choices list or a None content would otherwise raise
        # IndexError / TypeError below; treat both as an empty response so the
        # caller gets the structured parse-failure dict instead of a crash.
        choices = response.choices
        response_text = (choices[0].message.content if choices else None) or ""
    else:
        response_text = str(response)

    # Extract JSON
    spec = _extract_json(response_text)
    if not spec or not isinstance(spec, dict):
        return {
            "success": False,
            "requirements": [],
            "error": f"Failed to parse JSON from LLM response: {response_text[:200]}",
            "raw_response": response_text
        }

    requirements = spec.get("requirements", [])
    # Tolerate a single requirement object returned without the enclosing list.
    if isinstance(requirements, dict):
        requirements = [requirements]
    if not isinstance(requirements, list):
        requirements = []
    # Downstream code calls .get() on every entry, so keep only dict entries.
    requirements = [req for req in requirements if isinstance(req, dict)]

    if not requirements:
        return {
            "success": False,
            "requirements": [],
            "error": f"LLM returned no requirements. Response keys: {list(spec.keys())}",
            "raw_response": response_text[:300]
        }

    print(f"[DEBUG] Generated {len(requirements)} requirements for question: {question[:80]}")
    for i, req in enumerate(requirements, 1):
        print(f" Req {i}: {req.get('title')}")

    return {
        "success": True,
        "requirements": requirements,
        "error": None
    }
# ---------------------------------------------------
# STEP 2: CODE EXECUTION (LOOPED)
# ---------------------------------------------------
def execute_requirement_code(
    code: str,
    acq: pd.DataFrame,
    perf: pd.DataFrame,
    master_df: pd.DataFrame,
    requirement_num: int
):
    """
    Execute generated pandas code for one requirement and locate its result.

    The code is first passed through _fix_pandas_compatibility() and then run
    with exec() in a single namespace dict that exposes ``pd``,
    ``generate_metric_view``, ``acq``, ``perf`` and ``master_df``.

    The result is chosen in three tiers:
      1. A non-None variable named ``result_<requirement_num>``,
         ``final_result`` or ``result``.
      2. The most recently introduced new variable holding a DataFrame or
         Series.
      3. The most recently introduced new public variable that is not None,
         not a module and not callable, so an ``import numpy as np`` or a
         helper ``def`` in the generated code is never reported as the result.
    Tiers 2 and 3 walk the namespace in reverse insertion order, which makes
    the choice deterministic and favours the last thing the code produced.

    SECURITY NOTE: the code comes from an LLM and runs with full builtins in
    this process. Nothing here sandboxes it; only call this in an environment
    where running arbitrary code is acceptable. The DataFrames are passed by
    reference, so generated code can also mutate the caller's data in place.

    Args:
        code: Python source to run. A list/tuple of source lines is joined
            with newlines; anything else that is not a non-blank string is
            reported as a failure without executing anything.
        acq: Acquisition data exposed to the code as ``acq``.
        perf: Performance data exposed to the code as ``perf``.
        master_df: Merged data exposed to the code as ``master_df``.
        requirement_num: Sequence number used for the ``result_<n>`` lookup
            and in log lines.

    Returns:
        A dict with ``success`` (bool), ``result`` (the located value, a
        fallback message string when the code ran but produced nothing, or
        None on failure) and ``error`` (str or None).
    """
    import traceback
    import types

    # Models sometimes return "code" as a list of lines rather than one string.
    if isinstance(code, (list, tuple)) and all(isinstance(part, str) for part in code):
        code = "\n".join(code)
    if not isinstance(code, str) or not code.strip():
        print(f"[DEBUG] Req {requirement_num} FAILED: no executable code provided")
        return {
            "success": False,
            "result": None,
            "error": "No executable code was provided for this requirement."
        }

    # Apply pandas compatibility patches BEFORE executing
    code = _fix_pandas_compatibility(code)

    # One dict serves as both globals and locals. With two separate dicts,
    # exec() runs the code as if it were a class body: top-level assignments
    # land in locals and are invisible to functions, lambdas and
    # comprehensions defined by the generated code.
    namespace = {
        "pd": pd,
        "generate_metric_view": generate_metric_view,
        "__builtins__": __builtins__,
        # Data available to generated code
        "acq": acq,
        "perf": perf,
        "master_df": master_df,
    }
    # Snapshot of keys before exec so we can detect newly created variables
    keys_before = set(namespace)

    try:
        print(f"[DEBUG] Executing requirement {requirement_num}...")
        print(f"[DEBUG] Code preview: {code[:120].strip()}...")
        exec(code, namespace)  # see SECURITY NOTE in the docstring
    except Exception as e:
        tb = traceback.format_exc()
        print(f"[DEBUG] Req {requirement_num} FAILED: {str(e)}")
        print(f"[DEBUG] Traceback:\n{tb}")
        return {
            "success": False,
            "result": None,
            "error": str(e)
        }

    # dicts preserve insertion order, so this lists new names in the order the
    # generated code first assigned them.
    new_keys = [key for key in namespace if key not in keys_before]

    # --- Tier 1: expected named result variables ---
    result = None
    for key in (f"result_{requirement_num}", "final_result", "result"):
        if namespace.get(key) is not None:
            result = namespace[key]
            print(f"[DEBUG] Found result in expected variable: '{key}'")
            break

    # --- Tier 2: the latest NEW DataFrame or Series created during exec ---
    if result is None:
        for key in reversed(new_keys):
            val = namespace[key]
            if isinstance(val, (pd.DataFrame, pd.Series)):
                result = val
                print(f"[DEBUG] Found result by scanning new DataFrame/Series: '{key}'")
                break

    # --- Tier 3: the latest new public variable that holds plain data ---
    if result is None:
        for key in reversed(new_keys):
            if key.startswith("_"):
                continue
            val = namespace[key]
            if val is None or isinstance(val, types.ModuleType) or callable(val):
                continue
            result = val
            print(f"[DEBUG] Fallback: found result in new variable: '{key}'")
            break

    if result is None:
        result = "Code executed successfully but no result variable was found in namespace."

    print(f"[DEBUG] Req {requirement_num} success. Result type: {type(result).__name__}")
    return {
        "success": True,
        "result": result,
        "error": None
    }
def execute_all_requirements(
    requirements: list,
    acq: pd.DataFrame,
    perf: pd.DataFrame,
    master_df: pd.DataFrame
):
    """
    Run every requirement's code in order and gather the outcomes.

    Returns a ``(all_results, context_text)`` tuple: one record per
    requirement (1-based ``sequence``, title, description, code, success
    flags, result and error) plus a text digest of each outcome.
    """
    print(f"[DEBUG] Starting execution of {len(requirements)} requirements")

    records = []
    context_chunks = []
    for seq, requirement in enumerate(requirements, start=1):
        code = requirement.get("code", "")
        description = requirement.get("description", "")
        title = requirement.get("title", f"Analysis {seq}")

        outcome = execute_requirement_code(code, acq, perf, master_df, seq)

        record = {
            "sequence": seq,
            "title": title,
            "description": description,
            "code": code,
            # app.py reads "success"; "execution_success" is the older key
            # kept so existing consumers continue to work.
            "success": outcome["success"],
            "execution_success": outcome["success"],
            "result": outcome["result"],
            "error": outcome.get("error"),
        }
        records.append(record)

        # One digest entry per requirement, in execution order.
        if outcome["success"]:
            chunk = f"\nAnalysis {seq} ({title}):\n{str(outcome['result'])}\n"
        else:
            chunk = f"\nAnalysis {seq} ({title}) FAILED:\n{outcome['error']}\n"
        context_chunks.append(chunk)

    return records, "".join(context_chunks)
# ---------------------------------------------------
# STEP 3: RESULT INTERPRETATION
# ---------------------------------------------------
def interpret_all_results(
    question: str,
    all_results: list,
    context_text: str
):
    """
    Have the LLM synthesise every analysis outcome into one interpretation.

    The prompt is built from the records in ``all_results`` (sequence, title,
    description, and either the result or the execution error);
    ``context_text`` is accepted but not read. Returns the model's message
    content, or ``str(response)`` when the response has no ``choices``.
    """
    print(f"[DEBUG] Interpreting results for {len(all_results)} analyses")
    succeeded = sum(1 for r in all_results if r.get('success'))
    print(f"[DEBUG] Successful executions: {succeeded}")

    client = _get_hf_client()

    # Render each analysis as a banner-delimited section.
    rule = "=" * 60
    sections = []
    for entry in all_results:
        header = (
            f"\n{rule}\n"
            f"Analysis {entry['sequence']}: {entry['title']}\n"
            f"Description: {entry['description']}\n"
            f"{rule}\n"
        )
        if entry["success"]:
            body = f"Result:\n{str(entry['result'])}\n"
        else:
            body = f"Execution Error: {entry['error']}\n"
        sections.append(header + body)
    analyses_text = "".join(sections)

    instructions = (
        "You are a senior retail credit risk analyst with 15+ years of portfolio management experience.\n\n"
        "Your task:\n"
        "Synthesize the analytical results and provide comprehensive risk insights.\n\n"
        "Focus on:\n"
        "- Key findings and patterns across all analyses\n"
        "- Risk deterioration or improvement trends\n"
        "- Vintage/segment concentration issues and implications\n"
        "- Root causes of observed patterns\n"
        "- Unusual trends, anomalies, or red flags\n"
        "- Actionable recommendations for portfolio management\n"
        "- Comparative risk assessment (which segments/vintages are most/least risky)\n\n"
        "Guidelines:\n"
        "- Be analytical and specific (not generic)\n"
        "- Focus on business implications, not just statistics\n"
        "- Avoid repeating raw tables; interpret the meaning\n"
        "- Provide 3-5 key insights\n"
        "- Suggest next investigative steps if needed\n\n"
    )
    prompt = (
        instructions
        + "User's Original Question:\n" + question + "\n\n"
        + "Analyses Performed:\n" + analyses_text + "\n\n"
        + "Provide your senior analyst interpretation:"
    )

    system_message = {
        "role": "system",
        "content": "You are a senior credit risk analyst providing executive insights from portfolio analytics."
    }
    user_message = {"role": "user", "content": prompt}

    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=[system_message, user_message],
        max_tokens=1024,
        temperature=0.3,
        top_p=0.95
    )

    if hasattr(response, "choices"):
        return response.choices[0].message.content
    return str(response)
# ---------------------------------------------------
# MASTER ORCHESTRATOR FUNCTION
# ---------------------------------------------------
def run_deep_dive_analysis(
    question: str,
    acq: pd.DataFrame,
    perf: pd.DataFrame,
    master_df: pd.DataFrame
):
    """
    Drive the full deep-dive pipeline for one question.

    Stages: requirement generation, execution of at most three requirements,
    then LLM interpretation of the outcomes. Returns a dict with ``success``,
    ``question``, ``requirements``, ``all_results``, ``interpretation`` and
    ``error``; when requirement generation fails, the lists are empty and the
    interpretation carries the failure message.
    """
    print(f"\n[DEEP DIVE START] Question: {question}")
    print(f"[DEBUG] Data shapes - Acq: {acq.shape}, Perf: {perf.shape}, Master: {master_df.shape}")

    # Step 1: Generate requirements
    print("[DEBUG] Step 1: Generating requirements...")
    generation = generate_analysis_requirements(question, acq, perf, master_df)

    if not generation["success"]:
        failure = {
            "success": False,
            "question": question,
            "requirements": [],
            "all_results": [],
            "interpretation": f"Failed to generate requirements: {generation['error']}",
            "error": generation["error"],
        }
        return failure

    # Never run more than three requirements, whatever the model returned.
    selected = generation["requirements"][:3]

    # Step 2 & 3: Execute all requirements
    print(f"[DEBUG] Step 2-3: Executing {len(selected)} requirements...")
    outcomes, digest = execute_all_requirements(selected, acq, perf, master_df)

    # Step 4: Interpret results
    print("[DEBUG] Step 4: Interpreting all results...")
    narrative = interpret_all_results(question, outcomes, digest)

    print("[DEEP DIVE END] Analysis complete\n")
    report = {
        "success": True,
        "question": question,
        "requirements": selected,
        "all_results": outcomes,
        "interpretation": narrative,
        "error": None,
    }
    return report