Spaces:
Sleeping
Sleeping
| """Dependency injection container for services.""" | |
| from typing import Optional | |
| from pathlib import Path | |
| import logging | |
| from ..config.settings import settings | |
| from ..repositories.job_repository import InMemoryJobRepository | |
| from ..providers.file_storage_provider import create_storage_from_settings | |
| from .ffmpeg_service import FFmpegService | |
| from .file_cleanup_service import FileCleanupService | |
| # N8N related imports | |
| from ..clients.n8n.n8n_client import N8NClient | |
| from ..clients.n8n.settings import ClientSettings | |
| from .n8n_notification_service import N8NNotificationService | |
| class ServiceContainer: | |
| """Container for all infrastructure services.""" | |
| _instance: Optional['ServiceContainer'] = None | |
| def __init__(self): | |
| # Repositories | |
| self.job_repository = InMemoryJobRepository() | |
| # Create file repository based on settings | |
| self.file_repository = self._create_file_repository() | |
| # Services | |
| self.ffmpeg_service = FFmpegService( | |
| ffmpeg_path=settings.ffmpeg_path, | |
| quality_presets=settings.quality_presets, | |
| timeout_seconds=settings.ffmpeg_timeout_seconds | |
| ) | |
| self.cleanup_service = FileCleanupService( | |
| job_repo=self.job_repository, | |
| file_repo=self.file_repository, | |
| cleanup_interval_seconds=settings.cleanup_interval_seconds, | |
| retention_hours=settings.file_retention_hours | |
| ) | |
| # Create N8N notification service | |
| self.notification_service = self._create_notification_service() | |
| def _create_file_repository(self): | |
| """Create file repository based on settings.""" | |
| if settings.storage_type.lower() == "filesystem": | |
| return create_storage_from_settings( | |
| storage_type="filesystem", | |
| base_path=settings.temp_dir | |
| ) | |
| elif settings.storage_type.lower() == "r2": | |
| return create_storage_from_settings( | |
| storage_type="r2", | |
| account_id=settings.cloudflare_r2_account_id, | |
| access_key_id=settings.cloudflare_r2_access_key_id, | |
| secret_access_key=settings.cloudflare_r2_secret_access_key, | |
| bucket_name=settings.cloudflare_r2_bucket_name | |
| ) | |
| else: | |
| raise ValueError(f"Unsupported storage type: {settings.storage_type}") | |
| def _create_notification_service(self) -> Optional[N8NNotificationService]: | |
| """Create N8N notification service if enabled.""" | |
| # Check if N8N is enabled | |
| if not settings.n8n_enabled: | |
| logging.getLogger(__name__).info("N8N notifications disabled via configuration") | |
| return None | |
| # Create N8N client settings (no token needed - client tokens are passed per request) | |
| n8n_settings = ClientSettings( | |
| base_url=settings.n8n_base_url, | |
| timeout=getattr(settings, 'n8n_timeout', 30) | |
| ) | |
| # Create logger for N8N client | |
| logger = logging.getLogger("n8n_client") | |
| # Create N8N client | |
| n8n_client = N8NClient(n8n_settings, logger) | |
| # Create and return notification service | |
| return N8NNotificationService(n8n_client) | |
| def get_instance(cls) -> 'ServiceContainer': | |
| """Get singleton instance of service container.""" | |
| if cls._instance is None: | |
| cls._instance = cls() | |
| return cls._instance | |
| async def startup(self): | |
| """Initialize services on startup.""" | |
| await self.cleanup_service.start() | |
| async def shutdown(self): | |
| """Cleanup services on shutdown.""" | |
| await self.cleanup_service.stop() | |
| # Close N8N client if it exists and has async cleanup | |
| if (self.notification_service and | |
| hasattr(self.notification_service, 'n8n_client') and | |
| hasattr(self.notification_service.n8n_client, 'close')): | |
| await self.notification_service.n8n_client.close() | |
| # Convenience function for getting services | |
| def get_services() -> ServiceContainer: | |
| """Get the service container instance.""" | |
| return ServiceContainer.get_instance() |