Tadeas Kosek committed on
Commit
fca9e8c
·
1 Parent(s): b8ef32f

implement n8n client

Browse files
application/use_cases/container.py CHANGED
@@ -29,11 +29,12 @@ class UseCaseContainer:
29
  def _init_use_cases(self):
30
  """Initialize all use cases with dependencies."""
31
 
32
- # Process job use case (needed by async extractor)
33
  self.process_job = ProcessJobUseCase(
34
  job_repository=self.services.job_repository,
35
  ffmpeg_service=self.services.ffmpeg_service,
36
- file_repository=self.services.file_repository
 
37
  )
38
 
39
  # Async extraction use case
 
29
  def _init_use_cases(self):
30
  """Initialize all use cases with dependencies."""
31
 
32
+ # Process job use case (updated with notification service)
33
  self.process_job = ProcessJobUseCase(
34
  job_repository=self.services.job_repository,
35
  ffmpeg_service=self.services.ffmpeg_service,
36
+ file_repository=self.services.file_repository,
37
+ notification_service=self.services.notification_service # Added notification service
38
  )
39
 
40
  # Async extraction use case
application/use_cases/process_job.py CHANGED
@@ -36,13 +36,22 @@ class FileRepository(Protocol):
36
  async def delete_file(self, file_path: str) -> bool:
37
  ...
38
 
 
 
 
 
 
 
 
 
39
  class ProcessJobUseCase:
40
  """Use case for processing a queued extraction job."""
41
 
42
- def __init__(self, file_repository, ffmpeg_service, job_repository):
43
  self.file_repository = file_repository
44
  self.ffmpeg_service = ffmpeg_service
45
  self.job_repository = job_repository
 
46
  self.file_processor = LocalFileProcessor(file_repository)
47
 
48
  async def execute(self, job_id: str, request: ExtractionRequestDTO):
@@ -78,6 +87,12 @@ class ProcessJobUseCase:
78
  output_path=output_key,
79
  processing_time=processing_time
80
  )
 
 
 
 
 
 
81
 
82
  logger.info(f"Job {job_id} completed in {processing_time:.2f} seconds")
83
 
 
36
  async def delete_file(self, file_path: str) -> bool:
37
  ...
38
 
39
class NotificationService(Protocol):
    """Structural interface for sending job-completion notifications."""

    async def send_job_completion_notification(
        self,
        job_id: str,
        status: str,
        processing_time: float,
    ) -> Any:
        ...
47
  class ProcessJobUseCase:
48
  """Use case for processing a queued extraction job."""
49
 
50
+ def __init__(self, file_repository, ffmpeg_service, job_repository, notification_service):
51
  self.file_repository = file_repository
52
  self.ffmpeg_service = ffmpeg_service
53
  self.job_repository = job_repository
54
+ self.notification_service = notification_service
55
  self.file_processor = LocalFileProcessor(file_repository)
56
 
57
  async def execute(self, job_id: str, request: ExtractionRequestDTO):
 
87
  output_path=output_key,
88
  processing_time=processing_time
89
  )
90
+
91
+ await self.notification_service.send_job_completion_notification(
92
+ job_id=job_id,
93
+ status="completed",
94
+ processing_time=processing_time
95
+ )
96
 
97
  logger.info(f"Job {job_id} completed in {processing_time:.2f} seconds")
98
 
domain/services/notification_service.py ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from typing import Protocol
from dataclasses import dataclass


@dataclass
class NotificationRequest:
    """Payload for an outgoing notification."""
    message: str
    # Job the notification refers to. N8NNotificationService passes job_id
    # when constructing this request, so the field must exist here; the
    # default keeps any caller that only supplies `message` working.
    job_id: str = ""


@dataclass
class NotificationResponse:
    """Result of a notification delivery attempt."""
    acknowledged: bool


class NotificationService(Protocol):
    """Protocol for sending notifications after job completion."""

    async def send_job_completion_notification(self,
                                               job_id: str,
                                               status: str,
                                               processing_time: float) -> NotificationResponse:
        ...
infrastructure/clients/__init__.py ADDED
File without changes
infrastructure/clients/n8n/__init__.py ADDED
File without changes
infrastructure/clients/n8n/exceptions.py ADDED
@@ -0,0 +1,17 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
class APIClientError(Exception):
    """Base exception for API client errors."""


class APIConnectionError(APIClientError):
    """Raised when connection to API fails."""


class APIResponseError(APIClientError):
    """Raised when API returns an error response."""

    def __init__(self, message: str, status_code: int, response_body: str = ""):
        # Keep the failing status and raw body available to callers
        # that want to inspect the error in detail.
        self.status_code = status_code
        self.response_body = response_body
        super().__init__(message)
infrastructure/clients/n8n/models.py ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from dataclasses import dataclass
from typing import Protocol


@dataclass
class WebhooksResponse:
    """What the webhooks endpoint answers back."""
    acknowledged: bool


@dataclass
class WebhooksRequest:
    """What we send to the webhooks endpoint."""
    message: str
    job_id: str


class N8NClientProtocol(Protocol):
    """Structural interface for the API client."""

    async def post_completion_event(self, data: WebhooksRequest) -> WebhooksResponse:
        """Post to webhooks endpoint."""
        ...
infrastructure/clients/n8n/n8n_client.py ADDED
@@ -0,0 +1,114 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import json
import logging
from dataclasses import asdict
from typing import Any, Dict, Optional

import httpx

from .exceptions import APIClientError, APIConnectionError, APIResponseError
from .models import N8NClientProtocol, WebhooksRequest, WebhooksResponse
from .settings import ClientSettings


class N8NClient(N8NClientProtocol):
    """HTTP client for API interactions with automatic auth and logging."""

    def __init__(self, settings: ClientSettings, logger: logging.Logger):
        """Create the client over a persistent httpx.AsyncClient.

        Args:
            settings: Base URL, bearer token and timeout for the API.
            logger: Logger used for request/response tracing.
        """
        self.settings = settings
        self.logger = logger
        self._client = httpx.AsyncClient(
            base_url=settings.base_url,
            timeout=settings.timeout,
            headers=self._get_default_headers()
        )

    def _get_default_headers(self) -> Dict[str, str]:
        """Get default headers including bearer authentication."""
        return {
            "Authorization": f"Bearer {self.settings.token}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    async def _make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        custom_headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Make an HTTP request with automatic logging and error handling.

        Returns:
            The decoded JSON response body.

        Raises:
            APIResponseError: when the API answers with status >= 400 or a
                non-JSON body.
            APIConnectionError: when the request cannot be completed at all.
        """
        url = endpoint
        # Merge default and per-request headers once, and send exactly this
        # dict. Previously only custom_headers were handed to httpx (relying
        # on its implicit merge), so the logged headers and the sent headers
        # were built by different code paths.
        headers = dict(self._client.headers)

        if custom_headers:
            headers.update(custom_headers)

        # Log request, redacting the bearer token.
        self.logger.info(
            f"Making {method} request to {url}",
            extra={
                "method": method,
                "url": url,
                "headers": {k: v if k.lower() != "authorization" else "Bearer ***" for k, v in headers.items()},
                "payload": data
            }
        )

        try:
            response = await self._client.request(
                method=method,
                url=url,
                json=data,
                headers=headers
            )

            # Log response
            self.logger.info(
                f"Received response from {url}",
                extra={
                    "status_code": response.status_code,
                    "response_headers": dict(response.headers),
                    "response_body": response.text
                }
            )

            # Handle HTTP errors
            if response.status_code >= 400:
                raise APIResponseError(
                    f"API request failed with status {response.status_code}",
                    status_code=response.status_code,
                    response_body=response.text
                )

            return response.json()

        except httpx.RequestError as e:
            self.logger.error(f"Connection error for {url}: {str(e)}")
            raise APIConnectionError(f"Failed to connect to API: {str(e)}") from e
        except json.JSONDecodeError as e:
            # `response` is always bound here: json() only runs after a
            # successful request.
            self.logger.error(f"Failed to parse JSON response from {url}: {str(e)}")
            raise APIResponseError(f"Invalid JSON response: {str(e)}", response.status_code) from e

    async def post_completion_event(self, data: WebhooksRequest) -> WebhooksResponse:
        """Post a job-completion event to the webhooks endpoint."""
        # Module-level asdict; the redundant function-local import is gone.
        payload = asdict(data)

        # job_id travels in the "rowID" header, not in the JSON body.
        job_id = payload.pop("job_id")
        custom_headers = {"rowID": job_id}

        response_data = await self._make_request("POST", "/lovable-analysis", payload, custom_headers)

        return WebhooksResponse(acknowledged=response_data.get("acknowledged", False))

    async def close(self):
        """Close the underlying HTTP client."""
        await self._client.aclose()

    async def __aenter__(self):
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: release the HTTP client."""
        await self.close()
infrastructure/clients/n8n/settings.py ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
from dataclasses import dataclass
from typing import Optional


@dataclass
class ClientSettings:
    """Settings for the HTTP client.

    Consumed by N8NClient to configure its underlying httpx.AsyncClient.
    """
    base_url: str  # root URL of the n8n instance, used as the client's base_url
    token: str  # bearer token placed in the Authorization header
    timeout: Optional[int] = 30  # request timeout in seconds; passed straight to httpx
infrastructure/config/settings.py CHANGED
@@ -85,6 +85,11 @@ class Settings(BaseSettings):
85
  'm4a': 'audio/mp4',
86
  'ogg': 'audio/ogg'
87
  }
 
 
 
 
 
88
 
89
  class Config:
90
  env_file = ".env"
 
85
  'm4a': 'audio/mp4',
86
  'ogg': 'audio/ogg'
87
  }
88
+
89
+ # N8N Configuration
90
+ n8n_base_url: str = Field(default="http://localhost:5678", env="N8N_BASE_URL")
91
+ n8n_token: str = Field(env="N8N_TOKEN")
92
+ n8n_timeout: int = Field(default=30, env="N8N_TIMEOUT")
93
 
94
  class Config:
95
  env_file = ".env"
infrastructure/services/container.py CHANGED
@@ -1,6 +1,7 @@
1
  """Dependency injection container for services."""
2
  from typing import Optional
3
  from pathlib import Path
 
4
 
5
  from ..config.settings import settings
6
  from ..repositories.job_repository import InMemoryJobRepository
@@ -8,6 +9,11 @@ from ..providers.file_storage_provider import create_storage_from_settings
8
  from .ffmpeg_service import FFmpegService
9
  from .file_cleanup_service import FileCleanupService
10
 
 
 
 
 
 
11
  class ServiceContainer:
12
  """Container for all infrastructure services."""
13
 
@@ -33,6 +39,9 @@ class ServiceContainer:
33
  cleanup_interval_seconds=settings.cleanup_interval_seconds,
34
  retention_hours=settings.file_retention_hours
35
  )
 
 
 
36
 
37
  def _create_file_repository(self):
38
  """Create file repository based on settings."""
@@ -52,6 +61,24 @@ class ServiceContainer:
52
  else:
53
  raise ValueError(f"Unsupported storage type: {settings.storage_type}")
54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55
  @classmethod
56
  def get_instance(cls) -> 'ServiceContainer':
57
  """Get singleton instance of service container."""
@@ -66,6 +93,9 @@ class ServiceContainer:
66
  async def shutdown(self):
67
  """Cleanup services on shutdown."""
68
  await self.cleanup_service.stop()
 
 
 
69
 
70
  # Convenience function for getting services
71
  def get_services() -> ServiceContainer:
 
1
  """Dependency injection container for services."""
2
  from typing import Optional
3
  from pathlib import Path
4
+ import logging
5
 
6
  from ..config.settings import settings
7
  from ..repositories.job_repository import InMemoryJobRepository
 
9
  from .ffmpeg_service import FFmpegService
10
  from .file_cleanup_service import FileCleanupService
11
 
12
+ # N8N related imports
13
+ from ..clients.n8n.n8n_client import N8NClient
14
+ from ..clients.n8n.settings import ClientSettings
15
+ from .n8n_notification_service import N8NNotificationService
16
+
17
  class ServiceContainer:
18
  """Container for all infrastructure services."""
19
 
 
39
  cleanup_interval_seconds=settings.cleanup_interval_seconds,
40
  retention_hours=settings.file_retention_hours
41
  )
42
+
43
+ # Create N8N notification service
44
+ self.notification_service = self._create_notification_service()
45
 
46
  def _create_file_repository(self):
47
  """Create file repository based on settings."""
 
61
  else:
62
  raise ValueError(f"Unsupported storage type: {settings.storage_type}")
63
 
64
+ def _create_notification_service(self) -> N8NNotificationService:
65
+ """Create N8N notification service."""
66
+ # Create N8N client settings
67
+ n8n_settings = ClientSettings(
68
+ base_url=settings.n8n_base_url,
69
+ token=settings.n8n_token,
70
+ timeout=getattr(settings, 'n8n_timeout', 30)
71
+ )
72
+
73
+ # Create logger for N8N client
74
+ logger = logging.getLogger("n8n_client")
75
+
76
+ # Create N8N client
77
+ n8n_client = N8NClient(n8n_settings, logger)
78
+
79
+ # Create and return notification service
80
+ return N8NNotificationService(n8n_client)
81
+
82
  @classmethod
83
  def get_instance(cls) -> 'ServiceContainer':
84
  """Get singleton instance of service container."""
 
93
  async def shutdown(self):
94
  """Cleanup services on shutdown."""
95
  await self.cleanup_service.stop()
96
+ # Close N8N client if it has async cleanup
97
+ if hasattr(self.notification_service.n8n_client, 'close'):
98
+ await self.notification_service.n8n_client.close()
99
 
100
  # Convenience function for getting services
101
  def get_services() -> ServiceContainer:
infrastructure/services/n8n_notification_service.py ADDED
@@ -0,0 +1,29 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import logging

from domain.services.notification_service import NotificationService, NotificationRequest, NotificationResponse

from infrastructure.clients.n8n.models import WebhooksRequest
from infrastructure.clients.n8n.n8n_client import N8NClient

logger = logging.getLogger(__name__)


class N8NNotificationService(NotificationService):
    """N8N implementation of notification service."""

    def __init__(self, n8n_client: N8NClient):
        self.n8n_client = n8n_client

    async def send_job_completion_notification(self,
                                               job_id: str,
                                               status: str,
                                               processing_time: float) -> NotificationResponse:
        """Send job completion notification via N8N.

        Delivery failures are logged and reported as unacknowledged instead
        of raised, so a notification problem never fails the job itself.
        """
        try:
            message = f"Job {job_id} {status} in {processing_time:.2f}s"
            # Build the client's own request model: post_completion_event
            # requires a WebhooksRequest carrying job_id (it pops job_id into
            # the "rowID" header). The previous NotificationRequest had no
            # job_id field and raised TypeError on construction.
            request = WebhooksRequest(message=message, job_id=job_id)

            response = await self.n8n_client.post_completion_event(request)
            return NotificationResponse(acknowledged=response.acknowledged)

        except Exception as e:
            logger.error(f"Failed to send notification for job {job_id}: {e}")
            # Deliberate best-effort: report non-delivery rather than re-raise.
            return NotificationResponse(acknowledged=False)
requirements.txt CHANGED
@@ -5,6 +5,7 @@ uvicorn[standard]==0.24.0
5
  python-multipart==0.0.6
6
  pydantic==2.5.0
7
  pydantic-settings==2.1.0
 
8
 
9
  # File Processing
10
  aiofiles==23.2.1
 
5
  python-multipart==0.0.6
6
  pydantic==2.5.0
7
  pydantic-settings==2.1.0
8
+ httpx==0.27.2
9
 
10
  # File Processing
11
  aiofiles==23.2.1