Spaces:
Running on Zero
Running on Zero
| """ | |
| Integration tests for speaker extraction workflow | |
| Tests the complete end-to-end flow: | |
| 1. Load reference clip | |
| 2. Extract reference embedding | |
| 3. Load target audio | |
| 4. Extract target embeddings | |
| 5. Match segments based on similarity | |
| 6. Export matched segments (concatenated or separate) | |
| """ | |
| import json | |
| from pathlib import Path | |
| import numpy as np | |
| import pytest | |
| from src.services.speaker_extraction import SpeakerExtractionService | |
| from src.lib.audio_io import get_audio_duration, read_audio, write_audio | |
@pytest.fixture
def speaker_extraction_service():
    """Create SpeakerExtractionService instance for integration testing.

    Registered as a pytest fixture so that test functions can request it
    simply by declaring a ``speaker_extraction_service`` parameter.
    """
    return SpeakerExtractionService()
@pytest.fixture
def integration_audio_dir() -> Path:
    """Get path to integration test audio fixtures.

    Registered as a pytest fixture so that test functions can request it by
    declaring an ``integration_audio_dir`` parameter. The path is relative,
    so it resolves against the current working directory of the test run.
    """
    return Path("audio_fixtures/speaker_extraction/")
@pytest.fixture
def output_dir(tmp_path: Path) -> Path:
    """Create temporary output directory for test results.

    Registered as a pytest fixture; it builds an ``extraction_output``
    directory inside the ``tmp_path`` it receives and returns that directory.
    """
    output = tmp_path / "extraction_output"
    output.mkdir()
    return output
class TestSpeakerExtractionWorkflow:
    """Integration tests for complete speaker extraction workflow.

    Each test drives ``extract_and_export`` on the service under test with
    audio files from the integration fixture directory, and skips itself when
    the files it needs are not present.
    """

    @staticmethod
    def _require_fixtures(*paths, reason="Integration audio fixtures not available"):
        """Skip the calling test unless every path in ``paths`` exists."""
        if not all(path.exists() for path in paths):
            pytest.skip(reason)

    def test_extract_single_speaker_concatenated(
        self, speaker_extraction_service, integration_audio_dir, output_dir
    ):
        """Test extracting a single speaker with concatenated output"""
        reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
        target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
        output_file = output_dir / "speaker_a_extracted.m4a"
        self._require_fixtures(reference_clip, target_file)

        # Perform extraction
        report = speaker_extraction_service.extract_and_export(
            reference_clip=str(reference_clip),
            target_file=str(target_file),
            output_path=str(output_file),
            threshold=0.40,
            min_confidence=0.30,
            concatenate=True,
            silence_duration_ms=150,
            crossfade_duration_ms=75,
        )

        # Verify output file was created
        assert output_file.exists()
        assert output_file.stat().st_size > 0

        # Verify report contains expected fields
        assert report["reference_clip"] == str(reference_clip)
        assert report["target_file"] == str(target_file)
        assert report["segments_found"] > 0
        assert report["segments_included"] > 0
        assert report["total_duration_seconds"] > 0
        assert 0.0 <= report["average_confidence"] <= 1.0
        assert report["processing_time_seconds"] > 0

        # Verify output audio is valid
        audio_data, sample_rate = read_audio(str(output_file))
        assert len(audio_data) > 0
        assert sample_rate > 0

    def test_extract_single_speaker_separate_segments(
        self, speaker_extraction_service, integration_audio_dir, output_dir
    ):
        """Test extracting a single speaker with separate segment files"""
        reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
        target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
        output_dir_path = output_dir / "speaker_a_segments"
        self._require_fixtures(reference_clip, target_file)

        # Perform extraction with separate segments
        report = speaker_extraction_service.extract_and_export(
            reference_clip=str(reference_clip),
            target_file=str(target_file),
            output_path=str(output_dir_path),
            threshold=0.40,
            concatenate=False,
        )

        # Verify output directory was created
        assert output_dir_path.exists()
        assert output_dir_path.is_dir()

        # Verify segment files were created
        segment_files = list(output_dir_path.glob("segment_*.m4a"))
        assert len(segment_files) == report["segments_included"]

        # Verify each segment is valid audio
        for segment_file in segment_files:
            assert segment_file.stat().st_size > 0
            audio_data, _ = read_audio(str(segment_file))
            assert len(audio_data) > 0

    def test_extract_with_high_threshold(
        self, speaker_extraction_service, integration_audio_dir, output_dir
    ):
        """Test extraction with strict matching threshold"""
        reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
        target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
        output_file = output_dir / "strict_match.m4a"
        self._require_fixtures(reference_clip, target_file)

        # Use strict threshold (lower threshold = stricter matching)
        report = speaker_extraction_service.extract_and_export(
            reference_clip=str(reference_clip),
            target_file=str(target_file),
            output_path=str(output_file),
            threshold=0.25,  # Strict
            min_confidence=0.40,
        )

        # Strict matching should find fewer segments
        assert report["segments_included"] <= report["segments_found"]

        # But should have higher average confidence
        if report["segments_included"] > 0:
            assert report["average_confidence"] >= 0.40

    def test_extract_with_low_threshold(
        self, speaker_extraction_service, integration_audio_dir, output_dir
    ):
        """Test extraction with permissive matching threshold"""
        reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
        target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
        output_file = output_dir / "permissive_match.m4a"
        self._require_fixtures(reference_clip, target_file)

        # Use permissive threshold (higher threshold = less strict)
        report = speaker_extraction_service.extract_and_export(
            reference_clip=str(reference_clip),
            target_file=str(target_file),
            output_path=str(output_file),
            threshold=0.60,  # Permissive
            min_confidence=0.20,
        )

        # Permissive matching should find more segments
        assert report["segments_included"] > 0

        # May have lower average confidence
        assert report["average_confidence"] >= 0.20

    def test_extract_no_matches_found(
        self, speaker_extraction_service, integration_audio_dir, output_dir
    ):
        """Test extraction when reference speaker not in target"""
        reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
        target_file = integration_audio_dir / "different_speaker_only.m4a"
        output_file = output_dir / "no_matches.m4a"
        self._require_fixtures(reference_clip, target_file)

        # Should complete but find no matches
        report = speaker_extraction_service.extract_and_export(
            reference_clip=str(reference_clip),
            target_file=str(target_file),
            output_path=str(output_file),
            threshold=0.40,
        )

        # Report should indicate no matches
        assert report["segments_included"] == 0

        # Output file should not be created or be empty
        assert not output_file.exists() or output_file.stat().st_size == 0

    def test_extract_with_custom_output_format(
        self, speaker_extraction_service, integration_audio_dir, output_dir
    ):
        """Test extraction with custom sample rate and bitrate"""
        reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
        target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
        output_file = output_dir / "custom_format.m4a"
        self._require_fixtures(reference_clip, target_file)

        # Extract with custom audio parameters; the report is not inspected here
        speaker_extraction_service.extract_and_export(
            reference_clip=str(reference_clip),
            target_file=str(target_file),
            output_path=str(output_file),
            threshold=0.40,
            concatenate=True,
            sample_rate=48000,
            bitrate="256k",
        )

        if output_file.exists():
            # Verify output has expected sample rate
            _, sample_rate = read_audio(str(output_file))
            # Note: actual sample rate may differ based on conversion
            assert sample_rate > 0

    def test_extract_report_json_format(
        self, speaker_extraction_service, integration_audio_dir, output_dir
    ):
        """Test that extraction report is valid JSON with all required fields"""
        reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
        target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
        output_file = output_dir / "extracted.m4a"
        report_file = output_dir / "extraction_report.json"
        self._require_fixtures(reference_clip, target_file)

        # Perform extraction
        report = speaker_extraction_service.extract_and_export(
            reference_clip=str(reference_clip),
            target_file=str(target_file),
            output_path=str(output_file),
            threshold=0.40,
        )

        # Write report to JSON file (explicit encoding keeps the round trip
        # independent of the platform's default locale)
        with open(report_file, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)

        # Verify JSON file is valid
        assert report_file.exists()
        with open(report_file, "r", encoding="utf-8") as f:
            loaded_report = json.load(f)

        # Verify all required fields are present
        required_fields = [
            "reference_clip",
            "target_file",
            "threshold",
            "segments_found",
            "segments_included",
            "total_duration_seconds",
            "average_confidence",
            "low_confidence_segments",
            "processing_time_seconds",
            "output_file",
        ]
        for field in required_fields:
            assert field in loaded_report, f"Missing field: {field}"

    def test_extract_with_progress_callback(
        self, speaker_extraction_service, integration_audio_dir, output_dir
    ):
        """Test extraction with progress reporting callback"""
        reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
        target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
        output_file = output_dir / "with_progress.m4a"
        self._require_fixtures(reference_clip, target_file)

        progress_updates = []

        def progress_callback(stage, current, total):
            # Record every update; guard the ratio against a zero total
            progress_updates.append(
                {
                    "stage": stage,
                    "current": current,
                    "total": total,
                    "progress": current / total if total > 0 else 0,
                }
            )

        # Perform extraction with progress tracking
        speaker_extraction_service.extract_and_export(
            reference_clip=str(reference_clip),
            target_file=str(target_file),
            output_path=str(output_file),
            threshold=0.40,
            progress_callback=progress_callback,
        )

        # Verify progress callbacks were invoked
        assert len(progress_updates) > 0

        # Verify progress stages are present
        stages = [update["stage"] for update in progress_updates]
        assert any("reference" in stage.lower() for stage in stages)
        assert any("target" in stage.lower() or "extract" in stage.lower() for stage in stages)

    def test_extract_crossfade_concatenation(
        self, speaker_extraction_service, integration_audio_dir, output_dir
    ):
        """Test that crossfade is applied when concatenating segments"""
        reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
        target_file = integration_audio_dir / "multi_speaker_conversation.m4a"
        # Check availability BEFORE running any extraction, so that missing
        # fixtures produce a skip rather than a failure inside the service.
        self._require_fixtures(reference_clip, target_file)

        # Test with no crossfade
        output_no_fade = output_dir / "no_crossfade.m4a"
        report_no_fade = speaker_extraction_service.extract_and_export(
            reference_clip=str(reference_clip),
            target_file=str(target_file),
            output_path=str(output_no_fade),
            threshold=0.40,
            crossfade_duration_ms=0,
        )

        # Test with crossfade
        output_with_fade = output_dir / "with_crossfade.m4a"
        report_with_fade = speaker_extraction_service.extract_and_export(
            reference_clip=str(reference_clip),
            target_file=str(target_file),
            output_path=str(output_with_fade),
            threshold=0.40,
            crossfade_duration_ms=100,
        )

        # Both should produce valid output; check each run against its own report
        if report_no_fade["segments_included"] > 0:
            assert output_no_fade.exists()
        if report_with_fade["segments_included"] > 0:
            assert output_with_fade.exists()

    def test_extract_long_audio_file(
        self, speaker_extraction_service, integration_audio_dir, output_dir
    ):
        """Test extraction with long audio file (performance test)"""
        reference_clip = integration_audio_dir / "reference_speaker_a.m4a"
        target_file = integration_audio_dir / "long_conversation_60min.m4a"
        output_file = output_dir / "long_extracted.m4a"
        self._require_fixtures(
            reference_clip, target_file, reason="Long audio test fixture not available"
        )

        import time

        # perf_counter is monotonic, so the measurement cannot be distorted by
        # system clock adjustments during a long run
        start_time = time.perf_counter()

        # Perform extraction
        report = speaker_extraction_service.extract_and_export(
            reference_clip=str(reference_clip),
            target_file=str(target_file),
            output_path=str(output_file),
            threshold=0.40,
        )
        elapsed_time = time.perf_counter() - start_time

        # Verify completion
        assert report["processing_time_seconds"] > 0

        # Should complete in reasonable time (< 2x audio duration)
        # NOTE(review): assumes get_audio_duration returns seconds - confirm
        target_duration = get_audio_duration(str(target_file))
        assert elapsed_time < target_duration * 2.0, "Processing too slow"
class TestReferenceClipValidation:
    """Integration checks for ``validate_reference_clip`` on the service.

    Each test looks up one audio fixture, skips when the file is missing,
    and inspects the ``(is_valid, message)`` pair returned by the service.
    """

    def test_validate_good_reference_clip(self, speaker_extraction_service, integration_audio_dir):
        """A good quality reference clip is reported as valid."""
        clip_path = integration_audio_dir / "reference_speaker_a.m4a"
        if not clip_path.exists():
            pytest.skip("Integration audio fixture not available")

        verdict = speaker_extraction_service.validate_reference_clip(str(clip_path))
        accepted, _note = verdict
        assert accepted is True

    def test_validate_short_reference_clip(self, speaker_extraction_service, integration_audio_dir):
        """A too-short reference clip is rejected with a message mentioning 'short'."""
        clip_path = integration_audio_dir / "reference_too_short.m4a"
        if not clip_path.exists():
            pytest.skip("Integration audio fixture not available")

        verdict = speaker_extraction_service.validate_reference_clip(str(clip_path))
        accepted, explanation = verdict
        assert accepted is False
        assert "short" in explanation.lower()

    def test_validate_noisy_reference_clip(self, speaker_extraction_service, integration_audio_dir):
        """A noisy reference clip stays valid; any message must flag the quality."""
        clip_path = integration_audio_dir / "reference_noisy.m4a"
        if not clip_path.exists():
            pytest.skip("Integration audio fixture not available")

        verdict = speaker_extraction_service.validate_reference_clip(str(clip_path))
        accepted, warning = verdict

        # Still accepted; the warning text is only checked when one is given
        assert accepted is True
        if warning:
            lowered = warning.lower()
            assert any(keyword in lowered for keyword in ("quality", "noisy"))