#!/usr/bin/env python3
"""
Phase 7: End-to-End Validation
================================

Tests the full voice clone pipeline using the exported PyTorch modules:

    ref_audio → speaker_encoder → x_vector
    ref_audio → speech_tokenizer.encode → ref_codes
    text → tokenizer → input_ids
    [x_vector, ref_codes, input_ids] → talker → codec_tokens
    codec_tokens → vocoder → PCM waveform

Compares output with the original model.
Reports: audio duration, sample rate, waveform similarity.
"""

import sys
import os
import time
import copy
import glob

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

MODEL_PATH = os.path.expanduser("~/Documents/Qwen3-TTS/models/1.7B-Base")
VENV_SITE = os.path.expanduser("~/Documents/Qwen3-TTS/.venv/lib/python3.10/site-packages")
QWEN_TTS_SRC = os.path.expanduser("~/Documents/Qwen3-TTS")
OUTPUT_DIR = os.path.expanduser("~/Documents/Qwen3-TTS-ExecuTorch/exported")

# Make the project venv and source tree importable before touching qwen_tts.
if VENV_SITE not in sys.path:
    sys.path.insert(0, VENV_SITE)
if QWEN_TTS_SRC not in sys.path:
    sys.path.insert(0, QWEN_TTS_SRC)

print("=" * 70)
print("PHASE 7: End-to-End Validation")
print("=" * 70)

# ── 1. Load Original Model ──────────────────────────────────────────
print("\n[1/6] Loading original model...")

from qwen_tts.core.models.configuration_qwen3_tts import Qwen3TTSConfig
from qwen_tts.core.models.modeling_qwen3_tts import (
    Qwen3TTSForConditionalGeneration,
    mel_spectrogram,
)

config = Qwen3TTSConfig.from_pretrained(MODEL_PATH)
model = Qwen3TTSForConditionalGeneration.from_pretrained(
    MODEL_PATH,
    config=config,
    dtype=torch.float32,
    attn_implementation="sdpa",
    device_map="cpu",
)
model.eval()
print(" Model loaded.")

# ── 2. Create Synthetic Test Data ────────────────────────────────────
print("\n[2/6] Creating synthetic test data...")

# Synthetic reference audio: 3 seconds at 24kHz (seeded so runs are repeatable)
ref_sr = 24000
ref_duration = 3.0
np.random.seed(42)
ref_audio = np.random.randn(int(ref_sr * ref_duration)).astype(np.float32) * 0.1
print(f" Reference audio: {ref_duration}s at {ref_sr}Hz ({len(ref_audio)} samples)")

# ── 3. Test Speaker Encoder (.pte vs PyTorch) ───────────────────────
print("\n[3/6] Testing Speaker Encoder...")

# Compute mel spectrogram; transpose to [batch, frames, n_mels] for the encoder.
mels = mel_spectrogram(
    torch.from_numpy(ref_audio).unsqueeze(0),
    n_fft=1024,
    num_mels=128,
    sampling_rate=24000,
    hop_size=256,
    win_size=1024,
    fmin=0,
    fmax=12000,
).transpose(1, 2)
print(f" Mel shape: {list(mels.shape)}")

# PyTorch reference
with torch.no_grad():
    orig_spk = model.speaker_encoder(mels)
print(f" Original x-vector shape: {list(orig_spk.shape)}")

# .pte execution
pte_path = os.path.join(OUTPUT_DIR, "speaker_encoder.pte")
if os.path.exists(pte_path):
    from executorch.runtime import Runtime

    runtime = Runtime.get()
    # FIX: use a context manager so the program file handle is not leaked.
    with open(pte_path, "rb") as fh:
        prog = runtime.load_program(fh.read())
    method = prog.load_method("forward")

    # Pad/truncate mel to the fixed 469 frames the export was traced with.
    FIXED_MEL = 469
    mel_fixed = torch.zeros(1, FIXED_MEL, 128)
    actual_frames = min(mels.shape[1], FIXED_MEL)
    mel_fixed[:, :actual_frames, :] = mels[:, :actual_frames, :]

    pte_out = method.execute([mel_fixed])
    if isinstance(pte_out, (list, tuple)):
        pte_out = pte_out[0]

    cos_se = F.cosine_similarity(
        orig_spk.flatten().unsqueeze(0), pte_out.flatten().unsqueeze(0)
    ).item()
    print(f" .pte vs PyTorch cosine sim: {cos_se:.6f}")
else:
    print(f" .pte not found at {pte_path}, skipping runtime test")
    cos_se = None

# ── 4. Test Speech Tokenizer Encode ──────────────────────────────────
print("\n[4/6] Testing Speech Tokenizer Encode...")

ref_wav_tensor = torch.from_numpy(ref_audio).unsqueeze(0).float()
padding_mask = torch.ones_like(ref_wav_tensor, dtype=torch.long)
with torch.no_grad():
    enc_out = model.speech_tokenizer.model.encode(
        ref_wav_tensor, padding_mask=padding_mask, return_dict=True
    )
ref_codes = enc_out.audio_codes[0]  # [T, 16]
print(f" Ref codes shape: {list(ref_codes.shape)}")
print(f" Ref codes frames: {ref_codes.shape[0]} ({ref_codes.shape[0] / 12.5:.1f}s at 12.5 Hz)")

# ── 5. Test Vocoder (.pte vs PyTorch) ────────────────────────────────
print("\n[5/6] Testing Vocoder...")

# Generate synthetic codec codes (same shape as what talker would produce).
# Use ref_codes as a stand-in.
test_codes = ref_codes.unsqueeze(0).transpose(1, 2)  # [1, 16, T]
print(f" Test codes shape: {list(test_codes.shape)}")

# PyTorch reference
with torch.no_grad():
    orig_wav = model.speech_tokenizer.model.decoder(test_codes)
print(f" Original wav shape: {list(orig_wav.shape)}")
print(f" Original wav duration: {orig_wav.shape[-1] / 24000:.2f}s")

# .pte test (need fixed size, so we pad/truncate)
voc_pte_path = os.path.join(OUTPUT_DIR, "vocoder.pte")
if os.path.exists(voc_pte_path):
    # FIX: previously this branch reused `runtime` from section 3, which is
    # only bound when speaker_encoder.pte exists — a latent NameError when
    # only vocoder.pte is present. Acquire the runtime here unconditionally
    # (Runtime.get() returns the shared singleton, so this is cheap).
    from executorch.runtime import Runtime

    runtime = Runtime.get()

    FIXED_CODE_LEN = 50
    codes_fixed = torch.zeros(1, 16, FIXED_CODE_LEN, dtype=torch.long)
    actual_len = min(test_codes.shape[-1], FIXED_CODE_LEN)
    codes_fixed[:, :, :actual_len] = test_codes[:, :, :actual_len]

    # PyTorch with fixed size, so both sides decode identical inputs.
    with torch.no_grad():
        ref_wav_fixed = model.speech_tokenizer.model.decoder(codes_fixed)

    with open(voc_pte_path, "rb") as fh:
        prog_voc = runtime.load_program(fh.read())
    method_voc = prog_voc.load_method("forward")
    pte_wav = method_voc.execute([codes_fixed])
    if isinstance(pte_wav, (list, tuple)):
        pte_wav = pte_wav[0]

    cos_voc = F.cosine_similarity(
        ref_wav_fixed.flatten().unsqueeze(0), pte_wav.flatten().unsqueeze(0)
    ).item()
    print(f" .pte vs PyTorch cosine sim: {cos_voc:.6f}")
    print(f" .pte wav duration: {pte_wav.shape[-1] / 24000:.2f}s")
else:
    print(f" .pte not found at {voc_pte_path}, skipping runtime test")
    cos_voc = None

# ── 6. Test Talker (PyTorch wrapper validation) ─────────────────────
print("\n[6/6] Testing Talker Wrapper Consistency...")

# We can't easily run a full generate() through the exported wrapper
# (that requires the Python orchestration layer), but we can validate
# that the exported talker backbone gives the same logits as the original.

MAX_SEQ_LEN = 2048
NUM_LAYERS = 28
NUM_KV_HEADS = 8
HEAD_DIM = 128
HIDDEN_SIZE = 2048


class RMSNorm(nn.Module):
    """RMS layer norm computed in float32, result cast back to input dtype."""

    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(dim))
        self.eps = eps

    def forward(self, x):
        dtype = x.dtype
        x = x.float()
        return (self.weight * (x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps))).to(dtype)


def rotate_half(x):
    """RoPE helper: split the last dim in half and return (-second, first)."""
    return torch.cat((-x[..., x.shape[-1] // 2:], x[..., :x.shape[-1] // 2]), dim=-1)


class TalkerAttnTest(nn.Module):
    """GQA self-attention rebuilt from the original layer's weights.

    Takes an explicit, fixed-size KV cache (kc/vc) and the cache write
    positions (cp), mirroring what the exported talker wrapper does.
    """

    def __init__(self, orig, _):
        super().__init__()
        self.q_proj = copy.deepcopy(orig.q_proj)
        self.k_proj = copy.deepcopy(orig.k_proj)
        self.v_proj = copy.deepcopy(orig.v_proj)
        self.o_proj = copy.deepcopy(orig.o_proj)
        self.q_norm = RMSNorm(HEAD_DIM)
        self.q_norm.weight = copy.deepcopy(orig.q_norm.weight)
        self.k_norm = RMSNorm(HEAD_DIM)
        self.k_norm.weight = copy.deepcopy(orig.k_norm.weight)
        # GQA group size: 16 query heads shared across NUM_KV_HEADS kv heads.
        self.g = 16 // NUM_KV_HEADS

    def forward(self, h, cos, sin, cp, kc, vc, am):
        B, S, _ = h.shape
        q = self.q_norm(self.q_proj(h).view(B, S, 16, HEAD_DIM)).transpose(1, 2)
        k = self.k_norm(self.k_proj(h).view(B, S, NUM_KV_HEADS, HEAD_DIM)).transpose(1, 2)
        v = self.v_proj(h).view(B, S, NUM_KV_HEADS, HEAD_DIM).transpose(1, 2)
        # Apply rotary embeddings to q and k.
        q = q * cos + rotate_half(q) * sin
        k = k * cos + rotate_half(k) * sin
        # Functional cache update: clone so callers' tensors aren't mutated.
        kc = kc.clone()
        vc = vc.clone()
        kc[:, :, cp, :] = k
        vc[:, :, cp, :] = v
        # Expand kv heads to match the 16 query heads (GQA).
        ke = kc.unsqueeze(2).repeat(1, 1, self.g, 1, 1).reshape(B, 16, MAX_SEQ_LEN, HEAD_DIM)
        ve = vc.unsqueeze(2).repeat(1, 1, self.g, 1, 1).reshape(B, 16, MAX_SEQ_LEN, HEAD_DIM)
        o = F.scaled_dot_product_attention(q, ke, ve, attn_mask=am, scale=HEAD_DIM ** -0.5)
        return self.o_proj(o.transpose(1, 2).reshape(B, S, -1)), kc, vc


class TalkerLayerTest(nn.Module):
    """One decoder layer: pre-norm attention + pre-norm SwiGLU MLP."""

    def __init__(self, orig, i):
        super().__init__()
        self.attn = TalkerAttnTest(orig.self_attn, i)
        self.gp = copy.deepcopy(orig.mlp.gate_proj)
        self.up = copy.deepcopy(orig.mlp.up_proj)
        self.dp = copy.deepcopy(orig.mlp.down_proj)
        self.n1 = RMSNorm(HIDDEN_SIZE)
        self.n1.weight = copy.deepcopy(orig.input_layernorm.weight)
        self.n2 = RMSNorm(HIDDEN_SIZE)
        self.n2.weight = copy.deepcopy(orig.post_attention_layernorm.weight)

    def forward(self, h, cos, sin, cp, kc, vc, am):
        r = h
        a, kc, vc = self.attn(self.n1(h), cos, sin, cp, kc, vc, am)
        h = r + a
        r = h
        x = self.n2(h)
        h = r + self.dp(F.silu(self.gp(x)) * self.up(x))
        return h, kc, vc


class TalkerTest(nn.Module):
    """Full talker backbone rebuilt layer-by-layer from the original model.

    forward(ie, pid, cp, am, *kv) returns (codec_logits, *updated_kv), where
    kv is NUM_LAYERS interleaved (k_cache, v_cache) pairs.
    """

    def __init__(self, orig):
        super().__init__()
        self.layers = nn.ModuleList(
            [TalkerLayerTest(l, i) for i, l in enumerate(orig.model.layers)]
        )
        self.norm = RMSNorm(HIDDEN_SIZE)
        self.norm.weight = copy.deepcopy(orig.model.norm.weight)
        self.codec_head = copy.deepcopy(orig.codec_head)
        self.register_buffer("inv_freq", orig.model.rotary_emb.inv_freq.clone())
        self.rs = getattr(orig.model.rotary_emb, 'attention_scaling', 1.0)

    def forward(self, ie, pid, cp, am, *kv):
        # Compute cos/sin rotary tables from the first position-id row.
        pos = pid[0].float()
        freqs = pos.unsqueeze(-1) * self.inv_freq.float().unsqueeze(0).unsqueeze(0)
        emb = torch.cat([freqs, freqs], dim=-1)
        cos = (emb.cos() * self.rs).to(ie.dtype).unsqueeze(1)
        sin = (emb.sin() * self.rs).to(ie.dtype).unsqueeze(1)
        h = ie
        ukv = []
        for i, l in enumerate(self.layers):
            h, nk, nv = l(h, cos, sin, cp, kv[i * 2], kv[i * 2 + 1], am)
            ukv.append(nk)
            ukv.append(nv)
        return (self.codec_head(self.norm(h)), *ukv)


print(" Building talker wrapper...")
talker_test = TalkerTest(model.talker)
talker_test.eval()

# Run single forward pass
sl = 5
test_embeds = torch.randn(1, sl, HIDDEN_SIZE)
test_pos = torch.arange(sl).unsqueeze(0).unsqueeze(0).repeat(3, 1, 1)
test_cp = torch.arange(sl)
# Causal mask over the fixed-size cache: row i may attend to positions <= i.
cm = torch.full((1, 1, sl, MAX_SEQ_LEN), float('-inf'))
for i in range(sl):
    cm[:, :, i, :i + 1] = 0.0
kv = [torch.zeros(1, NUM_KV_HEADS, MAX_SEQ_LEN, HEAD_DIM) for _ in range(NUM_LAYERS * 2)]

with torch.no_grad():
    out = talker_test(test_embeds, test_pos, test_cp, cm, *kv)
logits = out[0]
print(f" Talker wrapper logits shape: {list(logits.shape)}")
print(f" Logits stats: min={logits.min():.3f}, max={logits.max():.3f}, mean={logits.mean():.3f}")

# Compare with original model's backbone.
# FIX: dropped the unused `DynamicCache` construction (and its import) —
# the call below runs with use_cache=False and never received it.
with torch.no_grad():
    cache_pos = torch.arange(sl)
    pos_ids = torch.arange(sl).unsqueeze(0).unsqueeze(0).expand(3, 1, -1)
    orig_out = model.talker.model(
        input_ids=None,
        inputs_embeds=test_embeds,
        position_ids=pos_ids,
        cache_position=cache_pos,
        attention_mask=torch.ones(1, sl),
        use_cache=False,
    )
    orig_logits = model.talker.codec_head(orig_out.last_hidden_state)

cos_talker = F.cosine_similarity(
    logits.flatten().unsqueeze(0), orig_logits.flatten().unsqueeze(0)
).item()
max_diff = (logits - orig_logits).abs().max().item()
print(f" Wrapper vs Original logits cosine sim: {cos_talker:.6f}")
print(f" Max abs difference: {max_diff:.2e}")

# ── Summary ──────────────────────────────────────────────────────────
print("\n" + "=" * 70)
print("E2E VALIDATION SUMMARY")
print("=" * 70)


def _verdict(score):
    """Map a cosine-similarity score to PASS/FAIL, or N/A when skipped.

    FIX: the previous inline expression treated a legitimate 0.0 score as
    falsy and printed 'N/A' for its cosine value; only None means skipped.
    """
    if score is None:
        return "N/A"
    return "PASS" if score > 0.99 else "FAIL"


def _score_str(score):
    """Render a score for display; None means the test was skipped."""
    return "N/A" if score is None else f"{score:.6f}"


print(f"""
Component Validation Results:
  Speaker Encoder .pte vs PyTorch: {_verdict(cos_se)} (cosine={_score_str(cos_se)})
  Vocoder .pte vs PyTorch: {_verdict(cos_voc)} (cosine={_score_str(cos_voc)})
  Talker Wrapper vs Original: {_verdict(cos_talker)} (cosine={cos_talker:.6f})

Exported .pte Files:
""")

pte_files = sorted(glob.glob(os.path.join(OUTPUT_DIR, "*.pte")))
total_size = 0
for f in pte_files:
    size = os.path.getsize(f) / 1e6
    total_size += size
    name = os.path.basename(f)
    print(f"  {name:40s} {size:8.1f} MB")
print(f"  {'─' * 49}")
print(f"  {'TOTAL':40s} {total_size:8.1f} MB")

print("""
Pipeline Architecture:
  1. ref_audio (24kHz) → mel_spectrogram → speaker_encoder → x_vector [1, 2048]
  2. ref_audio → speech_tokenizer.encode → ref_codes [T, 16]
  3. text → tokenizer → input_ids → embedding orchestration → inputs_embeds
  4. talker.forward(inputs_embeds, ...) → codec logits → autoregressive decode
     (each step calls code_predictor for 15 additional codebook predictions)
  5. codec_tokens → vocoder.forward(codes) → PCM waveform

Note: The autoregressive generation loop (steps 3-4) runs in Python, calling
the exported talker and code_predictor .pte modules per step. The
speaker_encoder and vocoder are single-pass modules.
""")

print("Phase 7 complete!")
print("=" * 70)