import json
import time

import ctranslate2
from subword_nmt.apply_bpe import BPE


class TranslationServer:
    """Translate text with a CTranslate2 model and subword-nmt BPE.

    The model directory is expected to contain the converted CTranslate2
    model plus ``bpe.model`` (BPE codes) and ``shared_vocabulary.json``.
    """

    # Escaped sequences emitted by the model mapped to their plain-text form.
    # NOTE(review): the entity spellings are reconstructed as Moses-style
    # escapes -- confirm against the model's actual output.
    # "&amp;" is decoded last so that "&amp;quot;" becomes "&quot;" and is
    # not unescaped a second time.
    _REPLACEMENTS = (
        ("▁", " "),
        ("&apos;", "'"),
        ("&quot;", '"'),
        ("&amp;", "&"),
    )

    def __init__(self, model_path="id_en"):
        """Load the BPE codes, the vocabulary and the translation model.

        Args:
            model_path: Directory holding the CTranslate2 model, the BPE
                codes file and the shared vocabulary.

        Raises:
            OSError: If the BPE or vocabulary file cannot be opened.
        """
        self.model_path = model_path
        self.bpe_path = f"{model_path}/bpe.model"
        self.vocab_path = f"{model_path}/shared_vocabulary.json"

        # Load BPE codes.
        with open(self.bpe_path, "r", encoding="utf-8") as bpe_file:
            self.bpe = BPE(bpe_file)

        # Load the vocabulary and build both lookup directions.
        with open(self.vocab_path, "r", encoding="utf-8") as f:
            self.vocab = json.load(f)
        self.token_to_id = {token: i for i, token in enumerate(self.vocab)}
        self.id_to_token = {i: token for i, token in enumerate(self.vocab)}

        # Load the CTranslate2 model.
        self.translator = ctranslate2.Translator(model_path)

    def clean_text(self, text: str) -> str:
        """Return *text* stripped, with whitespace runs collapsed to one space."""
        return " ".join(text.strip().split())

    def normalize_output(self, text: str) -> str:
        """Turn space-joined model output tokens into plain text.

        Args:
            text: Output tokens joined with single spaces.

        Returns:
            The text with BPE markers removed, escaped sequences decoded and
            whitespace collapsed.
        """
        # subword-nmt marks every non-final piece of a word with a trailing
        # "@@"; the marker AND the space after it must go so that the pieces
        # are glued back together ("men@@ elepon" -> "menelepon").
        text = text.replace("@@ ", "")
        # Drop any marker that was not followed by a space (e.g. at the end).
        text = text.replace("@@", "")

        # Decode after de-BPE so an entity split across pieces is still seen.
        for old, new in self._REPLACEMENTS:
            text = text.replace(old, new)

        # Collapse repeated spaces and trim the ends.
        return " ".join(text.split())

    def translate_bpe(self, text: str, beam_size: int = 5) -> str:
        """Translate already-cleaned *text* and return the detokenized result.

        Args:
            text: Source sentence.
            beam_size: Beam width passed to the translator.

        Returns:
            The best hypothesis as plain text, or ``""`` when *text* yields
            no tokens.
        """
        # Tokenize with BPE.
        tokens = self.bpe.process_line(text).split()
        if not tokens:
            # Nothing to translate; skip the model call.
            return ""

        # Run translation on a batch of one sentence.
        result = self.translator.translate_batch(
            [tokens],
            beam_size=beam_size,
            length_penalty=1.0,
        )

        # Best hypothesis of the first (only) batch entry: a list of tokens.
        output_tokens = result[0].hypotheses[0]

        # Join tokens and undo BPE / escaping.
        return self.normalize_output(" ".join(output_tokens))

    def translate_text(self, text: str) -> str:
        """Clean *text*, translate it and return the translation."""
        return self.translate_bpe(self.clean_text(text))


if __name__ == "__main__":
    server = TranslationServer("id_en")
    text = "Saya menelepon dari kantor pajak."
    print(server.translate_text(text))