| """Queue management service with virtual queue support.""" |
|
|
| import logging |
| from datetime import datetime, timedelta |
| from typing import List, Dict, Optional |
| from models.queue import QueueStation, VirtualQueueTicket |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class QueueService: |
| """Manages queues, wait times, and virtual queue reservations.""" |
|
|
| def __init__(self): |
| self._stations: Dict[str, QueueStation] = {} |
| self._virtual_tickets: Dict[str, VirtualQueueTicket] = {} |
| self._station_history: Dict[str, List[Dict]] = {} |
|
|
| def register_station(self, station: QueueStation): |
| """Register a queue station.""" |
| self._stations[station.id] = station |
|
|
| def register_stations(self, stations: List[QueueStation]): |
| """Register multiple queue stations.""" |
| for s in stations: |
| self._stations[s.id] = s |
|
|
| def get_station(self, station_id: str) -> Optional[QueueStation]: |
| """Get a station by ID.""" |
| return self._stations.get(station_id) |
|
|
| def get_all_stations(self) -> List[Dict]: |
| """Get all stations as dicts.""" |
| return [s.to_dict() for s in self._stations.values()] |
|
|
| def get_stations_by_category(self, category: str) -> List[Dict]: |
| """Get stations filtered by category.""" |
| return [ |
| s.to_dict() |
| for s in self._stations.values() |
| if s.category == category and s.is_open |
| ] |
|
|
| def get_stations_by_zone(self, zone_id: str) -> List[Dict]: |
| """Get stations in a specific zone.""" |
| return [ |
| s.to_dict() |
| for s in self._stations.values() |
| if s.zone_id == zone_id |
| ] |
|
|
| def update_queue_length(self, station_id: str, new_length: int) -> Optional[Dict]: |
| """Update the queue length for a station.""" |
| station = self._stations.get(station_id) |
| if not station: |
| return None |
|
|
| station.current_length = max(0, new_length) |
|
|
| |
| if station_id not in self._station_history: |
| self._station_history[station_id] = [] |
| self._station_history[station_id].append({ |
| "length": station.current_length, |
| "wait_min": station.estimated_wait_minutes, |
| "timestamp": datetime.utcnow().isoformat(), |
| }) |
|
|
| |
| if len(self._station_history[station_id]) > 100: |
| self._station_history[station_id] = self._station_history[station_id][-50:] |
|
|
| return station.to_dict() |
|
|
| def get_shortest_queue(self, category: str = None) -> Optional[Dict]: |
| """Find the station with the shortest wait time.""" |
| stations = [ |
| s for s in self._stations.values() |
| if s.is_open and (category is None or s.category == category) |
| ] |
| if not stations: |
| return None |
| shortest = min(stations, key=lambda s: s.estimated_wait_minutes) |
| return shortest.to_dict() |
|
|
| def get_queue_summary(self) -> Dict: |
| """Get an overview of all queues.""" |
| open_stations = [s for s in self._stations.values() if s.is_open] |
| categories = {} |
|
|
| for station in open_stations: |
| cat = station.category |
| if cat not in categories: |
| categories[cat] = { |
| "category": cat, |
| "total_stations": 0, |
| "total_in_queue": 0, |
| "avg_wait_minutes": 0.0, |
| "shortest_wait": float("inf"), |
| "shortest_station": None, |
| } |
| categories[cat]["total_stations"] += 1 |
| categories[cat]["total_in_queue"] += station.current_length |
|
|
| wait = station.estimated_wait_minutes |
| if wait < categories[cat]["shortest_wait"]: |
| categories[cat]["shortest_wait"] = wait |
| categories[cat]["shortest_station"] = station.name |
|
|
| |
| for cat_data in categories.values(): |
| if cat_data["total_stations"] > 0: |
| cat_data["avg_wait_minutes"] = round( |
| sum( |
| s.estimated_wait_minutes |
| for s in open_stations |
| if s.category == cat_data["category"] |
| ) / cat_data["total_stations"], |
| 1, |
| ) |
| if cat_data["shortest_wait"] == float("inf"): |
| cat_data["shortest_wait"] = 0 |
|
|
| return { |
| "total_stations": len(open_stations), |
| "total_people_waiting": sum(s.current_length for s in open_stations), |
| "categories": categories, |
| "stations": [s.to_dict() for s in open_stations], |
| } |
|
|
| |
|
|
| def join_virtual_queue(self, user_id: str, station_id: str) -> Optional[Dict]: |
| """Create a virtual queue reservation.""" |
| station = self._stations.get(station_id) |
| if not station or not station.is_open: |
| return None |
|
|
| |
| existing = [ |
| t for t in self._virtual_tickets.values() |
| if t.user_id == user_id and t.station_id == station_id and t.status == "waiting" |
| ] |
| if existing: |
| return existing[0].to_dict() |
|
|
| |
| waiting_count = sum( |
| 1 for t in self._virtual_tickets.values() |
| if t.station_id == station_id and t.status == "waiting" |
| ) |
|
|
| ticket = VirtualQueueTicket( |
| user_id=user_id, |
| station_id=station_id, |
| station_name=station.name, |
| category=station.category, |
| position=waiting_count + 1, |
| estimated_ready_time=datetime.utcnow() + timedelta( |
| seconds=station.avg_service_time_sec * (waiting_count + 1) |
| ), |
| ) |
|
|
| self._virtual_tickets[ticket.id] = ticket |
| return ticket.to_dict() |
|
|
| def get_user_tickets(self, user_id: str) -> List[Dict]: |
| """Get all virtual tickets for a user.""" |
| return [ |
| t.to_dict() |
| for t in self._virtual_tickets.values() |
| if t.user_id == user_id |
| ] |
|
|
| def cancel_ticket(self, ticket_id: str, user_id: str) -> bool: |
| """Cancel a virtual queue ticket.""" |
| ticket = self._virtual_tickets.get(ticket_id) |
| if ticket and ticket.user_id == user_id and ticket.status == "waiting": |
| ticket.status = "expired" |
| return True |
| return False |
|
|
| def get_station_history(self, station_id: str, last_n: int = 20) -> List[Dict]: |
| """Get queue length history for a station.""" |
| return self._station_history.get(station_id, [])[-last_n:] |
|
|