# NOTE: "Spaces: / Sleeping / Sleeping" was Hugging Face Spaces page chrome
# captured with this source, not Python code; preserved here as a comment.
| from src.llm_client import get_llm_client | |
| from langsmith import traceable | |
| import json | |
| import time | |
| # Layer 4: Deterministic numeric validation | |
| from src.utils.numeric_validator import ( | |
| validate_numeric_accuracy, | |
| validate_uncited_numbers, | |
| validate_minimum_citations, | |
| ) | |
| from src.nodes.analyzer import _verify_reference_integrity | |
| def _add_activity_log(workflow_id, progress_store, step, message): | |
| """Helper to add activity log entry.""" | |
| if workflow_id and progress_store: | |
| from src.services.workflow_store import add_activity_log | |
| add_activity_log(workflow_id, step, message) | |
| # ============================================================ | |
| # LLM-ONLY WEIGHTED RUBRIC EVALUATION | |
| # ============================================================ | |
# System prompt for the LLM critic: role, valid metric schema, six-criterion
# weighted rubric, pass conditions, and the strict JSON output contract parsed
# by run_llm_evaluation().
# BUG FIX: the Evidence Grounding rubric header previously said
# "HARD FLOOR: >=7", contradicting both the prompt's own PASS CONDITIONS
# ("Evidence Grounding >= 6") and the code's HARD_FLOORS constant (6).
CRITIC_SYSTEM_PROMPT = """You are a SWOT Output Critic and Quality Gatekeeper.
## ROLE
Act as an independent, impartial evaluator that reviews SWOT analyses. Your function is to:
1. Verify factual accuracy against provided input data
2. Assess quality against a weighted rubric
3. Decide whether the output PASSES or FAILS
4. Provide actionable feedback if rejected
You are a quality gate, not a collaborator. Be strict.
## VALID METRICS SCHEMA
**Fundamentals:** revenue, net_income, net_margin_pct, total_assets, total_liabilities, stockholders_equity, operating_margin_pct, total_debt, operating_cash_flow, free_cash_flow
**Valuation:** current_price, market_cap, enterprise_value, trailing_pe, forward_pe, ps_ratio, pb_ratio, trailing_peg, forward_peg, earnings_growth, revenue_growth
**Volatility:** vix, vxn, beta, historical_volatility, implied_volatility
**Macro:** gdp_growth, interest_rate, cpi_inflation, unemployment
**Qualitative:** News (title, date, source, url), Sentiment (title, date, source, url)
## EVALUATION RUBRIC (Weighted)
### 1. Evidence Grounding (25%) — HARD FLOOR: >=6
- All claims cite specific metrics from input data
- No fabricated metrics (hallucination check)
- Field names match schema
- 9-10: Every claim traceable; 7-8: Nearly all grounded; 5-6: Most grounded, 2-3 unverifiable; 3-4: Multiple unsupported; 1-2: Clear hallucinations
- **If ANY fabricated metric detected, cap at 4**
### 2. Constraint Compliance (20%) — HARD FLOOR: >=6
- No buy/sell/hold recommendations
- Temporal labels accurate (TTM, FY, forward)
- "DATA NOT PROVIDED" used for missing metrics
- 9-10: All constraints respected; 7-8: Minor issues; 5-6: One moderate violation; 3-4: Multiple violations; 1-2: Systematic violations
### 3. Specificity & Actionability (20%)
- Company-specific, not generic templates
- Quantified findings (not "strong margins" but "31% operating margin")
- Avoids business cliches
- 9-10: Every point specific and quantified; 7-8: Mostly specific; 5-6: Mix of specific/generic; 3-4: Mostly generic; 1-2: Template-like
### 4. Strategic Insight (15%)
- Synthesis across multiple data sources
- Prioritization by materiality
- Goes beyond restating metrics to interpreting implications
- 9-10: Identifies causal relationships; 7-8: Good synthesis; 5-6: Surface-level; 3-4: Restates metrics; 1-2: No value-add
### 5. Completeness & Balance (10%)
Required sections:
- Strengths (Finding, Strategic Implication, Durability)
- Weaknesses (Finding, Severity, Trend, Remediation Levers)
- Opportunities (Catalyst, Timing, Execution Requirements)
- Threats (Risk Factor, Probability, Impact, Mitigation Options)
- Data Quality Notes
- 9-10: All present and substantive; 7-8: All present, minor gaps; 5-6: Missing 1 section; 3-4: Multiple missing; 1-2: Major gaps
### 6. Clarity & Structure (10%)
- Clean formatting, logical grouping
- Easy to scan (not walls of text)
- No contradictions
- 9-10: Impeccable; 7-8: Well-structured; 5-6: Readable but dense; 3-4: Hard to follow; 1-2: Poorly organized
## PASS CONDITIONS (ALL must be met)
1. Weighted average >= 6.0
2. Evidence Grounding >= 6
3. Constraint Compliance >= 6
4. No individual criterion below 5
## OUTPUT FORMAT (JSON only, no other text)
{
"status": "APPROVED" or "REJECTED",
"weighted_score": <float>,
"scores": {
"evidence_grounding": <1-10>,
"constraint_compliance": <1-10>,
"specificity_actionability": <1-10>,
"strategic_insight": <1-10>,
"completeness_balance": <1-10>,
"clarity_structure": <1-10>
},
"hard_floor_violations": ["list of violated floors or empty array"],
"hallucinations_detected": ["list of fabricated metrics or empty array"],
"key_deficiencies": ["prioritized list, max 5"],
"strengths_to_preserve": ["elements done well"],
"actionable_feedback": ["specific rewrite instructions, max 5"]
}
"""
# Rubric weights (sum to 1.0); must mirror the percentages in
# CRITIC_SYSTEM_PROMPT.
CRITERION_WEIGHTS = {
    "evidence_grounding": 0.25,
    "constraint_compliance": 0.20,
    "specificity_actionability": 0.20,
    "strategic_insight": 0.15,
    "completeness_balance": 0.10,
    "clarity_structure": 0.10,
}

# Criteria that must individually meet a minimum score regardless of average.
HARD_FLOORS = {
    "evidence_grounding": 6,
    "constraint_compliance": 6,
}

# Absolute minimum any single criterion may score.
MIN_INDIVIDUAL_SCORE = 5


def calculate_weighted_score(scores: dict) -> float:
    """Return the weighted rubric average, rounded to 2 decimal places.

    Criteria absent from ``scores`` are treated as a neutral 5.
    """
    weighted = sum(
        scores.get(criterion, 5) * weight
        for criterion, weight in CRITERION_WEIGHTS.items()
    )
    return round(weighted, 2)


def check_pass_conditions(scores: dict, weighted_score: float) -> tuple:
    """
    Check if all pass conditions are met.
    Returns (passed: bool, violations: list)
    """
    violations = []

    # Condition 1: overall weighted average must clear the 6.0 bar.
    if weighted_score < 6.0:
        violations.append(f"Weighted score {weighted_score:.1f} < 6.0 threshold")

    # Condition 2: hard-floor criteria; a missing score counts as 0 (fails).
    for criterion, floor in HARD_FLOORS.items():
        score = scores.get(criterion, 0)
        if score < floor:
            violations.append(f"{criterion}: {score} < {floor} (hard floor)")

    # Condition 3: every reported criterion meets the global minimum.
    for criterion, score in scores.items():
        if score < MIN_INDIVIDUAL_SCORE:
            violations.append(f"{criterion}: {score} < {MIN_INDIVIDUAL_SCORE} (minimum)")

    return (not violations, violations)
def _default_rejection(deficiency, feedback, provider, providers_failed):
    """Build the neutral REJECTED payload used when evaluation cannot complete.

    Shared by the LLM-transport-error path and the malformed-response path so
    callers always receive the same result shape with ``error=True``.
    """
    return {
        "status": "REJECTED",
        "weighted_score": 5.0,
        "scores": {k: 5 for k in CRITERION_WEIGHTS},
        "hard_floor_violations": [],
        "hallucinations_detected": [],
        "key_deficiencies": [deficiency],
        "strengths_to_preserve": [],
        "actionable_feedback": [feedback],
        "provider": provider,
        "providers_failed": providers_failed,
        "error": True,
    }


def run_llm_evaluation(report: str, source_data: str, iteration: int, llm) -> dict:
    """
    Run LLM-based evaluation with weighted rubric.

    Args:
        report: The SWOT output to evaluate
        source_data: The source data the SWOT should be based on
        iteration: Current revision number (1, 2, or 3)
        llm: LLM client instance; must expose .query(prompt, temperature=...)
             returning (response, provider, error, providers_failed)

    Returns:
        Evaluation result dict with scores, status, and feedback. On any
        failure a neutral REJECTED result with error=True is returned, so
        callers never have to special-case a missing dict.
    """
    # Truncate source data if too long (Groq has ~8K token limit).
    # NOTE(review): truncation is by characters, not tokens — assumed to be a
    # close-enough proxy; confirm against the provider's tokenizer if issues arise.
    max_source_len = 4000
    if len(source_data) > max_source_len:
        source_data = source_data[:max_source_len] + "\n... [truncated]"

    prompt = f"""{CRITIC_SYSTEM_PROMPT}
## INPUTS
**Iteration:** {iteration} of 3
**Source Data (the SWOT should be based ONLY on this):**
{source_data}
**SWOT Output to Evaluate:**
{report}
Evaluate strictly and respond with JSON only."""

    # temperature=0 keeps the critic as deterministic as the provider allows.
    response, provider, error, providers_failed = llm.query(prompt, temperature=0)

    if error:
        return _default_rejection(
            f"LLM evaluation failed: {error}",
            "Unable to evaluate - please retry",
            provider,
            providers_failed,
        )

    try:
        # Extract the outermost JSON object: models often wrap the JSON in
        # prose or code fences. rindex() may raise ValueError when "}" is
        # missing — handled by the except below.
        content = response.strip()
        if "{" in content:
            json_start = content.index("{")
            json_end = content.rindex("}") + 1
            content = content[json_start:json_end]
        parsed = json.loads(content)

        # Normalize scores: default missing criteria to neutral 5, clamp 1-10.
        # BUG FIX: int(float(...)) also accepts "7.5"-style string scores,
        # which previously raised ValueError and rejected the entire
        # evaluation as a parse failure.
        scores = parsed.get("scores", {})
        for criterion in CRITERION_WEIGHTS:
            if criterion not in scores:
                scores[criterion] = 5
            else:
                scores[criterion] = min(max(int(float(scores[criterion])), 1), 10)

        weighted_score = calculate_weighted_score(scores)
        passed, violations = check_pass_conditions(scores, weighted_score)

        # Status is decided deterministically from the rubric; the LLM's own
        # "status" field can therefore never overrule a failed condition.
        status = "APPROVED" if passed else "REJECTED"

        # BUG FIX: previously parsed.get("hard_floor_violations", violations)
        # let an LLM-supplied (possibly empty) list silently replace the
        # deterministically computed violations. Merge instead: computed
        # first, LLM extras deduplicated.
        reported = parsed.get("hard_floor_violations") or []
        hard_floor_violations = violations + [v for v in reported if v not in violations]

        return {
            "status": status,
            "weighted_score": weighted_score,
            "scores": scores,
            "hard_floor_violations": hard_floor_violations,
            "hallucinations_detected": parsed.get("hallucinations_detected", []),
            "key_deficiencies": parsed.get("key_deficiencies", [])[:5],
            "strengths_to_preserve": parsed.get("strengths_to_preserve", []),
            "actionable_feedback": parsed.get("actionable_feedback", [])[:5],
            "provider": provider,
            "providers_failed": providers_failed,
            "error": False,
        }
    except (json.JSONDecodeError, ValueError, TypeError) as e:
        # TypeError added: e.g. a null score would otherwise crash the node.
        return _default_rejection(
            f"JSON parsing failed: {str(e)[:100]}",
            "Evaluation response was malformed - please retry",
            provider,
            providers_failed,
        )
def critic_node(state, workflow_id=None, progress_store=None):
    """
    Critic node with LLM-only weighted rubric evaluation.

    Evaluates SWOT output on 6 criteria with weighted scoring:
    - Evidence Grounding (25%) - hard floor >= 6
    - Constraint Compliance (20%) - hard floor >= 6
    - Specificity & Actionability (20%)
    - Strategic Insight (15%)
    - Completeness & Balance (10%)
    - Clarity & Structure (10%)

    Pass requires: weighted avg >= 6.0, hard floors met, no score < 5

    On top of the LLM rubric, three deterministic validation layers run when a
    metric reference table is present in state: numeric accuracy (Layer 4),
    uncited-number detection (Layer 3), and minimum citation coverage
    (Layer 2). Any layer failure caps evidence_grounding and forces REJECTED.

    Args:
        state: mutable workflow state dict; reads draft_report, raw_data,
            revision_count, metric_reference(+hash), error; writes critique,
            score, critique_details, llm_providers_failed and, on LLM failure,
            analyzer_revision_skipped.
        workflow_id: optional explicit id; falls back to state["workflow_id"].
        progress_store: optional progress mapping; falls back to state entry.

    Returns:
        The same state dict, mutated in place.
    """
    # Extract workflow_id and progress_store from state when not passed in.
    if workflow_id is None:
        workflow_id = state.get("workflow_id")
    if progress_store is None:
        progress_store = state.get("progress_store")
    # Skip evaluation if workflow has an error (abort mode): translate the raw
    # error into a user-friendly critique, zero the score, and return early.
    if state.get("error"):
        _add_activity_log(workflow_id, progress_store, "critic", "Skipping evaluation - workflow aborted")
        error_msg = state.get("error", "")
        if "429" in error_msg or "Too Many Requests" in error_msg:
            user_friendly_msg = "All AI providers are temporarily unavailable due to rate limits. Please wait a moment and try again."
        elif "All LLM providers failed" in error_msg:
            user_friendly_msg = "Unable to connect to AI providers. Please check your API keys or try again later."
        else:
            user_friendly_msg = "Analysis could not be completed. Please try again."
        state["critique"] = user_friendly_msg
        state["score"] = 0
        return state
    report = state.get("draft_report", "")
    revision_count = state.get("revision_count", 0)
    iteration = revision_count + 1  # 1-indexed for display
    # Log evaluation start
    _add_activity_log(workflow_id, progress_store, "critic", f"Evaluating SWOT quality (iteration {iteration}/3)...")
    # Get source data for grounding verification
    source_data = state.get("raw_data", "")
    # Run LLM evaluation
    print(f"Running LLM evaluation (iteration {iteration})...")
    llm = get_llm_client()
    # Add delay before LLM call to avoid rate limits (Analyzer just called LLM)
    print("Waiting 10s before Critic LLM call (rate limit buffer)...")
    time.sleep(10)
    _add_activity_log(workflow_id, progress_store, "critic", "Calling LLM for quality evaluation...")
    start_time = time.time()
    result = run_llm_evaluation(report, source_data, iteration, llm)
    elapsed = time.time() - start_time
    provider = result.get('provider', 'unknown')
    # Propagate LLM error to state to trigger graceful exit (prevents infinite retry loop)
    if result.get("error"):
        _add_activity_log(workflow_id, progress_store, "critic",
            "LLM evaluation failed - exiting gracefully with current draft")
        state["analyzer_revision_skipped"] = True  # Triggers graceful exit in should_continue()
    # Log failed providers
    providers_failed = result.get('providers_failed', [])
    for pf in providers_failed:
        _add_activity_log(workflow_id, progress_store, "critic", f"LLM {pf['name']} failed: {pf['error']}")
    # Track failed providers in state for frontend
    if "llm_providers_failed" not in state:
        state["llm_providers_failed"] = []
    state["llm_providers_failed"].extend([pf["name"] for pf in providers_failed])
    # Extract results. NOTE: `scores` aliases result["scores"], so the caps
    # applied by the validation layers below mutate `result` as well.
    status = result["status"]
    weighted_score = result["weighted_score"]
    scores = result["scores"]
    # ============================================================
    # LAYER 4: Deterministic Numeric Validation
    # ============================================================
    # Runs only when a metric reference table and its hash are in state; if
    # either is missing, all deterministic layers are silently skipped.
    metric_ref = state.get("metric_reference", {})
    ref_hash = state.get("metric_reference_hash", "")
    if metric_ref and ref_hash:
        # Verify integrity before using (guards against tampered/stale refs)
        if _verify_reference_integrity(metric_ref, ref_hash):
            mismatches = validate_numeric_accuracy(report, metric_ref)
            if mismatches:
                # Log each mismatch for debugging
                for mismatch in mismatches:
                    _add_activity_log(workflow_id, progress_store, "critic",
                        f"MISMATCH: {mismatch}")
                _add_activity_log(workflow_id, progress_store, "critic",
                    f"Numeric validation: {len(mismatches)} mismatch(es) detected")
                # Ensure hallucinations_detected exists
                if "hallucinations_detected" not in result:
                    result["hallucinations_detected"] = []
                result["hallucinations_detected"].extend(mismatches)
                # Cap evidence_grounding score (harshest cap: 4, below floor)
                if scores.get("evidence_grounding", 0) > 4:
                    scores["evidence_grounding"] = 4
                if "hard_floor_violations" not in result:
                    result["hard_floor_violations"] = []
                result["hard_floor_violations"].append(
                    "Numeric mismatch detected - evidence_grounding capped at 4"
                )
                # Add specific feedback (inserted first: highest priority fix)
                if "actionable_feedback" not in result:
                    result["actionable_feedback"] = []
                result["actionable_feedback"].insert(0,
                    f"Fix {len(mismatches)} numeric mismatch(es) - use exact values with [M##] citations from reference table"
                )
                # Recalculate weighted score with capped evidence_grounding
                weighted_score = calculate_weighted_score(scores)
                result["weighted_score"] = weighted_score
                # Force rejection if numeric mismatches
                status = "REJECTED"
                result["status"] = status
            else:
                _add_activity_log(workflow_id, progress_store, "critic",
                    "Numeric validation: all citations verified")
            # ============================================================
            # LAYER 3: Uncited Number Detection
            # ============================================================
            # Only validate SWOT section (not Data Report tables which have raw metrics)
            swot_section = report
            if "## SWOT Analysis" in report:
                swot_section = report[report.index("## SWOT Analysis"):]
            uncited_warnings = validate_uncited_numbers(swot_section, metric_ref)
            if uncited_warnings:
                _add_activity_log(workflow_id, progress_store, "critic",
                    f"Uncited numbers: {len(uncited_warnings)} suspicious value(s) found")
                # Add to hallucinations_detected
                if "hallucinations_detected" not in result:
                    result["hallucinations_detected"] = []
                result["hallucinations_detected"].extend(uncited_warnings)
                # Cap score and add feedback (less severe than mismatches: cap 6)
                if scores.get("evidence_grounding", 0) > 6:
                    scores["evidence_grounding"] = 6
                if "hard_floor_violations" not in result:
                    result["hard_floor_violations"] = []
                result["hard_floor_violations"].append(
                    "Uncited metric-like numbers found - evidence_grounding capped at 6"
                )
                # Add feedback
                if "actionable_feedback" not in result:
                    result["actionable_feedback"] = []
                result["actionable_feedback"].append(
                    f"Add [M##] citations for {len(uncited_warnings)} uncited metric value(s)"
                )
                # Recalculate and reject
                weighted_score = calculate_weighted_score(scores)
                result["weighted_score"] = weighted_score
                status = "REJECTED"
                result["status"] = status
            # ============================================================
            # LAYER 2: Minimum Citation Count Enforcement
            # ============================================================
            # Requires at least 30% of available metrics to be cited.
            citation_check = validate_minimum_citations(report, metric_ref, min_ratio=0.3)
            if not citation_check["valid"]:
                _add_activity_log(workflow_id, progress_store, "critic",
                    f"Citation coverage insufficient: {citation_check['message']}")
                # Cap score severely - this indicates LLM ignored citation instructions
                if scores.get("evidence_grounding", 0) > 3:
                    scores["evidence_grounding"] = 3
                if "hard_floor_violations" not in result:
                    result["hard_floor_violations"] = []
                result["hard_floor_violations"].append(
                    f"Insufficient citation coverage ({citation_check['ratio']:.0%}) - evidence_grounding capped at 3"
                )
                # Add feedback
                if "actionable_feedback" not in result:
                    result["actionable_feedback"] = []
                result["actionable_feedback"].insert(0,
                    f"CRITICAL: Add more [M##] citations. Current: {citation_check['citations_found']}/{citation_check['metrics_available']} ({citation_check['ratio']:.0%})"
                )
                # Recalculate and reject
                weighted_score = calculate_weighted_score(scores)
                result["weighted_score"] = weighted_score
                status = "REJECTED"
                result["status"] = status
            else:
                _add_activity_log(workflow_id, progress_store, "critic",
                    f"Citation coverage OK: {citation_check['message']}")
        else:
            _add_activity_log(workflow_id, progress_store, "critic",
                "Warning: metric reference integrity check failed - skipping numeric validation")
    # Handle ESCALATE if max iterations reached (hand off to human review
    # instead of looping forever on a rejected draft)
    if iteration > 3 and status == "REJECTED":
        status = "ESCALATE"
        _add_activity_log(workflow_id, progress_store, "critic", "Max iterations reached - escalating for human review")
    # Log scores
    print(f" Status: {status}")
    print(f" Weighted Score: {weighted_score:.1f}/10")
    for criterion, score in scores.items():
        floor = HARD_FLOORS.get(criterion, "-")
        print(f" {criterion}: {score}/10 (floor: {floor})")
    _add_activity_log(workflow_id, progress_store, "critic", f"Evaluation via {provider} ({elapsed:.1f}s)")
    # Log status and score
    # NOTE(review): all three branches below produce the identical message;
    # the status-specific wording appears to have been lost or never written.
    if status == "APPROVED":
        score_msg = f"Score: {weighted_score:.1f}/10"
    elif status == "ESCALATE":
        score_msg = f"Score: {weighted_score:.1f}/10"
    else:
        score_msg = f"Score: {weighted_score:.1f}/10"
    _add_activity_log(workflow_id, progress_store, "critic", score_msg)
    # Build critique message (human-readable summary stored in state["critique"])
    critique_lines = [
        f"Status: {status}",
        f"Weighted Score: {weighted_score:.1f}/10",
        "",
        "Criterion Scores:",
    ]
    for criterion, score in scores.items():
        weight = int(CRITERION_WEIGHTS[criterion] * 100)
        floor = HARD_FLOORS.get(criterion)
        floor_str = f" (floor: {floor})" if floor else ""
        # PASS/FAIL per criterion: against its hard floor if it has one,
        # otherwise against the global minimum.
        passed = "PASS" if score >= (floor or MIN_INDIVIDUAL_SCORE) else "FAIL"
        critique_lines.append(f" {criterion}: {score}/10 [{weight}%] {floor_str} - {passed}")
    if result.get("hard_floor_violations"):
        critique_lines.append("")
        critique_lines.append("Hard Floor Violations:")
        for v in result["hard_floor_violations"]:
            critique_lines.append(f" - {v}")
    if result.get("hallucinations_detected"):
        critique_lines.append("")
        critique_lines.append("Hallucinations Detected:")
        for h in result["hallucinations_detected"]:
            critique_lines.append(f" - {h}")
    if result.get("key_deficiencies"):
        critique_lines.append("")
        critique_lines.append("Key Deficiencies:")
        for i, d in enumerate(result["key_deficiencies"], 1):
            critique_lines.append(f" {i}. {d}")
    if result.get("actionable_feedback"):
        critique_lines.append("")
        critique_lines.append("Actionable Feedback:")
        for i, f in enumerate(result["actionable_feedback"], 1):
            critique_lines.append(f" {i}. {f}")
    if result.get("strengths_to_preserve"):
        critique_lines.append("")
        critique_lines.append("Strengths to Preserve:")
        for s in result["strengths_to_preserve"]:
            critique_lines.append(f" - {s}")
    state["critique"] = "\n".join(critique_lines)
    state["score"] = weighted_score
    # Structured mirror of the critique for programmatic consumers (frontend)
    state["critique_details"] = {
        "status": status,
        "weighted_score": weighted_score,
        "scores": scores,
        "hard_floor_violations": result.get("hard_floor_violations", []),
        "hallucinations_detected": result.get("hallucinations_detected", []),
        "key_deficiencies": result.get("key_deficiencies", []),
        "strengths_to_preserve": result.get("strengths_to_preserve", []),
        "actionable_feedback": result.get("actionable_feedback", []),
    }
    # Debug: Log what's being set in critique_details
    print(f"[DEBUG] Critic: Setting critique_details status={status}, score={weighted_score:.1f}")
    # Update progress
    # NOTE(review): assumes progress_store[workflow_id] already exists —
    # raises KeyError otherwise; confirm upstream always seeds the entry.
    if workflow_id and progress_store:
        progress_store[workflow_id].update({
            "current_step": "critic",
            "revision_count": revision_count,
            "score": weighted_score
        })
    return state