Spaces:
Sleeping
Sleeping
| """Unit tests for N8N integration with bearer token support.""" | |
| import pytest | |
| from unittest.mock import Mock, AsyncMock, patch | |
| from infrastructure.clients.n8n.n8n_client import N8NClient | |
| from infrastructure.clients.n8n.models import WebhooksRequest, WebhooksResponse | |
| from infrastructure.clients.n8n.settings import ClientSettings | |
| from infrastructure.services.n8n_notification_service import N8NNotificationService | |
| from domain.services.notification_service import NotificationRequest, NotificationResponse | |
| import logging | |
| class TestN8NClientBearerToken: | |
| """Test N8N client with bearer token support.""" | |
| def setup_method(self): | |
| """Set up test fixtures.""" | |
| self.settings = ClientSettings( | |
| base_url="http://test-n8n.com", | |
| token="n8n-token-123" | |
| ) | |
| self.logger = logging.getLogger("test") | |
| self.client = N8NClient(self.settings, self.logger) | |
| async def test_post_completion_event_without_bearer_token(self): | |
| """Test posting completion event without client bearer token.""" | |
| with patch.object(self.client, '_make_request') as mock_request: | |
| mock_request.return_value = {"acknowledged": True} | |
| request = WebhooksRequest(message="Test message", job_id="job-123") | |
| result = await self.client.post_completion_event(request) | |
| # Verify call was made correctly | |
| mock_request.assert_called_once_with( | |
| "POST", | |
| "/lovable-analysis", | |
| {"message": "Test message"}, | |
| {"rowID": "job-123"} | |
| ) | |
| assert result.acknowledged is True | |
| async def test_post_completion_event_with_bearer_token(self): | |
| """Test posting completion event with client bearer token.""" | |
| with patch.object(self.client, '_make_request') as mock_request: | |
| mock_request.return_value = {"acknowledged": True} | |
| request = WebhooksRequest(message="Test message", job_id="job-123") | |
| bearer_token = "client-bearer-token-xyz" | |
| result = await self.client.post_completion_event(request, bearer_token) | |
| # Verify call includes both rowID and Authorization headers | |
| expected_headers = { | |
| "rowID": "job-123", | |
| "Authorization": "Bearer client-bearer-token-xyz" | |
| } | |
| mock_request.assert_called_once_with( | |
| "POST", | |
| "/lovable-analysis", | |
| {"message": "Test message"}, | |
| expected_headers | |
| ) | |
| assert result.acknowledged is True | |
| async def test_post_completion_event_with_empty_bearer_token(self): | |
| """Test posting completion event with empty bearer token.""" | |
| with patch.object(self.client, '_make_request') as mock_request: | |
| mock_request.return_value = {"acknowledged": True} | |
| request = WebhooksRequest(message="Test message", job_id="job-123") | |
| # Empty string should be treated as None | |
| result = await self.client.post_completion_event(request, "") | |
| # Should only have rowID header, no Authorization | |
| mock_request.assert_called_once_with( | |
| "POST", | |
| "/lovable-analysis", | |
| {"message": "Test message"}, | |
| {"rowID": "job-123"} | |
| ) | |
| assert result.acknowledged is True | |
| async def test_headers_combination(self): | |
| """Test that headers are properly combined.""" | |
| with patch.object(self.client, '_make_request') as mock_request: | |
| mock_request.return_value = {"acknowledged": False} | |
| request = WebhooksRequest(message="Test", job_id="test-job") | |
| bearer_token = "test-token" | |
| await self.client.post_completion_event(request, bearer_token) | |
| # Verify the headers contain both required values | |
| call_args = mock_request.call_args | |
| headers = call_args[0][3] # Fourth argument is custom_headers | |
| assert headers["rowID"] == "test-job" | |
| assert headers["Authorization"] == "Bearer test-token" | |
| assert len(headers) == 2 # Should only have these two headers | |
| class TestN8NNotificationServiceBearerToken: | |
| """Test N8N notification service with bearer token support.""" | |
| def setup_method(self): | |
| """Set up test fixtures.""" | |
| self.mock_n8n_client = AsyncMock() | |
| self.notification_service = N8NNotificationService(self.mock_n8n_client) | |
| async def test_send_notification_without_bearer_token(self): | |
| """Test sending notification without bearer token.""" | |
| # Setup mock response | |
| mock_response = WebhooksResponse(acknowledged=True) | |
| self.mock_n8n_client.post_completion_event.return_value = mock_response | |
| result = await self.notification_service.send_job_completion_notification( | |
| job_id="job-123", | |
| status="completed", | |
| processing_time=45.5 | |
| ) | |
| # Verify N8N client was called correctly | |
| self.mock_n8n_client.post_completion_event.assert_called_once() | |
| call_args = self.mock_n8n_client.post_completion_event.call_args | |
| # Check the request data | |
| request_data = call_args[0][0] # First positional argument | |
| assert request_data.job_id == "job-123" | |
| assert "completed" in request_data.message | |
| assert "45.50s" in request_data.message | |
| # Check bearer token (should be None) | |
| bearer_token = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('bearer_token') | |
| assert bearer_token is None | |
| assert result.acknowledged is True | |
| async def test_send_notification_with_bearer_token(self): | |
| """Test sending notification with bearer token.""" | |
| # Setup mock response | |
| mock_response = WebhooksResponse(acknowledged=True) | |
| self.mock_n8n_client.post_completion_event.return_value = mock_response | |
| bearer_token = "client-token-abc123" | |
| result = await self.notification_service.send_job_completion_notification( | |
| job_id="job-456", | |
| status="failed", | |
| processing_time=30.2, | |
| bearer_token=bearer_token | |
| ) | |
| # Verify N8N client was called with bearer token | |
| self.mock_n8n_client.post_completion_event.assert_called_once() | |
| call_args = self.mock_n8n_client.post_completion_event.call_args | |
| # Check the request data | |
| request_data = call_args[0][0] | |
| assert request_data.job_id == "job-456" | |
| assert "failed" in request_data.message | |
| assert "30.20s" in request_data.message | |
| # Check bearer token was passed | |
| passed_token = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('bearer_token') | |
| assert passed_token == bearer_token | |
| assert result.acknowledged is True | |
| async def test_send_notification_n8n_failure_handling(self): | |
| """Test handling of N8N client failures.""" | |
| # Setup mock to raise exception | |
| self.mock_n8n_client.post_completion_event.side_effect = Exception("N8N connection failed") | |
| # Should not raise exception, but return acknowledged=False | |
| result = await self.notification_service.send_job_completion_notification( | |
| job_id="job-789", | |
| status="completed", | |
| processing_time=60.0, | |
| bearer_token="test-token" | |
| ) | |
| assert result.acknowledged is False | |
| self.mock_n8n_client.post_completion_event.assert_called_once() | |
| async def test_notification_message_format(self): | |
| """Test that notification messages are formatted correctly.""" | |
| mock_response = WebhooksResponse(acknowledged=True) | |
| self.mock_n8n_client.post_completion_event.return_value = mock_response | |
| await self.notification_service.send_job_completion_notification( | |
| job_id="format-test-job", | |
| status="completed", | |
| processing_time=123.456, | |
| bearer_token="format-token" | |
| ) | |
| # Check message format | |
| call_args = self.mock_n8n_client.post_completion_event.call_args | |
| request_data = call_args[0][0] | |
| expected_message = "Job format-test-job completed in 123.46s" | |
| assert request_data.message == expected_message | |
| assert request_data.job_id == "format-test-job" | |
| class TestBearerTokenFlow: | |
| """Test the complete bearer token flow from job to N8N.""" | |
| async def test_complete_bearer_token_flow(self): | |
| """Test the complete flow of bearer token from job to N8N notification.""" | |
| # Mock components | |
| mock_job_repo = AsyncMock() | |
| mock_n8n_client = AsyncMock() | |
| # Setup job record with bearer token | |
| mock_job_record = Mock() | |
| mock_job_record.bearer_token = "client-auth-token-123" | |
| mock_job_repo.get.return_value = mock_job_record | |
| # Setup N8N response | |
| mock_n8n_client.post_completion_event.return_value = WebhooksResponse(acknowledged=True) | |
| # Create notification service | |
| notification_service = N8NNotificationService(mock_n8n_client) | |
| # Simulate the flow | |
| job_id = "test-job-flow" | |
| # 1. Get job record (simulating ProcessJobUseCase) | |
| job_record = await mock_job_repo.get(job_id) | |
| bearer_token = job_record.bearer_token | |
| # 2. Send notification with bearer token | |
| result = await notification_service.send_job_completion_notification( | |
| job_id=job_id, | |
| status="completed", | |
| processing_time=88.7, | |
| bearer_token=bearer_token | |
| ) | |
| # 3. Clear bearer token (simulating repository cleanup) | |
| await mock_job_repo.clear_bearer_token(job_id) | |
| # Verify the flow | |
| mock_job_repo.get.assert_called_once_with(job_id) | |
| mock_n8n_client.post_completion_event.assert_called_once() | |
| mock_job_repo.clear_bearer_token.assert_called_once_with(job_id) | |
| # Verify N8N call included bearer token | |
| call_args = mock_n8n_client.post_completion_event.call_args | |
| passed_token = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('bearer_token') | |
| assert passed_token == "client-auth-token-123" | |
| assert result.acknowledged is True | |
| async def test_bearer_token_flow_without_token(self): | |
| """Test the flow when job has no bearer token.""" | |
| # Mock components | |
| mock_job_repo = AsyncMock() | |
| mock_n8n_client = AsyncMock() | |
| # Setup job record without bearer token | |
| mock_job_record = Mock() | |
| mock_job_record.bearer_token = None | |
| mock_job_repo.get.return_value = mock_job_record | |
| # Setup N8N response | |
| mock_n8n_client.post_completion_event.return_value = WebhooksResponse(acknowledged=True) | |
| # Create notification service | |
| notification_service = N8NNotificationService(mock_n8n_client) | |
| # Simulate the flow | |
| job_id = "test-job-no-token" | |
| # Get job and send notification | |
| job_record = await mock_job_repo.get(job_id) | |
| bearer_token = job_record.bearer_token | |
| result = await notification_service.send_job_completion_notification( | |
| job_id=job_id, | |
| status="completed", | |
| processing_time=25.3, | |
| bearer_token=bearer_token | |
| ) | |
| # Verify N8N call was made without bearer token | |
| call_args = mock_n8n_client.post_completion_event.call_args | |
| passed_token = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('bearer_token') | |
| assert passed_token is None | |
| assert result.acknowledged is True | |
| def test_notification_service_protocol_compliance(self): | |
| """Test that N8NNotificationService implements the protocol correctly.""" | |
| from domain.services.notification_service import NotificationService | |
| # Create instance | |
| mock_client = Mock() | |
| service = N8NNotificationService(mock_client) | |
| # Verify it implements the protocol | |
| assert isinstance(service, NotificationService) | |
| # Verify method signature | |
| import inspect | |
| sig = inspect.signature(service.send_job_completion_notification) | |
| params = list(sig.parameters.keys()) | |
| expected_params = ['job_id', 'status', 'processing_time', 'bearer_token'] | |
| assert params == expected_params | |
| # Verify bearer_token has default None | |
| bearer_token_param = sig.parameters['bearer_token'] | |
| assert bearer_token_param.default is None |