Spaces:
Runtime error
Runtime error
| """ | |
| Unit Tests for TTS Pipeline Components | |
| ====================================== | |
| Comprehensive test suite for the optimized TTS system. | |
| """ | |
| import unittest | |
| import numpy as np | |
| import tempfile | |
| import os | |
| import sys | |
| from unittest.mock import Mock, patch, MagicMock | |
| # Add src to path | |
| sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src')) | |
| from src.preprocessing import TextProcessor | |
| from src.audio_processing import AudioProcessor | |
| class TestTextProcessor(unittest.TestCase): | |
| """Test cases for text preprocessing functionality.""" | |
| def setUp(self): | |
| """Set up test fixtures.""" | |
| self.processor = TextProcessor(max_chunk_length=100, overlap_words=3) | |
| def test_empty_text_processing(self): | |
| """Test handling of empty text.""" | |
| result = self.processor.process_text("") | |
| self.assertEqual(result, "") | |
| result = self.processor.process_text(None) | |
| self.assertEqual(result, "") | |
| def test_number_conversion_cache(self): | |
| """Test number conversion with caching.""" | |
| # First call should populate cache | |
| result1 = self.processor._convert_number_to_armenian_words(42) | |
| # Second call should use cache | |
| result2 = self.processor._convert_number_to_armenian_words(42) | |
| self.assertEqual(result1, result2) | |
| self.assertIn("42", self.processor.number_cache) | |
| def test_text_chunking_short_text(self): | |
| """Test chunking behavior with short text.""" | |
| short_text = "Կարճ տեքստ:" | |
| chunks = self.processor.chunk_text(short_text) | |
| self.assertEqual(len(chunks), 1) | |
| self.assertEqual(chunks[0], short_text) | |
| def test_text_chunking_long_text(self): | |
| """Test chunking behavior with long text.""" | |
| long_text = "Այս շատ երկար տեքստ է, որը պետք է բաժանվի մի քանի մասի: " * 5 | |
| chunks = self.processor.chunk_text(long_text) | |
| self.assertGreater(len(chunks), 1) | |
| # Check that each chunk is within limits | |
| for chunk in chunks: | |
| self.assertLessEqual(len(chunk), self.processor.max_chunk_length + 50) # Some tolerance | |
| def test_sentence_splitting(self): | |
| """Test sentence splitting functionality.""" | |
| text = "Առաջին նախադասություն: Երկրորդ նախադասություն! Երրորդ նախադասություն?" | |
| sentences = self.processor._split_into_sentences(text) | |
| self.assertEqual(len(sentences), 3) | |
| self.assertIn("Առաջին նախադասություն", sentences[0]) | |
| def test_overlap_addition(self): | |
| """Test overlap addition between chunks.""" | |
| chunks = ["Առաջին մաս շատ կարևոր է", "Երկրորդ մասը նույնպես կարևոր"] | |
| overlapped = self.processor._add_overlap(chunks) | |
| self.assertEqual(len(overlapped), 2) | |
| # Second chunk should contain words from first | |
| self.assertIn("կարևոր", overlapped[1]) | |
| def test_cache_clearing(self): | |
| """Test cache clearing functionality.""" | |
| # Add some data to caches | |
| self.processor.number_cache["test"] = "test_value" | |
| self.processor._cached_translate("test") | |
| # Clear caches | |
| self.processor.clear_cache() | |
| self.assertEqual(len(self.processor.number_cache), 0) | |
| def test_cache_stats(self): | |
| """Test cache statistics functionality.""" | |
| stats = self.processor.get_cache_stats() | |
| self.assertIn("translation_cache_size", stats) | |
| self.assertIn("number_cache_size", stats) | |
| self.assertIn("lru_cache_hits", stats) | |
| self.assertIn("lru_cache_misses", stats) | |
| class TestAudioProcessor(unittest.TestCase): | |
| """Test cases for audio processing functionality.""" | |
| def setUp(self): | |
| """Set up test fixtures.""" | |
| self.processor = AudioProcessor( | |
| crossfade_duration=0.1, | |
| sample_rate=16000, | |
| apply_noise_gate=True, | |
| normalize_audio=True | |
| ) | |
| def test_empty_audio_processing(self): | |
| """Test handling of empty audio.""" | |
| empty_audio = np.array([], dtype=np.int16) | |
| result = self.processor.process_audio(empty_audio) | |
| self.assertEqual(len(result), 0) | |
| self.assertEqual(result.dtype, np.int16) | |
| def test_audio_normalization(self): | |
| """Test audio normalization.""" | |
| # Create test audio with known peak | |
| test_audio = np.array([1000, -2000, 3000, -1500], dtype=np.int16) | |
| normalized = self.processor._normalize_audio(test_audio) | |
| # Peak should be close to target | |
| peak = np.max(np.abs(normalized)) | |
| expected_peak = 0.95 * 32767 | |
| self.assertAlmostEqual(peak, expected_peak, delta=100) | |
| def test_crossfade_window_creation(self): | |
| """Test crossfade window creation.""" | |
| length = 100 | |
| fade_out, fade_in = self.processor._create_crossfade_window(length) | |
| self.assertEqual(len(fade_out), length) | |
| self.assertEqual(len(fade_in), length) | |
| # Windows should sum to approximately 1 | |
| window_sum = fade_out + fade_in | |
| np.testing.assert_allclose(window_sum, 1.0, atol=0.01) | |
| def test_single_segment_crossfade(self): | |
| """Test crossfading with single audio segment.""" | |
| audio = np.random.randint(-1000, 1000, 1000, dtype=np.int16) | |
| result = self.processor.crossfade_audio_segments([audio]) | |
| np.testing.assert_array_equal(result, audio) | |
| def test_multiple_segment_crossfade(self): | |
| """Test crossfading with multiple audio segments.""" | |
| segment1 = np.random.randint(-1000, 1000, 1000, dtype=np.int16) | |
| segment2 = np.random.randint(-1000, 1000, 1000, dtype=np.int16) | |
| result = self.processor.crossfade_audio_segments([segment1, segment2]) | |
| # Result should be longer than either segment but shorter than sum | |
| self.assertGreater(len(result), len(segment1)) | |
| self.assertLess(len(result), len(segment1) + len(segment2)) | |
| def test_silence_addition(self): | |
| """Test silence padding.""" | |
| audio = np.random.randint(-1000, 1000, 1000, dtype=np.int16) | |
| padded = self.processor.add_silence(audio, start_silence=0.1, end_silence=0.1) | |
| expected_padding = int(0.1 * self.processor.sample_rate) | |
| expected_length = len(audio) + 2 * expected_padding | |
| self.assertEqual(len(padded), expected_length) | |
| # Start and end should be silent | |
| self.assertTrue(np.all(padded[:expected_padding] == 0)) | |
| self.assertTrue(np.all(padded[-expected_padding:] == 0)) | |
| def test_audio_stats(self): | |
| """Test audio statistics calculation.""" | |
| # Create test audio | |
| audio = np.random.randint(-10000, 10000, 16000, dtype=np.int16) # 1 second | |
| stats = self.processor.get_audio_stats(audio) | |
| self.assertAlmostEqual(stats["duration_seconds"], 1.0, places=2) | |
| self.assertEqual(stats["sample_count"], 16000) | |
| self.assertIn("peak_amplitude", stats) | |
| self.assertIn("rms_level", stats) | |
| self.assertIn("dynamic_range_db", stats) | |
| def test_empty_audio_stats(self): | |
| """Test statistics for empty audio.""" | |
| empty_audio = np.array([], dtype=np.int16) | |
| stats = self.processor.get_audio_stats(empty_audio) | |
| self.assertIn("error", stats) | |
| def test_process_and_concatenate(self): | |
| """Test full processing and concatenation pipeline.""" | |
| segments = [ | |
| np.random.randint(-1000, 1000, 500, dtype=np.int16), | |
| np.random.randint(-1000, 1000, 600, dtype=np.int16), | |
| np.random.randint(-1000, 1000, 700, dtype=np.int16) | |
| ] | |
| result = self.processor.process_and_concatenate(segments) | |
| self.assertGreater(len(result), 0) | |
| self.assertEqual(result.dtype, np.int16) | |
| class TestModelIntegration(unittest.TestCase): | |
| """Integration tests for model components.""" | |
| def setUp(self): | |
| """Set up mock components for testing.""" | |
| self.mock_processor = Mock() | |
| self.mock_model = Mock() | |
| self.mock_vocoder = Mock() | |
| def test_model_initialization_mocked(self, mock_np, mock_torch, | |
| mock_vocoder_class, mock_model_class, | |
| mock_processor_class): | |
| """Test model initialization with mocked dependencies.""" | |
| # Configure mocks | |
| mock_torch.cuda.is_available.return_value = False | |
| mock_torch.device.return_value = Mock() | |
| mock_processor_instance = Mock() | |
| mock_processor_class.from_pretrained.return_value = mock_processor_instance | |
| mock_model_instance = Mock() | |
| mock_model_class.from_pretrained.return_value = mock_model_instance | |
| mock_vocoder_instance = Mock() | |
| mock_vocoder_class.from_pretrained.return_value = mock_vocoder_instance | |
| # Create temporary numpy file | |
| with tempfile.NamedTemporaryFile(suffix='.npy', delete=False) as tmp: | |
| test_embedding = np.random.rand(512).astype(np.float32) | |
| np.save(tmp.name, test_embedding) | |
| tmp_path = tmp.name | |
| try: | |
| # This would normally import and test OptimizedTTSModel | |
| # But since we're testing in isolation, we'll verify the mocks were called | |
| mock_processor_class.from_pretrained.assert_called_once() | |
| mock_model_class.from_pretrained.assert_called_once() | |
| mock_vocoder_class.from_pretrained.assert_called_once() | |
| finally: | |
| # Clean up temporary file | |
| if os.path.exists(tmp_path): | |
| os.unlink(tmp_path) | |
| class TestPipelineIntegration(unittest.TestCase): | |
| """Integration tests for the complete pipeline.""" | |
| def test_empty_text_handling(self): | |
| """Test pipeline handling of empty text.""" | |
| # This would test the actual pipeline with mocked components | |
| # For now, we test the concept | |
| text = "" | |
| expected_output = (16000, np.zeros(0, dtype=np.int16)) | |
| # Mock pipeline behavior | |
| if not text.strip(): | |
| result = expected_output | |
| self.assertEqual(result[0], 16000) | |
| self.assertEqual(len(result[1]), 0) | |
| def test_chunking_decision_logic(self): | |
| """Test the logic for deciding when to use chunking.""" | |
| max_chunk_length = 200 | |
| short_text = "Կարճ տեքստ" | |
| long_text = "a" * 300 # Longer than max_chunk_length | |
| should_chunk_short = len(short_text) > max_chunk_length | |
| should_chunk_long = len(long_text) > max_chunk_length | |
| self.assertFalse(should_chunk_short) | |
| self.assertTrue(should_chunk_long) | |
| def run_performance_benchmark(): | |
| """Run basic performance benchmarks.""" | |
| print("\n" + "="*50) | |
| print("PERFORMANCE BENCHMARK") | |
| print("="*50) | |
| # Text processing benchmark | |
| processor = TextProcessor() | |
| test_texts = [ | |
| "Կարճ տեքստ", | |
| "Միջին երկարության տեքստ, որը պարունակում է մի քանի բառ և թվեր 123:", | |
| "Շատ երկար տեքստ, որը կրկնվում է " * 20 | |
| ] | |
| for i, text in enumerate(test_texts): | |
| import time | |
| start = time.time() | |
| processed = processor.process_text(text) | |
| chunks = processor.chunk_text(processed) | |
| end = time.time() | |
| print(f"Text {i+1}: {len(text)} chars → {len(chunks)} chunks in {end-start:.4f}s") | |
| # Audio processing benchmark | |
| audio_processor = AudioProcessor() | |
| test_segments = [ | |
| np.random.randint(-10000, 10000, 16000, dtype=np.int16), # 1 second | |
| np.random.randint(-10000, 10000, 32000, dtype=np.int16), # 2 seconds | |
| np.random.randint(-10000, 10000, 80000, dtype=np.int16), # 5 seconds | |
| ] | |
| for i, segment in enumerate(test_segments): | |
| import time | |
| start = time.time() | |
| processed = audio_processor.process_audio(segment) | |
| end = time.time() | |
| duration = len(segment) / 16000 | |
| print(f"Audio {i+1}: {duration:.1f}s processed in {end-start:.4f}s") | |
| if __name__ == "__main__": | |
| # Run unit tests | |
| print("Running Unit Tests...") | |
| unittest.main(argv=[''], exit=False, verbosity=2) | |
| # Run performance benchmark | |
| run_performance_benchmark() | |