Dar3devil's picture
Initial customer support OpenEnv upload
2b73c16 verified
Raw
History Blame Contribute Delete
8.27 kB
from __future__ import annotations
import re
from typing import Any
from .fixtures import TaskFixture
from .models import ScoreCriterion, TaskScorecard
# Matches any run of one or more whitespace characters.
_SPACE_RE = re.compile(r"\s+")


def normalize_text(text: str) -> str:
    """Return *text* in a canonical form suitable for loose phrase matching.

    The text is lowercased, hyphens are replaced by spaces, every run of
    whitespace is collapsed to a single space, and leading/trailing
    whitespace is removed.
    """
    dehyphenated = text.lower().replace("-", " ")
    return _SPACE_RE.sub(" ", dehyphenated).strip()
def contains_any(text: str, phrases: tuple[str, ...] | list[str]) -> bool:
    """Return True if at least one of *phrases* occurs in *text*.

    Both the text and each phrase are passed through ``normalize_text``
    before a plain substring test, so matching ignores case, hyphenation
    and whitespace differences.

    Note: a phrase that normalizes to the empty string matches any text,
    because the empty string is a substring of every string.
    """
    haystack = normalize_text(text)
    for phrase in phrases:
        if normalize_text(phrase) in haystack:
            return True
    return False
def contains_all_groups(text: str, groups: list[tuple[str, ...]]) -> bool:
    """Return True if *text* matches at least one phrase from every group.

    Each group is checked with ``contains_any``; evaluation stops at the
    first group with no match. An empty *groups* list yields True.
    """
    for group in groups:
        if not contains_any(text, group):
            return False
    return True
def _criterion(criterion_id: str, label: str, weight: float, earned: bool) -> ScoreCriterion:
    """Build one ``ScoreCriterion`` entry for a scorecard.

    The contribution is the full *weight* when *earned* is truthy and 0.0
    otherwise, rounded to six decimal places.
    """
    awarded = weight if earned else 0.0
    return ScoreCriterion(
        criterion_id=criterion_id,
        label=label,
        weight=weight,
        earned=earned,
        contribution=round(awarded, 6),
    )
def _reply_messages(state: Any) -> list[str]:
    """Return the ``"message"`` value of every entry in ``state.reply_history``.

    The messages are returned in history order.
    """
    messages: list[str] = []
    for entry in state.reply_history:
        messages.append(entry["message"])
    return messages
def _has_reply_matching(state: Any, matcher) -> bool:
    """Return True if *matcher* accepts at least one reply message in *state*.

    *matcher* is called with each message string in history order and the
    scan stops at the first truthy result. With no replies the result is
    False and *matcher* is never called.
    """
    for message in _reply_messages(state):
        if matcher(message):
            return True
    return False
def build_scorecard(fixture: TaskFixture, state: Any) -> TaskScorecard:
    """Grade *state* against *fixture* and return the resulting scorecard.

    The task-specific scorer is selected by ``fixture.task_id``. The final
    score is the sum of all criterion contributions, rounded to six decimal
    places and clamped to the range [0.0, 1.0].

    Raises:
        ValueError: if ``fixture.task_id`` is not one of the supported tasks.
    """
    scorers = (
        ("password_reset_guidance", _score_password_reset),
        ("duplicate_charge_refund", _score_duplicate_charge),
        ("enterprise_data_loss_escalation", _score_enterprise_escalation),
    )
    for task_name, scorer in scorers:
        if fixture.task_id == task_name:
            criteria = scorer(fixture, state)
            break
    else:
        # No scorer matched the fixture's task id.
        raise ValueError(f"Unsupported task_id: {fixture.task_id}")

    raw_total = round(sum(item.contribution for item in criteria), 6)
    bounded_total = min(max(raw_total, 0.0), 1.0)
    return TaskScorecard(task_id=fixture.task_id, score=bounded_total, criteria=criteria)
def _score_password_reset(fixture: TaskFixture, state: Any) -> list[ScoreCriterion]:
    """Score the password-reset task; returns one criterion per rubric item.

    Criteria, in order: KB article seen, a reply containing the reset URL,
    a reply mentioning the spam/junk folder, ticket resolved with the
    expected resolution code, and resolution within the step budget.
    Each criterion's weight is read from ``fixture.rubric_weights`` under
    the criterion id.
    """
    weights = fixture.rubric_weights
    replies = _reply_messages(state)

    def kb_article_seen() -> bool:
        return fixture.relevant_kb_article_id in state.kb_articles_seen

    def reply_has_reset_url() -> bool:
        # Raw, case-sensitive substring test against the first entry of the
        # "reset_url" group only; the lookup happens per reply, so it is
        # skipped entirely when there are no replies.
        return any(fixture.reply_keyword_groups["reset_url"][0] in reply for reply in replies)

    def reply_mentions_spam_folder() -> bool:
        def mentions_spam(text: str) -> bool:
            return contains_any(text, fixture.reply_keyword_groups["spam_folder"])

        return _has_reply_matching(state, mentions_spam)

    def resolved_correctly() -> bool:
        return state.ticket_status == "resolved" and state.resolution_code == fixture.expected_resolution_code

    def completed_efficiently() -> bool:
        # A missing (None) step budget is treated as 0.
        return resolved_correctly() and state.steps_taken <= (fixture.efficiency_bonus_max_steps or 0)

    # (criterion id / weight key, label, lazy check) in rubric order.
    rubric = (
        ("searched_kb", "Relevant KB article retrieved", kb_article_seen),
        ("reply_has_reset_url", "Reply includes the password reset URL", reply_has_reset_url),
        ("reply_mentions_spam_folder", "Reply mentions checking spam or junk", reply_mentions_spam_folder),
        ("resolved_correctly", "Ticket resolved with the correct resolution code", resolved_correctly),
        ("efficient_completion", "Episode completed efficiently", completed_efficiently),
    )
    return [_criterion(criterion_id, label, weights[criterion_id], check()) for criterion_id, label, check in rubric]
def _score_duplicate_charge(fixture: TaskFixture, state: Any) -> list[ScoreCriterion]:
    """Score the duplicate-charge refund task; returns one criterion per rubric item.

    Criteria, in order: account lookup performed, KB article seen, refund
    issued with the expected amount and reason code, a reply mentioning the
    refund timeline, a single reply that acknowledges the duplicate charge,
    expresses regret and confirms the refund, and ticket resolved with the
    expected resolution code. Each criterion's weight is read from
    ``fixture.rubric_weights`` under the criterion id.
    """
    weights = fixture.rubric_weights

    def acknowledges_and_apologizes(reply: str) -> bool:
        # All three keyword groups must be matched by the same reply.
        keyword_groups = fixture.reply_keyword_groups
        required = [
            keyword_groups["duplicate_ack"],
            keyword_groups["regret"],
            keyword_groups["refund_confirmed"],
        ]
        return contains_all_groups(reply, required)

    def mentions_timeline(text: str) -> bool:
        return contains_any(text, fixture.reply_keyword_groups["timeline"])

    def lookup_done() -> Any:
        return state.lookup_performed

    def kb_article_seen() -> bool:
        return fixture.relevant_kb_article_id in state.kb_articles_seen

    def refund_is_correct() -> bool:
        record = state.refund_record
        if record is None:
            return False
        return (
            record["amount_cents"] == fixture.expected_refund_amount_cents
            and record["reason_code"] == fixture.refund_reason_code
        )

    def timeline_mentioned() -> bool:
        return _has_reply_matching(state, mentions_timeline)

    def acknowledged_and_apologized() -> bool:
        return _has_reply_matching(state, acknowledges_and_apologizes)

    def resolved_correctly() -> bool:
        return state.ticket_status == "resolved" and state.resolution_code == fixture.expected_resolution_code

    # (criterion id / weight key, label, lazy check) in rubric order.
    rubric = (
        ("lookup_account", "Account lookup completed", lookup_done),
        ("searched_kb", "Duplicate charge policy article retrieved", kb_article_seen),
        ("correct_refund", "Correct full duplicate-charge refund issued", refund_is_correct),
        ("reply_mentions_timeline", "Reply mentions the refund timeline", timeline_mentioned),
        (
            "reply_acknowledges_and_apologizes",
            "Reply acknowledges the duplicate charge, apologizes, and confirms the refund",
            acknowledged_and_apologized,
        ),
        ("resolved_without_escalation", "Ticket resolved instead of escalated", resolved_correctly),
    )
    return [_criterion(criterion_id, label, weights[criterion_id], check()) for criterion_id, label, check in rubric]
def _score_enterprise_escalation(fixture: TaskFixture, state: Any) -> list[ScoreCriterion]:
    """Score the enterprise data-loss escalation task; one criterion per rubric item.

    Criteria, in order: account lookup performed, episode done with no
    refund attempt and no resolution code, a reply sent before escalation,
    a pre-escalation reply that matches the urgency and escalation keyword
    groups while avoiding every forbidden phrase, escalation to the expected
    queue and priority, and episode done without a resolution code. Each
    criterion's weight is read from ``fixture.rubric_weights`` under the
    criterion id.

    Note: when no escalation is recorded, every reply counts as having been
    sent "before escalation".
    NOTE(review): the escalation step is taken only when ``state.escalation``
    is truthy, while the queue/priority check uses ``is not None`` — confirm
    whether an empty escalation record can occur.
    """
    weights = fixture.rubric_weights

    if state.escalation:
        escalation_step = state.escalation["step_index"]
    else:
        escalation_step = None

    def precedes_escalation(entry: Any) -> bool:
        return escalation_step is None or entry["step_index"] < escalation_step

    def careful_reply(reply: str) -> bool:
        required_groups = [
            fixture.reply_keyword_groups["urgency"],
            fixture.reply_keyword_groups["escalation"],
        ]
        if not contains_all_groups(reply, required_groups):
            return False
        # Forbidden phrases are only checked once the required groups match.
        return not contains_any(reply, fixture.forbidden_reply_phrases)

    # Computed up front, before any criterion is evaluated.
    reply_before_escalation = any(precedes_escalation(entry) for entry in state.reply_history)

    def lookup_done() -> Any:
        return state.lookup_performed

    def no_refund_or_policy_action() -> Any:
        return state.done and not state.refund_attempted and state.resolution_code is None

    def replied_before_escalation() -> bool:
        return reply_before_escalation and bool(state.reply_history)

    def careful_reply_before_escalation() -> bool:
        return any(
            precedes_escalation(entry) and careful_reply(entry["message"])
            for entry in state.reply_history
        )

    def escalated_correctly() -> bool:
        record = state.escalation
        if record is None:
            return False
        return (
            record["queue"] == fixture.expected_escalation_queue
            and record["priority"] == fixture.expected_escalation_priority
        )

    def left_unresolved() -> Any:
        return state.done and state.resolution_code is None

    # (criterion id / weight key, label, lazy check) in rubric order.
    rubric = (
        ("lookup_account", "Account lookup completed", lookup_done),
        (
            "no_refund_or_policy_action",
            "No refund or resolution policy action was applied",
            no_refund_or_policy_action,
        ),
        ("reply_sent_before_escalation", "A reply was sent before escalation", replied_before_escalation),
        (
            "careful_reply",
            "Reply acknowledges urgency, mentions escalation, and avoids liability",
            careful_reply_before_escalation,
        ),
        ("correct_escalation", "Escalation uses the correct queue and priority", escalated_correctly),
        ("not_resolved", "Ticket was not resolved", left_unresolved),
    )
    return [_criterion(criterion_id, label, weights[criterion_id], check()) for criterion_id, label, check in rubric]