Spaces:
Runtime error
Runtime error
| from slam_llm.utils.train_utils import print_module_size | |
| import torch | |
| import torchaudio | |
| import os | |
| import torch.nn as nn | |
| import uuid | |
| import logging | |
| logger = logging.getLogger(__name__) | |
def setup_codec(train_config, model_config, **kwargs):
    """
    Build the CosyVoice codec decoder selected by ``model_config``.

    Args:
        train_config: Training configuration; ``enable_fsdp`` and ``enable_ddp``
            are read to decide whether the ``RANK`` environment variable is used.
        model_config: Model configuration; ``cosyvoice_version``,
            ``codec_decoder_path`` and ``codec_decoder_type`` are read.
        **kwargs: Accepted for call-site compatibility; not used here.

    Returns:
        The constructed ``CosyVoice`` (version 1) or ``CosyVoice2`` (version 2) object.

    Raises:
        NotImplementedError: If ``model_config.cosyvoice_version`` is neither 1 nor 2.
        KeyError: If FSDP/DDP is enabled and ``RANK`` is missing from the environment.
    """
    import sys

    # The cosyvoice package and its Matcha-TTS dependency are resolved relative to
    # this file's directory. Only add each path once so that repeated calls do not
    # keep growing sys.path with duplicate entries.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    for extra_path in (base_dir, os.path.join(base_dir, "third_party/Matcha-TTS")):
        if extra_path not in sys.path:
            sys.path.append(extra_path)

    from cosyvoice.cli.cosyvoice import CosyVoice, CosyVoice2

    if model_config.cosyvoice_version == 1:
        codec_cls = CosyVoice
    elif model_config.cosyvoice_version == 2:
        codec_cls = CosyVoice2
    else:
        raise NotImplementedError(
            f"Unsupported cosyvoice_version: {model_config.cosyvoice_version!r} (expected 1 or 2)"
        )
    codec_decoder = codec_cls(model_config.codec_decoder_path, load_jit=False, load_trt=False, fp16=False)

    # Only the flow and hift sub-modules are handed to print_module_size.
    codec_decoder_module = nn.ModuleList((codec_decoder.model.flow, codec_decoder.model.hift))
    rank = int(os.environ["RANK"]) if train_config.enable_fsdp or train_config.enable_ddp else 0
    print_module_size(codec_decoder_module, model_config.codec_decoder_type + " Codec", rank)

    return codec_decoder
def get_single_layer_answer_token(audio_tokens, num_latency_tokens, padding_token, end_of_audio):
    """
    Build the single-layer answer sequence: latency padding, audio tokens, end marker.

    Args:
        audio_tokens: Sized iterable of audio token ids.
        num_latency_tokens (int): Number of ``padding_token`` entries placed in front.
        padding_token: Token id used for the leading latency slots.
        end_of_audio: Token id appended after the audio tokens.

    Returns:
        tuple: ``(tensor, length)`` where ``tensor`` has a leading dimension of 1 and
        ``length`` is ``len(audio_tokens) + num_latency_tokens + 1``.
    """
    # The trailing +1 accounts for the end-of-audio token.
    audio_length = len(audio_tokens) + num_latency_tokens + 1

    latency_prefix = [padding_token] * num_latency_tokens
    sequence = [*latency_prefix, *audio_tokens, end_of_audio]

    return torch.tensor(sequence).unsqueeze(0), audio_length
def get_group_answer_token(audio_tokens, num_latency_tokens, padding_token, end_of_audio, num_layers):
    """
    Distribute audio tokens round-robin across ``num_layers`` parallel layers.

    The end-of-audio token is appended, the sequence is padded with
    ``padding_token`` up to a multiple of ``num_layers``, and token ``i`` is
    assigned to layer ``i % num_layers``. Every layer is prefixed with
    ``num_latency_tokens`` padding tokens.

    Args:
        audio_tokens: Iterable of audio token ids (list, tuple, ...).
        num_latency_tokens (int): Number of leading ``padding_token`` entries per layer.
        padding_token: Token id used for latency slots and tail padding.
        end_of_audio: Token id appended after the audio tokens.
        num_layers (int): Number of layers to split across; must be >= 1.

    Returns:
        tuple: ``(tensor, length)`` where ``tensor`` has shape
        ``(num_layers, length)`` and ``length`` is the per-layer sequence length
        including the latency prefix.

    Raises:
        ValueError: If ``num_layers`` is smaller than 1.
    """
    if num_layers < 1:
        raise ValueError(f"num_layers must be >= 1, got {num_layers}")

    # list() makes this accept any iterable, matching get_single_layer_answer_token;
    # plain `audio_tokens + [...]` only works when audio_tokens is already a list.
    padded_audio_tokens = list(audio_tokens) + [end_of_audio]

    # Pad so that every layer receives the same number of tokens.
    padding_needed = (num_layers - len(padded_audio_tokens) % num_layers) % num_layers
    padded_audio_tokens += [padding_token] * padding_needed

    audio_length = len(padded_audio_tokens) // num_layers + num_latency_tokens

    latency_prefix = [padding_token] * num_latency_tokens
    # Layer k takes tokens k, k + num_layers, k + 2 * num_layers, ...
    layers = [
        torch.tensor(latency_prefix + padded_audio_tokens[layer::num_layers])
        for layer in range(num_layers)
    ]
    return torch.stack(layers), audio_length
def audio_decode_cosyvoice(
    audio_tokens,
    model_config,
    codec_decoder,
    audio_prompt_path=None,
    code_layer=1,
    num_latency_tokens=1,
    speed=1.0,
    replace_token=4095,
):
    """
    Turn generated audio tokens into a waveform using a CosyVoice codec decoder.

    Args:
        audio_tokens: Sequence of tensors. With ``code_layer > 1`` the tensors are
            stacked (one per layer) and de-interleaved into a single stream; with
            ``code_layer == 1`` they are concatenated along the last dimension.
            For ``code_layer < 1`` the value is sliced directly and is assumed to
            already be a tensor.
        model_config: Configuration object; ``vocab_config.eoa``,
            ``vocab_config.pad_a``, ``save_audio_token`` and ``cosyvoice_version``
            are read.
        codec_decoder: Codec decoder exposing ``frontend`` and ``model``.
        audio_prompt_path (str, optional): Path to the prompt audio file. It is
            passed to ``load_wav`` whenever a waveform is actually synthesized.
        code_layer (int, optional): Number of code layers. Defaults to 1.
        num_latency_tokens (int, optional): Number of leading latency tokens per
            layer to drop. Defaults to 1.
        speed (float, optional): Speed factor forwarded to ``token2wav``. Defaults to 1.0.
        replace_token (int, optional): Token id substituted for padding tokens found
            before the end-of-audio token. Defaults to 4095.

    Returns:
        ``None`` if no end-of-audio token is present, or if no tokens remain before
        it and ``save_audio_token`` is falsy. The truncated token tensor if
        ``model_config.save_audio_token`` is truthy. Otherwise the value returned
        by ``codec_decoder.model.token2wav``.

    Raises:
        NotImplementedError: If a waveform is requested and
            ``model_config.cosyvoice_version`` is neither 1 nor 2.
    """
    # Flatten the per-layer tokens into one stream and drop the latency prefix.
    if code_layer > 1:
        stacked = torch.stack(audio_tokens, dim=0)
        # (layers, steps) -> (steps, layers) -> flat, so layers are interleaved per step.
        audio_tokens = stacked.permute(1, 0).reshape(-1).unsqueeze(0)
        audio_tokens = audio_tokens[..., num_latency_tokens * code_layer:]
    elif code_layer == 1:
        audio_tokens = torch.cat(audio_tokens, dim=-1).unsqueeze(0)
        audio_tokens = audio_tokens[..., num_latency_tokens:]
    else:
        audio_tokens = audio_tokens[..., num_latency_tokens:]

    # Vocabulary ids for end of audio (EOA) and audio padding.
    eoa = model_config.vocab_config.eoa
    pad_a = model_config.vocab_config.pad_a

    # Truncate at the first EOA token; without one there is nothing to decode.
    eoa_positions = torch.nonzero(audio_tokens[0] == eoa)
    if eoa_positions.numel() == 0:
        return None
    end_index = int(eoa_positions[0, 0])
    audio_tokens = audio_tokens[..., :end_index]

    # FIXME: temporary workaround for padding tokens that may appear among the audio tokens.
    if pad_a in audio_tokens:
        audio_tokens = audio_tokens.masked_fill(audio_tokens == pad_a, replace_token)

    if model_config.save_audio_token:
        return audio_tokens

    if audio_tokens.numel() == 0:
        return None

    # Feature-extraction rate per CosyVoice version. Reject unsupported versions
    # here, before loading the prompt and running the extractors.
    prompt_sample_rates = {1: 22050, 2: 24000}
    version = model_config.cosyvoice_version
    if version not in prompt_sample_rates:
        raise NotImplementedError(f"Unsupported cosyvoice_version: {version!r} (expected 1 or 2)")

    this_uuid = str(uuid.uuid1())  # Unique ID for this audio generation

    from utils.cosyvoice.utils.file_utils import load_wav

    prompt_speech_16k = load_wav(audio_prompt_path, 16000)
    flow_prompt_speech_token, _ = codec_decoder.frontend._extract_speech_token(prompt_speech_16k)

    resample = torchaudio.transforms.Resample(orig_freq=16000, new_freq=prompt_sample_rates[version])
    prompt_speech_feat, _ = codec_decoder.frontend._extract_speech_feat(resample(prompt_speech_16k))

    flow_embedding = codec_decoder.frontend._extract_spk_embedding(prompt_speech_16k)

    # Convert tokens to an audio waveform.
    token2wav_kwargs = {
        "token": audio_tokens,
        "prompt_token": flow_prompt_speech_token,
        "prompt_feat": prompt_speech_feat,
        "embedding": flow_embedding,
        "uuid": this_uuid,
        "finalize": True,
        "speed": speed,
    }
    if version == 2:
        # Only the version 2 call passes token_offset.
        token2wav_kwargs["token_offset"] = 0

    return codec_decoder.model.token2wav(**token2wav_kwargs)
def layershift(input_id, layer, stride=4160, shift=152000):
    """
    Offset a token id by a base shift plus a per-layer stride.

    Args:
        input_id: Token id (or any value supporting ``+``) to shift.
        layer (int): Layer index; each layer adds another ``stride``.
        stride (int, optional): Offset between consecutive layers. Defaults to 4160.
        shift (int, optional): Base offset applied to every layer. Defaults to 152000.

    Returns:
        ``input_id + shift + layer * stride``.
    """
    shifted = input_id + shift
    return shifted + layer * stride
def simple_shift(input_id, layer, stride=4160, shift=152000):
    """
    Offset a token id by the base shift only.

    ``layer`` and ``stride`` are accepted so the signature matches ``layershift``,
    but neither is used.

    Args:
        input_id: Token id (or any value supporting ``+``) to shift.
        layer: Unused.
        stride (int, optional): Unused. Defaults to 4160.
        shift (int, optional): Offset added to ``input_id``. Defaults to 152000.

    Returns:
        ``input_id + shift``.
    """
    shifted = input_id + shift
    return shifted