# khmer_to_vi / app.py
import gradio as gr
import numpy as np
import tensorflow as tf
from huggingface_hub import hf_hub_download
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.saving import register_keras_serializable
from tokenizers import Tokenizer
# Download the model and tokenizer files from the Hugging Face model repo
repo_id = "AnTrc2/khmer_to_vi"
model_path = hf_hub_download(repo_id=repo_id, filename="best_model_fix.keras")
src_tok_path = hf_hub_download(repo_id=repo_id, filename="khmer_tokenizer_hehe.json")
tgt_tok_path = hf_hub_download(repo_id=repo_id, filename="vi_tokenizer_hehe.json")
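# Standard sinusoidal positional encoding ("Attention Is All You Need"):
# sin terms fill the first depth/2 channels, cos terms fill the rest.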
def positional_encoding(length, depth):
depth = depth/2
positions = np.arange(length)[:, np.newaxis] # (seq, 1)
depths = np.arange(depth)[np.newaxis, :]/depth # (1, depth)
angle_rates = 1 / (10000**depths) # (1, depth)
angle_rads = positions * angle_rates # (pos, depth)
pos_encoding = np.concatenate(
[np.sin(angle_rads), np.cos(angle_rads)],
axis=-1)
return tf.cast(pos_encoding, dtype=tf.float32)
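# e.g. positional_encoding(4, 6) returns a (4, 6) float32 tensor:
# sin values in the first 3 channels, cos values in the last 3.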
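# Token embedding scaled by sqrt(d_model) plus the sinusoidal encoding;
# mask_zero=True treats id 0 as padding and propagates a Keras mask.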
@register_keras_serializable(package='custom_layers')
class PositionalEmbedding(tf.keras.layers.Layer):
def __init__(self, vocab_size, d_model, **kwargs):
super().__init__(**kwargs)
self.d_model = d_model
self.vocab_size = vocab_size
self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)
self.pos_encoding = positional_encoding(length=2048, depth=d_model)
def compute_mask(self, *args, **kwargs):
return self.embedding.compute_mask(*args, **kwargs)
def call(self, x):
length = tf.shape(x)[1]
x = self.embedding(x)
x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
x = x + self.pos_encoding[tf.newaxis, :length, :]
return x
def get_config(self):
config = super().get_config()
config.update({
"vocab_size": self.vocab_size,
"d_model": self.d_model
})
return config
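# Shared skeleton for the attention blocks below: MultiHeadAttention followed by
# a residual Add and LayerNorm (post-norm, as in the original Transformer).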
@register_keras_serializable(package='custom_layers')
class BaseAttention(tf.keras.layers.Layer):
def __init__(self, **kwargs):
super().__init__()
self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
self.layernorm = tf.keras.layers.LayerNormalization()
self.add = tf.keras.layers.Add()
self.mha_kwargs = kwargs
def get_config(self):
config = super().get_config()
config.update(self.mha_kwargs)
return config
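# Attention from decoder positions over the encoder output; the attention scores
# are kept on the layer so they can be inspected after a forward pass.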
@register_keras_serializable(package='custom_layers')
class CrossAttention(BaseAttention):
def call(self, x, context):
attn_output, attn_scores = self.mha(
query=x,
key=context,
value=context,
return_attention_scores=True)
self.last_attn_scores = attn_scores
x = self.add([x, attn_output])
x = self.layernorm(x)
return x
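# Unmasked (bidirectional) self-attention, used in the encoder.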
@register_keras_serializable(package='custom_layers')
class GlobalSelfAttention(BaseAttention):
def call(self, x):
attn_output = self.mha(
query=x,
value=x,
key=x)
x = self.add([x, attn_output])
x = self.layernorm(x)
return x
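# Causally masked self-attention: each position attends only to itself and
# earlier positions, as required for autoregressive decoding.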
@register_keras_serializable(package='custom_layers')
class CausalSelfAttention(BaseAttention):
def call(self, x):
attn_output = self.mha(
query=x,
value=x,
key=x,
use_causal_mask=True)
x = self.add([x, attn_output])
x = self.layernorm(x)
return x
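# Position-wise feed-forward block (Dense -> ReLU -> Dense -> Dropout) with a
# residual connection and LayerNorm.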
@register_keras_serializable(package='custom_layers')
class FeedForward(tf.keras.layers.Layer):
def __init__(self, d_model, dff, dropout_rate=0.1, **kwargs):
super().__init__(**kwargs)
self.d_model = d_model
self.dff = dff
self.dropout_rate = dropout_rate
self.seq = tf.keras.Sequential([
tf.keras.layers.Dense(dff, activation='relu'),
tf.keras.layers.Dense(d_model),
tf.keras.layers.Dropout(dropout_rate)
])
self.add = tf.keras.layers.Add()
self.layer_norm = tf.keras.layers.LayerNormalization()
def call(self, x):
x = self.add([x, self.seq(x)])
x = self.layer_norm(x)
return x
def get_config(self):
config = super().get_config()
config.update({
"d_model": self.d_model,
"dff": self.dff,
"dropout_rate": self.dropout_rate
})
return config
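# One encoder block: global self-attention followed by the feed-forward block.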
@register_keras_serializable(package='custom_layers')
class EncoderLayer(tf.keras.layers.Layer):
def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1, **kwargs):
super().__init__(**kwargs)
self.d_model = d_model
self.num_heads = num_heads
self.dff = dff
self.dropout_rate = dropout_rate
self.self_attention = GlobalSelfAttention(
num_heads=num_heads,
key_dim=d_model,
dropout=dropout_rate)
self.ffn = FeedForward(d_model, dff)
def call(self, x):
x = self.self_attention(x)
x = self.ffn(x)
return x
def get_config(self):
config = super().get_config()
config.update({
"d_model": self.d_model,
"num_heads": self.num_heads,
"dff": self.dff,
"dropout_rate": self.dropout_rate
})
return config
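# Encoder stack: positional embedding, dropout, then num_layers encoder blocks.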
@register_keras_serializable(package='custom_layers')
class Encoder(tf.keras.layers.Layer):
def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size, dropout_rate=0.1, **kwargs):
super().__init__(**kwargs)
self.num_layers = num_layers
self.d_model = d_model
self.num_heads = num_heads
self.dff = dff
self.vocab_size = vocab_size
self.dropout_rate = dropout_rate
self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size, d_model=d_model)
self.enc_layers = [
EncoderLayer(d_model=d_model,
num_heads=num_heads,
dff=dff,
dropout_rate=dropout_rate)
for _ in range(num_layers)]
self.dropout = tf.keras.layers.Dropout(dropout_rate)
def call(self, x):
x = self.pos_embedding(x)
x = self.dropout(x)
for i in range(self.num_layers):
x = self.enc_layers[i](x)
return x
def get_config(self):
config = super().get_config()
config.update({
"num_layers": self.num_layers,
"d_model": self.d_model,
"num_heads": self.num_heads,
"dff": self.dff,
"vocab_size": self.vocab_size,
"dropout_rate": self.dropout_rate
})
return config
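# One decoder block: causal self-attention, cross-attention over the encoder
# output, then the feed-forward block.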
@register_keras_serializable(package='custom_layers')
class DecoderLayer(tf.keras.layers.Layer):
def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1, **kwargs):
super().__init__(**kwargs)
self.d_model = d_model
self.num_heads = num_heads
self.dff = dff
self.dropout_rate = dropout_rate
self.causal_self_attention = CausalSelfAttention(
num_heads=num_heads,
key_dim=d_model,
dropout=dropout_rate)
self.cross_attention = CrossAttention(
num_heads=num_heads,
key_dim=d_model,
dropout=dropout_rate)
self.ffn = FeedForward(d_model, dff)
def call(self, x, context):
x = self.causal_self_attention(x=x)
x = self.cross_attention(x=x, context=context)
self.last_attn_scores = self.cross_attention.last_attn_scores
x = self.ffn(x)
return x
def get_config(self):
config = super().get_config()
config.update({
"d_model": self.d_model,
"num_heads": self.num_heads,
"dff": self.dff,
"dropout_rate": self.dropout_rate
})
return config
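# Decoder stack: positional embedding, dropout, then num_layers decoder blocks.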
@register_keras_serializable(package='custom_layers')
class Decoder(tf.keras.layers.Layer):
def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size, dropout_rate=0.1, **kwargs):
super().__init__(**kwargs)
self.num_layers = num_layers
self.d_model = d_model
self.num_heads = num_heads
self.dff = dff
self.vocab_size = vocab_size
self.dropout_rate = dropout_rate
self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size, d_model=d_model)
self.dropout = tf.keras.layers.Dropout(dropout_rate)
self.dec_layers = [
DecoderLayer(d_model=d_model, num_heads=num_heads, dff=dff, dropout_rate=dropout_rate)
for _ in range(num_layers)]
self.last_attn_scores = None
def call(self, x, context):
x = self.pos_embedding(x)
x = self.dropout(x)
for i in range(self.num_layers):
x = self.dec_layers[i](x, context)
self.last_attn_scores = self.dec_layers[-1].last_attn_scores
return x
def get_config(self):
config = super().get_config()
config.update({
"num_layers": self.num_layers,
"d_model": self.d_model,
"num_heads": self.num_heads,
"dff": self.dff,
"vocab_size": self.vocab_size,
"dropout_rate": self.dropout_rate
})
return config
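# Full encoder–decoder Transformer. call() takes (context, x) = (source ids,
# shifted target ids) and returns per-position vocabulary logits.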
@register_keras_serializable(package='custom_layers')
class Transformer(tf.keras.Model):
def __init__(self, *, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, dropout_rate=0.1, **kwargs):
super().__init__(**kwargs)
self.num_layers = num_layers
self.d_model = d_model
self.num_heads = num_heads
self.dff = dff
self.input_vocab_size = input_vocab_size
self.target_vocab_size = target_vocab_size
self.dropout_rate = dropout_rate
self.encoder = Encoder(num_layers=num_layers, d_model=d_model, num_heads=num_heads, dff=dff,
vocab_size=input_vocab_size, dropout_rate=dropout_rate)
self.decoder = Decoder(num_layers=num_layers, d_model=d_model, num_heads=num_heads, dff=dff,
vocab_size=target_vocab_size, dropout_rate=dropout_rate)
self.final_layer = tf.keras.layers.Dense(target_vocab_size)
def call(self, inputs):
context, x = inputs
context = self.encoder(context)
x = self.decoder(x, context)
logits = self.final_layer(x)
try:
del logits._keras_mask
except AttributeError:
pass
return logits
def get_config(self):
config = super().get_config()
config.update({
"num_layers": self.num_layers,
"d_model": self.d_model,
"num_heads": self.num_heads,
"dff": self.dff,
"input_vocab_size": self.input_vocab_size,
"target_vocab_size": self.target_vocab_size,
"dropout_rate": self.dropout_rate
})
return config
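# Cross-entropy averaged over non-padding positions only (label id 0 is padding).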
def masked_loss(label, pred):
mask = label != 0
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction='none')
loss = loss_object(label, pred)
mask = tf.cast(mask, dtype=loss.dtype)
loss *= mask
loss = tf.reduce_sum(loss)/tf.reduce_sum(mask)
return loss
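# Token-level accuracy with the same padding mask applied.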
def masked_accuracy(label, pred):
pred = tf.argmax(pred, axis=2)
label = tf.cast(label, pred.dtype)
match = label == pred
mask = label != 0
match = match & mask
match = tf.cast(match, dtype=tf.float32)
mask = tf.cast(mask, dtype=tf.float32)
return tf.reduce_sum(match)/tf.reduce_sum(mask)
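# e.g. label=[[7, 7, 0]] with argmax predictions [[7, 2, 0]] scores 1/2:
# the padded third position is excluded from both numerator and denominator.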
# Load the model and the two tokenizers
model = load_model(model_path, custom_objects={
    'PositionalEmbedding': PositionalEmbedding,
'BaseAttention': BaseAttention,
'CrossAttention': CrossAttention,
'GlobalSelfAttention': GlobalSelfAttention,
'CausalSelfAttention': CausalSelfAttention,
'FeedForward': FeedForward,
'EncoderLayer': EncoderLayer,
'DecoderLayer': DecoderLayer,
'Encoder': Encoder,
'Decoder': Decoder,
'Transformer': Transformer,
'masked_loss': masked_loss,
'masked_accuracy': masked_accuracy
}, compile=False)
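# compile=False: only inference is needed, so the optimizer state is not restored.
# The custom_objects mapping should be redundant given @register_keras_serializable,
# but it is harmless to pass both.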
src_tokenizer = Tokenizer.from_file(src_tok_path)
tgt_tokenizer = Tokenizer.from_file(tgt_tok_path)
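# tokenizers API as used below: src_tokenizer.encode(text).ids produces the input
# ids; tgt_tokenizer.decode(ids) maps generated ids back to a Vietnamese string.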
# Maximum sequence lengths used during training
max_len_src = 274
max_len_tgt = 268 - 1  # decoder input is the target sequence shifted by one token
def translate_sentence_stream(sentence):
    if not sentence.strip():
        yield "Please enter a Khmer sentence to translate."
        return
sentence_ids = src_tokenizer.encode(sentence).ids
encoder_input = pad_sequences([sentence_ids], maxlen=max_len_src, padding='post')
start_token = tgt_tokenizer.token_to_id("<s>")
end_token = tgt_tokenizer.token_to_id("</s>")
decoder_input = [start_token]
result_ids = []
translated_text = ""
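    # Greedy decoding: re-run the full model each step and take the argmax at the
    # newest position. Simple, but cost grows quadratically with output length.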
for _ in range(max_len_tgt):
decoder_input_padded = pad_sequences([decoder_input], maxlen=max_len_tgt, padding='post')
predictions = model.predict([encoder_input, decoder_input_padded], verbose=0)
predicted_id = np.argmax(predictions[0, len(decoder_input)-1])
if predicted_id == end_token:
break
result_ids.append(predicted_id)
        # Decode the partial hypothesis generated so far
        translated_text = tgt_tokenizer.decode(result_ids)
        yield translated_text  # stream each partial translation straight to the UI
decoder_input.append(predicted_id)
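# A quick smoke test outside Gradio (hypothetical input string) — uncomment to try:
# for partial in translate_sentence_stream("សួស្តី"):
#     print(partial)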
# Build the Gradio interface
iface = gr.Interface(
    fn=translate_sentence_stream,
    inputs=gr.Textbox(lines=4, label="Enter a Khmer sentence"),
    outputs=gr.Textbox(label="Vietnamese translation"),
    title="Khmer → Vietnamese Translator (Streaming)",
    description="Partial translations appear on screen as they are generated.",
    live=False  # run only when the user submits, not on every keystroke
)
iface.launch()