import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Annotated, Optional, List, Union, ClassVar, TYPE_CHECKING

from pydantic import Field
from solverforge_legacy.solver import SolverStatus
from solverforge_legacy.solver.score import HardSoftScore
from solverforge_legacy.solver.domain import (
    planning_entity,
    planning_solution,
    PlanningId,
    PlanningScore,
    PlanningListVariable,
    PlanningEntityCollectionProperty,
    ValueRangeProvider,
    InverseRelationShadowVariable,
    PreviousElementShadowVariable,
    NextElementShadowVariable,
    CascadingUpdateShadowVariable,
)

from .json_serialization import JsonDomainBase

if TYPE_CHECKING:
    from .routing import DistanceMatrix


@dataclass
class Location:
    """
    Represents a geographic location with latitude and longitude.

    Driving times can be computed using either:
    1. A precomputed distance matrix (if set) - uses real road network data
    2. The Haversine formula (fallback) - uses great-circle distance
    """

    latitude: float
    longitude: float

    # Class-level distance matrix (injected at problem load time).
    # Shared by every Location instance because it lives on the class.
    _distance_matrix: ClassVar[Optional["DistanceMatrix"]] = None

    # NOTE: the constants below are intentionally left unannotated so that
    # @dataclass does not turn them into per-instance fields.

    # Earth radius in meters
    _EARTH_RADIUS_M = 6371000
    _TWICE_EARTH_RADIUS_M = 2 * _EARTH_RADIUS_M
    # Average driving speed assumption: 50 km/h
    _AVERAGE_SPEED_KMPH = 50

    @classmethod
    def set_distance_matrix(cls, matrix: "DistanceMatrix") -> None:
        """Inject a precomputed distance matrix for real road routing."""
        cls._distance_matrix = matrix

    @classmethod
    def clear_distance_matrix(cls) -> None:
        """Clear the distance matrix (reverts to haversine fallback)."""
        cls._distance_matrix = None

    @classmethod
    def get_distance_matrix(cls) -> Optional["DistanceMatrix"]:
        """Get the current distance matrix, if any."""
        return cls._distance_matrix

    def driving_time_to(self, other: "Location") -> int:
        """
        Get driving time in seconds to another location.

        Uses the precomputed distance matrix if one has been injected,
        otherwise falls back to the haversine calculation.
        """
        matrix = self._distance_matrix
        if matrix is not None:
            return matrix.get_driving_time(self, other)
        return self._calculate_driving_time_haversine(other)

    def _calculate_driving_time_haversine(self, other: "Location") -> int:
        """
        Calculate driving time in seconds using the great-circle distance.

        Algorithm:
        1. Convert lat/long to 3D Cartesian coordinates on a sphere of
           diameter 1.0
        2. Calculate the Euclidean (chord) distance between the two points
        3. Use the arc sine formula to get the great-circle distance
        4. Convert meters to driving seconds assuming average speed

        Returns 0 immediately when both locations have identical
        coordinates.
        """
        if self.latitude == other.latitude and self.longitude == other.longitude:
            return 0

        distance_meters = self._calculate_distance(
            self._to_cartesian(), other._to_cartesian()
        )
        return self._meters_to_driving_seconds(distance_meters)

    def _to_cartesian(self) -> tuple[float, float, float]:
        """Convert latitude/longitude to (x, y, z) on a sphere of diameter 1.0."""
        lat_rad = math.radians(self.latitude)
        lon_rad = math.radians(self.longitude)
        # Radius 0.5 => diameter 1.0, so the chord between any two points
        # lies in [0, 1] and can be fed directly to asin().
        cos_lat = math.cos(lat_rad)
        x = 0.5 * cos_lat * math.sin(lon_rad)
        y = 0.5 * cos_lat * math.cos(lon_rad)
        z = 0.5 * math.sin(lat_rad)
        return (x, y, z)

    def _calculate_distance(
        self,
        from_c: tuple[float, float, float],
        to_c: tuple[float, float, float],
    ) -> int:
        """
        Calculate great-circle distance in meters between two Cartesian points.

        Both points are expected to come from ``_to_cartesian`` (sphere of
        diameter 1.0). The result is rounded to the nearest meter.
        """
        dx = from_c[0] - to_c[0]
        dy = from_c[1] - to_c[1]
        dz = from_c[2] - to_c[2]
        chord = math.sqrt(dx * dx + dy * dy + dz * dz)
        # Mathematically chord <= 1.0, but floating-point rounding for
        # (near-)antipodal points can push it marginally above 1.0, which
        # would make asin() raise "math domain error". Clamp to stay safe.
        chord = min(1.0, chord)
        # asin(chord) is half the central angle, hence the 2 * R factor.
        return round(self._TWICE_EARTH_RADIUS_M * math.asin(chord))

    @classmethod
    def _meters_to_driving_seconds(cls, meters: int) -> int:
        """Convert distance in meters to driving time in seconds."""
        # Formula: seconds = meters / (km/h) * 3.6
        # This is equivalent to: seconds = meters / (speed_m_per_s)
        # where speed_m_per_s = km/h / 3.6
        return round(meters / cls._AVERAGE_SPEED_KMPH * 3.6)

    def __str__(self):
        return f"[{self.latitude}, {self.longitude}]"

    def __repr__(self):
        return f"Location({self.latitude}, {self.longitude})"


@planning_entity
@dataclass
class Visit:
    """
    A stop to be served by a vehicle within a time window.

    ``vehicle``, ``previous_visit``, ``next_visit`` and ``arrival_time`` are
    declared as shadow variables derived from the ``visits`` list variable;
    they default to None until assigned.
    """

    id: Annotated[str, PlanningId]
    name: str
    location: Location
    demand: int
    min_start_time: datetime
    max_end_time: datetime
    service_duration: timedelta
    vehicle: Annotated[
        Optional["Vehicle"],
        InverseRelationShadowVariable(source_variable_name="visits"),
    ] = None
    previous_visit: Annotated[
        Optional["Visit"], PreviousElementShadowVariable(source_variable_name="visits")
    ] = None
    next_visit: Annotated[
        Optional["Visit"], NextElementShadowVariable(source_variable_name="visits")
    ] = None
    arrival_time: Annotated[
        Optional[datetime],
        CascadingUpdateShadowVariable(target_method_name="update_arrival_time"),
    ] = None

    def update_arrival_time(self):
        """
        Recompute ``arrival_time`` from the vehicle and the previous visit.

        Named as the target method of the ``arrival_time`` cascading shadow
        variable. Sets ``arrival_time`` to None when the visit is unassigned
        or when the previous visit has no arrival time yet.
        """
        previous = self.previous_visit
        if self.vehicle is None or (
            previous is not None and previous.arrival_time is None
        ):
            self.arrival_time = None
        elif previous is None:
            # First visit on the route: drive from the vehicle's home.
            self.arrival_time = self.vehicle.departure_time + timedelta(
                seconds=self.vehicle.home_location.driving_time_to(self.location)
            )
        else:
            # Later visit: leave the previous stop once its service is done.
            self.arrival_time = previous.calculate_departure_time() + timedelta(
                seconds=previous.location.driving_time_to(self.location)
            )

    def calculate_departure_time(self):
        """
        Return the time service ends at this visit, or None if not arrived.

        Service starts at the later of ``arrival_time`` and
        ``min_start_time`` and lasts ``service_duration``.
        """
        if self.arrival_time is None:
            return None
        return max(self.arrival_time, self.min_start_time) + self.service_duration

    @property
    def departure_time(self) -> Optional[datetime]:
        """Property view of ``calculate_departure_time()``."""
        return self.calculate_departure_time()

    @property
    def start_service_time(self) -> Optional[datetime]:
        """Later of ``arrival_time`` and ``min_start_time``; None if not arrived."""
        if self.arrival_time is None:
            return None
        return max(self.arrival_time, self.min_start_time)

    def is_service_finished_after_max_end_time(self) -> bool:
        """Return True when service ends strictly after ``max_end_time``."""
        return (
            self.arrival_time is not None
            and self.calculate_departure_time() > self.max_end_time
        )

    def service_finished_delay_in_minutes(self) -> int:
        """
        Return how many minutes service ends past ``max_end_time``, rounded up.

        Returns 0 when there is no arrival time. The value is the ceiling of
        the signed difference, so it is zero or negative when service ends
        before ``max_end_time``.
        """
        if self.arrival_time is None:
            return 0
        # Round up to next minute using the negative division trick:
        # ex: 30 seconds / -1 minute = -0.5,
        # so 30 seconds // -1 minute = -1,
        # and negating that gives 1
        return -(
            (self.calculate_departure_time() - self.max_end_time)
            // timedelta(minutes=-1)
        )

    @property
    def driving_time_seconds_from_previous_standstill(self) -> Optional[int]:
        """
        Driving time in seconds from the previous stop to this visit.

        The previous stop is the vehicle's home location for the first
        visit, otherwise the previous visit. None when unassigned.
        """
        if self.vehicle is None:
            return None
        if self.previous_visit is None:
            origin = self.vehicle.home_location
        else:
            origin = self.previous_visit.location
        return origin.driving_time_to(self.location)

    def __str__(self):
        return self.id

    def __repr__(self):
        return f"Visit({self.id})"


@planning_entity
@dataclass
class Vehicle:
    """A vehicle with a capacity, a home location and an ordered list of visits."""

    id: Annotated[str, PlanningId]
    name: str
    capacity: int
    home_location: Location
    departure_time: datetime
    visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)

    @property
    def arrival_time(self) -> datetime:
        """
        Time the vehicle is back at its home location.

        Equals ``departure_time`` when there are no visits; otherwise the
        last visit's departure time plus the drive home.
        """
        if not self.visits:
            return self.departure_time
        last_visit = self.visits[-1]
        return last_visit.departure_time + timedelta(
            seconds=last_visit.location.driving_time_to(self.home_location)
        )

    @property
    def total_demand(self) -> int:
        """Property view of ``calculate_total_demand()``."""
        return self.calculate_total_demand()

    @property
    def total_driving_time_seconds(self) -> int:
        """Property view of ``calculate_total_driving_time_seconds()``."""
        return self.calculate_total_driving_time_seconds()

    def calculate_total_demand(self) -> int:
        """Return the sum of ``demand`` over all assigned visits."""
        return sum(visit.demand for visit in self.visits)

    def calculate_total_driving_time_seconds(self) -> int:
        """
        Return the driving time in seconds for the full round trip.

        Covers home -> each visit in order -> home. Returns 0 when the
        vehicle has no visits.
        """
        if not self.visits:
            return 0

        total_seconds = 0
        previous_location = self.home_location
        for visit in self.visits:
            total_seconds += previous_location.driving_time_to(visit.location)
            previous_location = visit.location
        # Close the loop: drive from the last visit back home.
        return total_seconds + previous_location.driving_time_to(self.home_location)

    def __str__(self):
        return self.name

    def __repr__(self):
        return f"Vehicle({self.id}, {self.name})"


@planning_solution
@dataclass
class VehicleRoutePlan:
    """Planning solution holding the vehicles, the visits and the score."""

    name: str
    south_west_corner: Location
    north_east_corner: Location
    vehicles: Annotated[list[Vehicle], PlanningEntityCollectionProperty]
    visits: Annotated[list[Visit], PlanningEntityCollectionProperty, ValueRangeProvider]
    score: Annotated[Optional[HardSoftScore], PlanningScore] = None
    solver_status: SolverStatus = SolverStatus.NOT_SOLVING

    @property
    def total_driving_time_seconds(self) -> int:
        """Sum of ``total_driving_time_seconds`` over all vehicles."""
        return sum(vehicle.total_driving_time_seconds for vehicle in self.vehicles)

    @property
    def start_date_time(self) -> Optional[datetime]:
        """Earliest vehicle departure time - for timeline window."""
        if not self.vehicles:
            return None
        return min(v.departure_time for v in self.vehicles)

    @property
    def end_date_time(self) -> Optional[datetime]:
        """Latest vehicle arrival time - for timeline window."""
        if not self.vehicles:
            return None
        return max(v.arrival_time for v in self.vehicles)

    def __str__(self):
        return f"VehicleRoutePlan(name={self.name}, vehicles={self.vehicles}, visits={self.visits})"


# Pydantic REST models for API (used for deserialization and context)
#
# NOTE: these models are documented with `#` comments rather than class
# docstrings so the generated JSON schema is left untouched.


# REST representation of a latitude/longitude pair.
class LocationModel(JsonDomainBase):
    latitude: float
    longitude: float


# REST representation of a Visit; related entities may appear either as an
# id string or as a nested model.
class VisitModel(JsonDomainBase):
    id: str
    name: str
    location: List[float]  # [lat, lng] array
    demand: int
    min_start_time: str = Field(..., alias="minStartTime")  # ISO datetime string
    max_end_time: str = Field(..., alias="maxEndTime")  # ISO datetime string
    service_duration: int = Field(..., alias="serviceDuration")  # Duration in seconds
    vehicle: Union[str, "VehicleModel", None] = None
    previous_visit: Union[str, "VisitModel", None] = Field(None, alias="previousVisit")
    next_visit: Union[str, "VisitModel", None] = Field(None, alias="nextVisit")
    arrival_time: Optional[str] = Field(
        None, alias="arrivalTime"
    )  # ISO datetime string
    start_service_time: Optional[str] = Field(
        None, alias="startServiceTime"
    )  # ISO datetime string
    departure_time: Optional[str] = Field(
        None, alias="departureTime"
    )  # ISO datetime string
    driving_time_seconds_from_previous_standstill: Optional[int] = Field(
        None, alias="drivingTimeSecondsFromPreviousStandstill"
    )


# REST representation of a Vehicle; visits may be id strings or nested models.
class VehicleModel(JsonDomainBase):
    id: str
    name: str
    capacity: int
    home_location: List[float] = Field(..., alias="homeLocation")  # [lat, lng] array
    departure_time: str = Field(..., alias="departureTime")  # ISO datetime string
    visits: List[Union[str, VisitModel]] = Field(default_factory=list)
    total_demand: int = Field(0, alias="totalDemand")
    total_driving_time_seconds: int = Field(0, alias="totalDrivingTimeSeconds")
    arrival_time: Optional[str] = Field(
        None, alias="arrivalTime"
    )  # ISO datetime string


# REST representation of the whole VehicleRoutePlan.
class VehicleRoutePlanModel(JsonDomainBase):
    name: str
    south_west_corner: List[float] = Field(
        ..., alias="southWestCorner"
    )  # [lat, lng] array
    north_east_corner: List[float] = Field(
        ..., alias="northEastCorner"
    )  # [lat, lng] array
    vehicles: List[VehicleModel]
    visits: List[VisitModel]
    score: Optional[str] = None
    solver_status: Optional[str] = None
    total_driving_time_seconds: int = Field(0, alias="totalDrivingTimeSeconds")
    start_date_time: Optional[str] = Field(None, alias="startDateTime")
    end_date_time: Optional[str] = Field(None, alias="endDateTime")