|
|
"""Integration tests for N8N notification with bearer token headers.""" |
|
|
import pytest |
|
|
from unittest.mock import Mock, AsyncMock, patch |
|
|
import httpx |
|
|
from infrastructure.clients.n8n.n8n_client import N8NClient |
|
|
from infrastructure.clients.n8n.models import WebhooksRequest, WebhooksResponse |
|
|
from infrastructure.clients.n8n.settings import ClientSettings |
|
|
from infrastructure.services.n8n_notification_service import N8NNotificationService |
|
|
import logging |
|
|
|
|
|
|
|
|
class TestN8NIntegrationWithBearerToken: |
|
|
"""Integration tests for N8N notification with bearer tokens.""" |
|
|
|
|
|
def setup_method(self): |
|
|
"""Set up test fixtures.""" |
|
|
self.settings = ClientSettings( |
|
|
base_url="http://test-n8n.com", |
|
|
token="n8n-service-token" |
|
|
) |
|
|
self.logger = logging.getLogger("test") |
|
|
|
|
|
@pytest.mark.asyncio |
|
|
async def test_complete_n8n_notification_flow_with_bearer_token(self): |
|
|
"""Test complete N8N notification flow including bearer token headers.""" |
|
|
client = N8NClient(self.settings, self.logger) |
|
|
service = N8NNotificationService(client) |
|
|
|
|
|
bearer_token = "client-bearer-token-xyz" |
|
|
job_id = "integration-test-job" |
|
|
|
|
|
|
|
|
mock_response = Mock() |
|
|
mock_response.status_code = 200 |
|
|
mock_response.json.return_value = {"acknowledged": True} |
|
|
mock_response.text = '{"acknowledged": true}' |
|
|
mock_response.headers = {"content-type": "application/json"} |
|
|
|
|
|
with patch.object(client._client, 'request', return_value=mock_response) as mock_request: |
|
|
result = await service.send_job_completion_notification( |
|
|
job_id=job_id, |
|
|
status="completed", |
|
|
processing_time=45.7, |
|
|
bearer_token=bearer_token |
|
|
) |
|
|
|
|
|
|
|
|
assert result.acknowledged is True |
|
|
|
|
|
|
|
|
mock_request.assert_called_once() |
|
|
call_kwargs = mock_request.call_args.kwargs |
|
|
|
|
|
assert call_kwargs["method"] == "POST" |
|
|
assert call_kwargs["url"] == "/lovable-analysis" |
|
|
assert call_kwargs["json"] == {"message": f"Job {job_id} completed in 45.70s"} |
|
|
|
|
|
|
|
|
expected_headers = { |
|
|
"rowID": job_id, |
|
|
"Authorization": f"Bearer {bearer_token}" |
|
|
} |
|
|
assert call_kwargs["headers"] == expected_headers |
|
|
|
|
|
@pytest.mark.asyncio |
|
|
async def test_n8n_notification_without_bearer_token(self): |
|
|
"""Test N8N notification flow without client bearer token.""" |
|
|
client = N8NClient(self.settings, self.logger) |
|
|
service = N8NNotificationService(client) |
|
|
|
|
|
job_id = "no-token-test-job" |
|
|
|
|
|
|
|
|
mock_response = Mock() |
|
|
mock_response.status_code = 200 |
|
|
mock_response.json.return_value = {"acknowledged": True} |
|
|
mock_response.text = '{"acknowledged": true}' |
|
|
mock_response.headers = {"content-type": "application/json"} |
|
|
|
|
|
with patch.object(client._client, 'request', return_value=mock_response) as mock_request: |
|
|
result = await service.send_job_completion_notification( |
|
|
job_id=job_id, |
|
|
status="failed", |
|
|
processing_time=12.3, |
|
|
bearer_token=None |
|
|
) |
|
|
|
|
|
|
|
|
assert result.acknowledged is True |
|
|
|
|
|
|
|
|
mock_request.assert_called_once() |
|
|
call_kwargs = mock_request.call_args.kwargs |
|
|
|
|
|
|
|
|
expected_headers = {"rowID": job_id} |
|
|
assert call_kwargs["headers"] == expected_headers |
|
|
|
|
|
@pytest.mark.asyncio |
|
|
async def test_n8n_client_error_handling_preserves_bearer_token_context(self): |
|
|
"""Test that N8N client errors don't leak bearer token information.""" |
|
|
client = N8NClient(self.settings, self.logger) |
|
|
service = N8NNotificationService(client) |
|
|
|
|
|
bearer_token = "secret-bearer-token-123" |
|
|
job_id = "error-test-job" |
|
|
|
|
|
|
|
|
mock_error = httpx.RequestError( |
|
|
f"Connection failed with bearer token {bearer_token}" |
|
|
) |
|
|
|
|
|
with patch.object(client._client, 'request', side_effect=mock_error), \ |
|
|
patch('infrastructure.services.n8n_notification_service.logger') as mock_logger: |
|
|
|
|
|
result = await service.send_job_completion_notification( |
|
|
job_id=job_id, |
|
|
status="completed", |
|
|
processing_time=30.0, |
|
|
bearer_token=bearer_token |
|
|
) |
|
|
|
|
|
|
|
|
assert result.acknowledged is False |
|
|
|
|
|
|
|
|
mock_logger.error.assert_called_once() |
|
|
logged_message = mock_logger.error.call_args[0][0] |
|
|
|
|
|
|
|
|
assert bearer_token not in logged_message |
|
|
assert "***" in logged_message or "redacted" in logged_message.lower() |
|
|
|
|
|
@pytest.mark.asyncio |
|
|
async def test_concurrent_n8n_notifications_with_different_tokens(self): |
|
|
"""Test concurrent N8N notifications with different bearer tokens.""" |
|
|
import asyncio |
|
|
|
|
|
client = N8NClient(self.settings, self.logger) |
|
|
service = N8NNotificationService(client) |
|
|
|
|
|
|
|
|
test_cases = [ |
|
|
("job-1", "token-1"), |
|
|
("job-2", "token-2"), |
|
|
("job-3", "token-3"), |
|
|
("job-4", None), |
|
|
("job-5", "token-5"), |
|
|
] |
|
|
|
|
|
|
|
|
requests_made = [] |
|
|
|
|
|
async def mock_request(**kwargs): |
|
|
requests_made.append(kwargs) |
|
|
mock_response = Mock() |
|
|
mock_response.status_code = 200 |
|
|
mock_response.json.return_value = {"acknowledged": True} |
|
|
mock_response.text = '{"acknowledged": true}' |
|
|
mock_response.headers = {"content-type": "application/json"} |
|
|
return mock_response |
|
|
|
|
|
with patch.object(client._client, 'request', side_effect=mock_request): |
|
|
|
|
|
tasks = [ |
|
|
service.send_job_completion_notification( |
|
|
job_id=job_id, |
|
|
status="completed", |
|
|
processing_time=float(i * 10), |
|
|
bearer_token=token |
|
|
) |
|
|
for i, (job_id, token) in enumerate(test_cases) |
|
|
] |
|
|
|
|
|
results = await asyncio.gather(*tasks) |
|
|
|
|
|
|
|
|
assert all(result.acknowledged for result in results) |
|
|
assert len(requests_made) == len(test_cases) |
|
|
|
|
|
|
|
|
for i, (job_id, token) in enumerate(test_cases): |
|
|
request = requests_made[i] |
|
|
expected_headers = {"rowID": job_id} |
|
|
|
|
|
if token: |
|
|
expected_headers["Authorization"] = f"Bearer {token}" |
|
|
|
|
|
assert request["headers"] == expected_headers |
|
|
|
|
|
@pytest.mark.asyncio |
|
|
async def test_n8n_service_resilience_to_network_failures(self): |
|
|
"""Test that N8N service is resilient to various network failures.""" |
|
|
client = N8NClient(self.settings, self.logger) |
|
|
service = N8NNotificationService(client) |
|
|
|
|
|
bearer_token = "resilience-test-token" |
|
|
job_id = "resilience-test-job" |
|
|
|
|
|
|
|
|
failure_scenarios = [ |
|
|
httpx.ConnectTimeout("Connection timeout"), |
|
|
httpx.ReadTimeout("Read timeout"), |
|
|
httpx.NetworkError("Network unreachable"), |
|
|
httpx.RemoteProtocolError("Protocol error"), |
|
|
] |
|
|
|
|
|
for i, error in enumerate(failure_scenarios): |
|
|
with patch.object(client._client, 'request', side_effect=error), \ |
|
|
patch('infrastructure.services.n8n_notification_service.logger') as mock_logger: |
|
|
|
|
|
result = await service.send_job_completion_notification( |
|
|
job_id=f"{job_id}-{i}", |
|
|
status="completed", |
|
|
processing_time=25.0, |
|
|
bearer_token=bearer_token |
|
|
) |
|
|
|
|
|
|
|
|
assert result.acknowledged is False |
|
|
|
|
|
|
|
|
mock_logger.error.assert_called_once() |
|
|
logged_message = mock_logger.error.call_args[0][0] |
|
|
assert bearer_token not in logged_message |
|
|
|
|
|
@pytest.mark.asyncio |
|
|
async def test_n8n_webhook_payload_format_consistency(self): |
|
|
"""Test that N8N webhook payload format is consistent.""" |
|
|
client = N8NClient(self.settings, self.logger) |
|
|
service = N8NNotificationService(client) |
|
|
|
|
|
test_scenarios = [ |
|
|
("job-123", "completed", 45.67), |
|
|
("job-456", "failed", 12.34), |
|
|
("job-789", "timeout", 300.0), |
|
|
("job-abc", "cancelled", 5.5), |
|
|
] |
|
|
|
|
|
requests_captured = [] |
|
|
|
|
|
async def capture_request(**kwargs): |
|
|
requests_captured.append(kwargs) |
|
|
mock_response = Mock() |
|
|
mock_response.status_code = 200 |
|
|
mock_response.json.return_value = {"acknowledged": True} |
|
|
mock_response.text = '{"acknowledged": true}' |
|
|
mock_response.headers = {"content-type": "application/json"} |
|
|
return mock_response |
|
|
|
|
|
with patch.object(client._client, 'request', side_effect=capture_request): |
|
|
for job_id, status, processing_time in test_scenarios: |
|
|
await service.send_job_completion_notification( |
|
|
job_id=job_id, |
|
|
status=status, |
|
|
processing_time=processing_time, |
|
|
bearer_token="test-token" |
|
|
) |
|
|
|
|
|
|
|
|
for i, (job_id, status, processing_time) in enumerate(test_scenarios): |
|
|
request = requests_captured[i] |
|
|
|
|
|
|
|
|
assert request["method"] == "POST" |
|
|
assert request["url"] == "/lovable-analysis" |
|
|
|
|
|
|
|
|
payload = request["json"] |
|
|
expected_message = f"Job {job_id} {status} in {processing_time:.2f}s" |
|
|
assert payload["message"] == expected_message |
|
|
|
|
|
|
|
|
headers = request["headers"] |
|
|
assert headers["rowID"] == job_id |
|
|
assert headers["Authorization"] == "Bearer test-token" |
|
|
|
|
|
@pytest.mark.asyncio |
|
|
async def test_n8n_client_authentication_header_priority(self): |
|
|
"""Test that client bearer token takes priority over service token.""" |
|
|
client = N8NClient(self.settings, self.logger) |
|
|
|
|
|
client_bearer_token = "client-priority-token" |
|
|
job_id = "priority-test-job" |
|
|
|
|
|
mock_response = Mock() |
|
|
mock_response.status_code = 200 |
|
|
mock_response.json.return_value = {"acknowledged": True} |
|
|
mock_response.text = '{"acknowledged": true}' |
|
|
mock_response.headers = {"content-type": "application/json"} |
|
|
|
|
|
with patch.object(client._client, 'request', return_value=mock_response) as mock_request: |
|
|
request_data = WebhooksRequest(message="Test message", job_id=job_id) |
|
|
|
|
|
await client.post_completion_event(request_data, client_bearer_token) |
|
|
|
|
|
|
|
|
call_kwargs = mock_request.call_args.kwargs |
|
|
headers = call_kwargs["headers"] |
|
|
|
|
|
|
|
|
assert headers["Authorization"] == f"Bearer {client_bearer_token}" |
|
|
assert headers["Authorization"] != f"Bearer {self.settings.token}" |
|
|
|
|
|
|
|
|
class TestN8NIntegrationErrorScenarios: |
|
|
"""Test N8N integration error scenarios.""" |
|
|
|
|
|
def setup_method(self): |
|
|
"""Set up test fixtures.""" |
|
|
self.settings = ClientSettings( |
|
|
base_url="http://test-n8n.com", |
|
|
token="n8n-service-token" |
|
|
) |
|
|
self.logger = logging.getLogger("test") |
|
|
|
|
|
@pytest.mark.asyncio |
|
|
async def test_n8n_server_error_handling(self): |
|
|
"""Test handling of N8N server errors.""" |
|
|
client = N8NClient(self.settings, self.logger) |
|
|
service = N8NNotificationService(client) |
|
|
|
|
|
|
|
|
error_responses = [ |
|
|
(400, "Bad Request"), |
|
|
(401, "Unauthorized"), |
|
|
(403, "Forbidden"), |
|
|
(404, "Not Found"), |
|
|
(500, "Internal Server Error"), |
|
|
(502, "Bad Gateway"), |
|
|
(503, "Service Unavailable"), |
|
|
] |
|
|
|
|
|
for status_code, error_text in error_responses: |
|
|
mock_response = Mock() |
|
|
mock_response.status_code = status_code |
|
|
mock_response.text = error_text |
|
|
|
|
|
with patch.object(client._client, 'request', return_value=mock_response), \ |
|
|
patch('infrastructure.services.n8n_notification_service.logger') as mock_logger: |
|
|
|
|
|
result = await service.send_job_completion_notification( |
|
|
job_id=f"error-{status_code}-job", |
|
|
status="completed", |
|
|
processing_time=10.0, |
|
|
bearer_token="test-token" |
|
|
) |
|
|
|
|
|
|
|
|
assert result.acknowledged is False |
|
|
|
|
|
|
|
|
mock_logger.error.assert_called_once() |
|
|
|
|
|
@pytest.mark.asyncio |
|
|
async def test_n8n_malformed_response_handling(self): |
|
|
"""Test handling of malformed N8N responses.""" |
|
|
client = N8NClient(self.settings, self.logger) |
|
|
service = N8NNotificationService(client) |
|
|
|
|
|
|
|
|
malformed_responses = [ |
|
|
'{"malformed": json}', |
|
|
'{"missing": "acknowledged"}', |
|
|
'', |
|
|
'plain text response', |
|
|
] |
|
|
|
|
|
for i, response_text in enumerate(malformed_responses): |
|
|
mock_response = Mock() |
|
|
mock_response.status_code = 200 |
|
|
mock_response.text = response_text |
|
|
|
|
|
if "json}" in response_text or response_text == '': |
|
|
mock_response.json.side_effect = ValueError("Invalid JSON") |
|
|
else: |
|
|
mock_response.json.return_value = {} |
|
|
|
|
|
with patch.object(client._client, 'request', return_value=mock_response), \ |
|
|
patch('infrastructure.services.n8n_notification_service.logger'): |
|
|
|
|
|
result = await service.send_job_completion_notification( |
|
|
job_id=f"malformed-{i}-job", |
|
|
status="completed", |
|
|
processing_time=15.0, |
|
|
bearer_token="test-token" |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
assert isinstance(result.acknowledged, bool) |