""" Integration tests for speaker extraction workflow Tests the complete end-to-end flow: 1. Load reference clip 2. Extract reference embedding 3. Load target audio 4. Extract target embeddings 5. Match segments based on similarity 6. Export matched segments (concatenated or separate) """ import json from pathlib import Path import numpy as np import pytest from src.services.speaker_extraction import SpeakerExtractionService from src.lib.audio_io import get_audio_duration, read_audio, write_audio @pytest.fixture def speaker_extraction_service(): """Create SpeakerExtractionService instance for integration testing""" return SpeakerExtractionService() @pytest.fixture def integration_audio_dir(): """Get path to integration test audio fixtures""" return Path("audio_fixtures/speaker_extraction/") @pytest.fixture def output_dir(tmp_path): """Create temporary output directory for test results""" output = tmp_path / "extraction_output" output.mkdir() return output class TestSpeakerExtractionWorkflow: """Integration tests for complete speaker extraction workflow""" @pytest.mark.integration def test_extract_single_speaker_concatenated( self, speaker_extraction_service, integration_audio_dir, output_dir ): """Test extracting a single speaker with concatenated output""" reference_clip = integration_audio_dir / "reference_speaker_a.m4a" target_file = integration_audio_dir / "multi_speaker_conversation.m4a" output_file = output_dir / "speaker_a_extracted.m4a" if not reference_clip.exists() or not target_file.exists(): pytest.skip("Integration audio fixtures not available") # Perform extraction report = speaker_extraction_service.extract_and_export( reference_clip=str(reference_clip), target_file=str(target_file), output_path=str(output_file), threshold=0.40, min_confidence=0.30, concatenate=True, silence_duration_ms=150, crossfade_duration_ms=75, ) # Verify output file was created assert output_file.exists() assert output_file.stat().st_size > 0 # Verify report contains expected fields assert report["reference_clip"] == str(reference_clip) assert report["target_file"] == str(target_file) assert report["segments_found"] > 0 assert report["segments_included"] > 0 assert report["total_duration_seconds"] > 0 assert 0.0 <= report["average_confidence"] <= 1.0 assert report["processing_time_seconds"] > 0 # Verify output audio is valid audio_data, sample_rate = read_audio(str(output_file)) assert len(audio_data) > 0 assert sample_rate > 0 @pytest.mark.integration def test_extract_single_speaker_separate_segments( self, speaker_extraction_service, integration_audio_dir, output_dir ): """Test extracting a single speaker with separate segment files""" reference_clip = integration_audio_dir / "reference_speaker_a.m4a" target_file = integration_audio_dir / "multi_speaker_conversation.m4a" output_dir_path = output_dir / "speaker_a_segments" if not reference_clip.exists() or not target_file.exists(): pytest.skip("Integration audio fixtures not available") # Perform extraction with separate segments report = speaker_extraction_service.extract_and_export( reference_clip=str(reference_clip), target_file=str(target_file), output_path=str(output_dir_path), threshold=0.40, concatenate=False, ) # Verify output directory was created assert output_dir_path.exists() assert output_dir_path.is_dir() # Verify segment files were created segment_files = list(output_dir_path.glob("segment_*.m4a")) assert len(segment_files) == report["segments_included"] # Verify each segment is valid audio for segment_file in segment_files: assert segment_file.stat().st_size > 0 audio_data, sample_rate = read_audio(str(segment_file)) assert len(audio_data) > 0 @pytest.mark.integration def test_extract_with_high_threshold( self, speaker_extraction_service, integration_audio_dir, output_dir ): """Test extraction with strict matching threshold""" reference_clip = integration_audio_dir / "reference_speaker_a.m4a" target_file = integration_audio_dir / "multi_speaker_conversation.m4a" output_file = output_dir / "strict_match.m4a" if not reference_clip.exists() or not target_file.exists(): pytest.skip("Integration audio fixtures not available") # Use strict threshold (lower threshold = stricter matching) report = speaker_extraction_service.extract_and_export( reference_clip=str(reference_clip), target_file=str(target_file), output_path=str(output_file), threshold=0.25, # Strict min_confidence=0.40, ) # Strict matching should find fewer segments assert report["segments_included"] <= report["segments_found"] # But should have higher average confidence if report["segments_included"] > 0: assert report["average_confidence"] >= 0.40 @pytest.mark.integration def test_extract_with_low_threshold( self, speaker_extraction_service, integration_audio_dir, output_dir ): """Test extraction with permissive matching threshold""" reference_clip = integration_audio_dir / "reference_speaker_a.m4a" target_file = integration_audio_dir / "multi_speaker_conversation.m4a" output_file = output_dir / "permissive_match.m4a" if not reference_clip.exists() or not target_file.exists(): pytest.skip("Integration audio fixtures not available") # Use permissive threshold (higher threshold = less strict) report = speaker_extraction_service.extract_and_export( reference_clip=str(reference_clip), target_file=str(target_file), output_path=str(output_file), threshold=0.60, # Permissive min_confidence=0.20, ) # Permissive matching should find more segments assert report["segments_included"] > 0 # May have lower average confidence assert report["average_confidence"] >= 0.20 @pytest.mark.integration def test_extract_no_matches_found( self, speaker_extraction_service, integration_audio_dir, output_dir ): """Test extraction when reference speaker not in target""" reference_clip = integration_audio_dir / "reference_speaker_a.m4a" target_file = integration_audio_dir / "different_speaker_only.m4a" output_file = output_dir / "no_matches.m4a" if not reference_clip.exists() or not target_file.exists(): pytest.skip("Integration audio fixtures not available") # Should complete but find no matches report = speaker_extraction_service.extract_and_export( reference_clip=str(reference_clip), target_file=str(target_file), output_path=str(output_file), threshold=0.40, ) # Report should indicate no matches assert report["segments_included"] == 0 # Output file should not be created or be empty assert not output_file.exists() or output_file.stat().st_size == 0 @pytest.mark.integration def test_extract_with_custom_output_format( self, speaker_extraction_service, integration_audio_dir, output_dir ): """Test extraction with custom sample rate and bitrate""" reference_clip = integration_audio_dir / "reference_speaker_a.m4a" target_file = integration_audio_dir / "multi_speaker_conversation.m4a" output_file = output_dir / "custom_format.m4a" if not reference_clip.exists() or not target_file.exists(): pytest.skip("Integration audio fixtures not available") # Extract with custom audio parameters report = speaker_extraction_service.extract_and_export( reference_clip=str(reference_clip), target_file=str(target_file), output_path=str(output_file), threshold=0.40, concatenate=True, sample_rate=48000, bitrate="256k", ) if output_file.exists(): # Verify output has expected sample rate audio_data, sample_rate = read_audio(str(output_file)) # Note: actual sample rate may differ based on conversion assert sample_rate > 0 @pytest.mark.integration def test_extract_report_json_format( self, speaker_extraction_service, integration_audio_dir, output_dir ): """Test that extraction report is valid JSON with all required fields""" reference_clip = integration_audio_dir / "reference_speaker_a.m4a" target_file = integration_audio_dir / "multi_speaker_conversation.m4a" output_file = output_dir / "extracted.m4a" report_file = output_dir / "extraction_report.json" if not reference_clip.exists() or not target_file.exists(): pytest.skip("Integration audio fixtures not available") # Perform extraction report = speaker_extraction_service.extract_and_export( reference_clip=str(reference_clip), target_file=str(target_file), output_path=str(output_file), threshold=0.40, ) # Write report to JSON file with open(report_file, "w") as f: json.dump(report, f, indent=2) # Verify JSON file is valid assert report_file.exists() with open(report_file, "r") as f: loaded_report = json.load(f) # Verify all required fields are present required_fields = [ "reference_clip", "target_file", "threshold", "segments_found", "segments_included", "total_duration_seconds", "average_confidence", "low_confidence_segments", "processing_time_seconds", "output_file", ] for field in required_fields: assert field in loaded_report, f"Missing field: {field}" @pytest.mark.integration def test_extract_with_progress_callback( self, speaker_extraction_service, integration_audio_dir, output_dir ): """Test extraction with progress reporting callback""" reference_clip = integration_audio_dir / "reference_speaker_a.m4a" target_file = integration_audio_dir / "multi_speaker_conversation.m4a" output_file = output_dir / "with_progress.m4a" if not reference_clip.exists() or not target_file.exists(): pytest.skip("Integration audio fixtures not available") progress_updates = [] def progress_callback(stage, current, total): progress_updates.append( { "stage": stage, "current": current, "total": total, "progress": current / total if total > 0 else 0, } ) # Perform extraction with progress tracking report = speaker_extraction_service.extract_and_export( reference_clip=str(reference_clip), target_file=str(target_file), output_path=str(output_file), threshold=0.40, progress_callback=progress_callback, ) # Verify progress callbacks were invoked assert len(progress_updates) > 0 # Verify progress stages are present stages = [update["stage"] for update in progress_updates] assert any("reference" in stage.lower() for stage in stages) assert any("target" in stage.lower() or "extract" in stage.lower() for stage in stages) @pytest.mark.integration def test_extract_crossfade_concatenation( self, speaker_extraction_service, integration_audio_dir, output_dir ): """Test that crossfade is applied when concatenating segments""" reference_clip = integration_audio_dir / "reference_speaker_a.m4a" target_file = integration_audio_dir / "multi_speaker_conversation.m4a" # Test with no crossfade output_no_fade = output_dir / "no_crossfade.m4a" report_no_fade = speaker_extraction_service.extract_and_export( reference_clip=str(reference_clip), target_file=str(target_file), output_path=str(output_no_fade), threshold=0.40, crossfade_duration_ms=0, ) # Test with crossfade output_with_fade = output_dir / "with_crossfade.m4a" report_with_fade = speaker_extraction_service.extract_and_export( reference_clip=str(reference_clip), target_file=str(target_file), output_path=str(output_with_fade), threshold=0.40, crossfade_duration_ms=100, ) if not reference_clip.exists() or not target_file.exists(): pytest.skip("Integration audio fixtures not available") # Both should produce valid output if report_no_fade["segments_included"] > 0: assert output_no_fade.exists() or output_with_fade.exists() @pytest.mark.integration @pytest.mark.slow def test_extract_long_audio_file( self, speaker_extraction_service, integration_audio_dir, output_dir ): """Test extraction with long audio file (performance test)""" reference_clip = integration_audio_dir / "reference_speaker_a.m4a" target_file = integration_audio_dir / "long_conversation_60min.m4a" output_file = output_dir / "long_extracted.m4a" if not reference_clip.exists() or not target_file.exists(): pytest.skip("Long audio test fixture not available") import time start_time = time.time() # Perform extraction report = speaker_extraction_service.extract_and_export( reference_clip=str(reference_clip), target_file=str(target_file), output_path=str(output_file), threshold=0.40, ) elapsed_time = time.time() - start_time # Verify completion assert report["processing_time_seconds"] > 0 # Should complete in reasonable time (< 2x audio duration) target_duration = get_audio_duration(str(target_file)) assert elapsed_time < target_duration * 2.0, "Processing too slow" class TestReferenceClipValidation: """Integration tests for reference clip validation""" @pytest.mark.integration def test_validate_good_reference_clip(self, speaker_extraction_service, integration_audio_dir): """Test validation accepts good quality reference clip""" reference_clip = integration_audio_dir / "reference_speaker_a.m4a" if not reference_clip.exists(): pytest.skip("Integration audio fixture not available") is_valid, message = speaker_extraction_service.validate_reference_clip(str(reference_clip)) assert is_valid is True @pytest.mark.integration def test_validate_short_reference_clip(self, speaker_extraction_service, integration_audio_dir): """Test validation rejects reference clip shorter than 3 seconds""" short_clip = integration_audio_dir / "reference_too_short.m4a" if not short_clip.exists(): pytest.skip("Integration audio fixture not available") is_valid, message = speaker_extraction_service.validate_reference_clip(str(short_clip)) assert is_valid is False assert "short" in message.lower() @pytest.mark.integration def test_validate_noisy_reference_clip(self, speaker_extraction_service, integration_audio_dir): """Test validation warns about low quality reference clip""" noisy_clip = integration_audio_dir / "reference_noisy.m4a" if not noisy_clip.exists(): pytest.skip("Integration audio fixture not available") is_valid, message = speaker_extraction_service.validate_reference_clip(str(noisy_clip)) # Should still be valid but with warning assert is_valid is True if message: assert "quality" in message.lower() or "noisy" in message.lower()