import asyncio
import json
import logging

from fastapi import BackgroundTasks
from sqlalchemy.orm import Session

from app.core.exceptions import (
    ResultNotFoundError,
    VideoNotFoundError,
    VideoProcessingError,
)
from app.db.base import SessionLocal
from app.db.repositories.results import ResultsRepository
from app.db.repositories.video import VideoRepository
from app.models.processing import ProcessingRequest, ProcessingStatus
from app.services.processing.video_processor import process_video
from app.utils.logging_utils import setup_logger

# Configure logging
logger = setup_logger(__name__)


class ProcessingService:
    """Service for video processing operations."""

    def __init__(self, db: Session):
        self.db = db
        self.video_repo = VideoRepository(db)
        self.results_repo = ResultsRepository(db)

    async def process_video(self, request: ProcessingRequest, background_tasks: BackgroundTasks) -> ProcessingStatus:
        """
        Schedule background processing of a video.

        Args:
            request: Processing request parameters
            background_tasks: FastAPI background tasks

        Returns:
            ProcessingStatus object with status "processing"

        Raises:
            VideoNotFoundError: If the video is not found
            VideoProcessingError: If scheduling the processing task fails
        """
        video_id = request.video_id

        # Check if video exists
        db_video = self.video_repo.get_by_id(video_id)
        if not db_video:
            raise VideoNotFoundError(video_id)

        try:
            # Mark as in-progress before the background task starts so callers
            # polling get_processing_status see the transition immediately.
            self.video_repo.update_status(video_id, "processing")

            # Fall back to the default model when the request omits the
            # attribute OR carries an explicit None/empty value.
            model_name = getattr(request, "model_name", None) or "gpt-4o"

            # Start processing in background
            background_tasks.add_task(
                self._process_video_task,
                video_id=video_id,
                video_path=db_video.file_path,
                frame_rate=request.frame_rate,
                backend=request.backend,
                language=request.language,
                generate_annotated_video=request.generate_annotated_video,
                model_name=model_name,
            )

            return ProcessingStatus(video_id=video_id, status="processing")
        except Exception as e:
            # logger.exception keeps the traceback; logger.error with an
            # f-string would not.
            logger.exception("Error processing video %s", video_id)
            self.video_repo.update_status(video_id, "failed")
            raise VideoProcessingError(f"Error processing video: {str(e)}")

    def get_processing_status(self, video_id: str) -> ProcessingStatus:
        """
        Get the processing status of a video.

        Args:
            video_id: ID of the video

        Returns:
            ProcessingStatus object

        Raises:
            VideoNotFoundError: If the video is not found
        """
        db_video = self.video_repo.get_by_id(video_id)
        if not db_video:
            raise VideoNotFoundError(video_id)

        return ProcessingStatus(video_id=video_id, status=db_video.status)

    def get_processing_results(self, video_id: str) -> dict:
        """
        Get the processing results of a video.

        Args:
            video_id: ID of the video

        Returns:
            Dictionary with processing results, or a status/message pair
            when no result row exists yet.

        Raises:
            VideoNotFoundError: If the video is not found
        """
        video = self.video_repo.get_by_id(video_id)
        if not video:
            raise VideoNotFoundError(video_id)

        result = self.results_repo.get_by_video_id(video_id)
        if not result:
            # Not an error: processing may simply not have finished yet.
            return {
                "status": video.status,
                "message": "No processing results available yet",
            }

        return {
            "status": video.status,
            "processing_date": result.processing_date.isoformat(),
            "transcript": result.transcript,
            "emotion_analysis": result.emotion_analysis,
            "overall_summary": result.overall_summary,
            "transcript_analysis": result.transcript_analysis,
            "recommendations": result.recommendations,
            "body_language_analysis": result.body_language_analysis,
            "body_language_data": result.body_language_data,
            "eye_contact_analysis": result.eye_contact_analysis,
            "eye_contact_data": result.eye_contact_data,
            "face_analysis_data": result.face_analysis_data,
        }

    @staticmethod
    def _normalize_backend(backend: str) -> str:
        """Coerce the requested face-detection backend to a GPU-friendly one.

        Empty values, 'opencv' and 'retinaface' are replaced with
        'mediapipe' (best GPU performance on Mac M3 per project policy);
        other backends are kept, with a warning when they are not known
        to support GPU acceleration.
        """
        if not backend or backend in ("opencv", "retinaface"):
            logger.info(
                "Backend '%s' doesn't support GPU acceleration or is not recommended.",
                backend,
            )
            logger.info("Switching to 'mediapipe' for GPU-accelerated frame analysis.")
            return "mediapipe"
        if backend not in ("mediapipe", "ssd", "mtcnn"):
            logger.info("Backend '%s' may not support GPU acceleration.", backend)
            logger.info("Consider using 'mediapipe' for best GPU performance on Mac M3.")
        return backend

    @staticmethod
    def _fuzzy_get(analysis_data: dict, *fragments: str) -> dict:
        """Return the value of the first key containing all *fragments*
        (case-insensitive), or {} when no key matches.

        The analysis producer is inconsistent about key naming, so this is
        the fallback when the exact expected key is absent or empty.
        """
        for key in analysis_data:
            lowered = key.lower()
            if all(fragment in lowered for fragment in fragments):
                return analysis_data.get(key, {})
        return {}

    def _parse_analysis(self, analysis: str) -> dict:
        """Parse the JSON analysis string into result fields.

        Returns a dict whose keys match the keyword arguments of
        ``ResultsRepository.create`` (minus ``video_id``/``transcript``).
        Any parsing failure is logged and yields empty defaults so the
        background task can still record a result row.
        """
        try:
            analysis_data = json.loads(analysis) if analysis else {}
            logger.info("Received analysis data: %s", analysis_data)
            logger.info("Keys in analysis_data: %s", list(analysis_data.keys()))

            return {
                "emotion_analysis": analysis_data.get("Emotion Analysis", {}),
                "overall_summary": analysis_data.get("Overall Summary", ""),
                "transcript_analysis": analysis_data.get("Transcript Analysis", {}),
                "recommendations": analysis_data.get("Recommendations", {}),
                "body_language_analysis": analysis_data.get("Body Language Analysis", {}),
                "eye_contact_analysis": analysis_data.get("Eye Contact Analysis", {}),
                # Key casing varies between runs: prefer the exact snake_case
                # key, then fall back to any key containing the same words.
                "eye_contact_data": (
                    analysis_data.get("eye_contact_analysis")
                    or self._fuzzy_get(analysis_data, "eye", "contact")
                ),
                "body_language_data": (
                    analysis_data.get("body_language_analysis")
                    or self._fuzzy_get(analysis_data, "body", "language")
                ),
                "face_analysis_data": (
                    analysis_data.get("face_analysis")
                    or self._fuzzy_get(analysis_data, "face", "analysis")
                ),
            }
        except Exception:
            logger.exception("Error parsing analysis JSON")
            # Empty defaults: the caller still persists a result row.
            return {
                "emotion_analysis": {},
                "overall_summary": "",
                "transcript_analysis": {},
                "recommendations": {},
                "body_language_analysis": {},
                "body_language_data": {},
                "eye_contact_analysis": {},
                "eye_contact_data": {},
                "face_analysis_data": {},
            }

    async def _process_video_task(
        self,
        video_id: str,
        video_path: str,
        frame_rate: int,
        backend: str,
        language: str,
        generate_annotated_video: bool,
        model_name: str = "gpt-4o",
    ):
        """
        Background task to process a video.

        Args:
            video_id: ID of the video
            video_path: Path to the video file
            frame_rate: Frame rate for processing
            backend: Backend for face detection
            language: Language of the video
            generate_annotated_video: Whether to generate an annotated video
            model_name: The name of the model to use for AI analysis
                (default: gpt-4o)
        """
        try:
            backend = self._normalize_backend(backend)

            logger.info("Starting video processing task for %s", video_id)
            logger.info("Video path: %s", video_path)
            logger.info("Frame rate: %s", frame_rate)
            logger.info("Backend: %s", backend)
            logger.info("Language: %s", language)
            logger.info("Generate annotated video: %s", generate_annotated_video)
            logger.info("Model name for analysis: %s", model_name)
            logger.info("Offloading video processing for %s to a separate thread.", video_id)

            # Run the synchronous, CPU-bound pipeline in a worker thread so
            # the event loop stays responsive.
            transcript, analysis = await asyncio.to_thread(
                process_video,
                video_path=video_path,
                frame_rate=frame_rate,
                backend=backend,
                language=language,
                generate_annotated_video=generate_annotated_video,
                video_id=video_id,
                status_callback=lambda progress: self._update_progress(video_id, progress),
                model_name=model_name,
            )
            logger.info("Threaded video processing for %s completed.", video_id)

            # Extract the individual result fields from the JSON analysis.
            fields = self._parse_analysis(analysis)

            # Save results to database
            self.results_repo.create(
                video_id=video_id,
                transcript=transcript or "",
                **fields,
            )

            self.video_repo.update_status(video_id, "completed")
            logger.info("Video %s processing completed successfully", video_id)
        except Exception:
            # Background task: nothing upstream can catch this, so record
            # the failure on the video row and log the full traceback.
            logger.exception("Error processing video %s", video_id)
            self.video_repo.update_status(video_id, "failed")

    def _update_progress(self, video_id: str, progress: float):
        """
        Update the processing progress of a video.

        Args:
            video_id: ID of the video
            progress: Processing progress (0-100)
        """
        self.video_repo.update_progress(video_id, progress)