# Source: fleetmind/src/delivery_dispatch/environment.py (34.3 kB)
# Last commit: Rishav - "Refine dynamic task pacing" (f62c8f0)
from __future__ import annotations
from copy import deepcopy
from random import SystemRandom
from typing import Any
from .models import (
Action,
AgentState,
Feedback,
Metrics,
Observation,
OrderState,
Reward,
Scenario,
ScenarioInfo,
StepResult,
ZonePhase,
)
from .policies import estimate_job_cost
from .scenarios import SCENARIO_BUILDERS
class DeliveryDispatchEnv:
    """Event-driven delivery dispatch simulator.

    At every decision step the caller assigns idle agents to visible orders
    and/or rejects orders; the clock then jumps to the next event (a job
    completion, an order arrival, an order passing its service cutoff, or the
    episode horizon). Scenarios are built from a seed; when no seed is
    supplied ``reset`` draws one from ``SystemRandom``.

    The class attributes below are the reward-shaping constants used by the
    methods of this class.
    """

    # Added to the step reward for every invalid rejection or assignment.
    invalid_assignment_penalty = -1.0
    # Per-slot penalty when idle agents remain while worthwhile orders wait.
    idle_penalty = -0.35
    # Passed to ``estimate_job_cost`` when computing job durations.
    service_time = 1
    # Fraction of an order's reward_value lost when it is missed (before the
    # priority multiplier is applied).
    missed_order_penalty_multiplier = 0.75
    # Bonus for an assignment expected to finish by the order's service cutoff.
    feasible_assignment_bonus = 0.5
    # Penalty for an assignment expected to finish after the service cutoff.
    infeasible_assignment_penalty = -1.5
    # Lower bound on a rejection penalty, as a fraction of reward_value.
    rejection_penalty_multiplier = 0.4
    # Penalty per consecutive passive step, scaled by the (capped) streak length.
    passive_wait_penalty = -0.25
    # Maximum streak length used when scaling passive_wait_penalty.
    passive_wait_penalty_cap = 3
    # Ticks after the deadline during which an order can still be served.
    service_grace_window = 4
    # Bonus per tick of early completion, up to early_bonus_cap ticks.
    early_bonus_per_tick = 0.45
    early_bonus_cap = 4
    # Coefficients of the lateness penalty: linear * late + quadratic * late**2.
    late_linear_penalty = 1.4
    late_quadratic_penalty = 0.35
    # reward_value at or above which an order counts as high value.
    high_value_threshold = 16.0
def __init__(
    self,
    scenario_name: str = "low_demand",
    max_decision_steps: int | None = None,
    seed: int | None = None,
) -> None:
    """Create an environment bound to one named scenario.

    No scenario is built here; ``reset`` must be called before the
    environment can be observed or stepped.

    Args:
        scenario_name: Key into ``SCENARIO_BUILDERS``.
        max_decision_steps: Optional cap on decision steps. A falsy value
            (``None`` or ``0``) leaves the provisional budget at 100.
        seed: Optional seed remembered as the default for ``reset``.

    Raises:
        ValueError: If ``scenario_name`` is not a known scenario.
    """
    if scenario_name not in SCENARIO_BUILDERS:
        raise ValueError(f"Unknown scenario: {scenario_name}")

    # Construction-time configuration, reused as defaults by reset().
    self._scenario_name = scenario_name
    self._configured_max_decision_steps = max_decision_steps
    self._configured_seed = seed
    self._scenario: Scenario | None = None
    self.used_seed: int | None = None

    # Clock and decision budget.
    self.current_time = 0
    self.decision_step = 0
    self.max_decision_steps = max_decision_steps or 100
    self.arrival_freeze_time: int | None = None
    self.consecutive_passive_steps = 0

    # World state, populated by reset().
    self.agents: list[AgentState] = []
    self.orders: list[OrderState] = []

    # Reward bookkeeping and feedback for the most recent step.
    self.cumulative_reward = 0.0
    self.last_step_reward = 0.0
    self.recent_events: list[str] = []
    self.last_reward_breakdown: dict[str, float] = {}
    self.last_error_summary: dict[str, int] = {}
    self.total_lateness = 0

    stat_names = (
        "completed_orders",
        "on_time_orders",
        "late_orders",
        "expired_orders",
        "rejected_orders",
        "invalid_actions",
    )
    self.stats = dict.fromkeys(stat_names, 0)

    component_names = (
        "completion_reward",
        "expiry_penalty",
        "rejection_penalty",
        "idle_penalty",
        "passive_wait_penalty",
        "invalid_assignment_penalty",
        "infeasible_assignment_penalty",
        "valid_assignment_bonus",
    )
    self.cumulative_reward_components = dict.fromkeys(component_names, 0.0)
def reset(
    self,
    task_id: str | None = None,
    max_decision_steps: int | None = None,
    seed: int | None = None,
) -> Observation:
    """Start a fresh episode and return its initial observation.

    Args:
        task_id: Optional scenario name; when given it replaces the
            scenario chosen at construction time.
        max_decision_steps: Optional step budget for this episode. A falsy
            value falls back to the constructor value, then to the
            scenario's ``default_max_decision_steps``.
        seed: Optional scenario seed. Falls back to the constructor seed,
            then to a value drawn from ``SystemRandom``.

    Returns:
        The observation produced by ``state()`` after the reset.

    Raises:
        ValueError: If ``task_id`` is not a known scenario.
    """
    if task_id is not None:
        if task_id not in SCENARIO_BUILDERS:
            raise ValueError(f"Unknown scenario: {task_id}")
        self._scenario_name = task_id

    # Seed precedence: explicit argument, constructor seed, fresh random seed.
    chosen_seed = self._configured_seed if seed is None else seed
    if chosen_seed is None:
        chosen_seed = SystemRandom().randint(1, 10_000_000)
    self.used_seed = chosen_seed

    # Work on a private copy so the episode never mutates the builder's output.
    build_scenario = SCENARIO_BUILDERS[self._scenario_name]
    self._scenario = deepcopy(build_scenario(chosen_seed))

    self.current_time = 0
    self.decision_step = 0
    step_budget = max_decision_steps or self._configured_max_decision_steps
    self.max_decision_steps = step_budget or self._scenario.default_max_decision_steps
    self.arrival_freeze_time = None
    self.consecutive_passive_steps = 0

    self.agents = list(self._scenario.agents)
    self.orders = list(self._scenario.orders)

    self.cumulative_reward = 0.0
    self.last_step_reward = 0.0
    self.recent_events = ["environment reset"]
    self.last_reward_breakdown = {}
    self.last_error_summary = {}
    self.total_lateness = 0

    stat_names = (
        "completed_orders",
        "on_time_orders",
        "late_orders",
        "expired_orders",
        "rejected_orders",
        "invalid_actions",
    )
    self.stats = dict.fromkeys(stat_names, 0)

    component_names = (
        "completion_reward",
        "expiry_penalty",
        "rejection_penalty",
        "idle_penalty",
        "passive_wait_penalty",
        "invalid_assignment_penalty",
        "infeasible_assignment_penalty",
        "valid_assignment_bonus",
    )
    self.cumulative_reward_components = dict.fromkeys(component_names, 0.0)

    return self.state()
def state(self) -> Observation:
    """Build the observation for the current moment.

    The feedback section carries the last step reward, the cumulative
    reward and a filtered error summary; ``recent_events``,
    ``reward_breakdown``, ``current_pressure`` and ``known_future_signal``
    are always emitted empty.

    Raises:
        RuntimeError: If the environment has not been reset.
    """
    scenario = self._require_scenario()
    visible = self._visible_orders()

    def count_orders(*statuses: str) -> int:
        return sum(1 for order in visible if order.status in statuses)

    def count_agents(status: str) -> int:
        return sum(1 for agent in self.agents if agent.status == status)

    stats = self.stats
    metrics = Metrics(
        completed_orders=stats["completed_orders"],
        on_time_orders=stats["on_time_orders"],
        late_orders=stats["late_orders"],
        expired_orders=stats["expired_orders"],
        rejected_orders=stats["rejected_orders"],
        invalid_actions=stats["invalid_actions"],
        active_orders=count_orders("unassigned", "assigned"),
        pending_orders=count_orders("unassigned"),
        idle_agents=count_agents("idle"),
        busy_agents=count_agents("busy"),
    )

    # Only these error categories are surfaced in the observation.
    public_keys = ("expired_orders", "late_deliveries", "rejected_orders", "invalid_actions")
    public_errors = {}
    for key, value in self.last_error_summary.items():
        if key in public_keys:
            public_errors[key] = value

    feedback = Feedback(
        last_step_reward=self.last_step_reward,
        cumulative_reward=self.cumulative_reward,
        recent_events=[],
        reward_breakdown={},
        error_summary=public_errors,
        current_pressure="",
    )
    scenario_info = ScenarioInfo(
        name=scenario.name,
        episode_horizon=scenario.episode_horizon,
        default_max_decision_steps=scenario.default_max_decision_steps,
        used_seed=self.used_seed,
        briefing=scenario.briefing,
        dispatch_objective=scenario.dispatch_objective,
        known_future_signal="",
    )
    return Observation(
        time=self.current_time,
        decision_step=self.decision_step,
        max_decision_steps=self.max_decision_steps,
        task_id=scenario.name,
        episode_horizon=scenario.episode_horizon,
        grid=self._current_grid(),
        agents=[self._agent_view(agent) for agent in self.agents],
        orders=[self._order_view(order) for order in visible],
        feedback=feedback,
        metrics=metrics,
        scenario_info=scenario_info,
    )
def step(self, action: Action | dict[str, Any]) -> StepResult:
    """Apply one dispatch decision, advance the clock, and score the step.

    Processing order: rejections, assignments, idle and passive-wait
    penalties, clock advance to the next event, completions, expiries and,
    when this step exhausts the decision budget, terminal resolution.

    Args:
        action: An ``Action`` or a mapping validated into one with
            ``Action.model_validate``.

    Returns:
        A ``StepResult`` holding the new observation, the rounded step and
        cumulative rewards, the ``done`` flag and an ``info`` dict listing
        accepted assignments, rejected orders and ignored (invalid)
        entries. ``info["error_summary"]`` now includes an
        ``"invalid_actions"`` count whenever the action contained invalid
        rejections or assignments.

    Raises:
        RuntimeError: If the environment has not been reset.
    """
    scenario = self._require_scenario()
    if self.decision_step >= self.max_decision_steps:
        # The budget is already spent: report a terminal no-op without
        # touching any state.
        return StepResult(
            observation=self.state(),
            reward=Reward(step_reward=0.0, cumulative_reward=round(self.cumulative_reward, 3)),
            done=True,
            info={
                "accepted_assignments": [],
                "rejected_orders": [],
                "invalid_assignments": [],
                "time_advanced_to": self.current_time,
                "stats": dict(self.stats),
                "reward_breakdown": {},
                "error_summary": {},
                "current_pressure": "",
            },
        )
    parsed_action = action if isinstance(action, Action) else Action.model_validate(action)
    step_reward = 0.0
    reward_breakdown = {
        "rejection_penalty": 0.0,
        "valid_assignment_bonus": 0.0,
        "infeasible_assignment_penalty": 0.0,
        "invalid_assignment_penalty": 0.0,
        "idle_penalty": 0.0,
        "passive_wait_penalty": 0.0,
        "completion_reward": 0.0,
        "expiry_penalty": 0.0,
    }
    error_summary = {
        "rejected_orders": 0,
        "late_deliveries": 0,
        "expired_orders": 0,
        # Counted per step so the public error summary, which whitelists
        # this key, can actually report invalid actions.
        "invalid_actions": 0,
        "high_value_orders_missed": 0,
        "urgent_orders_unassigned": 0,
    }
    public_error_keys = ("expired_orders", "late_deliveries", "rejected_orders", "invalid_actions")
    accepted_assignments: list[dict[str, str]] = []
    invalid_assignments: list[dict[str, str]] = []
    rejected_orders: list[str] = []
    self.recent_events = []
    busy_agents = {agent.agent_id for agent in self.agents if agent.status == "busy"}
    claimed_orders: set[str] = set()

    # --- Rejections -------------------------------------------------------
    for order_id in parsed_action.rejections:
        order = self._find_order(order_id)
        if order is None or order.status != "unassigned" or order.created_at > self.current_time:
            # Unknown, already-handled or not-yet-arrived orders cannot be rejected.
            step_reward += self.invalid_assignment_penalty
            reward_breakdown["invalid_assignment_penalty"] += self.invalid_assignment_penalty
            self.stats["invalid_actions"] += 1
            error_summary["invalid_actions"] += 1
            invalid_assignments.append({"agent_id": "reject", "order_id": order_id})
            self.recent_events.append(f"ignored invalid rejection for {order_id}")
            continue
        rejection_penalty = -self._rejection_penalty(order)
        step_reward += rejection_penalty
        reward_breakdown["rejection_penalty"] += rejection_penalty
        order.status = "rejected"
        order.rejected_at = self.current_time
        self.stats["rejected_orders"] += 1
        error_summary["rejected_orders"] += 1
        rejected_orders.append(order.order_id)
        self.recent_events.append(f"rejected {order.order_id}")

    # --- Assignments ------------------------------------------------------
    for assignment in parsed_action.assignments:
        agent = self._find_agent(assignment.agent_id)
        order = self._find_order(assignment.order_id)
        valid = True
        if agent is None or order is None:
            valid = False
        elif agent.agent_id in busy_agents or agent.status != "idle":
            valid = False
        elif assignment.order_id in claimed_orders:
            valid = False
        elif order.status != "unassigned" or order.created_at > self.current_time:
            valid = False
        if not valid:
            step_reward += self.invalid_assignment_penalty
            reward_breakdown["invalid_assignment_penalty"] += self.invalid_assignment_penalty
            self.stats["invalid_actions"] += 1
            error_summary["invalid_actions"] += 1
            invalid_assignments.append(assignment.model_dump())
            self.recent_events.append(
                f"ignored invalid assignment {assignment.agent_id}->{assignment.order_id}"
            )
            continue
        job_time = self._job_time(agent, order)
        agent.status = "busy"
        agent.assigned_order_id = order.order_id
        agent.busy_until = self.current_time + job_time
        order.status = "assigned"
        order.assigned_agent_id = agent.agent_id
        order.scheduled_completion_time = agent.busy_until
        claimed_orders.add(order.order_id)
        busy_agents.add(agent.agent_id)
        accepted_assignments.append(assignment.model_dump())
        estimated_finish = self.current_time + job_time
        # Valid assignments are accepted either way; the sign of the shaping
        # term depends on whether the job can finish by the service cutoff.
        if estimated_finish <= self._service_cutoff(order):
            step_reward += self.feasible_assignment_bonus
            reward_breakdown["valid_assignment_bonus"] += self.feasible_assignment_bonus
        else:
            step_reward += self.infeasible_assignment_penalty
            reward_breakdown["infeasible_assignment_penalty"] += self.infeasible_assignment_penalty
        self.recent_events.append(
            f"assigned {order.order_id} to {agent.agent_id} until t={agent.busy_until}"
        )

    # --- Idle / passive-wait penalties --------------------------------------
    avoidable_idle_slots = self._avoidable_idle_slots()
    if avoidable_idle_slots > 0:
        idle_agents = len([agent for agent in self.agents if agent.status == "idle"])
        idle_penalty = self.idle_penalty * min(idle_agents, avoidable_idle_slots)
        step_reward += idle_penalty
        reward_breakdown["idle_penalty"] += idle_penalty
        self.recent_events.append("avoidable idle capacity remained")
    if not accepted_assignments and not rejected_orders and avoidable_idle_slots > 0:
        # The penalty grows with the streak of passive steps, up to the cap.
        self.consecutive_passive_steps += 1
        passive_penalty = self.passive_wait_penalty * min(
            self.consecutive_passive_steps, self.passive_wait_penalty_cap
        )
        step_reward += passive_penalty
        reward_breakdown["passive_wait_penalty"] += passive_penalty
        self.recent_events.append("passive waiting while worthwhile work remained")
    else:
        self.consecutive_passive_steps = 0

    # --- Advance the clock to the next event --------------------------------
    next_time = self._next_event_time()
    if next_time is None:
        next_time = scenario.episode_horizon
    self.current_time = min(next_time, scenario.episode_horizon)

    # --- Completions and expiries at the new time ---------------------------
    completion_reward, completed_orders, late_deliveries = self._resolve_completions()
    step_reward += completion_reward
    reward_breakdown["completion_reward"] += completion_reward
    expiry_penalty, expired_orders, high_value_missed = self._expire_orders()
    step_reward += expiry_penalty
    reward_breakdown["expiry_penalty"] += expiry_penalty
    error_summary["late_deliveries"] += late_deliveries
    error_summary["expired_orders"] += len(expired_orders)
    error_summary["high_value_orders_missed"] += high_value_missed
    error_summary["urgent_orders_unassigned"] = self._count_urgent_unassigned_orders()
    if not self.recent_events:
        self.recent_events.append("no state change")
    if completed_orders:
        self.recent_events.extend(completed_orders)
    if expired_orders:
        self.recent_events.extend(expired_orders)

    # --- Terminal resolution when the budget runs out -----------------------
    self.decision_step += 1
    terminal_info: dict[str, int] = {}
    if self.decision_step >= self.max_decision_steps:
        (
            terminal_reward,
            terminal_events,
            terminal_error_summary,
            terminal_info,
        ) = self._finalize_terminal_state()
        # NOTE(review): the terminal reward reaches step_reward but not
        # reward_breakdown, so cumulative_reward_components (and the episode
        # summary built from it) exclude terminal completions and expiries.
        step_reward += terminal_reward
        for key, value in terminal_error_summary.items():
            error_summary[key] = error_summary.get(key, 0) + value
        if terminal_events:
            self.recent_events.extend(terminal_events)

    # --- Bookkeeping and result ---------------------------------------------
    self.cumulative_reward += step_reward
    self.last_step_reward = step_reward
    self.last_reward_breakdown = {
        key: round(value, 3)
        for key, value in reward_breakdown.items()
        if abs(value) > 1e-9
    }
    for key, value in reward_breakdown.items():
        self.cumulative_reward_components[key] += value
    self.last_error_summary = {key: value for key, value in error_summary.items() if value}
    done = self._is_done()
    public_error_summary = {
        key: value
        for key, value in self.last_error_summary.items()
        if key in public_error_keys
    }
    info = {
        "accepted_assignments": accepted_assignments,
        "rejected_orders": rejected_orders,
        "invalid_assignments": invalid_assignments,
        "time_advanced_to": self.current_time,
        "stats": dict(self.stats),
        "reward_breakdown": {},
        "error_summary": public_error_summary,
        "current_pressure": "",
        "terminal_resolution": terminal_info,
    }
    if done:
        info["episode_summary"] = self._episode_summary()
    return StepResult(
        observation=self.state(),
        reward=Reward(
            step_reward=round(step_reward, 3),
            cumulative_reward=round(self.cumulative_reward, 3),
        ),
        done=done,
        info=info,
    )
def _require_scenario(self) -> Scenario:
    """Return the active scenario.

    Raises:
        RuntimeError: If no scenario has been built yet (``reset`` not called).
    """
    scenario = self._scenario
    if scenario is not None:
        return scenario
    raise RuntimeError("Environment must be reset before use.")
def _visible_orders(self) -> list[OrderState]:
    """Return orders that have arrived and are still open.

    Arrival is judged against ``arrival_freeze_time`` when it is set,
    otherwise against the current clock. Completed, expired and rejected
    orders are never returned.
    """
    if self.arrival_freeze_time is None:
        arrival_limit = self.current_time
    else:
        arrival_limit = self.arrival_freeze_time
    closed_statuses = ("completed", "expired", "rejected")
    open_orders: list[OrderState] = []
    for order in self.orders:
        if order.created_at > arrival_limit:
            continue
        if order.status in closed_statuses:
            continue
        open_orders.append(order)
    return open_orders
def _find_agent(self, agent_id: str) -> AgentState | None:
    """Return the first agent whose ``agent_id`` matches, or ``None``."""
    for candidate in self.agents:
        if candidate.agent_id == agent_id:
            return candidate
    return None
def _find_order(self, order_id: str) -> OrderState | None:
    """Return the first order whose ``order_id`` matches, or ``None``."""
    for candidate in self.orders:
        if candidate.order_id == order_id:
            return candidate
    return None
def _agent_view(self, agent: AgentState) -> AgentState:
    """Return a deep copy of ``agent`` annotated for the observation.

    The copy carries ``availability_in`` (ticks until a busy agent frees
    up, never negative, ``0`` for non-busy agents) and ``idle_now``.
    """
    if agent.status == "busy":
        ticks_remaining = max(agent.busy_until - self.current_time, 0)
    else:
        ticks_remaining = 0
    overrides = {
        "availability_in": ticks_remaining,
        "idle_now": agent.status == "idle",
    }
    return agent.model_copy(update=overrides, deep=True)
def _order_view(self, order: OrderState) -> OrderState:
    """Return a deep copy of ``order`` with ``service_cutoff_time`` set to ``None``."""
    masked_fields = {"service_cutoff_time": None}
    return order.model_copy(update=masked_fields, deep=True)
def _nearest_idle_agent(self, order: OrderState) -> AgentState | None:
    """Return the idle agent with the lowest job time for ``order``.

    Ties on job time are broken by ``agent_id``; ``None`` is returned when
    no agent is idle.
    """
    best_agent = None
    best_rank = None
    for agent in self.agents:
        if agent.status != "idle":
            continue
        rank = (self._job_time(agent, order), agent.agent_id)
        # Strict comparison keeps the earliest agent among exact ties.
        if best_rank is None or rank < best_rank:
            best_agent = agent
            best_rank = rank
    return best_agent
def _job_time(self, agent: AgentState, order: OrderState) -> int:
    """Return the estimated job cost for ``agent`` serving ``order``.

    Delegates to ``estimate_job_cost`` with the agent's location, the
    order's pickup and drop locations, the currently congested zones and
    ``service_time``.
    """
    congested_now = set(self._current_grid().congested_zones)
    start = agent.location
    pickup = order.pickup_location
    drop = order.drop_location
    return estimate_job_cost(start, pickup, drop, congested_now, self.service_time)
def _service_cutoff(self, order: OrderState) -> int:
    """Return the last tick at which ``order`` may still be served.

    This is the order's deadline plus ``service_grace_window``.
    """
    grace = self.service_grace_window
    return order.deadline + grace
def _next_event_time(self) -> int | None:
scenario = self._require_scenario()
completion_times = [
agent.busy_until
for agent in self.agents
if agent.status == "busy" and agent.busy_until > self.current_time
]
arrival_times = [
order.created_at
for order in self.orders
if order.status == "unassigned" and order.created_at > self.current_time
]
cutoff_times = [
self._service_cutoff(order) + 1
for order in self.orders
if order.status == "unassigned" and order.created_at <= self.current_time
]
candidates = completion_times + arrival_times + cutoff_times + [scenario.episode_horizon]
candidates = [candidate for candidate in candidates if candidate > self.current_time]
return min(candidates) if candidates else None
def _resolve_completions(self) -> tuple[float, list[str], int]:
    """Complete every job whose agent is due at the current time.

    Due agents are freed; their orders are marked completed, rewarded via
    ``_completion_reward`` and counted in ``stats`` as on time or late.

    Returns:
        ``(total_reward, event_messages, late_delivery_count)``.
    """
    now = self.current_time
    total_reward = 0.0
    messages: list[str] = []
    late_count = 0

    for agent in self.agents:
        is_due = agent.status == "busy" and agent.busy_until <= now
        if not is_due:
            continue

        order = None
        if agent.assigned_order_id is not None:
            order = self._find_order(agent.assigned_order_id)
        if order is None:
            # Busy without a resolvable order: just release the agent.
            agent.status = "idle"
            agent.assigned_order_id = None
            agent.busy_until = now
            continue

        order.completed_at = now
        order.status = "completed"
        agent.location = order.drop_location
        agent.status = "idle"
        agent.busy_until = now
        agent.assigned_order_id = None

        order_reward = self._completion_reward(order, now)
        overdue = max(now - order.deadline, 0)
        if overdue > 0:
            self.stats["late_orders"] += 1
            late_count += 1
            self.total_lateness += overdue
            if now > self._service_cutoff(order):
                messages.append(f"order {order.order_id} completed beyond service cutoff")
            else:
                messages.append(f"order {order.order_id} completed late by {overdue}")
        else:
            margin = max(order.deadline - now, 0)
            if margin > 0:
                messages.append(f"order {order.order_id} completed early by {margin}")
            else:
                messages.append(f"order {order.order_id} completed on time")
            self.stats["on_time_orders"] += 1

        total_reward += order_reward
        self.stats["completed_orders"] += 1

    return total_reward, messages, late_count
def _expire_orders(self) -> tuple[float, list[str], int]:
    """Expire arrived, unassigned orders whose service cutoff has passed.

    Returns:
        ``(penalty, event_messages, high_value_missed)`` where ``penalty``
        is the (non-positive) sum of missed-order penalties.
    """
    now = self.current_time
    total_penalty = 0.0
    messages: list[str] = []
    missed_high_value = 0

    for order in self.orders:
        waiting = order.status == "unassigned" and order.created_at <= now
        if not waiting or self._service_cutoff(order) >= now:
            continue
        order.status = "expired"
        self.stats["expired_orders"] += 1
        total_penalty -= self._missed_order_penalty(order, now)
        messages.append(f"order {order.order_id} expired")
        if order.reward_value >= self.high_value_threshold:
            missed_high_value += 1

    return total_penalty, messages, missed_high_value
def _completion_reward(self, order: OrderState, completed_at: int) -> float:
    """Return the reward for finishing ``order`` at ``completed_at``.

    On-time deliveries earn ``reward_value`` plus a capped per-tick early
    bonus. Late deliveries lose a linear-plus-quadratic lateness penalty,
    floored at ``0.0``.
    """
    if completed_at > order.deadline:
        overdue = completed_at - order.deadline
        deduction = (self.late_linear_penalty * overdue) + (
            self.late_quadratic_penalty * (overdue ** 2)
        )
        return max(0.0, order.reward_value - deduction)

    ticks_early = max(order.deadline - completed_at, 0)
    bonus = self.early_bonus_per_tick * min(ticks_early, self.early_bonus_cap)
    return order.reward_value + bonus
def _count_urgent_unassigned_orders(self) -> int:
    """Count visible unassigned orders whose deadline is at most 3 ticks away."""
    now = self.current_time
    urgent = 0
    for order in self._visible_orders():
        if order.status == "unassigned" and (order.deadline - now) <= 3:
            urgent += 1
    return urgent
def _finalize_terminal_state(self) -> tuple[float, list[str], dict[str, int], dict[str, int]]:
    """Resolve every open order once the decision budget is exhausted.

    Arrivals are frozen at the current time. Assigned orders are rolled
    forward to their scheduled completion: those finishing by their service
    cutoff are completed and rewarded, the others expire. Arrived but
    unassigned orders expire. The clock is finally moved to the latest
    successful rollout completion (or left unchanged if there is none).

    Returns:
        ``(reward, event_messages, error_summary, terminal_info)``.
    """
    now = self.current_time
    self.arrival_freeze_time = now

    total_reward = 0.0
    messages: list[str] = []
    errors = dict.fromkeys(
        ("expired_orders", "high_value_orders_missed", "late_deliveries"), 0
    )
    resolution = dict.fromkeys(
        (
            "resolved_assigned_orders",
            "terminal_expired_unassigned",
            "terminal_expired_assigned",
        ),
        0,
    )

    in_flight = [order for order in self.orders if order.status == "assigned"]

    # The episode ends at the latest rollout that still meets its cutoff.
    terminal_time = now
    for order in in_flight:
        finish = order.scheduled_completion_time or now
        if finish <= self._service_cutoff(order) and finish > terminal_time:
            terminal_time = finish

    for order in in_flight:
        agent = self._find_agent(order.assigned_agent_id) if order.assigned_agent_id else None
        finish = order.scheduled_completion_time or now

        if finish > self._service_cutoff(order):
            # The job cannot finish inside the grace window: treat it as missed.
            order.status = "expired"
            self.stats["expired_orders"] += 1
            resolution["terminal_expired_assigned"] += 1
            total_reward += -self._missed_order_penalty(order, finish)
            errors["expired_orders"] += 1
            if order.reward_value >= self.high_value_threshold:
                errors["high_value_orders_missed"] += 1
            if agent is not None:
                agent.status = "idle"
                agent.busy_until = now
                agent.assigned_order_id = None
            messages.append(f"terminal expiry for assigned order {order.order_id}")
            continue

        order.completed_at = finish
        order.status = "completed"
        if agent is not None:
            agent.location = order.drop_location
            agent.status = "idle"
            agent.busy_until = finish
            agent.assigned_order_id = None
        total_reward += self._completion_reward(order, finish)
        self.stats["completed_orders"] += 1
        resolution["resolved_assigned_orders"] += 1

        if finish > order.deadline:
            self.stats["late_orders"] += 1
            errors["late_deliveries"] += 1
            self.total_lateness += finish - order.deadline
            messages.append(
                f"terminal rollout completed {order.order_id} late by {finish - order.deadline}"
            )
        else:
            self.stats["on_time_orders"] += 1
            if finish < order.deadline:
                messages.append(
                    f"terminal rollout completed {order.order_id} early by {order.deadline - finish}"
                )
            else:
                messages.append(f"terminal rollout completed {order.order_id} on time")

    for order in self.orders:
        if order.status != "unassigned" or order.created_at > now:
            continue
        order.status = "expired"
        self.stats["expired_orders"] += 1
        resolution["terminal_expired_unassigned"] += 1
        total_reward += -self._missed_order_penalty(order, now)
        errors["expired_orders"] += 1
        if order.reward_value >= self.high_value_threshold:
            errors["high_value_orders_missed"] += 1
        messages.append(f"terminal expiry for unassigned order {order.order_id}")

    # Release any agent still flagged busy (e.g. one without a tracked order).
    for agent in self.agents:
        if agent.status != "busy":
            continue
        agent.status = "idle"
        agent.assigned_order_id = None
        agent.busy_until = terminal_time

    self.current_time = terminal_time
    return total_reward, messages, errors, resolution
def _avoidable_idle_slots(self) -> int:
    """Count visible unassigned orders worth serving with the idle agents.

    Returns ``0`` immediately when no agent is idle.
    """
    free_agents = [agent for agent in self.agents if agent.status == "idle"]
    if not free_agents:
        return 0
    return sum(
        1
        for order in self._visible_orders()
        if order.status == "unassigned" and self._is_worth_serving_now(order, free_agents)
    )
def _is_done(self) -> bool:
    """Report whether the episode has ended.

    The episode ends when the decision budget is spent, when the clock
    reaches the episode horizon, or when no agent is busy and no order is
    pending or still to arrive.
    """
    scenario = self._require_scenario()
    if self.decision_step >= self.max_decision_steps:
        return True
    if self.current_time >= scenario.episode_horizon:
        return True
    if any(agent.status == "busy" for agent in self.agents):
        return False

    now = self.current_time
    # An order is relevant if it was created by "now" or by the horizon,
    # i.e. by whichever of the two is later.
    arrival_limit = max(now, scenario.episode_horizon)
    for order in self.orders:
        if order.status == "assigned" and order.created_at <= arrival_limit:
            return False
        if order.status == "unassigned" and (
            order.created_at <= arrival_limit or order.created_at > now
        ):
            return False
    return True
def _pressure_summary(self) -> str:
    """Describe the current dispatch pressure as a short sentence.

    Checks are applied in order: urgency (cutoff within 6 ticks and more
    urgent orders than idle agents), hotspot demand, capacity exhaustion,
    no unassigned orders, and finally a moderate default.
    """
    waiting = [order for order in self._visible_orders() if order.status == "unassigned"]
    free_agent_count = sum(1 for agent in self.agents if agent.status == "idle")
    grid = self._current_grid()

    urgent_count = sum(
        1 for order in waiting if self._service_cutoff(order) - self.current_time <= 6
    )
    hotspot_demand = any(order.pickup_location in grid.hotspots for order in waiting)

    if urgent_count and free_agent_count < urgent_count:
        return "high urgency pressure: more urgent orders than idle agents"
    if hotspot_demand and free_agent_count:
        return "hotspot pressure: current demand is concentrated near hotspot zones"
    if waiting and not free_agent_count:
        return "capacity pressure: all agents are currently occupied"
    if not waiting:
        return "low pressure: no unassigned visible orders right now"
    return "moderate pressure: feasible work is available without immediate overload"
def _current_grid(self) -> Any:
    """Return a deep copy of the scenario grid as it looks right now.

    Hotspots and congested zones are replaced by the values that
    ``_phase_points`` selects for the current time.

    Raises:
        RuntimeError: If the environment has not been reset.
    """
    scenario = self._require_scenario()
    base_grid = scenario.grid
    hotspots_now = self._phase_points(scenario.hotspot_phases, base_grid.hotspots)
    congestion_now = self._phase_points(scenario.congestion_phases, base_grid.congested_zones)
    overrides = {"hotspots": hotspots_now, "congested_zones": congestion_now}
    return base_grid.model_copy(update=overrides, deep=True)
def _phase_points(
    self,
    phases: tuple[ZonePhase, ...],
    fallback: tuple[tuple[int, int], ...],
) -> tuple[tuple[int, int], ...]:
    """Return the points of the latest phase that has already started.

    ``fallback`` is returned when there are no phases or none has started
    by the current time.
    """
    started = [phase for phase in phases if phase.start_time <= self.current_time]
    if not started:
        return fallback
    # A stable sort keeps the later-listed phase last among equal start times.
    latest = sorted(started, key=lambda item: item.start_time)[-1]
    return latest.points
def _episode_summary(self) -> dict[str, float | int]:
    """Summarise the episode from the accumulated reward components.

    Penalty components are stored as non-positive numbers and are reported
    here with their sign flipped, rounded to three decimals.
    """
    components = self.cumulative_reward_components
    late_count = self.stats["late_orders"]

    def lost(component: str) -> float:
        return round(-components[component], 3)

    if late_count:
        mean_lateness = round(self.total_lateness / late_count, 3)
    else:
        mean_lateness = 0.0

    return {
        "cumulative_reward": round(self.cumulative_reward, 3),
        "decision_steps_used": self.decision_step,
        "average_lateness": mean_lateness,
        "reward_lost_to_expiry": lost("expiry_penalty"),
        "reward_lost_to_rejection": lost("rejection_penalty"),
        "cumulative_idle_penalty": lost("idle_penalty"),
        "cumulative_passive_wait_penalty": lost("passive_wait_penalty"),
        "cumulative_invalid_action_penalty": lost("invalid_assignment_penalty"),
        "cumulative_infeasible_assignment_penalty": lost("infeasible_assignment_penalty"),
    }
def _best_idle_finish_time(self, order: OrderState, idle_agents: list[AgentState] | None = None) -> int | None:
    """Return the earliest time a candidate agent could finish ``order``.

    Args:
        order: The order to be served.
        idle_agents: Candidate agents. When omitted (``None``) every agent
            whose status is ``"idle"`` is considered. An explicitly empty
            list is honoured as "no candidates" rather than being replaced
            by the default.

    Returns:
        The current time plus the smallest job time among the candidates,
        or ``None`` when there are no candidates.
    """
    # Test for None explicitly: a plain truthiness check would treat an
    # empty candidate list as "not supplied" and silently widen the search.
    if idle_agents is None:
        idle_agents = [agent for agent in self.agents if agent.status == "idle"]
    if not idle_agents:
        return None
    best_cost = min(self._job_time(agent, order) for agent in idle_agents)
    return self.current_time + best_cost
def _priority_multiplier(self, order: OrderState, reference_time: int) -> float:
    """Return a multiplier (>= 1.0) reflecting an order's value and urgency.

    High-value orders add 0.3 (0.15 for ``reward_value`` of at least 12);
    a deadline within 3 ticks of ``reference_time`` adds 0.3 (0.15 within
    6 ticks).
    """
    value = order.reward_value
    if value >= self.high_value_threshold:
        value_bonus = 0.3
    elif value >= 12:
        value_bonus = 0.15
    else:
        value_bonus = 0.0

    slack = order.deadline - reference_time
    if slack <= 3:
        urgency_bonus = 0.3
    elif slack <= 6:
        urgency_bonus = 0.15
    else:
        urgency_bonus = 0.0

    return 1.0 + value_bonus + urgency_bonus
def _missed_order_penalty(self, order: OrderState, reference_time: int) -> float:
    """Return the (positive) cost of missing ``order`` at ``reference_time``.

    The cost is ``missed_order_penalty_multiplier * reward_value`` scaled
    by the order's priority multiplier.
    """
    base_loss = self.missed_order_penalty_multiplier * order.reward_value
    return base_loss * self._priority_multiplier(order, reference_time)
def _rejection_penalty(self, order: OrderState) -> float:
    """Return the (positive) cost of rejecting ``order`` right now.

    The cost is a share of the missed-order penalty chosen by how well the
    best idle agent could have served the order, with a floor of ``1.0``
    and of ``rejection_penalty_multiplier * reward_value``.
    """
    missed_cost = self._missed_order_penalty(order, self.current_time)
    earliest_finish = self._best_idle_finish_time(order)

    if earliest_finish is None:
        # Nobody is idle, so rejecting costs a middling share.
        share = 0.55
    elif earliest_finish > self._service_cutoff(order):
        share = 0.5
    elif earliest_finish > order.deadline:
        share = 0.65
    else:
        # The order could have been delivered on time: the dearest rejection.
        share = 0.8

    value_floor = self.rejection_penalty_multiplier * order.reward_value
    return max(1.0, missed_cost * share, value_floor)
def _is_worth_serving_now(self, order: OrderState, idle_agents: list[AgentState]) -> bool:
    """Decide whether serving ``order`` with one of ``idle_agents`` pays off.

    The order must be finishable by its service cutoff, and its completion
    reward must reach both ``3.0`` and 1.2 times its rejection penalty.
    """
    finish = self._best_idle_finish_time(order, idle_agents)
    if finish is None:
        return False
    if finish > self._service_cutoff(order):
        return False
    payoff = self._completion_reward(order, finish)
    bar = max(3.0, self._rejection_penalty(order) * 1.2)
    return payoff >= bar