audio-processor / tests /test_error_handling.py
tedowski's picture
n8n-improvements (#1)
dbe78dd verified
"""Unit tests for error handling scenarios."""
import pytest
from unittest.mock import Mock, AsyncMock, patch
from fastapi import HTTPException
from domain.exceptions.domain_exceptions import (
ValidationError,
InvalidExternalJobIdFormatError,
DuplicateExternalJobIdError,
JobNotFoundError,
JobNotCompletedError,
AuthenticationError,
NotificationFailureError
)
from infrastructure.services.n8n_notification_service import redact_bearer_token
from interfaces.api.middleware.error_handler import redact_sensitive_data
from domain.services.validation_service import ValidationService
class TestDomainExceptions:
"""Test domain exception classes."""
def test_invalid_external_job_id_format_error(self):
"""Test InvalidExternalJobIdFormatError creation and attributes."""
job_id = "invalid@job#id"
description = "Must contain only alphanumeric characters"
error = InvalidExternalJobIdFormatError(job_id, description)
assert error.job_id == job_id
assert error.format_description == description
assert str(error) == f"Invalid external job ID format: {job_id}. {description}"
def test_duplicate_external_job_id_error(self):
"""Test DuplicateExternalJobIdError creation and attributes."""
job_id = "existing-job-123"
error = DuplicateExternalJobIdError(job_id)
assert error.external_job_id == job_id
assert str(error) == f"External job ID already exists: {job_id}"
def test_job_not_found_error(self):
"""Test JobNotFoundError creation and attributes."""
job_id = "non-existent-job"
error = JobNotFoundError(job_id)
assert error.job_id == job_id
assert str(error) == f"Job not found: {job_id}"
def test_job_not_completed_error(self):
"""Test JobNotCompletedError creation and attributes."""
job_id = "processing-job"
status = "processing"
error = JobNotCompletedError(job_id, status)
assert error.job_id == job_id
assert error.status == status
assert str(error) == f"Job {job_id} is not completed (status: {status})"
def test_authentication_error(self):
"""Test AuthenticationError creation."""
error = AuthenticationError("Invalid token")
assert str(error) == "Invalid token"
def test_notification_failure_error(self):
"""Test NotificationFailureError creation."""
service = "N8N"
details = "Connection timeout"
error = NotificationFailureError(service, details)
assert error.service == service
assert error.details == details
assert str(error) == f"Notification to {service} failed: {details}"
class TestValidationService:
"""Test ValidationService error handling."""
def setup_method(self):
"""Set up test fixtures."""
self.validation_service = ValidationService(
max_file_size_mb=100.0,
supported_video_formats=['.mp4', '.avi', '.mov'],
supported_audio_formats=['mp3', 'aac', 'wav']
)
def test_validate_external_job_id_valid_formats(self):
"""Test valid external job ID formats."""
valid_ids = [
"job-123",
"job_456",
"MyJob789",
"a",
"job-with-underscores_and-hyphens",
"1234567890",
None, # None should be valid (optional field)
"", # Empty string should be valid (optional field)
]
for job_id in valid_ids:
# Should not raise any exception
self.validation_service.validate_external_job_id(job_id)
def test_validate_external_job_id_invalid_formats(self):
"""Test invalid external job ID formats."""
invalid_cases = [
("job@123", "Must contain only alphanumeric characters, underscores, and hyphens"),
("job#456", "Must contain only alphanumeric characters, underscores, and hyphens"),
("job 789", "Must contain only alphanumeric characters, underscores, and hyphens"),
("job.txt", "Must contain only alphanumeric characters, underscores, and hyphens"),
("job+extra", "Must contain only alphanumeric characters, underscores, and hyphens"),
("a" * 51, "Must be 50 characters or less"), # Too long
]
for job_id, expected_description in invalid_cases:
with pytest.raises(InvalidExternalJobIdFormatError) as exc_info:
self.validation_service.validate_external_job_id(job_id)
assert exc_info.value.job_id == job_id
assert expected_description in exc_info.value.format_description
class TestSecureLogging:
"""Test secure logging with token redaction."""
def test_redact_bearer_token_function(self):
"""Test redaction of bearer tokens from log messages."""
test_cases = [
(
"Bearer token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature",
"Bearer token: ***JWT***"
),
(
'Authorization: "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"',
'Authorization: "Bearer ***JWT***"'
),
(
"No tokens here",
"No tokens here"
),
(
"bearer: sometoken",
"bearer: ***"
),
]
for input_msg, expected_output in test_cases:
result = redact_bearer_token(input_msg)
assert result == expected_output
def test_redact_sensitive_data_middleware(self):
"""Test middleware function for redacting sensitive data."""
test_cases = [
(
'Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature',
'Authorization: Bearer ***'
),
(
'"Authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"',
'"Authorization": "Bearer ***"'
),
(
"JWT token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature",
"JWT token: ***JWT***"
),
(
"No sensitive data here",
"No sensitive data here"
),
]
for input_data, expected_output in test_cases:
result = redact_sensitive_data(input_data)
assert result == expected_output
def test_redact_multiple_tokens(self):
"""Test redaction of multiple tokens in the same message."""
input_msg = (
"User1 token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature1 "
"and User2 token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiI5ODc2NTQzMjEwIn0.signature2"
)
result = redact_bearer_token(input_msg)
# Both tokens should be redacted
assert "signature1" not in result
assert "signature2" not in result
assert "***JWT***" in result
class TestN8NNotificationErrorHandling:
"""Test N8N notification service error handling."""
def setup_method(self):
"""Set up test fixtures."""
from infrastructure.clients.n8n.n8n_client import N8NClient
from infrastructure.services.n8n_notification_service import N8NNotificationService
self.mock_n8n_client = Mock(spec=N8NClient)
self.service = N8NNotificationService(self.mock_n8n_client)
@pytest.mark.asyncio
async def test_notification_service_handles_client_exceptions(self):
"""Test that notification service handles all types of client exceptions."""
from infrastructure.clients.n8n.exceptions import APIClientError, APIConnectionError, APIResponseError
exceptions_to_test = [
Exception("Generic error"),
APIClientError("Client error"),
APIConnectionError("Connection failed"),
APIResponseError("Invalid response", 500),
TimeoutError("Request timeout"),
ConnectionRefusedError("Connection refused"),
]
for exception in exceptions_to_test:
# Reset mock for each test
self.mock_n8n_client.post_completion_event = AsyncMock(side_effect=exception)
# Service should handle all exceptions gracefully
result = await self.service.send_job_completion_notification(
job_id="test-job",
status="completed",
processing_time=10.0
)
# Should return failed acknowledgment without raising
assert result.acknowledged is False
@pytest.mark.asyncio
async def test_notification_service_logs_sanitized_errors(self):
"""Test that notification service logs errors with sensitive data redacted."""
from infrastructure.clients.n8n.models import WebhooksResponse
# Create an exception with sensitive data
sensitive_error = Exception(
"API error with token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"
)
self.mock_n8n_client.post_completion_event = AsyncMock(side_effect=sensitive_error)
with patch('infrastructure.services.n8n_notification_service.logger') as mock_logger:
result = await self.service.send_job_completion_notification(
job_id="test-job",
status="completed",
processing_time=5.0
)
# Verify error was logged
mock_logger.error.assert_called_once()
logged_message = mock_logger.error.call_args[0][0]
# Verify sensitive data was redacted
assert "signature" not in logged_message
assert "***JWT***" in logged_message
assert result.acknowledged is False
class TestAPIErrorResponses:
"""Test API endpoint error response formats."""
def test_job_not_found_error_response_format(self):
"""Test JobNotFoundError creates proper HTTP error response."""
job_id = "missing-job-123"
error = JobNotFoundError(job_id)
# Simulate how the route handler would format the error
expected_response = {
"error": str(error),
"code": "JOB_NOT_FOUND",
"job_id": job_id
}
# Verify the error object has the needed attributes
assert error.job_id == job_id
assert str(error) == f"Job not found: {job_id}"
def test_duplicate_job_id_error_response_format(self):
"""Test DuplicateExternalJobIdError creates proper HTTP error response."""
external_job_id = "duplicate-job-456"
error = DuplicateExternalJobIdError(external_job_id)
# Simulate how the route handler would format the error
expected_response = {
"error": str(error),
"code": "DUPLICATE_EXTERNAL_JOB_ID",
"external_job_id": external_job_id
}
# Verify the error object has the needed attributes
assert error.external_job_id == external_job_id
assert str(error) == f"External job ID already exists: {external_job_id}"
def test_invalid_job_id_format_error_response(self):
"""Test InvalidExternalJobIdFormatError creates proper HTTP error response."""
job_id = "invalid@job"
description = "Must contain only alphanumeric characters"
error = InvalidExternalJobIdFormatError(job_id, description)
# Simulate how the route handler would format the error
expected_response = {
"error": "Invalid external job ID format",
"details": str(error),
"code": "INVALID_EXTERNAL_JOB_ID_FORMAT",
"field": "job_id",
"value": job_id
}
# Verify the error object has the needed attributes
assert error.job_id == job_id
assert error.format_description == description
class TestErrorHandlingMiddleware:
"""Test error handling middleware functions."""
def test_secure_logging_redacts_authorization_headers(self):
"""Test that authorization headers are properly redacted in logs."""
test_data = '''
{
"headers": {
"Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature",
"Content-Type": "application/json"
}
}
'''
redacted = redact_sensitive_data(test_data)
# Should redact the token but keep the Bearer prefix
assert "Bearer ***" in redacted
assert "signature" not in redacted
assert "Content-Type" in redacted # Other headers should remain
def test_secure_logging_handles_various_token_formats(self):
"""Test that various token formats are handled correctly."""
test_cases = [
'Authorization: Bearer token.goes.here',
'"Authorization": "Bearer token.goes.here"',
"Authorization: 'Bearer token.goes.here'",
'bearer: token.goes.here',
'"bearer": "token.goes.here"',
]
for test_data in test_cases:
redacted = redact_sensitive_data(test_data)
# Should not contain the full token
assert "token.goes.here" not in redacted
# Should contain redaction indicator
assert "***" in redacted
def test_redaction_preserves_non_sensitive_data(self):
"""Test that redaction doesn't affect non-sensitive data."""
test_data = "User ID: 12345, Email: user@example.com, Status: active"
redacted = redact_sensitive_data(test_data)
# Should be unchanged since no sensitive data
assert redacted == test_data
class TestValidationErrorHandling:
"""Test validation error handling in various scenarios."""
def test_time_format_validation_errors(self):
"""Test time format validation error handling."""
validation_service = ValidationService(
max_file_size_mb=100.0,
supported_video_formats=['.mp4'],
supported_audio_formats=['mp3']
)
invalid_times = [
("25:30:00", "Invalid minutes"), # Invalid minutes
("12:60:00", "Invalid minutes"), # Invalid minutes
("12:30:60", "Invalid seconds"), # Invalid seconds
("invalid", "Invalid time format"), # Wrong format
("12:3:45", "Invalid time format"), # Wrong format (missing zero)
]
for time_str, expected_error_type in invalid_times:
with pytest.raises(ValidationError) as exc_info:
validation_service.validate_time_format(time_str)
assert expected_error_type.lower() in str(exc_info.value).lower()
# Integration test for complete error flow
class TestErrorFlowIntegration:
"""Test complete error handling flow from domain to API response."""
@pytest.mark.asyncio
async def test_complete_authentication_error_flow(self):
"""Test complete flow from missing token to 401 response."""
from interfaces.api.dependencies import validate_bearer_token
# Test missing authorization header
with pytest.raises(HTTPException) as exc_info:
await validate_bearer_token(None)
assert exc_info.value.status_code == 401
assert "Missing Authorization header" in exc_info.value.detail
assert exc_info.value.headers["WWW-Authenticate"] == "Bearer"
@pytest.mark.asyncio
async def test_complete_validation_error_flow(self):
"""Test complete flow from invalid job ID to 400 response."""
validation_service = ValidationService(
max_file_size_mb=100.0,
supported_video_formats=['.mp4'],
supported_audio_formats=['mp3']
)
# Test invalid external job ID
invalid_job_id = "invalid@job#id"
with pytest.raises(InvalidExternalJobIdFormatError) as exc_info:
validation_service.validate_external_job_id(invalid_job_id)
error = exc_info.value
assert error.job_id == invalid_job_id
assert "alphanumeric" in error.format_description
def test_n8n_notification_failure_resilience(self):
"""Test that N8N notification failures don't break job processing."""
from infrastructure.clients.n8n.n8n_client import N8NClient
from infrastructure.services.n8n_notification_service import N8NNotificationService
# Create service with mock client that always fails
mock_client = Mock(spec=N8NClient)
mock_client.post_completion_event = AsyncMock(
side_effect=Exception("N8N service down")
)
service = N8NNotificationService(mock_client)
# This should not raise an exception
# (would need to be async test in real scenario)
assert service is not None # Basic test that service was created