from __future__ import annotations import random import uuid from typing import Any, Optional from openenv.core.env_server.interfaces import Environment from models import ( HelpdeskTicketAction, HelpdeskTicketObservation, HelpdeskTicketRecord, HelpdeskTicketState, ) from server.grader import grade_action from server.reward import ( clamp_open_unit_interval, compute_step_adjustments, compute_trajectory_adjustments, ) from server.tasks import get_task_definition, load_dataset from vocabulary import ( ISSUE_TYPE_TO_ASSIGNMENT_GROUP, ISSUE_TYPE_TO_RESOLUTION_ACTION, ) QUEUE_SIZE_RANGE = (3, 5) BASE_AVAILABLE_TOOLS = ( "lookup_related_ticket", "lookup_requester_history", "lookup_internal_routing_note", "lookup_queue_capacity_forecast", "lookup_queue_cluster_summary", ) TASK_AVAILABLE_ACTION_TYPES: dict[int, tuple[str, ...]] = { 1: ("submit", "investigate"), 2: ("submit", "investigate", "request_info", "defer"), 3: ("submit", "investigate", "request_info", "defer", "open_incident"), } TASK_AVAILABLE_TOOLS: dict[int, tuple[str, ...]] = { 1: ( "lookup_related_ticket", "lookup_requester_history", "lookup_internal_routing_note", ), 2: ( "lookup_related_ticket", "lookup_requester_history", "lookup_internal_routing_note", "lookup_queue_cluster_summary", ), 3: BASE_AVAILABLE_TOOLS, } FREE_INVESTIGATIONS_PER_TICKET = 1 EXTRA_INVESTIGATION_COST = 0.04 MAX_EXTRA_INVESTIGATION_PENALTY = 0.25 USEFUL_INVESTIGATION_REWARD = 0.03 USEFUL_REQUEST_INFO_REWARD = 0.025 INCIDENT_OPEN_REWARD = 0.03 REQUEST_INFO_CONTEXT_COMPLETION_BONUS = 0.02 PREMATURE_SUBMIT_PENALTY = 0.22 NONDEFAULT_HIDDEN_CONTEXT_PENALTY = 0.08 CONTEXT_COMPLETION_BONUS = 0.06 TRAJECTORY_CONTEXT_COMPLETION_BONUS = 0.04 PRIORITY_UNDERSHOOT_PENALTY = 0.03 SEVERE_PRIORITY_UNDERSHOOT_PENALTY = 0.07 DANGEROUS_RESOLUTION_PENALTY = 0.05 NONDEFAULT_ROUTING_FOLLOWTHROUGH_BONUS = 0.02 TEAM_CAPACITY_OVERFLOW_PENALTY = 0.08 HIGH_PRIORITY_SLOT_OVERFLOW_PENALTY = 0.06 ESCALATION_SLOT_OVERFLOW_PENALTY = 0.05 PLANNING_SUCCESS_BONUS = 0.05 INCIDENT_SLOT_OVERFLOW_PENALTY = 0.05 INCIDENT_GAP_PENALTY = 0.07 SLA_BREACH_PENALTY = 0.04 FOLLOW_UP_SPAWN_THRESHOLD = 0.72 MAX_DEFERS_PER_TICKET = 1 CLUSTER_STABILIZE_SCORE_THRESHOLD = 0.84 CLUSTER_DESTABILIZE_SCORE_THRESHOLD = 0.72 CLUSTER_INCIDENT_RELIEF_MULTIPLIER = 0.94 CLUSTER_OWNER_RELIEF_MULTIPLIER = 0.86 TASK_QUEUE_MANAGEMENT_WEIGHT: dict[int, float] = { 1: 0.0, 2: 0.2, 3: 0.32, } TASK3_INVESTIGATION_TOOL_PLAN: dict[str, tuple[str, ...]] = { "ticket-021": ("lookup_related_ticket", "lookup_requester_history"), "ticket-022": ("lookup_internal_routing_note",), "ticket-027": ("lookup_internal_routing_note",), "ticket-029": ("lookup_internal_routing_note",), "ticket-038": ("lookup_related_ticket", "lookup_requester_history"), "ticket-045": ("lookup_related_ticket", "lookup_requester_history"), "TKT-NONDEFAULT-001": ("lookup_internal_routing_note",), "TKT-NONDEFAULT-002": ("lookup_internal_routing_note",), "TKT-NONDEFAULT-003": ("lookup_internal_routing_note",), } HARD_TASK_DESCRIPTION_REDACTIONS: dict[str, str] = { "ticket-021": ( "Production checkout is still unstable after a recent fix. " "Additional routing context is available via investigation." ), "ticket-022": ( "Usage charges increased while the integration was failing. " "Additional routing context is available via investigation." ), "ticket-027": ( "A vendor offer arrived with a near-term deadline. " "Additional routing context is available via investigation." ), "ticket-029": ( "A team needs a large seat expansion right away. " "Additional routing context is available via investigation." ), "ticket-038": ( "A prior invoice discrepancy is still unresolved and now time-sensitive. " "Additional routing context is available via investigation." ), "ticket-045": ( "A company-wide suspension remains unresolved after repeated follow-ups. " "Additional routing context is available via investigation." ), "TKT-NONDEFAULT-001": ( "A user needs help with a billing-style question. " "Additional routing context is available via investigation." ), "TKT-NONDEFAULT-002": ( "A client compliance scan surfaced a product-specific issue. " "Additional routing context is available via investigation." ), "TKT-NONDEFAULT-003": ( "A contractor onboarding workflow is blocked by an account problem. " "Additional routing context is available via investigation." ), } HARD_TASK_TITLE_REDACTIONS: dict[str, str] = { "ticket-021": "Production workflow regression", "ticket-022": "Time-sensitive account review", "ticket-027": "Commercial workflow request", "ticket-029": "Urgent expansion request", "ticket-038": "Repeated invoice follow-up", "ticket-045": "Company-wide account issue", "TKT-NONDEFAULT-001": "Billing-style routing question", "TKT-NONDEFAULT-002": "Compliance ownership question", "TKT-NONDEFAULT-003": "Workflow blocker with hidden owner", } def _coerce_optional_int(value: Any, field_name: str) -> Optional[int]: if value is None or value == "": return None if isinstance(value, bool): raise ValueError(f"{field_name} must be an integer") if isinstance(value, int): return value try: return int(value) except (TypeError, ValueError) as exc: raise ValueError(f"{field_name} must be an integer") from exc class HelpdeskTicketRoutingEnvironment( Environment[HelpdeskTicketAction, HelpdeskTicketObservation, HelpdeskTicketState] ): SUPPORTS_CONCURRENT_SESSIONS = True def __init__(self) -> None: super().__init__() self._dataset = load_dataset() self._tickets_by_id = {ticket.ticket_id: ticket for ticket in self._dataset} self._rng = random.Random() self._queue: list[HelpdeskTicketRecord] = [] self._state = HelpdeskTicketState() # ------------------------------------------------------------------ # OpenEnv required interface # ------------------------------------------------------------------ def reset( self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs: Any, ) -> HelpdeskTicketObservation: normalized_seed = _coerce_optional_int(seed, "seed") task_id_value = _coerce_optional_int(kwargs.get("task_id", 1), "task_id") queue_size_value = _coerce_optional_int(kwargs.get("queue_size"), "queue_size") task_id = 1 if task_id_value is None else task_id_value task = get_task_definition(task_id) if queue_size_value is not None and queue_size_value < 1: raise ValueError("queue_size must be >= 1") if normalized_seed is not None: self._rng.seed(normalized_seed) if queue_size_value is None: queue_size = self._rng.randint(*QUEUE_SIZE_RANGE) else: queue_size = min(queue_size_value, len(self._dataset)) self._queue = self._sample_queue(task_id, min(queue_size, len(self._dataset))) ( team_capacity_initial, high_priority_slots_initial, escalation_slots_initial, incident_slots_initial, ) = self._initial_capacity_state_for_queue(task_id) self._state = HelpdeskTicketState( episode_id=episode_id or str(uuid.uuid4()), step_count=0, current_task_id=task_id, seed=normalized_seed, queue_ticket_ids=[t.ticket_id for t in self._queue], current_ticket_index=0, per_ticket_scores=[], total_reward=0.0, average_score_so_far=0.0, investigation_budget_remaining=queue_size * FREE_INVESTIGATIONS_PER_TICKET, investigation_penalty_applied=0.0, planning_penalty_applied=0.0, last_reward_components={}, ticket_tool_usage={}, team_capacity_initial=team_capacity_initial, team_capacity_remaining=dict(team_capacity_initial), high_priority_slots_initial=high_priority_slots_initial, high_priority_slots_remaining=high_priority_slots_initial, escalation_slots_initial=escalation_slots_initial, escalation_slots_remaining=escalation_slots_initial, incident_slots_initial=incident_slots_initial, incident_slots_remaining=incident_slots_initial, planning_penalty_total=0.0, capacity_pressure_tickets_resolved=0, cluster_stabilizations_total=0, cluster_destabilizations_total=0, ticket_request_info_usage={}, ticket_defer_counts={}, open_incident_ticket_ids=[], incident_actions_used=0, incident_gap_total=0.0, deferred_ticket_count=0, sla_breach_count=0, spawned_follow_up_ticket_ids=[], spawned_follow_up_source_ids=[], dynamic_queue_events=[], queue_management_score=0.0, queue_management_breakdown={}, ) return self._build_observation(task) def step( self, action: HelpdeskTicketAction, timeout_s: Optional[float] = None, **kwargs: Any, ) -> HelpdeskTicketObservation: if not self._queue or self._state.current_task_id is None: raise RuntimeError("Environment has not been reset.") idx = self._state.current_ticket_index if idx >= len(self._queue): raise RuntimeError("Episode already done — call reset().") current_ticket = self._queue[idx] task_id = self._state.current_task_id task = get_task_definition(task_id) if action.action_type not in self._available_action_types_for_task(task_id): raise ValueError( f"Unsupported action_type {action.action_type!r} for task {task_id}" ) if action.action_type == "investigate": return self._handle_investigation_action(task, current_ticket, action, idx) if action.action_type == "request_info": return self._handle_request_info_action(task, current_ticket, action, idx) if action.action_type == "defer": return self._handle_defer_action(task, current_ticket, action, idx) if action.action_type == "open_incident": return self._handle_open_incident_action(task, current_ticket, action, idx) submitted_fields = { f for f, v in action.model_dump(exclude_none=True).items() if v is not None and f not in {"action_type", "tool_name", "tool_target_ticket_id", "metadata"} } allowed = set(task["allowed_fields"]) extra_fields = submitted_fields - allowed if extra_fields: # Penalty: record an open-interval score, advance index, return penalty observation invalid_score = clamp_open_unit_interval(0.0) self._state.per_ticket_scores.append(invalid_score) self._state.average_score_so_far = self._current_average_score() self._state.step_count += 1 self._state.current_ticket_index += 1 is_done = self._state.current_ticket_index >= len(self._queue) self._state.done = is_done trajectory_reward = None trajectory_components = None investigation_penalty = self._compute_episode_penalty() if is_done else 0.0 rubric_details: dict[str, Any] = {} if is_done: trajectory_components = compute_trajectory_adjustments( self._state.per_ticket_scores, len(self._queue), self._state.step_count, completion_bonus=self._trajectory_consistency_bonus(), ) trajectory_reward = trajectory_components["final_reward"] final_reward, rubric_details = self._finalize_terminal_rubric( trajectory_reward ) self._state.total_reward = final_reward else: final_reward = clamp_open_unit_interval(0.0) reward_components = self._build_reward_components( ticket_score=invalid_score, field_breakdown={}, shaped_step_reward=0.0, reward_kind="trajectory" if is_done else "step_penalty", final_reward=final_reward, trajectory_reward=trajectory_reward, investigation_penalty=investigation_penalty, penalty_reason=f"extra_fields: {sorted(extra_fields)}", extra_details={ "trajectory_average_reward": ( trajectory_components["average_reward"] if trajectory_components is not None else None ), "trajectory_completion_bonus": ( trajectory_components["completion_bonus"] if trajectory_components is not None else None ), "trajectory_consistency_bonus": ( trajectory_components["consistency_bonus"] if trajectory_components is not None else None ), **rubric_details, }, ) self._state.history_entries.append( self._build_history_entry( current_ticket, predicted=action.model_dump(exclude_none=True), score=invalid_score, breakdown={}, queue_position=idx + 1, reward=final_reward, rubric_reward=final_reward if is_done else None, reward_kind="trajectory" if is_done else "step_penalty", penalty_reason=f"extra_fields: {sorted(extra_fields)}", reward_components=reward_components, ) ) self._state.last_step_reward = final_reward self._state.reward = final_reward self._state.investigation_penalty_applied = self._compute_episode_penalty() self._state.last_tool_result = None self._state.last_reward_components = reward_components return self._build_observation( task, done=is_done, reward=final_reward, rubric_reward=final_reward if is_done else None, ) previous_average = self._current_average_score() score, breakdown = grade_action(action, current_ticket, task_id) context_penalty, missing_required_count = self._submit_context_penalty(current_ticket) process_bonus = self._context_completion_bonus( current_ticket, missing_required_count=missing_required_count, score=score, ) risk_penalty = self._operational_risk_penalty( current_ticket, action, task_id=task_id, ) incident_gap_penalty = self._incident_gap_penalty(current_ticket, action) capacity_penalty, capacity_details = self._apply_capacity_usage( current_ticket, action, ) step_adjustments = compute_step_adjustments( score, previous_average=previous_average, process_bonus=process_bonus, risk_penalty=risk_penalty, ) step_reward = step_adjustments["final_reward"] is_done = (self._state.current_ticket_index + 1) >= len(self._queue) trajectory_reward = None trajectory_components = None investigation_penalty = 0.0 rubric_reward = None rubric_details: dict[str, Any] = {} if is_done: self._state.per_ticket_scores.append(score) self._state.average_score_so_far = self._current_average_score() self._state.step_count += 1 self._state.current_ticket_index += 1 trajectory_components = compute_trajectory_adjustments( self._state.per_ticket_scores, len(self._queue), self._state.step_count, completion_bonus=( self._trajectory_consistency_bonus() + self._planning_success_bonus() ), ) trajectory_reward = trajectory_components["final_reward"] rubric_reward, rubric_details = self._finalize_terminal_rubric( trajectory_reward ) final_reward = clamp_open_unit_interval( rubric_reward - context_penalty - capacity_penalty - incident_gap_penalty ) self._state.total_reward = rubric_reward investigation_penalty = self._compute_episode_penalty() else: self._state.per_ticket_scores.append(score) self._state.average_score_so_far = self._current_average_score() self._state.step_count += 1 self._state.current_ticket_index += 1 final_reward = clamp_open_unit_interval( step_reward - context_penalty - capacity_penalty - incident_gap_penalty ) spawned_follow_up_ticket_id = None if self._should_spawn_follow_up( current_ticket, score=score, context_penalty=context_penalty, incident_gap_penalty=incident_gap_penalty, ): spawned_follow_up = self._spawn_follow_up_ticket(current_ticket) spawned_follow_up_ticket_id = spawned_follow_up.ticket_id if is_done: is_done = False trajectory_reward = None trajectory_components = None rubric_reward = None rubric_details = {} final_reward = clamp_open_unit_interval( step_reward - context_penalty - capacity_penalty - incident_gap_penalty ) self._state.total_reward = 0.0 self._state.queue_management_score = 0.0 self._state.queue_management_breakdown = {} if incident_gap_penalty > 0.0: self._state.incident_gap_total = round( self._state.incident_gap_total + incident_gap_penalty, 4, ) cluster_stabilized_ticket_ids = self._stabilize_future_cluster_tickets( current_ticket, score=score, context_penalty=context_penalty, incident_gap_penalty=incident_gap_penalty, ) cluster_destabilized_ticket_ids: list[str] = [] if not cluster_stabilized_ticket_ids: cluster_destabilized_ticket_ids = self._destabilize_future_cluster_tickets( current_ticket, score=score, context_penalty=context_penalty, incident_gap_penalty=incident_gap_penalty, ) reward_components = self._build_reward_components( ticket_score=score, field_breakdown=breakdown, shaped_step_reward=step_reward, reward_kind="trajectory" if is_done else "step", final_reward=final_reward, milestone_adjustment=step_adjustments["milestone_adjustment"], trajectory_reward=trajectory_reward, investigation_penalty=investigation_penalty, extra_details={ "context_gap_penalty": context_penalty, "context_completion_bonus": process_bonus, "risk_penalty": risk_penalty, "incident_gap_penalty": incident_gap_penalty, "capacity_penalty": capacity_penalty, "delta_adjustment": step_adjustments["delta_adjustment"], "required_investigation_count": len(self._required_tools_for_ticket(current_ticket)), "hidden_context_remaining_count": missing_required_count, "hidden_context_revealed_count": len( self._used_tools_for_ticket(current_ticket.ticket_id) ), "planning_penalty_total": self._state.planning_penalty_total, "planning_penalty_applied": self._state.planning_penalty_applied, "planning_success_bonus": self._planning_success_bonus() if is_done else 0.0, "spawned_follow_up_ticket_id": spawned_follow_up_ticket_id, "cluster_stabilized_ticket_ids": cluster_stabilized_ticket_ids, "cluster_destabilized_ticket_ids": cluster_destabilized_ticket_ids, "rubric_reward": rubric_reward, "trajectory_average_reward": ( trajectory_components["average_reward"] if trajectory_components is not None else None ), "trajectory_completion_bonus": ( trajectory_components["completion_bonus"] if trajectory_components is not None else None ), "trajectory_consistency_bonus": ( trajectory_components["consistency_bonus"] if trajectory_components is not None else None ), **rubric_details, }, ) reward_components.update(capacity_details) history_entry = self._build_history_entry( current_ticket, predicted=action.model_dump(exclude_none=True), score=score, breakdown=breakdown, queue_position=idx + 1, reward=final_reward, rubric_reward=rubric_reward if is_done else None, reward_kind="trajectory" if is_done else "step", reward_components=reward_components, ) self._state.history_entries.append(history_entry) self._state.last_step_reward = final_reward self._state.reward = final_reward self._state.done = is_done self._state.investigation_penalty_applied = self._compute_episode_penalty() self._state.planning_penalty_applied = capacity_penalty self._state.last_tool_result = None self._state.last_reward_components = reward_components return self._build_observation( task, done=is_done, reward=final_reward, rubric_reward=rubric_reward if is_done else None, ) @property def state(self) -> HelpdeskTicketState: return self._state.model_copy(deep=True) # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _compute_episode_penalty(self) -> float: free_investigations = len(self._queue) * FREE_INVESTIGATIONS_PER_TICKET extra_investigations = max(0, self._state.investigation_steps - free_investigations) return min( MAX_EXTRA_INVESTIGATION_PENALTY, extra_investigations * EXTRA_INVESTIGATION_COST, ) def _apply_episode_economics(self, base_reward: float) -> float: penalty = self._compute_episode_penalty() return clamp_open_unit_interval(base_reward - penalty) def _current_average_score(self) -> float: if not self._state.per_ticket_scores: return 0.0 return sum(self._state.per_ticket_scores) / len(self._state.per_ticket_scores) def _queue_management_blend_weight(self, task_id: int | None = None) -> float: resolved_task_id = self._state.current_task_id if task_id is None else task_id return TASK_QUEUE_MANAGEMENT_WEIGHT.get(int(resolved_task_id or 1), 0.0) def _context_resolution_score(self) -> float: hidden_context_tickets = [ ticket for ticket in self._queue if self._required_tools_for_ticket(ticket, self._state.current_task_id) ] if not hidden_context_tickets: return 1.0 total_required = 0 total_resolved = 0 for ticket in hidden_context_tickets: progress = self._tool_progress_for_ticket(ticket) total_required += max(1, len(progress["required_tools"])) total_resolved += max( 0, len(progress["required_tools"]) - len(progress["remaining_tools"]), ) return round( max(0.0, min(1.0, total_resolved / max(1, total_required))), 4, ) def _follow_up_containment_score(self) -> float: follow_up_risk_tickets = [ ticket for ticket in self._queue if ticket.generated_from_ticket_id is None and ( self._requires_incident(ticket) or self._ticket_mentions_follow_up(ticket) or ticket.related_ticket_id is not None or ticket.priority in {"high", "critical"} ) ] if not follow_up_risk_tickets: return 1.0 spawn_rate = len(self._state.spawned_follow_up_ticket_ids) / max( 1, len(follow_up_risk_tickets), ) generated_follow_up_scores = [ float(entry.get("score", 0.0)) for entry in self._state.history_entries if entry.get("generated_from_ticket_id") is not None ] recovery_credit = ( sum(generated_follow_up_scores) / len(generated_follow_up_scores) if generated_follow_up_scores else 0.0 ) score = (1.0 - min(1.0, 0.7 * spawn_rate)) + ( min(1.0, spawn_rate) * 0.3 * recovery_credit ) return round(max(0.0, min(1.0, score)), 4) def _incident_management_score(self) -> float: if (self._state.current_task_id or 1) < 3: return 1.0 incident_sensitive_tickets = [ ticket for ticket in self._queue if ticket.generated_from_ticket_id is None and self._requires_incident(ticket) ] if not incident_sensitive_tickets: return 1.0 coverage_ratio = sum( 1 for ticket in incident_sensitive_tickets if self._incident_open_for_ticket(ticket) ) / max(1, len(incident_sensitive_tickets)) gap_ratio = min( 1.0, self._state.incident_gap_total / max( INCIDENT_GAP_PENALTY, len(incident_sensitive_tickets) * INCIDENT_GAP_PENALTY, ), ) score = (0.65 * (1.0 - gap_ratio)) + (0.35 * coverage_ratio) return round(max(0.0, min(1.0, score)), 4) def _sla_quality_score(self) -> float: breach_denominator = max(1, self._state.deferred_ticket_count or len(self._queue)) breach_ratio = min(1.0, self._state.sla_breach_count / breach_denominator) score = 1.0 - breach_ratio return round(max(0.0, min(1.0, score)), 4) def _planning_quality_score(self) -> float: if (self._state.current_task_id or 1) < 3: return 1.0 capacity_sensitive_count = sum( 1 for ticket in self._queue if self._ticket_has_alternate_route(ticket) ) route_coverage = ( min( 1.0, self._state.capacity_pressure_tickets_resolved / capacity_sensitive_count, ) if capacity_sensitive_count else 1.0 ) max_expected_penalty = max( 0.12, len(self._queue) * ( TEAM_CAPACITY_OVERFLOW_PENALTY + HIGH_PRIORITY_SLOT_OVERFLOW_PENALTY + ESCALATION_SLOT_OVERFLOW_PENALTY ), ) penalty_score = 1.0 - min( 1.0, self._state.planning_penalty_total / max_expected_penalty, ) score = (0.6 * penalty_score) + (0.4 * route_coverage) return round(max(0.0, min(1.0, score)), 4) def _cluster_coordination_score(self) -> float: if (self._state.current_task_id or 1) < 2: return 1.0 clustered_tickets = [ ticket for ticket in self._queue if ticket.service_cluster_id or ticket.related_ticket_id is not None or ticket.generated_from_ticket_id is not None or self._ticket_repeated_requester_count(ticket) >= 2 ] if not clustered_tickets: return 1.0 cluster_count = max(1, len(clustered_tickets)) destabilization_ratio = min( 1.0, self._state.cluster_destabilizations_total / cluster_count, ) stabilization_ratio = min( 1.0, self._state.cluster_stabilizations_total / cluster_count, ) score = 1.0 - (0.75 * destabilization_ratio) + (0.25 * stabilization_ratio) return round(max(0.0, min(1.0, score)), 4) def _queue_management_breakdown(self, trajectory_reward: float) -> tuple[float, dict[str, Any]]: task_id = int(self._state.current_task_id or 1) if task_id < 2: proxy_score = round(clamp_open_unit_interval(trajectory_reward), 4) return proxy_score, {"routing_trajectory_proxy": proxy_score} component_scores: dict[str, float] = { "context_resolution": self._context_resolution_score(), "cluster_coordination": self._cluster_coordination_score(), "follow_up_containment": self._follow_up_containment_score(), "sla_management": self._sla_quality_score(), } if task_id >= 3: component_scores["planning_quality"] = self._planning_quality_score() component_scores["incident_management"] = self._incident_management_score() component_weights = { "context_resolution": 0.2, "planning_quality": 0.24, "incident_management": 0.2, "cluster_coordination": 0.16, "follow_up_containment": 0.12, "sla_management": 0.08, } else: component_weights = { "context_resolution": 0.38, "cluster_coordination": 0.26, "follow_up_containment": 0.2, "sla_management": 0.16, } aggregate_score = round( sum( component_scores[name] * weight for name, weight in component_weights.items() ), 4, ) breakdown: dict[str, Any] = { name: round(score, 4) for name, score in component_scores.items() } breakdown["weights"] = { name: round(weight, 4) for name, weight in component_weights.items() } breakdown["cluster_stabilizations_total"] = self._state.cluster_stabilizations_total breakdown["cluster_destabilizations_total"] = self._state.cluster_destabilizations_total breakdown["spawned_follow_up_count"] = len(self._state.spawned_follow_up_ticket_ids) breakdown["sla_breach_count"] = self._state.sla_breach_count breakdown["planning_penalty_total"] = round(self._state.planning_penalty_total, 4) breakdown["incident_gap_total"] = round(self._state.incident_gap_total, 4) breakdown["aggregate"] = aggregate_score return aggregate_score, breakdown def _finalize_terminal_rubric( self, trajectory_reward: float, ) -> tuple[float, dict[str, Any]]: task_id = int(self._state.current_task_id or 1) queue_management_score, queue_management_breakdown = self._queue_management_breakdown( trajectory_reward ) route_weight = round(1.0 - self._queue_management_blend_weight(task_id), 4) queue_weight = round(self._queue_management_blend_weight(task_id), 4) blended_reward = clamp_open_unit_interval( (route_weight * trajectory_reward) + (queue_weight * queue_management_score) ) episode_economics_penalty = round(self._compute_episode_penalty(), 4) rubric_reward = self._apply_episode_economics(blended_reward) self._state.queue_management_score = queue_management_score self._state.queue_management_breakdown = dict(queue_management_breakdown) return rubric_reward, { "trajectory_routing_reward": trajectory_reward, "queue_management_score": queue_management_score, "queue_management_breakdown": dict(queue_management_breakdown), "route_objective_weight": route_weight, "queue_management_weight": queue_weight, "blended_objective_before_economics": blended_reward, "episode_economics_penalty": episode_economics_penalty, } def _available_action_types_for_task(self, task_id: int | None = None) -> list[str]: resolved_task_id = self._state.current_task_id if task_id is None else task_id return list(TASK_AVAILABLE_ACTION_TYPES.get(int(resolved_task_id or 1), ("submit",))) def _available_tools_for_task(self, task_id: int | None = None) -> list[str]: resolved_task_id = self._state.current_task_id if task_id is None else task_id return list(TASK_AVAILABLE_TOOLS.get(int(resolved_task_id or 1), ())) def _sync_queue_ticket_ids(self) -> None: self._state.queue_ticket_ids = [ticket.ticket_id for ticket in self._queue] def _cluster_sample_groups(self) -> list[list[HelpdeskTicketRecord]]: groups: dict[str, list[HelpdeskTicketRecord]] = {} for ticket in self._dataset: if not ticket.service_cluster_id: continue groups.setdefault(ticket.service_cluster_id, []).append(ticket) return [tickets for tickets in groups.values() if len(tickets) >= 2] def _cluster_ticket_order_key(self, ticket: HelpdeskTicketRecord) -> tuple[int, int, str]: priority_rank = {"critical": 0, "high": 1, "medium": 2, "low": 3} follow_up_depth = 1 if ticket.related_ticket_id or ticket.generated_from_ticket_id else 0 return ( follow_up_depth, priority_rank.get(ticket.priority, 4), ticket.ticket_id, ) def _sample_queue(self, task_id: int, queue_size: int) -> list[HelpdeskTicketRecord]: if queue_size <= 0: return [] if task_id not in {2, 3} or queue_size < 3: return self._rng.sample(self._dataset, queue_size) cluster_groups = self._cluster_sample_groups() if not cluster_groups: return self._rng.sample(self._dataset, queue_size) chosen_group = self._rng.choice(cluster_groups) max_cluster_take = min(len(chosen_group), 3 if queue_size >= 4 else 2) cluster_take = max(2, min(max_cluster_take, queue_size - 1)) cluster_subset = self._rng.sample(chosen_group, cluster_take) cluster_subset_ids = {ticket.ticket_id for ticket in cluster_subset} filler_count = max(0, queue_size - len(cluster_subset)) remaining_pool = [ ticket for ticket in self._dataset if ticket.ticket_id not in cluster_subset_ids ] filler_subset = ( self._rng.sample(remaining_pool, filler_count) if filler_count > 0 else [] ) ordered_cluster = sorted(cluster_subset, key=self._cluster_ticket_order_key) remaining_cluster = ordered_cluster[1:] ordered_queue: list[HelpdeskTicketRecord] = [] if ordered_cluster: ordered_queue.append(ordered_cluster[0]) while filler_subset or remaining_cluster: if filler_subset: ordered_queue.append(filler_subset.pop(0)) if remaining_cluster: ordered_queue.append(remaining_cluster.pop(0)) return ordered_queue[:queue_size] def _cluster_keys_for_ticket(self, ticket: HelpdeskTicketRecord) -> set[str]: keys: set[str] = set() if ticket.service_cluster_id: keys.add(f"cluster:{ticket.service_cluster_id}") if ticket.related_ticket_id: keys.add(f"ticket:{ticket.related_ticket_id}") if ticket.generated_from_ticket_id: keys.add(f"ticket:{ticket.generated_from_ticket_id}") if any( candidate.ticket_id != ticket.ticket_id and ( candidate.related_ticket_id == ticket.ticket_id or candidate.generated_from_ticket_id == ticket.ticket_id ) for candidate in self._tickets_by_id.values() ): keys.add(f"ticket:{ticket.ticket_id}") if self._ticket_repeated_requester_count(ticket) >= 2: keys.add(f"requester:{ticket.requester}") return keys def _tickets_share_cluster( self, first: HelpdeskTicketRecord, second: HelpdeskTicketRecord, ) -> bool: if first.ticket_id == second.ticket_id: return False return bool(self._cluster_keys_for_ticket(first) & self._cluster_keys_for_ticket(second)) def _future_cluster_ticket_indexes( self, ticket: HelpdeskTicketRecord, *, start_index: int, ) -> list[int]: indexes: list[int] = [] for index in range(start_index, len(self._queue)): future_ticket = self._queue[index] if self._tickets_share_cluster(ticket, future_ticket): indexes.append(index) return indexes def _ticket_queue_index(self, ticket: HelpdeskTicketRecord) -> int | None: for index, candidate in enumerate(self._queue): if candidate.ticket_id == ticket.ticket_id: return index return None def _cluster_summary( self, ticket: HelpdeskTicketRecord, *, start_index: int | None = None, ) -> dict[str, Any]: if start_index is None: ticket_index = self._ticket_queue_index(ticket) effective_start = ( ticket_index + 1 if ticket_index is not None else self._state.current_ticket_index + 1 ) else: effective_start = start_index future_indexes = self._future_cluster_ticket_indexes( ticket, start_index=effective_start, ) future_tickets = [self._queue[index] for index in future_indexes] return { "service_cluster_id": ticket.service_cluster_id, "cluster_keys": sorted(self._cluster_keys_for_ticket(ticket)), "future_cluster_ticket_count": len(future_tickets), "future_cluster_ticket_ids": [candidate.ticket_id for candidate in future_tickets], "future_high_priority_count": sum( 1 for candidate in future_tickets if candidate.priority in {"high", "critical"} ), "shared_requester_count": self._ticket_repeated_requester_count(ticket), "active_incident_cover": self._incident_open_for_ticket(ticket), } def _append_note(self, existing_note: str | None, addition: str | None) -> str | None: if not addition: return existing_note if not existing_note: return addition if addition in existing_note: return existing_note return f"{existing_note} {addition}" def _replace_queue_ticket( self, index: int, updated_ticket: HelpdeskTicketRecord, ) -> None: self._queue[index] = updated_ticket self._tickets_by_id[updated_ticket.ticket_id] = updated_ticket def _stabilize_future_cluster_tickets( self, current_ticket: HelpdeskTicketRecord, *, score: float, context_penalty: float, incident_gap_penalty: float, ) -> list[str]: if (self._state.current_task_id or 1) < 2: return [] if score < CLUSTER_STABILIZE_SCORE_THRESHOLD: return [] if context_penalty > 0.0 or incident_gap_penalty > 0.0: return [] future_indexes = self._future_cluster_ticket_indexes( current_ticket, start_index=self._state.current_ticket_index, ) if not future_indexes: return [] incident_cover = self._incident_open_for_ticket(current_ticket) relief_multiplier = ( CLUSTER_INCIDENT_RELIEF_MULTIPLIER if incident_cover else CLUSTER_OWNER_RELIEF_MULTIPLIER ) planning_note = ( "An earlier incident bridge is already active for this request cluster, so later " "updates can be acknowledged and coordinated instead of being re-triaged from scratch." if incident_cover else "An earlier ticket in this request cluster already has an accountable owner, " "so later updates can be coordinated rather than fully re-triaged." ) customer_note = ( "The requester said a single coordinated owner is acceptable as long as the update is linked to the existing workstream." ) updated_ticket_ids: list[str] = [] for index in future_indexes: future_ticket = self._queue[index] updates: dict[str, Any] = { "planning_note": self._append_note(future_ticket.planning_note, planning_note), "customer_update_note": self._append_note( future_ticket.customer_update_note, customer_note, ), } if ( not self._ticket_has_alternate_route(future_ticket) or future_ticket.alternate_route_score_multiplier < relief_multiplier ): alternate_priority = ( "high" if incident_cover and future_ticket.priority == "critical" else "medium" if incident_cover and future_ticket.priority == "high" else future_ticket.alternate_priority or future_ticket.priority ) updates.update( { "alternate_issue_type": ( future_ticket.alternate_issue_type or future_ticket.issue_type ), "alternate_priority": alternate_priority, "alternate_assignment_group": "service_desk", "alternate_resolution_action": ( "acknowledge" if incident_cover else "assign" ), "alternate_route_score_multiplier": relief_multiplier, } ) updated_ticket = future_ticket.model_copy(update=updates) self._replace_queue_ticket(index, updated_ticket) updated_ticket_ids.append(updated_ticket.ticket_id) if updated_ticket_ids: self._state.cluster_stabilizations_total += len(updated_ticket_ids) self._record_dynamic_queue_event( "stabilize_cluster", source_ticket_id=current_ticket.ticket_id, affected_ticket_ids=updated_ticket_ids, incident_cover=incident_cover, ) return updated_ticket_ids def _destabilize_future_cluster_tickets( self, current_ticket: HelpdeskTicketRecord, *, score: float, context_penalty: float, incident_gap_penalty: float, ) -> list[str]: if (self._state.current_task_id or 1) < 2: return [] if score >= CLUSTER_DESTABILIZE_SCORE_THRESHOLD: if context_penalty <= 0.0 and incident_gap_penalty <= 0.0: return [] future_indexes = self._future_cluster_ticket_indexes( current_ticket, start_index=self._state.current_ticket_index, ) if not future_indexes: return [] planning_note = ( "Earlier handling in this request cluster did not settle ownership, so this follow-on " "arrives with more urgency and may need firmer coordination." ) customer_note = ( "The requester is escalating because the earlier response did not fully resolve the blocker." ) updated_ticket_ids: list[str] = [] for index in future_indexes: future_ticket = self._queue[index] updates: dict[str, Any] = { "priority": self._escalate_priority_level(future_ticket.priority), "planning_note": self._append_note(future_ticket.planning_note, planning_note), "customer_update_note": self._append_note( future_ticket.customer_update_note, customer_note, ), "incident_recommended": ( future_ticket.incident_recommended or current_ticket.priority in {"high", "critical"} or self._requires_incident(current_ticket) ), } if future_ticket.related_ticket_id is None: updates["related_ticket_id"] = current_ticket.ticket_id updated_ticket = future_ticket.model_copy(update=updates) self._replace_queue_ticket(index, updated_ticket) updated_ticket_ids.append(updated_ticket.ticket_id) if updated_ticket_ids: self._state.cluster_destabilizations_total += len(updated_ticket_ids) self._record_dynamic_queue_event( "destabilize_cluster", source_ticket_id=current_ticket.ticket_id, affected_ticket_ids=updated_ticket_ids, ) return updated_ticket_ids def _ticket_has_alternate_route(self, ticket: HelpdeskTicketRecord) -> bool: return any( value is not None for value in ( ticket.alternate_issue_type, ticket.alternate_priority, ticket.alternate_assignment_group, ticket.alternate_resolution_action, ) ) and ticket.alternate_route_score_multiplier > 0.0 def _route_for_ticket( self, ticket: HelpdeskTicketRecord, *, use_alternate: bool = False, ) -> dict[str, str]: if use_alternate and self._ticket_has_alternate_route(ticket): return { "issue_type": ticket.alternate_issue_type or ticket.issue_type, "priority": ticket.alternate_priority or ticket.priority, "assignment_group": ( ticket.alternate_assignment_group or ticket.assignment_group ), "resolution_action": ( ticket.alternate_resolution_action or ticket.resolution_action ), } return { "issue_type": ticket.issue_type, "priority": ticket.priority, "assignment_group": ticket.assignment_group, "resolution_action": ticket.resolution_action, } def _route_for_action( self, ticket: HelpdeskTicketRecord, action: HelpdeskTicketAction, ) -> dict[str, str]: primary_route = self._route_for_ticket(ticket) return { "issue_type": action.issue_type or primary_route["issue_type"], "priority": action.priority or primary_route["priority"], "assignment_group": ( action.assignment_group or primary_route["assignment_group"] ), "resolution_action": ( action.resolution_action or primary_route["resolution_action"] ), } def _route_capacity_cost(self, route: dict[str, str]) -> dict[str, Any]: return { "assignment_group": route["assignment_group"], "team_slots": 1, "high_priority_slots": 1 if route["priority"] in {"high", "critical"} else 0, "escalation_slots": 1 if route["resolution_action"] in {"assign", "escalate"} else 0, } def _routing_options_for_ticket(self, ticket: HelpdeskTicketRecord) -> list[dict[str, Any]]: options = [ { "label": "primary", "score_multiplier": 1.0, **self._route_for_ticket(ticket), "capacity_cost": self._route_capacity_cost(self._route_for_ticket(ticket)), } ] if self._ticket_has_alternate_route(ticket): alternate_route = self._route_for_ticket(ticket, use_alternate=True) options.append( { "label": "alternate", "score_multiplier": ticket.alternate_route_score_multiplier, **alternate_route, "capacity_cost": self._route_capacity_cost(alternate_route), } ) return options def _initial_capacity_state_for_queue( self, task_id: int, ) -> tuple[dict[str, int], int, int, int]: if task_id != 3: return {}, 0, 0, 0 primary_group_demand: dict[str, int] = {} alternate_relief_by_group: dict[str, int] = {} all_groups: set[str] = set() high_priority_demand = 0 high_priority_relief = 0 escalation_demand = 0 escalation_relief = 0 incident_demand = 0 for ticket in self._queue: primary_route = self._route_for_ticket(ticket) all_groups.add(primary_route["assignment_group"]) primary_group_demand[primary_route["assignment_group"]] = ( primary_group_demand.get(primary_route["assignment_group"], 0) + 1 ) if primary_route["priority"] in {"high", "critical"}: high_priority_demand += 1 if primary_route["resolution_action"] in {"assign", "escalate"}: escalation_demand += 1 if self._requires_incident(ticket): incident_demand += 1 if self._ticket_has_alternate_route(ticket): alternate_route = self._route_for_ticket(ticket, use_alternate=True) all_groups.add(alternate_route["assignment_group"]) if alternate_route["assignment_group"] != primary_route["assignment_group"]: alternate_relief_by_group[primary_route["assignment_group"]] = ( alternate_relief_by_group.get( primary_route["assignment_group"], 0, ) + 1 ) if ( primary_route["priority"] in {"high", "critical"} and alternate_route["priority"] not in {"high", "critical"} ): high_priority_relief += 1 if ( primary_route["resolution_action"] in {"assign", "escalate"} and alternate_route["resolution_action"] not in {"assign", "escalate"} ): escalation_relief += 1 team_capacity_initial: dict[str, int] = {} for group in sorted(all_groups): demand = primary_group_demand.get(group, 0) relief = alternate_relief_by_group.get(group, 0) if demand <= 1: team_capacity_initial[group] = 1 if group in all_groups else 0 elif relief > 0: team_capacity_initial[group] = max(1, demand - 1) else: team_capacity_initial[group] = demand if high_priority_demand <= 1: high_priority_slots_initial = high_priority_demand elif high_priority_relief > 0: high_priority_slots_initial = max(1, high_priority_demand - 1) else: high_priority_slots_initial = high_priority_demand if escalation_demand <= 1: escalation_slots_initial = escalation_demand elif escalation_relief > 0: escalation_slots_initial = max(1, escalation_demand - 1) else: escalation_slots_initial = escalation_demand if incident_demand <= 1: incident_slots_initial = incident_demand else: incident_slots_initial = max(1, incident_demand - 1) return ( team_capacity_initial, high_priority_slots_initial, escalation_slots_initial, incident_slots_initial, ) def _future_queue_demand(self) -> dict[str, Any]: future_tickets = self._queue[self._state.current_ticket_index + 1 :] team_demand: dict[str, int] = {} high_priority_needed = 0 escalation_needed = 0 capacity_sensitive_tickets = 0 incident_needed = 0 clustered_follow_ons = 0 for ticket in future_tickets: route = self._route_for_ticket(ticket) team_demand[route["assignment_group"]] = ( team_demand.get(route["assignment_group"], 0) + 1 ) if route["priority"] in {"high", "critical"}: high_priority_needed += 1 if route["resolution_action"] in {"assign", "escalate"}: escalation_needed += 1 if self._ticket_has_alternate_route(ticket): capacity_sensitive_tickets += 1 if self._requires_incident(ticket): incident_needed += 1 if self._cluster_keys_for_ticket(ticket): clustered_follow_ons += 1 return { "remaining_ticket_count": len(future_tickets), "team_demand": team_demand, "high_priority_needed": high_priority_needed, "escalation_needed": escalation_needed, "capacity_sensitive_tickets": capacity_sensitive_tickets, "incident_needed": incident_needed, "clustered_follow_ons": clustered_follow_ons, } def _capacity_state_snapshot(self) -> dict[str, Any]: return { "team_capacity_remaining": dict(self._state.team_capacity_remaining), "team_capacity_initial": dict(self._state.team_capacity_initial), "high_priority_slots_remaining": self._state.high_priority_slots_remaining, "high_priority_slots_initial": self._state.high_priority_slots_initial, "escalation_slots_remaining": self._state.escalation_slots_remaining, "escalation_slots_initial": self._state.escalation_slots_initial, "incident_slots_remaining": self._state.incident_slots_remaining, "incident_slots_initial": self._state.incident_slots_initial, } def _planning_route_recommendation(self, ticket: HelpdeskTicketRecord) -> dict[str, Any]: primary_route = self._route_for_ticket(ticket) alternate_route = ( self._route_for_ticket(ticket, use_alternate=True) if self._ticket_has_alternate_route(ticket) else None ) future_demand = self._future_queue_demand() capacity_state = self._capacity_state_snapshot() def pressure_score(route: dict[str, str]) -> int: cost = self._route_capacity_cost(route) group_remaining = capacity_state["team_capacity_remaining"].get( route["assignment_group"], 1, ) group_pressure = max( 0, future_demand["team_demand"].get(route["assignment_group"], 0) + cost["team_slots"] - group_remaining, ) priority_pressure = max( 0, future_demand["high_priority_needed"] + cost["high_priority_slots"] - capacity_state["high_priority_slots_remaining"], ) escalation_pressure = max( 0, future_demand["escalation_needed"] + cost["escalation_slots"] - capacity_state["escalation_slots_remaining"], ) return group_pressure + priority_pressure + escalation_pressure primary_pressure = pressure_score(primary_route) alternate_pressure = ( pressure_score(alternate_route) if alternate_route is not None else primary_pressure ) preferred_label = ( "alternate" if alternate_route is not None and alternate_pressure < primary_pressure else "primary" ) return { "preferred_label": preferred_label, "primary_pressure": primary_pressure, "alternate_pressure": alternate_pressure, "capacity_state": capacity_state, "future_demand": future_demand, } def _ticket_is_capacity_sensitive(self, ticket: HelpdeskTicketRecord) -> bool: if self._state.current_task_id != 3 or not self._ticket_has_alternate_route(ticket): return False recommendation = self._planning_route_recommendation(ticket) return recommendation["preferred_label"] == "alternate" or any( value > 0 for value in ( recommendation["primary_pressure"], recommendation["alternate_pressure"], ) ) def _route_matches_alternate( self, ticket: HelpdeskTicketRecord, route: dict[str, str], ) -> bool: if not self._ticket_has_alternate_route(ticket): return False return route == self._route_for_ticket(ticket, use_alternate=True) def _apply_capacity_usage( self, ticket: HelpdeskTicketRecord, action: HelpdeskTicketAction, ) -> tuple[float, dict[str, Any]]: if self._state.current_task_id != 3: return 0.0, {} route = self._route_for_action(ticket, action) capacity_cost = self._route_capacity_cost(route) group = str(capacity_cost["assignment_group"]) if group not in self._state.team_capacity_remaining: self._state.team_capacity_remaining[group] = 1 self._state.team_capacity_initial.setdefault(group, 1) group_remaining = self._state.team_capacity_remaining[group] group_overflow = max(0, int(capacity_cost["team_slots"]) - group_remaining) self._state.team_capacity_remaining[group] = max( 0, group_remaining - int(capacity_cost["team_slots"]), ) high_priority_cost = int(capacity_cost["high_priority_slots"]) high_priority_overflow = max( 0, high_priority_cost - self._state.high_priority_slots_remaining, ) self._state.high_priority_slots_remaining = max( 0, self._state.high_priority_slots_remaining - high_priority_cost, ) escalation_cost = int(capacity_cost["escalation_slots"]) escalation_overflow = max( 0, escalation_cost - self._state.escalation_slots_remaining, ) self._state.escalation_slots_remaining = max( 0, self._state.escalation_slots_remaining - escalation_cost, ) capacity_penalty = round( group_overflow * TEAM_CAPACITY_OVERFLOW_PENALTY + high_priority_overflow * HIGH_PRIORITY_SLOT_OVERFLOW_PENALTY + escalation_overflow * ESCALATION_SLOT_OVERFLOW_PENALTY, 4, ) self._state.planning_penalty_total = round( self._state.planning_penalty_total + capacity_penalty, 4, ) self._state.planning_penalty_applied = capacity_penalty used_alternate_route = self._route_matches_alternate(ticket, route) if used_alternate_route: self._state.capacity_pressure_tickets_resolved += 1 return capacity_penalty, { "capacity_cost": capacity_cost, "group_overflow": group_overflow, "high_priority_overflow": high_priority_overflow, "escalation_overflow": escalation_overflow, "used_alternate_route": used_alternate_route, "capacity_state_after_action": self._capacity_state_snapshot(), } def _planning_success_bonus(self) -> float: if self._state.current_task_id != 3 or self._state.planning_penalty_total > 0.0: return 0.0 capacity_sensitive_count = sum( 1 for ticket in self._queue if self._ticket_has_alternate_route(ticket) ) if capacity_sensitive_count == 0: return 0.0 coverage = min( 1.0, self._state.capacity_pressure_tickets_resolved / capacity_sensitive_count, ) return round(PLANNING_SUCCESS_BONUS * coverage, 4) def _internal_routing_note_for_ticket( self, ticket: HelpdeskTicketRecord, ) -> str | None: if self._state.current_task_id != 3: return ticket.ambiguity_note or ticket.planning_note note_parts: list[str] = [] if ticket.ambiguity_note is not None: note_parts.append(ticket.ambiguity_note) if ticket.planning_note is not None: note_parts.append(ticket.planning_note) default_group = ISSUE_TYPE_TO_ASSIGNMENT_GROUP.get( ticket.issue_type, ticket.assignment_group, ) default_action = ISSUE_TYPE_TO_RESOLUTION_ACTION.get( ticket.issue_type, ticket.resolution_action, ) if ticket.assignment_group != default_group: note_parts.append( "Routing override: send this to " f"{ticket.assignment_group} rather than the default {default_group} queue." ) if ticket.resolution_action != default_action: note_parts.append( "Action override: use " f"{ticket.resolution_action} instead of the default {default_action} next step." ) if ticket.issue_type == "onboarding" and ticket.assignment_group == "service_desk": note_parts.append( "The onboarding workflow is blocked by an access dependency, so the unblocker owns the next move." ) if ( ticket.issue_type == "security_compliance" and ticket.assignment_group == "application_team" ): note_parts.append( "This compliance issue needs a product-team fix rather than a central security handoff." ) if ticket.issue_type == "billing_license" and ticket.assignment_group == "procurement": note_parts.append( "Treat this as commercial procurement work instead of routine license fulfillment." ) if not note_parts: return None return " ".join(note_parts) def _ticket_has_nondefault_routing(self, ticket: HelpdeskTicketRecord) -> bool: return ( ticket.assignment_group != ISSUE_TYPE_TO_ASSIGNMENT_GROUP.get(ticket.issue_type, ticket.assignment_group) or ticket.resolution_action != ISSUE_TYPE_TO_RESOLUTION_ACTION.get( ticket.issue_type, ticket.resolution_action ) ) def _ticket_mentions_follow_up(self, ticket: HelpdeskTicketRecord) -> bool: text = f"{ticket.title} {ticket.description}".lower() return any( phrase in text for phrase in ( "re:", "follow-up", "following up", "still", "third update", "reference ticket", "regression", "unresolved", ) ) def _ticket_text(self, ticket: HelpdeskTicketRecord) -> str: return f"{ticket.title} {ticket.description}".lower() def _requires_incident(self, ticket: HelpdeskTicketRecord) -> bool: if ticket.incident_recommended: return True text = self._ticket_text(ticket) return ( ticket.priority in {"high", "critical"} and ticket.issue_type in {"application_support", "identity_access", "security_compliance"} and any( phrase in text for phrase in ( "outage", "cannot log in", "login", "regression", "unstable", "blocked", "lockout", "company-wide", "production", "unresolved", ) ) ) def _incident_open_for_ticket(self, ticket: HelpdeskTicketRecord) -> bool: related_ids = {ticket.ticket_id} if ticket.related_ticket_id: related_ids.add(ticket.related_ticket_id) if ticket.generated_from_ticket_id: related_ids.add(ticket.generated_from_ticket_id) if any(ticket_id in self._state.open_incident_ticket_ids for ticket_id in related_ids): return True ticket_cluster_keys = self._cluster_keys_for_ticket(ticket) if not ticket_cluster_keys: return False for open_ticket_id in self._state.open_incident_ticket_ids: open_ticket = self._tickets_by_id.get(open_ticket_id) if open_ticket is None: continue if ticket_cluster_keys & self._cluster_keys_for_ticket(open_ticket): return True return False def _request_info_note_for_ticket(self, ticket: HelpdeskTicketRecord) -> str | None: note_parts: list[str] = [] if ticket.customer_update_note: note_parts.append(ticket.customer_update_note) if ticket.related_ticket_id is not None: note_parts.append( "The requester confirmed this is connected to the earlier case and wants a single accountable owner." ) if self._ticket_has_nondefault_routing(ticket): note_parts.append( "The requester clarified that the blocker owner matters more than the superficial request label." ) if self._ticket_has_alternate_route(ticket): note_parts.append( "Operations said an acknowledged fallback path is acceptable if the preferred queue is saturated." ) if self._requires_incident(ticket): note_parts.append( "Stakeholders asked for incident-style coordination because the issue is still operationally active." ) if not note_parts: return None return " ".join(note_parts) def _request_info_used(self, ticket_id: str) -> bool: return self._state.ticket_request_info_usage.get(ticket_id, 0) > 0 def _defer_count(self, ticket_id: str) -> int: return self._state.ticket_defer_counts.get(ticket_id, 0) def _record_dynamic_queue_event(self, event_type: str, **details: Any) -> None: self._state.dynamic_queue_events.append({"event_type": event_type, **details}) def _escalate_priority_level(self, priority: str) -> str: if priority == "low": return "medium" if priority == "medium": return "high" return "critical" def _escalate_ticket_after_delay( self, ticket: HelpdeskTicketRecord, *, defer_count: int, ) -> HelpdeskTicketRecord: escalated_priority = self._escalate_priority_level(ticket.priority) description_suffix = ( " The ticket was deferred earlier in the queue and now needs firmer ownership." ) customer_update = ( ticket.customer_update_note or "The requester followed up after the delay and wants a committed owner." ) return ticket.model_copy( update={ "priority": escalated_priority, "title": ( ticket.title if ticket.title.lower().startswith("re:") else f"Re: {ticket.title}" ), "description": f"{ticket.description}{description_suffix}", "customer_update_note": customer_update, } ) def _should_spawn_follow_up( self, ticket: HelpdeskTicketRecord, *, score: float, context_penalty: float, incident_gap_penalty: float, ) -> bool: task_id = int(self._state.current_task_id or 1) if task_id < 2: return False if ticket.generated_from_ticket_id is not None: return False if ticket.ticket_id in self._state.spawned_follow_up_source_ids: return False follow_up_risk = ( self._requires_incident(ticket) or self._ticket_mentions_follow_up(ticket) or ticket.related_ticket_id is not None or ticket.priority in {"high", "critical"} or self._cluster_summary(ticket)["future_cluster_ticket_count"] > 0 ) if not follow_up_risk: return False if task_id == 2 and not ( ticket.related_ticket_id is not None or self._ticket_mentions_follow_up(ticket) or self._cluster_summary(ticket)["future_cluster_ticket_count"] > 0 or self._ticket_repeated_requester_count(ticket) >= 2 ): return False return ( score < FOLLOW_UP_SPAWN_THRESHOLD or (context_penalty >= 0.15 and score < 0.9) or incident_gap_penalty > 0.0 ) def _spawn_follow_up_ticket(self, ticket: HelpdeskTicketRecord) -> HelpdeskTicketRecord: follow_up_ticket = HelpdeskTicketRecord( ticket_id=f"{ticket.ticket_id}-followup", title=( ticket.title if ticket.title.lower().startswith("re:") else f"Re: {ticket.title}" ), requester=ticket.requester, description=( "The earlier handling did not fully resolve the issue. The requester is " f"following up on {ticket.ticket_id} and needs a single accountable owner now." ), issue_type=ticket.issue_type, priority=( "critical" if ticket.priority in {"high", "critical"} else self._escalate_priority_level(ticket.priority) ), assignment_group=ticket.assignment_group, resolution_action=( "escalate" if ticket.priority in {"high", "critical"} or self._requires_incident(ticket) else ticket.resolution_action ), ambiguity_note=( ticket.ambiguity_note or "Prior routing did not settle ownership; route to the team that can actually unblock the issue." ), related_ticket_id=ticket.ticket_id, planning_note=ticket.planning_note, customer_update_note=( "The requester said the last response did not resolve the blocker and wants an accountable next owner." ), incident_recommended=self._requires_incident(ticket), generated_from_ticket_id=ticket.ticket_id, service_cluster_id=ticket.service_cluster_id or ticket.ticket_id, ) self._queue.append(follow_up_ticket) self._tickets_by_id[follow_up_ticket.ticket_id] = follow_up_ticket self._sync_queue_ticket_ids() self._state.spawned_follow_up_ticket_ids.append(follow_up_ticket.ticket_id) self._state.spawned_follow_up_source_ids.append(ticket.ticket_id) self._record_dynamic_queue_event( "spawn_follow_up", source_ticket_id=ticket.ticket_id, follow_up_ticket_id=follow_up_ticket.ticket_id, ) return follow_up_ticket def _ticket_repeated_requester_count(self, ticket: HelpdeskTicketRecord) -> int: return sum( 1 for candidate in self._tickets_by_id.values() if candidate.requester == ticket.requester ) def _tool_has_available_context( self, ticket: HelpdeskTicketRecord, tool_name: str, ) -> bool: if tool_name == "lookup_related_ticket": return ( ticket.related_ticket_id is not None and ticket.related_ticket_id in self._tickets_by_id ) if tool_name == "lookup_requester_history": return self._ticket_repeated_requester_count(ticket) >= 2 if tool_name == "lookup_internal_routing_note": return self._internal_routing_note_for_ticket(ticket) is not None if tool_name == "lookup_queue_capacity_forecast": return self._state.current_task_id == 3 and ( self._ticket_has_alternate_route(ticket) or self._future_queue_demand()["remaining_ticket_count"] > 0 ) if tool_name == "lookup_queue_cluster_summary": if (self._state.current_task_id or 1) < 2: return False cluster_summary = self._cluster_summary(ticket) return ( cluster_summary["future_cluster_ticket_count"] > 0 or cluster_summary["shared_requester_count"] > 1 ) return False def _required_tools_for_ticket( self, ticket: HelpdeskTicketRecord, task_id: int | None = None, ) -> list[str]: resolved_task_id = self._state.current_task_id if task_id is None else task_id if resolved_task_id is None or resolved_task_id < 2: return [] required_tools: list[str] = list(TASK3_INVESTIGATION_TOOL_PLAN.get(ticket.ticket_id, ())) if ticket.related_ticket_id is not None and "lookup_related_ticket" not in required_tools: required_tools.append("lookup_related_ticket") if ( self._internal_routing_note_for_ticket(ticket) is not None and "lookup_internal_routing_note" not in required_tools ): required_tools.append("lookup_internal_routing_note") if ( self._ticket_repeated_requester_count(ticket) >= 2 and ( ticket.related_ticket_id is not None or self._ticket_mentions_follow_up(ticket) or self._ticket_has_nondefault_routing(ticket) or ticket.priority in {"high", "critical"} ) and "lookup_requester_history" not in required_tools ): required_tools.append("lookup_requester_history") if ( resolved_task_id == 3 and self._ticket_is_capacity_sensitive(ticket) and "lookup_queue_capacity_forecast" not in required_tools ): required_tools.append("lookup_queue_capacity_forecast") ticket_index = self._ticket_queue_index(ticket) cluster_start_index = ( ticket_index + 1 if ticket_index is not None else self._state.current_ticket_index + 1 ) if resolved_task_id == 3: cluster_summary = self._cluster_summary( ticket, start_index=cluster_start_index, ) if ( cluster_summary["future_cluster_ticket_count"] > 0 and "lookup_queue_cluster_summary" not in required_tools and ( self._requires_incident(ticket) or cluster_summary["future_high_priority_count"] > 0 or cluster_summary["shared_requester_count"] > 1 ) ): required_tools.append("lookup_queue_cluster_summary") if resolved_task_id == 2: cluster_summary = self._cluster_summary( ticket, start_index=cluster_start_index, ) if ( cluster_summary["future_cluster_ticket_count"] > 0 and "lookup_queue_cluster_summary" not in required_tools and ( ticket.related_ticket_id is not None or cluster_summary["shared_requester_count"] > 1 or self._ticket_mentions_follow_up(ticket) ) ): required_tools.append("lookup_queue_cluster_summary") filtered_required_tools: list[str] = [] allowed_tool_set = set(self._available_tools_for_task(resolved_task_id)) for tool_name in required_tools: if tool_name in filtered_required_tools: continue if tool_name not in allowed_tool_set: continue if self._tool_has_available_context(ticket, tool_name): filtered_required_tools.append(tool_name) return filtered_required_tools def _recommended_operational_actions(self, ticket: HelpdeskTicketRecord) -> list[str]: recommended_actions: list[str] = [] available_action_types = set(self._available_action_types_for_task()) cluster_summary = self._cluster_summary(ticket) if ( "request_info" in available_action_types and self._request_info_note_for_ticket(ticket) is not None and not self._request_info_used(ticket.ticket_id) ): recommended_actions.append("request_info") if ( "open_incident" in available_action_types and self._requires_incident(ticket) and not self._incident_open_for_ticket(ticket) ): recommended_actions.append("open_incident") if ( "defer" in available_action_types and self._defer_count(ticket.ticket_id) < MAX_DEFERS_PER_TICKET and self._state.current_ticket_index < len(self._queue) - 1 and ticket.priority not in {"high", "critical"} and ( bool(self._remaining_tools_for_ticket(ticket)) or self._ticket_is_capacity_sensitive(ticket) or self._request_info_note_for_ticket(ticket) is not None or cluster_summary["future_cluster_ticket_count"] > 0 ) ): recommended_actions.append("defer") return recommended_actions def _used_tools_for_ticket(self, ticket_id: str) -> list[str]: return list(self._state.ticket_tool_usage.get(ticket_id, [])) def _remaining_tools_for_ticket( self, ticket: HelpdeskTicketRecord, task_id: int | None = None, ) -> list[str]: required_tools = self._required_tools_for_ticket(ticket, task_id) used_tools = set(self._used_tools_for_ticket(ticket.ticket_id)) return [tool for tool in required_tools if tool not in used_tools] def _record_tool_usage(self, ticket_id: str, tool_name: str) -> None: used = self._state.ticket_tool_usage.setdefault(ticket_id, []) if tool_name not in used: used.append(tool_name) def _tool_progress_for_ticket(self, ticket: HelpdeskTicketRecord) -> dict[str, Any]: required_tools = self._required_tools_for_ticket(ticket) revealed_tools = self._used_tools_for_ticket(ticket.ticket_id) remaining_tools = self._remaining_tools_for_ticket(ticket) total_required = max(1, len(required_tools)) request_info_used = self._request_info_used(ticket.ticket_id) operational_actions = self._recommended_operational_actions(ticket) return { "required_tools": required_tools, "revealed_tools": revealed_tools, "remaining_tools": remaining_tools, "revealed_count": len(revealed_tools), "remaining_count": len(remaining_tools), "completeness": round(len(revealed_tools) / total_required, 2), "request_info_used": request_info_used, "recommended_operational_actions": operational_actions, } def _default_redacted_description(self, ticket: HelpdeskTicketRecord) -> str: cluster_summary = self._cluster_summary(ticket) if cluster_summary["future_cluster_ticket_count"] > 0: return ( "This ticket is part of a broader queue cluster and the best next step depends " "on downstream consequences. Additional routing context is available via investigation." ) if ticket.related_ticket_id is not None: return ( "This is a follow-up operational issue. " "Additional routing context is available via investigation." ) if self._internal_routing_note_for_ticket(ticket) is not None: return ( "The visible request is not enough to choose the final owner and next step. " "Additional routing context is available via investigation." ) if self._ticket_has_alternate_route(ticket): return ( "The queue is under resource pressure and this ticket may support more than " "one acceptable routing path. Additional planning context is available via investigation." ) if self._ticket_has_nondefault_routing(ticket): return ( "The visible request looks straightforward, but the decisive routing detail is hidden until investigation." ) return ( "Additional routing context is available via investigation before final submission." ) def _default_redacted_title(self, ticket: HelpdeskTicketRecord) -> str: if self._cluster_summary(ticket)["future_cluster_ticket_count"] > 0: return "Clustered queue decision with hidden downstream impact" if ticket.related_ticket_id is not None: return "Follow-up request with hidden routing context" if self._internal_routing_note_for_ticket(ticket) is not None: return "Routing clarification required" if self._ticket_has_alternate_route(ticket): return "Capacity-sensitive routing decision" if self._ticket_mentions_follow_up(ticket): return "Priority support follow-up" return "Helpdesk routing decision" def _visible_title(self, ticket: HelpdeskTicketRecord) -> str: if self._state.current_task_id in {2, 3} and self._remaining_tools_for_ticket(ticket): return HARD_TASK_TITLE_REDACTIONS.get( ticket.ticket_id, self._default_redacted_title(ticket), ) return ticket.title def _visible_description(self, ticket: HelpdeskTicketRecord) -> str: if self._state.current_task_id in {2, 3} and self._remaining_tools_for_ticket(ticket): return HARD_TASK_DESCRIPTION_REDACTIONS.get( ticket.ticket_id, self._default_redacted_description(ticket), ) return ticket.description def _submit_context_penalty(self, ticket: HelpdeskTicketRecord) -> tuple[float, int]: progress = self._tool_progress_for_ticket(ticket) required_tools = progress["required_tools"] remaining_tools = progress["remaining_tools"] if not required_tools or not remaining_tools: return 0.0, 0 penalty = PREMATURE_SUBMIT_PENALTY * ( len(remaining_tools) / max(1, len(required_tools)) ) if self._ticket_has_nondefault_routing(ticket): penalty += NONDEFAULT_HIDDEN_CONTEXT_PENALTY * ( len(remaining_tools) / max(1, len(required_tools)) ) return round(min(0.45, penalty), 4), len(remaining_tools) def _context_completion_bonus( self, ticket: HelpdeskTicketRecord, *, missing_required_count: int, score: float, ) -> float: if not self._required_tools_for_ticket(ticket): return 0.0 if missing_required_count != 0 or score < 0.75: return 0.0 bonus = CONTEXT_COMPLETION_BONUS if self._ticket_has_nondefault_routing(ticket): bonus += NONDEFAULT_ROUTING_FOLLOWTHROUGH_BONUS return bonus def _trajectory_consistency_bonus(self) -> float: if not self._queue: return 0.0 hidden_context_tickets = [ ticket for ticket in self._queue if self._required_tools_for_ticket(ticket) ] if not hidden_context_tickets: return 0.0 resolved = sum( 1 for ticket in hidden_context_tickets if not self._remaining_tools_for_ticket(ticket) ) resolution_rate = resolved / len(hidden_context_tickets) return round(TRAJECTORY_CONTEXT_COMPLETION_BONUS * resolution_rate, 4) def _operational_risk_penalty( self, ticket: HelpdeskTicketRecord, action: HelpdeskTicketAction, *, task_id: int, ) -> float: if task_id < 2 or action.priority is None: priority_penalty = 0.0 else: priority_rank = {"critical": 3, "high": 2, "medium": 1, "low": 0} expected_rank = priority_rank.get(ticket.priority, 0) predicted_rank = priority_rank.get(action.priority, 0) gap = expected_rank - predicted_rank if gap >= 2: priority_penalty = SEVERE_PRIORITY_UNDERSHOOT_PENALTY elif gap == 1 and ticket.priority in {"high", "critical"}: priority_penalty = PRIORITY_UNDERSHOOT_PENALTY else: priority_penalty = 0.0 resolution_penalty = 0.0 if task_id == 3 and action.resolution_action is not None: if ( ticket.issue_type in {"identity_access", "application_support", "security_compliance"} and ticket.priority in {"high", "critical"} and action.resolution_action == "acknowledge" ): resolution_penalty += DANGEROUS_RESOLUTION_PENALTY if ticket.issue_type == "spam_phishing" and action.resolution_action == "fulfill": resolution_penalty += PRIORITY_UNDERSHOOT_PENALTY return round(priority_penalty + resolution_penalty, 4) def _incident_gap_penalty( self, ticket: HelpdeskTicketRecord, action: HelpdeskTicketAction, ) -> float: if self._state.current_task_id != 3: return 0.0 if not self._requires_incident(ticket): return 0.0 if self._incident_open_for_ticket(ticket): return 0.0 if action.resolution_action in {"escalate", "assign"}: return round(INCIDENT_GAP_PENALTY / 2, 4) return INCIDENT_GAP_PENALTY def _build_reward_components( self, *, ticket_score: float, field_breakdown: dict[str, float], shaped_step_reward: float, reward_kind: str, final_reward: float, milestone_adjustment: float = 0.0, trajectory_reward: float | None = None, investigation_penalty: float = 0.0, penalty_reason: str | None = None, extra_details: dict[str, Any] | None = None, ) -> dict[str, Any]: components: dict[str, Any] = { "reward_kind": reward_kind, "ticket_score": ticket_score, "field_breakdown": field_breakdown, "shaped_step_reward": shaped_step_reward, "milestone_adjustment": milestone_adjustment, "final_reward": final_reward, "average_score_so_far": self._current_average_score(), "investigation_penalty_applied": investigation_penalty, } if trajectory_reward is not None: components["trajectory_reward"] = trajectory_reward if penalty_reason is not None: components["penalty_reason"] = penalty_reason if extra_details: components.update(extra_details) return components def _lookup_related_ticket( self, current_ticket: HelpdeskTicketRecord, target_ticket_id: str | None, ) -> dict[str, Any]: target_id = target_ticket_id or current_ticket.related_ticket_id if target_id is None: return { "tool_name": "lookup_related_ticket", "found": False, "message": "Current ticket has no linked related_ticket_id.", } related_ticket = self._tickets_by_id.get(target_id) if related_ticket is None: return { "tool_name": "lookup_related_ticket", "found": False, "message": f"Ticket {target_id!r} was not found in the dataset.", } return { "tool_name": "lookup_related_ticket", "found": True, "ticket": { "ticket_id": related_ticket.ticket_id, "title": related_ticket.title, "requester": related_ticket.requester, "description": related_ticket.description, "issue_type": related_ticket.issue_type, "priority": related_ticket.priority, "assignment_group": related_ticket.assignment_group, "resolution_action": related_ticket.resolution_action, }, } def _lookup_requester_history(self, current_ticket: HelpdeskTicketRecord) -> dict[str, Any]: matches = [ { "ticket_id": ticket.ticket_id, "title": ticket.title, "issue_type": ticket.issue_type, "priority": ticket.priority, "assignment_group": ticket.assignment_group, "resolution_action": ticket.resolution_action, } for ticket in self._tickets_by_id.values() if ticket.requester == current_ticket.requester and ticket.ticket_id != current_ticket.ticket_id ] return { "tool_name": "lookup_requester_history", "found": bool(matches), "requester": current_ticket.requester, "matches": matches, } def _lookup_internal_routing_note(self, current_ticket: HelpdeskTicketRecord) -> dict[str, Any]: routing_note = self._internal_routing_note_for_ticket(current_ticket) found = routing_note is not None return { "tool_name": "lookup_internal_routing_note", "found": found, "ticket_id": current_ticket.ticket_id, "routing_note": routing_note if found else "", } def _lookup_queue_capacity_forecast( self, current_ticket: HelpdeskTicketRecord, ) -> dict[str, Any]: recommendation = self._planning_route_recommendation(current_ticket) routing_options = self._routing_options_for_ticket(current_ticket) return { "tool_name": "lookup_queue_capacity_forecast", "found": True, "ticket_id": current_ticket.ticket_id, "preferred_route_label": recommendation["preferred_label"], "primary_pressure": recommendation["primary_pressure"], "alternate_pressure": recommendation["alternate_pressure"], "capacity_state": recommendation["capacity_state"], "future_queue_demand": recommendation["future_demand"], "routing_options": routing_options, "incident_recommended": self._requires_incident(current_ticket), } def _lookup_queue_cluster_summary( self, current_ticket: HelpdeskTicketRecord, ) -> dict[str, Any]: cluster_summary = self._cluster_summary(current_ticket) return { "tool_name": "lookup_queue_cluster_summary", "found": cluster_summary["future_cluster_ticket_count"] > 0 or cluster_summary["shared_requester_count"] > 1, "ticket_id": current_ticket.ticket_id, **cluster_summary, } def _run_investigation_tool( self, current_ticket: HelpdeskTicketRecord, tool_name: str, target_ticket_id: str | None, ) -> dict[str, Any]: if tool_name == "lookup_related_ticket": return self._lookup_related_ticket(current_ticket, target_ticket_id) if tool_name == "lookup_requester_history": return self._lookup_requester_history(current_ticket) if tool_name == "lookup_internal_routing_note": return self._lookup_internal_routing_note(current_ticket) if tool_name == "lookup_queue_capacity_forecast": return self._lookup_queue_capacity_forecast(current_ticket) if tool_name == "lookup_queue_cluster_summary": return self._lookup_queue_cluster_summary(current_ticket) raise ValueError(f"Unsupported tool_name: {tool_name}") def _handle_investigation_action( self, task: dict, current_ticket: HelpdeskTicketRecord, action: HelpdeskTicketAction, idx: int, ) -> HelpdeskTicketObservation: if action.tool_name is None: raise ValueError("Investigate actions require tool_name") if action.tool_name not in self._available_tools_for_task(): raise ValueError(f"Unsupported tool_name for current task: {action.tool_name}") submitted_fields = { field for field in ("issue_type", "priority", "assignment_group", "resolution_action") if getattr(action, field) is not None } if submitted_fields: raise ValueError( "Investigate actions cannot include submit fields: " f"{sorted(submitted_fields)}" ) tool_result = self._run_investigation_tool( current_ticket, action.tool_name, action.tool_target_ticket_id, ) required_tools = self._required_tools_for_ticket(current_ticket) already_used = action.tool_name in self._used_tools_for_ticket(current_ticket.ticket_id) useful_investigation = ( action.tool_name in required_tools and not already_used and bool(tool_result.get("found", True)) ) self._record_tool_usage(current_ticket.ticket_id, action.tool_name) self._state.step_count += 1 self._state.investigation_steps += 1 self._state.investigation_budget_remaining = max( 0, self._state.investigation_budget_remaining - 1, ) self._state.last_tool_result = tool_result investigation_reward = USEFUL_INVESTIGATION_REWARD if useful_investigation else 0.0 investigation_score = 0.0 self._state.last_step_reward = investigation_reward self._state.reward = investigation_reward self._state.done = False self._state.investigation_penalty_applied = self._compute_episode_penalty() progress = self._tool_progress_for_ticket(current_ticket) reward_components = self._build_reward_components( ticket_score=investigation_score, field_breakdown={}, shaped_step_reward=investigation_reward, reward_kind="investigation", final_reward=investigation_reward, investigation_penalty=self._state.investigation_penalty_applied, extra_details={ "new_context_revealed": useful_investigation, "required_investigation_count": len(required_tools), "hidden_context_remaining_count": progress["remaining_count"], "hidden_context_revealed_count": progress["revealed_count"], "context_completeness": progress["completeness"], "tool_name": action.tool_name, }, ) self._state.history_entries.append( self._build_history_entry( current_ticket, predicted=action.model_dump(exclude_none=True), score=investigation_score, breakdown={}, queue_position=idx + 1, reward=investigation_reward, reward_kind="investigation", tool_result=tool_result, reward_components=reward_components, ) ) self._state.last_reward_components = reward_components return self._build_observation(task, done=False, reward=investigation_reward) def _handle_request_info_action( self, task: dict, current_ticket: HelpdeskTicketRecord, action: HelpdeskTicketAction, idx: int, ) -> HelpdeskTicketObservation: submitted_fields = { field for field in ("issue_type", "priority", "assignment_group", "resolution_action") if getattr(action, field) is not None } if submitted_fields: raise ValueError( "request_info actions cannot include submit fields: " f"{sorted(submitted_fields)}" ) ticket_id = current_ticket.ticket_id note = self._request_info_note_for_ticket(current_ticket) already_used = self._request_info_used(ticket_id) useful_request = note is not None and not already_used self._state.ticket_request_info_usage[ticket_id] = ( self._state.ticket_request_info_usage.get(ticket_id, 0) + 1 ) self._state.step_count += 1 self._state.investigation_steps += 1 self._state.investigation_budget_remaining = max( 0, self._state.investigation_budget_remaining - 1, ) request_reward = USEFUL_REQUEST_INFO_REWARD if useful_request else 0.0 tool_result = { "action_type": "request_info", "found": useful_request, "ticket_id": ticket_id, "customer_update_note": note if useful_request else "", } self._state.last_tool_result = tool_result self._state.last_step_reward = request_reward self._state.reward = request_reward self._state.done = False self._state.investigation_penalty_applied = self._compute_episode_penalty() progress = self._tool_progress_for_ticket(current_ticket) reward_components = self._build_reward_components( ticket_score=0.0, field_breakdown={}, shaped_step_reward=request_reward, reward_kind="operational", final_reward=request_reward, investigation_penalty=self._state.investigation_penalty_applied, extra_details={ "operational_action": "request_info", "new_context_revealed": useful_request, "customer_update_visible": useful_request, "hidden_context_remaining_count": progress["remaining_count"], "context_completeness": progress["completeness"], }, ) self._state.history_entries.append( self._build_history_entry( current_ticket, predicted=action.model_dump(exclude_none=True), score=0.0, breakdown={}, queue_position=idx + 1, reward=request_reward, reward_kind="operational", tool_result=tool_result, reward_components=reward_components, ) ) self._state.last_reward_components = reward_components return self._build_observation(task, done=False, reward=request_reward) def _handle_defer_action( self, task: dict, current_ticket: HelpdeskTicketRecord, action: HelpdeskTicketAction, idx: int, ) -> HelpdeskTicketObservation: submitted_fields = { field for field in ("issue_type", "priority", "assignment_group", "resolution_action") if getattr(action, field) is not None } if submitted_fields: raise ValueError( "defer actions cannot include submit fields: " f"{sorted(submitted_fields)}" ) ticket_id = current_ticket.ticket_id existing_count = self._defer_count(ticket_id) defer_allowed = ( existing_count < MAX_DEFERS_PER_TICKET and idx < len(self._queue) - 1 and self._state.current_task_id in {2, 3} ) defer_count = existing_count + 1 reward = 0.0 sla_risk = current_ticket.priority in {"high", "critical"} or self._ticket_mentions_follow_up( current_ticket ) moved_ticket = current_ticket if defer_allowed: self._state.ticket_defer_counts[ticket_id] = defer_count self._state.deferred_ticket_count += 1 if sla_risk: self._state.sla_breach_count += 1 moved_ticket = self._escalate_ticket_after_delay( current_ticket, defer_count=defer_count, ) elif ( self._remaining_tools_for_ticket(current_ticket) or self._request_info_note_for_ticket(current_ticket) is not None or self._ticket_is_capacity_sensitive(current_ticket) ): reward = REQUEST_INFO_CONTEXT_COMPLETION_BONUS self._queue.pop(idx) self._queue.append(moved_ticket) self._tickets_by_id[moved_ticket.ticket_id] = moved_ticket self._sync_queue_ticket_ids() self._record_dynamic_queue_event( "defer", ticket_id=ticket_id, defer_count=defer_count, sla_risk=sla_risk, ) else: self._state.sla_breach_count += 1 self._record_dynamic_queue_event( "defer_denied", ticket_id=ticket_id, defer_count=defer_count, ) self._state.step_count += 1 self._state.last_tool_result = { "action_type": "defer", "ticket_id": ticket_id, "defer_allowed": defer_allowed, "defer_count": defer_count, "sla_risk": sla_risk, } self._state.last_step_reward = reward self._state.reward = reward self._state.done = False reward_components = self._build_reward_components( ticket_score=0.0, field_breakdown={}, shaped_step_reward=reward, reward_kind="operational", final_reward=reward, extra_details={ "operational_action": "defer", "defer_allowed": defer_allowed, "defer_count": defer_count, "sla_breach_count": self._state.sla_breach_count, }, ) self._state.history_entries.append( self._build_history_entry( current_ticket, predicted=action.model_dump(exclude_none=True), score=0.0, breakdown={}, queue_position=idx + 1, reward=reward, reward_kind="operational", tool_result=self._state.last_tool_result, reward_components=reward_components, ) ) self._state.last_reward_components = reward_components return self._build_observation(task, done=False, reward=reward) def _handle_open_incident_action( self, task: dict, current_ticket: HelpdeskTicketRecord, action: HelpdeskTicketAction, idx: int, ) -> HelpdeskTicketObservation: submitted_fields = { field for field in ("issue_type", "priority", "assignment_group", "resolution_action") if getattr(action, field) is not None } if submitted_fields: raise ValueError( "open_incident actions cannot include submit fields: " f"{sorted(submitted_fields)}" ) useful_incident = ( self._state.current_task_id == 3 and self._requires_incident(current_ticket) and not self._incident_open_for_ticket(current_ticket) ) overflow = 0 incident_reward = 0.0 if useful_incident: self._state.open_incident_ticket_ids.append(current_ticket.ticket_id) self._state.incident_actions_used += 1 overflow = max(0, 1 - self._state.incident_slots_remaining) self._state.incident_slots_remaining = max( 0, self._state.incident_slots_remaining - 1, ) overflow_penalty = round(overflow * INCIDENT_SLOT_OVERFLOW_PENALTY, 4) if overflow_penalty > 0.0: self._state.planning_penalty_total = round( self._state.planning_penalty_total + overflow_penalty, 4, ) self._state.planning_penalty_applied = overflow_penalty incident_reward = clamp_open_unit_interval( INCIDENT_OPEN_REWARD - overflow_penalty ) self._record_dynamic_queue_event( "open_incident", ticket_id=current_ticket.ticket_id, overflow=overflow, ) self._state.step_count += 1 self._state.last_tool_result = { "action_type": "open_incident", "ticket_id": current_ticket.ticket_id, "incident_open": useful_incident, "incident_slots_remaining": self._state.incident_slots_remaining, "overflow": overflow, } self._state.last_step_reward = incident_reward self._state.reward = incident_reward self._state.done = False reward_components = self._build_reward_components( ticket_score=0.0, field_breakdown={}, shaped_step_reward=incident_reward, reward_kind="operational", final_reward=incident_reward, extra_details={ "operational_action": "open_incident", "incident_open": useful_incident, "incident_slots_remaining": self._state.incident_slots_remaining, }, ) self._state.history_entries.append( self._build_history_entry( current_ticket, predicted=action.model_dump(exclude_none=True), score=0.0, breakdown={}, queue_position=idx + 1, reward=incident_reward, reward_kind="operational", tool_result=self._state.last_tool_result, reward_components=reward_components, ) ) self._state.last_reward_components = reward_components return self._build_observation(task, done=False, reward=incident_reward) def _build_ticket_view(self, ticket: HelpdeskTicketRecord) -> dict[str, Any]: progress = self._tool_progress_for_ticket(ticket) remaining_tools = progress["remaining_tools"] used_tools = set(self._used_tools_for_ticket(ticket.ticket_id)) operational_actions = progress["recommended_operational_actions"] cluster_summary = self._cluster_summary(ticket) cluster_hint = ( cluster_summary["future_cluster_ticket_count"] > 0 or cluster_summary["shared_requester_count"] > 1 ) ticket_view: dict[str, Any] = { "ticket_id": ticket.ticket_id, "title": self._visible_title(ticket), "requester": ticket.requester, "description": self._visible_description(ticket), } if self._state.current_task_id == 3: ticket_view["capacity_state"] = self._capacity_state_snapshot() if progress["required_tools"]: ticket_view["context_status"] = { "investigation_required": True, "hidden_context_remaining": bool(progress["remaining_count"]), "context_gap_count": progress["remaining_count"], "revealed_context_count": progress["revealed_count"], "context_completeness": progress["completeness"], "investigations_used_for_ticket": progress["revealed_count"], "recommended_tools": list(remaining_tools), } ticket_view["operational_context"] = { "request_info_available": self._request_info_note_for_ticket(ticket) is not None, "request_info_used": progress["request_info_used"], "defer_count": self._defer_count(ticket.ticket_id), "incident_recommended": self._requires_incident(ticket), "incident_open": self._incident_open_for_ticket(ticket), "recommended_actions": operational_actions, "cluster_coordination_hint": cluster_hint, "shared_requester_pressure": cluster_summary["shared_requester_count"] > 1, } if "lookup_queue_cluster_summary" in used_tools: ticket_view["operational_context"].update( { "service_cluster_id": ticket.service_cluster_id, "future_cluster_ticket_count": cluster_summary["future_cluster_ticket_count"], "future_cluster_ticket_ids": cluster_summary["future_cluster_ticket_ids"], "shared_requester_count": cluster_summary["shared_requester_count"], "active_incident_cover": cluster_summary["active_incident_cover"], } ) if ticket.ambiguity_note is not None and "lookup_internal_routing_note" not in remaining_tools: ticket_view["ambiguity_note"] = ticket.ambiguity_note if ( ticket.planning_note is not None and "lookup_internal_routing_note" not in remaining_tools ): ticket_view["planning_note"] = ticket.planning_note if self._request_info_used(ticket.ticket_id): ticket_view["customer_update_note"] = self._request_info_note_for_ticket(ticket) if ticket.related_ticket_id is not None and "lookup_related_ticket" not in remaining_tools: ticket_view["related_ticket_id"] = ticket.related_ticket_id related_ticket = self._tickets_by_id.get(ticket.related_ticket_id) if related_ticket is not None: ticket_view["related_ticket_preview"] = { "ticket_id": related_ticket.ticket_id, "title": related_ticket.title, "requester": related_ticket.requester, "description": related_ticket.description, } if self._ticket_has_alternate_route(ticket) and ( "lookup_internal_routing_note" in used_tools or "lookup_queue_capacity_forecast" in used_tools ): ticket_view["routing_options"] = self._routing_options_for_ticket(ticket) if "lookup_queue_cluster_summary" in used_tools: ticket_view["cluster_summary"] = cluster_summary if ticket.generated_from_ticket_id is not None: ticket_view["generated_from_ticket_id"] = ticket.generated_from_ticket_id return ticket_view def _build_feedback_summary( self, *, predicted: dict[str, Any], score: float, breakdown: dict[str, float], reward: float | None = None, rubric_reward: float | None = None, reward_kind: str | None = None, penalty_reason: str | None = None, tool_result: dict[str, Any] | None = None, reward_components: dict[str, Any] | None = None, ) -> str: parts: list[str] = [] if reward_kind == "investigation": tool_name = predicted.get("tool_name") or (tool_result or {}).get("tool_name") parts.append(f"Investigation step used {tool_name or 'a tool'}") if reward_components and reward_components.get("new_context_revealed"): parts.append("new context was revealed") elif reward_kind == "operational": operational_action = ( reward_components.get("operational_action") if reward_components else predicted.get("action_type") ) parts.append(f"Operational step used {operational_action or 'an action'}") elif penalty_reason is not None: parts.append(f"Penalty applied: {penalty_reason}") else: parts.append(f"Ticket score={score:.2f}") if breakdown: field_scores = ", ".join( f"{field}={value:.2f}" for field, value in sorted(breakdown.items()) ) parts.append(f"field_scores[{field_scores}]") if reward is not None: parts.append(f"reward={reward:.2f}") if rubric_reward is not None: parts.append(f"rubric_reward={rubric_reward:.2f}") if reward_components: context_gap_penalty = reward_components.get("context_gap_penalty") if context_gap_penalty: parts.append(f"context_gap_penalty={context_gap_penalty:.2f}") hidden_context_remaining_count = reward_components.get( "hidden_context_remaining_count" ) if hidden_context_remaining_count: parts.append( f"hidden_context_remaining={hidden_context_remaining_count}" ) context_completion_bonus = reward_components.get("context_completion_bonus") if context_completion_bonus: parts.append(f"context_bonus={context_completion_bonus:.2f}") risk_penalty = reward_components.get("risk_penalty") if risk_penalty: parts.append(f"risk_penalty={risk_penalty:.2f}") capacity_penalty = reward_components.get("capacity_penalty") if capacity_penalty: parts.append(f"capacity_penalty={capacity_penalty:.2f}") planning_penalty_total = reward_components.get("planning_penalty_total") if planning_penalty_total: parts.append(f"planning_penalty_total={planning_penalty_total:.2f}") incident_gap_penalty = reward_components.get("incident_gap_penalty") if incident_gap_penalty: parts.append(f"incident_gap_penalty={incident_gap_penalty:.2f}") queue_management_score = reward_components.get("queue_management_score") if queue_management_score is not None: parts.append(f"queue_management_score={queue_management_score:.2f}") spawned_follow_up_ticket_id = reward_components.get("spawned_follow_up_ticket_id") if spawned_follow_up_ticket_id: parts.append(f"spawned_follow_up={spawned_follow_up_ticket_id}") cluster_stabilized_ticket_ids = reward_components.get("cluster_stabilized_ticket_ids") if cluster_stabilized_ticket_ids: parts.append( "cluster_stabilized=" + ",".join(cluster_stabilized_ticket_ids) ) cluster_destabilized_ticket_ids = reward_components.get( "cluster_destabilized_ticket_ids" ) if cluster_destabilized_ticket_ids: parts.append( "cluster_destabilized=" + ",".join(cluster_destabilized_ticket_ids) ) return "; ".join(parts) def _build_history_entry( self, ticket: HelpdeskTicketRecord, *, predicted: dict[str, Any], score: float, breakdown: dict[str, float], queue_position: int, reward: float | None = None, rubric_reward: float | None = None, reward_kind: str | None = None, penalty_reason: str | None = None, tool_result: dict[str, Any] | None = None, reward_components: dict[str, Any] | None = None, ) -> dict[str, Any]: progress = self._tool_progress_for_ticket(ticket) remaining_tools = progress["remaining_tools"] cluster_summary = self._cluster_summary(ticket) history_entry: dict[str, Any] = { "ticket_id": ticket.ticket_id, "title": ticket.title, "requester": ticket.requester, "predicted": predicted, "score": score, "breakdown": breakdown, "queue_position": queue_position, "operational_context": { "request_info_used": progress["request_info_used"], "defer_count": self._defer_count(ticket.ticket_id), "incident_open": self._incident_open_for_ticket(ticket), "recommended_actions": progress["recommended_operational_actions"], "cluster_coordination_hint": ( cluster_summary["future_cluster_ticket_count"] > 0 or cluster_summary["shared_requester_count"] > 1 ), }, } if "lookup_queue_cluster_summary" in self._used_tools_for_ticket(ticket.ticket_id): history_entry["operational_context"].update( { "service_cluster_id": ticket.service_cluster_id, "future_cluster_ticket_count": cluster_summary["future_cluster_ticket_count"], "active_incident_cover": cluster_summary["active_incident_cover"], "shared_requester_count": cluster_summary["shared_requester_count"], } ) if self._state.current_task_id == 3: history_entry["capacity_state"] = self._capacity_state_snapshot() if reward is not None: history_entry["reward"] = reward if rubric_reward is not None: history_entry["rubric_reward"] = rubric_reward if reward_kind is not None: history_entry["reward_kind"] = reward_kind if ticket.ambiguity_note is not None and "lookup_internal_routing_note" not in remaining_tools: history_entry["ambiguity_note"] = ticket.ambiguity_note if ( ticket.planning_note is not None and "lookup_internal_routing_note" not in remaining_tools ): history_entry["planning_note"] = ticket.planning_note if self._request_info_used(ticket.ticket_id): history_entry["customer_update_note"] = self._request_info_note_for_ticket(ticket) if ticket.related_ticket_id is not None and "lookup_related_ticket" not in remaining_tools: history_entry["related_ticket_id"] = ticket.related_ticket_id related_ticket = self._tickets_by_id.get(ticket.related_ticket_id) if related_ticket is not None: history_entry["related_ticket_preview"] = { "ticket_id": related_ticket.ticket_id, "title": related_ticket.title, "requester": related_ticket.requester, "description": related_ticket.description, } if ( self._ticket_has_alternate_route(ticket) and ( "lookup_internal_routing_note" not in remaining_tools or "lookup_queue_capacity_forecast" in self._used_tools_for_ticket(ticket.ticket_id) ) ): history_entry["routing_options"] = self._routing_options_for_ticket(ticket) if "lookup_queue_cluster_summary" in self._used_tools_for_ticket(ticket.ticket_id): history_entry["cluster_summary"] = cluster_summary if penalty_reason is not None: history_entry["penalty_reason"] = penalty_reason if tool_result is not None: history_entry["tool_result"] = tool_result if reward_components is not None: history_entry["reward_components"] = reward_components if ticket.generated_from_ticket_id is not None: history_entry["generated_from_ticket_id"] = ticket.generated_from_ticket_id if progress["required_tools"]: history_entry["context_progress"] = { "hidden_context_remaining": bool(progress["remaining_count"]), "context_gap_count": progress["remaining_count"], "revealed_context_count": progress["revealed_count"], "context_completeness": progress["completeness"], } history_entry["feedback_summary"] = self._build_feedback_summary( predicted=predicted, score=score, breakdown=breakdown, reward=reward, rubric_reward=rubric_reward, reward_kind=reward_kind, penalty_reason=penalty_reason, tool_result=tool_result, reward_components=reward_components, ) return history_entry def _build_observation( self, task: dict, done: bool = False, reward: float | None = None, rubric_reward: float | None = None, ) -> HelpdeskTicketObservation: idx = self._state.current_ticket_index queue_size = len(self._queue) if idx < queue_size: ticket = self._queue[idx] ticket_view = self._build_ticket_view(ticket) queue_position = idx + 1 else: ticket_view = None queue_position = 0 history = list(self._state.history_entries) last_history_entry = history[-1] if history else None tickets_remaining = max(0, queue_size - idx) tickets_after_current = max( 0, tickets_remaining - (1 if ticket_view is not None else 0), ) progress_fraction = (idx / queue_size) if queue_size else 0.0 metadata = { "queue_position": queue_position, "tickets_remaining_includes_current": ticket_view is not None, "has_ambiguity_note": bool(ticket_view and ticket_view.get("ambiguity_note")), "has_related_ticket_context": bool( ticket_view and ticket_view.get("related_ticket_preview") ), "has_hidden_context": bool( ticket_view and (ticket_view.get("context_status") or {}).get("hidden_context_remaining") ), "action_mode": "investigate_or_submit", "available_action_types": self._available_action_types_for_task(), "average_score_so_far": self._state.average_score_so_far, "progress_fraction": progress_fraction, "investigation_penalty_applied": self._state.investigation_penalty_applied, "planning_penalty_total": self._state.planning_penalty_total, "planning_penalty_applied": self._state.planning_penalty_applied, "sla_breach_count": self._state.sla_breach_count, "incident_gap_total": self._state.incident_gap_total, "queue_management_score": self._state.queue_management_score, "queue_management_breakdown": dict(self._state.queue_management_breakdown), "dynamic_queue_events": list(self._state.dynamic_queue_events[-5:]), "clustered_follow_ons": self._future_queue_demand().get("clustered_follow_ons", 0), } if self._state.current_task_id == 3: metadata["capacity_state"] = self._capacity_state_snapshot() if last_history_entry is not None: metadata["last_score"] = last_history_entry.get("score") metadata["last_reward"] = last_history_entry.get("reward") metadata["last_reward_kind"] = last_history_entry.get("reward_kind") metadata["last_breakdown"] = last_history_entry.get("breakdown") metadata["last_feedback_summary"] = last_history_entry.get("feedback_summary") metadata["last_reward_components"] = last_history_entry.get("reward_components", {}) if "penalty_reason" in last_history_entry: metadata["last_penalty_reason"] = last_history_entry["penalty_reason"] return HelpdeskTicketObservation( done=done, reward=reward, rubric_reward=rubric_reward, metadata=metadata, task_id=task["id"], task_name=task["name"], instructions=task["instructions"], allowed_fields=list(task["allowed_fields"]), available_action_types=self._available_action_types_for_task(), available_tools=self._available_tools_for_task(), investigation_budget_remaining=self._state.investigation_budget_remaining, last_tool_result=self._state.last_tool_result, current_ticket=ticket_view, queue_size=queue_size, tickets_remaining=tickets_remaining, tickets_after_current=tickets_after_current, tickets_processed=idx, queue_position=queue_position, average_score_so_far=self._state.average_score_so_far, progress_fraction=progress_fraction, history=history, last_reward_components=dict(self._state.last_reward_components), )