# vn6295337's picture
# debug: Add detailed logging for numeric mismatches in critic
# 0e30a8c
from src.llm_client import get_llm_client
from langsmith import traceable
import json
import time
# Layer 4: Deterministic numeric validation
from src.utils.numeric_validator import (
validate_numeric_accuracy,
validate_uncited_numbers,
validate_minimum_citations,
)
from src.nodes.analyzer import _verify_reference_integrity
def _add_activity_log(workflow_id, progress_store, step, message):
"""Helper to add activity log entry."""
if workflow_id and progress_store:
from src.services.workflow_store import add_activity_log
add_activity_log(workflow_id, step, message)
# ============================================================
# LLM-ONLY WEIGHTED RUBRIC EVALUATION
# ============================================================
# Prompt sent verbatim to the evaluating LLM. The "HARD FLOOR" values quoted in
# the rubric headings must agree with the prompt's own PASS CONDITIONS section
# and with HARD_FLOORS below (the values enforced in code); the Evidence
# Grounding heading previously said >=7 while everything else said 6.
CRITIC_SYSTEM_PROMPT = """You are a SWOT Output Critic and Quality Gatekeeper.
## ROLE
Act as an independent, impartial evaluator that reviews SWOT analyses. Your function is to:
1. Verify factual accuracy against provided input data
2. Assess quality against a weighted rubric
3. Decide whether the output PASSES or FAILS
4. Provide actionable feedback if rejected
You are a quality gate, not a collaborator. Be strict.
## VALID METRICS SCHEMA
**Fundamentals:** revenue, net_income, net_margin_pct, total_assets, total_liabilities, stockholders_equity, operating_margin_pct, total_debt, operating_cash_flow, free_cash_flow
**Valuation:** current_price, market_cap, enterprise_value, trailing_pe, forward_pe, ps_ratio, pb_ratio, trailing_peg, forward_peg, earnings_growth, revenue_growth
**Volatility:** vix, vxn, beta, historical_volatility, implied_volatility
**Macro:** gdp_growth, interest_rate, cpi_inflation, unemployment
**Qualitative:** News (title, date, source, url), Sentiment (title, date, source, url)
## EVALUATION RUBRIC (Weighted)
### 1. Evidence Grounding (25%) — HARD FLOOR: >=6
- All claims cite specific metrics from input data
- No fabricated metrics (hallucination check)
- Field names match schema
- 9-10: Every claim traceable; 7-8: Nearly all grounded; 5-6: Most grounded, 2-3 unverifiable; 3-4: Multiple unsupported; 1-2: Clear hallucinations
- **If ANY fabricated metric detected, cap at 4**
### 2. Constraint Compliance (20%) — HARD FLOOR: >=6
- No buy/sell/hold recommendations
- Temporal labels accurate (TTM, FY, forward)
- "DATA NOT PROVIDED" used for missing metrics
- 9-10: All constraints respected; 7-8: Minor issues; 5-6: One moderate violation; 3-4: Multiple violations; 1-2: Systematic violations
### 3. Specificity & Actionability (20%)
- Company-specific, not generic templates
- Quantified findings (not "strong margins" but "31% operating margin")
- Avoids business cliches
- 9-10: Every point specific and quantified; 7-8: Mostly specific; 5-6: Mix of specific/generic; 3-4: Mostly generic; 1-2: Template-like
### 4. Strategic Insight (15%)
- Synthesis across multiple data sources
- Prioritization by materiality
- Goes beyond restating metrics to interpreting implications
- 9-10: Identifies causal relationships; 7-8: Good synthesis; 5-6: Surface-level; 3-4: Restates metrics; 1-2: No value-add
### 5. Completeness & Balance (10%)
Required sections:
- Strengths (Finding, Strategic Implication, Durability)
- Weaknesses (Finding, Severity, Trend, Remediation Levers)
- Opportunities (Catalyst, Timing, Execution Requirements)
- Threats (Risk Factor, Probability, Impact, Mitigation Options)
- Data Quality Notes
- 9-10: All present and substantive; 7-8: All present, minor gaps; 5-6: Missing 1 section; 3-4: Multiple missing; 1-2: Major gaps
### 6. Clarity & Structure (10%)
- Clean formatting, logical grouping
- Easy to scan (not walls of text)
- No contradictions
- 9-10: Impeccable; 7-8: Well-structured; 5-6: Readable but dense; 3-4: Hard to follow; 1-2: Poorly organized
## PASS CONDITIONS (ALL must be met)
1. Weighted average >= 6.0
2. Evidence Grounding >= 6
3. Constraint Compliance >= 6
4. No individual criterion below 5
## OUTPUT FORMAT (JSON only, no other text)
{
"status": "APPROVED" or "REJECTED",
"weighted_score": <float>,
"scores": {
"evidence_grounding": <1-10>,
"constraint_compliance": <1-10>,
"specificity_actionability": <1-10>,
"strategic_insight": <1-10>,
"completeness_balance": <1-10>,
"clarity_structure": <1-10>
},
"hard_floor_violations": ["list of violated floors or empty array"],
"hallucinations_detected": ["list of fabricated metrics or empty array"],
"key_deficiencies": ["prioritized list, max 5"],
"strengths_to_preserve": ["elements done well"],
"actionable_feedback": ["specific rewrite instructions, max 5"]
}
"""

# Weight of each rubric criterion in the weighted average (the weights sum to 1.0).
# Keys double as the JSON field names the LLM is asked to return under "scores".
CRITERION_WEIGHTS = {
    "evidence_grounding": 0.25,
    "constraint_compliance": 0.20,
    "specificity_actionability": 0.20,
    "strategic_insight": 0.15,
    "completeness_balance": 0.10,
    "clarity_structure": 0.10,
}

# Criteria that must individually reach at least this score for a pass.
# Keep in sync with the "HARD FLOOR" headings and PASS CONDITIONS in the prompt.
HARD_FLOORS = {
    "evidence_grounding": 6,
    "constraint_compliance": 6,
}

# Lowest score any single criterion may have for a pass (PASS CONDITIONS item 4).
MIN_INDIVIDUAL_SCORE = 5
def calculate_weighted_score(scores: dict) -> float:
    """Return the rubric's weighted average, rounded to two decimals.

    Every criterion listed in ``CRITERION_WEIGHTS`` contributes
    ``score * weight``; a criterion absent from ``scores`` counts as 5.
    """
    weighted_sum = 0.0
    for name in CRITERION_WEIGHTS:
        weighted_sum += scores.get(name, 5) * CRITERION_WEIGHTS[name]
    return round(weighted_sum, 2)
def check_pass_conditions(scores: dict, weighted_score: float) -> tuple:
    """
    Decide whether an evaluation passes the gate.

    Three rules are checked, in this order: the weighted score must be at
    least 6.0, each criterion in HARD_FLOORS must reach its floor (a missing
    criterion counts as 0), and no supplied score may be below
    MIN_INDIVIDUAL_SCORE.

    Returns (passed: bool, violations: list) where ``violations`` holds one
    message per broken rule and ``passed`` is True only when it is empty.
    """
    problems = []

    # Rule 1: overall weighted average.
    if weighted_score < 6.0:
        problems.append(f"Weighted score {weighted_score:.1f} < 6.0 threshold")

    # Rule 2: per-criterion hard floors.
    problems.extend(
        f"{name}: {scores.get(name, 0)} < {minimum} (hard floor)"
        for name, minimum in HARD_FLOORS.items()
        if scores.get(name, 0) < minimum
    )

    # Rule 3: global minimum for every score that was supplied.
    problems.extend(
        f"{name}: {value} < {MIN_INDIVIDUAL_SCORE} (minimum)"
        for name, value in scores.items()
        if value < MIN_INDIVIDUAL_SCORE
    )

    return (not problems, problems)
def _failed_evaluation(deficiency: str, feedback: str, provider, providers_failed) -> dict:
    """Build the neutral REJECTED result returned when no usable evaluation exists.

    Every criterion is scored 5, the weighted score is 5.0 and ``error`` is True.
    """
    return {
        "status": "REJECTED",
        "weighted_score": 5.0,
        "scores": {criterion: 5 for criterion in CRITERION_WEIGHTS},
        "hard_floor_violations": [],
        "hallucinations_detected": [],
        "key_deficiencies": [deficiency],
        "strengths_to_preserve": [],
        "actionable_feedback": [feedback],
        "provider": provider,
        "providers_failed": providers_failed,
        "error": True,
    }


def _as_list(value) -> list:
    """Coerce an LLM-supplied field to a list (None -> [], scalar -> [scalar])."""
    if value is None:
        return []
    if isinstance(value, list):
        return value
    return [value]


def run_llm_evaluation(report: str, source_data: str, iteration: int, llm) -> dict:
    """
    Run LLM-based evaluation with weighted rubric.

    The LLM's own "status" and "weighted_score" are ignored: both are
    recomputed here from the per-criterion scores so the pass/fail decision
    is deterministic given those scores.

    Args:
        report: The SWOT output to evaluate
        source_data: The source data the SWOT should be based on
            (truncated to 4000 characters before being sent)
        iteration: Current revision number (1, 2, or 3)
        llm: LLM client instance; ``llm.query(prompt, temperature=0)`` must
            return ``(response, provider, error, providers_failed)``

    Returns:
        Evaluation result dict with scores, status, and feedback. ``error``
        is True (with every score set to 5 and status REJECTED) when the LLM
        call failed or its response could not be interpreted.
    """
    # Truncate source data if too long (Groq has ~8K token limit)
    max_source_len = 4000
    if len(source_data) > max_source_len:
        source_data = source_data[:max_source_len] + "\n... [truncated]"

    prompt = f"""{CRITIC_SYSTEM_PROMPT}
## INPUTS
**Iteration:** {iteration} of 3
**Source Data (the SWOT should be based ONLY on this):**
{source_data}
**SWOT Output to Evaluate:**
{report}
Evaluate strictly and respond with JSON only."""

    response, provider, error, providers_failed = llm.query(prompt, temperature=0)
    if error:
        # Return default middle scores on error
        return _failed_evaluation(
            f"LLM evaluation failed: {error}",
            "Unable to evaluate - please retry",
            provider,
            providers_failed,
        )

    try:
        # Keep only the outermost {...} span so surrounding chatter or code
        # fences around the JSON do not break parsing.
        content = (response or "").strip()
        if "{" in content:
            content = content[content.index("{"):content.rindex("}") + 1]
        parsed = json.loads(content)
        if not isinstance(parsed, dict):
            raise ValueError("evaluation response is not a JSON object")

        raw_scores = parsed.get("scores")
        if not isinstance(raw_scores, dict):
            raw_scores = {}

        # Build a clean score table containing exactly the rubric criteria:
        # missing ones default to 5, present ones are clamped to 1-10, and any
        # extra keys the LLM invented are dropped so later comparisons and
        # weight lookups only ever see known criteria with integer values.
        scores = {}
        for criterion in CRITERION_WEIGHTS:
            if criterion in raw_scores:
                scores[criterion] = min(max(int(raw_scores[criterion]), 1), 10)
            else:
                scores[criterion] = 5
    except (json.JSONDecodeError, ValueError, TypeError, OverflowError, AttributeError) as e:
        # TypeError/OverflowError cover null, list or infinite score values,
        # which int() rejects with something other than ValueError.
        return _failed_evaluation(
            f"JSON parsing failed: {str(e)[:100]}",
            "Evaluation response was malformed - please retry",
            provider,
            providers_failed,
        )

    weighted_score = calculate_weighted_score(scores)
    passed, violations = check_pass_conditions(scores, weighted_score)

    # Prefer the LLM's own list of floor violations; fall back to the
    # computed ones when it supplied none.
    llm_violations = parsed.get("hard_floor_violations")
    hard_floor_violations = violations if llm_violations is None else _as_list(llm_violations)

    return {
        "status": "APPROVED" if passed else "REJECTED",
        "weighted_score": weighted_score,
        "scores": scores,
        "hard_floor_violations": hard_floor_violations,
        "hallucinations_detected": _as_list(parsed.get("hallucinations_detected")),
        "key_deficiencies": _as_list(parsed.get("key_deficiencies"))[:5],
        "strengths_to_preserve": _as_list(parsed.get("strengths_to_preserve")),
        "actionable_feedback": _as_list(parsed.get("actionable_feedback"))[:5],
        "provider": provider,
        "providers_failed": providers_failed,
        "error": False,
    }
def _friendly_abort_message(error_msg: str) -> str:
    """Translate a workflow error string into the message shown to the user."""
    if "429" in error_msg or "Too Many Requests" in error_msg:
        return "All AI providers are temporarily unavailable due to rate limits. Please wait a moment and try again."
    if "All LLM providers failed" in error_msg:
        return "Unable to connect to AI providers. Please check your API keys or try again later."
    return "Analysis could not be completed. Please try again."


def _result_list(result: dict, key: str) -> list:
    """Return ``result[key]`` as a mutable list, repairing a missing or non-list value."""
    current = result.get(key)
    if not isinstance(current, list):
        current = [] if current is None else [current]
        result[key] = current
    return current


def _cap_evidence_grounding(result: dict, scores: dict, cap: int, note: str) -> None:
    """Lower ``evidence_grounding`` to ``cap`` and record ``note`` if it is currently higher."""
    if scores.get("evidence_grounding", 0) > cap:
        scores["evidence_grounding"] = cap
        _result_list(result, "hard_floor_violations").append(note)


def _run_deterministic_checks(report: str, metric_ref, result: dict, scores: dict, log) -> bool:
    """Apply the numeric, uncited-number and citation-coverage checks.

    ``scores`` and ``result`` are updated in place. ``log`` is a one-argument
    callable used for activity-log messages. Returns True when at least one
    check failed, in which case the caller must reject the report.
    """
    rejected = False

    # Layer 4: cited numbers must match the reference table.
    mismatches = validate_numeric_accuracy(report, metric_ref)
    if mismatches:
        # Log each mismatch for debugging
        for mismatch in mismatches:
            log(f"MISMATCH: {mismatch}")
        log(f"Numeric validation: {len(mismatches)} mismatch(es) detected")
        _result_list(result, "hallucinations_detected").extend(mismatches)
        _cap_evidence_grounding(
            result, scores, 4,
            "Numeric mismatch detected - evidence_grounding capped at 4",
        )
        _result_list(result, "actionable_feedback").insert(
            0,
            f"Fix {len(mismatches)} numeric mismatch(es) - use exact values with [M##] citations from reference table",
        )
        rejected = True
    else:
        log("Numeric validation: all citations verified")

    # Layer 3: uncited numbers. Only the SWOT section is scanned (not the
    # Data Report tables, which contain raw metrics).
    swot_start = report.find("## SWOT Analysis")
    swot_section = report if swot_start == -1 else report[swot_start:]
    uncited_warnings = validate_uncited_numbers(swot_section, metric_ref)
    if uncited_warnings:
        log(f"Uncited numbers: {len(uncited_warnings)} suspicious value(s) found")
        _result_list(result, "hallucinations_detected").extend(uncited_warnings)
        # Less severe than a mismatch, hence the higher cap.
        _cap_evidence_grounding(
            result, scores, 6,
            "Uncited metric-like numbers found - evidence_grounding capped at 6",
        )
        _result_list(result, "actionable_feedback").append(
            f"Add [M##] citations for {len(uncited_warnings)} uncited metric value(s)"
        )
        rejected = True

    # Layer 2: minimum citation coverage.
    citation_check = validate_minimum_citations(report, metric_ref, min_ratio=0.3)
    if not citation_check["valid"]:
        log(f"Citation coverage insufficient: {citation_check['message']}")
        # Severe cap - this indicates the LLM ignored citation instructions.
        _cap_evidence_grounding(
            result, scores, 3,
            f"Insufficient citation coverage ({citation_check['ratio']:.0%}) - evidence_grounding capped at 3",
        )
        _result_list(result, "actionable_feedback").insert(
            0,
            f"CRITICAL: Add more [M##] citations. Current: {citation_check['citations_found']}/{citation_check['metrics_available']} ({citation_check['ratio']:.0%})",
        )
        rejected = True
    else:
        log(f"Citation coverage OK: {citation_check['message']}")

    return rejected


def _build_critique(status: str, weighted_score: float, scores: dict, result: dict) -> str:
    """Render the human-readable critique text stored in ``state["critique"]``."""
    lines = [
        f"Status: {status}",
        f"Weighted Score: {weighted_score:.1f}/10",
        "",
        "Criterion Scores:",
    ]
    for criterion, score in scores.items():
        # .get() so an unexpected criterion name cannot raise KeyError here.
        weight = int(CRITERION_WEIGHTS.get(criterion, 0) * 100)
        floor = HARD_FLOORS.get(criterion)
        floor_str = f" (floor: {floor})" if floor else ""
        passed = "PASS" if score >= (floor or MIN_INDIVIDUAL_SCORE) else "FAIL"
        lines.append(f" {criterion}: {score}/10 [{weight}%] {floor_str} - {passed}")

    # (result key, section heading, numbered?) in display order.
    sections = (
        ("hard_floor_violations", "Hard Floor Violations:", False),
        ("hallucinations_detected", "Hallucinations Detected:", False),
        ("key_deficiencies", "Key Deficiencies:", True),
        ("actionable_feedback", "Actionable Feedback:", True),
        ("strengths_to_preserve", "Strengths to Preserve:", False),
    )
    for key, heading, numbered in sections:
        items = result.get(key)
        if not items:
            continue
        lines.append("")
        lines.append(heading)
        for position, item in enumerate(items, 1):
            lines.append(f" {position}. {item}" if numbered else f" - {item}")
    return "\n".join(lines)


@traceable(name="Critic")
def critic_node(state, workflow_id=None, progress_store=None):
    """
    Critic node: LLM weighted-rubric evaluation plus deterministic checks.

    Evaluates SWOT output on 6 criteria with weighted scoring:
    - Evidence Grounding (25%) - hard floor >= 6
    - Constraint Compliance (20%) - hard floor >= 6
    - Specificity & Actionability (20%)
    - Strategic Insight (15%)
    - Completeness & Balance (10%)
    - Clarity & Structure (10%)
    Pass requires: weighted avg >= 6.0, hard floors met, no score < 5

    When the state carries a metric reference whose integrity check passes,
    three deterministic checks (numeric accuracy, uncited numbers, citation
    coverage) can additionally cap evidence_grounding and force a rejection.

    Args:
        state: Workflow state mapping; read for ``draft_report``,
            ``raw_data``, ``revision_count``, ``error``, ``metric_reference``
            and ``metric_reference_hash``.
        workflow_id: Overrides ``state["workflow_id"]`` when given.
        progress_store: Overrides ``state["progress_store"]`` when given.

    Returns:
        The same ``state`` object with ``critique``, ``score`` and (unless the
        workflow was already aborted) ``critique_details`` filled in.
    """
    # Extract workflow_id and progress_store from state
    if workflow_id is None:
        workflow_id = state.get("workflow_id")
    if progress_store is None:
        progress_store = state.get("progress_store")

    def log(message):
        _add_activity_log(workflow_id, progress_store, "critic", message)

    # Skip evaluation if workflow has an error (abort mode)
    if state.get("error"):
        log("Skipping evaluation - workflow aborted")
        # str() because the error value is not guaranteed to be a string.
        state["critique"] = _friendly_abort_message(str(state.get("error", "")))
        state["score"] = 0
        return state

    report = state.get("draft_report") or ""
    revision_count = state.get("revision_count", 0)
    iteration = revision_count + 1  # 1-indexed for display

    log(f"Evaluating SWOT quality (iteration {iteration}/3)...")

    # Get source data for grounding verification
    source_data = state.get("raw_data", "")

    print(f"Running LLM evaluation (iteration {iteration})...")
    llm = get_llm_client()

    # Add delay before LLM call to avoid rate limits (Analyzer just called LLM)
    print("Waiting 10s before Critic LLM call (rate limit buffer)...")
    time.sleep(10)

    log("Calling LLM for quality evaluation...")
    start_time = time.time()
    result = run_llm_evaluation(report, source_data, iteration, llm)
    elapsed = time.time() - start_time
    provider = result.get("provider", "unknown")

    # Propagate LLM error to state to trigger graceful exit (prevents infinite retry loop)
    if result.get("error"):
        log("LLM evaluation failed - exiting gracefully with current draft")
        state["analyzer_revision_skipped"] = True  # Triggers graceful exit in should_continue()

    # Log failed providers and track them in state for the frontend.
    providers_failed = result.get("providers_failed") or []
    for pf in providers_failed:
        log(f"LLM {pf['name']} failed: {pf['error']}")
    if "llm_providers_failed" not in state:
        state["llm_providers_failed"] = []
    state["llm_providers_failed"].extend([pf["name"] for pf in providers_failed])

    # Extract results
    status = result["status"]
    weighted_score = result["weighted_score"]
    scores = result["scores"]

    # ============================================================
    # Deterministic validation (Layers 4, 3, 2)
    # ============================================================
    metric_ref = state.get("metric_reference", {})
    ref_hash = state.get("metric_reference_hash", "")
    if metric_ref and ref_hash:
        # Verify integrity before using
        if _verify_reference_integrity(metric_ref, ref_hash):
            if _run_deterministic_checks(report, metric_ref, result, scores, log):
                # Scores may have been capped; recompute and force rejection.
                weighted_score = calculate_weighted_score(scores)
                result["weighted_score"] = weighted_score
                status = "REJECTED"
                result["status"] = status
        else:
            log("Warning: metric reference integrity check failed - skipping numeric validation")

    # Handle ESCALATE if max iterations reached
    # NOTE(review): iteration is revision_count + 1 and the log/prompt text says
    # "of 3", yet this only fires once iteration exceeds 3 - confirm against the
    # graph's revision limit.
    if iteration > 3 and status == "REJECTED":
        status = "ESCALATE"
        log("Max iterations reached - escalating for human review")

    # Log scores
    print(f" Status: {status}")
    print(f" Weighted Score: {weighted_score:.1f}/10")
    for criterion, score in scores.items():
        floor = HARD_FLOORS.get(criterion, "-")
        print(f" {criterion}: {score}/10 (floor: {floor})")
    log(f"Evaluation via {provider} ({elapsed:.1f}s)")

    # Same message for every status.
    log(f"Score: {weighted_score:.1f}/10")

    state["critique"] = _build_critique(status, weighted_score, scores, result)
    state["score"] = weighted_score
    state["critique_details"] = {
        "status": status,
        "weighted_score": weighted_score,
        "scores": scores,
        "hard_floor_violations": result.get("hard_floor_violations", []),
        "hallucinations_detected": result.get("hallucinations_detected", []),
        "key_deficiencies": result.get("key_deficiencies", []),
        "strengths_to_preserve": result.get("strengths_to_preserve", []),
        "actionable_feedback": result.get("actionable_feedback", []),
    }

    # Debug: Log what's being set in critique_details
    print(f"[DEBUG] Critic: Setting critique_details status={status}, score={weighted_score:.1f}")

    # Update progress
    if workflow_id and progress_store:
        progress_store[workflow_id].update({
            "current_step": "critic",
            "revision_count": revision_count,
            "score": weighted_score,
        })
    return state