| import logging |
| import time |
| import json |
| import pandas as pd |
| import cv2 |
| from pathlib import Path |
| from typing import Dict, Any, Optional, Tuple |
| import os |
| import concurrent.futures |
|
|
| from app.utils.logging_utils import time_it, setup_logger |
| from app.utils.data_utils import json_to_dataframe |
| from app.core.config import settings |
| from app.services.processing.speech_service import SpeechService |
| from app.services.processing.emotion_analyzer import EmotionAnalyzer |
| from app.services.processing.ai_analysis import AIAnalysisService |
| from app.services.processing.eye_contact_analyzer import analyze_video_file as analyze_eye_contact_video |
| from app.services.processing.body_language_analyzer import analyze_video_file as analyze_body_language_video |
| from app.services.processing.ai_face_analyzer import AIFaceAnalyzer |
|
|
|
|
| |
| logger = setup_logger(__name__) |
|
|
class VideoProcessor:
    """Service for processing videos.

    Orchestrates speech-to-text, frame-level emotion analysis, and optional
    eye-contact, body-language and face analyses, then hands the combined
    results to an AI analysis service (see ``process_video``).
    """

    def __init__(self):
        """Initialize the video processor and its collaborating services."""
        # Speech-to-text transcription backend.
        self.speech_service = SpeechService()
        # Default emotion analyzer instance. NOTE(review): process_video builds
        # its own EmotionAnalyzer with per-call thresholds, so this shared
        # instance appears unused in this module — confirm external callers.
        self.emotion_analyzer = EmotionAnalyzer()
        # LLM-based synthesis of all analysis streams.
        self.ai_analysis_service = AIAnalysisService()
|
|
    @time_it
    def process_video(
        self,
        video_path: str,
        frame_rate: int = 1,
        backend: str = 'mediapipe',
        language: str = 'en',
        generate_annotated_video: bool = False,
        video_id: Optional[str] = None,
        status_callback = None,
        min_face_confidence: float = 0.5,
        min_face_size_ratio: float = 0.05,
        save_emotion_stats: bool = True,
        skip_frames: int = 2,
        adaptive_sampling: bool = False,
        analyze_eye_contact: bool = True,
        analyze_body_language: bool = True,
        analyze_face: bool = True,
        job_title: str = "Professional",
        model_name: str = "gpt-4o"
    ) -> Tuple[str, str]:
        """
        Process a video file for emotion analysis.

        Speech-to-text, frame-level emotion analysis and the optional
        eye-contact / body-language / face analyses run concurrently in a
        thread pool; their combined output is then summarized by the AI
        analysis service. Every sub-analysis is best-effort: a failure is
        logged and processing continues without that result.

        Args:
            video_path: Path to the video file
            frame_rate: Process every nth frame (controls the sampling rate of frames for analysis)
            backend: Backend to use for face detection (non-GPU-friendly
                values are silently switched to 'mediapipe' below)
            language: Language of the video
            generate_annotated_video: Whether to generate an annotated video
            video_id: ID of the video (optional; not referenced in this
                method's visible logic)
            status_callback: Callback invoked with an int progress percentage
                (5, 80, 100 here, plus intermediate values emitted by the
                emotion analyzer)
            min_face_confidence: Minimum confidence for face detection
            min_face_size_ratio: Minimum face size as ratio of image dimensions
            save_emotion_stats: Whether to save detailed emotion statistics as
                JSON (not referenced in this method's visible logic)
            skip_frames: Legacy parameter, kept for backward compatibility but not used
            adaptive_sampling: Whether to use adaptive sampling
            analyze_eye_contact: Whether to analyze eye contact
            analyze_body_language: Whether to analyze body language
            analyze_face: Whether to analyze face
            job_title: Job title for face analysis
            model_name: Model name forwarded to the eye-contact analyzer and
                the AI analysis service

        Returns:
            Tuple of (transcript, analysis_json)
        """
        start_time = time.time()

        # Log basic facts about the input file up front to ease debugging.
        logger.info(f"DEBUG - Processing video path: {video_path}")
        logger.info(f"DEBUG - Video file exists: {os.path.exists(video_path)}")
        if os.path.exists(video_path):
            logger.info(f"DEBUG - Video file size: {os.path.getsize(video_path) / (1024*1024):.2f} MB")

        # Ensure the results directory exists before any analyzer writes to it.
        results_dir = settings.RESULTS_DIR
        os.makedirs(results_dir, exist_ok=True)

        # Initial progress notification.
        if status_callback:
            status_callback(5)

        logger.info(f"Processing video: {video_path}")
        logger.info(f"Using backend: {backend}")
        logger.info(f"Language: {language}")

        # Force a GPU-friendly face-detection backend: opencv/retinaface (or
        # an empty value) are replaced with mediapipe.
        if backend == 'opencv' or not backend or backend == "retinaface":
            logger.info(f"Backend '{backend}' doesn't support GPU acceleration or is not recommended.")
            logger.info(f"Switching to 'mediapipe' for GPU-accelerated frame analysis.")
            backend = "mediapipe"

        # Warn (but proceed) for backends outside the known-good set.
        if backend not in ['mediapipe', 'ssd', 'mtcnn']:
            logger.info(f"Backend '{backend}' may not be optimized for GPU acceleration.")
            logger.info(f"Consider using 'mediapipe' for best GPU performance.")

        def process_speech(video_path, language):
            """Transcribe the video's audio; returns '' on failure."""
            logger.info("Starting speech-to-text processing...")
            try:
                # Transcription provider is currently hard-coded to 'groq'.
                service = 'groq'
                transcript = self.speech_service.process_video_speech(video_path, language, service)
                logger.info(f"Speech-to-text completed. Text length: {len(transcript)} characters")
                return transcript
            except Exception as e:
                logger.error(f"Error during speech-to-text processing: {str(e)}")
                logger.warning("Continuing with empty transcript due to speech processing failure")
                return ""

        def process_eye_contact(video_path, model_name):
            """Run eye-contact analysis; returns None on failure."""
            logger.info("Starting eye contact analysis...")
            try:
                results = analyze_eye_contact_video(
                    video_path=video_path,
                    display_video=False,
                    save_results=False,
                    model_name=model_name
                )
                logger.info("Eye contact analysis completed successfully")
                return results
            except Exception as e:
                logger.error(f"Error during eye contact analysis: {str(e)}")
                logger.warning("Continuing without eye contact analysis")
                return None

        def process_body_language(video_path):
            """Run body-language analysis; returns None on failure."""
            logger.info("Starting body language analysis...")
            try:
                results = analyze_body_language_video(
                    video_path=video_path,
                    display_video=False,
                    save_results=False
                )
                logger.info("Body language analysis completed successfully")
                return results
            except Exception as e:
                logger.error(f"Error during body language analysis: {str(e)}")
                logger.warning("Continuing without body language analysis")
                return None

        def process_face_analysis(video_path, job_title):
            """Sample three evenly spaced frames and run AI face analysis.

            Returns the analyzer's results, or None on failure / when no
            frames could be extracted.
            """
            logger.info("Starting face analysis...")
            try:
                # Frames are written to a local temp directory.
                # NOTE(review): extracted frames are never deleted here —
                # confirm whether cleanup happens elsewhere.
                temp_frames_dir = Path("temp_face_frames")
                os.makedirs(temp_frames_dir, exist_ok=True)

                face_frames = []

                cap = cv2.VideoCapture(video_path)
                if not cap.isOpened():
                    logger.error(f"Error: Could not open video file {video_path}")
                    return None

                frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                fps = cap.get(cv2.CAP_PROP_FPS)

                # Pick frames at 1/4, 2/4 and 3/4 of the video's length.
                num_frames = 3
                frame_indices = [int(i * frame_count / (num_frames + 1)) for i in range(1, num_frames + 1)]

                for i, frame_idx in enumerate(frame_indices):
                    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
                    ret, frame = cap.read()
                    if ret:
                        # Encode the frame's timestamp into the file name.
                        timestamp = frame_idx / fps if fps > 0 else 0
                        minutes = int(timestamp // 60)
                        seconds = int(timestamp % 60)
                        filename = f"frame_{i+1}_at_{minutes:02d}m{seconds:02d}s.jpg"
                        output_path = temp_frames_dir / filename

                        cv2.imwrite(str(output_path), frame)
                        face_frames.append(str(output_path))

                cap.release()

                if face_frames:
                    face_analyzer = AIFaceAnalyzer(provider="openai")
                    face_analysis_results = face_analyzer.analyze_profile_pictures(face_frames, job_title)
                    logger.info("Face analysis completed successfully")
                    return face_analysis_results
                else:
                    logger.warning("No frames were extracted for face analysis")
                    return None
            except Exception as e:
                logger.error(f"Error during face analysis: {str(e)}")
                logger.warning("Continuing without face analysis")
                return None

        def process_emotion_analysis(video_path, frame_rate, backend, generate_annotated_video, status_callback=None):
            """Run frame-level emotion analysis.

            Returns (all_results, annotated_video_path, timing_summary,
            metadata); on failure returns ([], None, {}, {}).
            """
            logger.info(f"Starting emotion analysis with {backend} backend...")
            try:
                # Build a per-call analyzer so the caller-supplied face
                # detection thresholds (closed over from the outer scope)
                # take effect.
                custom_emotion_analyzer = EmotionAnalyzer(
                    min_face_size_ratio=min_face_size_ratio,
                    min_confidence=min_face_confidence,
                    skip_similar_frames=False
                )

                # max_frames caps the amount of work on very long videos.
                all_results, annotated_video_path, timing_summary, metadata = custom_emotion_analyzer.process_video_frames(
                    video_path=video_path,
                    frame_rate=frame_rate,
                    backend=backend,
                    generate_annotated_video=generate_annotated_video,
                    status_callback=status_callback,
                    adaptive_sampling=adaptive_sampling,
                    max_frames=1000
                )

                logger.info(f"Frame analysis timing summary: {timing_summary}")
                logger.info(f"Frame analysis metadata: {metadata}")
                logger.info(f"Total frames analyzed: {len(all_results)}")

                return all_results, annotated_video_path, timing_summary, metadata
            except Exception as e:
                logger.error(f"Error during emotion analysis: {str(e)}")
                return [], None, {}, {}

        # Fan out all analyses concurrently; 5 workers = speech + emotion +
        # the three optional analyses.
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            future_transcript = executor.submit(process_speech, video_path, language)

            futures = {}
            if analyze_eye_contact:
                futures['eye_contact'] = executor.submit(process_eye_contact, video_path, model_name)

            if analyze_body_language:
                futures['body_language'] = executor.submit(process_body_language, video_path)

            if analyze_face:
                futures['face'] = executor.submit(process_face_analysis, video_path, job_title)

            futures['emotion'] = executor.submit(process_emotion_analysis, video_path, frame_rate, backend, generate_annotated_video, status_callback)

            # Block until each analysis finishes. The worker functions trap
            # their own exceptions and return fallbacks, so .result() is not
            # expected to raise here.
            transcript = future_transcript.result()

            eye_contact_results = futures['eye_contact'].result() if 'eye_contact' in futures else None
            body_language_results = futures['body_language'].result() if 'body_language' in futures else None
            face_analysis_results = futures['face'].result() if 'face' in futures else None

            all_results, annotated_video_path, timing_summary, metadata = futures['emotion'].result()

        # All analyses done; signal 80% before the AI synthesis step.
        if status_callback:
            status_callback(80)

        # NOTE(review): leftover debug prints to stdout — consider replacing
        # with logger.debug.
        print("********Body language results**************" )
        print(body_language_results)
        print("********Eye contact results**************" )
        print(eye_contact_results)
        print("********End of results**************" )

        # Short-circuit: no frames yielded emotions — return whatever the
        # other analyses produced, with an empty 'backend' list.
        if not all_results:
            logger.warning("No emotions detected in any frames.")
            empty_results = {
                'backend': [],
                'eye_contact_analysis': eye_contact_results if eye_contact_results else {},
                'body_language_analysis': body_language_results if body_language_results else {},
                'face_analysis': face_analysis_results if face_analysis_results else {}
            }
            empty_results_json = json.dumps(empty_results)
            return transcript, empty_results_json

        # Aggregate per-frame results into summary statistics.
        emotion_stats = self._calculate_emotion_statistics(all_results)

        # Re-open the video briefly to collect basic metadata (fps/frames).
        cap = cv2.VideoCapture(video_path)
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        video_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = video_frames / video_fps if video_fps > 0 else 0
        cap.release()

        # Assemble the full result document (printed below for debugging; the
        # AI analysis further down produces the JSON that is returned).
        comprehensive_results = {
            "video_info": {
                "path": video_path,
                "frames": video_frames,
                "fps": video_fps,
                "duration_seconds": duration,
                "device_used": metadata.get("device", "unknown"),
                "backend": backend,
                "face_detection_params": {
                    "min_confidence": min_face_confidence,
                    "min_face_size_ratio": min_face_size_ratio
                }
            },
            "emotion_stats": emotion_stats,
            "frames_analyzed": len(all_results),
            "execution_stats": {
                "total_processing_time_seconds": timing_summary.get("total_time", 0),
                "avg_processing_time_seconds": timing_summary.get("avg_time_per_frame", 0),
                "timing_breakdown": {
                    "face_detection": metadata.get("detailed_timing", {}).get("face_detection", 0),
                    "emotion_analysis": metadata.get("detailed_timing", {}).get("emotion_analysis", 0),
                    "temporal_consistency": metadata.get("detailed_timing", {}).get("temporal_consistency", 0),
                    "cache_check": metadata.get("detailed_timing", {}).get("cache_check", 0),
                    "similarity_check": metadata.get("detailed_timing", {}).get("similarity_check", 0),
                    # NOTE(review): "total" mirrors avg_time_per_frame rather
                    # than the sum of the breakdown — confirm intentional.
                    "total": timing_summary.get("avg_time_per_frame", 0)
                }
            }
        }

        # Attach the optional analyses only when they produced data.
        if eye_contact_results:
            comprehensive_results["eye_contact_analysis"] = eye_contact_results

        if body_language_results:
            comprehensive_results["body_language_analysis"] = body_language_results

        if face_analysis_results:
            comprehensive_results["face_analysis"] = face_analysis_results

        # Overall sentiment = highest-percentage emotion ("neutral" when the
        # percentages dict is empty).
        dominant_emotion, _ = max(emotion_stats["emotion_percentages"].items(), key=lambda x: x[1], default=("neutral", 0))
        comprehensive_results["overall_sentiment"] = dominant_emotion.capitalize()

        # NOTE(review): debug dump of the full results to stdout.
        print("\n--- Comprehensive Analysis JSON Results ---")
        print(json.dumps(comprehensive_results, indent=2))
        print("--------------------------------------\n")

        # Normalize per-frame results (fills in dominant_emotion fields).
        processed_results = self._process_emotion_results(all_results)

        # Flatten into a DataFrame for the AI analysis service.
        df = json_to_dataframe({'backend': processed_results})

        if emotion_stats["emotion_percentages"]:
            # Replicate the aggregate stats onto every row so the AI service
            # can read them from any row. NOTE(review): all rows share the
            # same dict objects here.
            df['raw_emotion_data'] = [emotion_stats["emotion_percentages"]] * len(df)

            confidence_data = {
                "confidence_by_emotion": emotion_stats["confidence_by_emotion"],
                "average_confidence": emotion_stats["average_confidence"]
            }
            df['confidence_data'] = [confidence_data] * len(df)

            df['overall_sentiment'] = comprehensive_results["overall_sentiment"]

            logger.info(f"Added emotion percentages data to DataFrame: {emotion_stats['emotion_percentages']}")
            logger.info(f"Added confidence data to DataFrame: {confidence_data}")
            logger.info(f"Added overall sentiment to DataFrame: {comprehensive_results['overall_sentiment']}")
        else:
            logger.warning("No emotion data found to add to DataFrame")

        # Second empty-guard: the DataFrame can still be empty even though
        # raw results existed.
        if df.empty:
            logger.warning("No emotions detected, cannot generate analysis.")
            # Defensive guard; processed_results is always bound by this
            # point, so this branch is effectively dead.
            if 'processed_results' not in locals():
                processed_results = []
            empty_results = {
                'backend': processed_results,
                'eye_contact_analysis': eye_contact_results if eye_contact_results else {},
                'body_language_analysis': body_language_results if body_language_results else {},
                'face_analysis': face_analysis_results if face_analysis_results else {}
            }
            empty_results_json = json.dumps(empty_results)
            return transcript, empty_results_json

        # Final step: LLM synthesis of transcript + all analysis streams.
        logger.info("Starting AI analysis...")
        try:
            if eye_contact_results:
                logger.info(f"Passing eye_contact_data to AI analysis with {len(str(eye_contact_results))} characters")
            else:
                logger.info("No eye_contact_data available to pass to AI analysis")

            if body_language_results:
                logger.info(f"Passing body_language_data to AI analysis with {len(str(body_language_results))} characters")
            else:
                logger.info("No body_language_data available to pass to AI analysis")

            if face_analysis_results:
                logger.info(f"Passing face_analysis_data to AI analysis with {len(str(face_analysis_results))} items")
            else:
                logger.info("No face_analysis_data available to pass to AI analysis")

            analysis = self.ai_analysis_service.analyze_emotions_and_transcript(
                df,
                transcript,
                language,
                eye_contact_data=eye_contact_results,
                body_language_data=body_language_results,
                face_analysis_data=face_analysis_results,
                model_name=model_name
            )
        except Exception as e:
            logger.error(f"Error during AI analysis: {str(e)}")
            # Fall back to the raw per-frame results plus the error message.
            results_with_error = {
                'backend': processed_results,
                'error': str(e),
                'eye_contact_analysis': eye_contact_results if eye_contact_results else {},
                'body_language_analysis': body_language_results if body_language_results else {},
                'face_analysis': face_analysis_results if face_analysis_results else {}
            }
            results_json = json.dumps(results_with_error)
            return transcript, results_json

        # Done — report completion.
        if status_callback:
            status_callback(100)

        end_time = time.time()
        total_time_taken = end_time - start_time
        logger.info(f"Total processing time: {total_time_taken:.2f} seconds")

        analysis_json = json.dumps(analysis)

        return transcript, analysis_json
| |
| def _calculate_emotion_statistics(self, all_results): |
| """Calculate comprehensive emotion statistics from frame results.""" |
| |
| frames_with_faces = 0 |
| total_faces = 0 |
| total_confidence = 0 |
| |
| emotion_counts = { |
| "angry": 0, |
| "disgust": 0, |
| "fear": 0, |
| "happy": 0, |
| "sad": 0, |
| "surprise": 0, |
| "neutral": 0 |
| } |
| |
| confidence_by_emotion = {emotion: [] for emotion in emotion_counts.keys()} |
| |
| |
| for result in all_results: |
| faces = result.get("faces", []) |
| if faces: |
| frames_with_faces += 1 |
| total_faces += len(faces) |
| |
| |
| if "main_emotion" in result: |
| main_emotion = result["main_emotion"]["emotion"] |
| confidence = result["main_emotion"]["confidence"] |
| |
| if main_emotion in emotion_counts: |
| emotion_counts[main_emotion] += 1 |
| confidence_by_emotion[main_emotion].append(confidence) |
| total_confidence += confidence |
| |
| else: |
| for face in faces: |
| if "emotion" in face: |
| |
| dominant_emotion = max(face["emotion"].items(), key=lambda x: x[1]) |
| emotion_name = dominant_emotion[0] |
| confidence = dominant_emotion[1] |
| |
| if emotion_name in emotion_counts: |
| emotion_counts[emotion_name] += 1 |
| confidence_by_emotion[emotion_name].append(confidence) |
| total_confidence += confidence |
| |
| |
| total_emotions = sum(emotion_counts.values()) |
| emotion_percentages = {} |
| if total_emotions > 0: |
| for emotion, count in emotion_counts.items(): |
| emotion_percentages[emotion] = (count / total_emotions) * 100 |
| |
| |
| face_detection_percentage = 0 |
| if all_results: |
| face_detection_percentage = (frames_with_faces / len(all_results)) * 100 |
| |
| |
| average_confidence = 0 |
| if total_emotions > 0: |
| average_confidence = total_confidence / total_emotions |
| |
| |
| confidence_averages = {} |
| for emotion, confidences in confidence_by_emotion.items(): |
| if confidences: |
| confidence_averages[emotion] = sum(confidences) / len(confidences) |
| else: |
| confidence_averages[emotion] = 0 |
| |
| |
| emotion_stats = { |
| "frames_with_faces": frames_with_faces, |
| "face_detection_percentage": face_detection_percentage, |
| "emotion_counts": emotion_counts, |
| "emotion_percentages": emotion_percentages, |
| "average_confidence": average_confidence, |
| "confidence_by_emotion": confidence_averages |
| } |
| |
| return emotion_stats |
| |
| def _process_emotion_results(self, all_results): |
| """Process emotion results to ensure they have required fields.""" |
| processed_results = [] |
| |
| |
| for result in all_results: |
| |
| if not result: |
| continue |
| |
| |
| if 'faces' in result and result['faces']: |
| for face in result['faces']: |
| |
| if 'emotion' in face and 'dominant_emotion' not in face: |
| emotions = face['emotion'] |
| if emotions: |
| |
| dominant_emotion, confidence = max(emotions.items(), key=lambda x: x[1]) |
| face['dominant_emotion'] = dominant_emotion |
| face['emotion_confidence'] = confidence |
| face['emotion_stable'] = face.get('emotion_stable', False) |
| |
| |
| if 'main_face' in result and result['main_face']: |
| main_face = result['main_face'] |
| if 'emotion' in main_face and 'dominant_emotion' not in main_face: |
| emotions = main_face['emotion'] |
| if emotions: |
| |
| dominant_emotion, confidence = max(emotions.items(), key=lambda x: x[1]) |
| main_face['dominant_emotion'] = dominant_emotion |
| main_face['emotion_confidence'] = confidence |
| main_face['emotion_stable'] = main_face.get('emotion_stable', False) |
| |
| |
| if 'main_emotion' in result and result['main_emotion']: |
| main_emotion = result['main_emotion'] |
| |
| if 'emotion' in main_emotion and 'confidence' not in main_emotion: |
| |
| if 'main_face' in result and result['main_face'] and 'emotion' in result['main_face']: |
| emotion_name = main_emotion['emotion'] |
| main_emotion['confidence'] = result['main_face']['emotion'].get(emotion_name, 0) |
| |
| processed_results.append(result) |
| |
| return processed_results |
|
|
| |
# Module-level singleton; constructed eagerly on import (along with its
# underlying services) and used by the backward-compatible module-level
# process_video() wrapper.
video_processor = VideoProcessor()
|
|
| |
def process_video(
    video_path: str,
    frame_rate: int = 1,
    backend: str = 'mediapipe',
    language: str = 'en',
    generate_annotated_video: bool = False,
    video_id: Optional[str] = None,
    status_callback = None,
    min_face_confidence: float = 0.5,
    min_face_size_ratio: float = 0.05,
    save_emotion_stats: bool = True,
    skip_frames: int = 2,
    adaptive_sampling: bool = False,
    analyze_eye_contact: bool = True,
    analyze_body_language: bool = True,
    analyze_face: bool = True,
    job_title: str = "Professional",
    model_name: str = "gpt-4o"
) -> Tuple[str, str]:
    """
    Process a video file for emotion analysis (backward compatibility function).

    Thin module-level wrapper that forwards every argument unchanged to the
    shared ``video_processor`` singleton's ``process_video`` method; see that
    method for the full parameter semantics.

    Args:
        video_path: Path to the video file
        frame_rate: Process every nth frame (controls the sampling rate of frames for analysis)
        backend: Backend to use for face detection
        language: Language of the video
        generate_annotated_video: Whether to generate an annotated video
        video_id: ID of the video (optional)
        status_callback: Callback function for progress updates
        min_face_confidence: Minimum confidence for face detection
        min_face_size_ratio: Minimum face size as ratio of image dimensions
        save_emotion_stats: Whether to save detailed emotion statistics as JSON
        skip_frames: Legacy parameter, kept for backward compatibility but not used
        adaptive_sampling: Whether to use adaptive sampling
        analyze_eye_contact: Whether to analyze eye contact
        analyze_body_language: Whether to analyze body language
        analyze_face: Whether to analyze face
        job_title: Job title for face analysis
        model_name: The name of the model to use for AI analysis

    Returns:
        Tuple of (transcript, analysis_json)
    """
    # Collect every argument by keyword, then delegate to the singleton.
    forwarded = dict(
        video_path=video_path,
        frame_rate=frame_rate,
        backend=backend,
        language=language,
        generate_annotated_video=generate_annotated_video,
        video_id=video_id,
        status_callback=status_callback,
        min_face_confidence=min_face_confidence,
        min_face_size_ratio=min_face_size_ratio,
        save_emotion_stats=save_emotion_stats,
        skip_frames=skip_frames,
        adaptive_sampling=adaptive_sampling,
        analyze_eye_contact=analyze_eye_contact,
        analyze_body_language=analyze_body_language,
        analyze_face=analyze_face,
        job_title=job_title,
        model_name=model_name,
    )
    return video_processor.process_video(**forwarded)