Spaces:
Paused
Paused
| """ | |
| Video Processor - Handles video I/O, frame processing, and overlay generation | |
| Manages the complete video processing pipeline | |
| """ | |
| import cv2 | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import Optional, Callable, Dict, Any, List, Tuple | |
| import logging | |
| from app.config import Config | |
| from app.core.pose_analyzer import PoseAnalyzer, PoseKeypoints | |
| from app.core.movement_classifier import MovementClassifier, MovementMetrics | |
| from app.utils.helpers import timing_decorator | |
| from app.utils.file_utils import format_file_size | |
| logger = logging.getLogger(__name__) | |
| import inspect | |
| import asyncio | |
class VideoProcessor:
    """
    Manages video loading, processing, and output generation.

    Coordinates between pose analysis (PoseAnalyzer) and movement
    classification (MovementClassifier), and renders an annotated output
    video with a skeleton overlay plus a per-frame status box.
    """

    # Codec preference order for the output writer: the first fourcc that
    # OpenCV can actually open on this platform is used.
    CODEC_CANDIDATES: List[Tuple[str, str]] = [
        ("avc1", "H.264"),       # Try H.264 first
        ("mp4v", "MPEG-4"),      # Fallback to MPEG-4
        ("XVID", "Xvid"),
        ("MJPG", "Motion JPEG"),
    ]

    def __init__(self):
        """Initialize video processor with analyzer components."""
        self.pose_analyzer = PoseAnalyzer()
        self.movement_classifier = MovementClassifier()
        self.current_video_path: Optional[Path] = None
        self.video_info: Dict[str, Any] = {}
        logger.info("VideoProcessor initialized")

    def _safe_callback(self, callback, *args, **kwargs) -> None:
        """
        Invoke a progress callback that may be either sync or async.

        Sync callables are invoked directly.  Coroutine functions are
        scheduled on the running event loop when one exists; otherwise
        (pure sync context) the coroutine is run to completion with
        asyncio.run() — asyncio.create_task() alone would raise
        RuntimeError when no loop is running.
        """
        if inspect.iscoroutinefunction(callback):
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # No running event loop: execute the coroutine synchronously.
                asyncio.run(callback(*args, **kwargs))
            else:
                loop.create_task(callback(*args, **kwargs))
        else:
            callback(*args, **kwargs)

    def load_video(self, video_path: Path) -> Dict[str, Any]:
        """
        Load video and extract metadata.

        Args:
            video_path: Path to video file

        Returns:
            Dictionary with video information (fps, frame count,
            dimensions, duration, file size)

        Raises:
            ValueError: If the file is missing, cannot be opened, or
                exceeds Config.MAX_VIDEO_DURATION
        """
        if not video_path.exists():
            raise ValueError(f"Video file not found: {video_path}")

        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            raise ValueError(f"Cannot open video file: {video_path}")

        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        finally:
            cap.release()

        # Some containers report fps == 0; avoid ZeroDivisionError.
        duration = frame_count / fps if fps > 0 else 0

        if duration > Config.MAX_VIDEO_DURATION:
            raise ValueError(
                f"Video too long: {duration:.1f}s (max: {Config.MAX_VIDEO_DURATION}s)"
            )

        self.current_video_path = video_path
        size_bytes = video_path.stat().st_size
        self.video_info = {
            "path": str(video_path),
            "filename": video_path.name,
            "fps": fps,
            "frame_count": frame_count,
            "width": width,
            "height": height,
            "duration": duration,
            "size_bytes": size_bytes,
            "size": format_file_size(size_bytes),
        }
        logger.info(f"Loaded video: {video_path.name} ({width}x{height}, "
                    f"{fps:.1f} fps, {duration:.1f}s)")
        return self.video_info

    def _open_writer(self, output_path: Path,
                     video_info: Dict[str, Any]) -> "cv2.VideoWriter":
        """
        Create a VideoWriter, falling back through CODEC_CANDIDATES.

        Args:
            output_path: Output video path
            video_info: Metadata dict from load_video (fps, width, height)

        Returns:
            An opened cv2.VideoWriter

        Raises:
            ValueError: If no candidate codec produces a writable output
        """
        last_error = None
        for codec_code, codec_name in self.CODEC_CANDIDATES:
            out = None
            try:
                logger.info(f"Trying codec: {codec_name} ({codec_code})")
                fourcc = cv2.VideoWriter_fourcc(*codec_code)
                out = cv2.VideoWriter(
                    str(output_path),
                    fourcc,
                    video_info['fps'],
                    (video_info['width'], video_info['height'])
                )
                if out.isOpened():
                    logger.info(f"✅ Successfully initialized VideoWriter with {codec_name} codec")
                    return out
                logger.warning(f"Failed to open VideoWriter with {codec_name}")
            except Exception as e:
                logger.warning(f"Error with {codec_name} codec: {e}")
                last_error = e
            # Release any half-initialized writer before trying the next codec.
            if out is not None:
                out.release()

        error_msg = f"Cannot create output video with any codec. Last error: {last_error}"
        logger.error(error_msg)
        raise ValueError(error_msg)

    def process_video(self, video_path: Path, output_path: Path,
                      progress_callback: Optional[Callable[[float, str], None]] = None) -> Dict[str, Any]:
        """
        Process video with pose detection and movement analysis.

        Args:
            video_path: Input video path
            output_path: Output (annotated) video path
            progress_callback: Optional callback for progress updates,
                called as callback(progress, message); may be sync or async

        Returns:
            Dictionary with processing results and analysis

        Raises:
            ValueError: If the input cannot be opened or no output codec works
        """
        video_info = self.load_video(video_path)

        # Reset analyzers so state from a previous run does not bleed in.
        self.pose_analyzer.reset()
        self.movement_classifier.reset()

        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            raise ValueError(f"Cannot open video file: {video_path}")

        frame_number = 0
        processed_frames = 0
        all_keypoints: List[PoseKeypoints] = []
        fps = video_info['fps']
        total_frames = video_info['frame_count']

        try:
            # Writer creation happens inside the try so that cap is always
            # released even when every codec fails (previously leaked).
            out = self._open_writer(output_path, video_info)
            try:
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break

                    # Timestamp of this frame in seconds (0 if fps unknown).
                    timestamp = frame_number / fps if fps > 0 else 0.0

                    pose_keypoints = self.pose_analyzer.process_frame(
                        frame, frame_number, timestamp
                    )
                    if pose_keypoints:
                        all_keypoints.append(pose_keypoints)
                        processed_frames += 1
                        # Draw the skeleton only when a pose was detected.
                        annotated_frame = self.pose_analyzer.draw_skeleton_overlay(
                            frame, pose_keypoints, draw_confidence=True
                        )
                    else:
                        annotated_frame = frame

                    # Status box is drawn on every frame (handles missing pose).
                    annotated_frame = self._add_status_box(
                        annotated_frame, frame_number, video_info, pose_keypoints
                    )
                    out.write(annotated_frame)

                    if progress_callback:
                        denom = total_frames if total_frames > 0 else 1
                        progress = (frame_number + 1) / denom
                        message = f"Processing frame {frame_number + 1}/{total_frames}"
                        self._safe_callback(progress_callback, progress, message)

                    frame_number += 1
            finally:
                out.release()
        finally:
            cap.release()

        # Post-pass analysis over the collected keypoints.
        if progress_callback:
            # Route through _safe_callback so async callbacks work here too.
            self._safe_callback(progress_callback, 0.95, "Analyzing movements...")

        movement_metrics = None
        rhythm_analysis: Dict[str, Any] = {}
        smoothness = 0.0
        if all_keypoints:
            movement_metrics = self.movement_classifier.analyze_sequence(all_keypoints)
            rhythm_analysis = self.movement_classifier.detect_rhythm_patterns(
                all_keypoints, video_info['fps']
            )
            smoothness = self.movement_classifier.calculate_movement_smoothness(
                all_keypoints
            )

        results = {
            "video_info": video_info,
            "processing": {
                "total_frames": frame_number,
                "frames_with_pose": processed_frames,
                "detection_rate": processed_frames / frame_number if frame_number > 0 else 0,
                "output_path": str(output_path)
            },
            "pose_analysis": {
                "average_confidence": self.pose_analyzer.get_average_confidence(),
                "keypoints_detected": len(all_keypoints)
            },
            "movement_analysis": self._format_movement_metrics(movement_metrics) if movement_metrics else {},
            "rhythm_analysis": rhythm_analysis,
            "smoothness_score": smoothness,
            "summary": self.movement_classifier.get_movement_summary()
        }

        if progress_callback:
            self._safe_callback(progress_callback, 1.0, "Processing complete!")

        logger.info(f"Video processing complete: {output_path.name}")
        return results

    def _add_status_box(self, frame: np.ndarray, frame_number: int,
                        video_info: Dict[str, Any],
                        pose_keypoints: Optional[PoseKeypoints]) -> np.ndarray:
        """
        Add a semi-transparent status information box to the frame.

        Args:
            frame: Video frame (BGR)
            frame_number: Current frame number
            video_info: Video metadata
            pose_keypoints: Detected pose keypoints (None if no detection)

        Returns:
            Frame with status box blended into the top-right corner
        """
        overlay = frame.copy()
        h, w = frame.shape[:2]

        # Box anchored to the top-right corner with a 10 px margin.
        box_height = 120
        box_width = 300
        box_x = w - box_width - 10
        box_y = 10

        cv2.rectangle(
            overlay,
            (box_x, box_y),
            (box_x + box_width, box_y + box_height),
            (0, 0, 0),
            -1
        )

        # Blend the filled rectangle at 60% opacity.
        alpha = 0.6
        frame = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)

        text_x = box_x + 10
        text_y = box_y + 25
        line_height = 25

        frame_text = f"Frame: {frame_number}/{video_info['frame_count']}"
        cv2.putText(frame, frame_text, (text_x, text_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        fps_text = f"FPS: {video_info['fps']:.1f}"
        cv2.putText(frame, fps_text, (text_x, text_y + line_height),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        # Detection status: green when a pose was found, red otherwise.
        if pose_keypoints:
            status_text = "Pose: DETECTED"
            status_color = (0, 255, 0)
            conf_text = f"Conf: {pose_keypoints.confidence:.2f}"
        else:
            status_text = "Pose: NOT DETECTED"
            status_color = (0, 0, 255)
            conf_text = "Conf: N/A"

        cv2.putText(frame, status_text, (text_x, text_y + line_height * 2),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, status_color, 1)
        cv2.putText(frame, conf_text, (text_x, text_y + line_height * 3),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        return frame

    def _format_movement_metrics(self, metrics: MovementMetrics) -> Dict[str, Any]:
        """
        Format movement metrics for JSON serialization.

        Args:
            metrics: MovementMetrics object

        Returns:
            Dictionary with rounded, serializable metric values
        """
        return {
            "movement_type": metrics.movement_type.value,
            "intensity": round(metrics.intensity, 2),
            "velocity": round(metrics.velocity, 4),
            "body_part_activity": {
                part: round(activity, 2)
                for part, activity in metrics.body_part_activity.items()
            },
            "frame_range": metrics.frame_range
        }

    def extract_frame(self, video_path: Path, frame_number: int) -> Optional[np.ndarray]:
        """
        Extract a specific frame from a video.

        Args:
            video_path: Path to video file
            frame_number: Frame index to extract

        Returns:
            Frame as numpy array, or None if extraction fails
        """
        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            return None
        try:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            ret, frame = cap.read()
        finally:
            cap.release()
        return frame if ret else None

    def create_thumbnail(self, video_path: Path, output_path: Path,
                         timestamp: float = 0.0) -> bool:
        """
        Create a thumbnail image from a video.

        Args:
            video_path: Path to video file
            output_path: Output image path (format inferred from extension)
            timestamp: Timestamp in seconds to grab the thumbnail from

        Returns:
            True if the thumbnail was written successfully
        """
        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            return False
        try:
            # Seek by time (milliseconds) rather than frame index.
            cap.set(cv2.CAP_PROP_POS_MSEC, timestamp * 1000)
            ret, frame = cap.read()
        finally:
            cap.release()
        if not ret:
            return False
        return cv2.imwrite(str(output_path), frame)