import cv2 import numpy as np import tempfile import os from collections import defaultdict from typing import Dict, List, Tuple, Optional from fastapi import FastAPI, UploadFile, File, HTTPException, Form from fastapi.responses import ORJSONResponse from fastapi.encoders import jsonable_encoder from .models import Gesture, GestureResponse, GESTURE_MAPPING, FULL_GESTURE_MAPPING, PRODUCTION_GESTURE_MAPPING from .config import get_logfire_token, is_monitoring_enabled # Import the gesture detection components from .main_controller import MainController # Configure logfire monitoring if token is available logfire = None if is_monitoring_enabled(): try: import logfire logfire.configure(token=get_logfire_token()) logfire.instrument_fastapi = logfire.instrument_fastapi except ImportError: logfire = None app = FastAPI(default_response_class=ORJSONResponse) # Instrument FastAPI with logfire if monitoring is enabled if logfire is not None: logfire.instrument_fastapi(app, capture_headers=True) def process_video_for_gestures(video_path: str, detector_path: str = "models/hand_detector.onnx", classifier_path: str = "models/crops_classifier.onnx", frame_skip: int = 1) -> List[Gesture]: """ Process a video file to detect gestures using the MainController. Parameters ---------- video_path : str Path to the video file to process detector_path : str Path to the hand detection ONNX model classifier_path : str Path to the gesture classification ONNX model frame_skip : int Number of frames to skip between processing (1 = process every frame, 3 = process every 3rd frame) Returns ------- List[Gesture] List of detected gestures with duration and confidence """ # Create monitoring span for video processing span_context = None if logfire is not None: span_context = logfire.span('process_video_for_gestures', video_path=video_path, detector_path=detector_path, classifier_path=classifier_path) span_context.__enter__() try: # Initialize the main controller if logfire is not None: with logfire.span('initialize_controller'): controller = MainController(detector_path, classifier_path) else: controller = MainController(detector_path, classifier_path) # Open video file cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError(f"Could not open video file: {video_path}") # Get video properties for monitoring total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) if logfire is not None: logfire.info('Video properties', total_frames=total_frames, fps=fps, duration_seconds=total_frames/fps if fps > 0 else 0) # Track gestures per hand ID gesture_tracks: Dict[int, List[Tuple[int, float]]] = defaultdict(list) # {hand_id: [(gesture_id, confidence), ...]} frame_count = 0 processed_frames = 0 detection_stats = { 'frames_with_detections': 0, 'total_detections': 0, 'gesture_counts': defaultdict(int) } try: while True: ret, frame = cap.read() if not ret: break # Skip frames based on frame_skip parameter if frame_count % frame_skip == 0: # Process frame through the controller bboxes, ids, labels = controller(frame) processed_frames += 1 if bboxes is not None and ids is not None and labels is not None: detection_stats['frames_with_detections'] += 1 detection_stats['total_detections'] += len(bboxes) # Track gestures for each detected hand for i in range(len(bboxes)): hand_id = int(ids[i]) gesture_id = labels[i] if gesture_id is not None: # Get confidence from bbox (assuming it's the last element) confidence = 0.8 # Default confidence, could be extracted from bbox if available gesture_tracks[hand_id].append((gesture_id, confidence)) detection_stats['gesture_counts'][gesture_id] += 1 # Log individual detections for debugging if logfire is not None: gesture_name = FULL_GESTURE_MAPPING.get(gesture_id, f"unknown_{gesture_id}") logfire.debug('Hand detection', frame=frame_count, hand_id=hand_id, gesture_id=gesture_id, gesture_name=gesture_name, confidence=confidence, bbox=bboxes[i].tolist() if len(bboxes[i]) >= 4 else None) else: # Advance tracker on skipped frames to keep state consistent controller.update(np.empty((0, 5)), None) frame_count += 1 # Log progress every 100 frames if frame_count % 100 == 0 and logfire is not None: progress = (frame_count / total_frames) * 100 if total_frames > 0 else 0 logfire.info('Processing progress', frame=frame_count, total_frames=total_frames, progress_percent=round(progress, 2)) finally: cap.release() # Log final detection statistics if logfire is not None: logfire.info('Detection statistics', total_frames=frame_count, processed_frames=processed_frames, frame_skip=frame_skip, frames_with_detections=detection_stats['frames_with_detections'], total_detections=detection_stats['total_detections'], detection_rate=detection_stats['frames_with_detections']/processed_frames if processed_frames > 0 else 0, gesture_counts=dict(detection_stats['gesture_counts'])) # Process gesture tracks to find continuous gestures detected_gestures = [] for hand_id, gesture_sequence in gesture_tracks.items(): if not gesture_sequence: continue # Group consecutive identical gestures current_gesture = None current_duration = 0 current_confidence = 0.0 for gesture_id, confidence in gesture_sequence: if current_gesture is None or current_gesture != gesture_id: # Save previous gesture if it was significant # Adjust minimum duration based on frame skip min_duration = max(5, frame_skip * 2) # At least 2 processed frames if current_gesture is not None and current_duration >= min_duration: gesture_name = PRODUCTION_GESTURE_MAPPING.get(current_gesture, f"unknown_{current_gesture}") avg_confidence = current_confidence / current_duration if current_duration > 0 else 0.0 # Scale duration back to original frame count scaled_duration = current_duration * frame_skip detected_gestures.append(Gesture( gesture=gesture_name, duration=scaled_duration, confidence=avg_confidence )) # Log significant gesture detection if logfire is not None: logfire.info('Significant gesture detected', hand_id=hand_id, gesture=gesture_name, duration_frames=current_duration, confidence=avg_confidence) # Start new gesture current_gesture = gesture_id current_duration = 1 current_confidence = confidence else: # Continue current gesture current_duration += 1 current_confidence += confidence # Don't forget the last gesture min_duration = max(5, frame_skip * 2) # At least 2 processed frames if current_gesture is not None and current_duration >= min_duration: gesture_name = PRODUCTION_GESTURE_MAPPING.get(current_gesture, f"unknown_{current_gesture}") avg_confidence = current_confidence / current_duration if current_duration > 0 else 0.0 # Scale duration back to original frame count scaled_duration = current_duration * frame_skip detected_gestures.append(Gesture( gesture=gesture_name, duration=scaled_duration, confidence=avg_confidence )) # Log final gesture detection if logfire is not None: logfire.info('Final gesture detected', hand_id=hand_id, gesture=gesture_name, duration_frames=current_duration, confidence=avg_confidence) # Log final results if logfire is not None: logfire.info('Video processing completed', total_gestures_detected=len(detected_gestures), unique_hands=len(gesture_tracks), gestures=[{'gesture': g.gesture, 'duration': g.duration, 'confidence': g.confidence} for g in detected_gestures]) return detected_gestures finally: if span_context is not None: span_context.__exit__(None, None, None) @app.get("/health") async def health(): """Health check endpoint.""" if logfire is not None: logfire.info('Health check requested') return {"message": "OK"} @app.post("/gestures", response_model=GestureResponse) async def detect_gestures(video: UploadFile = File(...), frame_skip: int = Form(1)): """ Detect gestures in an uploaded video file. Parameters ---------- video : UploadFile The video file to process frame_skip : int Number of frames to skip between processing (1 = process every frame, 3 = process every 3rd frame) Returns ------- GestureResponse Response containing detected gestures with duration and confidence """ # Log request details if logfire is not None: logfire.info('Gesture detection request received', filename=video.filename, content_type=video.content_type, content_length=video.size if hasattr(video, 'size') else 'unknown') # Validate file type if not video.content_type.startswith('video/'): if logfire is not None: logfire.warning('Invalid file type received', content_type=video.content_type) raise HTTPException(status_code=400, detail="File must be a video") # Create temporary file to save uploaded video with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file: try: # Write uploaded content to temporary file content = await video.read() temp_file.write(content) temp_file.flush() if logfire is not None: logfire.info('Video file saved for processing', temp_file=temp_file.name, file_size_bytes=len(content)) # Process the video with frame skip parameter gestures = process_video_for_gestures(temp_file.name, frame_skip=frame_skip) if logfire is not None: logfire.info('Gesture detection completed successfully', total_gestures=len(gestures), gestures=[g.gesture for g in gestures]) return GestureResponse(gestures=gestures) except Exception as e: if logfire is not None: logfire.error('Error processing video', error=str(e), error_type=type(e).__name__, temp_file=temp_file.name) raise HTTPException(status_code=500, detail=f"Error processing video: {str(e)}") finally: # Clean up temporary file if os.path.exists(temp_file.name): os.unlink(temp_file.name) if logfire is not None: logfire.debug('Temporary file cleaned up', temp_file=temp_file.name)