import gradio as gr
from huggingface_hub import hf_hub_download
from tokenizers import Tokenizer
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.sequence import pad_sequences
import numpy as np
import tensorflow as tf
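# Runtime dependencies assumed from the imports above (versions are not pinned in this file):
# gradio, huggingface_hub, tokenizers, tensorflow, numpy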
# Load the model and tokenizer files from the Hugging Face model repo
repo_id = "AnTrc2/khmer_to_vi"
model_path = hf_hub_download(repo_id=repo_id, filename="best_model_fix.keras")
src_tok_path = hf_hub_download(repo_id=repo_id, filename="khmer_tokenizer_hehe.json")
tgt_tok_path = hf_hub_download(repo_id=repo_id, filename="vi_tokenizer_hehe.json")
def positional_encoding(length, depth):
    depth = depth/2

    positions = np.arange(length)[:, np.newaxis]      # (seq, 1)
    depths = np.arange(depth)[np.newaxis, :]/depth    # (1, depth)

    angle_rates = 1 / (10000**depths)                 # (1, depth)
    angle_rads = positions * angle_rates              # (pos, depth)

    pos_encoding = np.concatenate(
        [np.sin(angle_rads), np.cos(angle_rads)],
        axis=-1)

    return tf.cast(pos_encoding, dtype=tf.float32)
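# For example, positional_encoding(2048, 512) returns a (2048, 512) tensor:
# the sin components fill the first 256 columns, the cos components the last 256.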
from tensorflow.keras.saving import register_keras_serializable

class PositionalEmbedding(tf.keras.layers.Layer):
    def __init__(self, vocab_size, d_model, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)
        self.pos_encoding = positional_encoding(length=2048, depth=d_model)

    def compute_mask(self, *args, **kwargs):
        return self.embedding.compute_mask(*args, **kwargs)

    def call(self, x):
        length = tf.shape(x)[1]
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = x + self.pos_encoding[tf.newaxis, :length, :]
        return x

    def get_config(self):
        config = super().get_config()
        config.update({
            "vocab_size": self.vocab_size,
            "d_model": self.d_model
        })
        return config
class BaseAttention(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super().__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
        self.layernorm = tf.keras.layers.LayerNormalization()
        self.add = tf.keras.layers.Add()
        self.mha_kwargs = kwargs

    def get_config(self):
        config = super().get_config()
        config.update(self.mha_kwargs)
        return config
class CrossAttention(BaseAttention):
    def call(self, x, context):
        attn_output, attn_scores = self.mha(
            query=x,
            key=context,
            value=context,
            return_attention_scores=True)
        self.last_attn_scores = attn_scores
        x = self.add([x, attn_output])
        x = self.layernorm(x)
        return x

class GlobalSelfAttention(BaseAttention):
    def call(self, x):
        attn_output = self.mha(
            query=x,
            value=x,
            key=x)
        x = self.add([x, attn_output])
        x = self.layernorm(x)
        return x

class CausalSelfAttention(BaseAttention):
    def call(self, x):
        attn_output = self.mha(
            query=x,
            value=x,
            key=x,
            use_causal_mask=True)
        x = self.add([x, attn_output])
        x = self.layernorm(x)
        return x
class FeedForward(tf.keras.layers.Layer):
    def __init__(self, d_model, dff, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
        self.dff = dff
        self.dropout_rate = dropout_rate
        self.seq = tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation='relu'),
            tf.keras.layers.Dense(d_model),
            tf.keras.layers.Dropout(dropout_rate)
        ])
        self.add = tf.keras.layers.Add()
        self.layer_norm = tf.keras.layers.LayerNormalization()

    def call(self, x):
        x = self.add([x, self.seq(x)])
        x = self.layer_norm(x)
        return x

    def get_config(self):
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "dff": self.dff,
            "dropout_rate": self.dropout_rate
        })
        return config
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.dropout_rate = dropout_rate
        self.self_attention = GlobalSelfAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)
        self.ffn = FeedForward(d_model, dff)

    def call(self, x):
        x = self.self_attention(x)
        x = self.ffn(x)
        return x

    def get_config(self):
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "dff": self.dff,
            "dropout_rate": self.dropout_rate
        })
        return config
class Encoder(tf.keras.layers.Layer):
    def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.num_layers = num_layers
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.vocab_size = vocab_size
        self.dropout_rate = dropout_rate
        self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size, d_model=d_model)
        self.enc_layers = [
            EncoderLayer(d_model=d_model,
                         num_heads=num_heads,
                         dff=dff,
                         dropout_rate=dropout_rate)
            for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x):
        x = self.pos_embedding(x)
        x = self.dropout(x)
        for i in range(self.num_layers):
            x = self.enc_layers[i](x)
        return x

    def get_config(self):
        config = super().get_config()
        config.update({
            "num_layers": self.num_layers,
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "dff": self.dff,
            "vocab_size": self.vocab_size,
            "dropout_rate": self.dropout_rate
        })
        return config
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.dropout_rate = dropout_rate
        self.causal_self_attention = CausalSelfAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)
        self.cross_attention = CrossAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)
        self.ffn = FeedForward(d_model, dff)

    def call(self, x, context):
        x = self.causal_self_attention(x=x)
        x = self.cross_attention(x=x, context=context)
        self.last_attn_scores = self.cross_attention.last_attn_scores
        x = self.ffn(x)
        return x

    def get_config(self):
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "dff": self.dff,
            "dropout_rate": self.dropout_rate
        })
        return config
class Decoder(tf.keras.layers.Layer):
    def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.num_layers = num_layers
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.vocab_size = vocab_size
        self.dropout_rate = dropout_rate
        self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size, d_model=d_model)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.dec_layers = [
            DecoderLayer(d_model=d_model, num_heads=num_heads, dff=dff, dropout_rate=dropout_rate)
            for _ in range(num_layers)]
        self.last_attn_scores = None

    def call(self, x, context):
        x = self.pos_embedding(x)
        x = self.dropout(x)
        for i in range(self.num_layers):
            x = self.dec_layers[i](x, context)
        self.last_attn_scores = self.dec_layers[-1].last_attn_scores
        return x

    def get_config(self):
        config = super().get_config()
        config.update({
            "num_layers": self.num_layers,
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "dff": self.dff,
            "vocab_size": self.vocab_size,
            "dropout_rate": self.dropout_rate
        })
        return config
class Transformer(tf.keras.Model):
    def __init__(self, *, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.num_layers = num_layers
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.input_vocab_size = input_vocab_size
        self.target_vocab_size = target_vocab_size
        self.dropout_rate = dropout_rate
        self.encoder = Encoder(num_layers=num_layers, d_model=d_model, num_heads=num_heads, dff=dff,
                               vocab_size=input_vocab_size, dropout_rate=dropout_rate)
        self.decoder = Decoder(num_layers=num_layers, d_model=d_model, num_heads=num_heads, dff=dff,
                               vocab_size=target_vocab_size, dropout_rate=dropout_rate)
        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inputs):
        context, x = inputs
        context = self.encoder(context)
        x = self.decoder(x, context)
        logits = self.final_layer(x)
        try:
            # Drop the Keras mask so it does not propagate to the loss/metrics
            del logits._keras_mask
        except AttributeError:
            pass
        return logits

    def get_config(self):
        config = super().get_config()
        config.update({
            "num_layers": self.num_layers,
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "dff": self.dff,
            "input_vocab_size": self.input_vocab_size,
            "target_vocab_size": self.target_vocab_size,
            "dropout_rate": self.dropout_rate
        })
        return config
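# A minimal sketch of instantiating this class directly; the hyperparameters and vocab
# sizes below are illustrative placeholders, not the trained checkpoint's values:
#
# demo_transformer = Transformer(
#     num_layers=4, d_model=128, num_heads=8, dff=512,
#     input_vocab_size=8000, target_vocab_size=8000, dropout_rate=0.1)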
def masked_loss(label, pred):
    mask = label != 0
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')
    loss = loss_object(label, pred)

    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask

    loss = tf.reduce_sum(loss)/tf.reduce_sum(mask)
    return loss

def masked_accuracy(label, pred):
    pred = tf.argmax(pred, axis=2)
    label = tf.cast(label, pred.dtype)
    match = label == pred

    mask = label != 0

    match = match & mask
    match = tf.cast(match, dtype=tf.float32)
    mask = tf.cast(mask, dtype=tf.float32)
    return tf.reduce_sum(match)/tf.reduce_sum(mask)
# Load the model and tokenizers
model = load_model(model_path, custom_objects={
    'PositionalEmbedding': PositionalEmbedding,
    'BaseAttention': BaseAttention,
    'CrossAttention': CrossAttention,
    'GlobalSelfAttention': GlobalSelfAttention,
    'CausalSelfAttention': CausalSelfAttention,
    'FeedForward': FeedForward,
    'EncoderLayer': EncoderLayer,
    'DecoderLayer': DecoderLayer,
    'Encoder': Encoder,
    'Decoder': Decoder,
    'Transformer': Transformer,
    'masked_loss': masked_loss,
    'masked_accuracy': masked_accuracy
}, compile=False)

src_tokenizer = Tokenizer.from_file(src_tok_path)
tgt_tokenizer = Tokenizer.from_file(tgt_tok_path)
# Maximum sequence lengths
max_len_src = 274
max_len_tgt = 268 - 1
def translate_sentence_stream(sentence):
    if not sentence.strip():
        yield "Vui lòng nhập câu tiếng Khmer cần dịch."
        return

    # Encode the source sentence and pad it to the encoder input length
    sentence_ids = src_tokenizer.encode(sentence).ids
    encoder_input = pad_sequences([sentence_ids], maxlen=max_len_src, padding='post')

    start_token = tgt_tokenizer.token_to_id("<s>")
    end_token = tgt_tokenizer.token_to_id("</s>")

    decoder_input = [start_token]
    result_ids = []
    translated_text = ""

    # Greedy decoding: feed the tokens generated so far and take the most likely next token
    for _ in range(max_len_tgt):
        decoder_input_padded = pad_sequences([decoder_input], maxlen=max_len_tgt, padding='post')
        predictions = model.predict([encoder_input, decoder_input_padded], verbose=0)
        predicted_id = int(np.argmax(predictions[0, len(decoder_input) - 1]))

        if predicted_id == end_token:
            break

        result_ids.append(predicted_id)

        # Partial translation at each step
        translated_text = tgt_tokenizer.decode(result_ids)
        yield translated_text  # yield each newly generated piece so the UI updates immediately

        decoder_input.append(predicted_id)
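# Quick local sanity check (commented out so it does not run on Space startup); the input
# string is a placeholder, not an example from the original code:
#
# if __name__ == "__main__":
#     for partial in translate_sentence_stream("<your Khmer sentence here>"):
#         print(partial)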
# Build the Gradio interface
iface = gr.Interface(
    fn=translate_sentence_stream,
    inputs=gr.Textbox(lines=4, label="Nhập câu tiếng Khmer"),
    outputs=gr.Textbox(label="Dịch sang tiếng Việt"),
    title="Khmer → Vietnamese Translator (Streaming)",
    description="Dịch đến đâu, trả ra màn hình đến đó.",
    live=False  # run only on submit; streaming output comes from the generator function
)
iface.launch()