Spaces:
Sleeping
Sleeping
| """Integration tests for N8N notification with bearer token headers.""" | |
| import pytest | |
| from unittest.mock import Mock, AsyncMock, patch | |
| import httpx | |
| from infrastructure.clients.n8n.n8n_client import N8NClient | |
| from infrastructure.clients.n8n.models import WebhooksRequest, WebhooksResponse | |
| from infrastructure.clients.n8n.settings import ClientSettings | |
| from infrastructure.services.n8n_notification_service import N8NNotificationService | |
| import logging | |
| class TestN8NIntegrationWithBearerToken: | |
| """Integration tests for N8N notification with bearer tokens.""" | |
| def setup_method(self): | |
| """Set up test fixtures.""" | |
| self.settings = ClientSettings( | |
| base_url="http://test-n8n.com", | |
| token="n8n-service-token" | |
| ) | |
| self.logger = logging.getLogger("test") | |
| async def test_complete_n8n_notification_flow_with_bearer_token(self): | |
| """Test complete N8N notification flow including bearer token headers.""" | |
| client = N8NClient(self.settings, self.logger) | |
| service = N8NNotificationService(client) | |
| bearer_token = "client-bearer-token-xyz" | |
| job_id = "integration-test-job" | |
| # Mock the HTTP response | |
| mock_response = Mock() | |
| mock_response.status_code = 200 | |
| mock_response.json.return_value = {"acknowledged": True} | |
| mock_response.text = '{"acknowledged": true}' | |
| mock_response.headers = {"content-type": "application/json"} | |
| with patch.object(client._client, 'request', return_value=mock_response) as mock_request: | |
| result = await service.send_job_completion_notification( | |
| job_id=job_id, | |
| status="completed", | |
| processing_time=45.7, | |
| bearer_token=bearer_token | |
| ) | |
| # Verify the result | |
| assert result.acknowledged is True | |
| # Verify the HTTP request was made correctly | |
| mock_request.assert_called_once() | |
| call_kwargs = mock_request.call_args.kwargs | |
| assert call_kwargs["method"] == "POST" | |
| assert call_kwargs["url"] == "/lovable-analysis" | |
| assert call_kwargs["json"] == {"message": f"Job {job_id} completed in 45.70s"} | |
| # Verify bearer token was included in headers | |
| expected_headers = { | |
| "rowID": job_id, | |
| "Authorization": f"Bearer {bearer_token}" | |
| } | |
| assert call_kwargs["headers"] == expected_headers | |
| async def test_n8n_notification_without_bearer_token(self): | |
| """Test N8N notification flow without client bearer token.""" | |
| client = N8NClient(self.settings, self.logger) | |
| service = N8NNotificationService(client) | |
| job_id = "no-token-test-job" | |
| # Mock the HTTP response | |
| mock_response = Mock() | |
| mock_response.status_code = 200 | |
| mock_response.json.return_value = {"acknowledged": True} | |
| mock_response.text = '{"acknowledged": true}' | |
| mock_response.headers = {"content-type": "application/json"} | |
| with patch.object(client._client, 'request', return_value=mock_response) as mock_request: | |
| result = await service.send_job_completion_notification( | |
| job_id=job_id, | |
| status="failed", | |
| processing_time=12.3, | |
| bearer_token=None | |
| ) | |
| # Verify the result | |
| assert result.acknowledged is True | |
| # Verify the HTTP request was made correctly | |
| mock_request.assert_called_once() | |
| call_kwargs = mock_request.call_args.kwargs | |
| # Verify only rowID header was included (no Authorization header) | |
| expected_headers = {"rowID": job_id} | |
| assert call_kwargs["headers"] == expected_headers | |
| async def test_n8n_client_error_handling_preserves_bearer_token_context(self): | |
| """Test that N8N client errors don't leak bearer token information.""" | |
| client = N8NClient(self.settings, self.logger) | |
| service = N8NNotificationService(client) | |
| bearer_token = "secret-bearer-token-123" | |
| job_id = "error-test-job" | |
| # Mock HTTP error that includes sensitive information | |
| mock_error = httpx.RequestError( | |
| f"Connection failed with bearer token {bearer_token}" | |
| ) | |
| with patch.object(client._client, 'request', side_effect=mock_error), \ | |
| patch('infrastructure.services.n8n_notification_service.logger') as mock_logger: | |
| result = await service.send_job_completion_notification( | |
| job_id=job_id, | |
| status="completed", | |
| processing_time=30.0, | |
| bearer_token=bearer_token | |
| ) | |
| # Service should handle error gracefully | |
| assert result.acknowledged is False | |
| # Verify error was logged but sensitive data was redacted | |
| mock_logger.error.assert_called_once() | |
| logged_message = mock_logger.error.call_args[0][0] | |
| # Bearer token should be redacted in logs | |
| assert bearer_token not in logged_message | |
| assert "***" in logged_message or "redacted" in logged_message.lower() | |
| async def test_concurrent_n8n_notifications_with_different_tokens(self): | |
| """Test concurrent N8N notifications with different bearer tokens.""" | |
| import asyncio | |
| client = N8NClient(self.settings, self.logger) | |
| service = N8NNotificationService(client) | |
| # Prepare test data for concurrent requests | |
| test_cases = [ | |
| ("job-1", "token-1"), | |
| ("job-2", "token-2"), | |
| ("job-3", "token-3"), | |
| ("job-4", None), # No token | |
| ("job-5", "token-5"), | |
| ] | |
| # Track the requests made | |
| requests_made = [] | |
| async def mock_request(**kwargs): | |
| requests_made.append(kwargs) | |
| mock_response = Mock() | |
| mock_response.status_code = 200 | |
| mock_response.json.return_value = {"acknowledged": True} | |
| mock_response.text = '{"acknowledged": true}' | |
| mock_response.headers = {"content-type": "application/json"} | |
| return mock_response | |
| with patch.object(client._client, 'request', side_effect=mock_request): | |
| # Run all notifications concurrently | |
| tasks = [ | |
| service.send_job_completion_notification( | |
| job_id=job_id, | |
| status="completed", | |
| processing_time=float(i * 10), | |
| bearer_token=token | |
| ) | |
| for i, (job_id, token) in enumerate(test_cases) | |
| ] | |
| results = await asyncio.gather(*tasks) | |
| # Verify all succeeded | |
| assert all(result.acknowledged for result in results) | |
| assert len(requests_made) == len(test_cases) | |
| # Verify each request had the correct headers | |
| for i, (job_id, token) in enumerate(test_cases): | |
| request = requests_made[i] | |
| expected_headers = {"rowID": job_id} | |
| if token: | |
| expected_headers["Authorization"] = f"Bearer {token}" | |
| assert request["headers"] == expected_headers | |
| async def test_n8n_service_resilience_to_network_failures(self): | |
| """Test that N8N service is resilient to various network failures.""" | |
| client = N8NClient(self.settings, self.logger) | |
| service = N8NNotificationService(client) | |
| bearer_token = "resilience-test-token" | |
| job_id = "resilience-test-job" | |
| # Test various network failure scenarios | |
| failure_scenarios = [ | |
| httpx.ConnectTimeout("Connection timeout"), | |
| httpx.ReadTimeout("Read timeout"), | |
| httpx.NetworkError("Network unreachable"), | |
| httpx.RemoteProtocolError("Protocol error"), | |
| ] | |
| for i, error in enumerate(failure_scenarios): | |
| with patch.object(client._client, 'request', side_effect=error), \ | |
| patch('infrastructure.services.n8n_notification_service.logger') as mock_logger: | |
| result = await service.send_job_completion_notification( | |
| job_id=f"{job_id}-{i}", | |
| status="completed", | |
| processing_time=25.0, | |
| bearer_token=bearer_token | |
| ) | |
| # Service should handle all network errors gracefully | |
| assert result.acknowledged is False | |
| # Error should be logged but not leak sensitive data | |
| mock_logger.error.assert_called_once() | |
| logged_message = mock_logger.error.call_args[0][0] | |
| assert bearer_token not in logged_message | |
| async def test_n8n_webhook_payload_format_consistency(self): | |
| """Test that N8N webhook payload format is consistent.""" | |
| client = N8NClient(self.settings, self.logger) | |
| service = N8NNotificationService(client) | |
| test_scenarios = [ | |
| ("job-123", "completed", 45.67), | |
| ("job-456", "failed", 12.34), | |
| ("job-789", "timeout", 300.0), | |
| ("job-abc", "cancelled", 5.5), | |
| ] | |
| requests_captured = [] | |
| async def capture_request(**kwargs): | |
| requests_captured.append(kwargs) | |
| mock_response = Mock() | |
| mock_response.status_code = 200 | |
| mock_response.json.return_value = {"acknowledged": True} | |
| mock_response.text = '{"acknowledged": true}' | |
| mock_response.headers = {"content-type": "application/json"} | |
| return mock_response | |
| with patch.object(client._client, 'request', side_effect=capture_request): | |
| for job_id, status, processing_time in test_scenarios: | |
| await service.send_job_completion_notification( | |
| job_id=job_id, | |
| status=status, | |
| processing_time=processing_time, | |
| bearer_token="test-token" | |
| ) | |
| # Verify payload format consistency | |
| for i, (job_id, status, processing_time) in enumerate(test_scenarios): | |
| request = requests_captured[i] | |
| # Verify standard request structure | |
| assert request["method"] == "POST" | |
| assert request["url"] == "/lovable-analysis" | |
| # Verify payload format | |
| payload = request["json"] | |
| expected_message = f"Job {job_id} {status} in {processing_time:.2f}s" | |
| assert payload["message"] == expected_message | |
| # Verify headers format | |
| headers = request["headers"] | |
| assert headers["rowID"] == job_id | |
| assert headers["Authorization"] == "Bearer test-token" | |
| async def test_n8n_client_authentication_header_priority(self): | |
| """Test that client bearer token takes priority over service token.""" | |
| client = N8NClient(self.settings, self.logger) | |
| client_bearer_token = "client-priority-token" | |
| job_id = "priority-test-job" | |
| mock_response = Mock() | |
| mock_response.status_code = 200 | |
| mock_response.json.return_value = {"acknowledged": True} | |
| mock_response.text = '{"acknowledged": true}' | |
| mock_response.headers = {"content-type": "application/json"} | |
| with patch.object(client._client, 'request', return_value=mock_response) as mock_request: | |
| request_data = WebhooksRequest(message="Test message", job_id=job_id) | |
| await client.post_completion_event(request_data, client_bearer_token) | |
| # Verify the request was made with client bearer token | |
| call_kwargs = mock_request.call_args.kwargs | |
| headers = call_kwargs["headers"] | |
| # Client token should override service token | |
| assert headers["Authorization"] == f"Bearer {client_bearer_token}" | |
| assert headers["Authorization"] != f"Bearer {self.settings.token}" | |
| class TestN8NIntegrationErrorScenarios: | |
| """Test N8N integration error scenarios.""" | |
| def setup_method(self): | |
| """Set up test fixtures.""" | |
| self.settings = ClientSettings( | |
| base_url="http://test-n8n.com", | |
| token="n8n-service-token" | |
| ) | |
| self.logger = logging.getLogger("test") | |
| async def test_n8n_server_error_handling(self): | |
| """Test handling of N8N server errors.""" | |
| client = N8NClient(self.settings, self.logger) | |
| service = N8NNotificationService(client) | |
| # Test various HTTP error responses | |
| error_responses = [ | |
| (400, "Bad Request"), | |
| (401, "Unauthorized"), | |
| (403, "Forbidden"), | |
| (404, "Not Found"), | |
| (500, "Internal Server Error"), | |
| (502, "Bad Gateway"), | |
| (503, "Service Unavailable"), | |
| ] | |
| for status_code, error_text in error_responses: | |
| mock_response = Mock() | |
| mock_response.status_code = status_code | |
| mock_response.text = error_text | |
| with patch.object(client._client, 'request', return_value=mock_response), \ | |
| patch('infrastructure.services.n8n_notification_service.logger') as mock_logger: | |
| result = await service.send_job_completion_notification( | |
| job_id=f"error-{status_code}-job", | |
| status="completed", | |
| processing_time=10.0, | |
| bearer_token="test-token" | |
| ) | |
| # Service should handle all HTTP errors gracefully | |
| assert result.acknowledged is False | |
| # Error should be logged | |
| mock_logger.error.assert_called_once() | |
| async def test_n8n_malformed_response_handling(self): | |
| """Test handling of malformed N8N responses.""" | |
| client = N8NClient(self.settings, self.logger) | |
| service = N8NNotificationService(client) | |
| # Test various malformed responses | |
| malformed_responses = [ | |
| '{"malformed": json}', # Invalid JSON | |
| '{"missing": "acknowledged"}', # Missing acknowledged field | |
| '', # Empty response | |
| 'plain text response', # Non-JSON response | |
| ] | |
| for i, response_text in enumerate(malformed_responses): | |
| mock_response = Mock() | |
| mock_response.status_code = 200 | |
| mock_response.text = response_text | |
| if "json}" in response_text or response_text == '': | |
| mock_response.json.side_effect = ValueError("Invalid JSON") | |
| else: | |
| mock_response.json.return_value = {} | |
| with patch.object(client._client, 'request', return_value=mock_response), \ | |
| patch('infrastructure.services.n8n_notification_service.logger'): | |
| result = await service.send_job_completion_notification( | |
| job_id=f"malformed-{i}-job", | |
| status="completed", | |
| processing_time=15.0, | |
| bearer_token="test-token" | |
| ) | |
| # Service should handle malformed responses gracefully | |
| # Result may be acknowledged=False depending on the response | |
| assert isinstance(result.acknowledged, bool) |