"""Integration tests for N8N notification with bearer token headers.""" import pytest from unittest.mock import Mock, AsyncMock, patch import httpx from infrastructure.clients.n8n.n8n_client import N8NClient from infrastructure.clients.n8n.models import WebhooksRequest, WebhooksResponse from infrastructure.clients.n8n.settings import ClientSettings from infrastructure.services.n8n_notification_service import N8NNotificationService import logging class TestN8NIntegrationWithBearerToken: """Integration tests for N8N notification with bearer tokens.""" def setup_method(self): """Set up test fixtures.""" self.settings = ClientSettings( base_url="http://test-n8n.com", token="n8n-service-token" ) self.logger = logging.getLogger("test") @pytest.mark.asyncio async def test_complete_n8n_notification_flow_with_bearer_token(self): """Test complete N8N notification flow including bearer token headers.""" client = N8NClient(self.settings, self.logger) service = N8NNotificationService(client) bearer_token = "client-bearer-token-xyz" job_id = "integration-test-job" # Mock the HTTP response mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = {"acknowledged": True} mock_response.text = '{"acknowledged": true}' mock_response.headers = {"content-type": "application/json"} with patch.object(client._client, 'request', return_value=mock_response) as mock_request: result = await service.send_job_completion_notification( job_id=job_id, status="completed", processing_time=45.7, bearer_token=bearer_token ) # Verify the result assert result.acknowledged is True # Verify the HTTP request was made correctly mock_request.assert_called_once() call_kwargs = mock_request.call_args.kwargs assert call_kwargs["method"] == "POST" assert call_kwargs["url"] == "/lovable-analysis" assert call_kwargs["json"] == {"message": f"Job {job_id} completed in 45.70s"} # Verify bearer token was included in headers expected_headers = { "rowID": job_id, "Authorization": f"Bearer {bearer_token}" } assert call_kwargs["headers"] == expected_headers @pytest.mark.asyncio async def test_n8n_notification_without_bearer_token(self): """Test N8N notification flow without client bearer token.""" client = N8NClient(self.settings, self.logger) service = N8NNotificationService(client) job_id = "no-token-test-job" # Mock the HTTP response mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = {"acknowledged": True} mock_response.text = '{"acknowledged": true}' mock_response.headers = {"content-type": "application/json"} with patch.object(client._client, 'request', return_value=mock_response) as mock_request: result = await service.send_job_completion_notification( job_id=job_id, status="failed", processing_time=12.3, bearer_token=None ) # Verify the result assert result.acknowledged is True # Verify the HTTP request was made correctly mock_request.assert_called_once() call_kwargs = mock_request.call_args.kwargs # Verify only rowID header was included (no Authorization header) expected_headers = {"rowID": job_id} assert call_kwargs["headers"] == expected_headers @pytest.mark.asyncio async def test_n8n_client_error_handling_preserves_bearer_token_context(self): """Test that N8N client errors don't leak bearer token information.""" client = N8NClient(self.settings, self.logger) service = N8NNotificationService(client) bearer_token = "secret-bearer-token-123" job_id = "error-test-job" # Mock HTTP error that includes sensitive information mock_error = httpx.RequestError( f"Connection failed with bearer token {bearer_token}" ) with patch.object(client._client, 'request', side_effect=mock_error), \ patch('infrastructure.services.n8n_notification_service.logger') as mock_logger: result = await service.send_job_completion_notification( job_id=job_id, status="completed", processing_time=30.0, bearer_token=bearer_token ) # Service should handle error gracefully assert result.acknowledged is False # Verify error was logged but sensitive data was redacted mock_logger.error.assert_called_once() logged_message = mock_logger.error.call_args[0][0] # Bearer token should be redacted in logs assert bearer_token not in logged_message assert "***" in logged_message or "redacted" in logged_message.lower() @pytest.mark.asyncio async def test_concurrent_n8n_notifications_with_different_tokens(self): """Test concurrent N8N notifications with different bearer tokens.""" import asyncio client = N8NClient(self.settings, self.logger) service = N8NNotificationService(client) # Prepare test data for concurrent requests test_cases = [ ("job-1", "token-1"), ("job-2", "token-2"), ("job-3", "token-3"), ("job-4", None), # No token ("job-5", "token-5"), ] # Track the requests made requests_made = [] async def mock_request(**kwargs): requests_made.append(kwargs) mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = {"acknowledged": True} mock_response.text = '{"acknowledged": true}' mock_response.headers = {"content-type": "application/json"} return mock_response with patch.object(client._client, 'request', side_effect=mock_request): # Run all notifications concurrently tasks = [ service.send_job_completion_notification( job_id=job_id, status="completed", processing_time=float(i * 10), bearer_token=token ) for i, (job_id, token) in enumerate(test_cases) ] results = await asyncio.gather(*tasks) # Verify all succeeded assert all(result.acknowledged for result in results) assert len(requests_made) == len(test_cases) # Verify each request had the correct headers for i, (job_id, token) in enumerate(test_cases): request = requests_made[i] expected_headers = {"rowID": job_id} if token: expected_headers["Authorization"] = f"Bearer {token}" assert request["headers"] == expected_headers @pytest.mark.asyncio async def test_n8n_service_resilience_to_network_failures(self): """Test that N8N service is resilient to various network failures.""" client = N8NClient(self.settings, self.logger) service = N8NNotificationService(client) bearer_token = "resilience-test-token" job_id = "resilience-test-job" # Test various network failure scenarios failure_scenarios = [ httpx.ConnectTimeout("Connection timeout"), httpx.ReadTimeout("Read timeout"), httpx.NetworkError("Network unreachable"), httpx.RemoteProtocolError("Protocol error"), ] for i, error in enumerate(failure_scenarios): with patch.object(client._client, 'request', side_effect=error), \ patch('infrastructure.services.n8n_notification_service.logger') as mock_logger: result = await service.send_job_completion_notification( job_id=f"{job_id}-{i}", status="completed", processing_time=25.0, bearer_token=bearer_token ) # Service should handle all network errors gracefully assert result.acknowledged is False # Error should be logged but not leak sensitive data mock_logger.error.assert_called_once() logged_message = mock_logger.error.call_args[0][0] assert bearer_token not in logged_message @pytest.mark.asyncio async def test_n8n_webhook_payload_format_consistency(self): """Test that N8N webhook payload format is consistent.""" client = N8NClient(self.settings, self.logger) service = N8NNotificationService(client) test_scenarios = [ ("job-123", "completed", 45.67), ("job-456", "failed", 12.34), ("job-789", "timeout", 300.0), ("job-abc", "cancelled", 5.5), ] requests_captured = [] async def capture_request(**kwargs): requests_captured.append(kwargs) mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = {"acknowledged": True} mock_response.text = '{"acknowledged": true}' mock_response.headers = {"content-type": "application/json"} return mock_response with patch.object(client._client, 'request', side_effect=capture_request): for job_id, status, processing_time in test_scenarios: await service.send_job_completion_notification( job_id=job_id, status=status, processing_time=processing_time, bearer_token="test-token" ) # Verify payload format consistency for i, (job_id, status, processing_time) in enumerate(test_scenarios): request = requests_captured[i] # Verify standard request structure assert request["method"] == "POST" assert request["url"] == "/lovable-analysis" # Verify payload format payload = request["json"] expected_message = f"Job {job_id} {status} in {processing_time:.2f}s" assert payload["message"] == expected_message # Verify headers format headers = request["headers"] assert headers["rowID"] == job_id assert headers["Authorization"] == "Bearer test-token" @pytest.mark.asyncio async def test_n8n_client_authentication_header_priority(self): """Test that client bearer token takes priority over service token.""" client = N8NClient(self.settings, self.logger) client_bearer_token = "client-priority-token" job_id = "priority-test-job" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = {"acknowledged": True} mock_response.text = '{"acknowledged": true}' mock_response.headers = {"content-type": "application/json"} with patch.object(client._client, 'request', return_value=mock_response) as mock_request: request_data = WebhooksRequest(message="Test message", job_id=job_id) await client.post_completion_event(request_data, client_bearer_token) # Verify the request was made with client bearer token call_kwargs = mock_request.call_args.kwargs headers = call_kwargs["headers"] # Client token should override service token assert headers["Authorization"] == f"Bearer {client_bearer_token}" assert headers["Authorization"] != f"Bearer {self.settings.token}" class TestN8NIntegrationErrorScenarios: """Test N8N integration error scenarios.""" def setup_method(self): """Set up test fixtures.""" self.settings = ClientSettings( base_url="http://test-n8n.com", token="n8n-service-token" ) self.logger = logging.getLogger("test") @pytest.mark.asyncio async def test_n8n_server_error_handling(self): """Test handling of N8N server errors.""" client = N8NClient(self.settings, self.logger) service = N8NNotificationService(client) # Test various HTTP error responses error_responses = [ (400, "Bad Request"), (401, "Unauthorized"), (403, "Forbidden"), (404, "Not Found"), (500, "Internal Server Error"), (502, "Bad Gateway"), (503, "Service Unavailable"), ] for status_code, error_text in error_responses: mock_response = Mock() mock_response.status_code = status_code mock_response.text = error_text with patch.object(client._client, 'request', return_value=mock_response), \ patch('infrastructure.services.n8n_notification_service.logger') as mock_logger: result = await service.send_job_completion_notification( job_id=f"error-{status_code}-job", status="completed", processing_time=10.0, bearer_token="test-token" ) # Service should handle all HTTP errors gracefully assert result.acknowledged is False # Error should be logged mock_logger.error.assert_called_once() @pytest.mark.asyncio async def test_n8n_malformed_response_handling(self): """Test handling of malformed N8N responses.""" client = N8NClient(self.settings, self.logger) service = N8NNotificationService(client) # Test various malformed responses malformed_responses = [ '{"malformed": json}', # Invalid JSON '{"missing": "acknowledged"}', # Missing acknowledged field '', # Empty response 'plain text response', # Non-JSON response ] for i, response_text in enumerate(malformed_responses): mock_response = Mock() mock_response.status_code = 200 mock_response.text = response_text if "json}" in response_text or response_text == '': mock_response.json.side_effect = ValueError("Invalid JSON") else: mock_response.json.return_value = {} with patch.object(client._client, 'request', return_value=mock_response), \ patch('infrastructure.services.n8n_notification_service.logger'): result = await service.send_job_completion_notification( job_id=f"malformed-{i}-job", status="completed", processing_time=15.0, bearer_token="test-token" ) # Service should handle malformed responses gracefully # Result may be acknowledged=False depending on the response assert isinstance(result.acknowledged, bool)