# Spaces:
# Sleeping
# Sleeping
# Michael Hu
# feat: replace legacy TTS providers with Chatterbox as the single, default provider
# 237cb26
"""Integration tests for the complete audio processing pipeline."""
import os
import queue
import tempfile
import threading
import time
from pathlib import Path
from typing import Any, Dict, Optional
from unittest.mock import MagicMock, Mock, patch

import pytest

from src.application.dtos.audio_upload_dto import AudioUploadDto
from src.application.dtos.processing_request_dto import ProcessingRequestDto
from src.application.dtos.processing_result_dto import ProcessingResultDto
from src.application.services.audio_processing_service import AudioProcessingApplicationService
from src.domain.exceptions import (
    SpeechRecognitionException,
    SpeechSynthesisException,
    TranslationFailedException,
)
from src.domain.models.audio_content import AudioContent
from src.domain.models.text_content import TextContent
from src.domain.models.voice_settings import VoiceSettings
from src.infrastructure.config.app_config import AppConfig
from src.infrastructure.config.dependency_container import DependencyContainer
class TestAudioProcessingPipeline:
    """Integration tests for the complete audio processing pipeline.

    Every test drives ``AudioProcessingApplicationService`` through a mocked
    ``DependencyContainer`` whose STT, translation and TTS providers return
    canned results, so no real model is loaded.
    """

    # Slack (in seconds) allowed when comparing the processing time reported
    # by the service with the wall-clock interval measured by the test.
    _TIMING_TOLERANCE_S = 0.1

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------

    @pytest.fixture
    def temp_dir(self):
        """Yield a temporary directory that is deleted when the test ends."""
        with tempfile.TemporaryDirectory() as temp_dir:
            yield temp_dir

    @pytest.fixture
    def mock_config(self, temp_dir):
        """Return a mock ``AppConfig`` whose getters hand back test settings."""
        config = Mock(spec=AppConfig)

        # Processing configuration
        config.get_processing_config.return_value = {
            'max_file_size_mb': 50,
            'supported_audio_formats': ['wav', 'mp3', 'flac'],
            'temp_dir': temp_dir,
            'cleanup_temp_files': True,
        }

        # Logging configuration
        config.get_logging_config.return_value = {
            'level': 'INFO',
            'enable_file_logging': False,
            'log_file_path': os.path.join(temp_dir, 'test.log'),
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        }

        # STT configuration
        config.get_stt_config.return_value = {
            'preferred_providers': ['parakeet', 'whisper-small', 'whisper-medium'],
        }

        # TTS configuration
        config.get_tts_config.return_value = {
            'preferred_providers': ['chatterbox'],
        }

        return config

    @pytest.fixture
    def mock_container(self, mock_config):
        """Return a mock ``DependencyContainer`` serving canned providers."""
        container = Mock(spec=DependencyContainer)
        container.resolve.return_value = mock_config

        # Mock STT provider
        mock_stt_provider = Mock()
        mock_stt_provider.transcribe.return_value = TextContent(
            text="Hello, this is a test transcription.",
            language="en",
        )
        container.get_stt_provider.return_value = mock_stt_provider

        # Mock translation provider
        mock_translation_provider = Mock()
        mock_translation_provider.translate.return_value = TextContent(
            text="Hola, esta es una transcripción de prueba.",
            language="es",
        )
        container.get_translation_provider.return_value = mock_translation_provider

        # Mock TTS provider
        mock_tts_provider = Mock()
        mock_tts_provider.synthesize.return_value = AudioContent(
            data=b"fake_audio_data",
            format="wav",
            sample_rate=22050,
            duration=2.5,
        )
        container.get_tts_provider.return_value = mock_tts_provider

        return container

    @pytest.fixture
    def audio_service(self, mock_container, mock_config):
        """Return the service under test, built from the mocked dependencies."""
        return AudioProcessingApplicationService(mock_container, mock_config)

    @pytest.fixture
    def sample_audio_upload(self):
        """Return a small fake WAV upload DTO."""
        return AudioUploadDto(
            filename="test_audio.wav",
            content=b"fake_wav_audio_data",
            content_type="audio/wav",
            size=1024,
        )

    @pytest.fixture
    def sample_processing_request(self, sample_audio_upload):
        """Return an English-to-Spanish processing request for the sample upload."""
        return ProcessingRequestDto(
            audio=sample_audio_upload,
            asr_model="whisper-small",
            target_language="es",
            source_language="en",
            voice="chatterbox",
            speed=1.0,
            requires_translation=True,
        )

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_complete_pipeline_success(self, audio_service, sample_processing_request):
        """Test successful execution of the complete audio processing pipeline."""
        result = audio_service.process_audio_pipeline(sample_processing_request)

        assert isinstance(result, ProcessingResultDto)
        assert result.success is True
        assert result.error_message is None
        assert result.original_text == "Hello, this is a test transcription."
        assert result.translated_text == "Hola, esta es una transcripción de prueba."
        assert result.audio_path is not None
        assert result.processing_time > 0
        assert result.metadata is not None
        assert 'correlation_id' in result.metadata

    def test_pipeline_without_translation(self, audio_service, sample_audio_upload):
        """Test pipeline execution without translation (same language)."""
        request = ProcessingRequestDto(
            audio=sample_audio_upload,
            asr_model="whisper-small",
            target_language="en",
            source_language="en",
            voice="chatterbox",
            speed=1.0,
            requires_translation=False,
        )

        result = audio_service.process_audio_pipeline(request)

        assert result.success is True
        assert result.original_text == "Hello, this is a test transcription."
        assert result.translated_text is None  # No translation performed
        assert result.audio_path is not None

    def test_pipeline_with_different_voice_settings(self, audio_service, sample_audio_upload):
        """Test that non-default model and speed settings appear in the metadata."""
        request = ProcessingRequestDto(
            audio=sample_audio_upload,
            asr_model="whisper-medium",
            target_language="fr",
            source_language="en",
            voice="chatterbox",
            speed=1.5,
            requires_translation=True,
        )

        result = audio_service.process_audio_pipeline(request)

        assert result.success is True
        assert result.metadata['voice'] == "chatterbox"
        assert result.metadata['speed'] == 1.5
        assert result.metadata['asr_model'] == "whisper-medium"

    def test_pipeline_performance_metrics(self, audio_service, sample_processing_request):
        """Test that the reported processing time fits the measured interval."""
        start_time = time.time()
        result = audio_service.process_audio_pipeline(sample_processing_request)
        elapsed = time.time() - start_time

        assert result.success is True
        assert result.processing_time > 0
        # The service times itself inside the call, so its figure may exceed
        # the outer measurement only by clock jitter.
        assert result.processing_time <= elapsed + self._TIMING_TOLERANCE_S
        assert 'correlation_id' in result.metadata

    def test_pipeline_with_large_file(self, audio_service, mock_config):
        """Test pipeline behavior with a 10 MB upload (below the 50 MB limit)."""
        large_size = 10 * 1024 * 1024  # 10MB
        large_audio = AudioUploadDto(
            filename="large_audio.wav",
            content=b"x" * large_size,
            content_type="audio/wav",
            size=large_size,
        )
        request = ProcessingRequestDto(
            audio=large_audio,
            asr_model="whisper-small",
            target_language="es",
            voice="chatterbox",
            speed=1.0,
            requires_translation=True,
        )

        result = audio_service.process_audio_pipeline(request)

        assert result.success is True
        assert result.metadata['file_size'] == large_size

    def test_pipeline_file_cleanup(self, audio_service, sample_processing_request, temp_dir):
        """Test that temporary files are properly cleaned up."""
        files_before = len(list(Path(temp_dir).rglob("*")))

        result = audio_service.process_audio_pipeline(sample_processing_request)

        assert result.success is True

        # Verify cleanup occurred (no additional temp files)
        files_after = len(list(Path(temp_dir).rglob("*")))
        assert files_after <= files_before + 1  # Allow for output file

    def test_pipeline_correlation_id_tracking(self, audio_service, sample_processing_request):
        """Test that correlation IDs are properly tracked throughout the pipeline."""
        result = audio_service.process_audio_pipeline(sample_processing_request)

        assert result.success is True
        assert 'correlation_id' in result.metadata

        correlation_id = result.metadata['correlation_id']
        assert isinstance(correlation_id, str)
        assert len(correlation_id) > 0

        # Verify correlation ID is used in status tracking
        status = audio_service.get_processing_status(correlation_id)
        assert status['correlation_id'] == correlation_id

    def test_pipeline_metadata_completeness(self, audio_service, sample_processing_request):
        """Test that pipeline result contains complete metadata."""
        result = audio_service.process_audio_pipeline(sample_processing_request)

        assert result.success is True
        assert result.metadata is not None

        expected_metadata_keys = [
            'correlation_id', 'asr_model', 'target_language',
            'voice', 'speed', 'translation_required',
        ]
        for key in expected_metadata_keys:
            assert key in result.metadata

    def test_pipeline_supported_configurations(self, audio_service):
        """Test retrieval of supported pipeline configurations."""
        config = audio_service.get_supported_configurations()

        for key in ('asr_models', 'voices', 'languages', 'audio_formats',
                    'max_file_size_mb', 'speed_range'):
            assert key in config

        assert isinstance(config['asr_models'], list)
        assert isinstance(config['voices'], list)
        assert isinstance(config['languages'], list)
        assert len(config['asr_models']) > 0
        assert len(config['voices']) > 0

    def test_pipeline_context_manager(self, mock_container, mock_config):
        """Test audio service as context manager."""
        with AudioProcessingApplicationService(mock_container, mock_config) as service:
            assert service is not None
            # Service should be usable within context
            config = service.get_supported_configurations()
            assert config is not None

    def test_pipeline_multiple_requests(self, audio_service, sample_audio_upload):
        """Test processing multiple requests in sequence."""
        requests = [
            ProcessingRequestDto(
                audio=sample_audio_upload,
                asr_model="whisper-small",
                target_language="es",
                voice="chatterbox",
                speed=1.0,
                requires_translation=True,
            )
            for _ in range(3)
        ]

        results = [audio_service.process_audio_pipeline(request) for request in requests]

        # Verify all requests succeeded
        for result in results:
            assert result.success is True
            assert result.original_text is not None
            assert result.translated_text is not None

        # Verify each request has unique correlation ID
        correlation_ids = [r.metadata['correlation_id'] for r in results]
        assert len(set(correlation_ids)) == len(results)  # All unique

    def test_pipeline_concurrent_processing(self, audio_service, sample_processing_request):
        """Test pipeline behavior under concurrent processing."""
        worker_count = 3
        results_queue = queue.Queue()

        def process_request():
            # Exceptions are handed back through the queue because an error
            # raised in a worker thread would not fail the test by itself.
            try:
                result = audio_service.process_audio_pipeline(sample_processing_request)
                results_queue.put(result)
            except Exception as e:
                results_queue.put(e)

        # Start multiple threads
        threads = [threading.Thread(target=process_request) for _ in range(worker_count)]
        for thread in threads:
            thread.start()

        # Wait for completion
        for thread in threads:
            thread.join()

        # Verify all results; every worker has finished, so the queue is final.
        results = []
        while not results_queue.empty():
            outcome = results_queue.get()
            if isinstance(outcome, Exception):
                pytest.fail(f"Concurrent processing failed: {outcome}")
            results.append(outcome)

        assert len(results) == worker_count
        for result in results:
            assert result.success is True

    def test_pipeline_memory_usage(self, audio_service, sample_processing_request):
        """Test pipeline memory usage and cleanup.

        Skipped when the optional ``psutil`` package is not installed.
        """
        psutil = pytest.importorskip("psutil")

        process = psutil.Process(os.getpid())
        memory_before = process.memory_info().rss

        # Process multiple requests
        for _ in range(5):
            result = audio_service.process_audio_pipeline(sample_processing_request)
            assert result.success is True

        memory_increase = process.memory_info().rss - memory_before

        # Memory increase should be reasonable (less than 50MB for test data)
        assert memory_increase < 50 * 1024 * 1024

    def test_pipeline_with_streaming_synthesis(self, audio_service, sample_processing_request, mock_container):
        """Test pipeline with streaming TTS synthesis."""
        mock_tts_provider = mock_container.get_tts_provider.return_value

        def mock_stream(*_args, **_kwargs):
            for i in range(3):
                yield AudioContent(
                    data=f"chunk_{i}".encode(),
                    format="wav",
                    sample_rate=22050,
                    duration=0.5,
                )

        # side_effect builds a fresh generator per call; a single generator
        # stored in return_value would be empty after its first consumer.
        mock_tts_provider.synthesize_stream.side_effect = mock_stream

        result = audio_service.process_audio_pipeline(sample_processing_request)

        assert result.success is True
        assert result.audio_path is not None

    def test_pipeline_configuration_validation(self, audio_service):
        """Test pipeline configuration validation."""
        config = audio_service.get_supported_configurations()

        # Verify configuration structure
        assert isinstance(config['asr_models'], list)
        assert isinstance(config['voices'], list)
        assert isinstance(config['languages'], list)
        assert isinstance(config['audio_formats'], list)
        assert isinstance(config['max_file_size_mb'], (int, float))
        assert isinstance(config['speed_range'], dict)

        # Verify speed range
        speed_range = config['speed_range']
        assert 'min' in speed_range
        assert 'max' in speed_range
        assert speed_range['min'] < speed_range['max']
        assert speed_range['min'] > 0
        assert speed_range['max'] <= 3.0

    def test_pipeline_error_recovery_logging(self, audio_service, sample_processing_request, mock_container):
        """Test that error recovery attempts are properly logged."""
        # Mock STT provider to fail first time, succeed second time
        mock_stt_provider = mock_container.get_stt_provider.return_value
        mock_stt_provider.transcribe.side_effect = [
            SpeechRecognitionException("First attempt failed"),
            TextContent(text="Recovered transcription", language="en"),
        ]

        with patch('src.application.services.audio_processing_service.logger') as mock_logger:
            result = audio_service.process_audio_pipeline(sample_processing_request)

            assert result.success is True
            # Verify error and recovery were logged
            mock_logger.warning.assert_called()
            mock_logger.info.assert_called()

    def test_pipeline_end_to_end_timing(self, audio_service, sample_processing_request):
        """Test end-to-end pipeline timing and performance."""
        start_time = time.time()
        result = audio_service.process_audio_pipeline(sample_processing_request)
        total_time = time.time() - start_time

        assert result.success is True
        assert result.processing_time > 0
        # Same tolerance as test_pipeline_performance_metrics.
        assert result.processing_time <= total_time + self._TIMING_TOLERANCE_S

        # For mock providers, processing should be fast
        assert total_time < 5.0  # Should complete within 5 seconds

        # Verify timing metadata
        assert result.metadata is not None
        assert 'correlation_id' in result.metadata