Spaces:
Sleeping
Sleeping
| # deep_dive_agentic.py | |
| """ | |
| Agentic analytical code generation + execution engine using Hugging Face | |
| FLOW: | |
| User Question | |
| ↓ | |
| LLM generates pandas code | |
| ↓ | |
| Python executes code safely | |
| ↓ | |
| LLM interprets results | |
| ↓ | |
| Return code + interpretation | |
| Environment: | |
| export HUGGINGFACE_API_TOKEN=... | |
| FIXES APPLIED (v2): | |
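| - FIX 1: exec() now uses a single merged namespace dict. With separate | |
| globals/locals dicts exec() runs the code as if it were a class body, so | |
| functions, lambdas and comprehensions in the generated code cannot see its | |
| own top-level variables; one dict avoids that and is where results are read. | |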
| - FIX 2: Smart result detection — scans namespace for any new DataFrame/Series | |
| instead of relying on hardcoded variable names (result_1, final_result). | |
| - FIX 3: _fix_pandas_compatibility() is now actually called before exec(). | |
| """ | |
| # --------------------------------------------------- | |
| # IMPORTS | |
| # --------------------------------------------------- | |
| import pandas as pd | |
| import json | |
| import os | |
| import re | |
| try: | |
| from huggingface_hub import InferenceClient | |
| except ImportError as exc: | |
| raise ImportError( | |
| "huggingface_hub is required. Install with `pip install huggingface-hub`." | |
| ) from exc | |
| from analytics.performance_analysis import generate_metric_view | |
| # --------------------------------------------------- | |
| # HF CONFIG | |
| # --------------------------------------------------- | |
# Model id passed to every chat-completion call; the HF_MODEL_ID environment
# variable overrides the built-in default.
HF_MODEL_ID = os.getenv("HF_MODEL_ID", "Qwen/Qwen2.5-7B-Instruct")
# API token read once at import time; None when HUGGINGFACE_API_TOKEN is unset.
HF_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")
| # --------------------------------------------------- | |
| # HELPER: GET INFERENCE CLIENT | |
| # --------------------------------------------------- | |
def _get_hf_client():
    """Return an ``InferenceClient`` authenticated with the module token.

    Raises:
        RuntimeError: If ``HF_TOKEN`` is unset or empty.
    """
    if HF_TOKEN:
        return InferenceClient(token=HF_TOKEN)
    raise RuntimeError(
        "HUGGINGFACE_API_TOKEN is required. Set it in your environment."
    )
| # --------------------------------------------------- | |
| # HELPER: EXTRACT JSON FROM LLM RESPONSE | |
| # --------------------------------------------------- | |
def _extract_json(text: str):
    """Extract the JSON object embedded in an LLM response.

    The text from the first ``{`` to the last ``}`` is parsed with
    increasingly tolerant strategies, stopping at the first that succeeds:

    1. Strict ``json.loads``.
    2. ``json.loads(..., strict=False)``, which accepts raw newlines/tabs
       inside string values. LLMs frequently emit multi-line code in a
       JSON string without escaping the line breaks; this keeps those line
       breaks intact instead of flattening the code onto one line.
    3. Legacy repair: collapse line breaks and quote bare/single-quoted keys.
    4. Scan every ``{`` in the original text and decode the first complete
       object, which copes with trailing prose that itself contains braces.

    Args:
        text: Raw model output.

    Returns:
        The decoded object (a ``dict``), or ``None`` when ``text`` is not a
        non-empty string or no strategy yields valid JSON.
    """
    if not isinstance(text, str) or not text:
        return None

    match = re.search(r"\{.*\}", text, re.S)
    if not match:
        return None
    payload = match.group(0)

    # Strategies 1 and 2: strict first so well-formed output is untouched.
    for strict in (True, False):
        try:
            return json.loads(payload, strict=strict)
        except json.JSONDecodeError:
            continue

    # Strategy 3: best-effort repair of unquoted / single-quoted keys.
    try:
        cleaned = re.sub(r"[\n\r]+", " ", payload)
        cleaned = re.sub(r"(['\"])?([a-zA-Z0-9_]+)(['\"])?\s*:\s*", r'"\2": ', cleaned)
        return json.loads(cleaned)
    except ValueError:
        pass

    # Strategy 4: the greedy match may have swallowed trailing text such as
    # 'hope {this} helps'; decode the first object that parses on its own.
    decoder = json.JSONDecoder(strict=False)
    for brace in re.finditer(r"\{", text):
        try:
            parsed, _end = decoder.raw_decode(text, brace.start())
        except json.JSONDecodeError:
            continue
        return parsed
    return None
| # --------------------------------------------------- | |
| # HELPER: FIX COMMON PANDAS COMPATIBILITY ISSUES | |
| # --------------------------------------------------- | |
def _fix_pandas_compatibility(code: str) -> str:
    """Patch generated code for pandas APIs that were removed in newer releases.

    Currently one rewrite is applied:

    * ``x.append(y, ignore_index=True)`` -> ``pd.concat([x, y], ignore_index=True)``
      (``DataFrame.append`` no longer exists in pandas 2.x). The receiver must
      be a bare name: a dotted receiver such as ``self.frames.append(...)`` is
      left untouched, because splicing ``pd.concat`` into the middle of an
      attribute chain would produce invalid code.

    ``.reset_index(name=...)`` is deliberately NOT rewritten. ``name`` is the
    valid keyword for ``Series.reset_index`` (the usual
    ``groupby(...)[col].sum().reset_index(name="total")`` idiom), and a Series
    does not accept ``names``; the receiver type cannot be determined from the
    source text, so rewriting would break working code.

    Args:
        code: Generated pandas source code.

    Returns:
        The source with the compatibility rewrite applied.
    """
    return re.sub(
        # (?<![\w.]) ensures the receiver is a standalone identifier.
        r"(?<![\w.])(\w+)\.append\((\w+),\s*ignore_index=True\)",
        r"pd.concat([\1, \2], ignore_index=True)",
        code,
    )
| # --------------------------------------------------- | |
| # STEP 1: CODE GENERATION | |
| # --------------------------------------------------- | |
def generate_analysis_requirements(
    question: str,
    acq: pd.DataFrame,
    perf: pd.DataFrame,
    master_df: pd.DataFrame
):
    """Ask the LLM to break a question into 1-3 analytics requirements.

    Each requirement is expected to carry a title, a description and pandas
    code that stores its output in ``result_<sequence>``.

    Args:
        question: The user's analytical question (appended to the prompt).
        acq: Acquisition data. Not read here; the prompt describes a fixed
            column list.
        perf: Performance data. Not read here.
        master_df: Merged acquisition + performance data. Not read here.

    Returns:
        A dict with ``success`` (bool), ``requirements`` (list of dicts) and
        ``error`` (str or None). On failure a ``raw_response`` key holding the
        model output is included as well.

    Raises:
        RuntimeError: Propagated from ``_get_hf_client`` when no token is set.
    """
    client = _get_hf_client()

    # Build detailed column descriptions
    acq_cols = {
        "account_id": "unique account identifier",
        "booking_date": "when account was originated",
        "booking_vintage": "year-month of origination (YYYY-MM)",
        "fico_band": "FICO score bracket (e.g., 700-750, 750-800)",
        "sourcing_channel": "acquisition channel (e.g., Online, Branch, Broker)",
        "city_tier": "city classification (Tier-1, Tier-2, Tier-3)",
        "occupation_type": "borrower occupation category",
        "credit_limit": "approved credit line amount"
    }
    perf_cols = {
        "account_id": "unique account identifier",
        "reporting_month": "month of performance observation (YYYY-MM)",
        "mob": "months on books (age of account in months)",
        "dpd": "days past due (0, 30, 60, 90+)",
        "balance": "current outstanding balance",
        "ncl_amount": "net charge-off amount (dollars)",
        "payment": "payment amount in period"
    }
    acq_names = ", ".join(acq_cols)
    perf_names = ", ".join(perf_cols)
    acq_details = "\n".join([f" - {k}: {v}" for k, v in acq_cols.items()])
    perf_details = "\n".join([f" - {k}: {v}" for k, v in perf_cols.items()])

    prompt = (
        "You are a senior retail credit risk analyst with 15+ years of portfolio management experience.\n\n"
        "Your task:\n"
        "1. Analyze the user's analytical question deeply\n"
        "2. Determine 1-3 specific analytics requirements needed to fully answer the question\n"
        "3. For EACH requirement, generate executable pandas code\n"
        "4. Return ONLY valid JSON, no other text\n\n"
        "AVAILABLE DATA:\n"
        "- acq: acquisition data with columns: " + acq_names + "\n"
        "- perf: performance data with columns: " + perf_names + "\n"
        "- master_df: merged acq+perf, includes all above columns\n\n"
        "COLUMN DESCRIPTIONS:\n"
        "Acquisition (acq):\n"
        + acq_details + "\n\n"
        "Performance (perf):\n"
        + perf_details + "\n\n"
        "Available Risk Metrics via generate_metric_view(df, metric_name, group_col):\n"
        " - 30+@3 (30+ dpd at 3 months)\n"
        " - 30+@6 (30+ dpd at 6 months)\n"
        " - 60+@6 (60+ dpd at 6 months)\n"
        " - Yr1 NCL (Year 1 net charge-off rate)\n\n"
        "CODE GENERATION RULES:\n"
        "- Generate pandas code ONLY\n"
        "- IMPORTANT: Always store your final result in a variable named exactly 'result_1', 'result_2', or 'result_3' matching the sequence number\n"
        "- Use meaningful intermediate variable names (e.g., vintage_analysis, segment_summary)\n"
        "- Focus on GROUP BY aggregations for insights\n"
        "- Calculate rates as dollars/total (percentage)\n"
        "- Sort by risk metrics (descending) to identify worst segments\n"
        "- Add brief comments for clarity\n"
        "- NO markdown, NO explanations outside JSON\n\n"
        "JSON STRUCTURE:\n"
        "{\n"
        ' "requirements": [\n'
        ' {\n'
        ' "sequence": 1,\n'
        ' "title": "Analysis title",\n'
        ' "description": "What this code does and why it matters",\n'
        ' "code": "pandas code here — must assign final result to result_1"\n'
        " }\n"
        " ]\n"
        "}\n\n"
        "User Question:\n" + question
    )
    messages = [
        {
            "role": "system",
            "content": (
                "You are a senior credit risk analyst who generates pandas code for portfolio analytics. "
                "Return ONLY valid JSON. Always store the final result in result_1, result_2, or result_3."
            )
        },
        {"role": "user", "content": prompt}
    ]
    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=messages,
        max_tokens=2048,
        temperature=0.1,
        top_p=0.95
    )
    response_text = (
        response.choices[0].message.content
        if hasattr(response, "choices")
        else str(response)
    )
    # A chat message may carry no text content; normalise to "" so the
    # parsing and slicing below cannot fail on None.
    response_text = response_text or ""

    # Extract JSON
    spec = _extract_json(response_text)
    if not isinstance(spec, dict) or not spec:
        return {
            "success": False,
            "requirements": [],
            "error": f"Failed to parse JSON from LLM response: {response_text[:200]}",
            "raw_response": response_text
        }

    # Keep only well-formed entries: downstream code calls .get() on each
    # requirement, so anything that is not a dict would crash it.
    raw_requirements = spec.get("requirements", [])
    if isinstance(raw_requirements, list):
        requirements = [req for req in raw_requirements if isinstance(req, dict)]
    else:
        requirements = []
    if not requirements:
        return {
            "success": False,
            "requirements": [],
            "error": f"LLM returned no requirements. Response keys: {list(spec.keys())}",
            "raw_response": response_text[:300]
        }

    print(f"[DEBUG] Generated {len(requirements)} requirements for question: {question[:80]}")
    for i, req in enumerate(requirements, 1):
        print(f" Req {i}: {req.get('title')}")
    return {
        "success": True,
        "requirements": requirements,
        "error": None
    }
| # --------------------------------------------------- | |
| # STEP 2: CODE EXECUTION (LOOPED) | |
| # --------------------------------------------------- | |
def _select_result(namespace: dict, keys_before: set, requirement_num: int):
    """Pick the most plausible result object out of an exec() namespace.

    Priority: (1) the expected result variable names, (2) the most recently
    created DataFrame/Series, (3) the most recently created public, non-None
    value that is neither a module nor a callable. Returns None if nothing
    qualifies.
    """
    import types

    # --- Tier 1: expected named result variables ---
    for key in (f"result_{requirement_num}", "final_result", "result"):
        if namespace.get(key) is not None:
            print(f"[DEBUG] Found result in expected variable: '{key}'")
            return namespace[key]

    # Dicts preserve insertion order, so this lists the variables created by
    # the generated code in creation order; newest first because the final
    # answer is normally the last thing assigned. (Iterating a set here would
    # make the choice vary from run to run.)
    new_keys = [key for key in namespace if key not in keys_before]
    new_keys.reverse()

    # --- Tier 2: any NEW DataFrame or Series created during exec ---
    for key in new_keys:
        if isinstance(namespace[key], (pd.DataFrame, pd.Series)):
            print(f"[DEBUG] Found result by scanning new DataFrame/Series: '{key}'")
            return namespace[key]

    # --- Tier 3: any new non-None, non-private variable ---
    for key in new_keys:
        val = namespace[key]
        if key.startswith("_") or val is None:
            continue
        # `import numpy as np` or a helper `def` also creates new names;
        # those are tools of the analysis, never its result.
        if isinstance(val, types.ModuleType) or callable(val):
            continue
        print(f"[DEBUG] Fallback: found result in new variable: '{key}'")
        return val
    return None


def execute_requirement_code(
    code: str,
    acq: pd.DataFrame,
    perf: pd.DataFrame,
    master_df: pd.DataFrame,
    requirement_num: int
):
    """Execute the generated pandas code for a single requirement.

    SECURITY NOTE: the code comes from an LLM and is run with ``exec()`` and
    the full set of builtins. Exceptions are caught, but nothing is sandboxed;
    the generated code can import modules, touch the filesystem and mutate the
    DataFrames passed in. Do not expose this to untrusted prompts without an
    isolation layer.

    Args:
        code: Generated Python source.
        acq: Acquisition data, exposed to the code as ``acq``.
        perf: Performance data, exposed as ``perf``.
        master_df: Merged data, exposed as ``master_df``.
        requirement_num: 1-based sequence number; ``result_<n>`` is the first
            variable looked up for the result.

    Returns:
        ``{"success": bool, "result": object | None, "error": str | None}``.
        On success without any detectable result variable, ``result`` is an
        explanatory string. Blank or non-string ``code`` yields a failure
        dict rather than raising.
    """
    if not isinstance(code, str) or not code.strip():
        message = "No executable code was provided for this requirement."
        print(f"[DEBUG] Req {requirement_num} FAILED: {message}")
        return {
            "success": False,
            "result": None,
            "error": message
        }

    # One dict serves as both globals and locals. With two separate dicts
    # exec() runs the code as if it were a class body, and functions, lambdas
    # or comprehensions defined by the generated code could not see its own
    # top-level variables.
    namespace = {
        "pd": pd,
        "generate_metric_view": generate_metric_view,
        "__builtins__": __builtins__,
        # Data available to generated code
        "acq": acq,
        "perf": perf,
        "master_df": master_df,
    }
    # Snapshot of keys before exec so we can detect newly created variables
    keys_before = set(namespace.keys())
    try:
        # Apply pandas compatibility patches BEFORE executing (inside the try
        # so a failure here is reported like any other execution error).
        code = _fix_pandas_compatibility(code)
        print(f"[DEBUG] Executing requirement {requirement_num}...")
        print(f"[DEBUG] Code preview: {code[:120].strip()}...")
        exec(code, namespace)

        result = _select_result(namespace, keys_before, requirement_num)
        if result is None:
            result = "Code executed successfully but no result variable was found in namespace."
        print(f"[DEBUG] Req {requirement_num} success. Result type: {type(result).__name__}")
        return {
            "success": True,
            "result": result,
            "error": None
        }
    except Exception as e:
        import traceback
        tb = traceback.format_exc()
        print(f"[DEBUG] Req {requirement_num} FAILED: {str(e)}")
        print(f"[DEBUG] Traceback:\n{tb}")
        return {
            "success": False,
            "result": None,
            "error": str(e)
        }
def execute_all_requirements(
    requirements: list,
    acq: pd.DataFrame,
    perf: pd.DataFrame,
    master_df: pd.DataFrame
):
    """Run every requirement in order and collect results plus a text digest.

    Returns:
        A ``(all_results, context_text)`` tuple: one record per requirement
        (sequence, title, description, code, success flags, result, error)
        and a concatenated plain-text summary of each outcome.
    """
    print(f"[DEBUG] Starting execution of {len(requirements)} requirements")
    records = []
    context_chunks = []
    for seq, requirement in enumerate(requirements, start=1):
        code = requirement.get("code", "")
        description = requirement.get("description", "")
        title = requirement.get("title", f"Analysis {seq}")

        outcome = execute_requirement_code(code, acq, perf, master_df, seq)
        succeeded = outcome["success"]

        records.append({
            "sequence": seq,
            "title": title,
            "description": description,
            "code": code,
            # app.py reads "success"; "execution_success" is the older
            # spelling, kept so existing consumers keep working.
            "success": succeeded,
            "execution_success": succeeded,
            "result": outcome["result"],
            "error": outcome.get("error")
        })

        # Digest entry used later as interpretation context.
        if succeeded:
            context_chunks.append(f"\nAnalysis {seq} ({title}):\n{outcome['result']!s}\n")
        else:
            context_chunks.append(f"\nAnalysis {seq} ({title}) FAILED:\n{outcome['error']}\n")

    return records, "".join(context_chunks)
| # --------------------------------------------------- | |
| # STEP 3: RESULT INTERPRETATION | |
| # --------------------------------------------------- | |
def interpret_all_results(
    question: str,
    all_results: list,
    context_text: str
):
    """Have the LLM synthesize every analysis result into one narrative.

    Args:
        question: The user's original question.
        all_results: Records produced by ``execute_all_requirements``.
        context_text: Accepted for interface compatibility; not read here.

    Returns:
        The model's interpretation text.
    """
    successes = len([entry for entry in all_results if entry.get("success")])
    print(f"[DEBUG] Interpreting results for {len(all_results)} analyses")
    print(f"[DEBUG] Successful executions: {successes}")
    client = _get_hf_client()

    # One banner-delimited section per analysis.
    rule = "=" * 60
    sections = []
    for entry in all_results:
        sections.append(f"\n{rule}\n")
        sections.append(f"Analysis {entry['sequence']}: {entry['title']}\n")
        sections.append(f"Description: {entry['description']}\n")
        sections.append(f"{rule}\n")
        if entry["success"]:
            sections.append(f"Result:\n{entry['result']!s}\n")
        else:
            sections.append(f"Execution Error: {entry['error']}\n")
    analyses_text = "".join(sections)

    instructions = (
        "You are a senior retail credit risk analyst with 15+ years of portfolio management experience.\n\n"
        "Your task:\n"
        "Synthesize the analytical results and provide comprehensive risk insights.\n\n"
        "Focus on:\n"
        "- Key findings and patterns across all analyses\n"
        "- Risk deterioration or improvement trends\n"
        "- Vintage/segment concentration issues and implications\n"
        "- Root causes of observed patterns\n"
        "- Unusual trends, anomalies, or red flags\n"
        "- Actionable recommendations for portfolio management\n"
        "- Comparative risk assessment (which segments/vintages are most/least risky)\n\n"
        "Guidelines:\n"
        "- Be analytical and specific (not generic)\n"
        "- Focus on business implications, not just statistics\n"
        "- Avoid repeating raw tables; interpret the meaning\n"
        "- Provide 3-5 key insights\n"
        "- Suggest next investigative steps if needed\n\n"
    )
    prompt = (
        instructions
        + "User's Original Question:\n" + question + "\n\n"
        + "Analyses Performed:\n" + analyses_text + "\n\n"
        + "Provide your senior analyst interpretation:"
    )

    system_message = {
        "role": "system",
        "content": "You are a senior credit risk analyst providing executive insights from portfolio analytics."
    }
    user_message = {"role": "user", "content": prompt}

    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=[system_message, user_message],
        max_tokens=1024,
        temperature=0.3,
        top_p=0.95
    )

    # Fall back to the response's string form if it has no `choices`.
    if hasattr(response, "choices"):
        return response.choices[0].message.content
    return str(response)
| # --------------------------------------------------- | |
| # MASTER ORCHESTRATOR FUNCTION | |
| # --------------------------------------------------- | |
def run_deep_dive_analysis(
    question: str,
    acq: pd.DataFrame,
    perf: pd.DataFrame,
    master_df: pd.DataFrame
):
    """Orchestrate a full deep dive: plan, execute, interpret.

    Pipeline:
        1. Ask the LLM for up to three requirements (each with code).
        2. Execute each requirement's code in sequence.
        3. Ask the LLM to interpret the combined results.

    Returns:
        A dict with ``success``, ``question``, ``requirements``,
        ``all_results``, ``interpretation`` and ``error``. When planning
        fails, ``success`` is False and the list fields are empty.
    """
    print(f"\n[DEEP DIVE START] Question: {question}")
    print(f"[DEBUG] Data shapes - Acq: {acq.shape}, Perf: {perf.shape}, Master: {master_df.shape}")

    # --- Plan -------------------------------------------------------------
    print("[DEBUG] Step 1: Generating requirements...")
    planning = generate_analysis_requirements(question, acq, perf, master_df)
    if not planning["success"]:
        reason = planning["error"]
        return dict(
            success=False,
            question=question,
            requirements=[],
            all_results=[],
            interpretation=f"Failed to generate requirements: {reason}",
            error=reason,
        )

    # Never run more than three requirements, whatever the model returned.
    requirements = planning["requirements"][:3]

    # --- Execute ----------------------------------------------------------
    print(f"[DEBUG] Step 2-3: Executing {len(requirements)} requirements...")
    all_results, context_text = execute_all_requirements(requirements, acq, perf, master_df)

    # --- Interpret --------------------------------------------------------
    print("[DEBUG] Step 4: Interpreting all results...")
    interpretation = interpret_all_results(question, all_results, context_text)

    print("[DEEP DIVE END] Analysis complete\n")
    return dict(
        success=True,
        question=question,
        requirements=requirements,
        all_results=all_results,
        interpretation=interpretation,
        error=None,
    )