File size: 2,165 Bytes
6840de4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
import json
import time
from subword_nmt.apply_bpe import BPE
import ctranslate2
class TranslationServer:
    """Translate text with a CTranslate2 model and subword-nmt BPE codes.

    The model directory must contain the CTranslate2 model files plus
    ``bpe.model`` (BPE codes) and ``shared_vocabulary.json``.
    """

    def __init__(self, model_path="id_en"):
        """Load the BPE codes, the shared vocabulary and the translator.

        Args:
            model_path: Directory holding the CTranslate2 model,
                ``bpe.model`` and ``shared_vocabulary.json``.

        Raises:
            OSError: If the BPE or vocabulary file cannot be opened.
        """
        self.model_path = model_path
        self.bpe_path = f"{model_path}/bpe.model"
        self.vocab_path = f"{model_path}/shared_vocabulary.json"

        # Load BPE codes; the file is only needed while BPE parses it.
        with open(self.bpe_path, "r", encoding="utf-8") as bpe_file:
            self.bpe = BPE(bpe_file)

        # Load vocab and build lookups in both directions.
        # NOTE(review): assumes the JSON holds a list of tokens in id order --
        # confirm against the exported vocabulary file.
        with open(self.vocab_path, "r", encoding="utf-8") as f:
            self.vocab = json.load(f)
        self.token_to_id = {token: i for i, token in enumerate(self.vocab)}
        self.id_to_token = {i: token for i, token in enumerate(self.vocab)}

        # Load CTranslate2 model
        self.translator = ctranslate2.Translator(model_path)

    def clean_text(self, text):
        """Return ``text`` stripped, with whitespace runs collapsed to one space."""
        return " ".join(text.strip().split())

    def normalize_output(self, text):
        """Turn space-joined output tokens into plain readable text.

        Merges BPE pieces, maps the ``▁`` word-boundary marker to a space,
        unescapes the ``&apos;``, ``&quot;`` and ``&amp;`` entities and
        collapses repeated whitespace.

        Args:
            text: Output tokens joined by single spaces.

        Returns:
            The normalized string.
        """
        # De-BPE first: a piece ending in "@@" continues in the next token, so
        # the marker must be removed together with the joining space.  Any
        # leftover marker (e.g. on the final token) is dropped afterwards.
        text = text.replace("@@ ", "").replace("@@", "")

        # Order matters: "&amp;" is unescaped last so that double-escaped
        # input such as "&amp;quot;" becomes "&quot;" rather than '"'.
        replacements = (
            ("▁", " "),
            ("&apos;", "'"),
            ("&quot;", '"'),
            ("&amp;", "&"),
        )
        for old, new in replacements:
            text = text.replace(old, new)

        # remove double spaces
        return " ".join(text.split())

    def translate_bpe(self, text, beam_size=5):
        """Translate one sentence: BPE-encode, decode with beam search, de-BPE.

        Args:
            text: Source sentence.
            beam_size: Beam width passed to the translator.

        Returns:
            The normalized best hypothesis, or ``""`` for blank input.
        """
        # Nothing to translate; skip the model call entirely.
        if not text.strip():
            return ""

        # Tokenize with BPE
        tokens = self.bpe.process_line(text).split()

        # Run translation on a batch of one sentence.
        results = self.translator.translate_batch(
            [tokens],
            beam_size=beam_size,
            length_penalty=1.0,
        )

        # First hypothesis of the first (only) batch entry, as a token list.
        best_hypothesis = results[0].hypotheses[0]

        # Join tokens & de-BPE
        return self.normalize_output(" ".join(best_hypothesis))

    def translate_text(self, text):
        """Clean ``text`` and return its translation."""
        return self.translate_bpe(self.clean_text(text))
if __name__ == "__main__":
    # Smoke test: translate one sample sentence with the "id_en" model
    # directory and print the result.
    sample_sentence = "Saya menelepon dari kantor pajak."
    translation_server = TranslationServer("id_en")
    translated = translation_server.translate_text(sample_sentence)
    print(translated)
|