Spaces:
Sleeping
Sleeping
| """Unit tests for error handling scenarios.""" | |
| import pytest | |
| from unittest.mock import Mock, AsyncMock, patch | |
| from fastapi import HTTPException | |
| from domain.exceptions.domain_exceptions import ( | |
| ValidationError, | |
| InvalidExternalJobIdFormatError, | |
| DuplicateExternalJobIdError, | |
| JobNotFoundError, | |
| JobNotCompletedError, | |
| AuthenticationError, | |
| NotificationFailureError | |
| ) | |
| from infrastructure.services.n8n_notification_service import redact_bearer_token | |
| from interfaces.api.middleware.error_handler import redact_sensitive_data | |
| from domain.services.validation_service import ValidationService | |
class TestDomainExceptions:
    """Verify that domain exceptions expose their arguments and message text."""

    def test_invalid_external_job_id_format_error(self):
        """The offending ID and the format description are stored and rendered."""
        job_id, description = "invalid@job#id", "Must contain only alphanumeric characters"

        exc = InvalidExternalJobIdFormatError(job_id, description)
        expected_message = f"Invalid external job ID format: {job_id}. {description}"

        assert exc.job_id == job_id
        assert exc.format_description == description
        assert str(exc) == expected_message

    def test_duplicate_external_job_id_error(self):
        """The duplicated ID is stored as ``external_job_id`` and shown in the message."""
        job_id = "existing-job-123"

        exc = DuplicateExternalJobIdError(job_id)
        expected_message = f"External job ID already exists: {job_id}"

        assert exc.external_job_id == job_id
        assert str(exc) == expected_message

    def test_job_not_found_error(self):
        """The missing job's ID is stored and shown in the message."""
        job_id = "non-existent-job"

        exc = JobNotFoundError(job_id)
        expected_message = f"Job not found: {job_id}"

        assert exc.job_id == job_id
        assert str(exc) == expected_message

    def test_job_not_completed_error(self):
        """Both the job ID and its current status are stored and rendered."""
        job_id, status = "processing-job", "processing"

        exc = JobNotCompletedError(job_id, status)
        expected_message = f"Job {job_id} is not completed (status: {status})"

        assert exc.job_id == job_id
        assert exc.status == status
        assert str(exc) == expected_message

    def test_authentication_error(self):
        """The message passed to AuthenticationError is its string form."""
        exc = AuthenticationError("Invalid token")

        assert str(exc) == "Invalid token"

    def test_notification_failure_error(self):
        """The target service and failure details are stored and rendered."""
        service, details = "N8N", "Connection timeout"

        exc = NotificationFailureError(service, details)
        expected_message = f"Notification to {service} failed: {details}"

        assert exc.service == service
        assert exc.details == details
        assert str(exc) == expected_message
class TestValidationService:
    """Exercise ValidationService's external job ID validation."""

    def setup_method(self):
        """Create a fresh ValidationService before each test method."""
        settings = dict(
            max_file_size_mb=100.0,
            supported_video_formats=['.mp4', '.avi', '.mov'],
            supported_audio_formats=['mp3', 'aac', 'wav'],
        )
        self.validation_service = ValidationService(**settings)

    def test_validate_external_job_id_valid_formats(self):
        """Every accepted ID shape passes validation without raising."""
        accepted = (
            "job-123",
            "job_456",
            "MyJob789",
            "a",
            "job-with-underscores_and-hyphens",
            "1234567890",
            None,  # the field is optional, so None is accepted
            "",    # the field is optional, so an empty string is accepted
        )
        validate = self.validation_service.validate_external_job_id
        for candidate in accepted:
            # Passing means simply returning; any exception fails the test.
            validate(candidate)

    def test_validate_external_job_id_invalid_formats(self):
        """Rejected IDs raise InvalidExternalJobIdFormatError with a matching description."""
        charset_rule = "Must contain only alphanumeric characters, underscores, and hyphens"
        rejected = [
            (candidate, charset_rule)
            for candidate in ("job@123", "job#456", "job 789", "job.txt", "job+extra")
        ]
        # One character over the 50-character limit.
        rejected.append(("a" * 51, "Must be 50 characters or less"))

        for candidate, expected_fragment in rejected:
            with pytest.raises(InvalidExternalJobIdFormatError) as caught:
                self.validation_service.validate_external_job_id(candidate)

            raised = caught.value
            assert raised.job_id == candidate
            assert expected_fragment in raised.format_description
class TestSecureLogging:
    """Check that tokens are stripped from text before it is logged."""

    def test_redact_bearer_token_function(self):
        """redact_bearer_token maps each sample message to its redacted form."""
        # Raw message -> expected redacted message (dicts keep insertion order).
        expectations = {
            "Bearer token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature":
                "Bearer token: ***JWT***",
            'Authorization: "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"':
                'Authorization: "Bearer ***JWT***"',
            "No tokens here":
                "No tokens here",
            "bearer: sometoken":
                "bearer: ***",
        }
        for raw, redacted in expectations.items():
            assert redact_bearer_token(raw) == redacted

    def test_redact_sensitive_data_middleware(self):
        """redact_sensitive_data maps each sample payload to its redacted form."""
        # Raw payload -> expected redacted payload (dicts keep insertion order).
        expectations = {
            'Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature':
                'Authorization: Bearer ***',
            '"Authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"':
                '"Authorization": "Bearer ***"',
            "JWT token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature":
                "JWT token: ***JWT***",
            "No sensitive data here":
                "No sensitive data here",
        }
        for raw, redacted in expectations.items():
            assert redact_sensitive_data(raw) == redacted

    def test_redact_multiple_tokens(self):
        """Two tokens in one message are both removed."""
        input_msg = (
            "User1 token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature1 "
            "and User2 token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiI5ODc2NTQzMjEwIn0.signature2"
        )

        cleaned = redact_bearer_token(input_msg)

        # Neither token's trailing segment may survive redaction.
        for leaked_fragment in ("signature1", "signature2"):
            assert leaked_fragment not in cleaned
        assert "***JWT***" in cleaned
class TestN8NNotificationErrorHandling:
    """Test N8N notification service error handling.

    The coroutine tests carry an explicit ``pytest.mark.asyncio`` marker so
    they are executed (not skipped) even when pytest-asyncio runs in strict
    mode; the marker is redundant but harmless in auto mode.
    """

    def setup_method(self):
        """Build a notification service around a spec'd mock N8N client."""
        # Imported locally, as in the rest of this module's N8N tests.
        from infrastructure.clients.n8n.n8n_client import N8NClient
        from infrastructure.services.n8n_notification_service import N8NNotificationService

        self.mock_n8n_client = Mock(spec=N8NClient)
        self.service = N8NNotificationService(self.mock_n8n_client)

    @pytest.mark.asyncio
    async def test_notification_service_handles_client_exceptions(self):
        """Every kind of client exception yields an unacknowledged result, not a raise."""
        from infrastructure.clients.n8n.exceptions import (
            APIClientError,
            APIConnectionError,
            APIResponseError,
        )

        exceptions_to_test = [
            Exception("Generic error"),
            APIClientError("Client error"),
            APIConnectionError("Connection failed"),
            APIResponseError("Invalid response", 500),
            TimeoutError("Request timeout"),
            ConnectionRefusedError("Connection refused"),
        ]

        for exception in exceptions_to_test:
            # Install a fresh failing mock so each exception is tried in isolation.
            self.mock_n8n_client.post_completion_event = AsyncMock(side_effect=exception)

            # The service is expected to swallow the failure and report it in the result.
            result = await self.service.send_job_completion_notification(
                job_id="test-job",
                status="completed",
                processing_time=10.0,
            )

            # Name the offending exception so a failure in the loop is attributable.
            assert result.acknowledged is False, (
                f"expected unacknowledged result for {exception!r}"
            )

    @pytest.mark.asyncio
    async def test_notification_service_logs_sanitized_errors(self):
        """An error message containing a JWT is logged with the token redacted."""
        # Exception text deliberately embeds a JWT-shaped token.
        sensitive_error = Exception(
            "API error with token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"
        )
        self.mock_n8n_client.post_completion_event = AsyncMock(side_effect=sensitive_error)

        # Replace the service module's logger so the logged text can be inspected.
        with patch('infrastructure.services.n8n_notification_service.logger') as mock_logger:
            result = await self.service.send_job_completion_notification(
                job_id="test-job",
                status="completed",
                processing_time=5.0,
            )

        # Exactly one error record is expected; its first positional arg is the message.
        mock_logger.error.assert_called_once()
        logged_message = mock_logger.error.call_args[0][0]

        # The token must be replaced by the redaction marker.
        assert "signature" not in logged_message
        assert "***JWT***" in logged_message
        assert result.acknowledged is False
class TestAPIErrorResponses:
    """Test API endpoint error response formats.

    Each test builds the payload a route handler would derive from a domain
    exception and compares it against literal expected values, so the
    simulated response is actually asserted rather than built and discarded.
    """

    def test_job_not_found_error_response_format(self):
        """JobNotFoundError supplies everything needed for the JOB_NOT_FOUND payload."""
        job_id = "missing-job-123"
        error = JobNotFoundError(job_id)

        # Simulate how the route handler would format the error.
        simulated_response = {
            "error": str(error),
            "code": "JOB_NOT_FOUND",
            "job_id": error.job_id,
        }

        assert simulated_response == {
            "error": "Job not found: missing-job-123",
            "code": "JOB_NOT_FOUND",
            "job_id": "missing-job-123",
        }

    def test_duplicate_job_id_error_response_format(self):
        """DuplicateExternalJobIdError supplies the DUPLICATE_EXTERNAL_JOB_ID payload fields."""
        external_job_id = "duplicate-job-456"
        error = DuplicateExternalJobIdError(external_job_id)

        # Simulate how the route handler would format the error.
        simulated_response = {
            "error": str(error),
            "code": "DUPLICATE_EXTERNAL_JOB_ID",
            "external_job_id": error.external_job_id,
        }

        assert simulated_response == {
            "error": "External job ID already exists: duplicate-job-456",
            "code": "DUPLICATE_EXTERNAL_JOB_ID",
            "external_job_id": "duplicate-job-456",
        }

    def test_invalid_job_id_format_error_response(self):
        """InvalidExternalJobIdFormatError supplies the INVALID_EXTERNAL_JOB_ID_FORMAT payload fields."""
        job_id = "invalid@job"
        description = "Must contain only alphanumeric characters"
        error = InvalidExternalJobIdFormatError(job_id, description)

        # Simulate how the route handler would format the error.
        simulated_response = {
            "error": "Invalid external job ID format",
            "details": str(error),
            "code": "INVALID_EXTERNAL_JOB_ID_FORMAT",
            "field": "job_id",
            "value": error.job_id,
        }

        assert simulated_response == {
            "error": "Invalid external job ID format",
            "details": (
                "Invalid external job ID format: invalid@job. "
                "Must contain only alphanumeric characters"
            ),
            "code": "INVALID_EXTERNAL_JOB_ID_FORMAT",
            "field": "job_id",
            "value": "invalid@job",
        }
        # The description is not part of the payload keys above, so check it directly.
        assert error.format_description == description
class TestErrorHandlingMiddleware:
    """Check redact_sensitive_data against header-style and plain inputs."""

    def test_secure_logging_redacts_authorization_headers(self):
        """A JSON-like header dump loses its token but keeps everything else."""
        test_data = '''
        {
            "headers": {
                "Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature",
                "Content-Type": "application/json"
            }
        }
        '''

        cleaned = redact_sensitive_data(test_data)

        # The "Bearer" prefix stays; only the token after it is masked.
        assert "Bearer ***" in cleaned
        assert "signature" not in cleaned
        # Unrelated headers must pass through untouched.
        assert "Content-Type" in cleaned

    def test_secure_logging_handles_various_token_formats(self):
        """Quoted, unquoted and lower-case bearer spellings are all masked."""
        samples = (
            'Authorization: Bearer token.goes.here',
            '"Authorization": "Bearer token.goes.here"',
            "Authorization: 'Bearer token.goes.here'",
            'bearer: token.goes.here',
            '"bearer": "token.goes.here"',
        )
        for sample in samples:
            cleaned = redact_sensitive_data(sample)
            # The raw token is gone and a redaction marker is present.
            assert "token.goes.here" not in cleaned
            assert "***" in cleaned

    def test_redaction_preserves_non_sensitive_data(self):
        """Text without any token comes back unchanged."""
        test_data = "User ID: 12345, Email: user@example.com, Status: active"

        assert redact_sensitive_data(test_data) == test_data
class TestValidationErrorHandling:
    """Check the messages ValidationService raises for malformed time strings."""

    def test_time_format_validation_errors(self):
        """Each bad time string raises ValidationError mentioning the expected problem."""
        service = ValidationService(
            max_file_size_mb=100.0,
            supported_video_formats=['.mp4'],
            supported_audio_formats=['mp3'],
        )

        # Bad input -> fragment expected (case-insensitively) in the error text.
        # NOTE(review): "25:30:00" is out of range in its first field, yet the
        # expectation is "Invalid minutes" — confirm against validate_time_format.
        expected_fragments = {
            "25:30:00": "Invalid minutes",
            "12:60:00": "Invalid minutes",
            "12:30:60": "Invalid seconds",
            "invalid": "Invalid time format",   # not a time at all
            "12:3:45": "Invalid time format",   # field is not zero-padded
        }

        for time_str, fragment in expected_fragments.items():
            with pytest.raises(ValidationError) as caught:
                service.validate_time_format(time_str)

            message = str(caught.value).lower()
            assert fragment.lower() in message
| # Integration test for complete error flow | |
class TestErrorFlowIntegration:
    """Test complete error handling flow from domain to API response.

    Coroutine tests carry an explicit ``pytest.mark.asyncio`` marker so they
    run even when pytest-asyncio is in strict mode.
    """

    @pytest.mark.asyncio
    async def test_complete_authentication_error_flow(self):
        """A missing Authorization header is turned into a 401 HTTPException."""
        from interfaces.api.dependencies import validate_bearer_token

        # No header at all: the dependency is called with None.
        with pytest.raises(HTTPException) as exc_info:
            await validate_bearer_token(None)

        assert exc_info.value.status_code == 401
        assert "Missing Authorization header" in exc_info.value.detail
        assert exc_info.value.headers["WWW-Authenticate"] == "Bearer"

    @pytest.mark.asyncio
    async def test_complete_validation_error_flow(self):
        """An invalid external job ID surfaces as InvalidExternalJobIdFormatError."""
        validation_service = ValidationService(
            max_file_size_mb=100.0,
            supported_video_formats=['.mp4'],
            supported_audio_formats=['mp3'],
        )

        invalid_job_id = "invalid@job#id"
        with pytest.raises(InvalidExternalJobIdFormatError) as exc_info:
            validation_service.validate_external_job_id(invalid_job_id)

        error = exc_info.value
        assert error.job_id == invalid_job_id
        assert "alphanumeric" in error.format_description

    @pytest.mark.asyncio
    async def test_n8n_notification_failure_resilience(self):
        """A client that always fails yields an unacknowledged result instead of raising."""
        from infrastructure.clients.n8n.n8n_client import N8NClient
        from infrastructure.services.n8n_notification_service import N8NNotificationService

        # Create service with mock client that always fails.
        mock_client = Mock(spec=N8NClient)
        mock_client.post_completion_event = AsyncMock(
            side_effect=Exception("N8N service down")
        )
        service = N8NNotificationService(mock_client)

        # Actually send the notification: if the service let the client's
        # exception propagate, this await would fail the test.
        result = await service.send_job_completion_notification(
            job_id="test-job",
            status="completed",
            processing_time=10.0,
        )

        assert result.acknowledged is False