Spaces:
Paused
Paused
| import json | |
| import pysrt | |
| import re | |
| import gradio as gr | |
| import fitz | |
| from base64 import b64decode, b64encode | |
| from numpy import array as np_array | |
| from edge_tts import Communicate, SubMaker | |
| from concurrent.futures import ThreadPoolExecutor | |
| from PIL import Image, ImageOps | |
| from io import BytesIO | |
| from pydub import AudioSegment | |
| from moviepy import ImageSequenceClip | |
| from proglog import TqdmProgressBarLogger | |
| from deep_translator import GoogleTranslator | |
# Punctuation/special characters stripped before comparing TTS words to page words.
SPECIAL_CHARS = re.compile(r"[!@#$%^&*()_+=\{\}\[\]|\\:;\"'<>,.?/~`]")
def srt_string_to_obj(srt_string):
    """Parse an SRT string into a list of plain dicts (index/start/end/text)."""
    entries = []
    for cue in pysrt.from_string(srt_string):
        entries.append({
            "index": cue.index,
            "start": str(cue.start),
            "end": str(cue.end),
            # Multi-line cues are flattened to a single line of text.
            "text": cue.text.replace("\n", " "),
        })
    return entries
def remove_special_chars(word):
    """Strip punctuation so page words and TTS words can be compared loosely."""
    return SPECIAL_CHARS.sub("", word)
def tts(text, voice):
    """Synthesize *text* with edge-tts.

    Returns a tuple of (raw MP3 bytes, parsed subtitle dicts, raw SRT string).
    """
    audio_chunks = []
    sub_maker = SubMaker()
    stream = Communicate(text, voice)
    for event in stream.stream_sync():
        kind = event["type"]
        if kind == "audio":
            audio_chunks.append(event["data"])
        elif kind == "WordBoundary":
            # Word boundaries drive the generated subtitle timings.
            sub_maker.feed(event)
    srt_text = sub_maker.get_srt()
    return b"".join(audio_chunks), srt_string_to_obj(srt_text), srt_text
def metadata2transcript(page_metadata, transcription):
    """Align TTS word timings onto the page's word metadata.

    Walks the page items and the TTS transcription in lockstep: each page item
    of type "word" with non-empty cleaned content consumes one transcription
    entry; when the cleaned texts match, the entry's start/end times are copied
    onto the word dict. Non-word items (and words that clean to nothing) are
    skipped. Stops when the transcription is exhausted.

    Returns the list of word dicts (mutated in place with start/end on match).

    Fix: the original consumed both lists with list.pop(0) — O(n^2) and it
    destructively emptied the caller's lists; iterate by index instead.
    """
    aligned = []
    t_idx = 0
    for word in page_metadata:
        # No timings left to assign — nothing more to align.
        if t_idx >= len(transcription):
            break
        if word["type"] != "word" or not remove_special_chars(word["content"]):
            continue
        timing = transcription[t_idx]
        t_idx += 1
        if remove_special_chars(word["content"]) == remove_special_chars(timing["text"]):
            word["start"], word["end"] = timing["start"], timing["end"]
        aligned.append(word)
    return aligned
def tts_process_page(page_metadata, selected_voice):
    """Run TTS for one page and bundle alignment, base64 audio and raw SRT."""
    words = [item["content"] for item in page_metadata if item["type"] == "word"]
    mp3_bytes, word_timings, srt_text = tts(" ".join(words), selected_voice)
    return {
        "metadata": metadata2transcript(page_metadata, word_timings),
        # Audio is embedded as base64 so the whole page fits in one JSON record.
        "mp3data": b64encode(mp3_bytes).decode("utf-8"),
        "srt": srt_text,
    }
def book2tts(book_conf, selected_voice, progress=gr.Progress(track_tqdm=True)):
    """Convert a whole book (JSON page metadata) to per-page TTS output.

    Reads the uploaded page-metadata JSON, synthesizes every page in a thread
    pool, and writes the results to book_tts.json, whose path is returned.
    """
    with open(book_conf.name, "r", encoding="utf-8") as fh:
        pages = json.load(fh)
    with ThreadPoolExecutor() as pool:
        tracked = progress.tqdm(
            pool.map(lambda page: tts_process_page(page, selected_voice), pages),
            desc="Processing TTS",
            total=len(pages),  # Ensure progress tracking works correctly
            unit="page",
        )
        output = list(tracked)
    output_file = "book_tts.json"
    with open(output_file, "w", encoding="utf-8") as fh:
        json.dump(output, fh, ensure_ascii=False)
    return output_file
def merge_srt_files(srt_strings, durations):
    """Concatenate per-page SRT strings into one SubRipFile.

    Each page's cues are shifted by the total audio duration of all preceding
    pages so timestamps stay in sync with the merged audio track.
    """
    merged = pysrt.SubRipFile()
    elapsed = 0
    for srt_text, page_seconds in zip(srt_strings, durations):
        for cue in pysrt.from_string(srt_text):
            cue.shift(seconds=elapsed)
            merged.append(cue)
        elapsed += page_seconds
    return merged
class CustomLogger(TqdmProgressBarLogger):
    """proglog-to-gradio bridge: forwards moviepy's frame progress to gr.Progress."""

    def __init__(self, gradio_progress: gr.Progress):
        self.gradio_progress = gradio_progress
        super().__init__(print_messages=False)

    def bars_callback(self, bar, attr, value, old_value):
        # Only the frame counter is relevant for the UI progress bar.
        if bar != 'frame_index':
            return
        fraction = value / self.bars[bar]['total']
        self.gradio_progress(fraction, "Rendering Video", unit="Frames")
def process_page(page_num, page, tts_conf, max_width, max_height):
    """Render one PDF page to a padded RGBA frame and decode its page audio.

    Falls back to 3 seconds of silence when the stored MP3 is missing or
    corrupt, so the video stays in sync with the remaining pages.
    Returns (frame ndarray, duration in seconds, AudioSegment, SRT string).

    Fix: the bare `except:` also swallowed SystemExit/KeyboardInterrupt;
    narrowed to `except Exception`.
    """
    pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
    img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
    img = img.convert("RGBA")
    # Pad to the global page size so every video frame has identical dimensions.
    img = ImageOps.pad(img, (max_width, max_height), color=(255, 255, 255))
    frame = np_array(img)
    page_tts_conf = tts_conf[page_num]
    try:
        mp3_data = b64decode(page_tts_conf["mp3data"], validate=True)
        audio = AudioSegment.from_file(BytesIO(mp3_data), format="mp3")
    except Exception:
        # Bad or missing audio for this page: substitute silence instead of failing.
        audio = AudioSegment.silent(duration=3000)
    return frame, audio.duration_seconds, audio, page_tts_conf["srt"]
def pdf_to_video(pdf_file, tts_file, progress=gr.Progress()):
    """Combine a PDF and its TTS JSON into a narrated video plus merged subtitles.

    Each PDF page becomes one video frame shown for the duration of that
    page's narration audio. Writes output.mp4 and merged_subs.srt and returns
    both filenames.
    """
    pdf_path = pdf_file.name
    tts_path = tts_file.name
    pdf = fitz.open(pdf_path)
    with open(tts_path, "r", encoding="utf-8") as f:
        tts_conf = json.load(f)
    # First pass: find the largest rendered page so all frames share one size.
    max_width, max_height = 0, 0
    for page in pdf:
        pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
        max_width = max(max_width, pix.width)
        max_height = max(max_height, pix.height)
    frames, durations, audio_clips, srt_files = [], [], [], []
    # Render pages and decode audio in parallel; executor.map preserves page order.
    with ThreadPoolExecutor() as executor:
        results = list(progress.tqdm(executor.map(lambda x: process_page(x, pdf[x], tts_conf, max_width, max_height), range(len(pdf))), total=len(pdf), desc="Processing Pages & Audio", unit="page"))
    for frame, duration, audio, srt_file in results:
        frames.append(frame)
        durations.append(duration)
        audio_clips.append(audio)
        srt_files.append(srt_file)
    # pydub AudioSegments concatenate under +, so sum() joins the page audio in order.
    combined_audio = sum(audio_clips)
    combined_audio.export("merged_audio.mp3", format="mp3")
    # One frame per page, each held for that page's narration duration.
    ImageSequenceClip(frames, durations=durations).write_videofile(
        "output.mp4",
        fps=1,
        codec="libx264",
        audio="merged_audio.mp3",  # Adds audio directly
        audio_codec="aac",
        audio_bitrate="128k",
        preset="slow",
        ffmpeg_params=["-crf", "20"],
        logger=CustomLogger(progress)
    )
    merge_srt_files(srt_files, durations).save("merged_subs.srt", encoding="utf-8")
    return "output.mp4", "merged_subs.srt"
# Map of human-readable language names -> GoogleTranslator language codes.
# Fix: json.load(open(...)) leaked the file handle; use a context manager.
with open("translate_language.json", encoding="utf-8") as _lang_file:
    TransLanguages = json.load(_lang_file)
def _page_to_text(page):
    """Flatten one page's metadata into plain text; non-word items become line breaks."""
    raw = " ".join(item["content"] if item["type"] == "word" else "\n" for item in page)
    lines = (line.strip() for line in raw.split("\n"))
    return "\n".join(lines).strip()


def translate_json(book_conf: str, source: str):
    """Translate every page of a book-conf JSON to English via Google Translate.

    Writes translate.json as [{"original": ..., "translate": ...}, ...] and
    returns that filename.

    Fixes over the original one-liner: leaked file handles (open() without
    close), a redundant `[i for i in [...]]` wrapper, and a missing
    encoding on the output file (ensure_ascii=False can emit non-ASCII that
    a platform-default codec may not encode).
    """
    with open(book_conf.name, "r", encoding="utf-8") as fh:
        pages = json.load(fh)
    texts = [_page_to_text(page) for page in pages]
    translator = GoogleTranslator(source=TransLanguages[source], target="en")
    translations = translator.translate_batch(texts)
    pairs = [
        {"original": original, "translate": translated}
        for original, translated in zip(texts, translations)
    ]
    with open("translate.json", "w", encoding="utf-8") as fh:
        json.dump(pairs, fh, ensure_ascii=False)
    return "translate.json"
# Fix: json.load(open("voices.json")) leaked the file handle; the UI wiring is
# also expanded from a single unreadable line into a conventional layout.
with open("voices.json", encoding="utf-8") as _voices_file:
    _voice_names = [voice["Name"] for voice in json.load(_voices_file)]

gr.TabbedInterface(
    [
        gr.Interface(
            book2tts,
            [
                gr.File(label="Upload Book JSON"),
                gr.Dropdown(choices=_voice_names, label="Select Voice"),
            ],
            gr.File(label="Download TTS JSON"),
        ),
        gr.Interface(
            pdf_to_video,
            [gr.File(label="Upload PDF"), gr.File(label="Upload TTS JSON")],
            [gr.Video(label="Output Video"), gr.File(label="Subtitle File")],
        ),
        gr.Interface(
            translate_json,
            [
                gr.File(label="Upload Book Conf JSON"),
                gr.Dropdown(list(TransLanguages.keys())),
            ],
            gr.File(label="Translated JSON"),
        ),
    ],
    ["TTSMaker", "VideoMaker", "TranslationMaker"],
).launch()