import os from typing import List, Dict import os from redis import from_url, RedisError from dotenv import load_dotenv import time import logging load_dotenv() class EncodingConfig: DEBUG = os.getenv("DEBUG", "True") == "True" TEMP_DIR = "/tmp" LOG_LEVEL = logging.DEBUG if DEBUG else logging.INFO logging.basicConfig(level=EncodingConfig.LOG_LEVEL) logger = logging.getLogger(__name__) class RedisConfig: URL = os.getenv("REDIS_URL") CONN_POOL = None # Will be initialized on first use QUEUE_NAME = "video_encoding_queue" @classmethod def get_connection(cls): if not cls.CONN_POOL: while True: try: cls.CONN_POOL = from_url(cls.URL, max_connections=20, ssl_cert_reqs=None) logger.info("Successfully connected to Redis") break except RedisError as e: logger.error(f"Could not connect to Redis instance: {e}. Retrying in 2 seconds...") time.sleep(2) return cls.CONN_POOL HMAC_SECRET = os.getenv("VIDEO_HMAC_SECRET", "default-secret-change-me") RESOLUTIONS: List[Dict] = [ { "name": "1080p", "width": 1920, "height": 1080, "video_bitrate": "5000k", "audio_bitrate": "128k", "codec": "libx264", "profile": "high", "preset": "slow" }, { "name": "720p", "width": 1280, "height": 720, "video_bitrate": "2500k", "audio_bitrate": "128k", "codec": "libx264", "profile": "main", "preset": "medium" }, { "name": "480p", "width": 854, "height": 480, "video_bitrate": "1000k", "audio_bitrate": "96k", "codec": "libx264", "profile": "baseline", "preset": "fast" }, { "name": "240p", "width": 426, "height": 240, "video_bitrate": "400k", "audio_bitrate": "64k", "codec": "libx264", "profile": "baseline", "preset": "veryfast" } ] @classmethod def validate_resolutions(cls): for res in cls.RESOLUTIONS: if not all(key in res for key in ['name', 'width', 'height', 'video_bitrate']): raise ValueError(f"Invalid resolution config: {res}")