Tadeas Kosek
implement n8n client
fca9e8c
raw
history blame
3.88 kB
"""Dependency injection container for services."""
from typing import Optional
from pathlib import Path
import logging
from ..config.settings import settings
from ..repositories.job_repository import InMemoryJobRepository
from ..providers.file_storage_provider import create_storage_from_settings
from .ffmpeg_service import FFmpegService
from .file_cleanup_service import FileCleanupService
# N8N related imports
from ..clients.n8n.n8n_client import N8NClient
from ..clients.n8n.settings import ClientSettings
from .n8n_notification_service import N8NNotificationService
class ServiceContainer:
"""Container for all infrastructure services."""
_instance: Optional['ServiceContainer'] = None
def __init__(self):
# Repositories
self.job_repository = InMemoryJobRepository()
# Create file repository based on settings
self.file_repository = self._create_file_repository()
# Services
self.ffmpeg_service = FFmpegService(
ffmpeg_path=settings.ffmpeg_path,
quality_presets=settings.quality_presets,
timeout_seconds=settings.ffmpeg_timeout_seconds
)
self.cleanup_service = FileCleanupService(
job_repo=self.job_repository,
file_repo=self.file_repository,
cleanup_interval_seconds=settings.cleanup_interval_seconds,
retention_hours=settings.file_retention_hours
)
# Create N8N notification service
self.notification_service = self._create_notification_service()
def _create_file_repository(self):
"""Create file repository based on settings."""
if settings.storage_type.lower() == "filesystem":
return create_storage_from_settings(
storage_type="filesystem",
base_path=settings.temp_dir
)
elif settings.storage_type.lower() == "r2":
return create_storage_from_settings(
storage_type="r2",
account_id=settings.cloudflare_r2_account_id,
access_key_id=settings.cloudflare_r2_access_key_id,
secret_access_key=settings.cloudflare_r2_secret_access_key,
bucket_name=settings.cloudflare_r2_bucket_name
)
else:
raise ValueError(f"Unsupported storage type: {settings.storage_type}")
def _create_notification_service(self) -> N8NNotificationService:
"""Create N8N notification service."""
# Create N8N client settings
n8n_settings = ClientSettings(
base_url=settings.n8n_base_url,
token=settings.n8n_token,
timeout=getattr(settings, 'n8n_timeout', 30)
)
# Create logger for N8N client
logger = logging.getLogger("n8n_client")
# Create N8N client
n8n_client = N8NClient(n8n_settings, logger)
# Create and return notification service
return N8NNotificationService(n8n_client)
@classmethod
def get_instance(cls) -> 'ServiceContainer':
"""Get singleton instance of service container."""
if cls._instance is None:
cls._instance = cls()
return cls._instance
async def startup(self):
"""Initialize services on startup."""
await self.cleanup_service.start()
async def shutdown(self):
"""Cleanup services on shutdown."""
await self.cleanup_service.stop()
# Close N8N client if it has async cleanup
if hasattr(self.notification_service.n8n_client, 'close'):
await self.notification_service.n8n_client.close()
# Convenience function for getting services
def get_services() -> ServiceContainer:
"""Get the service container instance."""
return ServiceContainer.get_instance()