File size: 9,087 Bytes
a4e1d96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
#!/usr/bin/env python3
"""
VibeVoice CoreML Inference Script

This script provides inference utilities for the converted VibeVoice models.
Note: This must be run on macOS to use CoreML models.

Usage:
    python inference.py --models-dir ./models --text "Hello world"
"""

import argparse
import json
from pathlib import Path
from typing import Optional, Tuple

import numpy as np

# CoreML is only available on macOS
try:
    import coremltools as ct
    COREML_AVAILABLE = True
except ImportError:
    COREML_AVAILABLE = False
    print("Warning: coremltools not available. Running in mock mode.")


class DPMSolverScheduler:
    """DPM-Solver style scheduler for diffusion inference.

    Precomputes a beta/alpha noise schedule and a descending set of
    inference timesteps, and provides `add_noise` (forward process) and
    `step` (single deterministic denoising update, DDIM-like with eta=0).
    """

    def __init__(
        self,
        num_train_timesteps: int = 1000,
        num_inference_steps: int = 20,
        beta_schedule: str = "cosine"
    ):
        """Build the noise schedule.

        Args:
            num_train_timesteps: Number of timesteps the model was trained with.
            num_inference_steps: Number of denoising steps to run at inference.
            beta_schedule: "cosine" for the squared-cosine alpha-bar schedule;
                any other value falls back to a linear beta schedule.
        """
        self.num_train_timesteps = num_train_timesteps
        self.num_inference_steps = num_inference_steps

        # Compute beta schedule.
        if beta_schedule == "cosine":
            # Squared-cosine alpha-bar schedule; betas derived from the ratio
            # of consecutive alpha-bar values, clipped to keep them valid.
            steps = num_train_timesteps + 1
            t = np.linspace(0, 1, steps)
            alpha_bar = np.cos((t + 0.008) / 1.008 * np.pi / 2) ** 2
            self.betas = np.clip(1 - alpha_bar[1:] / alpha_bar[:-1], 0, 0.999)
        else:
            self.betas = np.linspace(0.0001, 0.02, num_train_timesteps)

        self.alphas = 1 - self.betas
        self.alphas_cumprod = np.cumprod(self.alphas)

        # Evenly spaced timesteps, descending from num_train_timesteps - 1.
        step_ratio = num_train_timesteps / num_inference_steps
        self.timesteps = (num_train_timesteps - 1 - np.arange(num_inference_steps) * step_ratio).astype(np.int64)

    def add_noise(self, original: np.ndarray, noise: np.ndarray, timestep: int) -> np.ndarray:
        """Add noise to sample at given timestep (forward diffusion q(x_t | x_0))."""
        sqrt_alpha = np.sqrt(self.alphas_cumprod[timestep])
        sqrt_one_minus_alpha = np.sqrt(1 - self.alphas_cumprod[timestep])
        return sqrt_alpha * original + sqrt_one_minus_alpha * noise

    def step(
        self,
        model_output: np.ndarray,
        timestep: int,
        sample: np.ndarray,
        prediction_type: str = "v_prediction"
    ) -> np.ndarray:
        """Single denoising step.

        Args:
            model_output: Network prediction (v or epsilon, per prediction_type).
            timestep: Current (training-scale) timestep index.
            sample: Current noisy sample x_t.
            prediction_type: "v_prediction" to interpret the output as v;
                any other value interprets it as epsilon (noise).

        Returns:
            The denoised sample at the previous timestep.
        """
        alpha = self.alphas_cumprod[timestep]
        # NOTE(review): this uses timestep - 1 rather than the previous entry
        # of self.timesteps; with spaced inference steps that is a coarse
        # approximation of the previous scheduled alpha — confirm intended.
        alpha_prev = self.alphas_cumprod[timestep - 1] if timestep > 0 else 1.0

        # BUG FIX: these two roots were previously computed only inside the
        # v_prediction branch, so the epsilon branch raised NameError.
        sqrt_alpha = np.sqrt(alpha)
        sqrt_one_minus_alpha = np.sqrt(1 - alpha)

        if prediction_type == "v_prediction":
            # Convert v to x0 and epsilon.
            pred_original = sqrt_alpha * sample - sqrt_one_minus_alpha * model_output
            pred_epsilon = sqrt_alpha * model_output + sqrt_one_minus_alpha * sample
        else:
            # Epsilon prediction: recover x0 from x_t and the predicted noise.
            pred_epsilon = model_output
            pred_original = (sample - sqrt_one_minus_alpha * pred_epsilon) / sqrt_alpha

        # Deterministic (eta=0) update to the previous timestep.
        sqrt_alpha_prev = np.sqrt(alpha_prev)
        sqrt_one_minus_alpha_prev = np.sqrt(1 - alpha_prev)

        pred_sample_prev = sqrt_alpha_prev * pred_original + sqrt_one_minus_alpha_prev * pred_epsilon

        return pred_sample_prev


class VibeVoicePipeline:
    """VibeVoice CoreML inference pipeline.

    Wraps the five converted CoreML sub-models (acoustic encoder/decoder,
    semantic encoder, LLM, diffusion head) plus a DPM-Solver scheduler, and
    exposes per-component prediction helpers and a diffusion-based
    `generate_speech` loop.
    """

    def __init__(self, models_dir: Path):
        self.models_dir = Path(models_dir)
        self.models = {}

        # Read the pipeline config from disk when present, else use defaults.
        config_path = self.models_dir / "vibevoice_pipeline_config.json"
        if config_path.exists():
            self.config = json.loads(config_path.read_text())
        else:
            self.config = self._default_config()

        # Scheduler step count comes from the (possibly default) config.
        self.scheduler = DPMSolverScheduler(
            num_inference_steps=self.config["inference"]["diffusion"]["num_steps"]
        )

        if COREML_AVAILABLE:
            self._load_models()

    def _default_config(self):
        """Fallback configuration used when no config file is on disk."""
        audio_cfg = {"sample_rate": 24000, "downsample_factor": 3200}
        diffusion_cfg = {"num_steps": 20, "prediction_type": "v_prediction"}
        return {"inference": {"audio": audio_cfg, "diffusion": diffusion_cfg}}

    def _load_models(self):
        """Load whichever CoreML model packages exist under models_dir."""
        manifest = (
            ("acoustic_encoder", "vibevoice_acoustic_encoder.mlpackage"),
            ("acoustic_decoder", "vibevoice_acoustic_decoder.mlpackage"),
            ("semantic_encoder", "vibevoice_semantic_encoder.mlpackage"),
            ("llm", "vibevoice_llm.mlpackage"),
            ("diffusion_head", "vibevoice_diffusion_head.mlpackage"),
        )

        for name, filename in manifest:
            path = self.models_dir / filename
            if not path.exists():
                # Missing packages are skipped silently, matching partial installs.
                continue
            try:
                self.models[name] = ct.models.MLModel(str(path))
                print(f"Loaded {name}")
            except Exception as e:
                print(f"Failed to load {name}: {e}")

    def encode_acoustic(self, audio: np.ndarray) -> np.ndarray:
        """Encode audio to acoustic latent."""
        encoder = self.models.get("acoustic_encoder")
        if encoder is None:
            raise RuntimeError("Acoustic encoder not loaded")
        return encoder.predict({"audio": audio})["acoustic_latent"]

    def decode_acoustic(self, latent: np.ndarray) -> np.ndarray:
        """Decode acoustic latent to audio."""
        decoder = self.models.get("acoustic_decoder")
        if decoder is None:
            raise RuntimeError("Acoustic decoder not loaded")
        return decoder.predict({"acoustic_latent": latent})["audio"]

    def run_llm(
        self,
        input_ids: np.ndarray,
        attention_mask: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Run LLM forward pass; returns (hidden_states, logits)."""
        llm = self.models.get("llm")
        if llm is None:
            raise RuntimeError("LLM not loaded")
        result = llm.predict({
            "input_ids": input_ids.astype(np.int32),
            "attention_mask": attention_mask.astype(np.float32)
        })
        return result["hidden_states"], result["logits"]

    def diffusion_step(
        self,
        noisy_latent: np.ndarray,
        timestep: float,
        condition: np.ndarray
    ) -> np.ndarray:
        """Single diffusion denoising step through the diffusion head model."""
        head = self.models.get("diffusion_head")
        if head is None:
            raise RuntimeError("Diffusion head not loaded")
        result = head.predict({
            "noisy_latent": noisy_latent.astype(np.float32),
            "timestep": np.array([timestep], dtype=np.float32),
            "condition": condition.astype(np.float32)
        })
        return result["prediction"]

    def generate_speech(
        self,
        hidden_states: np.ndarray,
        num_tokens: int = 8
    ) -> np.ndarray:
        """
        Generate speech latents using diffusion.

        Args:
            hidden_states: LLM hidden states [batch, seq, hidden_dim]
            num_tokens: Number of speech tokens to generate
        Returns:
            audio: Generated audio waveform
        """
        batch = hidden_states.shape[0]
        latent_dim = 64  # fixed acoustic-latent width expected by the decoder

        # Start each token's latent from Gaussian noise.
        latents = np.random.randn(batch, num_tokens, latent_dim).astype(np.float32)

        # Condition each token on the trailing LLM hidden states.
        condition = hidden_states[:, -num_tokens:, :]  # [batch, num_tokens, hidden_dim]

        pred_type = self.config["inference"]["diffusion"]["prediction_type"]

        # Denoise every token at each scheduled timestep.
        for t in self.scheduler.timesteps:
            for tok in range(num_tokens):
                current = latents[:, tok, :]       # [batch, latent_dim]
                cond_tok = condition[:, tok, :]    # [batch, hidden_dim]

                model_pred = self.diffusion_step(current, float(t), cond_tok)
                latents[:, tok, :] = self.scheduler.step(
                    model_pred, int(t), current, pred_type
                )

        # Decode the denoised latents to a waveform.
        return self.decode_acoustic(latents)


def main():
    """CLI entry point: parse arguments, build the pipeline, report status."""
    arg_parser = argparse.ArgumentParser(description="VibeVoice CoreML Inference")
    arg_parser.add_argument("--models-dir", required=True, help="Directory with CoreML models")
    arg_parser.add_argument("--text", help="Text to synthesize")
    arg_parser.add_argument("--output", default="output.wav", help="Output audio file")
    opts = arg_parser.parse_args()

    # Bail out early on non-macOS hosts where coremltools is unavailable.
    if not COREML_AVAILABLE:
        print("CoreML is only available on macOS. Exiting.")
        return

    pipeline = VibeVoicePipeline(opts.models_dir)
    print(f"Pipeline initialized with models: {list(pipeline.models.keys())}")

    if opts.text:
        print(f"Note: Full text-to-speech requires tokenizer and complete inference pipeline.")
        print("This script demonstrates individual component usage.")


if __name__ == "__main__":
    main()