"""Queue management service with virtual queue support.""" import logging from datetime import datetime, timedelta from typing import List, Dict, Optional from models.queue import QueueStation, VirtualQueueTicket logger = logging.getLogger(__name__) class QueueService: """Manages queues, wait times, and virtual queue reservations.""" def __init__(self): self._stations: Dict[str, QueueStation] = {} self._virtual_tickets: Dict[str, VirtualQueueTicket] = {} self._station_history: Dict[str, List[Dict]] = {} def register_station(self, station: QueueStation): """Register a queue station.""" self._stations[station.id] = station def register_stations(self, stations: List[QueueStation]): """Register multiple queue stations.""" for s in stations: self._stations[s.id] = s def get_station(self, station_id: str) -> Optional[QueueStation]: """Get a station by ID.""" return self._stations.get(station_id) def get_all_stations(self) -> List[Dict]: """Get all stations as dicts.""" return [s.to_dict() for s in self._stations.values()] def get_stations_by_category(self, category: str) -> List[Dict]: """Get stations filtered by category.""" return [ s.to_dict() for s in self._stations.values() if s.category == category and s.is_open ] def get_stations_by_zone(self, zone_id: str) -> List[Dict]: """Get stations in a specific zone.""" return [ s.to_dict() for s in self._stations.values() if s.zone_id == zone_id ] def update_queue_length(self, station_id: str, new_length: int) -> Optional[Dict]: """Update the queue length for a station.""" station = self._stations.get(station_id) if not station: return None station.current_length = max(0, new_length) # Track history if station_id not in self._station_history: self._station_history[station_id] = [] self._station_history[station_id].append({ "length": station.current_length, "wait_min": station.estimated_wait_minutes, "timestamp": datetime.utcnow().isoformat(), }) # Trim history if len(self._station_history[station_id]) > 100: self._station_history[station_id] = self._station_history[station_id][-50:] return station.to_dict() def get_shortest_queue(self, category: str = None) -> Optional[Dict]: """Find the station with the shortest wait time.""" stations = [ s for s in self._stations.values() if s.is_open and (category is None or s.category == category) ] if not stations: return None shortest = min(stations, key=lambda s: s.estimated_wait_minutes) return shortest.to_dict() def get_queue_summary(self) -> Dict: """Get an overview of all queues.""" open_stations = [s for s in self._stations.values() if s.is_open] categories = {} for station in open_stations: cat = station.category if cat not in categories: categories[cat] = { "category": cat, "total_stations": 0, "total_in_queue": 0, "avg_wait_minutes": 0.0, "shortest_wait": float("inf"), "shortest_station": None, } categories[cat]["total_stations"] += 1 categories[cat]["total_in_queue"] += station.current_length wait = station.estimated_wait_minutes if wait < categories[cat]["shortest_wait"]: categories[cat]["shortest_wait"] = wait categories[cat]["shortest_station"] = station.name # Calculate averages for cat_data in categories.values(): if cat_data["total_stations"] > 0: cat_data["avg_wait_minutes"] = round( sum( s.estimated_wait_minutes for s in open_stations if s.category == cat_data["category"] ) / cat_data["total_stations"], 1, ) if cat_data["shortest_wait"] == float("inf"): cat_data["shortest_wait"] = 0 return { "total_stations": len(open_stations), "total_people_waiting": sum(s.current_length for s in open_stations), "categories": categories, "stations": [s.to_dict() for s in open_stations], } # ─── Virtual Queue ─────────────────────────────────────────────────── def join_virtual_queue(self, user_id: str, station_id: str) -> Optional[Dict]: """Create a virtual queue reservation.""" station = self._stations.get(station_id) if not station or not station.is_open: return None # Check if user already has a ticket for this station existing = [ t for t in self._virtual_tickets.values() if t.user_id == user_id and t.station_id == station_id and t.status == "waiting" ] if existing: return existing[0].to_dict() # Count waiting tickets for this station waiting_count = sum( 1 for t in self._virtual_tickets.values() if t.station_id == station_id and t.status == "waiting" ) ticket = VirtualQueueTicket( user_id=user_id, station_id=station_id, station_name=station.name, category=station.category, position=waiting_count + 1, estimated_ready_time=datetime.utcnow() + timedelta( seconds=station.avg_service_time_sec * (waiting_count + 1) ), ) self._virtual_tickets[ticket.id] = ticket return ticket.to_dict() def get_user_tickets(self, user_id: str) -> List[Dict]: """Get all virtual tickets for a user.""" return [ t.to_dict() for t in self._virtual_tickets.values() if t.user_id == user_id ] def cancel_ticket(self, ticket_id: str, user_id: str) -> bool: """Cancel a virtual queue ticket.""" ticket = self._virtual_tickets.get(ticket_id) if ticket and ticket.user_id == user_id and ticket.status == "waiting": ticket.status = "expired" return True return False def get_station_history(self, station_id: str, last_n: int = 20) -> List[Dict]: """Get queue length history for a station.""" return self._station_history.get(station_id, [])[-last_n:]