import logging
import time
import json
import os
import shutil
import tempfile
import concurrent.futures
from pathlib import Path
from typing import Dict, Any, Optional, Tuple

import pandas as pd
import cv2

from app.utils.logging_utils import time_it, setup_logger
from app.utils.data_utils import json_to_dataframe
from app.core.config import settings
from app.services.processing.speech_service import SpeechService
from app.services.processing.emotion_analyzer import EmotionAnalyzer
from app.services.processing.ai_analysis import AIAnalysisService
from app.services.processing.eye_contact_analyzer import analyze_video_file as analyze_eye_contact_video
from app.services.processing.body_language_analyzer import analyze_video_file as analyze_body_language_video
from app.services.processing.ai_face_analyzer import AIFaceAnalyzer

# Configure logging
logger = setup_logger(__name__)


class VideoProcessor:
    """Service for processing videos.

    Orchestrates speech-to-text, per-frame emotion analysis, and optional
    eye-contact / body-language / face analyses in parallel, then feeds the
    combined results into an AI analysis step.
    """

    def __init__(self):
        """Initialize the video processor and its sub-services."""
        self.speech_service = SpeechService()
        self.emotion_analyzer = EmotionAnalyzer()
        self.ai_analysis_service = AIAnalysisService()

    @time_it
    def process_video(
        self,
        video_path: str,
        frame_rate: int = 1,
        backend: str = 'mediapipe',
        language: str = 'en',
        generate_annotated_video: bool = False,
        video_id: Optional[str] = None,
        status_callback = None,
        min_face_confidence: float = 0.5,
        min_face_size_ratio: float = 0.05,
        save_emotion_stats: bool = True,
        skip_frames: int = 2,  # Default parameter, not used for frame sampling anymore
        adaptive_sampling: bool = False,  # Disable adaptive sampling to match test behavior
        analyze_eye_contact: bool = True,
        analyze_body_language: bool = True,
        analyze_face: bool = True,
        job_title: str = "Professional",
        model_name: str = "gpt-4o"
    ) -> Tuple[str, str]:
        """
        Process a video file for emotion analysis.

        Args:
            video_path: Path to the video file
            frame_rate: Process every nth frame (controls the sampling rate of frames for analysis)
            backend: Backend to use for face detection
            language: Language of the video
            generate_annotated_video: Whether to generate an annotated video
            video_id: ID of the video (optional)
            status_callback: Callback function for progress updates
            min_face_confidence: Minimum confidence for face detection
            min_face_size_ratio: Minimum face size as ratio of image dimensions
            save_emotion_stats: Whether to save detailed emotion statistics as JSON
            skip_frames: Legacy parameter, kept for backward compatibility but not used
            adaptive_sampling: Whether to use adaptive sampling
            analyze_eye_contact: Whether to analyze eye contact
            analyze_body_language: Whether to analyze body language
            analyze_face: Whether to analyze face
            job_title: Job title for face analysis
            model_name: The name of the model to use for AI analysis

        Returns:
            Tuple of (transcript, analysis_json)
        """
        start_time = time.time()

        # Add debug info about the video path
        logger.info(f"DEBUG - Processing video path: {video_path}")
        logger.info(f"DEBUG - Video file exists: {os.path.exists(video_path)}")
        if os.path.exists(video_path):
            logger.info(f"DEBUG - Video file size: {os.path.getsize(video_path) / (1024*1024):.2f} MB")

        # Create results directory if it doesn't exist
        results_dir = settings.RESULTS_DIR
        os.makedirs(results_dir, exist_ok=True)

        # Update status if callback provided
        if status_callback:
            status_callback(5)  # 5% progress

        logger.info(f"Processing video: {video_path}")
        logger.info(f"Using backend: {backend}")
        logger.info(f"Language: {language}")

        # Force mediapipe backend for best GPU performance on Mac M3
        if backend == 'opencv' or not backend or backend == "retinaface":
            logger.info(f"Backend '{backend}' doesn't support GPU acceleration or is not recommended.")
            logger.info(f"Switching to 'mediapipe' for GPU-accelerated frame analysis.")
            backend = "mediapipe"

        # Ensure we're using a GPU-compatible backend
        if backend not in ['mediapipe', 'ssd', 'mtcnn']:
            logger.info(f"Backend '{backend}' may not be optimized for GPU acceleration.")
            logger.info(f"Consider using 'mediapipe' for best GPU performance.")

        # Define worker functions for parallel processing

        def process_speech(video_path, language):
            """Transcribe the video's audio; return '' on failure (best-effort)."""
            logger.info("Starting speech-to-text processing...")
            try:
                service = 'groq'
                transcript = self.speech_service.process_video_speech(video_path, language, service)
                logger.info(f"Speech-to-text completed. Text length: {len(transcript)} characters")
                return transcript
            except Exception as e:
                logger.error(f"Error during speech-to-text processing: {str(e)}")
                logger.warning("Continuing with empty transcript due to speech processing failure")
                return ""

        def process_eye_contact(video_path, model_name):
            """Run eye-contact analysis; return None on failure (best-effort)."""
            logger.info("Starting eye contact analysis...")
            try:
                results = analyze_eye_contact_video(
                    video_path=video_path,
                    display_video=False,
                    save_results=False,
                    model_name=model_name
                )
                logger.info("Eye contact analysis completed successfully")
                return results
            except Exception as e:
                logger.error(f"Error during eye contact analysis: {str(e)}")
                logger.warning("Continuing without eye contact analysis")
                return None

        def process_body_language(video_path):
            """Run body-language analysis; return None on failure (best-effort)."""
            logger.info("Starting body language analysis...")
            try:
                results = analyze_body_language_video(
                    video_path=video_path,
                    display_video=False,
                    save_results=False
                )
                logger.info("Body language analysis completed successfully")
                return results
            except Exception as e:
                logger.error(f"Error during body language analysis: {str(e)}")
                logger.warning("Continuing without body language analysis")
                return None

        def process_face_analysis(video_path, job_title):
            """Extract 3 evenly spaced frames and run AI face analysis on them.

            Frames are written to a per-call temporary directory (unique, so
            concurrent jobs cannot collide) which is always removed afterwards.
            Returns None on failure (best-effort).
            """
            logger.info("Starting face analysis...")
            temp_frames_dir = None
            try:
                # Unique temp directory per invocation; cleaned up in `finally`.
                temp_frames_dir = Path(tempfile.mkdtemp(prefix="face_frames_"))
                face_frames = []

                # Extract frames from the video
                cap = cv2.VideoCapture(video_path)
                if not cap.isOpened():
                    logger.error(f"Error: Could not open video file {video_path}")
                    return None

                # Get video properties
                frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                fps = cap.get(cv2.CAP_PROP_FPS)

                # Extract 3 evenly distributed frames (at 1/4, 2/4, 3/4 of the video)
                num_frames = 3
                frame_indices = [int(i * frame_count / (num_frames + 1)) for i in range(1, num_frames + 1)]

                for i, frame_idx in enumerate(frame_indices):
                    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
                    ret, frame = cap.read()
                    if ret:
                        # Generate filename from the frame's timestamp
                        timestamp = frame_idx / fps if fps > 0 else 0
                        minutes = int(timestamp // 60)
                        seconds = int(timestamp % 60)
                        filename = f"frame_{i+1}_at_{minutes:02d}m{seconds:02d}s.jpg"
                        output_path = temp_frames_dir / filename

                        # Save frame
                        cv2.imwrite(str(output_path), frame)
                        face_frames.append(str(output_path))

                cap.release()

                if face_frames:
                    # Analyze extracted frames
                    face_analyzer = AIFaceAnalyzer(provider="openai")
                    face_analysis_results = face_analyzer.analyze_profile_pictures(face_frames, job_title)
                    logger.info("Face analysis completed successfully")
                    return face_analysis_results

                logger.warning("No frames were extracted for face analysis")
                return None
            except Exception as e:
                logger.error(f"Error during face analysis: {str(e)}")
                logger.warning("Continuing without face analysis")
                return None
            finally:
                # Always remove the extracted frames; they are only needed
                # for the duration of the analysis call above.
                if temp_frames_dir is not None:
                    shutil.rmtree(temp_frames_dir, ignore_errors=True)

        def process_emotion_analysis(video_path, frame_rate, backend, generate_annotated_video, status_callback=None):
            """Run per-frame emotion analysis; returns ([], None, {}, {}) on failure."""
            logger.info(f"Starting emotion analysis with {backend} backend...")
            try:
                # Initialize emotion analyzer with custom parameters
                custom_emotion_analyzer = EmotionAnalyzer(
                    min_face_size_ratio=min_face_size_ratio,
                    min_confidence=min_face_confidence,
                    skip_similar_frames=False  # Explicitly disable frame similarity checks
                )

                # Use process_video_frames from EmotionAnalyzer
                all_results, annotated_video_path, timing_summary, metadata = custom_emotion_analyzer.process_video_frames(
                    video_path=video_path,
                    frame_rate=frame_rate,
                    backend=backend,
                    generate_annotated_video=generate_annotated_video,
                    status_callback=status_callback,  # Pass the received status_callback
                    adaptive_sampling=adaptive_sampling,
                    max_frames=1000
                )

                # Log timing summary and metadata for monitoring
                logger.info(f"Frame analysis timing summary: {timing_summary}")
                logger.info(f"Frame analysis metadata: {metadata}")
                logger.info(f"Total frames analyzed: {len(all_results)}")

                return all_results, annotated_video_path, timing_summary, metadata
            except Exception as e:
                logger.error(f"Error during emotion analysis: {str(e)}")
                return [], None, {}, {}

        # Execute tasks in parallel using ThreadPoolExecutor
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            # Start all tasks in parallel
            future_transcript = executor.submit(process_speech, video_path, language)

            futures = {}
            if analyze_eye_contact:
                futures['eye_contact'] = executor.submit(process_eye_contact, video_path, model_name)
            if analyze_body_language:
                futures['body_language'] = executor.submit(process_body_language, video_path)
            if analyze_face:
                futures['face'] = executor.submit(process_face_analysis, video_path, job_title)

            # Always submit emotion analysis
            futures['emotion'] = executor.submit(
                process_emotion_analysis, video_path, frame_rate, backend,
                generate_annotated_video, status_callback
            )

            # Wait for all tasks to complete and collect results
            transcript = future_transcript.result()
            eye_contact_results = futures['eye_contact'].result() if 'eye_contact' in futures else None
            body_language_results = futures['body_language'].result() if 'body_language' in futures else None
            face_analysis_results = futures['face'].result() if 'face' in futures else None
            all_results, annotated_video_path, timing_summary, metadata = futures['emotion'].result()

        def results_payload(frame_results):
            """Build the common results dict shared by the empty/error return paths."""
            return {
                'backend': frame_results,
                'eye_contact_analysis': eye_contact_results if eye_contact_results else {},
                'body_language_analysis': body_language_results if body_language_results else {},
                'face_analysis': face_analysis_results if face_analysis_results else {}
            }

        # Update status after parallel processing
        if status_callback:
            status_callback(80)  # 80% progress

        # Debug visibility into the auxiliary analyses (was raw print()s)
        logger.debug("Body language results: %s", body_language_results)
        logger.debug("Eye contact results: %s", eye_contact_results)

        # Check if we have any emotion results
        if not all_results:
            logger.warning("No emotions detected in any frames.")
            return transcript, json.dumps(results_payload([]))

        # Calculate emotion statistics
        emotion_stats = self._calculate_emotion_statistics(all_results)

        # Video info data
        cap = cv2.VideoCapture(video_path)
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        video_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = video_frames / video_fps if video_fps > 0 else 0
        cap.release()

        # Create comprehensive results structure
        comprehensive_results = {
            "video_info": {
                "path": video_path,
                "frames": video_frames,
                "fps": video_fps,
                "duration_seconds": duration,
                "device_used": metadata.get("device", "unknown"),
                "backend": backend,
                "face_detection_params": {
                    "min_confidence": min_face_confidence,
                    "min_face_size_ratio": min_face_size_ratio
                }
            },
            "emotion_stats": emotion_stats,
            "frames_analyzed": len(all_results),
            "execution_stats": {
                "total_processing_time_seconds": timing_summary.get("total_time", 0),
                "avg_processing_time_seconds": timing_summary.get("avg_time_per_frame", 0),
                "timing_breakdown": {
                    "face_detection": metadata.get("detailed_timing", {}).get("face_detection", 0),
                    "emotion_analysis": metadata.get("detailed_timing", {}).get("emotion_analysis", 0),
                    "temporal_consistency": metadata.get("detailed_timing", {}).get("temporal_consistency", 0),
                    "cache_check": metadata.get("detailed_timing", {}).get("cache_check", 0),
                    "similarity_check": metadata.get("detailed_timing", {}).get("similarity_check", 0),
                    "total": timing_summary.get("avg_time_per_frame", 0)
                }
            }
        }

        # Add eye contact, body language, and face analysis results if available
        if eye_contact_results:
            comprehensive_results["eye_contact_analysis"] = eye_contact_results
        if body_language_results:
            comprehensive_results["body_language_analysis"] = body_language_results
        if face_analysis_results:
            comprehensive_results["face_analysis"] = face_analysis_results

        # Determine overall sentiment based on emotion_percentages
        dominant_emotion, _ = max(
            emotion_stats["emotion_percentages"].items(),
            key=lambda x: x[1],
            default=("neutral", 0)
        )
        comprehensive_results["overall_sentiment"] = dominant_emotion.capitalize()

        # Log the full JSON results for immediate feedback (was a console print)
        logger.debug("Comprehensive analysis JSON results:\n%s", json.dumps(comprehensive_results, indent=2))

        # Process the results to ensure they have the required fields
        processed_results = self._process_emotion_results(all_results)

        # Convert results to DataFrame
        df = json_to_dataframe({'backend': processed_results})

        # Store original emotion data from emotion_stats on the DataFrame
        if emotion_stats["emotion_percentages"]:
            # Use the emotion_percentages data for all rows
            df['raw_emotion_data'] = [emotion_stats["emotion_percentages"]] * len(df)

            # Add confidence data as a separate field
            confidence_data = {
                "confidence_by_emotion": emotion_stats["confidence_by_emotion"],
                "average_confidence": emotion_stats["average_confidence"]
            }
            df['confidence_data'] = [confidence_data] * len(df)

            # Add overall sentiment to each row
            df['overall_sentiment'] = comprehensive_results["overall_sentiment"]

            logger.info(f"Added emotion percentages data to DataFrame: {emotion_stats['emotion_percentages']}")
            logger.info(f"Added confidence data to DataFrame: {confidence_data}")
            logger.info(f"Added overall sentiment to DataFrame: {comprehensive_results['overall_sentiment']}")
        else:
            logger.warning("No emotion data found to add to DataFrame")

        # Check if we have emotion data
        if df.empty:
            logger.warning("No emotions detected, cannot generate analysis.")
            # `processed_results` is always assigned above, so no locals() check
            # is needed; return it with whatever auxiliary analyses succeeded.
            return transcript, json.dumps(results_payload(processed_results))

        # Perform AI analysis
        logger.info("Starting AI analysis...")
        try:
            # Log the data being passed to the AI analysis
            if eye_contact_results:
                logger.info(f"Passing eye_contact_data to AI analysis with {len(str(eye_contact_results))} characters")
            else:
                logger.info("No eye_contact_data available to pass to AI analysis")

            if body_language_results:
                logger.info(f"Passing body_language_data to AI analysis with {len(str(body_language_results))} characters")
            else:
                logger.info("No body_language_data available to pass to AI analysis")

            if face_analysis_results:
                logger.info(f"Passing face_analysis_data to AI analysis with {len(str(face_analysis_results))} items")
            else:
                logger.info("No face_analysis_data available to pass to AI analysis")

            analysis = self.ai_analysis_service.analyze_emotions_and_transcript(
                df,
                transcript,
                language,
                eye_contact_data=eye_contact_results,
                body_language_data=body_language_results,
                face_analysis_data=face_analysis_results,
                model_name=model_name
            )
        except Exception as e:
            logger.error(f"Error during AI analysis: {str(e)}")
            results_with_error = results_payload(processed_results)
            results_with_error['error'] = str(e)
            return transcript, json.dumps(results_with_error)

        # Update status
        if status_callback:
            status_callback(100)  # 100% progress

        # Log total processing time
        end_time = time.time()
        total_time_taken = end_time - start_time
        logger.info(f"Total processing time: {total_time_taken:.2f} seconds")

        # Convert analysis to JSON
        analysis_json = json.dumps(analysis)

        return transcript, analysis_json

    def _calculate_emotion_statistics(self, all_results):
        """Calculate comprehensive emotion statistics from frame results.

        Counts dominant emotions per frame (preferring a frame-level
        `main_emotion` when present, otherwise the per-face dominant emotion),
        then derives percentages, face-detection rate, and confidence averages.
        """
        # Count frames with faces
        frames_with_faces = 0
        total_faces = 0
        total_confidence = 0
        emotion_counts = {
            "angry": 0, "disgust": 0, "fear": 0, "happy": 0,
            "sad": 0, "surprise": 0, "neutral": 0
        }
        confidence_by_emotion = {emotion: [] for emotion in emotion_counts.keys()}

        # Process each frame result
        for result in all_results:
            faces = result.get("faces", [])
            if faces:
                frames_with_faces += 1
                total_faces += len(faces)

                # Count main emotion if available
                if "main_emotion" in result:
                    main_emotion = result["main_emotion"]["emotion"]
                    confidence = result["main_emotion"]["confidence"]
                    if main_emotion in emotion_counts:
                        emotion_counts[main_emotion] += 1
                        confidence_by_emotion[main_emotion].append(confidence)
                        total_confidence += confidence
                # Otherwise check each face for emotions
                else:
                    for face in faces:
                        if "emotion" in face:
                            # Find dominant emotion for this face
                            dominant_emotion = max(face["emotion"].items(), key=lambda x: x[1])
                            emotion_name = dominant_emotion[0]
                            confidence = dominant_emotion[1]
                            if emotion_name in emotion_counts:
                                emotion_counts[emotion_name] += 1
                                confidence_by_emotion[emotion_name].append(confidence)
                                total_confidence += confidence

        # Calculate percentages
        total_emotions = sum(emotion_counts.values())
        emotion_percentages = {}
        if total_emotions > 0:
            for emotion, count in emotion_counts.items():
                emotion_percentages[emotion] = (count / total_emotions) * 100

        # Calculate face detection percentage
        face_detection_percentage = 0
        if all_results:
            face_detection_percentage = (frames_with_faces / len(all_results)) * 100

        # Calculate average confidence
        average_confidence = 0
        if total_emotions > 0:
            average_confidence = total_confidence / total_emotions

        # Calculate average confidence by emotion
        confidence_averages = {}
        for emotion, confidences in confidence_by_emotion.items():
            if confidences:
                confidence_averages[emotion] = sum(confidences) / len(confidences)
            else:
                confidence_averages[emotion] = 0

        # Create emotion statistics
        emotion_stats = {
            "frames_with_faces": frames_with_faces,
            "face_detection_percentage": face_detection_percentage,
            "emotion_counts": emotion_counts,
            "emotion_percentages": emotion_percentages,
            "average_confidence": average_confidence,
            "confidence_by_emotion": confidence_averages
        }

        return emotion_stats

    def _process_emotion_results(self, all_results):
        """Process emotion results to ensure they have required fields.

        Mutates each non-empty result in place so that faces, `main_face`,
        and `main_emotion` all carry `dominant_emotion` / confidence fields,
        deriving them from the raw emotion score dicts when missing.
        """
        processed_results = []

        # Process all results
        for result in all_results:
            # Skip empty results
            if not result:
                continue

            # Process faces to ensure they have dominant_emotion and emotion_confidence
            if 'faces' in result and result['faces']:
                for face in result['faces']:
                    # If face has emotion data but no dominant_emotion, calculate it
                    if 'emotion' in face and 'dominant_emotion' not in face:
                        emotions = face['emotion']
                        if emotions:
                            # Find dominant emotion and its confidence
                            dominant_emotion, confidence = max(emotions.items(), key=lambda x: x[1])
                            face['dominant_emotion'] = dominant_emotion
                            face['emotion_confidence'] = confidence
                            face['emotion_stable'] = face.get('emotion_stable', False)

            # Process main_face if it exists
            if 'main_face' in result and result['main_face']:
                main_face = result['main_face']
                if 'emotion' in main_face and 'dominant_emotion' not in main_face:
                    emotions = main_face['emotion']
                    if emotions:
                        # Find dominant emotion and its confidence
                        dominant_emotion, confidence = max(emotions.items(), key=lambda x: x[1])
                        main_face['dominant_emotion'] = dominant_emotion
                        main_face['emotion_confidence'] = confidence
                        main_face['emotion_stable'] = main_face.get('emotion_stable', False)

            # Process main_emotion if it exists
            if 'main_emotion' in result and result['main_emotion']:
                main_emotion = result['main_emotion']
                # If main_emotion has emotion but not confidence, add it
                if 'emotion' in main_emotion and 'confidence' not in main_emotion:
                    # Try to get confidence from main_face
                    if 'main_face' in result and result['main_face'] and 'emotion' in result['main_face']:
                        emotion_name = main_emotion['emotion']
                        main_emotion['confidence'] = result['main_face']['emotion'].get(emotion_name, 0)

            processed_results.append(result)

        return processed_results


# Create a singleton instance
video_processor = VideoProcessor()


# Function to maintain backward compatibility
def process_video(
    video_path: str,
    frame_rate: int = 1,
    backend: str = 'mediapipe',
    language: str = 'en',
    generate_annotated_video: bool = False,
    video_id: Optional[str] = None,
    status_callback = None,
    min_face_confidence: float = 0.5,
    min_face_size_ratio: float = 0.05,
    save_emotion_stats: bool = True,
    skip_frames: int = 2,  # Default parameter, not used for frame sampling anymore
    adaptive_sampling: bool = False,  # Control whether adaptive sampling is used
    analyze_eye_contact: bool = True,
    analyze_body_language: bool = True,
    analyze_face: bool = True,
    job_title: str = "Professional",
    model_name: str = "gpt-4o"
) -> Tuple[str, str]:
    """
    Process a video file for emotion analysis (backward compatibility function).

    Args:
        video_path: Path to the video file
        frame_rate: Process every nth frame (controls the sampling rate of frames for analysis)
        backend: Backend to use for face detection
        language: Language of the video
        generate_annotated_video: Whether to generate an annotated video
        video_id: ID of the video (optional)
        status_callback: Callback function for progress updates
        min_face_confidence: Minimum confidence for face detection
        min_face_size_ratio: Minimum face size as ratio of image dimensions
        save_emotion_stats: Whether to save detailed emotion statistics as JSON
        skip_frames: Legacy parameter, kept for backward compatibility but not used
        adaptive_sampling: Whether to use adaptive sampling
        analyze_eye_contact: Whether to analyze eye contact
        analyze_body_language: Whether to analyze body language
        analyze_face: Whether to analyze face
        job_title: Job title for face analysis
        model_name: The name of the model to use for AI analysis

    Returns:
        Tuple of (transcript, analysis_json)
    """
    return video_processor.process_video(
        video_path=video_path,
        frame_rate=frame_rate,
        backend=backend,
        language=language,
        generate_annotated_video=generate_annotated_video,
        video_id=video_id,
        status_callback=status_callback,
        min_face_confidence=min_face_confidence,
        min_face_size_ratio=min_face_size_ratio,
        save_emotion_stats=save_emotion_stats,
        skip_frames=skip_frames,
        adaptive_sampling=adaptive_sampling,
        analyze_eye_contact=analyze_eye_contact,
        analyze_body_language=analyze_body_language,
        analyze_face=analyze_face,
        job_title=job_title,
        model_name=model_name
    )