| import os |
# Enumerate GPUs in PCI bus order and expose only device 2 to this process.
# These variables are set before `torch` is imported further down the file.
os.environ.update(
    {
        "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
        "CUDA_VISIBLE_DEVICES": "2",
    }
)
|
|
| import argparse |
| import json |
| import re |
| import time |
| import unicodedata |
| import urllib.error |
| import urllib.request |
| from typing import Dict, List, Tuple |
|
|
| import torch |
| from tqdm import tqdm |
| from transformers import pipeline |
|
|
|
|
# Input corpus and destination file for the translated records.
DATA_PATH = "/home/mshahidul/readctrl/data/testing_data_gs/multiclinsum_gs_train_en.json"
OUT_PATH = "/home/mshahidul/readctrl/data/translated_data/multiclinsum_gs_train_en2bn(0_200).json"

# Language codes attached to every translation request.
SOURCE_LANG, TARGET_LANG = "en", "bn"

# Chunking, generation and checkpoint settings; BATCH_SIZE can be overridden
# through the environment.
MAX_CHARS_PER_CHUNK = 1500
MAX_NEW_TOKENS = 512
SAVE_EVERY = 10
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "16"))

# Judge endpoint (OpenAI-style chat completions URL is built from this base)
# and its call parameters.
VLLM_BASE_URL = os.getenv("VLLM_BASE_URL", "http://localhost:8004/v1")
JUDGE_MODEL = os.getenv("JUDGE_MODEL", "Qwen/Qwen3-30B-A3B-Instruct-2507")
JUDGE_MAX_RETRIES = 3
JUDGE_TIMEOUT_SEC = 60
JUDGE_TEMPERATURE = 0.0

# Inclusive code point bounds treated as Bengali by the character check.
_BENGALI_RANGE = (0x0980, 0x09FF)
# Characters listed explicitly as acceptable punctuation/whitespace.
_ALLOWED_PUNCT = set(" \n\t\r.,;:!?-—()[]{}\"'`~")
# Optional comma-separated allow-list of words, lower-cased, blanks dropped.
_ALLOWED_EN_WORDS = {
    word
    for word in (
        raw.strip().lower()
        for raw in os.getenv("ALLOWED_EN_WORDS", "").split(",")
    )
    if word
}
|
|
|
|
def _split_oversized(piece: str, max_chars: int) -> List[str]:
    """Break *piece* into whitespace-delimited parts of at most *max_chars* characters."""
    parts: List[str] = []
    current = ""
    for word in piece.split():
        # A single word longer than the limit has to be cut mid-word.
        while len(word) > max_chars:
            if current:
                parts.append(current)
                current = ""
            parts.append(word[:max_chars])
            word = word[max_chars:]
        if not current:
            current = word
        elif len(current) + 1 + len(word) <= max_chars:
            current = f"{current} {word}"
        else:
            parts.append(current)
            current = word
    if current:
        parts.append(current)
    return parts


def chunk_text(text: str, max_chars: int) -> List[str]:
    """Split *text* into chunks of at most *max_chars* characters.

    Text that already fits is returned as a single chunk. Otherwise the text
    is split on blank lines; paragraphs that fit become one chunk each, and
    longer paragraphs are split on ". " and greedily re-packed sentence by
    sentence. A sentence that is itself longer than *max_chars* is broken at
    whitespace (or mid-word as a last resort) so no chunk exceeds the limit.

    Args:
        text: The text to split.
        max_chars: Maximum length of each returned chunk; must be >= 1.

    Returns:
        The chunks in document order. Blank paragraphs are dropped.

    Raises:
        ValueError: If *max_chars* is smaller than 1.
    """
    if max_chars < 1:
        raise ValueError("max_chars must be a positive integer")
    if len(text) <= max_chars:
        return [text]

    chunks: List[str] = []
    for para in text.split("\n\n"):
        if not para.strip():
            continue
        if len(para) <= max_chars:
            chunks.append(para)
            continue

        fragments = para.split(". ")
        last_pos = len(fragments) - 1
        current = ""
        for pos, fragment in enumerate(fragments):
            sentence = fragment.strip()
            if not sentence:
                continue
            # Only fragments followed by a ". " delimiter lost their period;
            # the final fragment keeps whatever ending it had ("?", ":", ...).
            if pos != last_pos:
                sentence = f"{sentence}."

            if len(sentence) <= max_chars:
                units = [sentence]
            else:
                units = _split_oversized(sentence, max_chars)

            for unit in units:
                if not current:
                    current = unit
                elif len(current) + 1 + len(unit) <= max_chars:
                    current = f"{current} {unit}"
                else:
                    chunks.append(current)
                    current = unit

        # Flush the tail of this paragraph before moving to the next one.
        if current:
            chunks.append(current)

    return chunks
|
|
|
|
def translate_text(pipe, text: str) -> str:
    """Translate *text* with *pipe*, chunk by chunk.

    Whitespace-only input is returned unchanged. Otherwise the text is cut
    with ``chunk_text`` (limit ``MAX_CHARS_PER_CHUNK``), every chunk is
    wrapped in a single-message user conversation carrying the source and
    target language codes, and the conversations are sent to *pipe* in
    slices of ``BATCH_SIZE``. The translated chunks are joined with a blank
    line between them.

    Args:
        pipe: Callable invoked as ``pipe(text=..., max_new_tokens=...,
            batch_size=...)``; presumably a transformers pipeline whose
            outputs carry a chat-style ``generated_text`` list -- confirm
            against the model in use.
        text: Source text to translate.

    Returns:
        The translated text, or *text* itself when it is blank.
    """
    if not text.strip():
        return text

    def as_conversation(piece: str) -> list:
        # One user turn whose single content part holds the chunk and codes.
        part = {
            "type": "text",
            "source_lang_code": SOURCE_LANG,
            "target_lang_code": TARGET_LANG,
            "text": piece,
        }
        return [{"role": "user", "content": [part]}]

    conversations = [
        as_conversation(piece) for piece in chunk_text(text, MAX_CHARS_PER_CHUNK)
    ]

    results: List[str] = []
    for offset in range(0, len(conversations), BATCH_SIZE):
        replies = pipe(
            text=conversations[offset : offset + BATCH_SIZE],
            max_new_tokens=MAX_NEW_TOKENS,
            batch_size=BATCH_SIZE,
        )
        for reply in replies:
            # Some outputs arrive wrapped in a list; use its first element.
            entry = reply[0] if isinstance(reply, list) else reply
            # The last message of the generated conversation is the answer.
            results.append(entry["generated_text"][-1]["content"])

    return "\n\n".join(results)
|
|
|
|
def _strip_code_fences(text: str) -> str:
    """Return *text* stripped of whitespace and of a surrounding Markdown code fence.

    Only text that starts with a triple backtick is treated as fenced; the
    opening fence (with an optional alphabetic language tag) and a closing
    fence at the very end are removed.
    """
    text = text.strip()
    if text.startswith("```"):
        text = re.sub(r"^```[a-zA-Z]*\n?", "", text)
        text = re.sub(r"\n?```$", "", text)
    return text.strip()


def _extract_json_payload(text: str) -> Dict:
    """Parse a JSON object out of a model reply.

    The reply is first unfenced and parsed as a whole. If that fails, or if
    the result is valid JSON but not an object (a list, string, number or
    boolean), the widest ``{...}`` span in the reply is parsed instead.

    Args:
        text: Raw reply text.

    Returns:
        The decoded JSON object, or ``{}`` when no object can be found.
        The result is always a ``dict`` so callers can use ``.get`` safely.

    Raises:
        json.JSONDecodeError: If a ``{...}`` span exists but is not valid JSON.
    """
    cleaned = _strip_code_fences(text)
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    match = re.search(r"\{.*\}", cleaned, flags=re.DOTALL)
    if match is None:
        return {}
    # Deliberately not guarded: an unparsable span still raises, as before.
    embedded = json.loads(match.group(0))
    return embedded if isinstance(embedded, dict) else {}
|
|
|
|
def _contains_disallowed_chars(text: str) -> Tuple[bool, str]:
    """Report whether *text* contains a letter or symbol outside the accepted set.

    Accepted: anything in ``_ALLOWED_PUNCT``, any code point inside
    ``_BENGALI_RANGE`` (letters, signs, digits and the block's own symbols),
    and ASCII letters. Rejected: any other alphabetic character and any other
    character whose Unicode general category starts with "S". All remaining
    characters (digits, punctuation, separators, combining marks, format
    characters) are accepted.

    Words listed in ``_ALLOWED_EN_WORDS`` are removed from the text before the
    scan, so allow-listed words do not trigger a rejection.

    Args:
        text: The translated text to inspect.

    Returns:
        ``(True, reason)`` for the first offending character, otherwise
        ``(False, "")``.
    """
    if _ALLOWED_EN_WORDS:
        normalized = re.sub(r"[^\w\s]", " ", text.lower())
        # dict.fromkeys de-duplicates while keeping first-seen order, so each
        # allow-listed word is substituted once instead of once per occurrence.
        for token in dict.fromkeys(normalized.split()):
            if token.isalpha() and token in _ALLOWED_EN_WORDS:
                text = re.sub(
                    rf"\b{re.escape(token)}\b", "", text, flags=re.IGNORECASE
                )

    low, high = _BENGALI_RANGE
    for ch in text:
        # The explicit allow-list and the Bengali block are checked first, so
        # entries such as "`" and "~" (category S) are not rejected below.
        if ch in _ALLOWED_PUNCT or low <= ord(ch) <= high:
            continue
        if ch.isalpha():
            if ch.isascii():
                continue
            return True, f"Non-Bengali/English letter detected: {ch}"
        if unicodedata.category(ch).startswith("S"):
            return True, f"Symbol detected: {ch}"
    return False, ""
|
|
|
|
def _call_judge_model(source_text: str, translated_text: str) -> Dict:
    """Ask the judge model to vet a translation and return its JSON verdict.

    POSTs a chat-completions request to ``{VLLM_BASE_URL}/chat/completions``
    with the judging instructions, the English source and the Bengali
    translation, then parses the first choice's message content with
    ``_extract_json_payload``.

    Args:
        source_text: The English source text.
        translated_text: The Bengali translation to be judged.

    Returns:
        Whatever ``_extract_json_payload`` decodes from the reply; the prompt
        requests the keys ``ok`` and ``reason``.

    Raises:
        Errors from ``urllib.request.urlopen``, from JSON decoding, or from a
        reply that lacks the expected ``choices``/``message`` fields
        propagate to the caller.
    """
    instructions = "".join(
        [
            "You are a strict judge for Bengali translations. ",
            "Return JSON only with keys ok (true/false) and reason. ",
            "Check if the Bengali translation contains any non-Bengali, ",
            "non-English letters, or strange symbols. ",
            "Allow Bengali punctuation, Bengali digits, and common punctuation. ",
            "English words and keywords are allowed. ",
            "If any issue exists, ok must be false.\n\n",
        ]
    )
    prompt = f"{instructions}English:\n{source_text}\n\nBengali:\n{translated_text}"

    body = {
        "model": JUDGE_MODEL,
        "messages": [
            {"role": "system", "content": "Respond with JSON only."},
            {"role": "user", "content": prompt},
        ],
        "temperature": JUDGE_TEMPERATURE,
        "max_tokens": 256,
    }
    request = urllib.request.Request(
        f"{VLLM_BASE_URL}/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

    with urllib.request.urlopen(request, timeout=JUDGE_TIMEOUT_SEC) as resp:
        raw_reply = resp.read()

    reply = json.loads(raw_reply.decode("utf-8"))
    return _extract_json_payload(reply["choices"][0]["message"]["content"])
|
|
|
|
def _judge_translation(source_text: str, translated_text: str) -> Tuple[bool, str]:
    """Decide whether a translation is acceptable.

    A translation passes only if it is non-blank, the local character check
    ``_contains_disallowed_chars`` finds nothing, and the judge model answers
    with ``ok`` set to true.

    Args:
        source_text: The English source text.
        translated_text: The candidate Bengali translation.

    Returns:
        ``(True, "")`` when accepted, otherwise ``(False, reason)``.
    """
    if not translated_text.strip():
        return False, "Empty translation"

    # The local check is deterministic and always overrides the judge's
    # verdict, so run it first and skip the network round trip when it fails.
    disallowed, disallowed_reason = _contains_disallowed_chars(translated_text)
    if disallowed:
        return False, disallowed_reason

    try:
        response = _call_judge_model(source_text, translated_text)
        verdict = response.get("ok", False)
        reason = str(response.get("reason", ""))
    except (OSError, ValueError, LookupError, TypeError, AttributeError) as exc:
        # OSError covers URLError/timeouts, ValueError covers JSON decoding,
        # and the rest cover replies with a missing or unexpected structure;
        # none of these should abort the whole translation run.
        return False, f"Judge call failed: {exc}"

    # bool("false") is True, so a verdict sent as a string must be read
    # explicitly rather than coerced.
    ok = verdict is True or (
        isinstance(verdict, str) and verdict.strip().lower() == "true"
    )
    if not ok:
        return False, reason or "Judge rejected translation"
    return True, ""
|
|
|
|
def translate_with_judge(pipe, source_text: str, field_name: str, record_id: str) -> str:
    """Translate *source_text*, retrying until the judge accepts the result.

    Blank input is returned unchanged. Otherwise up to ``JUDGE_MAX_RETRIES``
    translate-then-judge rounds are run; each rejection is printed and
    followed by a one-second pause.

    Args:
        pipe: Translation callable handed to ``translate_text``.
        source_text: Text to translate.
        field_name: Field label used in log lines.
        record_id: Record identifier used in log lines.

    Returns:
        The first accepted translation, or ``""`` when every attempt is
        rejected.
    """
    if source_text.strip() == "":
        return source_text

    attempt = 0
    while attempt < JUDGE_MAX_RETRIES:
        attempt += 1
        candidate = translate_text(pipe, source_text)
        accepted, reason = _judge_translation(source_text, candidate)
        if accepted:
            return candidate
        print(
            f"[Judge] id={record_id} field={field_name} attempt={attempt} failed: {reason}"
        )
        time.sleep(1)

    # Every attempt was rejected: report it and hand back an empty result.
    print(
        f"[Judge] id={record_id} field={field_name} failed after "
        f"{JUDGE_MAX_RETRIES} attempts. Leaving empty for re-translation."
    )
    return ""
|
|
|
|
def load_json(path: str) -> List[Dict]:
    """Read the UTF-8 file at *path* and return its decoded JSON content."""
    with open(path, mode="r", encoding="utf-8") as handle:
        document = json.load(handle)
    return document
|
|
|
|
def save_json(path: str, data: List[Dict]) -> None:
    """Write *data* to *path* as indented UTF-8 JSON, replacing the file atomically.

    The parent directory is created when *path* has one. The content is
    written to ``<path>.tmp`` and then moved over *path*, so an interrupted
    or failed write never leaves a truncated checkpoint behind.

    Args:
        path: Destination file; may be a bare filename.
        data: JSON-serialisable records to store.

    Raises:
        TypeError: If *data* is not JSON-serialisable (the existing file is
            left untouched).
        OSError: If the directory or file cannot be written.
    """
    directory = os.path.dirname(path)
    # os.makedirs("") raises, so only create a directory when there is one.
    if directory:
        os.makedirs(directory, exist_ok=True)

    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up after a failure.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command line; the only option is ``--limit`` (int, default 200)."""
    cli = argparse.ArgumentParser(description="Translate MultiClinSum EN to BN.")
    limit_options = {
        "type": int,
        "default": 200,
        "help": "Only translate the first N instances.",
    }
    cli.add_argument("--limit", **limit_options)
    return cli.parse_args()
|
|
|
|
def main() -> None:
    """Translate the selected records and write them to ``OUT_PATH``.

    Loads ``DATA_PATH``, keeps the first ``--limit`` records, and reuses any
    records already stored in ``OUT_PATH``. When the stored ids form a prefix
    of the selected ids, processing resumes right after that prefix;
    otherwise every selected record is visited and those whose id is already
    stored are skipped. Progress is checkpointed after every ``SAVE_EVERY``
    newly translated records and once more at the end.
    """
    args = parse_args()
    data = load_json(DATA_PATH)
    if args.limit is not None:
        data = data[: args.limit]

    existing_list: List[Dict] = load_json(OUT_PATH) if os.path.exists(OUT_PATH) else []
    existing: Dict[str, Dict] = {item["id"]: item for item in existing_list}

    resume_index = 0
    if existing_list:
        prefix_ids = [item.get("id") for item in existing_list]
        data_prefix_ids = [item.get("id") for item in data[: len(prefix_ids)]]
        if prefix_ids == data_prefix_ids:
            resume_index = len(existing_list)

    remaining = data[resume_index:]

    # Loading the model is expensive; skip it when every remaining record is
    # already present in the output file.
    pipe = None
    if any(item["id"] not in existing for item in remaining):
        pipe = pipeline(
            "image-text-to-text",
            model="google/translategemma-27b-it",
            device="cuda",
            dtype=torch.bfloat16,
        )

    # All stored records are carried over up front, so the loop must only
    # append records that are new; re-appending stored ones would duplicate
    # them whenever the stored file is not a prefix of the input.
    translated: List[Dict] = existing_list.copy()
    unsaved = 0
    for idx, item in enumerate(
        tqdm(remaining, desc="Translating", unit="record"),
        start=resume_index + 1,
    ):
        if item["id"] in existing:
            # NOTE(review): stored records are reused as-is, including ones
            # whose translation was left empty after the judge rejected it.
            continue

        record_id = str(item.get("id", ""))
        fulltext_bn = translate_with_judge(
            pipe, item.get("fulltext", ""), "fulltext", record_id
        )
        summary_bn = translate_with_judge(
            pipe, item.get("summary", ""), "summary", record_id
        )
        translated.append(
            {
                "id": item.get("id"),
                "fulltext_en": item.get("fulltext", ""),
                "summary_en": item.get("summary", ""),
                "fulltext_bn": fulltext_bn,
                "summary_bn": summary_bn,
            }
        )

        unsaved += 1
        if unsaved >= SAVE_EVERY:
            save_json(OUT_PATH, translated)
            unsaved = 0
            print(f"Saved {idx}/{len(data)} records to {OUT_PATH}")

    save_json(OUT_PATH, translated)
    print(f"Done. Saved {len(translated)} records to {OUT_PATH}")


if __name__ == "__main__":
    main()
|
|