File size: 6,424 Bytes
249e06d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
"""
AV Synchronization
================
Audio-Video sync using LiveKit's AVSynchronizer.
"""
from __future__ import annotations

import asyncio
import logging
from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Optional

import numpy as np

# Make the parent of this file's directory importable so the top-level
# ``config`` module resolves even when this file is run or imported outside
# its package. NOTE: this is a sys.path tweak, not a relative import.
import sys
from pathlib import Path

_backend_dir = Path(__file__).parent.parent
if str(_backend_dir) not in sys.path:
    # Insert at the front so this directory wins over same-named modules
    # elsewhere on sys.path.
    sys.path.insert(0, str(_backend_dir))

# NOTE(review): LIVEKIT_AUDIO_SAMPLE_RATE is not referenced by the code in this
# module; kept in case other code relies on it being imported here.
from config import VIDEO_FPS, TTS_SAMPLE_RATE, LIVEKIT_AUDIO_SAMPLE_RATE

log = logging.getLogger(__name__)


class SyncState(Enum):
    """Lifecycle phase reported by the AV sync gates in this module."""

    IDLE = "idle"            # initial phase; also re-entered once queues drain
    BUFFERING = "buffering"  # audio has been queued but no pair handed out yet
    PLAYING = "playing"      # audio/video pairs are being consumed
    SYNCING = "syncing"      # not assigned by any code visible in this module


@dataclass
class AVSyncState:
    """Point-in-time snapshot of audio/video synchronization.

    Only ``state`` must be supplied. The numeric fields default to zero,
    except ``target_latency_ms``, which starts at 160 ms.
    """

    # Lifecycle phase of the sync gate.
    state: SyncState
    # Most recent audio / video presentation timestamps (presumably seconds).
    audio_pts: float = 0.0
    video_pts: float = 0.0
    # Amount of buffered audio, in milliseconds.
    audio_buffer_ms: float = 0.0
    # Initial end-to-end latency target, in milliseconds.
    target_latency_ms: float = 160.0


class AVSyncGate:
    """
    Audio-Video sync gate.

    Buffers audio chunks and video frame batches in two FIFO asyncio queues
    and hands them out as (audio, video) pairs. It records the latest
    presentation timestamps (PTS) and keeps a coarse lifecycle ``state``.

    This class does not use LiveKit's AVSynchronizer. Pairing is purely
    FIFO, and PTS values are recorded but never compared or enforced.
    """

    def __init__(
        self,
        video_fps: int = VIDEO_FPS,
        audio_sample_rate: int = TTS_SAMPLE_RATE,
        target_latency_ms: float = 160.0,
    ):
        """
        Args:
            video_fps: Video frame rate (stored; not used by the pairing logic).
            audio_sample_rate: Audio sample rate (stored; not used by the
                pairing logic).
            target_latency_ms: Latency target in milliseconds (stored; not
                used by the pairing logic).
        """
        self._video_fps = video_fps
        self._audio_sample_rate = audio_sample_rate
        self._target_latency_ms = target_latency_ms

        self._state = SyncState.IDLE
        self._audio_pts = 0.0
        self._video_pts = 0.0

        # Each queued item is a (payload, pts_start, pts_end) tuple.
        self._audio_queue: asyncio.Queue = asyncio.Queue()
        self._video_queue: asyncio.Queue = asyncio.Queue()

        # Audio item already dequeued by a get_synced_pair() call that was
        # cancelled while waiting for video. The next call returns it first,
        # so the chunk is neither lost nor reordered.
        self._held_audio: Optional[tuple] = None

        self._sync_task: Optional[asyncio.Task] = None

        log.info("AVSyncGate initialized (fps=%d, sr=%d, latency=%dms)",
                 video_fps, audio_sample_rate, target_latency_ms)

    @property
    def state(self) -> SyncState:
        """Current lifecycle phase."""
        return self._state

    @property
    def audio_pts(self) -> float:
        """End PTS of the most recently paired audio chunk."""
        return self._audio_pts

    @property
    def video_pts(self) -> float:
        """End PTS of the most recently paired video batch."""
        return self._video_pts

    async def start(self):
        """Start the background monitoring task (no-op if already running)."""
        if self._sync_task is not None and not self._sync_task.done():
            # Creating a second task would orphan the first: stop() could no
            # longer reach it to cancel it.
            return
        self._sync_task = asyncio.create_task(self._sync_loop())
        log.info("AVSyncGate started")

    async def stop(self):
        """Cancel the background monitoring task and wait for it to exit."""
        task, self._sync_task = self._sync_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        log.info("AVSyncGate stopped")

    async def push_audio(self, audio_chunk: np.ndarray, pts_start: float, pts_end: float):
        """Queue an audio chunk; moves IDLE -> BUFFERING."""
        await self._audio_queue.put((audio_chunk, pts_start, pts_end))

        if self._state == SyncState.IDLE:
            self._state = SyncState.BUFFERING

    async def push_video(self, frames: list, pts_start: float, pts_end: float):
        """Queue a batch of video frames."""
        await self._video_queue.put((frames, pts_start, pts_end))

    async def get_synced_pair(self) -> tuple[np.ndarray, list, float, float]:
        """
        Wait for and return the next audio/video pair in FIFO order.

        PTS values are recorded but not compared. The producer is responsible
        for pushing audio and video chunks that cover the same span.

        If this call is cancelled while waiting for video (e.g. by
        ``asyncio.wait_for``), the already-dequeued audio chunk is retained.
        The next call returns it instead of dropping it.

        Returns:
            (audio_chunk, frames, audio_pts_start, video_pts_start)
        """
        if self._held_audio is not None:
            audio_item, self._held_audio = self._held_audio, None
        else:
            audio_item = await self._audio_queue.get()

        try:
            video_item = await self._video_queue.get()
        except asyncio.CancelledError:
            # Put the audio back in front of the queue (logically) so it is
            # paired with the next video batch rather than lost.
            self._held_audio = audio_item
            raise

        audio_chunk, audio_pts_start, audio_pts_end = audio_item
        frames, video_pts_start, video_pts_end = video_item

        self._audio_pts = audio_pts_end
        self._video_pts = video_pts_end

        if self._state == SyncState.BUFFERING:
            self._state = SyncState.PLAYING

        return audio_chunk, frames, audio_pts_start, video_pts_start

    async def _sync_loop(self):
        """
        Background state monitor; runs until cancelled.

        Every 0.5 s it does two things:
        - promotes BUFFERING -> PLAYING when both sides have data;
        - demotes PLAYING -> IDLE when both sides are empty.
        """
        while True:
            await asyncio.sleep(0.5)

            # A held-back audio chunk still counts as pending audio.
            held = 1 if self._held_audio is not None else 0
            audio_qsize = self._audio_queue.qsize() + held
            video_qsize = self._video_queue.qsize()

            if audio_qsize > 0 and video_qsize > 0:
                if self._state == SyncState.BUFFERING:
                    self._state = SyncState.PLAYING
            elif audio_qsize == 0 and video_qsize == 0:
                if self._state == SyncState.PLAYING:
                    self._state = SyncState.IDLE


class SimpleAVSync:
    """
    Simplified AV sync without LiveKit AVSynchronizer.
    Uses PTS-based alignment for lower latency.

    Audio chunks and video frame batches are queued separately. A pair is
    released only when the *end* PTS of the oldest queued audio chunk and
    the oldest queued video batch differ by at most ``tolerance`` seconds.

    NOTE(review): a misaligned head pair is never discarded. One mismatched
    chunk therefore blocks every later pair until ``clear()`` is called.
    """

    def __init__(
        self,
        video_fps: int = VIDEO_FPS,
        *,
        tolerance: float = 0.02,
    ):
        """
        Args:
            video_fps: Video frame rate; must be positive.
            tolerance: Maximum allowed difference, in seconds, between audio
                and video end PTS for a pair to be released (default 20 ms).

        Raises:
            ValueError: if ``video_fps`` is not positive or ``tolerance`` is
                negative.
        """
        if video_fps <= 0:
            raise ValueError(f"video_fps must be positive, got {video_fps!r}")
        if tolerance < 0:
            raise ValueError(f"tolerance must be non-negative, got {tolerance!r}")

        self._video_fps = video_fps
        # Seconds per frame (e.g. 0.04 s, i.e. 40 ms, at 25 fps).
        self._frame_duration = 1.0 / video_fps
        self._tolerance = tolerance

        # FIFOs of (payload, pts_start, pts_end); deque gives O(1) popleft
        # where list.pop(0) was O(n).
        self._pending_audio: deque = deque()
        self._pending_video: deque = deque()

        log.info("SimpleAVSync initialized (fps=%d, frame_duration=%.3fs)",
                 video_fps, self._frame_duration)

    def add_audio(self, audio: np.ndarray, pts_start: float, pts_end: float):
        """Queue an audio chunk."""
        self._pending_audio.append((audio, pts_start, pts_end))

    def add_video(self, frames: list, pts_start: float, pts_end: float):
        """Queue a batch of video frames."""
        self._pending_video.append((frames, pts_start, pts_end))

    def get_next_pair(self) -> Optional[tuple[list, np.ndarray]]:
        """
        Pop and return the next aligned pair.

        Returns:
            ``(video_frames, audio_pcm)``. Returns None if either side is
            empty or the two head chunks' end PTS differ by more than the
            tolerance; in those cases nothing is dequeued.
        """
        if not self._pending_audio or not self._pending_video:
            return None

        audio, _, audio_pts_end = self._pending_audio[0]
        video, _, video_pts_end = self._pending_video[0]

        if abs(audio_pts_end - video_pts_end) > self._tolerance:
            return None

        self._pending_audio.popleft()
        self._pending_video.popleft()

        return video, audio

    def has_pending(self) -> bool:
        """Return True if any audio or video chunk is still queued."""
        return bool(self._pending_audio) or bool(self._pending_video)

    def clear(self):
        """Drop all queued audio and video chunks."""
        self._pending_audio.clear()
        self._pending_video.clear()