import os
import json
import asyncio
from pathlib import Path
import tqdm.asyncio
from deep_translator import GoogleTranslator

# List of target languages
target_languages = ['en']

# Per-language replacement dictionary
substituicoes_por_idioma = {
    'en': {
        # 'Original': 'Translation'
    },
}

# Translation settings
sentence_endings = ['.', '!', '?', ')', 'よ', 'ね', 'の', 'さ', 'ぞ', 'な', 'か', '!', '。', '」', '…']
separator = " ◌ "
separator_unjoin = separator.replace(' ', '')
chunk_max_chars = 4999
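
# Note (assumed intent): the "◌" marker joins many short texts into a single chunk before it is
# sent to the translator and is used to split the translated chunk back apart afterwards. It is
# presumably chosen because it is unlikely to occur in the input and tends to pass through
# machine translation unchanged.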


def substituir_texto(text, substituicoes):
    """Apply the replacement dictionary to a text."""
    for old, new in substituicoes.items():
        text = text.replace(old, new)
    return text


async def translate_chunk(index, chunk, target_lang):
    while True:
        try:
            translator = GoogleTranslator(source='auto', target=target_lang)
            translated_chunk = await asyncio.get_event_loop().run_in_executor(None, translator.translate, chunk)
            await asyncio.sleep(0)
            if translated_chunk is None or len(translated_chunk.replace(separator.strip(), '').split()) == 0:
                return chunk
            return translated_chunk
        except Exception as e:
            print(f"\r[chunk {index}]: Exception: {e.__doc__} Retrying in 30 seconds...", flush=True)
            await asyncio.sleep(30)


def join_sentences(texts, max_chars):
    joined_texts = []
    current_chunk = ""
    for text in texts:
        if not text or text is None:
            text = 'ㅤ'  # replace empty entries with an invisible filler so the piece count is kept
        if len(current_chunk) + len(text) + len(separator) <= max_chars:
            current_chunk += text + separator
            if any(text.endswith(ending) for ending in sentence_endings):
                joined_texts.append(current_chunk)
                current_chunk = ""
        else:
            if current_chunk:
                joined_texts.append(current_chunk)
                current_chunk = ""
            if len(current_chunk) + len(text) + len(separator) <= max_chars:
                current_chunk += text + separator
            else:
                # Single text longer than max_chars: cut at the last space that fits
                end_index = text.rfind(' ', 0, max_chars - (1 + len(separator)))
                if end_index == -1:  # rfind returns -1 when no space is found; hard cut instead
                    end_index = max_chars - (1 + len(separator))
                joined_texts.append((text[:end_index] + '…' + separator)[:max_chars])
    if current_chunk:
        joined_texts.append(current_chunk)
    return joined_texts
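
# Illustrative example (not executed; values assume the default " ◌ " separator):
#   join_sentences(["こんにちは。", "Hello", "world."], 50)
#   -> ["こんにちは。 ◌ ", "Hello ◌ world. ◌ "]
# Texts accumulate into one chunk until a text ends with a sentence-ending character
# or adding the next text would exceed max_chars.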


def unjoin_sentences(original_sentence: str, modified_sentence: str, separator: str):
    if original_sentence is None:
        return ' '
    original_texts = original_sentence.split(separator)
    original_texts = [s.strip() for s in original_texts if s.strip()]
    if modified_sentence is None:
        return original_texts or ' '
    # Normalize spacing and punctuation around the marker before splitting the translation apart
    modified_sentence = modified_sentence.replace(f"{separator_unjoin} ", f"{separator_unjoin}").replace(f" {separator_unjoin}", f"{separator_unjoin}").replace(
        f"{separator_unjoin}.", f".{separator_unjoin}").replace(f"{separator_unjoin},", f",{separator_unjoin}")
    modified_texts = modified_sentence.split(separator_unjoin)
    modified_texts = [s.strip() for s in modified_texts if s.strip()]
    if original_texts in (["..."], ["…"]):  # keep ellipsis-only chunks as-is
        return original_texts
    if len(original_texts) == len(modified_texts):
        return modified_texts
    # Piece counts no longer match: redistribute the translated words proportionally
    original_word_count = sum(len(text.split()) for text in original_texts)
    modified_word_count = len(' '.join(modified_texts).split())
    if original_word_count == 0 or modified_word_count == 0:
        return original_sentence.replace(separator, ' ').strip()
    modified_words_proportion = modified_word_count / original_word_count
    modified_words = ' '.join(modified_texts).split()
    new_modified_texts = []
    current_index = 0
    for original_text in original_texts:
        num_words = max(1, int(round(len(original_text.split()) * modified_words_proportion)))
        text_words = modified_words[current_index:current_index + num_words]
        new_modified_texts.append(' '.join(text_words))
        current_index += num_words
    # Append any leftover words to the last piece
    if current_index < len(modified_words):
        new_modified_texts[-1] += ' ' + ' '.join(modified_words[current_index:])
    return new_modified_texts or original_texts or ' '
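
# Illustrative example (not executed; the third argument is the bare "◌" marker, as used by the caller):
#   unjoin_sentences("Hello ◌ world. ◌ ", "Hola ◌ mundo.", separator_unjoin)
#   -> ["Hola", "mundo."]
# When the translator merges or drops markers so the piece counts differ, the translated
# words are split back across the original pieces in proportion to their word counts.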


def adjust_segments(segments):
    for i in range(len(segments)):
        current_segment = segments[i]
        next_segment = segments[i + 1] if i < len(segments) - 1 else None
        # Split the text into words
        text_words = current_segment['text'].split()
        # Rebuild the word list of the current segment, spreading its duration evenly across the words
        current_segment['words'] = [
            {
                'word': word,
                'start': current_segment['start'] + (idx * (current_segment['end'] - current_segment['start']) / len(text_words)),
                'end': current_segment['start'] + ((idx + 1) * (current_segment['end'] - current_segment['start']) / len(text_words)),
                'score': 1.0  # keep the score at 1.0, since no precise alignment information is available
            }
            for idx, word in enumerate(text_words)
        ]
        # Adjust the end of the last word of the current segment
        if current_segment['words']:
            last_word = current_segment['words'][-1]
            if next_segment:
                # Extend to the start of the next segment or by up to 2 seconds, whichever comes first
                extended_end = min(next_segment['start'], last_word['start'] + 2)
            else:
                # For the last segment, extend by up to 2 seconds
                extended_end = min(current_segment['end'] + 2, last_word['start'] + 2)
            last_word['end'] = extended_end
            current_segment['end'] = extended_end
        # Adjust the start of the next segment if necessary
        if next_segment and next_segment['words']:
            next_segment['words'][0]['start'] = next_segment['start']
    return segments
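
# Each segment is expected to look roughly like the hypothetical example below
# (a WhisperX-style entry with segment-level timestamps); the 'words' list is
# rebuilt above by spreading the segment duration evenly across the words:
#   {'start': 0.0, 'end': 2.0, 'text': 'Hello world',
#    'words': [{'word': 'Hello', 'start': 0.0, 'end': 1.0, 'score': 1.0},
#              {'word': 'world', 'start': 1.0, 'end': 2.0, 'score': 1.0}]}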


async def translate_json_file(json_file_path: Path, translated_json_path: Path, target_lang):
    with open(json_file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    segments = data['segments']
    texts_to_translate = [segment['text'] for segment in segments if segment['text']]
    words_to_translate = [word['word'] for segment in segments for word in segment['words']]
    all_texts = texts_to_translate + words_to_translate
    chunks = join_sentences(all_texts, chunk_max_chars)
    translated_chunks = [None] * len(chunks)
    tasks = []
    semaphore = asyncio.Semaphore(7)

    async def translate_async():
        async def run_translate(index, chunk, lang):
            while True:
                try:
                    async with semaphore:
                        result = await asyncio.wait_for(translate_chunk(index, chunk, lang), 120)
                        translated_chunks[index] = result
                        break
                except Exception:
                    await asyncio.sleep(3)

        for index, chunk in enumerate(chunks):
            task = asyncio.create_task(run_translate(index, chunk, target_lang))
            tasks.append(task)
        for tsk in tqdm.asyncio.tqdm_asyncio.as_completed(tasks, total=len(tasks), desc="Translating", unit="chunks", unit_scale=False, leave=True, bar_format="{desc} {percentage:3.0f}% | {n_fmt}/{total_fmt} | ETA: {remaining} | ⏱: {elapsed}"):
            await tsk

    await translate_async()
    print('Processing translation...', end='')
    unjoined_texts = [unjoin_sentences(chunk, translated_chunks[i], separator_unjoin) for i, chunk in enumerate(chunks)]
    unjoined_texts = [text for sublist in unjoined_texts for text in sublist if text]
    translated_texts = unjoined_texts[:len(texts_to_translate)]
    translated_words = unjoined_texts[len(texts_to_translate):]
    word_index = 0
    text_index = 0
    for segment in segments:
        if segment['text']:
            segment['text'] = translated_texts[text_index] if text_index < len(translated_texts) else segment['text']
            text_index += 1
        for word in segment['words']:
            if word_index < len(translated_words):
                word['word'] = translated_words[word_index]
                word_index += 1
            else:
                print(f"\nWarning: Not enough translated words. Keeping original word: {word['word']}")
    # Adjust the segments after translation
    segments = adjust_segments(segments)
    data['segments'] = segments
    os.makedirs(translated_json_path.parent, exist_ok=True)
    with open(translated_json_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=2)
    print('\r ', end='\r')
    return data
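
# Example usage (illustrative; the file names are hypothetical):
#   data = await translate_json_file(Path('./JSON/episode1.json'),
#                                    Path('./JSON/episode1_en.json'), 'en')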


async def main():
    folder_path = './JSON/'
    for filename in os.listdir(folder_path):
        if filename.endswith('.json'):
            base_name = os.path.splitext(filename)[0]
            for lang in target_languages:
                output_filename = f'{base_name}_{lang}.json'
                output_file_path = os.path.join(folder_path, output_filename)
                if not os.path.exists(output_file_path):
                    print(f'Translating to {lang}: {filename}')
                    translated_data = await translate_json_file(Path(os.path.join(folder_path, filename)), Path(output_file_path), lang)
                    if lang in substituicoes_por_idioma:
                        for segment in translated_data['segments']:
                            segment['text'] = substituir_texto(segment['text'], substituicoes_por_idioma[lang])
                            for word in segment['words']:
                                word['word'] = substituir_texto(word['word'], substituicoes_por_idioma[lang])
                    with open(output_file_path, 'w', encoding='utf-8') as file:
                        json.dump(translated_data, file, ensure_ascii=False, indent=2)
            # Apply the replacements to the original JSON file after all translations
            original_file_path = os.path.join(folder_path, filename)
            with open(original_file_path, 'r', encoding='utf-8') as file:
                original_data = json.load(file)
            for segment in original_data['segments']:
                segment['text'] = substituir_texto(segment['text'], substituicoes_por_idioma['en'])
                for word in segment['words']:
                    word['word'] = substituir_texto(word['word'], substituicoes_por_idioma['en'])
            with open(original_file_path, 'w', encoding='utf-8') as file:
                json.dump(original_data, file, ensure_ascii=False, indent=2)
    print('Translations and replacements complete.')


async def translate_project_subs(project_folder: str, target_lang: str):
    """
    Translates all _processed.json files in the 'subs' folder of the project.
    Creates a backup of the original as _original.json.
    """
    subs_folder = Path(project_folder) / "subs"
    if not subs_folder.exists():
        print(f"Subtitle folder not found: {subs_folder}")
        return
    # Look for files ending in _processed.json
    json_files = list(subs_folder.glob("*_processed.json"))
    if not json_files:
        print("No subtitle files found to translate.")
        return
    print(f"Found {len(json_files)} subtitle files to translate to '{target_lang}'...")
    for json_file in json_files:
        # Backup logic
        backup_file = json_file.with_name(json_file.stem + "_original" + json_file.suffix)
        source_file = json_file
        if backup_file.exists():
            print(f"Using existing backup for {json_file.name} as source.")
            source_file = backup_file
        else:
            print(f"Backing up original to {backup_file.name}...")
            try:
                # Rename the current file to the backup name
                json_file.rename(backup_file)
                source_file = backup_file
            except Exception as e:
                print(f"Error creating backup for {json_file.name}: {e}")
                continue
        # Translate source (backup) -> target (original filename),
        # effectively replacing the file read by the next step
        print(f"Translating {source_file.name} -> {json_file.name} ({target_lang})...")
        try:
            await translate_json_file(source_file, json_file, target_lang)
            # Apply language-specific substitutions, if any
            if target_lang in substituicoes_por_idioma:
                with open(json_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                modified = False
                for segment in data.get('segments', []):
                    # Text
                    new_text = substituir_texto(segment['text'], substituicoes_por_idioma[target_lang])
                    if new_text != segment['text']:
                        segment['text'] = new_text
                        modified = True
                    # Words
                    for word in segment.get('words', []):
                        w_text = word.get('word', '')
                        new_w_text = substituir_texto(w_text, substituicoes_por_idioma[target_lang])
                        if new_w_text != w_text:
                            word['word'] = new_w_text
                            modified = True
                if modified:
                    with open(json_file, 'w', encoding='utf-8') as f:
                        json.dump(data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"Error translating {json_file.name}: {e}")
            # If translation failed and the output doesn't exist, try to restore the backup
            if not json_file.exists() and backup_file.exists():
                print("Restoring backup due to failure...")
                backup_file.rename(json_file)
    print("Translation batch finished.")


if __name__ == "__main__":
    asyncio.run(main())