# TextCut / app.py
import os
import tempfile
import subprocess
import json
import re
from typing import List, Dict, Optional
import gradio as gr
try:
import spaces
HAS_SPACES = True
except ImportError:
HAS_SPACES = False
import torch
MODEL_PATH = "microsoft/VibeVoice-ASR"
model = None
processor = None
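# The model and processor are loaded lazily on first use so the app starts
# quickly; the heavyweight vibevoice imports only happen inside this call.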
def get_model():
global model, processor
if model is None:
from vibevoice.modular.modeling_vibevoice_asr import VibeVoiceASRForConditionalGeneration
from vibevoice.processor.vibevoice_asr_processor import VibeVoiceASRProcessor
processor = VibeVoiceASRProcessor.from_pretrained(MODEL_PATH)
model = VibeVoiceASRForConditionalGeneration.from_pretrained(
MODEL_PATH,
dtype=torch.bfloat16,
device_map="auto",
trust_remote_code=True
)
model.eval()
return model, processor
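# Transcribe a 16 kHz WAV file with greedy decoding and return a list of
# segment dicts carrying at least 'start', 'end', and 'text' keys.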
def transcribe_audio_inner(audio_path: str) -> List[Dict]:
model, processor = get_model()
device = next(model.parameters()).device
inputs = processor(
audio=audio_path,
sampling_rate=16000,
return_tensors="pt",
add_generation_prompt=True,
)
inputs = {k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in inputs.items()}
with torch.no_grad():
output_ids = model.generate(
**inputs,
max_new_tokens=8192,
temperature=None,
do_sample=False,
num_beams=1,
pad_token_id=processor.pad_id,
eos_token_id=processor.tokenizer.eos_token_id,
)
generated_ids = output_ids[0, inputs['input_ids'].shape[1]:]
generated_text = processor.decode(generated_ids, skip_special_tokens=True)
try:
segments = processor.post_process_transcription(generated_text)
except Exception:
segments = parse_raw_transcript(generated_text)
return segments
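# Fallback parser for raw generations shaped like "[0.00 - 3.50] [Speaker 1] text".
# If nothing matches, split the text into sentences and assign each a fixed
# 3-second window so the editing workflow still functions.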
def parse_raw_transcript(text: str) -> List[Dict]:
segments = []
pattern = r'\[(\d+\.?\d*)\s*-\s*(\d+\.?\d*)\]\s*(?:\[([^\]]*)\])?\s*(.+?)(?=\[\d+\.?\d*\s*-|\Z)'
matches = re.findall(pattern, text, re.DOTALL)
for match in matches:
start, end, speaker, content = match
segments.append({
'start': float(start),
'end': float(end),
'speaker': speaker.strip() if speaker else 'Speaker',
'text': content.strip()
})
if not segments and text.strip():
sentences = re.split(r'(?<=[.!?])\s+', text.strip())
duration_per_sentence = 3.0
for i, sentence in enumerate(sentences):
if sentence.strip():
segments.append({
'start': i * duration_per_sentence,
'end': (i + 1) * duration_per_sentence,
'speaker': 'Speaker',
'text': sentence.strip()
})
return segments
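# On HF Spaces, ZeroGPU requires the GPU-using function to be decorated;
# fall back to a plain wrapper when the `spaces` package is unavailable (local runs).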
if HAS_SPACES:
@spaces.GPU(duration=120)
def transcribe_with_gpu(audio_path: str) -> List[Dict]:
return transcribe_audio_inner(audio_path)
else:
def transcribe_with_gpu(audio_path: str) -> List[Dict]:
return transcribe_audio_inner(audio_path)
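# Extract the audio track as 16 kHz mono PCM WAV, the format the ASR pipeline expects.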
def extract_audio(video_path: str) -> str:
    # Create a temp path for ffmpeg to write to; close our fd so it isn't leaked.
    fd, audio_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
cmd = [
"ffmpeg", "-y", "-i", video_path,
"-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
audio_path
]
subprocess.run(cmd, capture_output=True, check=True)
return audio_path
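# Read the container duration in seconds from ffprobe's JSON output.
# (Currently unused, but handy for validating cut ranges.)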
def get_video_duration(video_path: str) -> float:
cmd = [
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "json", video_path
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
data = json.loads(result.stdout)
return float(data["format"]["duration"])
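# Render segments as editable "[start-end] text" lines; speaker labels are
# dropped so the round-trip parser below stays simple.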
def segments_to_transcript(segments: List[Dict]) -> str:
lines = []
for seg in segments:
start = seg['start']
end = seg['end']
text = seg['text']
lines.append(f"[{start:.2f}-{end:.2f}] {text}")
return "\n".join(lines)
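# Parse the edited textbox contents back into segment dicts; lines that do not
# match the "[start-end] text" format are silently skipped.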
def parse_transcript_to_segments(transcript: str) -> List[Dict]:
segments = []
pattern = r'\[(\d+\.?\d*)-(\d+\.?\d*)\]\s*(.+)'
for line in transcript.strip().split("\n"):
line = line.strip()
if not line:
continue
match = re.match(pattern, line)
if match:
start, end, text = match.groups()
segments.append({
'start': float(start),
'end': float(end),
'text': text.strip()
})
return segments
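# Return the index of the segment whose [start, end) range contains
# current_time, or -1 if no segment does.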
def find_current_segment_index(segments: List[Dict], current_time: float) -> int:
for i, seg in enumerate(segments):
if seg['start'] <= current_time < seg['end']:
return i
return -1
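# Re-render the transcript with the active line uppercased as a plain-text
# "highlight", since a gr.Textbox cannot style individual lines.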
def format_transcript_with_highlight(segments: List[Dict], current_index: int) -> str:
lines = []
for i, seg in enumerate(segments):
start = seg['start']
end = seg['end']
text = seg['text']
line = f"[{start:.2f}-{end:.2f}] {text}"
if i == current_index:
line = line.upper()
lines.append(line)
return "\n".join(lines)
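# Cut each kept segment into its own re-encoded clip, then join the clips with
# ffmpeg's concat demuxer using stream copy. Per-clip re-encoding keeps the cut
# points frame-accurate; the final concat is lossless.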
def cut_video_segments(video_path: str, segments_to_keep: List[Dict]) -> Optional[str]:
if not segments_to_keep:
return None
segments_to_keep = sorted(segments_to_keep, key=lambda x: x['start'])
temp_dir = tempfile.mkdtemp()
clip_files = []
for i, seg in enumerate(segments_to_keep):
clip_path = os.path.join(temp_dir, f"clip_{i:04d}.mp4")
cmd = [
"ffmpeg", "-y", "-i", video_path,
"-ss", str(seg['start']),
"-to", str(seg['end']),
"-c:v", "libx264", "-c:a", "aac",
"-avoid_negative_ts", "make_zero",
clip_path
]
subprocess.run(cmd, capture_output=True, check=True)
clip_files.append(clip_path)
list_file = os.path.join(temp_dir, "list.txt")
with open(list_file, "w") as f:
for clip in clip_files:
f.write(f"file '{clip}'\n")
    fd, output_path = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)
cmd = [
"ffmpeg", "-y", "-f", "concat", "-safe", "0",
"-i", list_file,
"-c", "copy",
output_path
]
subprocess.run(cmd, capture_output=True, check=True)
for clip in clip_files:
os.remove(clip)
os.remove(list_file)
os.rmdir(temp_dir)
return output_path
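# Clear the transcript and segment state whenever a new video is uploaded.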
def process_upload(video_file):
if video_file is None:
return None, "", [], "Please upload a video file."
    return video_file, "", [], "Video uploaded. Click 'Transcribe' to start transcription."
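# Full pipeline: extract audio with ffmpeg, transcribe on the (possibly remote)
# GPU, then format the segments into the editable transcript.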
def run_transcription(video_path, progress=gr.Progress()):
if video_path is None:
return "", [], "No video uploaded."
progress(0.1, desc="Extracting audio...")
try:
audio_path = extract_audio(video_path)
except Exception as e:
return "", [], f"Error extracting audio: {str(e)}"
progress(0.3, desc="Running transcription (this may take a while)...")
try:
segments = transcribe_with_gpu(audio_path)
except Exception as e:
return "", [], f"Error during transcription: {str(e)}"
finally:
if os.path.exists(audio_path):
os.remove(audio_path)
progress(0.9, desc="Formatting transcript...")
transcript = segments_to_transcript(segments)
progress(1.0, desc="Done!")
return transcript, segments, f"Transcription complete! {len(segments)} segments found."
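# Refresh the highlighted transcript for the current playback time. Note that
# this rewrites the textbox from the original segments, so it will overwrite
# any manual edits made while the video is playing.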
def update_highlight(video_path, original_segments, current_time):
if not original_segments:
return ""
current_index = find_current_segment_index(original_segments, current_time)
return format_transcript_with_highlight(original_segments, current_index)
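# Diff the edited transcript against the original segments: any original
# segment whose text (compared case-insensitively) survives in the edit is
# kept, and the kept spans are cut from the source video and concatenated.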
def apply_cuts(video_path, edited_transcript, original_segments):
if video_path is None:
return None, "No video to process."
if not original_segments:
return None, "No transcript available. Please transcribe first."
edited_segments = parse_transcript_to_segments(edited_transcript)
edited_texts = {seg['text'].strip().lower() for seg in edited_segments}
segments_to_keep = []
for seg in original_segments:
if seg['text'].strip().lower() in edited_texts:
segments_to_keep.append(seg)
if not segments_to_keep:
return None, "All segments were removed. Cannot create empty video."
deleted_count = len(original_segments) - len(segments_to_keep)
if deleted_count == 0:
return video_path, "No changes detected. Original video returned."
try:
output_path = cut_video_segments(video_path, segments_to_keep)
if output_path:
return output_path, f"Video edited! Removed {deleted_count} segment(s)."
else:
return None, "Error creating edited video."
except Exception as e:
return None, f"Error cutting video: {str(e)}"
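# Client-side script that mirrors the <video> element's playback position into
# the gr.Number with elem_id "current-time-input", throttled to one update per
# 500 ms. Injected via gr.Blocks(head=...) because <script> tags rendered
# through gr.HTML are not executed by the browser.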
JS_CODE = """
<script>
(function() {
let lastUpdate = 0;
const updateInterval = 500;
function findVideoElement() {
const videos = document.querySelectorAll('video');
for (const video of videos) {
if (video.src && !video.src.includes('blob:')) {
return video;
}
}
return videos[0];
}
    function setupVideoListener() {
        const video = findVideoElement();
        if (!video) {
            setTimeout(setupVideoListener, 1000);
            return;
        }
        // Bind only once; the MutationObserver below re-runs this on DOM changes.
        if (video.dataset.textcutBound) return;
        video.dataset.textcutBound = "true";
video.addEventListener('timeupdate', function() {
const now = Date.now();
if (now - lastUpdate < updateInterval) return;
lastUpdate = now;
const timeInput = document.querySelector('#current-time-input input');
if (timeInput) {
timeInput.value = video.currentTime.toFixed(2);
timeInput.dispatchEvent(new Event('input', { bubbles: true }));
}
});
}
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', setupVideoListener);
} else {
setupVideoListener();
}
const observer = new MutationObserver(function(mutations) {
setupVideoListener();
});
observer.observe(document.body, { childList: true, subtree: true });
})();
</script>
"""
with gr.Blocks(title="TextCut - Edit Videos by Editing Transcripts", head=JS_CODE) as demo:
    gr.Markdown("# TextCut")
    gr.Markdown("Edit videos by simply editing their transcript. Upload a video, transcribe it, then delete lines to cut those parts from the video.")
original_segments = gr.State([])
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### Transcript")
transcript_box = gr.Textbox(
label="Transcript (delete lines to cut those parts)",
lines=15,
interactive=True,
placeholder="Transcript will appear here after transcription..."
)
current_time = gr.Number(
label="Current Video Time (seconds)",
value=0,
visible=True,
elem_id="current-time-input"
)
highlight_btn = gr.Button("Update Highlight", size="sm")
with gr.Column(scale=1):
gr.Markdown("### Video")
video_input = gr.Video(
label="Upload Video",
sources=["upload"],
interactive=True
)
with gr.Row():
transcribe_btn = gr.Button("Transcribe", variant="primary")
cut_btn = gr.Button("Apply Cuts", variant="secondary")
status_text = gr.Textbox(label="Status", interactive=False, lines=2)
gr.Markdown("### Edited Video Output")
video_output = gr.Video(label="Edited Video")
video_input.change(
fn=process_upload,
inputs=[video_input],
outputs=[video_input, transcript_box, original_segments, status_text]
)
transcribe_btn.click(
fn=run_transcription,
inputs=[video_input],
outputs=[transcript_box, original_segments, status_text]
)
highlight_btn.click(
fn=update_highlight,
inputs=[video_input, original_segments, current_time],
outputs=[transcript_box]
)
current_time.change(
fn=update_highlight,
inputs=[video_input, original_segments, current_time],
outputs=[transcript_box]
)
cut_btn.click(
fn=apply_cuts,
inputs=[video_input, transcript_box, original_segments],
outputs=[video_output, status_text]
)
if __name__ == "__main__":
demo.launch()