Spaces:
Running
on
Zero
Running
on
Zero
| """Physics constraint layers for motor dynamics in MANIFOLD.""" | |
| from __future__ import annotations | |
| import math | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from typing import Optional, Dict, Any | |
| from dataclasses import dataclass | |
| # Human motor control limits (empirically derived) | |
| DEFAULT_MAX_TURN_RATE = 5.0 # degrees per tick (128 tick = 640 deg/sec) | |
| DEFAULT_MAX_ACCELERATION = 50.0 # degrees per tick² | |
| DEFAULT_MIN_REACTION_MS = 150.0 # milliseconds | |
| DEFAULT_FITTS_A = 0.0 # intercept (seconds) | |
| DEFAULT_FITTS_B = 0.1 # slope (seconds per bit) | |
| class PhysicsConstraints: | |
| """Container for physics constraint parameters.""" | |
| max_turn_rate: float = DEFAULT_MAX_TURN_RATE | |
| max_acceleration: float = DEFAULT_MAX_ACCELERATION | |
| min_reaction_ms: float = DEFAULT_MIN_REACTION_MS | |
| fitts_a: float = DEFAULT_FITTS_A | |
| fitts_b: float = DEFAULT_FITTS_B | |
| def compute_jerk(trajectory: torch.Tensor) -> torch.Tensor: | |
| """ | |
| Compute jerk (rate of change of acceleration) from trajectory. | |
| Jerk is key for detecting aimbots - human movements have bounded jerk, | |
| while aimbots often have infinite jerk at snap points. | |
| Args: | |
| trajectory: [batch, seq, 2] mouse deltas (dx, dy per tick) | |
| Returns: | |
| Jerk tensor [batch, seq-2] | |
| """ | |
| # Velocity (trajectory is already velocity as deltas) | |
| velocity = torch.norm(trajectory, dim=-1) # [batch, seq] | |
| # Acceleration = dv/dt | |
| acceleration = torch.diff(velocity, dim=-1) # [batch, seq-1] | |
| # Jerk = da/dt | |
| jerk = torch.diff(acceleration, dim=-1) # [batch, seq-2] | |
| return jerk | |
| def compute_jerk_violation( | |
| trajectory: torch.Tensor, | |
| max_jerk: float = 100.0, | |
| ) -> torch.Tensor: | |
| """ | |
| Compute jerk violation penalty. | |
| Args: | |
| trajectory: [batch, seq, 2] mouse deltas | |
| max_jerk: Maximum allowed jerk magnitude | |
| Returns: | |
| Violation score [batch] - higher = more violation | |
| """ | |
| jerk = compute_jerk(trajectory) | |
| # Soft violation: ReLU over threshold, then mean | |
| violations = F.relu(torch.abs(jerk) - max_jerk) | |
| # Mean violation per sequence | |
| return violations.mean(dim=-1) | |
| def compute_fitts_violation( | |
| movement_time: torch.Tensor, | |
| distance: torch.Tensor, | |
| target_width: torch.Tensor, | |
| fitts_a: float = DEFAULT_FITTS_A, | |
| fitts_b: float = DEFAULT_FITTS_B, | |
| ) -> torch.Tensor: | |
| """ | |
| Compute Fitts' Law violation. | |
| MT = a + b * log2(2D/W + 1) | |
| Violation = ReLU(expected_MT - actual_MT) | |
| (faster than Fitts predicts = suspicious) | |
| Args: | |
| movement_time: Actual movement time [batch] | |
| distance: Movement distance [batch] | |
| target_width: Target size [batch] | |
| Returns: | |
| Violation score [batch] | |
| """ | |
| # Index of difficulty | |
| id = torch.log2(2 * distance / (target_width + 1e-6) + 1) | |
| # Expected time from Fitts' Law | |
| expected_time = fitts_a + fitts_b * id | |
| # Violation: faster than humanly possible | |
| violation = F.relu(expected_time - movement_time) | |
| return violation | |
| def compute_reaction_time_violation( | |
| reaction_times: torch.Tensor, | |
| min_reaction_ms: float = DEFAULT_MIN_REACTION_MS, | |
| ) -> torch.Tensor: | |
| """ | |
| Compute reaction time violation. | |
| Human physiological minimum is ~100-150ms for visual-motor response. | |
| Consistently faster reactions indicate artificial assistance. | |
| Args: | |
| reaction_times: Reaction times in ms [batch, n_events] | |
| min_reaction_ms: Minimum human reaction time | |
| Returns: | |
| Violation score [batch] | |
| """ | |
| # Violation: faster than humanly possible | |
| violations = F.relu(min_reaction_ms - reaction_times) | |
| # Mean violation per batch | |
| return violations.mean(dim=-1) | |
| class PhysicsConstraintLayer(nn.Module): | |
| """ | |
| Learnable physics constraint layer. | |
| Learns soft constraint thresholds while computing violations. | |
| """ | |
| def __init__( | |
| self, | |
| init_max_turn_rate: float = DEFAULT_MAX_TURN_RATE, | |
| init_max_acceleration: float = DEFAULT_MAX_ACCELERATION, | |
| init_min_reaction_ms: float = DEFAULT_MIN_REACTION_MS, | |
| learnable: bool = True, | |
| ): | |
| super().__init__() | |
| # Learnable parameters (in log space for positivity) | |
| if learnable: | |
| self.log_max_turn_rate = nn.Parameter(torch.tensor(math.log(init_max_turn_rate))) | |
| self.log_max_acceleration = nn.Parameter(torch.tensor(math.log(init_max_acceleration))) | |
| self.log_min_reaction_ms = nn.Parameter(torch.tensor(math.log(init_min_reaction_ms))) | |
| else: | |
| self.register_buffer("log_max_turn_rate", torch.tensor(math.log(init_max_turn_rate))) | |
| self.register_buffer("log_max_acceleration", torch.tensor(math.log(init_max_acceleration))) | |
| self.register_buffer("log_min_reaction_ms", torch.tensor(math.log(init_min_reaction_ms))) | |
| def max_turn_rate(self) -> torch.Tensor: | |
| return torch.exp(self.log_max_turn_rate) | |
| def max_acceleration(self) -> torch.Tensor: | |
| return torch.exp(self.log_max_acceleration) | |
| def min_reaction_ms(self) -> torch.Tensor: | |
| return torch.exp(self.log_min_reaction_ms) | |
| def forward( | |
| self, | |
| trajectory: torch.Tensor, | |
| reaction_times: Optional[torch.Tensor] = None, | |
| movement_times: Optional[torch.Tensor] = None, | |
| distances: Optional[torch.Tensor] = None, | |
| target_widths: Optional[torch.Tensor] = None, | |
| ) -> Dict[str, torch.Tensor]: | |
| """ | |
| Compute physics constraint violations. | |
| Args: | |
| trajectory: [batch, seq, 2] mouse deltas | |
| reaction_times: [batch, n] reaction times in ms | |
| movement_times: [batch] for Fitts' law | |
| distances: [batch] for Fitts' law | |
| target_widths: [batch] for Fitts' law | |
| Returns: | |
| Dict with violation scores and total | |
| """ | |
| violations = {} | |
| # Jerk violation | |
| jerk = compute_jerk(trajectory) | |
| # Max jerk derived from max_acceleration | |
| max_jerk = self.max_acceleration * 2 # Rough estimate | |
| jerk_violation = F.relu(torch.abs(jerk) - max_jerk).mean(dim=-1) | |
| violations["jerk_violation"] = jerk_violation | |
| # Turn rate violation | |
| velocity = torch.norm(trajectory, dim=-1) | |
| turn_rate_violation = F.relu(velocity - self.max_turn_rate).mean(dim=-1) | |
| violations["turn_rate_violation"] = turn_rate_violation | |
| # Acceleration violation | |
| acceleration = torch.diff(velocity, dim=-1) | |
| accel_violation = F.relu(torch.abs(acceleration) - self.max_acceleration).mean(dim=-1) | |
| violations["acceleration_violation"] = accel_violation | |
| # Reaction time violation (if provided) | |
| if reaction_times is not None: | |
| rt_violation = compute_reaction_time_violation(reaction_times, self.min_reaction_ms.item()) | |
| violations["reaction_time_violation"] = rt_violation | |
| # Fitts' Law violation (if provided) | |
| if movement_times is not None and distances is not None and target_widths is not None: | |
| fitts_violation = compute_fitts_violation(movement_times, distances, target_widths) | |
| violations["fitts_violation"] = fitts_violation | |
| # Total violation (sum of all) | |
| total = sum(v.mean() for v in violations.values()) | |
| violations["total_violation"] = total | |
| return violations | |
| def get_constraints(self) -> Dict[str, float]: | |
| """Get current constraint values.""" | |
| return { | |
| "max_turn_rate": self.max_turn_rate.item(), | |
| "max_acceleration": self.max_acceleration.item(), | |
| "min_reaction_ms": self.min_reaction_ms.item(), | |
| } | |
| class SignalDependentNoiseChecker(nn.Module): | |
| """ | |
| Check if trajectory noise follows signal-dependent pattern (Weber's Law). | |
| Human motor noise scales with movement magnitude. | |
| Artificial noise is often constant (uniform) - detectable artifact. | |
| """ | |
| def __init__(self, expected_k: float = 0.1): | |
| super().__init__() | |
| self.expected_k = expected_k | |
| def forward(self, trajectory: torch.Tensor) -> Dict[str, torch.Tensor]: | |
| """ | |
| Analyze noise pattern in trajectory. | |
| Args: | |
| trajectory: [batch, seq, 2] mouse deltas | |
| Returns: | |
| Dict with noise analysis metrics | |
| """ | |
| # Compute velocity magnitude | |
| velocity = torch.norm(trajectory, dim=-1) # [batch, seq] | |
| # Compute local variance (rolling window) | |
| window = 5 | |
| if trajectory.shape[1] > window: | |
| # Simple variance estimation | |
| variance = torch.zeros_like(velocity) | |
| for i in range(window, trajectory.shape[1]): | |
| window_data = velocity[:, i-window:i] | |
| variance[:, i] = window_data.var(dim=-1) | |
| # Check if variance correlates with velocity (Weber's Law) | |
| mean_vel = velocity[:, window:].mean(dim=-1) | |
| mean_var = variance[:, window:].mean(dim=-1) | |
| # Ratio should be approximately k² for human (Weber's Law) | |
| noise_ratio = torch.sqrt(mean_var + 1e-8) / (mean_vel + 1e-8) | |
| else: | |
| noise_ratio = torch.zeros(trajectory.shape[0], device=trajectory.device) | |
| return { | |
| "noise_ratio": noise_ratio, | |
| "expected_k": torch.tensor(self.expected_k, device=trajectory.device), | |
| } | |