"""Rubric scoring for support-ticket task episodes.

Each supported task has a scorer that inspects the final episode state and
returns a list of ``ScoreCriterion`` entries; ``build_scorecard`` selects the
scorer by ``fixture.task_id`` and sums the contributions into a
``TaskScorecard``.
"""

from __future__ import annotations

import re
from collections.abc import Callable
from typing import Any

from .fixtures import TaskFixture
from .models import ScoreCriterion, TaskScorecard

_SPACE_RE = re.compile(r"\s+")


def normalize_text(text: str) -> str:
    """Return *text* lower-cased, hyphens turned into spaces, whitespace collapsed.

    Runs of whitespace become a single space and leading/trailing whitespace
    is removed, so "Follow-Up\\n now" and "follow up now" compare equal.
    """
    lowered = text.lower().replace("-", " ")
    return _SPACE_RE.sub(" ", lowered).strip()


def contains_any(text: str, phrases: tuple[str, ...] | list[str]) -> bool:
    """Return True if any phrase occurs in *text* after normalization.

    Both sides are passed through ``normalize_text`` before the substring
    test. Phrases that normalize to the empty string are skipped: an empty
    string is a substring of every text, so it would otherwise match
    unconditionally.
    """
    normalized = normalize_text(text)
    candidates = (normalize_text(phrase) for phrase in phrases)
    return any(candidate in normalized for candidate in candidates if candidate)


def contains_all_groups(text: str, groups: list[tuple[str, ...]]) -> bool:
    """Return True if every group has at least one phrase present in *text*."""
    return all(contains_any(text, group) for group in groups)


def _criterion(criterion_id: str, label: str, weight: float, earned: bool) -> ScoreCriterion:
    """Build one ``ScoreCriterion``; the contribution is *weight* if earned, else 0.

    *earned* is coerced to a real ``bool`` because callers pass expressions
    built from state attributes, which may yield other truthy/falsy values.
    """
    earned = bool(earned)
    contribution = round(weight if earned else 0.0, 6)
    return ScoreCriterion(
        criterion_id=criterion_id,
        label=label,
        weight=weight,
        earned=earned,
        contribution=contribution,
    )


def _reply_messages(state: Any) -> list[str]:
    """Return the ``"message"`` value of every entry in ``state.reply_history``."""
    return [entry["message"] for entry in state.reply_history]


def _has_reply_matching(state: Any, matcher: Callable[[str], bool]) -> bool:
    """Return True if *matcher* accepts at least one reply message."""
    return any(matcher(message) for message in _reply_messages(state))


def build_scorecard(fixture: TaskFixture, state: Any) -> TaskScorecard:
    """Score *state* against the rubric of the task named by ``fixture.task_id``.

    Args:
        fixture: Task definition; supplies the task id, rubric weights and
            expected values read by the per-task scorers.
        state: Episode state object read by the per-task scorers.

    Returns:
        A ``TaskScorecard`` whose score is the sum of the criterion
        contributions, rounded to 6 decimals and clamped to ``[0.0, 1.0]``.

    Raises:
        ValueError: If ``fixture.task_id`` has no scorer.
    """
    scorers = {
        "password_reset_guidance": _score_password_reset,
        "duplicate_charge_refund": _score_duplicate_charge,
        "enterprise_data_loss_escalation": _score_enterprise_escalation,
    }
    scorer = scorers.get(fixture.task_id)
    if scorer is None:
        raise ValueError(f"Unsupported task_id: {fixture.task_id}")

    criteria = scorer(fixture, state)
    total_score = round(sum(item.contribution for item in criteria), 6)
    return TaskScorecard(
        task_id=fixture.task_id,
        score=min(max(total_score, 0.0), 1.0),
        criteria=criteria,
    )


def _score_password_reset(fixture: TaskFixture, state: Any) -> list[ScoreCriterion]:
    """Return the rubric criteria for the ``password_reset_guidance`` task."""
    weights = fixture.rubric_weights
    replies = _reply_messages(state)
    resolved_correctly = (
        state.ticket_status == "resolved"
        and state.resolution_code == fixture.expected_resolution_code
    )

    return [
        _criterion(
            "searched_kb",
            "Relevant KB article retrieved",
            weights["searched_kb"],
            fixture.relevant_kb_article_id in state.kb_articles_seen,
        ),
        _criterion(
            "reply_has_reset_url",
            "Reply includes the password reset URL",
            weights["reply_has_reset_url"],
            # Raw, case-sensitive substring match on the first entry of the
            # "reset_url" group only; no text normalization is applied here.
            any(fixture.reply_keyword_groups["reset_url"][0] in reply for reply in replies),
        ),
        _criterion(
            "reply_mentions_spam_folder",
            "Reply mentions checking spam or junk",
            weights["reply_mentions_spam_folder"],
            _has_reply_matching(
                state,
                lambda text: contains_any(text, fixture.reply_keyword_groups["spam_folder"]),
            ),
        ),
        _criterion(
            "resolved_correctly",
            "Ticket resolved with the correct resolution code",
            weights["resolved_correctly"],
            resolved_correctly,
        ),
        _criterion(
            "efficient_completion",
            "Episode completed efficiently",
            weights["efficient_completion"],
            # A falsy efficiency_bonus_max_steps yields a step budget of 0.
            resolved_correctly
            and state.steps_taken <= (fixture.efficiency_bonus_max_steps or 0),
        ),
    ]


def _score_duplicate_charge(fixture: TaskFixture, state: Any) -> list[ScoreCriterion]:
    """Return the rubric criteria for the ``duplicate_charge_refund`` task."""
    weights = fixture.rubric_weights

    def acknowledges_and_apologizes(reply: str) -> bool:
        # A single reply must hit all three keyword groups.
        return contains_all_groups(
            reply,
            [
                fixture.reply_keyword_groups["duplicate_ack"],
                fixture.reply_keyword_groups["regret"],
                fixture.reply_keyword_groups["refund_confirmed"],
            ],
        )

    return [
        _criterion(
            "lookup_account",
            "Account lookup completed",
            weights["lookup_account"],
            state.lookup_performed,
        ),
        _criterion(
            "searched_kb",
            "Duplicate charge policy article retrieved",
            weights["searched_kb"],
            fixture.relevant_kb_article_id in state.kb_articles_seen,
        ),
        _criterion(
            "correct_refund",
            "Correct full duplicate-charge refund issued",
            weights["correct_refund"],
            state.refund_record is not None
            and state.refund_record["amount_cents"] == fixture.expected_refund_amount_cents
            and state.refund_record["reason_code"] == fixture.refund_reason_code,
        ),
        _criterion(
            "reply_mentions_timeline",
            "Reply mentions the refund timeline",
            weights["reply_mentions_timeline"],
            _has_reply_matching(
                state,
                lambda text: contains_any(text, fixture.reply_keyword_groups["timeline"]),
            ),
        ),
        _criterion(
            "reply_acknowledges_and_apologizes",
            "Reply acknowledges the duplicate charge, apologizes, and confirms the refund",
            weights["reply_acknowledges_and_apologizes"],
            _has_reply_matching(state, acknowledges_and_apologizes),
        ),
        _criterion(
            "resolved_without_escalation",
            "Ticket resolved instead of escalated",
            weights["resolved_without_escalation"],
            state.ticket_status == "resolved"
            and state.resolution_code == fixture.expected_resolution_code,
        ),
    ]


def _score_enterprise_escalation(fixture: TaskFixture, state: Any) -> list[ScoreCriterion]:
    """Return the rubric criteria for the ``enterprise_data_loss_escalation`` task."""
    weights = fixture.rubric_weights
    escalation_step = state.escalation["step_index"] if state.escalation else None

    def sent_before_escalation(reply: Any) -> bool:
        # With no escalation recorded, every reply counts as preceding it.
        return escalation_step is None or reply["step_index"] < escalation_step

    def careful_reply(reply: str) -> bool:
        # Must mention both urgency and escalation and contain no forbidden phrase.
        return contains_all_groups(
            reply,
            [
                fixture.reply_keyword_groups["urgency"],
                fixture.reply_keyword_groups["escalation"],
            ],
        ) and not contains_any(reply, fixture.forbidden_reply_phrases)

    # any() over an empty history is already False, so no separate emptiness
    # check is needed.
    reply_before_escalation = any(
        sent_before_escalation(reply) for reply in state.reply_history
    )

    return [
        _criterion(
            "lookup_account",
            "Account lookup completed",
            weights["lookup_account"],
            state.lookup_performed,
        ),
        _criterion(
            "no_refund_or_policy_action",
            "No refund or resolution policy action was applied",
            weights["no_refund_or_policy_action"],
            state.done and not state.refund_attempted and state.resolution_code is None,
        ),
        _criterion(
            "reply_sent_before_escalation",
            "A reply was sent before escalation",
            weights["reply_sent_before_escalation"],
            reply_before_escalation,
        ),
        _criterion(
            "careful_reply",
            "Reply acknowledges urgency, mentions escalation, and avoids liability",
            weights["careful_reply"],
            any(
                sent_before_escalation(reply) and careful_reply(reply["message"])
                for reply in state.reply_history
            ),
        ),
        _criterion(
            "correct_escalation",
            "Escalation uses the correct queue and priority",
            weights["correct_escalation"],
            state.escalation is not None
            and state.escalation["queue"] == fixture.expected_escalation_queue
            and state.escalation["priority"] == fixture.expected_escalation_priority,
        ),
        _criterion(
            "not_resolved",
            "Ticket was not resolved",
            weights["not_resolved"],
            state.done and state.resolution_code is None,
        ),
    ]