Spaces:
Sleeping
Sleeping
| """City-grid physics utilities for the 911 dispatch supervisor environment.""" | |
| from __future__ import annotations | |
| from src.models import IncidentState, UnitState, UnitStatus | |
| CITY_BLOCK_FT = 264.0 | |
| def compute_eta(unit: UnitState, incident: IncidentState, unit_speed: float) -> float: | |
| """Compute Manhattan-distance ETA in seconds. | |
| Args: | |
| unit: Current unit state. | |
| incident: Target incident state. | |
| unit_speed: Speed in blocks / second. | |
| """ | |
| speed = max(float(unit_speed), 1e-6) | |
| dx = abs(float(unit.location_x) - float(incident.location_x)) | |
| dy = abs(float(unit.location_y) - float(incident.location_y)) | |
| distance_blocks = dx + dy | |
| return distance_blocks / speed | |
| def move_unit_toward( | |
| unit: UnitState, | |
| incident: IncidentState, | |
| unit_speed: float, | |
| dt: float, | |
| ) -> UnitState: | |
| """Advance unit position toward incident by dt seconds. | |
| Uses Manhattan (grid) movement: consume movement along x, then y. | |
| """ | |
| speed = max(float(unit_speed), 0.0) | |
| remaining = max(0.0, speed * float(dt)) | |
| ux = float(unit.location_x) | |
| uy = float(unit.location_y) | |
| ix = float(incident.location_x) | |
| iy = float(incident.location_y) | |
| dx = ix - ux | |
| step_x = max(-remaining, min(remaining, dx)) | |
| ux += step_x | |
| remaining -= abs(step_x) | |
| if remaining > 0.0: | |
| dy = iy - uy | |
| step_y = max(-remaining, min(remaining, dy)) | |
| uy += step_y | |
| return unit.model_copy(update={"location_x": ux, "location_y": uy}) | |
| def check_arrival( | |
| unit: UnitState, | |
| incident: IncidentState, | |
| threshold_blocks: float = 0.5, | |
| ) -> bool: | |
| """Return True if unit is considered arrived on scene.""" | |
| dx = abs(float(unit.location_x) - float(incident.location_x)) | |
| dy = abs(float(unit.location_y) - float(incident.location_y)) | |
| return (dx + dy) <= float(threshold_blocks) | |
| def compute_coverage_score( | |
| units: dict[str, UnitState], | |
| grid_size: tuple[int, int], | |
| bins_x: int = 4, | |
| ) -> float: | |
| """Score geographic distribution of units across the x-axis. | |
| This is intentionally simple and deterministic: slice the city width into bins and | |
| compute fraction of bins that contain at least one AVAILABLE unit. | |
| """ | |
| width = max(1, int(grid_size[0])) | |
| bins = max(1, int(bins_x)) | |
| bin_width = width / bins | |
| covered: set[int] = set() | |
| for unit in units.values(): | |
| if unit.status != UnitStatus.AVAILABLE: | |
| continue | |
| idx = int(min(bins - 1, max(0.0, float(unit.location_x)) // max(bin_width, 1e-6))) | |
| covered.add(idx) | |
| return len(covered) / bins | |