Chandima Prabhath commited on
Commit
ca7cbe1
·
1 Parent(s): 15909e3

Enhance Redis connection handling with retries; improve logging in encoding process

Browse files
video_encoder/config.py CHANGED
@@ -2,12 +2,13 @@ import os
2
  from typing import List, Dict
3
 
4
  import os
5
- from redis import from_url
6
  from dotenv import load_dotenv
 
 
7
 
8
  load_dotenv()
9
 
10
- import logging
11
  class EncodingConfig:
12
  DEBUG = os.getenv("DEBUG", "False") == "True"
13
  TEMP_DIR = "./tmp"
@@ -24,11 +25,18 @@ class RedisConfig:
24
  @classmethod
25
  def get_connection(cls):
26
  if not cls.CONN_POOL:
27
- cls.CONN_POOL = from_url(cls.URL, max_connections=20)
28
- logger.info("Successfully connected to Redis")
 
 
 
 
 
 
29
  return cls.CONN_POOL
 
30
  HMAC_SECRET = os.getenv("VIDEO_HMAC_SECRET", "default-secret-change-me")
31
-
32
  RESOLUTIONS: List[Dict] = [
33
  {
34
  "name": "1080p",
@@ -41,7 +49,7 @@ class RedisConfig:
41
  "preset": "slow"
42
  },
43
  {
44
- "name": "720p",
45
  "width": 1280,
46
  "height": 720,
47
  "video_bitrate": "2500k",
@@ -65,13 +73,13 @@ class RedisConfig:
65
  "width": 426,
66
  "height": 240,
67
  "video_bitrate": "400k",
68
- "audio_bitrate": "64k",
69
  "codec": "libx264",
70
  "profile": "baseline",
71
  "preset": "veryfast"
72
  }
73
  ]
74
-
75
  @classmethod
76
  def validate_resolutions(cls):
77
  for res in cls.RESOLUTIONS:
 
2
  from typing import List, Dict
3
 
4
  import os
5
+ from redis import from_url, RedisError
6
  from dotenv import load_dotenv
7
+ import time
8
+ import logging
9
 
10
  load_dotenv()
11
 
 
12
  class EncodingConfig:
13
  DEBUG = os.getenv("DEBUG", "False") == "True"
14
  TEMP_DIR = "./tmp"
 
25
  @classmethod
26
  def get_connection(cls):
27
  if not cls.CONN_POOL:
28
+ while True:
29
+ try:
30
+ cls.CONN_POOL = from_url(cls.URL, max_connections=20, ssl_cert_reqs=None)
31
+ logger.info("Successfully connected to Redis")
32
+ break
33
+ except RedisError as e:
34
+ logger.error(f"Could not connect to Redis instance: {e}. Retrying in 2 seconds...")
35
+ time.sleep(2)
36
  return cls.CONN_POOL
37
+
38
  HMAC_SECRET = os.getenv("VIDEO_HMAC_SECRET", "default-secret-change-me")
39
+
40
  RESOLUTIONS: List[Dict] = [
41
  {
42
  "name": "1080p",
 
49
  "preset": "slow"
50
  },
51
  {
52
+ "name": "720p",
53
  "width": 1280,
54
  "height": 720,
55
  "video_bitrate": "2500k",
 
73
  "width": 426,
74
  "height": 240,
75
  "video_bitrate": "400k",
76
+ "audio_bitrate": "64k",
77
  "codec": "libx264",
78
  "profile": "baseline",
79
  "preset": "veryfast"
80
  }
81
  ]
82
+
83
  @classmethod
84
  def validate_resolutions(cls):
85
  for res in cls.RESOLUTIONS:
video_encoder/worker/ffmpeg.py CHANGED
@@ -39,18 +39,25 @@ class FFmpegEncoder:
39
  commands.append(cmd)
40
  return commands
41
 
42
- def encode(self) -> str:
43
- """Execute encoding commands and return master playlist path"""
44
- master_playlist = os.path.join(self.output_dir, f"{self.base_name}_master.m3u8")
45
-
46
- with open(master_playlist, "w") as f:
47
- f.write("#EXTM3U\n")
48
- for res in reversed(EncodingConfig.RESOLUTIONS):
49
- f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={res['video_bitrate'].replace('k', '000')},"
50
- f"RESOLUTION={res['width']}x{res['height']}\n")
51
- f.write(f"{self.base_name}_{res['name']}.m3u8\n")
52
-
53
- for cmd in self.generate_commands():
54
- subprocess.run(cmd, check=True, capture_output=True)
55
-
56
- return master_playlist
 
 
 
 
 
 
 
 
39
  commands.append(cmd)
40
  return commands
41
 
42
+ import logging
43
+ logger = logging.getLogger(__name__)
44
+
45
+ def encode(self) -> str:
46
+ """Execute encoding commands and return master playlist path"""
47
+ master_playlist = os.path.join(self.output_dir, f"{self.base_name}_master.m3u8")
48
+
49
+ with open(master_playlist, "w") as f:
50
+ f.write("#EXTM3U\n")
51
+ for res in reversed(EncodingConfig.RESOLUTIONS):
52
+ f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={res['video_bitrate'].replace('k', '000')},"
53
+ f"RESOLUTION={res['width']}x{res['height']}\n")
54
+ f.write(f"{self.base_name}_{res['name']}.m3u8\n")
55
+
56
+ for cmd in self.generate_commands():
57
+ result = subprocess.run(cmd, capture_output=True)
58
+ logger.debug(f"FFmpeg command: {' '.join(cmd)}")
59
+ logger.debug(f"FFmpeg output: {result.stdout.decode()}")
60
+ logger.debug(f"FFmpeg error: {result.stderr.decode()}")
61
+ result.check_returncode()
62
+
63
+ return master_playlist
video_encoder/worker/tasks.py CHANGED
@@ -9,6 +9,7 @@ from typing import Optional
9
  from .ffmpeg import FFmpegEncoder
10
  from ..config import EncodingConfig
11
  from datetime import datetime
 
12
 
13
  logger = logging.getLogger(__name__)
14
 
@@ -26,21 +27,27 @@ def update_job_progress(job_id: str, progress: float):
26
  RedisConfig.get_connection().hset(f"job:{job_id}", "progress", progress)
27
 
28
  def encode_video_task(job_id: str, input_path: str) -> Optional[str]:
29
- try:
30
- logger.info(f"Starting encoding for job {job_id}")
31
- output_dir = os.path.join(EncodingConfig.TEMP_DIR, job_id)
32
- encoder = FFmpegEncoder(input_path, output_dir)
 
 
33
 
34
- update_job_progress(job_id, 0.1)
35
- master_playlist = encoder.encode()
36
- update_job_progress(job_id, 1.0)
37
 
38
- logger.info(f"Encoding completed for job {job_id}")
39
- return master_playlist
40
- except Exception as e:
41
- logger.error(f"Encoding failed for job {job_id}: {str(e)}")
42
- update_job_progress(job_id, -1.0)
43
- raise
44
- finally:
45
- # Cleanup temporary files after 1 hour
46
- shutil.rmtree(output_dir, ignore_errors=True)
 
 
 
 
 
9
  from .ffmpeg import FFmpegEncoder
10
  from ..config import EncodingConfig
11
  from datetime import datetime
12
+ import time
13
 
14
  logger = logging.getLogger(__name__)
15
 
 
27
  RedisConfig.get_connection().hset(f"job:{job_id}", "progress", progress)
28
 
29
  def encode_video_task(job_id: str, input_path: str) -> Optional[str]:
30
+ max_retries = 3
31
+ for attempt in range(max_retries):
32
+ try:
33
+ logger.info(f"Starting encoding for job {job_id} (attempt {attempt + 1})")
34
+ output_dir = os.path.join(EncodingConfig.TEMP_DIR, job_id)
35
+ encoder = FFmpegEncoder(input_path, output_dir)
36
 
37
+ update_job_progress(job_id, 0.1)
38
+ master_playlist = encoder.encode()
39
+ update_job_progress(job_id, 1.0)
40
 
41
+ logger.info(f"Encoding completed for job {job_id}")
42
+ return master_playlist
43
+ except Exception as e:
44
+ logger.error(f"Encoding failed for job {job_id} (attempt {attempt + 1}): {str(e)}")
45
+ update_job_progress(job_id, -1.0)
46
+ if attempt < max_retries - 1:
47
+ logger.info(f"Retrying encoding for job {job_id} in 5 seconds...")
48
+ time.sleep(5)
49
+ else:
50
+ raise
51
+ finally:
52
+ # Cleanup temporary files after 1 hour
53
+ shutil.rmtree(output_dir, ignore_errors=True)