from fastapi import FastAPI, UploadFile, HTTPException, status from fastapi.responses import JSONResponse, FileResponse from pathlib import Path import uuid import os import hmac import hashlib from datetime import datetime from rq import Queue from redis import from_url from ..config import EncodingConfig, RedisConfig from video_encoder.worker.tasks import encode_video_task, init_job import logging logger = logging.getLogger(__name__) q = Queue(connection=RedisConfig.get_connection(), default_timeout=3600) app = FastAPI(title="Video Encoding Service") @app.on_event("startup") async def startup_event(): # Initialize Redis connection on startup RedisConfig.get_connection() logger.info("Application initialization complete") @app.get("/") async def health_check(): return {"status": "ok", "redis": "connected"} @app.post("/upload") async def upload_video(file: UploadFile): job_id = str(uuid.uuid4()) temp_dir = os.path.join(EncodingConfig.TEMP_DIR, job_id) os.makedirs(temp_dir, exist_ok=True) input_path = os.path.join(temp_dir, file.filename) with open(input_path, "wb") as f: f.write(await file.read()) logger.info(f"Enqueuing job {job_id} for file {file.filename}") init_job(job_id) q.enqueue(encode_video_task, job_id, input_path) return JSONResponse( content={"job_id": job_id}, status_code=status.HTTP_202_ACCEPTED ) @app.get("/status/{job_id}") async def get_status(job_id: str): redis = RedisConfig.get_connection() progress = redis.hget(f"job:{job_id}", "progress") if not progress: logger.warning(f"Job {job_id} not found") raise HTTPException(status_code=404, detail="Job not found") return {"job_id": job_id, "progress": float(progress)} @app.get("/play/{job_id}") async def play_video(job_id: str, token: str): expected_token = hmac.new( EncodingConfig.HMAC_SECRET.encode(), job_id.encode(), hashlib.sha256 ).hexdigest() if not hmac.compare_digest(token, expected_token): logger.warning(f"Invalid token for job {job_id}") raise HTTPException(status_code=403, detail="Invalid token") master_playlist = os.path.join(EncodingConfig.TEMP_DIR, job_id, f"{job_id}_master.m3u8") if not os.path.exists(master_playlist): logger.warning(f"Playlist not found for job {job_id}") raise HTTPException(status_code=404, detail="Playlist not found") return FileResponse( master_playlist, media_type="application/vnd.apple.mpegurl" ) @app.get("/token/{job_id}") async def generate_token(job_id: str): timestamp = datetime.now().isoformat() signature = hmac.new( EncodingConfig.HMAC_SECRET.encode(), f"{job_id}{timestamp}".encode(), hashlib.sha256 ).hexdigest() logger.info(f"Generated token for job {job_id}") return {"token": f"{timestamp}:{signature}"}