In [12]:
import os
import sys
import torch
import torchaudio
import random
import numpy as np
import torchaudio
from omegaconf import OmegaConf
from torch.nn import functional as F

from cosyvoice.flow.decoder import ConditionalDecoder, CausalConditionalDecoder
from cosyvoice.flow.flow import CausalMaskedDiffWithXvec
from cosyvoice.flow.flow_matching import CausalConditionalCFM
from cosyvoice.hifigan.f0_predictor import ConvRNNF0Predictor
from cosyvoice.hifigan.generator import HiFTGenerator
from cosyvoice.llm.llm import Qwen2Encoder, Qwen2LM
from cosyvoice.tokenizer.tokenizer import get_qwen_tokenizer
from cosyvoice.transformer.upsample_encoder import UpsampleConformerEncoder
from cosyvoice.utils.common import ras_sampling

# Select the compute device. Prefer the first CUDA GPU, but fall back to the
# CPU when CUDA is unavailable so the notebook still runs on CPU-only hosts.
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"  # Use GPU 0
device = "cuda:0" if torch.cuda.is_available() else "cpu"


def set_deterministic_behavior(seed=42):
    """Seed Python, NumPy and PyTorch RNGs and pin cuDNN to deterministic mode.

    Args:
        seed: Integer applied to ``random``, ``numpy.random``, ``torch`` (CPU and
            every CUDA device) and exported as the ``PYTHONHASHSEED`` variable.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Trade cuDNN autotuning speed for run-to-run repeatability.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

    os.environ["PYTHONHASHSEED"] = f"{seed}"


# Seed every RNG once, up front, before any model or data code runs.
set_deterministic_behavior(seed=70000)

In [13]:
# Notebook-wide settings.
# `model_dir` is the checkpoint directory; the LLM encoder config below joins
# 'CosyVoice-BlankEN' onto it to locate its pretrained weights.
model_dir = './pretrained_models/CosyVoice2-0.5B'
sample_rate = 24000
fp16 = False
allowed_special = 'all'

In [None]:
# Keyword arguments unpacked into Qwen2LM when the LLM is built.
llm_config = dict(
    llm_input_size=896,
    llm_output_size=896,
    speech_token_size=6561,
    length_normalized_loss=True,
    lsm_weight=0,
    mix_ratio=[5, 15],
)

# Keyword arguments unpacked into Qwen2Encoder.
llm_encoder_config = dict(
    pretrain_path=os.path.join(model_dir, 'CosyVoice-BlankEN'),
)

# Sampling hyper-parameters (not consumed by any cell visible in this notebook).
sampling_config = dict(
    top_p=0.8,
    top_k=25,
    win_size=10,
    tau_r=0.1,
)

# Keyword arguments unpacked into CausalMaskedDiffWithXvec.
flow_config = dict(
    input_size=512,
    output_size=80,
    spk_embed_dim=192,
    output_type='mel',
    vocab_size=6561,
    input_frame_rate=25,
    only_mask_loss=True,
    token_mel_ratio=2,
    pre_lookahead_len=3,
)

# Keyword arguments unpacked into UpsampleConformerEncoder.
encoder_config = dict(
    output_size=512,
    attention_heads=8,
    linear_units=2048,
    num_blocks=6,
    dropout_rate=0.1,
    positional_dropout_rate=0.1,
    attention_dropout_rate=0.1,
    normalize_before=True,
    input_layer='linear',
    pos_enc_layer_type='rel_pos_espnet',
    selfattention_layer_type='rel_selfattn',
    input_size=512,
    use_cnn_module=False,
    macaron_style=False,
)

# Keyword arguments unpacked into CausalConditionalCFM. `cfm_params` is wrapped
# in an OmegaConf container rather than left as a plain dict.
decoder_config = dict(
    in_channels=240,
    n_spks=1,
    spk_emb_dim=80,
    cfm_params=OmegaConf.create(dict(
        sigma_min=1e-06,
        solver='euler',
        t_scheduler='cosine',
        training_cfg_rate=0.2,
        inference_cfg_rate=0.7,
        reg_loss_type='l1',
        use_immiscible=True,
        immiscible_k=8,
        use_contrastive_fm=True,
        contrastive_lambda=0.05,
    )),
)

# Keyword arguments unpacked into CausalConditionalDecoder (the CFM estimator).
estimator_config = dict(
    in_channels=320,
    out_channels=80,
    channels=[256],
    dropout=0.0,
    attention_head_dim=64,
    n_blocks=4,
    num_mid_blocks=12,
    num_heads=8,
    act_fn='gelu',
    static_chunk_size=50,
    num_decoding_left_chunks=2,
)

# Keyword arguments unpacked into ConvRNNF0Predictor.
f0_predictor_config = dict(
    num_class=1,
    in_channels=80,
    cond_channels=512,
)

# Keyword arguments unpacked into HiFTGenerator.
hift_config = dict(
    in_channels=80,
    base_channels=512,
    nb_harmonics=8,
    sampling_rate=24000,
    nsf_alpha=0.1,
    nsf_sigma=0.003,
    nsf_voiced_threshold=10,
    upsample_rates=[8, 5, 3],
    upsample_kernel_sizes=[16, 11, 7],
    istft_params=dict(
        n_fft=16,
        hop_len=4,
    ),
    resblock_kernel_sizes=[3, 7, 11],
    resblock_dilation_sizes=[[1, 3, 5], [1, 3, 5], [1, 3, 5]],
    source_resblock_kernel_sizes=[7, 7, 11],
    source_resblock_dilation_sizes=[[1, 3, 5], [1, 3, 5], [1, 3, 5]],
    lrelu_slope=0.1,
    audio_limit=0.99,
)

In [15]:
# Build the text encoder first, then wrap it in the language model together
# with the repetition-aware sampling function.
llm_encoder = Qwen2Encoder(**llm_encoder_config)
llm_model = Qwen2LM(
    llm=llm_encoder,
    sampling=ras_sampling,
    **llm_config,
)

In [16]:
# Assemble the flow model. Construction order is kept as-is because each
# module's initialisation draws from the seeded RNG.
flow_encoder = UpsampleConformerEncoder(**encoder_config)
estimator = CausalConditionalDecoder(**estimator_config)
flow_decoder = CausalConditionalCFM(estimator=estimator, **decoder_config)
flow = CausalMaskedDiffWithXvec(
    **flow_config,
    encoder=flow_encoder,
    decoder=flow_decoder,
)

 deprecate("LoRACompatibleLinear", "1.0.0", deprecation_message)


ConfigAttributeError: Missing key use_immiscible
 full_key: use_immiscible
 object_type=dict

In [None]:
# Vocoder: the F0 predictor is built first and handed to the HiFT generator.
f0_predictor = ConvRNNF0Predictor(**f0_predictor_config)
hifi = HiFTGenerator(
    f0_predictor=f0_predictor,
    **hift_config,
)

dict_keys(['utts', 'speech_token', 'speech_token_len', 'speech_feat', 'speech_feat_len', 'text', 'text_token', 'text_token_len', 'utt_embedding', 'spk_embedding', 'embedding'])

In [30]:
# Show the first entry of `speech_token_len` next to the full tensor.
# NOTE(review): `data` is not defined in any cell visible here — it must come
# from an earlier/removed cell, so this fails on a fresh kernel.
data['speech_token_len'][0], data['speech_token_len']

(tensor(47, dtype=torch.int32),
 tensor([47, 50, 49, 49, 49, 48, 48, 48, 48, 47, 43, 47, 46, 46, 46, 45, 45, 45,
 45, 43], dtype=torch.int32))

In [31]:
# Sanity check: the per-utterance fields should all have the same length.
len(data['utts']), len(data['text']), len(data['speech_token_len'])

(20, 20, 20)

In [35]:
# Shapes of the batch tensors.
# NOTE(review): `speech_token_len` and `embedding` are each listed twice —
# possibly `speech_token` / `utt_embedding` were intended; confirm.
data['speech_token_len'].shape, data['speech_token_len'].shape, data['spk_embedding'].shape, data['speech_feat'].shape, data['embedding'].shape, data['speech_feat_len'].shape, data['embedding'].shape

(torch.Size([20]),
 torch.Size([20]),
 torch.Size([20, 192]),
 torch.Size([20, 98, 80]),
 torch.Size([20, 192]),
 torch.Size([20]),
 torch.Size([20, 192]))

In [37]:
# Per-utterance speech-token lengths, reused below to build the padding mask.
token_len = data['speech_token_len']

In [38]:
# NOTE(review): this import sits mid-notebook; consider moving it to the import cell.
from cosyvoice.utils.mask import make_pad_mask
# Invert the pad mask, cast to float and add a trailing singleton axis.
# Presumably 1.0 marks valid positions and 0.0 padding — TODO confirm against make_pad_mask.
mask = (~make_pad_mask(token_len)).float().unsqueeze(-1)

In [39]:
# Inspect the mask shape produced by the previous cell.
mask.shape

torch.Size([20, 50, 1])

In [40]:
# Display the raw lengths the mask was built from.
token_len

tensor([47, 50, 49, 49, 49, 48, 48, 48, 48, 47, 43, 47, 46, 46, 46, 45, 45, 45,
 45, 43], dtype=torch.int32)

In [5]:
from torch.optim.lr_scheduler import _LRScheduler
import warnings

class ResumableSequentialLR(_LRScheduler):
    """Sequential scheduler whose position can be set directly when resuming.

    The child schedulers are never stepped. For every global step this wrapper
    works out which child is active, writes the step relative to that child's
    starting milestone into ``child.last_epoch`` and asks the child for its
    learning rates. Because the learning rate is derived from the global step
    alone, training can jump to any step via :meth:`set_step`.
    """

    def __init__(self, optimizer, schedulers, milestones, last_epoch=-1):
        """
        Args:
            optimizer: Wrapped optimizer.
            schedulers: List of schedulers to use one after another.
            milestones: Global steps at which to switch to the next scheduler.
            last_epoch: The index of the last epoch/step.

        Raises:
            ValueError: If ``len(schedulers) != len(milestones) + 1``.
        """
        if len(schedulers) != len(milestones) + 1:
            raise ValueError("Expected len(schedulers) == len(milestones) + 1")

        self.schedulers = schedulers
        self.milestones = milestones
        # Index of the child used for the most recent LR computation.
        self._scheduler_idx = 0

        # The parent constructor performs the initial step(), which calls
        # get_lr(); the attributes above must therefore exist already.
        super().__init__(optimizer, last_epoch)

    def _get_scheduler_info(self, epoch):
        """Return ``(scheduler_idx, relative_epoch)`` for a global ``epoch``.

        ``scheduler_idx`` is the number of leading milestones that ``epoch`` has
        reached; ``relative_epoch`` is ``epoch`` measured from the milestone
        that activated that scheduler (or ``epoch`` itself for the first one).
        """
        scheduler_idx = 0
        for i, milestone in enumerate(self.milestones):
            if epoch < milestone:
                break
            scheduler_idx = i + 1

        if scheduler_idx == 0:
            return 0, epoch
        return scheduler_idx, epoch - self.milestones[scheduler_idx - 1]

    def _lrs_at(self, epoch):
        """Compute the learning rates for global ``epoch`` from the active child.

        Side effect: overwrites the active child's ``last_epoch`` with the
        relative epoch and records the child's index in ``_scheduler_idx``.
        """
        scheduler_idx, relative_epoch = self._get_scheduler_info(epoch)
        self._scheduler_idx = scheduler_idx
        scheduler = self.schedulers[scheduler_idx]
        scheduler.last_epoch = relative_epoch

        # Prefer the closed form: it depends only on base_lrs and last_epoch,
        # so it is valid after an arbitrary jump.
        if hasattr(scheduler, '_get_closed_form_lr'):
            return scheduler._get_closed_form_lr()

        # NOTE(review): a child's recursive get_lr() may read the optimizer's
        # current LR, which can be wrong after a jump — confirm for children
        # that lack a closed form.
        # Raise the child's "inside step()" flag so it does not warn, and
        # always lower it again even if get_lr() raises.
        scheduler._get_lr_called_within_step = True
        try:
            return scheduler.get_lr()
        finally:
            scheduler._get_lr_called_within_step = False

    def get_lr(self):
        """Get learning rates for ``self.last_epoch`` from the active child."""
        if not getattr(self, '_get_lr_called_within_step', False):
            warnings.warn("To get the last learning rate computed by the scheduler, "
                          "please use `get_last_lr()`.", UserWarning)
        return self._lrs_at(self.last_epoch)

    def step(self, epoch=None):
        """Advance by one step (or jump to ``epoch``) via the parent implementation."""
        super().step(epoch)

    def set_step(self, step):
        """Position the scheduler so that the next ``step()`` lands on ``step``.

        Sets ``last_epoch`` to ``step - 1``, then recomputes the learning rates
        for that position and writes them to both the optimizer and the
        ``get_last_lr()`` cache. (Previously the stale cached value was written
        back, so the optimizer kept its pre-resume learning rate.)

        Args:
            step: Global step the training run is resuming at.
        """
        self.last_epoch = step - 1

        # Evaluate the LR first: _lrs_at() rewrites the active child's
        # last_epoch, which is finalised just below. For step == 0, last_epoch
        # is -1 (before the first step), so evaluate at 0 to get the initial LR.
        lrs = self._lrs_at(max(self.last_epoch, 0))

        scheduler_idx, relative_epoch = self._get_scheduler_info(self.last_epoch)

        # Children that have already finished are parked on their final step.
        for i in range(scheduler_idx):
            phase_start = self.milestones[i - 1] if i > 0 else 0
            self.schedulers[i].last_epoch = self.milestones[i] - phase_start - 1

        # The active child sits at its relative position.
        self.schedulers[scheduler_idx].last_epoch = relative_epoch

        # Publish the recomputed rates.
        for param_group, lr in zip(self.optimizer.param_groups, lrs):
            param_group['lr'] = lr
        self._last_lr = list(lrs)


# Alternative simpler implementation that's more robust
class SimpleResumableSequentialLR(_LRScheduler):
    """Warmup-then-constant schedule computed directly from ``last_epoch``.

    Only ``schedulers[0]`` (read for ``start_factor``, ``end_factor`` and
    ``total_iters``) and ``milestones[0]`` are consulted; any further entries
    are stored but ignored.
    """

    def __init__(self, optimizer, schedulers, milestones, last_epoch=-1):
        self.schedulers = schedulers
        self.milestones = milestones
        super().__init__(optimizer, last_epoch)

    def get_lr(self):
        """Return one learning rate per param group for the current epoch."""
        current = self.last_epoch

        # At or past the first milestone the base rates are used unchanged.
        if not current < self.milestones[0]:
            return [base_lr * 1.0 for base_lr in self.base_lrs]

        # Warmup: interpolate linearly between the two factors.
        warmup = self.schedulers[0]
        lo = warmup.start_factor
        hi = warmup.end_factor
        span = warmup.total_iters

        scale = hi if current >= span else lo + (hi - lo) * current / span
        return [base_lr * scale for base_lr in self.base_lrs]


# Test function to verify the scheduler works correctly
def test_resumable_scheduler():
    """Print a step-by-step check of ResumableSequentialLR, then of resuming at step 7.

    Phase one walks ten steps of a 5-step linear warmup followed by a constant
    phase and compares the optimizer's LR against the analytic value. Phase two
    builds a fresh scheduler, calls ``set_step(7)`` and checks the LR after each
    of the next three steps. Results are printed, not asserted.
    """
    import torch
    import torch.optim as optim
    from torch.optim.lr_scheduler import LinearLR, ConstantLR

    base_lr = 1e-3
    warmup_steps = 5
    rule = "-" * 50

    # Dummy model and optimizer shared by both phases.
    model = torch.nn.Linear(10, 1)
    optimizer = optim.Adam(model.parameters(), lr=base_lr)

    # Child schedulers (also shared by both phases).
    warmup_scheduler = LinearLR(
        optimizer, start_factor=0.1, end_factor=1.0, total_iters=warmup_steps
    )
    constant_scheduler = ConstantLR(
        optimizer, factor=1.0, total_iters=float('inf')
    )

    def restore_base_lr():
        # Put every param group back on the un-scheduled learning rate.
        for group in optimizer.param_groups:
            group['lr'] = base_lr

    def build_scheduler():
        return ResumableSequentialLR(
            optimizer,
            schedulers=[warmup_scheduler, constant_scheduler],
            milestones=[warmup_steps],
        )

    def report(step, expected_lr):
        # One table row: observed LR, expected LR and a pass/fail mark.
        current_lr = optimizer.param_groups[0]['lr']
        match = "✓" if abs(current_lr - expected_lr) < 1e-10 else "✗"
        print(f"{step:<10} {current_lr:<15.6e} {expected_lr:<15.6e} {match:<10}")

    # Phase 1: run from scratch.
    print("Testing ResumableSequentialLR:")
    print(rule)
    restore_base_lr()
    scheduler = build_scheduler()

    print(f"{'Step':<10} {'LR':<15} {'Expected':<15} {'Match':<10}")
    print(rule)

    for step in range(10):
        if step < warmup_steps:
            report(step, base_lr * (0.1 + 0.9 * step / warmup_steps))
        else:
            report(step, base_lr)
        scheduler.step()

    # Phase 2: resume in the constant phase.
    print("\nTesting resume from step 7:")
    print(rule)
    restore_base_lr()
    scheduler = build_scheduler()
    scheduler.set_step(7)

    for step in range(7, 10):
        scheduler.step()
        report(step, base_lr)  # Should be constant phase


# Run the printed self-check whenever this cell executes as a script or in a
# notebook kernel (where __name__ is "__main__").
if __name__ == "__main__":
 test_resumable_scheduler()

Testing ResumableSequentialLR:
--------------------------------------------------
Step LR Expected Match 
--------------------------------------------------
0 1.000000e-04 1.000000e-04 ✓ 
1 2.800000e-04 2.800000e-04 ✓ 
2 4.600000e-04 4.600000e-04 ✓ 
3 6.400000e-04 6.400000e-04 ✓ 
4 8.200000e-04 8.200000e-04 ✓ 
5 1.000000e-03 1.000000e-03 ✓ 
6 1.000000e-03 1.000000e-03 ✓ 
7 1.000000e-03 1.000000e-03 ✓ 
8 1.000000e-03 1.000000e-03 ✓ 
9 1.000000e-03 1.000000e-03 ✓ 

Testing resume from step 7:
--------------------------------------------------
7 1.000000e-03 1.000000e-03 ✓ 
8 1.000000e-03 1.000000e-03 ✓ 
9 1.000000e-03 1.000000e-03 ✓ 


In [3]:
import torch
import torch.optim as optim
from torch.optim.lr_scheduler import LinearLR, ConstantLR, SequentialLR

def verify_lr_sources():
    """Compare ``optimizer.param_groups[0]['lr']`` with ``scheduler.get_last_lr()``.

    Builds a 5-step linear warmup followed by a constant phase using
    ``SequentialLR``, prints both learning-rate readings for ten steps, and
    reports whether they agreed on every step. Afterwards it overwrites the
    optimizer's LR by hand to show that the optimizer holds the value in use.
    Output is printed; nothing is returned.
    """
    # Create a simple model and optimizer
    model = torch.nn.Linear(10, 1)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    # Create schedulers
    warmup_scheduler = LinearLR(
        optimizer,
        start_factor=0.1,  # Start at 10% of base LR
        end_factor=1.0,    # End at 100% of base LR
        total_iters=5      # 5 warmup steps
    )

    constant_scheduler = ConstantLR(
        optimizer,
        factor=1.0,
        total_iters=float('inf')
    )

    scheduler = SequentialLR(
        optimizer,
        schedulers=[warmup_scheduler, constant_scheduler],
        milestones=[5]
    )

    print("Comparing LR sources during warmup:\n")
    print(f"{'Step':<6} {'Optimizer LR':<15} {'Scheduler LR':<15} {'Match?':<10}")
    print("-" * 50)

    all_match = True
    for step in range(10):
        optimizer_lr = optimizer.param_groups[0]['lr']

        # get_last_lr() returns the LR set by the most recent step(). The
        # scheduler always provides it, so no None fallback is needed (a None
        # would have crashed the ':.2e' format below anyway).
        scheduler_lr = scheduler.get_last_lr()[0]

        agrees = abs(optimizer_lr - scheduler_lr) < 1e-10
        all_match = all_match and agrees
        match = "✓" if agrees else "✗"
        print(f"{step:<6} {optimizer_lr:<15.2e} {scheduler_lr:<15.2e} {match:<10}")

        # Step the scheduler
        scheduler.step()

    # Only claim agreement when every row actually matched.
    if all_match:
        print("\nConclusion: optimizer.param_groups[0]['lr'] is the authoritative source!")
    else:
        print("\nMismatch: optimizer and scheduler reported different LRs on at least one step.")

    # Additional verification: what happens if we manually change the optimizer's LR?
    print("\n\nManual LR change test:")
    print(f"Current optimizer LR: {optimizer.param_groups[0]['lr']:.2e}")

    # Manually change it
    for param_group in optimizer.param_groups:
        param_group['lr'] = 0.01

    print(f"After manual change: {optimizer.param_groups[0]['lr']:.2e}")
    print("This confirms the optimizer holds the actual LR being used.")


def compare_lr_access_methods():
    """Print the LR as read from the optimizer and from the scheduler.

    Both readings are shown right after a ``LinearLR`` warmup scheduler is
    created and again after one ``scheduler.step()``, followed by a short
    summary of the three access methods.
    """
    model = torch.nn.Linear(10, 1)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    scheduler = LinearLR(optimizer, start_factor=0.1, end_factor=1.0, total_iters=5)

    def show_both_sources(heading):
        # Same two readings each time, under a different heading.
        print(heading)
        print(f" optimizer.param_groups[0]['lr']: {optimizer.param_groups[0]['lr']:.2e}")
        print(f" scheduler.get_last_lr(): {scheduler.get_last_lr()[0]:.2e}")

    print("\nDifferent ways to access learning rate:\n")

    show_both_sources("Initial state:")

    scheduler.step()
    show_both_sources("\nAfter scheduler.step():")

    takeaways = (
        "\nKey insights:",
        "1. optimizer.param_groups[0]['lr'] - Always current, used by optimizer",
        "2. scheduler.get_last_lr() - What scheduler set on last step()",
        "3. scheduler.get_lr() - Internal method, calculates next LR (don't use directly)",
    )
    for line in takeaways:
        print(line)


def check_multiple_param_groups():
    """Print per-group learning rates before and after one LinearLR step.

    Two layers get their own param group (1e-3 and 1e-4). The rates are printed
    once before any scheduler exists and once after a ``LinearLR`` scheduler
    has been created and stepped a single time.
    """
    model = torch.nn.Sequential(
        torch.nn.Linear(10, 20),
        torch.nn.Linear(20, 1)
    )

    # One param group per layer, each with its own base LR.
    group_specs = [
        {'params': model[0].parameters(), 'lr': 1e-3},
        {'params': model[1].parameters(), 'lr': 1e-4},
    ]
    optimizer = optim.Adam(group_specs)

    def dump_groups(heading):
        print(heading)
        for i, param_group in enumerate(optimizer.param_groups):
            print(f" Group {i}: lr = {param_group['lr']:.2e}")

    dump_groups("\nMultiple parameter groups:")

    # A single scheduler rescales every group.
    scheduler = LinearLR(optimizer, start_factor=0.1, end_factor=1.0, total_iters=5)
    scheduler.step()

    dump_groups("\nAfter scheduler step:")


if __name__ == "__main__":
    # Run the three demonstrations in order, separated by a ruled line.
    section_break = "\n" + "="*50 + "\n"

    print("=== Learning Rate Source Verification ===\n")
    verify_lr_sources()
    print(section_break)
    compare_lr_access_methods()
    print(section_break)
    check_multiple_param_groups()

=== Learning Rate Source Verification ===

Comparing LR sources during warmup:

Step Optimizer LR Scheduler LR Match? 
--------------------------------------------------
0 1.00e-04 1.00e-04 ✓ 
1 2.80e-04 2.80e-04 ✓ 
2 4.60e-04 4.60e-04 ✓ 
3 6.40e-04 6.40e-04 ✓ 
4 8.20e-04 8.20e-04 ✓ 
5 1.00e-03 1.00e-03 ✓ 
6 1.00e-03 1.00e-03 ✓ 
7 1.00e-03 1.00e-03 ✓ 
8 1.00e-03 1.00e-03 ✓ 
9 1.00e-03 1.00e-03 ✓ 

Conclusion: optimizer.param_groups[0]['lr'] is the authoritative source!


Manual LR change test:
Current optimizer LR: 1.00e-03
After manual change: 1.00e-02
This confirms the optimizer holds the actual LR being used.



Different ways to access learning rate:

Initial state:
 optimizer.param_groups[0]['lr']: 1.00e-04
 scheduler.get_last_lr(): 1.00e-04

After scheduler.step():
 optimizer.param_groups[0]['lr']: 2.80e-04
 scheduler.get_last_lr(): 2.80e-04

Key insights:
1. optimizer.param_groups[0]['lr'] - Always current, used by optimizer
2. scheduler.get_last_lr() - What scheduler set on las