ChandimaPrabath commited on
Commit
c57dccc
·
1 Parent(s): 49e20ff

thread path

Browse files
Files changed (1) hide show
  1. app/services/encoder_service.py +27 -20
app/services/encoder_service.py CHANGED
@@ -7,9 +7,8 @@ import json
7
  from datetime import datetime
8
  import re
9
  import threading
10
- import queue
11
  import signal
12
- import time # Added for retry delays
13
 
14
  # Configure logging
15
  logging.basicConfig(level=logging.INFO)
@@ -87,13 +86,14 @@ class EncoderService:
87
  'settings': qualities
88
  }
89
 
90
- # Start encoding in a separate thread
 
91
  thread = threading.Thread(
92
  target=self._encode_video,
93
  args=(filename, job_id)
94
  )
95
- thread.daemon = True
96
  thread.start()
 
97
 
98
  return {'status': 'pending', 'job_id': job_id}
99
  except Exception as e:
@@ -111,6 +111,7 @@ class EncoderService:
111
 
112
  # Create output directory
113
  output_dir.mkdir(parents=True, exist_ok=True)
 
114
 
115
  qualities = self.jobs[job_id]['settings']
116
  total_steps = len(qualities)
@@ -121,6 +122,7 @@ class EncoderService:
121
  duration = self._get_video_duration(input_file)
122
  if not duration:
123
  raise Exception("Could not determine video duration")
 
124
 
125
  for quality, settings in qualities.items():
126
  max_retries = 3
@@ -128,7 +130,7 @@ class EncoderService:
128
  success = False
129
 
130
  while attempts < max_retries and not success:
131
- logger.info(f"Starting encoding for quality {quality}, attempt {attempts+1}")
132
  self.jobs[job_id].update({
133
  'status': 'processing',
134
  'current_quality': quality,
@@ -170,19 +172,28 @@ class EncoderService:
170
  )
171
  self.active_processes[job_id] = process
172
 
173
- # Monitor FFmpeg progress
 
174
  while True:
175
- line = process.stdout.readline()
176
- if line == '' and process.poll() is not None:
177
- break
 
178
  if line:
 
179
  progress = self._parse_ffmpeg_progress(line, duration)
180
  if progress is not None:
181
  quality_progress = ((completed_steps + progress / 100) / total_steps) * 100
182
  self.jobs[job_id]['progress'] = quality_progress
 
 
 
 
 
 
183
 
184
  if process.returncode == 0:
185
- logger.info(f"Encoding successful for quality {quality} on attempt {attempts+1}")
186
  outputs.append({
187
  'quality': quality,
188
  'path': str(output_file),
@@ -192,14 +203,14 @@ class EncoderService:
192
  completed_steps += 1
193
  else:
194
  error_output = process.stderr.read()
195
- logger.error(f"FFmpeg error on quality {quality}, attempt {attempts+1}: {error_output}")
196
  raise Exception(f"FFmpeg failed for quality {quality}")
197
 
198
  except Exception as e:
199
- logger.error(f"Error encoding {quality} on attempt {attempts+1}: {str(e)}")
200
  attempts += 1
201
  if attempts < max_retries:
202
- logger.info(f"Retrying encoding for quality {quality} (attempt {attempts+1} of {max_retries})")
203
  time.sleep(2) # Wait before retrying
204
  else:
205
  self.jobs[job_id].update({
@@ -217,9 +228,10 @@ class EncoderService:
217
  'outputs': outputs,
218
  'completion_time': datetime.now().isoformat()
219
  })
 
220
 
221
  except Exception as e:
222
- logger.error(f"Encoding failed: {str(e)}")
223
  self.jobs[job_id].update({
224
  'status': 'failed',
225
  'error': str(e)
@@ -271,22 +283,18 @@ class EncoderService:
271
  def clean_job(self, job_id):
272
  """Clean up all files related to a job"""
273
  try:
274
- # Get paths
275
  upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
276
  encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
277
 
278
- # Clean up source file if it exists
279
  if job_id in self.jobs:
280
  source_file = upload_path / self.jobs[job_id]['filename']
281
  if source_file.exists():
282
  source_file.unlink()
283
 
284
- # Clean up encoded files
285
  job_output_dir = encoded_path / job_id
286
  if job_output_dir.exists():
287
  shutil.rmtree(job_output_dir)
288
 
289
- # Remove job from jobs dict
290
  if job_id in self.jobs:
291
  del self.jobs[job_id]
292
 
@@ -305,7 +313,6 @@ class EncoderService:
305
  encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
306
  job_path = encoded_path / job_id
307
 
308
- # Get file sizes
309
  files_info = []
310
  if job_path.exists():
311
  for output in job.get('outputs', []):
@@ -322,7 +329,7 @@ class EncoderService:
322
  'files': files_info
323
  }
324
  except Exception as e:
325
- logger.error(f"Error getting job info: {str(e)}")
326
  return None
327
 
328
  encoder_service = EncoderService()
 
7
  from datetime import datetime
8
  import re
9
  import threading
 
10
  import signal
11
+ import time
12
 
13
  # Configure logging
14
  logging.basicConfig(level=logging.INFO)
 
86
  'settings': qualities
87
  }
88
 
89
+ # Start encoding in a separate thread.
90
+ # Note: Removing the daemon flag to ensure the thread completes.
91
  thread = threading.Thread(
92
  target=self._encode_video,
93
  args=(filename, job_id)
94
  )
 
95
  thread.start()
96
+ logger.info(f"Started encoding thread for job {job_id}")
97
 
98
  return {'status': 'pending', 'job_id': job_id}
99
  except Exception as e:
 
111
 
112
  # Create output directory
113
  output_dir.mkdir(parents=True, exist_ok=True)
114
+ logger.info(f"Created output directory {output_dir} for job {job_id}")
115
 
116
  qualities = self.jobs[job_id]['settings']
117
  total_steps = len(qualities)
 
122
  duration = self._get_video_duration(input_file)
123
  if not duration:
124
  raise Exception("Could not determine video duration")
125
+ logger.info(f"Video duration for job {job_id}: {duration} seconds")
126
 
127
  for quality, settings in qualities.items():
128
  max_retries = 3
 
130
  success = False
131
 
132
  while attempts < max_retries and not success:
133
+ logger.info(f"Starting encoding for quality {quality}, attempt {attempts+1} for job {job_id}")
134
  self.jobs[job_id].update({
135
  'status': 'processing',
136
  'current_quality': quality,
 
172
  )
173
  self.active_processes[job_id] = process
174
 
175
+ # Monitor FFmpeg progress with a timeout if no output in 30 seconds
176
+ last_output_time = time.time()
177
  while True:
178
+ if process.stdout.readable():
179
+ line = process.stdout.readline()
180
+ else:
181
+ line = ''
182
  if line:
183
+ last_output_time = time.time()
184
  progress = self._parse_ffmpeg_progress(line, duration)
185
  if progress is not None:
186
  quality_progress = ((completed_steps + progress / 100) / total_steps) * 100
187
  self.jobs[job_id]['progress'] = quality_progress
188
+ # If no new output for 30 seconds, break the loop and retry
189
+ if time.time() - last_output_time > 30:
190
+ logger.warning(f"No ffmpeg output for 30 seconds on quality {quality}, attempt {attempts+1} for job {job_id}")
191
+ break
192
+ if process.poll() is not None:
193
+ break
194
 
195
  if process.returncode == 0:
196
+ logger.info(f"Encoding successful for quality {quality} on attempt {attempts+1} for job {job_id}")
197
  outputs.append({
198
  'quality': quality,
199
  'path': str(output_file),
 
203
  completed_steps += 1
204
  else:
205
  error_output = process.stderr.read()
206
+ logger.error(f"FFmpeg error on quality {quality}, attempt {attempts+1} for job {job_id}: {error_output}")
207
  raise Exception(f"FFmpeg failed for quality {quality}")
208
 
209
  except Exception as e:
210
+ logger.error(f"Error encoding {quality} on attempt {attempts+1} for job {job_id}: {str(e)}")
211
  attempts += 1
212
  if attempts < max_retries:
213
+ logger.info(f"Retrying encoding for quality {quality} (attempt {attempts+1} of {max_retries}) for job {job_id}")
214
  time.sleep(2) # Wait before retrying
215
  else:
216
  self.jobs[job_id].update({
 
228
  'outputs': outputs,
229
  'completion_time': datetime.now().isoformat()
230
  })
231
+ logger.info(f"Job {job_id} completed successfully")
232
 
233
  except Exception as e:
234
+ logger.error(f"Encoding failed for job {job_id}: {str(e)}")
235
  self.jobs[job_id].update({
236
  'status': 'failed',
237
  'error': str(e)
 
283
  def clean_job(self, job_id):
284
  """Clean up all files related to a job"""
285
  try:
 
286
  upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
287
  encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
288
 
 
289
  if job_id in self.jobs:
290
  source_file = upload_path / self.jobs[job_id]['filename']
291
  if source_file.exists():
292
  source_file.unlink()
293
 
 
294
  job_output_dir = encoded_path / job_id
295
  if job_output_dir.exists():
296
  shutil.rmtree(job_output_dir)
297
 
 
298
  if job_id in self.jobs:
299
  del self.jobs[job_id]
300
 
 
313
  encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
314
  job_path = encoded_path / job_id
315
 
 
316
  files_info = []
317
  if job_path.exists():
318
  for output in job.get('outputs', []):
 
329
  'files': files_info
330
  }
331
  except Exception as e:
332
+ logger.error(f"Error getting job info for {job_id}: {str(e)}")
333
  return None
334
 
335
  encoder_service = EncoderService()