Spaces:
Running
Running
| """Tests for subtitle extraction functionality.""" | |
| import pytest | |
| from unittest.mock import patch, MagicMock | |
| from app.apis.subtitles.service import SubtitleService, SUBTITLE_CACHE | |
| from app.apis.subtitles.utils import extract_video_id | |
| from app.core.exceptions import SubtitlesNotFoundError | |
| class TestSubtitleUtils: | |
| """Test subtitle utility functions.""" | |
| def test_extract_video_id_standard_url(self): | |
| """Test extracting video ID from standard YouTube URL.""" | |
| url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ" | |
| assert extract_video_id(url) == "dQw4w9WgXcQ" | |
| def test_extract_video_id_short_url(self): | |
| """Test extracting video ID from short YouTube URL.""" | |
| url = "https://youtu.be/dQw4w9WgXcQ" | |
| assert extract_video_id(url) == "dQw4w9WgXcQ" | |
| def test_extract_video_id_embed_url(self): | |
| """Test extracting video ID from embed URL.""" | |
| url = "https://www.youtube.com/embed/dQw4w9WgXcQ" | |
| assert extract_video_id(url) == "dQw4w9WgXcQ" | |
| class TestSubtitleService: | |
| """Test subtitle extraction service.""" | |
| def clear_cache(self): | |
| """Clear cache before each test.""" | |
| SUBTITLE_CACHE.clear() | |
| def service(self): | |
| """Create a subtitle service instance.""" | |
| return SubtitleService() | |
| async def test_extract_subtitles_success(self, service, sample_youtube_url): | |
| """Test successful subtitle extraction.""" | |
| with patch.object(service, '_download_audio') as mock_download, \ | |
| patch.object(service, '_transcribe_audio') as mock_transcribe: | |
| mock_download.return_value = MagicMock() | |
| mock_download.return_value.exists.return_value = True | |
| mock_transcribe.return_value = ["Test subtitle line 1", "Test subtitle line 2"] | |
| video_id, subtitles = await service.extract_subtitles(sample_youtube_url, "en") | |
| assert video_id == "dQw4w9WgXcQ" | |
| assert len(subtitles) == 2 | |
| assert "Test subtitle line 1" in subtitles | |
| async def test_extract_subtitles_uses_cache(self, service, sample_youtube_url): | |
| """Test that cached results are returned.""" | |
| with patch.object(service, '_download_audio') as mock_download, \ | |
| patch.object(service, '_transcribe_audio') as mock_transcribe: | |
| mock_download.return_value = MagicMock() | |
| mock_download.return_value.exists.return_value = True | |
| mock_transcribe.return_value = ["Cached subtitle"] | |
| result1 = await service.extract_subtitles(sample_youtube_url, "en") | |
| result2 = await service.extract_subtitles(sample_youtube_url, "en") | |
| assert result1 == result2 | |
| assert mock_download.call_count == 1 | |
| async def test_extract_subtitles_empty_transcription(self, service, sample_youtube_url): | |
| """Test error when transcription produces no text.""" | |
| with patch.object(service, '_download_audio') as mock_download, \ | |
| patch.object(service, '_transcribe_audio') as mock_transcribe: | |
| mock_download.return_value = MagicMock() | |
| mock_download.return_value.exists.return_value = True | |
| mock_transcribe.return_value = [] | |
| with pytest.raises(SubtitlesNotFoundError): | |
| await service.extract_subtitles(sample_youtube_url, "en") | |
| class TestSubtitleAPI: | |
| """Test subtitle API endpoints.""" | |
| def test_extract_subtitles_endpoint_success(self, client, api_key): | |
| """Test successful subtitle extraction via API.""" | |
| with patch('app.apis.subtitles.service.subtitle_service.extract_subtitles') as mock_extract: | |
| mock_extract.return_value = ("dQw4w9WgXcQ", ["Never gonna give you up"]) | |
| response = client.post( | |
| "/api/v1/subtitles/extract", | |
| json={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ", "lang": "en"}, | |
| headers={"x-api-key": api_key} | |
| ) | |
| assert response.status_code == 200 | |
| data = response.json() | |
| assert data["status"] == "success" | |
| assert data["video_id"] == "dQw4w9WgXcQ" | |
| def test_extract_subtitles_endpoint_invalid_api_key(self, client, invalid_api_key): | |
| """Test API endpoint with invalid API key.""" | |
| response = client.post( | |
| "/api/v1/subtitles/extract", | |
| json={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ", "lang": "en"}, | |
| headers={"x-api-key": invalid_api_key} | |
| ) | |
| assert response.status_code == 401 | |
| def test_extract_subtitles_endpoint_missing_api_key(self, client): | |
| """Test API endpoint with missing API key.""" | |
| response = client.post( | |
| "/api/v1/subtitles/extract", | |
| json={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ", "lang": "en"} | |
| ) | |
| assert response.status_code == 401 | |
| def test_extract_subtitles_endpoint_invalid_url(self, client, api_key): | |
| """Test API endpoint with invalid URL.""" | |
| response = client.post( | |
| "/api/v1/subtitles/extract", | |
| json={"url": "https://example.com/not-youtube", "lang": "en"}, | |
| headers={"x-api-key": api_key} | |
| ) | |
| assert response.status_code == 422 | |
| def test_subtitles_health_endpoint(self, client): | |
| """Test subtitles health check endpoint.""" | |
| response = client.get("/api/v1/subtitles/health") | |
| assert response.status_code == 200 | |
| data = response.json() | |
| assert data["status"] == "healthy" | |
| assert data["service"] == "subtitles" |