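"""
AV Synchronization
==================
Audio-video synchronization helpers: a queue-based sync gate (AVSyncGate)
and a PTS-matching synchronizer (SimpleAVSync). LiveKit's AVSynchronizer
is not used by this module.
"""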
| from __future__ import annotations |
|
|
| import asyncio |
| import logging |
| from dataclasses import dataclass |
| from enum import Enum |
| from typing import Optional |
|
|
| import numpy as np |
|
|
| |
| import sys |
| from pathlib import Path |
|
|
# Make the parent of this file's directory importable so that top-level
# project modules (e.g. `config`, imported below) resolve. Presumably needed
# because the backend is not installed as a package — TODO confirm.
_backend_dir = Path(__file__).parent.parent
if str(_backend_dir) not in sys.path:
    # Prepend so this directory wins over any same-named modules elsewhere on sys.path.
    sys.path.insert(0, str(_backend_dir))
|
|
| from config import VIDEO_FPS, TTS_SAMPLE_RATE, LIVEKIT_AUDIO_SAMPLE_RATE |
|
|
# Module-level logger shared by the sync classes in this module.
log = logging.getLogger(__name__)
|
|
|
|
class SyncState(Enum):
    """Coarse lifecycle state reported by AVSyncGate."""
    IDLE = "idle"            # initial state; AVSyncGate also returns here when both queues drain while PLAYING
    BUFFERING = "buffering"  # set by AVSyncGate.push_audio() when audio arrives while IDLE
    PLAYING = "playing"      # set once a pair is delivered, or when both queues hold data while BUFFERING
    SYNCING = "syncing"      # NOTE(review): not assigned anywhere in this module — confirm whether still used
|
|
|
|
@dataclass
class AVSyncState:
    """Current AV synchronization state.

    Plain snapshot container. Nothing in this module constructs it; presumably
    callers fill it from AVSyncGate's ``state``/``audio_pts``/``video_pts``
    properties (TODO confirm against callers).
    """
    state: SyncState
    audio_pts: float = 0.0  # PTS units are whatever producers push (presumably seconds — verify)
    video_pts: float = 0.0  # same units as audio_pts
    audio_buffer_ms: float = 0.0  # buffered audio duration, in ms per the field name
    target_latency_ms: float = 160.0  # same default as AVSyncGate's target_latency_ms
|
|
|
|
class AVSyncGate:
    """
    Audio-video sync gate built on two asyncio queues.

    Producers push audio chunks and video frame batches together with their
    PTS ranges; a consumer pulls them back out as (audio, video) pairs in
    strict FIFO order. An optional background task (see ``start``) only
    maintains a coarse ``SyncState`` from queue occupancy; it never reorders,
    drops, or delays media.

    This class does not use LiveKit's AVSynchronizer, and ``target_latency_ms``
    is currently only recorded and logged.
    """

    def __init__(
        self,
        video_fps: int = VIDEO_FPS,
        audio_sample_rate: int = TTS_SAMPLE_RATE,
        target_latency_ms: float = 160.0,
    ):
        """
        Args:
            video_fps: Video frame rate (stored; not used by the pairing logic).
            audio_sample_rate: Audio sample rate (stored; not used by the pairing logic).
            target_latency_ms: Intended latency budget; informational only for now.
        """
        self._video_fps = video_fps
        self._audio_sample_rate = audio_sample_rate
        self._target_latency_ms = target_latency_ms

        self._state = SyncState.IDLE
        self._audio_pts = 0.0
        self._video_pts = 0.0

        self._audio_queue: asyncio.Queue = asyncio.Queue()
        self._video_queue: asyncio.Queue = asyncio.Queue()

        # Audio items already taken off the queue by a get_synced_pair() call
        # that was cancelled while waiting for video. They are replayed (oldest
        # first) before reading the queue again, so cancellation/timeouts on
        # the consumer side never lose an audio chunk.
        self._carried_audio: list = []

        self._sync_task: Optional[asyncio.Task] = None

        log.info("AVSyncGate initialized (fps=%d, sr=%d, latency=%dms)",
                 video_fps, audio_sample_rate, target_latency_ms)

    @property
    def state(self) -> SyncState:
        """Current coarse sync state."""
        return self._state

    @property
    def audio_pts(self) -> float:
        """End PTS of the audio chunk most recently returned by get_synced_pair()."""
        return self._audio_pts

    @property
    def video_pts(self) -> float:
        """End PTS of the video batch most recently returned by get_synced_pair()."""
        return self._video_pts

    async def start(self):
        """Start the background state-monitor task (no-op if it is already running)."""
        if self._sync_task is not None and not self._sync_task.done():
            # A second task would be orphaned: stop() only tracks the latest one.
            return
        self._sync_task = asyncio.create_task(self._sync_loop())
        log.info("AVSyncGate started")

    async def stop(self):
        """Cancel the monitor task and wait for it to exit; safe to call repeatedly."""
        task, self._sync_task = self._sync_task, None  # cleared so start() can restart
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        log.info("AVSyncGate stopped")

    async def push_audio(self, audio_chunk: np.ndarray, pts_start: float, pts_end: float):
        """Enqueue an audio chunk with its PTS range; moves IDLE -> BUFFERING."""
        await self._audio_queue.put((audio_chunk, pts_start, pts_end))

        if self._state == SyncState.IDLE:
            self._state = SyncState.BUFFERING

    async def push_video(self, frames: list, pts_start: float, pts_end: float):
        """Enqueue a batch of video frames with its PTS range."""
        await self._video_queue.put((frames, pts_start, pts_end))

    async def get_synced_pair(self) -> tuple[np.ndarray, list, float, float]:
        """
        Wait for the next audio chunk and the next video batch and return them.

        Items are paired strictly in FIFO order; PTS values are NOT compared,
        so correct alignment relies on producers pushing matching chunks.
        Cancellation-safe: if cancelled while waiting for video, the already
        dequeued audio chunk is kept and returned by the next call.

        Returns:
            (audio_chunk, frames, audio_pts_start, video_pts_start)
        """
        from_carry = bool(self._carried_audio)
        if from_carry:
            audio_item = self._carried_audio.pop(0)
        else:
            audio_item = await self._audio_queue.get()

        try:
            video_item = await self._video_queue.get()
        except asyncio.CancelledError:
            # Put the audio back where it came from instead of dropping it.
            if from_carry:
                self._carried_audio.insert(0, audio_item)
            else:
                self._carried_audio.append(audio_item)
            raise

        audio_chunk, audio_pts_start, audio_pts_end = audio_item
        frames, video_pts_start, video_pts_end = video_item

        self._audio_pts = audio_pts_end
        self._video_pts = video_pts_end

        if self._state == SyncState.BUFFERING:
            self._state = SyncState.PLAYING

        return audio_chunk, frames, audio_pts_start, video_pts_start

    async def _sync_loop(self):
        """Every 0.5 s, update the coarse state from queue occupancy."""
        while True:
            await asyncio.sleep(0.5)

            # Carried-over audio is still pending media even though it left the queue.
            audio_pending = self._audio_queue.qsize() + len(self._carried_audio)
            video_pending = self._video_queue.qsize()

            if audio_pending > 0 and video_pending > 0:
                if self._state == SyncState.BUFFERING:
                    self._state = SyncState.PLAYING
            elif audio_pending == 0 and video_pending == 0:
                if self._state == SyncState.PLAYING:
                    self._state = SyncState.IDLE
|
|
|
|
class SimpleAVSync:
    """
    Simplified AV sync without LiveKit AVSynchronizer.

    Buffers audio chunks and video frame batches and releases them as pairs
    when the heads of both buffers end at (nearly) the same PTS. Uses
    PTS-based alignment for lower latency.
    """

    def __init__(
        self,
        video_fps: int = VIDEO_FPS,
        *,
        tolerance: float = 0.02,
    ):
        """
        Args:
            video_fps: Video frame rate; used to derive the per-frame duration.
            tolerance: Max |audio_pts_end - video_pts_end| for two head items to
                count as aligned, in PTS units (presumably seconds, i.e. 20 ms
                by default — verify against producers).
        """
        self._video_fps = video_fps
        self._frame_duration = 1.0 / video_fps
        self._tolerance = tolerance

        self._pending_audio: list = []
        self._pending_video: list = []

        log.info("SimpleAVSync initialized (fps=%d, frame_duration=%.3fs)",
                 video_fps, self._frame_duration)

    def add_audio(self, audio: np.ndarray, pts_start: float, pts_end: float):
        """Buffer an audio chunk with its PTS range."""
        self._pending_audio.append((audio, pts_start, pts_end))

    def add_video(self, frames: list, pts_start: float, pts_end: float):
        """Buffer a batch of video frames with its PTS range."""
        self._pending_video.append((frames, pts_start, pts_end))

    def get_next_pair(self) -> Optional[tuple[list, np.ndarray]]:
        """
        Pop and return the next aligned (video_frames, audio_pcm) pair.

        The heads of both buffers are aligned when their end PTS differ by at
        most the configured tolerance. When the heads are misaligned, the head
        that ends earlier can never be matched (assuming each stream is added
        in increasing PTS order), so it is discarded with a warning and
        matching continues. Previously such a head was kept forever, stalling
        both buffers.

        Returns:
            (video_frames, audio_pcm), or None if either buffer runs empty
            before an aligned pair is found.
        """
        while self._pending_audio and self._pending_video:
            audio, _, audio_pts_end = self._pending_audio[0]
            video, _, video_pts_end = self._pending_video[0]

            if abs(audio_pts_end - video_pts_end) <= self._tolerance:
                self._pending_audio.pop(0)
                self._pending_video.pop(0)
                return video, audio

            # Drop whichever head ends first; it has no possible partner left.
            if audio_pts_end < video_pts_end:
                self._pending_audio.pop(0)
                log.warning("SimpleAVSync: dropping unmatched audio chunk "
                            "(end=%.3f, video head end=%.3f)",
                            audio_pts_end, video_pts_end)
            else:
                self._pending_video.pop(0)
                log.warning("SimpleAVSync: dropping unmatched video chunk "
                            "(end=%.3f, audio head end=%.3f)",
                            video_pts_end, audio_pts_end)

        return None

    def has_pending(self) -> bool:
        """Return True if either buffer still holds chunks."""
        return bool(self._pending_audio or self._pending_video)

    def clear(self):
        """Discard all buffered audio and video chunks."""
        self._pending_audio.clear()
        self._pending_video.clear()
|
|