# Nexora-Encoder — app/services/encoder_service.py
# Last commit a5c8684 by ChandimaPrabath:
# "Update segment duration in _split_video method of EncoderService"
import subprocess
import os
import shutil
from pathlib import Path
import logging
import json
from datetime import datetime
import threading
import signal
import time
import queue
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _read_pipe(pipe, q):
"""Read lines from a pipe and put them into a queue."""
try:
with pipe:
for line in iter(pipe.readline, ''):
q.put(line)
except Exception as e:
logger.error(f"Error reading pipe: {str(e)}")
class EncoderService:
def __init__(self):
self.jobs = {}
self.threads = {} # Store encoding thread references
# Optimized settings for web streaming
self.default_qualities = {
'480p': {
'width': 854,
'height': 480,
'bitrate': '1000k',
'maxrate': '1200k', # Slightly tighter maxrate
'bufsize': '2000k',
'audio_bitrate': '128k',
'keyframe': '48',
'preset': 'medium', # Consider medium for faster encoding
'profile': 'main',
'level': '3.1',
'tune': 'fastdecode'
},
'720p': {
'width': 1280,
'height': 720,
'bitrate': '2500k',
'maxrate': '2800k', # Slightly tighter maxrate
'bufsize': '4000k',
'audio_bitrate': '160k', # Increased audio bitrate
'keyframe': '48',
'preset': 'medium', # Consider medium for faster encoding
'profile': 'main',
'level': '3.1',
'tune': 'fastdecode'
},
'1080p': {
'width': 1920,
'height': 1080,
'bitrate': '5000k',
'maxrate': '5500k', # Slightly tighter maxrate
'bufsize': '8000k',
'audio_bitrate': '192k',
'keyframe': '48',
'preset': 'medium', # Consider medium for faster encoding
'profile': 'high',
'level': '4.0',
'tune': 'fastdecode'
}
}
self.active_processes = {}
def start_encode_job(self, filename, job_id, output_name=None, settings=None):
"""Start an encoding job with optional custom settings and output name"""
try:
qualities = self.default_qualities.copy()
if settings:
settings_dict = json.loads(settings)
for quality, bitrate in settings_dict.items():
if quality in qualities and bitrate:
qualities[quality]['bitrate'] = bitrate
# Adjust maxrate and bufsize based on new bitrate
bitrate_value = int(bitrate.replace('k', ''))
qualities[quality]['maxrate'] = f"{int(bitrate_value * 1.5)}k"
qualities[quality]['bufsize'] = f"{int(bitrate_value * 2)}k"
self.jobs[job_id] = {
'status': 'pending',
'progress': 0,
'start_time': datetime.now().isoformat(),
'filename': filename,
'output_name': output_name or os.path.splitext(filename)[0],
'current_quality': None,
'outputs': [],
'settings': qualities
}
# Start encoding in a separate thread and store the thread reference
thread = threading.Thread(
target=self._encode_video,
args=(filename, job_id)
)
self.threads[job_id] = thread
thread.start()
logger.info(f"Started encoding thread for job {job_id}")
return {'status': 'pending', 'job_id': job_id}
except Exception as e:
logger.error(f"Failed to start encoding job: {str(e)}")
return {'status': 'failed', 'error': str(e)}
def _split_video(self, input_file, split_dir, segment_duration=60): # 5 minutes = 300 seconds
"""Splits the video into segments using ffmpeg."""
try:
split_cmd = [
'ffmpeg',
'-i', str(input_file),
'-c', 'copy',
'-map', '0',
'-segment_time', str(segment_duration),
'-f', 'segment',
'-segment_format', 'mp4',
'-reset_timestamps', '1', # Important to reset timestamps for each segment
str(split_dir / 'segment_%04d.mp4') # Using 4 digits for segment numbering
]
logger.info(f"Splitting video with command: {' '.join(split_cmd)}")
subprocess.run(split_cmd, check=True, capture_output=True)
segments = sorted([split_dir / f for f in os.listdir(split_dir) if f.startswith('segment_') and f.endswith('.mp4')])
return segments
except subprocess.CalledProcessError as e:
logger.error(f"Error splitting video: {e.stderr.decode()}")
raise Exception(f"FFmpeg split failed: {e}")
except Exception as e:
logger.error(f"Error splitting video: {str(e)}")
raise
def _stitch_segments(self, segment_files, output_file):
"""Stitches video segments back together using ffmpeg."""
try:
# Create a list file for concat demuxer
list_file_path = output_file.parent / 'segment_list.txt'
with open(list_file_path, 'w') as f:
for segment in segment_files:
f.write(f"file '{segment}'\n")
stitch_cmd = [
'ffmpeg',
'-f', 'concat',
'-safe', '0', # Set to 0 if paths are relative, which they are in temp dir
'-i', str(list_file_path),
'-c', 'copy',
str(output_file)
]
logger.info(f"Stitching video with command: {' '.join(stitch_cmd)}")
subprocess.run(stitch_cmd, check=True, capture_output=True)
# Clean up list file
if list_file_path.exists():
list_file_path.unlink()
except subprocess.CalledProcessError as e:
logger.error(f"Error stitching video: {e.stderr.decode()}")
raise Exception(f"FFmpeg stitch failed: {e}")
except Exception as e:
logger.error(f"Error stitching video: {str(e)}")
raise
def _encode_video(self, filename, job_id):
"""Internal method to handle video encoding with splitting, encoding segments, and stitching."""
try:
upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
input_file = upload_path / filename
output_dir = encoded_path / job_id
output_name = self.jobs[job_id]['output_name']
# Create output directory
output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Created output directory {output_dir} for job {job_id}")
# Create a temporary directory for video segments
split_dir = output_dir / 'segments'
split_dir.mkdir(exist_ok=True)
segment_files = []
try:
segment_files = self._split_video(input_file, split_dir)
logger.info(f"Video split into {len(segment_files)} segments for job {job_id}")
except Exception as split_err:
raise Exception(f"Video splitting failed: {split_err}")
qualities = self.jobs[job_id]['settings']
total_qualities = len(qualities)
outputs = []
global_progress_steps = total_qualities * len(segment_files) + total_qualities # segments encoding + stitching
completed_global_steps = 0
# Get video duration (using the first segment for approximation, or original file if needed)
duration = 0
if segment_files:
duration = self._get_video_duration(segment_files[0]) # Using first segment duration
else: # Fallback to original video if no segments (unlikely, but for robustness)
duration = self._get_video_duration(input_file)
if not duration:
raise Exception("Could not determine video duration")
logger.info(f"Video duration (using segment or original) for job {job_id}: {duration} seconds")
for quality, settings in qualities.items():
max_retries = 3
attempts = 0
encoded_segments_for_quality = []
success = False
while attempts < max_retries and not success:
logger.info(f"Starting encoding for quality {quality}, attempt {attempts+1} for job {job_id}")
self.jobs[job_id].update({
'status': 'processing',
'current_quality': quality,
'progress': (completed_global_steps / global_progress_steps) * 100
})
segment_output_files = []
segment_success = True # Track if all segments for current quality encoded successfully
completed_steps = 0
for segment_input_file in segment_files:
segment_output_file = split_dir / f"{segment_input_file.stem}_{quality}.mp4" # Output segment in split_dir
segment_output_files.append(segment_output_file)
cmd = [
'ffmpeg', '-y',
'-i', str(segment_input_file),
'-c:v', 'libx264',
'-preset', settings['preset'],
'-profile:v', settings['profile'],
'-level', settings['level'],
'-tune', settings['tune'],
'-b:v', settings['bitrate'],
'-maxrate', settings['maxrate'],
'-bufsize', settings['bufsize'],
'-vf', f"scale={settings['width']}:{settings['height']}",
'-g', settings['keyframe'],
'-keyint_min', settings['keyframe'],
'-sc_threshold', '0',
'-c:a', 'aac',
'-b:a', settings['audio_bitrate'],
'-ar', '48000',
'-ac', '2',
'-movflags', '+faststart',
'-progress', 'pipe:1',
str(segment_output_file)
]
try:
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
self.active_processes[job_id] = process
q = queue.Queue()
reader_thread = threading.Thread(target=_read_pipe, args=(process.stdout, q))
reader_thread.daemon = True
reader_thread.start()
last_output_time = time.time()
segment_progress = 0
while True:
try:
line = q.get(timeout=5)
last_output_time = time.time()
prog = self._parse_ffmpeg_progress(line, duration) # Using segment duration for progress
if prog is not None:
segment_progress = prog
quality_progress = ((completed_global_steps + (completed_steps * 100 + prog) / 100 ) / global_progress_steps) * 100 # Approximation
self.jobs[job_id]['progress'] = quality_progress # Update overall job progress - this is rough, needs refinement for segments
except queue.Empty:
if time.time() - last_output_time > 30:
logger.warning(f"No ffmpeg output for 30 seconds on quality {quality}, segment {segment_input_file.name}, attempt {attempts+1} for job {job_id}")
break
if process.poll() is not None:
break
reader_thread.join(timeout=5)
if process.returncode == 0:
logger.info(f"Encoding successful for quality {quality}, segment {segment_input_file.name} on attempt {attempts+1} for job {job_id}")
completed_global_steps += 1 # Increment for each successful segment encoding
encoded_segments_for_quality.append(segment_output_file)
else:
error_output = process.stderr.read()
logger.error(f"FFmpeg error on quality {quality}, segment {segment_input_file.name}, attempt {attempts+1} for job {job_id}: {error_output}")
segment_success = False # Indicate segment encoding failure
raise Exception(f"FFmpeg failed for quality {quality}, segment {segment_input_file.name}")
except Exception as e:
logger.error(f"Error encoding segment {segment_input_file.name} for quality {quality} on attempt {attempts+1} for job {job_id}: {str(e)}")
segment_success = False # Ensure segment_success is False if exception occurs
attempts += 1
if attempts < max_retries:
logger.info(f"Retrying encoding for quality {quality}, segment {segment_input_file.name} (attempt {attempts+2} of {max_retries}) for job {job_id}")
time.sleep(2)
else:
break # Break out of attempts loop for current quality if max retries reached
finally:
if job_id in self.active_processes:
del self.active_processes[job_id]
if not segment_success: # If any segment failed, break segment loop and retry/fail quality
break
if segment_success: # If all segments for current quality were successful
success = True # Mark quality encoding as successful
completed_global_steps += 1 # Increment for successful quality stitching step
# Stitch segments for current quality
output_file = output_dir / f"{output_name}_{quality}.mp4"
try:
self._stitch_segments(encoded_segments_for_quality, output_file)
outputs.append({
'quality': quality,
'path': str(output_file),
'settings': settings
})
logger.info(f"Stitching successful for quality {quality} for job {job_id}")
except Exception as stitch_err:
logger.error(f"Stitching failed for quality {quality} for job {job_id}: {stitch_err}")
self.jobs[job_id].update({
'status': 'failed',
'error': f"Stitching failed for quality {quality}: {str(stitch_err)}"
})
return # Early return if stitching fails, consider if retry for stitching needed
# Clean up encoded segments for this quality (optional, but good to clean up split_dir)
for seg_file in encoded_segments_for_quality:
if seg_file.exists():
seg_file.unlink()
self.jobs[job_id]['progress'] = (completed_global_steps / global_progress_steps) * 100 # Final progress for this quality
if not success: # After max retries for a quality
self.jobs[job_id].update({
'status': 'failed',
'error': f"Failed encoding quality {quality} after {max_retries} attempts"
})
return # Early return if quality encoding fails after retries
self.jobs[job_id].update({
'status': 'completed',
'progress': 100,
'outputs': outputs,
'completion_time': datetime.now().isoformat()
})
logger.info(f"Job {job_id} completed successfully")
except Exception as e:
logger.error(f"Encoding failed for job {job_id}: {str(e)}")
self.jobs[job_id].update({
'status': 'failed',
'error': str(e)
})
finally:
# Clean up split directory and segments after processing is done (success or fail)
if split_dir.exists():
shutil.rmtree(split_dir)
# Remove thread reference once job completes/fails
if job_id in self.threads:
del self.threads[job_id]
def _get_video_duration(self, input_file):
"""Get video duration using FFprobe"""
try:
cmd = [
'ffprobe',
'-v', 'error',
'-show_entries', 'format=duration',
'-of', 'json',
str(input_file)
]
result = subprocess.run(cmd, capture_output=True, text=True)
data = json.loads(result.stdout)
return float(data['format']['duration'])
except Exception as e:
logger.error(f"Error getting video duration: {str(e)}")
return None
def _parse_ffmpeg_progress(self, output, duration):
"""Parse FFmpeg progress output"""
try:
if 'out_time_ms=' in output:
time_ms = int(output.split('out_time_ms=')[1].strip())
progress = (time_ms / 1000000) / duration * 100
return min(100, max(0, progress))
return None
except Exception:
return None
def stop_job(self, job_id):
"""Stop an encoding job"""
try:
if job_id in self.active_processes:
process = self.active_processes[job_id]
process.send_signal(signal.SIGTERM)
process.wait(timeout=5)
del self.active_processes[job_id]
self.jobs[job_id]['status'] = 'stopped'
return True
return False
except Exception as e:
logger.error(f"Error stopping job {job_id}: {str(e)}")
return False
def clean_job(self, job_id):
"""Clean up all files related to a job"""
try:
upload_path = Path(os.getenv('UPLOAD_FOLDER', 'uploads'))
encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
if job_id in self.jobs:
source_file = upload_path / self.jobs[job_id]['filename']
if source_file.exists():
source_file.unlink()
job_output_dir = encoded_path / job_id
if job_output_dir.exists():
shutil.rmtree(job_output_dir)
if job_id in self.jobs:
del self.jobs[job_id]
return True
except Exception as e:
logger.error(f"Error cleaning job {job_id}: {str(e)}")
return False
def get_job_info(self, job_id):
"""Get detailed information about a job"""
try:
if job_id not in self.jobs:
return None
job = self.jobs[job_id]
encoded_path = Path(os.getenv('ENCODED_FOLDER', 'encoded'))
job_path = encoded_path / job_id
files_info = []
if job_path.exists():
for output in job.get('outputs', []):
file_path = Path(output['path'])
if file_path.exists():
files_info.append({
'quality': output['quality'],
'size': file_path.stat().st_size,
'path': str(file_path)
})
return {
**job,
'files': files_info
}
except Exception as e:
logger.error(f"Error getting job info for {job_id}: {str(e)}")
return None
encoder_service = EncoderService()