import gradio as gr
import json
from difflib import Differ
import ffmpeg
import os
from pathlib import Path
import time
import aiohttp
import asyncio
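# Toggle between the hosted Hugging Face Inference API (True) and a local
# transformers pipeline (False); the local path needs torch and transformers installed.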
API_BACKEND = True

MODEL = "facebook/wav2vec2-base-960h"
if API_BACKEND:
    from dotenv import load_dotenv
    import base64
    load_dotenv(Path(".env"))

    HF_TOKEN = os.environ["HF_TOKEN"]
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    API_URL = f'https://api-inference.huggingface.co/models/{MODEL}'

else:
    import torch
    from transformers import pipeline

    # pick the GPU when one is available: device 0 for CUDA, -1 for CPU
    cuda = torch.device(
        'cuda:0') if torch.cuda.is_available() else torch.device('cpu')
    device = 0 if torch.cuda.is_available() else -1
    speech_recognizer = pipeline(
        task="automatic-speech-recognition",
        model=MODEL,
        tokenizer=MODEL,
        framework="pt",
        device=device,
    )

videos_out_path = Path("./videos_out")
videos_out_path.mkdir(parents=True, exist_ok=True)

samples_data = sorted(Path('examples').glob('*.json'))
SAMPLES = []
for file in samples_data:
    with open(file) as f:
        sample = json.load(f)
    SAMPLES.append(sample)
VIDEOS = list(map(lambda x: [x['video']], SAMPLES))

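# Crude usage counters; the seed values presumably carry over the totals
# recorded before the last restart.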
total_inferences_since_reboot = 415
total_cuts_since_reboot = 1539


async def speech_to_text(video_file_path):
    """
    Takes a video path, extracts the audio track, and transcribes it to text
    with character-level timestamps.

    Uses the https://huggingface.co/tasks/automatic-speech-recognition pipeline
    """
    global total_inferences_since_reboot
    if video_file_path is None:
        raise ValueError("Error: no video input")

    video_path = Path(video_file_path)
    try:
        # extract the audio track as 16 kHz mono WAV, captured in memory via stdout
        audio_memory, _ = ffmpeg.input(video_path).output(
            '-', format="wav", ac=1, ar='16k').overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True)
    except Exception as e:
        raise RuntimeError("Error converting video to audio") from e

    ping("speech_to_text")
    last_time = time.time()
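    # The Inference API may answer with an error while the model is still
    # loading; such responses include an 'estimated_time' we can wait out
    # before retrying (up to 10 rounds of 4 attempts below).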
    if API_BACKEND:
        inference_response = {}  # keep the name bound for the error checks below
        for i in range(10):
            for tries in range(4):
                print(f'Transcribing from API attempt {tries}')
                try:
                    inference_response = await query_api(audio_memory)
                    print(inference_response)
                    transcription = inference_response["text"].lower()
                    timestamps = [[chunk["text"].lower(), chunk["timestamp"][0], chunk["timestamp"][1]]
                                  for chunk in inference_response['chunks']]

                    total_inferences_since_reboot += 1
                    print("\n\ntotal_inferences_since_reboot: ",
                          total_inferences_since_reboot, "\n\n")
                    return (transcription, transcription, timestamps)
                except Exception as e:
                    print(e)
                    if 'error' in inference_response and 'estimated_time' in inference_response:
                        wait_time = inference_response['estimated_time']
                        print("Waiting for model to load....", wait_time)
                        # wait a little longer than the estimate before retrying
                        await asyncio.sleep(wait_time + 5.0)
                    elif 'error' in inference_response:
                        raise RuntimeError("Error Fetching API",
                                           inference_response['error'])
                    else:
                        break
        else:
            raise RuntimeError(inference_response, "Error Fetching API")
    else:
        try:
            print('Transcribing via local model')
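            # long audio is transcribed in 10 s chunks with 4 s / 2 s strides,
            # so adjacent chunks overlap and words aren't clipped at the seams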
            output = speech_recognizer(
                audio_memory, return_timestamps="char", chunk_length_s=10, stride_length_s=(4, 2))

            transcription = output["text"].lower()
            timestamps = [[chunk["text"].lower(), chunk["timestamp"][0].tolist(), chunk["timestamp"][1].tolist()]
                          for chunk in output['chunks']]
            total_inferences_since_reboot += 1

            print("\n\ntotal_inferences_since_reboot: ",
                  total_inferences_since_reboot, "\n\n")
            return (transcription, transcription, timestamps)
        except Exception as e:
            raise RuntimeError("Error running inference with local model", e)


async def cut_timestamps_to_video(video_in, transcription, text_in, timestamps):
    """
    Given the original video, the transcript with character timestamps,
    and the edited text, cuts the kept segments into a single video
    """
    global total_cuts_since_reboot

    video_path = Path(video_in)
    video_file_name = video_path.stem
    if video_in is None or text_in is None or transcription is None:
        raise ValueError("Inputs undefined")

    d = Differ()
    # character-level diff between the transcription and the edited text
    diff_chars = d.compare(transcription, text_in)
    # drop '+' insertions: the UI only supports cuts, not additions
    filtered = list(filter(lambda x: x[0] != '+', diff_chars))
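    # Differ.compare on two strings diffs character by character, tagging each
    # char '  ' (kept), '- ' (deleted) or '+ ' (inserted). For example, editing
    # "a cat" down to "a at" yields ['  a', '   ', '- c', '  a', '  t'] once
    # the '+' entries are dropped, so only the 'c' is marked for cutting.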
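    # 'filtered' aligns 1:1 with the transcription's char timestamps, so zip
    # them and collect runs of kept chars; every '-' char starts a new group,
    # and each group becomes one contiguous span of video to keep.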
    idx = 0
    grouped = {}
    for (a, b) in zip(filtered, timestamps):
        if a[0] != '-':
            grouped.setdefault(idx, []).append(b)
        else:
            idx += 1

    # a group's cut window runs from its first char's start time to its last char's end time
    timestamps_to_cut = [[v[0][1], v[-1][2]] for v in grouped.values()]

    between_str = '+'.join(
        map(lambda t: f'between(t,{t[0]},{t[1]})', timestamps_to_cut))

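    # ffmpeg's select/aselect filters keep only frames/samples whose timestamp
    # t makes the expression non-zero, e.g. between(t,1.2,3.4)+between(t,7.0,9.5)
    # ('+' acts as OR); setpts/asetpts then renumber timestamps so the kept
    # pieces play back contiguously.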
    if timestamps_to_cut:
        video_file = ffmpeg.input(video_in)
        video = video_file.video.filter(
            "select", f'({between_str})').filter("setpts", "N/FRAME_RATE/TB")
        audio = video_file.audio.filter(
            "aselect", f'({between_str})').filter("asetpts", "N/SR/TB")

        output_video = f'./videos_out/{video_file_name}.mp4'
        ffmpeg.concat(video, audio, v=1, a=1).output(
            output_video).overwrite_output().global_args('-loglevel', 'quiet').run()
    else:
        output_video = video_in

    # (char, label) pairs for gr.HighlightedText; '-' labels the cut characters
    tokens = [(token[2:], token[0] if token[0] != " " else None)
              for token in filtered]

    total_cuts_since_reboot += 1
    ping("video_cuts")
    print("\n\ntotal_cuts_since_reboot: ", total_cuts_since_reboot, "\n\n")
    return (tokens, output_video)


async def query_api(audio_bytes: bytes):
    """
    Query the Hugging Face Inference API for the automatic speech recognition task
    """
    payload = json.dumps({
        "inputs": base64.b64encode(audio_bytes).decode("utf-8"),
        "parameters": {
            "return_timestamps": "char",
            "chunk_length_s": 10,
            "stride_length_s": [4, 2]
        },
        "options": {"use_gpu": False}
    }).encode("utf-8")
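    # the audio is base64-encoded inside a JSON body so that the pipeline
    # parameters above can ride along with it in a single request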
    async with aiohttp.ClientSession() as session:
        async with session.post(API_URL, headers=headers, data=payload) as response:
            print("API Response: ", response.status)
            if response.headers['Content-Type'] == 'application/json':
                return await response.json()
            elif response.headers['Content-Type'] == 'application/octet-stream':
                return await response.read()
            elif response.headers['Content-Type'] == 'text/plain':
                return await response.text()
            else:
                raise RuntimeError("Error Fetching API")


def ping(name):
    url = f'https://huggingface.co/api/telemetry/spaces/radames/edit-video-by-editing-text/{name}'
    print("ping: ", url)

    async def req():
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                print("pong: ", response.status)
    asyncio.create_task(req())


video_in = gr.Video(label="Video file", elem_id="video-container")
text_in = gr.Textbox(label="Transcription", lines=10, interactive=True)
video_out = gr.Video(label="Video Out")
diff_out = gr.HighlightedText(label="Cuts Diffs", combine_adjacent=True)
examples = gr.Dataset(components=[video_in], samples=VIDEOS, type="index")

css = """
footer {
    visibility: hidden;
}
"""

with gr.Blocks(css=css) as demo:
    transcription_var = gr.State()
    timestamps_var = gr.State()
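    # the two State vars above hold per-session values (the last transcription
    # and its char timestamps) that the event handlers pass between one another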

    with gr.Row():
        examples.render()

        def load_example(idx):
            video = SAMPLES[idx]['video']
            transcription = SAMPLES[idx]['transcription'].lower()
            timestamps = SAMPLES[idx]['timestamps']

            return (video, transcription, transcription, timestamps)

        examples.click(
            load_example,
            inputs=[examples],
            outputs=[video_in, text_in, transcription_var, timestamps_var],
            queue=False)
    with gr.Row():
        with gr.Column():
            video_in.render()
            transcribe_btn = gr.Button("Transcribe Audio")
            transcribe_btn.click(speech_to_text, [video_in], [
                text_in, transcription_var, timestamps_var])

    with gr.Row():
        gr.Markdown("""
        ### Now edit as text
        After running the video transcription, you can make cuts to the text below (only cuts, not additions!)""")

    with gr.Row():
        with gr.Column():
            text_in.render()
            with gr.Row():
                cut_btn = gr.Button("Cut to video", elem_id="cut_btn")
                cut_btn.click(cut_timestamps_to_video, [
                    video_in, transcription_var, text_in, timestamps_var], [diff_out, video_out])

                reset_transcription = gr.Button(
                    "Reset to last transcription", elem_id="reset_btn")
                reset_transcription.click(
                    lambda x: x, transcription_var, text_in)
        with gr.Column():
            video_out.render()
            diff_out.render()

demo.queue()

if __name__ == "__main__":
    demo.launch(debug=True)