import os
import tempfile
import subprocess
import json
import re
from typing import List, Dict, Optional

import gradio as gr

try:
    import spaces
    HAS_SPACES = True
except ImportError:
    HAS_SPACES = False

import torch

MODEL_PATH = "microsoft/VibeVoice-ASR"

model = None
processor = None


def get_model():
    """Lazily load the ASR model and processor on first use."""
    global model, processor
    if model is None:
        from vibevoice.modular.modeling_vibevoice_asr import VibeVoiceASRForConditionalGeneration
        from vibevoice.processor.vibevoice_asr_processor import VibeVoiceASRProcessor

        processor = VibeVoiceASRProcessor.from_pretrained(MODEL_PATH)
        model = VibeVoiceASRForConditionalGeneration.from_pretrained(
            MODEL_PATH,
            dtype=torch.bfloat16,
            device_map="auto",
            trust_remote_code=True,
        )
        model.eval()
    return model, processor


def transcribe_audio_inner(audio_path: str) -> List[Dict]:
    """Run ASR on a 16 kHz mono WAV file and return a list of timed segments."""
    model, processor = get_model()
    device = next(model.parameters()).device

    inputs = processor(
        audio=audio_path,
        sampling_rate=16000,
        return_tensors="pt",
        add_generation_prompt=True,
    )
    inputs = {k: v.to(device) if isinstance(v, torch.Tensor) else v
              for k, v in inputs.items()}

    with torch.no_grad():
        output_ids = model.generate(
            **inputs,
            max_new_tokens=8192,
            temperature=None,
            do_sample=False,
            num_beams=1,
            pad_token_id=processor.pad_id,
            eos_token_id=processor.tokenizer.eos_token_id,
        )

    # Decode only the newly generated tokens, skipping the prompt.
    generated_ids = output_ids[0, inputs['input_ids'].shape[1]:]
    generated_text = processor.decode(generated_ids, skip_special_tokens=True)

    try:
        segments = processor.post_process_transcription(generated_text)
    except Exception:
        segments = parse_raw_transcript(generated_text)
    return segments


def parse_raw_transcript(text: str) -> List[Dict]:
    """Fallback parser for raw model output shaped like '[start - end] [Speaker] text'."""
    segments = []
    pattern = r'\[(\d+\.?\d*)\s*-\s*(\d+\.?\d*)\]\s*(?:\[([^\]]*)\])?\s*(.+?)(?=\[\d+\.?\d*\s*-|\Z)'
    matches = re.findall(pattern, text, re.DOTALL)
    for match in matches:
        start, end, speaker, content = match
        segments.append({
            'start': float(start),
            'end': float(end),
            'speaker': speaker.strip() if speaker else 'Speaker',
            'text': content.strip()
        })

    # Last resort: no timestamps found, so split the text into sentences and
    # assign each a fixed nominal duration.
    if not segments and text.strip():
        sentences = re.split(r'(?<=[.!?])\s+', text.strip())
        duration_per_sentence = 3.0
        for i, sentence in enumerate(sentences):
            if sentence.strip():
                segments.append({
                    'start': i * duration_per_sentence,
                    'end': (i + 1) * duration_per_sentence,
                    'speaker': 'Speaker',
                    'text': sentence.strip()
                })
    return segments


if HAS_SPACES:
    @spaces.GPU(duration=120)
    def transcribe_with_gpu(audio_path: str) -> List[Dict]:
        return transcribe_audio_inner(audio_path)
else:
    def transcribe_with_gpu(audio_path: str) -> List[Dict]:
        return transcribe_audio_inner(audio_path)


def extract_audio(video_path: str) -> str:
    """Extract mono 16 kHz PCM audio from a video; returns the WAV path."""
    audio_path = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        audio_path,
    ]
    subprocess.run(cmd, capture_output=True, check=True)
    return audio_path


def get_video_duration(video_path: str) -> float:
    """Return the duration of a video in seconds via ffprobe."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "json",
        video_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    data = json.loads(result.stdout)
    return float(data["format"]["duration"])
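# ---------------------------------------------------------------------------
# Transcript helpers. A segment is a dict with 'start', 'end', and 'text'
# keys (plus 'speaker' when the model provides one); the editable text form
# shown to the user is one "[start-end] text" line per segment.
# ---------------------------------------------------------------------------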
def segments_to_transcript(segments: List[Dict]) -> str:
    """Render segments as editable text, one '[start-end] text' line each."""
    lines = []
    for seg in segments:
        lines.append(f"[{seg['start']:.2f}-{seg['end']:.2f}] {seg['text']}")
    return "\n".join(lines)


def parse_transcript_to_segments(transcript: str) -> List[Dict]:
    """Parse '[start-end] text' lines back into segment dicts."""
    segments = []
    pattern = r'\[(\d+\.?\d*)-(\d+\.?\d*)\]\s*(.+)'
    for line in transcript.strip().split("\n"):
        line = line.strip()
        if not line:
            continue
        match = re.match(pattern, line)
        if match:
            start, end, text = match.groups()
            segments.append({
                'start': float(start),
                'end': float(end),
                'text': text.strip()
            })
    return segments


def find_current_segment_index(segments: List[Dict], current_time: float) -> int:
    """Return the index of the segment containing current_time, or -1."""
    for i, seg in enumerate(segments):
        if seg['start'] <= current_time < seg['end']:
            return i
    return -1


def format_transcript_with_highlight(segments: List[Dict], current_index: int) -> str:
    """Render the transcript, upper-casing the currently playing line as a
    plain-text highlight (a Textbox cannot carry rich formatting)."""
    lines = []
    for i, seg in enumerate(segments):
        line = f"[{seg['start']:.2f}-{seg['end']:.2f}] {seg['text']}"
        if i == current_index:
            line = line.upper()
        lines.append(line)
    return "\n".join(lines)


def cut_video_segments(video_path: str, segments_to_keep: List[Dict]) -> Optional[str]:
    """Re-encode each kept segment as its own clip, then concatenate the clips."""
    if not segments_to_keep:
        return None

    segments_to_keep = sorted(segments_to_keep, key=lambda x: x['start'])
    temp_dir = tempfile.mkdtemp()
    clip_files = []

    for i, seg in enumerate(segments_to_keep):
        clip_path = os.path.join(temp_dir, f"clip_{i:04d}.mp4")
        cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-ss", str(seg['start']),
            "-to", str(seg['end']),
            "-c:v", "libx264",
            "-c:a", "aac",
            "-avoid_negative_ts", "make_zero",
            clip_path,
        ]
        subprocess.run(cmd, capture_output=True, check=True)
        clip_files.append(clip_path)

    # Join the clips with the concat demuxer; all clips were encoded with the
    # same codecs above, so a stream copy is sufficient here.
    list_file = os.path.join(temp_dir, "list.txt")
    with open(list_file, "w") as f:
        for clip in clip_files:
            f.write(f"file '{clip}'\n")

    output_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
    cmd = [
        "ffmpeg", "-y",
        "-f", "concat",
        "-safe", "0",
        "-i", list_file,
        "-c", "copy",
        output_path,
    ]
    subprocess.run(cmd, capture_output=True, check=True)

    for clip in clip_files:
        os.remove(clip)
    os.remove(list_file)
    os.rmdir(temp_dir)

    return output_path
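# ---------------------------------------------------------------------------
# Gradio event handlers. Cuts are computed by diffing the edited transcript
# against the original segments: any original segment whose (case-folded)
# text no longer appears in the edited transcript is dropped. Segments with
# identical text are matched as a group, so deleting one of two duplicate
# lines keeps both.
# ---------------------------------------------------------------------------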
def process_upload(video_file):
    if video_file is None:
        return None, "", [], "Please upload a video file."
    return video_file, "", [], "Video uploaded. Click 'Transcribe' to start transcription."


def run_transcription(video_path, progress=gr.Progress()):
    if video_path is None:
        return "", [], "No video uploaded."

    progress(0.1, desc="Extracting audio...")
    try:
        audio_path = extract_audio(video_path)
    except Exception as e:
        return "", [], f"Error extracting audio: {str(e)}"

    progress(0.3, desc="Running transcription (this may take a while)...")
    try:
        segments = transcribe_with_gpu(audio_path)
    except Exception as e:
        return "", [], f"Error during transcription: {str(e)}"
    finally:
        if os.path.exists(audio_path):
            os.remove(audio_path)

    progress(0.9, desc="Formatting transcript...")
    transcript = segments_to_transcript(segments)
    progress(1.0, desc="Done!")
    return transcript, segments, f"Transcription complete! {len(segments)} segments found."


def update_highlight(video_path, original_segments, current_time):
    # video_path is unused but kept so the Gradio inputs list stays uniform.
    if not original_segments:
        return ""
    current_index = find_current_segment_index(original_segments, current_time)
    return format_transcript_with_highlight(original_segments, current_index)


def apply_cuts(video_path, edited_transcript, original_segments):
    if video_path is None:
        return None, "No video to process."
    if not original_segments:
        return None, "No transcript available. Please transcribe first."

    edited_segments = parse_transcript_to_segments(edited_transcript)
    edited_texts = {seg['text'].strip().lower() for seg in edited_segments}

    # Keep every original segment whose text survives in the edited transcript.
    segments_to_keep = [seg for seg in original_segments
                        if seg['text'].strip().lower() in edited_texts]

    if not segments_to_keep:
        return None, "All segments were removed. Cannot create empty video."

    deleted_count = len(original_segments) - len(segments_to_keep)
    if deleted_count == 0:
        return video_path, "No changes detected. Original video returned."

    try:
        output_path = cut_video_segments(video_path, segments_to_keep)
        if output_path:
            return output_path, f"Video edited! Removed {deleted_count} segment(s)."
        return None, "Error creating edited video."
    except Exception as e:
        return None, f"Error cutting video: {str(e)}"


# Placeholder for custom JavaScript, presumably intended to push the video's
# playback position into the "current-time-input" Number component below.
JS_CODE = """ """

with gr.Blocks(title="TextCut - Edit Videos by Editing Transcripts") as demo:
    gr.Markdown("# TextCut")
    gr.Markdown(
        "Edit videos by simply editing their transcript. Upload a video, "
        "transcribe it, then delete lines to cut those parts from the video."
    )
    gr.HTML(JS_CODE)

    original_segments = gr.State([])

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Transcript")
            transcript_box = gr.Textbox(
                label="Transcript (delete lines to cut those parts)",
                lines=15,
                interactive=True,
                placeholder="Transcript will appear here after transcription..."
            )
            current_time = gr.Number(
                label="Current Video Time (seconds)",
                value=0,
                visible=True,
                elem_id="current-time-input"
            )
            highlight_btn = gr.Button("Update Highlight", size="sm")

        with gr.Column(scale=1):
            gr.Markdown("### Video")
            video_input = gr.Video(
                label="Upload Video",
                sources=["upload"],
                interactive=True
            )
            with gr.Row():
                transcribe_btn = gr.Button("Transcribe", variant="primary")
                cut_btn = gr.Button("Apply Cuts", variant="secondary")
            status_text = gr.Textbox(label="Status", interactive=False, lines=2)

    gr.Markdown("### Edited Video Output")
    video_output = gr.Video(label="Edited Video")

    video_input.change(
        fn=process_upload,
        inputs=[video_input],
        outputs=[video_input, transcript_box, original_segments, status_text]
    )

    transcribe_btn.click(
        fn=run_transcription,
        inputs=[video_input],
        outputs=[transcript_box, original_segments, status_text]
    )

    highlight_btn.click(
        fn=update_highlight,
        inputs=[video_input, original_segments, current_time],
        outputs=[transcript_box]
    )

    current_time.change(
        fn=update_highlight,
        inputs=[video_input, original_segments, current_time],
        outputs=[transcript_box]
    )

    cut_btn.click(
        fn=apply_cuts,
        inputs=[video_input, transcript_box, original_segments],
        outputs=[video_output, status_text]
    )

if __name__ == "__main__":
    demo.launch()