"""
Event Queue with prioritization for the F1 Commentary Robot.
This module implements a priority-based event queue that manages race events
awaiting commentary generation. Events are prioritized by importance and
processed in priority order rather than arrival order.
"""
import logging
import heapq
import threading
from typing import Optional, Tuple
from datetime import datetime
from reachy_f1_commentator.src.models import RaceEvent, EventType, EventPriority
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class PriorityEventQueue:
    """
    Bounded priority queue of race events awaiting commentary.

    Events are ordered by importance (CRITICAL > HIGH > MEDIUM > LOW) and,
    within one priority level, by arrival order. The queue never holds more
    than ``max_size`` events: when it is full, an incoming event either
    displaces the least important queued event or is dropped itself.
    Processing can be paused and resumed (e.g. for Q&A interruption).

    Every method that touches queue state acquires an internal
    ``threading.Lock``.
    """

    def __init__(self, max_size: int = 10):
        """
        Initialize an empty, un-paused queue.

        Args:
            max_size: Maximum number of events to hold (default: 10).
                A value of zero or less means the queue holds nothing and
                every enqueued event is dropped.
        """
        self._max_size = max_size
        # Heap entries are (priority value, arrival counter, event).
        # NOTE(review): relies on EventPriority values being orderable with
        # smaller meaning more important -- confirm against the models module.
        self._queue: list[Tuple[int, int, RaceEvent]] = []
        # Monotonic arrival counter: gives FIFO order within a priority and
        # guarantees tuple comparison never has to compare two events.
        self._counter = 0
        self._paused = False
        self._lock = threading.Lock()

    def enqueue(self, event: RaceEvent) -> None:
        """
        Add an event to the queue, assigning its priority from its type.

        When the queue is full, the least important queued event is evicted
        only if the new event is strictly more important; among equally
        unimportant queued events the most recently added one is evicted.
        Otherwise the new event is dropped. Errors are logged, not raised.

        Args:
            event: Race event to enqueue.
        """
        try:
            with self._lock:
                # A non-positive capacity can never hold an event.
                if self._max_size <= 0:
                    return

                priority = self._assign_priority(event)

                if len(self._queue) >= self._max_size:
                    # Largest (priority value, counter) pair == least
                    # important entry, newest first among ties.
                    victim = max(self._queue, key=lambda entry: entry[:2])
                    if priority.value >= victim[0]:
                        # New event is not more important than anything
                        # queued; discard it.
                        return
                    self._queue.remove(victim)
                    # Removing from the middle breaks the heap invariant.
                    heapq.heapify(self._queue)

                heapq.heappush(
                    self._queue, (priority.value, self._counter, event)
                )
                self._counter += 1
        except Exception as e:
            # Best-effort by design: a bad event must not take down the caller.
            logger.exception("[EventQueue] Error enqueueing event: %s", e)

    def dequeue(self) -> Optional[RaceEvent]:
        """
        Remove and return the most important queued event.

        Returns:
            The highest priority event (oldest first within a priority), or
            None if the queue is empty, paused, or an error occurred.
        """
        try:
            with self._lock:
                if self._paused or not self._queue:
                    return None
                # The heap root carries the smallest priority value.
                _, _, event = heapq.heappop(self._queue)
                return event
        except Exception as e:
            logger.exception("[EventQueue] Error dequeueing event: %s", e)
            return None

    def pause(self) -> None:
        """
        Pause event processing (for Q&A interruption).

        While paused, dequeue() returns None even if events are available;
        enqueue() is unaffected.
        """
        with self._lock:
            self._paused = True

    def resume(self) -> None:
        """Resume event processing after a pause."""
        with self._lock:
            self._paused = False

    def is_paused(self) -> bool:
        """
        Check whether the queue is currently paused.

        Returns:
            True if paused, False otherwise.
        """
        with self._lock:
            return self._paused

    def size(self) -> int:
        """
        Get the current number of queued events.

        Returns:
            Number of events currently queued.
        """
        with self._lock:
            return len(self._queue)

    def _assign_priority(self, event: RaceEvent) -> EventPriority:
        """
        Assign a priority based on the event's data flags and type.

        Priority assignment logic:
            - CRITICAL: starting grid, race start, overtakes, pit stops,
              safety car, lead changes
            - HIGH: fastest laps
            - MEDIUM: race control messages (flags, etc.)
            - LOW: everything else (incidents are not special-cased for now)

        Args:
            event: Race event to prioritize.

        Returns:
            EventPriority enum value.
        """
        # Starting grid and race start are flagged in the event payload
        # rather than by event type.
        if event.data.get('is_starting_grid') or event.data.get('is_race_start'):
            return EventPriority.CRITICAL

        kind = event.event_type
        # The most interesting on-track action.
        if kind in (
            EventType.OVERTAKE,
            EventType.PIT_STOP,
            EventType.SAFETY_CAR,
            EventType.LEAD_CHANGE,
        ):
            return EventPriority.CRITICAL
        # Fastest laps are interesting but less critical.
        if kind == EventType.FASTEST_LAP:
            return EventPriority.HIGH
        # Race control messages (flags, etc.).
        if kind == EventType.FLAG:
            return EventPriority.MEDIUM
        return EventPriority.LOW
|