acul3's picture
Upload scripts/test_e2e.py with huggingface_hub
b371b10 verified
#!/usr/bin/env python3
"""
Phase 7: End-to-End Validation
================================
Tests the full voice clone pipeline using the exported PyTorch modules:
ref_audio → speaker_encoder → x_vector
ref_audio → speech_tokenizer.encode → ref_codes
text → tokenizer → input_ids
[x_vector, ref_codes, input_ids] → talker → codec_tokens
codec_tokens → vocoder → PCM waveform
Compares output with the original model.
Reports: audio duration, sample rate, waveform similarity.
"""
import sys
import os
import time
import copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
# Filesystem locations used by this script, all resolved under the user's
# home directory: the original checkpoint, the virtualenv site-packages and
# source checkout that are put on sys.path, and the directory that is
# searched for exported .pte programs.
MODEL_PATH = os.path.expanduser("~/Documents/Qwen3-TTS/models/1.7B-Base")
VENV_SITE = os.path.expanduser("~/Documents/Qwen3-TTS/.venv/lib/python3.10/site-packages")
QWEN_TTS_SRC = os.path.expanduser("~/Documents/Qwen3-TTS")
OUTPUT_DIR = os.path.expanduser("~/Documents/Qwen3-TTS-ExecuTorch/exported")

# Prepend each location that is not already on sys.path. Because every
# insert goes to index 0, the last entry of the tuple ends up first.
for _extra_path in (VENV_SITE, QWEN_TTS_SRC):
    if _extra_path not in sys.path:
        sys.path.insert(0, _extra_path)

_BANNER_RULE = "=" * 70
print(_BANNER_RULE)
print("PHASE 7: End-to-End Validation")
print(_BANNER_RULE)
# ── 1. Load Original Model ──────────────────────────────────────────
print("\n[1/6] Loading original model...")

# These imports run after the sys.path inserts above; presumably the
# qwen_tts package is only importable from QWEN_TTS_SRC / VENV_SITE.
from qwen_tts.core.models.configuration_qwen3_tts import Qwen3TTSConfig
from qwen_tts.core.models.modeling_qwen3_tts import Qwen3TTSForConditionalGeneration
from qwen_tts.core.models.modeling_qwen3_tts import mel_spectrogram

config = Qwen3TTSConfig.from_pretrained(MODEL_PATH)

# Load the reference model in float32 on CPU with SDPA attention; every
# later section compares against this instance.
_load_kwargs = dict(
    config=config,
    dtype=torch.float32,
    attn_implementation="sdpa",
    device_map="cpu",
)
model = Qwen3TTSForConditionalGeneration.from_pretrained(MODEL_PATH, **_load_kwargs)
model.eval()
print(" Model loaded.")
# ── 2. Create Synthetic Test Data ────────────────────────────────────
print("\n[2/6] Creating synthetic test data...")

# Synthetic reference audio: 3 seconds of Gaussian noise at 24kHz,
# scaled by 0.1 and stored as float32. The global NumPy RNG is seeded so
# the waveform is identical on every run.
ref_sr = 24000
ref_duration = 3.0
_num_ref_samples = int(ref_sr * ref_duration)

np.random.seed(42)
_noise = np.random.randn(_num_ref_samples)
ref_audio = _noise.astype(np.float32) * 0.1

print(f" Reference audio: {ref_duration}s at {ref_sr}Hz ({len(ref_audio)} samples)")
# ── 3. Test Speaker Encoder (.pte vs PyTorch) ───────────────────────
print("\n[3/6] Testing Speaker Encoder...")

# Compute mel spectrogram of the reference audio; the transpose swaps the
# last two axes before the features are fed to the speaker encoder.
mels = mel_spectrogram(
    torch.from_numpy(ref_audio).unsqueeze(0),
    n_fft=1024, num_mels=128, sampling_rate=24000,
    hop_size=256, win_size=1024, fmin=0, fmax=12000,
).transpose(1, 2)
print(f" Mel shape: {list(mels.shape)}")

# PyTorch reference on the natural-length mel (reported for its shape only).
with torch.no_grad():
    orig_spk = model.speaker_encoder(mels)
print(f" Original x-vector shape: {list(orig_spk.shape)}")

# .pte execution
pte_path = os.path.join(OUTPUT_DIR, "speaker_encoder.pte")
if os.path.exists(pte_path):
    from executorch.runtime import Runtime

    runtime = Runtime.get()
    # Read the program through a context manager so the file handle is
    # closed deterministically instead of being left to the GC.
    with open(pte_path, "rb") as pte_file:
        prog = runtime.load_program(pte_file.read())
    method = prog.load_method("forward")

    # Pad/truncate mel to fixed 469 frames (zero padding at the end).
    # NOTE(review): presumably the program was exported with this static
    # input shape — confirm against the export script.
    FIXED_MEL = 469
    mel_fixed = torch.zeros(1, FIXED_MEL, 128)
    actual_frames = min(mels.shape[1], FIXED_MEL)
    mel_fixed[:, :actual_frames, :] = mels[:, :actual_frames, :]

    # Reference computed on the SAME padded input the .pte receives, so the
    # cosine similarity measures export fidelity rather than the effect of
    # zero padding (mirrors what the vocoder check does).
    with torch.no_grad():
        ref_spk_fixed = model.speaker_encoder(mel_fixed)

    pte_out = method.execute([mel_fixed])
    if isinstance(pte_out, (list, tuple)):
        pte_out = pte_out[0]

    cos_se = F.cosine_similarity(
        ref_spk_fixed.flatten().unsqueeze(0),
        pte_out.flatten().unsqueeze(0),
    ).item()
    print(f" .pte vs PyTorch cosine sim: {cos_se:.6f}")
else:
    print(f" .pte not found at {pte_path}, skipping runtime test")
    cos_se = None
# ── 4. Test Speech Tokenizer Encode ──────────────────────────────────
print("\n[4/6] Testing Speech Tokenizer Encode...")

# Add a leading batch axis to the reference waveform and mark every sample
# as valid in the padding mask.
ref_wav_tensor = torch.from_numpy(ref_audio)[None, :].float()
padding_mask = torch.ones_like(ref_wav_tensor, dtype=torch.long)

_encode = model.speech_tokenizer.model.encode
with torch.no_grad():
    enc_out = _encode(ref_wav_tensor, padding_mask=padding_mask, return_dict=True)

# Codes for the first (only) batch item.
ref_codes = enc_out.audio_codes[0]  # [T, 16]
_num_code_frames = ref_codes.shape[0]
print(f" Ref codes shape: {list(ref_codes.shape)}")
print(f" Ref codes frames: {_num_code_frames} ({_num_code_frames / 12.5:.1f}s at 12.5 Hz)")
# ── 5. Test Vocoder (.pte vs PyTorch) ───────────────────────────────
print("\n[5/6] Testing Vocoder...")

# Generate synthetic codec codes (same shape as what talker would produce)
# Use ref_codes as a stand-in
test_codes = ref_codes.unsqueeze(0).transpose(1, 2)  # [1, 16, T]
print(f" Test codes shape: {list(test_codes.shape)}")

# PyTorch reference on the full-length codes
with torch.no_grad():
    orig_wav = model.speech_tokenizer.model.decoder(test_codes)
print(f" Original wav shape: {list(orig_wav.shape)}")
print(f" Original wav duration: {orig_wav.shape[-1] / 24000:.2f}s")

# .pte test (need fixed size, so we pad/truncate)
voc_pte_path = os.path.join(OUTPUT_DIR, "vocoder.pte")
if os.path.exists(voc_pte_path):
    # Obtain the runtime here instead of relying on the speaker-encoder
    # section: that section only defines `runtime` when its own .pte file
    # exists, which would otherwise surface as a NameError at this point.
    from executorch.runtime import Runtime

    runtime = Runtime.get()

    # Zero-pad (or truncate) the code sequence to the fixed length.
    FIXED_CODE_LEN = 50
    codes_fixed = torch.zeros(1, 16, FIXED_CODE_LEN, dtype=torch.long)
    actual_len = min(test_codes.shape[-1], FIXED_CODE_LEN)
    codes_fixed[:, :, :actual_len] = test_codes[:, :, :actual_len]

    # PyTorch with fixed size: same input as the .pte, so the comparison
    # isolates export fidelity.
    with torch.no_grad():
        ref_wav_fixed = model.speech_tokenizer.model.decoder(codes_fixed)

    # Context manager closes the program file deterministically.
    with open(voc_pte_path, "rb") as voc_file:
        prog_voc = runtime.load_program(voc_file.read())
    method_voc = prog_voc.load_method("forward")

    pte_wav = method_voc.execute([codes_fixed])
    if isinstance(pte_wav, (list, tuple)):
        pte_wav = pte_wav[0]

    cos_voc = F.cosine_similarity(
        ref_wav_fixed.flatten().unsqueeze(0),
        pte_wav.flatten().unsqueeze(0),
    ).item()
    print(f" .pte vs PyTorch cosine sim: {cos_voc:.6f}")
    print(f" .pte wav duration: {pte_wav.shape[-1] / 24000:.2f}s")
else:
    print(f" .pte not found at {voc_pte_path}, skipping runtime test")
    cos_voc = None
# ── 6. Test Talker (PyTorch wrapper validation) ─────────────────────
print("\n[6/6] Testing Talker Wrapper Consistency...")

# A full generate() through the exported wrapper is not attempted here
# (it needs the Python orchestration layer). Instead the wrapper backbone
# is checked to produce the same logits as the original talker.

# Geometry shared by the wrapper classes and the test inputs below.
HIDDEN_SIZE = 2048   # width of the hidden states / input embeddings
HEAD_DIM = 128       # per-head feature size
NUM_KV_HEADS = 8     # key/value heads held in each cache tensor
NUM_LAYERS = 28      # one (key, value) cache pair is allocated per layer
MAX_SEQ_LEN = 2048   # sequence length of the cache and attention mask
class RMSNorm(nn.Module):
    """Root-mean-square normalisation over the last axis with a learned scale.

    The input is promoted to float32 for the computation and the result is
    cast back to the input's dtype.

    Args:
        dim: size of the normalised (last) axis; length of ``weight``.
        eps: constant added to the mean square before the reciprocal root.
    """

    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(dim))
        self.eps = eps

    def forward(self, x):
        """Return ``weight * x / sqrt(mean(x**2) + eps)`` in ``x``'s dtype."""
        in_dtype = x.dtype
        x32 = x.float()
        mean_square = x32.pow(2).mean(-1, keepdim=True)
        normalised = x32 * torch.rsqrt(mean_square + self.eps)
        return (self.weight * normalised).to(in_dtype)
def rotate_half(x):
    """Swap the two halves of the last axis, negating the one moved forward.

    With the last axis split at ``x.shape[-1] // 2`` into ``(front, back)``,
    the result is ``(-back, front)`` concatenated on the last axis.
    """
    split = x.shape[-1] // 2
    front = x[..., :split]
    back = x[..., split:]
    return torch.cat((-back, front), dim=-1)
class TalkerAttnTest(nn.Module):
    """Attention block of one talker layer with an explicit KV cache.

    The four projections and the q/k norm weights are deep-copied from
    ``orig``. Queries use 16 heads; keys and values use ``NUM_KV_HEADS``
    heads that are replicated ``self.g`` times before attention.

    NOTE(review): the q/k norms are rebuilt with RMSNorm's default eps
    rather than reading it from ``orig`` — confirm the values match.
    """

    def __init__(self, orig, _):
        # The second positional argument is accepted but not used.
        super().__init__()
        for proj_name in ("q_proj", "k_proj", "v_proj", "o_proj"):
            setattr(self, proj_name, copy.deepcopy(getattr(orig, proj_name)))
        for norm_name in ("q_norm", "k_norm"):
            norm = RMSNorm(HEAD_DIM)
            norm.weight = copy.deepcopy(getattr(orig, norm_name).weight)
            setattr(self, norm_name, norm)
        # Number of query heads served by each key/value head.
        self.g = 16 // NUM_KV_HEADS

    def _replicate_kv_heads(self, cache, batch):
        """Repeat each cached KV head ``self.g`` times to line up with the 16 query heads."""
        grouped = cache.unsqueeze(2).expand(-1, -1, self.g, -1, -1)
        return grouped.reshape(batch, 16, MAX_SEQ_LEN, HEAD_DIM)

    def forward(self, h, cos, sin, cp, kc, vc, am):
        """Run attention over ``h`` and return ``(output, new_k_cache, new_v_cache)``.

        ``cos``/``sin`` are the rotary factors, ``cp`` indexes the cache
        positions written by this call, ``kc``/``vc`` are the incoming
        caches (cloned, never modified in place) and ``am`` is the additive
        attention mask.
        """
        batch, seq_len, _ = h.shape

        q = self.q_proj(h).view(batch, seq_len, 16, HEAD_DIM)
        q = self.q_norm(q).transpose(1, 2)
        k = self.k_proj(h).view(batch, seq_len, NUM_KV_HEADS, HEAD_DIM)
        k = self.k_norm(k).transpose(1, 2)
        v = self.v_proj(h).view(batch, seq_len, NUM_KV_HEADS, HEAD_DIM)
        v = v.transpose(1, 2)

        # Rotary position embedding on queries and keys.
        q = q * cos + rotate_half(q) * sin
        k = k * cos + rotate_half(k) * sin

        # Write the new keys/values into copies of the caches.
        kc = kc.clone()
        vc = vc.clone()
        kc[:, :, cp, :] = k
        vc[:, :, cp, :] = v

        keys = self._replicate_kv_heads(kc, batch)
        values = self._replicate_kv_heads(vc, batch)
        attended = F.scaled_dot_product_attention(
            q, keys, values, attn_mask=am, scale=HEAD_DIM**-0.5
        )
        merged = attended.transpose(1, 2).reshape(batch, seq_len, -1)
        return self.o_proj(merged), kc, vc
class TalkerLayerTest(nn.Module):
    """One talker decoder layer: pre-norm attention followed by a gated MLP.

    All weights are deep-copied from ``orig``; ``i`` is forwarded to the
    attention wrapper.
    """

    def __init__(self, orig, i):
        super().__init__()
        self.attn = TalkerAttnTest(orig.self_attn, i)

        source_mlp = orig.mlp
        self.gp = copy.deepcopy(source_mlp.gate_proj)
        self.up = copy.deepcopy(source_mlp.up_proj)
        self.dp = copy.deepcopy(source_mlp.down_proj)

        self.n1 = RMSNorm(HIDDEN_SIZE)
        self.n1.weight = copy.deepcopy(orig.input_layernorm.weight)
        self.n2 = RMSNorm(HIDDEN_SIZE)
        self.n2.weight = copy.deepcopy(orig.post_attention_layernorm.weight)

    def forward(self, h, cos, sin, cp, kc, vc, am):
        """Return ``(hidden, new_k_cache, new_v_cache)`` for this layer."""
        # Attention sub-block with residual connection.
        attn_out, kc, vc = self.attn(self.n1(h), cos, sin, cp, kc, vc, am)
        h = h + attn_out

        # SiLU-gated MLP sub-block with residual connection.
        normed = self.n2(h)
        mlp_out = self.dp(F.silu(self.gp(normed)) * self.up(normed))
        return h + mlp_out, kc, vc
class TalkerTest(nn.Module):
    """Talker backbone plus codec head, driven with explicit KV-cache tensors.

    Layers, final norm weight, codec head and rotary ``inv_freq`` are copied
    from ``orig``; ``self.rs`` is the rotary ``attention_scaling`` (1.0 when
    the attribute is absent).
    """

    def __init__(self, orig):
        super().__init__()
        backbone = orig.model
        wrapped_layers = [
            TalkerLayerTest(layer, idx) for idx, layer in enumerate(backbone.layers)
        ]
        self.layers = nn.ModuleList(wrapped_layers)
        self.norm = RMSNorm(HIDDEN_SIZE)
        self.norm.weight = copy.deepcopy(backbone.norm.weight)
        self.codec_head = copy.deepcopy(orig.codec_head)

        rotary = backbone.rotary_emb
        self.register_buffer("inv_freq", rotary.inv_freq.clone())
        self.rs = getattr(rotary, 'attention_scaling', 1.0)

    def forward(self, ie, pid, cp, am, *kv):
        """Return ``(logits, k0, v0, k1, v1, ...)``.

        ``ie`` are the input embeddings, ``pid`` the position ids (only
        ``pid[0]`` is used), ``cp`` the cache positions, ``am`` the attention
        mask and ``kv`` the flat per-layer cache tensors in key, value order.
        """
        # Rotary angles from the first position-id plane.
        positions = pid[0].float()
        angles = positions.unsqueeze(-1) * self.inv_freq.float()[None, None]
        angles = torch.cat([angles, angles], dim=-1)
        cos = (angles.cos() * self.rs).to(ie.dtype).unsqueeze(1)
        sin = (angles.sin() * self.rs).to(ie.dtype).unsqueeze(1)

        hidden = ie
        updated_cache = []
        for idx, layer in enumerate(self.layers):
            k_in = kv[2 * idx]
            v_in = kv[2 * idx + 1]
            hidden, k_out, v_out = layer(hidden, cos, sin, cp, k_in, v_in, am)
            updated_cache.extend((k_out, v_out))

        logits = self.codec_head(self.norm(hidden))
        return (logits, *updated_cache)
print(" Building talker wrapper...")
talker_test = TalkerTest(model.talker)
talker_test.eval()

# Run single forward pass over a short random prompt. Seed torch so the
# random embeddings (and therefore the reported stats) are reproducible.
torch.manual_seed(42)
sl = 5
test_embeds = torch.randn(1, sl, HIDDEN_SIZE)
# Three identical planes of position ids, shape [3, 1, sl].
test_pos = torch.arange(sl).unsqueeze(0).unsqueeze(0).repeat(3, 1, 1)
test_cp = torch.arange(sl)

# Additive causal mask over the whole cache: query i may attend to cache
# slots 0..i (0.0); everything else, including unused slots, stays -inf.
cm = torch.full((1, 1, sl, MAX_SEQ_LEN), float('-inf'))
causal = torch.tril(torch.ones(sl, sl, dtype=torch.bool))
cm[:, :, :, :sl].masked_fill_(causal, 0.0)

# One zero-initialised key cache and value cache per layer.
kv = [torch.zeros(1, NUM_KV_HEADS, MAX_SEQ_LEN, HEAD_DIM) for _ in range(NUM_LAYERS * 2)]

with torch.no_grad():
    out = talker_test(test_embeds, test_pos, test_cp, cm, *kv)
logits = out[0]
print(f" Talker wrapper logits shape: {list(logits.shape)}")
print(f" Logits stats: min={logits.min():.3f}, max={logits.max():.3f}, mean={logits.mean():.3f}")

# Compare with original model's backbone. The call runs without a cache
# (use_cache=False), so no cache object is constructed here.
with torch.no_grad():
    cache_pos = torch.arange(sl)
    pos_ids = torch.arange(sl).unsqueeze(0).unsqueeze(0).expand(3, 1, -1)
    orig_out = model.talker.model(
        input_ids=None,
        inputs_embeds=test_embeds,
        position_ids=pos_ids,
        cache_position=cache_pos,
        attention_mask=torch.ones(1, sl),
        use_cache=False,
    )
    orig_logits = model.talker.codec_head(orig_out.last_hidden_state)

cos_talker = F.cosine_similarity(
    logits.flatten().unsqueeze(0),
    orig_logits.flatten().unsqueeze(0),
).item()
max_diff = (logits - orig_logits).abs().max().item()
print(f" Wrapper vs Original logits cosine sim: {cos_talker:.6f}")
print(f" Max abs difference: {max_diff:.2e}")
# ── Summary ──────────────────────────────────────────────────────────
import glob


def _verdict(cos, threshold=0.99):
    """Return ``(status, cosine_text)`` for one component check.

    ``None`` means the check was skipped and yields ``("N/A", "N/A")``.
    Any measured value — including 0.0 — is reported with six decimals and
    passes only when it is strictly greater than ``threshold``.
    """
    if cos is None:
        return "N/A", "N/A"
    status = "PASS" if cos > threshold else "FAIL"
    return status, f"{cos:.6f}"


_se_status, _se_cos = _verdict(cos_se)
_voc_status, _voc_cos = _verdict(cos_voc)
_talker_status, _talker_cos = _verdict(cos_talker)

print("\n" + "=" * 70)
print("E2E VALIDATION SUMMARY")
print("=" * 70)
print(f"""
Component Validation Results:
Speaker Encoder .pte vs PyTorch: {_se_status} (cosine={_se_cos})
Vocoder .pte vs PyTorch: {_voc_status} (cosine={_voc_cos})
Talker Wrapper vs Original: {_talker_status} (cosine={_talker_cos})
Exported .pte Files:
""")

# List every exported program in OUTPUT_DIR with its size in MB (1e6 bytes).
pte_files = sorted(glob.glob(os.path.join(OUTPUT_DIR, "*.pte")))
total_size = 0
for f in pte_files:
    size = os.path.getsize(f) / 1e6
    total_size += size
    name = os.path.basename(f)
    print(f" {name:40s} {size:8.1f} MB")
print(f" {'─' * 49}")
print(f" {'TOTAL':40s} {total_size:8.1f} MB")

print("""
Pipeline Architecture:
1. ref_audio (24kHz) → mel_spectrogram → speaker_encoder → x_vector [1, 2048]
2. ref_audio → speech_tokenizer.encode → ref_codes [T, 16]
3. text → tokenizer → input_ids → embedding orchestration → inputs_embeds
4. talker.forward(inputs_embeds, ...) → codec logits → autoregressive decode
(each step calls code_predictor for 15 additional codebook predictions)
5. codec_tokens → vocoder.forward(codes) → PCM waveform
Note: The autoregressive generation loop (steps 3-4) runs in Python,
calling the exported talker and code_predictor .pte modules per step.
The speaker_encoder and vocoder are single-pass modules.
""")
print("Phase 7 complete!")
print("=" * 70)