| from __future__ import annotations |
|
|
| import secrets |
|
|
| from .dynamics import Counts, count_moved, move_cost, parse_target_counts, pressure_label, round_service_reward |
| from .generator import generate_recipe |
| from .models import ( |
| HiddenRecipe, |
| V3Action, |
| V3Feedback, |
| V3Observation, |
| V3Reward, |
| V3ScenarioInfo, |
| V3StepResult, |
| ZoneSnapshot, |
| ) |
| from .task_adapter import to_internal_task_id, to_public_task_id |
|
|
|
|
| class V3DeliveryDispatchEnv: |
| def __init__(self, default_task_id: str = "medium_dispatch") -> None: |
| self.default_task_id = default_task_id |
| self.recipe: HiddenRecipe | None = None |
| self.task_id = default_task_id |
| self.internal_task_id = to_internal_task_id(default_task_id) |
| self.public_seed = 0 |
| self.internal_seed = 0 |
| self.pool_name = "test" |
| self.round_index = 0 |
| self.courier_counts: Counts = () |
| self.cumulative_reward = 0.0 |
| self.last_step_reward = 0.0 |
| self.recent_events: list[str] = [] |
| self.done = False |
| self.last_episode_summary: dict[str, object] | None = None |
|
|
| def reset( |
| self, |
| task_id: str | None = None, |
| seed: int | None = None, |
| pool_name: str = "test", |
| ) -> V3Observation: |
| self.pool_name = pool_name |
| self.public_seed = seed if seed is not None else secrets.randbelow(1_000_000_000) |
| from .seed_catalog import ( |
| choose_random_curated_seed, |
| choose_random_task_id, |
| resolve_curated_seed, |
| resolve_task_id, |
| ) |
|
|
| if task_id is None: |
| self.internal_task_id = ( |
| resolve_task_id(self.public_seed) if seed is not None else choose_random_task_id() |
| ) |
| else: |
| self.internal_task_id = to_internal_task_id(task_id) |
| self.task_id = to_public_task_id(self.internal_task_id) |
|
|
| if seed is None: |
| self.internal_seed = choose_random_curated_seed(self.internal_task_id, pool_name=self.pool_name) |
| else: |
| self.internal_seed = resolve_curated_seed( |
| self.internal_task_id, |
| self.public_seed, |
| pool_name=self.pool_name, |
| ) |
| self.recipe = generate_recipe(self.internal_task_id, self.internal_seed) |
| self.round_index = 0 |
| self.courier_counts = self.recipe.initial_courier_counts |
| self.cumulative_reward = 0.0 |
| self.last_step_reward = 0.0 |
| self.recent_events = ["environment reset"] |
| self.done = False |
| self.last_episode_summary = None |
| return self.state() |
|
|
| def reset_internal( |
| self, |
| task_id: str, |
| internal_seed: int, |
| public_seed: int | None = None, |
| pool_name: str = "test", |
| ) -> V3Observation: |
| self.internal_task_id = to_internal_task_id(task_id) |
| self.task_id = to_public_task_id(self.internal_task_id) |
| self.pool_name = pool_name |
| self.public_seed = internal_seed if public_seed is None else public_seed |
| self.internal_seed = internal_seed |
| self.recipe = generate_recipe(self.internal_task_id, self.internal_seed) |
| self.round_index = 0 |
| self.courier_counts = self.recipe.initial_courier_counts |
| self.cumulative_reward = 0.0 |
| self.last_step_reward = 0.0 |
| self.recent_events = ["environment reset"] |
| self.done = False |
| self.last_episode_summary = None |
| return self.state() |
|
|
| def state(self) -> V3Observation: |
| recipe = self._require_recipe() |
| if self.done: |
| round_template = recipe.rounds[-1] |
| remaining_rounds = 0 |
| else: |
| round_template = recipe.rounds[self.round_index] |
| remaining_rounds = recipe.profile.total_rounds - self.round_index |
| return V3Observation( |
| round_index=self.round_index, |
| remaining_rounds=remaining_rounds, |
| task_id=self.task_id, |
| zones=[ |
| ZoneSnapshot( |
| zone_id=zone.zone_id, |
| label=zone.label, |
| courier_count=self.courier_counts[index], |
| visible_orders=round_template.visible_orders_by_zone[index], |
| reward_per_order=round_template.reward_per_order_by_zone[index], |
| congestion_multiplier=round_template.congestion_multiplier_by_zone[index], |
| ) |
| for index, zone in enumerate(recipe.zone_specs) |
| ], |
| feedback=V3Feedback( |
| last_step_reward=self.last_step_reward, |
| cumulative_reward=self.cumulative_reward, |
| recent_events=list(self.recent_events), |
| current_pressure=pressure_label(round_template, recipe.zone_specs), |
| ), |
| scenario_info=V3ScenarioInfo( |
| task_id=self.task_id, |
| used_seed=self.public_seed, |
| total_rounds=recipe.profile.total_rounds, |
| total_couriers=recipe.profile.courier_count, |
| max_repositions_per_round=recipe.profile.max_repositions_per_round, |
| objective_brief="Maximize cumulative delivery reward across the full episode, not just the current round.", |
| action_brief="Return target courier counts for every zone; counts should sum to the total courier count.", |
| episode_brief="An episode lasts for a fixed number of rounds and ends when done=true.", |
| ), |
| ) |
|
|
| def step(self, action: V3Action, grade_terminal: bool = True) -> V3StepResult: |
| if self.done: |
| info: dict[str, object] = {"message": "episode already finished"} |
| if self.last_episode_summary is not None: |
| info["episode_summary"] = dict(self.last_episode_summary) |
| return V3StepResult( |
| observation=self.state(), |
| reward=V3Reward(step_reward=0.0, cumulative_reward=self.cumulative_reward), |
| done=True, |
| info=info, |
| ) |
|
|
| recipe = self._require_recipe() |
| round_template = recipe.rounds[self.round_index] |
| step_reward, served, missed = round_service_reward( |
| round_template=round_template, |
| current_counts=self.courier_counts, |
| missed_order_penalty=recipe.profile.missed_order_penalty, |
| ) |
|
|
| events = [f"served {sum(served)} orders, missed {sum(missed)}"] |
| invalid_penalty = 0.0 |
| next_counts = self.courier_counts |
| if self.round_index < recipe.profile.total_rounds - 1: |
| parsed = parse_target_counts(self.state(), action) |
| if parsed is None: |
| invalid_penalty -= 8.0 |
| events.append("invalid target allocation; kept current fleet distribution") |
| elif count_moved(self.courier_counts, parsed) > recipe.profile.max_repositions_per_round: |
| invalid_penalty -= 6.0 |
| events.append("target allocation exceeded reposition cap; kept current fleet distribution") |
| else: |
| next_counts = parsed |
| else: |
| events.append("final round; ignored rebalancing target") |
|
|
| movement_cost = 0.0 |
| if self.round_index < recipe.profile.total_rounds - 1 and next_counts != self.courier_counts: |
| movement_cost = move_cost(recipe, self.round_index, self.courier_counts, next_counts) |
| step_reward -= movement_cost |
| events.append(f"rebalanced fleet for {movement_cost:.1f} movement cost") |
|
|
| step_reward += invalid_penalty |
| self.cumulative_reward += step_reward |
| self.last_step_reward = step_reward |
| self.recent_events = events |
| self.courier_counts = next_counts |
| self.round_index += 1 |
| self.done = self.round_index >= recipe.profile.total_rounds |
|
|
| info: dict[str, object] = {} |
| if self.done and grade_terminal: |
| from .grading import grade_episode |
|
|
| task_result = grade_episode( |
| task_id=self.internal_task_id, |
| seed=self.internal_seed, |
| raw_reward=self.cumulative_reward, |
| ) |
| self.last_episode_summary = { |
| "raw_reward": round(task_result.raw_reward, 3), |
| "baseline_reward": round(task_result.baseline_reward, 3), |
| "target_reward": round(task_result.target_reward, 3), |
| "heuristic_reward": None if task_result.heuristic_reward is None else round(task_result.heuristic_reward, 3), |
| "graded_score": round(task_result.score, 4), |
| } |
| info["episode_summary"] = dict(self.last_episode_summary) |
|
|
| return V3StepResult( |
| observation=self.state(), |
| reward=V3Reward(step_reward=step_reward, cumulative_reward=self.cumulative_reward), |
| done=self.done, |
| info=info, |
| ) |
|
|
| def clone(self) -> "V3DeliveryDispatchEnv": |
| clone = V3DeliveryDispatchEnv(default_task_id=self.default_task_id) |
| clone.recipe = self.recipe |
| clone.task_id = self.task_id |
| clone.internal_task_id = self.internal_task_id |
| clone.public_seed = self.public_seed |
| clone.internal_seed = self.internal_seed |
| clone.pool_name = self.pool_name |
| clone.round_index = self.round_index |
| clone.courier_counts = self.courier_counts |
| clone.cumulative_reward = self.cumulative_reward |
| clone.last_step_reward = self.last_step_reward |
| clone.recent_events = list(self.recent_events) |
| clone.done = self.done |
| clone.last_episode_summary = None if self.last_episode_summary is None else dict(self.last_episode_summary) |
| return clone |
|
|
| def _require_recipe(self) -> HiddenRecipe: |
| if self.recipe is None: |
| self.reset(self.default_task_id, 101) |
| assert self.recipe is not None |
| return self.recipe |
|
|