# Switcher / Frame_remover_v2.py
# Removes frames without detected faces from videos and rebuilds the audio
# track from the kept intervals so it stays in sync.
import os
import cv2
import logging
import traceback
import shutil
import tempfile
from datetime import timedelta
from tqdm import tqdm
import threading
from insightface.app import FaceAnalysis
from moviepy.editor import VideoFileClip, concatenate_audioclips
# Set the temporary directory for temporary files
tempfile.tempdir = 'D:\\Switcher\\Temp' # Update this path if needed

# Configure logging: timestamped entries appended to a single log file.
logging.basicConfig(
    filename='D:\\Switcher\\video_processing.log', # Update this path if needed
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Thread lock for face analyzer initialization
THREAD_LOCK = threading.Lock()
# Lazily-created FaceAnalysis singleton; populated by get_face_analyser().
FACE_ANALYSER = None
def get_face_analyser():
    """Return the process-wide FaceAnalysis instance, building it on first use.

    Construction happens under THREAD_LOCK so concurrent callers cannot
    initialize the model twice.
    """
    global FACE_ANALYSER
    with THREAD_LOCK:
        if FACE_ANALYSER is None:
            # First call: load the detector models and move them to the GPU
            # when CUDA is available, otherwise fall back to CPU.
            FACE_ANALYSER = FaceAnalysis(
                name='buffalo_l',
                providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
            )
            FACE_ANALYSER.prepare(ctx_id=0)
            logging.info(f'FaceAnalysis initialized. Models loaded: {list(FACE_ANALYSER.models.keys())}')
    return FACE_ANALYSER
def get_many_faces(frame):
    """Run face detection on one frame; return the detections, or [] on error."""
    try:
        return get_face_analyser().get(frame)
    except Exception as e:
        # Log and swallow: a single bad frame must not abort the whole video.
        logging.error(f'Error in get_many_faces: {e}')
        logging.error(traceback.format_exc())
        return []
def process_video(filename, video_dir, output_dir):
    """Strip face-less frames from one video and write the shortened result.

    Every frame containing at least one detected face is kept; the audio is
    rebuilt from the matching (start, end) time intervals so it stays in
    sync with the shortened video.

    Args:
        filename: Name of the video file inside ``video_dir``.
        video_dir: Directory holding the input video.
        output_dir: Directory the processed video is written to.

    Returns:
        A dict with keys 'filename', 'duration', 'total_frames',
        'frames_with_faces' and 'output_video' ('output_video' is None when
        no faces were found), or None if processing failed.
    """
    try:
        video_path = os.path.join(video_dir, filename)
        output_video_path = os.path.join(output_dir, filename)
        # Per-input temp name so multiple videos never share one temp file.
        temp_video_path = os.path.join(tempfile.gettempdir(), f"temp_{filename}")
        logging.info(f'Starting processing video: {video_path}')
        # Open video file
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            logging.error(f'Failed to open video file: {video_path}')
            return None
        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Guard against 0, None, or NaN (NaN != NaN) coming back from OpenCV.
        if fps == 0 or fps is None or fps != fps:
            fps = 30  # Default FPS if unable to get FPS from video
            logging.warning(f'FPS not detected in {filename}. Using default FPS: {fps}')
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration_seconds = total_frames / fps
        duration = str(timedelta(seconds=int(duration_seconds)))
        # Define the codec and create VideoWriter object
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # You can use 'XVID' or other codecs
        out = cv2.VideoWriter(temp_video_path, fourcc, fps, (width, height))
        frames_with_faces = 0
        frame_index = 0
        # (start_sec, end_sec) spans of consecutive face-bearing frames,
        # used later to cut matching audio segments.
        face_intervals = []
        in_face_sequence = False
        start_time = 0.0
        # Create a progress bar using tqdm
        with tqdm(total=total_frames, desc=f"Processing {filename}", unit="frame", ncols=120) as pbar:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Analyze frame for faces
                faces = get_many_faces(frame)
                if len(faces) > 0:
                    frames_with_faces += 1
                    # Keep this frame in the output video
                    out.write(frame)
                    # Open a new face interval if not already in one
                    if not in_face_sequence:
                        in_face_sequence = True
                        start_time = frame_index / fps
                else:
                    # Close the current face interval if in one
                    if in_face_sequence:
                        in_face_sequence = False
                        end_time = frame_index / fps
                        face_intervals.append((start_time, end_time))
                frame_index += 1
                pbar.update(1)  # Update the progress bar by one frame
                # Show running face statistics in the progress bar
                percentage = (frames_with_faces / frame_index) * 100 if frame_index > 0 else 0
                pbar.set_postfix({
                    'Faces Detected': frames_with_faces,
                    'Face Percentage': f"{percentage:.2f}%"
                })
        # Close a face interval still open at end-of-video
        if in_face_sequence:
            end_time = frame_index / fps
            face_intervals.append((start_time, end_time))
        cap.release()
        out.release()
        logging.info(f'Finished processing video: {video_path}')
        # total_frames can be 0 for broken containers; avoid ZeroDivisionError.
        face_pct = (frames_with_faces / total_frames * 100) if total_frames > 0 else 0
        logging.info(f'Total frames: {total_frames}, Frames with faces: {frames_with_faces}, Face Percentage: {face_pct:.2f}%')
        # If no faces detected, delete the empty output video
        if frames_with_faces == 0:
            os.remove(temp_video_path)
            logging.info(f'No faces detected in video: {filename}')
            return {
                'filename': filename,
                'duration': duration,
                'total_frames': total_frames,
                'frames_with_faces': frames_with_faces,
                'output_video': None
            }
        # Rebuild the audio track from the face intervals.
        original_video = VideoFileClip(video_path)
        # .audio is None when the source has no audio track at all.
        original_audio = original_video.audio
        audio_segments = []
        if original_audio is not None:
            for start, end in face_intervals:
                audio_segments.append(original_audio.subclip(start, end))
        if audio_segments:
            final_audio = concatenate_audioclips(audio_segments)
            # Re-encode the silent temp video with the rebuilt audio attached.
            processed_video = VideoFileClip(temp_video_path).set_audio(final_audio)
            processed_video.write_videofile(output_video_path, codec='libx264', audio_codec='aac', fps=fps, remove_temp=True)
            processed_video.close()
        else:
            logging.warning(f"No audio segments extracted for video: {filename}")
            # No audio to attach: just move the silent temp video into place.
            # (Deliberately done without opening it in moviepy first, so the
            # move cannot fail on Windows due to an open file handle.)
            shutil.move(temp_video_path, output_video_path)
        # Close original video (also releases its audio reader)
        original_video.close()
        # Remove temporary video file if it still exists
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)
        # Return result
        return {
            'filename': filename,
            'duration': duration,
            'total_frames': total_frames,
            'frames_with_faces': frames_with_faces,
            'output_video': output_video_path
        }
    except Exception as e:
        logging.error(f'Error processing video {filename}: {e}')
        logging.error(traceback.format_exc())
        return None
def main():
    """Process every supported video in the input directory and write a report.

    Walks ``video_dir``, runs :func:`process_video` on each video file, and
    writes a per-video summary (duration, frame counts, face percentage,
    output path) to ``output_file``.
    """
    # Directories and file paths
    video_dir = 'D:\\Switcher\\Convert'  # Update this path if needed
    output_dir = 'D:\\Switcher\\Processed_Videos'  # Update this path if needed
    output_file = 'D:\\Switcher\\video_analysis_results.txt'  # Update this path if needed
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    # Supported video file extensions (endswith accepts a tuple directly)
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')  # Add more extensions if needed
    # Collect video files
    video_files = [
        f for f in os.listdir(video_dir)
        if os.path.isfile(os.path.join(video_dir, f)) and f.lower().endswith(video_extensions)
    ]
    results = []
    # Process videos one at a time
    for filename in video_files:
        logging.info(f'Starting analysis for video: {filename}')
        result = process_video(filename, video_dir, output_dir)
        if result:
            results.append(result)
        else:
            logging.error(f'Failed to process video: {filename}')
        logging.info(f'Completed analysis for video: {filename}')
    # Write results to output file
    with open(output_file, 'w') as f:
        for result in results:
            f.write(f"Video: {result['filename']}\n")
            f.write(f"Duration: {result['duration']}\n")
            f.write(f"Total frames: {result['total_frames']}\n")
            f.write(f"Frames with faces: {result['frames_with_faces']}\n")
            # Guard against zero-frame entries from broken containers.
            face_percentage = (result['frames_with_faces'] / result['total_frames'] * 100) if result['total_frames'] > 0 else 0
            f.write(f"Face Percentage: {face_percentage:.2f}%\n")
            if result['output_video']:
                f.write(f"Processed video saved to: {result['output_video']}\n")
            else:
                f.write("No faces detected; no processed video generated.\n")
            f.write('-' * 40 + '\n')
    print('Analysis complete. Results saved to:', output_file)
    logging.info('Analysis complete. Results saved to: ' + output_file)


if __name__ == '__main__':
    main()