Spaces:
Sleeping
Sleeping
File size: 2,608 Bytes
6172160 4904e85 6172160 4904e85 6172160 4904e85 6172160 4904e85 6172160 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | """City-grid physics utilities for the 911 dispatch supervisor environment."""
from __future__ import annotations
from src.models import IncidentState, UnitState, UnitStatus
# NOTE(review): presumably the length of one city block in feet; the name
# suggests feet but nothing visible here uses it — confirm units with callers.
CITY_BLOCK_FT = 264.0
def compute_eta(unit: UnitState, incident: IncidentState, unit_speed: float) -> float:
    """Estimate travel time from a unit to an incident along the grid.

    The distance is the Manhattan (|dx| + |dy|) separation between the two
    positions, and the result is that distance divided by the unit's speed.

    Args:
        unit: Unit whose ``location_x`` / ``location_y`` mark the start point.
        incident: Incident whose ``location_x`` / ``location_y`` mark the goal.
        unit_speed: Speed in blocks / second.

    Returns:
        Travel time in seconds (blocks divided by blocks / second).
    """
    # Floor the speed at a tiny positive value so a zero or negative speed
    # produces a very large ETA rather than a ZeroDivisionError.
    effective_speed = max(float(unit_speed), 1e-6)

    gap_x = float(unit.location_x) - float(incident.location_x)
    gap_y = float(unit.location_y) - float(incident.location_y)
    manhattan_blocks = abs(gap_x) + abs(gap_y)

    return manhattan_blocks / effective_speed
def move_unit_toward(
    unit: UnitState,
    incident: IncidentState,
    unit_speed: float,
    dt: float,
) -> UnitState:
    """Return a copy of ``unit`` stepped toward ``incident`` for ``dt`` seconds.

    Movement follows the grid: the travel budget (``unit_speed * dt``) is
    spent closing the x gap first, and whatever is left over is then spent on
    the y gap. The unit never overshoots the incident on either axis.

    Args:
        unit: Unit to move; it is not mutated.
        incident: Destination the unit is heading to.
        unit_speed: Distance covered per second; negative values count as 0.
        dt: Elapsed time in seconds; a negative budget counts as 0.

    Returns:
        The result of ``unit.model_copy`` with updated ``location_x`` and
        ``location_y``.
    """

    def _bounded(delta: float, limit: float) -> float:
        # Clamp a signed displacement so its magnitude never exceeds `limit`.
        return max(-limit, min(limit, delta))

    budget = max(0.0, max(float(unit_speed), 0.0) * float(dt))

    start_x, start_y = float(unit.location_x), float(unit.location_y)
    goal_x, goal_y = float(incident.location_x), float(incident.location_y)

    # Spend the budget along x first.
    x_step = _bounded(goal_x - start_x, budget)
    new_x = start_x + x_step
    leftover = budget - abs(x_step)

    # Only touch y when some budget survived the x leg.
    new_y = start_y
    if leftover > 0.0:
        new_y += _bounded(goal_y - start_y, leftover)

    return unit.model_copy(update={"location_x": new_x, "location_y": new_y})
def check_arrival(
    unit: UnitState,
    incident: IncidentState,
    threshold_blocks: float = 0.5,
) -> bool:
    """Tell whether a unit is close enough to an incident to count as arrived.

    Args:
        unit: Unit whose position is being checked.
        incident: Incident the unit is travelling to.
        threshold_blocks: Largest Manhattan distance that still counts as
            being on scene.

    Returns:
        True when the Manhattan distance between the unit and the incident is
        less than or equal to ``threshold_blocks``, otherwise False.
    """
    offset_x = float(unit.location_x) - float(incident.location_x)
    offset_y = float(unit.location_y) - float(incident.location_y)
    manhattan_gap = abs(offset_x) + abs(offset_y)
    arrival_radius = float(threshold_blocks)
    return manhattan_gap <= arrival_radius
def compute_coverage_score(
    units: dict[str, UnitState],
    grid_size: tuple[int, int],
    bins_x: int = 4,
) -> float:
    """Measure how evenly available units are spread along the x-axis.

    The city width (``grid_size[0]``) is cut into ``bins_x`` equal slices and
    the score is the fraction of slices holding at least one unit whose status
    is ``UnitStatus.AVAILABLE``. Only the x coordinate is considered.

    Args:
        units: Units to evaluate; only the dict values are inspected.
        grid_size: Grid dimensions; only the first entry (width) is used.
        bins_x: Number of slices along x; values below 1 are treated as 1.

    Returns:
        A value between 0.0 (no slice covered) and 1.0 (every slice covered).
    """
    city_width = max(1, int(grid_size[0]))
    bin_count = max(1, int(bins_x))
    # Keep the slice width strictly positive so the floor division is safe.
    slice_width = max(city_width / bin_count, 1e-6)
    last_bin = bin_count - 1

    # Negative x positions fall into the first slice and positions at or past
    # the right edge fall into the last one.
    occupied: set[int] = {
        int(min(last_bin, max(0.0, float(member.location_x)) // slice_width))
        for member in units.values()
        if member.status == UnitStatus.AVAILABLE
    }
    return len(occupied) / bin_count
|