voice-tools / tests /integration /test_speaker_extraction_workflow.py
jcudit's picture
jcudit HF Staff
feat: complete audio speaker separation feature with 3 workflows
cb39c05
"""
Integration tests for speaker extraction workflow
Tests the complete end-to-end flow:
1. Load reference clip
2. Extract reference embedding
3. Load target audio
4. Extract target embeddings
5. Match segments based on similarity
6. Export matched segments (concatenated or separate)
"""
import json
from pathlib import Path
import numpy as np
import pytest
from src.services.speaker_extraction import SpeakerExtractionService
from src.lib.audio_io import get_audio_duration, read_audio, write_audio
@pytest.fixture
def speaker_extraction_service():
"""Create SpeakerExtractionService instance for integration testing"""
return SpeakerExtractionService()
@pytest.fixture
def integration_audio_dir():
"""Get path to integration test audio fixtures"""
return Path("audio_fixtures/speaker_extraction/")
@pytest.fixture
def output_dir(tmp_path):
"""Create temporary output directory for test results"""
output = tmp_path / "extraction_output"
output.mkdir()
return output
class TestSpeakerExtractionWorkflow:
"""Integration tests for complete speaker extraction workflow"""
@pytest.mark.integration
def test_extract_single_speaker_concatenated(
self, speaker_extraction_service, integration_audio_dir, output_dir
):
"""Test extracting a single speaker with concatenated output"""
reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
output_file = output_dir / "speaker_a_extracted.m4a"
if not reference_clip.exists() or not target_file.exists():
pytest.skip("Integration audio fixtures not available")
# Perform extraction
report = speaker_extraction_service.extract_and_export(
reference_clip=str(reference_clip),
target_file=str(target_file),
output_path=str(output_file),
threshold=0.40,
min_confidence=0.30,
concatenate=True,
silence_duration_ms=150,
crossfade_duration_ms=75,
)
# Verify output file was created
assert output_file.exists()
assert output_file.stat().st_size > 0
# Verify report contains expected fields
assert report["reference_clip"] == str(reference_clip)
assert report["target_file"] == str(target_file)
assert report["segments_found"] > 0
assert report["segments_included"] > 0
assert report["total_duration_seconds"] > 0
assert 0.0 <= report["average_confidence"] <= 1.0
assert report["processing_time_seconds"] > 0
# Verify output audio is valid
audio_data, sample_rate = read_audio(str(output_file))
assert len(audio_data) > 0
assert sample_rate > 0
@pytest.mark.integration
def test_extract_single_speaker_separate_segments(
self, speaker_extraction_service, integration_audio_dir, output_dir
):
"""Test extracting a single speaker with separate segment files"""
reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
output_dir_path = output_dir / "speaker_a_segments"
if not reference_clip.exists() or not target_file.exists():
pytest.skip("Integration audio fixtures not available")
# Perform extraction with separate segments
report = speaker_extraction_service.extract_and_export(
reference_clip=str(reference_clip),
target_file=str(target_file),
output_path=str(output_dir_path),
threshold=0.40,
concatenate=False,
)
# Verify output directory was created
assert output_dir_path.exists()
assert output_dir_path.is_dir()
# Verify segment files were created
segment_files = list(output_dir_path.glob("segment_*.m4a"))
assert len(segment_files) == report["segments_included"]
# Verify each segment is valid audio
for segment_file in segment_files:
assert segment_file.stat().st_size > 0
audio_data, sample_rate = read_audio(str(segment_file))
assert len(audio_data) > 0
@pytest.mark.integration
def test_extract_with_high_threshold(
self, speaker_extraction_service, integration_audio_dir, output_dir
):
"""Test extraction with strict matching threshold"""
reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
output_file = output_dir / "strict_match.m4a"
if not reference_clip.exists() or not target_file.exists():
pytest.skip("Integration audio fixtures not available")
# Use strict threshold (lower threshold = stricter matching)
report = speaker_extraction_service.extract_and_export(
reference_clip=str(reference_clip),
target_file=str(target_file),
output_path=str(output_file),
threshold=0.25, # Strict
min_confidence=0.40,
)
# Strict matching should find fewer segments
assert report["segments_included"] <= report["segments_found"]
# But should have higher average confidence
if report["segments_included"] > 0:
assert report["average_confidence"] >= 0.40
@pytest.mark.integration
def test_extract_with_low_threshold(
self, speaker_extraction_service, integration_audio_dir, output_dir
):
"""Test extraction with permissive matching threshold"""
reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
output_file = output_dir / "permissive_match.m4a"
if not reference_clip.exists() or not target_file.exists():
pytest.skip("Integration audio fixtures not available")
# Use permissive threshold (higher threshold = less strict)
report = speaker_extraction_service.extract_and_export(
reference_clip=str(reference_clip),
target_file=str(target_file),
output_path=str(output_file),
threshold=0.60, # Permissive
min_confidence=0.20,
)
# Permissive matching should find more segments
assert report["segments_included"] > 0
# May have lower average confidence
assert report["average_confidence"] >= 0.20
@pytest.mark.integration
def test_extract_no_matches_found(
self, speaker_extraction_service, integration_audio_dir, output_dir
):
"""Test extraction when reference speaker not in target"""
reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
target_file = integration_audio_dir / "different_speaker_only.m4a"
output_file = output_dir / "no_matches.m4a"
if not reference_clip.exists() or not target_file.exists():
pytest.skip("Integration audio fixtures not available")
# Should complete but find no matches
report = speaker_extraction_service.extract_and_export(
reference_clip=str(reference_clip),
target_file=str(target_file),
output_path=str(output_file),
threshold=0.40,
)
# Report should indicate no matches
assert report["segments_included"] == 0
# Output file should not be created or be empty
assert not output_file.exists() or output_file.stat().st_size == 0
@pytest.mark.integration
def test_extract_with_custom_output_format(
self, speaker_extraction_service, integration_audio_dir, output_dir
):
"""Test extraction with custom sample rate and bitrate"""
reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
output_file = output_dir / "custom_format.m4a"
if not reference_clip.exists() or not target_file.exists():
pytest.skip("Integration audio fixtures not available")
# Extract with custom audio parameters
report = speaker_extraction_service.extract_and_export(
reference_clip=str(reference_clip),
target_file=str(target_file),
output_path=str(output_file),
threshold=0.40,
concatenate=True,
sample_rate=48000,
bitrate="256k",
)
if output_file.exists():
# Verify output has expected sample rate
audio_data, sample_rate = read_audio(str(output_file))
# Note: actual sample rate may differ based on conversion
assert sample_rate > 0
@pytest.mark.integration
def test_extract_report_json_format(
self, speaker_extraction_service, integration_audio_dir, output_dir
):
"""Test that extraction report is valid JSON with all required fields"""
reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
output_file = output_dir / "extracted.m4a"
report_file = output_dir / "extraction_report.json"
if not reference_clip.exists() or not target_file.exists():
pytest.skip("Integration audio fixtures not available")
# Perform extraction
report = speaker_extraction_service.extract_and_export(
reference_clip=str(reference_clip),
target_file=str(target_file),
output_path=str(output_file),
threshold=0.40,
)
# Write report to JSON file
with open(report_file, "w") as f:
json.dump(report, f, indent=2)
# Verify JSON file is valid
assert report_file.exists()
with open(report_file, "r") as f:
loaded_report = json.load(f)
# Verify all required fields are present
required_fields = [
"reference_clip",
"target_file",
"threshold",
"segments_found",
"segments_included",
"total_duration_seconds",
"average_confidence",
"low_confidence_segments",
"processing_time_seconds",
"output_file",
]
for field in required_fields:
assert field in loaded_report, f"Missing field: {field}"
@pytest.mark.integration
def test_extract_with_progress_callback(
self, speaker_extraction_service, integration_audio_dir, output_dir
):
"""Test extraction with progress reporting callback"""
reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
output_file = output_dir / "with_progress.m4a"
if not reference_clip.exists() or not target_file.exists():
pytest.skip("Integration audio fixtures not available")
progress_updates = []
def progress_callback(stage, current, total):
progress_updates.append(
{
"stage": stage,
"current": current,
"total": total,
"progress": current / total if total > 0 else 0,
}
)
# Perform extraction with progress tracking
report = speaker_extraction_service.extract_and_export(
reference_clip=str(reference_clip),
target_file=str(target_file),
output_path=str(output_file),
threshold=0.40,
progress_callback=progress_callback,
)
# Verify progress callbacks were invoked
assert len(progress_updates) > 0
# Verify progress stages are present
stages = [update["stage"] for update in progress_updates]
assert any("reference" in stage.lower() for stage in stages)
assert any("target" in stage.lower() or "extract" in stage.lower() for stage in stages)
@pytest.mark.integration
def test_extract_crossfade_concatenation(
self, speaker_extraction_service, integration_audio_dir, output_dir
):
"""Test that crossfade is applied when concatenating segments"""
reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
# Test with no crossfade
output_no_fade = output_dir / "no_crossfade.m4a"
report_no_fade = speaker_extraction_service.extract_and_export(
reference_clip=str(reference_clip),
target_file=str(target_file),
output_path=str(output_no_fade),
threshold=0.40,
crossfade_duration_ms=0,
)
# Test with crossfade
output_with_fade = output_dir / "with_crossfade.m4a"
report_with_fade = speaker_extraction_service.extract_and_export(
reference_clip=str(reference_clip),
target_file=str(target_file),
output_path=str(output_with_fade),
threshold=0.40,
crossfade_duration_ms=100,
)
if not reference_clip.exists() or not target_file.exists():
pytest.skip("Integration audio fixtures not available")
# Both should produce valid output
if report_no_fade["segments_included"] > 0:
assert output_no_fade.exists() or output_with_fade.exists()
@pytest.mark.integration
@pytest.mark.slow
def test_extract_long_audio_file(
self, speaker_extraction_service, integration_audio_dir, output_dir
):
"""Test extraction with long audio file (performance test)"""
reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
target_file = integration_audio_dir / "long_conversation_60min.m4a"
output_file = output_dir / "long_extracted.m4a"
if not reference_clip.exists() or not target_file.exists():
pytest.skip("Long audio test fixture not available")
import time
start_time = time.time()
# Perform extraction
report = speaker_extraction_service.extract_and_export(
reference_clip=str(reference_clip),
target_file=str(target_file),
output_path=str(output_file),
threshold=0.40,
)
elapsed_time = time.time() - start_time
# Verify completion
assert report["processing_time_seconds"] > 0
# Should complete in reasonable time (< 2x audio duration)
target_duration = get_audio_duration(str(target_file))
assert elapsed_time < target_duration * 2.0, "Processing too slow"
class TestReferenceClipValidation:
"""Integration tests for reference clip validation"""
@pytest.mark.integration
def test_validate_good_reference_clip(self, speaker_extraction_service, integration_audio_dir):
"""Test validation accepts good quality reference clip"""
reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
if not reference_clip.exists():
pytest.skip("Integration audio fixture not available")
is_valid, message = speaker_extraction_service.validate_reference_clip(str(reference_clip))
assert is_valid is True
@pytest.mark.integration
def test_validate_short_reference_clip(self, speaker_extraction_service, integration_audio_dir):
"""Test validation rejects reference clip shorter than 3 seconds"""
short_clip = integration_audio_dir / "reference_too_short.m4a"
if not short_clip.exists():
pytest.skip("Integration audio fixture not available")
is_valid, message = speaker_extraction_service.validate_reference_clip(str(short_clip))
assert is_valid is False
assert "short" in message.lower()
@pytest.mark.integration
def test_validate_noisy_reference_clip(self, speaker_extraction_service, integration_audio_dir):
"""Test validation warns about low quality reference clip"""
noisy_clip = integration_audio_dir / "reference_noisy.m4a"
if not noisy_clip.exists():
pytest.skip("Integration audio fixture not available")
is_valid, message = speaker_extraction_service.validate_reference_clip(str(noisy_clip))
# Should still be valid but with warning
assert is_valid is True
if message:
assert "quality" in message.lower() or "noisy" in message.lower()