hlevring's picture
Add download link for transcript with updated timestamps
0dbc95b
# Timestamps Tester - Test word timestamps from any transcript
import gradio as gr
import soundfile as sf
import numpy as np
import base64
import io
import json
import os
import time
import traceback
from ten_vad import TenVad
from fireredvad import FireRedVad, FireRedVadConfig
from huggingface_hub import snapshot_download
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
_firered_vad_instance = None
_firered_model_dir = None
def get_firered_vad(speech_threshold=0.4, min_speech_frame=20, min_silence_frame=20,
smooth_window_size=5, max_speech_frame=2000,
merge_silence_frame=0, extend_speech_frame=0, chunk_max_frame=30000):
"""Get or create a FireRedVAD instance with lazy model download and caching."""
global _firered_vad_instance, _firered_model_dir
if _firered_model_dir is None:
print("[FireRedVAD] Downloading model from HuggingFace...")
repo_dir = snapshot_download("FireRedTeam/FireRedVAD")
_firered_model_dir = os.path.join(repo_dir, "VAD")
print(f"[FireRedVAD] Model cached at: {_firered_model_dir}")
config = FireRedVadConfig(
use_gpu=False,
smooth_window_size=smooth_window_size,
speech_threshold=speech_threshold,
min_speech_frame=min_speech_frame,
max_speech_frame=max_speech_frame,
min_silence_frame=min_silence_frame,
merge_silence_frame=merge_silence_frame,
extend_speech_frame=extend_speech_frame,
chunk_max_frame=chunk_max_frame,
)
_firered_vad_instance = FireRedVad.from_pretrained(_firered_model_dir, config)
return _firered_vad_instance
def detect_silence_periods(audio_data, sample_rate, prob_threshold=0.5, min_off_ms=48, min_on_ms=64):
"""Run TEN VAD to detect silence periods in audio.
Args:
audio_data: numpy array of audio samples (float, mono, 16kHz)
sample_rate: sample rate (must be 16000)
prob_threshold: VAD probability threshold (0.0-1.0), higher = less sensitive
min_off_ms: Minimum silence duration in ms - shorter silences are filled in as voice
min_on_ms: Minimum voice duration in ms - shorter voice bursts are removed
Returns:
List of dicts with 'start' and 'end' times for each silence period
"""
TARGET_SR = 16000 # TEN VAD requires 16kHz
HOP_SIZE = 256 # 16ms at 16kHz
FRAME_MS = 16.0 # Each frame is 16ms
print(f"[VAD] Settings: prob_threshold={prob_threshold}, min_off_ms={min_off_ms}, min_on_ms={min_on_ms}")
if sample_rate != TARGET_SR:
print(f"[VAD] Warning: Expected 16kHz audio, got {sample_rate}Hz")
# Convert float audio to int16 (TEN VAD expects int16)
if audio_data.dtype == np.float32 or audio_data.dtype == np.float64:
audio_int16 = (audio_data * 32767).astype(np.int16)
else:
audio_int16 = audio_data.astype(np.int16)
# Create VAD instance
vad = TenVad(hop_size=HOP_SIZE, threshold=prob_threshold)
frame_duration = HOP_SIZE / TARGET_SR # 0.016s = 16ms
# Process frame by frame and collect raw flags
num_frames = len(audio_int16) // HOP_SIZE
# Use list for mutable flags (will be modified by post-processing)
is_voice = [0] * num_frames
for i in range(num_frames):
frame_start = i * HOP_SIZE
frame = audio_int16[frame_start:frame_start + HOP_SIZE]
result = vad.process(frame)
# TEN VAD returns tuple: (probability, flag) or has .flag attribute
if isinstance(result, tuple):
flag = result[1] # (probability, flag)
else:
flag = result.flag
is_voice[i] = flag
# Convert ms thresholds to frame counts
min_off_frames = int(min_off_ms / FRAME_MS + 0.1)
min_on_frames = int(min_on_ms / FRAME_MS + 0.1)
# Post-processing loop (matches VadPipeline.cpp logic)
while True:
# Pass 1: Fill in short silence gaps (minOff)
# If silence duration <= min_off_frames, convert to voice
if min_off_frames > 0:
start_off = -1
for i in range(num_frames):
if is_voice[i]: # Voice detected
if start_off >= 0 and (i - start_off) <= min_off_frames:
# Short silence gap - fill it in as voice
for j in range(start_off, i):
is_voice[j] = 1
start_off = -1
elif start_off < 0:
start_off = i
# Pass 2: Remove short voice bursts (minOn)
# If voice duration <= min_on_frames, convert to silence
changed = False
if min_on_frames > 0:
start_on = -1
for i in range(num_frames):
if not is_voice[i]: # Silence detected
if start_on >= 0 and (i - start_on) <= min_on_frames:
# Short voice burst - remove it
changed = True
for j in range(start_on, i):
is_voice[j] = 0
start_on = -1
elif start_on < 0:
start_on = i
# Handle case where audio ends with short voice burst
if start_on >= 0 and (num_frames - start_on) <= min_on_frames:
changed = True
for j in range(start_on, num_frames):
is_voice[j] = 0
# Exit loop if no changes or minOff is disabled
if not changed or min_off_frames == 0:
break
# Convert frame flags to silence periods
silence_periods = []
in_silence = False
silence_start = 0.0
for i in range(num_frames):
current_time = i * frame_duration
if is_voice[i]:
# Voice frame
if in_silence:
# End of silence period
silence_periods.append({
'start': round(silence_start, 3),
'end': round(current_time, 3)
})
in_silence = False
else:
# Silence frame
if not in_silence:
# Start of silence period
silence_start = current_time
in_silence = True
# Handle case where audio ends in silence
if in_silence:
silence_periods.append({
'start': round(silence_start, 3),
'end': round(num_frames * frame_duration, 3)
})
return silence_periods
def detect_silence_periods_firered(audio_data, sample_rate, prob_threshold=0.4, min_off_ms=200, min_on_ms=200):
"""Run FireRedVAD to detect silence periods in audio.
Args:
audio_data: numpy array of audio samples (mono, 16kHz)
sample_rate: sample rate (must be 16000)
prob_threshold: speech probability threshold (0.0-1.0)
min_off_ms: Minimum silence duration in ms (maps to min_silence_frame)
min_on_ms: Minimum speech duration in ms (maps to min_speech_frame)
Returns:
List of dicts with 'start' and 'end' times for each silence period
"""
FRAME_SHIFT_MS = 10 # FireRedVAD uses 10ms frame shift
print(f"[FireRedVAD] Settings: prob_threshold={prob_threshold}, min_off_ms={min_off_ms}, min_on_ms={min_on_ms}")
if sample_rate != 16000:
print(f"[FireRedVAD] Warning: Expected 16kHz audio, got {sample_rate}Hz")
# Convert to int16 if needed (FireRedVAD reads int16 from files)
if audio_data.dtype in (np.float32, np.float64):
audio_int16 = (audio_data * 32767).astype(np.int16)
else:
audio_int16 = audio_data.astype(np.int16)
min_silence_frames = max(1, round(min_off_ms / FRAME_SHIFT_MS))
min_speech_frames = max(1, round(min_on_ms / FRAME_SHIFT_MS))
vad = get_firered_vad(
speech_threshold=prob_threshold,
min_speech_frame=min_speech_frames,
min_silence_frame=min_silence_frames,
)
result, probs = vad.detect((audio_int16, sample_rate))
speech_timestamps = result.get("timestamps", [])
audio_duration = result.get("dur", len(audio_int16) / sample_rate)
print(f"[FireRedVAD] Detected {len(speech_timestamps)} speech segments in {audio_duration:.2f}s audio")
# Invert speech timestamps to silence periods
silence_periods = []
prev_end = 0.0
for speech_start, speech_end in speech_timestamps:
if speech_start > prev_end + 0.001:
silence_periods.append({
'start': round(prev_end, 3),
'end': round(speech_start, 3)
})
prev_end = speech_end
if audio_duration > prev_end + 0.001:
silence_periods.append({
'start': round(prev_end, 3),
'end': round(audio_duration, 3)
})
return silence_periods
def print_speech_silence_log(timestamps_data, silence_periods):
"""Print interleaved speech and silence log sorted by start time."""
# Build unified list
entries = []
# Add speech entries (word timestamps)
for item in timestamps_data:
entries.append({
'type': 'speech',
'start': item['start'],
'end': item['end'],
'word': item['word']
})
# Add silence entries
for item in silence_periods:
entries.append({
'type': 'silence',
'start': item['start'],
'end': item['end']
})
# Sort by start time
entries.sort(key=lambda x: x['start'])
# Print log
print("\n=== SPEECH & SILENCE LOG ===")
for entry in entries:
if entry['type'] == 'speech':
print(f"[Speech] [{entry['start']:.3f}-{entry['end']:.3f}] {entry['word']}")
else:
duration_ms = int((entry['end'] - entry['start']) * 1000)
print(f"[Silence] [{entry['start']:.3f}-{entry['end']:.3f}] [{duration_ms}ms]")
# Calculate summary
total_silence = sum(p['end'] - p['start'] for p in silence_periods)
print(f"\n=== SUMMARY ===")
print(f"Words: {len(timestamps_data)}, Silence periods: {len(silence_periods)}, Total silence: {total_silence:.2f}s")
print("=" * 30 + "\n")
def parse_transcript_file(file_path):
"""Parse a transcript JSON file and extract word timestamps.
Supports three formats:
- Format 1: segments[].words[] with {start, end, word}
- Format 2: Top-level words[] with {start, end, word}
- Format 3: segments[] with {start, end, text} (text treated as single word)
Args:
file_path: Path to the JSON transcript file
Returns:
Tuple of (full_text, timestamps_data, format_type) where timestamps_data is list of
{word, start, end} dicts and format_type is 1, 2, or 3
Raises:
ValueError if format not recognized
"""
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
timestamps_data = []
full_text_parts = []
# Try Format 1: segments[].words[] with {start, end, word}
if 'segments' in data and len(data['segments']) > 0:
first_segment = data['segments'][0]
if 'words' in first_segment and isinstance(first_segment['words'], list):
# Format 1: Nested words inside segments
print("[IMPORT] Detected Format 1: segments[].words[]")
for segment in data['segments']:
for word_entry in segment.get('words', []):
word = word_entry.get('word', '').strip()
if word:
timestamps_data.append({
'word': word,
'start': float(word_entry.get('start', 0)),
'end': float(word_entry.get('end', 0))
})
full_text_parts.append(word)
return ' '.join(full_text_parts), timestamps_data, 1
# Try Format 3: segments[] with {start, end, text} (text as word)
if 'text' in first_segment and 'start' in first_segment and 'end' in first_segment:
print("[IMPORT] Detected Format 3: segments[] with {start, end, text}")
for segment in data['segments']:
text = segment.get('text', '').strip()
if text:
timestamps_data.append({
'word': text,
'start': float(segment.get('start', 0)),
'end': float(segment.get('end', 0))
})
full_text_parts.append(text)
# Use top-level text if available, otherwise join segments
full_text = data.get('text', ' '.join(full_text_parts))
return full_text, timestamps_data, 3
# Try Format 2: Top-level words[] with {start, end, word}
if 'words' in data and isinstance(data['words'], list):
print("[IMPORT] Detected Format 2: words[]")
for word_entry in data['words']:
word = word_entry.get('word', '').strip()
if word:
timestamps_data.append({
'word': word,
'start': float(word_entry.get('start', 0)),
'end': float(word_entry.get('end', 0))
})
full_text_parts.append(word)
# Use top-level text if available, otherwise join words
full_text = data.get('text', ' '.join(full_text_parts))
return full_text, timestamps_data, 2
raise ValueError("Unrecognized transcript format. Expected segments[].words[], words[], or segments[] with {start, end, text}")
def load_transcript(audio, transcript_file, prob_threshold=0.5, min_off_ms=48, min_on_ms=64, existing_audio_state=None, vad_engine_name="TEN VAD"):
"""Load external transcript and run VAD on audio.
Args:
audio: Path to audio file
transcript_file: Path to JSON transcript file
prob_threshold: VAD probability threshold
min_off_ms: Minimum silence duration in ms
min_on_ms: Minimum voice duration in ms
existing_audio_state: Optional (audio_data, sample_rate) tuple to reuse
vad_engine_name: Which VAD engine to use ("TEN VAD" or "FireRedVAD")
Returns:
Tuple of (text, timestamps_data, audio_data_tuple, raw_text, export_metadata, silence_periods)
"""
try:
# Check if transcript file is provided
if transcript_file is None:
return "No transcript file provided.", [], None, "", {}, []
# Parse the transcript file
try:
text, timestamps_data, format_type = parse_transcript_file(transcript_file)
print(f"[IMPORT] Loaded {len(timestamps_data)} word timestamps from transcript (format {format_type})")
except Exception as e:
return f"Error parsing transcript file: {str(e)}", [], None, "", {}, []
# Get audio data: reuse existing state or load from file
if existing_audio_state is not None:
audio_data, sample_rate = existing_audio_state
print("[AUDIO] Reusing audio from memory")
elif audio is not None:
audio_data, sample_rate = sf.read(audio)
# Convert stereo to mono by averaging channels
if len(audio_data.shape) > 1 and audio_data.shape[1] == 2:
audio_data = np.mean(audio_data, axis=1)
# Resample to 16kHz if needed (required by TEN VAD)
TARGET_SR = 16000
if sample_rate != TARGET_SR:
duration = len(audio_data) / sample_rate
new_length = int(duration * TARGET_SR)
x_old = np.linspace(0, duration, len(audio_data), endpoint=False)
x_new = np.linspace(0, duration, new_length, endpoint=False)
audio_data = np.interp(x_new, x_old, audio_data).astype(np.float32)
print(f"[AUDIO] Resampled from {sample_rate}Hz to {TARGET_SR}Hz")
sample_rate = TARGET_SR
else:
return "No audio provided. Please upload audio first.", [], None, "", {}, []
# Run VAD to detect silence periods
silence_periods = []
try:
if vad_engine_name == "FireRedVAD":
silence_periods = detect_silence_periods_firered(audio_data, sample_rate, prob_threshold, int(min_off_ms), int(min_on_ms))
else:
silence_periods = detect_silence_periods(audio_data, sample_rate, prob_threshold, int(min_off_ms), int(min_on_ms))
print_speech_silence_log(timestamps_data, silence_periods)
except Exception as e:
print(f"[VAD] Error during silence detection: {str(e)}\n{traceback.format_exc()}")
# Calculate audio duration
audio_duration = len(audio_data) / sample_rate
# Build export metadata
export_metadata = {
'model': 'imported-transcript',
'audio_duration': round(audio_duration, 2),
'word_count': len(timestamps_data),
'token_count': 0,
'hypothesis_score': None,
'frame_duration': None
}
# Return text, timestamps, audio data, raw_text (same as text for imports), export metadata, and silence periods
return text, timestamps_data, (audio_data, sample_rate), text, export_metadata, silence_periods
except Exception as e:
return f"Error loading transcript: {str(e)}\n{traceback.format_exc()}", [], None, "", {}, []
def extract_audio_segment(audio_state, intervals, current_window=None):
"""Fast audio extraction from memory with waveform visualization.
Args:
audio_state: Tuple of (audio_data, sample_rate)
intervals: List of (start_time, end_time) tuples to play
current_window: Dict with 'start' and 'end' of current waveform window, or None
Returns:
Tuple of (html_output, new_window_state)
"""
# Wrapper to ensure controls never collapse
def wrap_output(content, window_state=None):
return f'<div style="min-height: 200px;">{content}</div>', window_state
try:
if audio_state is None:
return wrap_output("<p style='color: red; padding: 20px;'>No audio loaded. Please upload audio and load a transcript first.</p>")
if not intervals:
return wrap_output("<p style='color: red; padding: 20px;'>No intervals provided.</p>")
audio_data, sample_rate = audio_state
audio_duration = len(audio_data) / sample_rate
# Calculate overall bounds from all intervals
overall_start = min(iv[0] for iv in intervals)
overall_end = max(iv[1] for iv in intervals)
# Default context padding is 160ms
DEFAULT_PADDING = 0.16
# Determine if we need to redraw the waveform or just update the shaded area
need_redraw = True
if current_window is not None:
# Check if ALL intervals fit within the current window
if overall_start >= current_window['start'] and overall_end <= current_window['end']:
need_redraw = False
# Reuse the current window boundaries
padded_start = current_window['start']
padded_end = current_window['end']
if need_redraw:
# Calculate new window with ±160ms padding around overall bounds
padded_start = max(0, overall_start - DEFAULT_PADDING)
padded_end = min(audio_duration, overall_end + DEFAULT_PADDING)
# Extract padded segment for waveform visualization
start_sample_padded = int(padded_start * sample_rate)
end_sample_padded = int(padded_end * sample_rate)
segment_for_waveform = audio_data[start_sample_padded:end_sample_padded]
# Generate waveform visualization with padded segment (reduced height)
fig, ax = plt.subplots(figsize=(12, 2.25))
# Downsample for visualization using block averaging (more accurate than skipping)
max_points = 8000
if len(segment_for_waveform) > max_points:
# Reshape into blocks and take mean of each block
# Pad to multiple of block_size to avoid losing end samples
block_size = len(segment_for_waveform) // max_points
remainder = len(segment_for_waveform) % block_size
if remainder > 0:
# Pad with the last value to make it divisible
padding_needed = block_size - remainder
segment_padded = np.pad(segment_for_waveform, (0, padding_needed), mode='edge')
else:
segment_padded = segment_for_waveform
segment_vis = segment_padded.reshape(-1, block_size).mean(axis=1)
# Generate matching time points spanning the FULL padded range
times_vis = np.linspace(padded_start, padded_end, len(segment_vis))
else:
segment_vis = segment_for_waveform
times_vis = np.linspace(padded_start, padded_end, len(segment_for_waveform))
ax.plot(times_vis, segment_vis, linewidth=0.5, color='#666')
ax.fill_between(times_vis, segment_vis, alpha=0.3, color='#ccc')
# Highlight context areas (gray) - areas outside all playback regions
# First, shade the entire padded area as context
ax.axvspan(padded_start, padded_end, alpha=0.1, color='#888', label='Context (not played)')
# Then highlight each playback interval in green (overwrites context shading)
for i, (start_time, end_time) in enumerate(intervals):
label = 'Playback region' if i == 0 else None
ax.axvspan(start_time, end_time, alpha=0.3, color='#4CAF50', label=label)
ax.set_xlabel('Time (seconds)', fontsize=10)
ax.set_ylabel('Amplitude', fontsize=10)
# Calculate context on each side in ms
left_context_ms = int((overall_start - padded_start) * 1000)
right_context_ms = int((padded_end - overall_end) * 1000)
# Format context string - symmetric or asymmetric
if left_context_ms == right_context_ms:
context_str = f'(±{left_context_ms}ms context)'
else:
context_str = f'(-{left_context_ms}ms / +{right_context_ms}ms context)'
# Build title showing intervals
if len(intervals) == 1:
interval_str = f'{intervals[0][0]:.3f}s – {intervals[0][1]:.3f}s'
else:
interval_str = f'{len(intervals)} intervals: {overall_start:.3f}s – {overall_end:.3f}s'
ax.set_title(f'Audio Segment: {interval_str} {context_str}', fontsize=11)
ax.legend(fontsize=9)
ax.grid(True, alpha=0.3)
# Convert plot to base64 image
buf = io.BytesIO()
plt.tight_layout()
plt.savefig(buf, format='png', dpi=100)
buf.seek(0)
img_base64 = base64.b64encode(buf.read()).decode()
plt.close(fig)
# Extract and concatenate all interval segments for playback
audio_segments = []
SILENCE_GAP_MS = 50 # Gap between intervals in ms
silence_samples = int(SILENCE_GAP_MS / 1000 * sample_rate)
silence_gap = np.zeros(silence_samples, dtype=audio_data.dtype)
for i, (start_time, end_time) in enumerate(intervals):
start_sample = int(start_time * sample_rate)
end_sample = int(end_time * sample_rate)
audio_segments.append(audio_data[start_sample:end_sample])
# Add silence gap between intervals (not after the last one)
if i < len(intervals) - 1:
audio_segments.append(silence_gap)
combined_playback = np.concatenate(audio_segments)
# Convert PLAYBACK segments (concatenated) to base64 WAV
audio_buf = io.BytesIO()
sf.write(audio_buf, combined_playback, sample_rate, format='WAV')
audio_buf.seek(0)
audio_base64 = base64.b64encode(audio_buf.read()).decode()
audio_data_url = f"data:audio/wav;base64,{audio_base64}"
# Add unique ID to force Gradio to re-render (triggers autoplay)
unique_id = int(time.time() * 1000)
# Calculate context on each side in ms for the info text
left_context_ms = int((overall_start - padded_start) * 1000)
right_context_ms = int((padded_end - overall_end) * 1000)
# Format context string - symmetric or asymmetric
if left_context_ms == right_context_ms:
context_info = f'±{left_context_ms}ms'
else:
context_info = f'-{left_context_ms}ms / +{right_context_ms}ms'
# Calculate total playback duration
total_duration_ms = sum((end - start) * 1000 for start, end in intervals)
if len(intervals) > 1:
total_duration_ms += SILENCE_GAP_MS * (len(intervals) - 1) # Include gaps
# Build segment info text
if len(intervals) == 1:
segment_info = f'{intervals[0][0]:.3f}s – {intervals[0][1]:.3f}s'
else:
segment_info = f'{len(intervals)} intervals'
# Create HTML with waveform and native audio controls
html_output = f'''
<div style="margin: 10px 0;" data-render-id="{unique_id}">
<img src="data:image/png;base64,{img_base64}" style="width: 100%; border-radius: 5px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
<div style="margin-top: 10px; display: flex; align-items: center; gap: 15px;">
<audio id="segment-audio" controls autoplay style="flex: 1;">
<source src="{audio_data_url}" type="audio/wav">
</audio>
</div>
<div style="margin-top: 8px; text-align: center;">
<span style="font-size: 14px; font-weight: bold; color: #333;">
Segment: {segment_info}
</span>
<span style="font-size: 12px; color: #666; margin-left: 15px;">
Duration: {total_duration_ms:.0f}ms | Context shown: {context_info}
</span>
</div>
</div>
'''
# Return HTML and new window state
new_window = {'start': padded_start, 'end': padded_end}
return wrap_output(html_output, new_window)
except Exception as e:
return wrap_output(f"<pre style='padding: 20px;'>Error: {str(e)}\n{traceback.format_exc()}</pre>", current_window)
def build_timestamps_iframe_html(entries_json, transcript_source=None):
"""Build the interactive word timestamps iframe HTML.
Args:
entries_json: JSON string of word/silence entries
transcript_source: Optional dict with 'json_data', 'format_type', 'filename' for JSON download
Returns:
Complete iframe HTML for embedding in Gradio
"""
if transcript_source:
original_json_str = json.dumps(transcript_source['json_data'])
format_type = transcript_source['format_type']
transcript_filename = transcript_source.get('filename', 'transcript.json')
else:
original_json_str = 'null'
format_type = 0
transcript_filename = 'transcript.json'
iframe_html = f'''
<!DOCTYPE html>
<html>
<head>
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{ font-family: -apple-system, BlinkMacSystemFont, sans-serif; padding: 10px; background: #f9f9f9; }}
h3 {{ margin-bottom: 8px; font-size: 16px; }}
.help {{ font-size: 11px; color: #666; margin-bottom: 10px; }}
.container {{ max-height: 180px; overflow-y: auto; background: #fff; border-radius: 8px; padding: 8px; border: 1px solid #ddd; }}
.word-btn {{
display: inline-block;
background: #e8f4f8;
padding: 5px 10px;
margin: 3px;
border-radius: 4px;
cursor: pointer;
border: 1px solid #cde;
font-size: 13px;
transition: all 0.15s;
}}
.word-btn:hover {{ background: #c5e5f5; }}
.word-btn.selected {{ background: #4CAF50; color: white; border-color: #3a9; }}
.silence-btn {{
display: inline-block;
background: #ffe4c4;
padding: 5px 8px;
margin: 3px;
border-radius: 4px;
cursor: pointer;
border: 1px solid #dca;
font-size: 11px;
transition: all 0.15s;
}}
.silence-btn:hover {{ background: #ffd4a4; }}
.silence-btn.selected {{ background: #ff9800; color: white; border-color: #e68a00; }}
.untranscribed-btn {{
display: inline-block;
background: #ffcccc;
padding: 5px 8px;
margin: 3px;
border-radius: 4px;
cursor: pointer;
border: 1px solid #c88;
font-size: 11px;
transition: all 0.15s;
}}
.untranscribed-btn:hover {{ background: #ffaaaa; }}
.untranscribed-btn.selected {{ background: #e53935; color: white; border-color: #c62828; }}
.checkbox-container {{
display: inline-flex;
align-items: center;
margin-left: 15px;
font-size: 12px;
cursor: pointer;
}}
.checkbox-container input {{
margin-right: 5px;
cursor: pointer;
}}
.checkbox-container:hover {{
color: #0066cc;
}}
.time {{ color: #0066cc; font-size: 10px; font-weight: bold; }}
.silence-time {{ color: #996600; font-size: 10px; font-weight: bold; }}
.untranscribed-time {{ color: #b71c1c; font-size: 10px; font-weight: bold; }}
.duration {{ color: #666; font-size: 10px; margin-left: 3px; }}
.word {{ margin-left: 4px; }}
</style>
</head>
<body>
<div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 8px;">
<div style="display: flex; align-items: center;">
<h3 style="margin: 0;">Word Timestamps</h3>
<label class="checkbox-container" title="Adjusts word start and end times using VAD silence detection. Rule A: If a silence overlaps a word, the word start is moved to the silence end (corrects ASR onset errors). Rule B: If there is a gap between word end and the next silence, the word end is extended to the silence boundary. Rule C: If there is a gap between the upstream silence end and word start, the word start is extended back to the silence boundary. Rule D: For adjacent words with no silence between them, the word end is extended toward the midpoint of the gap.">
<input type="checkbox" id="adjust-intervals">
Apply Time Stamp Adjustments
</label>
<label class="checkbox-container" title="Extends adjusted word boundaries slightly into adjacent silence periods. Padding = min(N ms, half the silence duration). This accounts for imprecise VAD silence boundaries where speech may extend slightly into detected silence." style="margin-left: 10px;">
<input type="checkbox" id="pad-silence" disabled>
Pad into silence by
</label>
<input type="number" id="pad-silence-ms" value="50" min="1" max="200" step="5" disabled style="width: 50px; margin-left: 2px; font-size: 12px;">
<span style="font-size: 12px; color: #666;">ms</span>
</div>
<a href="#" id="download-json" onclick="downloadJSON(); return false;" style="font-size: 12px; color: #1a73e8; text-decoration: none; white-space: nowrap;">Download JSON</a>
</div>
<p class="help"><b>Click</b> = select &nbsp;|&nbsp; <b>Ctrl+Click</b> = toggle &nbsp;|&nbsp; <b>Shift+Click</b> = range &nbsp;&nbsp;&nbsp; <span style="background: #ffe4c4; padding: 2px 8px; border-radius: 3px; border: 1px solid #dca;"></span> = detected non speech &nbsp; <span style="background: #ffcccc; padding: 2px 8px; border-radius: 3px; border: 1px solid #c88;"></span> = speech without transcript</p>
<div class="container" id="words"></div>
<script>
var entries = {entries_json};
var originalJSON = {original_json_str};
var formatType = {format_type};
var transcriptFilename = "{transcript_filename}";
var container = document.getElementById('words');
// Merge consecutive silence periods (no word between them)
function mergeConsecutiveSilences(entryList) {{
var merged = [];
var pendingSilence = null;
entryList.forEach(function(entry) {{
if (entry.type === 'silence') {{
if (pendingSilence === null) {{
pendingSilence = {{ type: 'silence', start: entry.start, end: entry.end }};
}} else {{
pendingSilence.end = entry.end;
}}
}} else {{
if (pendingSilence !== null) {{
merged.push(pendingSilence);
pendingSilence = null;
}}
merged.push(entry);
}}
}});
if (pendingSilence !== null) {{
merged.push(pendingSilence);
}}
return merged;
}}
entries = mergeConsecutiveSilences(entries);
entries.sort(function(a, b) {{
if (a.start !== b.start) return a.start - b.start;
if (a.type === 'silence' && b.type !== 'silence') return -1;
if (a.type !== 'silence' && b.type === 'silence') return 1;
return 0;
}});
var words = entries.filter(function(e) {{ return e.type === 'word'; }});
var silences = entries.filter(function(e) {{ return e.type === 'silence'; }});
function adjustTimestamps(words, silences) {{
/*
* adjustTimestamps - Adjust word timestamps using VAD silence detection.
* Moves word boundaries to exact silence boundaries (no padding).
* Use padTimestampsIntoSilence() afterwards to add optional padding.
*
* Inputs:
* words: array of {{start, end, word}} - original ASR word timestamps (sorted by start)
* silences: array of {{start, end}} - VAD-detected silence periods (sorted by start)
*
* Returns:
* array of {{adjStart, adjEnd}} - one per word, same order as input
*
* Stage 1 processing:
*
* Pass 1 - Rule A (overlap correction):
* If a silence ends within the word (after word.start, before word.end),
* the ASR likely placed the word onset before an actual silence gap.
* Move adjStart to silence.end.
*
* Pass 2 - Rules B, C, D (using Rule-A-adjusted positions for blocking):
*
* Rule B - Downstream extension:
* If the first silence after word.end has no other word (using adjusted
* start positions) between them, extend adjEnd to silence.start.
*
* Rule C - Upstream extension:
* If the last silence before word.start has no other word (using adjusted
* end positions) between them, extend adjStart back to silence.end.
*
* Rule D - Midpoint fallback:
* For adjacent words with no silence between them, extend adjEnd by
* gap / 2.
*
* Pass 3 - Final gap fill:
* For any word whose end still has a gap to a downstream silence
* (using adjusted positions), extend adjEnd to silence.start.
*/
var result = [];
for (var i = 0; i < words.length; i++) {{
result.push({{ adjStart: words[i].start, adjEnd: words[i].end }});
}}
// --- Pass 1: Rule A - Overlap correction ---
for (var i = 0; i < words.length; i++) {{
var word = words[i];
var overlapSil = null;
for (var s = 0; s < silences.length; s++) {{
var sil = silences[s];
if (sil.end > word.start && sil.end < word.end) {{
overlapSil = sil;
}}
if (sil.start >= word.end) break;
}}
if (overlapSil) {{
result[i].adjStart = overlapSil.end;
}}
}}
// --- Pass 2: Rules B, C, D (using Rule-A-adjusted positions) ---
for (var i = 0; i < words.length; i++) {{
var word = words[i];
var adj = result[i];
var prevAdj = i > 0 ? result[i - 1] : null;
var nextAdj = i < words.length - 1 ? result[i + 1] : null;
var nextWord = i < words.length - 1 ? words[i + 1] : null;
// Rule C: Upstream extension (only if Rule A didn't adjust start)
if (adj.adjStart === word.start) {{
var upstreamSil = null;
for (var s = silences.length - 1; s >= 0; s--) {{
if (silences[s].end <= word.start) {{
upstreamSil = silences[s];
break;
}}
}}
if (upstreamSil) {{
var blocked = prevAdj && prevAdj.adjEnd > upstreamSil.end;
if (!blocked && word.start > upstreamSil.end) {{
adj.adjStart = upstreamSil.end;
}}
}}
}}
// Rule B: Downstream extension
var downstreamSil = null;
for (var s = 0; s < silences.length; s++) {{
if (silences[s].start >= word.end) {{
downstreamSil = silences[s];
break;
}}
}}
if (downstreamSil) {{
var blocked = nextAdj && nextAdj.adjStart < downstreamSil.start;
if (!blocked) {{
adj.adjEnd = downstreamSil.start;
}}
}}
// Rule D: Midpoint fallback (only if Rule B didn't apply)
if (adj.adjEnd === word.end && nextWord) {{
var silBetween = false;
for (var s = 0; s < silences.length; s++) {{
if (silences[s].start >= word.end && silences[s].start < nextWord.start) {{
silBetween = true;
break;
}}
}}
if (!silBetween) {{
var gap = nextWord.start - word.end;
if (gap > 0) {{
adj.adjEnd = word.end + gap / 2;
}}
}}
}}
}}
// --- Pass 3: Final gap fill ---
// After all adjustments, fill remaining gaps to downstream silence starts
for (var i = 0; i < words.length; i++) {{
var adj = result[i];
var nextAdj = i < words.length - 1 ? result[i + 1] : null;
var downstreamSil = null;
for (var s = 0; s < silences.length; s++) {{
if (silences[s].start >= adj.adjEnd) {{
downstreamSil = silences[s];
break;
}}
}}
if (downstreamSil) {{
var blocked = nextAdj && nextAdj.adjStart < downstreamSil.start;
if (!blocked && downstreamSil.start > adj.adjEnd) {{
adj.adjEnd = downstreamSil.start;
}}
}}
}}
return result;
}}
function padTimestampsIntoSilence(adjustments, words, silences, paddingMs) {{
/*
* padTimestampsIntoSilence - Extend adjusted word boundaries into adjacent
* silence periods by a configurable amount.
*
* Inputs:
* adjustments: array of {{adjStart, adjEnd}} from adjustTimestamps()
* words: original words array (for reference)
* silences: array of {{start, end}} - silence periods
* paddingMs: max padding in milliseconds (e.g. 50)
*
* Returns:
* array of {{adjStart, adjEnd}} - padded timestamps, same order
*
* For each word boundary that sits at a silence boundary:
* - If adjStart equals a silence.end, push it back by padding into the silence
* - If adjEnd equals a silence.start, push it forward by padding into the silence
* padding = min(paddingMs/1000, silence_duration / 2)
*/
var paddingSec = paddingMs / 1000;
var result = [];
for (var i = 0; i < adjustments.length; i++) {{
var adj = adjustments[i];
var padStart = adj.adjStart;
var padEnd = adj.adjEnd;
// Pad start into upstream silence
for (var s = silences.length - 1; s >= 0; s--) {{
if (Math.abs(silences[s].end - adj.adjStart) < 0.002) {{
var silDur = silences[s].end - silences[s].start;
var pad = Math.min(paddingSec, silDur / 2);
padStart = adj.adjStart - pad;
break;
}}
}}
// Pad end into downstream silence
for (var s = 0; s < silences.length; s++) {{
if (Math.abs(silences[s].start - adj.adjEnd) < 0.002) {{
var silDur = silences[s].end - silences[s].start;
var pad = Math.min(paddingSec, silDur / 2);
padEnd = adj.adjEnd + pad;
break;
}}
}}
result.push({{ adjStart: padStart, adjEnd: padEnd }});
}}
return result;
}}
var adjustments = adjustTimestamps(words, silences);
var paddedCache = {{}};
function downloadJSON() {{
if (!originalJSON) return;
var clone = JSON.parse(JSON.stringify(originalJSON));
var adjusted = document.getElementById('adjust-intervals').checked;
var wordBtns = Array.from(document.querySelectorAll('.word-btn'));
var timestamps = [];
for (var i = 0; i < wordBtns.length; i++) {{
timestamps.push({{
start: parseFloat(wordBtns[i].dataset.s),
end: parseFloat(wordBtns[i].dataset.e)
}});
}}
var idx = 0;
if (formatType === 1) {{
for (var si = 0; si < clone.segments.length; si++) {{
var seg = clone.segments[si];
if (!seg.words) continue;
for (var wi = 0; wi < seg.words.length; wi++) {{
var w = seg.words[wi].word;
if (w && w.trim() && idx < timestamps.length) {{
seg.words[wi].start = Math.round(timestamps[idx].start * 1000) / 1000;
seg.words[wi].end = Math.round(timestamps[idx].end * 1000) / 1000;
idx++;
}}
}}
}}
}} else if (formatType === 2) {{
for (var wi = 0; wi < clone.words.length; wi++) {{
var w = clone.words[wi].word;
if (w && w.trim() && idx < timestamps.length) {{
clone.words[wi].start = Math.round(timestamps[idx].start * 1000) / 1000;
clone.words[wi].end = Math.round(timestamps[idx].end * 1000) / 1000;
idx++;
}}
}}
}} else if (formatType === 3) {{
for (var si = 0; si < clone.segments.length; si++) {{
var t = clone.segments[si].text;
if (t && t.trim() && idx < timestamps.length) {{
clone.segments[si].start = Math.round(timestamps[idx].start * 1000) / 1000;
clone.segments[si].end = Math.round(timestamps[idx].end * 1000) / 1000;
idx++;
}}
}}
}}
var jsonStr = JSON.stringify(clone, null, 2);
var blob = new Blob([jsonStr], {{ type: 'application/json' }});
var url = URL.createObjectURL(blob);
var a = document.createElement('a');
a.href = url;
var baseName = transcriptFilename.replace(/\\.json$/i, '');
a.download = baseName + '_adjusted.json';
a.click();
URL.revokeObjectURL(url);
}}
var lastClickedIndex = -1;
function getAllButtons() {{
return Array.from(container.querySelectorAll('.word-btn, .silence-btn, .untranscribed-btn'));
}}
function handleItemClick(btn, e) {{
var allBtns = getAllButtons();
var clickedIndex = allBtns.indexOf(btn);
if (e.shiftKey && lastClickedIndex >= 0) {{
var start = Math.min(lastClickedIndex, clickedIndex);
var end = Math.max(lastClickedIndex, clickedIndex);
allBtns.forEach(function(b, i) {{
if (i >= start && i <= end) {{
b.classList.add('selected');
}}
}});
}} else if (e.ctrlKey) {{
btn.classList.toggle('selected');
}} else {{
allBtns.forEach(function(b) {{ b.classList.remove('selected'); }});
btn.classList.add('selected');
}}
lastClickedIndex = clickedIndex;
updateInterval();
}}
var wordIndex = 0;
entries.forEach(function(entry, i) {{
var btn = document.createElement('span');
if (entry.type === 'word') {{
var wi = wordIndex;
btn.className = 'word-btn';
btn.dataset.origS = entry.start;
btn.dataset.origE = entry.end;
btn.dataset.adjS = adjustments[wi].adjStart;
btn.dataset.adjE = adjustments[wi].adjEnd;
btn.dataset.wi = wi;
btn.dataset.s = entry.start;
btn.dataset.e = entry.end;
btn.dataset.word = entry.word;
btn.innerHTML = '<span class="time">[' + entry.start.toFixed(3) + '-' + entry.end.toFixed(3) + 's]</span><span class="word"> ' + entry.word + '</span>';
btn.onclick = function(e) {{ handleItemClick(this, e); }};
wordIndex++;
}} else if (entry.type === 'silence') {{
btn.className = 'silence-btn';
btn.dataset.s = entry.start;
btn.dataset.e = entry.end;
var durationMs = Math.round((entry.end - entry.start) * 1000);
btn.innerHTML = '<span class="silence-time">[' + entry.start.toFixed(3) + '-' + entry.end.toFixed(3) + 's]</span><span class="duration">' + durationMs + 'ms</span>';
btn.onclick = function(e) {{ handleItemClick(this, e); }};
}} else if (entry.type === 'untranscribed_speech') {{
btn.className = 'untranscribed-btn';
btn.dataset.s = entry.start;
btn.dataset.e = entry.end;
var durationMs = Math.round((entry.end - entry.start) * 1000);
btn.innerHTML = '<span class="untranscribed-time">[' + entry.start.toFixed(3) + '-' + entry.end.toFixed(3) + 's]</span><span class="duration"> XXXXX ' + durationMs + 'ms</span>';
btn.onclick = function(e) {{ handleItemClick(this, e); }};
}}
container.appendChild(btn);
// Add vertical space after sentence-ending punctuation
if (entry.type === 'word') {{
var lastChar = entry.word.slice(-1);
if (lastChar === '.' || lastChar === '!' || lastChar === '?') {{
var spacer = document.createElement('div');
spacer.style.height = '15px';
container.appendChild(spacer);
}}
}}
}});
function updateWordLabels() {{
var adjusted = document.getElementById('adjust-intervals').checked;
var padCheckbox = document.getElementById('pad-silence');
var padInput = document.getElementById('pad-silence-ms');
var doPad = adjusted && padCheckbox.checked;
// Enable/disable padding controls based on first checkbox
padCheckbox.disabled = !adjusted;
padInput.disabled = !adjusted;
if (!adjusted) {{
padCheckbox.checked = false;
}}
// Compute padded adjustments if needed
var effective = adjustments;
if (doPad) {{
var ms = parseInt(padInput.value) || 50;
if (!paddedCache[ms]) {{
paddedCache[ms] = padTimestampsIntoSilence(adjustments, words, silences, ms);
}}
effective = paddedCache[ms];
}}
var wordBtnsList = Array.from(document.querySelectorAll('.word-btn'));
wordBtnsList.forEach(function(btn) {{
var wi = parseInt(btn.dataset.wi);
var s, e;
if (adjusted) {{
s = effective[wi].adjStart;
e = effective[wi].adjEnd;
}} else {{
s = parseFloat(btn.dataset.origS);
e = parseFloat(btn.dataset.origE);
}}
btn.dataset.s = s;
btn.dataset.e = e;
btn.innerHTML = '<span class="time">[' + s.toFixed(3) + '-' + e.toFixed(3) + 's]</span><span class="word"> ' + btn.dataset.word + '</span>';
}});
// Hide untranscribed_speech blocks that are now covered by adjusted words
var wordBtns = Array.from(container.querySelectorAll('.word-btn'));
container.querySelectorAll('.untranscribed-btn').forEach(function(ubtn) {{
var us = parseFloat(ubtn.dataset.s);
var ue = parseFloat(ubtn.dataset.e);
var covered = wordBtns.some(function(wb) {{
return parseFloat(wb.dataset.s) <= us && parseFloat(wb.dataset.e) >= ue;
}});
ubtn.style.display = covered ? 'none' : '';
}});
// Re-sort all buttons by current start time (silence before word at same time)
var btns = Array.from(container.querySelectorAll('.word-btn, .silence-btn, .untranscribed-btn'));
btns.sort(function(a, b) {{
var sa = parseFloat(a.dataset.s);
var sb = parseFloat(b.dataset.s);
if (sa !== sb) return sa - sb;
var ta = a.classList.contains('silence-btn') ? 0 : 1;
var tb = b.classList.contains('silence-btn') ? 0 : 1;
return ta - tb;
}});
container.innerHTML = '';
btns.forEach(function(btn) {{
container.appendChild(btn);
if (btn.classList.contains('word-btn')) {{
var lastChar = (btn.dataset.word || '').slice(-1);
if (lastChar === '.' || lastChar === '!' || lastChar === '?') {{
var spacer = document.createElement('div');
spacer.style.height = '15px';
container.appendChild(spacer);
}}
}}
}});
updateInterval();
}}
document.getElementById('adjust-intervals').addEventListener('change', function() {{
paddedCache = {{}};
updateWordLabels();
}});
document.getElementById('pad-silence').addEventListener('change', updateWordLabels);
document.getElementById('pad-silence-ms').addEventListener('change', function() {{
paddedCache = {{}};
updateWordLabels();
}});
function updateInterval() {{
var sel = Array.from(document.querySelectorAll('.word-btn.selected, .silence-btn.selected, .untranscribed-btn.selected'));
if (sel.length === 0) return;
// Sort selected items by start time
sel.sort(function(a, b) {{
return parseFloat(a.dataset.s) - parseFloat(b.dataset.s);
}});
// Group into contiguous intervals based on DOM adjacency
var allBtns = getAllButtons();
var intervals = [];
var currentStart = null;
var currentEnd = null;
var lastIndex = -2;
sel.forEach(function(btn) {{
var idx = allBtns.indexOf(btn);
var s = parseFloat(btn.dataset.s);
var e = parseFloat(btn.dataset.e);
if (currentStart === null) {{
// First item
currentStart = s;
currentEnd = e;
lastIndex = idx;
}} else if (idx === lastIndex + 1) {{
// Adjacent item - extend current interval
currentEnd = e;
lastIndex = idx;
}} else {{
// Non-adjacent - save current interval, start new one
intervals.push(currentStart.toFixed(3) + '-' + currentEnd.toFixed(3));
currentStart = s;
currentEnd = e;
lastIndex = idx;
}}
}});
// Don't forget the last interval
if (currentStart !== null) {{
intervals.push(currentStart.toFixed(3) + '-' + currentEnd.toFixed(3));
}}
var intervalStr = intervals.join(' , ');
try {{
var boxes = parent.document.querySelectorAll('input[data-testid="textbox"], textarea');
boxes.forEach(function(box) {{
if (box.placeholder && box.placeholder.indexOf('start-end') !== -1) {{
box.value = intervalStr;
box.dispatchEvent(new Event('input', {{bubbles: true}}));
}}
}});
}} catch(err) {{ console.log('Could not update parent:', err); }}
}}
function highlightFromInterval(intervalStr) {{
if (!intervalStr) return;
// Parse multiple intervals separated by commas
var intervalParts = intervalStr.split(',');
var intervals = [];
intervalParts.forEach(function(part) {{
part = part.trim();
var match = part.match(/^([\\d.]+)\\s*-\\s*([\\d.]+)$/);
if (match) {{
var s = parseFloat(match[1]);
var e = parseFloat(match[2]);
if (!isNaN(s) && !isNaN(e)) {{
intervals.push({{start: s, end: e}});
}}
}}
}});
if (intervals.length === 0) return;
document.querySelectorAll('.word-btn').forEach(function(btn) {{
var ws = parseFloat(btn.dataset.s);
var we = parseFloat(btn.dataset.e);
var itemDuration = we - ws;
var isSelected = false;
// Check if this word overlaps with any interval
intervals.forEach(function(iv) {{
var overlapStart = Math.max(ws, iv.start);
var overlapEnd = Math.min(we, iv.end);
var overlap = Math.max(0, overlapEnd - overlapStart);
if (itemDuration > 0 && (overlap / itemDuration) > 0.5) {{
isSelected = true;
}}
}});
if (isSelected) {{
btn.classList.add('selected');
}} else {{
btn.classList.remove('selected');
}}
}});
}}
function setupParentWatcher() {{
try {{
var boxes = parent.document.querySelectorAll('input[data-testid="textbox"], textarea');
boxes.forEach(function(box) {{
if (box.placeholder && box.placeholder.indexOf('start-end') !== -1) {{
box.addEventListener('blur', function() {{
highlightFromInterval(this.value);
}});
box.addEventListener('keydown', function(e) {{
if (e.key === 'Enter') {{
highlightFromInterval(this.value);
}}
}});
}}
}});
}} catch(err) {{ console.log('Could not setup parent watcher:', err); }}
}}
setTimeout(setupParentWatcher, 500);
</script>
</body>
</html>
'''
iframe_srcdoc = iframe_html.replace('"', '&quot;')
return f'''
<iframe srcdoc="{iframe_srcdoc}" style="width: 100%; height: 250px; border: none; border-radius: 8px;"></iframe>
'''
def build_entries_from_timestamps_and_silence(timestamps_data, silence_periods):
"""Build combined and augmented entries list from word timestamps and silence periods.
Returns a list of word, silence, and untranscribed_speech entries sorted by start time.
"""
entries = []
for item in timestamps_data:
entries.append({
'type': 'word',
'word': item['word'],
'start': round(item['start'], 3),
'end': round(item['end'], 3)
})
for item in silence_periods:
entries.append({
'type': 'silence',
'start': round(item['start'], 3),
'end': round(item['end'], 3)
})
entries.sort(key=lambda x: x['start'])
words = [e for e in entries if e['type'] == 'word']
augmented_entries = []
for i, entry in enumerate(entries):
augmented_entries.append(entry)
if entry['type'] == 'silence' and i + 1 < len(entries) and entries[i + 1]['type'] == 'silence':
gap_start = entry['end']
gap_end = entries[i + 1]['start']
# Subtract word coverage from the gap
effective_start = gap_start
for w in words:
if w['start'] <= effective_start and w['end'] > effective_start:
effective_start = w['end']
if w['start'] < gap_end and w['end'] >= gap_end:
effective_start = gap_end
break
if gap_end - effective_start > 0.01:
augmented_entries.append({
'type': 'untranscribed_speech',
'start': round(effective_start, 3),
'end': round(gap_end, 3)
})
return augmented_entries
with gr.Blocks() as demo:
gr.Markdown(
"""# Timestamps Tester"""
)
gr.Markdown(
"""Testing word timestamps from any transcript. Upload audio and a transcript JSON in one of the supported formats. See [examples of supported formats here](https://drive.google.com/drive/folders/1qrjfHjfssAZQIvSLJi36rLjpuqw3eOjj)."""
)
# State to store audio data in memory for fast extraction
audio_state = gr.State()
timestamps_state = gr.State([]) # Store timestamps for dropdown
transcript_source_state = gr.State(None) # {json_data, format_type, filename} for JSON download
audio_input = gr.Audio(
type="filepath",
label="Upload or record your audio",
sources=["upload", "microphone"],
format="wav"
)
# VAD Controls - inline labels with number inputs
with gr.Row():
vad_engine = gr.Dropdown(
choices=["TEN VAD", "FireRedVAD"],
value="TEN VAD",
show_label=False,
scale=0,
min_width=130
)
gr.Markdown("**Probability**")
vad_prob_threshold = gr.Number(
show_label=False,
value=0.5,
minimum=0.0,
maximum=1.0,
step=0.05,
scale=0,
min_width=80
)
gr.Markdown("**Min Voice Off (ms)**")
vad_min_off = gr.Number(
show_label=False,
value=48,
minimum=16,
maximum=1000,
step=16,
scale=0,
min_width=100
)
gr.Markdown("**Min Voice On (ms)**")
vad_min_on = gr.Number(
show_label=False,
value=64,
minimum=16,
maximum=1000,
step=16,
scale=0,
min_width=100
)
refresh_vad_btn = gr.Button("↻ Refresh VAD", scale=0, min_width=120)
transcript_file_input = gr.File(
label="Load Transcript (JSON)",
file_types=[".json"]
)
transcription_output = gr.Textbox(label="Transcription", lines=5)
timestamps_output = gr.HTML(label="Word Timestamps")
# Time interval input - directly under timestamps (single row, no label)
with gr.Row():
time_input = gr.Textbox(
label="",
show_label=False,
container=False,
placeholder="Time interval(s): start-end (e.g., 0.56-1.20 or 0.5-1.2 , 3.0-4.5)",
scale=3,
elem_id="time-interval-box"
)
play_interval_button = gr.Button("▶ Play Interval", scale=1)
# Track last played interval for smart replay
last_interval_state = gr.State("")
# Track current waveform window boundaries for smart redraw
waveform_window_state = gr.State(None)
# Waveform player - below interval controls
waveform_player = gr.HTML(label="Segment Player")
def load_transcript_and_setup(audio, transcript_file, prob_threshold, min_off_ms, min_on_ms, existing_audio_state, engine):
"""Load external transcript and setup UI."""
if transcript_file is None:
return gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
# Read original JSON and detect format before load_transcript (which doesn't return these)
try:
with open(transcript_file, 'r', encoding='utf-8') as f:
original_json = json.load(f)
_, _, format_type = parse_transcript_file(transcript_file)
filename = os.path.basename(transcript_file)
transcript_source = {'json_data': original_json, 'format_type': format_type, 'filename': filename}
except Exception:
transcript_source = None
text, timestamps_data, audio_data, raw_text, export_metadata, silence_periods = load_transcript(
audio, transcript_file, prob_threshold, int(min_off_ms), int(min_on_ms),
existing_audio_state=existing_audio_state, vad_engine_name=engine
)
if audio_data is None:
return text, "", None, [], gr.update(), transcript_source
entries = build_entries_from_timestamps_and_silence(timestamps_data, silence_periods)
entries_json = json.dumps(entries)
timestamps_html = build_timestamps_iframe_html(entries_json, transcript_source)
initial_player = '''
<div style="padding: 20px; text-align: center; background: #f5f5f5; border-radius: 8px; color: #666;">
<p>Select words above and click <b>▶ Play Interval</b> to hear the segment</p>
</div>
'''
return text, timestamps_html, audio_data, timestamps_data, initial_player, transcript_source
def play_time_interval_fast(audio_state, time_interval, last_interval, current_window):
"""Fast extraction using preloaded audio from memory. Supports multiple intervals."""
def wrap_error(msg):
return f'<div style="min-height: 150px; padding: 20px; text-align: center; background: #f5f5f5; border-radius: 8px;"><p style="color: #666;">{msg}</p></div>', last_interval, current_window
try:
if not time_interval or not audio_state:
return wrap_error("No interval or audio loaded. Select words and try again.")
# Parse multiple intervals separated by commas
# Format: "0.000-2.224 , 4.752-5.696 , 7.216-7.856"
intervals = []
interval_parts = time_interval.split(',')
for part in interval_parts:
part = part.strip()
if '-' in part:
times = part.split('-')
if len(times) == 2:
try:
start_time = float(times[0].strip())
end_time = float(times[1].strip())
if start_time < end_time:
intervals.append((start_time, end_time))
except ValueError:
pass
if not intervals:
return wrap_error("Invalid interval format. Use: start-end (e.g., 1.20-2.50) or multiple: 0.5-1.0 , 2.0-3.0")
# Load/reload audio segment (autoplay will replay even if same interval)
# Pass current window state for smart redraw logic
result_html, new_window = extract_audio_segment(audio_state, intervals, current_window)
return result_html, time_interval, new_window
except Exception as e:
return wrap_error(f"Error: {str(e)}")
def refresh_vad(existing_audio_state, timestamps_data, prob_threshold, min_off_ms, min_on_ms, engine, transcript_source):
"""Re-run VAD and rebuild the timestamps iframe without reloading audio or transcript."""
if existing_audio_state is None or not timestamps_data:
return gr.update()
audio_data, sample_rate = existing_audio_state
silence_periods = []
try:
if engine == "FireRedVAD":
silence_periods = detect_silence_periods_firered(audio_data, sample_rate, prob_threshold, int(min_off_ms), int(min_on_ms))
else:
silence_periods = detect_silence_periods(audio_data, sample_rate, prob_threshold, int(min_off_ms), int(min_on_ms))
print_speech_silence_log(timestamps_data, silence_periods)
except Exception as e:
print(f"[VAD] Error during silence detection: {str(e)}\n{traceback.format_exc()}")
entries = build_entries_from_timestamps_and_silence(timestamps_data, silence_periods)
entries_json = json.dumps(entries)
return build_timestamps_iframe_html(entries_json, transcript_source)
VAD_DEFAULTS = {
"TEN VAD": {"prob": 0.5, "off": 48, "on": 64, "step": 16, "min": 16},
"FireRedVAD": {"prob": 0.4, "off": 200, "on": 200, "step": 10, "min": 10},
}
def on_engine_change(engine):
d = VAD_DEFAULTS.get(engine, VAD_DEFAULTS["TEN VAD"])
return (
gr.update(value=d["prob"]),
gr.update(value=d["off"], step=d["step"], minimum=d["min"]),
gr.update(value=d["on"], step=d["step"], minimum=d["min"]),
)
vad_engine.change(
fn=on_engine_change,
inputs=[vad_engine],
outputs=[vad_prob_threshold, vad_min_off, vad_min_on]
)
# Load transcript file input
transcript_file_input.change(
fn=load_transcript_and_setup,
inputs=[audio_input, transcript_file_input, vad_prob_threshold, vad_min_off, vad_min_on, audio_state, vad_engine],
outputs=[transcription_output, timestamps_output, audio_state, timestamps_state, waveform_player, transcript_source_state]
)
# Refresh VAD button
refresh_vad_btn.click(
fn=refresh_vad,
inputs=[audio_state, timestamps_state, vad_prob_threshold, vad_min_off, vad_min_on, vad_engine, transcript_source_state],
outputs=[timestamps_output],
show_progress="minimal"
)
# Play interval button
play_interval_button.click(
fn=play_time_interval_fast,
inputs=[audio_state, time_input, last_interval_state, waveform_window_state],
outputs=[waveform_player, last_interval_state, waveform_window_state]
)
demo.launch()