File size: 19,149 Bytes
a31294b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36537b9
 
a31294b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7e08013
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a31294b
 
 
 
7e08013
 
 
 
 
a31294b
7e08013
a31294b
 
 
7e08013
 
 
 
 
a31294b
 
36537b9
a31294b
 
 
7e08013
 
 
 
 
 
 
 
a31294b
 
 
 
 
 
36537b9
 
a31294b
 
36537b9
a31294b
 
 
 
 
36537b9
 
 
a31294b
 
 
 
 
 
36537b9
a31294b
 
 
36537b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7e08013
 
 
 
 
 
 
 
a31294b
 
 
 
36537b9
a31294b
 
36537b9
 
a31294b
 
 
 
 
 
 
 
 
 
 
 
 
 
36537b9
 
 
 
a31294b
 
 
 
36537b9
 
 
a31294b
 
 
 
36537b9
a31294b
 
 
7e08013
 
 
 
 
 
a31294b
 
 
7e08013
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a31294b
 
 
 
 
 
 
 
36537b9
a31294b
 
 
36537b9
a31294b
 
 
36537b9
a31294b
36537b9
a31294b
 
36537b9
a31294b
 
36537b9
a31294b
 
 
 
 
 
36537b9
a31294b
 
 
 
 
 
 
36537b9
a31294b
 
36537b9
a31294b
 
 
36537b9
a31294b
 
 
 
36537b9
 
a31294b
 
 
 
 
 
 
 
 
36537b9
 
a31294b
36537b9
 
 
a31294b
 
 
 
 
 
 
 
 
 
36537b9
 
 
a31294b
 
36537b9
 
 
a31294b
36537b9
 
 
 
 
 
 
 
a31294b
 
 
36537b9
 
 
 
a31294b
36537b9
a31294b
7e08013
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36537b9
 
a31294b
36537b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a31294b
36537b9
a31294b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36537b9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
"""
Laban Movement Analysis (LMA) inspired notation engine.
Computes movement metrics like direction, intensity, and speed from pose keypoints.
"""

import numpy as np
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass
from enum import Enum

from .pose_estimation import PoseResult, Keypoint


class Direction(Enum):
    """Coarse categories for the dominant direction of body-center motion."""
    UP = "up"
    DOWN = "down"
    LEFT = "left"
    RIGHT = "right"
    # Depth cannot be resolved from 2D keypoints; these two are placeholders
    # kept for a future 3D pose backend.
    FORWARD = "forward"
    BACKWARD = "backward"
    STATIONARY = "stationary"


class Intensity(Enum):
    """Three-step scale describing how forceful a movement is."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"


class Speed(Enum):
    """Three-step scale describing how quickly the body center travels."""
    SLOW = "slow"
    MODERATE = "moderate"
    FAST = "fast"


@dataclass
class MovementMetrics:
    """Per-frame (or per-segment) bundle of LMA-inspired movement measurements."""
    frame_index: int
    timestamp: Optional[float] = None
    
    # Categorical qualities derived from the numeric values below.
    direction: Direction = Direction.STATIONARY
    intensity: Intensity = Intensity.LOW
    speed: Speed = Speed.SLOW
    
    # Continuous measurements (pixels/second or normalized units per second).
    velocity: float = 0.0
    acceleration: float = 0.0
    
    # Derived qualities, each in [0, 1].
    fluidity: float = 0.0  # higher = smoother motion
    expansion: float = 0.0  # higher = more spread-out pose
    
    # Underlying body-center displacement between consecutive frames.
    center_displacement: Optional[Tuple[float, float]] = None
    total_displacement: float = 0.0


class SimpleKalmanFilter:
    """Lightweight constant-velocity Kalman filter for 2D position tracking.

    State vector layout is [x, y, vx, vy]; only position is observed.
    """
    
    def __init__(self, process_noise: float = 0.01, measurement_noise: float = 0.1):
        """
        Args:
            process_noise: Variance of the process noise (model uncertainty).
            measurement_noise: Variance of the position measurement noise.
        """
        self.process_noise = process_noise
        self.measurement_noise = measurement_noise
        self.is_initialized = False
        
        # State: [x, y, vx, vy]
        self.state = np.zeros(4)
        self.covariance = np.eye(4) * 0.1
        
        # Transition matrix (constant velocity model). BUG FIX: this must be a
        # float array — np.array of int literals infers an integer dtype, and
        # the in-place `self.F[0, 2] = dt` in predict() would then truncate a
        # fractional time step (e.g. 1/30 s) to 0, so velocity never
        # integrated into position.
        self.F = np.array([[1, 0, 1, 0],
                           [0, 1, 0, 1],
                           [0, 0, 1, 0],
                           [0, 0, 0, 1]], dtype=float)
        
        # Measurement matrix (observe position only)
        self.H = np.array([[1, 0, 0, 0],
                           [0, 1, 0, 0]], dtype=float)
        
        # Process noise matrix
        self.Q = np.eye(4) * process_noise
        
        # Measurement noise matrix
        self.R = np.eye(2) * measurement_noise
    
    def predict(self, dt: float = 1.0):
        """Propagate the state forward by a time step of length dt."""
        # Patch the dt-dependent slots of the transition matrix.
        self.F[0, 2] = dt
        self.F[1, 3] = dt
        
        # Standard predict step: x' = F x, P' = F P Fᵀ + Q.
        self.state = self.F @ self.state
        self.covariance = self.F @ self.covariance @ self.F.T + self.Q
    
    def update(self, measurement: Tuple[float, float]):
        """Fold a position observation (x, y) into the state estimate."""
        z = np.array([measurement[0], measurement[1]])
        
        # The first observation just seeds the position; velocity stays zero.
        if not self.is_initialized:
            self.state[:2] = z
            self.is_initialized = True
            return
        
        # Kalman gain: K = P Hᵀ (H P Hᵀ + R)⁻¹.
        S = self.H @ self.covariance @ self.H.T + self.R
        K = self.covariance @ self.H.T @ np.linalg.inv(S)
        
        # Correct state with the measurement residual.
        y = z - self.H @ self.state
        self.state = self.state + K @ y
        self.covariance = (np.eye(4) - K @ self.H) @ self.covariance
    
    def get_position(self) -> Tuple[float, float]:
        """Return the current filtered (x, y) estimate."""
        return (self.state[0], self.state[1])
    
    def get_velocity(self) -> Tuple[float, float]:
        """Return the current filtered (vx, vy) estimate."""
        return (self.state[2], self.state[3])


class MovementAnalyzer:
    """Analyzes pose sequences to extract LMA-style movement metrics."""
    
    def __init__(self, fps: float = 30.0, 
                 velocity_threshold_slow: float = 0.01,
                 velocity_threshold_fast: float = 0.1,
                 intensity_accel_threshold: float = 0.05,
                 use_kalman_filter: bool = True,
                 use_adaptive_thresholds: bool = True):
        """
        Initialize movement analyzer with advanced features.
        
        Args:
            fps: Frames per second of the video
            velocity_threshold_slow: Initial threshold for slow movement
            velocity_threshold_fast: Initial threshold for fast movement  
            intensity_accel_threshold: Initial acceleration threshold for intensity
            use_kalman_filter: Whether to use Kalman filtering for tracking
            use_adaptive_thresholds: Whether to adapt thresholds based on video content
        """
        self.fps = fps
        self.frame_duration = 1.0 / fps if fps > 0 else 0.0
        self.velocity_threshold_slow = velocity_threshold_slow
        self.velocity_threshold_fast = velocity_threshold_fast
        self.intensity_accel_threshold = intensity_accel_threshold
        self.use_kalman_filter = use_kalman_filter
        self.use_adaptive_thresholds = use_adaptive_thresholds
        
        # Kalman filter for tracking
        self.kalman_filter = SimpleKalmanFilter() if use_kalman_filter else None
        
        # Adaptive threshold parameters
        self.adaptive_thresholds_computed = False
    
    def _reset_tracking_filter(self):
        """Recreate the Kalman filter after a detection gap.

        Without this, stale position/velocity estimates from before the gap
        would be blended into the first measurement after it.
        """
        if self.use_kalman_filter:
            self.kalman_filter = SimpleKalmanFilter()
    
    def analyze_movement(self, pose_sequence: List[List[PoseResult]]) -> List[MovementMetrics]:
        """
        Analyze a sequence of poses to compute movement metrics.
        
        Args:
            pose_sequence: List of pose results per frame. Each inner list can contain
                           multiple PoseResult if multiple people are detected.
            
        Returns:
            List of movement metrics per frame (currently for the first detected person).
        """
        if not pose_sequence:
            return []
        
        metrics = []
        # For multi-person, these would need to be dictionaries mapping person_id to values
        prev_centers = None # Store as {person_id: center_coords} for multi-person
        prev_velocity = None # Store as {person_id: velocity_value} for multi-person
        
        for frame_idx, frame_poses in enumerate(pose_sequence):
            if not frame_poses:
                # No pose detected in this frame: emit a default entry and
                # reset tracking. BUG FIX: tracking state was previously kept
                # across empty frames, so the first displacement after a gap
                # spanned the whole gap and inflated velocity.
                metrics.append(MovementMetrics(
                    frame_index=frame_idx,
                    timestamp=frame_idx * self.frame_duration if self.frame_duration else None
                ))
                prev_centers = None
                prev_velocity = None
                self._reset_tracking_filter()
                continue
            
            # --- CURRENT: Analyze first person only ---
            # TODO: Extend to multi-person analysis. This would involve iterating
            # through frame_poses and tracking metrics for each person_id.
            pose = frame_poses[0] 
            # --- END CURRENT ---

            if not pose.keypoints: # Ensure pose object has keypoints
                metrics.append(MovementMetrics(
                    frame_index=frame_idx,
                    timestamp=frame_idx * self.frame_duration if self.frame_duration else None
                ))
                # Reset for next frame if this person was being tracked
                prev_centers = None # Or prev_centers.pop(person_id, None)
                prev_velocity = None # Or prev_velocity.pop(person_id, None)
                self._reset_tracking_filter()  # drop stale filter state too
                continue

            # Compute confidence-weighted body center
            center = self._compute_body_center_weighted(pose.keypoints)
            
            # Apply Kalman filtering if enabled
            if self.kalman_filter:
                self.kalman_filter.predict(self.frame_duration)
                self.kalman_filter.update(center)
                center = self.kalman_filter.get_position()
            
            # Initialize metrics for this frame
            frame_metrics = MovementMetrics(
                frame_index=frame_idx,
                timestamp=frame_idx * self.frame_duration if self.frame_duration else None
            )
            
            # Displacement, velocity, etc. can only be computed if there's a previous frame's center
            if prev_centers is not None and frame_idx > 0 and self.fps > 0:
                displacement = (
                    center[0] - prev_centers[0],
                    center[1] - prev_centers[1]
                )
                frame_metrics.center_displacement = displacement
                frame_metrics.total_displacement = np.sqrt(
                    displacement[0]**2 + displacement[1]**2
                )
                
                # velocity = distance per frame * frames per second
                frame_metrics.velocity = frame_metrics.total_displacement * self.fps
                frame_metrics.direction = self._compute_direction(displacement)
                frame_metrics.speed = self._categorize_speed(frame_metrics.velocity)
                
                if prev_velocity is not None:
                    # Acceleration (change in velocity per second)
                    delta_velocity = frame_metrics.velocity - prev_velocity
                    frame_metrics.acceleration = delta_velocity * self.fps # (units/s)/s = units/s^2
                    
                    frame_metrics.intensity = self._compute_intensity(
                        frame_metrics.acceleration,
                        frame_metrics.velocity
                    )
                    frame_metrics.fluidity = self._compute_fluidity(
                        frame_metrics.acceleration
                    )
            
            frame_metrics.expansion = self._compute_expansion(pose.keypoints)
            metrics.append(frame_metrics)
            
            # Update for next iteration (for the currently tracked person)
            prev_centers = center
            prev_velocity = frame_metrics.velocity
        
        # Apply adaptive thresholds if enabled
        if self.use_adaptive_thresholds and not self.adaptive_thresholds_computed:
            self._compute_adaptive_thresholds(metrics)
            # Recompute speed and intensity with new thresholds
            metrics = self._recompute_with_adaptive_thresholds(metrics)
        
        metrics = self._smooth_metrics(metrics)
        return metrics
    
    def _compute_body_center_weighted(self, keypoints: List[Keypoint]) -> Tuple[float, float]:
        """Compute confidence-weighted center of mass of the body."""
        major_joints = ["left_hip", "right_hip", "left_shoulder", "right_shoulder"]
        
        weighted_x = 0.0
        weighted_y = 0.0
        total_weight = 0.0
        
        for kp in keypoints:
            if kp.name in major_joints and kp.confidence > 0.5:
                weight = kp.confidence
                weighted_x += kp.x * weight
                weighted_y += kp.y * weight
                total_weight += weight
        
        # Fallback to all keypoints if no major joints found
        if total_weight == 0:
            for kp in keypoints:
                if kp.confidence > 0.3:
                    weight = kp.confidence
                    weighted_x += kp.x * weight
                    weighted_y += kp.y * weight
                    total_weight += weight
        
        if total_weight > 0:
            return (weighted_x / total_weight, weighted_y / total_weight)
        # Default to the middle of the (assumed normalized) frame.
        return (0.5, 0.5)
    
    def _compute_body_center(self, keypoints: List[Keypoint]) -> Tuple[float, float]:
        """Compute the (unweighted) center of mass of the body."""
        major_joints = ["left_hip", "right_hip", "left_shoulder", "right_shoulder"]
        
        x_coords = []
        y_coords = []
        
        for kp in keypoints:
            if kp.confidence > 0.5 and kp.name in major_joints: # Ensure kp.name is not None
                x_coords.append(kp.x)
                y_coords.append(kp.y)
        
        if not x_coords or not y_coords: # If no major joints, try all keypoints
            x_coords = [kp.x for kp in keypoints if kp.confidence > 0.3]
            y_coords = [kp.y for kp in keypoints if kp.confidence > 0.3]
        
        if x_coords and y_coords: # Check if lists are non-empty
            return (np.mean(x_coords), np.mean(y_coords))
        return (0.5, 0.5)  # Default center if no reliable keypoints
    
    def _get_limb_positions(self, keypoints: List[Keypoint]) -> Dict[str, Tuple[float, float]]:
        """Get positions of major limbs. (Currently not heavily used beyond potential debugging)"""
        positions = {}
        for kp in keypoints:
            if kp.confidence > 0.3 and kp.name:
                positions[kp.name] = (kp.x, kp.y)
        return positions
    
    def _compute_direction(self, displacement: Tuple[float, float]) -> Direction:
        """Compute movement direction from displacement vector."""
        dx, dy = displacement
        threshold = 0.005 # Normalized units per frame
        
        if abs(dx) < threshold and abs(dy) < threshold:
            return Direction.STATIONARY
        
        if abs(dx) > abs(dy):
            return Direction.RIGHT if dx > 0 else Direction.LEFT
        else:
            return Direction.DOWN if dy > 0 else Direction.UP # dy positive is typically down in image coords
    
    def _categorize_speed(self, velocity: float) -> Speed:
        """Categorize velocity into speed levels (velocity is in normalized units/sec)."""
        if velocity < self.velocity_threshold_slow:
            return Speed.SLOW
        elif velocity < self.velocity_threshold_fast:
            return Speed.MODERATE
        else:
            return Speed.FAST
    
    def _compute_intensity(self, acceleration: float, velocity: float) -> Intensity:
        """Compute movement intensity (accel in norm_units/sec^2, vel in norm_units/sec)."""
        # Thresholds are relative to normalized space and per-second metrics
        if acceleration > self.intensity_accel_threshold * 2 or velocity > self.velocity_threshold_fast:
            return Intensity.HIGH
        elif acceleration > self.intensity_accel_threshold or velocity > self.velocity_threshold_slow:
            return Intensity.MEDIUM
        else:
            return Intensity.LOW
    
    def _compute_fluidity(self, acceleration: float) -> float:
        """
        Compute fluidity score (0-1) based on acceleration (norm_units/sec^2).
        Lower acceleration = higher fluidity.
        """
        max_expected_accel = 0.2 # This is an assumption for normalization, might need tuning.
                                 # Represents a fairly high acceleration in normalized units/sec^2.
        norm_accel = min(abs(acceleration) / max_expected_accel, 1.0) if max_expected_accel > 0 else 0.0
        return 1.0 - norm_accel
    
    def _compute_expansion(self, keypoints: List[Keypoint]) -> float:
        """
        Compute how expanded/contracted the pose is.
        Returns 0-1 where 1 is fully expanded.
        """
        limb_pairs = [
            ("left_wrist", "right_wrist"),
            ("left_ankle", "right_ankle"),
            ("left_wrist", "left_ankle"), 
            ("right_wrist", "right_ankle"),
            # Could add torso diagonals like ("left_shoulder", "right_hip")
        ]
        
        kp_dict = {kp.name: kp for kp in keypoints if kp.confidence > 0.3 and kp.name}
        if not kp_dict: return 0.5 # No reliable keypoints

        distances = []
        for name1, name2 in limb_pairs:
            if name1 in kp_dict and name2 in kp_dict:
                kp1 = kp_dict[name1]
                kp2 = kp_dict[name2]
                # Ensure coordinates are not NaN before calculation
                if not (np.isnan(kp1.x) or np.isnan(kp1.y) or np.isnan(kp2.x) or np.isnan(kp2.y)):
                    dist = np.sqrt((kp1.x - kp2.x)**2 + (kp1.y - kp2.y)**2)
                    distances.append(dist)
        
        if distances:
            avg_dist = np.mean(distances)
            # Max expected distance (e.g., diagonal of normalized 1x1 space is sqrt(2) approx 1.414)
            # This assumes keypoints are normalized.
            max_possible_dist_heuristic = 1.0 # A more conservative heuristic than 1.4, as limbs rarely span the full diagonal.
            return min(avg_dist / max_possible_dist_heuristic, 1.0) if max_possible_dist_heuristic > 0 else 0.0
        
        return 0.5  # Default neutral expansion if no valid pairs
    
    def _compute_adaptive_thresholds(self, metrics: List[MovementMetrics]):
        """Compute adaptive thresholds based on movement characteristics in the video."""
        velocities = [m.velocity for m in metrics if m.velocity > 0]
        accelerations = [abs(m.acceleration) for m in metrics if m.acceleration != 0]
        
        if velocities:
            # NOTE(review): if the 25th and 75th percentiles are very close,
            # slow/fast thresholds can nearly coincide — acceptable for now.
            velocity_percentiles = np.percentile(velocities, [25, 75])
            self.velocity_threshold_slow = max(velocity_percentiles[0], 0.005)
            self.velocity_threshold_fast = velocity_percentiles[1]
        
        if accelerations:
            accel_75th = np.percentile(accelerations, 75)
            self.intensity_accel_threshold = max(accel_75th * 0.5, 0.01)
        
        self.adaptive_thresholds_computed = True
    
    def _recompute_with_adaptive_thresholds(self, metrics: List[MovementMetrics]) -> List[MovementMetrics]:
        """Recompute speed and intensity classifications with adaptive thresholds."""
        for metric in metrics:
            if metric.velocity > 0:
                metric.speed = self._categorize_speed(metric.velocity)
                metric.intensity = self._compute_intensity(metric.acceleration, metric.velocity)
        return metrics
    
    def _smooth_metrics(self, metrics_list: List[MovementMetrics]) -> List[MovementMetrics]:
        """Apply smoothing to reduce noise in metrics using a simple moving average.

        BUG FIX: the previous implementation took a *shallow* copy of the list
        and mutated the shared MovementMetrics objects in place while still
        reading the window from the same objects, so already-smoothed values
        leaked into later windows. We now snapshot all raw values first and
        only then write the smoothed results back.
        """
        window_size = 3
        num_metrics = len(metrics_list)

        if num_metrics <= window_size:
            return metrics_list

        # Fields to smooth
        fields_to_smooth = ["velocity", "acceleration", "fluidity", "expansion"]

        # Snapshot raw values so smoothing reads only pre-smoothing data.
        raw_values = {
            field: [getattr(m, field) for m in metrics_list]
            for field in fields_to_smooth
        }

        half = window_size // 2
        for i in range(num_metrics):
            start_idx = max(0, i - half)
            end_idx = min(num_metrics, i + half + 1)
            for field in fields_to_smooth:
                window_vals = raw_values[field][start_idx:end_idx]
                if window_vals:
                    setattr(metrics_list[i], field, float(np.mean(window_vals)))

        return metrics_list


def analyze_pose_sequence(pose_sequence: List[List[PoseResult]], 
                         fps: float = 30.0) -> List[MovementMetrics]:
    """
    Convenience wrapper: build a default MovementAnalyzer and run it once.
    
    Args:
        pose_sequence: Per-frame lists of detected poses
        fps: Video frame rate
        
    Returns:
        Per-frame movement metrics
    """
    return MovementAnalyzer(fps=fps).analyze_movement(pose_sequence)