Tadeas Kosek
n8n use passed token
3216021
"""Dependency injection container for services."""
from typing import Optional
from pathlib import Path
import logging
from ..config.settings import settings
from ..repositories.job_repository import InMemoryJobRepository
from ..providers.file_storage_provider import create_storage_from_settings
from .ffmpeg_service import FFmpegService
from .file_cleanup_service import FileCleanupService
# N8N related imports
from ..clients.n8n.n8n_client import N8NClient
from ..clients.n8n.settings import ClientSettings
from .n8n_notification_service import N8NNotificationService
class ServiceContainer:
"""Container for all infrastructure services."""
_instance: Optional['ServiceContainer'] = None
def __init__(self):
# Repositories
self.job_repository = InMemoryJobRepository()
# Create file repository based on settings
self.file_repository = self._create_file_repository()
# Services
self.ffmpeg_service = FFmpegService(
ffmpeg_path=settings.ffmpeg_path,
quality_presets=settings.quality_presets,
timeout_seconds=settings.ffmpeg_timeout_seconds
)
self.cleanup_service = FileCleanupService(
job_repo=self.job_repository,
file_repo=self.file_repository,
cleanup_interval_seconds=settings.cleanup_interval_seconds,
retention_hours=settings.file_retention_hours
)
# Create N8N notification service
self.notification_service = self._create_notification_service()
def _create_file_repository(self):
"""Create file repository based on settings."""
if settings.storage_type.lower() == "filesystem":
return create_storage_from_settings(
storage_type="filesystem",
base_path=settings.temp_dir
)
elif settings.storage_type.lower() == "r2":
return create_storage_from_settings(
storage_type="r2",
account_id=settings.cloudflare_r2_account_id,
access_key_id=settings.cloudflare_r2_access_key_id,
secret_access_key=settings.cloudflare_r2_secret_access_key,
bucket_name=settings.cloudflare_r2_bucket_name
)
else:
raise ValueError(f"Unsupported storage type: {settings.storage_type}")
def _create_notification_service(self) -> Optional[N8NNotificationService]:
"""Create N8N notification service if enabled."""
# Check if N8N is enabled
if not settings.n8n_enabled:
logging.getLogger(__name__).info("N8N notifications disabled via configuration")
return None
# Create N8N client settings (no token needed - client tokens are passed per request)
n8n_settings = ClientSettings(
base_url=settings.n8n_base_url,
timeout=getattr(settings, 'n8n_timeout', 30)
)
# Create logger for N8N client
logger = logging.getLogger("n8n_client")
# Create N8N client
n8n_client = N8NClient(n8n_settings, logger)
# Create and return notification service
return N8NNotificationService(n8n_client)
@classmethod
def get_instance(cls) -> 'ServiceContainer':
"""Get singleton instance of service container."""
if cls._instance is None:
cls._instance = cls()
return cls._instance
async def startup(self):
"""Initialize services on startup."""
await self.cleanup_service.start()
async def shutdown(self):
"""Cleanup services on shutdown."""
await self.cleanup_service.stop()
# Close N8N client if it exists and has async cleanup
if (self.notification_service and
hasattr(self.notification_service, 'n8n_client') and
hasattr(self.notification_service.n8n_client, 'close')):
await self.notification_service.n8n_client.close()
# Convenience function for getting services
def get_services() -> ServiceContainer:
"""Get the service container instance."""
return ServiceContainer.get_instance()