Spaces:
Paused
Paused
| import os | |
| import cv2 | |
| import logging | |
| import traceback | |
| import shutil | |
| import tempfile | |
| from datetime import timedelta | |
| from tqdm import tqdm | |
| import threading | |
| from insightface.app import FaceAnalysis | |
| from moviepy.editor import VideoFileClip, concatenate_audioclips | |
| # Set the temporary directory for temporary files | |
| tempfile.tempdir = 'D:\\Switcher\\Temp' # Update this path if needed | |
| # Configure logging | |
| logging.basicConfig( | |
| filename='D:\\Switcher\\video_processing.log', # Update this path if needed | |
| level=logging.INFO, | |
| format='%(asctime)s [%(levelname)s] %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| ) | |
| # Thread lock for face analyzer initialization | |
| THREAD_LOCK = threading.Lock() | |
| FACE_ANALYSER = None | |
| def get_face_analyser(): | |
| global FACE_ANALYSER | |
| with THREAD_LOCK: | |
| if FACE_ANALYSER is None: | |
| # Initialize FaceAnalysis with specified model and providers | |
| FACE_ANALYSER = FaceAnalysis(name='buffalo_l', providers=['CUDAExecutionProvider', 'CPUExecutionProvider']) | |
| # Prepare the analyzer | |
| FACE_ANALYSER.prepare(ctx_id=0) | |
| logging.info(f'FaceAnalysis initialized. Models loaded: {list(FACE_ANALYSER.models.keys())}') | |
| return FACE_ANALYSER | |
| def get_many_faces(frame): | |
| try: | |
| faces = get_face_analyser().get(frame) | |
| return faces | |
| except Exception as e: | |
| logging.error(f'Error in get_many_faces: {e}') | |
| logging.error(traceback.format_exc()) | |
| return [] | |
| def process_video(filename, video_dir, output_dir): | |
| try: | |
| video_path = os.path.join(video_dir, filename) | |
| output_video_path = os.path.join(output_dir, filename) | |
| temp_video_path = os.path.join(tempfile.gettempdir(), f"temp_{filename}") | |
| logging.info(f'Starting processing video: {video_path}') | |
| # Open video file | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| logging.error(f'Failed to open video file: {video_path}') | |
| return None | |
| # Get video properties | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| if fps == 0 or fps is None or fps != fps: | |
| fps = 30 # Default FPS if unable to get FPS from video | |
| logging.warning(f'FPS not detected in {filename}. Using default FPS: {fps}') | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| duration_seconds = total_frames / fps | |
| duration = str(timedelta(seconds=int(duration_seconds))) | |
| # Define the codec and create VideoWriter object | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') # You can use 'XVID' or other codecs | |
| out = cv2.VideoWriter(temp_video_path, fourcc, fps, (width, height)) | |
| frames_with_faces = 0 | |
| frame_index = 0 | |
| # Initialize variables for audio synchronization | |
| face_intervals = [] | |
| in_face_sequence = False | |
| # Create a progress bar using tqdm | |
| with tqdm(total=total_frames, desc=f"Processing {filename}", unit="frame", ncols=120) as pbar: | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| # Analyze frame for faces | |
| faces = get_many_faces(frame) | |
| if len(faces) > 0: | |
| frames_with_faces += 1 | |
| # Write the frame to the output video | |
| out.write(frame) | |
| # Start a new face interval if not already in one | |
| if not in_face_sequence: | |
| in_face_sequence = True | |
| start_time = frame_index / fps | |
| else: | |
| # End the current face interval if in one | |
| if in_face_sequence: | |
| in_face_sequence = False | |
| end_time = frame_index / fps | |
| face_intervals.append((start_time, end_time)) | |
| frame_index += 1 | |
| pbar.update(1) # Update the progress bar by one frame | |
| # Calculate percentage of frames with faces | |
| percentage = (frames_with_faces / frame_index) * 100 if frame_index > 0 else 0 | |
| # Update the postfix with the number of frames with faces and percentage | |
| pbar.set_postfix({ | |
| 'Faces Detected': frames_with_faces, | |
| 'Face Percentage': f"{percentage:.2f}%" | |
| }) | |
| # Check if we were still in a face interval at the end | |
| if in_face_sequence: | |
| end_time = frame_index / fps | |
| face_intervals.append((start_time, end_time)) | |
| cap.release() | |
| out.release() | |
| logging.info(f'Finished processing video: {video_path}') | |
| logging.info(f'Total frames: {total_frames}, Frames with faces: {frames_with_faces}, Face Percentage: {frames_with_faces / total_frames * 100:.2f}%') | |
| # If no faces detected, delete the empty output video | |
| if frames_with_faces == 0: | |
| os.remove(temp_video_path) | |
| logging.info(f'No faces detected in video: {filename}') | |
| return { | |
| 'filename': filename, | |
| 'duration': duration, | |
| 'total_frames': total_frames, | |
| 'frames_with_faces': frames_with_faces, | |
| 'output_video': None | |
| } | |
| # Process the audio | |
| # Load the original video using moviepy | |
| original_video = VideoFileClip(video_path) | |
| # Extract the audio | |
| original_audio = original_video.audio | |
| # Extract the audio segments corresponding to face intervals | |
| audio_segments = [] | |
| for interval in face_intervals: | |
| audio_segment = original_audio.subclip(interval[0], interval[1]) | |
| audio_segments.append(audio_segment) | |
| # Concatenate the audio segments | |
| if audio_segments: | |
| final_audio = concatenate_audioclips(audio_segments) | |
| else: | |
| final_audio = None | |
| # Now, load the processed video (without audio) using moviepy | |
| processed_video = VideoFileClip(temp_video_path) | |
| # Set the audio to the processed video | |
| if final_audio: | |
| processed_video = processed_video.set_audio(final_audio) | |
| # Save the video with audio | |
| processed_video.write_videofile(output_video_path, codec='libx264', audio_codec='aac', fps=fps, remove_temp=True) | |
| processed_video.close() | |
| else: | |
| logging.warning(f"No audio segments extracted for video: {filename}") | |
| # Save the video without audio | |
| shutil.move(temp_video_path, output_video_path) | |
| # Close original video and audio | |
| original_video.close() | |
| # Remove temporary video file if it still exists | |
| if os.path.exists(temp_video_path): | |
| os.remove(temp_video_path) | |
| # Return result | |
| return { | |
| 'filename': filename, | |
| 'duration': duration, | |
| 'total_frames': total_frames, | |
| 'frames_with_faces': frames_with_faces, | |
| 'output_video': output_video_path | |
| } | |
| except Exception as e: | |
| logging.error(f'Error processing video {filename}: {e}') | |
| logging.error(traceback.format_exc()) | |
| return None | |
| def main(): | |
| # Directories and file paths | |
| video_dir = 'D:\\Switcher\\Convert' # Update this path if needed | |
| output_dir = 'D:\\Switcher\\Processed_Videos' # Update this path if needed | |
| output_file = 'D:\\Switcher\\video_analysis_results.txt' # Update this path if needed | |
| # Create output directory if it doesn't exist | |
| os.makedirs(output_dir, exist_ok=True) | |
| # Supported video file extensions | |
| video_extensions = ['.mp4', '.avi', '.mov', '.mkv'] # Add more extensions if needed | |
| # Collect video files | |
| video_files = [ | |
| f for f in os.listdir(video_dir) | |
| if os.path.isfile(os.path.join(video_dir, f)) and f.lower().endswith(tuple(video_extensions)) | |
| ] | |
| results = [] | |
| # Process videos one at a time | |
| for filename in video_files: | |
| logging.info(f'Starting analysis for video: {filename}') | |
| result = process_video(filename, video_dir, output_dir) | |
| if result: | |
| results.append(result) | |
| else: | |
| logging.error(f'Failed to process video: {filename}') | |
| logging.info(f'Completed analysis for video: {filename}') | |
| # Write results to output file | |
| with open(output_file, 'w') as f: | |
| for result in results: | |
| f.write(f"Video: {result['filename']}\n") | |
| f.write(f"Duration: {result['duration']}\n") | |
| f.write(f"Total frames: {result['total_frames']}\n") | |
| f.write(f"Frames with faces: {result['frames_with_faces']}\n") | |
| face_percentage = (result['frames_with_faces'] / result['total_frames'] * 100) if result['total_frames'] > 0 else 0 | |
| f.write(f"Face Percentage: {face_percentage:.2f}%\n") | |
| if result['output_video']: | |
| f.write(f"Processed video saved to: {result['output_video']}\n") | |
| else: | |
| f.write("No faces detected; no processed video generated.\n") | |
| f.write('-' * 40 + '\n') | |
| print('Analysis complete. Results saved to:', output_file) | |
| logging.info('Analysis complete. Results saved to: ' + output_file) | |
| if __name__ == '__main__': | |
| main() | |