"""
Phase 7: End-to-End Validation
================================
Tests the full voice clone pipeline using the exported PyTorch modules:
    ref_audio -> speaker_encoder -> x_vector
    ref_audio -> speech_tokenizer.encode -> ref_codes
    text -> tokenizer -> input_ids
    [x_vector, ref_codes, input_ids] -> talker -> codec_tokens
    codec_tokens -> vocoder -> PCM waveform

Compares output with the original model.
Reports: audio duration, sample rate, waveform similarity.
"""
|
|
import sys
import os
import time
import copy

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

# Paths: original model checkpoint, project virtualenv site-packages,
# Qwen3-TTS source tree, and the directory holding the exported .pte modules.
MODEL_PATH = os.path.expanduser("~/Documents/Qwen3-TTS/models/1.7B-Base")
VENV_SITE = os.path.expanduser("~/Documents/Qwen3-TTS/.venv/lib/python3.10/site-packages")
QWEN_TTS_SRC = os.path.expanduser("~/Documents/Qwen3-TTS")
OUTPUT_DIR = os.path.expanduser("~/Documents/Qwen3-TTS-ExecuTorch/exported")

# Make the project's virtualenv and source tree importable.
if VENV_SITE not in sys.path:
    sys.path.insert(0, VENV_SITE)
if QWEN_TTS_SRC not in sys.path:
    sys.path.insert(0, QWEN_TTS_SRC)
|
|
| print("=" * 70) |
| print("PHASE 7: End-to-End Validation") |
| print("=" * 70) |
|
|
| |
|
|
| print("\n[1/6] Loading original model...") |
| from qwen_tts.core.models.configuration_qwen3_tts import Qwen3TTSConfig |
| from qwen_tts.core.models.modeling_qwen3_tts import ( |
| Qwen3TTSForConditionalGeneration, mel_spectrogram, |
| ) |
|
|
| config = Qwen3TTSConfig.from_pretrained(MODEL_PATH) |
| model = Qwen3TTSForConditionalGeneration.from_pretrained( |
| MODEL_PATH, config=config, dtype=torch.float32, |
| attn_implementation="sdpa", device_map="cpu", |
| ) |
| model.eval() |
| print(" Model loaded.") |
|
|
| |
|
|
| print("\n[2/6] Creating synthetic test data...") |
|
|
| |
| ref_sr = 24000 |
| ref_duration = 3.0 |
| np.random.seed(42) |
| ref_audio = np.random.randn(int(ref_sr * ref_duration)).astype(np.float32) * 0.1 |
| print(f" Reference audio: {ref_duration}s at {ref_sr}Hz ({len(ref_audio)} samples)") |
|
|
| |
|
|
| print("\n[3/6] Testing Speaker Encoder...") |
|
|
| |
| mels = mel_spectrogram( |
| torch.from_numpy(ref_audio).unsqueeze(0), |
| n_fft=1024, num_mels=128, sampling_rate=24000, |
| hop_size=256, win_size=1024, fmin=0, fmax=12000, |
| ).transpose(1, 2) |
| print(f" Mel shape: {list(mels.shape)}") |
|
|
| |
| with torch.no_grad(): |
| orig_spk = model.speaker_encoder(mels) |
| print(f" Original x-vector shape: {list(orig_spk.shape)}") |
|
|
| |
| pte_path = os.path.join(OUTPUT_DIR, "speaker_encoder.pte") |
| if os.path.exists(pte_path): |
| from executorch.runtime import Runtime |
| runtime = Runtime.get() |
|
|
| prog = runtime.load_program(open(pte_path, "rb").read()) |
| method = prog.load_method("forward") |
|
|
| |
| FIXED_MEL = 469 |
| mel_fixed = torch.zeros(1, FIXED_MEL, 128) |
| actual_frames = min(mels.shape[1], FIXED_MEL) |
| mel_fixed[:, :actual_frames, :] = mels[:, :actual_frames, :] |
|
|
| pte_out = method.execute([mel_fixed]) |
| if isinstance(pte_out, (list, tuple)): |
| pte_out = pte_out[0] |
|
|
| cos_se = F.cosine_similarity( |
| orig_spk.flatten().unsqueeze(0), |
| pte_out.flatten().unsqueeze(0) |
| ).item() |
| print(f" .pte vs PyTorch cosine sim: {cos_se:.6f}") |
| else: |
| print(f" .pte not found at {pte_path}, skipping runtime test") |
| cos_se = None |
|
|
| |
|
|
| print("\n[4/6] Testing Speech Tokenizer Encode...") |
|
|
| ref_wav_tensor = torch.from_numpy(ref_audio).unsqueeze(0).float() |
| padding_mask = torch.ones_like(ref_wav_tensor, dtype=torch.long) |
|
|
| with torch.no_grad(): |
| enc_out = model.speech_tokenizer.model.encode( |
| ref_wav_tensor, padding_mask=padding_mask, return_dict=True |
| ) |
| ref_codes = enc_out.audio_codes[0] |
| print(f" Ref codes shape: {list(ref_codes.shape)}") |
| print(f" Ref codes frames: {ref_codes.shape[0]} ({ref_codes.shape[0] / 12.5:.1f}s at 12.5 Hz)") |
|
|
| |
|
|
| print("\n[5/6] Testing Vocoder...") |
|
|
| |
| |
| test_codes = ref_codes.unsqueeze(0).transpose(1, 2) |
| print(f" Test codes shape: {list(test_codes.shape)}") |
|
|
| |
| with torch.no_grad(): |
| orig_wav = model.speech_tokenizer.model.decoder(test_codes) |
| print(f" Original wav shape: {list(orig_wav.shape)}") |
| print(f" Original wav duration: {orig_wav.shape[-1] / 24000:.2f}s") |
|
|
| |
| voc_pte_path = os.path.join(OUTPUT_DIR, "vocoder.pte") |
| if os.path.exists(voc_pte_path): |
| FIXED_CODE_LEN = 50 |
| codes_fixed = torch.zeros(1, 16, FIXED_CODE_LEN, dtype=torch.long) |
| actual_len = min(test_codes.shape[-1], FIXED_CODE_LEN) |
| codes_fixed[:, :, :actual_len] = test_codes[:, :, :actual_len] |
|
|
| |
| with torch.no_grad(): |
| ref_wav_fixed = model.speech_tokenizer.model.decoder(codes_fixed) |
|
|
| prog_voc = runtime.load_program(open(voc_pte_path, "rb").read()) |
| method_voc = prog_voc.load_method("forward") |
| pte_wav = method_voc.execute([codes_fixed]) |
| if isinstance(pte_wav, (list, tuple)): |
| pte_wav = pte_wav[0] |
|
|
| cos_voc = F.cosine_similarity( |
| ref_wav_fixed.flatten().unsqueeze(0), |
| pte_wav.flatten().unsqueeze(0) |
| ).item() |
| print(f" .pte vs PyTorch cosine sim: {cos_voc:.6f}") |
| print(f" .pte wav duration: {pte_wav.shape[-1] / 24000:.2f}s") |
| else: |
| print(f" .pte not found at {voc_pte_path}, skipping runtime test") |
| cos_voc = None |
|
|
| |
|
|
| print("\n[6/6] Testing Talker Wrapper Consistency...") |
|
|
| |
| |
| |
|
|
| MAX_SEQ_LEN = 2048 |
| NUM_LAYERS = 28 |
| NUM_KV_HEADS = 8 |
| HEAD_DIM = 128 |
| HIDDEN_SIZE = 2048 |
|
|
class RMSNorm(nn.Module):
    """Root-mean-square layer norm (no mean subtraction, no bias).

    Normalization is computed in float32 for stability, then cast back to the
    input dtype.
    """

    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(dim))  # learnable per-channel gain
        self.eps = eps

    def forward(self, x):
        dtype = x.dtype
        x = x.float()
        normed = x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)
        return (self.weight * normed).to(dtype)
|
|
def rotate_half(x):
    """RoPE helper: rotate the last dimension by half, [a, b] -> [-b, a]."""
    half = x.shape[-1] // 2
    return torch.cat((-x[..., half:], x[..., :half]), dim=-1)
|
|
class TalkerAttnTest(nn.Module):
    """Grouped-query self-attention with q/k RMSNorm, RoPE and an explicit,
    externally supplied KV cache (mirrors the exported talker wrapper).

    Weights are deep-copied from the original attention module `orig`.
    """

    def __init__(self, orig, _):
        super().__init__()
        self.q_proj = copy.deepcopy(orig.q_proj)
        self.k_proj = copy.deepcopy(orig.k_proj)
        self.v_proj = copy.deepcopy(orig.v_proj)
        self.o_proj = copy.deepcopy(orig.o_proj)
        self.q_norm = RMSNorm(HEAD_DIM)
        self.q_norm.weight = copy.deepcopy(orig.q_norm.weight)
        self.k_norm = RMSNorm(HEAD_DIM)
        self.k_norm.weight = copy.deepcopy(orig.k_norm.weight)
        self.g = 16 // NUM_KV_HEADS  # queries per KV head (16 query heads)

    def forward(self, h, cos, sin, cp, kc, vc, am):
        """h: [B, S, H] hidden states; cos/sin: RoPE tables; cp: cache
        positions to write; kc/vc: [B, KV, MAX_SEQ_LEN, D] caches; am:
        additive attention mask. Returns (attn_out, new_kc, new_vc)."""
        B, S, _ = h.shape
        q = self.q_norm(self.q_proj(h).view(B, S, 16, HEAD_DIM)).transpose(1, 2)
        k = self.k_norm(self.k_proj(h).view(B, S, NUM_KV_HEADS, HEAD_DIM)).transpose(1, 2)
        v = self.v_proj(h).view(B, S, NUM_KV_HEADS, HEAD_DIM).transpose(1, 2)
        # Apply rotary position embedding to queries and keys.
        q = q * cos + rotate_half(q) * sin
        k = k * cos + rotate_half(k) * sin
        # Functional cache update: clone so the caller's buffers stay intact.
        kc = kc.clone()
        vc = vc.clone()
        kc[:, :, cp, :] = k
        vc[:, :, cp, :] = v
        # Expand the KV heads to the 16 query heads for GQA.
        ke = kc.unsqueeze(2).repeat(1, 1, self.g, 1, 1).reshape(B, 16, MAX_SEQ_LEN, HEAD_DIM)
        ve = vc.unsqueeze(2).repeat(1, 1, self.g, 1, 1).reshape(B, 16, MAX_SEQ_LEN, HEAD_DIM)
        o = F.scaled_dot_product_attention(q, ke, ve, attn_mask=am, scale=HEAD_DIM ** -0.5)
        return self.o_proj(o.transpose(1, 2).reshape(B, S, -1)), kc, vc
|
|
class TalkerLayerTest(nn.Module):
    """Pre-norm transformer block: self-attention + SwiGLU MLP, both with
    residual connections. Weights deep-copied from the original layer."""

    def __init__(self, orig, i):
        super().__init__()
        self.attn = TalkerAttnTest(orig.self_attn, i)
        self.gp = copy.deepcopy(orig.mlp.gate_proj)
        self.up = copy.deepcopy(orig.mlp.up_proj)
        self.dp = copy.deepcopy(orig.mlp.down_proj)
        self.n1 = RMSNorm(HIDDEN_SIZE)
        self.n1.weight = copy.deepcopy(orig.input_layernorm.weight)
        self.n2 = RMSNorm(HIDDEN_SIZE)
        self.n2.weight = copy.deepcopy(orig.post_attention_layernorm.weight)

    def forward(self, h, cos, sin, cp, kc, vc, am):
        """Returns (hidden_states, new_k_cache, new_v_cache)."""
        residual = h
        a, kc, vc = self.attn(self.n1(h), cos, sin, cp, kc, vc, am)
        h = residual + a
        residual = h
        x = self.n2(h)
        h = residual + self.dp(F.silu(self.gp(x)) * self.up(x))  # SwiGLU MLP
        return h, kc, vc
|
|
class TalkerTest(nn.Module):
    """Eager re-implementation of the exported talker wrapper: stacked
    decoder layers with explicit position ids, cache positions, attention
    mask and per-layer KV caches, ending in the codec head."""

    def __init__(self, orig):
        super().__init__()
        self.layers = nn.ModuleList(
            [TalkerLayerTest(l, i) for i, l in enumerate(orig.model.layers)]
        )
        self.norm = RMSNorm(HIDDEN_SIZE)
        self.norm.weight = copy.deepcopy(orig.model.norm.weight)
        self.codec_head = copy.deepcopy(orig.codec_head)
        self.register_buffer("inv_freq", orig.model.rotary_emb.inv_freq.clone())
        # attention_scaling may be absent on some rotary implementations.
        self.rs = getattr(orig.model.rotary_emb, 'attention_scaling', 1.0)

    def forward(self, ie, pid, cp, am, *kv):
        """ie: input embeddings [B, S, H]; pid: position ids (first plane is
        used for RoPE); cp: cache positions; am: additive attention mask;
        kv: interleaved (k, v) caches, two per layer.
        Returns (codec logits, *updated caches)."""
        # Build the RoPE cos/sin tables for the given positions.
        pos = pid[0].float()
        freqs = pos.unsqueeze(-1) * self.inv_freq.float().unsqueeze(0).unsqueeze(0)
        emb = torch.cat([freqs, freqs], dim=-1)
        cos = (emb.cos() * self.rs).to(ie.dtype).unsqueeze(1)
        sin = (emb.sin() * self.rs).to(ie.dtype).unsqueeze(1)
        h = ie
        ukv = []
        for i, layer in enumerate(self.layers):
            h, nk, nv = layer(h, cos, sin, cp, kv[i * 2], kv[i * 2 + 1], am)
            ukv.append(nk)
            ukv.append(nv)
        return (self.codec_head(self.norm(h)), *ukv)
|
|
| print(" Building talker wrapper...") |
| talker_test = TalkerTest(model.talker) |
| talker_test.eval() |
|
|
| |
| sl = 5 |
| test_embeds = torch.randn(1, sl, HIDDEN_SIZE) |
| test_pos = torch.arange(sl).unsqueeze(0).unsqueeze(0).repeat(3, 1, 1) |
| test_cp = torch.arange(sl) |
| cm = torch.full((1, 1, sl, MAX_SEQ_LEN), float('-inf')) |
| for i in range(sl): |
| cm[:, :, i, :i+1] = 0.0 |
| kv = [torch.zeros(1, NUM_KV_HEADS, MAX_SEQ_LEN, HEAD_DIM) for _ in range(NUM_LAYERS * 2)] |
|
|
| with torch.no_grad(): |
| out = talker_test(test_embeds, test_pos, test_cp, cm, *kv) |
|
|
| logits = out[0] |
| print(f" Talker wrapper logits shape: {list(logits.shape)}") |
| print(f" Logits stats: min={logits.min():.3f}, max={logits.max():.3f}, mean={logits.mean():.3f}") |
|
|
| |
# Run the original HF talker on the same embeddings and compare logits.
# (The original code built a DynamicCache that was never used because
# use_cache=False; it has been removed.)
with torch.no_grad():
    cache_pos = torch.arange(sl)
    pos_ids = torch.arange(sl).unsqueeze(0).unsqueeze(0).expand(3, 1, -1)

    orig_out = model.talker.model(
        input_ids=None,
        inputs_embeds=test_embeds,
        position_ids=pos_ids,
        cache_position=cache_pos,
        attention_mask=torch.ones(1, sl),
        use_cache=False,
    )
    orig_logits = model.talker.codec_head(orig_out.last_hidden_state)

cos_talker = F.cosine_similarity(
    logits.flatten().unsqueeze(0),
    orig_logits.flatten().unsqueeze(0)
).item()
max_diff = (logits - orig_logits).abs().max().item()
print(f" Wrapper vs Original logits cosine sim: {cos_talker:.6f}")
print(f" Max abs difference: {max_diff:.2e}")
|
|
| |
|
|
| print("\n" + "=" * 70) |
| print("E2E VALIDATION SUMMARY") |
| print("=" * 70) |
|
|
| print(f""" |
| Component Validation Results: |
| Speaker Encoder .pte vs PyTorch: {"PASS" if cos_se and cos_se > 0.99 else "N/A" if cos_se is None else "FAIL"} (cosine={cos_se if cos_se else 'N/A'}) |
| Vocoder .pte vs PyTorch: {"PASS" if cos_voc and cos_voc > 0.99 else "N/A" if cos_voc is None else "FAIL"} (cosine={cos_voc if cos_voc else 'N/A'}) |
| Talker Wrapper vs Original: {"PASS" if cos_talker > 0.99 else "FAIL"} (cosine={cos_talker:.6f}) |
| |
| Exported .pte Files: |
| """) |
|
|
| import glob |
| pte_files = sorted(glob.glob(os.path.join(OUTPUT_DIR, "*.pte"))) |
| total_size = 0 |
| for f in pte_files: |
| size = os.path.getsize(f) / 1e6 |
| total_size += size |
| name = os.path.basename(f) |
| print(f" {name:40s} {size:8.1f} MB") |
| print(f" {'β' * 49}") |
| print(f" {'TOTAL':40s} {total_size:8.1f} MB") |
|
|
| print(f""" |
| Pipeline Architecture: |
| 1. ref_audio (24kHz) β mel_spectrogram β speaker_encoder β x_vector [1, 2048] |
| 2. ref_audio β speech_tokenizer.encode β ref_codes [T, 16] |
| 3. text β tokenizer β input_ids β embedding orchestration β inputs_embeds |
| 4. talker.forward(inputs_embeds, ...) β codec logits β autoregressive decode |
| (each step calls code_predictor for 15 additional codebook predictions) |
| 5. codec_tokens β vocoder.forward(codes) β PCM waveform |
| |
| Note: The autoregressive generation loop (steps 3-4) runs in Python, |
| calling the exported talker and code_predictor .pte modules per step. |
| The speaker_encoder and vocoder are single-pass modules. |
| """) |
|
|
| print("Phase 7 complete!") |
| print("=" * 70) |
|
|