# File size: 11,213 Bytes
# 7e68852
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
import torch
import asyncio
import logging
import base64
import io
import numpy as np
from typing import Optional
from backend.config import settings
try:
    from vibevoice.modular.modeling_vibevoice_inference import VibeVoiceForConditionalGenerationInference
    from vibevoice.processor.vibevoice_processor import VibeVoiceProcessor
    VIBEVOICE_AVAILABLE = True
except ImportError:
    VIBEVOICE_AVAILABLE = False

logger = logging.getLogger(__name__)

class VoiceSynthesizer:
    def __init__(self):
        self.voice_model = None
        self.voice_processor = None
        self.character_voice_configs = {}
        
    async def initialize(self):
        """Load the VibeVoice processor and model and prepare them for inference.

        The processor and model are loaded into local variables and only
        stored on the instance once every step has succeeded, so a failed
        load never leaves a half-initialised synthesizer behind.

        Returns:
            bool: ``True`` when the model is loaded and ready. ``False`` when
            voice synthesis is disabled in settings, the VibeVoice package
            could not be imported, or loading failed (the error is logged).
        """
        if not settings.ENABLE_VOICE:
            logger.info("Voice synthesis disabled")
            return False

        if not VIBEVOICE_AVAILABLE:
            logger.error("VibeVoice community package not available. Install with: pip install git+https://github.com/vibevoice-community/VibeVoice.git")
            return False

        logger.info("Loading VibeVoice model...")

        # Load VibeVoice model from HuggingFace
        model_path = "vibevoice/VibeVoice-1.5B"

        try:
            logger.info(f"Loading processor from {model_path}")
            processor = VibeVoiceProcessor.from_pretrained(model_path)

            # Pick device, dtype and attention backend from CUDA availability.
            device = "cuda" if torch.cuda.is_available() else "cpu"
            load_dtype = torch.bfloat16 if device == "cuda" else torch.float32
            attn_impl = "flash_attention_2" if device == "cuda" else "sdpa"

            logger.info(f"Loading model with device: {device}, dtype: {load_dtype}, attention: {attn_impl}")

            def load_model(attention):
                # Single loader for both devices; only the arguments differ.
                return VibeVoiceForConditionalGenerationInference.from_pretrained(
                    model_path,
                    torch_dtype=load_dtype,
                    device_map=device,
                    attn_implementation=attention,
                )

            try:
                model = load_model(attn_impl)
            except Exception as e:
                if attn_impl != "flash_attention_2":
                    raise
                # flash-attn is an optional extra; retry once with the
                # attention backend that ships with PyTorch.
                logger.warning(f"Loading with flash_attention_2 failed ({e}); retrying with sdpa")
                model = load_model("sdpa")

            # Inference mode and number of diffusion steps.
            model.eval()
            model.set_ddpm_inference_steps(num_steps=10)

        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"Failed to initialize VibeVoice model: {e}")
            logger.info("Voice synthesis will be disabled")
            return False

        # Publish the fully prepared objects only after everything succeeded.
        self.voice_processor = processor
        self.voice_model = model

        # Configure character-specific voice parameters
        self._setup_character_voices()

        logger.info("VibeVoice synthesizer initialized successfully")
        return True
            
    def _setup_character_voices(self):
        """Setup character-specific voice configurations"""
        self.character_voice_configs = {
            "moses": {
                "style": "authoritative",
                "speed": 0.9,  # Slightly slower, more measured
                "pitch": 0.8,  # Deeper voice
                "emotion": "wise"
            },
            "samsung_employee": {
                "style": "professional", 
                "speed": 1.0,  # Normal speed
                "pitch": 1.0,  # Normal pitch
                "emotion": "friendly"
            },
            "jinx": {
                "style": "energetic",
                "speed": 1.2,  # Faster, more manic
                "pitch": 1.3,  # Higher pitch
                "emotion": "playful"
            }
        }
        
    async def synthesize(self, text: str, character_id: str) -> Optional[str]:
        """Synthesize speech for ``text`` in the voice of ``character_id``.

        Args:
            text: Text to speak.
            character_id: Key into ``character_voice_configs``; unknown ids
                use the "samsung_employee" configuration.

        Returns:
            A ``data:audio/wav;base64,...`` string, or ``None`` when voice is
            disabled, the model or processor is not loaded, or synthesis
            fails (the error is logged).
        """
        # ``voice_tokenizer`` is not assigned anywhere in this class; honour it
        # if something else set it, otherwise use the processor that
        # ``initialize()`` loads. Reading it via getattr avoids an
        # AttributeError escaping from this guard.
        processor = getattr(self, "voice_tokenizer", None)
        if processor is None:
            processor = self.voice_processor

        if not settings.ENABLE_VOICE or self.voice_model is None or processor is None:
            return None

        try:
            # Get character voice config, defaulting to "samsung_employee"
            # (and to no effects at all if the table has not been populated).
            voice_config = self.character_voice_configs.get(character_id)
            if voice_config is None:
                voice_config = self.character_voice_configs.get("samsung_employee", {})

            # Prepare text for TTS
            processed_text = self._preprocess_text(text, character_id)

            inputs = processor(
                processed_text,
                return_tensors="pt",
                max_length=512,
                truncation=True,
                padding=True
            )

            if settings.DEVICE == "cuda" and torch.cuda.is_available():
                # Only tensors can be moved; leave any other values untouched.
                inputs = {
                    k: (v.cuda() if torch.is_tensor(v) else v)
                    for k, v in inputs.items()
                }

            # NOTE(review): these are generic text-generation arguments;
            # confirm they match the VibeVoice generate() signature.
            with torch.no_grad():
                outputs = self.voice_model.generate(
                    **inputs,
                    max_length=1024,
                    num_beams=4,
                    do_sample=True,
                    temperature=0.8
                )

            # ``_features_to_audio`` is not defined in this class; use it when
            # it is provided (e.g. by a subclass), otherwise fall back to the
            # converter that does exist here.
            to_audio = getattr(self, "_features_to_audio", None)
            if to_audio is None:
                to_audio = self._spectrogram_to_audio
            audio_np = to_audio(outputs, voice_config)

            # Apply character-specific modifications
            audio_np = self._apply_character_effects(audio_np, voice_config)

            # Convert to base64 for web transmission
            return self._audio_to_base64(audio_np)

        except Exception as e:
            # Best-effort API: log with traceback and report "no audio".
            logger.exception(f"Error in voice synthesis: {e}")
            return None
            
    def _preprocess_text(self, text: str, character_id: str) -> str:
        """Preprocess text for character-specific speech patterns"""
        
        # Character-specific text modifications
        if character_id == "moses":
            # Add pauses for emphasis, make more formal
            text = text.replace("!", ".")  # Less exclamatory
            text = text.replace("...", "... ") # Add pauses
            
        elif character_id == "jinx":
            # Make more energetic and expressive
            text = text.replace(".", "!")  # More excitement
            text = text.replace(",", "... ") # Add dramatic pauses
            
        # Clean up text
        text = text.strip()
        
        # Add character voice prompt for better synthesis
        voice_prompts = {
            "moses": f"[Speaking with wisdom and authority] {text}",
            "samsung_employee": f"[Speaking professionally and clearly] {text}",
            "jinx": f"[Speaking energetically and playfully] {text}"
        }
        
        return voice_prompts.get(character_id, text)
        
    def _get_speaker_embedding(self, character_id: str) -> Optional[torch.Tensor]:
        """Get speaker embedding for character (simplified approach)"""
        # Create different speaker embeddings for different characters
        # This is a simplified approach - in practice, you'd train specific embeddings
        
        embeddings = {
            "moses": torch.randn(1, 512) * 0.1,  # Deeper, more authoritative
            "samsung_employee": torch.randn(1, 512) * 0.05,  # Neutral, professional
            "jinx": torch.randn(1, 512) * 0.15,  # More varied, energetic
        }
        
        # Set seed for consistency
        torch.manual_seed(hash(character_id) % 10000)
        embedding = embeddings.get(character_id, embeddings["samsung_employee"])
        
        return embedding
        
    def _spectrogram_to_audio(self, spectrogram: torch.Tensor, voice_config: dict) -> np.ndarray:
        """Convert a spectrogram-like tensor to a waveform (placeholder fallback).

        This is not a real inverse transform: the output length is estimated
        as 0.05 s per column at ``settings.SAMPLE_RATE``, and samples are set
        to the mean of a column scaled by 0.3.

        Args:
            spectrogram: Tensor that, once squeezed, has its columns along
                axis 1. A 1-D result is treated as a single row.
            voice_config: Accepted for signature compatibility; not used.

        Returns:
            A 1-D ``float32`` array.
        """
        # detach + float32 + cpu: numpy cannot represent bfloat16 and refuses
        # tensors that still track gradients.
        spec_np = (
            spectrogram.detach()
            .to(device="cpu", dtype=torch.float32)
            .squeeze()
            .numpy()
        )
        spec_np = np.atleast_2d(spec_np)

        n_frames = spec_np.shape[1]
        duration = n_frames * 0.05  # Estimate duration
        samples = int(duration * settings.SAMPLE_RATE)

        audio = np.zeros(samples, dtype=np.float32)

        # NOTE(review): only the first min(rows, samples) samples are written
        # and the rest stay silent; that looks unintended for a buffer sized
        # from the column count - confirm before relying on this output.
        fill = min(spec_np.shape[0], samples)
        if fill > 0:
            # Mean of each column, computed once instead of once per sample.
            frame_means = np.moveaxis(spec_np, 1, 0).reshape(n_frames, -1).mean(axis=1)
            audio[:fill] = frame_means[np.arange(fill) % n_frames] * 0.3

        return audio
        
    def _apply_character_effects(self, audio: np.ndarray, voice_config: dict) -> np.ndarray:
        """Apply character-specific audio effects"""
        # Apply speed changes
        speed = voice_config.get("speed", 1.0)
        if speed != 1.0:
            audio = self._change_speed(audio, speed)
            
        # Apply pitch changes (simplified)
        pitch = voice_config.get("pitch", 1.0)
        if pitch != 1.0:
            audio = self._change_pitch(audio, pitch)
            
        return audio
        
    def _change_pitch(self, audio: np.ndarray, pitch_factor: float) -> np.ndarray:
        """Change pitch of audio (simplified implementation)"""
        if pitch_factor == 1.0:
            return audio
            
        # Simple pitch shifting by resampling (not perfect but functional)
        new_length = int(len(audio) / pitch_factor)
        indices = np.linspace(0, len(audio) - 1, new_length)
        return np.interp(indices, np.arange(len(audio)), audio)
        

        
    def _change_speed(self, audio: np.ndarray, speed: float) -> np.ndarray:
        """Change audio playback speed"""
        if speed == 1.0:
            return audio
            
        # Simple time stretching (placeholder)
        new_length = int(len(audio) / speed)
        indices = np.linspace(0, len(audio) - 1, new_length)
        return np.interp(indices, np.arange(len(audio)), audio)
        
    def _audio_to_base64(self, audio_data: np.ndarray) -> str:
        """Encode a waveform as a base64 WAV data URI.

        The samples are written as mono 16-bit PCM at ``settings.SAMPLE_RATE``
        using the standard-library ``wave`` module.

        Args:
            audio_data: Waveform with samples nominally in ``[-1.0, 1.0]``.
                Values outside that range are clipped, non-finite values are
                replaced, and multi-dimensional input is flattened.

        Returns:
            A string of the form ``data:audio/wav;base64,<payload>``.
        """
        import wave  # stdlib WAV writer; only needed here

        samples = np.nan_to_num(np.asarray(audio_data, dtype=np.float32)).reshape(-1)

        # Clip before scaling: casting out-of-range values to int16 would wrap
        # around and turn loud peaks into clicks.
        pcm = (np.clip(samples, -1.0, 1.0) * 32767).astype("<i2")

        # Create WAV file in memory
        buffer = io.BytesIO()
        with wave.open(buffer, "wb") as wav_file:
            wav_file.setnchannels(1)
            wav_file.setsampwidth(2)  # bytes per sample -> 16-bit
            wav_file.setframerate(int(settings.SAMPLE_RATE))
            wav_file.writeframes(pcm.tobytes())

        # Get bytes and encode to base64
        audio_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')

        return f"data:audio/wav;base64,{audio_base64}"
        
    def get_character_voice_info(self, character_id: str) -> dict:
        """Get voice configuration for character"""
        return self.character_voice_configs.get(character_id, {})