# deep_dive_agentic.py
"""
Agentic analytical code generation + execution engine using Hugging Face.

FLOW:
    User Question
        ↓
    LLM generates pandas code
        ↓
    Python executes the code in-process (NOT sandboxed — see
    ``execute_requirement_code``)
        ↓
    LLM interprets results
        ↓
    Return code + interpretation

Environment:
    export HUGGINGFACE_API_TOKEN=...
    export HF_MODEL_ID=...        # optional, defaults to Qwen/Qwen2.5-7B-Instruct

FIXES APPLIED (v2):
- FIX 1: exec() now uses a single merged namespace dict so that functions,
  lambdas and comprehensions inside the generated code can see the variables
  that the code assigns at top level.
- FIX 2: Smart result detection — scans namespace for any new DataFrame/Series
  instead of relying on hardcoded variable names (result_1, final_result).
- FIX 3: _fix_pandas_compatibility() is now actually called before exec().
"""

# ---------------------------------------------------
# IMPORTS
# ---------------------------------------------------

import json
import os
import re
import traceback
import types

import pandas as pd

try:
    from huggingface_hub import InferenceClient
except ImportError as exc:
    raise ImportError(
        "huggingface_hub is required. Install with `pip install huggingface-hub`."
    ) from exc

from analytics.performance_analysis import generate_metric_view

# ---------------------------------------------------
# HF CONFIG
# ---------------------------------------------------

HF_MODEL_ID = os.environ.get("HF_MODEL_ID", "Qwen/Qwen2.5-7B-Instruct")
HF_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN")


# ---------------------------------------------------
# HELPER: GET INFERENCE CLIENT
# ---------------------------------------------------

def _get_hf_client():
    """Return an ``InferenceClient`` authenticated with ``HF_TOKEN``.

    Raises:
        RuntimeError: if ``HUGGINGFACE_API_TOKEN`` was not set when this
            module was imported.
    """
    if not HF_TOKEN:
        raise RuntimeError(
            "HUGGINGFACE_API_TOKEN is required. Set it in your environment."
        )
    return InferenceClient(token=HF_TOKEN)


# ---------------------------------------------------
# HELPER: EXTRACT TEXT FROM A CHAT RESPONSE
# ---------------------------------------------------

def _message_text(response) -> str:
    """Return the assistant text of a chat-completion response as a string.

    Objects without a ``choices`` attribute are stringified. An empty
    ``choices`` list or a ``None`` message content yields ``""`` so callers
    can always treat the return value as text.
    """
    if not hasattr(response, "choices"):
        return str(response)
    choices = response.choices
    if not choices:
        return ""
    content = choices[0].message.content
    return "" if content is None else str(content)


# ---------------------------------------------------
# HELPER: EXTRACT JSON FROM LLM RESPONSE
# ---------------------------------------------------

def _extract_json(text: str):
    """Extract the JSON object embedded in an LLM response.

    The span from the first ``{`` to the last ``}`` is parsed with three
    increasingly lenient strategies:

    1. strict ``json.loads``;
    2. ``json.loads(..., strict=False)``, which accepts raw newlines/tabs
       inside string values — this keeps multi-line generated code intact;
    3. a last-resort textual repair that flattens newlines and quotes bare
       keys. This step is lossy (multi-line code is collapsed onto one line
       and ``word:`` sequences inside values get quoted), so it only runs
       when both parses above have failed.

    Returns:
        The parsed ``dict``, or ``None`` when nothing parseable is found.
    """
    if not isinstance(text, str) or not text:
        return None

    match = re.search(r"\{.*\}", text, re.S)
    if not match:
        return None
    payload = match.group(0)

    for strict in (True, False):
        try:
            parsed = json.loads(payload, strict=strict)
        except (ValueError, RecursionError):
            continue
        return parsed if isinstance(parsed, dict) else None

    cleaned = re.sub(r"[\n\r]+", " ", payload)
    cleaned = re.sub(r"(['\"])?([a-zA-Z0-9_]+)(['\"])?\s*:\s*", r'"\2": ', cleaned)
    try:
        parsed = json.loads(cleaned)
    except (ValueError, RecursionError):
        return None
    return parsed if isinstance(parsed, dict) else None


# ---------------------------------------------------
# HELPER: FIX COMMON PANDAS COMPATIBILITY ISSUES
# ---------------------------------------------------

def _fix_pandas_compatibility(code: str) -> str:
    """Rewrite a few pandas call patterns in generated code.

    Rewrites applied (purely textual, via regex):
    - ``.reset_index(name='x')``  -> ``.reset_index(names=['x'])``
    - ``.reset_index(name=var)``  -> ``.reset_index(names=[var])``
    - ``a.append(b, ignore_index=True)`` -> ``pd.concat([a, b], ignore_index=True)``

    NOTE(review): ``Series.reset_index`` legitimately takes ``name=`` and has
    no ``names=`` parameter, so the first two rewrites break valid code such
    as ``df.groupby(...).size().reset_index(name="count")``. The receiver type
    cannot be known from the text, so the rewrites are kept as they were and
    ``execute_requirement_code`` falls back to the un-patched code when the
    patched code raises ``TypeError``.
    """
    # Fix: .reset_index(name=...) -> .reset_index(names=[...])
    code = re.sub(
        r"\.reset_index\(name=(['\"])([^'\"]+)\1\)",
        r".reset_index(names=[\1\2\1])",
        code
    )
    # Fix: .reset_index(name= with variable
    code = re.sub(
        r"\.reset_index\(name=([a-zA-Z_][a-zA-Z0-9_]*)\)",
        r".reset_index(names=[\1])",
        code
    )
    # Fix: df.append() deprecated in newer pandas -> pd.concat()
    code = re.sub(
        r"(\w+)\.append\((\w+),\s*ignore_index=True\)",
        r"pd.concat([\1, \2], ignore_index=True)",
        code
    )
    return code


# ---------------------------------------------------
# STEP 1: CODE GENERATION
# ---------------------------------------------------

def generate_analysis_requirements(
    question: str,
    acq: pd.DataFrame,
    perf: pd.DataFrame,
    master_df: pd.DataFrame
):
    """
    Ask the LLM to break the question into 1-3 analytics requirements.

    Each requirement is expected to carry a title, a description and
    executable pandas code. The column lists in the prompt are hard-coded
    here; ``acq``, ``perf`` and ``master_df`` are accepted for interface
    compatibility but are not inspected.

    Returns:
        dict with keys ``success`` (bool), ``requirements`` (list of dict)
        and ``error`` (str or None); failure dicts also carry
        ``raw_response``.

    Raises:
        RuntimeError: if no Hugging Face token is configured.
        Any exception raised by the inference client propagates unchanged.
    """
    client = _get_hf_client()

    # Build detailed column descriptions
    acq_cols = {
        "account_id": "unique account identifier",
        "booking_date": "when account was originated",
        "booking_vintage": "year-month of origination (YYYY-MM)",
        "fico_band": "FICO score bracket (e.g., 700-750, 750-800)",
        "sourcing_channel": "acquisition channel (e.g., Online, Branch, Broker)",
        "city_tier": "city classification (Tier-1, Tier-2, Tier-3)",
        "occupation_type": "borrower occupation category",
        "credit_limit": "approved credit line amount"
    }
    perf_cols = {
        "account_id": "unique account identifier",
        "reporting_month": "month of performance observation (YYYY-MM)",
        "mob": "months on books (age of account in months)",
        "dpd": "days past due (0, 30, 60, 90+)",
        "balance": "current outstanding balance",
        "ncl_amount": "net charge-off amount (dollars)",
        "payment": "payment amount in period"
    }

    prompt = (
        "You are a senior retail credit risk analyst with 15+ years of portfolio management experience.\n\n"
        "Your task:\n"
        "1. Analyze the user's analytical question deeply\n"
        "2. Determine 1-3 specific analytics requirements needed to fully answer the question\n"
        "3. For EACH requirement, generate executable pandas code\n"
        "4. Return ONLY valid JSON, no other text\n\n"
        "AVAILABLE DATA:\n"
        "- acq: acquisition data with columns: " + ", ".join(acq_cols.keys()) + "\n"
        "- perf: performance data with columns: " + ", ".join(perf_cols.keys()) + "\n"
        "- master_df: merged acq+perf, includes all above columns\n\n"
        "COLUMN DESCRIPTIONS:\n"
        "Acquisition (acq):\n" +
        "\n".join([f" - {k}: {v}" for k, v in acq_cols.items()]) + "\n\n"
        "Performance (perf):\n" +
        "\n".join([f" - {k}: {v}" for k, v in perf_cols.items()]) + "\n\n"
        "Available Risk Metrics via generate_metric_view(df, metric_name, group_col):\n"
        " - 30+@3 (30+ dpd at 3 months)\n"
        " - 30+@6 (30+ dpd at 6 months)\n"
        " - 60+@6 (60+ dpd at 6 months)\n"
        " - Yr1 NCL (Year 1 net charge-off rate)\n\n"
        "CODE GENERATION RULES:\n"
        "- Generate pandas code ONLY\n"
        "- IMPORTANT: Always store your final result in a variable named exactly 'result_1', 'result_2', or 'result_3' matching the sequence number\n"
        "- Use meaningful intermediate variable names (e.g., vintage_analysis, segment_summary)\n"
        "- Focus on GROUP BY aggregations for insights\n"
        "- Calculate rates as dollars/total (percentage)\n"
        "- Sort by risk metrics (descending) to identify worst segments\n"
        "- Add brief comments for clarity\n"
        "- NO markdown, NO explanations outside JSON\n\n"
        "JSON STRUCTURE:\n"
        "{\n"
        ' "requirements": [\n'
        ' {\n'
        ' "sequence": 1,\n'
        ' "title": "Analysis title",\n'
        ' "description": "What this code does and why it matters",\n'
        ' "code": "pandas code here — must assign final result to result_1"\n'
        " }\n"
        " ]\n"
        "}\n\n"
        "User Question:\n" + question
    )

    messages = [
        {
            "role": "system",
            "content": (
                "You are a senior credit risk analyst who generates pandas code for portfolio analytics. "
                "Return ONLY valid JSON. Always store the final result in result_1, result_2, or result_3."
            )
        },
        {"role": "user", "content": prompt}
    ]

    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=messages,
        max_tokens=2048,
        temperature=0.1,
        top_p=0.95
    )

    # Always a str, even when the API returns no choices / None content.
    response_text = _message_text(response)

    # Extract JSON
    spec = _extract_json(response_text)
    if not spec:
        return {
            "success": False,
            "requirements": [],
            "error": f"Failed to parse JSON from LLM response: {response_text[:200]}",
            "raw_response": response_text
        }

    # Normalise: a single requirement object becomes a one-element list, and
    # anything that is not a dict is dropped so downstream `.get` calls are safe.
    requirements = spec.get("requirements", [])
    if isinstance(requirements, dict):
        requirements = [requirements]
    if not isinstance(requirements, list):
        requirements = []
    requirements = [req for req in requirements if isinstance(req, dict)]

    if not requirements:
        return {
            "success": False,
            "requirements": [],
            "error": f"LLM returned no requirements. Response keys: {list(spec.keys())}",
            "raw_response": response_text[:300]
        }

    print(f"[DEBUG] Generated {len(requirements)} requirements for question: {question[:80]}")
    for i, req in enumerate(requirements, 1):
        print(f" Req {i}: {req.get('title')}")

    return {
        "success": True,
        "requirements": requirements,
        "error": None
    }


# ---------------------------------------------------
# STEP 2: CODE EXECUTION (LOOPED)
# ---------------------------------------------------

def _exec_generated_code(code: str, acq, perf, master_df):
    """Run ``code`` in a fresh single-dict namespace.

    Returns:
        ``(namespace, keys_before)`` where ``keys_before`` is the set of names
        that existed before execution, used to detect newly created variables.
    """
    # One dict serves as both globals and locals. With two separate mappings
    # exec() behaves like a class body: top-level assignments go to locals,
    # but functions, lambdas and comprehensions in the generated code resolve
    # names through globals only and would not see those variables.
    namespace = {
        "pd": pd,
        "generate_metric_view": generate_metric_view,
        "__builtins__": __builtins__,
        # Data available to generated code
        "acq": acq,
        "perf": perf,
        "master_df": master_df,
    }
    keys_before = set(namespace.keys())
    exec(code, namespace)
    return namespace, keys_before


def _select_result(namespace: dict, keys_before: set, requirement_num: int):
    """Pick the value that most plausibly is the result of the executed code.

    Priority tiers:
    1. ``result_<n>``, ``final_result``, ``result`` (first non-None wins);
    2. the most recently created new DataFrame/Series;
    3. the alphabetically first new public variable that is not None, not a
       module and not callable (so ``import numpy as np`` or a helper
       function is never reported as the result).

    Returns ``None`` when nothing qualifies.
    """
    # --- Tier 1: expected named result variables ---
    for key in (f"result_{requirement_num}", "final_result", "result"):
        if namespace.get(key) is not None:
            print(f"[DEBUG] Found result in expected variable: '{key}'")
            return namespace[key]

    # dicts preserve insertion order, so this lists new names in the order the
    # generated code first created them.
    new_keys = [key for key in namespace if key not in keys_before]

    # --- Tier 2: any NEW DataFrame or Series created during exec ---
    # Walk backwards so the last frame created (usually the final answer, not
    # an intermediate) is chosen, and the choice is stable across runs.
    for key in reversed(new_keys):
        if isinstance(namespace[key], (pd.DataFrame, pd.Series)):
            print(f"[DEBUG] Found result by scanning new DataFrame/Series: '{key}'")
            return namespace[key]

    # --- Tier 3: any new non-None, non-private variable ---
    for key in sorted(new_keys):  # sorted for determinism
        if key.startswith("_"):
            continue
        val = namespace[key]
        if val is None or isinstance(val, types.ModuleType) or callable(val):
            continue
        print(f"[DEBUG] Fallback: found result in new variable: '{key}'")
        return val

    return None


def execute_requirement_code(
    code: str,
    acq: pd.DataFrame,
    perf: pd.DataFrame,
    master_df: pd.DataFrame,
    requirement_num: int
):
    """
    Execute generated pandas code for a single requirement.

    SECURITY NOTE: the code comes from an LLM and is run with ``exec()`` in
    this process with full builtins. This is NOT a sandbox — the code can
    import modules, touch the filesystem and mutate the DataFrames passed in.
    Only use with trusted deployments, or move execution to an isolated
    process/container.

    Behaviour:
    - Missing/blank/non-string ``code`` yields a failure dict instead of an
      exception.
    - ``_fix_pandas_compatibility()`` is applied before execution. If the
      patched code raises ``TypeError`` and the patch actually changed the
      code, the original code is retried once, because the
      ``name=`` -> ``names=`` rewrite breaks valid ``Series.reset_index``
      calls. If the retry also fails, the error from the patched run is the
      one reported.
    - The result is chosen by ``_select_result``.

    Returns:
        dict with ``success`` (bool), ``result`` (any; a message string when
        the code ran but produced no detectable variable) and ``error``
        (str or None). Exceptions raised by the generated code are caught and
        reported in ``error``.
    """
    if not isinstance(code, str) or not code.strip():
        message = "No executable code was provided for this requirement."
        print(f"[DEBUG] Req {requirement_num} FAILED: {message}")
        return {
            "success": False,
            "result": None,
            "error": message
        }

    # FIX 3: Apply pandas compatibility patches BEFORE executing
    patched_code = _fix_pandas_compatibility(code)

    try:
        print(f"[DEBUG] Executing requirement {requirement_num}...")
        print(f"[DEBUG] Code preview: {patched_code[:120].strip()}...")

        try:
            namespace, keys_before = _exec_generated_code(
                patched_code, acq, perf, master_df
            )
        except TypeError as patched_error:
            if patched_code == code:
                raise
            print("[DEBUG] Patched code raised TypeError; retrying with the original code...")
            try:
                namespace, keys_before = _exec_generated_code(
                    code, acq, perf, master_df
                )
            except Exception:
                # Report the failure of the patched run, as before the retry
                # path existed.
                raise patched_error

        # FIX 2: Smart result detection — three priority tiers
        result = _select_result(namespace, keys_before, requirement_num)
        if result is None:
            result = "Code executed successfully but no result variable was found in namespace."

        print(f"[DEBUG] Req {requirement_num} success. Result type: {type(result).__name__}")
        return {
            "success": True,
            "result": result,
            "error": None
        }

    except Exception as e:
        tb = traceback.format_exc()
        print(f"[DEBUG] Req {requirement_num} FAILED: {str(e)}")
        print(f"[DEBUG] Traceback:\n{tb}")
        return {
            "success": False,
            "result": None,
            "error": str(e)
        }


def execute_all_requirements(
    requirements: list,
    acq: pd.DataFrame,
    perf: pd.DataFrame,
    master_df: pd.DataFrame
):
    """
    Execute all requirements sequentially, building context.

    Each requirement runs in its own namespace but receives the same
    DataFrame objects, so in-place mutations made by one requirement's code
    are visible to the next.

    Returns:
        ``(all_results, context_text)`` — a list with one result dict per
        requirement, and a plain-text summary of every result or error.
    """
    print(f"[DEBUG] Starting execution of {len(requirements)} requirements")

    all_results = []
    context_parts = []

    for i, req in enumerate(requirements, 1):
        if not isinstance(req, dict):
            # Malformed entry: treat as an empty requirement so it is reported
            # as a failed analysis instead of crashing the whole run.
            req = {}

        code = req.get("code", "")
        description = req.get("description", "")
        title = req.get("title", f"Analysis {i}")

        exec_result = execute_requirement_code(code, acq, perf, master_df, i)

        all_results.append({
            "sequence": i,
            "title": title,
            "description": description,
            "code": code,
            # "success" is what app.py checks via res.get("success")
            # "execution_success" kept for backward compatibility
            "success": exec_result["success"],
            "execution_success": exec_result["success"],
            "result": exec_result["result"],
            "error": exec_result.get("error")
        })

        # Build context for interpretation
        if exec_result["success"]:
            context_parts.append(f"\nAnalysis {i} ({title}):\n{str(exec_result['result'])}\n")
        else:
            context_parts.append(f"\nAnalysis {i} ({title}) FAILED:\n{exec_result['error']}\n")

    return all_results, "".join(context_parts)


# ---------------------------------------------------
# STEP 3: RESULT INTERPRETATION
# ---------------------------------------------------

def interpret_all_results(
    question: str,
    all_results: list,
    context_text: str
):
    """
    Ask the LLM to interpret all results holistically.

    The prompt is built from ``all_results``; ``context_text`` is accepted
    for interface compatibility but is not used.

    Returns:
        The interpretation text (``""`` if the model returned no content).

    Raises:
        RuntimeError: if no Hugging Face token is configured.
        Any exception raised by the inference client propagates unchanged.
    """
    print(f"[DEBUG] Interpreting results for {len(all_results)} analyses")
    print(f"[DEBUG] Successful executions: {sum(1 for r in all_results if r.get('success'))}")

    client = _get_hf_client()

    # Format all analyses
    sections = []
    for res in all_results:
        sections.append(f"\n{'=' * 60}\n")
        sections.append(f"Analysis {res['sequence']}: {res['title']}\n")
        sections.append(f"Description: {res['description']}\n")
        sections.append(f"{'=' * 60}\n")
        if res["success"]:
            sections.append(f"Result:\n{str(res['result'])}\n")
        else:
            sections.append(f"Execution Error: {res['error']}\n")
    analyses_text = "".join(sections)

    prompt = (
        "You are a senior retail credit risk analyst with 15+ years of portfolio management experience.\n\n"
        "Your task:\n"
        "Synthesize the analytical results and provide comprehensive risk insights.\n\n"
        "Focus on:\n"
        "- Key findings and patterns across all analyses\n"
        "- Risk deterioration or improvement trends\n"
        "- Vintage/segment concentration issues and implications\n"
        "- Root causes of observed patterns\n"
        "- Unusual trends, anomalies, or red flags\n"
        "- Actionable recommendations for portfolio management\n"
        "- Comparative risk assessment (which segments/vintages are most/least risky)\n\n"
        "Guidelines:\n"
        "- Be analytical and specific (not generic)\n"
        "- Focus on business implications, not just statistics\n"
        "- Avoid repeating raw tables; interpret the meaning\n"
        "- Provide 3-5 key insights\n"
        "- Suggest next investigative steps if needed\n\n"
        "User's Original Question:\n" + question + "\n\n"
        "Analyses Performed:\n" + analyses_text + "\n\n"
        "Provide your senior analyst interpretation:"
    )

    messages = [
        {
            "role": "system",
            "content": "You are a senior credit risk analyst providing executive insights from portfolio analytics."
        },
        {"role": "user", "content": prompt}
    ]

    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=messages,
        max_tokens=1024,
        temperature=0.3,
        top_p=0.95
    )

    return _message_text(response)


# ---------------------------------------------------
# MASTER ORCHESTRATOR FUNCTION
# ---------------------------------------------------

def run_deep_dive_analysis(
    question: str,
    acq: pd.DataFrame,
    perf: pd.DataFrame,
    master_df: pd.DataFrame
):
    """
    End-to-end deep dive analysis:
    1. Break question into 1-3 structured requirements
    2. Generate code for each requirement
    3. Execute each requirement's code sequentially
    4. Synthesize results and provide senior analyst interpretation

    Returns:
        dict with ``success``, ``question``, ``requirements``,
        ``all_results``, ``interpretation`` and ``error``. ``success`` is
        False only when requirement generation fails; individual execution
        failures are reported per entry in ``all_results``.
    """
    print(f"\n[DEEP DIVE START] Question: {question}")
    print(f"[DEBUG] Data shapes - Acq: {acq.shape}, Perf: {perf.shape}, Master: {master_df.shape}")

    # Step 1: Generate requirements
    print("[DEBUG] Step 1: Generating requirements...")
    req_response = generate_analysis_requirements(question, acq, perf, master_df)

    if not req_response["success"]:
        return {
            "success": False,
            "question": question,
            "requirements": [],
            "all_results": [],
            "interpretation": f"Failed to generate requirements: {req_response['error']}",
            "error": req_response["error"]
        }

    requirements = req_response["requirements"][:3]  # Cap at 3

    # Step 2 & 3: Execute all requirements
    print(f"[DEBUG] Step 2-3: Executing {len(requirements)} requirements...")
    all_results, context_text = execute_all_requirements(requirements, acq, perf, master_df)

    # Step 4: Interpret results
    print("[DEBUG] Step 4: Interpreting all results...")
    interpretation = interpret_all_results(question, all_results, context_text)

    print("[DEEP DIVE END] Analysis complete\n")

    return {
        "success": True,
        "question": question,
        "requirements": requirements,
        "all_results": all_results,
        "interpretation": interpretation,
        "error": None
    }