"""Integration tests for N8N notification with bearer token headers."""
import asyncio
import json
import logging
from unittest.mock import Mock, AsyncMock, patch

import httpx
import pytest

from infrastructure.clients.n8n.n8n_client import N8NClient
from infrastructure.clients.n8n.models import WebhooksRequest, WebhooksResponse
from infrastructure.clients.n8n.settings import ClientSettings
from infrastructure.services.n8n_notification_service import N8NNotificationService
|
|
|
|
class TestN8NIntegrationWithBearerToken:
    """Integration tests for N8N notification with bearer tokens.

    Each test builds an ``N8NClient`` / ``N8NNotificationService`` pair and
    patches ``client._client.request`` so that no real HTTP request is sent.
    """

    def setup_method(self):
        """Set up the client settings and logger used by every test."""
        self.settings = ClientSettings(
            base_url="http://test-n8n.com",
            token="n8n-service-token"
        )
        self.logger = logging.getLogger("test")

    @staticmethod
    def _make_ok_response():
        """Return a mock HTTP 200 response whose JSON body is ``{"acknowledged": True}``."""
        response = Mock()
        response.status_code = 200
        response.json.return_value = {"acknowledged": True}
        response.text = '{"acknowledged": true}'
        response.headers = {"content-type": "application/json"}
        return response

    @staticmethod
    def _logged_text(log_call):
        """Join all positional arguments of a mocked logger call into one string.

        Looking only at the first argument would miss a token handed to the
        logger as a lazy ``%s`` argument, so every positional argument is
        included in the text that the leak assertions inspect.
        """
        return " ".join(str(arg) for arg in log_call.args)

    @pytest.mark.asyncio
    async def test_complete_n8n_notification_flow_with_bearer_token(self):
        """Test complete N8N notification flow including bearer token headers."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)

        bearer_token = "client-bearer-token-xyz"
        job_id = "integration-test-job"

        with patch.object(
            client._client, 'request', return_value=self._make_ok_response()
        ) as mock_request:
            result = await service.send_job_completion_notification(
                job_id=job_id,
                status="completed",
                processing_time=45.7,
                bearer_token=bearer_token
            )

        assert result.acknowledged is True

        mock_request.assert_called_once()
        call_kwargs = mock_request.call_args.kwargs

        assert call_kwargs["method"] == "POST"
        assert call_kwargs["url"] == "/lovable-analysis"
        assert call_kwargs["json"] == {"message": f"Job {job_id} completed in 45.70s"}

        expected_headers = {
            "rowID": job_id,
            "Authorization": f"Bearer {bearer_token}"
        }
        assert call_kwargs["headers"] == expected_headers

    @pytest.mark.asyncio
    async def test_n8n_notification_without_bearer_token(self):
        """Test N8N notification flow without client bearer token."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)

        job_id = "no-token-test-job"

        with patch.object(
            client._client, 'request', return_value=self._make_ok_response()
        ) as mock_request:
            result = await service.send_job_completion_notification(
                job_id=job_id,
                status="failed",
                processing_time=12.3,
                bearer_token=None
            )

        assert result.acknowledged is True

        mock_request.assert_called_once()
        call_kwargs = mock_request.call_args.kwargs

        # Without a client token only the job identifier header is expected.
        expected_headers = {"rowID": job_id}
        assert call_kwargs["headers"] == expected_headers

    @pytest.mark.asyncio
    async def test_n8n_client_error_handling_preserves_bearer_token_context(self):
        """Test that N8N client errors don't leak bearer token information."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)

        bearer_token = "secret-bearer-token-123"
        job_id = "error-test-job"

        # The transport error deliberately embeds the token so that a naive
        # "log the exception" implementation would leak it.
        mock_error = httpx.RequestError(
            f"Connection failed with bearer token {bearer_token}"
        )

        with patch.object(client._client, 'request', side_effect=mock_error), \
                patch('infrastructure.services.n8n_notification_service.logger') as mock_logger:
            result = await service.send_job_completion_notification(
                job_id=job_id,
                status="completed",
                processing_time=30.0,
                bearer_token=bearer_token
            )

        assert result.acknowledged is False

        mock_logger.error.assert_called_once()
        logged_text = self._logged_text(mock_logger.error.call_args)

        assert bearer_token not in logged_text
        assert "***" in logged_text or "redacted" in logged_text.lower()

    @pytest.mark.asyncio
    async def test_concurrent_n8n_notifications_with_different_tokens(self):
        """Test concurrent N8N notifications with different bearer tokens."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)

        test_cases = [
            ("job-1", "token-1"),
            ("job-2", "token-2"),
            ("job-3", "token-3"),
            ("job-4", None),
            ("job-5", "token-5"),
        ]

        requests_made = []

        async def record_request(**kwargs):
            requests_made.append(kwargs)
            return self._make_ok_response()

        with patch.object(client._client, 'request', side_effect=record_request):
            tasks = [
                service.send_job_completion_notification(
                    job_id=job_id,
                    status="completed",
                    processing_time=float(i * 10),
                    bearer_token=token
                )
                for i, (job_id, token) in enumerate(test_cases)
            ]
            results = await asyncio.gather(*tasks)

        assert all(result.acknowledged for result in results)
        assert len(requests_made) == len(test_cases)

        # Concurrent tasks may reach the transport in any order, so pair each
        # captured request with its test case by job id, not by list position.
        headers_by_job = {
            request["headers"].get("rowID"): request["headers"]
            for request in requests_made
        }
        assert set(headers_by_job) == {job_id for job_id, _ in test_cases}

        for job_id, token in test_cases:
            expected_headers = {"rowID": job_id}
            if token:
                expected_headers["Authorization"] = f"Bearer {token}"

            assert headers_by_job[job_id] == expected_headers

    @pytest.mark.asyncio
    async def test_n8n_service_resilience_to_network_failures(self):
        """Test that N8N service is resilient to various network failures."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)

        bearer_token = "resilience-test-token"
        job_id = "resilience-test-job"

        failure_scenarios = [
            httpx.ConnectTimeout("Connection timeout"),
            httpx.ReadTimeout("Read timeout"),
            httpx.NetworkError("Network unreachable"),
            httpx.RemoteProtocolError("Protocol error"),
        ]

        for i, error in enumerate(failure_scenarios):
            scenario = type(error).__name__

            # A fresh logger patch per scenario keeps the call count per-error.
            with patch.object(client._client, 'request', side_effect=error), \
                    patch('infrastructure.services.n8n_notification_service.logger') as mock_logger:
                result = await service.send_job_completion_notification(
                    job_id=f"{job_id}-{i}",
                    status="completed",
                    processing_time=25.0,
                    bearer_token=bearer_token
                )

            assert result.acknowledged is False, f"{scenario} was acknowledged"

            mock_logger.error.assert_called_once()
            logged_text = self._logged_text(mock_logger.error.call_args)
            assert bearer_token not in logged_text, f"{scenario} leaked the token"

    @pytest.mark.asyncio
    async def test_n8n_webhook_payload_format_consistency(self):
        """Test that N8N webhook payload format is consistent."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)

        test_scenarios = [
            ("job-123", "completed", 45.67),
            ("job-456", "failed", 12.34),
            ("job-789", "timeout", 300.0),
            ("job-abc", "cancelled", 5.5),
        ]

        requests_captured = []

        async def capture_request(**kwargs):
            requests_captured.append(kwargs)
            return self._make_ok_response()

        with patch.object(client._client, 'request', side_effect=capture_request):
            for job_id, status, processing_time in test_scenarios:
                await service.send_job_completion_notification(
                    job_id=job_id,
                    status=status,
                    processing_time=processing_time,
                    bearer_token="test-token"
                )

        # Fail with a clear message instead of an IndexError if a request is missing.
        assert len(requests_captured) == len(test_scenarios)

        # Notifications were awaited one at a time, so capture order matches.
        for request, (job_id, status, processing_time) in zip(requests_captured, test_scenarios):
            assert request["method"] == "POST"
            assert request["url"] == "/lovable-analysis"

            payload = request["json"]
            expected_message = f"Job {job_id} {status} in {processing_time:.2f}s"
            assert payload["message"] == expected_message

            headers = request["headers"]
            assert headers["rowID"] == job_id
            assert headers["Authorization"] == "Bearer test-token"

    @pytest.mark.asyncio
    async def test_n8n_client_authentication_header_priority(self):
        """Test that client bearer token takes priority over service token."""
        client = N8NClient(self.settings, self.logger)

        client_bearer_token = "client-priority-token"
        job_id = "priority-test-job"

        with patch.object(
            client._client, 'request', return_value=self._make_ok_response()
        ) as mock_request:
            request_data = WebhooksRequest(message="Test message", job_id=job_id)

            await client.post_completion_event(request_data, client_bearer_token)

        # Guard against reading headers from a call that never happened.
        mock_request.assert_called_once()
        headers = mock_request.call_args.kwargs["headers"]

        assert headers["Authorization"] == f"Bearer {client_bearer_token}"
        assert headers["Authorization"] != f"Bearer {self.settings.token}"
|
|
|
|
class TestN8NIntegrationErrorScenarios:
    """Test N8N integration error scenarios.

    Each test patches ``client._client.request`` to return an error or a
    malformed response and checks the result reported by the service.
    """

    def setup_method(self):
        """Set up the client settings and logger used by every test."""
        self.settings = ClientSettings(
            base_url="http://test-n8n.com",
            token="n8n-service-token"
        )
        self.logger = logging.getLogger("test")

    @pytest.mark.asyncio
    async def test_n8n_server_error_handling(self):
        """Test handling of N8N server errors."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)

        error_responses = [
            (400, "Bad Request"),
            (401, "Unauthorized"),
            (403, "Forbidden"),
            (404, "Not Found"),
            (500, "Internal Server Error"),
            (502, "Bad Gateway"),
            (503, "Service Unavailable"),
        ]

        for status_code, error_text in error_responses:
            mock_response = Mock()
            mock_response.status_code = status_code
            mock_response.text = error_text

            # A fresh logger patch per status keeps the call count per-response.
            with patch.object(client._client, 'request', return_value=mock_response), \
                    patch('infrastructure.services.n8n_notification_service.logger') as mock_logger:
                result = await service.send_job_completion_notification(
                    job_id=f"error-{status_code}-job",
                    status="completed",
                    processing_time=10.0,
                    bearer_token="test-token"
                )

            assert result.acknowledged is False, f"HTTP {status_code} was acknowledged"

            mock_logger.error.assert_called_once()

    @pytest.mark.asyncio
    async def test_n8n_malformed_response_handling(self):
        """Test handling of malformed N8N responses."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)

        malformed_responses = [
            '{"malformed": json}',
            '{"missing": "acknowledged"}',
            '',
            'plain text response',
        ]

        for i, response_text in enumerate(malformed_responses):
            mock_response = Mock()
            mock_response.status_code = 200
            mock_response.text = response_text

            # Derive the mock's ``json()`` behaviour from the body itself, so
            # that every non-JSON body (including plain text) raises and valid
            # JSON yields its real content.
            try:
                mock_response.json.return_value = json.loads(response_text)
            except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
                mock_response.json.side_effect = exc

            with patch.object(client._client, 'request', return_value=mock_response), \
                    patch('infrastructure.services.n8n_notification_service.logger'):
                result = await service.send_job_completion_notification(
                    job_id=f"malformed-{i}-job",
                    status="completed",
                    processing_time=15.0,
                    bearer_token="test-token"
                )

            # The call must return a well-formed result for every malformed body.
            assert isinstance(result.acknowledged, bool), (
                f"non-boolean acknowledgement for body {response_text!r}"
            )