| from __future__ import annotations |
|
|
| from functools import lru_cache |
|
|
| from .models import HiddenRecipe, RoundTemplate, V3Action, V3Observation, ZoneAllocation |
|
|
|
|
| Counts = tuple[int, ...] |
|
|
|
|
| def all_distributions(total_couriers: int, zone_count: int) -> tuple[Counts, ...]: |
| return _all_distributions(total_couriers, zone_count) |
|
|
|
|
| @lru_cache(maxsize=None) |
| def _all_distributions(total_couriers: int, zone_count: int) -> tuple[Counts, ...]: |
| if zone_count == 1: |
| return ((total_couriers,),) |
| layouts: list[Counts] = [] |
| for head in range(total_couriers + 1): |
| for tail in _all_distributions(total_couriers - head, zone_count - 1): |
| layouts.append((head, *tail)) |
| return tuple(layouts) |
|
|
|
|
| def parse_target_counts(observation: V3Observation, action: V3Action) -> Counts | None: |
| zone_ids = [zone.zone_id for zone in observation.zones] |
| counts_by_zone: dict[str, int] = {} |
| for allocation in action.target_allocations: |
| if allocation.zone_id in counts_by_zone: |
| return None |
| if allocation.courier_count < 0: |
| return None |
| counts_by_zone[allocation.zone_id] = allocation.courier_count |
| if set(counts_by_zone) != set(zone_ids): |
| return None |
| counts = tuple(counts_by_zone[zone_id] for zone_id in zone_ids) |
| if sum(counts) != observation.scenario_info.total_couriers: |
| return None |
| return counts |
|
|
|
|
| def allocations_from_counts(recipe: HiddenRecipe, counts: Counts) -> V3Action: |
| return V3Action( |
| target_allocations=[ |
| ZoneAllocation(zone_id=zone.zone_id, courier_count=count) |
| for zone, count in zip(recipe.zone_specs, counts, strict=True) |
| ] |
| ) |
|
|
|
|
| def count_moved(current_counts: Counts, target_counts: Counts) -> int: |
| return sum(max(0, current - target) for current, target in zip(current_counts, target_counts, strict=True)) |
|
|
|
|
| def round_service_reward( |
| round_template: RoundTemplate, |
| current_counts: Counts, |
| missed_order_penalty: float, |
| ) -> tuple[float, tuple[int, ...], tuple[int, ...]]: |
| served = tuple( |
| min(couriers, visible_orders) |
| for couriers, visible_orders in zip(current_counts, round_template.visible_orders_by_zone, strict=True) |
| ) |
| missed = tuple( |
| max(0, visible_orders - couriers) |
| for couriers, visible_orders in zip(current_counts, round_template.visible_orders_by_zone, strict=True) |
| ) |
| reward = 0.0 |
| for served_orders, missed_orders, reward_per_order in zip( |
| served, |
| missed, |
| round_template.reward_per_order_by_zone, |
| strict=True, |
| ): |
| reward += served_orders * reward_per_order |
| reward -= missed_orders * missed_order_penalty |
| return reward, served, missed |
|
|
|
|
| def move_cost(recipe: HiddenRecipe, round_index: int, current_counts: Counts, target_counts: Counts) -> float: |
| if current_counts == target_counts: |
| return 0.0 |
| round_template = recipe.rounds[round_index] |
| surplus = tuple(max(0, current - target) for current, target in zip(current_counts, target_counts, strict=True)) |
| deficit = tuple(max(0, target - current) for current, target in zip(current_counts, target_counts, strict=True)) |
| origin_slots = expand_counts(surplus) |
| target_slots = expand_counts(deficit) |
| return recipe.profile.move_cost_weight * _assignment_cost( |
| tuple(origin_slots), |
| tuple(target_slots), |
| tuple(zone.position for zone in recipe.zone_specs), |
| round_template.congestion_multiplier_by_zone, |
| ) |
|
|
|
|
| def legal_next_counts(recipe: HiddenRecipe, current_counts: Counts) -> tuple[Counts, ...]: |
| all_counts = all_distributions(recipe.profile.courier_count, recipe.profile.zone_count) |
| return tuple( |
| counts |
| for counts in all_counts |
| if count_moved(current_counts, counts) <= recipe.profile.max_repositions_per_round |
| ) |
|
|
|
|
| def pressure_label(round_template: RoundTemplate, zone_specs: tuple) -> str: |
| peak_index = max(range(len(zone_specs)), key=lambda index: round_template.visible_orders_by_zone[index]) |
| peak_zone = zone_specs[peak_index] |
| peak_orders = round_template.visible_orders_by_zone[peak_index] |
| return f"{peak_zone.label} leads with {peak_orders} visible orders" |
|
|
|
|
| def expand_counts(counts: Counts) -> list[int]: |
| slots: list[int] = [] |
| for zone_index, count in enumerate(counts): |
| slots.extend([zone_index] * count) |
| return slots |
|
|
|
|
| @lru_cache(maxsize=None) |
| def _assignment_cost( |
| origins: tuple[int, ...], |
| targets: tuple[int, ...], |
| positions: tuple[tuple[int, int], ...], |
| congestion_multiplier_by_zone: tuple[float, ...], |
| ) -> float: |
| courier_count = len(origins) |
| if courier_count == 0: |
| return 0.0 |
|
|
| @lru_cache(maxsize=None) |
| def solve(origin_index: int, used_mask: int) -> float: |
| if origin_index >= courier_count: |
| return 0.0 |
| best = float("inf") |
| for target_index in range(courier_count): |
| if used_mask & (1 << target_index): |
| continue |
| origin_zone = origins[origin_index] |
| target_zone = targets[target_index] |
| distance = manhattan(positions[origin_zone], positions[target_zone]) |
| congestion_multiplier = congestion_multiplier_by_zone[target_zone] |
| candidate = distance * congestion_multiplier + solve(origin_index + 1, used_mask | (1 << target_index)) |
| if candidate < best: |
| best = candidate |
| return best |
|
|
| return solve(0, 0) |
|
|
|
|
| def manhattan(a: tuple[int, int], b: tuple[int, int]) -> int: |
| return abs(a[0] - b[0]) + abs(a[1] - b[1]) |
|
|