| import os |
| import json |
| import asyncio |
| import argparse |
| import httpx |
| from tqdm.asyncio import tqdm |
| from transformers import AutoProcessor |
|
|
| |
# Input: English gold-standard MultiClinSum training split.
DATA_PATH = "/home/mshahidul/readctrl/data/testing_data_gs/multiclinsum_gs_train_en.json"
# Output path template; "(0_200)" mirrors the hard-coded slice of the first
# 200 records taken in main().
OUT_PATH_TEMPLATE = (
"/home/mshahidul/readctrl/data/translated_data/"
"multiclinsum_gs_train_{source_lang}2{target_lang}_gemma(0_200).json"
)


# OpenAI-compatible chat-completions endpoints: one serving the translation
# model, one serving the judge model.
TRANSLATE_URL = "http://localhost:8081/v1/chat/completions"
JUDGE_URL = "http://localhost:8004/v1/chat/completions"
# Maximum number of in-flight LLM requests across both endpoints combined.
CONCURRENCY_LIMIT = 8


# The processor is only used to render TranslateGemma's chat template
# (tokenize=False in build_gemma_prompt); no local tokenization is done.
model_id = "google/translategemma-27b-it"
processor = AutoProcessor.from_pretrained(model_id)


# Shared gate that call_llm acquires around every HTTP request.
semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)
|
|
async def call_llm(client, url, model, messages, temperature=0.1, max_tokens=None):
    """Send one chat-completion request and return the reply text.

    Generic async caller shared by the translation and judge endpoints.
    Acquires the module-level semaphore so at most CONCURRENCY_LIMIT
    requests are in flight at once.

    Args:
        client: an httpx.AsyncClient.
        url: chat-completions endpoint URL.
        model: model name to put in the payload.
        messages: OpenAI-style message list.
        temperature: sampling temperature (default 0.1).
        max_tokens: optional generation cap; omitted from the payload if None.

    Returns:
        The stripped assistant message content, or None on any failure
        (best-effort contract — callers treat None as "skip this record").
    """
    async with semaphore:
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
        }
        if max_tokens is not None:
            payload["max_tokens"] = max_tokens
        try:
            response = await client.post(url, json=payload, timeout=60.0)
            # Surface HTTP-level errors here instead of failing later with a
            # confusing KeyError on the error body's missing 'choices'.
            response.raise_for_status()
            result = response.json()
            return result['choices'][0]['message']['content'].strip()
        except Exception as e:
            # Deliberate best-effort boundary: keep returning None, but log
            # the cause instead of silently discarding it.
            print(f"call_llm failed for {url}: {e!r}")
            return None
|
|
def build_gemma_prompt(text, source_lang="en", target_lang="bn"):
    """Render *text* through TranslateGemma's chat template.

    Returns an OpenAI-style message list whose single user turn is the fully
    rendered prompt string, ready to send to the translation endpoint.
    """
    gemma_turn = {
        "role": "user",
        "content": [
            {
                "type": "text",
                "source_lang_code": source_lang,
                "target_lang_code": target_lang,
                "text": text,
            }
        ],
    }
    rendered = processor.apply_chat_template(
        [gemma_turn], tokenize=False, add_generation_prompt=True
    )
    return [{"role": "user", "content": rendered}]
|
|
def describe_lang(code):
    """Return the English name for a language code, or "Unknown Language"."""
    known = (
        ("en", "English"),
        ("bn", "Bengali"),
        ("zh", "Chinese"),
        ("vi", "Vietnamese"),
        ("hi", "Hindi"),
    )
    for lang_code, lang_name in known:
        if lang_code == code:
            return lang_name
    return "Unknown Language"
|
|
async def process_record(client, record, source_lang, target_lang):
    """Translate one record and gate the result through an LLM judge.

    Translates record['fulltext'] and record['summary'] from source_lang to
    target_lang, then asks the judge model whether the fulltext translation
    is clean (no third language, no hallucinated keywords).

    Returns:
        The record enriched with 'translated_fulltext', 'translated_summary'
        and 'judge_pass', or None when a translation fails or the judge
        rejects the translation on all three attempts.
    """
    translated_fulltext_prompt = build_gemma_prompt(
        record['fulltext'], source_lang=source_lang, target_lang=target_lang
    )
    translated_summary_prompt = build_gemma_prompt(
        record['summary'], source_lang=source_lang, target_lang=target_lang
    )
    # The two translations are independent — run them concurrently; the
    # semaphore inside call_llm still enforces the global request limit.
    translated_fulltext, translated_summary = await asyncio.gather(
        call_llm(
            client, TRANSLATE_URL, "translate_gemma",
            translated_fulltext_prompt, max_tokens=1024,
        ),
        call_llm(
            client, TRANSLATE_URL, "translate_gemma",
            translated_summary_prompt, max_tokens=512,
        ),
    )

    # Bug fix: a failed translation (None) used to be interpolated into the
    # judge prompt as the literal string "None" — abort early instead.
    if translated_fulltext is None or translated_summary is None:
        return None

    source_lang_label = describe_lang(source_lang)
    target_lang_label = describe_lang(target_lang)
    judge_prompt = f"""
You are a linguistic judge. Evaluate the following {target_lang_label} translation of a {source_lang_label} medical text.
Check for:
1. Presence of any language other than {target_lang_label} or {source_lang_label} medical terms.
2. Hallucinated keywords not present in the original.

Original {source_lang_label}: {record['fulltext']}
Translated {target_lang_label}: {translated_fulltext}

Does this translation pass? Respond with ONLY 'PASS' or 'FAIL'.
"""
    # Up to three judge attempts — the judge endpoint itself may fail
    # transiently (call_llm returns None on error).
    judge_pass = False
    for _ in range(3):
        judge_res = await call_llm(client, JUDGE_URL, "Qwen/Qwen3-30B-A3B-Instruct-2507", [
            {"role": "user", "content": judge_prompt}
        ])
        judge_pass = "PASS" in (judge_res or "").upper()
        if judge_pass:
            break

    if not judge_pass:
        return None

    record['translated_fulltext'] = translated_fulltext
    record['translated_summary'] = translated_summary
    record['judge_pass'] = True
    return record
|
|
def record_key(record):
    """Return a stable deduplication key for *record*.

    Prefers the stringified 'id' field; falls back to a composite of the
    fulltext and summary joined by '||' when no id is present.
    """
    rid = record.get("id")
    if rid is None:
        fulltext = record.get("fulltext", "")
        summary = record.get("summary", "")
        return f"{fulltext}||{summary}"
    return str(rid)
|
|
async def main():
    """Translate the first 200 MultiClinSum records and write them to disk.

    Resumable: records already present in the output file are reused instead
    of re-translated, and results are checkpointed after every batch so a
    crash loses at most one batch of work (previously the single write at
    the very end meant a crash lost the entire run).
    """
    parser = argparse.ArgumentParser(description="Translate Multiclinsum dataset.")
    parser.add_argument("--source-lang", default="en", help="Source language code")
    parser.add_argument("--target-lang", default="bn", help="Target language code")
    args = parser.parse_args()

    out_path = OUT_PATH_TEMPLATE.format(
        source_lang=args.source_lang, target_lang=args.target_lang
    )

    # Only the first 200 records, matching the "(0_200)" tag in the filename.
    with open(DATA_PATH, 'r', encoding='utf-8') as f:
        data = json.load(f)[0:200]

    async with httpx.AsyncClient() as client:
        # Resume support: reuse records translated by a previous run.
        existing_results = []
        if os.path.exists(out_path):
            with open(out_path, 'r', encoding='utf-8') as f:
                existing_results = json.load(f)

        existing_by_key = {record_key(rec): rec for rec in existing_results}
        output_results = []

        # Create the output directory up front so per-batch checkpoints work.
        os.makedirs(os.path.dirname(out_path), exist_ok=True)

        batch_size = 10
        for i in tqdm(range(0, len(data), batch_size)):
            batch = data[i:i + batch_size]
            pending = []
            pending_keys = []

            for rec in batch:
                key = record_key(rec)
                if key in existing_by_key:
                    output_results.append(existing_by_key[key])
                else:
                    pending.append(process_record(client, rec, args.source_lang, args.target_lang))
                    pending_keys.append(key)

            if pending:
                processed = await asyncio.gather(*pending)
                for key, rec in zip(pending_keys, processed):
                    # Failed/judge-rejected records come back as None and are
                    # dropped; they will be retried on the next run.
                    if rec is not None:
                        existing_by_key[key] = rec
                        output_results.append(rec)

            # Checkpoint after every batch so completed work survives a crash.
            with open(out_path, 'w', encoding='utf-8') as f:
                json.dump(output_results, f, ensure_ascii=False, indent=4)

        # Final write also covers the empty-dataset case (loop never ran).
        with open(out_path, 'w', encoding='utf-8') as f:
            json.dump(output_results, f, ensure_ascii=False, indent=4)
|
|
# Script entry point: run the async pipeline on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())