audio-processor / test_n8n_integration_bearer_token.py
vitek
add bearer token to endpoints
acb0ec6
raw
history blame
13.4 kB
"""Unit tests for N8N integration with bearer token support."""
import pytest
from unittest.mock import Mock, AsyncMock, patch
from infrastructure.clients.n8n.n8n_client import N8NClient
from infrastructure.clients.n8n.models import WebhooksRequest, WebhooksResponse
from infrastructure.clients.n8n.settings import ClientSettings
from infrastructure.services.n8n_notification_service import N8NNotificationService
from domain.services.notification_service import NotificationRequest, NotificationResponse
import logging
class TestN8NClientBearerToken:
"""Test N8N client with bearer token support."""
def setup_method(self):
"""Set up test fixtures."""
self.settings = ClientSettings(
base_url="http://test-n8n.com",
token="n8n-token-123"
)
self.logger = logging.getLogger("test")
self.client = N8NClient(self.settings, self.logger)
@pytest.mark.asyncio
async def test_post_completion_event_without_bearer_token(self):
"""Test posting completion event without client bearer token."""
with patch.object(self.client, '_make_request') as mock_request:
mock_request.return_value = {"acknowledged": True}
request = WebhooksRequest(message="Test message", job_id="job-123")
result = await self.client.post_completion_event(request)
# Verify call was made correctly
mock_request.assert_called_once_with(
"POST",
"/lovable-analysis",
{"message": "Test message"},
{"rowID": "job-123"}
)
assert result.acknowledged is True
@pytest.mark.asyncio
async def test_post_completion_event_with_bearer_token(self):
"""Test posting completion event with client bearer token."""
with patch.object(self.client, '_make_request') as mock_request:
mock_request.return_value = {"acknowledged": True}
request = WebhooksRequest(message="Test message", job_id="job-123")
bearer_token = "client-bearer-token-xyz"
result = await self.client.post_completion_event(request, bearer_token)
# Verify call includes both rowID and Authorization headers
expected_headers = {
"rowID": "job-123",
"Authorization": "Bearer client-bearer-token-xyz"
}
mock_request.assert_called_once_with(
"POST",
"/lovable-analysis",
{"message": "Test message"},
expected_headers
)
assert result.acknowledged is True
@pytest.mark.asyncio
async def test_post_completion_event_with_empty_bearer_token(self):
"""Test posting completion event with empty bearer token."""
with patch.object(self.client, '_make_request') as mock_request:
mock_request.return_value = {"acknowledged": True}
request = WebhooksRequest(message="Test message", job_id="job-123")
# Empty string should be treated as None
result = await self.client.post_completion_event(request, "")
# Should only have rowID header, no Authorization
mock_request.assert_called_once_with(
"POST",
"/lovable-analysis",
{"message": "Test message"},
{"rowID": "job-123"}
)
assert result.acknowledged is True
@pytest.mark.asyncio
async def test_headers_combination(self):
"""Test that headers are properly combined."""
with patch.object(self.client, '_make_request') as mock_request:
mock_request.return_value = {"acknowledged": False}
request = WebhooksRequest(message="Test", job_id="test-job")
bearer_token = "test-token"
await self.client.post_completion_event(request, bearer_token)
# Verify the headers contain both required values
call_args = mock_request.call_args
headers = call_args[0][3] # Fourth argument is custom_headers
assert headers["rowID"] == "test-job"
assert headers["Authorization"] == "Bearer test-token"
assert len(headers) == 2 # Should only have these two headers
class TestN8NNotificationServiceBearerToken:
"""Test N8N notification service with bearer token support."""
def setup_method(self):
"""Set up test fixtures."""
self.mock_n8n_client = AsyncMock()
self.notification_service = N8NNotificationService(self.mock_n8n_client)
@pytest.mark.asyncio
async def test_send_notification_without_bearer_token(self):
"""Test sending notification without bearer token."""
# Setup mock response
mock_response = WebhooksResponse(acknowledged=True)
self.mock_n8n_client.post_completion_event.return_value = mock_response
result = await self.notification_service.send_job_completion_notification(
job_id="job-123",
status="completed",
processing_time=45.5
)
# Verify N8N client was called correctly
self.mock_n8n_client.post_completion_event.assert_called_once()
call_args = self.mock_n8n_client.post_completion_event.call_args
# Check the request data
request_data = call_args[0][0] # First positional argument
assert request_data.job_id == "job-123"
assert "completed" in request_data.message
assert "45.50s" in request_data.message
# Check bearer token (should be None)
bearer_token = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('bearer_token')
assert bearer_token is None
assert result.acknowledged is True
@pytest.mark.asyncio
async def test_send_notification_with_bearer_token(self):
"""Test sending notification with bearer token."""
# Setup mock response
mock_response = WebhooksResponse(acknowledged=True)
self.mock_n8n_client.post_completion_event.return_value = mock_response
bearer_token = "client-token-abc123"
result = await self.notification_service.send_job_completion_notification(
job_id="job-456",
status="failed",
processing_time=30.2,
bearer_token=bearer_token
)
# Verify N8N client was called with bearer token
self.mock_n8n_client.post_completion_event.assert_called_once()
call_args = self.mock_n8n_client.post_completion_event.call_args
# Check the request data
request_data = call_args[0][0]
assert request_data.job_id == "job-456"
assert "failed" in request_data.message
assert "30.20s" in request_data.message
# Check bearer token was passed
passed_token = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('bearer_token')
assert passed_token == bearer_token
assert result.acknowledged is True
@pytest.mark.asyncio
async def test_send_notification_n8n_failure_handling(self):
"""Test handling of N8N client failures."""
# Setup mock to raise exception
self.mock_n8n_client.post_completion_event.side_effect = Exception("N8N connection failed")
# Should not raise exception, but return acknowledged=False
result = await self.notification_service.send_job_completion_notification(
job_id="job-789",
status="completed",
processing_time=60.0,
bearer_token="test-token"
)
assert result.acknowledged is False
self.mock_n8n_client.post_completion_event.assert_called_once()
@pytest.mark.asyncio
async def test_notification_message_format(self):
"""Test that notification messages are formatted correctly."""
mock_response = WebhooksResponse(acknowledged=True)
self.mock_n8n_client.post_completion_event.return_value = mock_response
await self.notification_service.send_job_completion_notification(
job_id="format-test-job",
status="completed",
processing_time=123.456,
bearer_token="format-token"
)
# Check message format
call_args = self.mock_n8n_client.post_completion_event.call_args
request_data = call_args[0][0]
expected_message = "Job format-test-job completed in 123.46s"
assert request_data.message == expected_message
assert request_data.job_id == "format-test-job"
class TestBearerTokenFlow:
"""Test the complete bearer token flow from job to N8N."""
@pytest.mark.asyncio
async def test_complete_bearer_token_flow(self):
"""Test the complete flow of bearer token from job to N8N notification."""
# Mock components
mock_job_repo = AsyncMock()
mock_n8n_client = AsyncMock()
# Setup job record with bearer token
mock_job_record = Mock()
mock_job_record.bearer_token = "client-auth-token-123"
mock_job_repo.get.return_value = mock_job_record
# Setup N8N response
mock_n8n_client.post_completion_event.return_value = WebhooksResponse(acknowledged=True)
# Create notification service
notification_service = N8NNotificationService(mock_n8n_client)
# Simulate the flow
job_id = "test-job-flow"
# 1. Get job record (simulating ProcessJobUseCase)
job_record = await mock_job_repo.get(job_id)
bearer_token = job_record.bearer_token
# 2. Send notification with bearer token
result = await notification_service.send_job_completion_notification(
job_id=job_id,
status="completed",
processing_time=88.7,
bearer_token=bearer_token
)
# 3. Clear bearer token (simulating repository cleanup)
await mock_job_repo.clear_bearer_token(job_id)
# Verify the flow
mock_job_repo.get.assert_called_once_with(job_id)
mock_n8n_client.post_completion_event.assert_called_once()
mock_job_repo.clear_bearer_token.assert_called_once_with(job_id)
# Verify N8N call included bearer token
call_args = mock_n8n_client.post_completion_event.call_args
passed_token = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('bearer_token')
assert passed_token == "client-auth-token-123"
assert result.acknowledged is True
@pytest.mark.asyncio
async def test_bearer_token_flow_without_token(self):
"""Test the flow when job has no bearer token."""
# Mock components
mock_job_repo = AsyncMock()
mock_n8n_client = AsyncMock()
# Setup job record without bearer token
mock_job_record = Mock()
mock_job_record.bearer_token = None
mock_job_repo.get.return_value = mock_job_record
# Setup N8N response
mock_n8n_client.post_completion_event.return_value = WebhooksResponse(acknowledged=True)
# Create notification service
notification_service = N8NNotificationService(mock_n8n_client)
# Simulate the flow
job_id = "test-job-no-token"
# Get job and send notification
job_record = await mock_job_repo.get(job_id)
bearer_token = job_record.bearer_token
result = await notification_service.send_job_completion_notification(
job_id=job_id,
status="completed",
processing_time=25.3,
bearer_token=bearer_token
)
# Verify N8N call was made without bearer token
call_args = mock_n8n_client.post_completion_event.call_args
passed_token = call_args[0][1] if len(call_args[0]) > 1 else call_args[1].get('bearer_token')
assert passed_token is None
assert result.acknowledged is True
def test_notification_service_protocol_compliance(self):
"""Test that N8NNotificationService implements the protocol correctly."""
from domain.services.notification_service import NotificationService
# Create instance
mock_client = Mock()
service = N8NNotificationService(mock_client)
# Verify it implements the protocol
assert isinstance(service, NotificationService)
# Verify method signature
import inspect
sig = inspect.signature(service.send_job_completion_notification)
params = list(sig.parameters.keys())
expected_params = ['job_id', 'status', 'processing_time', 'bearer_token']
assert params == expected_params
# Verify bearer_token has default None
bearer_token_param = sig.parameters['bearer_token']
assert bearer_token_param.default is None