Chandima Prabhath committed on
Commit
8b4f8f9
·
1 Parent(s): 269b89b

Add initial implementation of video encoder pipeline with FastAPI and FFmpeg

DOCS.md ADDED
@@ -0,0 +1,130 @@
+ # Video Encoder Pipeline Documentation
+
+ ## Introduction
+ The Video Encoder Pipeline encodes videos into multiple resolutions and bitrates for Video on Demand (VOD) services. It uses FastAPI for the API, Valkey (Redis) for job queuing, and FFmpeg for video encoding.
+
+ ## Quickstart Guide
+
+ ### Prerequisites
+ - Python 3.8+
+ - FFmpeg installed system-wide
+ - Valkey (Redis) server running
+
+ ### Installation
+ 1. Clone the repository:
+    ```bash
+    git clone https://github.com/yourusername/video-encoder-pipeline.git
+    cd video-encoder-pipeline
+    ```
+
+ 2. Install the required dependencies:
+    ```bash
+    pip install -r requirements.txt
+    ```
+
+ ### Running the Server
+ 1. Start the FastAPI server:
+    ```bash
+    python run.py
+    ```
+
+ 2. The server will be available at `http://localhost:7860`. Uploaded videos are only encoded while an RQ worker is running, so start one in a separate terminal (for example with `rq worker` from the repository root).
+
+ ### Basic Usage
+ 1. Upload a video:
+    ```bash
+    curl -X POST -F "file=@input.mp4" http://localhost:7860/upload
+    ```
+
+ 2. Check encoding status:
+    ```bash
+    curl http://localhost:7860/status/{job_id}
+    ```
+
+ 3. Generate a token for playback:
+    ```bash
+    curl http://localhost:7860/token/{job_id}
+    ```
+
+ 4. Play encoded video:
+    ```bash
+    curl "http://localhost:7860/play/{job_id}?token={valid_token}"
+    ```
+
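Tokens returned by `/token/{job_id}` have the form `timestamp:signature`, and the ISO timestamp contains `:` characters that are best percent-encoded in a query string. The following is a minimal client-side sketch of building the playback URL. `build_play_url` is a hypothetical helper, not part of the repository, and its default `base_url` assumes the port configured in `run.py`.

```python
from urllib.parse import quote, urlencode


def build_play_url(job_id: str, token: str, base_url: str = "http://localhost:7860") -> str:
    """Return the /play URL for a job, percent-encoding the job ID and token.

    Hypothetical helper for illustration; base_url is an assumption.
    """
    # safe="" also escapes "/" so the job ID stays a single path segment
    path = f"/play/{quote(job_id, safe='')}"
    # urlencode escapes ":" and other reserved characters in the token
    return f"{base_url}{path}?{urlencode({'token': token})}"
```

The resulting URL can be passed to `curl` or to an HLS player as-is.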
+ ## API Reference
+
+ ### Endpoints
+
+ #### Upload Video
+ - **Endpoint**: `/upload`
+ - **Method**: `POST`
+ - **Description**: Uploads a video file and queues it for encoding.
+ - **Parameters**:
+   - `file`: The video file to upload (multipart form field).
+ - **Response** (`202 Accepted`):
+   - `job_id`: The ID of the encoding job.
+
+ #### Get Encoding Status
+ - **Endpoint**: `/status/{job_id}`
+ - **Method**: `GET`
+ - **Description**: Retrieves the encoding status of a job. Returns `404` if no progress has been recorded for the job.
+ - **Parameters**:
+   - `job_id`: The ID of the encoding job.
+ - **Response**:
+   - `job_id`: The ID of the encoding job.
+   - `progress`: The encoding progress (0.0 to 1.0), or -1.0 if encoding failed.
+
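The worker in this commit stores only three progress values: 0.1 when encoding starts, 1.0 when it finishes, and -1.0 when it raises. A client might map the raw number to a coarse state along these lines. `describe_progress` and its state names are illustrative, not part of the API.

```python
def describe_progress(progress: float) -> str:
    """Map the raw progress value from /status/{job_id} to a coarse state.

    Hypothetical client-side helper; the state names are assumptions.
    """
    if progress < 0:
        return "failed"      # the worker stores -1.0 when encoding raises
    if progress >= 1.0:
        return "finished"    # 1.0 is stored after the encoder returns
    return "encoding"        # 0.1 is stored once the worker starts the job
```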
+ #### Generate Token
+ - **Endpoint**: `/token/{job_id}`
+ - **Method**: `GET`
+ - **Description**: Generates a token for secure playback.
+ - **Parameters**:
+   - `job_id`: The ID of the encoding job.
+ - **Response**:
+   - `token`: The generated token, in the form `{timestamp}:{signature}`.
+
+ #### Play Encoded Video
+ - **Endpoint**: `/play/{job_id}`
+ - **Method**: `GET`
+ - **Description**: Returns the HLS master playlist for the encoded video.
+ - **Parameters**:
+   - `job_id`: The ID of the encoding job.
+   - `token`: The token returned by `/token/{job_id}` (query parameter).
+ - **Response**:
+   - The HLS master playlist (`application/vnd.apple.mpegurl`); `403` for an invalid token, `404` if the playlist does not exist.
+
+ ## Architecture Overview
+
+ The video encoder pipeline consists of the following components:
+
+ 1. **FastAPI**: Handles API requests and responses.
+ 2. **Valkey (Redis)**: Manages job queuing (through RQ) and progress tracking.
+ 3. **FFmpeg**: Performs video encoding into multiple resolutions and bitrates.
+ 4. **HMAC**: Signs the playback tokens that gate access to encoded videos.
+
+ ## Encoding Profiles
+
+ The pipeline supports the following encoding profiles:
+
+ - **1080p**: 1920x1080, 5000 kbps video, 128 kbps audio
+ - **720p**: 1280x720, 2500 kbps video, 128 kbps audio
+ - **480p**: 854x480, 1000 kbps video, 96 kbps audio
+ - **240p**: 426x240, 400 kbps video, 64 kbps audio
+
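In an HLS master playlist, the `BANDWIDTH` attribute describes the whole variant stream, so it should cover audio as well as video. The encoder in this commit derives it from the video bitrate alone. The sketch below adds the two nominal rates for each profile; it is a rough estimate, since real peak bitrates can exceed the configured targets. `bitrate_to_bps` and `total_bandwidth` are hypothetical helpers, and `PROFILES` copies the values listed above.

```python
from typing import Dict, List

# Bitrates copied from the encoding profiles listed above
PROFILES: List[Dict[str, str]] = [
    {"name": "1080p", "video_bitrate": "5000k", "audio_bitrate": "128k"},
    {"name": "720p", "video_bitrate": "2500k", "audio_bitrate": "128k"},
    {"name": "480p", "video_bitrate": "1000k", "audio_bitrate": "96k"},
    {"name": "240p", "video_bitrate": "400k", "audio_bitrate": "64k"},
]


def bitrate_to_bps(value: str) -> int:
    """Convert a bitrate string such as '2500k' or '5M' to bits per second."""
    multipliers = {"k": 1_000, "m": 1_000_000}
    suffix = value[-1].lower()
    if suffix in multipliers:
        return int(float(value[:-1]) * multipliers[suffix])
    return int(value)  # plain number: already bits per second


def total_bandwidth(profile: Dict[str, str]) -> int:
    """Nominal video + audio bitrate of one profile, in bits per second."""
    return bitrate_to_bps(profile["video_bitrate"]) + bitrate_to_bps(profile["audio_bitrate"])
```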
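+ ## Security Considerations
+
+ The pipeline uses HMAC-SHA256 to control access to encoded videos. Each playback request requires a valid token generated by the `/token/{job_id}` endpoint. Set the `VIDEO_HMAC_SECRET` environment variable in any real deployment; otherwise the built-in default secret from `config.py` is used.
+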
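As a rough illustration of how a signed, expiring playback token can be issued and checked, here is a self-contained sketch. It is not the repository's implementation: the Unix timestamp, the `:` separator inside the signed message, the `max_age` check, and the names `SECRET`, `make_token`, and `verify_token` are all assumptions made for the example. The service itself signs `job_id` plus an ISO timestamp and performs no expiry check.

```python
import hashlib
import hmac
import time
from typing import Optional

SECRET = b"example-secret"  # hypothetical; the service reads VIDEO_HMAC_SECRET


def _signature(job_id: str, issued: str) -> str:
    # The separator keeps ("12", "345") and ("123", "45") from signing the same bytes
    return hmac.new(SECRET, f"{job_id}:{issued}".encode(), hashlib.sha256).hexdigest()


def make_token(job_id: str, now: Optional[float] = None) -> str:
    """Return '<unix-seconds>:<hex signature>' for the given job."""
    issued = str(int(time.time() if now is None else now))
    return f"{issued}:{_signature(job_id, issued)}"


def verify_token(job_id: str, token: str, max_age: int = 3600,
                 now: Optional[float] = None) -> bool:
    """Check the signature in constant time, then the token's age."""
    issued, _, signature = token.rpartition(":")
    if not issued.isdigit():
        return False  # malformed token
    expected = _signature(job_id, issued)
    # Compare bytes: compare_digest rejects non-ASCII str arguments
    if not hmac.compare_digest(signature.encode(), expected.encode()):
        return False
    current = time.time() if now is None else now
    return 0 <= current - int(issued) <= max_age
```

Because the timestamp is part of the signed message, a client cannot extend a token's lifetime by editing it.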
+ ## Troubleshooting
+
+ ### Common Issues
+
+ 1. **FFmpeg Not Found**: The worker could not run the `ffmpeg` executable because it is missing or not on the system PATH.
+ 2. **Job Not Found**: `/status/{job_id}` returned `404`: the `job_id` is wrong, or no progress has been recorded for the job yet.
+ 3. **Invalid Token**: `/play/{job_id}` returned `403` because the token does not match the job.
+
+ ### Solutions
+
+ 1. **FFmpeg Not Found**: Install FFmpeg and add it to the system PATH.
+ 2. **Job Not Found**: Verify the `job_id` and check that an RQ worker is running so the enqueued job gets picked up.
+ 3. **Invalid Token**: Generate a new token using the `/token/{job_id}` endpoint.
Dockerfile ADDED
@@ -0,0 +1,19 @@
+ FROM python:3.10
+
+ # FFmpeg is a runtime dependency of the encoder; the base image does not ship it.
+ RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg \
+     && rm -rf /var/lib/apt/lists/*
+
+ RUN useradd -m -u 1000 user
+ USER user
+ ENV PATH="/home/user/.local/bin:$PATH"
+
+ WORKDIR /app
+
+ COPY --chown=user ./requirements.txt requirements.txt
+ RUN pip install --no-cache-dir --upgrade -r requirements.txt
+
+ EXPOSE 7860
+
+ COPY --chown=user . /app
+ CMD ["python","run.py"]
requirements.txt ADDED
@@ -0,0 +1,7 @@
+ fastapi[all]==0.115.12
+ uvicorn==0.34.0
+ valkey==6.1.0
+ rq==2.3.2
+ ffmpeg-python==0.2.0
+ python-multipart==0.0.20
+ python-dotenv==1.1.0
run.py ADDED
@@ -0,0 +1,9 @@
+ import uvicorn
+ from video_encoder.api.main import app
+
+ if __name__ == "__main__":
+     uvicorn.run(
+         app,
+         host="0.0.0.0",
+         port=7860
+     )
video_encoder/api/__init__.py ADDED
@@ -0,0 +1 @@
+ # Package initialization
video_encoder/api/main.py ADDED
@@ -0,0 +1,78 @@
+ from fastapi import FastAPI, UploadFile, HTTPException, status
+ from fastapi.responses import JSONResponse, FileResponse
+ from pathlib import Path
+ import uuid
+ import os
+ import hmac
+ import hashlib
+ from datetime import datetime
+ from redis import Redis
+ from ..config import EncodingConfig
+ from ..worker.tasks import encode_video_task, q
+
+ app = FastAPI(title="Video Encoding Service")
+
+ def _sign(job_id: str, timestamp: str) -> str:
+     """HMAC-SHA256 signature over job_id + timestamp, shared by /token and /play."""
+     return hmac.new(
+         EncodingConfig.HMAC_SECRET.encode(),
+         f"{job_id}{timestamp}".encode(),
+         hashlib.sha256
+     ).hexdigest()
+
+ @app.post("/upload")
+ async def upload_video(file: UploadFile):
+     job_id = str(uuid.uuid4())
+     temp_dir = os.path.join(EncodingConfig.TEMP_DIR, job_id)
+     os.makedirs(temp_dir, exist_ok=True)
+
+     # Name the input after the job ID instead of the client-supplied filename:
+     # this avoids path traversal and makes the encoder write
+     # "{job_id}_master.m3u8", the name /play looks for.
+     suffix = Path(file.filename or "").suffix
+     input_path = os.path.join(temp_dir, f"{job_id}{suffix}")
+     with open(input_path, "wb") as f:
+         f.write(await file.read())
+
+     q.enqueue(encode_video_task, job_id, input_path)
+
+     return JSONResponse(
+         content={"job_id": job_id},
+         status_code=status.HTTP_202_ACCEPTED
+     )
+
+ @app.get("/status/{job_id}")
+ async def get_status(job_id: str):
+     redis = Redis()
+     progress = redis.hget(f"job:{job_id}", "progress")
+
+     # hget returns None when the hash or field does not exist
+     if progress is None:
+         raise HTTPException(status_code=404, detail="Job not found")
+
+     return {"job_id": job_id, "progress": float(progress)}
+
+ @app.get("/play/{job_id}")
+ async def play_video(job_id: str, token: str):
+     # Tokens from /token have the form "<timestamp>:<signature>"; the ISO
+     # timestamp itself contains colons, so split on the last one.
+     timestamp, _, signature = token.rpartition(":")
+     expected = _sign(job_id, timestamp)
+
+     # Compare as bytes: compare_digest rejects non-ASCII str arguments
+     if not hmac.compare_digest(signature.encode(), expected.encode()):
+         raise HTTPException(status_code=403, detail="Invalid token")
+
+     master_playlist = os.path.join(EncodingConfig.TEMP_DIR, job_id, f"{job_id}_master.m3u8")
+     if not os.path.exists(master_playlist):
+         raise HTTPException(status_code=404, detail="Playlist not found")
+
+     return FileResponse(
+         master_playlist,
+         media_type="application/vnd.apple.mpegurl"
+     )
+
+ @app.get("/token/{job_id}")
+ async def generate_token(job_id: str):
+     timestamp = datetime.now().isoformat()
+     return {"token": f"{timestamp}:{_sign(job_id, timestamp)}"}
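The upload handler reads the whole upload into memory with `await file.read()` before writing it out, which can be costly for large videos. One possible alternative is to copy the stream in fixed-size chunks, as in the sketch below. `save_stream` and its `chunk_size` default are hypothetical. In the handler it could be called with `file.file`, the underlying file object that `UploadFile` exposes, bearing in mind that the copy is blocking I/O.

```python
import shutil
from typing import BinaryIO


def save_stream(source: BinaryIO, dest_path: str, chunk_size: int = 1024 * 1024) -> int:
    """Copy a binary stream to dest_path in chunks and return the bytes written.

    Hypothetical helper for illustration; chunk_size is an arbitrary default.
    """
    with open(dest_path, "wb") as out:
        # copyfileobj reads and writes chunk_size bytes at a time
        shutil.copyfileobj(source, out, chunk_size)
        return out.tell()
```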
video_encoder/config.py ADDED
@@ -0,0 +1,58 @@
+ import os
+ from typing import List, Dict
+
+ class EncodingConfig:
+     TEMP_DIR = "./tmp"
+     HMAC_SECRET = os.getenv("VIDEO_HMAC_SECRET", "default-secret-change-me")
+
+     RESOLUTIONS: List[Dict] = [
+         {
+             "name": "1080p",
+             "width": 1920,
+             "height": 1080,
+             "video_bitrate": "5000k",
+             "audio_bitrate": "128k",
+             "codec": "libx264",
+             "profile": "high",
+             "preset": "slow"
+         },
+         {
+             "name": "720p",
+             "width": 1280,
+             "height": 720,
+             "video_bitrate": "2500k",
+             "audio_bitrate": "128k",
+             "codec": "libx264",
+             "profile": "main",
+             "preset": "medium"
+         },
+         {
+             "name": "480p",
+             "width": 854,
+             "height": 480,
+             "video_bitrate": "1000k",
+             "audio_bitrate": "96k",
+             "codec": "libx264",
+             "profile": "baseline",
+             "preset": "fast"
+         },
+         {
+             "name": "240p",
+             "width": 426,
+             "height": 240,
+             "video_bitrate": "400k",
+             "audio_bitrate": "64k",
+             "codec": "libx264",
+             "profile": "baseline",
+             "preset": "veryfast"
+         }
+     ]
+
+     @classmethod
+     def validate_resolutions(cls):
+         # Every key read by the FFmpeg worker must be present
+         required = ("name", "width", "height", "video_bitrate",
+                     "audio_bitrate", "codec", "profile", "preset")
+         for res in cls.RESOLUTIONS:
+             if not all(key in res for key in required):
+                 raise ValueError(f"Invalid resolution config: {res}")
video_encoder/worker/ffmpeg.py ADDED
@@ -0,0 +1,57 @@
+ import subprocess
+ import os
+ from pathlib import Path
+ from typing import List, Dict
+ from ..config import EncodingConfig
+
+ class FFmpegEncoder:
+     def __init__(self, input_path: str, output_dir: str):
+         self.input_path = input_path
+         self.output_dir = output_dir
+         self.base_name = Path(input_path).stem
+         os.makedirs(output_dir, exist_ok=True)
+
+     def generate_commands(self) -> List[List[str]]:
+         """Generate FFmpeg commands for all resolutions"""
+         commands = []
+         for res in EncodingConfig.RESOLUTIONS:
+             output_path = os.path.join(
+                 self.output_dir,
+                 f"{self.base_name}_{res['name']}.m3u8"
+             )
+
+             cmd = [
+                 "ffmpeg", "-i", self.input_path,
+                 "-vf", f"scale={res['width']}:{res['height']}",
+                 "-c:v", res["codec"],
+                 "-profile:v", res["profile"],
+                 "-preset", res["preset"],
+                 "-b:v", res["video_bitrate"],
+                 "-c:a", "aac",
+                 "-b:a", res["audio_bitrate"],
+                 "-f", "hls",
+                 "-hls_time", "6",
+                 "-hls_playlist_type", "vod",
+                 "-hls_segment_filename",
+                 os.path.join(self.output_dir, f"{self.base_name}_{res['name']}_%03d.ts"),
+                 output_path
+             ]
+             commands.append(cmd)
+         return commands
+
+     def encode(self) -> str:
+         """Execute encoding commands and return master playlist path"""
+         master_playlist = os.path.join(self.output_dir, f"{self.base_name}_master.m3u8")
+
+         # Encode first, so the master playlist only appears once every variant exists
+         for cmd in self.generate_commands():
+             subprocess.run(cmd, check=True, capture_output=True)
+
+         with open(master_playlist, "w") as f:
+             f.write("#EXTM3U\n")
+             for res in reversed(EncodingConfig.RESOLUTIONS):
+                 f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={res['video_bitrate'].replace('k', '000')},"
+                         f"RESOLUTION={res['width']}x{res['height']}\n")
+                 f.write(f"{self.base_name}_{res['name']}.m3u8\n")
+
+         return master_playlist
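The encoder runs each FFmpeg command to completion with `subprocess.run`, so the job's progress can only jump from 0.1 to 1.0. FFmpeg can emit machine-readable `key=value` progress lines when started with `-progress pipe:1`, and a worker could turn those into finer-grained fractions. The parser below is a sketch under assumptions: the input duration is already known (for example from `ffprobe`), the process is read line by line from its stdout, and `parse_out_time` and `progress_fractions` are hypothetical names.

```python
from typing import Iterable, Iterator


def parse_out_time(value: str) -> float:
    """Convert an 'HH:MM:SS.ffffff' timestamp to seconds."""
    hours, minutes, seconds = value.split(":")
    return int(hours) * 3600 + int(minutes) * 60 + float(seconds)


def progress_fractions(lines: Iterable[str], duration: float) -> Iterator[float]:
    """Yield a completion fraction for each usable line of -progress output."""
    for line in lines:
        key, _, value = line.strip().partition("=")
        if key == "out_time":
            try:
                elapsed = parse_out_time(value)
            except ValueError:
                continue  # skip values that are not timestamps, such as "N/A"
            yield max(0.0, min(elapsed / duration, 1.0))
        elif key == "progress" and value == "end":
            yield 1.0  # FFmpeg reports "progress=end" when it is done
```

Since the pipeline runs one FFmpeg command per resolution, a per-command fraction would still need to be combined across the four encodes before being stored as the job's progress.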
video_encoder/worker/tasks.py ADDED
@@ -0,0 +1,35 @@
+ import os
+ import logging
+ import shutil
+ from redis import Redis
+ from rq import Queue
+ from pathlib import Path
+ from typing import Optional
+ from .ffmpeg import FFmpegEncoder
+ from ..config import EncodingConfig
+
+ logging.basicConfig(level=logging.INFO)
+ logger = logging.getLogger(__name__)
+
+ q = Queue(connection=Redis(), default_timeout=3600)
+
+ def update_job_progress(job_id: str, progress: float):
+     Redis().hset(f"job:{job_id}", "progress", progress)
+
+ def encode_video_task(job_id: str, input_path: str) -> Optional[str]:
+     try:
+         output_dir = os.path.join(EncodingConfig.TEMP_DIR, job_id)
+         encoder = FFmpegEncoder(input_path, output_dir)
+
+         update_job_progress(job_id, 0.1)
+         master_playlist = encoder.encode()
+         update_job_progress(job_id, 1.0)
+
+         return master_playlist
+     except Exception as e:
+         logger.error(f"Encoding failed for job {job_id}: {str(e)}")
+         update_job_progress(job_id, -1.0)
+         raise
+     finally:
+         # Cleanup temporary files after 1 hour
+         pass  # Implement retention policy as needed
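The `finally` block leaves the retention policy unimplemented. One way to approach it, sketched here under assumptions, is a periodic sweep that deletes job directories whose modification time is older than a cutoff. `purge_old_jobs` is a hypothetical function, the one-hour default mirrors the comment above, and the sweep would have to be scheduled separately (for example from cron or a recurring job).

```python
import os
import shutil
import time
from typing import List, Optional


def purge_old_jobs(temp_dir: str, max_age_seconds: float = 3600,
                   now: Optional[float] = None) -> List[str]:
    """Delete job directories under temp_dir older than max_age_seconds.

    Returns the sorted names of the removed directories. Hypothetical sketch.
    """
    if not os.path.isdir(temp_dir):
        return []
    current = time.time() if now is None else now
    # Materialize the listing first so deletions do not disturb the iteration
    with os.scandir(temp_dir) as it:
        entries = list(it)
    removed = []
    for entry in entries:
        if not entry.is_dir(follow_symlinks=False):
            continue
        # A directory's mtime changes whenever files are added or removed in it
        if current - entry.stat().st_mtime > max_age_seconds:
            shutil.rmtree(entry.path, ignore_errors=True)
            removed.append(entry.name)
    return sorted(removed)
```

Because a directory's modification time is refreshed while segments are still being written into it, a job that is actively encoding is unlikely to be swept, though a production policy would probably also consult the job's state in Redis.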