Chandima Prabhath commited on
Commit
a4f374c
·
1 Parent(s): 21b1e7f

Update dependencies in requirements.txt; enhance FFmpegEncoder with availability check and improve logging in worker

Browse files
requirements.txt CHANGED
@@ -1,7 +1,6 @@
1
- fastapi[all]==0.115.12
2
- uvicorn==0.34.0
3
- valkey==6.1.0
4
- rq>=1.10.0
5
- ffmpeg-python==0.2.0
6
- python-multipart==0.0.20
7
- python-dotenv==1.1.0
 
1
+ fastapi>=0.68.0
2
+ uvicorn>=0.15.0
3
+ python-multipart>=0.0.5
4
+ redis>=4.3.4
5
+ rq>=1.10.1
6
+ python-dotenv>=0.19.0
 
video_encoder/worker/encoder.py CHANGED
@@ -1,5 +1,6 @@
1
  import subprocess
2
  import os
 
3
  from pathlib import Path
4
  from typing import List, Dict
5
  from ..config import EncodingConfig
@@ -9,6 +10,9 @@ logger = logging.getLogger(__name__)
9
 
10
  class FFmpegEncoder:
11
  def __init__(self, input_path: str, output_dir: str):
 
 
 
12
  self.input_path = input_path
13
  self.output_dir = output_dir
14
  self.base_name = Path(input_path).stem
@@ -42,22 +46,26 @@ class FFmpegEncoder:
42
  commands.append(cmd)
43
  return commands
44
 
45
- def encode(self) -> str:
46
- """Execute encoding commands and return master playlist path"""
47
- master_playlist = os.path.join(self.output_dir, f"{self.base_name}_master.m3u8")
48
 
49
- with open(master_playlist, "w") as f:
50
- f.write("#EXTM3U\n")
51
- for res in reversed(EncodingConfig.RESOLUTIONS):
52
- f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={res['video_bitrate'].replace('k', '000')},"
53
- f"RESOLUTION={res['width']}x{res['height']}\n")
54
- f.write(f"{self.base_name}_{res['name']}.m3u8\n")
55
 
56
- for cmd in self.generate_commands():
57
- result = subprocess.run(cmd, capture_output=True)
58
- logger.debug(f"FFmpeg command: {' '.join(cmd)}")
59
- logger.debug(f"FFmpeg output: {result.stdout.decode()}")
60
- logger.debug(f"FFmpeg error: {result.stderr.decode()}")
61
- result.check_returncode()
 
 
 
 
62
 
63
- return master_playlist
 
1
  import subprocess
2
  import os
3
+ import shutil
4
  from pathlib import Path
5
  from typing import List, Dict
6
  from ..config import EncodingConfig
 
10
 
11
  class FFmpegEncoder:
12
  def __init__(self, input_path: str, output_dir: str):
13
+ # Verify FFmpeg is available
14
+ if not shutil.which("ffmpeg"):
15
+ raise RuntimeError("FFmpeg not found in PATH")
16
  self.input_path = input_path
17
  self.output_dir = output_dir
18
  self.base_name = Path(input_path).stem
 
46
  commands.append(cmd)
47
  return commands
48
 
49
+ def encode(self) -> str:
50
+ """Execute encoding commands and return master playlist path"""
51
+ master_playlist = os.path.join(self.output_dir, f"{self.base_name}_master.m3u8")
52
 
53
+ with open(master_playlist, "w") as f:
54
+ f.write("#EXTM3U\n")
55
+ for res in reversed(EncodingConfig.RESOLUTIONS):
56
+ f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={res['video_bitrate'].replace('k', '000')},"
57
+ f"RESOLUTION={res['width']}x{res['height']}\n")
58
+ f.write(f"{self.base_name}_{res['name']}.m3u8\n")
59
 
60
+ for cmd in self.generate_commands():
61
+ logger.info(f"Executing FFmpeg command: {' '.join(cmd)}")
62
+ result = subprocess.run(cmd, capture_output=True)
63
+ logger.debug(f"FFmpeg command: {' '.join(cmd)}")
64
+ logger.debug(f"FFmpeg output: {result.stdout.decode()}")
65
+ logger.debug(f"FFmpeg error: {result.stderr.decode()}")
66
+ if result.returncode != 0:
67
+ logger.error(f"FFmpeg command failed with return code {result.returncode}")
68
+ raise subprocess.CalledProcessError(result.returncode, cmd)
69
+ result.check_returncode()
70
 
71
+ return master_playlist
video_encoder/worker/tasks.py CHANGED
@@ -23,8 +23,11 @@ def init_job(job_id: str):
23
  })
24
 
25
  def update_job_progress(job_id: str, progress: float):
26
- logger.info(f"Updating progress for job {job_id} to {progress}")
27
- RedisConfig.get_connection().hset(f"job:{job_id}", "progress", progress)
 
 
 
28
 
29
  def encode_video_task(job_id: str, input_path: str) -> Optional[str]:
30
  max_retries = 3
 
23
  })
24
 
25
  def update_job_progress(job_id: str, progress: float):
26
+ try:
27
+ logger.info(f"Updating progress for job {job_id} to {progress}")
28
+ RedisConfig.get_connection().hset(f"job:{job_id}", "progress", progress)
29
+ except Exception as e:
30
+ logger.error(f"Failed to update progress for job {job_id}: {str(e)}")
31
 
32
  def encode_video_task(job_id: str, input_path: str) -> Optional[str]:
33
  max_retries = 3
video_encoder/worker/worker.py CHANGED
@@ -1,6 +1,14 @@
1
  from rq import Worker
2
  from video_encoder.config import RedisConfig
3
 
 
 
 
4
  if __name__ == "__main__":
5
- worker = Worker([RedisConfig.QUEUE_NAME], connection=RedisConfig.get_connection(), default_result_ttl=3600)
6
- worker.work()
 
 
 
 
 
 
1
  from rq import Worker
2
  from video_encoder.config import RedisConfig
3
 
4
+ import logging
5
+ logger = logging.getLogger(__name__)
6
+
7
  if __name__ == "__main__":
8
+ try:
9
+ logger.info("Starting RQ worker")
10
+ worker = Worker([RedisConfig.QUEUE_NAME], connection=RedisConfig.get_connection(), default_result_ttl=3600)
11
+ logger.info("RQ worker started successfully")
12
+ worker.work()
13
+ except Exception as e:
14
+ logger.error(f"Failed to start RQ worker: {str(e)}")