"""Dependency injection container for services.""" from typing import Optional from pathlib import Path import logging from ..config.settings import settings from ..repositories.job_repository import InMemoryJobRepository from ..providers.file_storage_provider import create_storage_from_settings from .ffmpeg_service import FFmpegService from .file_cleanup_service import FileCleanupService # N8N related imports from ..clients.n8n.n8n_client import N8NClient from ..clients.n8n.settings import ClientSettings from .n8n_notification_service import N8NNotificationService class ServiceContainer: """Container for all infrastructure services.""" _instance: Optional['ServiceContainer'] = None def __init__(self): # Repositories self.job_repository = InMemoryJobRepository() # Create file repository based on settings self.file_repository = self._create_file_repository() # Services self.ffmpeg_service = FFmpegService( ffmpeg_path=settings.ffmpeg_path, quality_presets=settings.quality_presets, timeout_seconds=settings.ffmpeg_timeout_seconds ) self.cleanup_service = FileCleanupService( job_repo=self.job_repository, file_repo=self.file_repository, cleanup_interval_seconds=settings.cleanup_interval_seconds, retention_hours=settings.file_retention_hours ) # Create N8N notification service self.notification_service = self._create_notification_service() def _create_file_repository(self): """Create file repository based on settings.""" if settings.storage_type.lower() == "filesystem": return create_storage_from_settings( storage_type="filesystem", base_path=settings.temp_dir ) elif settings.storage_type.lower() == "r2": return create_storage_from_settings( storage_type="r2", account_id=settings.cloudflare_r2_account_id, access_key_id=settings.cloudflare_r2_access_key_id, secret_access_key=settings.cloudflare_r2_secret_access_key, bucket_name=settings.cloudflare_r2_bucket_name ) else: raise ValueError(f"Unsupported storage type: {settings.storage_type}") def _create_notification_service(self) -> N8NNotificationService: """Create N8N notification service.""" # Create N8N client settings n8n_settings = ClientSettings( base_url=settings.n8n_base_url, token=settings.n8n_token, timeout=getattr(settings, 'n8n_timeout', 30) ) # Create logger for N8N client logger = logging.getLogger("n8n_client") # Create N8N client n8n_client = N8NClient(n8n_settings, logger) # Create and return notification service return N8NNotificationService(n8n_client) @classmethod def get_instance(cls) -> 'ServiceContainer': """Get singleton instance of service container.""" if cls._instance is None: cls._instance = cls() return cls._instance async def startup(self): """Initialize services on startup.""" await self.cleanup_service.start() async def shutdown(self): """Cleanup services on shutdown.""" await self.cleanup_service.stop() # Close N8N client if it has async cleanup if hasattr(self.notification_service.n8n_client, 'close'): await self.notification_service.n8n_client.close() # Convenience function for getting services def get_services() -> ServiceContainer: """Get the service container instance.""" return ServiceContainer.get_instance()