audio-processor / tests /integration /test_n8n_integration.py
vitek
add bearer token to endpoints
acb0ec6
raw
history blame
16.3 kB
"""Integration tests for N8N notification with bearer token headers."""
import pytest
from unittest.mock import Mock, AsyncMock, patch
import httpx
from infrastructure.clients.n8n.n8n_client import N8NClient
from infrastructure.clients.n8n.models import WebhooksRequest, WebhooksResponse
from infrastructure.clients.n8n.settings import ClientSettings
from infrastructure.services.n8n_notification_service import N8NNotificationService
import logging
class TestN8NIntegrationWithBearerToken:
"""Integration tests for N8N notification with bearer tokens."""
def setup_method(self):
"""Set up test fixtures."""
self.settings = ClientSettings(
base_url="http://test-n8n.com",
token="n8n-service-token"
)
self.logger = logging.getLogger("test")
@pytest.mark.asyncio
async def test_complete_n8n_notification_flow_with_bearer_token(self):
"""Test complete N8N notification flow including bearer token headers."""
client = N8NClient(self.settings, self.logger)
service = N8NNotificationService(client)
bearer_token = "client-bearer-token-xyz"
job_id = "integration-test-job"
# Mock the HTTP response
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {"acknowledged": True}
mock_response.text = '{"acknowledged": true}'
mock_response.headers = {"content-type": "application/json"}
with patch.object(client._client, 'request', return_value=mock_response) as mock_request:
result = await service.send_job_completion_notification(
job_id=job_id,
status="completed",
processing_time=45.7,
bearer_token=bearer_token
)
# Verify the result
assert result.acknowledged is True
# Verify the HTTP request was made correctly
mock_request.assert_called_once()
call_kwargs = mock_request.call_args.kwargs
assert call_kwargs["method"] == "POST"
assert call_kwargs["url"] == "/lovable-analysis"
assert call_kwargs["json"] == {"message": f"Job {job_id} completed in 45.70s"}
# Verify bearer token was included in headers
expected_headers = {
"rowID": job_id,
"Authorization": f"Bearer {bearer_token}"
}
assert call_kwargs["headers"] == expected_headers
@pytest.mark.asyncio
async def test_n8n_notification_without_bearer_token(self):
"""Test N8N notification flow without client bearer token."""
client = N8NClient(self.settings, self.logger)
service = N8NNotificationService(client)
job_id = "no-token-test-job"
# Mock the HTTP response
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {"acknowledged": True}
mock_response.text = '{"acknowledged": true}'
mock_response.headers = {"content-type": "application/json"}
with patch.object(client._client, 'request', return_value=mock_response) as mock_request:
result = await service.send_job_completion_notification(
job_id=job_id,
status="failed",
processing_time=12.3,
bearer_token=None
)
# Verify the result
assert result.acknowledged is True
# Verify the HTTP request was made correctly
mock_request.assert_called_once()
call_kwargs = mock_request.call_args.kwargs
# Verify only rowID header was included (no Authorization header)
expected_headers = {"rowID": job_id}
assert call_kwargs["headers"] == expected_headers
@pytest.mark.asyncio
async def test_n8n_client_error_handling_preserves_bearer_token_context(self):
"""Test that N8N client errors don't leak bearer token information."""
client = N8NClient(self.settings, self.logger)
service = N8NNotificationService(client)
bearer_token = "secret-bearer-token-123"
job_id = "error-test-job"
# Mock HTTP error that includes sensitive information
mock_error = httpx.RequestError(
f"Connection failed with bearer token {bearer_token}"
)
with patch.object(client._client, 'request', side_effect=mock_error), \
patch('infrastructure.services.n8n_notification_service.logger') as mock_logger:
result = await service.send_job_completion_notification(
job_id=job_id,
status="completed",
processing_time=30.0,
bearer_token=bearer_token
)
# Service should handle error gracefully
assert result.acknowledged is False
# Verify error was logged but sensitive data was redacted
mock_logger.error.assert_called_once()
logged_message = mock_logger.error.call_args[0][0]
# Bearer token should be redacted in logs
assert bearer_token not in logged_message
assert "***" in logged_message or "redacted" in logged_message.lower()
@pytest.mark.asyncio
async def test_concurrent_n8n_notifications_with_different_tokens(self):
"""Test concurrent N8N notifications with different bearer tokens."""
import asyncio
client = N8NClient(self.settings, self.logger)
service = N8NNotificationService(client)
# Prepare test data for concurrent requests
test_cases = [
("job-1", "token-1"),
("job-2", "token-2"),
("job-3", "token-3"),
("job-4", None), # No token
("job-5", "token-5"),
]
# Track the requests made
requests_made = []
async def mock_request(**kwargs):
requests_made.append(kwargs)
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {"acknowledged": True}
mock_response.text = '{"acknowledged": true}'
mock_response.headers = {"content-type": "application/json"}
return mock_response
with patch.object(client._client, 'request', side_effect=mock_request):
# Run all notifications concurrently
tasks = [
service.send_job_completion_notification(
job_id=job_id,
status="completed",
processing_time=float(i * 10),
bearer_token=token
)
for i, (job_id, token) in enumerate(test_cases)
]
results = await asyncio.gather(*tasks)
# Verify all succeeded
assert all(result.acknowledged for result in results)
assert len(requests_made) == len(test_cases)
# Verify each request had the correct headers
for i, (job_id, token) in enumerate(test_cases):
request = requests_made[i]
expected_headers = {"rowID": job_id}
if token:
expected_headers["Authorization"] = f"Bearer {token}"
assert request["headers"] == expected_headers
@pytest.mark.asyncio
async def test_n8n_service_resilience_to_network_failures(self):
"""Test that N8N service is resilient to various network failures."""
client = N8NClient(self.settings, self.logger)
service = N8NNotificationService(client)
bearer_token = "resilience-test-token"
job_id = "resilience-test-job"
# Test various network failure scenarios
failure_scenarios = [
httpx.ConnectTimeout("Connection timeout"),
httpx.ReadTimeout("Read timeout"),
httpx.NetworkError("Network unreachable"),
httpx.RemoteProtocolError("Protocol error"),
]
for i, error in enumerate(failure_scenarios):
with patch.object(client._client, 'request', side_effect=error), \
patch('infrastructure.services.n8n_notification_service.logger') as mock_logger:
result = await service.send_job_completion_notification(
job_id=f"{job_id}-{i}",
status="completed",
processing_time=25.0,
bearer_token=bearer_token
)
# Service should handle all network errors gracefully
assert result.acknowledged is False
# Error should be logged but not leak sensitive data
mock_logger.error.assert_called_once()
logged_message = mock_logger.error.call_args[0][0]
assert bearer_token not in logged_message
@pytest.mark.asyncio
async def test_n8n_webhook_payload_format_consistency(self):
"""Test that N8N webhook payload format is consistent."""
client = N8NClient(self.settings, self.logger)
service = N8NNotificationService(client)
test_scenarios = [
("job-123", "completed", 45.67),
("job-456", "failed", 12.34),
("job-789", "timeout", 300.0),
("job-abc", "cancelled", 5.5),
]
requests_captured = []
async def capture_request(**kwargs):
requests_captured.append(kwargs)
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {"acknowledged": True}
mock_response.text = '{"acknowledged": true}'
mock_response.headers = {"content-type": "application/json"}
return mock_response
with patch.object(client._client, 'request', side_effect=capture_request):
for job_id, status, processing_time in test_scenarios:
await service.send_job_completion_notification(
job_id=job_id,
status=status,
processing_time=processing_time,
bearer_token="test-token"
)
# Verify payload format consistency
for i, (job_id, status, processing_time) in enumerate(test_scenarios):
request = requests_captured[i]
# Verify standard request structure
assert request["method"] == "POST"
assert request["url"] == "/lovable-analysis"
# Verify payload format
payload = request["json"]
expected_message = f"Job {job_id} {status} in {processing_time:.2f}s"
assert payload["message"] == expected_message
# Verify headers format
headers = request["headers"]
assert headers["rowID"] == job_id
assert headers["Authorization"] == "Bearer test-token"
@pytest.mark.asyncio
async def test_n8n_client_authentication_header_priority(self):
"""Test that client bearer token takes priority over service token."""
client = N8NClient(self.settings, self.logger)
client_bearer_token = "client-priority-token"
job_id = "priority-test-job"
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {"acknowledged": True}
mock_response.text = '{"acknowledged": true}'
mock_response.headers = {"content-type": "application/json"}
with patch.object(client._client, 'request', return_value=mock_response) as mock_request:
request_data = WebhooksRequest(message="Test message", job_id=job_id)
await client.post_completion_event(request_data, client_bearer_token)
# Verify the request was made with client bearer token
call_kwargs = mock_request.call_args.kwargs
headers = call_kwargs["headers"]
# Client token should override service token
assert headers["Authorization"] == f"Bearer {client_bearer_token}"
assert headers["Authorization"] != f"Bearer {self.settings.token}"
class TestN8NIntegrationErrorScenarios:
"""Test N8N integration error scenarios."""
def setup_method(self):
"""Set up test fixtures."""
self.settings = ClientSettings(
base_url="http://test-n8n.com",
token="n8n-service-token"
)
self.logger = logging.getLogger("test")
@pytest.mark.asyncio
async def test_n8n_server_error_handling(self):
"""Test handling of N8N server errors."""
client = N8NClient(self.settings, self.logger)
service = N8NNotificationService(client)
# Test various HTTP error responses
error_responses = [
(400, "Bad Request"),
(401, "Unauthorized"),
(403, "Forbidden"),
(404, "Not Found"),
(500, "Internal Server Error"),
(502, "Bad Gateway"),
(503, "Service Unavailable"),
]
for status_code, error_text in error_responses:
mock_response = Mock()
mock_response.status_code = status_code
mock_response.text = error_text
with patch.object(client._client, 'request', return_value=mock_response), \
patch('infrastructure.services.n8n_notification_service.logger') as mock_logger:
result = await service.send_job_completion_notification(
job_id=f"error-{status_code}-job",
status="completed",
processing_time=10.0,
bearer_token="test-token"
)
# Service should handle all HTTP errors gracefully
assert result.acknowledged is False
# Error should be logged
mock_logger.error.assert_called_once()
@pytest.mark.asyncio
async def test_n8n_malformed_response_handling(self):
"""Test handling of malformed N8N responses."""
client = N8NClient(self.settings, self.logger)
service = N8NNotificationService(client)
# Test various malformed responses
malformed_responses = [
'{"malformed": json}', # Invalid JSON
'{"missing": "acknowledged"}', # Missing acknowledged field
'', # Empty response
'plain text response', # Non-JSON response
]
for i, response_text in enumerate(malformed_responses):
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = response_text
if "json}" in response_text or response_text == '':
mock_response.json.side_effect = ValueError("Invalid JSON")
else:
mock_response.json.return_value = {}
with patch.object(client._client, 'request', return_value=mock_response), \
patch('infrastructure.services.n8n_notification_service.logger'):
result = await service.send_job_completion_notification(
job_id=f"malformed-{i}-job",
status="completed",
processing_time=15.0,
bearer_token="test-token"
)
# Service should handle malformed responses gracefully
# Result may be acknowledged=False depending on the response
assert isinstance(result.acknowledged, bool)