# behavior_backend/app/services/processing/processing_service.py
from fastapi import BackgroundTasks
from sqlalchemy.orm import Session
import json
import logging
import asyncio
from app.db.repositories.video import VideoRepository
from app.db.repositories.results import ResultsRepository
from app.models.processing import ProcessingRequest, ProcessingStatus
from app.services.processing.video_processor import process_video
from app.core.exceptions import VideoNotFoundError, ResultNotFoundError, VideoProcessingError
from app.utils.logging_utils import setup_logger
from app.db.base import SessionLocal
# Configure logging
logger = setup_logger(__name__)
class ProcessingService:
    """Service layer for video processing operations.

    Orchestrates background processing of uploaded videos and exposes
    status / result lookups, delegating persistence to the video and
    results repositories.
    """

    def __init__(self, db: Session):
        """Bind the service to a database session.

        Args:
            db: SQLAlchemy session shared by the underlying repositories.
        """
        self.db = db
        # Both repositories operate on the same session as the service.
        self.video_repo = VideoRepository(db)
        self.results_repo = ResultsRepository(db)
async def process_video(self, request: ProcessingRequest, background_tasks: BackgroundTasks) -> ProcessingStatus:
"""
Process a video.
Args:
request: Processing request parameters
background_tasks: FastAPI background tasks
Returns:
ProcessingStatus object
Raises:
VideoNotFoundError: If the video is not found
VideoProcessingError: If there is an error processing the video
"""
video_id = request.video_id
# Check if video exists
db_video = self.video_repo.get_by_id(video_id)
if not db_video:
raise VideoNotFoundError(video_id)
try:
# Update status
self.video_repo.update_status(video_id, "processing")
# Get model name from request or use default
model_name = getattr(request, 'model_name', "gpt-4o")
# Start processing in background
background_tasks.add_task(
self._process_video_task,
video_id=video_id,
video_path=db_video.file_path,
frame_rate=request.frame_rate,
backend=request.backend,
language=request.language,
generate_annotated_video=request.generate_annotated_video,
model_name=model_name
)
return ProcessingStatus(
video_id=video_id,
status="processing"
)
except Exception as e:
logger.error(f"Error processing video {video_id}: {str(e)}")
self.video_repo.update_status(video_id, "failed")
raise VideoProcessingError(f"Error processing video: {str(e)}")
def get_processing_status(self, video_id: str) -> ProcessingStatus:
"""
Get the processing status of a video.
Args:
video_id: ID of the video
Returns:
ProcessingStatus object
Raises:
VideoNotFoundError: If the video is not found
"""
db_video = self.video_repo.get_by_id(video_id)
if not db_video:
raise VideoNotFoundError(video_id)
return ProcessingStatus(
video_id=video_id,
status=db_video.status
)
def get_processing_results(self, video_id: str) -> dict:
"""
Get the processing results of a video.
Args:
video_id: ID of the video
Returns:
Dictionary with processing results
Raises:
VideoNotFoundError: If the video is not found
ResultNotFoundError: If the processing result is not found
"""
# Get the video
video = self.video_repo.get_by_id(video_id)
if not video:
raise VideoNotFoundError(video_id)
# Get the processing result
result = self.results_repo.get_by_video_id(video_id)
if not result:
return {
"status": video.status,
"message": "No processing results available yet"
}
# Prepare response
response = {
"status": video.status,
"processing_date": result.processing_date.isoformat(),
"transcript": result.transcript,
"emotion_analysis": result.emotion_analysis,
"overall_summary": result.overall_summary,
"transcript_analysis": result.transcript_analysis,
"recommendations": result.recommendations,
"body_language_analysis": result.body_language_analysis,
"body_language_data": result.body_language_data,
"eye_contact_analysis": result.eye_contact_analysis,
"eye_contact_data": result.eye_contact_data,
"face_analysis_data": result.face_analysis_data
}
return response
async def _process_video_task(
self,
video_id: str,
video_path: str,
frame_rate: int,
backend: str,
language: str,
generate_annotated_video: bool,
model_name: str = "gpt-4o"
):
"""
Background task to process a video.
Args:
video_id: ID of the video
video_path: Path to the video file
frame_rate: Frame rate for processing
backend: Backend for face detection
language: Language of the video
generate_annotated_video: Whether to generate an annotated video
model_name: The name of the model to use for AI analysis (default: gpt-4o)
"""
try:
# Force mediapipe backend for best GPU performance on Mac M3
if backend == 'opencv' or not backend or backend == "retinaface":
logger.info(f"Backend '{backend}' doesn't support GPU acceleration or is not recommended.")
logger.info(f"Switching to 'mediapipe' for GPU-accelerated frame analysis.")
backend = "mediapipe"
# Ensure we're using a GPU-compatible backend
if backend not in ['mediapipe', 'ssd', 'mtcnn']:
logger.info(f"Backend '{backend}' may not support GPU acceleration.")
logger.info(f"Consider using 'mediapipe' for best GPU performance on Mac M3.")
logger.info(f"Starting video processing task for {video_id}")
logger.info(f"Video path: {video_path}")
logger.info(f"Frame rate: {frame_rate}")
logger.info(f"Backend: {backend}")
logger.info(f"Language: {language}")
logger.info(f"Generate annotated video: {generate_annotated_video}")
logger.info(f"Model name for analysis: {model_name}")
logger.info(f"Offloading video processing for {video_id} to a separate thread.")
# Process the video in a separate thread to avoid blocking the event loop
transcript, analysis = await asyncio.to_thread(
process_video, # The synchronous, CPU-bound function
video_path=video_path,
frame_rate=frame_rate,
backend=backend,
language=language,
generate_annotated_video=generate_annotated_video,
video_id=video_id,
status_callback=lambda progress: self._update_progress(video_id, progress),
model_name=model_name
)
logger.info(f"Threaded video processing for {video_id} completed.")
# Parse the comprehensive analysis
try:
analysis_data = json.loads(analysis) if analysis else {}
logger.info(f"Received analysis data: {analysis_data}")
# Log the keys for debugging
logger.info(f"Keys in analysis_data: {list(analysis_data.keys())}")
# Extract data from the comprehensive analysis
emotion_analysis = analysis_data.get("Emotion Analysis", {})
overall_summary = analysis_data.get("Overall Summary", "")
transcript_analysis = analysis_data.get("Transcript Analysis", {})
recommendations = analysis_data.get("Recommendations", {})
body_language_analysis = analysis_data.get("Body Language Analysis", {})
eye_contact_analysis = analysis_data.get("Eye Contact Analysis", {})
# Try both capitalized and non-capitalized versions since the format may vary
eye_contact_data = analysis_data.get("eye_contact_analysis", {})
body_language_data = analysis_data.get("body_language_analysis", {})
face_analysis_data = analysis_data.get("face_analysis", {})
# Check if data exists under any key - key names might be inconsistent
if not eye_contact_data and "eye_contact_analysis" in str(analysis_data).lower():
logger.info(f"Searching for eye_contact_data in analysis_data string representation")
for key in analysis_data.keys():
if "eye" in key.lower() and "contact" in key.lower():
logger.info(f"Found potential eye contact key: {key}")
eye_contact_data = analysis_data.get(key, {})
break
if not body_language_data and "body_language_analysis" in str(analysis_data).lower():
logger.info(f"Searching for body_language_data in analysis_data string representation")
for key in analysis_data.keys():
if "body" in key.lower() and "language" in key.lower():
logger.info(f"Found potential body language key: {key}")
body_language_data = analysis_data.get(key, {})
break
if not face_analysis_data and "face_analysis" in str(analysis_data).lower():
logger.info(f"Searching for face_analysis_data in analysis_data string representation")
for key in analysis_data.keys():
if "face" in key.lower() and "analysis" in key.lower():
logger.info(f"Found potential face analysis key: {key}")
face_analysis_data = analysis_data.get(key, {})
break
logger.info(f"Parsed analysis data: {emotion_analysis, overall_summary, transcript_analysis, recommendations, body_language_analysis}")
logger.info(f"Parsed eye contact data: {eye_contact_data}")
logger.info(f"Parsed body language data: {body_language_data}")
logger.info(f"Parsed face analysis data: {face_analysis_data}")
except Exception as e:
logger.error(f"Error parsing analysis JSON: {e}")
emotion_analysis = {}
overall_summary = ""
transcript_analysis = {}
recommendations = {}
body_language_data = {}
body_language_analysis = {}
eye_contact_data = {}
eye_contact_analysis = {}
face_analysis_data = {}
# Save results to database
self.results_repo.create(
video_id=video_id,
transcript=transcript or "",
emotion_analysis=emotion_analysis,
overall_summary=overall_summary,
transcript_analysis=transcript_analysis,
recommendations=recommendations,
body_language_analysis=body_language_analysis,
body_language_data=body_language_data,
eye_contact_analysis=eye_contact_analysis,
eye_contact_data=eye_contact_data,
face_analysis_data=face_analysis_data
)
# Update video status
self.video_repo.update_status(video_id, "completed")
logger.info(f"Video {video_id} processing completed successfully")
except Exception as e:
# Update status on error
logger.error(f"Error processing video {video_id}: {str(e)}")
self.video_repo.update_status(video_id, "failed")
def _update_progress(self, video_id: str, progress: float):
"""
Update the processing progress of a video.
Args:
video_id: ID of the video
progress: Processing progress (0-100)
"""
self.video_repo.update_progress(video_id, progress)