Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import whisper | |
| import os | |
| import tempfile | |
| from pydub import AudioSegment | |
| import math | |
| import gc # Garbage Collector interface | |
| import requests | |
| import zipfile | |
| import re | |
| from urllib.parse import urlparse | |
| # --- Helper Functions --- | |
def format_time(seconds):
    """Convert a duration in seconds to the SRT timestamp format HH:MM:SS,mmm."""
    whole_seconds = int(seconds)
    milliseconds = int((seconds - whole_seconds) * 1000)
    total_minutes, secs = divmod(whole_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
def generate_srt_from_result(result, transcription_mode):
    """Build SRT subtitle text from a Whisper result dictionary.

    In "word" mode each word becomes its own numbered entry; otherwise each
    Whisper segment becomes one entry. Empty texts are skipped (note: in
    segment mode the enumerate index is kept, so skipped segments leave gaps
    in the numbering — same as the original behavior).
    """
    entries = []
    if transcription_mode == "word":
        counter = 1
        for seg in result["segments"]:
            for info in seg.get("words", []):
                token = info["word"].strip()
                if not token:
                    continue  # never emit an empty subtitle entry
                begin = format_time(info["start"])
                finish = format_time(info["end"])
                entries.append(f"{counter}\n{begin} --> {finish}\n{token}\n")
                counter += 1
    else:
        for idx, seg in enumerate(result["segments"], 1):
            caption = seg["text"].strip()
            if not caption:
                continue
            begin = format_time(seg["start"])
            finish = format_time(seg["end"])
            entries.append(f"{idx}\n{begin} --> {finish}\n{caption}\n")
    return "\n".join(entries)
| # --- Google Drive Helper Functions --- | |
def extract_file_id_from_drive_url(url):
    """Pull the file ID out of a Google Drive share link.

    Tries the known URL shapes in order (/file/d/<id>, ?id=<id>, /d/<id>)
    and returns the first match, or None when the URL matches none of them.
    """
    drive_id_patterns = (
        r'/file/d/([a-zA-Z0-9-_]+)',
        r'id=([a-zA-Z0-9-_]+)',
        r'/d/([a-zA-Z0-9-_]+)',
    )
    for candidate in drive_id_patterns:
        found = re.search(candidate, url)
        if found is not None:
            return found.group(1)
    return None
def download_from_google_drive(file_id, destination):
    """Download a Google Drive file by ID to *destination*.

    Large files trigger Google's virus-scan interstitial; when that happens
    the confirm token arrives in a 'download_warning*' cookie and the request
    is retried with confirm=<token>. The response is streamed to disk in
    32 KiB chunks to keep memory use flat.
    """
    URL = "https://docs.google.com/uc?export=download"
    session = requests.Session()
    response = session.get(URL, params={'id': file_id}, stream=True)

    # Look for the large-file confirmation token in the cookies.
    confirm_token = None
    for cookie_name, cookie_value in response.cookies.items():
        if cookie_name.startswith('download_warning'):
            confirm_token = cookie_value
            break

    if confirm_token:
        response = session.get(
            URL, params={'id': file_id, 'confirm': confirm_token}, stream=True
        )

    with open(destination, "wb") as out_file:
        for piece in response.iter_content(32768):
            if piece:  # skip keep-alive chunks
                out_file.write(piece)
def extract_zip_and_get_video_files(zip_path, extract_dir):
    """Extract *zip_path* into *extract_dir* and return paths of all video files.

    The archive comes from an untrusted download, so every member path is
    validated against zip-slip (entries like "../../etc/x" that would escape
    extract_dir) before extraction.

    Raises:
        ValueError: if any archive member would extract outside extract_dir.
    """
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm', '.m4v')
    safe_root = os.path.realpath(extract_dir)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        # Zip-slip guard: resolve each member and require it to stay under extract_dir.
        for member in zip_ref.namelist():
            target = os.path.realpath(os.path.join(extract_dir, member))
            if target != safe_root and not target.startswith(safe_root + os.sep):
                raise ValueError(f"Unsafe path in zip archive: {member}")
        zip_ref.extractall(extract_dir)

    # Collect every video file in the extracted tree (case-insensitive match).
    video_files = []
    for root, _dirs, files in os.walk(extract_dir):
        for name in files:
            if name.lower().endswith(video_extensions):
                video_files.append(os.path.join(root, name))
    return video_files
def process_google_drive_zip(drive_url, temp_dir):
    """Download a shared Google Drive zip into *temp_dir*, extract it, and
    return the list of video file paths found inside.

    Raises:
        ValueError: if the URL yields no file ID, or the zip has no videos.
    """
    file_id = extract_file_id_from_drive_url(drive_url)
    if file_id is None:
        raise ValueError("Invalid Google Drive URL. Please ensure it's a valid shareable link.")

    # Fetch the archive into the working directory.
    archive_path = os.path.join(temp_dir, "downloaded.zip")
    download_from_google_drive(file_id, archive_path)

    # Unpack and scan for videos.
    extraction_root = os.path.join(temp_dir, "extracted")
    os.makedirs(extraction_root, exist_ok=True)
    found_videos = extract_zip_and_get_video_files(archive_path, extraction_root)
    if not found_videos:
        raise ValueError("No video files found in the zip archive.")
    return found_videos
| # --- New Function for Advanced Mode --- | |
def process_advanced_segments(full_result, max_words):
    """Regroup word-level Whisper output into lines of at most *max_words* words.

    All words are flattened across segments and re-split, preferring natural
    breaks: first scan backward through the current group for the most recent
    punctuation, then look ahead up to 10 words, and finally hard-split at
    max_words. Each new segment's start/end comes from its first/last word's
    timestamps. Mutates and returns *full_result*.

    Fixes over the previous version:
    - a word that strips to "" no longer raises IndexError (was word_text[-1]);
    - multi-character punctuation like '--' is actually honored via endswith().
    """
    punctuation = ('.', '!', '?', ';', ',', '--')

    def _ends_with_punct(word_info):
        # strip() can leave an empty token; guard before checking the ending.
        token = word_info["word"].strip()
        return bool(token) and token.endswith(punctuation)

    # Flatten all words into one list for continuous processing.
    all_words = []
    for segment in full_result["segments"]:
        all_words.extend(segment.get("words", []))
    if not all_words:
        return full_result  # No word timestamps -> nothing to regroup.

    new_segments = []
    current_words = []

    def _flush(split_index):
        # Emit current_words[:split_index] as one segment; keep the tail for the next line.
        nonlocal current_words
        group = current_words[:split_index]
        if group:
            new_segments.append({
                "start": group[0]["start"],
                "end": group[-1]["end"],
                "text": " ".join(w["word"].strip() for w in group),
                "words": group,
            })
        current_words = current_words[split_index:]

    i = 0
    while i < len(all_words):
        current_words.append(all_words[i])
        if len(current_words) >= max_words:
            # Prefer the most recent punctuation inside the current group.
            split_index = -1
            for j in range(len(current_words) - 1, -1, -1):
                if _ends_with_punct(current_words[j]):
                    split_index = j + 1  # split *after* the punctuated word
                    break
            # Otherwise peek a few words ahead for a natural break (capped lookahead).
            if split_index == -1:
                lookahead_end = min(i + 1 + 10, len(all_words))
                for j in range(i + 1, lookahead_end):
                    current_words.append(all_words[j])
                    i += 1  # advance the cursor as words are consumed
                    if _ends_with_punct(all_words[j]):
                        split_index = len(current_words)
                        break
            # Hard fallback: split exactly at max_words.
            if split_index == -1:
                split_index = max_words
            _flush(split_index)
        i += 1

    # Whatever is left becomes the final line.
    if current_words:
        _flush(len(current_words))

    full_result["segments"] = new_segments
    return full_result
| # --- Main Transcription Logic --- | |
def transcribe_video(video_path, drive_url, model_name, transcription_mode, chunk_length_min, max_words):
    """Transcribe a video (direct upload or Google Drive zip) to an SRT file.

    This is a *generator* consumed by Gradio: it yields
    (status_message, srt_file_path_or_None) tuples so the UI streams progress.

    BUG FIX: the previous version used `return "error...", None` on error
    paths. Inside a generator, `return value` only sets StopIteration.value —
    the message never reached the frontend. All error paths now `yield` the
    message and then `return`.

    Args:
        video_path: path of an uploaded video file, or None.
        drive_url: shareable Google Drive zip URL, or empty/None.
        model_name: Whisper model identifier (e.g. "base.en").
        transcription_mode: "Segment-level", "Word-level", or "Word-level Advanced".
        chunk_length_min: chunk size in minutes for splitting long audio.
        max_words: max words per line (used by "Word-level Advanced" only).
    """
    # Determine input source; reject ambiguous/missing input up front.
    if drive_url and drive_url.strip():
        if video_path is not None:
            yield "Please provide either a video file OR a Google Drive URL, not both.", None
            return
        input_source = "drive"
        yield "Processing Google Drive URL...", None
    elif video_path is not None:
        input_source = "upload"
        yield "Processing uploaded video...", None
    else:
        yield "Please upload a video file or provide a Google Drive zip URL.", None
        return

    yield "Loading model...", None
    try:
        model = whisper.load_model(model_name)
    except Exception as e:
        yield f"Error loading model: {e}", None
        return
    yield f"Model '{model_name}' loaded.", None

    # Word timestamps are needed by both word-level modes; invariant across chunks.
    should_get_word_timestamps = transcription_mode in ["Word-level", "Word-level Advanced"]

    # All intermediate files live in one temporary directory.
    with tempfile.TemporaryDirectory() as temp_dir:
        try:
            # Resolve the actual video file to transcribe.
            if input_source == "drive":
                yield "Downloading and extracting from Google Drive...", None
                video_files = process_google_drive_zip(drive_url.strip(), temp_dir)
                yield f"Found {len(video_files)} video file(s) in zip archive.", None
                # For simplicity only the first video in the archive is processed.
                current_video_path = video_files[0]
                if len(video_files) > 1:
                    yield f"Multiple videos found. Processing: {os.path.basename(current_video_path)}", None
            else:
                current_video_path = video_path

            yield "Extracting audio...", None
            audio_path = os.path.join(temp_dir, "extracted_audio.wav")
            try:
                source_media = AudioSegment.from_file(current_video_path)
                # 16 kHz mono WAV is the input format Whisper expects.
                source_media.set_channels(1).set_frame_rate(16000).export(audio_path, format="wav")
                audio = AudioSegment.from_wav(audio_path)
            except Exception as e:
                yield f"Error processing video/audio: {e}", None
                return

            # --- Chunking: bound peak RAM on long recordings ---
            chunk_length_ms = chunk_length_min * 60 * 1000
            num_chunks = math.ceil(len(audio) / chunk_length_ms)
            full_result = {"segments": []}
            yield f"Audio extracted. Splitting into {num_chunks} chunk(s) of {chunk_length_min} min...", None

            try:
                for i in range(num_chunks):
                    start_ms = i * chunk_length_ms
                    chunk = audio[start_ms:start_ms + chunk_length_ms]
                    chunk_path = os.path.join(temp_dir, f"chunk_{i}.wav")
                    chunk.export(chunk_path, format="wav")
                    yield f"Transcribing chunk {i+1}/{num_chunks}...", None
                    try:
                        result = model.transcribe(
                            chunk_path,
                            word_timestamps=should_get_word_timestamps,
                            fp16=False  # CPU-only inference
                        )
                    except Exception as e:
                        yield f"Error during transcription of chunk {i+1}: {e}", None
                        return

                    # Shift timestamps by the chunk's offset within the full audio.
                    time_offset_s = start_ms / 1000.0
                    for segment in result["segments"]:
                        segment["start"] += time_offset_s
                        segment["end"] += time_offset_s
                        if "words" in segment:
                            for word_info in segment["words"]:
                                word_info["start"] += time_offset_s
                                word_info["end"] += time_offset_s
                        full_result["segments"].append(segment)

                    os.remove(chunk_path)  # free disk space as we go
            finally:
                # Release the model on success AND on every error path above.
                del model
                gc.collect()

            # Optional post-processing: regroup words into bounded lines.
            if transcription_mode == "Word-level Advanced":
                yield "Processing advanced word-level grouping...", None
                full_result = process_advanced_segments(full_result, max_words)

            yield "All chunks transcribed. Generating SRT file...", None
            # Advanced mode emits pre-grouped lines, i.e. segment-level SRT.
            srt_mode = "word" if transcription_mode == "Word-level" else "segment"
            srt_output = generate_srt_from_result(full_result, srt_mode)

            # Write the SRT where Gradio can pick it up from the final yield.
            srt_file_path = os.path.join(temp_dir, "output.srt")
            with open(srt_file_path, "w", encoding="utf-8") as srt_file:
                srt_file.write(srt_output)
            yield "Done!", srt_file_path
        except Exception as e:
            yield f"Error: {e}", None
| # --- Gradio UI --- | |
# Build the Gradio app: two-column layout with inputs/settings on the left
# and streaming status + SRT download on the right.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # Page header / intro copy.
    gr.Markdown(
        """
        # Whisper Video Transcriber π₯ -> π
        Upload a video, provide a Google Drive zip URL, choose your settings, and get a timed SRT subtitle file.
        This app handles large videos by automatically splitting them into manageable chunks.
        """
    )
    with gr.Row():
        with gr.Column():
            # Exactly one of these two inputs should be used; transcribe_video
            # rejects the case where both are provided.
            gr.Markdown("### Input Source (choose one):")
            video_input = gr.Video(label="Upload Video File")
            gr.Markdown("**OR**")
            drive_url_input = gr.Textbox(
                label="Google Drive Zip URL",
                placeholder="https://drive.google.com/file/d/your-file-id/view?usp=sharing",
                info="Paste a public Google Drive link to a zip file containing video(s)"
            )
            gr.Markdown("### Settings:")
            # Whisper checkpoint to load (English-only variants).
            model_name = gr.Radio(
                ["tiny.en", "base.en"],
                label="Whisper Model",
                value="base.en",
                info="`tiny.en` is faster, `base.en` is more accurate."
            )
            # Controls SRT granularity; "Word-level Advanced" also enables
            # the max-words regrouping step.
            transcription_mode = gr.Radio(
                ["Segment-level", "Word-level", "Word-level Advanced"],  # Added new mode
                label="Transcription Granularity",
                value="Segment-level",
                info="Word-level is more detailed but may be slightly slower. Word-level Advanced groups into lines with max words, splitting at punctuation."
            )
            # Audio is split into chunks of this many minutes before transcription.
            chunk_length_min = gr.Slider(
                minimum=5,
                maximum=20,
                value=10,
                step=1,
                label="Chunk Length (minutes)",
                info="Shorter chunks use less RAM but may be slightly less accurate at boundaries."
            )
            # Only consumed by "Word-level Advanced" mode.
            max_words = gr.Slider(  # New input for max_words
                minimum=5,
                maximum=30,
                value=10,
                step=1,
                label="Max Words per Line (Advanced mode only)",
                info="For Word-level Advanced: Limits words per subtitle line, splitting intelligently at punctuation."
            )
            submit_button = gr.Button("Transcribe Video", variant="primary")
        with gr.Column():
            # transcribe_video is a generator, so status streams into this box.
            status_output = gr.Textbox(label="Status", interactive=False, lines=5)
            srt_output_file = gr.File(label="Download SRT File")
    # Wire the button to the transcription generator.
    submit_button.click(
        fn=transcribe_video,
        inputs=[video_input, drive_url_input, model_name, transcription_mode, chunk_length_min, max_words],  # Added drive_url_input
        outputs=[status_output, srt_output_file]
    )
    # Usage instructions shown below the app.
    gr.Markdown(
        """
        ### How to Use
        1. **Choose input method:** Either upload a video file OR provide a Google Drive zip URL (not both).
        2. **For Google Drive:** Share your zip file publicly and paste the link. The zip should contain video files.
        3. **Select a Whisper model.** For English, `base.en` provides a great balance of speed and accuracy.
        4. **Choose the granularity.** 'Segment-level' is good for standard subtitles. 'Word-level' is great for karaoke-style highlighting. 'Word-level Advanced' groups into optimized subtitle lines.
        5. **Click 'Transcribe Video'.** The status box will show the progress.
        6. **Download the SRT file** when the process is complete. You can open this file in any text editor or load it into a video player like VLC.
        ### Google Drive Setup
        - Upload your video files in a zip archive to Google Drive
        - Right-click the zip file β Share β Change to "Anyone with the link"
        - Copy and paste the share link into the URL field above
        """
    )

# Launch only when run as a script (debug=True enables verbose errors/reload).
if __name__ == "__main__":
    demo.launch(debug=True)