| from fastapi import BackgroundTasks |
| from sqlalchemy.orm import Session |
| import json |
| import logging |
| import asyncio |
|
|
| from app.db.repositories.video import VideoRepository |
| from app.db.repositories.results import ResultsRepository |
| from app.models.processing import ProcessingRequest, ProcessingStatus |
| from app.services.processing.video_processor import process_video |
| from app.core.exceptions import VideoNotFoundError, ResultNotFoundError, VideoProcessingError |
| from app.utils.logging_utils import setup_logger |
| from app.db.base import SessionLocal |
|
|
| |
# Module-level logger for this service, created by the project's setup_logger
# helper under this module's name; ProcessingService writes its info and
# error messages through it.
logger = setup_logger(__name__)
|
|
class ProcessingService:
    """Service for video processing operations.

    Wraps a VideoRepository and a ResultsRepository built on the same
    database session. It schedules processing as a FastAPI background task,
    reports a video's status, and returns the stored analysis results.
    """

    # Model name used for AI analysis when the request does not supply one.
    DEFAULT_MODEL_NAME = "gpt-4o"

    # Backends that are swapped for "mediapipe" before processing starts
    # (an empty/None backend is swapped as well).
    _REPLACED_BACKENDS = ("opencv", "retinaface")

    # Backends that do not trigger the "may not support GPU" advisory log.
    _GPU_BACKENDS = ("mediapipe", "ssd", "mtcnn")

    # (result field, exact key in the analysis JSON, words used by the
    # fallback key search), listed in the order the fields are resolved.
    _RAW_DATA_FIELDS = (
        ("eye_contact_data", "eye_contact_analysis", ("eye", "contact")),
        ("body_language_data", "body_language_analysis", ("body", "language")),
        ("face_analysis_data", "face_analysis", ("face", "analysis")),
    )

    def __init__(self, db: Session):
        """
        Build the service and its repositories on one database session.

        Args:
            db: SQLAlchemy session handed to both repositories
        """
        self.db = db
        self.video_repo = VideoRepository(db)
        self.results_repo = ResultsRepository(db)

    async def process_video(
        self,
        request: ProcessingRequest,
        background_tasks: BackgroundTasks,
    ) -> ProcessingStatus:
        """
        Mark a video as processing and schedule the actual work in the background.

        Args:
            request: Processing request parameters
            background_tasks: FastAPI background tasks

        Returns:
            ProcessingStatus object with status "processing"

        Raises:
            VideoNotFoundError: If the video is not found
            VideoProcessingError: If the status update or scheduling fails
        """
        video_id = request.video_id

        db_video = self.video_repo.get_by_id(video_id)
        if not db_video:
            raise VideoNotFoundError(video_id)

        try:
            self.video_repo.update_status(video_id, "processing")

            # Fall back to the default both when the request has no
            # model_name attribute and when the attribute is unset.
            model_name = getattr(request, "model_name", None) or self.DEFAULT_MODEL_NAME

            background_tasks.add_task(
                self._process_video_task,
                video_id=video_id,
                video_path=db_video.file_path,
                frame_rate=request.frame_rate,
                backend=request.backend,
                language=request.language,
                generate_annotated_video=request.generate_annotated_video,
                model_name=model_name,
            )

            return ProcessingStatus(
                video_id=video_id,
                status="processing",
            )
        except Exception as e:
            logger.error(f"Error processing video {video_id}: {str(e)}")
            # Best-effort, so a failing status update cannot replace the
            # documented VideoProcessingError with a database error.
            self._mark_failed(video_id)
            raise VideoProcessingError(f"Error processing video: {str(e)}") from e

    def get_processing_status(self, video_id: str) -> ProcessingStatus:
        """
        Get the processing status of a video.

        Args:
            video_id: ID of the video

        Returns:
            ProcessingStatus object carrying the status stored for the video

        Raises:
            VideoNotFoundError: If the video is not found
        """
        db_video = self.video_repo.get_by_id(video_id)
        if not db_video:
            raise VideoNotFoundError(video_id)

        return ProcessingStatus(
            video_id=video_id,
            status=db_video.status,
        )

    def get_processing_results(self, video_id: str) -> dict:
        """
        Get the processing results of a video.

        When no result row exists yet, a dictionary holding only the video's
        current "status" and an explanatory "message" is returned instead of
        raising.

        Args:
            video_id: ID of the video

        Returns:
            Dictionary with the video status and the stored result fields;
            "processing_date" is an ISO-8601 string, or None when the stored
            result has no date

        Raises:
            VideoNotFoundError: If the video is not found
        """
        video = self.video_repo.get_by_id(video_id)
        if not video:
            raise VideoNotFoundError(video_id)

        result = self.results_repo.get_by_video_id(video_id)
        if not result:
            return {
                "status": video.status,
                "message": "No processing results available yet",
            }

        # Guard against a result stored without a date instead of failing
        # with AttributeError on None.isoformat().
        processing_date = result.processing_date

        return {
            "status": video.status,
            "processing_date": (
                processing_date.isoformat() if processing_date is not None else None
            ),
            "transcript": result.transcript,
            "emotion_analysis": result.emotion_analysis,
            "overall_summary": result.overall_summary,
            "transcript_analysis": result.transcript_analysis,
            "recommendations": result.recommendations,
            "body_language_analysis": result.body_language_analysis,
            "body_language_data": result.body_language_data,
            "eye_contact_analysis": result.eye_contact_analysis,
            "eye_contact_data": result.eye_contact_data,
            "face_analysis_data": result.face_analysis_data,
        }

    async def _process_video_task(
        self,
        video_id: str,
        video_path: str,
        frame_rate: int,
        backend: str,
        language: str,
        generate_annotated_video: bool,
        model_name: str = DEFAULT_MODEL_NAME,
    ):
        """
        Background task to process a video.

        Runs the processing function in a worker thread, stores the parsed
        analysis through the results repository and sets the video status to
        "completed". Any exception is logged and the video is marked
        "failed"; nothing is re-raised.

        Args:
            video_id: ID of the video
            video_path: Path to the video file
            frame_rate: Frame rate for processing
            backend: Backend for face detection
            language: Language of the video
            generate_annotated_video: Whether to generate an annotated video
            model_name: The name of the model to use for AI analysis (default: gpt-4o)
        """
        # NOTE(review): this task reuses the repositories (and session) created
        # for the request, and SessionLocal is imported by this module but not
        # used - confirm whether a task-scoped session was intended here.
        try:
            backend = self._select_backend(backend)

            logger.info(f"Starting video processing task for {video_id}")
            logger.info(f"Video path: {video_path}")
            logger.info(f"Frame rate: {frame_rate}")
            logger.info(f"Backend: {backend}")
            logger.info(f"Language: {language}")
            logger.info(f"Generate annotated video: {generate_annotated_video}")
            logger.info(f"Model name for analysis: {model_name}")

            logger.info(f"Offloading video processing for {video_id} to a separate thread.")

            # The bare name process_video is the function imported at module
            # level, not this class's method of the same name.
            transcript, analysis = await asyncio.to_thread(
                process_video,
                video_path=video_path,
                frame_rate=frame_rate,
                backend=backend,
                language=language,
                generate_annotated_video=generate_annotated_video,
                video_id=video_id,
                status_callback=lambda progress: self._update_progress(video_id, progress),
                model_name=model_name,
            )
            logger.info(f"Threaded video processing for {video_id} completed.")

            analysis_fields = self._parse_analysis(analysis)

            self.results_repo.create(
                video_id=video_id,
                transcript=transcript or "",
                **analysis_fields,
            )

            self.video_repo.update_status(video_id, "completed")
            logger.info(f"Video {video_id} processing completed successfully")

        except Exception as e:
            # exc_info keeps the traceback; the message text alone rarely
            # identifies where processing failed.
            logger.error(f"Error processing video {video_id}: {str(e)}", exc_info=True)
            self._mark_failed(video_id)

    def _select_backend(self, backend: str) -> str:
        """Return the backend to use, swapping replaced or empty ones for "mediapipe"."""
        if not backend or backend in self._REPLACED_BACKENDS:
            logger.info(f"Backend '{backend}' doesn't support GPU acceleration or is not recommended.")
            logger.info("Switching to 'mediapipe' for GPU-accelerated frame analysis.")
            backend = "mediapipe"

        if backend not in self._GPU_BACKENDS:
            logger.info(f"Backend '{backend}' may not support GPU acceleration.")
            logger.info("Consider using 'mediapipe' for best GPU performance on Mac M3.")

        return backend

    def _parse_analysis(self, analysis) -> dict:
        """
        Turn the analysis JSON string into keyword arguments for the results repository.

        Args:
            analysis: JSON text produced by the processing function; a falsy
                value is treated as an empty object

        Returns:
            Dictionary with the keys emotion_analysis, overall_summary,
            transcript_analysis, recommendations, body_language_analysis,
            eye_contact_analysis, eye_contact_data, body_language_data and
            face_analysis_data. If the text cannot be parsed or is not a JSON
            object, every field holds its empty default.
        """
        try:
            analysis_data = json.loads(analysis) if analysis else {}
            logger.info(f"Received analysis data: {analysis_data}")
            logger.info(f"Keys in analysis_data: {list(analysis_data.keys())}")

            fields = {
                "emotion_analysis": analysis_data.get("Emotion Analysis", {}),
                "overall_summary": analysis_data.get("Overall Summary", ""),
                "transcript_analysis": analysis_data.get("Transcript Analysis", {}),
                "recommendations": analysis_data.get("Recommendations", {}),
                "body_language_analysis": analysis_data.get("Body Language Analysis", {}),
                "eye_contact_analysis": analysis_data.get("Eye Contact Analysis", {}),
            }

            # Computed once and shared by all three fallback searches.
            searchable = str(analysis_data).lower()
            for field, exact_key, words in self._RAW_DATA_FIELDS:
                fields[field] = self._resolve_raw_data(
                    analysis_data, searchable, field, exact_key, words
                )

            narrative = (
                fields["emotion_analysis"],
                fields["overall_summary"],
                fields["transcript_analysis"],
                fields["recommendations"],
                fields["body_language_analysis"],
            )
            logger.info(f"Parsed analysis data: {narrative}")
            logger.info(f"Parsed eye contact data: {fields['eye_contact_data']}")
            logger.info(f"Parsed body language data: {fields['body_language_data']}")
            logger.info(f"Parsed face analysis data: {fields['face_analysis_data']}")
            return fields
        except Exception as e:
            logger.error(f"Error parsing analysis JSON: {e}")
            return self._empty_analysis_fields()

    @staticmethod
    def _resolve_raw_data(analysis_data: dict, searchable: str, field: str, exact_key: str, words: tuple):
        """
        Look up one raw-data field, with a fallback search over the top-level keys.

        The exact key wins when it holds a truthy value. Otherwise, if the
        exact key's text occurs anywhere in the lower-cased string form of
        the data, the value of the first top-level key containing every word
        in ``words`` (case-insensitively) is returned.
        """
        value = analysis_data.get(exact_key, {})
        if value or exact_key not in searchable:
            return value

        label = " ".join(words)
        logger.info(f"Searching for {field} in analysis_data string representation")
        for key in analysis_data:
            lowered = key.lower()
            if all(word in lowered for word in words):
                # NOTE(review): this can also match the narrative key (for
                # example "Eye Contact Analysis") - confirm that is intended.
                logger.info(f"Found potential {label} key: {key}")
                return analysis_data.get(key, {})
        return value

    @staticmethod
    def _empty_analysis_fields() -> dict:
        """Return the empty default for every analysis field stored with a result."""
        return {
            "emotion_analysis": {},
            "overall_summary": "",
            "transcript_analysis": {},
            "recommendations": {},
            "body_language_analysis": {},
            "body_language_data": {},
            "eye_contact_analysis": {},
            "eye_contact_data": {},
            "face_analysis_data": {},
        }

    def _mark_failed(self, video_id: str) -> None:
        """Set the video's status to "failed", logging instead of raising if that fails."""
        try:
            self.video_repo.update_status(video_id, "failed")
        except Exception as status_error:
            logger.error(f"Could not mark video {video_id} as failed: {status_error}")

    def _update_progress(self, video_id: str, progress: float):
        """
        Update the processing progress of a video.

        Args:
            video_id: ID of the video
            progress: Processing progress (0-100)
        """
        self.video_repo.update_progress(video_id, progress)