File size: 12,979 Bytes
08e15f1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
from solverforge_legacy.solver import SolverStatus
from solverforge_legacy.solver.score import HardSoftScore
from solverforge_legacy.solver.domain import (
    planning_entity,
    planning_solution,
    PlanningId,
    PlanningScore,
    PlanningListVariable,
    PlanningEntityCollectionProperty,
    ValueRangeProvider,
    InverseRelationShadowVariable,
    PreviousElementShadowVariable,
    NextElementShadowVariable,
    CascadingUpdateShadowVariable,
)

from datetime import datetime, timedelta
from typing import Annotated, Optional, List, Union, ClassVar, TYPE_CHECKING
from dataclasses import dataclass, field
from .json_serialization import JsonDomainBase
from pydantic import Field

if TYPE_CHECKING:
    from .routing import DistanceMatrix


@dataclass
class Location:
    """
    Represents a geographic location with latitude and longitude.

    Driving times can be computed using either:
    1. A precomputed distance matrix (if set) - uses real road network data
    2. The Haversine formula (fallback) - uses great-circle distance

    The distance matrix is stored on the class itself, so setting or clearing
    it affects every ``Location`` instance.
    """
    latitude: float
    longitude: float

    # Class-level distance matrix (injected at problem load time)
    _distance_matrix: ClassVar[Optional["DistanceMatrix"]] = None

    # The constants below are deliberately left unannotated so that
    # @dataclass does not turn them into constructor fields.
    # Earth radius in meters
    _EARTH_RADIUS_M = 6371000
    _TWICE_EARTH_RADIUS_M = 2 * _EARTH_RADIUS_M
    # Average driving speed assumption: 50 km/h
    _AVERAGE_SPEED_KMPH = 50

    @classmethod
    def set_distance_matrix(cls, matrix: "DistanceMatrix") -> None:
        """Inject a precomputed distance matrix for real road routing."""
        cls._distance_matrix = matrix

    @classmethod
    def clear_distance_matrix(cls) -> None:
        """Clear the distance matrix (reverts to haversine fallback)."""
        cls._distance_matrix = None

    @classmethod
    def get_distance_matrix(cls) -> Optional["DistanceMatrix"]:
        """Get the current distance matrix, if any."""
        return cls._distance_matrix

    def driving_time_to(self, other: "Location") -> int:
        """
        Get driving time in seconds to another location.

        Uses the precomputed distance matrix if available, otherwise
        falls back to haversine calculation.
        """
        if self._distance_matrix is not None:
            return self._distance_matrix.get_driving_time(self, other)
        return self._calculate_driving_time_haversine(other)

    def _calculate_driving_time_haversine(self, other: "Location") -> int:
        """
        Calculate driving time in seconds using Haversine distance.

        Algorithm:
        1. Convert lat/long to 3D Cartesian coordinates on a sphere of
           diameter 1.0
        2. Calculate Euclidean (chord) distance between the two points
        3. Use the arc sine formula to get the great-circle distance
        4. Convert meters to driving seconds assuming average speed

        Returns 0 without any computation when both coordinates are equal.
        """
        if self.latitude == other.latitude and self.longitude == other.longitude:
            return 0

        from_cartesian = self._to_cartesian()
        to_cartesian = other._to_cartesian()
        distance_meters = self._calculate_distance(from_cartesian, to_cartesian)
        return self._meters_to_driving_seconds(distance_meters)

    def _to_cartesian(self) -> tuple[float, float, float]:
        """
        Convert latitude/longitude (degrees) to an ``(x, y, z)`` point on a
        sphere of diameter 1.0 (radius 0.5).
        """
        import math
        lat_rad = math.radians(self.latitude)
        lon_rad = math.radians(self.longitude)
        # Cartesian coordinates, normalized for a sphere of diameter 1.0
        x = 0.5 * math.cos(lat_rad) * math.sin(lon_rad)
        y = 0.5 * math.cos(lat_rad) * math.cos(lon_rad)
        z = 0.5 * math.sin(lat_rad)
        return (x, y, z)

    def _calculate_distance(
        self,
        from_c: tuple[float, float, float],
        to_c: tuple[float, float, float],
    ) -> int:
        """
        Calculate great-circle distance in meters between two Cartesian points.

        Both points are expected to be ``(x, y, z)`` tuples as produced by
        ``_to_cartesian``. The result is rounded to whole meters.
        """
        import math
        dx = from_c[0] - to_c[0]
        dy = from_c[1] - to_c[1]
        dz = from_c[2] - to_c[2]
        chord = math.sqrt(dx * dx + dy * dy + dz * dz)
        # On a diameter-1 sphere the chord can never truly exceed 1.0, but
        # floating-point rounding for (near-)antipodal points can push it a
        # hair above, which would make math.asin raise ValueError. Clamp it.
        # Argument order matters: min(chord, 1.0) keeps a NaN chord as NaN so
        # invalid coordinates still fail loudly in round() instead of being
        # silently mapped to half the circumference.
        chord = min(chord, 1.0)
        return round(self._TWICE_EARTH_RADIUS_M * math.asin(chord))

    @classmethod
    def _meters_to_driving_seconds(cls, meters: int) -> int:
        """Convert distance in meters to driving time in seconds."""
        # Formula: seconds = meters / (km/h) * 3.6
        # This is equivalent to: seconds = meters / (speed_m_per_s)
        # where speed_m_per_s = km/h / 3.6
        return round(meters / cls._AVERAGE_SPEED_KMPH * 3.6)

    def __str__(self):
        return f"[{self.latitude}, {self.longitude}]"

    def __repr__(self):
        return f"Location({self.latitude}, {self.longitude})"


@planning_entity
@dataclass
class Visit:
    """
    A stop to be served, placed by the solver into a vehicle's ``visits`` list.

    ``vehicle``, ``previous_visit``, ``next_visit`` and ``arrival_time`` are
    declared as shadow variables of that list; ``arrival_time`` is refreshed
    through ``update_arrival_time``.
    """
    id: Annotated[str, PlanningId]
    name: str
    location: Location
    demand: int
    min_start_time: datetime
    max_end_time: datetime
    service_duration: timedelta
    vehicle: Annotated[
        Optional["Vehicle"],
        InverseRelationShadowVariable(source_variable_name="visits"),
    ] = None
    previous_visit: Annotated[
        Optional["Visit"], PreviousElementShadowVariable(source_variable_name="visits")
    ] = None
    next_visit: Annotated[
        Optional["Visit"], NextElementShadowVariable(source_variable_name="visits")
    ] = None
    arrival_time: Annotated[
        Optional[datetime],
        CascadingUpdateShadowVariable(target_method_name="update_arrival_time"),
    ] = None

    def update_arrival_time(self):
        """
        Recompute ``arrival_time`` from the preceding standstill.

        The value becomes ``None`` when the visit has no vehicle or when the
        previous visit has no arrival time yet. Otherwise it is the departure
        time of the preceding standstill (the vehicle's start for the first
        visit, else the previous visit) plus the driving time from there.
        """
        assigned_vehicle = self.vehicle
        if assigned_vehicle is None:
            self.arrival_time = None
            return

        predecessor = self.previous_visit
        if predecessor is None:
            # First stop of the route: leave from the vehicle's home location.
            leave_at = assigned_vehicle.departure_time
            origin = assigned_vehicle.home_location
        elif predecessor.arrival_time is None:
            # The predecessor is not timed yet, so neither is this visit.
            self.arrival_time = None
            return
        else:
            leave_at = predecessor.calculate_departure_time()
            origin = predecessor.location

        self.arrival_time = leave_at + timedelta(
            seconds=origin.driving_time_to(self.location)
        )

    def calculate_departure_time(self):
        """
        Return when service ends here, or ``None`` without an arrival time.

        Service starts at the later of the arrival time and
        ``min_start_time`` and lasts ``service_duration``.
        """
        arrived_at = self.arrival_time
        if arrived_at is None:
            return None

        service_begins = max(arrived_at, self.min_start_time)
        return service_begins + self.service_duration

    @property
    def departure_time(self) -> Optional[datetime]:
        """Property view of ``calculate_departure_time()``."""
        return self.calculate_departure_time()

    @property
    def start_service_time(self) -> Optional[datetime]:
        """Later of arrival time and ``min_start_time``; ``None`` if not arrived."""
        arrived_at = self.arrival_time
        return None if arrived_at is None else max(arrived_at, self.min_start_time)

    def is_service_finished_after_max_end_time(self) -> bool:
        """Tell whether the computed departure lies after ``max_end_time``."""
        if self.arrival_time is None:
            return False
        return self.calculate_departure_time() > self.max_end_time

    def service_finished_delay_in_minutes(self) -> int:
        """
        Return the gap between departure and ``max_end_time`` in minutes,
        rounded up; 0 when there is no arrival time.
        """
        if self.arrival_time is None:
            return 0
        overrun = self.calculate_departure_time() - self.max_end_time
        # Ceiling division via a negative divisor: floor-dividing by
        # -1 minute and negating rounds up (30 seconds // -1 minute == -1,
        # negated gives 1).
        return -(overrun // timedelta(minutes=-1))

    @property
    def driving_time_seconds_from_previous_standstill(self) -> Optional[int]:
        """
        Driving time in seconds from the preceding standstill to this visit.

        The preceding standstill is the vehicle's home location for the first
        visit and the previous visit's location otherwise. ``None`` when no
        vehicle is assigned.
        """
        assigned_vehicle = self.vehicle
        if assigned_vehicle is None:
            return None

        predecessor = self.previous_visit
        origin = (
            assigned_vehicle.home_location
            if predecessor is None
            else predecessor.location
        )
        return origin.driving_time_to(self.location)

    def __str__(self):
        return self.id

    def __repr__(self):
        return f"Visit({self.id})"


@planning_entity
@dataclass
class Vehicle:
    """
    A vehicle with a capacity, a home location and a departure time.

    ``visits`` is the planning list variable: the ordered stops of this
    vehicle's route. The route is treated as a round trip that starts and
    ends at ``home_location``.
    """
    id: Annotated[str, PlanningId]
    name: str
    capacity: int
    home_location: Location
    departure_time: datetime
    visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)

    @property
    def arrival_time(self) -> datetime:
        """
        Time the vehicle is back at its home location.

        With no visits this is simply ``departure_time``; otherwise it is the
        last visit's departure time plus the drive back home.
        """
        if not self.visits:
            return self.departure_time

        final_stop = self.visits[-1]
        leaves_final_stop_at = final_stop.departure_time
        drive_home = timedelta(
            seconds=final_stop.location.driving_time_to(self.home_location)
        )
        return leaves_final_stop_at + drive_home

    @property
    def total_demand(self) -> int:
        """Property view of ``calculate_total_demand()``."""
        return self.calculate_total_demand()

    @property
    def total_driving_time_seconds(self) -> int:
        """Property view of ``calculate_total_driving_time_seconds()``."""
        return self.calculate_total_driving_time_seconds()

    def calculate_total_demand(self) -> int:
        """Add up the ``demand`` of every visit on this vehicle's route."""
        demand_sum = 0
        for stop in self.visits:
            demand_sum = demand_sum + stop.demand
        return demand_sum

    def calculate_total_driving_time_seconds(self) -> int:
        """
        Total driving seconds for home -> each visit in order -> home.

        Returns 0 for a vehicle without visits.
        """
        route = self.visits
        if not route:
            return 0

        elapsed_seconds = 0
        current_location = self.home_location
        for stop in route:
            stop_location = stop.location
            elapsed_seconds += current_location.driving_time_to(stop_location)
            current_location = stop_location

        # Close the loop: drive from the last stop back to the depot.
        return elapsed_seconds + current_location.driving_time_to(self.home_location)

    def __str__(self):
        return self.name

    def __repr__(self):
        return f"Vehicle({self.id}, {self.name})"


@planning_solution
@dataclass
class VehicleRoutePlan:
    """
    Planning solution: the vehicles, the visits to distribute among them,
    the bounding-box corners, the score and the current solver status.
    """
    name: str
    south_west_corner: Location
    north_east_corner: Location
    vehicles: Annotated[list[Vehicle], PlanningEntityCollectionProperty]
    visits: Annotated[list[Visit], PlanningEntityCollectionProperty, ValueRangeProvider]
    score: Annotated[Optional[HardSoftScore], PlanningScore] = None
    solver_status: SolverStatus = SolverStatus.NOT_SOLVING

    @property
    def total_driving_time_seconds(self) -> int:
        """Sum of ``total_driving_time_seconds`` over all vehicles."""
        return sum(vehicle.total_driving_time_seconds for vehicle in self.vehicles)

    @property
    def start_date_time(self) -> Optional[datetime]:
        """Earliest vehicle departure time - for timeline window."""
        return (
            min(vehicle.departure_time for vehicle in self.vehicles)
            if self.vehicles
            else None
        )

    @property
    def end_date_time(self) -> Optional[datetime]:
        """Latest vehicle arrival time - for timeline window."""
        return (
            max(vehicle.arrival_time for vehicle in self.vehicles)
            if self.vehicles
            else None
        )

    def __str__(self):
        return f"VehicleRoutePlan(name={self.name}, vehicles={self.vehicles}, visits={self.visits})"


# Pydantic REST models for API (used for deserialization and context)
class LocationModel(JsonDomainBase):
    # REST-side shape of a location as an object with two float members.
    # NOTE(review): the other REST models in this file carry locations as
    # [lat, lng] lists rather than as LocationModel - confirm where this
    # model is actually used.
    latitude: float
    longitude: float


class VisitModel(JsonDomainBase):
    # REST-side shape of a visit. Datetimes travel as strings, the service
    # duration as a number of seconds, and the location as a two-element list.
    # Multi-word fields declare a camelCase alias; how aliases are applied
    # during (de)serialization depends on JsonDomainBase, which is not
    # visible here.
    id: str
    name: str
    location: List[float]  # [lat, lng] array
    demand: int
    min_start_time: str = Field(..., alias="minStartTime")  # ISO datetime string
    max_end_time: str = Field(..., alias="maxEndTime")  # ISO datetime string
    service_duration: int = Field(..., alias="serviceDuration")  # Duration in seconds
    # The three relations below accept a plain string, a nested model, or
    # None. NOTE(review): the string form is presumably the referenced
    # object's id - confirm against the converter code.
    vehicle: Union[str, "VehicleModel", None] = None
    previous_visit: Union[str, "VisitModel", None] = Field(None, alias="previousVisit")
    next_visit: Union[str, "VisitModel", None] = Field(None, alias="nextVisit")
    # The remaining fields are optional and default to None.
    arrival_time: Optional[str] = Field(
        None, alias="arrivalTime"
    )  # ISO datetime string
    start_service_time: Optional[str] = Field(
        None, alias="startServiceTime"
    )  # ISO datetime string
    departure_time: Optional[str] = Field(
        None, alias="departureTime"
    )  # ISO datetime string
    driving_time_seconds_from_previous_standstill: Optional[int] = Field(
        None, alias="drivingTimeSecondsFromPreviousStandstill"
    )


class VehicleModel(JsonDomainBase):
    # REST-side shape of a vehicle. Datetimes travel as strings and the home
    # location as a two-element list; multi-word fields declare a camelCase
    # alias.
    id: str
    name: str
    capacity: int
    home_location: List[float] = Field(..., alias="homeLocation")  # [lat, lng] array
    departure_time: str = Field(..., alias="departureTime")  # ISO datetime string
    # Each entry is either a plain string or a nested VisitModel.
    # NOTE(review): the string form is presumably a visit id - confirm
    # against the converter code.
    visits: List[Union[str, VisitModel]] = Field(default_factory=list)
    # Totals default to 0 and the arrival time to None when not supplied.
    total_demand: int = Field(0, alias="totalDemand")
    total_driving_time_seconds: int = Field(0, alias="totalDrivingTimeSeconds")
    arrival_time: Optional[str] = Field(
        None, alias="arrivalTime"
    )  # ISO datetime string


class VehicleRoutePlanModel(JsonDomainBase):
    # REST-side shape of a whole route plan: name, the two bounding corners,
    # and the nested vehicle and visit models. Score, solver status and the
    # start/end datetimes travel as optional strings; multi-word fields other
    # than solver_status declare a camelCase alias.
    name: str
    south_west_corner: List[float] = Field(
        ..., alias="southWestCorner"
    )  # [lat, lng] array
    north_east_corner: List[float] = Field(
        ..., alias="northEastCorner"
    )  # [lat, lng] array
    vehicles: List[VehicleModel]
    visits: List[VisitModel]
    score: Optional[str] = None
    # NOTE(review): unlike its neighbours this field declares no explicit
    # alias - confirm whether JsonDomainBase generates one.
    solver_status: Optional[str] = None
    total_driving_time_seconds: int = Field(0, alias="totalDrivingTimeSeconds")
    start_date_time: Optional[str] = Field(None, alias="startDateTime")
    end_date_time: Optional[str] = Field(None, alias="endDateTime")