# Page header text that preceded the module source (not part of the code):
#   d10g's picture
#   Updates to full race commentary
#   ba7e9b3
"""
Event Queue with prioritization for the F1 Commentary Robot.
This module implements a priority-based event queue that manages race events
awaiting commentary generation. Events are prioritized by importance and
processed in priority order rather than arrival order.
"""
import logging
import heapq
import threading
from typing import Optional, Tuple
from datetime import datetime
from reachy_f1_commentator.src.models import RaceEvent, EventType, EventPriority
logger = logging.getLogger(__name__)
class PriorityEventQueue:
    """
    Priority queue for managing race events.

    Events are prioritized by importance (CRITICAL > HIGH > MEDIUM > LOW)
    and dequeued in priority order; events sharing a priority are dequeued
    in arrival order. The queue has a maximum size: when it is full, the
    event that would be dequeued last is the one that gets dropped (this
    may be the incoming event itself). Supports pause/resume for Q&A
    interruption. All state is guarded by a single lock.
    """

    def __init__(self, max_size: int = 10):
        """
        Initialize priority event queue.

        Args:
            max_size: Maximum number of events to hold (default: 10).
                A value of 0 or less means every event is discarded.
        """
        self._max_size = max_size
        # Heap of (priority value, insertion counter, event). A lower
        # priority value is dequeued first; the counter is unique per
        # entry, so ties are FIFO and the event object itself is never
        # compared by heapq.
        self._queue: list[Tuple[int, int, RaceEvent]] = []
        self._counter = 0
        self._paused = False
        self._lock = threading.Lock()

    def enqueue(self, event: "RaceEvent") -> None:
        """
        Add event to queue with priority assignment.

        Priority is assigned from the event via _assign_priority(). If the
        queue is full, the queued event that would be dequeued last (lowest
        priority, most recently added) is evicted to make room, but only
        when the new event has strictly higher priority; otherwise the new
        event is discarded. Errors are logged and never propagated.

        Args:
            event: Race event to enqueue
        """
        try:
            with self._lock:
                priority = self._assign_priority(event)

                if len(self._queue) >= self._max_size:
                    if not self._queue:
                        # Capacity is zero (or negative): there is nothing
                        # to evict, so the event cannot be stored at all.
                        return

                    # Entry that sorts last in dequeue order: highest
                    # priority value, then highest insertion counter.
                    # Keying on both fields makes the choice independent
                    # of the heap's internal array layout.
                    victim = max(self._queue, key=lambda entry: entry[:2])

                    if priority.value >= victim[0]:
                        # The new event would itself sort last; drop it.
                        return

                    self._queue.remove(victim)
                    # Removing from the middle breaks the heap invariant.
                    heapq.heapify(self._queue)

                heapq.heappush(
                    self._queue, (priority.value, self._counter, event)
                )
                self._counter += 1
        except Exception as e:
            logger.exception("[EventQueue] Error enqueueing event: %s", e)

    def dequeue(self) -> Optional["RaceEvent"]:
        """
        Remove and return highest priority event.

        Returns:
            Highest priority event, or None if the queue is empty or
            paused, or if an unexpected error occurred (which is logged).
        """
        try:
            with self._lock:
                if self._paused or not self._queue:
                    return None
                # The heap root carries the lowest priority value, i.e.
                # the most important event.
                _, _, event = heapq.heappop(self._queue)
                return event
        except Exception as e:
            logger.exception("[EventQueue] Error dequeueing event: %s", e)
            return None

    def pause(self) -> None:
        """
        Pause event processing (for Q&A interruption).

        When paused, dequeue() returns None even if events are available.
        enqueue() is unaffected.
        """
        with self._lock:
            self._paused = True

    def resume(self) -> None:
        """
        Resume event processing after pause.
        """
        with self._lock:
            self._paused = False

    def is_paused(self) -> bool:
        """
        Check if queue is currently paused.

        Returns:
            True if paused, False otherwise
        """
        with self._lock:
            return self._paused

    def size(self) -> int:
        """
        Get current number of events in queue.

        Returns:
            Number of events currently queued
        """
        with self._lock:
            return len(self._queue)

    def _assign_priority(self, event: "RaceEvent") -> "EventPriority":
        """
        Assign priority based on event data flags and event type.

        Priority assignment logic:
        - CRITICAL: Starting grid, race start, overtakes, pit stops,
          safety car, lead changes
        - HIGH: Fastest laps
        - MEDIUM: Flag events
        - LOW: Everything else (incidents have no dedicated rule here and
          currently fall into this bucket)

        Args:
            event: Race event to prioritize

        Returns:
            EventPriority enum value
        """
        # A missing data payload is treated as "no flags set" instead of
        # raising and causing enqueue() to drop the event.
        data = event.data or {}
        if data.get('is_starting_grid') or data.get('is_race_start'):
            return EventPriority.CRITICAL

        kind = event.event_type
        if kind in (
            EventType.OVERTAKE,
            EventType.PIT_STOP,
            EventType.SAFETY_CAR,
            EventType.LEAD_CHANGE,
        ):
            return EventPriority.CRITICAL
        if kind == EventType.FASTEST_LAP:
            return EventPriority.HIGH
        if kind == EventType.FLAG:
            return EventPriority.MEDIUM
        return EventPriority.LOW