import subprocess
import os
import shutil
import copy
from pathlib import Path
import logging
import json
from datetime import datetime
import threading
import signal
import time
import queue

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _read_pipe(pipe, q):
    """Read lines from a pipe and put them into a queue."""
    try:
        with pipe:
            for line in iter(pipe.readline, ''):
                q.put(line)
    except Exception as e:
        logger.error(f"Error reading pipe: {str(e)}")


class EncoderService:
    def __init__(self):
        self.jobs = {}
        self.threads = {}  # Store encoding thread references
        # Optimized settings for web streaming
        self.default_qualities = {
            '480p': {
                'width': 854,
                'height': 480,
                'bitrate': '1000k',
                'maxrate': '1200k',  # Slightly tighter maxrate
                'bufsize': '2000k',
                'audio_bitrate': '128k',
                'keyframe': '48',
                'preset': 'medium',  # 'medium' balances speed and quality
                'profile': 'main',
                'level': '3.1',
                'tune': 'fastdecode'
            },
            '720p': {
                'width': 1280,
                'height': 720,
                'bitrate': '2500k',
                'maxrate': '2800k',  # Slightly tighter maxrate
                'bufsize': '4000k',
                'audio_bitrate': '160k',  # Increased audio bitrate
                'keyframe': '48',
                'preset': 'medium',
                'profile': 'main',
                'level': '3.1',
                'tune': 'fastdecode'
            },
            '1080p': {
                'width': 1920,
                'height': 1080,
                'bitrate': '5000k',
                'maxrate': '5500k',  # Slightly tighter maxrate
                'bufsize': '8000k',
                'audio_bitrate': '192k',
                'keyframe': '48',
                'preset': 'medium',
                'profile': 'high',
                'level': '4.0',
                'tune': 'fastdecode'
            }
        }
        self.active_processes = {}

    def start_encode_job(self, filename, job_id, output_name=None, settings=None):
        """Start an encoding job with optional custom settings and output name."""
        try:
            # Deep copy so per-job bitrate overrides never mutate the shared defaults
            qualities = copy.deepcopy(self.default_qualities)
            if settings:
                settings_dict = json.loads(settings)
                for quality, bitrate in settings_dict.items():
                    if quality in qualities and bitrate:
                        qualities[quality]['bitrate'] = bitrate
                        # Adjust maxrate and bufsize based on the new bitrate
                        bitrate_value = int(bitrate.replace('k', ''))
                        qualities[quality]['maxrate'] = f"{int(bitrate_value * 1.5)}k"
                        qualities[quality]['bufsize'] = f"{int(bitrate_value * 2)}k"

            self.jobs[job_id] = {
                'status': 'pending',
                'progress': 0,
                'start_time': datetime.now().isoformat(),
                'filename': filename,
                'output_name': output_name or os.path.splitext(filename)[0],
                'current_quality': None,
                'outputs': [],
                'settings': qualities
            }

            # Start encoding in a separate thread and store the thread reference
            thread = threading.Thread(
                target=self._encode_video,
                args=(filename, job_id)
            )
            self.threads[job_id] = thread
            thread.start()
            logger.info(f"Started encoding thread for job {job_id}")
            return {'status': 'pending', 'job_id': job_id}
        except Exception as e:
            logger.error(f"Failed to start encoding job: {str(e)}")
            return {'status': 'failed', 'error': str(e)}

    def _split_video(self, input_file, split_dir, segment_duration=60):
        """Split the video into fixed-duration segments (default 60 seconds) using ffmpeg."""
        try:
            split_cmd = [
                'ffmpeg',
                '-i', str(input_file),
                '-c', 'copy',
                '-map', '0',
                '-segment_time', str(segment_duration),
                '-f', 'segment',
                '-segment_format', 'mp4',
                '-reset_timestamps', '1',  # Reset timestamps so each segment starts at zero
                str(split_dir / 'segment_%04d.mp4')  # Four-digit segment numbering
            ]
            logger.info(f"Splitting video with command: {' '.join(split_cmd)}")
            subprocess.run(split_cmd, check=True, capture_output=True)
            segments = sorted([split_dir / f for f in os.listdir(split_dir)
                               if f.startswith('segment_') and f.endswith('.mp4')])
            return segments
        except subprocess.CalledProcessError as e:
            logger.error(f"Error splitting video: {e.stderr.decode()}")
            raise Exception(f"FFmpeg split failed: {e}")
        except Exception as e:
            logger.error(f"Error splitting video: {str(e)}")
            raise

    def _stitch_segments(self, segment_files, output_file):
        """Stitch video segments back together using ffmpeg's concat demuxer."""
        try:
            # Create a list file for the concat demuxer; use absolute paths because
            # the demuxer resolves relative entries against the list file's directory
            list_file_path = output_file.parent / 'segment_list.txt'
            with open(list_file_path, 'w') as f:
                for segment in segment_files:
                    f.write(f"file '{Path(segment).resolve()}'\n")

            stitch_cmd = [
                'ffmpeg', '-y',
                '-f', 'concat',
                '-safe', '0',  # Required so absolute paths in the list file are accepted
                '-i', str(list_file_path),
                '-c', 'copy',
                str(output_file)
            ]
            logger.info(f"Stitching video with command: {' '.join(stitch_cmd)}")
            subprocess.run(stitch_cmd, check=True, capture_output=True)

            # Clean up list file
            if list_file_path.exists():
                list_file_path.unlink()
        except subprocess.CalledProcessError as e:
            logger.error(f"Error stitching video: {e.stderr.decode()}")
            raise Exception(f"FFmpeg stitch failed: {e}")
        except Exception as e:
            logger.error(f"Error stitching video: {str(e)}")
            raise

    def _encode_video(self, filename, job_id):
        """Split the source video, encode every segment per quality, then stitch."""
        split_dir = None  # Defined before the try so the finally block can test it safely
        try:
            upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
            encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
            input_file = upload_path / filename
            output_dir = encoded_path / job_id
            output_name = self.jobs[job_id]['output_name']

            # Create output directory
            output_dir.mkdir(parents=True, exist_ok=True)
            logger.info(f"Created output directory {output_dir} for job {job_id}")

            # Create a temporary directory for video segments
            split_dir = output_dir / 'segments'
            split_dir.mkdir(exist_ok=True)

            segment_files = []
            try:
                segment_files = self._split_video(input_file, split_dir)
                logger.info(f"Video split into {len(segment_files)} segments for job {job_id}")
            except Exception as split_err:
                raise Exception(f"Video splitting failed: {split_err}")

            qualities = self.jobs[job_id]['settings']
            total_qualities = len(qualities)
            outputs = []
            # One step per segment encode plus one stitching step per quality
            global_progress_steps = total_qualities * len(segment_files) + total_qualities
            completed_global_steps = 0

            # Get video duration (first segment as an approximation, original file as fallback)
            if segment_files:
                duration = self._get_video_duration(segment_files[0])
            else:
                duration = self._get_video_duration(input_file)
            if not duration:
                raise Exception("Could not determine video duration")
            logger.info(f"Video duration (segment or original) for job {job_id}: {duration} seconds")

            for quality, settings in qualities.items():
                max_retries = 3
                attempts = 0
                success = False

                while attempts < max_retries and not success:
                    logger.info(f"Starting encoding for quality {quality}, attempt {attempts + 1} for job {job_id}")
                    self.jobs[job_id].update({
                        'status': 'processing',
                        'current_quality': quality,
                        'progress': (completed_global_steps / global_progress_steps) * 100
                    })

                    # Reset per-attempt state so a retry starts from a clean slate
                    encoded_segments_for_quality = []
                    segment_success = True  # True while every segment of this quality encodes

                    for segment_input_file in segment_files:
                        # Encoded segment lives next to its source in split_dir
                        segment_output_file = split_dir / f"{segment_input_file.stem}_{quality}.mp4"

                        cmd = [
                            'ffmpeg', '-y',
                            '-i', str(segment_input_file),
                            '-c:v', 'libx264',
                            '-preset', settings['preset'],
                            '-profile:v', settings['profile'],
                            '-level', settings['level'],
                            '-tune', settings['tune'],
                            '-b:v', settings['bitrate'],
                            '-maxrate', settings['maxrate'],
                            '-bufsize', settings['bufsize'],
                            '-vf', f"scale={settings['width']}:{settings['height']}",
                            '-g', settings['keyframe'],
                            '-keyint_min', settings['keyframe'],
                            '-sc_threshold', '0',  # Disable scene-cut keyframes for stable GOPs
                            '-c:a', 'aac',
                            '-b:a', settings['audio_bitrate'],
                            '-ar', '48000',
                            '-ac', '2',
                            '-movflags', '+faststart',
                            '-progress', 'pipe:1',  # Machine-readable progress on stdout
                            str(segment_output_file)
                        ]

                        try:
                            process = subprocess.Popen(
                                cmd,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True
                            )
                            self.active_processes[job_id] = process

                            # Drain stdout on a daemon thread so progress output never blocks ffmpeg
                            q = queue.Queue()
                            reader_thread = threading.Thread(target=_read_pipe, args=(process.stdout, q))
                            reader_thread.daemon = True
                            reader_thread.start()

                            last_output_time = time.time()
                            while True:
                                try:
                                    line = q.get(timeout=5)
                                    last_output_time = time.time()
                                    # Progress within the current segment; duration comes from
                                    # the first segment, so this is an approximation
                                    prog = self._parse_ffmpeg_progress(line, duration)
                                    if prog is not None:
                                        self.jobs[job_id]['progress'] = (
                                            (completed_global_steps + prog / 100)
                                            / global_progress_steps * 100
                                        )
                                except queue.Empty:
                                    if time.time() - last_output_time > 30:
                                        logger.warning(
                                            f"No ffmpeg output for 30 seconds on quality {quality}, "
                                            f"segment {segment_input_file.name}, attempt {attempts + 1} "
                                            f"for job {job_id}"
                                        )
                                        break
                                    if process.poll() is not None:
                                        break

                            reader_thread.join(timeout=5)
                            if process.poll() is None:
                                # Stalled process: kill it so stderr.read() below cannot block
                                process.kill()
                                process.wait()

                            if process.returncode == 0:
                                logger.info(f"Encoding successful for quality {quality}, segment "
                                            f"{segment_input_file.name} on attempt {attempts + 1} for job {job_id}")
                                completed_global_steps += 1  # One step per successful segment encode
                                encoded_segments_for_quality.append(segment_output_file)
                            else:
                                error_output = process.stderr.read()
                                logger.error(f"FFmpeg error on quality {quality}, segment {segment_input_file.name}, "
                                             f"attempt {attempts + 1} for job {job_id}: {error_output}")
                                segment_success = False
                                raise Exception(f"FFmpeg failed for quality {quality}, segment {segment_input_file.name}")
                        except Exception as e:
                            logger.error(f"Error encoding segment {segment_input_file.name} for quality {quality} "
                                         f"on attempt {attempts + 1} for job {job_id}: {str(e)}")
                            segment_success = False
                            attempts += 1
                            if attempts < max_retries:
                                logger.info(f"Retrying quality {quality} (attempt {attempts + 1} of {max_retries}) for job {job_id}")
                                time.sleep(2)
                        finally:
                            if job_id in self.active_processes:
                                del self.active_processes[job_id]

                        if not segment_success:
                            # A segment failed: abandon this pass; the while loop retries or gives up
                            break

                    if segment_success:
                        # All segments for this quality encoded; stitch them together
                        success = True
                        completed_global_steps += 1  # Count the stitching step
                        output_file = output_dir / f"{output_name}_{quality}.mp4"
                        try:
                            self._stitch_segments(encoded_segments_for_quality, output_file)
                            outputs.append({
                                'quality': quality,
                                'path': str(output_file),
                                'settings': settings
                            })
                            logger.info(f"Stitching successful for quality {quality} for job {job_id}")
                        except Exception as stitch_err:
                            logger.error(f"Stitching failed for quality {quality} for job {job_id}: {stitch_err}")
                            self.jobs[job_id].update({
                                'status': 'failed',
                                'error': f"Stitching failed for quality {quality}: {str(stitch_err)}"
                            })
                            return  # Stitching failures are fatal; a stitch retry could be added here

                        # Clean up the encoded segments for this quality
                        for seg_file in encoded_segments_for_quality:
                            if seg_file.exists():
                                seg_file.unlink()

                    # Final progress for this quality
                    self.jobs[job_id]['progress'] = (completed_global_steps / global_progress_steps) * 100

                if not success:
                    # Max retries exhausted for this quality
                    self.jobs[job_id].update({
                        'status': 'failed',
                        'error': f"Failed encoding quality {quality} after {max_retries} attempts"
                    })
                    return

            self.jobs[job_id].update({
                'status': 'completed',
                'progress': 100,
                'outputs': outputs,
                'completion_time': datetime.now().isoformat()
            })
            logger.info(f"Job {job_id} completed successfully")
        except Exception as e:
            logger.error(f"Encoding failed for job {job_id}: {str(e)}")
            self.jobs[job_id].update({
                'status': 'failed',
                'error': str(e)
            })
        finally:
            # Clean up split directory and segments whether the job succeeded or failed
            if split_dir is not None and split_dir.exists():
                shutil.rmtree(split_dir)
            # Remove the thread reference once the job completes or fails
            if job_id in self.threads:
                del self.threads[job_id]

    def _get_video_duration(self, input_file):
        """Get video duration in seconds using ffprobe."""
        try:
            cmd = [
                'ffprobe',
                '-v', 'error',
                '-show_entries', 'format=duration',
                '-of', 'json',
                str(input_file)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            data = json.loads(result.stdout)
            return float(data['format']['duration'])
        except Exception as e:
            logger.error(f"Error getting video duration: {str(e)}")
            return None

    def _parse_ffmpeg_progress(self, output, duration):
        """Parse FFmpeg -progress output into a 0-100 percentage."""
        try:
            if 'out_time_ms=' in output:
                # Despite the name, out_time_ms is reported in microseconds
                time_ms = int(output.split('out_time_ms=')[1].strip())
                progress = (time_ms / 1000000) / duration * 100
                return min(100, max(0, progress))
            return None
        except Exception:
            return None

    def stop_job(self, job_id):
        """Stop an encoding job."""
        try:
            if job_id in self.active_processes:
                process = self.active_processes[job_id]
                process.send_signal(signal.SIGTERM)
                try:
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    process.kill()  # Escalate if ffmpeg ignores SIGTERM
                del self.active_processes[job_id]
                self.jobs[job_id]['status'] = 'stopped'
                return True
            return False
        except Exception as e:
            logger.error(f"Error stopping job {job_id}: {str(e)}")
            return False

    def clean_job(self, job_id):
        """Clean up all files related to a job."""
        try:
            upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
            encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))

            if job_id in self.jobs:
                source_file = upload_path / self.jobs[job_id]['filename']
                if source_file.exists():
                    source_file.unlink()

            job_output_dir = encoded_path / job_id
            if job_output_dir.exists():
                shutil.rmtree(job_output_dir)

            if job_id in self.jobs:
                del self.jobs[job_id]
            return True
        except Exception as e:
            logger.error(f"Error cleaning job {job_id}: {str(e)}")
            return False

    def get_job_info(self, job_id):
        """Get detailed information about a job."""
        try:
            if job_id not in self.jobs:
                return None

            job = self.jobs[job_id]
            encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
            job_path = encoded_path / job_id

            files_info = []
            if job_path.exists():
                for output in job.get('outputs', []):
                    file_path = Path(output['path'])
                    if file_path.exists():
                        files_info.append({
                            'quality': output['quality'],
                            'size': file_path.stat().st_size,
                            'path': str(file_path)
                        })
            return {
                **job,
                'files': files_info
            }
        except Exception as e:
            logger.error(f"Error getting job info for {job_id}: {str(e)}")
            return None


encoder_service = EncoderService()
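

# --- Usage sketch (illustrative, not part of the service) --------------------
# A minimal example of driving the module-level encoder_service. It assumes a
# hypothetical 'sample.mp4' already sits in UPLOAD_FOLDER and that ffmpeg and
# ffprobe are on PATH. The optional `settings` argument is a JSON string
# mapping quality names to bitrate overrides, matching what start_encode_job
# parses above.
if __name__ == '__main__':
    import uuid

    job_id = str(uuid.uuid4())
    result = encoder_service.start_encode_job(
        'sample.mp4',                            # hypothetical upload
        job_id,
        output_name='sample_web',
        settings=json.dumps({'720p': '3000k'})   # override the 720p bitrate
    )
    print(result)

    # Poll until the background encoding thread finishes (or fails)
    while True:
        info = encoder_service.get_job_info(job_id)
        print(f"status={info['status']} progress={info['progress']:.1f}%")
        if info['status'] in ('completed', 'failed', 'stopped'):
            break
        time.sleep(5)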