Risk_Manager / analytics /deep_dive_agenticv1.py
GenAICoder's picture
Rename analytics/deep_dive_agentic.py to analytics/deep_dive_agenticv1.py
37e1693 verified
Raw
History Blame Contribute Delete
15.3 kB
# deep_dive_agentic.py
"""
Agentic analytical code generation + execution engine using Hugging Face
FLOW:
User Question
LLM generates pandas code
Python executes code safely
LLM interprets results
Return code + interpretation
Environment:
export HUGGINGFACE_API_TOKEN=...
"""
# ---------------------------------------------------
# IMPORTS
# ---------------------------------------------------
import pandas as pd
import json
import os
import re
try:
from huggingface_hub import InferenceClient
except ImportError as exc:
raise ImportError(
"huggingface_hub is required. Install with `pip install huggingface-hub`."
) from exc
from analytics.performance_analysis import generate_metric_view
# ---------------------------------------------------
# HF CONFIG
# ---------------------------------------------------
HF_MODEL_ID = os.environ.get("HF_MODEL_ID", "Qwen/Qwen2.5-7B-Instruct")
HF_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN")
# ---------------------------------------------------
# HELPER: GET INFERENCE CLIENT
# ---------------------------------------------------
def _get_hf_client():
if not HF_TOKEN:
raise RuntimeError(
"HUGGINGFACE_API_TOKEN is required. Set it in your environment."
)
return InferenceClient(token=HF_TOKEN)
# ---------------------------------------------------
# HELPER: EXTRACT JSON FROM LLM RESPONSE
# ---------------------------------------------------
def _extract_json(text: str):
match = re.search(r"\{.*\}", text, re.S)
if not match:
return None
payload = match.group(0)
try:
return json.loads(payload)
except json.JSONDecodeError:
try:
cleaned = re.sub(r"[\n\r]+", " ", payload)
cleaned = re.sub(r"(['\"])?([a-zA-Z0-9_]+)(['\"])?\s*:\s*", r'"\2": ', cleaned)
return json.loads(cleaned)
except Exception:
return None
# ---------------------------------------------------
# HELPER: FIX COMMON PANDAS COMPATIBILITY ISSUES
# ---------------------------------------------------
def _fix_pandas_compatibility(code: str):
"""
Fix common pandas API compatibility issues in generated code.
Handles version differences between pandas versions.
"""
# Fix: .reset_index(name=...) -> .reset_index(names=[...])
code = re.sub(
r"\.reset_index\(name=(['\"])([^'\"]+)\1\)",
r".reset_index(names=[\1\2\1])",
code
)
# Fix: .reset_index(name= with variable
code = re.sub(
r"\.reset_index\(name=([a-zA-Z_][a-zA-Z0-9_]*)\)",
r".reset_index(names=[\1])",
code
)
return code
# ---------------------------------------------------
# STEP 1: CODE GENERATION
# ---------------------------------------------------
def generate_analysis_requirements(question: str, acq: pd.DataFrame, perf: pd.DataFrame, master_df: pd.DataFrame):
"""
LLM breaks down question into 1-3 structured analytics requirements.
Each requirement includes a description and executable pandas code.
"""
client = _get_hf_client()
# Build detailed column descriptions
acq_cols = {
"account_id": "unique account identifier",
"booking_date": "when account was originated",
"booking_vintage": "year-month of origination (YYYY-MM)",
"fico_band": "FICO score bracket (e.g., 700-750, 750-800)",
"sourcing_channel": "acquisition channel (e.g., Online, Branch, Broker)",
"city_tier": "city classification (Tier-1, Tier-2, Tier-3)",
"occupation_type": "borrower occupation category",
"credit_limit": "approved credit line amount"
}
perf_cols = {
"account_id": "unique account identifier",
"reporting_month": "month of performance observation (YYYY-MM)",
"mob": "months on books (age of account in months)",
"dpd": "days past due (0, 30, 60, 90+)",
"balance": "current outstanding balance",
"ncl_amount": "net charge-off amount (dollars)",
"payment": "payment amount in period"
}
master_df_desc = "acq merged with perf on account_id; contains all acquisition + performance columns"
prompt = (
"You are a senior retail credit risk analyst with 15+ years of portfolio management experience.\n\n"
"Your task:\n"
"1. Analyze the user's analytical question deeply\n"
"2. Determine 1-3 specific analytics requirements needed to fully answer the question\n"
"3. For EACH requirement, generate executable pandas code\n"
"4. Return ONLY valid JSON, no other text\n\n"
"AVAILABLE DATA:\n"
"- acq: acquisition data with columns: " + ", ".join(acq_cols.keys()) + "\n"
"- perf: performance data with columns: " + ", ".join(perf_cols.keys()) + "\n"
"- master_df: merged acq+perf, includes all above columns\n\n"
"COLUMN DESCRIPTIONS:\n"
"Acquisition (acq):\n"
+ "\n".join([f" - {k}: {v}" for k, v in acq_cols.items()]) + "\n\n"
"Performance (perf):\n"
+ "\n".join([f" - {k}: {v}" for k, v in perf_cols.items()]) + "\n\n"
"Available Risk Metrics via generate_metric_view(df, metric_name, group_col):\n"
" - 30+@3 (30+ dpd at 3 months)\n"
" - 30+@6 (30+ dpd at 6 months)\n"
" - 60+@6 (60+ dpd at 6 months)\n"
" - Yr1 NCL (Year 1 net charge-off rate)\n\n"
"CODE GENERATION RULES:\n"
"- Generate pandas code ONLY\n"
"- Use meaningful variable names (e.g., vintage_analysis, segment_summary)\n"
"- Store analysis in a variable (e.g., result_1, result_2, result_3)\n"
"- Focus on GROUP BY aggregations for insights\n"
"- Calculate rates as dollars/total (percentage)\n"
"- Sort by risk metrics (descending) to identify worst segments\n"
"- Add brief comments for clarity\n"
"- NO markdown, NO explanations outside JSON\n\n"
"JSON STRUCTURE:\n"
"{\n"
' "requirements": [\n'
' {\n'
' "sequence": 1,\n'
' "title": "Analysis title",\n'
' "description": "What this code does and why it matters",\n'
' "code": "pandas code here"\n'
" }\n"
" ]\n"
"}\n\n"
"User Question:\n" + question
)
messages = [
{"role": "system", "content": "You are a senior credit risk analyst who generates pandas code for portfolio analytics. Return ONLY valid JSON."},
{"role": "user", "content": prompt}
]
response = client.chat.completions.create(
model=HF_MODEL_ID,
messages=messages,
max_tokens=2048,
temperature=0.1,
top_p=0.95
)
response_text = response.choices[0].message.content if hasattr(response, 'choices') else str(response)
# Extract JSON
spec = _extract_json(response_text)
if not spec:
return {
"success": False,
"requirements": [],
"error": f"Failed to parse JSON from LLM response: {response_text[:200]}",
"raw_response": response_text
}
requirements = spec.get("requirements", [])
if not requirements:
return {
"success": False,
"requirements": [],
"error": f"LLM returned no requirements. Response keys: {list(spec.keys())}",
"raw_response": response_text[:300]
}
print(f"[DEBUG] Generated {len(requirements)} requirements for question: {question[:80]}")
for i, req in enumerate(requirements, 1):
print(f" Req {i}: {req.get('title')}")
return {
"success": True,
"requirements": requirements,
"error": None
}
# ---------------------------------------------------
# STEP 2: CODE EXECUTION (LOOPED)
# ---------------------------------------------------
def execute_requirement_code(code: str, acq: pd.DataFrame, perf: pd.DataFrame, master_df: pd.DataFrame, requirement_num: int):
"""
Safely execute generated pandas code for a single requirement.
"""
# Create global scope with all dependencies
global_scope = {
"pd": pd,
"generate_metric_view": generate_metric_view,
"__builtins__": __builtins__
}
local_scope = {
"acq": acq,
"perf": perf,
"master_df": master_df
}
try:
print(f"[DEBUG] Executing requirement {requirement_num}...")
exec(code, global_scope, local_scope)
# Look for result variables (result_1, result_2, result_3, or final_result)
result_key = f"result_{requirement_num}" if f"result_{requirement_num}" in local_scope else "final_result"
result = local_scope.get(result_key, local_scope.get("result", "No result generated"))
print(f"[DEBUG] Req {requirement_num} success. Result type: {type(result).__name__}")
return {
"success": True,
"result": result,
"error": None
}
except Exception as e:
print(f"[DEBUG] Req {requirement_num} FAILED: {str(e)}")
return {
"success": False,
"result": None,
"error": str(e)
}
def execute_all_requirements(requirements: list, acq: pd.DataFrame, perf: pd.DataFrame, master_df: pd.DataFrame):
"""
Execute all requirements sequentially, building context.
"""
print(f"[DEBUG] Starting execution of {len(requirements)} requirements")
all_results = []
context_text = ""
for i, req in enumerate(requirements, 1):
code = req.get("code", "")
description = req.get("description", "")
title = req.get("title", f"Analysis {i}")
exec_result = execute_requirement_code(code, acq, perf, master_df, i)
all_results.append({
"sequence": i,
"title": title,
"description": description,
"code": code,
"execution_success": exec_result["success"],
"result": exec_result["result"],
"error": exec_result.get("error")
})
# Build context for interpretation
if exec_result["success"]:
context_text += f"\nAnalysis {i} ({title}):\n{str(exec_result['result'])}\n"
else:
context_text += f"\nAnalysis {i} ({title}) FAILED:\n{exec_result['error']}\n"
return all_results, context_text
# ---------------------------------------------------
# STEP 3: RESULT INTERPRETATION
# ---------------------------------------------------
def interpret_all_results(question: str, all_results: list, context_text: str):
"""
Senior risk analyst LLM interprets all results holistically.
"""
print(f"[DEBUG] Interpreting results for {len(all_results)} analyses")
print(f"[DEBUG] Successful executions: {sum(1 for r in all_results if r.get('success'))}")
client = _get_hf_client()
# Format all analyses
analyses_text = ""
for res in all_results:
analyses_text += f"\n{'='*60}\n"
analyses_text += f"Analysis {res['sequence']}: {res['title']}\n"
analyses_text += f"Description: {res['description']}\n"
analyses_text += f"{'='*60}\n"
if res['execution_success']:
analyses_text += f"Result:\n{str(res['result'])}\n"
else:
analyses_text += f"Execution Error: {res['error']}\n"
prompt = (
"You are a senior retail credit risk analyst with 15+ years of portfolio management experience.\n\n"
"Your task:\n"
"Synthesize the analytical results and provide comprehensive risk insights.\n\n"
"Focus on:\n"
"- Key findings and patterns across all analyses\n"
"- Risk deterioration or improvement trends\n"
"- Vintage/segment concentration issues and implications\n"
"- Root causes of observed patterns\n"
"- Unusual trends, anomalies, or red flags\n"
"- Actionable recommendations for portfolio management\n"
"- Comparative risk assessment (which segments/vintages are most/least risky)\n\n"
"Guidelines:\n"
"- Be analytical and specific (not generic)\n"
"- Focus on business implications, not just statistics\n"
"- Avoid repeating raw tables; interpret the meaning\n"
"- Provide 3-5 key insights\n"
"- Suggest next investigative steps if needed\n\n"
"User's Original Question:\n" + question + "\n\n"
"Analyses Performed:\n" + analyses_text + "\n\n"
"Provide your senior analyst interpretation:"
)
messages = [
{"role": "system", "content": "You are a senior credit risk analyst providing executive insights from portfolio analytics."},
{"role": "user", "content": prompt}
]
response = client.chat.completions.create(
model=HF_MODEL_ID,
messages=messages,
max_tokens=1024,
temperature=0.3,
top_p=0.95
)
interpretation = response.choices[0].message.content if hasattr(response, 'choices') else str(response)
return interpretation
# ---------------------------------------------------
# MASTER ORCHESTRATOR FUNCTION
# ---------------------------------------------------
def run_deep_dive_analysis(question: str, acq: pd.DataFrame, perf: pd.DataFrame, master_df: pd.DataFrame):
"""
End-to-end deep dive analysis:
1. Break question into 1-3 structured requirements
2. Generate code for each requirement
3. Execute each requirement's code sequentially
4. Synthesize results and provide senior analyst interpretation
"""
print(f"\n[DEEP DIVE START] Question: {question}")
print(f"[DEBUG] Data shapes - Acq: {acq.shape}, Perf: {perf.shape}, Master: {master_df.shape}")
# Step 1: Generate requirements
print(f"[DEBUG] Step 1: Generating requirements...")
req_response = generate_analysis_requirements(question, acq, perf, master_df)
if not req_response["success"]:
return {
"success": False,
"question": question,
"requirements": [],
"all_results": [],
"interpretation": f"Failed to generate requirements: {req_response['error']}",
"error": req_response["error"]
}
requirements = req_response["requirements"][:3] # Cap at 3
# Step 2 & 3: Execute all requirements
print(f"[DEBUG] Step 2-3: Executing {len(requirements)} requirements...")
all_results, context_text = execute_all_requirements(requirements, acq, perf, master_df)
# Step 4: Interpret results
print(f"[DEBUG] Step 4: Interpreting all results...")
interpretation = interpret_all_results(question, all_results, context_text)
print(f"[DEEP DIVE END] Analysis complete\n")
return {
"success": True,
"question": question,
"requirements": requirements,
"all_results": all_results,
"interpretation": interpretation,
"error": None
}