""" AV Synchronization ================ Audio-Video sync using LiveKit's AVSynchronizer. """ from __future__ import annotations import asyncio import logging from dataclasses import dataclass from enum import Enum from typing import Optional import numpy as np # Use relative imports for standalone import sys from pathlib import Path _backend_dir = Path(__file__).parent.parent if str(_backend_dir) not in sys.path: sys.path.insert(0, str(_backend_dir)) from config import VIDEO_FPS, TTS_SAMPLE_RATE, LIVEKIT_AUDIO_SAMPLE_RATE log = logging.getLogger(__name__) class SyncState(Enum): IDLE = "idle" BUFFERING = "buffering" PLAYING = "playing" SYNCING = "syncing" @dataclass class AVSyncState: """Current AV synchronization state.""" state: SyncState audio_pts: float = 0.0 video_pts: float = 0.0 audio_buffer_ms: float = 0.0 target_latency_ms: float = 160.0 # 160ms initial target class AVSyncGate: """ Audio-Video sync gate. Controls when audio/video frames are published to maintain sync. Uses LiveKit's AVSynchronizer internally. """ def __init__( self, video_fps: int = VIDEO_FPS, audio_sample_rate: int = TTS_SAMPLE_RATE, target_latency_ms: float = 160.0, ): self._video_fps = video_fps self._audio_sample_rate = audio_sample_rate self._target_latency_ms = target_latency_ms self._state = SyncState.IDLE self._audio_pts = 0.0 self._video_pts = 0.0 self._audio_queue: asyncio.Queue = asyncio.Queue() self._video_queue: asyncio.Queue = asyncio.Queue() self._sync_task: Optional[asyncio.Task] = None log.info("AVSyncGate initialized (fps=%d, sr=%d, latency=%dms)", video_fps, audio_sample_rate, target_latency_ms) @property def state(self) -> SyncState: return self._state @property def audio_pts(self) -> float: return self._audio_pts @property def video_pts(self) -> float: return self._video_pts async def start(self): """Start the sync task.""" self._sync_task = asyncio.create_task(self._sync_loop()) log.info("AVSyncGate started") async def stop(self): """Stop the sync task.""" if self._sync_task: self._sync_task.cancel() try: await self._sync_task except asyncio.CancelledError: pass log.info("AVSyncGate stopped") async def push_audio(self, audio_chunk: np.ndarray, pts_start: float, pts_end: float): """Push audio chunk to sync gate.""" await self._audio_queue.put((audio_chunk, pts_start, pts_end)) if self._state == SyncState.IDLE: self._state = SyncState.BUFFERING async def push_video(self, frames: list, pts_start: float, pts_end: float): """Push video frames to sync gate.""" await self._video_queue.put((frames, pts_start, pts_end)) async def get_synced_pair(self) -> tuple[np.ndarray, list, float, float]: """ Get synchronized audio-video pair. Blocks until both are available and aligned. """ # Get audio audio_chunk, audio_pts_start, audio_pts_end = await self._audio_queue.get() # Get video (should be same duration) frames, video_pts_start, video_pts_end = await self._video_queue.get() # Align timestamps self._audio_pts = audio_pts_end self._video_pts = video_pts_end if self._state == SyncState.BUFFERING: self._state = SyncState.PLAYING return audio_chunk, frames, audio_pts_start, video_pts_start async def _sync_loop(self): """Background sync monitoring.""" while True: await asyncio.sleep(0.5) audio_qsize = self._audio_queue.qsize() video_qsize = self._video_queue.qsize() if audio_qsize > 0 and video_qsize > 0: if self._state == SyncState.BUFFERING: self._state = SyncState.PLAYING elif audio_qsize == 0 and video_qsize == 0: if self._state == SyncState.PLAYING: self._state = SyncState.IDLE class SimpleAVSync: """ Simplified AV sync without LiveKit AVSynchronizer. Uses PTS-based alignment for lower latency. """ def __init__( self, video_fps: int = VIDEO_FPS, ): self._video_fps = video_fps self._frame_duration = 1.0 / video_fps # 40ms per frame self._pending_audio: list = [] self._pending_video: list = [] log.info("SimpleAVSync initialized (fps=%d, frame_duration=%.3fs)", video_fps, self._frame_duration) def add_audio(self, audio: np.ndarray, pts_start: float, pts_end: float): """Add audio chunk.""" self._pending_audio.append((audio, pts_start, pts_end)) def add_video(self, frames: list, pts_start: float, pts_end: float): """Add video frames.""" self._pending_video.append((frames, pts_start, pts_end)) def get_next_pair(self) -> Optional[tuple[list, np.ndarray]]: """ Get next synchronized audio-video pair. Returns (video_frames, audio_pcm) or None if not aligned. """ if not self._pending_audio or not self._pending_video: return None # Get first audio and video audio, audio_pts_start, audio_pts_end = self._pending_audio[0] video, video_pts_start, video_pts_end = self._pending_video[0] # Check if aligned (within tolerance) tolerance = 0.02 # 20ms tolerance if abs(audio_pts_end - video_pts_end) > tolerance: return None # Aligned - return pair self._pending_audio.pop(0) self._pending_video.pop(0) return video, audio def has_pending(self) -> bool: """Check if there are pending chunks.""" return len(self._pending_audio) > 0 or len(self._pending_video) > 0 def clear(self): """Clear pending chunks.""" self._pending_audio.clear() self._pending_video.clear()