Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import re | |
| from typing import Any | |
| from .fixtures import TaskFixture | |
| from .models import ScoreCriterion, TaskScorecard | |
# Matches any run of one or more whitespace characters.
_SPACE_RE = re.compile(r"\s+")


def normalize_text(text: str) -> str:
    """Return *text* in the canonical form used for phrase matching.

    The text is lower-cased, every hyphen is replaced by a space, each run
    of whitespace is collapsed to a single space, and leading/trailing
    whitespace is stripped.
    """
    dehyphenated = text.lower().replace("-", " ")
    return _SPACE_RE.sub(" ", dehyphenated).strip()
def contains_any(text: str, phrases: tuple[str, ...] | list[str]) -> bool:
    """Return True if *text* contains at least one of *phrases*.

    Both the text and each phrase are passed through ``normalize_text``
    before the substring test, so matching ignores case, hyphen-vs-space
    differences, and whitespace runs.

    Phrases that normalize to the empty string are ignored: an empty string
    is a substring of every string, so it would otherwise make every text
    match (and, for a forbidden-phrase list, make every reply forbidden).
    An empty *phrases* collection yields False.
    """
    normalized = normalize_text(text)
    candidates = (normalize_text(phrase) for phrase in phrases)
    # `candidate and ...` skips empty needles before the substring test.
    return any(candidate and candidate in normalized for candidate in candidates)
def contains_all_groups(text: str, groups: list[tuple[str, ...]]) -> bool:
    """Return True if *text* matches at least one phrase from every group.

    Each group is checked with ``contains_any``; the first group with no
    match ends the check. An empty *groups* list yields True.
    """
    for group in groups:
        if not contains_any(text, group):
            return False
    return True
def _criterion(criterion_id: str, label: str, weight: float, earned: bool) -> ScoreCriterion:
    """Build one rubric line item.

    An earned criterion contributes its weight rounded to six decimal
    places; an unearned criterion contributes 0.0. The unrounded weight is
    stored on the criterion as well.
    """
    contribution = round(weight, 6) if earned else 0.0
    return ScoreCriterion(
        criterion_id=criterion_id,
        label=label,
        weight=weight,
        earned=earned,
        contribution=contribution,
    )
def _reply_messages(state: Any) -> list[str]:
    """Return the ``"message"`` value of every entry in ``state.reply_history``, in order."""
    history = state.reply_history
    return [reply["message"] for reply in history]
def _has_reply_matching(state: Any, matcher) -> bool:
    """Return True if *matcher* returns a truthy value for at least one reply message.

    *matcher* is called with each message string from ``_reply_messages``
    in order, and the scan stops at the first match. With no replies the
    result is False.
    """
    for message in _reply_messages(state):
        if matcher(message):
            return True
    return False
def build_scorecard(fixture: TaskFixture, state: Any) -> TaskScorecard:
    """Score *state* against the rubric for the task named by ``fixture.task_id``.

    The task-specific scorer produces the list of criteria. The overall
    score is the sum of their contributions, rounded to six decimal places
    and clamped to the range [0.0, 1.0].

    Raises:
        ValueError: if ``fixture.task_id`` is not one of the supported tasks.
    """
    scorers = {
        "password_reset_guidance": _score_password_reset,
        "duplicate_charge_refund": _score_duplicate_charge,
        "enterprise_data_loss_escalation": _score_enterprise_escalation,
    }
    scorer = scorers.get(fixture.task_id)
    if scorer is None:
        raise ValueError(f"Unsupported task_id: {fixture.task_id}")

    criteria = scorer(fixture, state)
    raw_total = sum(item.contribution for item in criteria)
    total_score = round(raw_total, 6)
    clamped_score = min(max(total_score, 0.0), 1.0)
    return TaskScorecard(task_id=fixture.task_id, score=clamped_score, criteria=criteria)
def _score_password_reset(fixture: TaskFixture, state: Any) -> list[ScoreCriterion]:
    """Score the ``password_reset_guidance`` task.

    Returns five criteria, in this order:

    1. ``searched_kb`` - the fixture's relevant KB article id is among the
       articles seen.
    2. ``reply_has_reset_url`` - some reply contains the first entry of the
       ``"reset_url"`` keyword group as a raw (case-sensitive) substring.
    3. ``reply_mentions_spam_folder`` - some reply matches a phrase from the
       ``"spam_folder"`` keyword group.
    4. ``resolved_correctly`` - the ticket is resolved with the expected
       resolution code.
    5. ``efficient_completion`` - criterion 4 holds and the step count is
       within ``fixture.efficiency_bonus_max_steps`` (treated as 0 when
       unset or falsy).
    """
    weights = fixture.rubric_weights
    replies = _reply_messages(state)

    def award(criterion_id: str, label: str, earned: bool) -> ScoreCriterion:
        # Each rubric weight is stored under its criterion id.
        return _criterion(criterion_id, label, weights[criterion_id], earned)

    def mentions_spam_folder(reply: str) -> bool:
        return contains_any(reply, fixture.reply_keyword_groups["spam_folder"])

    kb_article_seen = fixture.relevant_kb_article_id in state.kb_articles_seen
    # The keyword group is looked up per reply, so it is never touched when
    # there are no replies at all.
    reset_url_present = any(
        fixture.reply_keyword_groups["reset_url"][0] in reply for reply in replies
    )
    spam_folder_mentioned = _has_reply_matching(state, mentions_spam_folder)
    resolved_correctly = (
        state.ticket_status == "resolved"
        and state.resolution_code == fixture.expected_resolution_code
    )
    completed_efficiently = (
        resolved_correctly
        and state.steps_taken <= (fixture.efficiency_bonus_max_steps or 0)
    )

    return [
        award("searched_kb", "Relevant KB article retrieved", kb_article_seen),
        award(
            "reply_has_reset_url",
            "Reply includes the password reset URL",
            reset_url_present,
        ),
        award(
            "reply_mentions_spam_folder",
            "Reply mentions checking spam or junk",
            spam_folder_mentioned,
        ),
        award(
            "resolved_correctly",
            "Ticket resolved with the correct resolution code",
            resolved_correctly,
        ),
        award(
            "efficient_completion",
            "Episode completed efficiently",
            completed_efficiently,
        ),
    ]
def _score_duplicate_charge(fixture: TaskFixture, state: Any) -> list[ScoreCriterion]:
    """Score the ``duplicate_charge_refund`` task.

    Returns six criteria, in this order:

    1. ``lookup_account`` - ``state.lookup_performed``.
    2. ``searched_kb`` - the fixture's relevant KB article id is among the
       articles seen.
    3. ``correct_refund`` - a refund record exists whose amount and reason
       code equal the fixture's expected values.
    4. ``reply_mentions_timeline`` - some reply matches the ``"timeline"``
       keyword group.
    5. ``reply_acknowledges_and_apologizes`` - a single reply matches all of
       the ``"duplicate_ack"``, ``"regret"`` and ``"refund_confirmed"``
       keyword groups.
    6. ``resolved_without_escalation`` - the ticket is resolved with the
       expected resolution code.
    """
    weights = fixture.rubric_weights

    def award(criterion_id: str, label: str, earned: bool) -> ScoreCriterion:
        # Each rubric weight is stored under its criterion id.
        return _criterion(criterion_id, label, weights[criterion_id], earned)

    def mentions_timeline(reply: str) -> bool:
        return contains_any(reply, fixture.reply_keyword_groups["timeline"])

    def acknowledges_apologizes_and_confirms(reply: str) -> bool:
        # All three groups must be satisfied by the same reply.
        required_groups = [
            fixture.reply_keyword_groups["duplicate_ack"],
            fixture.reply_keyword_groups["regret"],
            fixture.reply_keyword_groups["refund_confirmed"],
        ]
        return contains_all_groups(reply, required_groups)

    kb_article_seen = fixture.relevant_kb_article_id in state.kb_articles_seen
    refund_is_correct = (
        state.refund_record is not None
        and state.refund_record["amount_cents"] == fixture.expected_refund_amount_cents
        and state.refund_record["reason_code"] == fixture.refund_reason_code
    )
    timeline_mentioned = _has_reply_matching(state, mentions_timeline)
    acknowledged = _has_reply_matching(state, acknowledges_apologizes_and_confirms)
    resolved_correctly = (
        state.ticket_status == "resolved"
        and state.resolution_code == fixture.expected_resolution_code
    )

    return [
        award("lookup_account", "Account lookup completed", state.lookup_performed),
        award(
            "searched_kb",
            "Duplicate charge policy article retrieved",
            kb_article_seen,
        ),
        award(
            "correct_refund",
            "Correct full duplicate-charge refund issued",
            refund_is_correct,
        ),
        award(
            "reply_mentions_timeline",
            "Reply mentions the refund timeline",
            timeline_mentioned,
        ),
        award(
            "reply_acknowledges_and_apologizes",
            "Reply acknowledges the duplicate charge, apologizes, and confirms the refund",
            acknowledged,
        ),
        award(
            "resolved_without_escalation",
            "Ticket resolved instead of escalated",
            resolved_correctly,
        ),
    ]
def _score_enterprise_escalation(fixture: TaskFixture, state: Any) -> list[ScoreCriterion]:
    """Score the ``enterprise_data_loss_escalation`` task.

    Returns six criteria, in this order:

    1. ``lookup_account`` - ``state.lookup_performed``.
    2. ``no_refund_or_policy_action`` - the episode is done, no refund was
       attempted, and no resolution code is set.
    3. ``reply_sent_before_escalation`` - at least one reply precedes the
       escalation step.
    4. ``careful_reply`` - at least one reply that precedes the escalation
       step matches both the ``"urgency"`` and ``"escalation"`` keyword
       groups and contains none of the fixture's forbidden phrases.
    5. ``correct_escalation`` - an escalation exists with the expected queue
       and priority.
    6. ``not_resolved`` - the episode is done and no resolution code is set.

    When no escalation has been recorded, every reply counts as preceding
    the escalation.
    """
    weights = fixture.rubric_weights
    escalation_step = state.escalation["step_index"] if state.escalation else None

    def sent_before_escalation(reply: Any) -> bool:
        # Single definition of "before escalation", shared by criteria 3 and 4.
        return escalation_step is None or reply["step_index"] < escalation_step

    def careful_reply(message: str) -> bool:
        mentions_required_topics = contains_all_groups(
            message,
            [
                fixture.reply_keyword_groups["urgency"],
                fixture.reply_keyword_groups["escalation"],
            ],
        )
        return mentions_required_topics and not contains_any(
            message, fixture.forbidden_reply_phrases
        )

    # any() is already False for an empty reply history, so no separate
    # emptiness check is needed.
    reply_before_escalation = any(
        sent_before_escalation(reply) for reply in state.reply_history
    )
    careful_reply_before_escalation = any(
        sent_before_escalation(reply) and careful_reply(reply["message"])
        for reply in state.reply_history
    )

    return [
        _criterion(
            "lookup_account",
            "Account lookup completed",
            weights["lookup_account"],
            state.lookup_performed,
        ),
        _criterion(
            "no_refund_or_policy_action",
            "No refund or resolution policy action was applied",
            weights["no_refund_or_policy_action"],
            state.done and not state.refund_attempted and state.resolution_code is None,
        ),
        _criterion(
            "reply_sent_before_escalation",
            "A reply was sent before escalation",
            weights["reply_sent_before_escalation"],
            reply_before_escalation,
        ),
        _criterion(
            "careful_reply",
            "Reply acknowledges urgency, mentions escalation, and avoids liability",
            weights["careful_reply"],
            careful_reply_before_escalation,
        ),
        _criterion(
            "correct_escalation",
            "Escalation uses the correct queue and priority",
            weights["correct_escalation"],
            state.escalation is not None
            and state.escalation["queue"] == fixture.expected_escalation_queue
            and state.escalation["priority"] == fixture.expected_escalation_priority,
        ),
        _criterion(
            "not_resolved",
            "Ticket was not resolved",
            weights["not_resolved"],
            state.done and state.resolution_code is None,
        ),
    ]