# FaceSwapLite-1.0 / refacer.py
# (hosting-page header residue preserved as a comment: author minhho,
#  commit 6f4eab9, "feat: add debug logging for face swap modes")
import cv2
import numpy as np
import onnxruntime as rt
import sys
from insightface.app import FaceAnalysis
sys.path.insert(1, './recognition')
from scrfd import SCRFD
from arcface_onnx import ArcFaceONNX
import os.path as osp
import os
from pathlib import Path
from tqdm import tqdm
import ffmpeg
import random
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor
from insightface.model_zoo.inswapper import INSwapper
import psutil
from enum import Enum
from insightface.app.common import Face
from insightface.utils.storage import ensure_available
import re
import subprocess
import urllib.request
# Optional face-enhancement dependency: GFPGAN is heavy and may be absent,
# so the import is guarded and the feature degrades gracefully when missing.
try:
    from gfpgan import GFPGANer
    GFPGAN_AVAILABLE = True
except ImportError:
    GFPGAN_AVAILABLE = False
    print("GFPGAN not available - face enhancement disabled")
class RefacerMode(Enum):
    """Execution backend chosen from the available ONNX Runtime providers."""

    CPU = 1
    CUDA = 2
    COREML = 3
    TENSORRT = 4
class Refacer:
def __init__(self, force_cpu=False, colab_performance=False):
    """Set up encoders, ONNX Runtime providers, the detection/recognition/
    swap models, and all per-video temporal-smoothing state.

    Args:
        force_cpu: when True, restrict ONNX Runtime to the CPU provider.
        colab_performance: when True, select the TENSORRT tuning branch.
    """
    self.first_face = False
    self.force_cpu = force_cpu
    self.colab_performance = colab_performance
    self.__check_encoders()
    self.__check_providers()
    self.total_mem = psutil.virtual_memory().total
    self.__init_apps()
    # Advanced temporal smoothing for reducing flickering
    self.prev_faces = []  # Store faces from previous frame
    self.face_tracking_threshold = 0.15  # Lower IOU threshold for fast motion tracking
    self.face_memory = {}  # Track face state across frames
    self.occlusion_tolerance = 10  # Higher tolerance for fast motion and occlusions
    self.last_swapped_frame = None  # Cache last successfully swapped frame for stability
    self.stable_swap_count = 0  # Count consecutive stable swaps
    # Quality enhancement settings
    self.enable_color_correction = True  # Match skin tone and lighting
    self.enable_seamless_clone = False  # Disabled - INSwapper already handles blending
    self.enable_temporal_blend = True  # Smooth frame transitions
    self.temporal_blend_alpha = 0.15  # Blend 15% with previous frame
    self.prev_blended_frame = None  # For temporal smoothing
    self.enable_face_enhancement = GFPGAN_AVAILABLE  # Face restoration with GFPGAN
    self.face_enhancer = None
    # Initialize GFPGAN for face enhancement; GFPGANer is given a URL as
    # model_path, so weight fetching is handled by the library itself.
    if self.enable_face_enhancement:
        try:
            print("Initializing GFPGAN face enhancer...")
            self.face_enhancer = GFPGANer(
                model_path='https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.3.pth',
                upscale=1,  # Don't upscale, just enhance
                arch='clean',
                channel_multiplier=2,
                bg_upsampler=None  # Don't enhance background
            )
            print("GFPGAN initialized successfully!")
        except Exception as e:
            # Any failure (network, weights, torch) disables enhancement.
            print(f"GFPGAN initialization failed: {e}")
            self.enable_face_enhancement = False
def __check_providers(self):
    """Select the execution mode (CPU/TENSORRT/CoreML/CUDA) from the ONNX
    Runtime providers actually available, and tune session options and
    thread counts for that mode."""
    if self.force_cpu:
        self.providers = ['CPUExecutionProvider']
    else:
        self.providers = rt.get_available_providers()
    rt.set_default_logger_severity(4)
    self.sess_options = rt.SessionOptions()
    self.sess_options.execution_mode = rt.ExecutionMode.ORT_SEQUENTIAL
    self.sess_options.graph_optimization_level = rt.GraphOptimizationLevel.ORT_ENABLE_ALL
    if len(self.providers) == 1 and 'CPUExecutionProvider' in self.providers:
        # CPU-only: leave one core free and split threads across sessions.
        self.mode = RefacerMode.CPU
        self.use_num_cpus = mp.cpu_count()-1
        self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3)
        print(f"CPU mode with providers {self.providers}")
    elif self.colab_performance:
        self.mode = RefacerMode.TENSORRT
        self.use_num_cpus = mp.cpu_count()-1
        self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3)
        print(f"TENSORRT mode with providers {self.providers}")
    elif 'CoreMLExecutionProvider' in self.providers:
        self.mode = RefacerMode.COREML
        self.use_num_cpus = mp.cpu_count()-1
        self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3)
        print(f"CoreML mode with providers {self.providers}")
    elif 'CUDAExecutionProvider' in self.providers:
        # CUDA mode: few CPU threads, and TensorRT is dropped so the plain
        # CUDA provider is the one actually used.
        self.mode = RefacerMode.CUDA
        self.use_num_cpus = 2
        self.sess_options.intra_op_num_threads = 1
        if 'TensorrtExecutionProvider' in self.providers:
            self.providers.remove('TensorrtExecutionProvider')
        print(f"CUDA mode with providers {self.providers}")
    # Dead code below: a dedicated TENSORRT branch disabled by the original
    # author by wrapping it in a string literal (no-op expression).
    """
    elif 'TensorrtExecutionProvider' in self.providers:
        self.mode = RefacerMode.TENSORRT
        #self.use_num_cpus = 1
        #self.sess_options.intra_op_num_threads = 1
        self.use_num_cpus = mp.cpu_count()-1
        self.sess_options.intra_op_num_threads = int(self.use_num_cpus/3)
        print(f"TENSORRT mode with providers {self.providers}")
    """
def __download_model(self, model_path):
    """Download the inswapper model if it doesn't exist.

    Tries several Hugging Face mirrors in order; a download under 500 MB is
    treated as truncated and deleted. Raises if every source fails.

    Args:
        model_path: local path where inswapper_128.onnx should be stored.
    """
    if os.path.exists(model_path):
        return
    print(f"Model file {model_path} not found. Downloading...")
    # Direct download from reliable sources
    sources = [
        {
            'name': 'Hugging Face - ezioruan',
            'url': 'https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx',
        },
        {
            'name': 'Hugging Face - ashleykleynhans',
            'url': 'https://huggingface.co/ashleykleynhans/inswapper/resolve/main/inswapper_128.onnx',
        },
        {
            'name': 'Hugging Face - public-data',
            'url': 'https://huggingface.co/public-data/insightface/resolve/main/models/inswapper_128.onnx',
        }
    ]
    for source in sources:
        try:
            print(f"Trying to download from {source['name']}...")
            # Use urllib with headers to avoid blocking.
            # SECURITY NOTE(review): the next two lines disable TLS
            # certificate verification for the WHOLE process, not just this
            # request - confirm this is acceptable for the deployment.
            import ssl
            ssl._create_default_https_context = ssl._create_unverified_context
            req = urllib.request.Request(
                source['url'],
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                }
            )
            with urllib.request.urlopen(req, timeout=300) as response:
                total_size = int(response.headers.get('content-length', 0))
                print(f"Downloading {total_size / (1024*1024):.1f} MB...")
                with open(model_path, 'wb') as f:
                    downloaded = 0
                    chunk_size = 8192
                    while True:
                        chunk = response.read(chunk_size)
                        if not chunk:
                            break
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_size > 0:
                            percent = (downloaded / total_size) * 100
                            # NOTE: this fires for every chunk while the
                            # integer percentage is a multiple of 10, so it
                            # prints repeatedly around each 10% boundary.
                            if int(percent) % 10 == 0: # Print every 10%
                                print(f"Progress: {percent:.0f}%")
            # Verify the file; presumably the full model exceeds 500 MB, so
            # anything smaller is a truncated or error-page download.
            if os.path.exists(model_path) and os.path.getsize(model_path) > 500000000: # > 500MB
                print(f"✅ Successfully downloaded model from {source['name']} ({os.path.getsize(model_path) / (1024*1024):.1f} MB)")
                return
            else:
                print(f"❌ Downloaded file seems incomplete (size: {os.path.getsize(model_path) if os.path.exists(model_path) else 0} bytes)")
                if os.path.exists(model_path):
                    os.remove(model_path)
        except Exception as e:
            # Clean up any partial file and move on to the next mirror.
            print(f"❌ Failed to download from {source['name']}: {str(e)}")
            if os.path.exists(model_path):
                os.remove(model_path)
            continue
    raise Exception(
        "❌ Failed to download inswapper_128.onnx from all sources.\n\n"
        "Please upload the model file manually:\n"
        "1. Download from: https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx\n"
        "2. Upload to your Space via the Files tab\n"
        "3. Name it: inswapper_128.onnx"
    )
def __init_apps(self):
    """Load the three ONNX models into Runtime sessions using the selected
    providers: SCRFD face detector, ArcFace recognizer, and the INSwapper
    face swapper (downloaded on demand)."""
    # Detector and recognizer weights ship with insightface's buffalo_l pack.
    assets_dir = ensure_available('models', 'buffalo_l', root='~/.insightface')
    model_path = os.path.join(assets_dir, 'det_10g.onnx')
    sess_face = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
    self.face_detector = SCRFD(model_path, sess_face)
    self.face_detector.prepare(0, input_size=(640, 640))
    model_path = os.path.join(assets_dir, 'w600k_r50.onnx')
    sess_rec = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
    self.rec_app = ArcFaceONNX(model_path, sess_rec)
    self.rec_app.prepare(0)
    model_path = 'inswapper_128.onnx'
    # Download model if it doesn't exist
    self.__download_model(model_path)
    sess_swap = rt.InferenceSession(model_path, self.sess_options, providers=self.providers)
    self.face_swapper = INSwapper(model_path, sess_swap)
def prepare_faces(self, faces):
    """Precompute replacement data for each requested swap.

    Each entry of `faces` is a dict with a 'destination' image, an optional
    'origin' image and a 'threshold'. With an 'origin' key the swap runs in
    SPECIFIC mode (match targets by ArcFace similarity); without it, SIMPLE
    mode replaces the first detected face.

    Populates self.replacement_faces with
    (origin_embedding_or_None, destination_face, threshold) tuples.

    Raises:
        Exception: if no face is found in an origin or destination image.
    """
    self.replacement_faces = []
    for face in faces:
        if "origin" in face:
            # SPECIFIC mode: embed the face that should be replaced.
            face_threshold = face['threshold']
            bboxes1, kpss1 = self.face_detector.autodetect(face['origin'], max_num=1)
            if len(kpss1) < 1:
                raise Exception('No face detected on "Face to replace" image')
            feat_original = self.rec_app.get(face['origin'], kpss1[0])
            print(f"✅ Prepared SPECIFIC mode face swap (will match target face)")
        else:
            # SIMPLE mode: no reference embedding; threshold is unused (0).
            face_threshold = 0
            self.first_face = True
            feat_original = None
            print('✅ Prepared SIMPLE mode face swap (will replace first face)')
        _faces = self.__get_faces(face['destination'], max_num=1)
        if len(_faces) < 1:
            raise Exception('No face detected on "Destination face" image')
        self.replacement_faces.append((feat_original, _faces[0], face_threshold))
def __convert_video(self, video_path, output_video_path):
    """Merge the original audio track back into the refaced video.

    Args:
        video_path: original input video (audio source).
        output_video_path: refaced, silent video written by OpenCV.

    Returns:
        Path of the final video: a new "<output><rand>_c.mp4" file when
        audio was merged, otherwise the silent output itself.
    """
    if self.video_has_audio:
        print("Merging audio with the refaced video...")
        # Random suffix avoids clobbering the output of a previous run.
        new_path = output_video_path + str(random.randint(0,999)) + "_c.mp4"
        in1 = ffmpeg.input(output_video_path)
        in2 = ffmpeg.input(video_path)
        out = ffmpeg.output(in1.video, in2.audio, new_path, video_bitrate=self.ffmpeg_video_bitrate, vcodec=self.ffmpeg_video_encoder)
        out.run(overwrite_output=True, quiet=True)
    else:
        new_path = output_video_path
        print("The video doesn't have audio, so post-processing is not necessary")
    print(f"The process has finished.\nThe refaced video can be found at {os.path.abspath(new_path)}")
    return new_path
def __get_faces(self, frame, max_num=0):
    """Detect faces in a frame and attach ArcFace embeddings to each.

    Args:
        frame: BGR image (as produced by cv2).
        max_num: maximum number of faces to detect; 0 means no limit.

    Returns:
        List of insightface Face objects with .embedding populated
        (empty list when nothing is detected).
    """
    bboxes, kpss = self.face_detector.detect(frame, max_num=max_num, metric='default')
    if bboxes.shape[0] == 0:
        return []
    ret = []
    for i in range(bboxes.shape[0]):
        bbox = bboxes[i, 0:4]
        det_score = bboxes[i, 4]
        kps = None
        if kpss is not None:
            kps = kpss[i]
        face = Face(bbox=bbox, kps=kps, det_score=det_score)
        # NOTE(review): if the detector returns no keypoints, kps stays None
        # and is passed to rec_app.get() - confirm ArcFaceONNX tolerates it.
        face.embedding = self.rec_app.get(frame, kps)
        ret.append(face)
    return ret
def __compute_iou(self, box1, box2):
"""Compute Intersection over Union for face tracking"""
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
intersection = max(0, x2 - x1) * max(0, y2 - y1)
area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
union = area1 + area2 - intersection
return intersection / union if union > 0 else 0
def __enhance_face_gfpgan(self, swapped_face, bbox):
    """Run GFPGAN restoration on the face region of an already-swapped frame.

    No-op when enhancement is disabled or GFPGAN failed to initialize;
    returns the input frame unchanged on any failure.

    Args:
        swapped_face: full BGR frame containing the swapped face.
        bbox: [x1, y1, x2, y2] face box within the frame.
    """
    if not self.enable_face_enhancement or self.face_enhancer is None:
        return swapped_face
    try:
        # Clamp the bbox to the frame bounds.
        x1, y1, x2, y2 = map(int, bbox)
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(swapped_face.shape[1], x2), min(swapped_face.shape[0], y2)
        if x2 <= x1 or y2 <= y1:
            return swapped_face
        # Extract face region
        face_region = swapped_face[y1:y2, x1:x2].copy()
        # Enhance with GFPGAN; with paste_back=True the third return value
        # is the restored crop image.
        _, _, enhanced_face = self.face_enhancer.enhance(
            face_region,
            has_aligned=False,
            only_center_face=True,
            paste_back=True
        )
        if enhanced_face is not None:
            # Paste the enhanced crop back into a copy of the frame.
            # NOTE(review): assumes enhanced_face keeps the crop's exact
            # size (upscale=1) - confirm against the GFPGAN version used.
            result = swapped_face.copy()
            result[y1:y2, x1:x2] = enhanced_face
            return result
        else:
            return swapped_face
    except Exception as e:
        print(f"GFPGAN enhancement failed: {e}")
        return swapped_face
def __color_correct_face(self, swapped_face, target_face, bbox):
"""Apply color correction to match lighting and skin tone"""
try:
x1, y1, x2, y2 = map(int, bbox)
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(swapped_face.shape[1], x2), min(swapped_face.shape[0], y2)
if x2 <= x1 or y2 <= y1:
return swapped_face
# Work on a copy to avoid modifying original
result = swapped_face.copy()
# Extract face regions
swapped_region = result[y1:y2, x1:x2].copy()
target_region = target_face[y1:y2, x1:x2]
if swapped_region.size == 0 or target_region.size == 0:
return swapped_face
# Calculate mean and std for each channel
for i in range(3): # BGR channels
swapped_mean, swapped_std = cv2.meanStdDev(swapped_region[:,:,i])
target_mean, target_std = cv2.meanStdDev(target_region[:,:,i])
# Avoid division by zero
if swapped_std[0][0] > 1: # Only if there's enough variance
# Match the color distribution (subtle adjustment)
factor = min(target_std[0][0] / swapped_std[0][0], 1.5) # Limit adjustment
swapped_region[:,:,i] = np.clip(
(swapped_region[:,:,i] - swapped_mean[0][0]) * factor * 0.5 + swapped_mean[0][0] * 0.5 + target_mean[0][0] * 0.5,
0, 255
).astype(np.uint8)
# Put corrected region back
result[y1:y2, x1:x2] = swapped_region
return result
except Exception as e:
print(f"Color correction failed: {e}")
return swapped_face
def __seamless_blend(self, swapped_face, target_face, bbox):
    """Blend the swapped face into the target frame with cv2.seamlessClone,
    falling back to feathered alpha blending on failure.

    NOTE(review): appears unused - enable_seamless_clone is initialized to
    False and __enhance_quality deliberately skips this step (it caused
    black backgrounds). Kept for reference.
    """
    try:
        # Clamp the face box to the frame bounds.
        x1, y1, x2, y2 = map(int, bbox)
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(swapped_face.shape[1], x2), min(swapped_face.shape[0], y2)
        if x2 <= x1 or y2 <= y1:
            return swapped_face
        # Create center point for seamless clone
        center_x = (x1 + x2) // 2
        center_y = (y1 + y2) // 2
        center = (center_x, center_y)
        # Create mask for the face region
        mask = np.zeros(target_face.shape[:2], dtype=np.uint8)
        # Create elliptical mask for more natural blending
        width = x2 - x1
        height = y2 - y1
        cv2.ellipse(mask, center, (width//2, height//2), 0, 0, 360, 255, -1)
        # Apply Gaussian blur to mask for softer edges
        mask = cv2.GaussianBlur(mask, (15, 15), 0)
        # Use cv2.seamlessClone for better blending
        try:
            result = cv2.seamlessClone(swapped_face, target_face, mask, center, cv2.NORMAL_CLONE)
            return result
        except:  # NOTE(review): bare except also catches KeyboardInterrupt
            # Fallback to alpha blending if seamless clone fails
            mask_3channel = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR) / 255.0
            result = (swapped_face * mask_3channel + target_face * (1 - mask_3channel)).astype(np.uint8)
            return result
    except Exception as e:
        print(f"Seamless blending failed: {e}")
        return swapped_face
def __temporal_smooth(self, current_frame):
"""Apply temporal smoothing to reduce frame-to-frame jitter"""
if not self.enable_temporal_blend or self.prev_blended_frame is None:
self.prev_blended_frame = current_frame.copy()
return current_frame
try:
# Blend with previous frame for smoothness
alpha = self.temporal_blend_alpha
smoothed = cv2.addWeighted(
current_frame, 1 - alpha,
self.prev_blended_frame, alpha,
0
)
self.prev_blended_frame = smoothed.copy()
return smoothed
except:
self.prev_blended_frame = current_frame.copy()
return current_frame
def __enhance_quality(self, swapped_frame, original_frame, bbox):
    """Post-process a swapped frame: GFPGAN restoration, conservative color
    correction, light sharpening, then temporal smoothing.

    Each stage is best-effort: a failure is logged and the frame from the
    previous stage is kept.

    Args:
        swapped_frame: frame produced by the face swapper.
        original_frame: the pre-swap frame (color-correction reference).
        bbox: [x1, y1, x2, y2] face box within the frame.
    """
    result = swapped_frame.copy()
    # 1. GFPGAN face enhancement (if available)
    if self.enable_face_enhancement:
        try:
            result = self.__enhance_face_gfpgan(result, bbox)
        except Exception as e:
            print(f"Skipping GFPGAN enhancement: {e}")
            pass
    # 2. Subtle color correction to match lighting (optional, conservative)
    if self.enable_color_correction:
        try:
            result = self.__color_correct_face(result, original_frame, bbox)
        except Exception as e:
            print(f"Skipping color correction: {e}")
            pass
    # 3. Seamless blending intentionally skipped - INSwapper already blends,
    # and cv2.seamlessClone was causing black backgrounds here.
    # 4. Light sharpening only if needed
    try:
        # Very subtle unsharp-style kernel to maintain detail
        kernel = np.array([[0, -0.25, 0],
                           [-0.25, 2, -0.25],
                           [0, -0.25, 0]])
        sharpened = cv2.filter2D(result, -1, kernel)
        # Blend 30% sharpened with 70% original
        result = cv2.addWeighted(result, 0.7, sharpened, 0.3, 0)
    except Exception as e:
        print(f"Skipping sharpening: {e}")
        pass
    # 5. Temporal smoothing for motion stability
    try:
        result = self.__temporal_smooth(result)
    except Exception as e:
        print(f"Skipping temporal smoothing: {e}")
        pass
    return result
def process_first_face(self, frame):
    """Swap the single tracked face in `frame` (SIMPLE mode).

    Anti-flicker strategy: when detection drops out for up to
    `occlusion_tolerance` frames, reuse the last successful swap; accept
    progressively lower detection confidence when the face is tracked by
    IoU or the swap history is stable.

    Returns:
        The swapped frame, a cached previous swap, or the original frame.
    """
    faces = self.__get_faces(frame, max_num=1)
    # Aggressive anti-flicker: handle no detection or weak detection
    if len(faces) == 0:
        # No face detected - check if we have stable swap history
        if 'first_face' in self.face_memory:
            self.face_memory['first_face']['frames_missing'] += 1
            # During occlusion/fast motion, use last good swap to maintain stability
            if self.face_memory['first_face']['frames_missing'] <= self.occlusion_tolerance:
                if self.last_swapped_frame is not None and self.stable_swap_count > 3:
                    # Use cached swap result for stability
                    return self.last_swapped_frame.copy()
                return frame  # Skip swapping but show original
            else:
                # Face gone too long, reset all tracking state
                del self.face_memory['first_face']
                self.last_swapped_frame = None
                self.stable_swap_count = 0
                return frame
    # Face detected - determine if we should swap
    if len(faces) != 0:
        # Check if tracked face (motion consistency)
        tracked = False
        iou = 0
        if 'first_face' in self.face_memory and len(self.prev_faces) > 0:
            iou = self.__compute_iou(faces[0].bbox, self.prev_faces[0].bbox)
            # Very lenient tracking for fast motion
            if iou > self.face_tracking_threshold:
                tracked = True
        # Decision: swap if good confidence OR tracked OR stable history
        should_swap = False
        if faces[0].det_score > 0.5:
            should_swap = True  # High confidence
        elif tracked and faces[0].det_score > 0.3:
            should_swap = True  # Tracked with reasonable confidence
        elif self.stable_swap_count > 5 and faces[0].det_score > 0.25:
            should_swap = True  # Stable history, accept lower confidence
        if should_swap:
            # Perform basic swap
            swapped_frame = self.face_swapper.get(frame.copy(), faces[0], self.replacement_faces[0][1], paste_back=True)
            # Apply quality enhancements
            swapped_frame = self.__enhance_quality(swapped_frame, frame, faces[0].bbox)
            # Cache this successful swap
            self.last_swapped_frame = swapped_frame.copy()
            self.stable_swap_count += 1
            # Update memory
            self.face_memory['first_face'] = {
                'bbox': faces[0].bbox,
                'confidence': faces[0].det_score,
                'frames_missing': 0
            }
            self.prev_faces = [faces[0]]
            return swapped_frame
        else:
            # Low confidence, not tracked - use cached if available
            if self.last_swapped_frame is not None and self.stable_swap_count > 3:
                return self.last_swapped_frame.copy()
            # Otherwise maintain tracking but don't swap
            self.prev_faces = [faces[0]]
            return frame
    # Reached only when no face was detected and no tracking memory exists.
    return frame
def process_faces(self, frame):
    """Swap faces in `frame` for multi-face / SPECIFIC mode.

    Matches detections to the previous frame by IoU to keep identity stable
    under motion, relaxes similarity/confidence thresholds for tracked
    faces, and falls back to the last good swap when detection drops out.

    Returns:
        The frame with any matching faces swapped.
    """
    faces = self.__get_faces(frame, max_num=0)
    # Handle no faces detected - use cached swap if stable
    if len(faces) == 0:
        if self.last_swapped_frame is not None and self.stable_swap_count > 5:
            return self.last_swapped_frame.copy()
        return frame
    # Aggressive temporal smoothing with motion tracking: pair each
    # detection with its best-overlapping face from the previous frame.
    matched_faces = []
    for face in faces:
        best_match = None
        best_iou = 0
        face_id = None
        # Try to match with previous frame faces - lenient for fast motion
        for prev_idx, prev_face in enumerate(self.prev_faces):
            iou = self.__compute_iou(face.bbox, prev_face.bbox)
            if iou > best_iou and iou > self.face_tracking_threshold:
                best_iou = iou
                best_match = prev_face
                face_id = prev_idx
        matched_faces.append((face, best_match, best_iou, face_id))
    # Process face swapping with aggressive stability
    swapped = False
    for rep_face in self.replacement_faces:
        # Iterate in reverse so `del faces[i]` doesn't shift pending indices.
        for i in range(len(faces) - 1, -1, -1):
            matched_info = matched_faces[i] if i < len(matched_faces) else (faces[i], None, 0, None)
            current_face = matched_info[0]
            prev_match = matched_info[1]
            iou_score = matched_info[2]
            # SIMPLE MODE: If no origin face specified, swap the first/largest face
            if rep_face[0] is None:
                # Simple mode - just swap the first detected face (index 0,
                # reached on the last iteration of the reverse loop)
                if i == 0:
                    print(f"🔄 SIMPLE MODE: Swapping first detected face (confidence: {current_face.det_score:.2f})")
                    # Perform basic swap
                    temp_frame = self.face_swapper.get(frame.copy(), faces[i], rep_face[1], paste_back=True)
                    # Apply quality enhancements
                    frame = self.__enhance_quality(temp_frame, frame, current_face.bbox)
                    swapped = True
                    del faces[i]
                    break
                else:
                    continue  # Skip other faces in simple mode
            # SPECIFIC MODE: Match by face similarity
            # Very aggressive threshold reduction for tracked faces
            effective_threshold = rep_face[2]
            if prev_match is not None:
                if iou_score > 0.3:  # Tracked with some overlap
                    effective_threshold = max(0, rep_face[2] - 0.25)  # Major boost
                elif iou_score > 0.1:  # Weak tracking (fast motion)
                    effective_threshold = max(0, rep_face[2] - 0.15)
            # More lenient confidence check for tracked faces
            min_confidence = 0.5
            if prev_match is not None and iou_score > 0.2:
                min_confidence = 0.25  # Much lower for tracked
            elif self.stable_swap_count > 5:
                min_confidence = 0.35  # Lower if we have stable history
            if current_face.det_score < min_confidence:
                continue
            # Compute face similarity against the origin embedding
            sim = self.rec_app.compute_sim(rep_face[0], faces[i].embedding)
            # Perform swap if similarity meets threshold
            if sim >= effective_threshold:
                # Perform basic swap
                temp_frame = self.face_swapper.get(frame.copy(), faces[i], rep_face[1], paste_back=True)
                # Apply quality enhancements
                frame = self.__enhance_quality(temp_frame, frame, current_face.bbox)
                swapped = True
                del faces[i]
                break
    # Update tracking state
    if swapped:
        self.last_swapped_frame = frame.copy()
        self.stable_swap_count += 1
    else:
        self.stable_swap_count = max(0, self.stable_swap_count - 1)
    # Store current faces for next frame tracking.
    # NOTE(review): this re-runs detection on the (already swapped) frame,
    # doubling detector cost per frame - confirm whether reusing `faces`
    # would suffice.
    self.prev_faces = self.__get_faces(frame, max_num=0)
    return frame
def __check_video_has_audio(self, video_path):
    """Probe the input with ffmpeg and record on self.video_has_audio
    whether it contains at least one audio stream."""
    # Default to False so the attribute exists even if probing fails.
    self.video_has_audio = False
    probe = ffmpeg.probe(video_path)
    self.video_has_audio = any(
        stream['codec_type'] == 'audio' for stream in probe['streams']
    )
def reface_group(self, faces, frames, output):
    """Swap faces on every frame, in order, writing results to `output`.

    Frames are processed strictly sequentially on purpose: the temporal
    smoothing and tracking state is carried across frames, and parallel
    processing would break it and reintroduce flickering.
    """
    swap = self.process_first_face if self.first_face else self.process_faces
    for frame in tqdm(frames, desc="Processing frames"):
        output.write(swap(frame))
def reface(self, video_path, faces):
    """Reface a whole video file.

    Extracts frames with OpenCV (flushing in groups of ~1000 to bound
    memory), swaps faces group by group, writes the silent result to
    out/<name>, and finally merges back the original audio.

    Args:
        video_path: input video path.
        faces: list of face-spec dicts (see prepare_faces).

    Returns:
        Path to the final output video.
    """
    self.__check_video_has_audio(video_path)
    output_video_path = os.path.join('out', Path(video_path).name)
    self.prepare_faces(faces)
    # Reset all temporal tracking for new video
    self.prev_faces = []
    self.face_memory = {}
    self.last_swapped_frame = None
    self.stable_swap_count = 0
    self.prev_blended_frame = None  # Reset temporal smoothing
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Total frames: {total_frames}")
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    output = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
    frames = []
    self.k = 1  # NOTE(review): appears unused in this file - confirm before removing
    with tqdm(total=total_frames, desc="Extracting frames") as pbar:
        while cap.isOpened():
            flag, frame = cap.read()
            if flag and len(frame) > 0:
                frames.append(frame.copy())
                pbar.update()
            else:
                break
            # Flush in groups so the whole video never sits in memory.
            if (len(frames) > 1000):
                self.reface_group(faces, frames, output)
                frames = []
        cap.release()
        pbar.close()
    # Process whatever remains after the last full group.
    self.reface_group(faces, frames, output)
    frames = []
    output.release()
    return self.__convert_video(video_path, output_video_path)
def __try_ffmpeg_encoder(self, vcodec):
    """Check whether the local ffmpeg build can actually encode with
    `vcodec` by encoding one second of lavfi test video.

    Args:
        vcodec: ffmpeg encoder name, e.g. 'libx264' or 'h264_nvenc'.

    Returns:
        True if a test encode succeeds, False otherwise.
    """
    print(f"Trying FFMPEG {vcodec} encoder")
    probe_file = 'testsrc.mp4'
    command = ['ffmpeg', '-y', '-f', 'lavfi', '-i', 'testsrc=duration=1:size=1280x720:rate=30', '-vcodec', vcodec, probe_file]
    try:
        subprocess.run(command, check=True, capture_output=True)
    except subprocess.CalledProcessError:
        print(f"FFMPEG {vcodec} encoder doesn't work -> Disabled.")
        return False
    finally:
        # Fixed: the probe output used to be left behind in the CWD.
        if os.path.exists(probe_file):
            os.remove(probe_file)
    print(f"FFMPEG {vcodec} encoder works")
    return True
def __check_encoders(self):
    """Find the best available H.264 encoder for ffmpeg.

    Parses the `ffmpeg -codecs` output for the H.264 entry's
    "(encoders: ...)" field, then probes each known encoder (hardware
    first) and keeps the first that works. libx264 defaults are set up
    front as the fallback.
    """
    self.ffmpeg_video_encoder = 'libx264'
    self.ffmpeg_video_bitrate = '0'
    pattern = r"encoders: ([a-zA-Z0-9_]+(?: [a-zA-Z0-9_]+)*)"
    # Fixed: '--list-encoders' is not a valid ffmpeg option and made the
    # probe fail; the encoder list is parsed from '-codecs' output.
    command = ['ffmpeg', '-codecs']
    commandout = subprocess.run(command, check=True, capture_output=True).stdout
    result = commandout.decode('utf-8').split('\n')
    for r in result:
        if "264" in r:
            match = re.search(pattern, r)
            if match is None:
                # Fixed: a '264' line without an '(encoders: ...)' field
                # used to raise AttributeError on .group().
                continue
            encoders = match.group(1).split(' ')
            for v_c in Refacer.VIDEO_CODECS:
                for v_k in encoders:
                    if v_c == v_k:
                        if self.__try_ffmpeg_encoder(v_k):
                            self.ffmpeg_video_encoder = v_k
                            self.ffmpeg_video_bitrate = Refacer.VIDEO_CODECS[v_k]
                            print(f"Video codec for FFMPEG: {self.ffmpeg_video_encoder}")
                            return
# Candidate H.264 encoders, hardware-accelerated entries first. The value is
# passed as the ffmpeg `video_bitrate` in __convert_video.
VIDEO_CODECS = {
    'h264_videotoolbox': '0',  # osx HW acceleration
    'h264_nvenc': '0',  # NVIDIA HW acceleration
    #'h264_qsv', #Intel HW acceleration
    #'h264_vaapi', #Intel HW acceleration
    #'h264_omx', #HW acceleration
    'libx264': '0'  # No HW acceleration
}