|
|
import json
import os
import re
import shutil
import subprocess
import tempfile
from typing import List, Dict, Optional, Tuple, Generator

import gradio as gr
|
|
|
|
|
try: |
|
|
import spaces |
|
|
HAS_SPACES = True |
|
|
except ImportError: |
|
|
HAS_SPACES = False |
|
|
|
|
|
import torch |
|
|
import numpy as np |
|
|
|
|
|
|
|
|
MODEL_PATH = "microsoft/VibeVoice-ASR" |
|
|
model = None |
|
|
processor = None |
|
|
|
|
|
|
|
|
def get_model(): |
|
|
global model, processor |
|
|
if model is None: |
|
|
from vibevoice.modular.modeling_vibevoice_asr import VibeVoiceASRForConditionalGeneration |
|
|
from vibevoice.processor.vibevoice_asr_processor import VibeVoiceASRProcessor |
|
|
|
|
|
processor = VibeVoiceASRProcessor.from_pretrained(MODEL_PATH) |
|
|
model = VibeVoiceASRForConditionalGeneration.from_pretrained( |
|
|
MODEL_PATH, |
|
|
dtype=torch.bfloat16, |
|
|
device_map="auto", |
|
|
trust_remote_code=True |
|
|
) |
|
|
model.eval() |
|
|
return model, processor |
|
|
|
|
|
|
|
|
def transcribe_audio_inner(audio_path: str) -> List[Dict]: |
|
|
model, processor = get_model() |
|
|
device = next(model.parameters()).device |
|
|
|
|
|
inputs = processor( |
|
|
audio=audio_path, |
|
|
sampling_rate=16000, |
|
|
return_tensors="pt", |
|
|
add_generation_prompt=True, |
|
|
) |
|
|
|
|
|
inputs = {k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in inputs.items()} |
|
|
|
|
|
with torch.no_grad(): |
|
|
output_ids = model.generate( |
|
|
**inputs, |
|
|
max_new_tokens=8192, |
|
|
temperature=None, |
|
|
do_sample=False, |
|
|
num_beams=1, |
|
|
pad_token_id=processor.pad_id, |
|
|
eos_token_id=processor.tokenizer.eos_token_id, |
|
|
) |
|
|
|
|
|
generated_ids = output_ids[0, inputs['input_ids'].shape[1]:] |
|
|
generated_text = processor.decode(generated_ids, skip_special_tokens=True) |
|
|
|
|
|
try: |
|
|
segments = processor.post_process_transcription(generated_text) |
|
|
except Exception: |
|
|
segments = parse_raw_transcript(generated_text) |
|
|
|
|
|
return segments |
|
|
|
|
|
|
|
|
def parse_raw_transcript(text: str) -> List[Dict]: |
|
|
segments = [] |
|
|
pattern = r'\[(\d+\.?\d*)\s*-\s*(\d+\.?\d*)\]\s*(?:\[([^\]]*)\])?\s*(.+?)(?=\[\d+\.?\d*\s*-|\Z)' |
|
|
matches = re.findall(pattern, text, re.DOTALL) |
|
|
|
|
|
for match in matches: |
|
|
start, end, speaker, content = match |
|
|
segments.append({ |
|
|
'start': float(start), |
|
|
'end': float(end), |
|
|
'speaker': speaker.strip() if speaker else 'Speaker', |
|
|
'text': content.strip() |
|
|
}) |
|
|
|
|
|
if not segments and text.strip(): |
|
|
sentences = re.split(r'(?<=[.!?])\s+', text.strip()) |
|
|
duration_per_sentence = 3.0 |
|
|
for i, sentence in enumerate(sentences): |
|
|
if sentence.strip(): |
|
|
segments.append({ |
|
|
'start': i * duration_per_sentence, |
|
|
'end': (i + 1) * duration_per_sentence, |
|
|
'speaker': 'Speaker', |
|
|
'text': sentence.strip() |
|
|
}) |
|
|
|
|
|
return segments |
|
|
|
|
|
|
|
|
if HAS_SPACES: |
|
|
@spaces.GPU(duration=120) |
|
|
def transcribe_with_gpu(audio_path: str) -> List[Dict]: |
|
|
return transcribe_audio_inner(audio_path) |
|
|
else: |
|
|
def transcribe_with_gpu(audio_path: str) -> List[Dict]: |
|
|
return transcribe_audio_inner(audio_path) |
|
|
|
|
|
|
|
|
def extract_audio(video_path: str) -> str: |
|
|
audio_path = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name |
|
|
cmd = [ |
|
|
"ffmpeg", "-y", "-i", video_path, |
|
|
"-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", |
|
|
audio_path |
|
|
] |
|
|
subprocess.run(cmd, capture_output=True, check=True) |
|
|
return audio_path |
|
|
|
|
|
|
|
|
def get_video_duration(video_path: str) -> float: |
|
|
cmd = [ |
|
|
"ffprobe", "-v", "error", |
|
|
"-show_entries", "format=duration", |
|
|
"-of", "json", video_path |
|
|
] |
|
|
result = subprocess.run(cmd, capture_output=True, text=True, check=True) |
|
|
data = json.loads(result.stdout) |
|
|
return float(data["format"]["duration"]) |
|
|
|
|
|
|
|
|
def segments_to_transcript(segments: List[Dict]) -> str: |
|
|
lines = [] |
|
|
for seg in segments: |
|
|
start = seg['start'] |
|
|
end = seg['end'] |
|
|
text = seg['text'] |
|
|
lines.append(f"[{start:.2f}-{end:.2f}] {text}") |
|
|
return "\n".join(lines) |
|
|
|
|
|
|
|
|
def parse_transcript_to_segments(transcript: str) -> List[Dict]: |
|
|
segments = [] |
|
|
pattern = r'\[(\d+\.?\d*)-(\d+\.?\d*)\]\s*(.+)' |
|
|
|
|
|
for line in transcript.strip().split("\n"): |
|
|
line = line.strip() |
|
|
if not line: |
|
|
continue |
|
|
match = re.match(pattern, line) |
|
|
if match: |
|
|
start, end, text = match.groups() |
|
|
segments.append({ |
|
|
'start': float(start), |
|
|
'end': float(end), |
|
|
'text': text.strip() |
|
|
}) |
|
|
|
|
|
return segments |
|
|
|
|
|
|
|
|
def find_current_segment_index(segments: List[Dict], current_time: float) -> int: |
|
|
for i, seg in enumerate(segments): |
|
|
if seg['start'] <= current_time < seg['end']: |
|
|
return i |
|
|
return -1 |
|
|
|
|
|
|
|
|
def format_transcript_with_highlight(segments: List[Dict], current_index: int) -> str: |
|
|
lines = [] |
|
|
for i, seg in enumerate(segments): |
|
|
start = seg['start'] |
|
|
end = seg['end'] |
|
|
text = seg['text'] |
|
|
line = f"[{start:.2f}-{end:.2f}] {text}" |
|
|
if i == current_index: |
|
|
line = line.upper() |
|
|
lines.append(line) |
|
|
return "\n".join(lines) |
|
|
|
|
|
|
|
|
def cut_video_segments(video_path: str, segments_to_keep: List[Dict]) -> Optional[str]: |
|
|
if not segments_to_keep: |
|
|
return None |
|
|
|
|
|
segments_to_keep = sorted(segments_to_keep, key=lambda x: x['start']) |
|
|
|
|
|
temp_dir = tempfile.mkdtemp() |
|
|
clip_files = [] |
|
|
|
|
|
for i, seg in enumerate(segments_to_keep): |
|
|
clip_path = os.path.join(temp_dir, f"clip_{i:04d}.mp4") |
|
|
cmd = [ |
|
|
"ffmpeg", "-y", "-i", video_path, |
|
|
"-ss", str(seg['start']), |
|
|
"-to", str(seg['end']), |
|
|
"-c:v", "libx264", "-c:a", "aac", |
|
|
"-avoid_negative_ts", "make_zero", |
|
|
clip_path |
|
|
] |
|
|
subprocess.run(cmd, capture_output=True, check=True) |
|
|
clip_files.append(clip_path) |
|
|
|
|
|
list_file = os.path.join(temp_dir, "list.txt") |
|
|
with open(list_file, "w") as f: |
|
|
for clip in clip_files: |
|
|
f.write(f"file '{clip}'\n") |
|
|
|
|
|
output_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name |
|
|
cmd = [ |
|
|
"ffmpeg", "-y", "-f", "concat", "-safe", "0", |
|
|
"-i", list_file, |
|
|
"-c", "copy", |
|
|
output_path |
|
|
] |
|
|
subprocess.run(cmd, capture_output=True, check=True) |
|
|
|
|
|
for clip in clip_files: |
|
|
os.remove(clip) |
|
|
os.remove(list_file) |
|
|
os.rmdir(temp_dir) |
|
|
|
|
|
return output_path |
|
|
|
|
|
|
|
|
def process_upload(video_file): |
|
|
if video_file is None: |
|
|
return None, "", [], "Please upload a video file." |
|
|
|
|
|
video_path = video_file |
|
|
return video_path, "", [], "Video uploaded. Click 'Transcribe' to start transcription." |
|
|
|
|
|
|
|
|
def run_transcription(video_path, progress=gr.Progress()): |
|
|
if video_path is None: |
|
|
return "", [], "No video uploaded." |
|
|
|
|
|
progress(0.1, desc="Extracting audio...") |
|
|
|
|
|
try: |
|
|
audio_path = extract_audio(video_path) |
|
|
except Exception as e: |
|
|
return "", [], f"Error extracting audio: {str(e)}" |
|
|
|
|
|
progress(0.3, desc="Running transcription (this may take a while)...") |
|
|
|
|
|
try: |
|
|
segments = transcribe_with_gpu(audio_path) |
|
|
except Exception as e: |
|
|
return "", [], f"Error during transcription: {str(e)}" |
|
|
finally: |
|
|
if os.path.exists(audio_path): |
|
|
os.remove(audio_path) |
|
|
|
|
|
progress(0.9, desc="Formatting transcript...") |
|
|
|
|
|
transcript = segments_to_transcript(segments) |
|
|
|
|
|
progress(1.0, desc="Done!") |
|
|
|
|
|
return transcript, segments, f"Transcription complete! {len(segments)} segments found." |
|
|
|
|
|
|
|
|
def update_highlight(video_path, original_segments, current_time): |
|
|
if not original_segments: |
|
|
return "" |
|
|
|
|
|
current_index = find_current_segment_index(original_segments, current_time) |
|
|
return format_transcript_with_highlight(original_segments, current_index) |
|
|
|
|
|
|
|
|
def apply_cuts(video_path, edited_transcript, original_segments): |
|
|
if video_path is None: |
|
|
return None, "No video to process." |
|
|
|
|
|
if not original_segments: |
|
|
return None, "No transcript available. Please transcribe first." |
|
|
|
|
|
edited_segments = parse_transcript_to_segments(edited_transcript) |
|
|
|
|
|
original_texts = {seg['text'].strip().lower() for seg in original_segments} |
|
|
edited_texts = {seg['text'].strip().lower() for seg in edited_segments} |
|
|
|
|
|
segments_to_keep = [] |
|
|
for seg in original_segments: |
|
|
if seg['text'].strip().lower() in edited_texts: |
|
|
segments_to_keep.append(seg) |
|
|
|
|
|
if not segments_to_keep: |
|
|
return None, "All segments were removed. Cannot create empty video." |
|
|
|
|
|
deleted_count = len(original_segments) - len(segments_to_keep) |
|
|
|
|
|
if deleted_count == 0: |
|
|
return video_path, "No changes detected. Original video returned." |
|
|
|
|
|
try: |
|
|
output_path = cut_video_segments(video_path, segments_to_keep) |
|
|
if output_path: |
|
|
return output_path, f"Video edited! Removed {deleted_count} segment(s)." |
|
|
else: |
|
|
return None, "Error creating edited video." |
|
|
except Exception as e: |
|
|
return None, f"Error cutting video: {str(e)}" |
|
|
|
|
|
|
|
|
JS_CODE = """ |
|
|
<script> |
|
|
(function() { |
|
|
let lastUpdate = 0; |
|
|
const updateInterval = 500; |
|
|
|
|
|
function findVideoElement() { |
|
|
const videos = document.querySelectorAll('video'); |
|
|
for (const video of videos) { |
|
|
if (video.src && !video.src.includes('blob:')) { |
|
|
return video; |
|
|
} |
|
|
} |
|
|
return videos[0]; |
|
|
} |
|
|
|
|
|
function setupVideoListener() { |
|
|
const video = findVideoElement(); |
|
|
if (!video) { |
|
|
setTimeout(setupVideoListener, 1000); |
|
|
return; |
|
|
} |
|
|
|
|
|
video.addEventListener('timeupdate', function() { |
|
|
const now = Date.now(); |
|
|
if (now - lastUpdate < updateInterval) return; |
|
|
lastUpdate = now; |
|
|
|
|
|
const timeInput = document.querySelector('#current-time-input input'); |
|
|
if (timeInput) { |
|
|
timeInput.value = video.currentTime.toFixed(2); |
|
|
timeInput.dispatchEvent(new Event('input', { bubbles: true })); |
|
|
} |
|
|
}); |
|
|
} |
|
|
|
|
|
if (document.readyState === 'loading') { |
|
|
document.addEventListener('DOMContentLoaded', setupVideoListener); |
|
|
} else { |
|
|
setupVideoListener(); |
|
|
} |
|
|
|
|
|
const observer = new MutationObserver(function(mutations) { |
|
|
setupVideoListener(); |
|
|
}); |
|
|
observer.observe(document.body, { childList: true, subtree: true }); |
|
|
})(); |
|
|
</script> |
|
|
""" |
|
|
|
|
|
|
|
|
with gr.Blocks(title="TextCut - Edit Videos by Editing Transcripts") as demo: |
|
|
gr.Markdown("# TextCut") |
|
|
gr.Markdown("Edit videos by simply editing their transcript. Upload a video, transcribe it, then delete lines to cut those parts from the video.") |
|
|
gr.HTML(JS_CODE) |
|
|
|
|
|
original_segments = gr.State([]) |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("### Transcript") |
|
|
transcript_box = gr.Textbox( |
|
|
label="Transcript (delete lines to cut those parts)", |
|
|
lines=15, |
|
|
interactive=True, |
|
|
placeholder="Transcript will appear here after transcription..." |
|
|
) |
|
|
|
|
|
current_time = gr.Number( |
|
|
label="Current Video Time (seconds)", |
|
|
value=0, |
|
|
visible=True, |
|
|
elem_id="current-time-input" |
|
|
) |
|
|
|
|
|
highlight_btn = gr.Button("Update Highlight", size="sm") |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("### Video") |
|
|
video_input = gr.Video( |
|
|
label="Upload Video", |
|
|
sources=["upload"], |
|
|
interactive=True |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
transcribe_btn = gr.Button("Transcribe", variant="primary") |
|
|
cut_btn = gr.Button("Apply Cuts", variant="secondary") |
|
|
|
|
|
status_text = gr.Textbox(label="Status", interactive=False, lines=2) |
|
|
|
|
|
gr.Markdown("### Edited Video Output") |
|
|
video_output = gr.Video(label="Edited Video") |
|
|
|
|
|
video_input.change( |
|
|
fn=process_upload, |
|
|
inputs=[video_input], |
|
|
outputs=[video_input, transcript_box, original_segments, status_text] |
|
|
) |
|
|
|
|
|
transcribe_btn.click( |
|
|
fn=run_transcription, |
|
|
inputs=[video_input], |
|
|
outputs=[transcript_box, original_segments, status_text] |
|
|
) |
|
|
|
|
|
highlight_btn.click( |
|
|
fn=update_highlight, |
|
|
inputs=[video_input, original_segments, current_time], |
|
|
outputs=[transcript_box] |
|
|
) |
|
|
|
|
|
current_time.change( |
|
|
fn=update_highlight, |
|
|
inputs=[video_input, original_segments, current_time], |
|
|
outputs=[transcript_box] |
|
|
) |
|
|
|
|
|
cut_btn.click( |
|
|
fn=apply_cuts, |
|
|
inputs=[video_input, transcript_box, original_segments], |
|
|
outputs=[video_output, status_text] |
|
|
) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.launch() |
|
|
|