Spaces:
Runtime error
Runtime error
| import whisper | |
| import gradio as gr | |
| import ffmpeg | |
| from yt_dlp import YoutubeDL | |
| import os | |
| import sys | |
| from subprocess import PIPE, run | |
# Format ids this module associates with livestream formats.
# NOTE(review): presumably YouTube itag values -- confirm against yt-dlp output.
youtube_livestream_codes = [91, 92, 93, 94, 95, 96, 300, 301]

# Format ids this module associates with regular (already recorded) mp4 formats.
# NOTE(review): presumably YouTube itag values -- confirm against yt-dlp output.
youtube_mp4_codes = [298, 18, 22, 140, 133, 134]
def second_to_timecode(x: float) -> str:
    """Format a number of seconds as an ``HH:MM:SS,mmm`` timecode.

    The value is rounded to the nearest millisecond *before* it is split into
    fields. Splitting the float first and truncating the fractional remainder
    loses a millisecond whenever the remainder is stored slightly below its
    decimal value (for example ``1.001`` used to render as ``00:00:01,000``).

    Args:
        x: Non-negative offset in seconds.

    Returns:
        The timecode with zero-padded hours, minutes, seconds and a
        three-digit millisecond field separated by a comma.
    """
    # Work in whole milliseconds so every later step is exact integer math.
    total_ms = int(round(float(x) * 1000))
    hour, total_ms = divmod(total_ms, 3_600_000)
    minute, total_ms = divmod(total_ms, 60_000)
    second, millisecond = divmod(total_ms, 1000)
    return '%.2d:%.2d:%.2d,%.3d' % (hour, minute, second, millisecond)
def get_video_metadata(video_url: str = "https://www.youtube.com/watch?v=21X5lGlDOfg&ab_channel=NASA") -> dict:
    """Fetch the metadata of a video without downloading it.

    Args:
        video_url: Address of the video page to inspect.

    Returns:
        The info dictionary returned by ``YoutubeDL.extract_info``.
    """
    ydl_options = {'outtmpl': '%(id)s.%(ext)s'}
    with YoutubeDL(ydl_options) as downloader:
        # download=False: only the metadata is requested here.
        metadata = downloader.extract_info(video_url, download=False)

    title = metadata.get('title')
    uploader = metadata.get('uploader_id')
    print(f"[youtube] {title}: {uploader}")
    return metadata
def parse_metadata(metadata) -> dict:
    """
    Choose which format of a video should be downloaded.

    Only formats whose ``ext`` is ``"mp4"`` are considered. Selection order:

    1. the lowest format id that is listed in ``youtube_mp4_codes``
       (regular video; ``is_livestream`` is False),
    2. otherwise the lowest format id listed in ``youtube_livestream_codes``
       (``is_livestream`` is True),
    3. otherwise the ``format_id`` of the first mp4 format, as-is
       (``is_livestream`` is False),
    4. otherwise ``None`` when there is no mp4 format at all.

    Args:
        metadata: Mapping with an optional ``"formats"`` list of dicts, each
            carrying ``"ext"`` and ``"format_id"`` entries.

    Returns:
        ``{"selected_id": ..., "is_livestream": ...}``. ``selected_id`` is an
        ``int`` for cases 1-2, the raw ``format_id`` value for case 3 and
        ``None`` for case 4.
    """
    formats = metadata.get("formats", [])
    mp4_formats = [f for f in formats if f.get("ext", "") == "mp4"]

    # Keep only ids that are plain integers. A single non-numeric id must not
    # abort the whole selection, so those entries are skipped individually.
    numeric_ids = set()
    for fmt in mp4_formats:
        try:
            numeric_ids.add(int(fmt.get("format_id", 0)))
        except (TypeError, ValueError):
            continue

    video_entries = sorted(numeric_ids.intersection(youtube_mp4_codes))
    livestream_entries = sorted(numeric_ids.intersection(youtube_livestream_codes))

    if video_entries:
        # use video format id over livestream id if available
        selected_id = video_entries[0]
        is_livestream = False
    elif livestream_entries:
        selected_id = livestream_entries[0]
        is_livestream = True
    elif mp4_formats:
        # No known id matched: fall back to whatever mp4 format is listed first.
        selected_id = mp4_formats[0].get("format_id")
        is_livestream = False
    else:
        selected_id = None
        is_livestream = False

    return {
        "selected_id": selected_id,
        "is_livestream": is_livestream,
    }
def get_video(url: str, config: dict):
    """
    Copy a media stream to a local file with ffmpeg.

    Args:
        url: Address of the media stream handed to ffmpeg as its input.
        config: Options for the download. Recognised keys:
            ``"filename"`` - output path (default ``"livestream01.mp4"``);
            ``"end"`` - value for ffmpeg's ``t`` input option
            (default ``"00:15:00"``).

    Raises:
        ffmpeg.Error: If the ffmpeg process exits with a non-zero status.
    """
    # NOTE: the start could be delayed by a few seconds to sync up and capture
    # the full video length, but that would require timing how long the fetch
    # takes and starting a bit earlier.
    filename = config.get("filename", "livestream01.mp4")
    end = config.get("end", "00:15:00")
    (
        ffmpeg
        .input(url, t=end)
        .output(filename)
        # Without this ffmpeg asks for confirmation (or aborts when stdin is
        # not interactive) if the output file already exists.
        .run(overwrite_output=True)
    )
def get_all_files(url: str, end: str = "00:15:00"):
    """Download the selected format of a video to ``temp.mp4``.

    Looks up the video's metadata, lets ``parse_metadata`` choose a format id,
    finds the format entry with that id and hands its URL to ``get_video``.

    Args:
        url: Address of the video page.
        end: Forwarded to ``get_video`` as the ``"end"`` config value.

    Returns:
        The name of the file that was written (always ``"temp.mp4"``).

    Raises:
        IndexError: If no format entry carries the selected id.
    """
    metadata = get_video_metadata(url)
    wanted_id = str(parse_metadata(metadata).get("selected_id", 0))

    matching = [
        fmt
        for fmt in metadata.get("formats", [])
        if fmt.get("format_id", "") == wanted_id
    ]
    stream_url = matching[0].get("url", "")

    filename = "temp.mp4"
    get_video(stream_url, {"filename": filename, "end": end})
    return filename
def get_text_from_mp3_whisper(inputType: str, mp3_file: str, url_path: str, taskName: str, srcLanguage: str) -> tuple:
    """Transcribe or translate a media file with Whisper and burn in subtitles.

    Steps:
      1. remove ``transcript.srt``, ``temp.mp4`` and ``subtitled.mp4`` left
         over from an earlier call;
      2. obtain the media: download it via ``get_all_files`` when
         ``inputType`` is ``"url"``, otherwise use ``mp3_file`` directly;
      3. run the Whisper ``medium`` model on it;
      4. write the segments to ``transcript.srt``;
      5. try to render ``subtitled.mp4`` with the subtitles burned in.

    Args:
        inputType: ``"url"`` to download from ``url_path``; any other value
            uses ``mp3_file``.
        mp3_file: Path of a local media file (used when not in url mode).
        url_path: Address of the video page (used in url mode).
        taskName: Passed to Whisper as ``task``.
        srcLanguage: Passed to Whisper as ``language``.

    Returns:
        ``(segments, srt_text, output_file)`` where ``output_file`` is
        ``"subtitled.mp4"`` on success, or the un-subtitled input media when
        the subtitle rendering step fails.
    """
    # Start from a clean slate so artifacts of a previous call are never reused.
    for stale_file in ("transcript.srt", "temp.mp4", "subtitled.mp4"):
        if os.path.exists(stale_file):
            os.remove(stale_file)

    model = whisper.load_model("medium")
    transcribe_options = dict(task=taskName, language=srcLanguage)

    # NOTE(review): url_path / taskName / srcLanguage are not validated here;
    # an empty value surfaces as an error from the downstream library.
    if inputType == "url":
        input_file = get_all_files(url_path)
        print("Retrieved the file")
        result = model.transcribe(input_file, **transcribe_options)
        print("transcribing the file")
    else:
        # The uploaded file is the media to subtitle. "temp.mp4" was removed
        # above and is never recreated in this mode, so it must not be used.
        input_file = mp3_file
        result = model.transcribe(input_file, **transcribe_options)

    segments = result.get("segments") or []

    lines = []
    # SubRip cue counters are 1-based.
    for count, segment in enumerate(segments, start=1):
        start = segment.get("start")
        end = segment.get("end")
        lines.append(f"{count}")
        lines.append(f"{second_to_timecode(start)} --> {second_to_timecode(end)}")
        lines.append(segment.get("text", "").strip())
        lines.append('')
    words = '\n'.join(lines)

    # Explicit UTF-8: the transcript may contain non-ASCII text and the
    # platform default encoding is not guaranteed to be able to encode it.
    with open("transcript.srt", "w", encoding="utf-8") as f:
        f.write(words)
    print("done transcribing")

    subtitles_file = 'transcript.srt'
    output_file = 'subtitled.mp4'
    try:
        print("attempt to output file")
        video = ffmpeg.input(input_file)
        audio = video.audio
        ffmpeg.concat(video.filter("subtitles", subtitles_file), audio, v=1, a=1).output(output_file).run()
    except Exception as e:
        # Best effort: rendering is optional, so fall back to the original
        # media instead of failing the whole request.
        print("failed to output file")
        print(e)
        output_file = input_file
    return segments, words, output_file
# Input widgets, in the positional order expected by get_text_from_mp3_whisper:
# inputType, mp3_file, url_path, taskName, srcLanguage.
# NOTE(review): gr.inputs.* and the button_text / button_color keywords depend
# on the installed Gradio version -- confirm they are still accepted.
_input_components = [
    gr.Dropdown(["url", "file"], value="url"),
    gr.inputs.Audio(type="filepath"),
    gr.inputs.Textbox(),
    gr.Dropdown(["translate", "transcribe"], value="translate"),
    gr.Dropdown(["Japanese", "English"], value="Japanese"),
]

# One output per element of the tuple returned by get_text_from_mp3_whisper.
_output_components = ["json", "text", "file"]

demo = gr.Interface(
    fn=get_text_from_mp3_whisper,
    inputs=_input_components,
    outputs=_output_components,
    title='Download Video From url and extract text from audio',
    button_text="Go!",
    button_color="#333333",
    live=True,
)
demo.launch()