File size: 17,466 Bytes
bc18e51
 
 
 
 
 
 
 
 
 
 
a601b1d
 
 
bc18e51
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
"""
Movement Classifier - Advanced movement intelligence and analysis
Classifies dance movements, calculates intensity, and detects patterns
"""

import numpy as np
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
import logging

from app.config import Config
from app.core.pose_analyzer import PoseKeypoints
from app.utils.validation import safe_divide

logger = logging.getLogger(__name__)


class MovementType(Enum):
    """Enumeration of movement types"""
    STANDING = "Standing"
    WALKING = "Walking"
    DANCING = "Dancing"
    JUMPING = "Jumping"
    CROUCHING = "Crouching"
    UNKNOWN = "Unknown"


@dataclass
class MovementMetrics:
    """Data class for movement analysis results"""
    movement_type: MovementType
    intensity: float  # 0-100 scale
    velocity: float  # Average velocity
    body_part_activity: Dict[str, float]  # Activity level per body part
    frame_range: Tuple[int, int]  # Start and end frame numbers


class MovementClassifier:
    """
    Analyzes pose sequences to classify movements and calculate metrics
    """
    
    # Body part groupings using MediaPipe landmark indices
    BODY_PARTS = {
        "head": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],  # Face and head
        "torso": [11, 12, 23, 24],  # Shoulders and hips
        "left_arm": [11, 13, 15, 17, 19, 21],  # Left shoulder to hand
        "right_arm": [12, 14, 16, 18, 20, 22],  # Right shoulder to hand
        "left_leg": [23, 25, 27, 29, 31],  # Left hip to foot
        "right_leg": [24, 26, 28, 30, 32]  # Right hip to foot
    }
    
    def __init__(self, smoothing_window: int = 5):
        """
        Initialize movement classifier
        
        Args:
            smoothing_window: Number of frames for smoothing calculations
        """
        self.smoothing_window = smoothing_window
        self.movement_history: List[MovementMetrics] = []
        logger.info("MovementClassifier initialized")
    
    def analyze_sequence(self, keypoints_sequence: List[PoseKeypoints]) -> MovementMetrics:
        """
        Analyze a sequence of pose keypoints to classify movement
        
        Args:
            keypoints_sequence: List of detected pose keypoints
        
        Returns:
            MovementMetrics object with analysis results
        """
        if not keypoints_sequence:
            return self._create_empty_metrics()
        
        # Calculate velocities between consecutive frames
        velocities = self._calculate_velocities(keypoints_sequence)
        
        # Calculate average velocity (overall movement speed)
        avg_velocity = np.mean(velocities) if len(velocities) > 0 else 0.0
        
        # Classify movement type based on velocity and pose characteristics
        movement_type = self._classify_movement(keypoints_sequence, avg_velocity)
        
        # Calculate movement intensity (0-100 scale)
        intensity = self._calculate_intensity(velocities, movement_type)
        
        # Analyze activity per body part
        body_part_activity = self._calculate_body_part_activity(keypoints_sequence)
        
        # Get frame range
        frame_range = (
            keypoints_sequence[0].frame_number,
            keypoints_sequence[-1].frame_number
        )
        
        metrics = MovementMetrics(
            movement_type=movement_type,
            intensity=intensity,
            velocity=avg_velocity,
            body_part_activity=body_part_activity,
            frame_range=frame_range
        )
        
        self.movement_history.append(metrics)
        
        logger.info(f"Analyzed sequence: {movement_type.value}, "
                   f"Intensity: {intensity:.1f}, Velocity: {avg_velocity:.4f}")
        
        return metrics
    
    def _calculate_velocities(self, keypoints_sequence: List[PoseKeypoints]) -> np.ndarray:
        """
        Calculate frame-to-frame velocities for all keypoints
        
        Args:
            keypoints_sequence: List of pose keypoints
        
        Returns:
            Array of velocities (one per frame transition)
        """
        if len(keypoints_sequence) < 2:
            return np.array([0.0])
        
        velocities = []
        
        for i in range(1, len(keypoints_sequence)):
            prev_landmarks = keypoints_sequence[i-1].landmarks[:, :2]  # x, y only
            curr_landmarks = keypoints_sequence[i].landmarks[:, :2]
            
            # Calculate Euclidean distance for each keypoint
            displacement = np.linalg.norm(curr_landmarks - prev_landmarks, axis=1)
            
            # Average displacement across all keypoints
            avg_displacement = np.mean(displacement)
            
            # Time difference (assuming constant fps)
            time_diff = keypoints_sequence[i].timestamp - keypoints_sequence[i-1].timestamp
            
            # Velocity = displacement / time
            velocity = safe_divide(avg_displacement, time_diff, 0.0)
            velocities.append(velocity)
        
        return np.array(velocities)
    
    def _classify_movement(self, keypoints_sequence: List[PoseKeypoints], 
                          avg_velocity: float) -> MovementType:
        """
        Classify movement type based on velocity and pose characteristics
        
        Args:
            keypoints_sequence: List of pose keypoints
            avg_velocity: Average velocity across sequence
        
        Returns:
            MovementType classification
        """
        # Check for jumping (vertical movement of center of mass)
        if self._detect_jumping(keypoints_sequence):
            return MovementType.JUMPING
        
        # Check for crouching (low body position)
        if self._detect_crouching(keypoints_sequence):
            return MovementType.CROUCHING
        
        # Classify based on velocity thresholds
        if avg_velocity < Config.VELOCITY_STANDING:
            return MovementType.STANDING
        elif avg_velocity < Config.VELOCITY_WALKING:
            return MovementType.WALKING
        elif avg_velocity < Config.VELOCITY_DANCING:
            return MovementType.DANCING
        else:
            # High velocity movements are likely dancing
            return MovementType.DANCING
    
    def _detect_jumping(self, keypoints_sequence: List[PoseKeypoints]) -> bool:
        """
        Detect jumping motion by analyzing vertical hip movement
        
        Args:
            keypoints_sequence: List of pose keypoints
        
        Returns:
            True if jumping detected
        """
        if len(keypoints_sequence) < 5:
            return False
        
        # Get hip positions (landmarks 23 and 24)
        hip_y_positions = []
        for kp in keypoints_sequence:
            left_hip_y = kp.landmarks[23, 1]
            right_hip_y = kp.landmarks[24, 1]
            avg_hip_y = (left_hip_y + right_hip_y) / 2
            hip_y_positions.append(avg_hip_y)
        
        hip_y_positions = np.array(hip_y_positions)
        
        # Calculate vertical velocity
        vertical_velocity = np.abs(np.diff(hip_y_positions))
        
        # Jumping has high vertical velocity peaks
        max_vertical_velocity = np.max(vertical_velocity)
        
        return max_vertical_velocity > Config.VELOCITY_JUMPING
    
    def _detect_crouching(self, keypoints_sequence: List[PoseKeypoints]) -> bool:
        """
        Detect crouching by analyzing hip-to-shoulder distance
        
        Args:
            keypoints_sequence: List of pose keypoints
        
        Returns:
            True if crouching detected
        """
        if not keypoints_sequence:
            return False
        
        # Use middle frame for analysis
        mid_idx = len(keypoints_sequence) // 2
        landmarks = keypoints_sequence[mid_idx].landmarks
        
        # Calculate average shoulder position (landmarks 11, 12)
        shoulder_y = (landmarks[11, 1] + landmarks[12, 1]) / 2
        
        # Calculate average hip position (landmarks 23, 24)
        hip_y = (landmarks[23, 1] + landmarks[24, 1]) / 2
        
        # Calculate torso length
        torso_length = abs(hip_y - shoulder_y)
        
        # Crouching: torso is compressed (small torso length)
        # This is relative, so we use a threshold
        return torso_length < 0.15  # Normalized coordinates
    
    def _calculate_intensity(self, velocities: np.ndarray, 
                            movement_type: MovementType) -> float:
        """
        Calculate movement intensity on 0-100 scale
        
        Args:
            velocities: Array of velocities
            movement_type: Classified movement type
        
        Returns:
            Intensity score (0-100)
        """
        if len(velocities) == 0:
            return 0.0
        
        # Calculate base intensity from velocity
        avg_velocity = np.mean(velocities)
        velocity_std = np.std(velocities)
        
        # Normalize velocity to 0-100 scale
        # Higher velocity and variation = higher intensity
        base_intensity = min(avg_velocity * 500, 70)  # Cap at 70
        variation_bonus = min(velocity_std * 300, 30)  # Up to 30 bonus
        
        raw_intensity = base_intensity + variation_bonus
        
        # Apply movement type multipliers
        multipliers = {
            MovementType.STANDING: 0.1,
            MovementType.WALKING: 0.4,
            MovementType.DANCING: 1.0,
            MovementType.JUMPING: 1.2,
            MovementType.CROUCHING: 0.3,
            MovementType.UNKNOWN: 0.5
        }
        
        intensity = raw_intensity * multipliers.get(movement_type, 1.0)
        
        # Clamp to 0-100 range
        return np.clip(intensity, 0, 100)
    
    def _calculate_body_part_activity(self, 
                                     keypoints_sequence: List[PoseKeypoints]) -> Dict[str, float]:
        """
        Calculate activity level for each body part
        
        Args:
            keypoints_sequence: List of pose keypoints
        
        Returns:
            Dictionary mapping body part names to activity scores (0-100)
        """
        if len(keypoints_sequence) < 2:
            return {part: 0.0 for part in self.BODY_PARTS.keys()}
        
        activity_scores = {}
        
        for part_name, landmark_indices in self.BODY_PARTS.items():
            total_movement = 0.0
            
            # Calculate movement for this body part across all frames
            for i in range(1, len(keypoints_sequence)):
                prev_landmarks = keypoints_sequence[i-1].landmarks[landmark_indices, :2]
                curr_landmarks = keypoints_sequence[i].landmarks[landmark_indices, :2]
                
                # Calculate average movement for this body part
                displacement = np.linalg.norm(curr_landmarks - prev_landmarks, axis=1)
                avg_displacement = np.mean(displacement)
                
                total_movement += avg_displacement
            
            # Normalize to 0-100 scale
            avg_movement = total_movement / (len(keypoints_sequence) - 1)
            activity_score = min(avg_movement * 1000, 100)  # Scale and cap at 100
            
            activity_scores[part_name] = activity_score
        
        return activity_scores
    
    def get_movement_summary(self) -> Dict[str, any]:
        """
        Get summary statistics of all analyzed movements
        
        Returns:
            Dictionary with summary statistics
        """
        if not self.movement_history:
            return {
                "total_sequences": 0,
                "average_intensity": 0.0,
                "movement_distribution": {},
                "most_active_body_part": "none"
            }
        
        # Count movement types
        movement_counts = {}
        for metrics in self.movement_history:
            movement_type = metrics.movement_type.value
            movement_counts[movement_type] = movement_counts.get(movement_type, 0) + 1
        
        # Calculate average intensity
        avg_intensity = np.mean([m.intensity for m in self.movement_history])
        
        # Find most active body part across all sequences
        all_body_parts = {}
        for metrics in self.movement_history:
            for part, activity in metrics.body_part_activity.items():
                if part not in all_body_parts:
                    all_body_parts[part] = []
                all_body_parts[part].append(activity)
        
        avg_body_part_activity = {
            part: np.mean(activities) 
            for part, activities in all_body_parts.items()
        }
        
        most_active_part = max(avg_body_part_activity.items(), key=lambda x: x[1])[0]
        
        return {
            "total_sequences": len(self.movement_history),
            "average_intensity": round(avg_intensity, 2),
            "movement_distribution": movement_counts,
            "most_active_body_part": most_active_part,
            "avg_body_part_activity": {
                k: round(v, 2) for k, v in avg_body_part_activity.items()
            }
        }
    
    def detect_rhythm_patterns(self, keypoints_sequence: List[PoseKeypoints], 
                              fps: float) -> Dict[str, any]:
        """
        Detect rhythmic patterns in movement (basic beat detection)
        
        Args:
            keypoints_sequence: List of pose keypoints
            fps: Video frames per second
        
        Returns:
            Dictionary with rhythm analysis
        """
        if len(keypoints_sequence) < 10:
            return {"has_rhythm": False, "estimated_bpm": 0}
        
        # Calculate velocities
        velocities = self._calculate_velocities(keypoints_sequence)
        
        # Apply smoothing
        if len(velocities) > self.smoothing_window:
            kernel = np.ones(self.smoothing_window) / self.smoothing_window
            smoothed_velocities = np.convolve(velocities, kernel, mode='valid')
        else:
            smoothed_velocities = velocities
        
        # Find peaks in velocity (potential beats)
        peaks = self._find_peaks(smoothed_velocities)
        
        if len(peaks) < 2:
            return {"has_rhythm": False, "estimated_bpm": 0}
        
        # Calculate average time between peaks
        peak_intervals = np.diff(peaks) / fps  # Convert to seconds
        avg_interval = np.mean(peak_intervals)
        
        # Calculate BPM (beats per minute)
        bpm = safe_divide(60, avg_interval, 0)
        
        # Check if rhythm is consistent (low standard deviation)
        interval_std = np.std(peak_intervals)
        is_rhythmic = interval_std < (avg_interval * 0.3)  # Within 30% variation
        
        return {
            "has_rhythm": is_rhythmic,
            "estimated_bpm": round(bpm, 1),
            "peak_count": len(peaks),
            "rhythm_consistency": round(1 - (interval_std / avg_interval), 2) if avg_interval > 0 else 0
        }
    
    def _find_peaks(self, signal: np.ndarray, threshold_percentile: float = 70) -> np.ndarray:
        """
        Find peaks in a signal (simple peak detection)
        
        Args:
            signal: 1D signal array
            threshold_percentile: Percentile threshold for peak detection
        
        Returns:
            Array of peak indices
        """
        if len(signal) < 3:
            return np.array([])
        
        # Calculate threshold
        threshold = np.percentile(signal, threshold_percentile)
        
        peaks = []
        for i in range(1, len(signal) - 1):
            # Peak: higher than neighbors and above threshold
            if (signal[i] > signal[i-1] and 
                signal[i] > signal[i+1] and 
                signal[i] > threshold):
                peaks.append(i)
        
        return np.array(peaks)
    
    def calculate_movement_smoothness(self, keypoints_sequence: List[PoseKeypoints]) -> float:
        """
        Calculate smoothness of movement (lower jerk = smoother)
        
        Args:
            keypoints_sequence: List of pose keypoints
        
        Returns:
            Smoothness score (0-100, higher is smoother)
        """
        if len(keypoints_sequence) < 3:
            return 100.0  # Not enough data
        
        # Calculate velocities
        velocities = self._calculate_velocities(keypoints_sequence)
        
        if len(velocities) < 2:
            return 100.0
        
        # Calculate jerk (rate of change of velocity)
        jerk = np.abs(np.diff(velocities))
        avg_jerk = np.mean(jerk)
        
        # Convert to smoothness score (inverse of jerk)
        # Lower jerk = higher smoothness
        smoothness = max(0, 100 - (avg_jerk * 1000))
        
        return round(smoothness, 2)
    
    def _create_empty_metrics(self) -> MovementMetrics:
        """Create empty metrics for cases with no data"""
        return MovementMetrics(
            movement_type=MovementType.UNKNOWN,
            intensity=0.0,
            velocity=0.0,
            body_part_activity={part: 0.0 for part in self.BODY_PARTS.keys()},
            frame_range=(0, 0)
        )
    
    def reset(self):
        """Reset movement history"""
        self.movement_history.clear()
        logger.info("MovementClassifier reset")