Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import json | |
| import asyncio | |
| import aiohttp | |
| import requests | |
| from tqdm.asyncio import tqdm | |
| API_BASE_URL = os.environ.get("DUBPRO_API_BASE_URL") | |
| def fetch_data_from_db(_id, details="GET_ADVANCE_DETAILS"): | |
| API_URL = f"{API_BASE_URL}/video-service/videos/{_id}/get_details" | |
| headers = { | |
| "Accept": "*/*", | |
| "Authorization": os.environ.get("DUBPRO_API_HEADER_KEY") | |
| } | |
| params = {"type": details} | |
| response = requests.get(API_URL, headers=headers, params=params) | |
| if response.status_code == 200: | |
| data = response.json() | |
| return data["data"] | |
| else: | |
| raise Exception(response.text) | |
| async def generate_audio(session, video_id, segment_id, text, progress_bar): | |
| payload = {"type": "EDIT_TEXT", "updatedText": text} | |
| headers = {"Authorization": os.environ.get("DUBPRO_API_HEADER_KEY")} | |
| API_URL = f"{API_BASE_URL}/video-service/videos/{video_id}/segment/{segment_id}" | |
| async with session.put(API_URL, headers=headers, json=payload) as response: | |
| if response.status != 200: | |
| print(segment_id, text, await response.text()) | |
| progress_bar.update(1) | |
| async def generate_audios(video_id, segments=[]): | |
| async with aiohttp.ClientSession() as session: | |
| progress_bar = tqdm(total=len(segments), desc="Generating Audios", unit="segment") | |
| tasks = [ | |
| generate_audio(session, video_id, segment["id"], segment["updatedText"], progress_bar) | |
| for segment in segments | |
| ] | |
| await asyncio.gather(*tasks) | |
| progress_bar.close() | |
| def perform_generation(video_id, gen_choice, progress_bar=gr.Progress(track_tqdm=True)): | |
| data = fetch_data_from_db(video_id) | |
| segments = data["targetTranscriptData"] | |
| if gen_choice == "Generate only non-generated segments": | |
| segments = [s for s in segments if s["audioUrl"] is None or "updated" not in s["audioUrl"]] | |
| elif gen_choice == "Re-generate generated only segments": | |
| segments = [s for s in segments if s["audioUrl"] is not None and "updated" in s["audioUrl"]] | |
| asyncio.run(generate_audios(video_id, segments=segments)) | |
| return "Done" | |
| async def update_tempo(session, video_id, segment_id, tempo): | |
| payload = {"type": "EDIT_SPEED", "tempo": tempo} | |
| headers = {"Authorization": os.environ.get("DUBPRO_API_HEADER_KEY")} | |
| API_URL = f"{API_BASE_URL}/video-service/videos/{video_id}/segment/{segment_id}" | |
| async with session.put(API_URL, headers=headers, json=payload) as response: | |
| if response.status != 200: | |
| print(await response.text()) | |
| async def process_segment(session, video_id, segment, progress_bar): | |
| WARNING_BOUNDARY = 0.1 | |
| duration = round(segment["endTimestamp"] - segment["startTimestamp"], 3) | |
| if WARNING_BOUNDARY <= abs(duration - segment["audioDuration"]) <= (WARNING_BOUNDARY + 2.5): | |
| if duration - segment["audioDuration"] > 0: | |
| # Gap case | |
| tempo = round(segment["audioDuration"] / duration, 3) | |
| else: | |
| tempo = round(segment["audioDuration"] / (duration + 0.1), 3) | |
| await update_tempo(session, video_id, segment["id"], tempo) | |
| progress_bar.update(1) | |
| async def update_tempos(video_id, segments=[]): | |
| async with aiohttp.ClientSession() as session: | |
| progress_bar = tqdm(total=len(segments), desc="Processing Segments", unit="segment") | |
| tasks = [ | |
| process_segment(session, video_id, segment, progress_bar) | |
| for segment in segments | |
| ] | |
| await asyncio.gather(*tasks) | |
| progress_bar.close() | |
| def perform_tempo_adjustment(video_id, gen_choice, progress_bar=gr.Progress(track_tqdm=True)): | |
| data = fetch_data_from_db(video_id) | |
| segments = data["targetTranscriptData"] | |
| if gen_choice == "Generate only non-generated segments": | |
| segments = [s for s in segments if s["audioUrl"] is None or "updated" not in s["audioUrl"]] | |
| elif gen_choice == "Re-generate generated only segments": | |
| segments = [s for s in segments if s["audioUrl"] is not None and "updated" in s["audioUrl"]] | |
| asyncio.run(update_tempos(video_id, segments=segments)) | |
| return "Done" | |
| def check_details(video_id): | |
| data = fetch_data_from_db(video_id) | |
| segments = data["targetTranscriptData"] | |
| results = json.dumps(segments[0]) + "\n\n" + json.dumps(segments[-1]) | |
| return results, gr.update(visible=True) | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# Generate Audio") | |
| video_id_txt = gr.Text(label="Video ID") | |
| gen_choice_radio = gr.Radio(["Generate only non-generated segments", "Re-generate generated only segments", "Generate all the segments"], label="Audio generation style", value="Generate only non-generated segments") | |
| check_settings_btn = gr.Button("Check settings") | |
| logs_txt = gr.Textbox(label="Logs") | |
| generate_btn = gr.Button("Generate audio", visible=False) | |
| check_settings_btn.click(check_details, video_id_txt, [logs_txt, generate_btn]) | |
| generate_btn.click(perform_generation, [video_id_txt, gen_choice_radio], logs_txt) | |
| gr.Markdown("## Update Tempo [SAH]") | |
| update_tempo_btn = gr.Button("Adjust tempo") | |
| update_tempo_btn.click(perform_tempo_adjustment, [video_id_txt, gen_choice_radio], logs_txt) | |
| if __name__=="__main__": | |
| demo.queue().launch(auth=(os.environ.get("GRADIO_USERNAME"), os.environ.get("GRADIO_PASSWORD"))) |