File size: 21,503 Bytes
e2c1f2e
 
 
 
 
 
 
 
 
c57dccc
d0ff525
e2c1f2e
 
 
 
 
d0ff525
 
 
 
 
 
 
 
 
e2c1f2e
 
 
1a6f7ad
e2c1f2e
 
 
 
 
 
0a7175e
e2c1f2e
 
0a7175e
 
e2c1f2e
 
 
 
 
 
 
 
0a7175e
e2c1f2e
0a7175e
e2c1f2e
0a7175e
e2c1f2e
 
 
 
 
 
 
 
0a7175e
e2c1f2e
 
 
0a7175e
e2c1f2e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d31b48e
e2c1f2e
 
 
 
d31b48e
e2c1f2e
c57dccc
e2c1f2e
 
 
 
 
 
a5c8684
1a6f7ad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e2c1f2e
1a6f7ad
e2c1f2e
 
 
 
 
 
 
 
 
c57dccc
e2c1f2e
1a6f7ad
 
 
 
 
 
 
 
 
 
 
e2c1f2e
1a6f7ad
e2c1f2e
1a6f7ad
 
 
 
 
 
 
 
 
e2c1f2e
 
 
1a6f7ad
 
e2c1f2e
 
49e20ff
 
1a6f7ad
49e20ff
 
 
c57dccc
e2c1f2e
 
 
1a6f7ad
e2c1f2e
1a6f7ad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d0ff525
1a6f7ad
 
 
 
 
 
 
 
 
 
 
49e20ff
1a6f7ad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49e20ff
 
 
 
 
1a6f7ad
 
 
 
49e20ff
 
1a6f7ad
49e20ff
1a6f7ad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e2c1f2e
 
 
 
 
 
 
c57dccc
e2c1f2e
1a6f7ad
e2c1f2e
c57dccc
e2c1f2e
 
 
 
d31b48e
1a6f7ad
 
 
d31b48e
 
 
e2c1f2e
1a6f7ad
e2c1f2e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1a6f7ad
e2c1f2e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1a6f7ad
e2c1f2e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c57dccc
e2c1f2e
 
1a6f7ad
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
import subprocess
import os
import shutil
from pathlib import Path
import logging
import json
from datetime import datetime
import threading
import signal
import time
import queue

# Configure logging: root logger at INFO so the encode/stitch progress
# messages emitted below are visible by default.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by _read_pipe and EncoderService methods.
logger = logging.getLogger(__name__)

def _read_pipe(pipe, q):
    """Read lines from a pipe and put them into a queue."""
    try:
        with pipe:
            for line in iter(pipe.readline, ''):
                q.put(line)
    except Exception as e:
        logger.error(f"Error reading pipe: {str(e)}")

class EncoderService:
    """Manage ffmpeg-based multi-quality video encoding jobs.

    Each job runs on a background thread: the source file is split into
    segments, every segment is encoded at each configured quality, the
    encoded segments are stitched back together, and job state/progress
    is tracked in ``self.jobs`` keyed by job id.
    """

    def __init__(self):
        self.jobs = {}      # job_id -> job state dict (status, progress, outputs, ...)
        self.threads = {}   # job_id -> worker thread running _encode_video
        # Optimized settings for web streaming, keyed by quality label.
        self.default_qualities = {
            '480p': {
                'width': 854,
                'height': 480,
                'bitrate': '1000k',
                'maxrate': '1200k',  # Slightly tighter maxrate
                'bufsize': '2000k',
                'audio_bitrate': '128k',
                'keyframe': '48',
                'preset': 'medium',  # Consider medium for faster encoding
                'profile': 'main',
                'level': '3.1',
                'tune': 'fastdecode'
            },
            '720p': {
                'width': 1280,
                'height': 720,
                'bitrate': '2500k',
                'maxrate': '2800k',  # Slightly tighter maxrate
                'bufsize': '4000k',
                'audio_bitrate': '160k', # Increased audio bitrate
                'keyframe': '48',
                'preset': 'medium',  # Consider medium for faster encoding
                'profile': 'main',
                'level': '3.1',
                'tune': 'fastdecode'
            },
            '1080p': {
                'width': 1920,
                'height': 1080,
                'bitrate': '5000k',
                'maxrate': '5500k',  # Slightly tighter maxrate
                'bufsize': '8000k',
                'audio_bitrate': '192k',
                'keyframe': '48',
                'preset': 'medium',  # Consider medium for faster encoding
                'profile': 'high',
                'level': '4.0',
                'tune': 'fastdecode'
            }
        }
        self.active_processes = {}  # job_id -> running ffmpeg Popen (for stop_job)

    def start_encode_job(self, filename, job_id, output_name=None, settings=None):
        """Register a job and start encoding on a background thread.

        Args:
            filename: name of the uploaded source file inside UPLOAD_FOLDER.
            job_id: unique identifier for this job.
            output_name: optional base name for encoded outputs; defaults to
                the source filename without its extension.
            settings: optional JSON string mapping quality label -> video
                bitrate string, e.g. '{"720p": "3000k"}'.

        Returns:
            dict with 'status' 'pending' and 'job_id', or 'failed' and 'error'.
        """
        try:
            # Copy each per-quality dict individually: the previous shallow
            # .copy() shared the nested dicts, so a per-job bitrate override
            # silently corrupted self.default_qualities for all later jobs.
            qualities = {q: dict(cfg) for q, cfg in self.default_qualities.items()}
            if settings:
                settings_dict = json.loads(settings)
                for quality, bitrate in settings_dict.items():
                    if quality in qualities and bitrate:
                        qualities[quality]['bitrate'] = bitrate
                        # Scale maxrate and bufsize with the requested bitrate.
                        bitrate_value = int(bitrate.replace('k', ''))
                        qualities[quality]['maxrate'] = f"{int(bitrate_value * 1.5)}k"
                        qualities[quality]['bufsize'] = f"{int(bitrate_value * 2)}k"

            self.jobs[job_id] = {
                'status': 'pending',
                'progress': 0,
                'start_time': datetime.now().isoformat(),
                'filename': filename,
                'output_name': output_name or os.path.splitext(filename)[0],
                'current_quality': None,
                'outputs': [],
                'settings': qualities
            }

            # Start encoding in a separate thread and keep the thread handle.
            thread = threading.Thread(
                target=self._encode_video,
                args=(filename, job_id)
            )
            self.threads[job_id] = thread
            thread.start()
            logger.info(f"Started encoding thread for job {job_id}")

            return {'status': 'pending', 'job_id': job_id}
        except Exception as e:
            logger.error(f"Failed to start encoding job: {str(e)}")
            return {'status': 'failed', 'error': str(e)}

    def _split_video(self, input_file, split_dir, segment_duration=60):
        """Split the video into ~segment_duration-second MP4 segments.

        Uses stream copy (no re-encode), so cut points land on keyframes and
        actual segment lengths are approximate. Returns the sorted list of
        segment paths. Raises on ffmpeg failure.
        """
        try:
            split_cmd = [
                'ffmpeg',
                '-i', str(input_file),
                '-c', 'copy',
                '-map', '0',
                '-segment_time', str(segment_duration),
                '-f', 'segment',
                '-segment_format', 'mp4',
                '-reset_timestamps', '1', # Important to reset timestamps for each segment
                str(split_dir / 'segment_%04d.mp4') # Using 4 digits for segment numbering
            ]
            logger.info(f"Splitting video with command: {' '.join(split_cmd)}")
            subprocess.run(split_cmd, check=True, capture_output=True)
            # Sorted by name; the zero-padded %04d numbering keeps order correct.
            segments = sorted(
                split_dir / f
                for f in os.listdir(split_dir)
                if f.startswith('segment_') and f.endswith('.mp4')
            )
            return segments
        except subprocess.CalledProcessError as e:
            logger.error(f"Error splitting video: {e.stderr.decode()}")
            raise Exception(f"FFmpeg split failed: {e}")
        except Exception as e:
            logger.error(f"Error splitting video: {str(e)}")
            raise

    def _stitch_segments(self, segment_files, output_file):
        """Concatenate encoded segments into output_file with the concat demuxer.

        Writes a temporary list file next to the output, stream-copies the
        segments, then removes the list file. Raises on ffmpeg failure.
        """
        try:
            # Create a list file for the concat demuxer.
            # NOTE(review): paths are written with single quotes unescaped —
            # assumes no quote characters in segment paths; true for the
            # segment_%04d naming used here.
            list_file_path = output_file.parent / 'segment_list.txt'
            with open(list_file_path, 'w') as f:
                for segment in segment_files:
                    f.write(f"file '{segment}'\n")

            stitch_cmd = [
                'ffmpeg',
                '-f', 'concat',
                '-safe', '0', # Set to 0 if paths are relative, which they are in temp dir
                '-i', str(list_file_path),
                '-c', 'copy',
                str(output_file)
            ]
            logger.info(f"Stitching video with command: {' '.join(stitch_cmd)}")
            subprocess.run(stitch_cmd, check=True, capture_output=True)

            # Clean up the list file.
            if list_file_path.exists():
                list_file_path.unlink()

        except subprocess.CalledProcessError as e:
            logger.error(f"Error stitching video: {e.stderr.decode()}")
            raise Exception(f"FFmpeg stitch failed: {e}")
        except Exception as e:
            logger.error(f"Error stitching video: {str(e)}")
            raise

    @staticmethod
    def _drain_queue(q):
        """Return everything currently sitting in queue q as a list."""
        items = []
        while True:
            try:
                items.append(q.get_nowait())
            except queue.Empty:
                return items

    def _encode_segment(self, job_id, quality, settings, segment_input_file,
                        segment_output_file, duration, completed_global_steps,
                        global_progress_steps, attempt):
        """Encode one segment at one quality. Returns True on success.

        Streams ffmpeg's '-progress' output (stdout) to update job progress,
        and drains stderr on a second reader thread so the child can never
        deadlock on a full pipe buffer (the old code only read stderr after
        exit, which could block ffmpeg indefinitely). A process that emits
        no output for 30 seconds is killed rather than leaked.
        """
        cmd = [
            'ffmpeg', '-y',
            '-i', str(segment_input_file),
            '-c:v', 'libx264',
            '-preset', settings['preset'],
            '-profile:v', settings['profile'],
            '-level', settings['level'],
            '-tune', settings['tune'],
            '-b:v', settings['bitrate'],
            '-maxrate', settings['maxrate'],
            '-bufsize', settings['bufsize'],
            '-vf', f"scale={settings['width']}:{settings['height']}",
            '-g', settings['keyframe'],
            '-keyint_min', settings['keyframe'],
            '-sc_threshold', '0',
            '-c:a', 'aac',
            '-b:a', settings['audio_bitrate'],
            '-ar', '48000',
            '-ac', '2',
            '-movflags', '+faststart',
            '-progress', 'pipe:1',
            str(segment_output_file)
        ]
        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True
            )
            self.active_processes[job_id] = process

            out_q = queue.Queue()
            err_q = queue.Queue()
            out_thread = threading.Thread(target=_read_pipe, args=(process.stdout, out_q))
            err_thread = threading.Thread(target=_read_pipe, args=(process.stderr, err_q))
            out_thread.daemon = True
            err_thread.daemon = True
            out_thread.start()
            err_thread.start()

            last_output_time = time.time()
            while True:
                try:
                    line = out_q.get(timeout=5)
                    last_output_time = time.time()
                    prog = self._parse_ffmpeg_progress(line, duration)
                    if prog is not None:
                        # Completed steps so far plus the fraction of the
                        # current segment, scaled to an overall 0-100 value.
                        self.jobs[job_id]['progress'] = (
                            (completed_global_steps + prog / 100) / global_progress_steps
                        ) * 100
                except queue.Empty:
                    if time.time() - last_output_time > 30:
                        logger.warning(f"No ffmpeg output for 30 seconds on quality {quality}, segment {segment_input_file.name}, attempt {attempt+1} for job {job_id}")
                        process.kill()  # previously the stalled process was leaked
                        break
                if process.poll() is not None:
                    break
            out_thread.join(timeout=5)
            err_thread.join(timeout=5)

            if process.returncode == 0:
                logger.info(f"Encoding successful for quality {quality}, segment {segment_input_file.name} on attempt {attempt+1} for job {job_id}")
                return True

            error_output = ''.join(self._drain_queue(err_q))
            logger.error(f"FFmpeg error on quality {quality}, segment {segment_input_file.name}, attempt {attempt+1} for job {job_id}: {error_output}")
            return False
        except Exception as e:
            logger.error(f"Error encoding segment {segment_input_file.name} for quality {quality} on attempt {attempt+1} for job {job_id}: {str(e)}")
            return False
        finally:
            if job_id in self.active_processes:
                del self.active_processes[job_id]

    def _encode_video(self, filename, job_id):
        """Worker: split the source, encode each segment at every quality,
        stitch per-quality results, and keep job state updated throughout.
        """
        split_dir = None  # bound before try so the finally block is always safe
        try:
            upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
            encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
            input_file = upload_path / filename
            output_dir = encoded_path / job_id
            output_name = self.jobs[job_id]['output_name']

            output_dir.mkdir(parents=True, exist_ok=True)
            logger.info(f"Created output directory {output_dir} for job {job_id}")

            # Temporary directory for source and encoded segments.
            split_dir = output_dir / 'segments'
            split_dir.mkdir(exist_ok=True)

            try:
                segment_files = self._split_video(input_file, split_dir)
                logger.info(f"Video split into {len(segment_files)} segments for job {job_id}")
            except Exception as split_err:
                raise Exception(f"Video splitting failed: {split_err}")

            qualities = self.jobs[job_id]['settings']
            total_qualities = len(qualities)
            outputs = []
            # One step per (quality, segment) encode plus one stitch step per quality.
            global_progress_steps = total_qualities * len(segment_files) + total_qualities
            completed_global_steps = 0

            # Approximate duration of one segment (first segment), falling
            # back to the whole file; only used for progress estimation.
            if segment_files:
                duration = self._get_video_duration(segment_files[0])
            else:
                duration = self._get_video_duration(input_file)
            if not duration:
                raise Exception("Could not determine video duration")
            logger.info(f"Video duration (using segment or original) for job {job_id}: {duration} seconds")

            for quality, settings in qualities.items():
                max_retries = 3
                attempts = 0
                success = False
                base_steps = completed_global_steps  # snapshot to roll back on retry

                while attempts < max_retries and not success:
                    logger.info(f"Starting encoding for quality {quality}, attempt {attempts+1} for job {job_id}")
                    # Discard partial progress from a failed attempt: the old
                    # code kept stale entries in encoded_segments_for_quality
                    # across retries, producing duplicates at stitch time and
                    # inflated progress.
                    completed_global_steps = base_steps
                    encoded_segments_for_quality = []
                    segment_success = True

                    self.jobs[job_id].update({
                        'status': 'processing',
                        'current_quality': quality,
                        'progress': (completed_global_steps / global_progress_steps) * 100
                    })

                    for segment_input_file in segment_files:
                        segment_output_file = split_dir / f"{segment_input_file.stem}_{quality}.mp4"
                        ok = self._encode_segment(
                            job_id, quality, settings, segment_input_file,
                            segment_output_file, duration,
                            completed_global_steps, global_progress_steps,
                            attempts
                        )
                        if ok:
                            completed_global_steps += 1
                            encoded_segments_for_quality.append(segment_output_file)
                        else:
                            segment_success = False
                            attempts += 1
                            if attempts < max_retries:
                                logger.info(f"Retrying encoding for quality {quality}, segment {segment_input_file.name} (attempt {attempts+1} of {max_retries}) for job {job_id}")
                                time.sleep(2)
                            break  # restart this quality from its first segment

                    if segment_success:
                        success = True

                        # Stitch segments for the current quality.
                        output_file = output_dir / f"{output_name}_{quality}.mp4"
                        try:
                            self._stitch_segments(encoded_segments_for_quality, output_file)
                            outputs.append({
                                'quality': quality,
                                'path': str(output_file),
                                'settings': settings
                            })
                            logger.info(f"Stitching successful for quality {quality} for job {job_id}")
                        except Exception as stitch_err:
                            logger.error(f"Stitching failed for quality {quality} for job {job_id}: {stitch_err}")
                            self.jobs[job_id].update({
                                'status': 'failed',
                                'error': f"Stitching failed for quality {quality}: {str(stitch_err)}"
                            })
                            return

                        # Count the stitch step and drop this quality's segments.
                        completed_global_steps += 1
                        for seg_file in encoded_segments_for_quality:
                            if seg_file.exists():
                                seg_file.unlink()

                        self.jobs[job_id]['progress'] = (completed_global_steps / global_progress_steps) * 100

                if not success: # after max retries for a quality
                    self.jobs[job_id].update({
                        'status': 'failed',
                        'error': f"Failed encoding quality {quality} after {max_retries} attempts"
                    })
                    return

            self.jobs[job_id].update({
                'status': 'completed',
                'progress': 100,
                'outputs': outputs,
                'completion_time': datetime.now().isoformat()
            })
            logger.info(f"Job {job_id} completed successfully")

        except Exception as e:
            logger.error(f"Encoding failed for job {job_id}: {str(e)}")
            self.jobs[job_id].update({
                'status': 'failed',
                'error': str(e)
            })
        finally:
            # Clean up the segment directory whether the job succeeded or failed.
            if split_dir is not None and split_dir.exists():
                shutil.rmtree(split_dir)
            # Remove the thread reference once the job completes/fails.
            if job_id in self.threads:
                del self.threads[job_id]

    def _get_video_duration(self, input_file):
        """Return the duration of input_file in seconds via ffprobe, or None."""
        try:
            cmd = [
                'ffprobe',
                '-v', 'error',
                '-show_entries', 'format=duration',
                '-of', 'json',
                str(input_file)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            data = json.loads(result.stdout)
            return float(data['format']['duration'])
        except Exception as e:
            logger.error(f"Error getting video duration: {str(e)}")
            return None

    def _parse_ffmpeg_progress(self, output, duration):
        """Parse one line of ffmpeg '-progress' output.

        Returns percent complete clamped to 0-100, or None if the line has
        no usable timing info (including 'out_time_ms=N/A'). Despite its
        name, ffmpeg's out_time_ms field is in microseconds, hence the
        1000000 divisor.
        """
        try:
            if 'out_time_ms=' in output:
                time_ms = int(output.split('out_time_ms=')[1].strip())
                progress = (time_ms / 1000000) / duration * 100
                return min(100, max(0, progress))
            return None
        except Exception:
            return None

    def stop_job(self, job_id):
        """Stop a running encoding job; returns True if a process was stopped."""
        try:
            if job_id in self.active_processes:
                process = self.active_processes[job_id]
                process.send_signal(signal.SIGTERM)
                try:
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    # ffmpeg ignored SIGTERM; force-kill so the process and
                    # bookkeeping are never leaked (the old code raised here
                    # and skipped the cleanup below).
                    process.kill()
                    process.wait()
                del self.active_processes[job_id]
                self.jobs[job_id]['status'] = 'stopped'
                return True
            return False
        except Exception as e:
            logger.error(f"Error stopping job {job_id}: {str(e)}")
            return False

    def clean_job(self, job_id):
        """Delete the source upload, encoded outputs, and job record.

        Returns True on success, False if any cleanup step failed.
        """
        try:
            upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
            encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))

            if job_id in self.jobs:
                source_file = upload_path / self.jobs[job_id]['filename']
                if source_file.exists():
                    source_file.unlink()

            job_output_dir = encoded_path / job_id
            if job_output_dir.exists():
                shutil.rmtree(job_output_dir)

            if job_id in self.jobs:
                del self.jobs[job_id]

            return True
        except Exception as e:
            logger.error(f"Error cleaning job {job_id}: {str(e)}")
            return False

    def get_job_info(self, job_id):
        """Return the job's state dict plus on-disk file info, or None.

        Adds a 'files' list with quality/size/path for each output that
        still exists on disk.
        """
        try:
            if job_id not in self.jobs:
                return None

            job = self.jobs[job_id]
            encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
            job_path = encoded_path / job_id

            files_info = []
            if job_path.exists():
                for output in job.get('outputs', []):
                    file_path = Path(output['path'])
                    if file_path.exists():
                        files_info.append({
                            'quality': output['quality'],
                            'size': file_path.stat().st_size,
                            'path': str(file_path)
                        })

            return {
                **job,
                'files': files_info
            }
        except Exception as e:
            logger.error(f"Error getting job info for {job_id}: {str(e)}")
            return None

# Module-level singleton imported by the rest of the application.
encoder_service = EncoderService()