"""Unit tests for error handling scenarios.""" import pytest from unittest.mock import Mock, AsyncMock, patch from fastapi import HTTPException from domain.exceptions.domain_exceptions import ( ValidationError, InvalidExternalJobIdFormatError, DuplicateExternalJobIdError, JobNotFoundError, JobNotCompletedError, AuthenticationError, NotificationFailureError ) from infrastructure.services.n8n_notification_service import redact_bearer_token from interfaces.api.middleware.error_handler import redact_sensitive_data from domain.services.validation_service import ValidationService class TestDomainExceptions: """Test domain exception classes.""" def test_invalid_external_job_id_format_error(self): """Test InvalidExternalJobIdFormatError creation and attributes.""" job_id = "invalid@job#id" description = "Must contain only alphanumeric characters" error = InvalidExternalJobIdFormatError(job_id, description) assert error.job_id == job_id assert error.format_description == description assert str(error) == f"Invalid external job ID format: {job_id}. {description}" def test_duplicate_external_job_id_error(self): """Test DuplicateExternalJobIdError creation and attributes.""" job_id = "existing-job-123" error = DuplicateExternalJobIdError(job_id) assert error.external_job_id == job_id assert str(error) == f"External job ID already exists: {job_id}" def test_job_not_found_error(self): """Test JobNotFoundError creation and attributes.""" job_id = "non-existent-job" error = JobNotFoundError(job_id) assert error.job_id == job_id assert str(error) == f"Job not found: {job_id}" def test_job_not_completed_error(self): """Test JobNotCompletedError creation and attributes.""" job_id = "processing-job" status = "processing" error = JobNotCompletedError(job_id, status) assert error.job_id == job_id assert error.status == status assert str(error) == f"Job {job_id} is not completed (status: {status})" def test_authentication_error(self): """Test AuthenticationError creation.""" error = AuthenticationError("Invalid token") assert str(error) == "Invalid token" def test_notification_failure_error(self): """Test NotificationFailureError creation.""" service = "N8N" details = "Connection timeout" error = NotificationFailureError(service, details) assert error.service == service assert error.details == details assert str(error) == f"Notification to {service} failed: {details}" class TestValidationService: """Test ValidationService error handling.""" def setup_method(self): """Set up test fixtures.""" self.validation_service = ValidationService( max_file_size_mb=100.0, supported_video_formats=['.mp4', '.avi', '.mov'], supported_audio_formats=['mp3', 'aac', 'wav'] ) def test_validate_external_job_id_valid_formats(self): """Test valid external job ID formats.""" valid_ids = [ "job-123", "job_456", "MyJob789", "a", "job-with-underscores_and-hyphens", "1234567890", None, # None should be valid (optional field) "", # Empty string should be valid (optional field) ] for job_id in valid_ids: # Should not raise any exception self.validation_service.validate_external_job_id(job_id) def test_validate_external_job_id_invalid_formats(self): """Test invalid external job ID formats.""" invalid_cases = [ ("job@123", "Must contain only alphanumeric characters, underscores, and hyphens"), ("job#456", "Must contain only alphanumeric characters, underscores, and hyphens"), ("job 789", "Must contain only alphanumeric characters, underscores, and hyphens"), ("job.txt", "Must contain only alphanumeric characters, underscores, and hyphens"), ("job+extra", "Must contain only alphanumeric characters, underscores, and hyphens"), ("a" * 51, "Must be 50 characters or less"), # Too long ] for job_id, expected_description in invalid_cases: with pytest.raises(InvalidExternalJobIdFormatError) as exc_info: self.validation_service.validate_external_job_id(job_id) assert exc_info.value.job_id == job_id assert expected_description in exc_info.value.format_description class TestSecureLogging: """Test secure logging with token redaction.""" def test_redact_bearer_token_function(self): """Test redaction of bearer tokens from log messages.""" test_cases = [ ( "Bearer token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature", "Bearer token: ***JWT***" ), ( 'Authorization: "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"', 'Authorization: "Bearer ***JWT***"' ), ( "No tokens here", "No tokens here" ), ( "bearer: sometoken", "bearer: ***" ), ] for input_msg, expected_output in test_cases: result = redact_bearer_token(input_msg) assert result == expected_output def test_redact_sensitive_data_middleware(self): """Test middleware function for redacting sensitive data.""" test_cases = [ ( 'Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature', 'Authorization: Bearer ***' ), ( '"Authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"', '"Authorization": "Bearer ***"' ), ( "JWT token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature", "JWT token: ***JWT***" ), ( "No sensitive data here", "No sensitive data here" ), ] for input_data, expected_output in test_cases: result = redact_sensitive_data(input_data) assert result == expected_output def test_redact_multiple_tokens(self): """Test redaction of multiple tokens in the same message.""" input_msg = ( "User1 token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature1 " "and User2 token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiI5ODc2NTQzMjEwIn0.signature2" ) result = redact_bearer_token(input_msg) # Both tokens should be redacted assert "signature1" not in result assert "signature2" not in result assert "***JWT***" in result class TestN8NNotificationErrorHandling: """Test N8N notification service error handling.""" def setup_method(self): """Set up test fixtures.""" from infrastructure.clients.n8n.n8n_client import N8NClient from infrastructure.services.n8n_notification_service import N8NNotificationService self.mock_n8n_client = Mock(spec=N8NClient) self.service = N8NNotificationService(self.mock_n8n_client) @pytest.mark.asyncio async def test_notification_service_handles_client_exceptions(self): """Test that notification service handles all types of client exceptions.""" from infrastructure.clients.n8n.exceptions import APIClientError, APIConnectionError, APIResponseError exceptions_to_test = [ Exception("Generic error"), APIClientError("Client error"), APIConnectionError("Connection failed"), APIResponseError("Invalid response", 500), TimeoutError("Request timeout"), ConnectionRefusedError("Connection refused"), ] for exception in exceptions_to_test: # Reset mock for each test self.mock_n8n_client.post_completion_event = AsyncMock(side_effect=exception) # Service should handle all exceptions gracefully result = await self.service.send_job_completion_notification( job_id="test-job", status="completed", processing_time=10.0 ) # Should return failed acknowledgment without raising assert result.acknowledged is False @pytest.mark.asyncio async def test_notification_service_logs_sanitized_errors(self): """Test that notification service logs errors with sensitive data redacted.""" from infrastructure.clients.n8n.models import WebhooksResponse # Create an exception with sensitive data sensitive_error = Exception( "API error with token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature" ) self.mock_n8n_client.post_completion_event = AsyncMock(side_effect=sensitive_error) with patch('infrastructure.services.n8n_notification_service.logger') as mock_logger: result = await self.service.send_job_completion_notification( job_id="test-job", status="completed", processing_time=5.0 ) # Verify error was logged mock_logger.error.assert_called_once() logged_message = mock_logger.error.call_args[0][0] # Verify sensitive data was redacted assert "signature" not in logged_message assert "***JWT***" in logged_message assert result.acknowledged is False class TestAPIErrorResponses: """Test API endpoint error response formats.""" def test_job_not_found_error_response_format(self): """Test JobNotFoundError creates proper HTTP error response.""" job_id = "missing-job-123" error = JobNotFoundError(job_id) # Simulate how the route handler would format the error expected_response = { "error": str(error), "code": "JOB_NOT_FOUND", "job_id": job_id } # Verify the error object has the needed attributes assert error.job_id == job_id assert str(error) == f"Job not found: {job_id}" def test_duplicate_job_id_error_response_format(self): """Test DuplicateExternalJobIdError creates proper HTTP error response.""" external_job_id = "duplicate-job-456" error = DuplicateExternalJobIdError(external_job_id) # Simulate how the route handler would format the error expected_response = { "error": str(error), "code": "DUPLICATE_EXTERNAL_JOB_ID", "external_job_id": external_job_id } # Verify the error object has the needed attributes assert error.external_job_id == external_job_id assert str(error) == f"External job ID already exists: {external_job_id}" def test_invalid_job_id_format_error_response(self): """Test InvalidExternalJobIdFormatError creates proper HTTP error response.""" job_id = "invalid@job" description = "Must contain only alphanumeric characters" error = InvalidExternalJobIdFormatError(job_id, description) # Simulate how the route handler would format the error expected_response = { "error": "Invalid external job ID format", "details": str(error), "code": "INVALID_EXTERNAL_JOB_ID_FORMAT", "field": "job_id", "value": job_id } # Verify the error object has the needed attributes assert error.job_id == job_id assert error.format_description == description class TestErrorHandlingMiddleware: """Test error handling middleware functions.""" def test_secure_logging_redacts_authorization_headers(self): """Test that authorization headers are properly redacted in logs.""" test_data = ''' { "headers": { "Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature", "Content-Type": "application/json" } } ''' redacted = redact_sensitive_data(test_data) # Should redact the token but keep the Bearer prefix assert "Bearer ***" in redacted assert "signature" not in redacted assert "Content-Type" in redacted # Other headers should remain def test_secure_logging_handles_various_token_formats(self): """Test that various token formats are handled correctly.""" test_cases = [ 'Authorization: Bearer token.goes.here', '"Authorization": "Bearer token.goes.here"', "Authorization: 'Bearer token.goes.here'", 'bearer: token.goes.here', '"bearer": "token.goes.here"', ] for test_data in test_cases: redacted = redact_sensitive_data(test_data) # Should not contain the full token assert "token.goes.here" not in redacted # Should contain redaction indicator assert "***" in redacted def test_redaction_preserves_non_sensitive_data(self): """Test that redaction doesn't affect non-sensitive data.""" test_data = "User ID: 12345, Email: user@example.com, Status: active" redacted = redact_sensitive_data(test_data) # Should be unchanged since no sensitive data assert redacted == test_data class TestValidationErrorHandling: """Test validation error handling in various scenarios.""" def test_time_format_validation_errors(self): """Test time format validation error handling.""" validation_service = ValidationService( max_file_size_mb=100.0, supported_video_formats=['.mp4'], supported_audio_formats=['mp3'] ) invalid_times = [ ("25:30:00", "Invalid minutes"), # Invalid minutes ("12:60:00", "Invalid minutes"), # Invalid minutes ("12:30:60", "Invalid seconds"), # Invalid seconds ("invalid", "Invalid time format"), # Wrong format ("12:3:45", "Invalid time format"), # Wrong format (missing zero) ] for time_str, expected_error_type in invalid_times: with pytest.raises(ValidationError) as exc_info: validation_service.validate_time_format(time_str) assert expected_error_type.lower() in str(exc_info.value).lower() # Integration test for complete error flow class TestErrorFlowIntegration: """Test complete error handling flow from domain to API response.""" @pytest.mark.asyncio async def test_complete_authentication_error_flow(self): """Test complete flow from missing token to 401 response.""" from interfaces.api.dependencies import validate_bearer_token # Test missing authorization header with pytest.raises(HTTPException) as exc_info: await validate_bearer_token(None) assert exc_info.value.status_code == 401 assert "Missing Authorization header" in exc_info.value.detail assert exc_info.value.headers["WWW-Authenticate"] == "Bearer" @pytest.mark.asyncio async def test_complete_validation_error_flow(self): """Test complete flow from invalid job ID to 400 response.""" validation_service = ValidationService( max_file_size_mb=100.0, supported_video_formats=['.mp4'], supported_audio_formats=['mp3'] ) # Test invalid external job ID invalid_job_id = "invalid@job#id" with pytest.raises(InvalidExternalJobIdFormatError) as exc_info: validation_service.validate_external_job_id(invalid_job_id) error = exc_info.value assert error.job_id == invalid_job_id assert "alphanumeric" in error.format_description def test_n8n_notification_failure_resilience(self): """Test that N8N notification failures don't break job processing.""" from infrastructure.clients.n8n.n8n_client import N8NClient from infrastructure.services.n8n_notification_service import N8NNotificationService # Create service with mock client that always fails mock_client = Mock(spec=N8NClient) mock_client.post_completion_event = AsyncMock( side_effect=Exception("N8N service down") ) service = N8NNotificationService(mock_client) # This should not raise an exception # (would need to be async test in real scenario) assert service is not None # Basic test that service was created