File size: 12,219 Bytes
3800bd2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 | """Rectifier for full-meter images of a Badger Model 55 water meter.
A raw camera frame of the meter face is tilted (the camera doesn't sit
perfectly square to the meter) and the digit strip occupies a small
fraction of the image. This module takes a 1920×1080 BGR frame and
produces eight 86×105 BGR slot crops, one per digit drum, axis-aligned
and at canonical scale.
The pipeline:
  1. Deskew — find the digit-strip tilt angle and rotate so the
     strip is horizontal.
  2. Detect — segment the dark digit-window borders against the
     bright meter face and find their bounding boxes.
  3. Assign — figure out which detected window corresponds to
     which slot index (0..7), handling missing detections.
  4. Fit    — solve a partial-affine (rotation + uniform scale +
     translation) from detected window centers to their
     canonical positions.
  5. Warp   — apply the affine + a translation that maps the
     strip directly into a tight (TIGHT_H, TIGHT_W) crop.
  6. Slice  — cut the tight crop into 8 × (105, 86) slot crops.
All geometric constants below were calibrated for the source meter and
camera used to build the published dataset. They are pixel coordinates,
not anything more interesting.
"""
from __future__ import annotations
import cv2
import numpy as np
# ── Canonical strip layout ────────────────────────────────────────────
# Pixel geometry of the canonical (rectified) frame that detected window
# centers are mapped onto. All values are in pixels.
CANONICAL_W, CANONICAL_H = 1920, 1080
WIN_W, WIN_H = 80, 105 # nominal digit-window size, px
STEP = 86 # horizontal spacing between slots
STRIP_X0 = 580 # canonical x of slot-0 left edge
STRIP_Y0 = 480 # canonical y of strip top
SLOT_W, SLOT_H = STEP, WIN_H # per-slot crop dims (86, 105)
# Margin kept around the strip in the tight crop; both are currently 0.
TIGHT_PAD_X = 0
TIGHT_PAD_Y = 0
# NOTE(review): a (175, 736) tight crop, as sometimes quoted for this
# pipeline, would need pads of 35 and 24 — confirm the pads are meant
# to be 0.
TIGHT_H = WIN_H + 2 * TIGHT_PAD_Y # 105 with TIGHT_PAD_Y = 0
TIGHT_W = 8 * STEP + 2 * TIGHT_PAD_X # 688 with TIGHT_PAD_X = 0
# ── Stage 1: deskew ───────────────────────────────────────────────────
def detect_rotation_degrees(img_bgr: np.ndarray, max_abs_deg: float = 20.0) -> float:
    """Estimate the digit strip's tilt from the centroids of its windows.

    A fixed band of the frame (36-47 % of the height, 25-75 % of the
    width) is thresholded at gray level 60 so dark regions become
    foreground. Connected components whose size and aspect ratio look
    like a digit window contribute their centroid, and a least-squares
    line ``cy = slope * cx + b`` is fitted through those centroids.

    Returns ``degrees(arctan(slope))``, i.e. positive when the strip
    descends towards the right in image coordinates. `deskew` passes the
    value unchanged to ``cv2.getRotationMatrix2D``.

    Returns 0.0 when the band is empty, when fewer than four foreground
    components or fewer than four window-like components are found, or
    when the fitted angle exceeds `max_abs_deg` in magnitude.
    """
    rows, cols = img_bgr.shape[:2]
    band = img_bgr[int(rows * 0.36):int(rows * 0.47),
                   int(cols * 0.25):int(cols * 0.75)]
    if band.size == 0:
        return 0.0

    gray = cv2.cvtColor(band, cv2.COLOR_BGR2GRAY)
    _, dark_mask = cv2.threshold(gray, 60, 255, cv2.THRESH_BINARY_INV)
    n_labels, _, stats, centroids = cv2.connectedComponentsWithStats(
        dark_mask, connectivity=8)
    # Label 0 is the background, so four real components means n >= 5.
    if n_labels < 5:
        return 0.0

    # Columns of `stats` are (x, y, width, height, area); skip label 0.
    comp = stats[1:]
    widths, heights, areas = comp[:, 2], comp[:, 3], comp[:, 4]
    aspect = heights / np.maximum(widths, 1)
    window_like = (
        (widths >= 30) & (widths <= 90)
        & (heights >= 50) & (heights <= 110)
        & (aspect >= 1.3) & (aspect <= 3.0)
        & (areas >= 200)
    )
    if np.count_nonzero(window_like) < 4:
        return 0.0

    points = centroids[1:][window_like]
    cx, cy = points[:, 0], points[:, 1]
    design = np.vstack([cx, np.ones_like(cx)]).T
    slope = np.linalg.lstsq(design, cy, rcond=None)[0][0]
    tilt = float(np.degrees(np.arctan(slope)))
    # Implausibly large tilts are treated as a failed estimate.
    if abs(tilt) > max_abs_deg:
        return 0.0
    return tilt
def deskew(img_bgr: np.ndarray) -> tuple[np.ndarray, float]:
    """Level the digit strip by rotating the frame about its center.

    The angle comes from `detect_rotation_degrees`; the output keeps the
    input's width and height, and pixels exposed by the rotation are
    filled by replicating the border.

    Returns `(leveled_image, angle_applied_deg)`.
    """
    tilt_deg = detect_rotation_degrees(img_bgr)
    rows, cols = img_bgr.shape[:2]
    center = (cols / 2.0, rows / 2.0)
    rotation = cv2.getRotationMatrix2D(center, tilt_deg, 1.0)
    leveled = cv2.warpAffine(
        img_bgr,
        rotation,
        (cols, rows),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REPLICATE,
    )
    return leveled, tilt_deg
# ── Stage 2: detect digit windows ─────────────────────────────────────
def detect_digit_windows(img_bgr: np.ndarray, threshold: int = 60,
                         use_otsu: bool = False) -> list[tuple[int, int, int, int]]:
    """Locate dark digit-window rectangles on the bright meter face.

    Intended to run on the output of `deskew`. Only a fixed band of the
    frame is searched (34-49 % of the height, 20-80 % of the width). The
    band is binarised with an inverted threshold (the fixed `threshold`,
    or Otsu's automatic level when `use_otsu` is true) and connected
    components with window-like width, height, aspect ratio and area are
    kept.

    Returns `[(x0, y0, x1, y1), ...]` in full-image coordinates, sorted
    by `x0`. Returns an empty list when the search band is empty.
    """
    rows, cols = img_bgr.shape[:2]
    top, bottom = int(rows * 0.34), int(rows * 0.49)
    left, right = int(cols * 0.20), int(cols * 0.80)
    band = img_bgr[top:bottom, left:right]
    if band.size == 0:
        return []

    gray = cv2.cvtColor(band, cv2.COLOR_BGR2GRAY)
    if use_otsu:
        level, mode = 0, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
    else:
        level, mode = threshold, cv2.THRESH_BINARY_INV
    _, dark_mask = cv2.threshold(gray, level, 255, mode)
    _, _, stats, _ = cv2.connectedComponentsWithStats(dark_mask, connectivity=8)

    # Rows of `stats` are (x, y, width, height, area); row 0 is background.
    # Offsets are added back so boxes are in full-image coordinates.
    found = [
        (left + x, top + y, left + x + w, top + y + h)
        for x, y, w, h, area in stats[1:]
        if 30 <= w <= 90
        and 50 <= h <= 110
        and 1.3 <= h / max(w, 1) <= 3.0
        and area >= 200
    ]
    found.sort(key=lambda box: box[0])
    return found
# ── Stage 3: slot assignment ──────────────────────────────────────────
def assign_slots(boxes: list[tuple[int, int, int, int]], img_w: int
                 ) -> list[tuple[int, tuple[int, int, int, int]]]:
    """Pair detected windows with slot indices 0..7.

    Steps:
      1. Collapse detections whose x-centers lie within 30 px of a
         group's first member, keeping that first (leftmost) member.
      2. Keep the longest run of neighbours whose gaps are at most 2.5×
         the smallest gap, provided that run has at least four members;
         otherwise keep everything.
      3. Express each remaining gap as a whole number of steps (the
         smallest gap is taken as one step), so missing windows show up
         as gaps of two or more steps.
      4. Choose the slot index of the leftmost window so that the implied
         slot-0 center is closest to ``img_w * 0.62 - 7 * step``.

    Returns `[(slot_index, box), ...]` left to right, or an empty list
    when fewer than four boxes are given, fewer than two distinct windows
    remain, or the windows span more than eight slots.
    """
    if len(boxes) < 4:
        return []

    centers = np.array([(b[0] + b[2]) / 2.0 for b in boxes])

    # 1. Merge near-duplicate detections (motion-blur fragments).
    reps: list[int] = []  # box indices of group representatives, left to right
    for box_i in np.argsort(centers):
        box_i = int(box_i)
        if reps and centers[box_i] - centers[reps[-1]] < 30:
            continue
        reps.append(box_i)
    rep_cx = centers[reps]

    # 2. Drop spatial outliers: split the sequence wherever a gap is too
    #    wide and keep the first longest piece if it has >= 4 windows.
    if len(rep_cx) >= 2:
        spacing = np.diff(rep_cx)
        limit = 2.5 * float(np.min(spacing))
        cuts = np.flatnonzero(np.logical_not(spacing <= limit))
        run_first = np.concatenate([[0], cuts + 1])
        run_last = np.concatenate([cuts, [len(rep_cx) - 1]])
        run_size = run_last - run_first + 1
        pick = int(np.argmax(run_size))  # first run of maximal length
        if run_size[pick] >= 4:
            lo, hi = int(run_first[pick]), int(run_last[pick])
            rep_cx = rep_cx[lo:hi + 1]
            reps = reps[lo:hi + 1]

    if len(rep_cx) < 2:
        return []

    # 3. Relative slot offsets from the leftmost remaining window.
    spacing = np.diff(rep_cx)
    step = float(np.min(spacing))
    steps_between = np.round(spacing / step).astype(int)
    offsets = np.concatenate([[0], np.cumsum(steps_between)])
    span = int(offsets[-1])

    # 4. Anchor: try every start slot that keeps the rightmost window at
    #    index <= 7 and keep the first one with the smallest error.
    anchor_x = float(rep_cx[0])
    target_slot0 = img_w * 0.62 - 7 * step
    start_slot, smallest_err = None, float('inf')
    for candidate in range(0, 8 - span):
        err = abs((anchor_x - candidate * step) - target_slot0)
        if err < smallest_err:
            smallest_err, start_slot = err, candidate
    if start_slot is None:
        return []

    assigned = []
    for offset, box_i in zip(offsets, reps):
        slot = start_slot + int(offset)
        if 0 <= slot <= 7:
            assigned.append((int(slot), boxes[box_i]))
    return assigned
# ── Stage 4: affine fit ───────────────────────────────────────────────
def fit_affine_centers(slot_boxes
                       ) -> tuple[np.ndarray | None, float | None, int]:
    """Fit a partial affine taking detected window centers to canonical ones.

    `slot_boxes` is a sequence of `(slot_index, (x0, y0, x1, y1))` pairs.
    Each box center is matched with the canonical center of its slot and
    a rotation + uniform scale + translation is estimated with RANSAC
    (reprojection threshold 2 px).

    Returns `(M_3x3, mean_residual_px, n_pairs)`:
      * `M_3x3` — the 2×3 affine with a `[0, 0, 1]` row appended
        (float32), so it can be passed to `cv2.warpPerspective`.
      * `mean_residual_px` — mean distance between the mapped and the
        canonical centers, taken over all input pairs.
      * `n_pairs` — the number of input pairs; RANSAC inliers are not
        counted separately.

    `M_3x3` and `mean_residual_px` are `None` when fewer than three pairs
    are given or the estimator returns no model.
    """
    n_pairs = len(slot_boxes)
    if n_pairs < 3:
        return None, None, n_pairs

    detected = np.array(
        [((x0 + x1) / 2.0, (y0 + y1) / 2.0)
         for _, (x0, y0, x1, y1) in slot_boxes],
        dtype=np.float32)
    # Every canonical center sits on the same row of the strip.
    canonical_cy = STRIP_Y0 + WIN_H / 2.0
    canonical = np.array(
        [(STRIP_X0 + slot * STEP + WIN_W / 2.0, canonical_cy)
         for slot, _ in slot_boxes],
        dtype=np.float32)

    affine_2x3, _ = cv2.estimateAffinePartial2D(
        detected, canonical, method=cv2.RANSAC, ransacReprojThreshold=2.0)
    if affine_2x3 is None:
        return None, None, n_pairs

    mapped = cv2.transform(detected.reshape(-1, 1, 2), affine_2x3).reshape(-1, 2)
    mean_err = float(np.mean(np.linalg.norm(mapped - canonical, axis=1)))
    homography = np.vstack([affine_2x3, [0, 0, 1]]).astype(np.float32)
    return homography, mean_err, n_pairs
# ── End-to-end ────────────────────────────────────────────────────────
def rectify(img_bgr: np.ndarray, max_residual_px: float = 5.0,
            min_windows: int = 6
            ) -> tuple[np.ndarray, dict] | tuple[None, dict]:
    """Run the full pipeline on a 1920×1080 BGR frame.

    Parameters:
      img_bgr          — the raw camera frame (3-D array).
      max_residual_px  — largest acceptable mean residual of the affine
                         fit, in pixels.
      min_windows      — minimum number of slot assignments required.

    Returns `(tight_bgr, info)` on success, where `tight_bgr` is the
    (TIGHT_H, TIGHT_W, 3) BGR strip crop, or `(None, info)` on failure.
    `info` always includes `deskew_angle` and `n_windows`; on failure it
    also carries an `error` string, and once the fit has been attempted
    it carries `mean_residual_px` and `n_used`.

    A missing, empty or non-3-D input is reported as a failure instead of
    raising from inside OpenCV.
    """
    if img_bgr is None or getattr(img_bgr, 'ndim', 0) != 3 or img_bgr.size == 0:
        return None, {'deskew_angle': 0.0, 'n_windows': 0,
                      'error': 'empty or invalid input image'}

    img_lvl, angle = deskew(img_bgr)
    info: dict = {'deskew_angle': float(angle)}

    boxes = detect_digit_windows(img_lvl)
    # Retry with Otsu when the fixed threshold finds too few windows to
    # satisfy the caller, but never trade a better first pass for a worse
    # second one.
    if len(boxes) < max(6, min_windows):
        otsu_boxes = detect_digit_windows(img_lvl, use_otsu=True)
        if len(otsu_boxes) >= len(boxes):
            boxes = otsu_boxes
    info['n_windows'] = len(boxes)
    if len(boxes) < 4:
        info['error'] = 'too few digit windows detected'
        return None, info

    slot_boxes = assign_slots(boxes, img_w=img_lvl.shape[1])
    if len(slot_boxes) < min_windows:
        info['error'] = f'only {len(slot_boxes)} slot assignments'
        return None, info

    H_mat, mean_resid, n_used = fit_affine_centers(slot_boxes)
    info.update({'mean_residual_px': mean_resid, 'n_used': n_used})
    if H_mat is None or mean_resid is None or mean_resid > max_residual_px:
        info['error'] = 'affine fit too noisy'
        return None, info

    # Shift the canonical strip origin (minus padding) to (0, 0) so the
    # warp writes straight into the tight crop.
    T = np.array([
        [1.0, 0.0, -(STRIP_X0 - TIGHT_PAD_X)],
        [0.0, 1.0, -(STRIP_Y0 - TIGHT_PAD_Y)],
        [0.0, 0.0, 1.0],
    ], dtype=np.float32)
    M_direct = (T @ H_mat).astype(np.float32)
    tight = cv2.warpPerspective(img_lvl, M_direct, (TIGHT_W, TIGHT_H),
                                flags=cv2.INTER_LANCZOS4)
    return tight, info
def tight_to_slots(tight_bgr: np.ndarray) -> list[np.ndarray]:
    """Split a tight strip crop into its eight per-digit slot crops.

    Slot `s` covers rows `TIGHT_PAD_Y .. TIGHT_PAD_Y + SLOT_H` and
    columns `TIGHT_PAD_X + s * STEP .. + SLOT_W`. Each crop is an
    independent copy; the list is ordered slot 0 first.
    """
    top = TIGHT_PAD_Y
    bottom = top + SLOT_H
    lefts = (TIGHT_PAD_X + slot * STEP for slot in range(8))
    return [tight_bgr[top:bottom, left:left + SLOT_W].copy() for left in lefts]
|