# Source: readCtrl_lambda/code/translation/misc/translate_multiclinsum_en2bn_v2.py
# Author: mshahidul — "Initial commit of readCtrl code without large models" (commit 030876e)
import os
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "2"
import argparse
import json
import re
import time
import unicodedata
import urllib.error
import urllib.request
from typing import Dict, List, Tuple
from openai import OpenAI
from tqdm import tqdm
# Input corpus: a JSON list of {"id", "fulltext", "summary"} records (English).
DATA_PATH = "/home/mshahidul/readctrl/data/testing_data_gs/multiclinsum_gs_train_en.json"
# Output file: EN->BN translations, also used as the resume checkpoint.
OUT_PATH = "/home/mshahidul/readctrl/data/translated_data/multiclinsum_gs_train_en2bn_gemma(0_200).json"
# Tune if you hit model input limits.
MAX_CHARS_PER_CHUNK = 1500  # max characters per translation request chunk
MAX_NEW_TOKENS = 512  # completion token budget per translation request
SAVE_EVERY = 10  # checkpoint OUT_PATH every N records in main()
# Translation endpoint: an OpenAI-compatible server (default: local port 8081).
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "http://localhost:8081/v1")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "no-key-required")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "translate_gemma")
OPENAI_TIMEOUT_SEC = float(os.environ.get("OPENAI_TIMEOUT_SEC", "60"))
# Judge endpoint: a vLLM chat-completions server used to validate translations.
VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL", "http://localhost:8004/v1")
JUDGE_MODEL = os.environ.get("JUDGE_MODEL", "Qwen/Qwen3-30B-A3B-Instruct-2507")
JUDGE_MAX_RETRIES = 3  # re-translate up to this many times before giving up
JUDGE_TIMEOUT_SEC = 60  # seconds before a judge HTTP call times out
JUDGE_TEMPERATURE = 0.0  # deterministic judging
# Inclusive Unicode code-point bounds of the Bengali block (U+0980..U+09FF).
_BENGALI_RANGE = (0x0980, 0x09FF)
# NOTE(review): the two constants below are defined but not referenced
# anywhere else in this file — confirm they are still needed.
_ALLOWED_PUNCT = set(" \n\t\r.,;:!?-—()[]{}\"'`~")
# Optional comma-separated whitelist of English words from the environment.
_ALLOWED_EN_WORDS = {
w.strip().lower()
for w in os.environ.get("ALLOWED_EN_WORDS", "").split(",")
if w.strip()
}
def chunk_text(text: str, max_chars: int) -> List[str]:
if len(text) <= max_chars:
return [text]
chunks: List[str] = []
paragraphs = [p for p in text.split("\n\n") if p.strip()]
for para in paragraphs:
if len(para) <= max_chars:
chunks.append(para)
continue
sentences = [s.strip() for s in para.split(". ") if s.strip()]
current = ""
for sentence in sentences:
sentence = sentence if sentence.endswith(".") else f"{sentence}."
if not current:
current = sentence
continue
if len(current) + 1 + len(sentence) <= max_chars:
current = f"{current} {sentence}"
else:
chunks.append(current)
current = sentence
if current:
chunks.append(current)
return chunks
def translate_text(client: OpenAI, text: str) -> str:
    """Translate *text* from English to Bengali via the chat-completions API.

    The text is split into chunks of at most MAX_CHARS_PER_CHUNK characters
    (see chunk_text); each chunk is translated with its own request and the
    results are re-joined with blank lines.

    Args:
        client: An OpenAI-compatible client pointed at the translation server.
        text: English source text; whitespace-only input is returned as-is.

    Returns:
        The Bengali translation (possibly "" if the model returns no content).
    """
    if not text.strip():
        return text

    def _translate_chunk(chunk: str) -> str:
        # One request per chunk; the instruction is sent as a plain user
        # message (no system prompt), matching the fine-tuned model's format.
        completion = client.chat.completions.create(
            model=OPENAI_MODEL,
            messages=[
                {
                    "role": "user",
                    "content": (
                        "Translate the following text from English to Bengali:\n\n"
                        f"{chunk}"
                    ),
                }
            ],
            max_tokens=MAX_NEW_TOKENS,
            stream=False,
        )
        # The API may return None content (e.g. on a refusal); coerce to ""
        # so the join below cannot raise TypeError.
        return completion.choices[0].message.content or ""

    chunks = chunk_text(text, MAX_CHARS_PER_CHUNK)
    # join() of a single-element list is that element, so the single-chunk
    # case needs no separate code path.
    return "\n\n".join(_translate_chunk(chunk) for chunk in chunks)
def _strip_code_fences(text: str) -> str:
text = text.strip()
if text.startswith("```"):
text = re.sub(r"^```[a-zA-Z]*\n?", "", text)
text = re.sub(r"\n?```$", "", text)
return text.strip()
def _extract_json_payload(text: str) -> Dict:
cleaned = _strip_code_fences(text)
try:
return json.loads(cleaned)
except json.JSONDecodeError:
match = re.search(r"\{.*\}", cleaned, flags=re.DOTALL)
if match:
return json.loads(match.group(0))
return {}
def _contains_disallowed_chars(text: str) -> Tuple[bool, str]:
# Allow common medical/tech symbols that might be marked as 'S' (Symbol)
# like ±, μ, §, ©, or mathematical operators.
allowed_extra_symbols = {"±", "μ", "°", "%", "+", "=", "<", ">", "/", "\\"}
for ch in text:
code = ord(ch)
# 1. Allow Bengali Range
if _BENGALI_RANGE[0] <= code <= _BENGALI_RANGE[1]:
continue
# 2. Allow Basic Latin (English + Punctuation)
if 0x0000 <= code <= 0x007F:
continue
# 3. Allow specifically whitelisted symbols
if ch in allowed_extra_symbols:
continue
category = unicodedata.category(ch)
# Only fail if it's a 'Other, Not Assigned' or 'Private Use' character (junk)
if category in ["Cn", "Co"]:
return True, f"Corrupted character detected: {ch} (U+{code:04X})"
return False, ""
def _call_judge_model(source_text: str, translated_text: str) -> Dict:
    """POST a judging request to the vLLM chat-completions endpoint.

    Args:
        source_text: The English source.
        translated_text: The candidate Bengali translation.

    Returns:
        The judge's verdict as a dict (expected keys: ok, reason); {} when
        the reply contains no parseable JSON. Network/HTTP errors propagate.
    """
    url = f"{VLLM_BASE_URL}/chat/completions"
    # Adjacent string literals concatenate with no separator, so each
    # sentence must end in an explicit trailing space to keep the prompt
    # well-formed ("acceptable. Allow", "operators. If").
    prompt = (
        "You are a strict judge for Bengali translations. "
        "Return JSON only with keys ok (true/false) and reason. "
        "Check if the Bengali translation contains any non-Bengali, "
        "non-English letters, or strange symbols. "
        "Allow Bengali punctuation, Bengali digits, and common punctuation. "
        "English words and keywords are allowed. "
        "Minor punctuation differences are acceptable. "
        "Allow common medical/tech symbols that might be marked as 'S' (Symbol) like ±, μ, §, ©, or mathematical operators. "
        "If any issue exists, ok must be false.\n\n"
        f"English:\n{source_text}\n\nBengali:\n{translated_text}"
    )
    payload = {
        "model": JUDGE_MODEL,
        "messages": [
            {"role": "system", "content": "Respond with JSON only."},
            {"role": "user", "content": prompt},
        ],
        "temperature": JUDGE_TEMPERATURE,
        "max_tokens": 256,
    }
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=JUDGE_TIMEOUT_SEC) as resp:
        response_json = json.loads(resp.read().decode("utf-8"))
    content = response_json["choices"][0]["message"]["content"]
    return _extract_json_payload(content)
def _judge_translation(source_text: str, translated_text: str) -> Tuple[bool, str]:
    """Validate a translation with the LLM judge plus a local character scan.

    A failed judge call counts as a rejection; the local scan for corrupted
    characters overrides the judge either way.

    Returns:
        (True, "") when the translation passes, else (False, reason).
    """
    if not translated_text.strip():
        return False, "Empty translation"

    judge_ok = False
    judge_reason = ""
    try:
        verdict = _call_judge_model(source_text, translated_text)
    except (urllib.error.URLError, json.JSONDecodeError, KeyError, TimeoutError) as exc:
        judge_reason = f"Judge call failed: {exc}"
    else:
        judge_ok = bool(verdict.get("ok", False))
        judge_reason = str(verdict.get("reason", ""))

    has_bad, bad_reason = _contains_disallowed_chars(translated_text)
    if has_bad:
        return False, bad_reason
    if judge_ok:
        return True, ""
    return False, judge_reason or "Judge rejected translation"
def translate_with_judge(
    client: OpenAI, source_text: str, field_name: str, record_id: str
) -> str:
    """Translate *source_text* and retry until the judge accepts it.

    Makes up to JUDGE_MAX_RETRIES attempts, sleeping one second between
    failures. Whitespace-only input is returned unchanged.

    Returns:
        The accepted Bengali translation, or "" when every attempt was
        rejected (so the record can be re-translated later).
    """
    if not source_text.strip():
        return source_text

    attempt = 0
    while attempt < JUDGE_MAX_RETRIES:
        attempt += 1
        candidate = translate_text(client, source_text)
        accepted, why = _judge_translation(source_text, candidate)
        if accepted:
            return candidate
        print(
            f"[Judge] id={record_id} field={field_name} attempt={attempt} failed: {why}"
        )
        time.sleep(1)  # brief back-off before re-translating

    print(
        f"[Judge] id={record_id} field={field_name} failed after "
        f"{JUDGE_MAX_RETRIES} attempts. Leaving empty for re-translation."
    )
    return ""
def load_json(path: str) -> List[Dict]:
    """Read and return the list of records stored as UTF-8 JSON at *path*."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)
def save_json(path: str, data: List[Dict]) -> None:
    """Write *data* to *path* as pretty-printed UTF-8 JSON.

    Parent directories are created as needed. A bare filename (no directory
    component) is handled: os.makedirs("") would raise FileNotFoundError,
    so the directory creation is skipped in that case.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
def parse_args() -> argparse.Namespace:
    """Parse the command-line options for this script."""
    cli = argparse.ArgumentParser(description="Translate MultiClinSum EN to BN.")
    cli.add_argument(
        "--limit",
        type=int,
        default=200,
        help="Only translate the first N instances.",
    )
    return cli.parse_args()
def main() -> None:
    """Translate the first --limit records of DATA_PATH to Bengali.

    Resumes from OUT_PATH when its records form an exact id-prefix of the
    (limited) input, checkpoints every SAVE_EVERY records, and writes the
    final list at the end.
    """
    args = parse_args()
    data = load_json(DATA_PATH)
    if args.limit is not None:
        data = data[: args.limit]
    # Records translated by a previous run, keyed by id, so reruns skip them.
    existing: Dict[str, Dict] = {}
    existing_list: List[Dict] = []
    resume_index = 0
    if os.path.exists(OUT_PATH):
        existing_list = load_json(OUT_PATH)
        for item in existing_list:
            existing[item["id"]] = item
        if existing_list:
            # Fast-forward only when the saved output ids are exactly the
            # first len(existing_list) ids of the input; otherwise start at 0
            # and rely on the `existing` lookup to reuse saved records.
            prefix_ids = [item.get("id") for item in existing_list]
            data_prefix_ids = [item.get("id") for item in data[: len(prefix_ids)]]
            if prefix_ids == data_prefix_ids:
                resume_index = len(existing_list)
    client = OpenAI(
        base_url=OPENAI_BASE_URL,
        api_key=OPENAI_API_KEY,
        timeout=OPENAI_TIMEOUT_SEC,
    )
    # NOTE(review): `translated` starts as a copy of existing_list. When
    # resume_index stays 0 but `existing` is non-empty (saved ids are not a
    # prefix), the branch below appends those saved records a second time —
    # confirm whether duplicate entries in OUT_PATH are acceptable here.
    translated: List[Dict] = existing_list.copy()
    for idx, item in enumerate(
        tqdm(data[resume_index:], desc="Translating", unit="record"),
        start=resume_index + 1,
    ):
        if item["id"] in existing:
            translated.append(existing[item["id"]])
        else:
            record_id = str(item.get("id", ""))
            fulltext_bn = translate_with_judge(
                client, item.get("fulltext", ""), "fulltext", record_id
            )
            summary_bn = translate_with_judge(
                client, item.get("summary", ""), "summary", record_id
            )
            # Keep both the English source and the Bengali translation so
            # the output file is self-contained.
            translated.append(
                {
                    "id": item.get("id"),
                    "fulltext_en": item.get("fulltext", ""),
                    "summary_en": item.get("summary", ""),
                    "fulltext_bn": fulltext_bn,
                    "summary_bn": summary_bn,
                }
            )
        # Periodic checkpoint so progress survives a crash or interruption.
        if idx % SAVE_EVERY == 0:
            save_json(OUT_PATH, translated)
            print(f"Saved {idx}/{len(data)} records to {OUT_PATH}")
    save_json(OUT_PATH, translated)
    print(f"Done. Saved {len(translated)} records to {OUT_PATH}")
if __name__ == "__main__":
main()