File size: 10,336 Bytes
bc18e51
 
 
 
 
 
 
 
 
 
 
 
a601b1d
 
bc18e51
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
"""
Pose Analyzer - Core MediaPipe pose detection engine
Handles video frame processing and skeleton overlay generation
"""

import cv2
import numpy as np
import mediapipe as mp
from typing import List, Tuple, Optional, Dict, Any
from dataclasses import dataclass
import logging

from app.config import Config
from app.utils.helpers import timing_decorator

logger = logging.getLogger(__name__)


@dataclass
class PoseKeypoints:
    """
    Pose detection result for a single video frame.

    Attributes:
        landmarks: Array of shape (33, 3); each row holds x, y, visibility.
        frame_number: Index of the frame within the video.
        timestamp: Position of the frame in the video, in seconds.
        confidence: Average visibility score over all landmarks.
    """
    landmarks: np.ndarray
    frame_number: int
    timestamp: float
    confidence: float


class PoseAnalyzer:
    """
    MediaPipe-based pose detection and analysis engine.

    Processes BGR video frames to extract body keypoints and draws skeleton
    overlays. Detections produced by ``process_video_batch`` accumulate in
    ``keypoints_history`` until ``reset()`` is called, so long-running users
    should reset between videos.

    The underlying MediaPipe detector is released by ``close()``. The analyzer
    can also be used as a context manager::

        with PoseAnalyzer() as analyzer:
            keypoints = analyzer.process_frame(frame, 0, 0.0)
    """

    # MediaPipe pose connections for skeleton drawing
    POSE_CONNECTIONS = mp.solutions.pose.POSE_CONNECTIONS

    # Landmarks per detected pose; gives the empty keypoints array the same
    # trailing dimensions, (33, 3), as a populated one.
    NUM_LANDMARKS = 33

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize pose analyzer with MediaPipe.

        Args:
            config: Optional configuration dictionary. When None (or empty),
                Config.get_mediapipe_config() is used. Must provide the keys
                'model_complexity', 'smooth_landmarks',
                'min_detection_confidence' and 'min_tracking_confidence'.

        Raises:
            KeyError: If a required key is missing from the configuration.
        """
        self.config = config or Config.get_mediapipe_config()

        # Initialize MediaPipe Pose
        self.mp_pose = mp.solutions.pose
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles

        # Create pose detector instance (video mode: static_image_mode=False)
        self.pose = self.mp_pose.Pose(
            static_image_mode=False,
            model_complexity=self.config['model_complexity'],
            smooth_landmarks=self.config['smooth_landmarks'],
            min_detection_confidence=self.config['min_detection_confidence'],
            min_tracking_confidence=self.config['min_tracking_confidence']
        )

        self.keypoints_history: List[PoseKeypoints] = []
        logger.info("PoseAnalyzer initialized with MediaPipe Pose")

    def __enter__(self) -> "PoseAnalyzer":
        """Return the analyzer itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Release the detector on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def process_frame(self, frame: np.ndarray, frame_number: int,
                     timestamp: float) -> Optional[PoseKeypoints]:
        """
        Process a single video frame to detect pose.

        This method does not touch ``keypoints_history``; only
        ``process_video_batch`` records detections.

        Args:
            frame: BGR image from OpenCV
            frame_number: Frame index in video
            timestamp: Timestamp in seconds

        Returns:
            PoseKeypoints object if pose detected, None otherwise
        """
        # Convert BGR to RGB for MediaPipe
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Process frame with MediaPipe
        results = self.pose.process(rgb_frame)

        if not results.pose_landmarks:
            return None

        # Extract landmarks as numpy array
        landmarks = self._extract_landmarks(results.pose_landmarks)

        # Average visibility across landmarks; cast so the dataclass holds a
        # plain Python float rather than a NumPy scalar.
        confidence = float(np.mean(landmarks[:, 2]))

        return PoseKeypoints(
            landmarks=landmarks,
            frame_number=frame_number,
            timestamp=timestamp,
            confidence=confidence
        )

    def _extract_landmarks(self, pose_landmarks) -> np.ndarray:
        """
        Extract landmarks from MediaPipe results as numpy array.

        Args:
            pose_landmarks: MediaPipe pose landmarks object (iterated via
                its ``landmark`` attribute)

        Returns:
            Numpy array with one row per landmark containing
            x, y, visibility (shape (33, 3) for a full pose)
        """
        return np.array([
            [landmark.x, landmark.y, landmark.visibility]
            for landmark in pose_landmarks.landmark
        ])

    def draw_skeleton_overlay(self, frame: np.ndarray,
                              pose_keypoints: Optional[PoseKeypoints],
                              draw_confidence: bool = True) -> np.ndarray:
        """
        Draw skeleton overlay on video frame.

        The input frame is never modified; drawing happens on a copy.

        Args:
            frame: Original BGR frame
            pose_keypoints: Detected pose keypoints, or None when no pose
                was detected for this frame
            draw_confidence: Whether to display confidence score

        Returns:
            Copy of the frame with the overlay. When ``pose_keypoints`` is
            None the copy carries a "No pose detected" message; when the
            overall confidence is below Config.SKELETON_CONFIDENCE_THRESHOLD
            the copy is returned without any annotation.
        """
        annotated_frame = frame.copy()

        if pose_keypoints is None:
            # Draw "No pose detected" message
            cv2.putText(
                annotated_frame,
                "No pose detected",
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                (0, 0, 255),
                2
            )
            return annotated_frame

        threshold = Config.SKELETON_CONFIDENCE_THRESHOLD

        # Only draw if confidence is above threshold
        if pose_keypoints.confidence < threshold:
            return annotated_frame

        # Get frame dimensions
        h, w = frame.shape[:2]

        # Convert normalized coordinates to pixel coordinates (on a copy, so
        # the stored keypoints stay normalized)
        landmarks_px = pose_keypoints.landmarks.copy()
        landmarks_px[:, 0] *= w  # x coordinates
        landmarks_px[:, 1] *= h  # y coordinates

        # Draw connections (skeleton lines)
        for start_idx, end_idx in self.POSE_CONNECTIONS:
            start_point = landmarks_px[start_idx]
            end_point = landmarks_px[end_idx]

            # Skip the bone unless both of its endpoints are visible enough
            if not (start_point[2] > threshold and end_point[2] > threshold):
                continue

            start_pos = (int(start_point[0]), int(start_point[1]))
            end_pos = (int(end_point[0]), int(end_point[1]))

            # Line color reflects the mean visibility of the two endpoints
            avg_confidence = (start_point[2] + end_point[2]) / 2
            color = self._get_confidence_color(avg_confidence)

            cv2.line(
                annotated_frame,
                start_pos,
                end_pos,
                color,
                Config.SKELETON_LINE_THICKNESS,
                cv2.LINE_AA
            )

        # Draw keypoints (filled circles; thickness -1 means filled)
        for landmark in landmarks_px:
            if landmark[2] > threshold:
                center = (int(landmark[0]), int(landmark[1]))
                color = self._get_confidence_color(landmark[2])

                cv2.circle(
                    annotated_frame,
                    center,
                    Config.SKELETON_CIRCLE_RADIUS,
                    color,
                    -1,
                    cv2.LINE_AA
                )

        # Draw confidence score
        if draw_confidence:
            confidence_text = f"Confidence: {pose_keypoints.confidence:.2f}"
            cv2.putText(
                annotated_frame,
                confidence_text,
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                (0, 255, 0),
                2
            )

        # Draw frame info
        frame_text = f"Frame: {pose_keypoints.frame_number}"
        cv2.putText(
            annotated_frame,
            frame_text,
            (10, 60),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            (255, 255, 255),
            1
        )

        return annotated_frame

    def _get_confidence_color(self, confidence: float) -> Tuple[int, int, int]:
        """
        Get color based on confidence score (green, yellow or orange).

        Args:
            confidence: Confidence score (0-1)

        Returns:
            BGR color tuple: green for >= 0.8, yellow for >= 0.6,
            orange otherwise
        """
        if confidence >= 0.8:
            return (0, 255, 0)  # Green - high confidence
        if confidence >= 0.6:
            return (0, 255, 255)  # Yellow - medium confidence
        return (0, 165, 255)  # Orange - low confidence

    @timing_decorator
    def process_video_batch(self, frames: List[np.ndarray],
                           start_frame_number: int,
                           fps: float) -> List[Optional[PoseKeypoints]]:
        """
        Process a batch of video frames sequentially.

        Every detected pose is also appended to ``keypoints_history``.

        Args:
            frames: List of BGR frames
            start_frame_number: Frame number of the first frame in the batch
            fps: Video frames per second; must be non-zero because each
                timestamp is computed as frame_number / fps

        Returns:
            List of PoseKeypoints, one entry per input frame
            (None for frames without detected pose)
        """
        results: List[Optional[PoseKeypoints]] = []
        detected = 0

        for frame_number, frame in enumerate(frames, start=start_frame_number):
            timestamp = frame_number / fps

            pose_data = self.process_frame(frame, frame_number, timestamp)
            results.append(pose_data)

            if pose_data is not None:
                self.keypoints_history.append(pose_data)
                detected += 1

        logger.info("Processed %d frames, detected pose in %d frames",
                    len(frames), detected)

        return results

    def get_keypoints_array(self) -> np.ndarray:
        """
        Get all detected keypoints as a numpy array.

        Returns:
            Array of shape (N, 33, 3) where N is number of detected frames.
            With no history the result is an empty array of shape (0, 33, 3),
            so indexing such as ``arr[:, i, :]`` works without a special case.
        """
        if not self.keypoints_history:
            return np.empty((0, self.NUM_LANDMARKS, 3))

        return np.array([kp.landmarks for kp in self.keypoints_history])

    def get_average_confidence(self) -> float:
        """
        Calculate average confidence across all frames in the history.

        Returns:
            Average confidence score as a Python float; 0.0 when the
            history is empty
        """
        if not self.keypoints_history:
            return 0.0

        confidences = [kp.confidence for kp in self.keypoints_history]
        return float(np.mean(confidences))

    def reset(self):
        """Reset keypoints history for new video processing"""
        self.keypoints_history.clear()
        logger.info("PoseAnalyzer reset")

    def close(self) -> None:
        """
        Release the MediaPipe detector.

        Safe to call more than once and on a partially constructed instance:
        the detector reference is dropped before closing, so MediaPipe's own
        close() is invoked at most once (it is not assumed to tolerate a
        second call). After closing, ``process_frame`` can no longer be used.
        """
        pose = getattr(self, 'pose', None)
        if pose is None:
            return
        self.pose = None
        pose.close()

    def __del__(self):
        """Cleanup MediaPipe resources if close() was never called"""
        self.close()