"""Translate MultiClinSum English records to Bengali with a judge/retry loop.

Pipeline: chunk long texts, translate each chunk through an OpenAI-compatible
endpoint, validate each translation with (a) a judge LLM served by vLLM and
(b) a local character-level sanity check, retry on failure, and periodically
checkpoint results to disk so the run can be resumed.
"""

import os

# Pin the GPU before any CUDA-aware library gets imported.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "2"

import argparse
import json
import re
import time
import unicodedata
import urllib.error
import urllib.request
from typing import Dict, List, Tuple

from openai import OpenAI
from tqdm import tqdm

DATA_PATH = "/home/mshahidul/readctrl/data/testing_data_gs/multiclinsum_gs_train_en.json"
OUT_PATH = "/home/mshahidul/readctrl/data/translated_data/multiclinsum_gs_train_en2bn_gemma(0_200).json"

# Tune if you hit model input limits.
MAX_CHARS_PER_CHUNK = 1500
MAX_NEW_TOKENS = 512
SAVE_EVERY = 10

# Translation endpoint (OpenAI-compatible server).
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "http://localhost:8081/v1")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "no-key-required")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "translate_gemma")
OPENAI_TIMEOUT_SEC = float(os.environ.get("OPENAI_TIMEOUT_SEC", "60"))

# Judge endpoint (vLLM server) used to accept/reject translations.
VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8004/v1")
JUDGE_MODEL = os.environ.get("JUDGE_MODEL", "Qwen/Qwen3-30B-A3B-Instruct-2507")
JUDGE_MAX_RETRIES = 3
JUDGE_TIMEOUT_SEC = 60
JUDGE_TEMPERATURE = 0.0

# Unicode code-point range for the Bengali script block.
_BENGALI_RANGE = (0x0980, 0x09FF)
_ALLOWED_PUNCT = set(" \n\t\r.,;:!?-—()[]{}\"'`~")
_ALLOWED_EN_WORDS = {
    w.strip().lower()
    for w in os.environ.get("ALLOWED_EN_WORDS", "").split(",")
    if w.strip()
}


def chunk_text(text: str, max_chars: int) -> List[str]:
    """Split *text* into chunks of at most *max_chars* characters.

    Splits on paragraph boundaries first; paragraphs that are still too long
    are split on sentence boundaries (". ") and greedily re-packed. A single
    sentence longer than *max_chars* is kept whole rather than cut mid-word.
    """
    if len(text) <= max_chars:
        return [text]

    chunks: List[str] = []
    paragraphs = [p for p in text.split("\n\n") if p.strip()]
    for para in paragraphs:
        if len(para) <= max_chars:
            chunks.append(para)
            continue
        sentences = [s.strip() for s in para.split(". ") if s.strip()]
        current = ""
        for sentence in sentences:
            # Re-attach the period stripped by the ". " split.
            sentence = sentence if sentence.endswith(".") else f"{sentence}."
            if not current:
                current = sentence
                continue
            if len(current) + 1 + len(sentence) <= max_chars:
                current = f"{current} {sentence}"
            else:
                chunks.append(current)
                current = sentence
        if current:
            chunks.append(current)
    return chunks


def translate_text(client: OpenAI, text: str) -> str:
    """Translate *text* from English to Bengali via the configured model.

    Long inputs are chunked with :func:`chunk_text`; translated chunks are
    rejoined with blank lines. Whitespace-only input is returned unchanged.
    """
    if not text.strip():
        return text

    def _translate_chunk(chunk: str) -> str:
        # One request per chunk; `or ""` guards against a None content field
        # so the join below cannot crash.
        completion = client.chat.completions.create(
            model=OPENAI_MODEL,
            messages=[
                {
                    "role": "user",
                    "content": (
                        "Translate the following text from English to Bengali:\n\n"
                        f"{chunk}"
                    ),
                }
            ],
            max_tokens=MAX_NEW_TOKENS,
            stream=False,
        )
        return completion.choices[0].message.content or ""

    chunks = chunk_text(text, MAX_CHARS_PER_CHUNK)
    if len(chunks) == 1:
        return _translate_chunk(chunks[0])
    return "\n\n".join(_translate_chunk(chunk) for chunk in chunks)


def _strip_code_fences(text: str) -> str:
    """Remove a surrounding Markdown code fence (```lang ... ```), if any."""
    text = text.strip()
    if text.startswith("```"):
        text = re.sub(r"^```[a-zA-Z]*\n?", "", text)
        text = re.sub(r"\n?```$", "", text)
    return text.strip()


def _extract_json_payload(text: str) -> Dict:
    """Best-effort parse of a JSON object out of a model response.

    Tries a direct parse first, then falls back to the outermost-brace span.
    Returns an empty dict when no JSON object can be located; a malformed
    brace span may still raise ``json.JSONDecodeError`` (handled by callers).
    """
    cleaned = _strip_code_fences(text)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        match = re.search(r"\{.*\}", cleaned, flags=re.DOTALL)
        if match:
            return json.loads(match.group(0))
    return {}


def _contains_disallowed_chars(text: str) -> Tuple[bool, str]:
    """Return (True, reason) if *text* contains corrupted/junk characters.

    Accepts Bengali script, Basic Latin, a small whitelist of medical/tech
    symbols (±, μ, °, ...), and anything Unicode assigns a real category to;
    only unassigned ('Cn') or private-use ('Co') code points are rejected.
    """
    # Allow common medical/tech symbols that might be categorised as 'S'
    # (Symbol), like ±, μ, §, ©, or mathematical operators.
    allowed_extra_symbols = {"±", "μ", "°", "%", "+", "=", "<", ">", "/", "\\"}
    for ch in text:
        code = ord(ch)
        # 1. Allow the Bengali range.
        if _BENGALI_RANGE[0] <= code <= _BENGALI_RANGE[1]:
            continue
        # 2. Allow Basic Latin (English + punctuation).
        if 0x0000 <= code <= 0x007F:
            continue
        # 3. Allow specifically whitelisted symbols.
        if ch in allowed_extra_symbols:
            continue
        category = unicodedata.category(ch)
        # Only fail on 'Not Assigned' or 'Private Use' characters (junk).
        if category in ["Cn", "Co"]:
            return True, f"Corrupted character detected: {ch} (U+{code:04X})"
    return False, ""


def _call_judge_model(source_text: str, translated_text: str) -> Dict:
    """Ask the judge model whether the translation is acceptable.

    Returns the judge's parsed JSON payload (expected keys: ok, reason).
    Raises urllib/json errors on transport or parse failure (handled by
    :func:`_judge_translation`).
    """
    url = f"{VLLM_BASE_URL}/chat/completions"
    prompt = (
        "You are a strict judge for Bengali translations. "
        "Return JSON only with keys ok (true/false) and reason. "
        "Check if the Bengali translation contains any non-Bengali, "
        "non-English letters, or strange symbols. "
        "Allow Bengali punctuation, Bengali digits, and common punctuation. "
        "English words and keywords are allowed. "
        "Minor punctuation differences are acceptable."
        "Allow common medical/tech symbols that might be marked as 'S' (Symbol) like ±, μ, §, ©, or mathematical operators."
        "If any issue exists, ok must be false.\n\n"
        f"English:\n{source_text}\n\nBengali:\n{translated_text}"
    )
    payload = {
        "model": JUDGE_MODEL,
        "messages": [
            {"role": "system", "content": "Respond with JSON only."},
            {"role": "user", "content": prompt},
        ],
        "temperature": JUDGE_TEMPERATURE,
        "max_tokens": 256,
    }
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=JUDGE_TIMEOUT_SEC) as resp:
        response_json = json.loads(resp.read().decode("utf-8"))
    content = response_json["choices"][0]["message"]["content"]
    return _extract_json_payload(content)


def _judge_translation(source_text: str, translated_text: str) -> Tuple[bool, str]:
    """Validate a translation; return (ok, reason).

    Combines the remote judge verdict with the local character check. A
    failed judge call is treated as a rejection, but the local character
    check still runs and takes precedence in the reported reason.
    """
    if not translated_text.strip():
        return False, "Empty translation"
    try:
        response = _call_judge_model(source_text, translated_text)
        ok = bool(response.get("ok", False))
        reason = str(response.get("reason", ""))
    except (urllib.error.URLError, json.JSONDecodeError, KeyError, TimeoutError) as exc:
        ok = False
        reason = f"Judge call failed: {exc}"

    disallowed, disallowed_reason = _contains_disallowed_chars(translated_text)
    if disallowed:
        return False, disallowed_reason
    if not ok:
        return False, reason or "Judge rejected translation"
    return True, ""


def translate_with_judge(
    client: OpenAI, source_text: str, field_name: str, record_id: str
) -> str:
    """Translate with up to JUDGE_MAX_RETRIES attempts, validated by the judge.

    Returns the accepted translation, or "" after exhausting retries so the
    record can be spotted and re-translated later. Whitespace-only input is
    returned unchanged.
    """
    if not source_text.strip():
        return source_text
    for attempt in range(1, JUDGE_MAX_RETRIES + 1):
        translated = translate_text(client, source_text)
        ok, reason = _judge_translation(source_text, translated)
        if ok:
            return translated
        print(
            f"[Judge] id={record_id} field={field_name} attempt={attempt} failed: {reason}"
        )
        time.sleep(1)  # brief backoff before retrying
    print(
        f"[Judge] id={record_id} field={field_name} failed after "
        f"{JUDGE_MAX_RETRIES} attempts. Leaving empty for re-translation."
    )
    return ""


def load_json(path: str) -> List[Dict]:
    """Load a UTF-8 JSON file and return its (list) payload."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def save_json(path: str, data: List[Dict]) -> None:
    """Write *data* as pretty-printed UTF-8 JSON, creating parent dirs."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(
        description="Translate MultiClinSum EN to BN."
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=200,
        help="Only translate the first N instances.",
    )
    return parser.parse_args()


def main() -> None:
    """Run the translation pipeline with resume support and checkpointing."""
    args = parse_args()
    data = load_json(DATA_PATH)
    if args.limit is not None:
        data = data[: args.limit]

    existing: Dict[str, Dict] = {}
    existing_list: List[Dict] = []
    resume_index = 0
    if os.path.exists(OUT_PATH):
        existing_list = load_json(OUT_PATH)
        for item in existing_list:
            existing[item["id"]] = item
        # Fast resume only when the saved output is an exact prefix of the
        # (possibly limited) input data.
        if existing_list:
            prefix_ids = [item.get("id") for item in existing_list]
            data_prefix_ids = [item.get("id") for item in data[: len(prefix_ids)]]
            if prefix_ids == data_prefix_ids:
                resume_index = len(existing_list)

    client = OpenAI(
        base_url=OPENAI_BASE_URL,
        api_key=OPENAI_API_KEY,
        timeout=OPENAI_TIMEOUT_SEC,
    )

    # Bug fix: seed the result list with only the resumed prefix. The original
    # copied the entire existing list and then re-appended any record whose id
    # was found in `existing`, which duplicated every previously translated
    # record whenever the saved output was not an exact prefix of the input.
    translated: List[Dict] = existing_list[:resume_index]
    for idx, item in enumerate(
        tqdm(data[resume_index:], desc="Translating", unit="record"),
        start=resume_index + 1,
    ):
        if item["id"] in existing:
            # Reuse a previously translated record (out-of-order resume).
            translated.append(existing[item["id"]])
        else:
            record_id = str(item.get("id", ""))
            fulltext_bn = translate_with_judge(
                client, item.get("fulltext", ""), "fulltext", record_id
            )
            summary_bn = translate_with_judge(
                client, item.get("summary", ""), "summary", record_id
            )
            translated.append(
                {
                    "id": item.get("id"),
                    "fulltext_en": item.get("fulltext", ""),
                    "summary_en": item.get("summary", ""),
                    "fulltext_bn": fulltext_bn,
                    "summary_bn": summary_bn,
                }
            )
        # Periodic checkpoint so a crash loses at most SAVE_EVERY records.
        if idx % SAVE_EVERY == 0:
            save_json(OUT_PATH, translated)
            print(f"Saved {idx}/{len(data)} records to {OUT_PATH}")

    save_json(OUT_PATH, translated)
    print(f"Done. Saved {len(translated)} records to {OUT_PATH}")


if __name__ == "__main__":
    main()