|
|
import tensorflow as tf |
|
|
import numpy as np |
|
|
from tensorflow.keras.layers import * |
|
|
import os |
|
|
from datetime import datetime |
|
|
import json |
|
|
|
|
|
class PositionalEncoding(Layer):
    """Adds a fixed sinusoidal position signal to an embedded sequence."""

    def __init__(self, position, d_model):
        super(PositionalEncoding, self).__init__()
        # Precompute the full (1, position, d_model) table once at build time.
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_angles(self, position, i, d_model):
        """Return angle arguments position / 10000^(2*(i//2)/d_model)."""
        inv_freq = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
        return position * inv_freq

    def positional_encoding(self, position, d_model):
        """Build the sinusoidal encoding table as a float32 tensor."""
        pos = np.arange(position)[:, np.newaxis]
        dims = np.arange(d_model)[np.newaxis, :]
        angle_rads = self.get_angles(position=pos, i=dims, d_model=d_model)

        # NOTE(review): sin of the even columns and cos of the odd columns are
        # concatenated (not interleaved) along the feature axis — a common
        # variant of the Transformer encoding; still a valid fixed signal.
        table = np.concatenate(
            [np.sin(angle_rads[:, 0::2]), np.cos(angle_rads[:, 1::2])],
            axis=-1)
        # Leading batch axis so the table broadcasts over the batch dimension.
        return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

    def call(self, inputs):
        # Trim the table to the actual sequence length, then add.
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]
|
|
|
|
|
class MultiHeadAttention(Layer):
    """Multi-head scaled dot-product attention.

    NOTE(review): this local class shadows ``tf.keras.layers.MultiHeadAttention``
    brought in by the wildcard import at the top of the file; the local
    definition wins for all uses below.
    """

    def __init__(self, d_model, num_heads):
        """Create the Q/K/V projections and the output projection.

        Args:
            d_model: total model width; must be divisible by ``num_heads``.
            num_heads: number of parallel attention heads.

        Raises:
            ValueError: if ``d_model`` is not divisible by ``num_heads``.
        """
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        # Explicit raise instead of `assert`: asserts are stripped under -O,
        # which would let an invalid configuration through silently.
        if d_model % self.num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by "
                f"num_heads ({num_heads})")
        # Per-head feature size.
        self.depth = d_model // self.num_heads
        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)
        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape (batch, seq, d_model) -> (batch, heads, seq, depth)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask=None):
        """Run attention; v/k/q are (batch, seq, d_model), returns same shape.

        ``mask`` (if given) is added as ``mask * -1e9`` to the logits, so
        masked positions should be 1 and kept positions 0.
        """
        batch_size = tf.shape(q)[0]

        # Linear projections, then split the feature axis into heads.
        q = self.split_heads(self.wq(q), batch_size)
        k = self.split_heads(self.wk(k), batch_size)
        v = self.split_heads(self.wv(v), batch_size)

        # Scaled dot-product attention logits: (batch, heads, seq_q, seq_k).
        matmul_qk = tf.matmul(q, k, transpose_b=True)
        dk = tf.cast(tf.shape(k)[-1], tf.float32)
        scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

        if mask is not None:
            # Push masked positions toward -inf so softmax zeroes them out.
            scaled_attention_logits += (mask * -1e9)

        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)

        # Weighted sum over values, then merge heads back into d_model.
        output = tf.matmul(attention_weights, v)
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(output, (batch_size, -1, self.d_model))
        return self.dense(concat_attention)
|
|
|
|
|
class TransformerBlock(Layer):
    """Post-norm Transformer encoder layer: self-attention then feed-forward."""

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        # Position-wise feed-forward: expand to dff, project back to d_model.
        self.ffn = tf.keras.Sequential([
            Dense(dff, activation='relu'),
            Dense(d_model)
        ])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, x, training=False, mask=None):
        # Self-attention sub-layer: dropout, residual add, layer norm.
        attended = self.mha(x, x, x, mask)
        attended = self.dropout1(attended, training=training)
        out1 = self.layernorm1(x + attended)

        # Feed-forward sub-layer: dropout, residual add, layer norm.
        transformed = self.ffn(out1)
        transformed = self.dropout2(transformed, training=training)
        return self.layernorm2(out1 + transformed)
|
|
|
|
|
class TextToSpeechTransformer(tf.keras.Model):
    """Encoder-only Transformer mapping token ids to mel-spectrogram frames.

    The model embeds the input ids, adds positional encodings, runs a stack
    of Transformer blocks, and projects each timestep to ``mel_bins``
    spectrogram bins (one output frame per input token).
    """

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 maximum_position_encoding, rate=0.1, mel_bins=80):
        """Build the model.

        Args:
            num_layers: number of stacked TransformerBlocks.
            d_model: embedding / model width.
            num_heads: attention heads per block.
            dff: feed-forward hidden width inside each block.
            input_vocab_size: size of the token embedding table.
            maximum_position_encoding: longest sequence the positional
                table supports.
            rate: dropout rate.
            mel_bins: output spectrogram bins per frame. Generalized from a
                hard-coded 80; the default preserves previous behavior.
        """
        super(TextToSpeechTransformer, self).__init__()

        self.embedding = Embedding(input_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(maximum_position_encoding, d_model)
        self.dropout = Dropout(rate)

        self.transformer_blocks = [
            TransformerBlock(d_model, num_heads, dff, rate)
            for _ in range(num_layers)
        ]

        # Per-timestep projection onto the mel-spectrogram bins.
        self.final_layer = Dense(mel_bins)

    def call(self, x, training=False, mask=None):
        """Map (batch, seq) token ids to (batch, seq, mel_bins) frames."""
        x = self.embedding(x)
        x = self.pos_encoding(x)
        x = self.dropout(x, training=training)

        for transformer_block in self.transformer_blocks:
            x = transformer_block(x, training=training, mask=mask)

        return self.final_layer(x)
|
|
|
|
|
class TTSTrainer:
    """Builds, trains, checkpoints, and restores a TextToSpeechTransformer."""

    def __init__(self, model_params, training_params):
        """Compile a fresh model and create a timestamped checkpoint dir.

        Args:
            model_params: kwargs for ``TextToSpeechTransformer``.
            training_params: dict with 'batch_size', 'epochs', 'learning_rate'.
        """
        self.model_params = model_params
        self.training_params = training_params
        self.model = self._build_model()
        self.timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.checkpoint_dir = f"checkpoints/{self.timestamp}"
        os.makedirs(self.checkpoint_dir, exist_ok=True)

    def _build_model(self):
        """Instantiate and compile the transformer (Adam + Huber loss)."""
        model = TextToSpeechTransformer(**self.model_params)

        optimizer = tf.keras.optimizers.Adam(
            learning_rate=self.training_params['learning_rate']
        )

        model.compile(
            optimizer=optimizer,
            # Huber is less sensitive to outlier mel frames than plain MSE.
            loss=tf.keras.losses.Huber(delta=1.0),
            metrics=['mae']
        )
        return model

    def _create_dataset(self, texts, mels, batch_size, shuffle=True):
        """Wrap (texts, mels) arrays in a cached, batched, prefetched pipeline.

        Args:
            shuffle: shuffle before batching. Defaults to True (previous
                behavior); pass False for validation data so metrics are
                computed over a deterministic order.
        """
        dataset = tf.data.Dataset.from_tensor_slices((texts, mels))
        dataset = dataset.cache()
        if shuffle:
            dataset = dataset.shuffle(10000)
        dataset = dataset.batch(batch_size)
        dataset = dataset.prefetch(tf.data.AUTOTUNE)
        return dataset

    def train(self, texts, mels):
        """Train on a 90/10 split of (texts, mels); return the Keras History.

        Saves the best weights, early-stops on val_loss, halves the learning
        rate on plateau, logs to TensorBoard, then persists model + config.
        """
        train_size = int(0.9 * len(texts))
        train_texts, val_texts = texts[:train_size], texts[train_size:]
        train_mels, val_mels = mels[:train_size], mels[train_size:]

        batch_size = self.training_params['batch_size']
        train_dataset = self._create_dataset(train_texts, train_mels, batch_size)
        # Validation data is not shuffled (fix: it previously was, which
        # wasted work and made per-batch logs non-deterministic).
        val_dataset = self._create_dataset(
            val_texts, val_mels, batch_size, shuffle=False
        )

        checkpoint_path = f"{self.checkpoint_dir}/model"
        os.makedirs(checkpoint_path, exist_ok=True)

        callbacks = [
            tf.keras.callbacks.ModelCheckpoint(
                filepath=checkpoint_path,
                save_weights_only=True,
                save_best_only=True,
                monitor='val_loss'
            ),
            tf.keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=5,
                restore_best_weights=True
            ),
            tf.keras.callbacks.ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=2
            ),
            tf.keras.callbacks.TensorBoard(
                log_dir=f"{self.checkpoint_dir}/logs"
            )
        ]

        history = self.model.fit(
            train_dataset,
            validation_data=val_dataset,
            epochs=self.training_params['epochs'],
            callbacks=callbacks
        )

        self._save_model_and_config()
        return history

    def _save_model_and_config(self):
        """Persist hyper-parameters (JSON), raw weights, and a SavedModel."""
        config = {
            'model_params': self.model_params,
            'training_params': self.training_params
        }

        config_path = f"{self.checkpoint_dir}/config.json"
        with open(config_path, 'w') as f:
            json.dump(config, f)

        weights_path = f"{self.checkpoint_dir}/model_weights"
        self.model.save_weights(weights_path)

        tf.saved_model.save(self.model, f"{self.checkpoint_dir}/saved_model")

    def load_model(self, checkpoint_dir):
        """Rebuild the model from a saved config and restore its weights.

        Bug fix: the loaded config was previously read and then ignored —
        the model was rebuilt from whatever params this trainer happened to
        be constructed with, so a restored architecture could mismatch the
        saved weights. The persisted params are now applied before rebuild.
        """
        config_path = f"{checkpoint_dir}/config.json"
        with open(config_path, 'r') as f:
            config = json.load(f)

        # Use the persisted hyper-parameters, not the current instance's.
        self.model_params = config['model_params']
        self.training_params = config['training_params']
        self.model = self._build_model()
        weights_path = f"{checkpoint_dir}/model_weights"
        self.model.load_weights(weights_path)
|
|
|
|
|
if __name__ == "__main__":
    # Architecture hyper-parameters for the demo run.
    model_params = dict(
        num_layers=6,
        d_model=256,
        num_heads=8,
        dff=1024,
        input_vocab_size=1000,
        maximum_position_encoding=2048,
        rate=0.1,
    )

    # Optimisation hyper-parameters.
    training_params = dict(
        batch_size=32,
        epochs=100,
        learning_rate=0.001,
    )

    trainer = TTSTrainer(model_params, training_params)

    # Synthetic smoke-test data: 1000 token sequences of length 100, paired
    # with 100-frame, 80-bin mel spectrograms.
    input_texts = np.random.randint(0, 1000, size=(1000, 100))
    target_mels = np.random.uniform(size=(1000, 100, 80))

    history = trainer.train(input_texts, target_mels)