# ViralCutterPRO/scripts/translate_json.py
import os
import json
import asyncio
from pathlib import Path
import tqdm.asyncio
from deep_translator import GoogleTranslator
# List of target languages
target_languages = ['en']
# Dictionary of per-language text substitutions
substituicoes_por_idioma = {
'en': {
# 'Original': 'Translation'
},
}
# Translation settings
sentence_endings = ['.', '!', '?', ')', 'よ', 'ね', 'の', 'さ', 'ぞ', 'な', 'か', '!', '。', '」', '…']  # characters treated as sentence endings when building chunks
separator = " ◌ "  # marker unlikely to be altered by translation, used to join texts into chunks
separator_unjoin = separator.replace(' ', '')
chunk_max_chars = 4999  # keep each request under the 5000-character limit
def substituir_texto(text, substituicoes):
"""Função para substituir texto."""
for old, new in substituicoes.items():
text = text.replace(old, new)
return text
async def translate_chunk(index, chunk, target_lang):
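    """Translate one chunk with GoogleTranslator, retrying every 30 seconds on failure.

    Returns the original chunk unchanged if the translation comes back empty.
    """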
while True:
try:
translator = GoogleTranslator(source='auto', target=target_lang)
            translated_chunk = await asyncio.get_running_loop().run_in_executor(None, translator.translate, chunk)
await asyncio.sleep(0)
if translated_chunk is None or len(translated_chunk.replace(separator.strip(), '').split()) == 0:
return chunk
return translated_chunk
except Exception as e:
print(f"\r[chunk {index}]: Exception: {e.__doc__} Retrying in 30 seconds...", flush=True)
await asyncio.sleep(30)
def join_sentences(texts, max_chars):
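    """Join texts into chunks of at most max_chars characters using the separator.

    A chunk is flushed early when a text ends with one of the configured
    sentence endings; a single text longer than the limit is cut at the last
    space that fits and marked with an ellipsis.
    """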
joined_texts = []
current_chunk = ""
for text in texts:
        if not text:
            # Use an invisible placeholder (Hangul filler) so empty entries keep their position
            text = 'ㅤ'
if len(current_chunk) + len(text) + len(separator) <= max_chars:
current_chunk += text + separator
if any(text.endswith(ending) for ending in sentence_endings):
joined_texts.append(current_chunk)
current_chunk = ""
else:
if current_chunk:
joined_texts.append(current_chunk)
current_chunk = ""
if len(current_chunk) + len(text) + len(separator) <= max_chars:
current_chunk += text + separator
else:
                # The text alone exceeds the limit: cut at the last space that still fits
                end_index = text.rfind(' ', 0, max_chars - (1 + len(separator)))
                if end_index == -1:
                    end_index = max_chars - (1 + len(separator))
joined_texts.append((text[:end_index] + '…' + separator)[:max_chars])
if current_chunk:
joined_texts.append(current_chunk)
return joined_texts
def unjoin_sentences(original_sentence: str, modified_sentence: str, separator: str):
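    """Split a translated chunk back into pieces matching the original chunk.

    When the translator changes the number of separators, the translated words
    are redistributed across the original pieces in proportion to their
    original word counts.
    """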
if original_sentence is None:
return ' '
original_texts = original_sentence.split(separator)
original_texts = [s.strip() for s in original_texts if s.strip()]
if modified_sentence is None:
return original_texts or ' '
modified_sentence = modified_sentence.replace(f"{separator_unjoin} ", f"{separator_unjoin}").replace(f" {separator_unjoin}", f"{separator_unjoin}").replace(
f"{separator_unjoin}.", f".{separator_unjoin}").replace(f"{separator_unjoin},", f",{separator_unjoin}")
modified_texts = modified_sentence.split(separator_unjoin)
modified_texts = [s.strip() for s in modified_texts if s.strip()]
    if original_texts in (["..."], ["…"]):
        return original_texts
if len(original_texts) == len(modified_texts):
return modified_texts
original_word_count = sum(len(text.split()) for text in original_texts)
modified_word_count = len(' '.join(modified_texts).split())
    if original_word_count == 0 or modified_word_count == 0:
        # Return a list so the caller can flatten it like the other branches
        return [original_sentence.replace(separator, ' ').strip()]
modified_words_proportion = modified_word_count / original_word_count
modified_words = ' '.join(modified_texts).split()
new_modified_texts = []
current_index = 0
for original_text in original_texts:
num_words = max(1, int(round(len(original_text.split()) * modified_words_proportion)))
text_words = modified_words[current_index:current_index + num_words]
new_modified_texts.append(' '.join(text_words))
current_index += num_words
if current_index < len(modified_words):
new_modified_texts[-1] += ' ' + ' '.join(modified_words[current_index:])
return new_modified_texts or original_texts or ' '
def adjust_segments(segments):
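    """Recreate per-word timings for each segment.

    Words are spread evenly between the segment's start and end, and the last
    word (and the segment end) is extended by up to 2 seconds or until the
    next segment begins.
    """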
for i in range(len(segments)):
current_segment = segments[i]
next_segment = segments[i + 1] if i < len(segments) - 1 else None
        # Split the segment text into words
text_words = current_segment['text'].split()
        # Rebuild the word list with evenly spaced timings
current_segment['words'] = [
{
'word': word,
'start': current_segment['start'] + (idx * (current_segment['end'] - current_segment['start']) / len(text_words)),
'end': current_segment['start'] + ((idx + 1) * (current_segment['end'] - current_segment['start']) / len(text_words)),
                'score': 1.0  # keep the score at 1.0 since we have no precise confidence information
}
for idx, word in enumerate(text_words)
]
        # Adjust the end of the last word in the current segment
if current_segment['words']:
last_word = current_segment['words'][-1]
if next_segment:
                # Extend to the start of the next segment or by up to 2 seconds, whichever comes first
extended_end = min(next_segment['start'], last_word['start'] + 2)
else:
                # For the last segment, extend by up to 2 seconds
extended_end = min(current_segment['end'] + 2, last_word['start'] + 2)
last_word['end'] = extended_end
current_segment['end'] = extended_end
        # Adjust the start of the next segment if necessary
if next_segment and next_segment['words']:
next_segment['words'][0]['start'] = next_segment['start']
return segments
async def translate_json_file(json_file_path: Path, translated_json_path: Path, target_lang):
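    """Translate all segment texts and word entries of a transcription JSON file.

    Texts and words are packed into chunks, translated concurrently, split back
    into their original positions, and written to translated_json_path.
    """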
with open(json_file_path, 'r', encoding='utf-8') as file:
data = json.load(file)
segments = data['segments']
texts_to_translate = [segment['text'] for segment in segments if segment['text']]
words_to_translate = [word['word'] for segment in segments for word in segment['words']]
all_texts = texts_to_translate + words_to_translate
chunks = join_sentences(all_texts, chunk_max_chars)
translated_chunks = [None] * len(chunks)
tasks = []
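    # Allow at most 7 translation requests in flight at the same time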
semaphore = asyncio.Semaphore(7)
async def translate_async():
async def run_translate(index, chunk, lang):
while True:
try:
async with semaphore:
result = await asyncio.wait_for(translate_chunk(index, chunk, lang), 120)
translated_chunks[index] = result
break
except Exception:
await asyncio.sleep(3)
for index, chunk in enumerate(chunks):
task = asyncio.create_task(run_translate(index, chunk, target_lang))
tasks.append(task)
for tsk in tqdm.asyncio.tqdm_asyncio.as_completed(tasks, total=len(tasks), desc="Translating", unit="chunks", unit_scale=False, leave=True, bar_format="{desc} {percentage:3.0f}% | {n_fmt}/{total_fmt} | ETA: {remaining} | ⏱: {elapsed}"):
await tsk
await translate_async()
print('Processing translation...', end='')
unjoined_texts = [unjoin_sentences(chunk, translated_chunks[i], separator_unjoin) for i, chunk in enumerate(chunks)]
unjoined_texts = [text for sublist in unjoined_texts for text in sublist if text]
translated_texts = unjoined_texts[:len(texts_to_translate)]
translated_words = unjoined_texts[len(texts_to_translate):]
word_index = 0
text_index = 0
for segment in segments:
if segment['text']:
segment['text'] = translated_texts[text_index] if text_index < len(translated_texts) else segment['text']
text_index += 1
for word in segment['words']:
if word_index < len(translated_words):
word['word'] = translated_words[word_index]
word_index += 1
else:
print(f"\nWarning: Not enough translated words. Keeping original word: {word['word']}")
    # Adjust the segments after translation
segments = adjust_segments(segments)
data['segments'] = segments
os.makedirs(translated_json_path.parent, exist_ok=True)
with open(translated_json_path, 'w', encoding='utf-8') as file:
json.dump(data, file, ensure_ascii=False, indent=2)
print('\r ', end='\r')
return data
async def main():
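    """Translate every .json file in ./JSON/ into each target language.

    Writes <name>_<lang>.json next to the original and then applies the
    configured per-language substitutions.
    """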
folder_path = './JSON/'
for filename in os.listdir(folder_path):
if filename.endswith('.json'):
base_name = os.path.splitext(filename)[0]
for lang in target_languages:
output_filename = f'{base_name}_{lang}.json'
output_file_path = os.path.join(folder_path, output_filename)
if not os.path.exists(output_file_path):
                    print(f'Translating to {lang}: {filename}')
translated_data = await translate_json_file(Path(os.path.join(folder_path, filename)), Path(output_file_path), lang)
if lang in substituicoes_por_idioma:
for segment in translated_data['segments']:
segment['text'] = substituir_texto(segment['text'], substituicoes_por_idioma[lang])
for word in segment['words']:
word['word'] = substituir_texto(word['word'], substituicoes_por_idioma[lang])
with open(output_file_path, 'w', encoding='utf-8') as file:
json.dump(translated_data, file, ensure_ascii=False, indent=2)
            # Apply the substitutions to the original JSON file after all translations
original_file_path = os.path.join(folder_path, filename)
with open(original_file_path, 'r', encoding='utf-8') as file:
original_data = json.load(file)
for segment in original_data['segments']:
segment['text'] = substituir_texto(segment['text'], substituicoes_por_idioma['en'])
for word in segment['words']:
word['word'] = substituir_texto(word['word'], substituicoes_por_idioma['en'])
with open(original_file_path, 'w', encoding='utf-8') as file:
json.dump(original_data, file, ensure_ascii=False, indent=2)
    print('Translations and substitutions completed.')
async def translate_project_subs(project_folder: str, target_lang: str):
"""
Translates all _processed.json files in the 'subs' folder of the project.
Creates a backup of the original as _original.json.
"""
subs_folder = Path(project_folder) / "subs"
if not subs_folder.exists():
print(f"Subtitle folder not found: {subs_folder}")
return
# Look for files ending in _processed.json
json_files = list(subs_folder.glob("*_processed.json"))
if not json_files:
print("No subtitle files found to translate.")
return
print(f"Found {len(json_files)} subtitle files to translate to '{target_lang}'...")
for json_file in json_files:
# Backup logic
backup_file = json_file.with_name(json_file.stem + "_original" + json_file.suffix)
source_file = json_file
if backup_file.exists():
print(f"Using existing backup for {json_file.name} as source.")
source_file = backup_file
else:
print(f"Backing up original to {backup_file.name}...")
try:
# Rename current to backup
json_file.rename(backup_file)
source_file = backup_file
except Exception as e:
print(f"Error creating backup for {json_file.name}: {e}")
continue
# Translate source (backup) -> target (original filename)
# effectively replacing the file read by the next step
print(f"Translating {source_file.name} -> {json_file.name} ({target_lang})...")
try:
await translate_json_file(source_file, json_file, target_lang)
# Apply language specific substitutions if any
if target_lang in substituicoes_por_idioma:
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
modified = False
for segment in data.get('segments', []):
# Text
new_text = substituir_texto(segment['text'], substituicoes_por_idioma[target_lang])
if new_text != segment['text']:
segment['text'] = new_text
modified = True
# Words
for word in segment.get('words', []):
w_text = word.get('word', '')
new_w_text = substituir_texto(w_text, substituicoes_por_idioma[target_lang])
if new_w_text != w_text:
word['word'] = new_w_text
modified = True
if modified:
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Error translating {json_file.name}: {e}")
            # If translation failed and no output file exists, restore the backup
if not json_file.exists() and backup_file.exists():
print("Restoring backup due to failure...")
backup_file.rename(json_file)
print("Translation batch finished.")
if __name__ == "__main__":
asyncio.run(main())