audio-processor / tests /integration /test_n8n_integration.py
tedowski's picture
n8n-improvements (#1)
dbe78dd verified
raw
history blame
16.3 kB
"""Integration tests for N8N notification with bearer token headers."""
import pytest
from unittest.mock import Mock, AsyncMock, patch
import httpx
from infrastructure.clients.n8n.n8n_client import N8NClient
from infrastructure.clients.n8n.models import WebhooksRequest, WebhooksResponse
from infrastructure.clients.n8n.settings import ClientSettings
from infrastructure.services.n8n_notification_service import N8NNotificationService
import logging
class TestN8NIntegrationWithBearerToken:
    """Integration tests for N8N notification with bearer tokens.

    Every test builds an ``N8NClient`` (and usually an
    ``N8NNotificationService`` around it) and patches
    ``client._client.request`` to intercept the outgoing request, then
    asserts on the keyword arguments that request was called with.
    """

    def setup_method(self):
        """Set up test fixtures."""
        self.settings = ClientSettings(
            base_url="http://test-n8n.com",
            token="n8n-service-token",
        )
        self.logger = logging.getLogger("test")

    @staticmethod
    def _ok_response():
        """Build a mocked HTTP 200 response whose JSON body acknowledges the webhook."""
        response = Mock()
        response.status_code = 200
        response.json.return_value = {"acknowledged": True}
        response.text = '{"acknowledged": true}'
        response.headers = {"content-type": "application/json"}
        return response

    @staticmethod
    def _logged_text(log_call):
        """Join all positional arguments of a recorded logger call into one string.

        Checking only the first argument would miss a token passed through
        lazy ``%s`` formatting arguments, so the leak checks look at all of them.
        """
        return " ".join(str(arg) for arg in log_call.args)

    @pytest.mark.asyncio
    async def test_complete_n8n_notification_flow_with_bearer_token(self):
        """Test complete N8N notification flow including bearer token headers."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)
        bearer_token = "client-bearer-token-xyz"
        job_id = "integration-test-job"

        with patch.object(
            client._client, "request", return_value=self._ok_response()
        ) as mock_request:
            result = await service.send_job_completion_notification(
                job_id=job_id,
                status="completed",
                processing_time=45.7,
                bearer_token=bearer_token,
            )

        # Verify the result
        assert result.acknowledged is True

        # Verify the HTTP request was made correctly
        mock_request.assert_called_once()
        call_kwargs = mock_request.call_args.kwargs
        assert call_kwargs["method"] == "POST"
        assert call_kwargs["url"] == "/lovable-analysis"
        assert call_kwargs["json"] == {"message": f"Job {job_id} completed in 45.70s"}

        # Verify bearer token was included in headers
        assert call_kwargs["headers"] == {
            "rowID": job_id,
            "Authorization": f"Bearer {bearer_token}",
        }

    @pytest.mark.asyncio
    async def test_n8n_notification_without_bearer_token(self):
        """Test N8N notification flow without client bearer token."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)
        job_id = "no-token-test-job"

        with patch.object(
            client._client, "request", return_value=self._ok_response()
        ) as mock_request:
            result = await service.send_job_completion_notification(
                job_id=job_id,
                status="failed",
                processing_time=12.3,
                bearer_token=None,
            )

        # Verify the result
        assert result.acknowledged is True

        # Verify only rowID header was included (no Authorization header)
        mock_request.assert_called_once()
        call_kwargs = mock_request.call_args.kwargs
        assert call_kwargs["headers"] == {"rowID": job_id}

    @pytest.mark.asyncio
    async def test_n8n_client_error_handling_preserves_bearer_token_context(self):
        """Test that N8N client errors don't leak bearer token information."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)
        bearer_token = "secret-bearer-token-123"
        job_id = "error-test-job"

        # Mock HTTP error whose message itself contains the sensitive token
        mock_error = httpx.RequestError(
            f"Connection failed with bearer token {bearer_token}"
        )

        with patch.object(client._client, "request", side_effect=mock_error), \
                patch("infrastructure.services.n8n_notification_service.logger") as mock_logger:
            result = await service.send_job_completion_notification(
                job_id=job_id,
                status="completed",
                processing_time=30.0,
                bearer_token=bearer_token,
            )

        # Service should handle error gracefully
        assert result.acknowledged is False

        # Verify error was logged but sensitive data was redacted
        mock_logger.error.assert_called_once()
        logged_message = self._logged_text(mock_logger.error.call_args)
        assert bearer_token not in logged_message
        assert "***" in logged_message or "redacted" in logged_message.lower()

    @pytest.mark.asyncio
    async def test_concurrent_n8n_notifications_with_different_tokens(self):
        """Test concurrent N8N notifications with different bearer tokens."""
        import asyncio

        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)

        # (job_id, bearer token) pairs sent concurrently
        test_cases = [
            ("job-1", "token-1"),
            ("job-2", "token-2"),
            ("job-3", "token-3"),
            ("job-4", None),  # No token
            ("job-5", "token-5"),
        ]

        # Track the requests made
        requests_made = []

        async def record_request(**kwargs):
            requests_made.append(kwargs)
            return self._ok_response()

        with patch.object(client._client, "request", side_effect=record_request):
            # Run all notifications concurrently
            results = await asyncio.gather(
                *(
                    service.send_job_completion_notification(
                        job_id=job_id,
                        status="completed",
                        processing_time=float(index * 10),
                        bearer_token=token,
                    )
                    for index, (job_id, token) in enumerate(test_cases)
                )
            )

        # Verify all succeeded
        assert all(result.acknowledged for result in results)
        assert len(requests_made) == len(test_cases)

        # asyncio.gather guarantees the order of *results*, not the order in
        # which the coroutines reach the HTTP call, so captured requests are
        # matched to their job through the rowID header rather than by position.
        headers_by_job = {
            request["headers"]["rowID"]: request["headers"]
            for request in requests_made
        }
        assert len(headers_by_job) == len(test_cases)

        # Verify each request had the correct headers
        for job_id, token in test_cases:
            expected_headers = {"rowID": job_id}
            if token is not None:
                expected_headers["Authorization"] = f"Bearer {token}"
            assert headers_by_job[job_id] == expected_headers, (
                f"unexpected headers for {job_id}"
            )

    @pytest.mark.asyncio
    async def test_n8n_service_resilience_to_network_failures(self):
        """Test that N8N service is resilient to various network failures."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)
        bearer_token = "resilience-test-token"
        job_id = "resilience-test-job"

        # Test various network failure scenarios
        failure_scenarios = [
            httpx.ConnectTimeout("Connection timeout"),
            httpx.ReadTimeout("Read timeout"),
            httpx.NetworkError("Network unreachable"),
            httpx.RemoteProtocolError("Protocol error"),
        ]

        for index, error in enumerate(failure_scenarios):
            scenario = type(error).__name__

            # A fresh logger patch per scenario keeps assert_called_once meaningful.
            with patch.object(client._client, "request", side_effect=error), \
                    patch("infrastructure.services.n8n_notification_service.logger") as mock_logger:
                result = await service.send_job_completion_notification(
                    job_id=f"{job_id}-{index}",
                    status="completed",
                    processing_time=25.0,
                    bearer_token=bearer_token,
                )

            # Service should handle all network errors gracefully
            assert result.acknowledged is False, (
                f"{scenario} was not handled gracefully"
            )

            # Error should be logged but not leak sensitive data
            mock_logger.error.assert_called_once()
            logged_message = self._logged_text(mock_logger.error.call_args)
            assert bearer_token not in logged_message, (
                f"bearer token leaked in log for {scenario}"
            )

    @pytest.mark.asyncio
    async def test_n8n_webhook_payload_format_consistency(self):
        """Test that N8N webhook payload format is consistent."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)

        # (job_id, status, processing_time) triples, sent one after another
        test_scenarios = [
            ("job-123", "completed", 45.67),
            ("job-456", "failed", 12.34),
            ("job-789", "timeout", 300.0),
            ("job-abc", "cancelled", 5.5),
        ]
        requests_captured = []

        async def capture_request(**kwargs):
            requests_captured.append(kwargs)
            return self._ok_response()

        with patch.object(client._client, "request", side_effect=capture_request):
            for job_id, status, processing_time in test_scenarios:
                await service.send_job_completion_notification(
                    job_id=job_id,
                    status=status,
                    processing_time=processing_time,
                    bearer_token="test-token",
                )

        # Calls were awaited sequentially, so a count mismatch would otherwise
        # be hidden by zip() silently truncating to the shorter sequence.
        assert len(requests_captured) == len(test_scenarios)

        # Verify payload format consistency
        for request, (job_id, status, processing_time) in zip(
            requests_captured, test_scenarios
        ):
            # Verify standard request structure
            assert request["method"] == "POST"
            assert request["url"] == "/lovable-analysis"

            # Verify payload format
            expected_message = f"Job {job_id} {status} in {processing_time:.2f}s"
            assert request["json"]["message"] == expected_message

            # Verify headers format
            headers = request["headers"]
            assert headers["rowID"] == job_id
            assert headers["Authorization"] == "Bearer test-token"

    @pytest.mark.asyncio
    async def test_n8n_client_authentication_header_priority(self):
        """Test that client bearer token takes priority over service token."""
        client = N8NClient(self.settings, self.logger)
        client_bearer_token = "client-priority-token"
        job_id = "priority-test-job"

        with patch.object(
            client._client, "request", return_value=self._ok_response()
        ) as mock_request:
            request_data = WebhooksRequest(message="Test message", job_id=job_id)
            await client.post_completion_event(request_data, client_bearer_token)

        # Verify the request was made with client bearer token
        headers = mock_request.call_args.kwargs["headers"]

        # Client token should override service token
        assert headers["Authorization"] == f"Bearer {client_bearer_token}"
        assert headers["Authorization"] != f"Bearer {self.settings.token}"
class TestN8NIntegrationErrorScenarios:
    """Test N8N integration error scenarios.

    Each test patches ``client._client.request`` to return a mocked response
    and patches the notification service's module-level ``logger`` so that
    logging calls can be inspected.
    """

    def setup_method(self):
        """Set up test fixtures."""
        self.settings = ClientSettings(
            base_url="http://test-n8n.com",
            token="n8n-service-token",
        )
        self.logger = logging.getLogger("test")

    @pytest.mark.asyncio
    async def test_n8n_server_error_handling(self):
        """Test handling of N8N server errors."""
        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)

        # Test various HTTP error responses
        error_responses = [
            (400, "Bad Request"),
            (401, "Unauthorized"),
            (403, "Forbidden"),
            (404, "Not Found"),
            (500, "Internal Server Error"),
            (502, "Bad Gateway"),
            (503, "Service Unavailable"),
        ]

        for status_code, error_text in error_responses:
            mock_response = Mock()
            mock_response.status_code = status_code
            mock_response.text = error_text

            # A fresh logger patch per status keeps assert_called_once meaningful.
            with patch.object(client._client, "request", return_value=mock_response), \
                    patch("infrastructure.services.n8n_notification_service.logger") as mock_logger:
                result = await service.send_job_completion_notification(
                    job_id=f"error-{status_code}-job",
                    status="completed",
                    processing_time=10.0,
                    bearer_token="test-token",
                )

            # Service should handle all HTTP errors gracefully
            assert result.acknowledged is False, (
                f"HTTP {status_code} was not handled gracefully"
            )

            # Error should be logged
            mock_logger.error.assert_called_once()

    @pytest.mark.asyncio
    async def test_n8n_malformed_response_handling(self):
        """Test handling of malformed N8N responses."""
        import json

        client = N8NClient(self.settings, self.logger)
        service = N8NNotificationService(client)

        # Test various malformed responses
        malformed_responses = [
            '{"malformed": json}',  # Invalid JSON
            '{"missing": "acknowledged"}',  # Missing acknowledged field
            '',  # Empty response
            'plain text response',  # Non-JSON response
        ]

        for index, response_text in enumerate(malformed_responses):
            mock_response = Mock()
            mock_response.status_code = 200
            mock_response.text = response_text

            # Derive .json() from the body itself so the mock cannot disagree
            # with its own text: unparseable bodies raise, parseable ones
            # return their real content.
            try:
                parsed_body = json.loads(response_text)
            except ValueError:
                mock_response.json.side_effect = ValueError("Invalid JSON")
            else:
                mock_response.json.return_value = parsed_body

            with patch.object(client._client, "request", return_value=mock_response), \
                    patch("infrastructure.services.n8n_notification_service.logger"):
                result = await service.send_job_completion_notification(
                    job_id=f"malformed-{index}-job",
                    status="completed",
                    processing_time=15.0,
                    bearer_token="test-token",
                )

            # Service should handle malformed responses gracefully
            # Result may be acknowledged=False depending on the response
            assert isinstance(result.acknowledged, bool), (
                f"unexpected result for response body {response_text!r}"
            )