# AIHack-ITHelpDesk / server/environment.py
# Last commit by Roopalgn (1d9d3ee): "Strengthen queue benchmark and refresh landing page"
from __future__ import annotations
import random
import uuid
from typing import Any, Optional
from openenv.core.env_server.interfaces import Environment
from models import (
HelpdeskTicketAction,
HelpdeskTicketObservation,
HelpdeskTicketRecord,
HelpdeskTicketState,
)
from server.grader import grade_action
from server.reward import (
clamp_open_unit_interval,
compute_step_adjustments,
compute_trajectory_adjustments,
)
from server.tasks import get_task_definition, load_dataset
from vocabulary import (
ISSUE_TYPE_TO_ASSIGNMENT_GROUP,
ISSUE_TYPE_TO_RESOLUTION_ACTION,
)
# Inclusive (min, max) bounds passed to ``random.randint`` in ``reset()`` when the
# caller does not request an explicit ``queue_size``.
QUEUE_SIZE_RANGE = (3, 5)
# Every lookup tool the environment knows about; task 3 exposes all of them.
BASE_AVAILABLE_TOOLS = (
    "lookup_related_ticket",
    "lookup_requester_history",
    "lookup_internal_routing_note",
    "lookup_queue_capacity_forecast",
    "lookup_queue_cluster_summary",
)
# Action types accepted per task id. ``step()`` raises ValueError for any action
# type not listed for the current task (unknown task ids fall back to submit-only).
TASK_AVAILABLE_ACTION_TYPES: dict[int, tuple[str, ...]] = {
    1: ("submit", "investigate"),
    2: ("submit", "investigate", "request_info", "defer"),
    3: ("submit", "investigate", "request_info", "defer", "open_incident"),
}
# Lookup tools offered per task id (read by ``_available_tools_for_task``); harder
# tasks unlock more of ``BASE_AVAILABLE_TOOLS``.
TASK_AVAILABLE_TOOLS: dict[int, tuple[str, ...]] = {
    1: (
        "lookup_related_ticket",
        "lookup_requester_history",
        "lookup_internal_routing_note",
    ),
    2: (
        "lookup_related_ticket",
        "lookup_requester_history",
        "lookup_internal_routing_note",
        "lookup_queue_cluster_summary",
    ),
    3: BASE_AVAILABLE_TOOLS,
}
# --- Investigation economics (see ``_compute_episode_penalty``) ---
# Free investigations granted per queued ticket; also sizes the episode's
# investigation budget in ``reset()``.
FREE_INVESTIGATIONS_PER_TICKET = 1
# Penalty charged per investigation beyond the free allowance ...
EXTRA_INVESTIGATION_COST = 0.04
# ... capped at this total for the whole episode.
MAX_EXTRA_INVESTIGATION_PENALTY = 0.25
# --- Per-action shaping magnitudes ---
# NOTE(review): most of the following are consumed by helpers outside this view;
# their exact use should be confirmed against those helpers.
USEFUL_INVESTIGATION_REWARD = 0.03
USEFUL_REQUEST_INFO_REWARD = 0.025
INCIDENT_OPEN_REWARD = 0.03
REQUEST_INFO_CONTEXT_COMPLETION_BONUS = 0.02
PREMATURE_SUBMIT_PENALTY = 0.22
NONDEFAULT_HIDDEN_CONTEXT_PENALTY = 0.08
CONTEXT_COMPLETION_BONUS = 0.06
TRAJECTORY_CONTEXT_COMPLETION_BONUS = 0.04
PRIORITY_UNDERSHOOT_PENALTY = 0.03
SEVERE_PRIORITY_UNDERSHOOT_PENALTY = 0.07
DANGEROUS_RESOLUTION_PENALTY = 0.05
NONDEFAULT_ROUTING_FOLLOWTHROUGH_BONUS = 0.02
# --- Capacity planning ---
# The first three are also summed per queued ticket in ``_planning_quality_score``
# to size the worst-case planning penalty used for normalisation.
TEAM_CAPACITY_OVERFLOW_PENALTY = 0.08
HIGH_PRIORITY_SLOT_OVERFLOW_PENALTY = 0.06
ESCALATION_SLOT_OVERFLOW_PENALTY = 0.05
PLANNING_SUCCESS_BONUS = 0.05
INCIDENT_SLOT_OVERFLOW_PENALTY = 0.05
# Also the per-ticket unit used to normalise ``incident_gap_total`` in
# ``_incident_management_score``.
INCIDENT_GAP_PENALTY = 0.07
SLA_BREACH_PENALTY = 0.04
FOLLOW_UP_SPAWN_THRESHOLD = 0.72
MAX_DEFERS_PER_TICKET = 1
# --- Cluster dynamics ---
# A submitted ticket must score at least this much before later tickets in the
# same cluster are stabilised (``_stabilize_future_cluster_tickets``).
CLUSTER_STABILIZE_SCORE_THRESHOLD = 0.84
CLUSTER_DESTABILIZE_SCORE_THRESHOLD = 0.72
# Relief multiplier chosen in ``_stabilize_future_cluster_tickets``: the incident
# variant when an incident is open for the source ticket, the owner variant otherwise.
CLUSTER_INCIDENT_RELIEF_MULTIPLIER = 0.94
CLUSTER_OWNER_RELIEF_MULTIPLIER = 0.86
# Weight of the queue-management score in the terminal rubric blend per task id;
# the routing trajectory reward receives ``1 - weight``.
TASK_QUEUE_MANAGEMENT_WEIGHT: dict[int, float] = {
    1: 0.0,
    2: 0.2,
    3: 0.32,
}
# Per-ticket lookup tools for task 3, keyed by ticket id.
# NOTE(review): consumers live outside this view; presumably these are the tools
# that must be used to reveal each ticket's hidden routing context — confirm.
TASK3_INVESTIGATION_TOOL_PLAN: dict[str, tuple[str, ...]] = {
    "ticket-021": ("lookup_related_ticket", "lookup_requester_history"),
    "ticket-022": ("lookup_internal_routing_note",),
    "ticket-027": ("lookup_internal_routing_note",),
    "ticket-029": ("lookup_internal_routing_note",),
    "ticket-038": ("lookup_related_ticket", "lookup_requester_history"),
    "ticket-045": ("lookup_related_ticket", "lookup_requester_history"),
    "TKT-NONDEFAULT-001": ("lookup_internal_routing_note",),
    "TKT-NONDEFAULT-002": ("lookup_internal_routing_note",),
    "TKT-NONDEFAULT-003": ("lookup_internal_routing_note",),
}
# Vague replacement descriptions for hard-task tickets, keyed by ticket id.
# NOTE(review): presumably shown instead of the real description so the agent
# has to investigate; the substitution happens outside this view — confirm.
HARD_TASK_DESCRIPTION_REDACTIONS: dict[str, str] = {
    "ticket-021": (
        "Production checkout is still unstable after a recent fix. "
        "Additional routing context is available via investigation."
    ),
    "ticket-022": (
        "Usage charges increased while the integration was failing. "
        "Additional routing context is available via investigation."
    ),
    "ticket-027": (
        "A vendor offer arrived with a near-term deadline. "
        "Additional routing context is available via investigation."
    ),
    "ticket-029": (
        "A team needs a large seat expansion right away. "
        "Additional routing context is available via investigation."
    ),
    "ticket-038": (
        "A prior invoice discrepancy is still unresolved and now time-sensitive. "
        "Additional routing context is available via investigation."
    ),
    "ticket-045": (
        "A company-wide suspension remains unresolved after repeated follow-ups. "
        "Additional routing context is available via investigation."
    ),
    "TKT-NONDEFAULT-001": (
        "A user needs help with a billing-style question. "
        "Additional routing context is available via investigation."
    ),
    "TKT-NONDEFAULT-002": (
        "A client compliance scan surfaced a product-specific issue. "
        "Additional routing context is available via investigation."
    ),
    "TKT-NONDEFAULT-003": (
        "A contractor onboarding workflow is blocked by an account problem. "
        "Additional routing context is available via investigation."
    ),
}
# Matching vague replacement titles for the same tickets (same caveat as above).
HARD_TASK_TITLE_REDACTIONS: dict[str, str] = {
    "ticket-021": "Production workflow regression",
    "ticket-022": "Time-sensitive account review",
    "ticket-027": "Commercial workflow request",
    "ticket-029": "Urgent expansion request",
    "ticket-038": "Repeated invoice follow-up",
    "ticket-045": "Company-wide account issue",
    "TKT-NONDEFAULT-001": "Billing-style routing question",
    "TKT-NONDEFAULT-002": "Compliance ownership question",
    "TKT-NONDEFAULT-003": "Workflow blocker with hidden owner",
}
def _coerce_optional_int(value: Any, field_name: str) -> Optional[int]:
if value is None or value == "":
return None
if isinstance(value, bool):
raise ValueError(f"{field_name} must be an integer")
if isinstance(value, int):
return value
try:
return int(value)
except (TypeError, ValueError) as exc:
raise ValueError(f"{field_name} must be an integer") from exc
class HelpdeskTicketRoutingEnvironment(
Environment[HelpdeskTicketAction, HelpdeskTicketObservation, HelpdeskTicketState]
):
SUPPORTS_CONCURRENT_SESSIONS = True
def __init__(self) -> None:
    """Load the ticket dataset and set up an empty episode that still needs ``reset()``."""
    super().__init__()
    records = load_dataset()
    self._dataset = records
    # Id -> ticket index; _replace_queue_ticket keeps it in sync with edited tickets.
    self._tickets_by_id = dict((record.ticket_id, record) for record in records)
    self._rng = random.Random()
    self._queue: list[HelpdeskTicketRecord] = []
    self._state = HelpdeskTicketState()
# ------------------------------------------------------------------
# OpenEnv required interface
# ------------------------------------------------------------------
def reset(
    self,
    seed: Optional[int] = None,
    episode_id: Optional[str] = None,
    **kwargs: Any,
) -> HelpdeskTicketObservation:
    """Start a new episode and return its first observation.

    Args:
        seed: optional integer-like seed. When given, the environment RNG is
            reseeded so queue sampling is reproducible.
        episode_id: optional explicit episode id; a random UUID4 string is
            used otherwise.
        **kwargs: ``task_id`` (defaults to 1) and ``queue_size`` (defaults to
            a random size within ``QUEUE_SIZE_RANGE``). The queue size is
            clamped to the dataset size.

    Returns:
        The observation for the first queued ticket.

    Raises:
        ValueError: if ``seed``, ``task_id`` or ``queue_size`` is not
            integer-like, or if ``queue_size`` < 1.
    """
    normalized_seed = _coerce_optional_int(seed, "seed")
    task_id_value = _coerce_optional_int(kwargs.get("task_id", 1), "task_id")
    queue_size_value = _coerce_optional_int(kwargs.get("queue_size"), "queue_size")
    task_id = 1 if task_id_value is None else task_id_value
    task = get_task_definition(task_id)
    if queue_size_value is not None and queue_size_value < 1:
        raise ValueError("queue_size must be >= 1")
    if normalized_seed is not None:
        self._rng.seed(normalized_seed)
    if queue_size_value is None:
        requested_size = self._rng.randint(*QUEUE_SIZE_RANGE)
    else:
        requested_size = queue_size_value
    # Clamp once. The randomly drawn size can also exceed a small dataset, and
    # every size-derived value below must agree with the real queue length.
    queue_size = min(requested_size, len(self._dataset))
    self._queue = self._sample_queue(task_id, queue_size)
    (
        team_capacity_initial,
        high_priority_slots_initial,
        escalation_slots_initial,
        incident_slots_initial,
    ) = self._initial_capacity_state_for_queue(task_id)
    self._state = HelpdeskTicketState(
        episode_id=episode_id or str(uuid.uuid4()),
        step_count=0,
        current_task_id=task_id,
        seed=normalized_seed,
        queue_ticket_ids=[t.ticket_id for t in self._queue],
        current_ticket_index=0,
        per_ticket_scores=[],
        total_reward=0.0,
        average_score_so_far=0.0,
        # Match _compute_episode_penalty, which sizes the free allowance from
        # len(self._queue).
        investigation_budget_remaining=len(self._queue) * FREE_INVESTIGATIONS_PER_TICKET,
        investigation_penalty_applied=0.0,
        planning_penalty_applied=0.0,
        last_reward_components={},
        ticket_tool_usage={},
        team_capacity_initial=team_capacity_initial,
        team_capacity_remaining=dict(team_capacity_initial),
        high_priority_slots_initial=high_priority_slots_initial,
        high_priority_slots_remaining=high_priority_slots_initial,
        escalation_slots_initial=escalation_slots_initial,
        escalation_slots_remaining=escalation_slots_initial,
        incident_slots_initial=incident_slots_initial,
        incident_slots_remaining=incident_slots_initial,
        planning_penalty_total=0.0,
        capacity_pressure_tickets_resolved=0,
        cluster_stabilizations_total=0,
        cluster_destabilizations_total=0,
        ticket_request_info_usage={},
        ticket_defer_counts={},
        open_incident_ticket_ids=[],
        incident_actions_used=0,
        incident_gap_total=0.0,
        deferred_ticket_count=0,
        sla_breach_count=0,
        spawned_follow_up_ticket_ids=[],
        spawned_follow_up_source_ids=[],
        dynamic_queue_events=[],
        queue_management_score=0.0,
        queue_management_breakdown={},
    )
    return self._build_observation(task)
def step(
    self,
    action: HelpdeskTicketAction,
    timeout_s: Optional[float] = None,
    **kwargs: Any,
) -> HelpdeskTicketObservation:
    """Apply one agent action to the ticket at the head of the queue.

    ``investigate``, ``request_info``, ``defer`` and ``open_incident`` go to
    their dedicated handlers. A submit that fills fields the task does not
    allow is scored as invalid by ``_handle_extra_fields_submit``. Otherwise
    the submission is graded and shaped into a step reward. On the last
    queued ticket, the step reward is folded into the terminal rubric reward.

    Args:
        action: the agent's action. Its ``action_type`` must be available for
            the current task.
        timeout_s: accepted for interface compatibility; not used here.
        **kwargs: accepted for interface compatibility; not used here.

    Returns:
        The observation produced after the action is applied.

    Raises:
        RuntimeError: if ``reset()`` has not been called or the episode has
            already finished.
        ValueError: if ``action.action_type`` is unsupported for the task.
    """
    if not self._queue or self._state.current_task_id is None:
        raise RuntimeError("Environment has not been reset.")
    idx = self._state.current_ticket_index
    if idx >= len(self._queue):
        raise RuntimeError("Episode already done — call reset().")
    current_ticket = self._queue[idx]
    task_id = self._state.current_task_id
    task = get_task_definition(task_id)
    if action.action_type not in self._available_action_types_for_task(task_id):
        raise ValueError(
            f"Unsupported action_type {action.action_type!r} for task {task_id}"
        )
    if action.action_type == "investigate":
        return self._handle_investigation_action(task, current_ticket, action, idx)
    if action.action_type == "request_info":
        return self._handle_request_info_action(task, current_ticket, action, idx)
    if action.action_type == "defer":
        return self._handle_defer_action(task, current_ticket, action, idx)
    if action.action_type == "open_incident":
        return self._handle_open_incident_action(task, current_ticket, action, idx)

    # Routing fields the agent actually filled in. Tool/metadata fields are not
    # part of the submission proper.
    submitted_fields = {
        f
        for f, v in action.model_dump(exclude_none=True).items()
        if v is not None
        and f not in {"action_type", "tool_name", "tool_target_ticket_id", "metadata"}
    }
    extra_fields = submitted_fields - set(task["allowed_fields"])
    if extra_fields:
        return self._handle_extra_fields_submit(
            task, current_ticket, action, idx, extra_fields
        )

    previous_average = self._current_average_score()
    score, breakdown = grade_action(action, current_ticket, task_id)
    context_penalty, missing_required_count = self._submit_context_penalty(current_ticket)
    process_bonus = self._context_completion_bonus(
        current_ticket,
        missing_required_count=missing_required_count,
        score=score,
    )
    risk_penalty = self._operational_risk_penalty(
        current_ticket,
        action,
        task_id=task_id,
    )
    incident_gap_penalty = self._incident_gap_penalty(current_ticket, action)
    capacity_penalty, capacity_details = self._apply_capacity_usage(
        current_ticket,
        action,
    )
    step_adjustments = compute_step_adjustments(
        score,
        previous_average=previous_average,
        process_bonus=process_bonus,
        risk_penalty=risk_penalty,
    )
    step_reward = step_adjustments["final_reward"]

    # Record this ticket's incident gap BEFORE any terminal scoring.
    # _finalize_terminal_rubric -> _incident_management_score reads
    # incident_gap_total. Accumulating afterwards silently dropped the final
    # ticket's gap from the episode rubric.
    if incident_gap_penalty > 0.0:
        self._state.incident_gap_total = round(
            self._state.incident_gap_total + incident_gap_penalty,
            4,
        )

    # Record the graded ticket and advance the cursor.
    self._state.per_ticket_scores.append(score)
    self._state.average_score_so_far = self._current_average_score()
    self._state.step_count += 1
    self._state.current_ticket_index += 1
    is_done = self._state.current_ticket_index >= len(self._queue)

    trajectory_reward = None
    trajectory_components = None
    investigation_penalty = 0.0
    rubric_reward = None
    rubric_details: dict[str, Any] = {}
    if is_done:
        trajectory_components = compute_trajectory_adjustments(
            self._state.per_ticket_scores,
            len(self._queue),
            self._state.step_count,
            completion_bonus=(
                self._trajectory_consistency_bonus() + self._planning_success_bonus()
            ),
        )
        trajectory_reward = trajectory_components["final_reward"]
        rubric_reward, rubric_details = self._finalize_terminal_rubric(
            trajectory_reward
        )
        final_reward = clamp_open_unit_interval(
            rubric_reward - context_penalty - capacity_penalty - incident_gap_penalty
        )
        self._state.total_reward = rubric_reward
        investigation_penalty = self._compute_episode_penalty()
    else:
        final_reward = clamp_open_unit_interval(
            step_reward - context_penalty - capacity_penalty - incident_gap_penalty
        )

    spawned_follow_up_ticket_id = None
    if self._should_spawn_follow_up(
        current_ticket,
        score=score,
        context_penalty=context_penalty,
        incident_gap_penalty=incident_gap_penalty,
    ):
        spawned_follow_up = self._spawn_follow_up_ticket(current_ticket)
        spawned_follow_up_ticket_id = spawned_follow_up.ticket_id
        if is_done:
            # The spawned follow-up keeps the episode going (presumably it is
            # appended to the queue), so undo the terminal scoring done above.
            is_done = False
            trajectory_reward = None
            trajectory_components = None
            rubric_reward = None
            rubric_details = {}
            final_reward = clamp_open_unit_interval(
                step_reward - context_penalty - capacity_penalty - incident_gap_penalty
            )
            self._state.total_reward = 0.0
            self._state.queue_management_score = 0.0
            self._state.queue_management_breakdown = {}

    # A clean resolution stabilises later tickets in the same cluster. Only
    # when nothing was stabilised is destabilisation considered.
    cluster_stabilized_ticket_ids = self._stabilize_future_cluster_tickets(
        current_ticket,
        score=score,
        context_penalty=context_penalty,
        incident_gap_penalty=incident_gap_penalty,
    )
    cluster_destabilized_ticket_ids: list[str] = []
    if not cluster_stabilized_ticket_ids:
        cluster_destabilized_ticket_ids = self._destabilize_future_cluster_tickets(
            current_ticket,
            score=score,
            context_penalty=context_penalty,
            incident_gap_penalty=incident_gap_penalty,
        )

    reward_components = self._build_reward_components(
        ticket_score=score,
        field_breakdown=breakdown,
        shaped_step_reward=step_reward,
        reward_kind="trajectory" if is_done else "step",
        final_reward=final_reward,
        milestone_adjustment=step_adjustments["milestone_adjustment"],
        trajectory_reward=trajectory_reward,
        investigation_penalty=investigation_penalty,
        extra_details={
            "context_gap_penalty": context_penalty,
            "context_completion_bonus": process_bonus,
            "risk_penalty": risk_penalty,
            "incident_gap_penalty": incident_gap_penalty,
            "capacity_penalty": capacity_penalty,
            "delta_adjustment": step_adjustments["delta_adjustment"],
            "required_investigation_count": len(self._required_tools_for_ticket(current_ticket)),
            "hidden_context_remaining_count": missing_required_count,
            "hidden_context_revealed_count": len(
                self._used_tools_for_ticket(current_ticket.ticket_id)
            ),
            "planning_penalty_total": self._state.planning_penalty_total,
            "planning_penalty_applied": self._state.planning_penalty_applied,
            "planning_success_bonus": self._planning_success_bonus()
            if is_done
            else 0.0,
            "spawned_follow_up_ticket_id": spawned_follow_up_ticket_id,
            "cluster_stabilized_ticket_ids": cluster_stabilized_ticket_ids,
            "cluster_destabilized_ticket_ids": cluster_destabilized_ticket_ids,
            "rubric_reward": rubric_reward,
            **self._trajectory_component_details(trajectory_components),
            **rubric_details,
        },
    )
    reward_components.update(capacity_details)
    history_entry = self._build_history_entry(
        current_ticket,
        predicted=action.model_dump(exclude_none=True),
        score=score,
        breakdown=breakdown,
        queue_position=idx + 1,
        reward=final_reward,
        rubric_reward=rubric_reward if is_done else None,
        reward_kind="trajectory" if is_done else "step",
        reward_components=reward_components,
    )
    self._state.history_entries.append(history_entry)
    self._state.last_step_reward = final_reward
    self._state.reward = final_reward
    self._state.done = is_done
    self._state.investigation_penalty_applied = self._compute_episode_penalty()
    self._state.planning_penalty_applied = capacity_penalty
    self._state.last_tool_result = None
    self._state.last_reward_components = reward_components
    return self._build_observation(
        task,
        done=is_done,
        reward=final_reward,
        rubric_reward=rubric_reward if is_done else None,
    )
def _handle_extra_fields_submit(
    self,
    task: Any,
    current_ticket: HelpdeskTicketRecord,
    action: HelpdeskTicketAction,
    idx: int,
    extra_fields: set[str],
) -> HelpdeskTicketObservation:
    """Score a submit that filled fields the current task does not allow.

    The ticket is recorded with the floor score ``clamp_open_unit_interval(0.0)``
    and the cursor advances. On the last ticket, the trajectory and terminal
    rubric are still computed so the episode closes with a full reward.

    NOTE(review): unlike a valid submit, the trajectory completion bonus here
    omits ``_planning_success_bonus()``, and no capacity, incident or cluster
    bookkeeping runs. Confirm this is intended for invalid submits.
    """
    penalty_reason = f"extra_fields: {sorted(extra_fields)}"
    invalid_score = clamp_open_unit_interval(0.0)
    self._state.per_ticket_scores.append(invalid_score)
    self._state.average_score_so_far = self._current_average_score()
    self._state.step_count += 1
    self._state.current_ticket_index += 1
    is_done = self._state.current_ticket_index >= len(self._queue)
    self._state.done = is_done
    trajectory_reward = None
    trajectory_components = None
    investigation_penalty = self._compute_episode_penalty() if is_done else 0.0
    rubric_details: dict[str, Any] = {}
    if is_done:
        trajectory_components = compute_trajectory_adjustments(
            self._state.per_ticket_scores,
            len(self._queue),
            self._state.step_count,
            completion_bonus=self._trajectory_consistency_bonus(),
        )
        trajectory_reward = trajectory_components["final_reward"]
        final_reward, rubric_details = self._finalize_terminal_rubric(
            trajectory_reward
        )
        self._state.total_reward = final_reward
    else:
        final_reward = clamp_open_unit_interval(0.0)
    reward_kind = "trajectory" if is_done else "step_penalty"
    reward_components = self._build_reward_components(
        ticket_score=invalid_score,
        field_breakdown={},
        shaped_step_reward=0.0,
        reward_kind=reward_kind,
        final_reward=final_reward,
        trajectory_reward=trajectory_reward,
        investigation_penalty=investigation_penalty,
        penalty_reason=penalty_reason,
        extra_details={
            **self._trajectory_component_details(trajectory_components),
            **rubric_details,
        },
    )
    self._state.history_entries.append(
        self._build_history_entry(
            current_ticket,
            predicted=action.model_dump(exclude_none=True),
            score=invalid_score,
            breakdown={},
            queue_position=idx + 1,
            reward=final_reward,
            rubric_reward=final_reward if is_done else None,
            reward_kind=reward_kind,
            penalty_reason=penalty_reason,
            reward_components=reward_components,
        )
    )
    self._state.last_step_reward = final_reward
    self._state.reward = final_reward
    self._state.investigation_penalty_applied = self._compute_episode_penalty()
    self._state.last_tool_result = None
    self._state.last_reward_components = reward_components
    return self._build_observation(
        task,
        done=is_done,
        reward=final_reward,
        rubric_reward=final_reward if is_done else None,
    )
def _trajectory_component_details(
    self,
    trajectory_components: Optional[dict[str, Any]],
) -> dict[str, Any]:
    """Map trajectory adjustment components onto reward-detail keys.

    Every value is ``None`` when no trajectory was computed (non-terminal step).
    """
    if trajectory_components is None:
        return {
            "trajectory_average_reward": None,
            "trajectory_completion_bonus": None,
            "trajectory_consistency_bonus": None,
        }
    return {
        "trajectory_average_reward": trajectory_components["average_reward"],
        "trajectory_completion_bonus": trajectory_components["completion_bonus"],
        "trajectory_consistency_bonus": trajectory_components["consistency_bonus"],
    }
@property
def state(self) -> HelpdeskTicketState:
return self._state.model_copy(deep=True)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _compute_episode_penalty(self) -> float:
    """Return the capped penalty for investigations beyond the free allowance.

    Each queued ticket grants ``FREE_INVESTIGATIONS_PER_TICKET`` free
    investigations. Every investigation past that allowance costs
    ``EXTRA_INVESTIGATION_COST``, and the total is capped at
    ``MAX_EXTRA_INVESTIGATION_PENALTY``.
    """
    allowance = FREE_INVESTIGATIONS_PER_TICKET * len(self._queue)
    overage = self._state.investigation_steps - allowance
    if overage <= 0:
        return 0.0
    return min(MAX_EXTRA_INVESTIGATION_PENALTY, EXTRA_INVESTIGATION_COST * overage)
def _apply_episode_economics(self, base_reward: float) -> float:
    """Subtract the investigation penalty from ``base_reward`` and clamp to (0, 1)."""
    return clamp_open_unit_interval(base_reward - self._compute_episode_penalty())
def _current_average_score(self) -> float:
if not self._state.per_ticket_scores:
return 0.0
return sum(self._state.per_ticket_scores) / len(self._state.per_ticket_scores)
def _queue_management_blend_weight(self, task_id: int | None = None) -> float:
    """Return the rubric weight of the queue-management score for a task.

    Defaults to the current task. A missing task id is treated as task 1, and
    unknown ids get weight 0.0.
    """
    if task_id is None:
        task_id = self._state.current_task_id
    return TASK_QUEUE_MANAGEMENT_WEIGHT.get(int(task_id or 1), 0.0)
def _context_resolution_score(self) -> float:
hidden_context_tickets = [
ticket
for ticket in self._queue
if self._required_tools_for_ticket(ticket, self._state.current_task_id)
]
if not hidden_context_tickets:
return 1.0
total_required = 0
total_resolved = 0
for ticket in hidden_context_tickets:
progress = self._tool_progress_for_ticket(ticket)
total_required += max(1, len(progress["required_tools"]))
total_resolved += max(
0,
len(progress["required_tools"]) - len(progress["remaining_tools"]),
)
return round(
max(0.0, min(1.0, total_resolved / max(1, total_required))),
4,
)
def _follow_up_containment_score(self) -> float:
follow_up_risk_tickets = [
ticket
for ticket in self._queue
if ticket.generated_from_ticket_id is None
and (
self._requires_incident(ticket)
or self._ticket_mentions_follow_up(ticket)
or ticket.related_ticket_id is not None
or ticket.priority in {"high", "critical"}
)
]
if not follow_up_risk_tickets:
return 1.0
spawn_rate = len(self._state.spawned_follow_up_ticket_ids) / max(
1,
len(follow_up_risk_tickets),
)
generated_follow_up_scores = [
float(entry.get("score", 0.0))
for entry in self._state.history_entries
if entry.get("generated_from_ticket_id") is not None
]
recovery_credit = (
sum(generated_follow_up_scores) / len(generated_follow_up_scores)
if generated_follow_up_scores
else 0.0
)
score = (1.0 - min(1.0, 0.7 * spawn_rate)) + (
min(1.0, spawn_rate) * 0.3 * recovery_credit
)
return round(max(0.0, min(1.0, score)), 4)
def _incident_management_score(self) -> float:
    """Score incident handling on task 3+ (earlier tasks always score 1.0).

    Combines 65% of the inverse accumulated incident-gap ratio with 35% of the
    fraction of incident-sensitive original tickets that have an incident open.
    The result is rounded to 4 places in [0, 1].
    """
    if (self._state.current_task_id or 1) < 3:
        return 1.0
    sensitive = [
        candidate
        for candidate in self._queue
        if candidate.generated_from_ticket_id is None and self._requires_incident(candidate)
    ]
    if not sensitive:
        return 1.0
    sensitive_count = len(sensitive)
    covered = sum(1 for candidate in sensitive if self._incident_open_for_ticket(candidate))
    coverage_ratio = covered / max(1, sensitive_count)
    gap_budget = max(INCIDENT_GAP_PENALTY, sensitive_count * INCIDENT_GAP_PENALTY)
    gap_ratio = min(1.0, self._state.incident_gap_total / gap_budget)
    blended = (0.65 * (1.0 - gap_ratio)) + (0.35 * coverage_ratio)
    return round(max(0.0, min(1.0, blended)), 4)
def _sla_quality_score(self) -> float:
breach_denominator = max(1, self._state.deferred_ticket_count or len(self._queue))
breach_ratio = min(1.0, self._state.sla_breach_count / breach_denominator)
score = 1.0 - breach_ratio
return round(max(0.0, min(1.0, score)), 4)
def _planning_quality_score(self) -> float:
    """Score capacity planning on task 3+ (earlier tasks always score 1.0).

    Combines 60% of an inverse normalised planning-penalty term with 40% of the
    coverage of tickets that have an alternate route. The result is rounded to
    4 places in [0, 1].
    """
    if (self._state.current_task_id or 1) < 3:
        return 1.0
    alternate_route_count = sum(
        1 for candidate in self._queue if self._ticket_has_alternate_route(candidate)
    )
    if alternate_route_count:
        route_coverage = min(
            1.0,
            self._state.capacity_pressure_tickets_resolved / alternate_route_count,
        )
    else:
        route_coverage = 1.0
    per_ticket_overflow = (
        TEAM_CAPACITY_OVERFLOW_PENALTY
        + HIGH_PRIORITY_SLOT_OVERFLOW_PENALTY
        + ESCALATION_SLOT_OVERFLOW_PENALTY
    )
    # Worst-case penalty used for normalisation, with a 0.12 floor.
    worst_case_penalty = max(0.12, len(self._queue) * per_ticket_overflow)
    penalty_score = 1.0 - min(1.0, self._state.planning_penalty_total / worst_case_penalty)
    blended = (0.6 * penalty_score) + (0.4 * route_coverage)
    return round(max(0.0, min(1.0, blended)), 4)
def _cluster_coordination_score(self) -> float:
if (self._state.current_task_id or 1) < 2:
return 1.0
clustered_tickets = [
ticket
for ticket in self._queue
if ticket.service_cluster_id
or ticket.related_ticket_id is not None
or ticket.generated_from_ticket_id is not None
or self._ticket_repeated_requester_count(ticket) >= 2
]
if not clustered_tickets:
return 1.0
cluster_count = max(1, len(clustered_tickets))
destabilization_ratio = min(
1.0,
self._state.cluster_destabilizations_total / cluster_count,
)
stabilization_ratio = min(
1.0,
self._state.cluster_stabilizations_total / cluster_count,
)
score = 1.0 - (0.75 * destabilization_ratio) + (0.25 * stabilization_ratio)
return round(max(0.0, min(1.0, score)), 4)
def _queue_management_breakdown(self, trajectory_reward: float) -> tuple[float, dict[str, Any]]:
task_id = int(self._state.current_task_id or 1)
if task_id < 2:
proxy_score = round(clamp_open_unit_interval(trajectory_reward), 4)
return proxy_score, {"routing_trajectory_proxy": proxy_score}
component_scores: dict[str, float] = {
"context_resolution": self._context_resolution_score(),
"cluster_coordination": self._cluster_coordination_score(),
"follow_up_containment": self._follow_up_containment_score(),
"sla_management": self._sla_quality_score(),
}
if task_id >= 3:
component_scores["planning_quality"] = self._planning_quality_score()
component_scores["incident_management"] = self._incident_management_score()
component_weights = {
"context_resolution": 0.2,
"planning_quality": 0.24,
"incident_management": 0.2,
"cluster_coordination": 0.16,
"follow_up_containment": 0.12,
"sla_management": 0.08,
}
else:
component_weights = {
"context_resolution": 0.38,
"cluster_coordination": 0.26,
"follow_up_containment": 0.2,
"sla_management": 0.16,
}
aggregate_score = round(
sum(
component_scores[name] * weight
for name, weight in component_weights.items()
),
4,
)
breakdown: dict[str, Any] = {
name: round(score, 4) for name, score in component_scores.items()
}
breakdown["weights"] = {
name: round(weight, 4) for name, weight in component_weights.items()
}
breakdown["cluster_stabilizations_total"] = self._state.cluster_stabilizations_total
breakdown["cluster_destabilizations_total"] = self._state.cluster_destabilizations_total
breakdown["spawned_follow_up_count"] = len(self._state.spawned_follow_up_ticket_ids)
breakdown["sla_breach_count"] = self._state.sla_breach_count
breakdown["planning_penalty_total"] = round(self._state.planning_penalty_total, 4)
breakdown["incident_gap_total"] = round(self._state.incident_gap_total, 4)
breakdown["aggregate"] = aggregate_score
return aggregate_score, breakdown
def _finalize_terminal_rubric(
    self,
    trajectory_reward: float,
) -> tuple[float, dict[str, Any]]:
    """Blend routing and queue-management objectives into the final episode reward.

    Records the queue-management score and breakdown on the state as a side
    effect.

    Returns:
        ``(rubric_reward, details)``. ``rubric_reward`` is the clamped blend
        after the investigation-economics penalty. ``details`` records every
        intermediate value.
    """
    task_id = int(self._state.current_task_id or 1)
    qm_score, qm_breakdown = self._queue_management_breakdown(trajectory_reward)
    queue_share = self._queue_management_blend_weight(task_id)
    route_weight = round(1.0 - queue_share, 4)
    queue_weight = round(queue_share, 4)
    blended = clamp_open_unit_interval(
        (route_weight * trajectory_reward) + (queue_weight * qm_score)
    )
    economics_penalty = round(self._compute_episode_penalty(), 4)
    rubric_reward = self._apply_episode_economics(blended)
    self._state.queue_management_score = qm_score
    self._state.queue_management_breakdown = dict(qm_breakdown)
    details = {
        "trajectory_routing_reward": trajectory_reward,
        "queue_management_score": qm_score,
        "queue_management_breakdown": dict(qm_breakdown),
        "route_objective_weight": route_weight,
        "queue_management_weight": queue_weight,
        "blended_objective_before_economics": blended,
        "episode_economics_penalty": economics_penalty,
    }
    return rubric_reward, details
def _available_action_types_for_task(self, task_id: int | None = None) -> list[str]:
    """List the action types allowed for a task (default: current; unknown ids: submit only)."""
    if task_id is None:
        task_id = self._state.current_task_id
    allowed = TASK_AVAILABLE_ACTION_TYPES.get(int(task_id or 1), ("submit",))
    return list(allowed)
def _available_tools_for_task(self, task_id: int | None = None) -> list[str]:
    """List the lookup tools offered for a task (default: current; unknown ids: none)."""
    if task_id is None:
        task_id = self._state.current_task_id
    offered = TASK_AVAILABLE_TOOLS.get(int(task_id or 1), ())
    return list(offered)
def _sync_queue_ticket_ids(self) -> None:
self._state.queue_ticket_ids = [ticket.ticket_id for ticket in self._queue]
def _cluster_sample_groups(self) -> list[list[HelpdeskTicketRecord]]:
groups: dict[str, list[HelpdeskTicketRecord]] = {}
for ticket in self._dataset:
if not ticket.service_cluster_id:
continue
groups.setdefault(ticket.service_cluster_id, []).append(ticket)
return [tickets for tickets in groups.values() if len(tickets) >= 2]
def _cluster_ticket_order_key(self, ticket: HelpdeskTicketRecord) -> tuple[int, int, str]:
priority_rank = {"critical": 0, "high": 1, "medium": 2, "low": 3}
follow_up_depth = 1 if ticket.related_ticket_id or ticket.generated_from_ticket_id else 0
return (
follow_up_depth,
priority_rank.get(ticket.priority, 4),
ticket.ticket_id,
)
def _sample_queue(self, task_id: int, queue_size: int) -> list[HelpdeskTicketRecord]:
    """Draw ``queue_size`` tickets for a new episode using ``self._rng``.

    For tasks 2 and 3 with at least 3 tickets requested, and when the dataset
    has a service cluster of 2+ tickets, the queue contains 2-3 tickets from
    one randomly chosen cluster. The cluster's lead ticket (per
    ``_cluster_ticket_order_key``) goes first. The remaining cluster tickets
    are interleaved with randomly sampled filler tickets. In every other case
    the queue is a plain random sample.

    The order of RNG calls here is significant: changing it changes which
    queue a given seed produces. ``random.sample`` raises ValueError if
    ``queue_size`` exceeds the dataset size; ``reset()`` clamps the size
    before calling this.
    """
    if queue_size <= 0:
        return []
    if task_id not in {2, 3} or queue_size < 3:
        return self._rng.sample(self._dataset, queue_size)
    cluster_groups = self._cluster_sample_groups()
    if not cluster_groups:
        return self._rng.sample(self._dataset, queue_size)
    chosen_group = self._rng.choice(cluster_groups)
    # Take 2 cluster tickets (3 when the queue has room for at least 4 tickets),
    # always leaving at least one slot for filler.
    max_cluster_take = min(len(chosen_group), 3 if queue_size >= 4 else 2)
    cluster_take = max(2, min(max_cluster_take, queue_size - 1))
    cluster_subset = self._rng.sample(chosen_group, cluster_take)
    cluster_subset_ids = {ticket.ticket_id for ticket in cluster_subset}
    filler_count = max(0, queue_size - len(cluster_subset))
    remaining_pool = [
        ticket for ticket in self._dataset if ticket.ticket_id not in cluster_subset_ids
    ]
    filler_subset = (
        self._rng.sample(remaining_pool, filler_count) if filler_count > 0 else []
    )
    # Lead with the original, most urgent cluster ticket; later cluster tickets
    # are spaced out by filler so they arrive after the lead has been handled.
    ordered_cluster = sorted(cluster_subset, key=self._cluster_ticket_order_key)
    remaining_cluster = ordered_cluster[1:]
    ordered_queue: list[HelpdeskTicketRecord] = []
    if ordered_cluster:
        ordered_queue.append(ordered_cluster[0])
    # Alternate: one filler ticket, then one cluster ticket, until both run out.
    while filler_subset or remaining_cluster:
        if filler_subset:
            ordered_queue.append(filler_subset.pop(0))
        if remaining_cluster:
            ordered_queue.append(remaining_cluster.pop(0))
    return ordered_queue[:queue_size]
def _cluster_keys_for_ticket(self, ticket: HelpdeskTicketRecord) -> set[str]:
keys: set[str] = set()
if ticket.service_cluster_id:
keys.add(f"cluster:{ticket.service_cluster_id}")
if ticket.related_ticket_id:
keys.add(f"ticket:{ticket.related_ticket_id}")
if ticket.generated_from_ticket_id:
keys.add(f"ticket:{ticket.generated_from_ticket_id}")
if any(
candidate.ticket_id != ticket.ticket_id
and (
candidate.related_ticket_id == ticket.ticket_id
or candidate.generated_from_ticket_id == ticket.ticket_id
)
for candidate in self._tickets_by_id.values()
):
keys.add(f"ticket:{ticket.ticket_id}")
if self._ticket_repeated_requester_count(ticket) >= 2:
keys.add(f"requester:{ticket.requester}")
return keys
def _tickets_share_cluster(
self,
first: HelpdeskTicketRecord,
second: HelpdeskTicketRecord,
) -> bool:
if first.ticket_id == second.ticket_id:
return False
return bool(self._cluster_keys_for_ticket(first) & self._cluster_keys_for_ticket(second))
def _future_cluster_ticket_indexes(
self,
ticket: HelpdeskTicketRecord,
*,
start_index: int,
) -> list[int]:
indexes: list[int] = []
for index in range(start_index, len(self._queue)):
future_ticket = self._queue[index]
if self._tickets_share_cluster(ticket, future_ticket):
indexes.append(index)
return indexes
def _ticket_queue_index(self, ticket: HelpdeskTicketRecord) -> int | None:
for index, candidate in enumerate(self._queue):
if candidate.ticket_id == ticket.ticket_id:
return index
return None
def _cluster_summary(
self,
ticket: HelpdeskTicketRecord,
*,
start_index: int | None = None,
) -> dict[str, Any]:
if start_index is None:
ticket_index = self._ticket_queue_index(ticket)
effective_start = (
ticket_index + 1
if ticket_index is not None
else self._state.current_ticket_index + 1
)
else:
effective_start = start_index
future_indexes = self._future_cluster_ticket_indexes(
ticket,
start_index=effective_start,
)
future_tickets = [self._queue[index] for index in future_indexes]
return {
"service_cluster_id": ticket.service_cluster_id,
"cluster_keys": sorted(self._cluster_keys_for_ticket(ticket)),
"future_cluster_ticket_count": len(future_tickets),
"future_cluster_ticket_ids": [candidate.ticket_id for candidate in future_tickets],
"future_high_priority_count": sum(
1 for candidate in future_tickets if candidate.priority in {"high", "critical"}
),
"shared_requester_count": self._ticket_repeated_requester_count(ticket),
"active_incident_cover": self._incident_open_for_ticket(ticket),
}
def _append_note(self, existing_note: str | None, addition: str | None) -> str | None:
if not addition:
return existing_note
if not existing_note:
return addition
if addition in existing_note:
return existing_note
return f"{existing_note} {addition}"
def _replace_queue_ticket(
self,
index: int,
updated_ticket: HelpdeskTicketRecord,
) -> None:
self._queue[index] = updated_ticket
self._tickets_by_id[updated_ticket.ticket_id] = updated_ticket
def _stabilize_future_cluster_tickets(
    self,
    current_ticket: HelpdeskTicketRecord,
    *,
    score: float,
    context_penalty: float,
    incident_gap_penalty: float,
) -> list[str]:
    """Offer a lighter coordination route to queued tickets in the same cluster.

    Runs only for task 2 and above, only when ``score`` is at least
    ``CLUSTER_STABILIZE_SCORE_THRESHOLD``, and only when neither penalty is
    positive. Every queued ticket found by ``_future_cluster_ticket_indexes``
    gets a planning note and a customer note appended. If it has no alternate
    route, or its alternate multiplier is below the relief multiplier, it also
    gets a ``service_desk`` alternate route.

    Args:
        current_ticket: The ticket whose handling triggers the update.
        score: Grade of the current submission.
        context_penalty: Any positive value disables stabilization.
        incident_gap_penalty: Any positive value disables stabilization.

    Returns:
        Ids of the rewritten queued tickets, in queue order. The list is empty
        when stabilization is skipped or nothing matches. On success this also
        bumps ``cluster_stabilizations_total`` and records a
        ``stabilize_cluster`` event.
    """
    if (self._state.current_task_id or 1) < 2:
        return []
    if score < CLUSTER_STABILIZE_SCORE_THRESHOLD:
        return []
    if context_penalty > 0.0 or incident_gap_penalty > 0.0:
        return []
    # NOTE(review): the scan starts at current_ticket_index, not +1 as in
    # _cluster_summary. Whether current_ticket itself can be rewritten depends
    # on whether the caller has already advanced the index — confirm.
    future_indexes = self._future_cluster_ticket_indexes(
        current_ticket,
        start_index=self._state.current_ticket_index,
    )
    if not future_indexes:
        return []
    # An open incident covering this ticket selects the incident relief multiplier
    # and an "acknowledge" alternate action; otherwise "assign" is used.
    incident_cover = self._incident_open_for_ticket(current_ticket)
    relief_multiplier = (
        CLUSTER_INCIDENT_RELIEF_MULTIPLIER
        if incident_cover
        else CLUSTER_OWNER_RELIEF_MULTIPLIER
    )
    planning_note = (
        "An earlier incident bridge is already active for this request cluster, so later "
        "updates can be acknowledged and coordinated instead of being re-triaged from scratch."
        if incident_cover
        else "An earlier ticket in this request cluster already has an accountable owner, "
        "so later updates can be coordinated rather than fully re-triaged."
    )
    customer_note = (
        "The requester said a single coordinated owner is acceptable as long as the update is linked to the existing workstream."
    )
    updated_ticket_ids: list[str] = []
    for index in future_indexes:
        future_ticket = self._queue[index]
        updates: dict[str, Any] = {
            "planning_note": self._append_note(future_ticket.planning_note, planning_note),
            "customer_update_note": self._append_note(
                future_ticket.customer_update_note,
                customer_note,
            ),
        }
        # Leave an existing alternate route alone when its multiplier is
        # already >= relief_multiplier; otherwise install or overwrite it.
        if (
            not self._ticket_has_alternate_route(future_ticket)
            or future_ticket.alternate_route_score_multiplier < relief_multiplier
        ):
            # With incident cover: critical -> high and high -> medium.
            # Otherwise keep any existing alternate priority, else the primary one.
            alternate_priority = (
                "high"
                if incident_cover and future_ticket.priority == "critical"
                else "medium"
                if incident_cover and future_ticket.priority == "high"
                else future_ticket.alternate_priority or future_ticket.priority
            )
            updates.update(
                {
                    "alternate_issue_type": (
                        future_ticket.alternate_issue_type or future_ticket.issue_type
                    ),
                    "alternate_priority": alternate_priority,
                    "alternate_assignment_group": "service_desk",
                    "alternate_resolution_action": (
                        "acknowledge" if incident_cover else "assign"
                    ),
                    "alternate_route_score_multiplier": relief_multiplier,
                }
            )
        updated_ticket = future_ticket.model_copy(update=updates)
        self._replace_queue_ticket(index, updated_ticket)
        updated_ticket_ids.append(updated_ticket.ticket_id)
    if updated_ticket_ids:
        self._state.cluster_stabilizations_total += len(updated_ticket_ids)
        self._record_dynamic_queue_event(
            "stabilize_cluster",
            source_ticket_id=current_ticket.ticket_id,
            affected_ticket_ids=updated_ticket_ids,
            incident_cover=incident_cover,
        )
    return updated_ticket_ids
def _destabilize_future_cluster_tickets(
    self,
    current_ticket: HelpdeskTicketRecord,
    *,
    score: float,
    context_penalty: float,
    incident_gap_penalty: float,
) -> list[str]:
    """Make queued tickets in the same cluster more urgent after a weak submit.

    Runs only for task 2 and above, and only when ``score`` is below
    ``CLUSTER_DESTABILIZE_SCORE_THRESHOLD`` or either penalty is positive.
    Every queued ticket found by ``_future_cluster_ticket_indexes``:
    - is raised one priority level and gets planning/customer notes appended;
    - gets ``incident_recommended`` set if it already was, if the current
      ticket is high/critical, or if the current ticket requires an incident;
    - is linked to the current ticket when it has no ``related_ticket_id``.

    Args:
        current_ticket: The ticket whose handling triggers the update.
        score: Grade of the current submission.
        context_penalty: Any positive value triggers destabilization.
        incident_gap_penalty: Any positive value triggers destabilization.

    Returns:
        Ids of the rewritten queued tickets, in queue order, or an empty list.
        On success this also bumps ``cluster_destabilizations_total`` and
        records a ``destabilize_cluster`` event.
    """
    if (self._state.current_task_id or 1) < 2:
        return []
    # Proceed only on a weak submit: score below the threshold, or any positive penalty.
    if score >= CLUSTER_DESTABILIZE_SCORE_THRESHOLD:
        if context_penalty <= 0.0 and incident_gap_penalty <= 0.0:
            return []
    # NOTE(review): the scan starts at current_ticket_index, not +1. If the
    # index still points at current_ticket, that ticket could be escalated and
    # linked to itself below — confirm when the caller advances the index.
    future_indexes = self._future_cluster_ticket_indexes(
        current_ticket,
        start_index=self._state.current_ticket_index,
    )
    if not future_indexes:
        return []
    planning_note = (
        "Earlier handling in this request cluster did not settle ownership, so this follow-on "
        "arrives with more urgency and may need firmer coordination."
    )
    customer_note = (
        "The requester is escalating because the earlier response did not fully resolve the blocker."
    )
    updated_ticket_ids: list[str] = []
    for index in future_indexes:
        future_ticket = self._queue[index]
        updates: dict[str, Any] = {
            "priority": self._escalate_priority_level(future_ticket.priority),
            "planning_note": self._append_note(future_ticket.planning_note, planning_note),
            "customer_update_note": self._append_note(
                future_ticket.customer_update_note,
                customer_note,
            ),
            "incident_recommended": (
                future_ticket.incident_recommended
                or current_ticket.priority in {"high", "critical"}
                or self._requires_incident(current_ticket)
            ),
        }
        # Keep an existing link; only unlinked tickets point back at current_ticket.
        if future_ticket.related_ticket_id is None:
            updates["related_ticket_id"] = current_ticket.ticket_id
        updated_ticket = future_ticket.model_copy(update=updates)
        self._replace_queue_ticket(index, updated_ticket)
        updated_ticket_ids.append(updated_ticket.ticket_id)
    if updated_ticket_ids:
        self._state.cluster_destabilizations_total += len(updated_ticket_ids)
        self._record_dynamic_queue_event(
            "destabilize_cluster",
            source_ticket_id=current_ticket.ticket_id,
            affected_ticket_ids=updated_ticket_ids,
        )
    return updated_ticket_ids
def _ticket_has_alternate_route(self, ticket: HelpdeskTicketRecord) -> bool:
return any(
value is not None
for value in (
ticket.alternate_issue_type,
ticket.alternate_priority,
ticket.alternate_assignment_group,
ticket.alternate_resolution_action,
)
) and ticket.alternate_route_score_multiplier > 0.0
def _route_for_ticket(
self,
ticket: HelpdeskTicketRecord,
*,
use_alternate: bool = False,
) -> dict[str, str]:
if use_alternate and self._ticket_has_alternate_route(ticket):
return {
"issue_type": ticket.alternate_issue_type or ticket.issue_type,
"priority": ticket.alternate_priority or ticket.priority,
"assignment_group": (
ticket.alternate_assignment_group or ticket.assignment_group
),
"resolution_action": (
ticket.alternate_resolution_action or ticket.resolution_action
),
}
return {
"issue_type": ticket.issue_type,
"priority": ticket.priority,
"assignment_group": ticket.assignment_group,
"resolution_action": ticket.resolution_action,
}
def _route_for_action(
self,
ticket: HelpdeskTicketRecord,
action: HelpdeskTicketAction,
) -> dict[str, str]:
primary_route = self._route_for_ticket(ticket)
return {
"issue_type": action.issue_type or primary_route["issue_type"],
"priority": action.priority or primary_route["priority"],
"assignment_group": (
action.assignment_group or primary_route["assignment_group"]
),
"resolution_action": (
action.resolution_action or primary_route["resolution_action"]
),
}
def _route_capacity_cost(self, route: dict[str, str]) -> dict[str, Any]:
return {
"assignment_group": route["assignment_group"],
"team_slots": 1,
"high_priority_slots": 1
if route["priority"] in {"high", "critical"}
else 0,
"escalation_slots": 1
if route["resolution_action"] in {"assign", "escalate"}
else 0,
}
def _routing_options_for_ticket(self, ticket: HelpdeskTicketRecord) -> list[dict[str, Any]]:
options = [
{
"label": "primary",
"score_multiplier": 1.0,
**self._route_for_ticket(ticket),
"capacity_cost": self._route_capacity_cost(self._route_for_ticket(ticket)),
}
]
if self._ticket_has_alternate_route(ticket):
alternate_route = self._route_for_ticket(ticket, use_alternate=True)
options.append(
{
"label": "alternate",
"score_multiplier": ticket.alternate_route_score_multiplier,
**alternate_route,
"capacity_cost": self._route_capacity_cost(alternate_route),
}
)
return options
def _initial_capacity_state_for_queue(
self,
task_id: int,
) -> tuple[dict[str, int], int, int, int]:
if task_id != 3:
return {}, 0, 0, 0
primary_group_demand: dict[str, int] = {}
alternate_relief_by_group: dict[str, int] = {}
all_groups: set[str] = set()
high_priority_demand = 0
high_priority_relief = 0
escalation_demand = 0
escalation_relief = 0
incident_demand = 0
for ticket in self._queue:
primary_route = self._route_for_ticket(ticket)
all_groups.add(primary_route["assignment_group"])
primary_group_demand[primary_route["assignment_group"]] = (
primary_group_demand.get(primary_route["assignment_group"], 0) + 1
)
if primary_route["priority"] in {"high", "critical"}:
high_priority_demand += 1
if primary_route["resolution_action"] in {"assign", "escalate"}:
escalation_demand += 1
if self._requires_incident(ticket):
incident_demand += 1
if self._ticket_has_alternate_route(ticket):
alternate_route = self._route_for_ticket(ticket, use_alternate=True)
all_groups.add(alternate_route["assignment_group"])
if alternate_route["assignment_group"] != primary_route["assignment_group"]:
alternate_relief_by_group[primary_route["assignment_group"]] = (
alternate_relief_by_group.get(
primary_route["assignment_group"],
0,
)
+ 1
)
if (
primary_route["priority"] in {"high", "critical"}
and alternate_route["priority"] not in {"high", "critical"}
):
high_priority_relief += 1
if (
primary_route["resolution_action"] in {"assign", "escalate"}
and alternate_route["resolution_action"] not in {"assign", "escalate"}
):
escalation_relief += 1
team_capacity_initial: dict[str, int] = {}
for group in sorted(all_groups):
demand = primary_group_demand.get(group, 0)
relief = alternate_relief_by_group.get(group, 0)
if demand <= 1:
team_capacity_initial[group] = 1 if group in all_groups else 0
elif relief > 0:
team_capacity_initial[group] = max(1, demand - 1)
else:
team_capacity_initial[group] = demand
if high_priority_demand <= 1:
high_priority_slots_initial = high_priority_demand
elif high_priority_relief > 0:
high_priority_slots_initial = max(1, high_priority_demand - 1)
else:
high_priority_slots_initial = high_priority_demand
if escalation_demand <= 1:
escalation_slots_initial = escalation_demand
elif escalation_relief > 0:
escalation_slots_initial = max(1, escalation_demand - 1)
else:
escalation_slots_initial = escalation_demand
if incident_demand <= 1:
incident_slots_initial = incident_demand
else:
incident_slots_initial = max(1, incident_demand - 1)
return (
team_capacity_initial,
high_priority_slots_initial,
escalation_slots_initial,
incident_slots_initial,
)
def _future_queue_demand(self) -> dict[str, Any]:
future_tickets = self._queue[self._state.current_ticket_index + 1 :]
team_demand: dict[str, int] = {}
high_priority_needed = 0
escalation_needed = 0
capacity_sensitive_tickets = 0
incident_needed = 0
clustered_follow_ons = 0
for ticket in future_tickets:
route = self._route_for_ticket(ticket)
team_demand[route["assignment_group"]] = (
team_demand.get(route["assignment_group"], 0) + 1
)
if route["priority"] in {"high", "critical"}:
high_priority_needed += 1
if route["resolution_action"] in {"assign", "escalate"}:
escalation_needed += 1
if self._ticket_has_alternate_route(ticket):
capacity_sensitive_tickets += 1
if self._requires_incident(ticket):
incident_needed += 1
if self._cluster_keys_for_ticket(ticket):
clustered_follow_ons += 1
return {
"remaining_ticket_count": len(future_tickets),
"team_demand": team_demand,
"high_priority_needed": high_priority_needed,
"escalation_needed": escalation_needed,
"capacity_sensitive_tickets": capacity_sensitive_tickets,
"incident_needed": incident_needed,
"clustered_follow_ons": clustered_follow_ons,
}
def _capacity_state_snapshot(self) -> dict[str, Any]:
return {
"team_capacity_remaining": dict(self._state.team_capacity_remaining),
"team_capacity_initial": dict(self._state.team_capacity_initial),
"high_priority_slots_remaining": self._state.high_priority_slots_remaining,
"high_priority_slots_initial": self._state.high_priority_slots_initial,
"escalation_slots_remaining": self._state.escalation_slots_remaining,
"escalation_slots_initial": self._state.escalation_slots_initial,
"incident_slots_remaining": self._state.incident_slots_remaining,
"incident_slots_initial": self._state.incident_slots_initial,
}
def _planning_route_recommendation(self, ticket: HelpdeskTicketRecord) -> dict[str, Any]:
    """Compare the capacity pressure of ``ticket``'s primary and alternate routes.

    A route's pressure is a sum over the team, high-priority and escalation
    budgets. For each budget, it counts how far the future demand plus the
    route's own cost would exceed what remains, never below zero. Incident
    slots are not considered. The alternate route is preferred only when it
    exists and its pressure is strictly lower.

    Returns:
        ``preferred_label`` (``"primary"`` or ``"alternate"``) and both
        pressure scores. The alternate score equals the primary one when there
        is no alternate route. Also includes the capacity snapshot and
        future-demand tallies the scores were computed from.
    """
    primary_route = self._route_for_ticket(ticket)
    alternate_route = (
        self._route_for_ticket(ticket, use_alternate=True)
        if self._ticket_has_alternate_route(ticket)
        else None
    )
    future_demand = self._future_queue_demand()
    capacity_state = self._capacity_state_snapshot()
    def pressure_score(route: dict[str, str]) -> int:
        # Overflow units this route would contribute on top of future demand.
        cost = self._route_capacity_cost(route)
        # A group with no recorded budget is treated as having one slot left.
        group_remaining = capacity_state["team_capacity_remaining"].get(
            route["assignment_group"],
            1,
        )
        group_pressure = max(
            0,
            future_demand["team_demand"].get(route["assignment_group"], 0)
            + cost["team_slots"]
            - group_remaining,
        )
        priority_pressure = max(
            0,
            future_demand["high_priority_needed"] + cost["high_priority_slots"]
            - capacity_state["high_priority_slots_remaining"],
        )
        escalation_pressure = max(
            0,
            future_demand["escalation_needed"] + cost["escalation_slots"]
            - capacity_state["escalation_slots_remaining"],
        )
        return group_pressure + priority_pressure + escalation_pressure
    primary_pressure = pressure_score(primary_route)
    alternate_pressure = (
        pressure_score(alternate_route) if alternate_route is not None else primary_pressure
    )
    # Ties keep the primary route.
    preferred_label = (
        "alternate"
        if alternate_route is not None and alternate_pressure < primary_pressure
        else "primary"
    )
    return {
        "preferred_label": preferred_label,
        "primary_pressure": primary_pressure,
        "alternate_pressure": alternate_pressure,
        "capacity_state": capacity_state,
        "future_demand": future_demand,
    }
def _ticket_is_capacity_sensitive(self, ticket: HelpdeskTicketRecord) -> bool:
if self._state.current_task_id != 3 or not self._ticket_has_alternate_route(ticket):
return False
recommendation = self._planning_route_recommendation(ticket)
return recommendation["preferred_label"] == "alternate" or any(
value > 0
for value in (
recommendation["primary_pressure"],
recommendation["alternate_pressure"],
)
)
def _route_matches_alternate(
self,
ticket: HelpdeskTicketRecord,
route: dict[str, str],
) -> bool:
if not self._ticket_has_alternate_route(ticket):
return False
return route == self._route_for_ticket(ticket, use_alternate=True)
def _apply_capacity_usage(
    self,
    ticket: HelpdeskTicketRecord,
    action: HelpdeskTicketAction,
) -> tuple[float, dict[str, Any]]:
    """Charge the capacity budgets for the route ``action`` selects (task 3 only).

    The route comes from ``_route_for_action``: empty action fields fall back
    to the ticket's primary route. Each budget (team, high-priority,
    escalation) is reduced by the route's cost and clamped at zero. Any cost
    beyond what remained counts as overflow and is penalized at that budget's
    rate. Incident slots are not charged here.

    Returns:
        ``(capacity_penalty, details)``; ``(0.0, {})`` outside task 3.

    Side effects:
        Adds the penalty to ``planning_penalty_total`` and stores it in
        ``planning_penalty_applied``. If the chosen route equals the ticket's
        alternate route, ``capacity_pressure_tickets_resolved`` is incremented.
    """
    if self._state.current_task_id != 3:
        return 0.0, {}
    route = self._route_for_action(ticket, action)
    capacity_cost = self._route_capacity_cost(route)
    group = str(capacity_cost["assignment_group"])
    # Groups without a budget yet (e.g. not in the initial queue) get one slot lazily.
    if group not in self._state.team_capacity_remaining:
        self._state.team_capacity_remaining[group] = 1
        self._state.team_capacity_initial.setdefault(group, 1)
    group_remaining = self._state.team_capacity_remaining[group]
    group_overflow = max(0, int(capacity_cost["team_slots"]) - group_remaining)
    self._state.team_capacity_remaining[group] = max(
        0,
        group_remaining - int(capacity_cost["team_slots"]),
    )
    high_priority_cost = int(capacity_cost["high_priority_slots"])
    high_priority_overflow = max(
        0,
        high_priority_cost - self._state.high_priority_slots_remaining,
    )
    self._state.high_priority_slots_remaining = max(
        0,
        self._state.high_priority_slots_remaining - high_priority_cost,
    )
    escalation_cost = int(capacity_cost["escalation_slots"])
    escalation_overflow = max(
        0,
        escalation_cost - self._state.escalation_slots_remaining,
    )
    self._state.escalation_slots_remaining = max(
        0,
        self._state.escalation_slots_remaining - escalation_cost,
    )
    capacity_penalty = round(
        group_overflow * TEAM_CAPACITY_OVERFLOW_PENALTY
        + high_priority_overflow * HIGH_PRIORITY_SLOT_OVERFLOW_PENALTY
        + escalation_overflow * ESCALATION_SLOT_OVERFLOW_PENALTY,
        4,
    )
    self._state.planning_penalty_total = round(
        self._state.planning_penalty_total + capacity_penalty,
        4,
    )
    self._state.planning_penalty_applied = capacity_penalty
    used_alternate_route = self._route_matches_alternate(ticket, route)
    if used_alternate_route:
        self._state.capacity_pressure_tickets_resolved += 1
    return capacity_penalty, {
        "capacity_cost": capacity_cost,
        "group_overflow": group_overflow,
        "high_priority_overflow": high_priority_overflow,
        "escalation_overflow": escalation_overflow,
        "used_alternate_route": used_alternate_route,
        "capacity_state_after_action": self._capacity_state_snapshot(),
    }
def _planning_success_bonus(self) -> float:
if self._state.current_task_id != 3 or self._state.planning_penalty_total > 0.0:
return 0.0
capacity_sensitive_count = sum(
1 for ticket in self._queue if self._ticket_has_alternate_route(ticket)
)
if capacity_sensitive_count == 0:
return 0.0
coverage = min(
1.0,
self._state.capacity_pressure_tickets_resolved / capacity_sensitive_count,
)
return round(PLANNING_SUCCESS_BONUS * coverage, 4)
def _internal_routing_note_for_ticket(
    self,
    ticket: HelpdeskTicketRecord,
) -> str | None:
    """Build the internal routing note for ``ticket``.

    This note also decides whether ``lookup_internal_routing_note`` has
    context for the ticket (presumably it is the text that tool reveals —
    confirm against the tool handler).

    Outside task 3 the note is ``ambiguity_note or planning_note``. In task 3
    it is a space-joined note built, in order, from:
    - the ambiguity and planning notes;
    - override sentences when the group or action differs from the
      vocabulary defaults for the issue type;
    - up to three issue-type-specific explanations.

    Returns:
        The note, or ``None`` when nothing applies.
    """
    if self._state.current_task_id != 3:
        return ticket.ambiguity_note or ticket.planning_note
    note_parts: list[str] = []
    if ticket.ambiguity_note is not None:
        note_parts.append(ticket.ambiguity_note)
    if ticket.planning_note is not None:
        note_parts.append(ticket.planning_note)
    # Unknown issue types default to the ticket's own values, so they never look overridden.
    default_group = ISSUE_TYPE_TO_ASSIGNMENT_GROUP.get(
        ticket.issue_type,
        ticket.assignment_group,
    )
    default_action = ISSUE_TYPE_TO_RESOLUTION_ACTION.get(
        ticket.issue_type,
        ticket.resolution_action,
    )
    if ticket.assignment_group != default_group:
        note_parts.append(
            "Routing override: send this to "
            f"{ticket.assignment_group} rather than the default {default_group} queue."
        )
    if ticket.resolution_action != default_action:
        note_parts.append(
            "Action override: use "
            f"{ticket.resolution_action} instead of the default {default_action} next step."
        )
    # Issue-type/group combinations that get an extra explanatory sentence.
    if ticket.issue_type == "onboarding" and ticket.assignment_group == "service_desk":
        note_parts.append(
            "The onboarding workflow is blocked by an access dependency, so the unblocker owns the next move."
        )
    if (
        ticket.issue_type == "security_compliance"
        and ticket.assignment_group == "application_team"
    ):
        note_parts.append(
            "This compliance issue needs a product-team fix rather than a central security handoff."
        )
    if ticket.issue_type == "billing_license" and ticket.assignment_group == "procurement":
        note_parts.append(
            "Treat this as commercial procurement work instead of routine license fulfillment."
        )
    if not note_parts:
        return None
    return " ".join(note_parts)
def _ticket_has_nondefault_routing(self, ticket: HelpdeskTicketRecord) -> bool:
    """Return True when the ticket's group or action differs from its issue type's default.

    Issue types missing from the vocabulary maps are treated as using their
    own values, so they never count as non-default.
    """
    expected_group = ISSUE_TYPE_TO_ASSIGNMENT_GROUP.get(
        ticket.issue_type, ticket.assignment_group
    )
    if ticket.assignment_group != expected_group:
        return True
    expected_action = ISSUE_TYPE_TO_RESOLUTION_ACTION.get(
        ticket.issue_type, ticket.resolution_action
    )
    return ticket.resolution_action != expected_action
def _ticket_mentions_follow_up(self, ticket: HelpdeskTicketRecord) -> bool:
text = f"{ticket.title} {ticket.description}".lower()
return any(
phrase in text
for phrase in (
"re:",
"follow-up",
"following up",
"still",
"third update",
"reference ticket",
"regression",
"unresolved",
)
)
def _ticket_text(self, ticket: HelpdeskTicketRecord) -> str:
return f"{ticket.title} {ticket.description}".lower()
def _requires_incident(self, ticket: HelpdeskTicketRecord) -> bool:
if ticket.incident_recommended:
return True
text = self._ticket_text(ticket)
return (
ticket.priority in {"high", "critical"}
and ticket.issue_type
in {"application_support", "identity_access", "security_compliance"}
and any(
phrase in text
for phrase in (
"outage",
"cannot log in",
"login",
"regression",
"unstable",
"blocked",
"lockout",
"company-wide",
"production",
"unresolved",
)
)
)
def _incident_open_for_ticket(self, ticket: HelpdeskTicketRecord) -> bool:
related_ids = {ticket.ticket_id}
if ticket.related_ticket_id:
related_ids.add(ticket.related_ticket_id)
if ticket.generated_from_ticket_id:
related_ids.add(ticket.generated_from_ticket_id)
if any(ticket_id in self._state.open_incident_ticket_ids for ticket_id in related_ids):
return True
ticket_cluster_keys = self._cluster_keys_for_ticket(ticket)
if not ticket_cluster_keys:
return False
for open_ticket_id in self._state.open_incident_ticket_ids:
open_ticket = self._tickets_by_id.get(open_ticket_id)
if open_ticket is None:
continue
if ticket_cluster_keys & self._cluster_keys_for_ticket(open_ticket):
return True
return False
def _request_info_note_for_ticket(self, ticket: HelpdeskTicketRecord) -> str | None:
note_parts: list[str] = []
if ticket.customer_update_note:
note_parts.append(ticket.customer_update_note)
if ticket.related_ticket_id is not None:
note_parts.append(
"The requester confirmed this is connected to the earlier case and wants a single accountable owner."
)
if self._ticket_has_nondefault_routing(ticket):
note_parts.append(
"The requester clarified that the blocker owner matters more than the superficial request label."
)
if self._ticket_has_alternate_route(ticket):
note_parts.append(
"Operations said an acknowledged fallback path is acceptable if the preferred queue is saturated."
)
if self._requires_incident(ticket):
note_parts.append(
"Stakeholders asked for incident-style coordination because the issue is still operationally active."
)
if not note_parts:
return None
return " ".join(note_parts)
def _request_info_used(self, ticket_id: str) -> bool:
return self._state.ticket_request_info_usage.get(ticket_id, 0) > 0
def _defer_count(self, ticket_id: str) -> int:
return self._state.ticket_defer_counts.get(ticket_id, 0)
def _record_dynamic_queue_event(self, event_type: str, **details: Any) -> None:
self._state.dynamic_queue_events.append({"event_type": event_type, **details})
def _escalate_priority_level(self, priority: str) -> str:
if priority == "low":
return "medium"
if priority == "medium":
return "high"
return "critical"
def _escalate_ticket_after_delay(
self,
ticket: HelpdeskTicketRecord,
*,
defer_count: int,
) -> HelpdeskTicketRecord:
escalated_priority = self._escalate_priority_level(ticket.priority)
description_suffix = (
" The ticket was deferred earlier in the queue and now needs firmer ownership."
)
customer_update = (
ticket.customer_update_note
or "The requester followed up after the delay and wants a committed owner."
)
return ticket.model_copy(
update={
"priority": escalated_priority,
"title": (
ticket.title
if ticket.title.lower().startswith("re:")
else f"Re: {ticket.title}"
),
"description": f"{ticket.description}{description_suffix}",
"customer_update_note": customer_update,
}
)
def _should_spawn_follow_up(
    self,
    ticket: HelpdeskTicketRecord,
    *,
    score: float,
    context_penalty: float,
    incident_gap_penalty: float,
) -> bool:
    """Decide whether handling ``ticket`` should spawn a follow-up ticket.

    Never spawns in task 1, for tickets that are themselves generated
    follow-ups, or for a source that already spawned one.

    The ticket must carry some follow-up risk: an incident need, follow-up
    wording, a related ticket, high/critical priority, or later clustered
    tickets. Task 2 additionally requires a link, follow-up wording, later
    cluster tickets, or a repeat requester.

    Given that risk, a follow-up spawns when any of these holds:
    - the score is below ``FOLLOW_UP_SPAWN_THRESHOLD``;
    - the context penalty is at least 0.15 and the score is below 0.9;
    - any incident-gap penalty applies.
    """
    task_id = int(self._state.current_task_id or 1)
    if task_id < 2:
        return False
    # Generated follow-ups never spawn further follow-ups.
    if ticket.generated_from_ticket_id is not None:
        return False
    # At most one follow-up per source ticket.
    if ticket.ticket_id in self._state.spawned_follow_up_source_ids:
        return False
    follow_up_risk = (
        self._requires_incident(ticket)
        or self._ticket_mentions_follow_up(ticket)
        or ticket.related_ticket_id is not None
        or ticket.priority in {"high", "critical"}
        or self._cluster_summary(ticket)["future_cluster_ticket_count"] > 0
    )
    if not follow_up_risk:
        return False
    # Task 2 ignores priority/incident-only risk: it needs a link, follow-up
    # wording, later cluster tickets, or a repeat requester.
    if task_id == 2 and not (
        ticket.related_ticket_id is not None
        or self._ticket_mentions_follow_up(ticket)
        or self._cluster_summary(ticket)["future_cluster_ticket_count"] > 0
        or self._ticket_repeated_requester_count(ticket) >= 2
    ):
        return False
    return (
        score < FOLLOW_UP_SPAWN_THRESHOLD
        or (context_penalty >= 0.15 and score < 0.9)
        or incident_gap_penalty > 0.0
    )
def _spawn_follow_up_ticket(self, ticket: HelpdeskTicketRecord) -> HelpdeskTicketRecord:
    """Append an escalated follow-up generated from ``ticket`` to the end of the queue.

    The follow-up's id is ``"<ticket_id>-followup"``. It:
    - keeps the requester, issue type, assignment group and planning note;
    - prefixes the title with "Re: " unless one is already present;
    - raises priority (high/critical -> critical, otherwise one level);
    - switches the action to ``escalate`` for high/critical or
      incident-requiring sources;
    - links back via ``related_ticket_id`` and ``generated_from_ticket_id``;
    - reuses the source's ``service_cluster_id``, falling back to the
      source's ticket id.

    Side effects:
        Appends to ``self._queue`` and ``self._tickets_by_id`` and calls
        ``_sync_queue_ticket_ids``. Records the follow-up and source ids in
        state and logs a ``spawn_follow_up`` event.

    Returns:
        The new follow-up ticket.
    """
    follow_up_ticket = HelpdeskTicketRecord(
        ticket_id=f"{ticket.ticket_id}-followup",
        title=(
            ticket.title
            if ticket.title.lower().startswith("re:")
            else f"Re: {ticket.title}"
        ),
        requester=ticket.requester,
        description=(
            "The earlier handling did not fully resolve the issue. The requester is "
            f"following up on {ticket.ticket_id} and needs a single accountable owner now."
        ),
        issue_type=ticket.issue_type,
        priority=(
            "critical"
            if ticket.priority in {"high", "critical"}
            else self._escalate_priority_level(ticket.priority)
        ),
        assignment_group=ticket.assignment_group,
        resolution_action=(
            "escalate"
            if ticket.priority in {"high", "critical"} or self._requires_incident(ticket)
            else ticket.resolution_action
        ),
        ambiguity_note=(
            ticket.ambiguity_note
            or "Prior routing did not settle ownership; route to the team that can actually unblock the issue."
        ),
        related_ticket_id=ticket.ticket_id,
        planning_note=ticket.planning_note,
        customer_update_note=(
            "The requester said the last response did not resolve the blocker and wants an accountable next owner."
        ),
        incident_recommended=self._requires_incident(ticket),
        generated_from_ticket_id=ticket.ticket_id,
        # Tie the follow-up into the source's cluster (or start one keyed by the source id).
        service_cluster_id=ticket.service_cluster_id or ticket.ticket_id,
    )
    self._queue.append(follow_up_ticket)
    self._tickets_by_id[follow_up_ticket.ticket_id] = follow_up_ticket
    self._sync_queue_ticket_ids()
    self._state.spawned_follow_up_ticket_ids.append(follow_up_ticket.ticket_id)
    self._state.spawned_follow_up_source_ids.append(ticket.ticket_id)
    self._record_dynamic_queue_event(
        "spawn_follow_up",
        source_ticket_id=ticket.ticket_id,
        follow_up_ticket_id=follow_up_ticket.ticket_id,
    )
    return follow_up_ticket
def _ticket_repeated_requester_count(self, ticket: HelpdeskTicketRecord) -> int:
return sum(
1
for candidate in self._tickets_by_id.values()
if candidate.requester == ticket.requester
)
def _tool_has_available_context(
    self,
    ticket: HelpdeskTicketRecord,
    tool_name: str,
) -> bool:
    """Return True when ``tool_name`` would reveal something for ``ticket``.

    Returns False for any tool name not handled below.
    """
    # Only when the related ticket is actually known to the environment.
    if tool_name == "lookup_related_ticket":
        return (
            ticket.related_ticket_id is not None
            and ticket.related_ticket_id in self._tickets_by_id
        )
    # The requester must have at least two known tickets (this one included).
    if tool_name == "lookup_requester_history":
        return self._ticket_repeated_requester_count(ticket) >= 2
    if tool_name == "lookup_internal_routing_note":
        return self._internal_routing_note_for_ticket(ticket) is not None
    # Task 3 only: available for alternate-route tickets or while tickets remain after this one.
    if tool_name == "lookup_queue_capacity_forecast":
        return self._state.current_task_id == 3 and (
            self._ticket_has_alternate_route(ticket)
            or self._future_queue_demand()["remaining_ticket_count"] > 0
        )
    # Task 2+: needs later clustered tickets or a requester with several tickets.
    if tool_name == "lookup_queue_cluster_summary":
        if (self._state.current_task_id or 1) < 2:
            return False
        cluster_summary = self._cluster_summary(ticket)
        return (
            cluster_summary["future_cluster_ticket_count"] > 0
            or cluster_summary["shared_requester_count"] > 1
        )
    return False
def _required_tools_for_ticket(
    self,
    ticket: HelpdeskTicketRecord,
    task_id: int | None = None,
) -> list[str]:
    """Return the investigation tools expected before submitting ``ticket``.

    Task 1 (or an unknown task) requires nothing. For tasks 2 and 3 the list
    is assembled in this order:
    1. any per-ticket plan from ``TASK3_INVESTIGATION_TOOL_PLAN``;
    2. related-ticket lookup when the ticket has a related ticket;
    3. internal routing note when one exists;
    4. requester history for repeat requesters with an extra risk signal;
    5. capacity forecast for capacity-sensitive tickets (task 3);
    6. cluster summary when later clustered tickets exist and a
       task-specific signal is present.

    The result is de-duplicated, limited to the tools available for the task,
    and limited to tools that actually have context for this ticket.

    Args:
        ticket: Ticket to evaluate.
        task_id: Task to evaluate against; defaults to the current task.
    """
    resolved_task_id = self._state.current_task_id if task_id is None else task_id
    if resolved_task_id is None or resolved_task_id < 2:
        return []
    # NOTE(review): despite its name, TASK3_INVESTIGATION_TOOL_PLAN is also
    # consulted for task 2 — confirm this is intended.
    required_tools: list[str] = list(TASK3_INVESTIGATION_TOOL_PLAN.get(ticket.ticket_id, ()))
    if ticket.related_ticket_id is not None and "lookup_related_ticket" not in required_tools:
        required_tools.append("lookup_related_ticket")
    if (
        self._internal_routing_note_for_ticket(ticket) is not None
        and "lookup_internal_routing_note" not in required_tools
    ):
        required_tools.append("lookup_internal_routing_note")
    if (
        self._ticket_repeated_requester_count(ticket) >= 2
        and (
            ticket.related_ticket_id is not None
            or self._ticket_mentions_follow_up(ticket)
            or self._ticket_has_nondefault_routing(ticket)
            or ticket.priority in {"high", "critical"}
        )
        and "lookup_requester_history" not in required_tools
    ):
        required_tools.append("lookup_requester_history")
    # NOTE(review): _ticket_is_capacity_sensitive reads self._state.current_task_id,
    # not resolved_task_id, so an explicit task_id=3 outside task 3 never adds this tool.
    if (
        resolved_task_id == 3
        and self._ticket_is_capacity_sensitive(ticket)
        and "lookup_queue_capacity_forecast" not in required_tools
    ):
        required_tools.append("lookup_queue_capacity_forecast")
    # Cluster scans start just after the ticket's own queue slot (or after the cursor).
    ticket_index = self._ticket_queue_index(ticket)
    cluster_start_index = (
        ticket_index + 1
        if ticket_index is not None
        else self._state.current_ticket_index + 1
    )
    # Task 3: cluster summary is needed when the cluster carries incident,
    # urgency or repeat-requester risk.
    if resolved_task_id == 3:
        cluster_summary = self._cluster_summary(
            ticket,
            start_index=cluster_start_index,
        )
        if (
            cluster_summary["future_cluster_ticket_count"] > 0
            and "lookup_queue_cluster_summary" not in required_tools
            and (
                self._requires_incident(ticket)
                or cluster_summary["future_high_priority_count"] > 0
                or cluster_summary["shared_requester_count"] > 1
            )
        ):
            required_tools.append("lookup_queue_cluster_summary")
    # Task 2: cluster summary is needed for linked, repeat-requester or
    # follow-up-worded tickets.
    if resolved_task_id == 2:
        cluster_summary = self._cluster_summary(
            ticket,
            start_index=cluster_start_index,
        )
        if (
            cluster_summary["future_cluster_ticket_count"] > 0
            and "lookup_queue_cluster_summary" not in required_tools
            and (
                ticket.related_ticket_id is not None
                or cluster_summary["shared_requester_count"] > 1
                or self._ticket_mentions_follow_up(ticket)
            )
        ):
            required_tools.append("lookup_queue_cluster_summary")
    # Keep first occurrences only, restricted to tools the task exposes and
    # that actually have context for this ticket.
    filtered_required_tools: list[str] = []
    allowed_tool_set = set(self._available_tools_for_task(resolved_task_id))
    for tool_name in required_tools:
        if tool_name in filtered_required_tools:
            continue
        if tool_name not in allowed_tool_set:
            continue
        if self._tool_has_available_context(ticket, tool_name):
            filtered_required_tools.append(tool_name)
    return filtered_required_tools
def _recommended_operational_actions(self, ticket: HelpdeskTicketRecord) -> list[str]:
    """Suggest non-submit actions worth taking on ``ticket``, in a fixed order.

    Only action types available in the current task are considered:
    - ``request_info``: when clarification text exists and it has not been
      used yet;
    - ``open_incident``: when an incident is required and none covers the
      ticket yet;
    - ``defer``: when all of these hold:
      - the defer limit is not reached;
      - the cursor is not on the last queue slot;
      - the ticket is not high/critical;
      - there is something to gain from waiting (unused required tools,
        capacity sensitivity, request_info text, or later clustered tickets).
    """
    recommended_actions: list[str] = []
    available_action_types = set(self._available_action_types_for_task())
    cluster_summary = self._cluster_summary(ticket)
    if (
        "request_info" in available_action_types
        and self._request_info_note_for_ticket(ticket) is not None
        and not self._request_info_used(ticket.ticket_id)
    ):
        recommended_actions.append("request_info")
    if (
        "open_incident" in available_action_types
        and self._requires_incident(ticket)
        and not self._incident_open_for_ticket(ticket)
    ):
        recommended_actions.append("open_incident")
    if (
        "defer" in available_action_types
        and self._defer_count(ticket.ticket_id) < MAX_DEFERS_PER_TICKET
        and self._state.current_ticket_index < len(self._queue) - 1
        and ticket.priority not in {"high", "critical"}
        and (
            bool(self._remaining_tools_for_ticket(ticket))
            or self._ticket_is_capacity_sensitive(ticket)
            or self._request_info_note_for_ticket(ticket) is not None
            or cluster_summary["future_cluster_ticket_count"] > 0
        )
    ):
        recommended_actions.append("defer")
    return recommended_actions
def _used_tools_for_ticket(self, ticket_id: str) -> list[str]:
return list(self._state.ticket_tool_usage.get(ticket_id, []))
def _remaining_tools_for_ticket(
self,
ticket: HelpdeskTicketRecord,
task_id: int | None = None,
) -> list[str]:
required_tools = self._required_tools_for_ticket(ticket, task_id)
used_tools = set(self._used_tools_for_ticket(ticket.ticket_id))
return [tool for tool in required_tools if tool not in used_tools]
def _record_tool_usage(self, ticket_id: str, tool_name: str) -> None:
used = self._state.ticket_tool_usage.setdefault(ticket_id, [])
if tool_name not in used:
used.append(tool_name)
def _tool_progress_for_ticket(self, ticket: HelpdeskTicketRecord) -> dict[str, Any]:
required_tools = self._required_tools_for_ticket(ticket)
revealed_tools = self._used_tools_for_ticket(ticket.ticket_id)
remaining_tools = self._remaining_tools_for_ticket(ticket)
total_required = max(1, len(required_tools))
request_info_used = self._request_info_used(ticket.ticket_id)
operational_actions = self._recommended_operational_actions(ticket)
return {
"required_tools": required_tools,
"revealed_tools": revealed_tools,
"remaining_tools": remaining_tools,
"revealed_count": len(revealed_tools),
"remaining_count": len(remaining_tools),
"completeness": round(len(revealed_tools) / total_required, 2),
"request_info_used": request_info_used,
"recommended_operational_actions": operational_actions,
}
def _default_redacted_description(self, ticket: HelpdeskTicketRecord) -> str:
    """Return the generic placeholder description shown while context is hidden.

    The first matching case wins, in this order:
    1. later clustered tickets exist;
    2. the ticket has a related ticket;
    3. an internal routing note exists;
    4. an alternate route exists;
    5. the routing is non-default;
    6. otherwise, a generic fallback.
    """
    cluster_summary = self._cluster_summary(ticket)
    if cluster_summary["future_cluster_ticket_count"] > 0:
        return (
            "This ticket is part of a broader queue cluster and the best next step depends "
            "on downstream consequences. Additional routing context is available via investigation."
        )
    if ticket.related_ticket_id is not None:
        return (
            "This is a follow-up operational issue. "
            "Additional routing context is available via investigation."
        )
    if self._internal_routing_note_for_ticket(ticket) is not None:
        return (
            "The visible request is not enough to choose the final owner and next step. "
            "Additional routing context is available via investigation."
        )
    if self._ticket_has_alternate_route(ticket):
        return (
            "The queue is under resource pressure and this ticket may support more than "
            "one acceptable routing path. Additional planning context is available via investigation."
        )
    if self._ticket_has_nondefault_routing(ticket):
        return (
            "The visible request looks straightforward, but the decisive routing detail is hidden until investigation."
        )
    return (
        "Additional routing context is available via investigation before final submission."
    )
def _default_redacted_title(self, ticket: HelpdeskTicketRecord) -> str:
    """Return the generic placeholder title shown while context is hidden.

    The first matching case wins, in this order:
    1. later clustered tickets exist;
    2. the ticket has a related ticket;
    3. an internal routing note exists;
    4. an alternate route exists;
    5. the text reads like a follow-up;
    6. otherwise, a generic title.
    """
    if self._cluster_summary(ticket)["future_cluster_ticket_count"] > 0:
        return "Clustered queue decision with hidden downstream impact"
    if ticket.related_ticket_id is not None:
        return "Follow-up request with hidden routing context"
    if self._internal_routing_note_for_ticket(ticket) is not None:
        return "Routing clarification required"
    if self._ticket_has_alternate_route(ticket):
        return "Capacity-sensitive routing decision"
    if self._ticket_mentions_follow_up(ticket):
        return "Priority support follow-up"
    return "Helpdesk routing decision"
def _visible_title(self, ticket: HelpdeskTicketRecord) -> str:
if self._state.current_task_id in {2, 3} and self._remaining_tools_for_ticket(ticket):
return HARD_TASK_TITLE_REDACTIONS.get(
ticket.ticket_id,
self._default_redacted_title(ticket),
)
return ticket.title
def _visible_description(self, ticket: HelpdeskTicketRecord) -> str:
if self._state.current_task_id in {2, 3} and self._remaining_tools_for_ticket(ticket):
return HARD_TASK_DESCRIPTION_REDACTIONS.get(
ticket.ticket_id,
self._default_redacted_description(ticket),
)
return ticket.description
def _submit_context_penalty(self, ticket: HelpdeskTicketRecord) -> tuple[float, int]:
progress = self._tool_progress_for_ticket(ticket)
required_tools = progress["required_tools"]
remaining_tools = progress["remaining_tools"]
if not required_tools or not remaining_tools:
return 0.0, 0
penalty = PREMATURE_SUBMIT_PENALTY * (
len(remaining_tools) / max(1, len(required_tools))
)
if self._ticket_has_nondefault_routing(ticket):
penalty += NONDEFAULT_HIDDEN_CONTEXT_PENALTY * (
len(remaining_tools) / max(1, len(required_tools))
)
return round(min(0.45, penalty), 4), len(remaining_tools)
def _context_completion_bonus(
self,
ticket: HelpdeskTicketRecord,
*,
missing_required_count: int,
score: float,
) -> float:
if not self._required_tools_for_ticket(ticket):
return 0.0
if missing_required_count != 0 or score < 0.75:
return 0.0
bonus = CONTEXT_COMPLETION_BONUS
if self._ticket_has_nondefault_routing(ticket):
bonus += NONDEFAULT_ROUTING_FOLLOWTHROUGH_BONUS
return bonus
def _trajectory_consistency_bonus(self) -> float:
if not self._queue:
return 0.0
hidden_context_tickets = [
ticket for ticket in self._queue if self._required_tools_for_ticket(ticket)
]
if not hidden_context_tickets:
return 0.0
resolved = sum(
1 for ticket in hidden_context_tickets if not self._remaining_tools_for_ticket(ticket)
)
resolution_rate = resolved / len(hidden_context_tickets)
return round(TRAJECTORY_CONTEXT_COMPLETION_BONUS * resolution_rate, 4)
def _operational_risk_penalty(
self,
ticket: HelpdeskTicketRecord,
action: HelpdeskTicketAction,
*,
task_id: int,
) -> float:
if task_id < 2 or action.priority is None:
priority_penalty = 0.0
else:
priority_rank = {"critical": 3, "high": 2, "medium": 1, "low": 0}
expected_rank = priority_rank.get(ticket.priority, 0)
predicted_rank = priority_rank.get(action.priority, 0)
gap = expected_rank - predicted_rank
if gap >= 2:
priority_penalty = SEVERE_PRIORITY_UNDERSHOOT_PENALTY
elif gap == 1 and ticket.priority in {"high", "critical"}:
priority_penalty = PRIORITY_UNDERSHOOT_PENALTY
else:
priority_penalty = 0.0
resolution_penalty = 0.0
if task_id == 3 and action.resolution_action is not None:
if (
ticket.issue_type in {"identity_access", "application_support", "security_compliance"}
and ticket.priority in {"high", "critical"}
and action.resolution_action == "acknowledge"
):
resolution_penalty += DANGEROUS_RESOLUTION_PENALTY
if ticket.issue_type == "spam_phishing" and action.resolution_action == "fulfill":
resolution_penalty += PRIORITY_UNDERSHOOT_PENALTY
return round(priority_penalty + resolution_penalty, 4)
def _incident_gap_penalty(
self,
ticket: HelpdeskTicketRecord,
action: HelpdeskTicketAction,
) -> float:
if self._state.current_task_id != 3:
return 0.0
if not self._requires_incident(ticket):
return 0.0
if self._incident_open_for_ticket(ticket):
return 0.0
if action.resolution_action in {"escalate", "assign"}:
return round(INCIDENT_GAP_PENALTY / 2, 4)
return INCIDENT_GAP_PENALTY
def _build_reward_components(
self,
*,
ticket_score: float,
field_breakdown: dict[str, float],
shaped_step_reward: float,
reward_kind: str,
final_reward: float,
milestone_adjustment: float = 0.0,
trajectory_reward: float | None = None,
investigation_penalty: float = 0.0,
penalty_reason: str | None = None,
extra_details: dict[str, Any] | None = None,
) -> dict[str, Any]:
components: dict[str, Any] = {
"reward_kind": reward_kind,
"ticket_score": ticket_score,
"field_breakdown": field_breakdown,
"shaped_step_reward": shaped_step_reward,
"milestone_adjustment": milestone_adjustment,
"final_reward": final_reward,
"average_score_so_far": self._current_average_score(),
"investigation_penalty_applied": investigation_penalty,
}
if trajectory_reward is not None:
components["trajectory_reward"] = trajectory_reward
if penalty_reason is not None:
components["penalty_reason"] = penalty_reason
if extra_details:
components.update(extra_details)
return components
def _lookup_related_ticket(
self,
current_ticket: HelpdeskTicketRecord,
target_ticket_id: str | None,
) -> dict[str, Any]:
target_id = target_ticket_id or current_ticket.related_ticket_id
if target_id is None:
return {
"tool_name": "lookup_related_ticket",
"found": False,
"message": "Current ticket has no linked related_ticket_id.",
}
related_ticket = self._tickets_by_id.get(target_id)
if related_ticket is None:
return {
"tool_name": "lookup_related_ticket",
"found": False,
"message": f"Ticket {target_id!r} was not found in the dataset.",
}
return {
"tool_name": "lookup_related_ticket",
"found": True,
"ticket": {
"ticket_id": related_ticket.ticket_id,
"title": related_ticket.title,
"requester": related_ticket.requester,
"description": related_ticket.description,
"issue_type": related_ticket.issue_type,
"priority": related_ticket.priority,
"assignment_group": related_ticket.assignment_group,
"resolution_action": related_ticket.resolution_action,
},
}
def _lookup_requester_history(self, current_ticket: HelpdeskTicketRecord) -> dict[str, Any]:
matches = [
{
"ticket_id": ticket.ticket_id,
"title": ticket.title,
"issue_type": ticket.issue_type,
"priority": ticket.priority,
"assignment_group": ticket.assignment_group,
"resolution_action": ticket.resolution_action,
}
for ticket in self._tickets_by_id.values()
if ticket.requester == current_ticket.requester
and ticket.ticket_id != current_ticket.ticket_id
]
return {
"tool_name": "lookup_requester_history",
"found": bool(matches),
"requester": current_ticket.requester,
"matches": matches,
}
def _lookup_internal_routing_note(self, current_ticket: HelpdeskTicketRecord) -> dict[str, Any]:
routing_note = self._internal_routing_note_for_ticket(current_ticket)
found = routing_note is not None
return {
"tool_name": "lookup_internal_routing_note",
"found": found,
"ticket_id": current_ticket.ticket_id,
"routing_note": routing_note if found else "",
}
def _lookup_queue_capacity_forecast(
self,
current_ticket: HelpdeskTicketRecord,
) -> dict[str, Any]:
recommendation = self._planning_route_recommendation(current_ticket)
routing_options = self._routing_options_for_ticket(current_ticket)
return {
"tool_name": "lookup_queue_capacity_forecast",
"found": True,
"ticket_id": current_ticket.ticket_id,
"preferred_route_label": recommendation["preferred_label"],
"primary_pressure": recommendation["primary_pressure"],
"alternate_pressure": recommendation["alternate_pressure"],
"capacity_state": recommendation["capacity_state"],
"future_queue_demand": recommendation["future_demand"],
"routing_options": routing_options,
"incident_recommended": self._requires_incident(current_ticket),
}
def _lookup_queue_cluster_summary(
self,
current_ticket: HelpdeskTicketRecord,
) -> dict[str, Any]:
cluster_summary = self._cluster_summary(current_ticket)
return {
"tool_name": "lookup_queue_cluster_summary",
"found": cluster_summary["future_cluster_ticket_count"] > 0
or cluster_summary["shared_requester_count"] > 1,
"ticket_id": current_ticket.ticket_id,
**cluster_summary,
}
def _run_investigation_tool(
self,
current_ticket: HelpdeskTicketRecord,
tool_name: str,
target_ticket_id: str | None,
) -> dict[str, Any]:
if tool_name == "lookup_related_ticket":
return self._lookup_related_ticket(current_ticket, target_ticket_id)
if tool_name == "lookup_requester_history":
return self._lookup_requester_history(current_ticket)
if tool_name == "lookup_internal_routing_note":
return self._lookup_internal_routing_note(current_ticket)
if tool_name == "lookup_queue_capacity_forecast":
return self._lookup_queue_capacity_forecast(current_ticket)
if tool_name == "lookup_queue_cluster_summary":
return self._lookup_queue_cluster_summary(current_ticket)
raise ValueError(f"Unsupported tool_name: {tool_name}")
def _handle_investigation_action(
    self,
    task: dict,
    current_ticket: HelpdeskTicketRecord,
    action: HelpdeskTicketAction,
    idx: int,
) -> HelpdeskTicketObservation:
    """Handle an ``investigate`` action by running one lookup tool on the current ticket.

    The steps are:
      * store the tool result as ``last_tool_result``;
      * record the tool as used for this ticket;
      * bump the step and investigation-step counters;
      * consume one unit of investigation budget (floored at 0);
      * append a history entry.
    The ticket remains current and the episode is not done.

    Args:
        task: Task definition, forwarded to ``_build_observation``.
        current_ticket: The ticket at queue index ``idx``.
        action: The agent action. It must name a ``tool_name`` available for
            the current task and must not set any submit field.
        idx: Zero-based queue index of ``current_ticket``.

    Returns:
        The next observation. Its reward is ``USEFUL_INVESTIGATION_REWARD``
        when the tool is a required tool not yet used on this ticket and its
        result is not marked ``found=False``; otherwise it is 0.0.

    Raises:
        ValueError: if ``tool_name`` is missing or unavailable for the current
            task, or if any submit field is set.
    """
    if action.tool_name is None:
        raise ValueError("Investigate actions require tool_name")
    if action.tool_name not in self._available_tools_for_task():
        raise ValueError(f"Unsupported tool_name for current task: {action.tool_name}")
    # An investigate action must not also carry a submission.
    submitted_fields = {
        field
        for field in ("issue_type", "priority", "assignment_group", "resolution_action")
        if getattr(action, field) is not None
    }
    if submitted_fields:
        raise ValueError(
            "Investigate actions cannot include submit fields: "
            f"{sorted(submitted_fields)}"
        )
    tool_result = self._run_investigation_tool(
        current_ticket,
        action.tool_name,
        action.tool_target_ticket_id,
    )
    required_tools = self._required_tools_for_ticket(current_ticket)
    # Read before _record_tool_usage below, so only the first use of a tool on
    # this ticket can count as useful. This presumes _used_tools_for_ticket
    # reflects _record_tool_usage — confirm.
    already_used = action.tool_name in self._used_tools_for_ticket(current_ticket.ticket_id)
    # A result without a "found" key is treated as found.
    useful_investigation = (
        action.tool_name in required_tools
        and not already_used
        and bool(tool_result.get("found", True))
    )
    self._record_tool_usage(current_ticket.ticket_id, action.tool_name)
    self._state.step_count += 1
    self._state.investigation_steps += 1
    self._state.investigation_budget_remaining = max(
        0,
        self._state.investigation_budget_remaining - 1,
    )
    self._state.last_tool_result = tool_result
    investigation_reward = USEFUL_INVESTIGATION_REWARD if useful_investigation else 0.0
    # Investigation steps are not graded, so the ticket score is always 0.
    investigation_score = 0.0
    self._state.last_step_reward = investigation_reward
    self._state.reward = investigation_reward
    self._state.done = False
    # Refreshed after the step counters above were incremented.
    self._state.investigation_penalty_applied = self._compute_episode_penalty()
    # Read after _record_tool_usage, presumably so progress includes this tool.
    progress = self._tool_progress_for_ticket(current_ticket)
    reward_components = self._build_reward_components(
        ticket_score=investigation_score,
        field_breakdown={},
        shaped_step_reward=investigation_reward,
        reward_kind="investigation",
        final_reward=investigation_reward,
        investigation_penalty=self._state.investigation_penalty_applied,
        extra_details={
            "new_context_revealed": useful_investigation,
            "required_investigation_count": len(required_tools),
            "hidden_context_remaining_count": progress["remaining_count"],
            "hidden_context_revealed_count": progress["revealed_count"],
            "context_completeness": progress["completeness"],
            "tool_name": action.tool_name,
        },
    )
    self._state.history_entries.append(
        self._build_history_entry(
            current_ticket,
            predicted=action.model_dump(exclude_none=True),
            score=investigation_score,
            breakdown={},
            queue_position=idx + 1,
            reward=investigation_reward,
            reward_kind="investigation",
            tool_result=tool_result,
            reward_components=reward_components,
        )
    )
    self._state.last_reward_components = reward_components
    return self._build_observation(task, done=False, reward=investigation_reward)
def _handle_request_info_action(
    self,
    task: dict,
    current_ticket: HelpdeskTicketRecord,
    action: HelpdeskTicketAction,
    idx: int,
) -> HelpdeskTicketObservation:
    """Handle a ``request_info`` action (ask the requester for more detail).

    The request is useful when the ticket has a customer-update note and
    request_info has not been used on it before. A useful request reveals the
    note and earns ``USEFUL_REQUEST_INFO_REWARD``; any other request earns 0.0.

    Every call does the following:
      * increments the per-ticket request counter;
      * increments the step and investigation-step counters;
      * consumes one unit of investigation budget (floored at 0);
      * appends a history entry.
    The ticket remains current and the episode is not done.

    Args:
        task: Task definition, forwarded to ``_build_observation``.
        current_ticket: The ticket at queue index ``idx``.
        action: The agent action; must not set any submit field.
        idx: Zero-based queue index of ``current_ticket``.

    Returns:
        The next observation with this step's reward.

    Raises:
        ValueError: if any submit field is set on ``action``.
    """
    submitted_fields = {
        field
        for field in ("issue_type", "priority", "assignment_group", "resolution_action")
        if getattr(action, field) is not None
    }
    if submitted_fields:
        raise ValueError(
            "request_info actions cannot include submit fields: "
            f"{sorted(submitted_fields)}"
        )
    ticket_id = current_ticket.ticket_id
    note = self._request_info_note_for_ticket(current_ticket)
    # Read before ticket_request_info_usage is incremented below; presumably
    # _request_info_used reflects that counter — confirm.
    already_used = self._request_info_used(ticket_id)
    useful_request = note is not None and not already_used
    self._state.ticket_request_info_usage[ticket_id] = (
        self._state.ticket_request_info_usage.get(ticket_id, 0) + 1
    )
    self._state.step_count += 1
    # request_info shares the investigation counters and budget.
    self._state.investigation_steps += 1
    self._state.investigation_budget_remaining = max(
        0,
        self._state.investigation_budget_remaining - 1,
    )
    request_reward = USEFUL_REQUEST_INFO_REWARD if useful_request else 0.0
    # The note text is only exposed on the useful (first) request.
    tool_result = {
        "action_type": "request_info",
        "found": useful_request,
        "ticket_id": ticket_id,
        "customer_update_note": note if useful_request else "",
    }
    self._state.last_tool_result = tool_result
    self._state.last_step_reward = request_reward
    self._state.reward = request_reward
    self._state.done = False
    self._state.investigation_penalty_applied = self._compute_episode_penalty()
    progress = self._tool_progress_for_ticket(current_ticket)
    reward_components = self._build_reward_components(
        ticket_score=0.0,
        field_breakdown={},
        shaped_step_reward=request_reward,
        reward_kind="operational",
        final_reward=request_reward,
        investigation_penalty=self._state.investigation_penalty_applied,
        extra_details={
            "operational_action": "request_info",
            "new_context_revealed": useful_request,
            "customer_update_visible": useful_request,
            "hidden_context_remaining_count": progress["remaining_count"],
            "context_completeness": progress["completeness"],
        },
    )
    self._state.history_entries.append(
        self._build_history_entry(
            current_ticket,
            predicted=action.model_dump(exclude_none=True),
            score=0.0,
            breakdown={},
            queue_position=idx + 1,
            reward=request_reward,
            reward_kind="operational",
            tool_result=tool_result,
            reward_components=reward_components,
        )
    )
    self._state.last_reward_components = reward_components
    return self._build_observation(task, done=False, reward=request_reward)
def _handle_defer_action(
    self,
    task: dict,
    current_ticket: HelpdeskTicketRecord,
    action: HelpdeskTicketAction,
    idx: int,
) -> HelpdeskTicketObservation:
    """Handle a ``defer`` action: push the current ticket to the back of the queue.

    A defer is allowed only when all of these hold:
      * the task is 2 or 3;
      * the ticket has been deferred fewer than ``MAX_DEFERS_PER_TICKET`` times;
      * it is not the last ticket in the queue.

    When allowed, the ticket is moved to the end of the queue.
      * A ticket at SLA risk (high/critical priority, or one that mentions a
        follow-up) is replaced by ``_escalate_ticket_after_delay``'s result
        and counts as an SLA breach.
      * Any other ticket earns a small reward if it still has outstanding
        tools, has a customer-update note, or is capacity sensitive.

    A denied defer leaves the queue unchanged but still counts as an SLA
    breach. Either way a dynamic-queue event and a history entry are recorded.

    Args:
        task: Task definition, forwarded to ``_build_observation``.
        current_ticket: The ticket at queue index ``idx``.
        action: The agent action; must not set any submit field.
        idx: Zero-based queue index of ``current_ticket``.

    Returns:
        The next observation with this step's reward.

    Raises:
        ValueError: if any submit field is set on ``action``.
    """
    submitted_fields = {
        field
        for field in ("issue_type", "priority", "assignment_group", "resolution_action")
        if getattr(action, field) is not None
    }
    if submitted_fields:
        raise ValueError(
            "defer actions cannot include submit fields: "
            f"{sorted(submitted_fields)}"
        )
    ticket_id = current_ticket.ticket_id
    existing_count = self._defer_count(ticket_id)
    defer_allowed = (
        existing_count < MAX_DEFERS_PER_TICKET
        and idx < len(self._queue) - 1
        and self._state.current_task_id in {2, 3}
    )
    # Prospective count for this defer. NOTE(review): it is also reported
    # (tool result and event) when the defer is denied, even though it is
    # then not stored in ticket_defer_counts.
    defer_count = existing_count + 1
    reward = 0.0
    sla_risk = current_ticket.priority in {"high", "critical"} or self._ticket_mentions_follow_up(
        current_ticket
    )
    moved_ticket = current_ticket
    if defer_allowed:
        self._state.ticket_defer_counts[ticket_id] = defer_count
        self._state.deferred_ticket_count += 1
        if sla_risk:
            # Delaying an at-risk ticket breaches SLA; it re-enters the queue
            # in escalated form.
            self._state.sla_breach_count += 1
            moved_ticket = self._escalate_ticket_after_delay(
                current_ticket,
                defer_count=defer_count,
            )
        elif (
            self._remaining_tools_for_ticket(current_ticket)
            or self._request_info_note_for_ticket(current_ticket) is not None
            or self._ticket_is_capacity_sensitive(current_ticket)
        ):
            # NOTE(review): the defer reward reuses the request_info
            # context-completion constant — confirm intended.
            reward = REQUEST_INFO_CONTEXT_COMPLETION_BONUS
        # Move the (possibly escalated) ticket to the back. current_ticket_index
        # is not changed here, so the following ticket now sits at idx.
        self._queue.pop(idx)
        self._queue.append(moved_ticket)
        self._tickets_by_id[moved_ticket.ticket_id] = moved_ticket
        self._sync_queue_ticket_ids()
        self._record_dynamic_queue_event(
            "defer",
            ticket_id=ticket_id,
            defer_count=defer_count,
            sla_risk=sla_risk,
        )
    else:
        # A denied defer still counts as an SLA breach.
        self._state.sla_breach_count += 1
        self._record_dynamic_queue_event(
            "defer_denied",
            ticket_id=ticket_id,
            defer_count=defer_count,
        )
    self._state.step_count += 1
    self._state.last_tool_result = {
        "action_type": "defer",
        "ticket_id": ticket_id,
        "defer_allowed": defer_allowed,
        "defer_count": defer_count,
        "sla_risk": sla_risk,
    }
    self._state.last_step_reward = reward
    self._state.reward = reward
    self._state.done = False
    # NOTE(review): unlike investigate/request_info steps, investigation_penalty
    # is not passed here, so investigation_penalty_applied is reported as 0.0.
    reward_components = self._build_reward_components(
        ticket_score=0.0,
        field_breakdown={},
        shaped_step_reward=reward,
        reward_kind="operational",
        final_reward=reward,
        extra_details={
            "operational_action": "defer",
            "defer_allowed": defer_allowed,
            "defer_count": defer_count,
            "sla_breach_count": self._state.sla_breach_count,
        },
    )
    # The history entry describes the ticket as it was before any escalation.
    self._state.history_entries.append(
        self._build_history_entry(
            current_ticket,
            predicted=action.model_dump(exclude_none=True),
            score=0.0,
            breakdown={},
            queue_position=idx + 1,
            reward=reward,
            reward_kind="operational",
            tool_result=self._state.last_tool_result,
            reward_components=reward_components,
        )
    )
    self._state.last_reward_components = reward_components
    return self._build_observation(task, done=False, reward=reward)
def _handle_open_incident_action(
    self,
    task: dict,
    current_ticket: HelpdeskTicketRecord,
    action: HelpdeskTicketAction,
    idx: int,
) -> HelpdeskTicketObservation:
    """Handle an ``open_incident`` action for the current ticket.

    The action is useful only on task 3, for a ticket that requires an
    incident and does not already have one open. A useful call does the
    following:
      * records the incident;
      * consumes an incident slot (floored at 0);
      * earns ``INCIDENT_OPEN_REWARD``, reduced by
        ``INCIDENT_SLOT_OVERFLOW_PENALTY`` when no slot was left, and passed
        through ``clamp_open_unit_interval``.

    Any other call earns 0.0 but still counts as a step. The ticket remains
    current and the episode is not done.

    Args:
        task: Task definition, forwarded to ``_build_observation``.
        current_ticket: The ticket at queue index ``idx``.
        action: The agent action; must not set any submit field.
        idx: Zero-based queue index of ``current_ticket``.

    Returns:
        The next observation with this step's reward.

    Raises:
        ValueError: if any submit field is set on ``action``.
    """
    submitted_fields = {
        field
        for field in ("issue_type", "priority", "assignment_group", "resolution_action")
        if getattr(action, field) is not None
    }
    if submitted_fields:
        raise ValueError(
            "open_incident actions cannot include submit fields: "
            f"{sorted(submitted_fields)}"
        )
    useful_incident = (
        self._state.current_task_id == 3
        and self._requires_incident(current_ticket)
        and not self._incident_open_for_ticket(current_ticket)
    )
    overflow = 0
    incident_reward = 0.0
    if useful_incident:
        self._state.open_incident_ticket_ids.append(current_ticket.ticket_id)
        self._state.incident_actions_used += 1
        # 1 when no incident slot remained before this incident, else 0
        # (slots are floored at 0 just below).
        overflow = max(0, 1 - self._state.incident_slots_remaining)
        self._state.incident_slots_remaining = max(
            0,
            self._state.incident_slots_remaining - 1,
        )
        overflow_penalty = round(overflow * INCIDENT_SLOT_OVERFLOW_PENALTY, 4)
        if overflow_penalty > 0.0:
            self._state.planning_penalty_total = round(
                self._state.planning_penalty_total + overflow_penalty,
                4,
            )
            self._state.planning_penalty_applied = overflow_penalty
        incident_reward = clamp_open_unit_interval(
            INCIDENT_OPEN_REWARD - overflow_penalty
        )
        self._record_dynamic_queue_event(
            "open_incident",
            ticket_id=current_ticket.ticket_id,
            overflow=overflow,
        )
    self._state.step_count += 1
    self._state.last_tool_result = {
        "action_type": "open_incident",
        "ticket_id": current_ticket.ticket_id,
        "incident_open": useful_incident,
        "incident_slots_remaining": self._state.incident_slots_remaining,
        "overflow": overflow,
    }
    self._state.last_step_reward = incident_reward
    self._state.reward = incident_reward
    self._state.done = False
    # NOTE(review): investigation_penalty is not passed here, so
    # investigation_penalty_applied is reported as 0.0 for this step.
    reward_components = self._build_reward_components(
        ticket_score=0.0,
        field_breakdown={},
        shaped_step_reward=incident_reward,
        reward_kind="operational",
        final_reward=incident_reward,
        extra_details={
            "operational_action": "open_incident",
            "incident_open": useful_incident,
            "incident_slots_remaining": self._state.incident_slots_remaining,
        },
    )
    self._state.history_entries.append(
        self._build_history_entry(
            current_ticket,
            predicted=action.model_dump(exclude_none=True),
            score=0.0,
            breakdown={},
            queue_position=idx + 1,
            reward=incident_reward,
            reward_kind="operational",
            tool_result=self._state.last_tool_result,
            reward_components=reward_components,
        )
    )
    self._state.last_reward_components = reward_components
    return self._build_observation(task, done=False, reward=incident_reward)
def _build_ticket_view(self, ticket: HelpdeskTicketRecord) -> dict[str, Any]:
    """Build the agent-visible view of ``ticket``.

    Title and description go through the redaction-aware
    ``_visible_title`` / ``_visible_description``. Hidden details are included
    only when their gating condition holds:
      * ambiguity/planning notes and the related-ticket preview, once the
        matching tool is no longer outstanding;
      * the customer-update note, after request_info was used;
      * routing options and cluster details, after the matching tools were
        used.
    Task 3 views also carry the current capacity snapshot.

    Returns:
        A dict that always has ticket_id, title, requester, description and
        operational_context; other keys are added conditionally.
    """
    progress = self._tool_progress_for_ticket(ticket)
    remaining_tools = progress["remaining_tools"]
    used_tools = set(self._used_tools_for_ticket(ticket.ticket_id))
    operational_actions = progress["recommended_operational_actions"]
    cluster_summary = self._cluster_summary(ticket)
    cluster_hint = (
        cluster_summary["future_cluster_ticket_count"] > 0
        or cluster_summary["shared_requester_count"] > 1
    )
    ticket_view: dict[str, Any] = {
        "ticket_id": ticket.ticket_id,
        "title": self._visible_title(ticket),
        "requester": ticket.requester,
        "description": self._visible_description(ticket),
    }
    if self._state.current_task_id == 3:
        ticket_view["capacity_state"] = self._capacity_state_snapshot()
    if progress["required_tools"]:
        ticket_view["context_status"] = {
            "investigation_required": True,
            "hidden_context_remaining": bool(progress["remaining_count"]),
            "context_gap_count": progress["remaining_count"],
            "revealed_context_count": progress["revealed_count"],
            "context_completeness": progress["completeness"],
            # NOTE(review): this reports revealed_count (required tools
            # revealed), not the number of tools used on the ticket — confirm
            # the key name matches the intended value.
            "investigations_used_for_ticket": progress["revealed_count"],
            "recommended_tools": list(remaining_tools),
        }
    ticket_view["operational_context"] = {
        "request_info_available": self._request_info_note_for_ticket(ticket) is not None,
        "request_info_used": progress["request_info_used"],
        "defer_count": self._defer_count(ticket.ticket_id),
        "incident_recommended": self._requires_incident(ticket),
        "incident_open": self._incident_open_for_ticket(ticket),
        "recommended_actions": operational_actions,
        "cluster_coordination_hint": cluster_hint,
        "shared_requester_pressure": cluster_summary["shared_requester_count"] > 1,
    }
    # Detailed cluster fields are only shown after the cluster tool was used.
    if "lookup_queue_cluster_summary" in used_tools:
        ticket_view["operational_context"].update(
            {
                "service_cluster_id": ticket.service_cluster_id,
                "future_cluster_ticket_count": cluster_summary["future_cluster_ticket_count"],
                "future_cluster_ticket_ids": cluster_summary["future_cluster_ticket_ids"],
                "shared_requester_count": cluster_summary["shared_requester_count"],
                "active_incident_cover": cluster_summary["active_incident_cover"],
            }
        )
    if ticket.ambiguity_note is not None and "lookup_internal_routing_note" not in remaining_tools:
        ticket_view["ambiguity_note"] = ticket.ambiguity_note
    if (
        ticket.planning_note is not None
        and "lookup_internal_routing_note" not in remaining_tools
    ):
        ticket_view["planning_note"] = ticket.planning_note
    # The customer update only becomes visible after request_info was used.
    if self._request_info_used(ticket.ticket_id):
        ticket_view["customer_update_note"] = self._request_info_note_for_ticket(ticket)
    if ticket.related_ticket_id is not None and "lookup_related_ticket" not in remaining_tools:
        ticket_view["related_ticket_id"] = ticket.related_ticket_id
        related_ticket = self._tickets_by_id.get(ticket.related_ticket_id)
        if related_ticket is not None:
            # Preview omits the related ticket's routing labels.
            ticket_view["related_ticket_preview"] = {
                "ticket_id": related_ticket.ticket_id,
                "title": related_ticket.title,
                "requester": related_ticket.requester,
                "description": related_ticket.description,
            }
    # Routing options require one of the routing tools to have been used.
    if self._ticket_has_alternate_route(ticket) and (
        "lookup_internal_routing_note" in used_tools
        or "lookup_queue_capacity_forecast" in used_tools
    ):
        ticket_view["routing_options"] = self._routing_options_for_ticket(ticket)
    if "lookup_queue_cluster_summary" in used_tools:
        ticket_view["cluster_summary"] = cluster_summary
    if ticket.generated_from_ticket_id is not None:
        ticket_view["generated_from_ticket_id"] = ticket.generated_from_ticket_id
    return ticket_view
def _build_feedback_summary(
self,
*,
predicted: dict[str, Any],
score: float,
breakdown: dict[str, float],
reward: float | None = None,
rubric_reward: float | None = None,
reward_kind: str | None = None,
penalty_reason: str | None = None,
tool_result: dict[str, Any] | None = None,
reward_components: dict[str, Any] | None = None,
) -> str:
parts: list[str] = []
if reward_kind == "investigation":
tool_name = predicted.get("tool_name") or (tool_result or {}).get("tool_name")
parts.append(f"Investigation step used {tool_name or 'a tool'}")
if reward_components and reward_components.get("new_context_revealed"):
parts.append("new context was revealed")
elif reward_kind == "operational":
operational_action = (
reward_components.get("operational_action")
if reward_components
else predicted.get("action_type")
)
parts.append(f"Operational step used {operational_action or 'an action'}")
elif penalty_reason is not None:
parts.append(f"Penalty applied: {penalty_reason}")
else:
parts.append(f"Ticket score={score:.2f}")
if breakdown:
field_scores = ", ".join(
f"{field}={value:.2f}" for field, value in sorted(breakdown.items())
)
parts.append(f"field_scores[{field_scores}]")
if reward is not None:
parts.append(f"reward={reward:.2f}")
if rubric_reward is not None:
parts.append(f"rubric_reward={rubric_reward:.2f}")
if reward_components:
context_gap_penalty = reward_components.get("context_gap_penalty")
if context_gap_penalty:
parts.append(f"context_gap_penalty={context_gap_penalty:.2f}")
hidden_context_remaining_count = reward_components.get(
"hidden_context_remaining_count"
)
if hidden_context_remaining_count:
parts.append(
f"hidden_context_remaining={hidden_context_remaining_count}"
)
context_completion_bonus = reward_components.get("context_completion_bonus")
if context_completion_bonus:
parts.append(f"context_bonus={context_completion_bonus:.2f}")
risk_penalty = reward_components.get("risk_penalty")
if risk_penalty:
parts.append(f"risk_penalty={risk_penalty:.2f}")
capacity_penalty = reward_components.get("capacity_penalty")
if capacity_penalty:
parts.append(f"capacity_penalty={capacity_penalty:.2f}")
planning_penalty_total = reward_components.get("planning_penalty_total")
if planning_penalty_total:
parts.append(f"planning_penalty_total={planning_penalty_total:.2f}")
incident_gap_penalty = reward_components.get("incident_gap_penalty")
if incident_gap_penalty:
parts.append(f"incident_gap_penalty={incident_gap_penalty:.2f}")
queue_management_score = reward_components.get("queue_management_score")
if queue_management_score is not None:
parts.append(f"queue_management_score={queue_management_score:.2f}")
spawned_follow_up_ticket_id = reward_components.get("spawned_follow_up_ticket_id")
if spawned_follow_up_ticket_id:
parts.append(f"spawned_follow_up={spawned_follow_up_ticket_id}")
cluster_stabilized_ticket_ids = reward_components.get("cluster_stabilized_ticket_ids")
if cluster_stabilized_ticket_ids:
parts.append(
"cluster_stabilized=" + ",".join(cluster_stabilized_ticket_ids)
)
cluster_destabilized_ticket_ids = reward_components.get(
"cluster_destabilized_ticket_ids"
)
if cluster_destabilized_ticket_ids:
parts.append(
"cluster_destabilized=" + ",".join(cluster_destabilized_ticket_ids)
)
return "; ".join(parts)
def _build_history_entry(
    self,
    ticket: HelpdeskTicketRecord,
    *,
    predicted: dict[str, Any],
    score: float,
    breakdown: dict[str, float],
    queue_position: int,
    reward: float | None = None,
    rubric_reward: float | None = None,
    reward_kind: str | None = None,
    penalty_reason: str | None = None,
    tool_result: dict[str, Any] | None = None,
    reward_components: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the history record appended for one step taken on ``ticket``.

    The entry always carries:
      * the ticket's real ``title`` (not the redacted agent view);
      * the prediction, score and per-field breakdown;
      * the 1-based queue position;
      * an ``operational_context`` summary.

    Optional keys are added only when relevant:
      * capacity state (task 3);
      * reward values;
      * notes and the related-ticket preview, once the matching tools are no
        longer outstanding;
      * routing options and cluster details;
      * tool result and reward components;
      * context progress.

    A ``feedback_summary`` string is always added last.

    Args:
        ticket: Ticket the step acted on.
        predicted: The action as submitted (callers pass ``model_dump`` output).
        score: Ticket score for this step (callers pass 0.0 for non-submit steps).
        breakdown: Per-field scores; empty for non-submit steps.
        queue_position: 1-based position of ``ticket`` in the queue.
        reward, rubric_reward, reward_kind, penalty_reason, tool_result,
        reward_components: Optional step details; each is stored only when
            not None.

    Returns:
        The history entry dict.
    """
    progress = self._tool_progress_for_ticket(ticket)
    remaining_tools = progress["remaining_tools"]
    cluster_summary = self._cluster_summary(ticket)
    history_entry: dict[str, Any] = {
        "ticket_id": ticket.ticket_id,
        "title": ticket.title,
        "requester": ticket.requester,
        "predicted": predicted,
        "score": score,
        "breakdown": breakdown,
        "queue_position": queue_position,
        "operational_context": {
            "request_info_used": progress["request_info_used"],
            "defer_count": self._defer_count(ticket.ticket_id),
            "incident_open": self._incident_open_for_ticket(ticket),
            "recommended_actions": progress["recommended_operational_actions"],
            "cluster_coordination_hint": (
                cluster_summary["future_cluster_ticket_count"] > 0
                or cluster_summary["shared_requester_count"] > 1
            ),
        },
    }
    # Detailed cluster fields are only recorded after the cluster tool was used.
    if "lookup_queue_cluster_summary" in self._used_tools_for_ticket(ticket.ticket_id):
        history_entry["operational_context"].update(
            {
                "service_cluster_id": ticket.service_cluster_id,
                "future_cluster_ticket_count": cluster_summary["future_cluster_ticket_count"],
                "active_incident_cover": cluster_summary["active_incident_cover"],
                "shared_requester_count": cluster_summary["shared_requester_count"],
            }
        )
    if self._state.current_task_id == 3:
        history_entry["capacity_state"] = self._capacity_state_snapshot()
    if reward is not None:
        history_entry["reward"] = reward
    if rubric_reward is not None:
        history_entry["rubric_reward"] = rubric_reward
    if reward_kind is not None:
        history_entry["reward_kind"] = reward_kind
    if ticket.ambiguity_note is not None and "lookup_internal_routing_note" not in remaining_tools:
        history_entry["ambiguity_note"] = ticket.ambiguity_note
    if (
        ticket.planning_note is not None
        and "lookup_internal_routing_note" not in remaining_tools
    ):
        history_entry["planning_note"] = ticket.planning_note
    if self._request_info_used(ticket.ticket_id):
        history_entry["customer_update_note"] = self._request_info_note_for_ticket(ticket)
    if ticket.related_ticket_id is not None and "lookup_related_ticket" not in remaining_tools:
        history_entry["related_ticket_id"] = ticket.related_ticket_id
        related_ticket = self._tickets_by_id.get(ticket.related_ticket_id)
        if related_ticket is not None:
            history_entry["related_ticket_preview"] = {
                "ticket_id": related_ticket.ticket_id,
                "title": related_ticket.title,
                "requester": related_ticket.requester,
                "description": related_ticket.description,
            }
    # NOTE(review): _build_ticket_view requires one of these tools to have been
    # *used*. Here the first operand is "not in remaining_tools", which is also
    # true when the routing note is not a required tool. So routing_options
    # can appear in history for a ticket whose live view still hides them —
    # confirm intended.
    if (
        self._ticket_has_alternate_route(ticket)
        and (
            "lookup_internal_routing_note" not in remaining_tools
            or "lookup_queue_capacity_forecast" in self._used_tools_for_ticket(ticket.ticket_id)
        )
    ):
        history_entry["routing_options"] = self._routing_options_for_ticket(ticket)
    if "lookup_queue_cluster_summary" in self._used_tools_for_ticket(ticket.ticket_id):
        history_entry["cluster_summary"] = cluster_summary
    if penalty_reason is not None:
        history_entry["penalty_reason"] = penalty_reason
    if tool_result is not None:
        history_entry["tool_result"] = tool_result
    if reward_components is not None:
        history_entry["reward_components"] = reward_components
    if ticket.generated_from_ticket_id is not None:
        history_entry["generated_from_ticket_id"] = ticket.generated_from_ticket_id
    if progress["required_tools"]:
        history_entry["context_progress"] = {
            "hidden_context_remaining": bool(progress["remaining_count"]),
            "context_gap_count": progress["remaining_count"],
            "revealed_context_count": progress["revealed_count"],
            "context_completeness": progress["completeness"],
        }
    history_entry["feedback_summary"] = self._build_feedback_summary(
        predicted=predicted,
        score=score,
        breakdown=breakdown,
        reward=reward,
        rubric_reward=rubric_reward,
        reward_kind=reward_kind,
        penalty_reason=penalty_reason,
        tool_result=tool_result,
        reward_components=reward_components,
    )
    return history_entry
def _build_observation(
    self,
    task: dict,
    done: bool = False,
    reward: float | None = None,
    rubric_reward: float | None = None,
) -> HelpdeskTicketObservation:
    """Assemble the observation handed back to the agent.

    The observation carries the ticket at ``current_ticket_index`` (or
    ``None`` once that index is no longer inside the queue), queue progress
    counters, a shallow copy of the history list, and a ``metadata`` dict with
    summary flags, score/penalty totals and a digest of the latest history
    entry.

    Args:
        task: Task definition; its ``id``, ``name``, ``instructions`` and
            ``allowed_fields`` entries are copied into the observation.
        done: Episode-termination flag to report.
        reward: Step reward to report, if any.
        rubric_reward: Rubric-based reward to report, if any.

    Returns:
        A populated ``HelpdeskTicketObservation``.
    """
    state = self._state
    cursor = state.current_ticket_index
    total = len(self._queue)

    in_range = cursor < total
    current_view = self._build_ticket_view(self._queue[cursor]) if in_range else None
    position = cursor + 1 if in_range else 0
    showing_ticket = current_view is not None

    # New list object, but the entry dicts themselves are shared with state.
    history = list(state.history_entries)
    latest = next(reversed(history), None)

    remaining = max(0, total - cursor)
    # ``remaining`` counts the displayed ticket; strip it out for the "after" count.
    after_current = max(0, remaining - int(showing_ticket))
    fraction = cursor / total if total else 0.0

    # An absent view behaves like an empty one for the boolean flags below.
    view = current_view or {}
    context_status = view.get("context_status") or {}

    metadata: dict[str, Any] = {
        "queue_position": position,
        "tickets_remaining_includes_current": showing_ticket,
        "has_ambiguity_note": bool(view.get("ambiguity_note")),
        "has_related_ticket_context": bool(view.get("related_ticket_preview")),
        "has_hidden_context": bool(context_status.get("hidden_context_remaining")),
        "action_mode": "investigate_or_submit",
        "available_action_types": self._available_action_types_for_task(),
        "average_score_so_far": state.average_score_so_far,
        "progress_fraction": fraction,
        "investigation_penalty_applied": state.investigation_penalty_applied,
        "planning_penalty_total": state.planning_penalty_total,
        "planning_penalty_applied": state.planning_penalty_applied,
        "sla_breach_count": state.sla_breach_count,
        "incident_gap_total": state.incident_gap_total,
        "queue_management_score": state.queue_management_score,
        "queue_management_breakdown": dict(state.queue_management_breakdown),
        "dynamic_queue_events": list(state.dynamic_queue_events[-5:]),
        "clustered_follow_ons": self._future_queue_demand().get("clustered_follow_ons", 0),
    }

    # Capacity tracking is only surfaced for task 3.
    if state.current_task_id == 3:
        metadata["capacity_state"] = self._capacity_state_snapshot()

    if latest is not None:
        # Mirror selected fields of the most recent history entry under "last_*".
        for meta_key, entry_key in (
            ("last_score", "score"),
            ("last_reward", "reward"),
            ("last_reward_kind", "reward_kind"),
            ("last_breakdown", "breakdown"),
            ("last_feedback_summary", "feedback_summary"),
        ):
            metadata[meta_key] = latest.get(entry_key)
        metadata["last_reward_components"] = latest.get("reward_components", {})
        if "penalty_reason" in latest:
            metadata["last_penalty_reason"] = latest["penalty_reason"]

    return HelpdeskTicketObservation(
        done=done,
        reward=reward,
        rubric_reward=rubric_reward,
        metadata=metadata,
        task_id=task["id"],
        task_name=task["name"],
        instructions=task["instructions"],
        allowed_fields=list(task["allowed_fields"]),
        available_action_types=self._available_action_types_for_task(),
        available_tools=self._available_tools_for_task(),
        investigation_budget_remaining=state.investigation_budget_remaining,
        last_tool_result=state.last_tool_result,
        current_ticket=current_view,
        queue_size=total,
        tickets_remaining=remaining,
        tickets_after_current=after_current,
        tickets_processed=cursor,
        queue_position=position,
        average_score_so_far=state.average_score_so_far,
        progress_fraction=fraction,
        history=history,
        last_reward_components=dict(state.last_reward_components),
    )