dreamlessx commited on
Commit
6ba7e19
·
verified ·
1 Parent(s): d2ca600

Upload landmarkdiff/synthetic/tps_warp.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. landmarkdiff/synthetic/tps_warp.py +272 -0
landmarkdiff/synthetic/tps_warp.py ADDED
@@ -0,0 +1,272 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """TPS warping for synthetic pair generation.
2
+
3
+ Only warps deformable tissue - rigid structures (teeth, sclera) get
4
+ rigid translation instead. Prevents "rubber teeth" from naive TPS.
5
+ """
6
+
7
+ from __future__ import annotations
8
+
9
+ import cv2
10
+ import numpy as np
11
+
12
+
13
+ def compute_tps_transform(
14
+ src_pts: np.ndarray,
15
+ dst_pts: np.ndarray,
16
+ ) -> cv2.ThinPlateSplineShapeTransformer:
17
+ """Fit a TPS transform from src to dst points."""
18
+ src = src_pts.reshape(1, -1, 2).astype(np.float32)
19
+ dst = dst_pts.reshape(1, -1, 2).astype(np.float32)
20
+ matches = [cv2.DMatch(i, i, 0) for i in range(len(src_pts))]
21
+
22
+ tps = cv2.createThinPlateSplineShapeTransformer()
23
+ tps.estimateTransformation(dst, src, matches)
24
+ return tps
25
+
26
+
27
+ def _subsample_control_points(
28
+ src: np.ndarray,
29
+ dst: np.ndarray,
30
+ max_points: int = 80,
31
+ anchor_stride: int = 8,
32
+ ) -> tuple[np.ndarray, np.ndarray]:
33
+ """Keep all displaced points + sparse anchors. ~80 pts instead of 478, ~30x faster."""
34
+ displacements = np.linalg.norm(dst - src, axis=1)
35
+ displaced_mask = displacements > 0.5 # moved by > 0.5px
36
+ displaced_idx = np.where(displaced_mask)[0]
37
+
38
+ # Add sparse anchors from non-displaced landmarks
39
+ non_displaced_idx = np.where(~displaced_mask)[0]
40
+ anchor_idx = non_displaced_idx[::anchor_stride]
41
+
42
+ selected = np.concatenate([displaced_idx, anchor_idx])
43
+
44
+ # If still too many, subsample anchors more aggressively
45
+ if len(selected) > max_points:
46
+ n_anchors = max_points - len(displaced_idx)
47
+ if n_anchors > 0:
48
+ step = max(1, len(non_displaced_idx) // n_anchors)
49
+ anchor_idx = non_displaced_idx[::step][:n_anchors]
50
+ selected = np.concatenate([displaced_idx, anchor_idx])
51
+ else:
52
+ selected = displaced_idx[:max_points]
53
+
54
+ selected = np.unique(selected)
55
+ return src[selected], dst[selected]
56
+
57
+
58
+ def warp_image_tps(
59
+ image: np.ndarray,
60
+ src_landmarks: np.ndarray,
61
+ dst_landmarks: np.ndarray,
62
+ rigid_mask: np.ndarray | None = None,
63
+ ) -> np.ndarray:
64
+ """Apply TPS warp to an image with optional rigid region preservation."""
65
+ h, w = image.shape[:2]
66
+
67
+ src_pts = src_landmarks.astype(np.float32)
68
+ dst_pts = dst_landmarks.astype(np.float32)
69
+
70
+ # Subsample control points for speed (478 -> ~80)
71
+ src_sub, dst_sub = _subsample_control_points(src_pts, dst_pts)
72
+
73
+ # Compute TPS coefficients on subsampled points
74
+ map_x, map_y = _compute_tps_map(src_sub, dst_sub, w, h)
75
+
76
+ # Warp the image
77
+ warped = cv2.remap(
78
+ image,
79
+ map_x.astype(np.float32),
80
+ map_y.astype(np.float32),
81
+ interpolation=cv2.INTER_LINEAR,
82
+ borderMode=cv2.BORDER_REFLECT_101,
83
+ )
84
+
85
+ if rigid_mask is not None:
86
+ # For rigid regions, compute mean translation and apply rigidly
87
+ rigid_translation = _compute_rigid_translation(src_pts, dst_pts, rigid_mask, w, h)
88
+ rigid_warped = _apply_rigid_translation(image, rigid_translation)
89
+
90
+ # Composite: use rigid warp in rigid regions, TPS elsewhere
91
+ mask_f = rigid_mask.astype(np.float32)
92
+ if len(mask_f.shape) == 2:
93
+ mask_f = np.stack([mask_f] * 3, axis=-1)
94
+ mask_f = mask_f / 255.0 if mask_f.max() > 1 else mask_f
95
+ warped = (rigid_warped * mask_f + warped * (1 - mask_f)).astype(np.uint8)
96
+
97
+ return warped
98
+
99
+
100
+ def _compute_tps_map(
101
+ src: np.ndarray,
102
+ dst: np.ndarray,
103
+ width: int,
104
+ height: int,
105
+ ) -> tuple[np.ndarray, np.ndarray]:
106
+ """Build remap arrays from TPS control points via RBF interpolation."""
107
+ # Displacement at control points
108
+ dx = dst[:, 0] - src[:, 0]
109
+ dy = dst[:, 1] - src[:, 1]
110
+
111
+ # Create grid
112
+ grid_x, grid_y = np.meshgrid(np.arange(width), np.arange(height))
113
+ grid_x = grid_x.astype(np.float64)
114
+ grid_y = grid_y.astype(np.float64)
115
+
116
+ # RBF interpolation using TPS kernel: r^2 * log(r)
117
+ map_x = grid_x.copy()
118
+ map_y = grid_y.copy()
119
+
120
+ n = len(src)
121
+ if n == 0:
122
+ return map_x, map_y
123
+
124
+ # Solve TPS system for x and y displacements
125
+ weights_x = _solve_tps_weights(src, dx)
126
+ weights_y = _solve_tps_weights(src, dy)
127
+
128
+ # Evaluate on grid (vectorized for speed)
129
+ flat_x = grid_x.ravel()
130
+ flat_y = grid_y.ravel()
131
+ pts = np.stack([flat_x, flat_y], axis=1)
132
+
133
+ disp_x = _evaluate_tps(pts, src, weights_x)
134
+ disp_y = _evaluate_tps(pts, src, weights_y)
135
+
136
+ map_x = (flat_x - disp_x).reshape(height, width)
137
+ map_y = (flat_y - disp_y).reshape(height, width)
138
+
139
+ return map_x, map_y
140
+
141
+
142
+ def _tps_kernel(r: np.ndarray) -> np.ndarray:
143
+ """TPS radial basis function: r^2 * log(r), with r=0 -> 0."""
144
+ result = np.zeros_like(r)
145
+ mask = r > 0
146
+ result[mask] = r[mask] ** 2 * np.log(r[mask])
147
+ return result
148
+
149
+
150
+ def _solve_tps_weights(
151
+ control_pts: np.ndarray,
152
+ values: np.ndarray,
153
+ ) -> np.ndarray:
154
+ """Solve TPS system -> weight vector [w1..wn, a0, a1, a2]."""
155
+ n = len(control_pts)
156
+
157
+ # Build kernel matrix K (vectorized)
158
+ diff = control_pts[:, np.newaxis, :] - control_pts[np.newaxis, :, :] # (n, n, 2)
159
+ r_mat = np.sqrt((diff ** 2).sum(axis=2)) # (n, n)
160
+ K = np.zeros((n, n))
161
+ nz = r_mat > 0
162
+ K[nz] = r_mat[nz] ** 2 * np.log(r_mat[nz])
163
+
164
+ # Build system matrix [K P; P^T 0]
165
+ P = np.hstack([np.ones((n, 1)), control_pts]) # (n, 3)
166
+
167
+ L = np.zeros((n + 3, n + 3))
168
+ L[:n, :n] = K
169
+ L[:n, n:] = P
170
+ L[n:, :n] = P.T
171
+
172
+ # Regularization for numerical stability
173
+ L[:n, :n] += np.eye(n) * 1e-6
174
+
175
+ rhs = np.zeros(n + 3)
176
+ rhs[:n] = values
177
+
178
+ try:
179
+ weights = np.linalg.solve(L, rhs)
180
+ except np.linalg.LinAlgError:
181
+ weights = np.linalg.lstsq(L, rhs, rcond=None)[0]
182
+
183
+ return weights
184
+
185
+
186
+ def _evaluate_tps(
187
+ points: np.ndarray,
188
+ control_pts: np.ndarray,
189
+ weights: np.ndarray,
190
+ ) -> np.ndarray:
191
+ """Evaluate TPS at arbitrary points (vectorized)."""
192
+ n = len(control_pts)
193
+ w = weights[:n]
194
+ a = weights[n:] # affine: a0 + a1*x + a2*y
195
+
196
+ # Affine component
197
+ result = a[0] + a[1] * points[:, 0] + a[2] * points[:, 1]
198
+
199
+ # Vectorized RBF evaluation in batches to limit memory
200
+ batch_size = 50000
201
+ for start in range(0, len(points), batch_size):
202
+ end = min(start + batch_size, len(points))
203
+ batch = points[start:end] # (M, 2)
204
+
205
+ # Compute all distances at once: (M, n)
206
+ dx = batch[:, 0:1] - control_pts[:, 0] # (M, n) via broadcasting
207
+ dy = batch[:, 1:2] - control_pts[:, 1] # (M, n)
208
+ r = np.sqrt(dx ** 2 + dy ** 2)
209
+
210
+ # TPS kernel: r^2 * log(r), with r=0 -> 0
211
+ kernel = np.zeros_like(r)
212
+ mask = r > 0
213
+ kernel[mask] = r[mask] ** 2 * np.log(r[mask])
214
+
215
+ # Weighted sum across all control points
216
+ result[start:end] += kernel @ w
217
+
218
+ return result
219
+
220
+
221
+ def _compute_rigid_translation(
222
+ src: np.ndarray,
223
+ dst: np.ndarray,
224
+ mask: np.ndarray,
225
+ width: int,
226
+ height: int,
227
+ ) -> np.ndarray:
228
+ """Compute mean translation for rigid regions."""
229
+ # Find control points inside rigid mask
230
+ inside = []
231
+ for i, (x, y) in enumerate(src):
232
+ ix, iy = int(x), int(y)
233
+ if 0 <= ix < width and 0 <= iy < height:
234
+ if mask[iy, ix] > 0:
235
+ inside.append(i)
236
+
237
+ if not inside:
238
+ return np.array([0.0, 0.0])
239
+
240
+ dx = np.mean(dst[inside, 0] - src[inside, 0])
241
+ dy = np.mean(dst[inside, 1] - src[inside, 1])
242
+ return np.array([dx, dy])
243
+
244
+
245
+ def _apply_rigid_translation(
246
+ image: np.ndarray,
247
+ translation: np.ndarray,
248
+ ) -> np.ndarray:
249
+ """Apply rigid translation to an image."""
250
+ h, w = image.shape[:2]
251
+ M = np.float32([[1, 0, translation[0]], [0, 1, translation[1]]])
252
+ return cv2.warpAffine(image, M, (w, h), borderMode=cv2.BORDER_REFLECT_101)
253
+
254
+
255
+ def generate_random_warp(
256
+ landmarks: np.ndarray,
257
+ procedure_indices: list[int],
258
+ max_displacement: float = 15.0,
259
+ rng: np.random.Generator | None = None,
260
+ ) -> np.ndarray:
261
+ """Generate randomly warped landmarks for synthetic data."""
262
+ rng = rng or np.random.default_rng()
263
+ result = landmarks.copy()
264
+
265
+ for idx in procedure_indices:
266
+ if idx < len(landmarks):
267
+ dx = rng.uniform(-max_displacement, max_displacement)
268
+ dy = rng.uniform(-max_displacement, max_displacement)
269
+ result[idx, 0] += dx
270
+ result[idx, 1] += dy
271
+
272
+ return result