"""Dependency injection container for services.""" from typing import Optional from pathlib import Path import logging from ..config.settings import settings from ..repositories.job_repository import InMemoryJobRepository from ..providers.file_storage_provider import create_storage_from_settings from .ffmpeg_service import FFmpegService from .file_cleanup_service import FileCleanupService # N8N related imports from ..clients.n8n.n8n_client import N8NClient from ..clients.n8n.settings import ClientSettings from .n8n_notification_service import N8NNotificationService class ServiceContainer: """Container for all infrastructure services.""" _instance: Optional['ServiceContainer'] = None def __init__(self): # Repositories self.job_repository = InMemoryJobRepository() # Create file repository based on settings self.file_repository = self._create_file_repository() # Services self.ffmpeg_service = FFmpegService( ffmpeg_path=settings.ffmpeg_path, quality_presets=settings.quality_presets, timeout_seconds=settings.ffmpeg_timeout_seconds ) self.cleanup_service = FileCleanupService( job_repo=self.job_repository, file_repo=self.file_repository, cleanup_interval_seconds=settings.cleanup_interval_seconds, retention_hours=settings.file_retention_hours ) # Create N8N notification service self.notification_service = self._create_notification_service() def _create_file_repository(self): """Create file repository based on settings.""" if settings.storage_type.lower() == "filesystem": return create_storage_from_settings( storage_type="filesystem", base_path=settings.temp_dir ) elif settings.storage_type.lower() == "r2": return create_storage_from_settings( storage_type="r2", account_id=settings.cloudflare_r2_account_id, access_key_id=settings.cloudflare_r2_access_key_id, secret_access_key=settings.cloudflare_r2_secret_access_key, bucket_name=settings.cloudflare_r2_bucket_name ) else: raise ValueError(f"Unsupported storage type: {settings.storage_type}") def _create_notification_service(self) -> Optional[N8NNotificationService]: """Create N8N notification service if enabled.""" # Check if N8N is enabled if not settings.n8n_enabled: logging.getLogger(__name__).info("N8N notifications disabled via configuration") return None # Create N8N client settings (no token needed - client tokens are passed per request) n8n_settings = ClientSettings( base_url=settings.n8n_base_url, timeout=getattr(settings, 'n8n_timeout', 30) ) # Create logger for N8N client logger = logging.getLogger("n8n_client") # Create N8N client n8n_client = N8NClient(n8n_settings, logger) # Create and return notification service return N8NNotificationService(n8n_client) @classmethod def get_instance(cls) -> 'ServiceContainer': """Get singleton instance of service container.""" if cls._instance is None: cls._instance = cls() return cls._instance async def startup(self): """Initialize services on startup.""" await self.cleanup_service.start() async def shutdown(self): """Cleanup services on shutdown.""" await self.cleanup_service.stop() # Close N8N client if it exists and has async cleanup if (self.notification_service and hasattr(self.notification_service, 'n8n_client') and hasattr(self.notification_service.n8n_client, 'close')): await self.notification_service.n8n_client.close() # Convenience function for getting services def get_services() -> ServiceContainer: """Get the service container instance.""" return ServiceContainer.get_instance()