#!/usr/bin/env python3
"""
Utility functions for VAD + Diarization pipeline
"""
import numpy as np
from typing import List, Dict, Optional, Tuple
from pathlib import Path
import json
def load_audio(
path: str,
sampling_rate: int = 16000,
mono: bool = True
) -> Tuple[np.ndarray, int]:
"""
Load audio file with automatic format detection.
Args:
path: Path to audio file
sampling_rate: Target sample rate
mono: Convert to mono
Returns:
Tuple of (audio_data, sample_rate)
"""
try:
import librosa
audio, sr = librosa.load(path, sr=sampling_rate, mono=mono)
return audio, sr
except Exception as e:
print(f"Error loading audio with librosa: {e}")
# Fallback to soundfile
try:
import soundfile as sf
audio, sr = sf.read(path)
            # Convert to mono before resampling: sf.read returns shape
            # (frames, channels) and librosa.resample operates along the last
            # axis, so a multi-channel array must be collapsed first
            if mono and audio.ndim > 1:
                audio = audio.mean(axis=1)
            # Resample if needed
            if sr != sampling_rate:
                import librosa
                audio = librosa.resample(audio, orig_sr=sr, target_sr=sampling_rate)
                sr = sampling_rate
return audio, sr
except Exception as e:
print(f"Error loading audio with soundfile: {e}")
raise
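
# Illustrative usage (a hedged sketch; "meeting.wav" is a hypothetical path,
# not a file shipped with this repo):
#
# >>> audio, sr = load_audio("meeting.wav", sampling_rate=16000)
# >>> sr
# 16000
# >>> audio.ndim   # mono=True yields a 1-D array
# 1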
def save_audio(
audio: np.ndarray,
path: str,
sampling_rate: int = 16000
):
"""
Save audio to file.
Args:
audio: Audio data
path: Output path
sampling_rate: Sample rate
"""
import soundfile as sf
sf.write(path, audio, sampling_rate)
def merge_segments(
segments: List[Dict],
gap_threshold: float = 0.5
) -> List[Dict]:
"""
Merge nearby segments from the same speaker.
Args:
segments: List of segments with 'start', 'end', 'speaker'
gap_threshold: Maximum gap to merge (seconds)
Returns:
Merged segments
"""
if not segments:
return []
# Sort by start time
sorted_segments = sorted(segments, key=lambda x: x['start'])
merged = [sorted_segments[0].copy()]
for seg in sorted_segments[1:]:
last = merged[-1]
# Check if same speaker and close enough
if (seg['speaker'] == last['speaker'] and
seg['start'] - last['end'] <= gap_threshold):
# Merge
last['end'] = seg['end']
last['duration'] = last['end'] - last['start']
else:
# Add new segment
merged.append(seg.copy())
return merged
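
# Illustrative usage (a hedged sketch with made-up values): the two SPEAKER_00
# segments separated by a 0.2 s gap are merged; the SPEAKER_01 segment is kept
# as-is.
#
# >>> merge_segments([
# ...     {'start': 0.0, 'end': 1.0, 'duration': 1.0, 'speaker': 'SPEAKER_00'},
# ...     {'start': 1.2, 'end': 2.0, 'duration': 0.8, 'speaker': 'SPEAKER_00'},
# ...     {'start': 2.1, 'end': 3.0, 'duration': 0.9, 'speaker': 'SPEAKER_01'},
# ... ], gap_threshold=0.5)
# [{'start': 0.0, 'end': 2.0, 'duration': 2.0, 'speaker': 'SPEAKER_00'},
#  {'start': 2.1, 'end': 3.0, 'duration': 0.9, 'speaker': 'SPEAKER_01'}]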
def filter_short_segments(
segments: List[Dict],
min_duration: float = 0.5
) -> List[Dict]:
"""
Filter out segments shorter than threshold.
Args:
segments: List of segments
min_duration: Minimum duration (seconds)
Returns:
Filtered segments
"""
return [seg for seg in segments if seg['duration'] >= min_duration]
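
# Illustrative usage (a hedged sketch with made-up values): segments shorter
# than min_duration are dropped; note that each segment must already carry a
# 'duration' key.
#
# >>> filter_short_segments([
# ...     {'start': 0.0, 'end': 0.2, 'duration': 0.2, 'speaker': 'SPEAKER_00'},
# ...     {'start': 1.0, 'end': 2.5, 'duration': 1.5, 'speaker': 'SPEAKER_01'},
# ... ], min_duration=0.5)
# [{'start': 1.0, 'end': 2.5, 'duration': 1.5, 'speaker': 'SPEAKER_01'}]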
def calculate_overlap(
seg1: Dict,
seg2: Dict
) -> float:
"""
Calculate overlap between two segments.
Args:
seg1: First segment with 'start' and 'end'
seg2: Second segment with 'start' and 'end'
Returns:
Overlap duration in seconds
"""
start = max(seg1['start'], seg2['start'])
end = min(seg1['end'], seg2['end'])
return max(0, end - start)
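
# Illustrative usage (a hedged sketch with made-up values): the overlap is the
# length of the intersection of the two intervals, and 0 when they are disjoint.
#
# >>> calculate_overlap({'start': 0.0, 'end': 2.0}, {'start': 1.5, 'end': 3.0})
# 0.5
# >>> calculate_overlap({'start': 0.0, 'end': 1.0}, {'start': 2.0, 'end': 3.0})
# 0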
def segment_to_rttm(
segments: List[Dict],
file_id: str = "audio"
) -> str:
"""
Convert segments to RTTM format.
Args:
segments: List of segments
file_id: File identifier
Returns:
RTTM formatted string
"""
lines = []
for seg in segments:
# RTTM format: SPEAKER file 1 start duration <NA> <NA> speaker <NA> <NA>
line = f"SPEAKER {file_id} 1 {seg['start']:.3f} {seg['duration']:.3f} <NA> <NA> {seg['speaker']} <NA> <NA>"
lines.append(line)
return "\n".join(lines)
def rttm_to_segments(rttm_text: str) -> List[Dict]:
"""
Parse RTTM format to segments.
Args:
rttm_text: RTTM formatted text
Returns:
List of segments
"""
segments = []
for line in rttm_text.strip().split('\n'):
if not line.strip():
continue
parts = line.split()
if parts[0] != 'SPEAKER':
continue
start = float(parts[3])
duration = float(parts[4])
speaker = parts[7]
segments.append({
'start': start,
'end': start + duration,
'duration': duration,
'speaker': speaker
})
return segments
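
# Illustrative usage (a hedged sketch): parsing a single RTTM line recovers the
# start/end/duration/speaker fields, so this is the inverse of segment_to_rttm()
# up to float formatting.
#
# >>> rttm_to_segments("SPEAKER meeting 1 0.000 3.000 <NA> <NA> SPEAKER_00 <NA> <NA>")
# [{'start': 0.0, 'end': 3.0, 'duration': 3.0, 'speaker': 'SPEAKER_00'}]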
def visualize_timeline(
segments: List[Dict],
duration: Optional[float] = None,
width: int = 80
) -> str:
"""
Create ASCII visualization of speaker timeline.
Args:
segments: List of segments
duration: Total duration (auto-detect if None)
width: Width of visualization
Returns:
ASCII timeline string
"""
if not segments:
return "No segments to visualize"
# Determine duration
if duration is None:
duration = max(seg['end'] for seg in segments)
# Get unique speakers
speakers = sorted(set(seg['speaker'] for seg in segments))
speaker_chars = {}
chars = ['█', '▓', '▒', '░', '●', '○', '■', '□', '▪', '▫']
for i, speaker in enumerate(speakers):
speaker_chars[speaker] = chars[i % len(chars)]
# Create timeline
lines = []
lines.append(f"\nTimeline (0.00s - {duration:.2f}s):")
lines.append("─" * width)
    # Time markers: write a label at every 10th column and skip the columns it
    # occupies, so the markers stay aligned with the speaker rows below
    time_line = ""
    col = 0
    while col < width:
        if col % 10 == 0:
            label = f"{(col / width) * duration:.0f}s"
            time_line += label
            col += len(label)
        else:
            time_line += " "
            col += 1
lines.append(time_line[:width])
# Speaker rows
for speaker in speakers:
row = [' '] * width
for seg in segments:
if seg['speaker'] == speaker:
start_pos = int((seg['start'] / duration) * width)
end_pos = int((seg['end'] / duration) * width)
for i in range(start_pos, min(end_pos, width)):
row[i] = speaker_chars[speaker]
lines.append(f"{speaker}: {''.join(row)}")
lines.append("─" * width)
return "\n".join(lines)
def export_results(
result: Dict,
output_dir: str,
formats: List[str] = ['json', 'rttm', 'txt']
):
"""
Export results in multiple formats.
Args:
result: Pipeline result
output_dir: Output directory
formats: List of formats to export
"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
base_name = Path(result['audio_path']).stem
for fmt in formats:
if fmt == 'json':
# JSON format
json_path = output_path / f"{base_name}.json"
with open(json_path, 'w') as f:
json.dump(result, f, indent=2)
print(f"✓ Saved JSON: {json_path}")
elif fmt == 'rttm':
# RTTM format
rttm_path = output_path / f"{base_name}.rttm"
rttm_text = segment_to_rttm(result['speaker_segments'], base_name)
with open(rttm_path, 'w') as f:
f.write(rttm_text)
print(f"✓ Saved RTTM: {rttm_path}")
elif fmt == 'txt':
# Text format
txt_path = output_path / f"{base_name}.txt"
lines = []
lines.append("="*60)
lines.append("SPEAKER DIARIZATION RESULTS")
lines.append("="*60)
lines.append(f"\nFile: {result['audio_path']}")
lines.append(f"Speakers: {result['metadata']['num_speakers']}")
lines.append(f"Segments: {result['metadata']['num_segments']}")
lines.append(f"\nTimeline:")
lines.append("-"*60)
for seg in result['speaker_segments']:
lines.append(f"{seg['start']:7.2f}s - {seg['end']:7.2f}s: {seg['speaker']}")
with open(txt_path, 'w') as f:
f.write("\n".join(lines))
print(f"✓ Saved TXT: {txt_path}")
def create_test_audio(
output_path: str = "test_audio.wav",
duration: float = 10.0,
sampling_rate: int = 16000
) -> str:
"""
    Create synthetic test audio: sine-tone "speaker" segments separated by silence.
Args:
output_path: Output file path
duration: Duration in seconds
sampling_rate: Sample rate
Returns:
Path to created file
"""
import soundfile as sf
# Generate audio
t = np.linspace(0, duration, int(sampling_rate * duration))
    # Two sine-tone "speakers" (440 Hz and 880 Hz) separated by silence
signal = np.zeros_like(t)
# Calculate segment lengths
seg1_len = min(int(sampling_rate*3), len(signal))
seg2_start = int(sampling_rate*4)
seg2_end = min(int(sampling_rate*7), len(signal))
seg3_start = min(int(sampling_rate*8), len(signal))
# Speaker 1: 0-3s (or until end)
if seg1_len > 0:
signal[0:seg1_len] = 0.3 * np.sin(2 * np.pi * 440 * t[0:seg1_len])
# Silence: 3-4s
# Speaker 2: 4-7s (or until end)
if seg2_start < len(signal) and seg2_end > seg2_start:
signal[seg2_start:seg2_end] = 0.3 * np.sin(2 * np.pi * 880 * t[seg2_start:seg2_end])
# Silence: 7-8s
# Speaker 1: 8-10s (or until end)
if seg3_start < len(signal):
signal[seg3_start:] = 0.3 * np.sin(2 * np.pi * 440 * t[seg3_start:])
# Add some noise
signal += 0.01 * np.random.randn(len(signal))
# Save
sf.write(output_path, signal, sampling_rate)
return output_path
if __name__ == "__main__":
# Demo utilities
print("Utility functions loaded")
# Create test audio
test_path = create_test_audio()
print(f"✓ Created test audio: {test_path}")