File size: 10,684 Bytes
b121266 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
"""Shared preprocessing utilities for swipe path data.
This module provides a single source of truth for path preprocessing,
used by both the training dataset and the HuggingFace processor.
"""
import numpy as np
def preprocess_raw_path_to_features(
    data_points: list[dict],
    max_len: int,
    *,
    resample_mode: str = "spatial",
    dt_clamp_min_ms: float = 1.0,
    dt_clamp_max_ms: float = 200.0,
) -> tuple[np.ndarray, np.ndarray]:
    """Convert a raw `{"x","y","t"}` path to fixed-length engineered features.

    This is the fast path used by training and the HuggingFace processor. It avoids
    building an intermediate list-of-dicts representation by:
    1) extracting x/y/t arrays once (x and y are clipped to [0, 1]),
    2) resampling x/y using spatial- or time-uniform interpolation,
    3) recomputing dx/dy/ds and log_dt on the resampled trajectory.

    Fallbacks:
    - "time" mode with a degenerate time axis (total duration ~0) falls back to
      spatial sampling.
    - Spatial sampling with a degenerate arc length (no movement) falls back to
      index-uniform interpolation.
    - Out-of-order timestamps: for time-mode interpolation the cumulative time
      axis is replaced by its running maximum, because `np.interp` requires a
      non-decreasing sample axis. Monotone inputs are unaffected.

    Args:
        data_points: Raw path as a list of dicts with keys: "x", "y", "t".
        max_len: Target length. ``0`` yields empty ``(0, 6)`` / ``(0,)`` outputs.
        resample_mode: "spatial" (arc-length) or "time" (cumulative dt).
        dt_clamp_min_ms: Clamp for dt feature after resampling (first dt remains 0).
        dt_clamp_max_ms: Clamp for dt feature after resampling.

    Returns:
        (features, mask) where:
        - features: [max_len, 6] float32 array (x, y, dx, dy, ds, log_dt)
        - mask: [max_len] int64 array (all ones for non-empty paths,
          all zeros for an empty path)

    Raises:
        ValueError: If `resample_mode` is not "spatial" or "time" (only checked
            for non-empty paths, as before).
        KeyError: If a point is missing one of "x", "y", "t".
    """
    num_points = len(data_points)
    if num_points == 0:
        return (
            np.zeros((max_len, 6), dtype=np.float32),
            np.zeros(max_len, dtype=np.int64),
        )

    def _leading_zero_diff(values: np.ndarray) -> np.ndarray:
        # First difference with the same length as the input and element 0
        # fixed to 0.0. Unlike concatenating [0.0] with np.diff, this also
        # works for an empty array (max_len == 0).
        deltas = np.zeros_like(values)
        deltas[1:] = np.diff(values)
        return deltas

    x = np.fromiter((p["x"] for p in data_points), dtype=np.float64, count=num_points)
    y = np.fromiter((p["y"] for p in data_points), dtype=np.float64, count=num_points)
    t = np.fromiter((p["t"] for p in data_points), dtype=np.float64, count=num_points)
    x = np.clip(x, 0.0, 1.0)
    y = np.clip(y, 0.0, 1.0)

    # Cumulative arc length (s) and cumulative elapsed time (tau) of the input.
    s = np.cumsum(np.hypot(_leading_zero_diff(x), _leading_zero_diff(y)))
    tau = np.cumsum(_leading_zero_diff(t))

    if resample_mode not in {"spatial", "time"}:
        raise ValueError(f"Unknown resample_mode={resample_mode!r} (use 'spatial' or 'time')")

    eps = 1e-12
    # np.interp gives meaningless results when its sample axis decreases, so
    # time-mode interpolation uses a non-decreasing version of tau. For
    # monotone timestamps this is identical to tau.
    tau_axis = np.maximum.accumulate(tau)

    if resample_mode == "time" and tau_axis[-1] > eps:
        target_tau = np.linspace(0.0, float(tau_axis[-1]), max_len, dtype=np.float64)
        x_r = np.interp(target_tau, tau_axis, x)
        y_r = np.interp(target_tau, tau_axis, y)
        tau_r = target_tau
    elif s[-1] <= eps:
        # No movement at all: interpolate uniformly over the point index.
        original = np.arange(num_points, dtype=np.float64)
        target = np.linspace(0, num_points - 1, max_len, dtype=np.float64)
        x_r = np.interp(target, original, x)
        y_r = np.interp(target, original, y)
        tau_r = np.interp(target, original, tau)
    else:
        # Spatial sampling (also the fallback when the time axis is degenerate).
        target_s = np.linspace(0.0, float(s[-1]), max_len, dtype=np.float64)
        x_r = np.interp(target_s, s, x)
        y_r = np.interp(target_s, s, y)
        tau_r = np.interp(target_s, s, tau)

    # Recompute motion features on the resampled trajectory.
    dx = _leading_zero_diff(x_r)
    dy = _leading_zero_diff(y_r)
    ds = np.hypot(dx, dy)
    dt_feat = np.clip(_leading_zero_diff(tau_r), dt_clamp_min_ms, dt_clamp_max_ms)
    # First sample has no predecessor; slice assignment is a no-op when empty.
    dt_feat[:1] = 0.0
    log_dt = np.log1p(np.maximum(0.0, dt_feat))

    mask = np.ones(max_len, dtype=np.int64)
    features = np.stack([x_r, y_r, dx, dy, ds, log_dt], axis=-1).astype(np.float32)
    return features, mask
def normalize_and_compute_features(
    data_points: list[dict],
    dt_clamp_min_ms: float = 1.0,
    dt_clamp_max_ms: float = 200.0,
) -> list[dict]:
    """
    Clip coordinates to [0, 1] and derive per-point motion features.

    Each output point carries its deltas relative to the previous point. The
    first point has no predecessor, so its dx, dy, ds, dt_raw, dt and log_dt
    are all 0 by convention.

    Args:
        data_points: List of {"x", "y", "t"} dicts
        dt_clamp_min_ms: Minimum dt in milliseconds (inclusive).
        dt_clamp_max_ms: Maximum dt in milliseconds (inclusive).

    Returns:
        List of dicts (one per input point, empty for empty input) with keys:
        - x, y: coordinates clipped to [0, 1]
        - t: raw timestamp from input (passed through)
        - dx, dy: deltas in x/y
        - ds: sqrt(dx^2 + dy^2)
        - dt_raw: raw time delta (unclamped)
        - dt: clamped time delta used for feature stability
        - log_dt: log1p(dt)
    """
    if not data_points:
        return []

    count = len(data_points)

    def _column(key: str) -> np.ndarray:
        # Pull one field out of every point as a float64 vector.
        return np.fromiter((pt[key] for pt in data_points), dtype=np.float64, count=count)

    def _step(values: np.ndarray) -> np.ndarray:
        # Difference to the previous element, with a leading 0.0.
        return np.concatenate([[0.0], np.diff(values)])

    xs = np.clip(_column("x"), 0.0, 1.0)
    ys = np.clip(_column("y"), 0.0, 1.0)
    ts = _column("t")

    step_x = _step(xs)
    step_y = _step(ys)
    step_len = np.hypot(step_x, step_y)
    step_t_raw = _step(ts)

    # Clamp every time delta except the first, which stays at exactly 0.
    step_t = np.zeros_like(step_t_raw)
    step_t[1:] = np.clip(step_t_raw[1:], dt_clamp_min_ms, dt_clamp_max_ms)
    step_t_log = np.log1p(np.maximum(0.0, step_t))

    field_names = ("x", "y", "t", "dx", "dy", "ds", "dt_raw", "dt", "log_dt")
    field_values = (xs, ys, ts, step_x, step_y, step_len, step_t_raw, step_t, step_t_log)
    # tolist() yields plain Python floats, one row of values per point.
    rows = zip(*(column.tolist() for column in field_values))
    return [dict(zip(field_names, row)) for row in rows]
def sample_path_points_with_features(
    data_points: list[dict],
    max_len: int,
    *,
    resample_mode: str = "spatial",
    dt_clamp_min_ms: float = 1.0,
    dt_clamp_max_ms: float = 200.0,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Sample path points with motion features to fixed length using interpolation.

    Always uses interpolation (no zero-padding) to preserve feature structure.
    Paths shorter than max_len are upsampled; longer paths are downsampled.
    All delta features are recomputed on the resampled path; the input deltas
    are only used to build the resampling axes.

    Modes:
    - resample_mode="spatial": sample approximately uniformly in arc length (distance).
    - resample_mode="time": sample uniformly in time (dwell regions get more samples).

    Fallbacks:
    - "time" mode with a degenerate time axis (total duration ~0) falls back to
      spatial sampling.
    - Spatial sampling of a path with no movement falls back to index-uniform
      interpolation.
    - Negative time deltas (out-of-order timestamps): for time-mode
      interpolation the cumulative time axis is replaced by its running
      maximum, because `np.interp` requires a non-decreasing sample axis.
      Monotone inputs are unaffected.

    Args:
        data_points: List of coordinate dicts. Required keys: x, y. Optional:
            - dx, dy (used only if every point has both; otherwise derived from x/y)
            - ds (used only if every point has it; otherwise derived from dx/dy)
            - a time delta, taken from the first of dt_raw, dt, log_dt that every
              point has (log_dt is inverted with expm1); with none of them the
              time axis is all zeros.
        max_len: Target length. ``0`` yields empty ``(0, 6)`` / ``(0,)`` outputs.
        resample_mode: "spatial" or "time"
        dt_clamp_min_ms: Clamp for dt feature after resampling (first dt remains 0).
        dt_clamp_max_ms: Clamp for dt feature after resampling.

    Returns:
        Tuple of (features, mask) where:
        - features: [max_len, 6] float32 array with (x, y, dx, dy, ds, log_dt)
        - mask: [max_len] int64 mask (all 1s since we always interpolate;
          all 0s for an empty path)

    Raises:
        ValueError: If `resample_mode` is not "spatial" or "time" (only checked
            for non-empty paths, as before).
        KeyError: If a point is missing "x" or "y".
    """
    num_points = len(data_points)
    if num_points == 0:
        # Empty path - return zeros
        return (
            np.zeros((max_len, 6), dtype=np.float32),
            np.zeros(max_len, dtype=np.int64),
        )

    def _field(key: str) -> np.ndarray:
        # Pull one field out of every point as a float64 vector.
        return np.fromiter((p[key] for p in data_points), dtype=np.float64, count=num_points)

    def _has(key: str) -> bool:
        return all(key in p for p in data_points)

    def _leading_zero_diff(values: np.ndarray) -> np.ndarray:
        # First difference with the same length as the input and element 0
        # fixed to 0.0. Unlike concatenating [0.0] with np.diff, this also
        # works for an empty array (max_len == 0).
        deltas = np.zeros_like(values)
        deltas[1:] = np.diff(values)
        return deltas

    # Extract base signals
    x = _field("x")
    y = _field("y")

    # Prefer provided dx/dy, otherwise derive from x/y
    if _has("dx") and _has("dy"):
        dx_in = _field("dx")
        dy_in = _field("dy")
    else:
        dx_in = _leading_zero_diff(x)
        dy_in = _leading_zero_diff(y)

    # ds can be provided or derived from dx/dy
    ds_in = _field("ds") if _has("ds") else np.sqrt(dx_in**2 + dy_in**2)

    # Time axis for resampling: prefer dt_raw (unclamped) so "dwell" gets represented.
    if _has("dt_raw"):
        dt_axis = _field("dt_raw")
    elif _has("dt"):
        dt_axis = _field("dt")
    elif _has("log_dt"):
        dt_axis = np.expm1(_field("log_dt"))
    else:
        dt_axis = np.zeros(num_points, dtype=np.float64)

    # Cumulative arc length (s) and cumulative time (tau) for resampling
    s = np.cumsum(ds_in)
    tau = np.cumsum(dt_axis)

    if resample_mode not in {"spatial", "time"}:
        raise ValueError(f"Unknown resample_mode={resample_mode!r} (use 'spatial' or 'time')")

    eps = 1e-12
    # np.interp gives meaningless results when its sample axis decreases, so
    # time-mode interpolation uses a non-decreasing version of tau. For
    # non-negative time deltas this is identical to tau.
    tau_axis = np.maximum.accumulate(tau)

    if resample_mode == "time" and tau_axis[-1] > eps:
        target_tau = np.linspace(0.0, float(tau_axis[-1]), max_len, dtype=np.float64)
        x_r = np.interp(target_tau, tau_axis, x)
        y_r = np.interp(target_tau, tau_axis, y)
        tau_r = target_tau
    elif s[-1] <= eps:
        # Degenerate path (zero movement): fall back to index-based interpolation
        original = np.arange(num_points, dtype=np.float64)
        target = np.linspace(0, num_points - 1, max_len, dtype=np.float64)
        x_r = np.interp(target, original, x)
        y_r = np.interp(target, original, y)
        tau_r = np.interp(target, original, tau)
    else:
        # Spatial sampling (or fallback when time axis is degenerate).
        target_s = np.linspace(0.0, float(s[-1]), max_len, dtype=np.float64)
        x_r = np.interp(target_s, s, x)
        y_r = np.interp(target_s, s, y)
        tau_r = np.interp(target_s, s, tau)

    # Recompute deltas on the resampled path for consistency
    dx = _leading_zero_diff(x_r)
    dy = _leading_zero_diff(y_r)
    ds = np.sqrt(dx**2 + dy**2)
    dt_feat = np.clip(_leading_zero_diff(tau_r), dt_clamp_min_ms, dt_clamp_max_ms)
    # First sample has no predecessor; slice assignment is a no-op when empty.
    dt_feat[:1] = 0.0
    log_dt = np.log1p(np.maximum(0.0, dt_feat))

    mask = np.ones(max_len, dtype=np.int64)
    features = np.stack([x_r, y_r, dx, dy, ds, log_dt], axis=-1).astype(np.float32)
    return features, mask
|