Spaces:
Sleeping
Sleeping
File size: 12,979 Bytes
08e15f1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 |
from solverforge_legacy.solver import SolverStatus
from solverforge_legacy.solver.score import HardSoftScore
from solverforge_legacy.solver.domain import (
planning_entity,
planning_solution,
PlanningId,
PlanningScore,
PlanningListVariable,
PlanningEntityCollectionProperty,
ValueRangeProvider,
InverseRelationShadowVariable,
PreviousElementShadowVariable,
NextElementShadowVariable,
CascadingUpdateShadowVariable,
)
from datetime import datetime, timedelta
from typing import Annotated, Optional, List, Union, ClassVar, TYPE_CHECKING
from dataclasses import dataclass, field
from .json_serialization import JsonDomainBase
from pydantic import Field
if TYPE_CHECKING:
from .routing import DistanceMatrix
@dataclass
class Location:
"""
Represents a geographic location with latitude and longitude.
Driving times can be computed using either:
1. A precomputed distance matrix (if set) - uses real road network data
2. The Haversine formula (fallback) - uses great-circle distance
"""
latitude: float
longitude: float
# Class-level distance matrix (injected at problem load time)
_distance_matrix: ClassVar[Optional["DistanceMatrix"]] = None
# Earth radius in meters
_EARTH_RADIUS_M = 6371000
_TWICE_EARTH_RADIUS_M = 2 * _EARTH_RADIUS_M
# Average driving speed assumption: 50 km/h
_AVERAGE_SPEED_KMPH = 50
@classmethod
def set_distance_matrix(cls, matrix: "DistanceMatrix") -> None:
"""Inject a precomputed distance matrix for real road routing."""
cls._distance_matrix = matrix
@classmethod
def clear_distance_matrix(cls) -> None:
"""Clear the distance matrix (reverts to haversine fallback)."""
cls._distance_matrix = None
@classmethod
def get_distance_matrix(cls) -> Optional["DistanceMatrix"]:
"""Get the current distance matrix, if any."""
return cls._distance_matrix
def driving_time_to(self, other: "Location") -> int:
"""
Get driving time in seconds to another location.
Uses the precomputed distance matrix if available, otherwise
falls back to haversine calculation.
"""
if self._distance_matrix is not None:
return self._distance_matrix.get_driving_time(self, other)
return self._calculate_driving_time_haversine(other)
def _calculate_driving_time_haversine(self, other: "Location") -> int:
"""
Calculate driving time in seconds using Haversine distance.
Algorithm:
1. Convert lat/long to 3D Cartesian coordinates on a unit sphere
2. Calculate Euclidean distance between the two points
3. Use the arc sine formula to get the great-circle distance
4. Convert meters to driving seconds assuming average speed
"""
if self.latitude == other.latitude and self.longitude == other.longitude:
return 0
from_cartesian = self._to_cartesian()
to_cartesian = other._to_cartesian()
distance_meters = self._calculate_distance(from_cartesian, to_cartesian)
return self._meters_to_driving_seconds(distance_meters)
def _to_cartesian(self) -> tuple[float, float, float, float]:
"""Convert latitude/longitude to 3D Cartesian coordinates on a unit sphere."""
import math
lat_rad = math.radians(self.latitude)
lon_rad = math.radians(self.longitude)
# Cartesian coordinates, normalized for a sphere of diameter 1.0
x = 0.5 * math.cos(lat_rad) * math.sin(lon_rad)
y = 0.5 * math.cos(lat_rad) * math.cos(lon_rad)
z = 0.5 * math.sin(lat_rad)
return (x, y, z)
def _calculate_distance(self, from_c: tuple[float, float, float, float], to_c: tuple[float, float, float, float]) -> int:
"""Calculate great-circle distance in meters between two Cartesian points."""
import math
dx = from_c[0] - to_c[0]
dy = from_c[1] - to_c[1]
dz = from_c[2] - to_c[2]
r = math.sqrt(dx * dx + dy * dy + dz * dz)
return round(self._TWICE_EARTH_RADIUS_M * math.asin(r))
@classmethod
def _meters_to_driving_seconds(cls, meters: int) -> int:
"""Convert distance in meters to driving time in seconds."""
# Formula: seconds = meters / (km/h) * 3.6
# This is equivalent to: seconds = meters / (speed_m_per_s)
# where speed_m_per_s = km/h / 3.6
return round(meters / cls._AVERAGE_SPEED_KMPH * 3.6)
def __str__(self):
return f"[{self.latitude}, {self.longitude}]"
def __repr__(self):
return f"Location({self.latitude}, {self.longitude})"
@planning_entity
@dataclass
class Visit:
id: Annotated[str, PlanningId]
name: str
location: Location
demand: int
min_start_time: datetime
max_end_time: datetime
service_duration: timedelta
vehicle: Annotated[
Optional["Vehicle"],
InverseRelationShadowVariable(source_variable_name="visits"),
] = None
previous_visit: Annotated[
Optional["Visit"], PreviousElementShadowVariable(source_variable_name="visits")
] = None
next_visit: Annotated[
Optional["Visit"], NextElementShadowVariable(source_variable_name="visits")
] = None
arrival_time: Annotated[
Optional[datetime],
CascadingUpdateShadowVariable(target_method_name="update_arrival_time"),
] = None
def update_arrival_time(self):
if self.vehicle is None or (
self.previous_visit is not None and self.previous_visit.arrival_time is None
):
self.arrival_time = None
elif self.previous_visit is None:
self.arrival_time = self.vehicle.departure_time + timedelta(
seconds=self.vehicle.home_location.driving_time_to(self.location)
)
else:
self.arrival_time = (
self.previous_visit.calculate_departure_time()
+ timedelta(
seconds=self.previous_visit.location.driving_time_to(self.location)
)
)
def calculate_departure_time(self):
if self.arrival_time is None:
return None
return max(self.arrival_time, self.min_start_time) + self.service_duration
@property
def departure_time(self) -> Optional[datetime]:
return self.calculate_departure_time()
@property
def start_service_time(self) -> Optional[datetime]:
if self.arrival_time is None:
return None
return max(self.arrival_time, self.min_start_time)
def is_service_finished_after_max_end_time(self) -> bool:
return (
self.arrival_time is not None
and self.calculate_departure_time() > self.max_end_time
)
def service_finished_delay_in_minutes(self) -> int:
if self.arrival_time is None:
return 0
# Round up to next minute using the negative division trick:
# ex: 30 seconds / -1 minute = -0.5,
# so 30 seconds // -1 minute = -1,
# and negating that gives 1
return -(
(self.calculate_departure_time() - self.max_end_time)
// timedelta(minutes=-1)
)
@property
def driving_time_seconds_from_previous_standstill(self) -> Optional[int]:
if self.vehicle is None:
return None
if self.previous_visit is None:
return self.vehicle.home_location.driving_time_to(self.location)
else:
return self.previous_visit.location.driving_time_to(self.location)
def __str__(self):
return self.id
def __repr__(self):
return f"Visit({self.id})"
@planning_entity
@dataclass
class Vehicle:
id: Annotated[str, PlanningId]
name: str
capacity: int
home_location: Location
departure_time: datetime
visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)
@property
def arrival_time(self) -> datetime:
if len(self.visits) == 0:
return self.departure_time
return self.visits[-1].departure_time + timedelta(
seconds=self.visits[-1].location.driving_time_to(self.home_location)
)
@property
def total_demand(self) -> int:
return self.calculate_total_demand()
@property
def total_driving_time_seconds(self) -> int:
return self.calculate_total_driving_time_seconds()
def calculate_total_demand(self) -> int:
total_demand = 0
for visit in self.visits:
total_demand += visit.demand
return total_demand
def calculate_total_driving_time_seconds(self) -> int:
if len(self.visits) == 0:
return 0
total_driving_time_seconds = 0
previous_location = self.home_location
for visit in self.visits:
total_driving_time_seconds += previous_location.driving_time_to(
visit.location
)
previous_location = visit.location
total_driving_time_seconds += previous_location.driving_time_to(
self.home_location
)
return total_driving_time_seconds
def __str__(self):
return self.name
def __repr__(self):
return f"Vehicle({self.id}, {self.name})"
@planning_solution
@dataclass
class VehicleRoutePlan:
name: str
south_west_corner: Location
north_east_corner: Location
vehicles: Annotated[list[Vehicle], PlanningEntityCollectionProperty]
visits: Annotated[list[Visit], PlanningEntityCollectionProperty, ValueRangeProvider]
score: Annotated[Optional[HardSoftScore], PlanningScore] = None
solver_status: SolverStatus = SolverStatus.NOT_SOLVING
@property
def total_driving_time_seconds(self) -> int:
out = 0
for vehicle in self.vehicles:
out += vehicle.total_driving_time_seconds
return out
@property
def start_date_time(self) -> Optional[datetime]:
"""Earliest vehicle departure time - for timeline window."""
if not self.vehicles:
return None
return min(v.departure_time for v in self.vehicles)
@property
def end_date_time(self) -> Optional[datetime]:
"""Latest vehicle arrival time - for timeline window."""
if not self.vehicles:
return None
return max(v.arrival_time for v in self.vehicles)
def __str__(self):
return f"VehicleRoutePlan(name={self.name}, vehicles={self.vehicles}, visits={self.visits})"
# Pydantic REST models for API (used for deserialization and context)
class LocationModel(JsonDomainBase):
latitude: float
longitude: float
class VisitModel(JsonDomainBase):
id: str
name: str
location: List[float] # [lat, lng] array
demand: int
min_start_time: str = Field(..., alias="minStartTime") # ISO datetime string
max_end_time: str = Field(..., alias="maxEndTime") # ISO datetime string
service_duration: int = Field(..., alias="serviceDuration") # Duration in seconds
vehicle: Union[str, "VehicleModel", None] = None
previous_visit: Union[str, "VisitModel", None] = Field(None, alias="previousVisit")
next_visit: Union[str, "VisitModel", None] = Field(None, alias="nextVisit")
arrival_time: Optional[str] = Field(
None, alias="arrivalTime"
) # ISO datetime string
start_service_time: Optional[str] = Field(
None, alias="startServiceTime"
) # ISO datetime string
departure_time: Optional[str] = Field(
None, alias="departureTime"
) # ISO datetime string
driving_time_seconds_from_previous_standstill: Optional[int] = Field(
None, alias="drivingTimeSecondsFromPreviousStandstill"
)
class VehicleModel(JsonDomainBase):
id: str
name: str
capacity: int
home_location: List[float] = Field(..., alias="homeLocation") # [lat, lng] array
departure_time: str = Field(..., alias="departureTime") # ISO datetime string
visits: List[Union[str, VisitModel]] = Field(default_factory=list)
total_demand: int = Field(0, alias="totalDemand")
total_driving_time_seconds: int = Field(0, alias="totalDrivingTimeSeconds")
arrival_time: Optional[str] = Field(
None, alias="arrivalTime"
) # ISO datetime string
class VehicleRoutePlanModel(JsonDomainBase):
name: str
south_west_corner: List[float] = Field(
..., alias="southWestCorner"
) # [lat, lng] array
north_east_corner: List[float] = Field(
..., alias="northEastCorner"
) # [lat, lng] array
vehicles: List[VehicleModel]
visits: List[VisitModel]
score: Optional[str] = None
solver_status: Optional[str] = None
total_driving_time_seconds: int = Field(0, alias="totalDrivingTimeSeconds")
start_date_time: Optional[str] = Field(None, alias="startDateTime")
end_date_time: Optional[str] = Field(None, alias="endDateTime")
|