Spaces:
Sleeping
Sleeping
File size: 4,237 Bytes
92fd1a7 fca9e8c 92fd1a7 37e59a0 92fd1a7 fca9e8c 92fd1a7 37e59a0 92fd1a7 fca9e8c 92fd1a7 37e59a0 3216021 fca9e8c 92fd1a7 3216021 fca9e8c 92fd1a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
"""Dependency injection container for services."""
from typing import Optional
from pathlib import Path
import logging
from ..config.settings import settings
from ..repositories.job_repository import InMemoryJobRepository
from ..providers.file_storage_provider import create_storage_from_settings
from .ffmpeg_service import FFmpegService
from .file_cleanup_service import FileCleanupService
# N8N related imports
from ..clients.n8n.n8n_client import N8NClient
from ..clients.n8n.settings import ClientSettings
from .n8n_notification_service import N8NNotificationService
class ServiceContainer:
"""Container for all infrastructure services."""
_instance: Optional['ServiceContainer'] = None
def __init__(self):
# Repositories
self.job_repository = InMemoryJobRepository()
# Create file repository based on settings
self.file_repository = self._create_file_repository()
# Services
self.ffmpeg_service = FFmpegService(
ffmpeg_path=settings.ffmpeg_path,
quality_presets=settings.quality_presets,
timeout_seconds=settings.ffmpeg_timeout_seconds
)
self.cleanup_service = FileCleanupService(
job_repo=self.job_repository,
file_repo=self.file_repository,
cleanup_interval_seconds=settings.cleanup_interval_seconds,
retention_hours=settings.file_retention_hours
)
# Create N8N notification service
self.notification_service = self._create_notification_service()
def _create_file_repository(self):
"""Create file repository based on settings."""
if settings.storage_type.lower() == "filesystem":
return create_storage_from_settings(
storage_type="filesystem",
base_path=settings.temp_dir
)
elif settings.storage_type.lower() == "r2":
return create_storage_from_settings(
storage_type="r2",
account_id=settings.cloudflare_r2_account_id,
access_key_id=settings.cloudflare_r2_access_key_id,
secret_access_key=settings.cloudflare_r2_secret_access_key,
bucket_name=settings.cloudflare_r2_bucket_name
)
else:
raise ValueError(f"Unsupported storage type: {settings.storage_type}")
def _create_notification_service(self) -> Optional[N8NNotificationService]:
"""Create N8N notification service if enabled."""
# Check if N8N is enabled
if not settings.n8n_enabled:
logging.getLogger(__name__).info("N8N notifications disabled via configuration")
return None
# Create N8N client settings (no token needed - client tokens are passed per request)
n8n_settings = ClientSettings(
base_url=settings.n8n_base_url,
timeout=getattr(settings, 'n8n_timeout', 30)
)
# Create logger for N8N client
logger = logging.getLogger("n8n_client")
# Create N8N client
n8n_client = N8NClient(n8n_settings, logger)
# Create and return notification service
return N8NNotificationService(n8n_client)
@classmethod
def get_instance(cls) -> 'ServiceContainer':
"""Get singleton instance of service container."""
if cls._instance is None:
cls._instance = cls()
return cls._instance
async def startup(self):
"""Initialize services on startup."""
await self.cleanup_service.start()
async def shutdown(self):
"""Cleanup services on shutdown."""
await self.cleanup_service.stop()
# Close N8N client if it exists and has async cleanup
if (self.notification_service and
hasattr(self.notification_service, 'n8n_client') and
hasattr(self.notification_service.n8n_client, 'close')):
await self.notification_service.n8n_client.close()
# Convenience function for getting services
def get_services() -> ServiceContainer:
"""Get the service container instance."""
return ServiceContainer.get_instance() |