"""Keep only the frames of each video that contain a face, with matching audio.

For every video in the input directory the script writes a new video that
contains just the frames in which the face analyser found at least one face,
re-attaches the audio belonging to those frames, and records per-video
statistics in a plain-text report.
"""

import logging
import math
import os
import shutil
import tempfile
import threading
import traceback
from datetime import timedelta

import cv2
from insightface.app import FaceAnalysis
from moviepy.editor import VideoFileClip, concatenate_audioclips
from tqdm import tqdm

# Set the temporary directory for temporary files
tempfile.tempdir = 'D:\\Switcher\\Temp'  # Update this path if needed

# Configure logging
logging.basicConfig(
    filename='D:\\Switcher\\video_processing.log',  # Update this path if needed
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Thread lock for face analyzer initialization
THREAD_LOCK = threading.Lock()
FACE_ANALYSER = None

# FPS used when the container does not report a usable frame rate
DEFAULT_FPS = 30

# Supported video file extensions (lower-case); add more if needed
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.mkv')


def get_face_analyser():
    """Return the shared FaceAnalysis instance, creating it on first use.

    Creation happens under THREAD_LOCK so the model is initialised only once
    even if several threads call this function at the same time.
    """
    global FACE_ANALYSER
    with THREAD_LOCK:
        if FACE_ANALYSER is None:
            # Initialize FaceAnalysis with specified model and providers
            FACE_ANALYSER = FaceAnalysis(
                name='buffalo_l',
                providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
            )
            # Prepare the analyzer
            FACE_ANALYSER.prepare(ctx_id=0)
            logging.info(f'FaceAnalysis initialized. Models loaded: {list(FACE_ANALYSER.models.keys())}')
    return FACE_ANALYSER


def get_many_faces(frame):
    """Return the faces detected in *frame*, or an empty list on any error.

    Detection is best-effort: a failure is logged with its traceback and the
    frame is then treated as containing no faces.
    """
    try:
        return get_face_analyser().get(frame)
    except Exception as e:
        logging.error(f'Error in get_many_faces: {e}')
        logging.error(traceback.format_exc())
        return []


def _face_percentage(frames_with_faces, total_frames):
    """Return the share of face frames in percent; 0 when there are no frames."""
    if total_frames <= 0:
        return 0
    return frames_with_faces / total_frames * 100


def _remove_if_exists(path):
    """Delete *path*; a missing file is fine, other failures are only logged."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        logging.warning(f'Could not remove temporary file {path}: {e}')


def _copy_face_frames(cap, out, fps, total_frames, filename):
    """Copy every frame with at least one face from *cap* to *out*.

    Returns a tuple ``(frames_read, frames_with_faces, face_intervals)`` where
    ``face_intervals`` is a list of ``(start_seconds, end_seconds)`` pairs, one
    per run of consecutive face frames, computed as ``frame_index / fps``.
    """
    frames_with_faces = 0
    frame_index = 0
    face_intervals = []
    # Start time of the face run currently in progress; None when outside a
    # run. A dedicated sentinel is needed because 0.0 is a valid start time.
    run_start = None

    # tqdm cannot display a non-positive total sensibly, so fall back to an
    # open-ended bar when the container did not report a frame count.
    bar_total = total_frames if total_frames > 0 else None
    with tqdm(total=bar_total, desc=f"Processing {filename}", unit="frame", ncols=120) as pbar:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Analyze frame for faces
            faces = get_many_faces(frame)
            if len(faces) > 0:
                frames_with_faces += 1
                # Write the frame to the output video
                out.write(frame)
                # Start a new face interval if not already in one
                if run_start is None:
                    run_start = frame_index / fps
            elif run_start is not None:
                # End the current face interval
                face_intervals.append((run_start, frame_index / fps))
                run_start = None

            frame_index += 1
            pbar.update(1)  # Update the progress bar by one frame
            pbar.set_postfix({
                'Faces Detected': frames_with_faces,
                'Face Percentage': f"{_face_percentage(frames_with_faces, frame_index):.2f}%"
            })

    # Close the interval that was still open when the video ended
    if run_start is not None:
        face_intervals.append((run_start, frame_index / fps))

    return frame_index, frames_with_faces, face_intervals


def _build_face_audio(original_audio, face_intervals):
    """Return the audio of *face_intervals* joined together, or None.

    None is returned when the source has no audio track or when no interval
    overlaps the audio. Interval ends are clamped to the audio duration
    because they are derived from video frame counts and may run past it.
    """
    if original_audio is None:
        return None

    audio_duration = original_audio.duration
    segments = []
    for start, end in face_intervals:
        if audio_duration is not None:
            end = min(end, audio_duration)
        if end <= start:
            continue
        segments.append(original_audio.subclip(start, end))

    if not segments:
        return None
    return concatenate_audioclips(segments)


def _attach_audio(filename, video_path, temp_video_path, output_video_path, face_intervals, fps):
    """Write the final output file from the silent temp video plus face audio.

    When no audio can be extracted the silent temp video is moved to the
    output path unchanged.
    """
    original_video = VideoFileClip(video_path)
    try:
        final_audio = _build_face_audio(original_video.audio, face_intervals)
        if final_audio is None:
            logging.warning(f"No audio segments extracted for video: {filename}")
            # The temp file has not been opened by moviepy here, so it can be
            # moved without an open handle blocking the operation.
            shutil.move(temp_video_path, output_video_path)
            return

        processed_video = VideoFileClip(temp_video_path)
        try:
            processed_video = processed_video.set_audio(final_audio)
            # Save the video with audio
            processed_video.write_videofile(
                output_video_path,
                codec='libx264',
                audio_codec='aac',
                fps=fps,
                remove_temp=True
            )
        finally:
            processed_video.close()
    finally:
        original_video.close()


def _process_video(filename, video_path, output_video_path, temp_video_path):
    """Do the work of process_video; exceptions propagate to the caller."""
    logging.info(f'Starting processing video: {video_path}')

    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        if not cap.isOpened():
            logging.error(f'Failed to open video file: {video_path}')
            return None

        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or not math.isfinite(fps) or fps <= 0:
            fps = DEFAULT_FPS  # Default FPS if unable to get FPS from video
            logging.warning(f'FPS not detected in {filename}. Using default FPS: {fps}')

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Define the codec and create VideoWriter object
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # You can use 'XVID' or other codecs
        out = cv2.VideoWriter(temp_video_path, fourcc, fps, (width, height))
        if not out.isOpened():
            logging.error(f'Failed to open video writer for: {temp_video_path}')
            return None

        frames_read, frames_with_faces, face_intervals = _copy_face_frames(
            cap, out, fps, total_frames, filename
        )
    finally:
        # Release the OpenCV handles even when reading or writing failed
        cap.release()
        if out is not None:
            out.release()

    # Fall back to the number of frames actually read when the container did
    # not report a usable frame count.
    if total_frames <= 0:
        total_frames = frames_read
    duration = str(timedelta(seconds=int(total_frames / fps)))

    logging.info(f'Finished processing video: {video_path}')
    logging.info(
        f'Total frames: {total_frames}, Frames with faces: {frames_with_faces}, '
        f'Face Percentage: {_face_percentage(frames_with_faces, total_frames):.2f}%'
    )

    result = {
        'filename': filename,
        'duration': duration,
        'total_frames': total_frames,
        'frames_with_faces': frames_with_faces,
        'output_video': None
    }

    # If no faces detected there is nothing to keep; the caller deletes the
    # empty temp video.
    if frames_with_faces == 0:
        logging.info(f'No faces detected in video: {filename}')
        return result

    _attach_audio(filename, video_path, temp_video_path, output_video_path, face_intervals, fps)
    result['output_video'] = output_video_path
    return result


def process_video(filename, video_dir, output_dir):
    """Extract the face-containing frames of one video into *output_dir*.

    Args:
        filename: Name of the video file inside *video_dir*; the output file
            uses the same name.
        video_dir: Directory containing the source video.
        output_dir: Directory that receives the processed video.

    Returns:
        A dict with the keys ``filename``, ``duration``, ``total_frames``,
        ``frames_with_faces`` and ``output_video`` (None when no face was
        found), or None when the video could not be processed. Errors are
        logged rather than raised.
    """
    video_path = os.path.join(video_dir, filename)
    output_video_path = os.path.join(output_dir, filename)
    temp_video_path = os.path.join(tempfile.gettempdir(), f"temp_{filename}")

    try:
        return _process_video(filename, video_path, output_video_path, temp_video_path)
    except Exception as e:
        logging.error(f'Error processing video {filename}: {e}')
        logging.error(traceback.format_exc())
        return None
    finally:
        # Remove temporary video file if it still exists
        _remove_if_exists(temp_video_path)


def main():
    """Process every supported video in the input directory and write a report."""
    # Directories and file paths
    video_dir = 'D:\\Switcher\\Convert'  # Update this path if needed
    output_dir = 'D:\\Switcher\\Processed_Videos'  # Update this path if needed
    output_file = 'D:\\Switcher\\video_analysis_results.txt'  # Update this path if needed

    if not os.path.isdir(video_dir):
        logging.error(f'Video directory not found: {video_dir}')
        print('Video directory not found:', video_dir)
        return

    # Create output and temporary directories if they don't exist
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(tempfile.gettempdir(), exist_ok=True)

    # Collect video files (sorted so runs are reproducible)
    video_files = sorted(
        f for f in os.listdir(video_dir)
        if os.path.isfile(os.path.join(video_dir, f))
        and f.lower().endswith(VIDEO_EXTENSIONS)
    )

    results = []

    # Process videos one at a time
    for filename in video_files:
        logging.info(f'Starting analysis for video: {filename}')
        result = process_video(filename, video_dir, output_dir)
        if result:
            results.append(result)
        else:
            logging.error(f'Failed to process video: {filename}')
        logging.info(f'Completed analysis for video: {filename}')

    # Write results to output file; UTF-8 so any file name can be written
    with open(output_file, 'w', encoding='utf-8') as f:
        for result in results:
            face_percentage = _face_percentage(result['frames_with_faces'], result['total_frames'])
            f.write(f"Video: {result['filename']}\n")
            f.write(f"Duration: {result['duration']}\n")
            f.write(f"Total frames: {result['total_frames']}\n")
            f.write(f"Frames with faces: {result['frames_with_faces']}\n")
            f.write(f"Face Percentage: {face_percentage:.2f}%\n")
            if result['output_video']:
                f.write(f"Processed video saved to: {result['output_video']}\n")
            else:
                f.write("No faces detected; no processed video generated.\n")
            f.write('-' * 40 + '\n')

    print('Analysis complete. Results saved to:', output_file)
    logging.info('Analysis complete. Results saved to: ' + output_file)


if __name__ == '__main__':
    main()