Chandima Prabhath
Update dependencies in requirements.txt; enhance FFmpegEncoder with availability check and improve logging in worker
a4f374c
import os
import logging
import shutil
from redis import from_url
from ..config import RedisConfig
from rq import Queue
from pathlib import Path
from typing import Optional
from .encoder import FFmpegEncoder
from ..config import EncodingConfig
from datetime import datetime
import time
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# RQ queue bound to the Redis connection supplied by RedisConfig. It is built
# at import time, so importing this module calls RedisConfig.get_connection().
# default_timeout=3600: per RQ's Queue API this is in seconds (one hour) and
# is the timeout used for jobs enqueued without an explicit one.
q = Queue(connection=RedisConfig.get_connection(), default_timeout=3600)
def init_job(job_id: str):
    """Write the initial state of a job to its Redis hash.

    The hash key is ``job:<job_id>``. Three fields are set: ``progress``
    (0.0), ``status`` ("queued") and ``created_at`` (the current
    ``datetime.now()`` value in ISO 8601 form).

    Args:
        job_id: Identifier used to build the Redis key.
    """
    connection = RedisConfig.get_connection()
    redis_key = f"job:{job_id}"
    initial_state = {
        "progress": 0.0,
        "status": "queued",
        "created_at": datetime.now().isoformat(),
    }
    connection.hset(redis_key, mapping=initial_state)
def update_job_progress(job_id: str, progress: float):
    """Store ``progress`` in the ``progress`` field of the job's Redis hash.

    This is best-effort: any exception raised while logging or writing is
    caught and logged as an error, and is never propagated to the caller.

    Args:
        job_id: Identifier used to build the ``job:<job_id>`` key.
        progress: Value to store in the ``progress`` field.
    """
    redis_key = f"job:{job_id}"
    try:
        logger.info(f"Updating progress for job {job_id} to {progress}")
        connection = RedisConfig.get_connection()
        connection.hset(redis_key, "progress", progress)
    except Exception as exc:
        # Progress reporting must not take the caller down with it.
        failure_message = f"Failed to update progress for job {job_id}: {str(exc)}"
        logger.error(failure_message)
def encode_video_task(job_id: str, input_path: str) -> Optional[str]:
    """Encode ``input_path`` for ``job_id``, retrying on failure.

    Up to three attempts are made, with a 5 second pause between them.
    Progress is reported through ``update_job_progress``: 0.1 once the
    encoder has been constructed, 1.0 on success, and -1.0 after every
    failed attempt.

    Output is written under ``EncodingConfig.TEMP_DIR/<job_id>``. That
    directory is removed after a failed attempt so a retry starts clean,
    and is left in place after a successful one.

    Args:
        job_id: Job identifier; also the name of the output sub-directory.
        input_path: Path handed to ``FFmpegEncoder`` as its input.

    Returns:
        Whatever ``FFmpegEncoder.encode()`` returns for the successful
        attempt.

    Raises:
        Exception: The error from the final attempt is re-raised once all
            retries are exhausted.
    """
    max_retries = 3
    # Loop-invariant. Computed before the ``try`` so the failure-path cleanup
    # can never reference an unbound name and mask the real error.
    output_dir = os.path.join(EncodingConfig.TEMP_DIR, job_id)

    for attempt in range(max_retries):
        try:
            logger.info(f"Starting encoding for job {job_id} (attempt {attempt + 1})")
            encoder = FFmpegEncoder(input_path, output_dir)
            update_job_progress(job_id, 0.1)
            master_playlist = encoder.encode()
            update_job_progress(job_id, 1.0)
            logger.info(f"Encoding completed for job {job_id}")
            # The output directory is deliberately NOT removed here: cleaning
            # up in a ``finally`` would also run on this return and delete the
            # result that was just produced.
            # NOTE(review): nothing in this function expires successful
            # output; confirm that a separate mechanism reclaims it.
            return master_playlist
        except Exception as e:
            logger.error(f"Encoding failed for job {job_id} (attempt {attempt + 1}): {str(e)}")
            update_job_progress(job_id, -1.0)
            # Discard partial output so a retry (or the final failure) does
            # not leave a half-written directory behind.
            shutil.rmtree(output_dir, ignore_errors=True)
            if attempt < max_retries - 1:
                logger.info(f"Retrying encoding for job {job_id} in 5 seconds...")
                time.sleep(5)
            else:
                raise