"""Unit tests for N8N integration with bearer token support.""" import pytest from unittest.mock import Mock, AsyncMock, patch from infrastructure.clients.n8n.n8n_client import N8NClient from infrastructure.clients.n8n.models import WebhooksRequest, WebhooksResponse from infrastructure.clients.n8n.settings import ClientSettings from infrastructure.services.n8n_notification_service import N8NNotificationService from domain.services.notification_service import NotificationRequest, NotificationResponse import logging class TestN8NClientBearerToken: """Test N8N client with bearer token support.""" def setup_method(self): """Set up test fixtures.""" self.settings = ClientSettings( base_url="http://test-n8n.com", token="n8n-token-123" ) self.logger = logging.getLogger("test") self.client = N8NClient(self.settings, self.logger) @pytest.mark.asyncio async def test_post_completion_event_without_bearer_token(self): """Test posting completion event without client bearer token.""" with patch.object(self.client, '_make_request') as mock_request: mock_request.return_value = {"acknowledged": True} request = WebhooksRequest(message="Test message", job_id="job-123") result = await self.client.post_completion_event(request) # Verify call was made correctly mock_request.assert_called_once_with( "POST", "/lovable-analysis", {"message": "Test message"}, {"rowID": "job-123"} ) assert result.acknowledged is True @pytest.mark.asyncio async def test_post_completion_event_with_bearer_token(self): """Test posting completion event with client bearer token.""" with patch.object(self.client, '_make_request') as mock_request: mock_request.return_value = {"acknowledged": True} request = WebhooksRequest(message="Test message", job_id="job-123") bearer_token = "client-bearer-token-xyz" result = await self.client.post_completion_event(request, bearer_token) # Verify call includes both rowID and Authorization headers expected_headers = { "rowID": "job-123", "Authorization": "Bearer client-bearer-token-xyz" } mock_request.assert_called_once_with( "POST", "/lovable-analysis", {"message": "Test message"}, expected_headers ) assert result.acknowledged is True @pytest.mark.asyncio async def test_post_completion_event_with_empty_bearer_token(self): """Test posting completion event with empty bearer token.""" with patch.object(self.client, '_make_request') as mock_request: mock_request.return_value = {"acknowledged": True} request = WebhooksRequest(message="Test message", job_id="job-123") # Empty string should be treated as None result = await self.client.post_completion_event(request, "") # Should only have rowID header, no Authorization mock_request.assert_called_once_with( "POST", "/lovable-analysis", {"message": "Test message"}, {"rowID": "job-123"} ) assert result.acknowledged is True @pytest.mark.asyncio async def test_headers_combination(self): """Test that headers are properly combined.""" with patch.object(self.client, '_make_request') as mock_request: mock_request.return_value = {"acknowledged": False} request = WebhooksRequest(message="Test", job_id="test-job") bearer_token = "test-token" await self.client.post_completion_event(request, bearer_token) # Verify the headers contain both required values call_args = mock_request.call_args headers = call_args[0][3] # Fourth argument is custom_headers assert headers["rowID"] == "test-job" assert headers["Authorization"] == "Bearer test-token" assert len(headers) == 2 # Should only have these two headers class TestN8NNotificationServiceBearerToken: """Test N8N notification service with bearer token support.""" def setup_method(self): """Set up test fixtures.""" self.mock_n8n_client = AsyncMock() self.notification_service = N8NNotificationService(self.mock_n8n_client) @pytest.mark.asyncio async def test_send_notification_without_bearer_token(self): """Test sending notification without bearer token.""" # Setup mock response mock_response = WebhooksResponse(acknowledged=True) self.mock_n8n_client.post_completion_event.return_value = mock_response result = await self.notification_service.send_job_completion_notification( job_id="job-123", status="completed", processing_time=45.5 ) # Verify N8N client was called correctly self.mock_n8n_client.post_completion_event.assert_called_once() call_args = self.mock_n8n_client.post_completion_event.call_args # Check the request data request_data = call_args[0][0] # First positional argument assert request_data.job_id == "job-123" assert "completed" in request_data.message assert "45.50s" in request_data.message # Check bearer token (should be None) bearer_token = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('bearer_token') assert bearer_token is None assert result.acknowledged is True @pytest.mark.asyncio async def test_send_notification_with_bearer_token(self): """Test sending notification with bearer token.""" # Setup mock response mock_response = WebhooksResponse(acknowledged=True) self.mock_n8n_client.post_completion_event.return_value = mock_response bearer_token = "client-token-abc123" result = await self.notification_service.send_job_completion_notification( job_id="job-456", status="failed", processing_time=30.2, bearer_token=bearer_token ) # Verify N8N client was called with bearer token self.mock_n8n_client.post_completion_event.assert_called_once() call_args = self.mock_n8n_client.post_completion_event.call_args # Check the request data request_data = call_args[0][0] assert request_data.job_id == "job-456" assert "failed" in request_data.message assert "30.20s" in request_data.message # Check bearer token was passed passed_token = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('bearer_token') assert passed_token == bearer_token assert result.acknowledged is True @pytest.mark.asyncio async def test_send_notification_n8n_failure_handling(self): """Test handling of N8N client failures.""" # Setup mock to raise exception self.mock_n8n_client.post_completion_event.side_effect = Exception("N8N connection failed") # Should not raise exception, but return acknowledged=False result = await self.notification_service.send_job_completion_notification( job_id="job-789", status="completed", processing_time=60.0, bearer_token="test-token" ) assert result.acknowledged is False self.mock_n8n_client.post_completion_event.assert_called_once() @pytest.mark.asyncio async def test_notification_message_format(self): """Test that notification messages are formatted correctly.""" mock_response = WebhooksResponse(acknowledged=True) self.mock_n8n_client.post_completion_event.return_value = mock_response await self.notification_service.send_job_completion_notification( job_id="format-test-job", status="completed", processing_time=123.456, bearer_token="format-token" ) # Check message format call_args = self.mock_n8n_client.post_completion_event.call_args request_data = call_args[0][0] expected_message = "Job format-test-job completed in 123.46s" assert request_data.message == expected_message assert request_data.job_id == "format-test-job" class TestBearerTokenFlow: """Test the complete bearer token flow from job to N8N.""" @pytest.mark.asyncio async def test_complete_bearer_token_flow(self): """Test the complete flow of bearer token from job to N8N notification.""" # Mock components mock_job_repo = AsyncMock() mock_n8n_client = AsyncMock() # Setup job record with bearer token mock_job_record = Mock() mock_job_record.bearer_token = "client-auth-token-123" mock_job_repo.get.return_value = mock_job_record # Setup N8N response mock_n8n_client.post_completion_event.return_value = WebhooksResponse(acknowledged=True) # Create notification service notification_service = N8NNotificationService(mock_n8n_client) # Simulate the flow job_id = "test-job-flow" # 1. Get job record (simulating ProcessJobUseCase) job_record = await mock_job_repo.get(job_id) bearer_token = job_record.bearer_token # 2. Send notification with bearer token result = await notification_service.send_job_completion_notification( job_id=job_id, status="completed", processing_time=88.7, bearer_token=bearer_token ) # 3. Clear bearer token (simulating repository cleanup) await mock_job_repo.clear_bearer_token(job_id) # Verify the flow mock_job_repo.get.assert_called_once_with(job_id) mock_n8n_client.post_completion_event.assert_called_once() mock_job_repo.clear_bearer_token.assert_called_once_with(job_id) # Verify N8N call included bearer token call_args = mock_n8n_client.post_completion_event.call_args passed_token = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('bearer_token') assert passed_token == "client-auth-token-123" assert result.acknowledged is True @pytest.mark.asyncio async def test_bearer_token_flow_without_token(self): """Test the flow when job has no bearer token.""" # Mock components mock_job_repo = AsyncMock() mock_n8n_client = AsyncMock() # Setup job record without bearer token mock_job_record = Mock() mock_job_record.bearer_token = None mock_job_repo.get.return_value = mock_job_record # Setup N8N response mock_n8n_client.post_completion_event.return_value = WebhooksResponse(acknowledged=True) # Create notification service notification_service = N8NNotificationService(mock_n8n_client) # Simulate the flow job_id = "test-job-no-token" # Get job and send notification job_record = await mock_job_repo.get(job_id) bearer_token = job_record.bearer_token result = await notification_service.send_job_completion_notification( job_id=job_id, status="completed", processing_time=25.3, bearer_token=bearer_token ) # Verify N8N call was made without bearer token call_args = mock_n8n_client.post_completion_event.call_args passed_token = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('bearer_token') assert passed_token is None assert result.acknowledged is True def test_notification_service_protocol_compliance(self): """Test that N8NNotificationService implements the protocol correctly.""" from domain.services.notification_service import NotificationService # Create instance mock_client = Mock() service = N8NNotificationService(mock_client) # Verify it implements the protocol assert isinstance(service, NotificationService) # Verify method signature import inspect sig = inspect.signature(service.send_job_completion_notification) params = list(sig.parameters.keys()) expected_params = ['job_id', 'status', 'processing_time', 'bearer_token'] assert params == expected_params # Verify bearer_token has default None bearer_token_param = sig.parameters['bearer_token'] assert bearer_token_param.default is None