ChandimaPrabath commited on
Commit
1a6f7ad
·
1 Parent(s): d0ff525

debug beta

Browse files
app/__pycache__/__init__.cpython-310.pyc DELETED
Binary file (1.23 kB)
 
app/__pycache__/__init__.cpython-313.pyc DELETED
Binary file (1.93 kB)
 
app/__pycache__/config.cpython-310.pyc DELETED
Binary file (836 Bytes)
 
app/__pycache__/config.cpython-313.pyc DELETED
Binary file (1.22 kB)
 
app/__pycache__/error_handlers.cpython-310.pyc DELETED
Binary file (1.58 kB)
 
app/__pycache__/proxy.cpython-310.pyc DELETED
Binary file (1.9 kB)
 
app/__pycache__/proxy.cpython-313.pyc DELETED
Binary file (3.15 kB)
 
app/__pycache__/routes.cpython-310.pyc DELETED
Binary file (6 kB)
 
app/__pycache__/routes.cpython-313.pyc DELETED
Binary file (10.1 kB)
 
app/__pycache__/utils.cpython-310.pyc DELETED
Binary file (2.52 kB)
 
app/routes.py CHANGED
@@ -28,7 +28,7 @@ def files_page():
28
 
29
  @api_bp.route('/files')
30
  def list_files():
31
- """List all encoded files with their details"""
32
  try:
33
  encoded_dir = Path(current_app.config['ENCODED_FOLDER'])
34
  files = []
@@ -39,12 +39,14 @@ def list_files():
39
  job_id = job_dir.name
40
  job_info = encoder_service.get_job_info(job_id)
41
 
42
- if job_info and job_info.get('status') == 'completed':
 
43
  files.append({
44
  'job_id': job_id,
45
  'output_name': job_info.get('output_name', ''),
46
  'created_at': job_info.get('start_time'),
47
- 'completed_at': job_info.get('completion_time'),
 
48
  'qualities': {
49
  file['quality']: file['size']
50
  for file in job_info.get('files', [])
@@ -312,8 +314,7 @@ def serve_video(job_id, quality):
312
  def allowed_file(filename):
313
  """Check if the file extension is allowed"""
314
  ALLOWED_EXTENSIONS = {'mp4', 'mov', 'avi', 'mkv', 'wmv', 'flv', 'webm', '3gp','ts','m4v', 'mpg', 'mpeg'}
315
- return '.' in filename and \
316
- filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
317
 
318
  def generate_job_id():
319
  """Generate a unique job ID"""
 
28
 
29
  @api_bp.route('/files')
30
  def list_files():
31
+ """List all quality files that have been encoded (even if the overall job isn’t finished)"""
32
  try:
33
  encoded_dir = Path(current_app.config['ENCODED_FOLDER'])
34
  files = []
 
39
  job_id = job_dir.name
40
  job_info = encoder_service.get_job_info(job_id)
41
 
42
+ # If there are any quality files available, include this job.
43
+ if job_info and job_info.get('files'):
44
  files.append({
45
  'job_id': job_id,
46
  'output_name': job_info.get('output_name', ''),
47
  'created_at': job_info.get('start_time'),
48
+ 'completed_at': job_info.get('completion_time', None),
49
+ 'status': job_info.get('status'),
50
  'qualities': {
51
  file['quality']: file['size']
52
  for file in job_info.get('files', [])
 
314
  def allowed_file(filename):
315
  """Check if the file extension is allowed"""
316
  ALLOWED_EXTENSIONS = {'mp4', 'mov', 'avi', 'mkv', 'wmv', 'flv', 'webm', '3gp','ts','m4v', 'mpg', 'mpeg'}
317
+ return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
 
318
 
319
  def generate_job_id():
320
  """Generate a unique job ID"""
app/services/__pycache__/encoder_service.cpython-310.pyc DELETED
Binary file (7.33 kB)
 
app/services/__pycache__/encoder_service.cpython-313.pyc DELETED
Binary file (12.9 kB)
 
app/services/encoder_service.py CHANGED
@@ -26,7 +26,7 @@ def _read_pipe(pipe, q):
26
  class EncoderService:
27
  def __init__(self):
28
  self.jobs = {}
29
- self.threads = {} # Store encoding thread references
30
  # Optimized settings for web streaming
31
  self.default_qualities = {
32
  '480p': {
@@ -36,7 +36,7 @@ class EncoderService:
36
  'maxrate': '1500k',
37
  'bufsize': '2000k',
38
  'audio_bitrate': '128k',
39
- 'keyframe': '48', # Keyframe every 2 seconds at 24fps
40
  'preset': 'ultrafast', # Faster encoding speed
41
  'profile': 'main',
42
  'level': '3.1',
@@ -110,8 +110,64 @@ class EncoderService:
110
  logger.error(f"Failed to start encoding job: {str(e)}")
111
  return {'status': 'failed', 'error': str(e)}
112
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
113
  def _encode_video(self, filename, job_id):
114
- """Internal method to handle video encoding with retries and enhanced error handling"""
115
  try:
116
  upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
117
  encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
@@ -123,20 +179,39 @@ class EncoderService:
123
  output_dir.mkdir(parents=True, exist_ok=True)
124
  logger.info(f"Created output directory {output_dir} for job {job_id}")
125
 
 
 
 
 
 
 
 
 
 
 
 
126
  qualities = self.jobs[job_id]['settings']
127
- total_steps = len(qualities)
128
- completed_steps = 0
129
  outputs = []
 
 
 
 
 
 
 
 
 
130
 
131
- # Get video duration
132
- duration = self._get_video_duration(input_file)
133
  if not duration:
134
  raise Exception("Could not determine video duration")
135
- logger.info(f"Video duration for job {job_id}: {duration} seconds")
 
136
 
137
  for quality, settings in qualities.items():
138
  max_retries = 3
139
  attempts = 0
 
140
  success = False
141
 
142
  while attempts < max_retries and not success:
@@ -144,99 +219,135 @@ class EncoderService:
144
  self.jobs[job_id].update({
145
  'status': 'processing',
146
  'current_quality': quality,
147
- 'progress': (completed_steps / total_steps) * 100
148
  })
149
- output_file = output_dir / f"{output_name}_{quality}.mp4"
150
-
151
- # Build the FFmpeg command
152
- cmd = [
153
- 'ffmpeg', '-y',
154
- '-i', str(input_file),
155
- '-c:v', 'libx264',
156
- '-preset', settings['preset'],
157
- '-profile:v', settings['profile'],
158
- '-level', settings['level'],
159
- '-tune', settings['tune'],
160
- '-b:v', settings['bitrate'],
161
- '-maxrate', settings['maxrate'],
162
- '-bufsize', settings['bufsize'],
163
- '-vf', f"scale={settings['width']}:{settings['height']}",
164
- '-g', settings['keyframe'],
165
- '-keyint_min', settings['keyframe'],
166
- '-sc_threshold', '0',
167
- '-c:a', 'aac',
168
- '-b:a', settings['audio_bitrate'],
169
- '-ar', '48000',
170
- '-ac', '2',
171
- '-movflags', '+faststart',
172
- '-progress', 'pipe:1',
173
- str(output_file)
174
- ]
175
-
176
- try:
177
- process = subprocess.Popen(
178
- cmd,
179
- stdout=subprocess.PIPE,
180
- stderr=subprocess.PIPE,
181
- universal_newlines=True
182
- )
183
- self.active_processes[job_id] = process
184
-
185
- # Use a queue and separate thread to read stdout
186
- q = queue.Queue()
187
- reader_thread = threading.Thread(target=_read_pipe, args=(process.stdout, q))
188
- reader_thread.daemon = True
189
- reader_thread.start()
190
-
191
- last_output_time = time.time()
192
- while True:
193
- try:
194
- # Wait up to 5 seconds for a line
195
- line = q.get(timeout=5)
196
- last_output_time = time.time()
197
- prog = self._parse_ffmpeg_progress(line, duration)
198
- if prog is not None:
199
- quality_progress = ((completed_steps + prog / 100) / total_steps) * 100
200
- self.jobs[job_id]['progress'] = quality_progress
201
- except queue.Empty:
202
- # If no output in 5 seconds, check if process is still alive
203
- if time.time() - last_output_time > 30:
204
- logger.warning(f"No ffmpeg output for 30 seconds on quality {quality}, attempt {attempts+1} for job {job_id}")
 
 
 
 
 
 
 
205
  break
206
- if process.poll() is not None:
207
- break
 
 
 
 
 
 
 
 
 
208
 
209
- reader_thread.join(timeout=5)
210
- # Check if process completed successfully
211
- if process.returncode == 0:
212
- logger.info(f"Encoding successful for quality {quality} on attempt {attempts+1} for job {job_id}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
213
  outputs.append({
214
  'quality': quality,
215
  'path': str(output_file),
216
  'settings': settings
217
  })
218
- success = True
219
- completed_steps += 1
220
- else:
221
- error_output = process.stderr.read()
222
- logger.error(f"FFmpeg error on quality {quality}, attempt {attempts+1} for job {job_id}: {error_output}")
223
- raise Exception(f"FFmpeg failed for quality {quality}")
224
-
225
- except Exception as e:
226
- logger.error(f"Error encoding {quality} on attempt {attempts+1} for job {job_id}: {str(e)}")
227
- attempts += 1
228
- if attempts < max_retries:
229
- logger.info(f"Retrying encoding for quality {quality} (attempt {attempts+1} of {max_retries}) for job {job_id}")
230
- time.sleep(2) # Wait before retrying
231
- else:
232
  self.jobs[job_id].update({
233
  'status': 'failed',
234
- 'error': f"Failed encoding {quality} after {max_retries} attempts: {str(e)}"
235
  })
236
- return
237
- finally:
238
- if job_id in self.active_processes:
239
- del self.active_processes[job_id]
 
 
 
 
 
 
 
 
 
 
 
 
240
 
241
  self.jobs[job_id].update({
242
  'status': 'completed',
@@ -246,6 +357,7 @@ class EncoderService:
246
  })
247
  logger.info(f"Job {job_id} completed successfully")
248
 
 
249
  except Exception as e:
250
  logger.error(f"Encoding failed for job {job_id}: {str(e)}")
251
  self.jobs[job_id].update({
@@ -253,10 +365,14 @@ class EncoderService:
253
  'error': str(e)
254
  })
255
  finally:
 
 
 
256
  # Remove thread reference once job completes/fails
257
  if job_id in self.threads:
258
  del self.threads[job_id]
259
 
 
260
  def _get_video_duration(self, input_file):
261
  """Get video duration using FFprobe"""
262
  try:
@@ -305,7 +421,7 @@ class EncoderService:
305
  try:
306
  upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
307
  encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
308
-
309
  if job_id in self.jobs:
310
  source_file = upload_path / self.jobs[job_id]['filename']
311
  if source_file.exists():
@@ -328,7 +444,7 @@ class EncoderService:
328
  try:
329
  if job_id not in self.jobs:
330
  return None
331
-
332
  job = self.jobs[job_id]
333
  encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
334
  job_path = encoded_path / job_id
@@ -352,4 +468,4 @@ class EncoderService:
352
  logger.error(f"Error getting job info for {job_id}: {str(e)}")
353
  return None
354
 
355
- encoder_service = EncoderService()
 
26
  class EncoderService:
27
  def __init__(self):
28
  self.jobs = {}
29
+ self.threads = {} # Store encoding thread references
30
  # Optimized settings for web streaming
31
  self.default_qualities = {
32
  '480p': {
 
36
  'maxrate': '1500k',
37
  'bufsize': '2000k',
38
  'audio_bitrate': '128k',
39
+ 'keyframe': '48', # Keyframe every 2 seconds at 24fps
40
  'preset': 'ultrafast', # Faster encoding speed
41
  'profile': 'main',
42
  'level': '3.1',
 
110
  logger.error(f"Failed to start encoding job: {str(e)}")
111
  return {'status': 'failed', 'error': str(e)}
112
 
113
+ def _split_video(self, input_file, split_dir, segment_duration=300): # 5 minutes = 300 seconds
114
+ """Splits the video into segments using ffmpeg."""
115
+ try:
116
+ split_cmd = [
117
+ 'ffmpeg',
118
+ '-i', str(input_file),
119
+ '-c', 'copy',
120
+ '-map', '0',
121
+ '-segment_time', str(segment_duration),
122
+ '-f', 'segment',
123
+ '-segment_format', 'mp4',
124
+ '-reset_timestamps', '1', # Important to reset timestamps for each segment
125
+ str(split_dir / 'segment_%04d.mp4') # Using 4 digits for segment numbering
126
+ ]
127
+ logger.info(f"Splitting video with command: {' '.join(split_cmd)}")
128
+ subprocess.run(split_cmd, check=True, capture_output=True)
129
+ segments = sorted([split_dir / f for f in os.listdir(split_dir) if f.startswith('segment_') and f.endswith('.mp4')])
130
+ return segments
131
+ except subprocess.CalledProcessError as e:
132
+ logger.error(f"Error splitting video: {e.stderr.decode()}")
133
+ raise Exception(f"FFmpeg split failed: {e}")
134
+ except Exception as e:
135
+ logger.error(f"Error splitting video: {str(e)}")
136
+ raise
137
+
138
+ def _stitch_segments(self, segment_files, output_file):
139
+ """Stitches video segments back together using ffmpeg."""
140
+ try:
141
+ # Create a list file for concat demuxer
142
+ list_file_path = output_file.parent / 'segment_list.txt'
143
+ with open(list_file_path, 'w') as f:
144
+ for segment in segment_files:
145
+ f.write(f"file '{segment}'\n")
146
+
147
+ stitch_cmd = [
148
+ 'ffmpeg',
149
+ '-f', 'concat',
150
+ '-safe', '0', # Set to 0 if paths are relative, which they are in temp dir
151
+ '-i', str(list_file_path),
152
+ '-c', 'copy',
153
+ str(output_file)
154
+ ]
155
+ logger.info(f"Stitching video with command: {' '.join(stitch_cmd)}")
156
+ subprocess.run(stitch_cmd, check=True, capture_output=True)
157
+
158
+ # Clean up list file
159
+ if list_file_path.exists():
160
+ list_file_path.unlink()
161
+
162
+ except subprocess.CalledProcessError as e:
163
+ logger.error(f"Error stitching video: {e.stderr.decode()}")
164
+ raise Exception(f"FFmpeg stitch failed: {e}")
165
+ except Exception as e:
166
+ logger.error(f"Error stitching video: {str(e)}")
167
+ raise
168
+
169
  def _encode_video(self, filename, job_id):
170
+ """Internal method to handle video encoding with splitting, encoding segments, and stitching."""
171
  try:
172
  upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
173
  encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
 
179
  output_dir.mkdir(parents=True, exist_ok=True)
180
  logger.info(f"Created output directory {output_dir} for job {job_id}")
181
 
182
+ # Create a temporary directory for video segments
183
+ split_dir = output_dir / 'segments'
184
+ split_dir.mkdir(exist_ok=True)
185
+ segment_files = []
186
+
187
+ try:
188
+ segment_files = self._split_video(input_file, split_dir)
189
+ logger.info(f"Video split into {len(segment_files)} segments for job {job_id}")
190
+ except Exception as split_err:
191
+ raise Exception(f"Video splitting failed: {split_err}")
192
+
193
  qualities = self.jobs[job_id]['settings']
194
+ total_qualities = len(qualities)
 
195
  outputs = []
196
+ global_progress_steps = total_qualities * len(segment_files) + total_qualities # segments encoding + stitching
197
+ completed_global_steps = 0
198
+
199
+ # Get video duration (using the first segment for approximation, or original file if needed)
200
+ duration = 0
201
+ if segment_files:
202
+ duration = self._get_video_duration(segment_files[0]) # Using first segment duration
203
+ else: # Fallback to original video if no segments (unlikely, but for robustness)
204
+ duration = self._get_video_duration(input_file)
205
 
 
 
206
  if not duration:
207
  raise Exception("Could not determine video duration")
208
+ logger.info(f"Video duration (using segment or original) for job {job_id}: {duration} seconds")
209
+
210
 
211
  for quality, settings in qualities.items():
212
  max_retries = 3
213
  attempts = 0
214
+ encoded_segments_for_quality = []
215
  success = False
216
 
217
  while attempts < max_retries and not success:
 
219
  self.jobs[job_id].update({
220
  'status': 'processing',
221
  'current_quality': quality,
222
+ 'progress': (completed_global_steps / global_progress_steps) * 100
223
  })
224
+
225
+ segment_output_files = []
226
+ segment_success = True # Track if all segments for current quality encoded successfully
227
+ completed_steps = 0
228
+
229
+ for segment_input_file in segment_files:
230
+ segment_output_file = split_dir / f"{segment_input_file.stem}_{quality}.mp4" # Output segment in split_dir
231
+ segment_output_files.append(segment_output_file)
232
+
233
+ cmd = [
234
+ 'ffmpeg', '-y',
235
+ '-i', str(segment_input_file),
236
+ '-c:v', 'libx264',
237
+ '-preset', settings['preset'],
238
+ '-profile:v', settings['profile'],
239
+ '-level', settings['level'],
240
+ '-tune', settings['tune'],
241
+ '-b:v', settings['bitrate'],
242
+ '-maxrate', settings['maxrate'],
243
+ '-bufsize', settings['bufsize'],
244
+ '-vf', f"scale={settings['width']}:{settings['height']}",
245
+ '-g', settings['keyframe'],
246
+ '-keyint_min', settings['keyframe'],
247
+ '-sc_threshold', '0',
248
+ '-c:a', 'aac',
249
+ '-b:a', settings['audio_bitrate'],
250
+ '-ar', '48000',
251
+ '-ac', '2',
252
+ '-movflags', '+faststart',
253
+ '-progress', 'pipe:1',
254
+ str(segment_output_file)
255
+ ]
256
+
257
+ try:
258
+ process = subprocess.Popen(
259
+ cmd,
260
+ stdout=subprocess.PIPE,
261
+ stderr=subprocess.PIPE,
262
+ universal_newlines=True
263
+ )
264
+ self.active_processes[job_id] = process
265
+
266
+ q = queue.Queue()
267
+ reader_thread = threading.Thread(target=_read_pipe, args=(process.stdout, q))
268
+ reader_thread.daemon = True
269
+ reader_thread.start()
270
+
271
+ last_output_time = time.time()
272
+ segment_progress = 0
273
+ while True:
274
+ try:
275
+ line = q.get(timeout=5)
276
+ last_output_time = time.time()
277
+ prog = self._parse_ffmpeg_progress(line, duration) # Using segment duration for progress
278
+ if prog is not None:
279
+ segment_progress = prog
280
+ quality_progress = ((completed_global_steps + (completed_steps * 100 + prog) / 100 ) / global_progress_steps) * 100 # Approximation
281
+ self.jobs[job_id]['progress'] = quality_progress # Update overall job progress - this is rough, needs refinement for segments
282
+ except queue.Empty:
283
+ if time.time() - last_output_time > 30:
284
+ logger.warning(f"No ffmpeg output for 30 seconds on quality {quality}, segment {segment_input_file.name}, attempt {attempts+1} for job {job_id}")
285
+ break
286
+ if process.poll() is not None:
287
  break
288
+ reader_thread.join(timeout=5)
289
+
290
+ if process.returncode == 0:
291
+ logger.info(f"Encoding successful for quality {quality}, segment {segment_input_file.name} on attempt {attempts+1} for job {job_id}")
292
+ completed_global_steps += 1 # Increment for each successful segment encoding
293
+ encoded_segments_for_quality.append(segment_output_file)
294
+ else:
295
+ error_output = process.stderr.read()
296
+ logger.error(f"FFmpeg error on quality {quality}, segment {segment_input_file.name}, attempt {attempts+1} for job {job_id}: {error_output}")
297
+ segment_success = False # Indicate segment encoding failure
298
+ raise Exception(f"FFmpeg failed for quality {quality}, segment {segment_input_file.name}")
299
 
300
+ except Exception as e:
301
+ logger.error(f"Error encoding segment {segment_input_file.name} for quality {quality} on attempt {attempts+1} for job {job_id}: {str(e)}")
302
+ segment_success = False # Ensure segment_success is False if exception occurs
303
+ attempts += 1
304
+ if attempts < max_retries:
305
+ logger.info(f"Retrying encoding for quality {quality}, segment {segment_input_file.name} (attempt {attempts+2} of {max_retries}) for job {job_id}")
306
+ time.sleep(2)
307
+ else:
308
+ break # Break out of attempts loop for current quality if max retries reached
309
+ finally:
310
+ if job_id in self.active_processes:
311
+ del self.active_processes[job_id]
312
+ if not segment_success: # If any segment failed, break segment loop and retry/fail quality
313
+ break
314
+ if segment_success: # If all segments for current quality were successful
315
+ success = True # Mark quality encoding as successful
316
+ completed_global_steps += 1 # Increment for successful quality stitching step
317
+
318
+ # Stitch segments for current quality
319
+ output_file = output_dir / f"{output_name}_{quality}.mp4"
320
+ try:
321
+ self._stitch_segments(encoded_segments_for_quality, output_file)
322
  outputs.append({
323
  'quality': quality,
324
  'path': str(output_file),
325
  'settings': settings
326
  })
327
+ logger.info(f"Stitching successful for quality {quality} for job {job_id}")
328
+
329
+ except Exception as stitch_err:
330
+ logger.error(f"Stitching failed for quality {quality} for job {job_id}: {stitch_err}")
 
 
 
 
 
 
 
 
 
 
331
  self.jobs[job_id].update({
332
  'status': 'failed',
333
+ 'error': f"Stitching failed for quality {quality}: {str(stitch_err)}"
334
  })
335
+ return # Early return if stitching fails, consider if retry for stitching needed
336
+
337
+ # Clean up encoded segments for this quality (optional, but good to clean up split_dir)
338
+ for seg_file in encoded_segments_for_quality:
339
+ if seg_file.exists():
340
+ seg_file.unlink()
341
+
342
+ self.jobs[job_id]['progress'] = (completed_global_steps / global_progress_steps) * 100 # Final progress for this quality
343
+
344
+
345
+ if not success: # After max retries for a quality
346
+ self.jobs[job_id].update({
347
+ 'status': 'failed',
348
+ 'error': f"Failed encoding quality {quality} after {max_retries} attempts"
349
+ })
350
+ return # Early return if quality encoding fails after retries
351
 
352
  self.jobs[job_id].update({
353
  'status': 'completed',
 
357
  })
358
  logger.info(f"Job {job_id} completed successfully")
359
 
360
+
361
  except Exception as e:
362
  logger.error(f"Encoding failed for job {job_id}: {str(e)}")
363
  self.jobs[job_id].update({
 
365
  'error': str(e)
366
  })
367
  finally:
368
+ # Clean up split directory and segments after processing is done (success or fail)
369
+ if split_dir.exists():
370
+ shutil.rmtree(split_dir)
371
  # Remove thread reference once job completes/fails
372
  if job_id in self.threads:
373
  del self.threads[job_id]
374
 
375
+
376
  def _get_video_duration(self, input_file):
377
  """Get video duration using FFprobe"""
378
  try:
 
421
  try:
422
  upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
423
  encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
424
+
425
  if job_id in self.jobs:
426
  source_file = upload_path / self.jobs[job_id]['filename']
427
  if source_file.exists():
 
444
  try:
445
  if job_id not in self.jobs:
446
  return None
447
+
448
  job = self.jobs[job_id]
449
  encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
450
  job_path = encoded_path / job_id
 
468
  logger.error(f"Error getting job info for {job_id}: {str(e)}")
469
  return None
470
 
471
+ encoder_service = EncoderService()