# Avatar-Speech / backend/sync/av_sync.py
# agkavin
# Initial commit: speech_to_video project with models via LFS
# 249e06d
"""
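AV Synchronization
==================
Audio-video synchronization helpers: a queue-based sync gate (AVSyncGate)
and a lightweight PTS-based aligner (SimpleAVSync). Neither class wraps
LiveKit's AVSynchronizer.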
"""
from __future__ import annotations

import asyncio
import logging
from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Optional

import numpy as np
# Put the backend root (two directories up from this file, i.e. `backend/`
# for backend/sync/av_sync.py) at the front of sys.path so the absolute
# `from config import ...` below can be resolved. This is a sys.path
# insertion, not a relative import.
# NOTE(review): if Path(__file__) is relative, the inserted entry is
# relative to the current working directory — confirm that is acceptable.
import sys
from pathlib import Path
_backend_dir = Path(__file__).parent.parent
if str(_backend_dir) not in sys.path:
    sys.path.insert(0, str(_backend_dir))
# NOTE(review): presumably resolves to backend/config.py — confirm.
# LIVEKIT_AUDIO_SAMPLE_RATE is not referenced elsewhere in this file.
from config import VIDEO_FPS, TTS_SAMPLE_RATE, LIVEKIT_AUDIO_SAMPLE_RATE
# Module-level logger named after this module.
log = logging.getLogger(__name__)
class SyncState(Enum):
    """Lifecycle phase of an AV sync gate.

    Each member's value is its name in lowercase.
    """

    IDLE = "idle"            # initial phase; also re-entered once both queues drain
    BUFFERING = "buffering"  # audio has been pushed, no pair emitted yet
    PLAYING = "playing"      # pairs are being handed out
    SYNCING = "syncing"      # not assigned anywhere in this module
@dataclass
class AVSyncState:
    """Point-in-time view of audio/video synchronization.

    Not constructed anywhere in this module; provided as a plain record.

    Attributes:
        state: Current ``SyncState`` phase.
        audio_pts: Last audio presentation timestamp (presumably seconds,
            matching the float PTS used by the sync classes — confirm).
        video_pts: Last video presentation timestamp (same units as
            ``audio_pts``).
        audio_buffer_ms: Buffered audio; milliseconds per the field name.
        target_latency_ms: Desired latency in milliseconds.
    """

    state: SyncState
    audio_pts: float = 0.0
    video_pts: float = 0.0
    audio_buffer_ms: float = 0.0
    target_latency_ms: float = 160.0  # default target of 160 ms
class AVSyncGate:
    """
    Audio-video sync gate.

    Buffers audio chunks and video frame batches in two asyncio queues and
    hands them out together via ``get_synced_pair()``. An optional
    background task (``start()`` / ``stop()``) updates a coarse
    ``SyncState`` from queue occupancy.

    Pairing is strictly FIFO: the n-th audio chunk is returned with the
    n-th video batch. Timestamps are recorded but not compared, so
    producers must push matching chunks in the same order. This class does
    not use LiveKit's AVSynchronizer.
    """

    def __init__(
        self,
        video_fps: int = VIDEO_FPS,
        audio_sample_rate: int = TTS_SAMPLE_RATE,
        target_latency_ms: float = 160.0,
    ):
        """
        Args:
            video_fps: Video frame rate; stored and logged only.
            audio_sample_rate: Audio sample rate; stored and logged only.
            target_latency_ms: Target latency in ms; stored and logged only.
        """
        self._video_fps = video_fps
        self._audio_sample_rate = audio_sample_rate
        self._target_latency_ms = target_latency_ms
        self._state = SyncState.IDLE
        self._audio_pts = 0.0
        self._video_pts = 0.0
        self._audio_queue: asyncio.Queue = asyncio.Queue()
        self._video_queue: asyncio.Queue = asyncio.Queue()
        # Audio items that get_synced_pair() had already dequeued when it was
        # cancelled while waiting for video. They are served before the queue
        # so a cancelled wait does not drop audio and desync later pairs.
        self._held_audio: list = []
        self._sync_task: Optional[asyncio.Task] = None
        log.info("AVSyncGate initialized (fps=%d, sr=%d, latency=%dms)",
                 video_fps, audio_sample_rate, target_latency_ms)

    @property
    def state(self) -> SyncState:
        """Current coarse sync phase."""
        return self._state

    @property
    def audio_pts(self) -> float:
        """End PTS of the most recently paired audio chunk (0.0 initially)."""
        return self._audio_pts

    @property
    def video_pts(self) -> float:
        """End PTS of the most recently paired video batch (0.0 initially)."""
        return self._video_pts

    async def start(self):
        """Start the background state-monitoring task.

        A call while the task is still running is a no-op, so repeated
        calls do not leak a second task that stop() would never cancel.
        """
        if self._sync_task is not None and not self._sync_task.done():
            return
        self._sync_task = asyncio.create_task(self._sync_loop())
        log.info("AVSyncGate started")

    async def stop(self):
        """Cancel the background task and wait for it to finish.

        Safe to call when never started or already stopped; afterwards the
        gate can be started again.
        """
        task, self._sync_task = self._sync_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        log.info("AVSyncGate stopped")

    async def push_audio(self, audio_chunk: np.ndarray, pts_start: float, pts_end: float):
        """Queue an audio chunk; moves IDLE -> BUFFERING."""
        await self._audio_queue.put((audio_chunk, pts_start, pts_end))
        if self._state == SyncState.IDLE:
            self._state = SyncState.BUFFERING

    async def push_video(self, frames: list, pts_start: float, pts_end: float):
        """Queue a batch of video frames covering [pts_start, pts_end]."""
        await self._video_queue.put((frames, pts_start, pts_end))

    async def get_synced_pair(self) -> tuple[np.ndarray, list, float, float]:
        """
        Wait for the next audio chunk and the next video batch and return them.

        Items are paired in FIFO order. Timestamps are not compared; the end
        PTS of each item is recorded in ``audio_pts`` / ``video_pts``.

        If this call is cancelled while waiting for video, the audio chunk it
        already dequeued is kept and returned by the next call.

        Returns:
            (audio_chunk, frames, audio_pts_start, video_pts_start)
        """
        if self._held_audio:
            audio_item = self._held_audio.pop(0)
        else:
            audio_item = await self._audio_queue.get()
        try:
            frames, video_pts_start, video_pts_end = await self._video_queue.get()
        except asyncio.CancelledError:
            # Put the audio back at the front so the next call pairs it.
            self._held_audio.insert(0, audio_item)
            raise
        audio_chunk, audio_pts_start, audio_pts_end = audio_item

        self._audio_pts = audio_pts_end
        self._video_pts = video_pts_end
        if self._state == SyncState.BUFFERING:
            self._state = SyncState.PLAYING
        return audio_chunk, frames, audio_pts_start, video_pts_start

    async def _sync_loop(self):
        """Every 0.5 s, update the state from pending-item counts.

        BUFFERING -> PLAYING when both streams have pending items;
        PLAYING -> IDLE when both are empty. Runs until cancelled.
        """
        while True:
            await asyncio.sleep(0.5)
            audio_qsize = self._audio_queue.qsize() + len(self._held_audio)
            video_qsize = self._video_queue.qsize()
            if audio_qsize > 0 and video_qsize > 0:
                if self._state == SyncState.BUFFERING:
                    self._state = SyncState.PLAYING
            elif audio_qsize == 0 and video_qsize == 0:
                if self._state == SyncState.PLAYING:
                    self._state = SyncState.IDLE
class SimpleAVSync:
    """
    Simplified AV sync without LiveKit AVSynchronizer.

    Uses PTS-based alignment for lower latency. Audio chunks and video
    frame batches are queued separately, and ``get_next_pair()`` pairs the
    oldest chunk of each stream when their end timestamps agree within a
    tolerance.

    Assumes PTS are in seconds (the default tolerance of 0.02 is 20 ms) and
    increase monotonically within each stream — TODO confirm with producers.
    """

    def __init__(
        self,
        video_fps: int = VIDEO_FPS,
        *,
        tolerance: float = 0.02,
    ):
        """
        Args:
            video_fps: Video frame rate, used to derive the per-frame duration.
            tolerance: Maximum difference between the audio and video end PTS
                for two head chunks to count as aligned (default 0.02, i.e.
                20 ms when PTS are in seconds).
        """
        self._video_fps = video_fps
        self._frame_duration = 1.0 / video_fps  # seconds per frame (40 ms at 25 fps)
        self._tolerance = tolerance
        # Chunks are consumed from the left; deque.popleft() is O(1), whereas
        # list.pop(0) is O(n).
        self._pending_audio: deque = deque()
        self._pending_video: deque = deque()
        log.info("SimpleAVSync initialized (fps=%d, frame_duration=%.3fs)",
                 video_fps, self._frame_duration)

    def add_audio(self, audio: np.ndarray, pts_start: float, pts_end: float):
        """Queue an audio chunk covering [pts_start, pts_end]."""
        self._pending_audio.append((audio, pts_start, pts_end))

    def add_video(self, frames: list, pts_start: float, pts_end: float):
        """Queue a batch of video frames covering [pts_start, pts_end]."""
        self._pending_video.append((frames, pts_start, pts_end))

    def get_next_pair(self) -> Optional[tuple[list, np.ndarray]]:
        """
        Pop and return the next aligned pair as (video_frames, audio_pcm).

        Returns None when either stream has nothing pending.

        If the two head chunks' end PTS differ by more than the tolerance,
        the chunk that ends earlier cannot match any later chunk of the other
        stream, because PTS increase within a stream. That chunk is
        discarded with a warning and matching continues, so a single
        dropped or mistimed chunk no longer stalls both streams forever.
        """
        audio_q, video_q = self._pending_audio, self._pending_video
        while audio_q and video_q:
            audio, _, audio_pts_end = audio_q[0]
            video, _, video_pts_end = video_q[0]
            drift = audio_pts_end - video_pts_end
            if abs(drift) > self._tolerance:
                # Drop the head that lags behind; it is unmatchable.
                if drift < 0:
                    audio_q.popleft()
                    log.warning(
                        "SimpleAVSync: dropping unmatched audio chunk ending at "
                        "%.3f (video head ends at %.3f)",
                        audio_pts_end, video_pts_end)
                else:
                    video_q.popleft()
                    log.warning(
                        "SimpleAVSync: dropping unmatched video chunk ending at "
                        "%.3f (audio head ends at %.3f)",
                        video_pts_end, audio_pts_end)
                continue
            # Aligned: consume both heads.
            audio_q.popleft()
            video_q.popleft()
            return video, audio
        return None

    def has_pending(self) -> bool:
        """Return True if either stream still holds unpaired chunks."""
        return bool(self._pending_audio or self._pending_video)

    def clear(self):
        """Discard all pending audio and video chunks."""
        self._pending_audio.clear()
        self._pending_video.clear()