Chandima Prabhath commited on
Commit
0410763
·
1 Parent(s): 9792bdd

Add logging and threading support for video encoding worker

Browse files
run.py CHANGED
@@ -1,7 +1,20 @@
1
  import uvicorn
2
  from video_encoder.api.main import app
 
 
 
3
 
4
  if __name__ == "__main__":
 
 
 
 
 
 
 
 
 
 
5
  uvicorn.run(
6
  app,
7
  host="0.0.0.0",
 
1
  import uvicorn
2
  from video_encoder.api.main import app
3
+ from rq import Worker, Queue, Connection
4
+ from redis import Redis
5
+ from video_encoder.config import RedisConfig
6
 
7
  if __name__ == "__main__":
8
+ import threading
9
+
10
+ def start_worker():
11
+ with Connection(RedisConfig.get_connection()):
12
+ worker = Worker(map(Queue, [RedisConfig.QUEUE_NAME]))
13
+ worker.work()
14
+
15
+ worker_thread = threading.Thread(target=start_worker)
16
+ worker_thread.start()
17
+
18
  uvicorn.run(
19
  app,
20
  host="0.0.0.0",
video_encoder/api/main.py CHANGED
@@ -10,23 +10,31 @@ from rq import Queue
10
  from redis import from_url
11
  from ..config import EncodingConfig, RedisConfig
12
  from ..worker.tasks import encode_video_task
 
 
 
13
 
14
  q = Queue(connection=RedisConfig.get_connection(), default_timeout=3600)
15
 
16
  app = FastAPI(title="Video Encoding Service")
17
 
 
 
 
 
18
  @app.post("/upload")
19
  async def upload_video(file: UploadFile):
20
  job_id = str(uuid.uuid4())
21
  temp_dir = os.path.join(EncodingConfig.TEMP_DIR, job_id)
22
  os.makedirs(temp_dir, exist_ok=True)
23
-
24
  input_path = os.path.join(temp_dir, file.filename)
25
  with open(input_path, "wb") as f:
26
  f.write(await file.read())
27
-
 
28
  q.enqueue(encode_video_task, job_id, input_path)
29
-
30
  return JSONResponse(
31
  content={"job_id": job_id},
32
  status_code=status.HTTP_202_ACCEPTED
@@ -36,10 +44,11 @@ async def upload_video(file: UploadFile):
36
  async def get_status(job_id: str):
37
  redis = RedisConfig.get_connection()
38
  progress = redis.hget(f"job:{job_id}", "progress")
39
-
40
  if not progress:
 
41
  raise HTTPException(status_code=404, detail="Job not found")
42
-
43
  return {"job_id": job_id, "progress": float(progress)}
44
 
45
  @app.get("/play/{job_id}")
@@ -49,14 +58,16 @@ async def play_video(job_id: str, token: str):
49
  job_id.encode(),
50
  hashlib.sha256
51
  ).hexdigest()
52
-
53
  if not hmac.compare_digest(token, expected_token):
 
54
  raise HTTPException(status_code=403, detail="Invalid token")
55
-
56
  master_playlist = os.path.join(EncodingConfig.TEMP_DIR, job_id, f"{job_id}_master.m3u8")
57
  if not os.path.exists(master_playlist):
 
58
  raise HTTPException(status_code=404, detail="Playlist not found")
59
-
60
  return FileResponse(
61
  master_playlist,
62
  media_type="application/vnd.apple.mpegurl"
@@ -70,5 +81,6 @@ async def generate_token(job_id: str):
70
  f"{job_id}{timestamp}".encode(),
71
  hashlib.sha256
72
  ).hexdigest()
73
-
 
74
  return {"token": f"{timestamp}:{signature}"}
 
10
  from redis import from_url
11
  from ..config import EncodingConfig, RedisConfig
12
  from ..worker.tasks import encode_video_task
13
+ import logging
14
+
15
+ logger = logging.getLogger(__name__)
16
 
17
  q = Queue(connection=RedisConfig.get_connection(), default_timeout=3600)
18
 
19
  app = FastAPI(title="Video Encoding Service")
20
 
21
+ @app.on_event("startup")
22
+ async def startup_event():
23
+ logger.info("Application startup")
24
+
25
  @app.post("/upload")
26
  async def upload_video(file: UploadFile):
27
  job_id = str(uuid.uuid4())
28
  temp_dir = os.path.join(EncodingConfig.TEMP_DIR, job_id)
29
  os.makedirs(temp_dir, exist_ok=True)
30
+
31
  input_path = os.path.join(temp_dir, file.filename)
32
  with open(input_path, "wb") as f:
33
  f.write(await file.read())
34
+
35
+ logger.info(f"Enqueuing job {job_id} for file {file.filename}")
36
  q.enqueue(encode_video_task, job_id, input_path)
37
+
38
  return JSONResponse(
39
  content={"job_id": job_id},
40
  status_code=status.HTTP_202_ACCEPTED
 
44
  async def get_status(job_id: str):
45
  redis = RedisConfig.get_connection()
46
  progress = redis.hget(f"job:{job_id}", "progress")
47
+
48
  if not progress:
49
+ logger.warning(f"Job {job_id} not found")
50
  raise HTTPException(status_code=404, detail="Job not found")
51
+
52
  return {"job_id": job_id, "progress": float(progress)}
53
 
54
  @app.get("/play/{job_id}")
 
58
  job_id.encode(),
59
  hashlib.sha256
60
  ).hexdigest()
61
+
62
  if not hmac.compare_digest(token, expected_token):
63
+ logger.warning(f"Invalid token for job {job_id}")
64
  raise HTTPException(status_code=403, detail="Invalid token")
65
+
66
  master_playlist = os.path.join(EncodingConfig.TEMP_DIR, job_id, f"{job_id}_master.m3u8")
67
  if not os.path.exists(master_playlist):
68
+ logger.warning(f"Playlist not found for job {job_id}")
69
  raise HTTPException(status_code=404, detail="Playlist not found")
70
+
71
  return FileResponse(
72
  master_playlist,
73
  media_type="application/vnd.apple.mpegurl"
 
81
  f"{job_id}{timestamp}".encode(),
82
  hashlib.sha256
83
  ).hexdigest()
84
+
85
+ logger.info(f"Generated token for job {job_id}")
86
  return {"token": f"{timestamp}:{signature}"}
video_encoder/config.py CHANGED
@@ -7,8 +7,14 @@ from dotenv import load_dotenv
7
 
8
  load_dotenv()
9
 
 
10
  class EncodingConfig:
 
11
  TEMP_DIR = "./tmp"
 
 
 
 
12
 
13
  class RedisConfig:
14
  URL = os.getenv("REDIS_URL")
@@ -18,7 +24,12 @@ class RedisConfig:
18
  @classmethod
19
  def get_connection(cls):
20
  if not cls.CONN_POOL:
21
- cls.CONN_POOL = from_url(cls.URL, max_connections=20)
 
 
 
 
 
22
  return cls.CONN_POOL
23
  HMAC_SECRET = os.getenv("VIDEO_HMAC_SECRET", "default-secret-change-me")
24
 
 
7
 
8
  load_dotenv()
9
 
10
+ import logging
11
  class EncodingConfig:
12
+ DEBUG = os.getenv("DEBUG", "False") == "True"
13
  TEMP_DIR = "./tmp"
14
+ LOG_LEVEL = logging.DEBUG if DEBUG else logging.INFO
15
+
16
+ logging.basicConfig(level=EncodingConfig.LOG_LEVEL)
17
+ logger = logging.getLogger(__name__)
18
 
19
  class RedisConfig:
20
  URL = os.getenv("REDIS_URL")
 
24
  @classmethod
25
  def get_connection(cls):
26
  if not cls.CONN_POOL:
27
+ try:
28
+ cls.CONN_POOL = from_url(cls.URL, max_connections=20)
29
+ logger.info("Successfully connected to Redis")
30
+ except Exception as e:
31
+ logger.error(f"Failed to connect to Redis: {e}")
32
+ raise
33
  return cls.CONN_POOL
34
  HMAC_SECRET = os.getenv("VIDEO_HMAC_SECRET", "default-secret-change-me")
35
 
video_encoder/worker/tasks.py CHANGED
@@ -9,23 +9,25 @@ from typing import Optional
9
  from .ffmpeg import FFmpegEncoder
10
  from ..config import EncodingConfig
11
 
12
- logging.basicConfig(level=logging.INFO)
13
  logger = logging.getLogger(__name__)
14
 
15
  q = Queue(connection=RedisConfig.get_connection(), default_timeout=3600)
16
 
17
  def update_job_progress(job_id: str, progress: float):
 
18
  RedisConfig.get_connection().hset(f"job:{job_id}", "progress", progress)
19
 
20
  def encode_video_task(job_id: str, input_path: str) -> Optional[str]:
21
  try:
 
22
  output_dir = os.path.join(EncodingConfig.TEMP_DIR, job_id)
23
  encoder = FFmpegEncoder(input_path, output_dir)
24
-
25
  update_job_progress(job_id, 0.1)
26
  master_playlist = encoder.encode()
27
  update_job_progress(job_id, 1.0)
28
-
 
29
  return master_playlist
30
  except Exception as e:
31
  logger.error(f"Encoding failed for job {job_id}: {str(e)}")
 
9
  from .ffmpeg import FFmpegEncoder
10
  from ..config import EncodingConfig
11
 
 
12
  logger = logging.getLogger(__name__)
13
 
14
  q = Queue(connection=RedisConfig.get_connection(), default_timeout=3600)
15
 
16
  def update_job_progress(job_id: str, progress: float):
17
+ logger.info(f"Updating progress for job {job_id} to {progress}")
18
  RedisConfig.get_connection().hset(f"job:{job_id}", "progress", progress)
19
 
20
  def encode_video_task(job_id: str, input_path: str) -> Optional[str]:
21
  try:
22
+ logger.info(f"Starting encoding for job {job_id}")
23
  output_dir = os.path.join(EncodingConfig.TEMP_DIR, job_id)
24
  encoder = FFmpegEncoder(input_path, output_dir)
25
+
26
  update_job_progress(job_id, 0.1)
27
  master_playlist = encoder.encode()
28
  update_job_progress(job_id, 1.0)
29
+
30
+ logger.info(f"Encoding completed for job {job_id}")
31
  return master_playlist
32
  except Exception as e:
33
  logger.error(f"Encoding failed for job {job_id}: {str(e)}")