File size: 2,933 Bytes
8b4f8f9
 
 
 
 
 
 
 
 
f7bd5c7
 
15909e3
0410763
 
 
f7bd5c7
 
8b4f8f9
 
 
0410763
 
f0052f2
 
 
 
 
 
f27ea86
0410763
8b4f8f9
 
 
 
 
0410763
8b4f8f9
 
 
0410763
 
15909e3
8b4f8f9
0410763
8b4f8f9
 
 
 
 
 
 
f7bd5c7
8b4f8f9
0410763
8b4f8f9
0410763
8b4f8f9
0410763
8b4f8f9
 
 
 
 
 
 
 
 
0410763
8b4f8f9
0410763
8b4f8f9
0410763
8b4f8f9
 
0410763
8b4f8f9
0410763
8b4f8f9
 
 
 
 
 
 
 
 
 
 
 
 
0410763
 
8b4f8f9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from fastapi import FastAPI, UploadFile, HTTPException, status
from fastapi.responses import JSONResponse, FileResponse
from pathlib import Path
import uuid
import os
import hmac
import hashlib
from datetime import datetime
from rq import Queue
from redis import from_url
from ..config import EncodingConfig, RedisConfig
from video_encoder.worker.tasks import encode_video_task, init_job
import logging

logger = logging.getLogger(__name__)

q = Queue(connection=RedisConfig.get_connection(), default_timeout=3600)

app = FastAPI(title="Video Encoding Service")

@app.on_event("startup")
async def startup_event():
    # Initialize Redis connection on startup
    RedisConfig.get_connection()
    logger.info("Application initialization complete")

@app.get("/")
async def health_check():
    return {"status": "ok", "redis": "connected"}

@app.post("/upload")
async def upload_video(file: UploadFile):
    job_id = str(uuid.uuid4())
    temp_dir = os.path.join(EncodingConfig.TEMP_DIR, job_id)
    os.makedirs(temp_dir, exist_ok=True)

    input_path = os.path.join(temp_dir, file.filename)
    with open(input_path, "wb") as f:
        f.write(await file.read())

    logger.info(f"Enqueuing job {job_id} for file {file.filename}")
    init_job(job_id)
    q.enqueue(encode_video_task, job_id, input_path)

    return JSONResponse(
        content={"job_id": job_id},
        status_code=status.HTTP_202_ACCEPTED
    )

@app.get("/status/{job_id}")
async def get_status(job_id: str):
    redis = RedisConfig.get_connection()
    progress = redis.hget(f"job:{job_id}", "progress")

    if not progress:
        logger.warning(f"Job {job_id} not found")
        raise HTTPException(status_code=404, detail="Job not found")

    return {"job_id": job_id, "progress": float(progress)}

@app.get("/play/{job_id}")
async def play_video(job_id: str, token: str):
    expected_token = hmac.new(
        EncodingConfig.HMAC_SECRET.encode(),
        job_id.encode(),
        hashlib.sha256
    ).hexdigest()

    if not hmac.compare_digest(token, expected_token):
        logger.warning(f"Invalid token for job {job_id}")
        raise HTTPException(status_code=403, detail="Invalid token")

    master_playlist = os.path.join(EncodingConfig.TEMP_DIR, job_id, f"{job_id}_master.m3u8")
    if not os.path.exists(master_playlist):
        logger.warning(f"Playlist not found for job {job_id}")
        raise HTTPException(status_code=404, detail="Playlist not found")

    return FileResponse(
        master_playlist,
        media_type="application/vnd.apple.mpegurl"
    )

@app.get("/token/{job_id}")
async def generate_token(job_id: str):
    timestamp = datetime.now().isoformat()
    signature = hmac.new(
        EncodingConfig.HMAC_SECRET.encode(),
        f"{job_id}{timestamp}".encode(),
        hashlib.sha256
    ).hexdigest()

    logger.info(f"Generated token for job {job_id}")
    return {"token": f"{timestamp}:{signature}"}