Spaces:
Sleeping
Sleeping
File size: 17,579 Bytes
dbe78dd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 |
"""Unit tests for error handling scenarios."""
import pytest
from unittest.mock import Mock, AsyncMock, patch
from fastapi import HTTPException
from domain.exceptions.domain_exceptions import (
ValidationError,
InvalidExternalJobIdFormatError,
DuplicateExternalJobIdError,
JobNotFoundError,
JobNotCompletedError,
AuthenticationError,
NotificationFailureError
)
from infrastructure.services.n8n_notification_service import redact_bearer_token
from interfaces.api.middleware.error_handler import redact_sensitive_data
from domain.services.validation_service import ValidationService
class TestDomainExceptions:
"""Test domain exception classes."""
def test_invalid_external_job_id_format_error(self):
"""Test InvalidExternalJobIdFormatError creation and attributes."""
job_id = "invalid@job#id"
description = "Must contain only alphanumeric characters"
error = InvalidExternalJobIdFormatError(job_id, description)
assert error.job_id == job_id
assert error.format_description == description
assert str(error) == f"Invalid external job ID format: {job_id}. {description}"
def test_duplicate_external_job_id_error(self):
"""Test DuplicateExternalJobIdError creation and attributes."""
job_id = "existing-job-123"
error = DuplicateExternalJobIdError(job_id)
assert error.external_job_id == job_id
assert str(error) == f"External job ID already exists: {job_id}"
def test_job_not_found_error(self):
"""Test JobNotFoundError creation and attributes."""
job_id = "non-existent-job"
error = JobNotFoundError(job_id)
assert error.job_id == job_id
assert str(error) == f"Job not found: {job_id}"
def test_job_not_completed_error(self):
"""Test JobNotCompletedError creation and attributes."""
job_id = "processing-job"
status = "processing"
error = JobNotCompletedError(job_id, status)
assert error.job_id == job_id
assert error.status == status
assert str(error) == f"Job {job_id} is not completed (status: {status})"
def test_authentication_error(self):
"""Test AuthenticationError creation."""
error = AuthenticationError("Invalid token")
assert str(error) == "Invalid token"
def test_notification_failure_error(self):
"""Test NotificationFailureError creation."""
service = "N8N"
details = "Connection timeout"
error = NotificationFailureError(service, details)
assert error.service == service
assert error.details == details
assert str(error) == f"Notification to {service} failed: {details}"
class TestValidationService:
"""Test ValidationService error handling."""
def setup_method(self):
"""Set up test fixtures."""
self.validation_service = ValidationService(
max_file_size_mb=100.0,
supported_video_formats=['.mp4', '.avi', '.mov'],
supported_audio_formats=['mp3', 'aac', 'wav']
)
def test_validate_external_job_id_valid_formats(self):
"""Test valid external job ID formats."""
valid_ids = [
"job-123",
"job_456",
"MyJob789",
"a",
"job-with-underscores_and-hyphens",
"1234567890",
None, # None should be valid (optional field)
"", # Empty string should be valid (optional field)
]
for job_id in valid_ids:
# Should not raise any exception
self.validation_service.validate_external_job_id(job_id)
def test_validate_external_job_id_invalid_formats(self):
"""Test invalid external job ID formats."""
invalid_cases = [
("job@123", "Must contain only alphanumeric characters, underscores, and hyphens"),
("job#456", "Must contain only alphanumeric characters, underscores, and hyphens"),
("job 789", "Must contain only alphanumeric characters, underscores, and hyphens"),
("job.txt", "Must contain only alphanumeric characters, underscores, and hyphens"),
("job+extra", "Must contain only alphanumeric characters, underscores, and hyphens"),
("a" * 51, "Must be 50 characters or less"), # Too long
]
for job_id, expected_description in invalid_cases:
with pytest.raises(InvalidExternalJobIdFormatError) as exc_info:
self.validation_service.validate_external_job_id(job_id)
assert exc_info.value.job_id == job_id
assert expected_description in exc_info.value.format_description
class TestSecureLogging:
"""Test secure logging with token redaction."""
def test_redact_bearer_token_function(self):
"""Test redaction of bearer tokens from log messages."""
test_cases = [
(
"Bearer token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature",
"Bearer token: ***JWT***"
),
(
'Authorization: "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"',
'Authorization: "Bearer ***JWT***"'
),
(
"No tokens here",
"No tokens here"
),
(
"bearer: sometoken",
"bearer: ***"
),
]
for input_msg, expected_output in test_cases:
result = redact_bearer_token(input_msg)
assert result == expected_output
def test_redact_sensitive_data_middleware(self):
"""Test middleware function for redacting sensitive data."""
test_cases = [
(
'Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature',
'Authorization: Bearer ***'
),
(
'"Authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"',
'"Authorization": "Bearer ***"'
),
(
"JWT token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature",
"JWT token: ***JWT***"
),
(
"No sensitive data here",
"No sensitive data here"
),
]
for input_data, expected_output in test_cases:
result = redact_sensitive_data(input_data)
assert result == expected_output
def test_redact_multiple_tokens(self):
"""Test redaction of multiple tokens in the same message."""
input_msg = (
"User1 token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature1 "
"and User2 token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiI5ODc2NTQzMjEwIn0.signature2"
)
result = redact_bearer_token(input_msg)
# Both tokens should be redacted
assert "signature1" not in result
assert "signature2" not in result
assert "***JWT***" in result
class TestN8NNotificationErrorHandling:
"""Test N8N notification service error handling."""
def setup_method(self):
"""Set up test fixtures."""
from infrastructure.clients.n8n.n8n_client import N8NClient
from infrastructure.services.n8n_notification_service import N8NNotificationService
self.mock_n8n_client = Mock(spec=N8NClient)
self.service = N8NNotificationService(self.mock_n8n_client)
@pytest.mark.asyncio
async def test_notification_service_handles_client_exceptions(self):
"""Test that notification service handles all types of client exceptions."""
from infrastructure.clients.n8n.exceptions import APIClientError, APIConnectionError, APIResponseError
exceptions_to_test = [
Exception("Generic error"),
APIClientError("Client error"),
APIConnectionError("Connection failed"),
APIResponseError("Invalid response", 500),
TimeoutError("Request timeout"),
ConnectionRefusedError("Connection refused"),
]
for exception in exceptions_to_test:
# Reset mock for each test
self.mock_n8n_client.post_completion_event = AsyncMock(side_effect=exception)
# Service should handle all exceptions gracefully
result = await self.service.send_job_completion_notification(
job_id="test-job",
status="completed",
processing_time=10.0
)
# Should return failed acknowledgment without raising
assert result.acknowledged is False
@pytest.mark.asyncio
async def test_notification_service_logs_sanitized_errors(self):
"""Test that notification service logs errors with sensitive data redacted."""
from infrastructure.clients.n8n.models import WebhooksResponse
# Create an exception with sensitive data
sensitive_error = Exception(
"API error with token: eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature"
)
self.mock_n8n_client.post_completion_event = AsyncMock(side_effect=sensitive_error)
with patch('infrastructure.services.n8n_notification_service.logger') as mock_logger:
result = await self.service.send_job_completion_notification(
job_id="test-job",
status="completed",
processing_time=5.0
)
# Verify error was logged
mock_logger.error.assert_called_once()
logged_message = mock_logger.error.call_args[0][0]
# Verify sensitive data was redacted
assert "signature" not in logged_message
assert "***JWT***" in logged_message
assert result.acknowledged is False
class TestAPIErrorResponses:
"""Test API endpoint error response formats."""
def test_job_not_found_error_response_format(self):
"""Test JobNotFoundError creates proper HTTP error response."""
job_id = "missing-job-123"
error = JobNotFoundError(job_id)
# Simulate how the route handler would format the error
expected_response = {
"error": str(error),
"code": "JOB_NOT_FOUND",
"job_id": job_id
}
# Verify the error object has the needed attributes
assert error.job_id == job_id
assert str(error) == f"Job not found: {job_id}"
def test_duplicate_job_id_error_response_format(self):
"""Test DuplicateExternalJobIdError creates proper HTTP error response."""
external_job_id = "duplicate-job-456"
error = DuplicateExternalJobIdError(external_job_id)
# Simulate how the route handler would format the error
expected_response = {
"error": str(error),
"code": "DUPLICATE_EXTERNAL_JOB_ID",
"external_job_id": external_job_id
}
# Verify the error object has the needed attributes
assert error.external_job_id == external_job_id
assert str(error) == f"External job ID already exists: {external_job_id}"
def test_invalid_job_id_format_error_response(self):
"""Test InvalidExternalJobIdFormatError creates proper HTTP error response."""
job_id = "invalid@job"
description = "Must contain only alphanumeric characters"
error = InvalidExternalJobIdFormatError(job_id, description)
# Simulate how the route handler would format the error
expected_response = {
"error": "Invalid external job ID format",
"details": str(error),
"code": "INVALID_EXTERNAL_JOB_ID_FORMAT",
"field": "job_id",
"value": job_id
}
# Verify the error object has the needed attributes
assert error.job_id == job_id
assert error.format_description == description
class TestErrorHandlingMiddleware:
"""Test error handling middleware functions."""
def test_secure_logging_redacts_authorization_headers(self):
"""Test that authorization headers are properly redacted in logs."""
test_data = '''
{
"headers": {
"Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature",
"Content-Type": "application/json"
}
}
'''
redacted = redact_sensitive_data(test_data)
# Should redact the token but keep the Bearer prefix
assert "Bearer ***" in redacted
assert "signature" not in redacted
assert "Content-Type" in redacted # Other headers should remain
def test_secure_logging_handles_various_token_formats(self):
"""Test that various token formats are handled correctly."""
test_cases = [
'Authorization: Bearer token.goes.here',
'"Authorization": "Bearer token.goes.here"',
"Authorization: 'Bearer token.goes.here'",
'bearer: token.goes.here',
'"bearer": "token.goes.here"',
]
for test_data in test_cases:
redacted = redact_sensitive_data(test_data)
# Should not contain the full token
assert "token.goes.here" not in redacted
# Should contain redaction indicator
assert "***" in redacted
def test_redaction_preserves_non_sensitive_data(self):
"""Test that redaction doesn't affect non-sensitive data."""
test_data = "User ID: 12345, Email: user@example.com, Status: active"
redacted = redact_sensitive_data(test_data)
# Should be unchanged since no sensitive data
assert redacted == test_data
class TestValidationErrorHandling:
"""Test validation error handling in various scenarios."""
def test_time_format_validation_errors(self):
"""Test time format validation error handling."""
validation_service = ValidationService(
max_file_size_mb=100.0,
supported_video_formats=['.mp4'],
supported_audio_formats=['mp3']
)
invalid_times = [
("25:30:00", "Invalid minutes"), # Invalid minutes
("12:60:00", "Invalid minutes"), # Invalid minutes
("12:30:60", "Invalid seconds"), # Invalid seconds
("invalid", "Invalid time format"), # Wrong format
("12:3:45", "Invalid time format"), # Wrong format (missing zero)
]
for time_str, expected_error_type in invalid_times:
with pytest.raises(ValidationError) as exc_info:
validation_service.validate_time_format(time_str)
assert expected_error_type.lower() in str(exc_info.value).lower()
# Integration test for complete error flow
class TestErrorFlowIntegration:
"""Test complete error handling flow from domain to API response."""
@pytest.mark.asyncio
async def test_complete_authentication_error_flow(self):
"""Test complete flow from missing token to 401 response."""
from interfaces.api.dependencies import validate_bearer_token
# Test missing authorization header
with pytest.raises(HTTPException) as exc_info:
await validate_bearer_token(None)
assert exc_info.value.status_code == 401
assert "Missing Authorization header" in exc_info.value.detail
assert exc_info.value.headers["WWW-Authenticate"] == "Bearer"
@pytest.mark.asyncio
async def test_complete_validation_error_flow(self):
"""Test complete flow from invalid job ID to 400 response."""
validation_service = ValidationService(
max_file_size_mb=100.0,
supported_video_formats=['.mp4'],
supported_audio_formats=['mp3']
)
# Test invalid external job ID
invalid_job_id = "invalid@job#id"
with pytest.raises(InvalidExternalJobIdFormatError) as exc_info:
validation_service.validate_external_job_id(invalid_job_id)
error = exc_info.value
assert error.job_id == invalid_job_id
assert "alphanumeric" in error.format_description
def test_n8n_notification_failure_resilience(self):
"""Test that N8N notification failures don't break job processing."""
from infrastructure.clients.n8n.n8n_client import N8NClient
from infrastructure.services.n8n_notification_service import N8NNotificationService
# Create service with mock client that always fails
mock_client = Mock(spec=N8NClient)
mock_client.post_completion_event = AsyncMock(
side_effect=Exception("N8N service down")
)
service = N8NNotificationService(mock_client)
# This should not raise an exception
# (would need to be async test in real scenario)
assert service is not None # Basic test that service was created |