# SwipeALot-base / preprocessing.py
# dleemiller's picture
# Upload folder using huggingface_hub
# b121266 verified
"""Shared preprocessing utilities for swipe path data.
This module provides a single source of truth for path preprocessing,
used by both the training dataset and the HuggingFace processor.
"""
import numpy as np
def preprocess_raw_path_to_features(
    data_points: list[dict],
    max_len: int,
    *,
    resample_mode: str = "spatial",
    dt_clamp_min_ms: float = 1.0,
    dt_clamp_max_ms: float = 200.0,
) -> tuple[np.ndarray, np.ndarray]:
    """Convert a raw `{"x","y","t"}` path to fixed-length engineered features.

    The path is read into x/y/t arrays once, x and y are clipped to [0, 1],
    the trajectory is resampled to exactly ``max_len`` points by linear
    interpolation, and dx/dy/ds/log_dt are recomputed on the resampled
    trajectory.

    Resampling axis:
        - "spatial": uniform in cumulative arc length. If the path has no
          movement at all, interpolation is done over the point index instead.
        - "time": uniform in cumulative elapsed time. Backwards timestamp
          steps count as zero elapsed time on this axis, because ``np.interp``
          needs a non-decreasing x-axis. If the total elapsed time is zero,
          the spatial rules above are used.

    Args:
        data_points: Raw path as a list of dicts with keys: "x", "y", "t".
        max_len: Target length. ``0`` yields empty arrays.
        resample_mode: "spatial" (arc-length) or "time" (cumulative dt).
        dt_clamp_min_ms: Lower clamp for the dt feature after resampling
            (the first dt is always forced to 0).
        dt_clamp_max_ms: Upper clamp for the dt feature after resampling.

    Returns:
        (features, mask) where:
            - features: [max_len, 6] float32 array (x, y, dx, dy, ds, log_dt)
            - mask: [max_len] int64 array; all ones for a non-empty path,
              all zeros for an empty path.

    Raises:
        ValueError: If ``data_points`` is non-empty and ``resample_mode`` is
            neither "spatial" nor "time".
    """
    num_points = len(data_points)
    if num_points == 0:
        return (
            np.zeros((max_len, 6), dtype=np.float32),
            np.zeros(max_len, dtype=np.int64),
        )

    if resample_mode not in {"spatial", "time"}:
        raise ValueError(f"Unknown resample_mode={resample_mode!r} (use 'spatial' or 'time')")

    if max_len == 0:
        # Nothing to sample. Without this guard the [0.0]-prefixed delta
        # arrays below have length 1 while x_r/y_r have length 0, and
        # np.stack fails with a shape-mismatch error.
        return (
            np.zeros((0, 6), dtype=np.float32),
            np.ones(0, dtype=np.int64),
        )

    x = np.fromiter((p["x"] for p in data_points), dtype=np.float64, count=num_points)
    y = np.fromiter((p["y"] for p in data_points), dtype=np.float64, count=num_points)
    t = np.fromiter((p["t"] for p in data_points), dtype=np.float64, count=num_points)
    x = np.clip(x, 0.0, 1.0)
    y = np.clip(y, 0.0, 1.0)

    # Per-step deltas of the raw path; the first step is 0 by convention.
    ds_in = np.hypot(
        np.concatenate([[0.0], np.diff(x)]),
        np.concatenate([[0.0], np.diff(y)]),
    )
    dt_raw_in = np.concatenate([[0.0], np.diff(t)])
    s = np.cumsum(ds_in)  # cumulative arc length
    tau = np.cumsum(dt_raw_in)  # cumulative (unclamped) elapsed time

    eps = 1e-12
    time_axis = None
    if resample_mode == "time":
        # np.interp does not check that its x-axis is increasing and returns
        # meaningless values when it is not. Treat backwards timestamp steps
        # as zero elapsed time so the axis is non-decreasing. For monotonic
        # timestamps this equals `tau`.
        monotone_tau = np.cumsum(np.maximum(dt_raw_in, 0.0))
        if monotone_tau[-1] > eps:
            time_axis = monotone_tau

    if time_axis is not None:
        target_tau = np.linspace(0.0, float(time_axis[-1]), max_len, dtype=np.float64)
        x_r = np.interp(target_tau, time_axis, x)
        y_r = np.interp(target_tau, time_axis, y)
        tau_r = target_tau
    elif s[-1] <= eps:
        # Zero movement: arc length cannot serve as an axis, so interpolate
        # over the point index instead.
        original = np.arange(num_points, dtype=np.float64)
        target = np.linspace(0, num_points - 1, max_len, dtype=np.float64)
        x_r = np.interp(target, original, x)
        y_r = np.interp(target, original, y)
        tau_r = np.interp(target, original, tau)
    else:
        # Spatial sampling (also the fallback when the time axis is degenerate).
        target_s = np.linspace(0.0, float(s[-1]), max_len, dtype=np.float64)
        x_r = np.interp(target_s, s, x)
        y_r = np.interp(target_s, s, y)
        tau_r = np.interp(target_s, s, tau)

    # Recompute deltas on the resampled trajectory.
    dx = np.concatenate([[0.0], np.diff(x_r)])
    dy = np.concatenate([[0.0], np.diff(y_r)])
    ds = np.hypot(dx, dy)
    dt_raw_r = np.concatenate([[0.0], np.diff(tau_r)])
    dt_feat = np.clip(dt_raw_r, dt_clamp_min_ms, dt_clamp_max_ms)
    dt_feat[0] = 0.0  # the first point has no predecessor
    log_dt = np.log1p(np.maximum(0.0, dt_feat))

    mask = np.ones(max_len, dtype=np.int64)
    features = np.stack([x_r, y_r, dx, dy, ds, log_dt], axis=-1).astype(np.float32)
    return features, mask
def normalize_and_compute_features(
    data_points: list[dict],
    dt_clamp_min_ms: float = 1.0,
    dt_clamp_max_ms: float = 200.0,
) -> list[dict]:
    """Clip coordinates and compute per-point motion features.

    Coordinates are clipped to [0, 1] (no rescaling is performed here).
    Delta features are taken between consecutive points; the first point has
    dx = dy = ds = dt_raw = dt = 0 by convention.

    Args:
        data_points: List of {"x", "y", "t"} dicts.
        dt_clamp_min_ms: Minimum dt (inclusive) applied to every point except
            the first.
        dt_clamp_max_ms: Maximum dt (inclusive).

    Returns:
        One dict per input point (empty list for empty input), with Python
        float values under the keys:
            - x, y: coordinates clipped to [0, 1]
            - t: raw timestamp from input (passed through)
            - dx, dy: deltas in x/y
            - ds: sqrt(dx^2 + dy^2)
            - dt_raw: raw time delta (unclamped)
            - dt: clamped time delta used for feature stability
            - log_dt: log1p(dt)
    """
    if not data_points:
        return []

    num_points = len(data_points)
    x = np.fromiter((p["x"] for p in data_points), dtype=np.float64, count=num_points)
    y = np.fromiter((p["y"] for p in data_points), dtype=np.float64, count=num_points)
    t = np.fromiter((p["t"] for p in data_points), dtype=np.float64, count=num_points)
    x = np.clip(x, 0.0, 1.0)
    y = np.clip(y, 0.0, 1.0)

    dx = np.concatenate([[0.0], np.diff(x)])
    dy = np.concatenate([[0.0], np.diff(y)])
    ds = np.hypot(dx, dy)
    dt_raw = np.concatenate([[0.0], np.diff(t)])
    dt = np.clip(dt_raw, dt_clamp_min_ms, dt_clamp_max_ms)
    dt[0] = 0.0  # undo the lower clamp for the first point
    log_dt = np.log1p(np.maximum(0.0, dt))

    # Convert each column to Python floats once, then zip the columns into
    # per-point records; key order matches the documented output.
    keys = ("x", "y", "t", "dx", "dy", "ds", "dt_raw", "dt", "log_dt")
    columns = (x, y, t, dx, dy, ds, dt_raw, dt, log_dt)
    rows = zip(*(column.tolist() for column in columns))
    return [dict(zip(keys, row)) for row in rows]
def sample_path_points_with_features(
    data_points: list[dict],
    max_len: int,
    *,
    resample_mode: str = "spatial",
    dt_clamp_min_ms: float = 1.0,
    dt_clamp_max_ms: float = 200.0,
) -> tuple[np.ndarray, np.ndarray]:
    """Resample a path with motion features to a fixed length by interpolation.

    Always interpolates (no zero-padding): paths shorter than ``max_len`` are
    upsampled and longer paths are downsampled. Only x, y and the cumulative
    time are interpolated; dx/dy/ds/log_dt are recomputed on the resampled
    path so that the output features are mutually consistent.

    Modes:
        - resample_mode="spatial": sample approximately uniformly in arc
          length. If the total arc length is zero, interpolation is done over
          the point index instead.
        - resample_mode="time": sample uniformly in time (dwell regions get
          more samples). Negative time steps count as zero elapsed time on
          this axis, because ``np.interp`` needs a non-decreasing x-axis. If
          the total elapsed time is zero, the spatial rules are used.

    Args:
        data_points: List of point dicts. Required keys: x, y. Optional keys,
            each used only when present on every point:
            - dx and dy (both needed), otherwise derived from x/y;
            - ds, otherwise derived from dx/dy;
            - a time step, taken from the first available of dt_raw, dt,
              log_dt (converted with expm1); if none is present all time
              steps are 0.
        max_len: Target length. ``0`` yields empty arrays.
        resample_mode: "spatial" or "time".
        dt_clamp_min_ms: Lower clamp for the dt feature after resampling
            (the first dt is always forced to 0).
        dt_clamp_max_ms: Upper clamp for the dt feature after resampling.

    Returns:
        Tuple of (features, mask) where:
            - features: [max_len, 6] float32 array (x, y, dx, dy, ds, log_dt)
            - mask: [max_len] int64 array; all ones for a non-empty path,
              all zeros for an empty path.

    Raises:
        ValueError: If ``data_points`` is non-empty and ``resample_mode`` is
            neither "spatial" nor "time".
    """
    num_points = len(data_points)
    if num_points == 0:
        # Empty path - return zeros
        return (
            np.zeros((max_len, 6), dtype=np.float32),
            np.zeros(max_len, dtype=np.int64),
        )

    if resample_mode not in {"spatial", "time"}:
        raise ValueError(f"Unknown resample_mode={resample_mode!r} (use 'spatial' or 'time')")

    if max_len == 0:
        # Nothing to sample. Without this guard the [0.0]-prefixed delta
        # arrays below have length 1 while x_r/y_r have length 0, and
        # np.stack fails with a shape-mismatch error.
        return (
            np.zeros((0, 6), dtype=np.float32),
            np.ones(0, dtype=np.int64),
        )

    def column(key: str) -> np.ndarray:
        """Read one key from every point into a float64 array."""
        return np.fromiter((p[key] for p in data_points), dtype=np.float64, count=num_points)

    def has(key: str) -> bool:
        """Return True when every point carries `key`."""
        return all(key in p for p in data_points)

    # Extract base signals
    x = column("x")
    y = column("y")

    # Prefer provided dx/dy, otherwise derive from x/y
    if has("dx") and has("dy"):
        dx_in = column("dx")
        dy_in = column("dy")
    else:
        dx_in = np.concatenate([[0.0], np.diff(x)])
        dy_in = np.concatenate([[0.0], np.diff(y)])

    # ds can be provided or derived from dx/dy
    ds_in = column("ds") if has("ds") else np.sqrt(dx_in**2 + dy_in**2)

    # Time steps: prefer dt_raw (unclamped) so "dwell" gets represented.
    if has("dt_raw"):
        dt_axis = column("dt_raw")
    elif has("dt"):
        dt_axis = column("dt")
    elif has("log_dt"):
        dt_axis = np.expm1(column("log_dt"))
    else:
        dt_axis = np.zeros(num_points, dtype=np.float64)

    # Cumulative arc length (s) and cumulative time (tau) for resampling
    s = np.cumsum(ds_in)
    tau = np.cumsum(dt_axis)

    eps = 1e-12
    time_axis = None
    if resample_mode == "time":
        # np.interp does not check that its x-axis is increasing and returns
        # meaningless values when it is not. Treat negative time steps as
        # zero elapsed time so the axis is non-decreasing. For non-negative
        # steps this equals `tau`.
        monotone_tau = np.cumsum(np.maximum(dt_axis, 0.0))
        if monotone_tau[-1] > eps:
            time_axis = monotone_tau

    if time_axis is not None:
        target_tau = np.linspace(0.0, float(time_axis[-1]), max_len, dtype=np.float64)
        x_r = np.interp(target_tau, time_axis, x)
        y_r = np.interp(target_tau, time_axis, y)
        tau_r = target_tau
    elif s[-1] <= eps:
        # Degenerate path (zero movement): fall back to index-based interpolation
        original = np.arange(num_points, dtype=np.float64)
        target = np.linspace(0, num_points - 1, max_len, dtype=np.float64)
        x_r = np.interp(target, original, x)
        y_r = np.interp(target, original, y)
        tau_r = np.interp(target, original, tau)
    else:
        # Spatial sampling (also the fallback when the time axis is degenerate).
        target_s = np.linspace(0.0, float(s[-1]), max_len, dtype=np.float64)
        x_r = np.interp(target_s, s, x)
        y_r = np.interp(target_s, s, y)
        tau_r = np.interp(target_s, s, tau)

    # Recompute deltas on the resampled path for consistency
    dx = np.concatenate([[0.0], np.diff(x_r)])
    dy = np.concatenate([[0.0], np.diff(y_r)])
    ds = np.sqrt(dx**2 + dy**2)
    dt_raw_r = np.concatenate([[0.0], np.diff(tau_r)])
    dt_feat = np.clip(dt_raw_r, dt_clamp_min_ms, dt_clamp_max_ms)
    dt_feat[0] = 0.0  # the first point has no predecessor
    log_dt = np.log1p(np.maximum(0.0, dt_feat))

    mask = np.ones(max_len, dtype=np.int64)
    features = np.stack([x_r, y_r, dx, dy, ds, log_dt], axis=-1).astype(np.float32)
    return features, mask