audio-processor / tests /test_error_handling.py
vitek
add bearer token to endpoints
acb0ec6
raw
history blame
17.6 kB
"""Unit tests for error handling scenarios."""
import pytest
from unittest.mock import Mock, AsyncMock, patch
from fastapi import HTTPException
from domain.exceptions.domain_exceptions import (
ValidationError,
InvalidExternalJobIdFormatError,
DuplicateExternalJobIdError,
JobNotFoundError,
JobNotCompletedError,
AuthenticationError,
NotificationFailureError
)
from infrastructure.services.n8n_notification_service import redact_bearer_token
from interfaces.api.middleware.error_handler import redact_sensitive_data
from domain.services.validation_service import ValidationService
class TestDomainExceptions:
"""Test domain exception classes."""
def test_invalid_external_job_id_format_error(self):
"""Test InvalidExternalJobIdFormatError creation and attributes."""
job_id = "invalid@job#id"
description = "Must contain only alphanumeric characters"
error = InvalidExternalJobIdFormatError(job_id, description)
assert error.job_id == job_id
assert error.format_description == description
assert str(error) == f"Invalid external job ID format: {job_id}. {description}"
def test_duplicate_external_job_id_error(self):
"""Test DuplicateExternalJobIdError creation and attributes."""
job_id = "existing-job-123"
error = DuplicateExternalJobIdError(job_id)
assert error.external_job_id == job_id
assert str(error) == f"External job ID already exists: {job_id}"
def test_job_not_found_error(self):
"""Test JobNotFoundError creation and attributes."""
job_id = "non-existent-job"
error = JobNotFoundError(job_id)
assert error.job_id == job_id
assert str(error) == f"Job not found: {job_id}"
def test_job_not_completed_error(self):
"""Test JobNotCompletedError creation and attributes."""
job_id = "processing-job"
status = "processing"
error = JobNotCompletedError(job_id, status)
assert error.job_id == job_id
assert error.status == status
assert str(error) == f"Job {job_id} is not completed (status: {status})"
def test_authentication_error(self):
"""Test AuthenticationError creation."""
error = AuthenticationError("Invalid token")
assert str(error) == "Invalid token"
def test_notification_failure_error(self):
"""Test NotificationFailureError creation."""
service = "N8N"
details = "Connection timeout"
error = NotificationFailureError(service, details)
assert error.service == service
assert error.details == details
assert str(error) == f"Notification to {service} failed: {details}"
class TestValidationService:
"""Test ValidationService error handling."""
def setup_method(self):
"""Set up test fixtures."""
self.validation_service = ValidationService(
max_file_size_mb=100.0,
supported_video_formats=['.mp4', '.avi', '.mov'],
supported_audio_formats=['mp3', 'aac', 'wav']
)
def test_validate_external_job_id_valid_formats(self):
"""Test valid external job ID formats."""
valid_ids = [
"job-123",
"job_456",
"MyJob789",
"a",
"job-with-underscores_and-hyphens",
"1234567890",
None, # None should be valid (optional field)
"", # Empty string should be valid (optional field)
]
for job_id in valid_ids:
# Should not raise any exception
self.validation_service.validate_external_job_id(job_id)
def test_validate_external_job_id_invalid_formats(self):
"""Test invalid external job ID formats."""
invalid_cases = [
("job@123", "Must contain only alphanumeric characters, underscores, and hyphens"),
("job#456", "Must contain only alphanumeric characters, underscores, and hyphens"),
("job 789", "Must contain only alphanumeric characters, underscores, and hyphens"),
("job.txt", "Must contain only alphanumeric characters, underscores, and hyphens"),
("job+extra", "Must contain only alphanumeric characters, underscores, and hyphens"),
("a" * 51, "Must be 50 characters or less"), # Too long
]
for job_id, expected_description in invalid_cases:
with pytest.raises(InvalidExternalJobIdFormatError) as exc_info:
self.validation_service.validate_external_job_id(job_id)
assert exc_info.value.job_id == job_id
assert expected_description in exc_info.value.format_description
class TestSecureLogging:
"""Test secure logging with token redaction."""
def test_redact_bearer_token_function(self):
"""Test redaction of bearer tokens from log messages."""
test_cases = [
(
"Bearer token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature",
"Bearer token: ***JWT***"
),
(
'Authorization: "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"',
'Authorization: "Bearer ***JWT***"'
),
(
"No tokens here",
"No tokens here"
),
(
"bearer: sometoken",
"bearer: ***"
),
]
for input_msg, expected_output in test_cases:
result = redact_bearer_token(input_msg)
assert result == expected_output
def test_redact_sensitive_data_middleware(self):
"""Test middleware function for redacting sensitive data."""
test_cases = [
(
'Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature',
'Authorization: Bearer ***'
),
(
'"Authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"',
'"Authorization": "Bearer ***"'
),
(
"JWT token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature",
"JWT token: ***JWT***"
),
(
"No sensitive data here",
"No sensitive data here"
),
]
for input_data, expected_output in test_cases:
result = redact_sensitive_data(input_data)
assert result == expected_output
def test_redact_multiple_tokens(self):
"""Test redaction of multiple tokens in the same message."""
input_msg = (
"User1 token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature1 "
"and User2 token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiI5ODc2NTQzMjEwIn0.signature2"
)
result = redact_bearer_token(input_msg)
# Both tokens should be redacted
assert "signature1" not in result
assert "signature2" not in result
assert "***JWT***" in result
class TestN8NNotificationErrorHandling:
"""Test N8N notification service error handling."""
def setup_method(self):
"""Set up test fixtures."""
from infrastructure.clients.n8n.n8n_client import N8NClient
from infrastructure.services.n8n_notification_service import N8NNotificationService
self.mock_n8n_client = Mock(spec=N8NClient)
self.service = N8NNotificationService(self.mock_n8n_client)
@pytest.mark.asyncio
async def test_notification_service_handles_client_exceptions(self):
"""Test that notification service handles all types of client exceptions."""
from infrastructure.clients.n8n.exceptions import APIClientError, APIConnectionError, APIResponseError
exceptions_to_test = [
Exception("Generic error"),
APIClientError("Client error"),
APIConnectionError("Connection failed"),
APIResponseError("Invalid response", 500),
TimeoutError("Request timeout"),
ConnectionRefusedError("Connection refused"),
]
for exception in exceptions_to_test:
# Reset mock for each test
self.mock_n8n_client.post_completion_event = AsyncMock(side_effect=exception)
# Service should handle all exceptions gracefully
result = await self.service.send_job_completion_notification(
job_id="test-job",
status="completed",
processing_time=10.0
)
# Should return failed acknowledgment without raising
assert result.acknowledged is False
@pytest.mark.asyncio
async def test_notification_service_logs_sanitized_errors(self):
"""Test that notification service logs errors with sensitive data redacted."""
from infrastructure.clients.n8n.models import WebhooksResponse
# Create an exception with sensitive data
sensitive_error = Exception(
"API error with token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"
)
self.mock_n8n_client.post_completion_event = AsyncMock(side_effect=sensitive_error)
with patch('infrastructure.services.n8n_notification_service.logger') as mock_logger:
result = await self.service.send_job_completion_notification(
job_id="test-job",
status="completed",
processing_time=5.0
)
# Verify error was logged
mock_logger.error.assert_called_once()
logged_message = mock_logger.error.call_args[0][0]
# Verify sensitive data was redacted
assert "signature" not in logged_message
assert "***JWT***" in logged_message
assert result.acknowledged is False
class TestAPIErrorResponses:
"""Test API endpoint error response formats."""
def test_job_not_found_error_response_format(self):
"""Test JobNotFoundError creates proper HTTP error response."""
job_id = "missing-job-123"
error = JobNotFoundError(job_id)
# Simulate how the route handler would format the error
expected_response = {
"error": str(error),
"code": "JOB_NOT_FOUND",
"job_id": job_id
}
# Verify the error object has the needed attributes
assert error.job_id == job_id
assert str(error) == f"Job not found: {job_id}"
def test_duplicate_job_id_error_response_format(self):
"""Test DuplicateExternalJobIdError creates proper HTTP error response."""
external_job_id = "duplicate-job-456"
error = DuplicateExternalJobIdError(external_job_id)
# Simulate how the route handler would format the error
expected_response = {
"error": str(error),
"code": "DUPLICATE_EXTERNAL_JOB_ID",
"external_job_id": external_job_id
}
# Verify the error object has the needed attributes
assert error.external_job_id == external_job_id
assert str(error) == f"External job ID already exists: {external_job_id}"
def test_invalid_job_id_format_error_response(self):
"""Test InvalidExternalJobIdFormatError creates proper HTTP error response."""
job_id = "invalid@job"
description = "Must contain only alphanumeric characters"
error = InvalidExternalJobIdFormatError(job_id, description)
# Simulate how the route handler would format the error
expected_response = {
"error": "Invalid external job ID format",
"details": str(error),
"code": "INVALID_EXTERNAL_JOB_ID_FORMAT",
"field": "job_id",
"value": job_id
}
# Verify the error object has the needed attributes
assert error.job_id == job_id
assert error.format_description == description
class TestErrorHandlingMiddleware:
"""Test error handling middleware functions."""
def test_secure_logging_redacts_authorization_headers(self):
"""Test that authorization headers are properly redacted in logs."""
test_data = '''
{
"headers": {
"Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature",
"Content-Type": "application/json"
}
}
'''
redacted = redact_sensitive_data(test_data)
# Should redact the token but keep the Bearer prefix
assert "Bearer ***" in redacted
assert "signature" not in redacted
assert "Content-Type" in redacted # Other headers should remain
def test_secure_logging_handles_various_token_formats(self):
"""Test that various token formats are handled correctly."""
test_cases = [
'Authorization: Bearer token.goes.here',
'"Authorization": "Bearer token.goes.here"',
"Authorization: 'Bearer token.goes.here'",
'bearer: token.goes.here',
'"bearer": "token.goes.here"',
]
for test_data in test_cases:
redacted = redact_sensitive_data(test_data)
# Should not contain the full token
assert "token.goes.here" not in redacted
# Should contain redaction indicator
assert "***" in redacted
def test_redaction_preserves_non_sensitive_data(self):
"""Test that redaction doesn't affect non-sensitive data."""
test_data = "User ID: 12345, Email: user@example.com, Status: active"
redacted = redact_sensitive_data(test_data)
# Should be unchanged since no sensitive data
assert redacted == test_data
class TestValidationErrorHandling:
"""Test validation error handling in various scenarios."""
def test_time_format_validation_errors(self):
"""Test time format validation error handling."""
validation_service = ValidationService(
max_file_size_mb=100.0,
supported_video_formats=['.mp4'],
supported_audio_formats=['mp3']
)
invalid_times = [
("25:30:00", "Invalid minutes"), # Invalid minutes
("12:60:00", "Invalid minutes"), # Invalid minutes
("12:30:60", "Invalid seconds"), # Invalid seconds
("invalid", "Invalid time format"), # Wrong format
("12:3:45", "Invalid time format"), # Wrong format (missing zero)
]
for time_str, expected_error_type in invalid_times:
with pytest.raises(ValidationError) as exc_info:
validation_service.validate_time_format(time_str)
assert expected_error_type.lower() in str(exc_info.value).lower()
# Integration test for complete error flow
class TestErrorFlowIntegration:
"""Test complete error handling flow from domain to API response."""
@pytest.mark.asyncio
async def test_complete_authentication_error_flow(self):
"""Test complete flow from missing token to 401 response."""
from interfaces.api.dependencies import validate_bearer_token
# Test missing authorization header
with pytest.raises(HTTPException) as exc_info:
await validate_bearer_token(None)
assert exc_info.value.status_code == 401
assert "Missing Authorization header" in exc_info.value.detail
assert exc_info.value.headers["WWW-Authenticate"] == "Bearer"
@pytest.mark.asyncio
async def test_complete_validation_error_flow(self):
"""Test complete flow from invalid job ID to 400 response."""
validation_service = ValidationService(
max_file_size_mb=100.0,
supported_video_formats=['.mp4'],
supported_audio_formats=['mp3']
)
# Test invalid external job ID
invalid_job_id = "invalid@job#id"
with pytest.raises(InvalidExternalJobIdFormatError) as exc_info:
validation_service.validate_external_job_id(invalid_job_id)
error = exc_info.value
assert error.job_id == invalid_job_id
assert "alphanumeric" in error.format_description
def test_n8n_notification_failure_resilience(self):
"""Test that N8N notification failures don't break job processing."""
from infrastructure.clients.n8n.n8n_client import N8NClient
from infrastructure.services.n8n_notification_service import N8NNotificationService
# Create service with mock client that always fails
mock_client = Mock(spec=N8NClient)
mock_client.post_completion_event = AsyncMock(
side_effect=Exception("N8N service down")
)
service = N8NNotificationService(mock_client)
# This should not raise an exception
# (would need to be async test in real scenario)
assert service is not None # Basic test that service was created