# OCRBook2TTSConf / app.py
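# Gradio app with three tools:
#   * TTSMaker: turns an OCR book JSON into per-page edge-tts audio with word-level timings (book_tts.json)
#   * VideoMaker: renders the source PDF into a narrated MP4 plus a merged SRT subtitle file
#   * TranslationMaker: batch-translates each page's text to English via GoogleTranslator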
import json
import pysrt
import re
import gradio as gr
import fitz
from base64 import b64decode, b64encode
from numpy import array as np_array
from edge_tts import Communicate, SubMaker
from concurrent.futures import ThreadPoolExecutor
from PIL import Image, ImageOps
from io import BytesIO
from pydub import AudioSegment
from moviepy import ImageSequenceClip
from proglog import TqdmProgressBarLogger
from deep_translator import GoogleTranslator
SPECIAL_CHARS = re.compile(r"[!@#$%^&*()_+=\{\}\[\]|\\:;\"'<>,.?/~`]")
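# Parse an SRT string into a list of {index, start, end, text} dicts, collapsing newlines inside cues.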
def srt_string_to_obj(srt_string):
    return [
        {"index": sub.index, "start": str(sub.start), "end": str(sub.end), "text": sub.text.replace("\n", " ")}
        for sub in pysrt.from_string(srt_string)
    ]
def remove_special_chars(word):
    return re.sub(SPECIAL_CHARS, "", word)
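# Synthesize text with edge-tts, accumulating the MP3 stream and feeding word boundaries to SubMaker;
# returns (mp3 bytes, parsed cue list, raw SRT string).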
def tts(text, voice):
    submaker = SubMaker()
    mp3_data = b""
    communicator = Communicate(text, voice)
    for chunk in communicator.stream_sync():
        if chunk["type"] == "audio":
            mp3_data += chunk["data"]
        elif chunk["type"] == "WordBoundary":
            submaker.feed(chunk)
    srt = submaker.get_srt()
    return mp3_data, srt_string_to_obj(srt), srt
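# Align OCR word metadata with the TTS word boundaries so each spoken word carries its start/end timestamps.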
def metadata2transcript(page_metadata, transcription):
    conf = []
    while page_metadata and transcription:
        w = page_metadata.pop(0)
        if w["type"] == "word" and remove_special_chars(w["content"]):
            t = transcription.pop(0)
            if remove_special_chars(w["content"]) == remove_special_chars(t["text"]):
                w["start"], w["end"] = t["start"], t["end"]
            conf.append(w)
    return conf
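# Run TTS over a single page's words; the result stores base64 MP3 audio, timed word metadata, and the page's SRT.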
def tts_process_page(page_metadata, selected_voice):
    text_content = " ".join(i["content"] for i in page_metadata if i["type"] == "word")
    mp3_data, transcription, srt_file = tts(text_content, selected_voice)
    return {
        "metadata": metadata2transcript(page_metadata, transcription),
        "mp3data": b64encode(mp3_data).decode("utf-8"),
        "srt": srt_file,
    }
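# Gradio handler: synthesize every page of the uploaded book JSON in parallel and write the result to book_tts.json.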
def book2tts(book_conf, selected_voice, progress=gr.Progress(track_tqdm=True)):
    with open(book_conf.name, "r", encoding="utf-8") as f:
        pages = json.load(f)
    output = []
    with ThreadPoolExecutor() as executor:
        results = list(progress.tqdm(
            executor.map(lambda p: tts_process_page(p, selected_voice), pages),
            desc="Processing TTS",
            total=len(pages),  # executor.map yields an iterator with no len(), so pass the total explicitly
            unit="page",
        ))
    output.extend(results)
    output_file = "book_tts.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False)
    return output_file
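# Concatenate per-page SRT strings, shifting each page's cues by the total duration of the audio that precedes it.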
def merge_srt_files(srt_strings, durations):
    merged_subs = pysrt.SubRipFile()
    current_offset = 0
    for srt_string, duration in zip(srt_strings, durations):
        subs = pysrt.from_string(srt_string)
        for sub in subs:
            sub.shift(seconds=current_offset)
            merged_subs.append(sub)
        current_offset += duration
    merged_subs.clean_indexes()  # reindex so cue numbers stay sequential instead of restarting on every page
    return merged_subs
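# proglog logger that forwards moviepy's frame_index progress to the Gradio progress bar during video rendering.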
class CustomLogger(TqdmProgressBarLogger):
    def __init__(self, gradio_progress: gr.Progress):
        self.gradio_progress = gradio_progress
        super().__init__(print_messages=False)

    def bars_callback(self, bar, attr, value, old_value):
        if bar == "frame_index":
            self.gradio_progress(value / self.bars[bar]["total"], "Rendering Video", unit="Frames")
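# Rasterize one PDF page to a frame padded to the common size and decode its narration audio from the TTS JSON.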
def process_page(page_num, page, tts_conf, max_width, max_height):
    pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
    img_pil = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
    img_pil = img_pil.convert("RGBA")
    img_pil = ImageOps.pad(img_pil, (max_width, max_height), color=(255, 255, 255))
    frame = np_array(img_pil)
    page_tts_conf = tts_conf[page_num]
    try:
        mp3_data = b64decode(page_tts_conf["mp3data"], validate=True)
        audio = AudioSegment.from_file(BytesIO(mp3_data), format="mp3")
    except Exception:
        # Fall back to 3 seconds of silence if the page's MP3 data is missing or unreadable.
        audio = AudioSegment.silent(duration=3000)
    return frame, audio.duration_seconds, audio, page_tts_conf["srt"]
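# Gradio handler: render the PDF pages into a narrated video (output.mp4) and a merged subtitle file (merged_subs.srt).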
def pdf_to_video(pdf_file, tts_file, progress=gr.Progress()):
    pdf_path = pdf_file.name
    tts_path = tts_file.name
    pdf = fitz.open(pdf_path)
    with open(tts_path, "r", encoding="utf-8") as f:
        tts_conf = json.load(f)
    # Find the largest rendered page size so every frame can be padded to a common resolution.
    max_width, max_height = 0, 0
    for page in pdf:
        pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
        max_width = max(max_width, pix.width)
        max_height = max(max_height, pix.height)
    frames, durations, audio_clips, srt_files = [], [], [], []
    with ThreadPoolExecutor() as executor:
        results = list(progress.tqdm(
            executor.map(lambda x: process_page(x, pdf[x], tts_conf, max_width, max_height), range(len(pdf))),
            total=len(pdf),
            desc="Processing Pages & Audio",
            unit="page",
        ))
    for frame, duration, audio, srt_file in results:
        frames.append(frame)
        durations.append(duration)
        audio_clips.append(audio)
        srt_files.append(srt_file)
    combined_audio = sum(audio_clips)
    combined_audio.export("merged_audio.mp3", format="mp3")
    ImageSequenceClip(frames, durations=durations).write_videofile(
        "output.mp4",
        fps=1,
        codec="libx264",
        audio="merged_audio.mp3",  # mux the concatenated narration directly from the exported MP3
        audio_codec="aac",
        audio_bitrate="128k",
        preset="slow",
        ffmpeg_params=["-crf", "20"],
        logger=CustomLogger(progress),
    )
    merge_srt_files(srt_files, durations).save("merged_subs.srt", encoding="utf-8")
    return "output.mp4", "merged_subs.srt"
TransLanguages = json.load(open("translate_language.json"))
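# Gradio handler: rebuild each page's plain text from the book JSON and batch-translate it to English.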
def translate_json(book_conf: str, source: str):
    pages = json.load(open(book_conf.name, "r", encoding="utf-8"))
    # Rebuild each page's plain text: words are joined with spaces, non-word items become line breaks.
    texts = ["\n".join(line.strip() for line in " ".join(i["content"] if i["type"] == "word" else "\n" for i in page).split("\n")).strip() for page in pages]
    translations = GoogleTranslator(source=TransLanguages[source], target="en").translate_batch(texts)
    json.dump([{"original": ot, "translate": tr} for ot, tr in zip(texts, translations)], open("translate.json", "w", encoding="utf-8"), ensure_ascii=False)
    return "translate.json"
gr.TabbedInterface(
    [
        gr.Interface(
            book2tts,
            [gr.File(label="Upload Book JSON"), gr.Dropdown(choices=[voice["Name"] for voice in json.load(open("voices.json"))], label="Select Voice")],
            gr.File(label="Download TTS JSON"),
        ),
        gr.Interface(
            pdf_to_video,
            [gr.File(label="Upload PDF"), gr.File(label="Upload TTS JSON")],
            [gr.Video(label="Output Video"), gr.File(label="Subtitle File")],
        ),
        gr.Interface(
            translate_json,
            [gr.File(label="Upload Book Conf JSON"), gr.Dropdown(list(TransLanguages.keys()))],
            gr.File(label="Translated JSON"),
        ),
    ],
    ["TTSMaker", "VideoMaker", "TranslationMaker"],
).launch()