Spaces:
Sleeping
Sleeping
| import subprocess | |
| import os | |
| import shutil | |
| from pathlib import Path | |
| import logging | |
| import json | |
| from datetime import datetime | |
| import threading | |
| import signal | |
| import time | |
| import queue | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
def _read_pipe(pipe, q):
    """Drain *pipe* line by line into queue *q* until EOF, then close the pipe.

    Intended to run on a background reader thread so that blocking
    readline() calls cannot stall the caller's progress loop.
    """
    try:
        with pipe:
            while True:
                line = pipe.readline()
                if line == '':
                    break
                q.put(line)
    except Exception as e:
        logger.error(f"Error reading pipe: {str(e)}")
class EncoderService:
    """Manages ffmpeg-based encoding jobs: split, per-quality encode, stitch."""

    def __init__(self):
        # job_id -> job state dict (status, progress, outputs, ...)
        self.jobs = {}
        # job_id -> worker thread running the encode pipeline
        self.threads = {}
        # job_id -> currently running ffmpeg Popen handle
        self.active_processes = {}
        # Quality ladder tuned for web streaming (H.264 + AAC).
        # (name, width, height, bitrate, maxrate, bufsize, audio_bitrate, profile, level)
        ladder = [
            ('480p', 854, 480, '1000k', '1200k', '2000k', '128k', 'main', '3.1'),
            ('720p', 1280, 720, '2500k', '2800k', '4000k', '160k', 'main', '3.1'),
            ('1080p', 1920, 1080, '5000k', '5500k', '8000k', '192k', 'high', '4.0'),
        ]
        self.default_qualities = {
            name: {
                'width': width,
                'height': height,
                'bitrate': bitrate,
                'maxrate': maxrate,      # capped slightly above target bitrate
                'bufsize': bufsize,
                'audio_bitrate': audio_bitrate,
                'keyframe': '48',        # fixed GOP length for clean segment boundaries
                'preset': 'medium',      # balance of speed vs. compression
                'profile': profile,
                'level': level,
                'tune': 'fastdecode',
            }
            for name, width, height, bitrate, maxrate,
                bufsize, audio_bitrate, profile, level in ladder
        }
| def start_encode_job(self, filename, job_id, output_name=None, settings=None): | |
| """Start an encoding job with optional custom settings and output name""" | |
| try: | |
| qualities = self.default_qualities.copy() | |
| if settings: | |
| settings_dict = json.loads(settings) | |
| for quality, bitrate in settings_dict.items(): | |
| if quality in qualities and bitrate: | |
| qualities[quality]['bitrate'] = bitrate | |
| # Adjust maxrate and bufsize based on new bitrate | |
| bitrate_value = int(bitrate.replace('k', '')) | |
| qualities[quality]['maxrate'] = f"{int(bitrate_value * 1.5)}k" | |
| qualities[quality]['bufsize'] = f"{int(bitrate_value * 2)}k" | |
| self.jobs[job_id] = { | |
| 'status': 'pending', | |
| 'progress': 0, | |
| 'start_time': datetime.now().isoformat(), | |
| 'filename': filename, | |
| 'output_name': output_name or os.path.splitext(filename)[0], | |
| 'current_quality': None, | |
| 'outputs': [], | |
| 'settings': qualities | |
| } | |
| # Start encoding in a separate thread and store the thread reference | |
| thread = threading.Thread( | |
| target=self._encode_video, | |
| args=(filename, job_id) | |
| ) | |
| self.threads[job_id] = thread | |
| thread.start() | |
| logger.info(f"Started encoding thread for job {job_id}") | |
| return {'status': 'pending', 'job_id': job_id} | |
| except Exception as e: | |
| logger.error(f"Failed to start encoding job: {str(e)}") | |
| return {'status': 'failed', 'error': str(e)} | |
| def _split_video(self, input_file, split_dir, segment_duration=60): # 5 minutes = 300 seconds | |
| """Splits the video into segments using ffmpeg.""" | |
| try: | |
| split_cmd = [ | |
| 'ffmpeg', | |
| '-i', str(input_file), | |
| '-c', 'copy', | |
| '-map', '0', | |
| '-segment_time', str(segment_duration), | |
| '-f', 'segment', | |
| '-segment_format', 'mp4', | |
| '-reset_timestamps', '1', # Important to reset timestamps for each segment | |
| str(split_dir / 'segment_%04d.mp4') # Using 4 digits for segment numbering | |
| ] | |
| logger.info(f"Splitting video with command: {' '.join(split_cmd)}") | |
| subprocess.run(split_cmd, check=True, capture_output=True) | |
| segments = sorted([split_dir / f for f in os.listdir(split_dir) if f.startswith('segment_') and f.endswith('.mp4')]) | |
| return segments | |
| except subprocess.CalledProcessError as e: | |
| logger.error(f"Error splitting video: {e.stderr.decode()}") | |
| raise Exception(f"FFmpeg split failed: {e}") | |
| except Exception as e: | |
| logger.error(f"Error splitting video: {str(e)}") | |
| raise | |
| def _stitch_segments(self, segment_files, output_file): | |
| """Stitches video segments back together using ffmpeg.""" | |
| try: | |
| # Create a list file for concat demuxer | |
| list_file_path = output_file.parent / 'segment_list.txt' | |
| with open(list_file_path, 'w') as f: | |
| for segment in segment_files: | |
| f.write(f"file '{segment}'\n") | |
| stitch_cmd = [ | |
| 'ffmpeg', | |
| '-f', 'concat', | |
| '-safe', '0', # Set to 0 if paths are relative, which they are in temp dir | |
| '-i', str(list_file_path), | |
| '-c', 'copy', | |
| str(output_file) | |
| ] | |
| logger.info(f"Stitching video with command: {' '.join(stitch_cmd)}") | |
| subprocess.run(stitch_cmd, check=True, capture_output=True) | |
| # Clean up list file | |
| if list_file_path.exists(): | |
| list_file_path.unlink() | |
| except subprocess.CalledProcessError as e: | |
| logger.error(f"Error stitching video: {e.stderr.decode()}") | |
| raise Exception(f"FFmpeg stitch failed: {e}") | |
| except Exception as e: | |
| logger.error(f"Error stitching video: {str(e)}") | |
| raise | |
    def _encode_video(self, filename, job_id):
        """Worker-thread body: split the source, encode every quality, stitch results.

        Pipeline (all state is reported through self.jobs[job_id]):
          1. Split the upload into segments (_split_video).
          2. For each quality preset, encode every segment with ffmpeg,
             retrying up to 3 times per quality, while tracking progress.
          3. Stitch the encoded segments of each quality into one output file.

        Args:
            filename: Source file name inside UPLOAD_FOLDER.
            job_id: Key into self.jobs; the record must already exist
                (created by start_encode_job).
        """
        try:
            # Layout: <UPLOAD_FOLDER>/<filename> -> <ENCODED_FOLDER>/<job_id>/<name>_<quality>.mp4
            upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
            encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
            input_file = upload_path / filename
            output_dir = encoded_path / job_id
            output_name = self.jobs[job_id]['output_name']
            # Create output directory
            output_dir.mkdir(parents=True, exist_ok=True)
            logger.info(f"Created output directory {output_dir} for job {job_id}")
            # Working directory for raw and encoded segments; removed again
            # in the outer finally block.
            split_dir = output_dir / 'segments'
            split_dir.mkdir(exist_ok=True)
            segment_files = []
            try:
                segment_files = self._split_video(input_file, split_dir)
                logger.info(f"Video split into {len(segment_files)} segments for job {job_id}")
            except Exception as split_err:
                raise Exception(f"Video splitting failed: {split_err}")
            qualities = self.jobs[job_id]['settings']
            total_qualities = len(qualities)
            outputs = []
            # One progress step per (quality, segment) encode, plus one stitch
            # step per quality.
            global_progress_steps = total_qualities * len(segment_files) + total_qualities
            completed_global_steps = 0
            # Duration feeds the percent math in _parse_ffmpeg_progress.
            # NOTE(review): the FIRST segment's duration is used for every
            # segment; the (usually shorter) last segment will misreport.
            duration = 0
            if segment_files:
                duration = self._get_video_duration(segment_files[0])
            else:  # Fallback to the original file if splitting produced nothing
                duration = self._get_video_duration(input_file)
            if not duration:
                raise Exception("Could not determine video duration")
            logger.info(f"Video duration (using segment or original) for job {job_id}: {duration} seconds")
            for quality, settings in qualities.items():
                max_retries = 3
                attempts = 0
                encoded_segments_for_quality = []
                success = False
                while attempts < max_retries and not success:
                    logger.info(f"Starting encoding for quality {quality}, attempt {attempts+1} for job {job_id}")
                    self.jobs[job_id].update({
                        'status': 'processing',
                        'current_quality': quality,
                        'progress': (completed_global_steps / global_progress_steps) * 100
                    })
                    segment_output_files = []
                    segment_success = True  # flips to False on any segment failure
                    # NOTE(review): completed_steps is never incremented below,
                    # so the per-segment progress formula always sees 0 here.
                    completed_steps = 0
                    for segment_input_file in segment_files:
                        # Encoded segment lands next to the raw one in split_dir.
                        segment_output_file = split_dir / f"{segment_input_file.stem}_{quality}.mp4"
                        segment_output_files.append(segment_output_file)
                        # ffmpeg arguments come from the per-quality settings dict
                        # assembled in start_encode_job.
                        cmd = [
                            'ffmpeg', '-y',
                            '-i', str(segment_input_file),
                            '-c:v', 'libx264',
                            '-preset', settings['preset'],
                            '-profile:v', settings['profile'],
                            '-level', settings['level'],
                            '-tune', settings['tune'],
                            '-b:v', settings['bitrate'],
                            '-maxrate', settings['maxrate'],
                            '-bufsize', settings['bufsize'],
                            '-vf', f"scale={settings['width']}:{settings['height']}",
                            '-g', settings['keyframe'],
                            '-keyint_min', settings['keyframe'],
                            '-sc_threshold', '0',
                            '-c:a', 'aac',
                            '-b:a', settings['audio_bitrate'],
                            '-ar', '48000',
                            '-ac', '2',
                            '-movflags', '+faststart',
                            '-progress', 'pipe:1',
                            str(segment_output_file)
                        ]
                        try:
                            process = subprocess.Popen(
                                cmd,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True
                            )
                            self.active_processes[job_id] = process
                            # Read stdout on a daemon thread so q.get(timeout=...)
                            # below can detect a stalled ffmpeg.
                            q = queue.Queue()
                            reader_thread = threading.Thread(target=_read_pipe, args=(process.stdout, q))
                            reader_thread.daemon = True
                            reader_thread.start()
                            last_output_time = time.time()
                            segment_progress = 0
                            while True:
                                try:
                                    line = q.get(timeout=5)
                                    last_output_time = time.time()
                                    prog = self._parse_ffmpeg_progress(line, duration)
                                    if prog is not None:
                                        segment_progress = prog
                                        # Rough overall estimate; refine once
                                        # completed_steps is actually tracked.
                                        quality_progress = ((completed_global_steps + (completed_steps * 100 + prog) / 100 ) / global_progress_steps) * 100
                                        self.jobs[job_id]['progress'] = quality_progress
                                except queue.Empty:
                                    # Stall watchdog: give up after 30s of silence.
                                    # NOTE(review): this break can leave ffmpeg
                                    # running, and the stderr.read() below may
                                    # then block on a live pipe -- confirm.
                                    if time.time() - last_output_time > 30:
                                        logger.warning(f"No ffmpeg output for 30 seconds on quality {quality}, segment {segment_input_file.name}, attempt {attempts+1} for job {job_id}")
                                        break
                                    if process.poll() is not None:
                                        break
                            reader_thread.join(timeout=5)
                            if process.returncode == 0:
                                logger.info(f"Encoding successful for quality {quality}, segment {segment_input_file.name} on attempt {attempts+1} for job {job_id}")
                                completed_global_steps += 1  # one step per encoded segment
                                encoded_segments_for_quality.append(segment_output_file)
                            else:
                                error_output = process.stderr.read()
                                logger.error(f"FFmpeg error on quality {quality}, segment {segment_input_file.name}, attempt {attempts+1} for job {job_id}: {error_output}")
                                segment_success = False
                                raise Exception(f"FFmpeg failed for quality {quality}, segment {segment_input_file.name}")
                        except Exception as e:
                            logger.error(f"Error encoding segment {segment_input_file.name} for quality {quality} on attempt {attempts+1} for job {job_id}: {str(e)}")
                            segment_success = False
                            # NOTE(review): attempts only increments on failure,
                            # so successful passes never consume a retry.
                            attempts += 1
                            if attempts < max_retries:
                                logger.info(f"Retrying encoding for quality {quality}, segment {segment_input_file.name} (attempt {attempts+2} of {max_retries}) for job {job_id}")
                                time.sleep(2)
                            else:
                                break  # retries exhausted for this quality
                        finally:
                            # Drop the process handle whether we succeeded or not.
                            if job_id in self.active_processes:
                                del self.active_processes[job_id]
                        if not segment_success:
                            break  # abandon remaining segments; retry/fail the quality
                    if segment_success:  # every segment of this quality encoded
                        success = True
                        completed_global_steps += 1  # count the stitch step
                        # Stitch segments for current quality
                        output_file = output_dir / f"{output_name}_{quality}.mp4"
                        try:
                            self._stitch_segments(encoded_segments_for_quality, output_file)
                            outputs.append({
                                'quality': quality,
                                'path': str(output_file),
                                'settings': settings
                            })
                            logger.info(f"Stitching successful for quality {quality} for job {job_id}")
                        except Exception as stitch_err:
                            logger.error(f"Stitching failed for quality {quality} for job {job_id}: {stitch_err}")
                            self.jobs[job_id].update({
                                'status': 'failed',
                                'error': f"Stitching failed for quality {quality}: {str(stitch_err)}"
                            })
                            return  # no stitch retry; job is terminally failed
                        # Discard this quality's encoded segments now that the
                        # stitched file exists.
                        for seg_file in encoded_segments_for_quality:
                            if seg_file.exists():
                                seg_file.unlink()
                    self.jobs[job_id]['progress'] = (completed_global_steps / global_progress_steps) * 100
                if not success:  # max retries exhausted for this quality
                    self.jobs[job_id].update({
                        'status': 'failed',
                        'error': f"Failed encoding quality {quality} after {max_retries} attempts"
                    })
                    return
            self.jobs[job_id].update({
                'status': 'completed',
                'progress': 100,
                'outputs': outputs,
                'completion_time': datetime.now().isoformat()
            })
            logger.info(f"Job {job_id} completed successfully")
        except Exception as e:
            logger.error(f"Encoding failed for job {job_id}: {str(e)}")
            self.jobs[job_id].update({
                'status': 'failed',
                'error': str(e)
            })
        finally:
            # NOTE(review): if an exception fires before split_dir is assigned,
            # this finally raises NameError -- consider guarding.
            if split_dir.exists():
                shutil.rmtree(split_dir)
            # Drop the worker-thread handle once the job settles.
            if job_id in self.threads:
                del self.threads[job_id]
| def _get_video_duration(self, input_file): | |
| """Get video duration using FFprobe""" | |
| try: | |
| cmd = [ | |
| 'ffprobe', | |
| '-v', 'error', | |
| '-show_entries', 'format=duration', | |
| '-of', 'json', | |
| str(input_file) | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| data = json.loads(result.stdout) | |
| return float(data['format']['duration']) | |
| except Exception as e: | |
| logger.error(f"Error getting video duration: {str(e)}") | |
| return None | |
| def _parse_ffmpeg_progress(self, output, duration): | |
| """Parse FFmpeg progress output""" | |
| try: | |
| if 'out_time_ms=' in output: | |
| time_ms = int(output.split('out_time_ms=')[1].strip()) | |
| progress = (time_ms / 1000000) / duration * 100 | |
| return min(100, max(0, progress)) | |
| return None | |
| except Exception: | |
| return None | |
| def stop_job(self, job_id): | |
| """Stop an encoding job""" | |
| try: | |
| if job_id in self.active_processes: | |
| process = self.active_processes[job_id] | |
| process.send_signal(signal.SIGTERM) | |
| process.wait(timeout=5) | |
| del self.active_processes[job_id] | |
| self.jobs[job_id]['status'] = 'stopped' | |
| return True | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error stopping job {job_id}: {str(e)}") | |
| return False | |
| def clean_job(self, job_id): | |
| """Clean up all files related to a job""" | |
| try: | |
| upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads')) | |
| encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded')) | |
| if job_id in self.jobs: | |
| source_file = upload_path / self.jobs[job_id]['filename'] | |
| if source_file.exists(): | |
| source_file.unlink() | |
| job_output_dir = encoded_path / job_id | |
| if job_output_dir.exists(): | |
| shutil.rmtree(job_output_dir) | |
| if job_id in self.jobs: | |
| del self.jobs[job_id] | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error cleaning job {job_id}: {str(e)}") | |
| return False | |
| def get_job_info(self, job_id): | |
| """Get detailed information about a job""" | |
| try: | |
| if job_id not in self.jobs: | |
| return None | |
| job = self.jobs[job_id] | |
| encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded')) | |
| job_path = encoded_path / job_id | |
| files_info = [] | |
| if job_path.exists(): | |
| for output in job.get('outputs', []): | |
| file_path = Path(output['path']) | |
| if file_path.exists(): | |
| files_info.append({ | |
| 'quality': output['quality'], | |
| 'size': file_path.stat().st_size, | |
| 'path': str(file_path) | |
| }) | |
| return { | |
| **job, | |
| 'files': files_info | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting job info for {job_id}: {str(e)}") | |
| return None | |
# Module-level singleton: importers share this one EncoderService instance
# (and therefore its jobs/threads/active_processes state).
encoder_service = EncoderService()