# PITCHFIGHT_AI / core / retry_handler.py
# Hosting-page residue captured with the file (not part of the module):
#   Aspectgg's picture | UI REFINEMENT | d1b7226 | Raw | History | Blame |
#   Contribute | Delete | 24.9 kB
"""Retry weakest-question drill handler (Phase 8)."""
from __future__ import annotations
import logging
import re
import uuid
from datetime import datetime, timezone
from typing import Any
from core.claim_extractor import extract_concrete_signals
from core.judge_settings import get_label, normalize_difficulty
from core.json_utils import parse_model_json, sanitize_for_log, _score_label
from core.scoring_engine import _sync_overall_to_dimensions
from core import model_router
from core.deal_verdict import build_judge_verdict
logger = logging.getLogger(__name__)
# Verdict labels a retry comparison may carry. A model verdict outside this
# set is replaced when the comparison result is normalized.
_VALID_VERDICTS = frozenset({"improved", "slightly_improved", "needs_more_work"})

# Matches a retry answer that consists solely of a filler reply ("ok", "idk",
# "not sure", "n/a", ...) with an optional trailing period; case-insensitive.
_NON_ANSWER_RE = re.compile(
    r"^(ok|yeah|yes|no|idk|i don'?t know|not sure|maybe|n/?a)\.?$",
    re.IGNORECASE,
)

# Canned coaching question per scoring dimension. The "objection_handling"
# entry doubles as the fallback for unrecognised dimensions.
_DIM_RETRY_QUESTIONS: dict[str, str] = {
    "clarity": (
        "Explain your product again in one clear sentence. "
        "Who is it for, what does it do, and what outcome does it create?"
    ),
    "problem_understanding": (
        "Give one specific example that proves this user pain is real and repeated."
    ),
    "market_awareness": (
        "Name your first target segment and one number that proves this market is worth starting with."
    ),
    "differentiation": (
        "Why would someone choose your product over existing alternatives? "
        "Give one concrete mechanism or proof point."
    ),
    "business_model": (
        "Who pays, how much do they pay, and why does the math work?"
    ),
    "objection_handling": (
        "Answer the judge's objection directly using one specific number, example, or proof point."
    ),
}
def build_local_retry_question(answer_to_retry: dict) -> str:
    """Return the canned coaching question for the answer's scoring dimension.

    The ``dimension`` value is stringified, trimmed and lower-cased before the
    lookup. Any dimension without a dedicated question receives the
    ``objection_handling`` question instead.
    """
    dimension_key = str(answer_to_retry.get("dimension", "")).strip().lower()
    fallback_question = _DIM_RETRY_QUESTIONS["objection_handling"]
    if dimension_key in _DIM_RETRY_QUESTIONS:
        return _DIM_RETRY_QUESTIONS[dimension_key]
    return fallback_question
def _find_original_question(
session: dict,
round_num: int | None,
attack_tag: str,
) -> str:
"""Locate the judge question that prompted the weak answer."""
history = session.get("history", [])
if round_num and int(round_num) > 0:
target = int(round_num)
user_count = 0
for idx, msg in enumerate(history):
if msg.get("role") != "user":
continue
user_count += 1
if user_count == target:
for j in range(idx - 1, -1, -1):
if history[j].get("role") == "assistant":
return str(history[j].get("content", "")).strip()
break
tag_norm = str(attack_tag or "").lower().replace("_", " ").strip()
if tag_norm:
for msg in reversed(history):
if msg.get("role") != "assistant":
continue
msg_tag = str(msg.get("attack_tag", "")).lower().replace("_", " ").strip()
if msg_tag and (tag_norm in msg_tag or msg_tag in tag_norm):
return str(msg.get("content", "")).strip()
for msg in reversed(history):
if msg.get("role") == "assistant":
return str(msg.get("content", "")).strip()
return ""
def _dimension_score(scorecard: dict, dimension: str) -> int:
scores = scorecard.get("scores") or {}
dim_data = scores.get(dimension) or {}
try:
return int(dim_data.get("score", 30))
except (TypeError, ValueError):
return 30
def start_retry_drill(session: dict) -> dict[str, Any]:
    """Prepare a retry drill from the latest scorecard's ``answer_to_retry``.

    A drill record is created under ``session["retry_drills"][retry_id]``. It
    snapshots the overall score and every dimension score at creation time so
    later scorecard changes cannot shift the baseline used for projections.

    Args:
        session: Mutable session dict. Reads ``latest_scorecard``, ``history``
            (via ``_find_original_question``), ``session_id`` and the
            difficulty fields; writes ``retry_drills``.

    Returns:
        A payload describing the drill (ids, questions, original answer,
        coaching fields, difficulty), or ``{"error": ...}`` when there is no
        scorecard or no dimension to retry.
    """
    scorecard = session.get("latest_scorecard")
    if not scorecard or not isinstance(scorecard, dict):
        return {"error": "No scorecard found. End a battle before retrying."}

    se = scorecard.get("score_explanation")
    atr = se.get("answer_to_retry") if isinstance(se, dict) else None
    if not isinstance(atr, dict):
        atr = {}

    def _text(key: str) -> str:
        # A stored None must become "", not the literal string "None".
        value = atr.get(key)
        return "" if value is None else str(value)

    def _as_int(value: Any) -> int:
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    dimension = _text("dimension").strip()
    if not dimension:
        return {"error": "No answer to retry found in scorecard."}

    session_id = str(session.get("session_id", ""))
    attack_tag = _text("attack_tag")
    original_answer = _text("original_answer")
    why_it_hurt = _text("why_it_hurt")
    sample_stronger = _text("sample_stronger_answer")

    # The round may be model-produced; a non-numeric value simply disables the
    # round-based lookup instead of aborting the whole drill.
    round_num = atr.get("round")
    if round_num is not None:
        try:
            round_num = int(round_num)
        except (TypeError, ValueError):
            round_num = None

    original_question = _find_original_question(session, round_num, attack_tag)
    retry_question = original_question or build_local_retry_question(atr)

    difficulty_profile = session.get("difficulty_profile") or normalize_difficulty(
        session.get("difficulty", "practice")
    )
    difficulty_label = session.get("difficulty_label") or get_label(difficulty_profile)

    # Snapshot the scorecard baseline at drill-creation time so that any later
    # scorecard mutation (or session reload) cannot shift the projection baseline.
    sc_scores = scorecard.get("scores")
    if not isinstance(sc_scores, dict):
        sc_scores = {}
    original_overall_score = _as_int(scorecard.get("overall", 0))
    original_dimension_scores = {
        name: _as_int(entry.get("score", 0))
        for name, entry in sc_scores.items()
        if isinstance(entry, dict)
    }
    dim_score_before = original_dimension_scores.get(
        dimension, _dimension_score(scorecard, dimension)
    )

    retry_id = str(uuid.uuid4())
    drill = {
        "retry_id": retry_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "source": "scorecard_path_to_80",
        "dimension": dimension,
        "attack_tag": attack_tag,
        "original_question": original_question,
        "retry_question": retry_question,
        "original_answer": original_answer,
        "why_it_hurt": why_it_hurt,
        "sample_stronger_answer": sample_stronger,
        "input_mode": "",
        "retry_answer": "",
        "result": {},
        "dimension_score_before": dim_score_before,
        # Authoritative baseline — never re-read from session after this point.
        "original_overall_score": original_overall_score,
        "original_dimension_scores": original_dimension_scores,
    }

    # setdefault() would hand back a stored None; replace anything that is not
    # a dict so the drill can always be registered.
    drills = session.get("retry_drills")
    if not isinstance(drills, dict):
        drills = {}
        session["retry_drills"] = drills
    drills[retry_id] = drill

    return {
        "session_id": session_id,
        "retry_id": retry_id,
        "retry_question": retry_question,
        "original_question": original_question,
        "original_answer": original_answer,
        "dimension": dimension,
        "attack_tag": attack_tag,
        "why_it_hurt": why_it_hurt,
        "sample_stronger_answer": sample_stronger,
        "difficulty_profile": difficulty_profile,
        "difficulty_label": difficulty_label,
    }
def _answer_has_signals(text: str) -> bool:
    """Tell whether ``text`` contains concrete evidence.

    The text is wrapped in a minimal one-message session and passed to
    ``extract_concrete_signals``. The answer counts as concrete when that
    reports a positive ``signal_count`` or when the text contains any digit.
    """
    probe_session = {
        "history": [{"role": "user", "content": text}],
        "startup": {},
    }
    extracted = extract_concrete_signals(probe_session)
    if extracted.get("signal_count", 0) > 0:
        return True
    return re.search(r"\d", text) is not None
def build_local_retry_fallback(
    original_answer: str,
    retry_answer: str,
    dimension: str,
    dimension_before: int = 30,
) -> dict[str, Any]:
    """Compare the old and retry answers locally when Nemotron is unavailable.

    The retry is placed in one of four tiers:
      * non-answer (empty, filler phrase, or fewer than four words): no gain;
      * concrete signals and more than 8 characters longer than the original:
        +12..26 points, capped at 78;
      * more than 4 characters longer: +8 points, capped at 58;
      * otherwise: +5 points (capped at 50) if longer at all, else no gain.

    The caps limit how far this heuristic may raise a score; they never pull
    the estimate below ``dimension_before``. The verdict follows the realised
    gain: ``improved`` for 12 points or more, ``slightly_improved`` for any
    smaller gain, ``needs_more_work`` for none.

    Args:
        original_answer: The weak answer being retried (``None`` is treated as "").
        retry_answer: The new answer (``None`` is treated as "").
        dimension: Scoring dimension used for the coaching tip.
        dimension_before: Baseline dimension score; clamped to 0..100 and
            defaulted to 30 when it is not numeric.

    Returns:
        ``{"comparison": {...}, "next_practice_prompt": str}`` in the same
        shape as the model-backed comparison.
    """
    original = str(original_answer or "").strip()
    retry = str(retry_answer or "").strip()
    dimension = str(dimension or "")
    try:
        before = max(0, min(100, int(dimension_before)))
    except (TypeError, ValueError):
        before = 30

    coaching_question = build_local_retry_question({"dimension": dimension})

    if not retry or _NON_ANSWER_RE.match(retry) or len(retry.split()) < 4:
        after = before
        what_improved = "The retry answer was too brief or did not address the question."
        still_missing = "A specific fact, number, user example, or mechanism is still missing."
        tip = coaching_question
    elif _answer_has_signals(retry) and len(retry) > len(original) + 8:
        gain = min(26, max(12, len(retry.split()) // 2))
        # max(before, ...) keeps the cap from reporting a regression when the
        # baseline already sits above it.
        after = max(before, min(before + gain, 78))
        what_improved = "You added concrete evidence or specifics that were missing before."
        still_missing = (
            "Tighten the answer further with one sharper proof point tied to the judge's question."
            if after < 55 else "Good progress — add one more proof point to make it investor-ready."
        )
        tip = f"Lead with your strongest number or example when answering {dimension.replace('_', ' ')} questions."
    elif len(retry) > len(original) + 4:
        after = max(before, min(before + 8, 58))
        what_improved = "The retry answer is more complete, but proof is still thin."
        still_missing = "Add one number, named user segment, or competitor contrast."
        tip = coaching_question
    else:
        after = before if len(retry) <= len(original) else max(before, min(before + 5, 50))
        what_improved = "Some extra detail was added, but the core objection may still be open."
        still_missing = "Answer the exact question with one verifiable fact or example."
        tip = coaching_question

    realised_gain = after - before
    if realised_gain >= 12:
        verdict = "improved"
    elif realised_gain > 0:
        verdict = "slightly_improved"
    else:
        verdict = "needs_more_work"

    overall_lift = max(0, min(15, int(realised_gain * 0.45)))
    if overall_lift < 4 and realised_gain > 0:
        # Any real dimension gain is surfaced as at least a 4-point lift.
        overall_lift = 4

    return {
        "comparison": {
            "old_answer_summary": original[:200] or "No substantive prior answer.",
            "new_answer_summary": retry[:200],
            "what_improved": what_improved,
            "still_missing": still_missing,
            "specific_tip": tip,
            "estimated_dimension_before": before,
            "estimated_dimension_after": after,
            "estimated_overall_lift": overall_lift,
            "verdict": verdict,
        },
        "next_practice_prompt": coaching_question,
    }
def _build_retry_comparison_messages(
    session: dict,
    drill: dict,
    retry_answer: str,
) -> list[dict[str, str]]:
    """Build the system and user chat messages for the retry comparison call.

    The system message fixes the required JSON shape and scoring rules; the
    user message carries the difficulty, dimension, attack tag, a short startup
    summary, both questions, the original answer with its coaching notes, and
    the new retry answer.

    Args:
        session: Session dict (reads ``startup``, ``latest_scorecard`` and the
            difficulty fields).
        drill: Drill record created by ``start_retry_drill``.
        retry_answer: The user's new answer.

    Returns:
        A two-element list: the system message followed by the user message.
    """
    startup = session.get("startup", {}) or {}
    scorecard = session.get("latest_scorecard") or {}

    # Resolve difficulty exactly as start_retry_drill does, so the model is
    # told the same difficulty the client was shown when the drill started.
    difficulty_profile = session.get("difficulty_profile") or normalize_difficulty(
        session.get("difficulty", "practice")
    )
    difficulty_label = session.get("difficulty_label") or get_label(difficulty_profile)

    dim = drill.get("dimension", "")
    dim_before = drill.get("dimension_score_before")
    if dim_before is None:
        # A missing or null snapshot must not render as "near None" in the prompt.
        dim_before = _dimension_score(scorecard, dim)

    startup_block = "\n".join(
        [
            f"Name: {startup.get('name', '')}",
            f"Problem: {startup.get('problem', '')}",
            f"Solution: {startup.get('solution', '')}",
            f"Traction: {startup.get('traction', '')}",
        ]
    )

    system = (
        "You are a startup pitch coach comparing an old weak answer to a new retry answer.\n"
        "You are NOT rescoring the whole battle — only one dimension.\n"
        "Be specific and coaching-oriented. Do not overpraise. Do not hallucinate facts.\n"
        "Use only the provided text. Return ONLY valid JSON.\n\n"
        "REQUIRED JSON:\n"
        '{"comparison":{"old_answer_summary":"","new_answer_summary":"","what_improved":"",'
        '"still_missing":"","specific_tip":"","estimated_dimension_before":0,'
        '"estimated_dimension_after":0,"estimated_overall_lift":0,'
        '"verdict":"improved|slightly_improved|needs_more_work"},'
        '"next_practice_prompt":""}\n\n'
        "Rules:\n"
        f"- estimated_dimension_before should be near {dim_before}.\n"
        "- estimated_dimension_after must be realistic (do not jump above 75 unless strong proof).\n"
        "- estimated_overall_lift usually 3–12 points.\n"
        "- Each text field: 1–2 sentences max.\n"
        "- next_practice_prompt: one coaching question only.\n"
        "- verdict must be improved, slightly_improved, or needs_more_work."
    )
    user = (
        f"Difficulty: {difficulty_label} ({difficulty_profile})\n"
        f"Dimension: {dim}\n"
        f"Attack tag: {drill.get('attack_tag', '')}\n\n"
        f"Startup context:\n{startup_block}\n\n"
        f"Original judge question:\n{drill.get('original_question') or drill.get('retry_question', '')}\n\n"
        f"Retry question:\n{drill.get('retry_question', '')}\n\n"
        f"Original weak answer:\n{drill.get('original_answer', '')}\n\n"
        f"Why it hurt:\n{drill.get('why_it_hurt', '')}\n\n"
        f"Sample stronger direction:\n{drill.get('sample_stronger_answer', '')}\n\n"
        f"New retry answer:\n{retry_answer}\n"
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
def _normalize_comparison_result(
    parsed: dict,
    drill: dict,
    original_answer: str,
    retry_answer: str,
) -> dict[str, Any]:
    """Coerce a parsed model reply into the canonical comparison payload.

    Accepts either ``{"comparison": {...}, ...}`` or a flat object holding the
    comparison fields directly. Scores are clamped (before to 0..100; after to
    at least the before value and at most 82 unless the before value is
    already higher), an unknown verdict is derived from the scores, and text
    fields are truncated to 300 characters.

    Args:
        parsed: JSON object decoded from the model reply.
        drill: Drill record; supplies the baseline score and the dimension.
        original_answer: Fallback for ``old_answer_summary``.
        retry_answer: Fallback for ``new_answer_summary``.

    Returns:
        ``{"comparison": {...}, "next_practice_prompt": str}``.

    Raises:
        ValueError: If ``parsed`` is not a JSON object.
    """
    # Type-check before calling .get so a malformed reply surfaces as the
    # ValueError that callers handle, not as an AttributeError.
    if not isinstance(parsed, dict):
        raise ValueError("missing comparison object")
    nested = parsed.get("comparison")
    comp = nested if isinstance(nested, dict) else parsed

    def _as_int(value: Any, default: int) -> int:
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return default

    def _text(key: str, default: str = "") -> str:
        # Missing and null fields both use the default; str(None) would leak
        # the literal "None" into user-facing text.
        value = comp.get(key)
        return str(default if value is None else value)[:300]

    baseline = _as_int(drill.get("dimension_score_before"), 30)
    est_before = _as_int(comp.get("estimated_dimension_before"), baseline)
    est_after = _as_int(comp.get("estimated_dimension_after"), est_before)
    est_before = max(0, min(100, est_before))
    # A retry can never be reported as lowering the dimension score.
    est_after = max(est_before, min(82, est_after))
    gained = est_after > est_before

    verdict = str(comp.get("verdict", "needs_more_work")).strip().lower()
    if verdict not in _VALID_VERDICTS:
        verdict = "slightly_improved" if gained else "needs_more_work"

    if gained:
        derived_lift = int((est_after - est_before) * 0.35)
        lift = _as_int(comp.get("estimated_overall_lift"), derived_lift)
        # Clamp to 0..15, then surface any real gain as at least 4 points.
        lift = max(4, min(15, max(0, lift)))
    else:
        # No dimension movement means no overall lift, whatever the model claimed.
        lift = 0

    return {
        "comparison": {
            "old_answer_summary": _text("old_answer_summary", str(original_answer or "")[:200]),
            "new_answer_summary": _text("new_answer_summary", str(retry_answer or "")[:200]),
            "what_improved": _text("what_improved"),
            "still_missing": _text("still_missing"),
            "specific_tip": _text("specific_tip"),
            "estimated_dimension_before": est_before,
            "estimated_dimension_after": est_after,
            "estimated_overall_lift": lift,
            "verdict": verdict,
        },
        "next_practice_prompt": str(
            parsed.get("next_practice_prompt")
            or build_local_retry_question({"dimension": drill.get("dimension", "")})
        )[:300],
    }
def call_nemotron_retry_comparison(
    session: dict,
    drill: dict,
    retry_answer: str,
    model_mode: str | None = None,
) -> dict[str, Any] | None:
    """Ask the model to compare the original answer with the retry answer.

    The model mode is ``model_mode``, else the session's ``model_mode``, else
    ``"premium_nvidia"``. When the first reply does not parse into a non-empty
    JSON object, one repair call is attempted on the raw text.

    Returns:
        The normalized comparison payload, or ``None`` when the model call
        reports failure, the reply cannot be parsed even after repair, or
        normalization rejects it.
    """
    messages = _build_retry_comparison_messages(session, drill, retry_answer)
    resolved = model_mode or session.get("model_mode") or "premium_nvidia"

    def _usable(candidate: Any) -> bool:
        return isinstance(candidate, dict) and bool(candidate)

    reply = model_router.generate_retry_comparison_response(messages, model_mode=resolved)
    if not (reply.get("ok") and reply.get("content")):
        logger.warning("retry_handler: Nemotron comparison failed — %s", reply.get("error"))
        return None

    raw = reply["content"]
    parsed, _ = parse_model_json(raw)
    if not _usable(parsed):
        # One repair round-trip on the raw text before giving up.
        repair = model_router.generate_retry_comparison_repair_response(raw, model_mode=resolved)
        if repair.get("ok") and repair.get("content"):
            parsed, _ = parse_model_json(repair["content"])
        if not _usable(parsed):
            logger.warning(
                "retry_handler: comparison JSON parse failed preview=%r",
                sanitize_for_log(raw),
            )
            return None

    try:
        normalized = _normalize_comparison_result(
            parsed, drill, drill.get("original_answer", ""), retry_answer
        )
    except ValueError as exc:
        logger.warning("retry_handler: comparison normalize failed — %s", exc)
        return None
    return normalized
def compute_retry_projection(
    session: dict,
    drill: dict,
    comparison: dict,
) -> dict[str, Any]:
    """Project the effect of a retry without touching the stored scorecard.

    The baseline comes from the snapshot stored on the drill
    (``original_overall_score`` / ``original_dimension_scores``); the live
    ``session["latest_scorecard"]`` is consulted only where the snapshot is
    absent. Only the target dimension is replaced, and the projected values
    are never lower than the baseline.

    Returns:
        A dict with the target dimension, its old/new score and delta, the
        original and projected overall score with its delta, plus the fixed
        ``original_scorecard_unchanged`` and ``projection_method`` markers.
    """
    live_scorecard = session.get("latest_scorecard") or {}
    dim = str(drill.get("dimension", "")).strip()

    # Overall baseline: drill snapshot first, live scorecard second.
    baseline_overall = drill.get("original_overall_score")
    if baseline_overall is None:
        baseline_overall = live_scorecard.get("overall", 0) or 0
    original_overall = int(baseline_overall)

    # Per-dimension baseline: same preference order.
    original_dim_scores: dict[str, int] = drill.get("original_dimension_scores") or {}
    if not original_dim_scores:
        original_dim_scores = {}
        for name, entry in (live_scorecard.get("scores") or {}).items():
            if isinstance(entry, dict):
                original_dim_scores[name] = int(entry.get("score", 0) or 0)

    if dim in original_dim_scores:
        old_dim_score = int(original_dim_scores[dim])
    else:
        old_dim_score = int(drill.get("dimension_score_before", 0) or 0)

    try:
        candidate = int(comparison.get("estimated_dimension_after", old_dim_score))
    except (TypeError, ValueError):
        candidate = old_dim_score

    # The projection never shows the dimension going down.
    new_dim_score = candidate if candidate > old_dim_score else old_dim_score
    dimension_delta = new_dim_score - old_dim_score

    projected_overall = original_overall
    projected_overall_delta = 0
    if dimension_delta > 0:
        # Swap in the new target score; every other dimension keeps its baseline.
        projected_scores = {**original_dim_scores, dim: new_dim_score}
        n_dims = len(projected_scores) or 1
        dim_avg_projection = round(sum(projected_scores.values()) / n_dims)
        # At least one point of lift so a single-dimension gain stays visible
        # even when the plain average is held down by other weak dimensions.
        proportional_lift = max(1, round(dimension_delta / n_dims))
        lifted_overall = min(100, original_overall + proportional_lift)
        projected_overall = max(dim_avg_projection, original_overall, lifted_overall)
        projected_overall_delta = max(0, projected_overall - original_overall)

    return {
        "target_dimension": dim,
        "old_dimension_score": old_dim_score,
        "new_dimension_score": new_dim_score,
        "dimension_delta": dimension_delta,
        "original_overall_score": original_overall,
        "projected_overall_score": projected_overall,
        "projected_overall_delta": projected_overall_delta,
        "original_scorecard_unchanged": True,
        "projection_method": "replace_target_dimension_only",
    }
def apply_retry_to_scorecard(
    session: dict,
    drill: dict,
    comparison: dict,
) -> dict[str, Any] | None:
    """Apply a retry improvement to the stored scorecard.

    When the comparison raises the target dimension above its stored score,
    that dimension's score, label, reason and quote are replaced and the
    overall score is moved by the per-dimension share of the gain (at least
    one point). The change is applied as a delta so any offset already baked
    into the displayed overall is preserved. The retry answer also replaces
    the recorded weakest answer, and retry bookkeeping fields are set.

    Args:
        session: Session dict; ``latest_scorecard`` is updated in place.
        drill: Drill record (reads ``dimension``, ``dimension_score_before``
            and ``retry_answer``).
        comparison: The ``comparison`` object of a retry result.

    Returns:
        The scorecard (unchanged when the verdict is ``needs_more_work`` with
        no gain over the drill baseline), or ``None`` when there is no
        scorecard, no dimension, or the comparison numbers are not integers.
    """
    scorecard = session.get("latest_scorecard")
    if not scorecard or not isinstance(scorecard, dict):
        return None
    dim = str(drill.get("dimension", "")).strip()
    if not dim:
        return None
    try:
        after_dim = int(comparison.get("estimated_dimension_after", 0))
        # Parsed only as validation: a malformed lift rejects the comparison.
        int(comparison.get("estimated_overall_lift", 0))
    except (TypeError, ValueError):
        return None
    after_dim = min(100, after_dim)

    def _as_int(value: Any) -> int:
        # Stored scores may be None or non-numeric after a partial write.
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    verdict = str(comparison.get("verdict", "")).lower()
    if verdict == "needs_more_work" and after_dim <= _as_int(drill.get("dimension_score_before")):
        return scorecard

    scores = scorecard.get("scores")
    if not isinstance(scores, dict):
        scores = {}
    dim_data = scores.get(dim)

    old_overall = _as_int(scorecard.get("overall", 0))
    # Only well-formed entries count as dimensions.
    n_dims = sum(1 for entry in scores.values() if isinstance(entry, dict)) or 1

    new_overall = old_overall
    actual_lift = 0
    current_score = _as_int(dim_data.get("score")) if isinstance(dim_data, dict) else 0
    if isinstance(dim_data, dict) and after_dim > current_score:
        dim_data = dict(dim_data)
        dim_data["score"] = after_dim
        dim_data["label"] = _score_label(after_dim)
        improved = str(comparison.get("what_improved", "")).strip()
        if improved:
            dim_data["reason"] = improved[:280]
        retry_text = str(drill.get("retry_answer", "")).strip()
        if retry_text:
            dim_data["quote"] = retry_text[:200]
        scores[dim] = dim_data
        scorecard["scores"] = scores

        # Only this dimension changed, so the change in the dimension sum is
        # exactly its gain. Floor the shared delta at one point so a real gain
        # never rounds to "overall didn't change".
        delta = max(1, round((after_dim - current_score) / n_dims))
        new_overall = max(0, min(100, old_overall + delta))
        scorecard["overall"] = new_overall
        scorecard["overall_label"] = _score_label(new_overall)
        # Real lift the UI can trust (matches the overall it now displays).
        actual_lift = new_overall - old_overall

    se = scorecard.get("score_explanation")
    se = dict(se) if isinstance(se, dict) else {}
    esif = se.get("estimated_score_if_fixed")
    esif = dict(esif) if isinstance(esif, dict) else {}
    esif["current_overall"] = new_overall
    previous_estimate = _as_int(esif.get("estimated_new_overall"))
    # Cap the estimate at 95, but never below the current overall.
    esif["estimated_new_overall"] = max(
        new_overall, min(95, max(new_overall + 4, previous_estimate))
    )
    se["estimated_score_if_fixed"] = esif

    atr = se.get("answer_to_retry")
    atr = dict(atr) if isinstance(atr, dict) else {}
    if drill.get("retry_answer"):
        atr["original_answer"] = str(drill["retry_answer"])[:300]
    se["answer_to_retry"] = atr
    scorecard["score_explanation"] = se

    if drill.get("retry_answer"):
        scorecard["weakest_answer"] = str(drill["retry_answer"])[:400]
    scorecard["retry_applied"] = True
    scorecard["retry_dimension"] = dim
    scorecard["retry_overall_lift"] = actual_lift
    session["latest_scorecard"] = scorecard
    return scorecard
def evaluate_retry_answer(
    session: dict,
    retry_id: str,
    retry_answer: str,
    input_mode: str = "text",
    voice_turn_id: str = "",
) -> dict[str, Any]:
    """Score a retry answer and record the outcome on its drill.

    The answer, input mode and (if given) voice turn id are stored on the
    drill. The comparison comes from the model when available and from the
    local heuristic otherwise; it is saved as ``drill["result"]`` and
    accompanied by a projection computed from it.

    Returns:
        The response payload for the client, or ``{"error": ...}`` when the
        drill id is unknown or the answer is empty.
    """
    session_id = str(session.get("session_id", ""))

    drill = (session.get("retry_drills") or {}).get(retry_id)
    if not drill:
        return {"error": "Retry drill not found. Start a new retry from the scorecard."}

    answer = str(retry_answer or "").strip()
    if not answer:
        return {"error": "Retry answer cannot be empty."}

    drill["retry_answer"] = answer
    drill["input_mode"] = input_mode or "text"
    if voice_turn_id:
        drill["voice_turn_id"] = voice_turn_id

    comparison_result = call_nemotron_retry_comparison(session, drill, answer)
    model_ok = comparison_result is not None
    if not model_ok:
        # The model gave nothing usable; use the local heuristic instead.
        comparison_result = build_local_retry_fallback(
            drill.get("original_answer", ""),
            answer,
            drill.get("dimension", "objection_handling"),
            drill.get("dimension_score_before", 30),
        )
    retry_score_source = "nemotron" if model_ok else "local_fallback"
    fallback_reason = "" if model_ok else "Nemotron unavailable — local heuristic used"

    drill["result"] = comparison_result
    comp = comparison_result.get("comparison", {})
    projection = compute_retry_projection(session, drill, comp)

    response: dict[str, Any] = {"session_id": session_id, "retry_id": retry_id}
    for field in (
        "dimension",
        "attack_tag",
        "original_question",
        "retry_question",
        "original_answer",
    ):
        response[field] = drill.get(field, "")
    response["retry_answer"] = answer
    response["comparison"] = comp
    response["projection"] = projection
    response["next_practice_prompt"] = comparison_result.get("next_practice_prompt", "")
    response["scorecard_unchanged"] = True
    response["retry_score_source"] = retry_score_source
    response["model_ok"] = model_ok
    response["fallback_reason"] = fallback_reason
    return response