"""Redis-backed job bookkeeping and the video encoding task.

Job state is stored in a Redis hash named ``job:<job_id>``. The encoding
itself is delegated to ``FFmpegEncoder``; this module only wraps it with
progress reporting, retries and temporary-directory cleanup.
"""

import logging
import os
import shutil
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

from redis import from_url
from rq import Queue

from ..config import EncodingConfig, RedisConfig
from .encoder import FFmpegEncoder

logger = logging.getLogger(__name__)

# Module-level RQ queue; jobs placed on it get a one hour default timeout.
q = Queue(connection=RedisConfig.get_connection(), default_timeout=3600)

# Retry policy for encode_video_task.
_MAX_ATTEMPTS = 3
_RETRY_DELAY_SECONDS = 5


def _job_output_dir(job_id: str) -> str:
    """Return the per-job output directory under ``EncodingConfig.TEMP_DIR``."""
    return os.path.join(EncodingConfig.TEMP_DIR, job_id)


def init_job(job_id: str) -> None:
    """Create the Redis hash for a new job in the ``queued`` state.

    Args:
        job_id: Identifier of the job; used to build the ``job:<job_id>`` key.
    """
    RedisConfig.get_connection().hset(
        f"job:{job_id}",
        mapping={
            "progress": 0.0,
            "status": "queued",
            "created_at": datetime.now().isoformat(),
        },
    )


def update_job_progress(job_id: str, progress: float) -> None:
    """Store ``progress`` in the job's Redis hash (best effort).

    Any failure is logged and swallowed on purpose, so a Redis hiccup while
    reporting progress never aborts the encoding that is being reported on.

    Args:
        job_id: Identifier of the job to update.
        progress: Value written to the ``progress`` field. This module writes
            0.1 when encoding starts, 1.0 when it completes and -1.0 after a
            failed attempt.
    """
    try:
        logger.info("Updating progress for job %s to %s", job_id, progress)
        RedisConfig.get_connection().hset(f"job:{job_id}", "progress", progress)
    except Exception as e:
        logger.error("Failed to update progress for job %s: %s", job_id, e)


def cleanup_job_output(job_id: str) -> None:
    """Delete the job's output directory, ignoring any filesystem errors.

    ``encode_video_task`` leaves the output of a successful encode in place;
    call this (for example from a delayed or periodic job) once that output
    is no longer needed.

    Args:
        job_id: Identifier of the job whose output directory is removed.
    """
    shutil.rmtree(_job_output_dir(job_id), ignore_errors=True)


def encode_video_task(job_id: str, input_path: str) -> Optional[str]:
    """Encode ``input_path`` with ``FFmpegEncoder``, retrying on failure.

    Up to ``_MAX_ATTEMPTS`` attempts are made, sleeping
    ``_RETRY_DELAY_SECONDS`` between them. Progress is reported through
    ``update_job_progress``.

    Args:
        job_id: Identifier of the job; also names the output directory.
        input_path: Path of the source video handed to the encoder.

    Returns:
        Whatever ``FFmpegEncoder.encode()`` returns (named as the master
        playlist by this module).

    Raises:
        Exception: The error from the final attempt is re-raised unchanged
            once all attempts have failed.
    """
    # Computed once, outside the ``try``, so the cleanup below can never hit
    # an unbound name and mask the real error.
    output_dir = _job_output_dir(job_id)

    for attempt in range(1, _MAX_ATTEMPTS + 1):
        succeeded = False
        try:
            logger.info(
                "Starting encoding for job %s (attempt %d)", job_id, attempt
            )
            encoder = FFmpegEncoder(input_path, output_dir)
            update_job_progress(job_id, 0.1)
            master_playlist = encoder.encode()
            update_job_progress(job_id, 1.0)
            logger.info("Encoding completed for job %s", job_id)
            succeeded = True
            return master_playlist
        except Exception as e:
            logger.error(
                "Encoding failed for job %s (attempt %d): %s", job_id, attempt, e
            )
            update_job_progress(job_id, -1.0)
            if attempt == _MAX_ATTEMPTS:
                raise
            logger.info(
                "Retrying encoding for job %s in %d seconds...",
                job_id,
                _RETRY_DELAY_SECONDS,
            )
            time.sleep(_RETRY_DELAY_SECONDS)
        finally:
            # Only discard output of an attempt that did not succeed, so a
            # retry starts from a clean directory and failed jobs leave
            # nothing behind. Successful output is kept: the previous
            # unconditional removal deleted it before the caller could use
            # the returned playlist.
            # NOTE(review): assumes the returned playlist lives under
            # output_dir - confirm against FFmpegEncoder. Delayed removal of
            # successful output is not scheduled here; use
            # cleanup_job_output() for that.
            if not succeeded:
                shutil.rmtree(output_dir, ignore_errors=True)

    return None