Spaces:
Build error
Build error
| """ | |
| Stage 5 — Translation Service | |
| Translates transcribed segments into target language. | |
| Uses deep-translator (Google Translate free tier) — no API key needed. | |
| Batches nearby segments for better translation quality. | |
| """ | |
| import logging | |
| import json | |
| from pathlib import Path | |
| from typing import List, Dict, Optional | |
| from config import TRANSLATION_BATCH_SIZE | |
| logger = logging.getLogger(__name__) | |
def translate_segments(
    segments: List[Dict],
    source_language: str,
    target_language: str,
    output_dir: Path,
    progress_callback=None
) -> List[Dict]:
    """
    Translate transcribed segments into the target language.

    Segments are grouped by ``_create_translation_batches``. Each batch is
    sent to the translator as one string with the texts joined by " ||| ",
    and the reply is split back into per-segment translations. If the batch
    request fails, or the number of parts does not match the batch, every
    segment of that batch is translated on its own instead.

    Segments whose text is blank are not batched and therefore do not appear
    in the result.

    Args:
        segments: Segment dicts; each must provide a "text" entry.
        source_language: Language code passed to the translator as source.
        target_language: Language code passed to the translator as target.
        output_dir: Directory that receives "translated_segments.json";
            created if it does not exist yet.
        progress_callback: Optional callable invoked after every batch with
            an integer percentage (0-100) of translated segments.

    Returns:
        Shallow copies of the translated segments, each with an added
        "translated_text" field. The original text is used when the
        translation came back empty.

    Raises:
        RuntimeError: If the deep-translator package is not installed.
    """
    try:
        from deep_translator import GoogleTranslator
    except ImportError as err:
        raise RuntimeError("deep-translator not installed. Run: pip install deep-translator") from err

    translator = GoogleTranslator(source=source_language, target=target_language)
    total = len(segments)
    translated_segments = []

    # Batch translate for efficiency and context
    batches = _create_translation_batches(segments)
    logger.info(f"Translating {total} segments in {len(batches)} batches → {target_language}")

    # Blank segments never reach a batch, so progress is measured against the
    # segments that will actually be translated; otherwise it could never
    # reach 100%.
    translatable = sum(len(batch) for batch in batches)
    done_count = 0

    for batch in batches:
        # Combine batch texts with separator
        combined_text = " ||| ".join(seg["text"] for seg in batch)
        try:
            translated_combined = translator.translate(combined_text)
            # Split on the bare marker: the translator may change the spaces
            # around it, and every part is stripped below anyway.
            translated_parts = translated_combined.split("|||")
            # If split count doesn't match, translate individually
            if len(translated_parts) != len(batch):
                translated_parts = _translate_individually(translator, batch)
        except Exception as e:
            # Deliberate best-effort: any failure of the batch request
            # (including a None reply) falls back to per-segment translation.
            logger.warning(f"Batch translation failed: {e}. Translating individually.")
            translated_parts = _translate_individually(translator, batch)

        # Assign translations back to copies so the caller's dicts stay untouched
        for seg, translated_text in zip(batch, translated_parts):
            seg_copy = seg.copy()
            # Keep original if translation empty
            seg_copy["translated_text"] = (translated_text or "").strip() or seg["text"]
            translated_segments.append(seg_copy)
            done_count += 1

        if progress_callback:
            progress_callback(int(done_count / translatable * 100))

    # Save translated transcript
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / "translated_segments.json"
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump({
            "source_language": source_language,
            "target_language": target_language,
            "segments": translated_segments,
            "total": len(translated_segments),
        }, f, ensure_ascii=False, indent=2)

    logger.info(f"Translation complete: {len(translated_segments)} segments → {output_path}")
    return translated_segments
| def _create_translation_batches( | |
| segments: List[Dict], | |
| max_batch_chars: int = 4000 | |
| ) -> List[List[Dict]]: | |
| """ | |
| Group consecutive segments from same speaker into batches. | |
| Respects Google Translate's character limits (~5000 chars). | |
| """ | |
| batches = [] | |
| current_batch = [] | |
| current_chars = 0 | |
| current_speaker = None | |
| for seg in segments: | |
| text = seg.get("text", "").strip() | |
| if not text: | |
| continue | |
| # Start new batch if speaker changes or char limit reached | |
| if (current_speaker and seg["speaker"] != current_speaker) or \ | |
| (current_chars + len(text) > max_batch_chars) or \ | |
| len(current_batch) >= TRANSLATION_BATCH_SIZE: | |
| if current_batch: | |
| batches.append(current_batch) | |
| current_batch = [] | |
| current_chars = 0 | |
| current_batch.append(seg) | |
| current_chars += len(text) + 5 # +5 for separator | |
| current_speaker = seg["speaker"] | |
| if current_batch: | |
| batches.append(current_batch) | |
| return batches | |
| def _translate_individually(translator, batch: List[Dict]) -> List[str]: | |
| """Fallback: translate each segment one by one.""" | |
| results = [] | |
| for seg in batch: | |
| try: | |
| translated = translator.translate(seg["text"]) | |
| results.append(translated or seg["text"]) | |
| except Exception as e: | |
| logger.warning(f"Individual translation failed for '{seg['text'][:50]}...': {e}") | |
| results.append(seg["text"]) # Keep original on failure | |
| return results | |