S3CUR's picture
Initial release: badger-55 meter reader
3800bd2 verified
"""End-to-end inference demo on a single full-meter image.
    full BGR frame ─► deskew ─► rectify (175 × 736 strip)
                                   │
                                   ▼
                       8 × (105 × 86) slot crops
                                   │
                                   ▼
                  DINOv2-small CLS features (8, 384)
                                   │
                        ┌──────────┴──────────┐
                        ▼                     ▼
                 digit_classifier    d4d5 / d6d7 Predictor90
                  (10-way digit)       (theta + entropy)
                        └──────────┬──────────┘
                                   ▼
              8-digit reading + per-slot annotation card
The same `digit_classifier` (trained pooled across slots 4+5+6+7 — the
slots in the dataset that exercise all 10 digit classes) is applied to
slots 0–4 at inference. On captures from the *source* meter, slots 0–3
will emit constants — that's because the upper drums of that specific
meter didn't move during data collection, so the trained classifier
sees an input that looks exactly like one digit class for those slots.
Usage:
python demo.py # pick one sample capture
python demo.py --image PATH # supply your own 1920×1080 frame
python demo.py --image PATH --out OUT.jpg
"""
from __future__ import annotations
import argparse
import sys
import textwrap
import time
import os
from pathlib import Path
# Match train.py — turn on the high-perf downloader before importing
# huggingface_hub. The default Python downloader silently throttles on
# repeated single-file fetches against the same dataset.
os.environ.setdefault('HF_XET_HIGH_PERFORMANCE', '1')
os.environ.setdefault('HF_HUB_DOWNLOAD_TIMEOUT', '30')
import cv2
import numpy as np
import pandas as pd
import torch
from huggingface_hub import hf_hub_download
import models
import rectifier
# Directory containing this script; anchors the weights folder and the
# default --out path.
HERE = Path(__file__).parent
# Folder main() hands to infer_full_frame() as `weights_dir`.
WEIGHTS = HERE / 'weights'
# Hugging Face dataset repo id that pick_sample_image() downloads from.
DATASET_ID = 'S3CUR/badger-55-watermeter'
# ── inference ─────────────────────────────────────────────────────────
def infer_full_frame(img_bgr: np.ndarray, weights_dir: Path,
                     device: str | None = None):
    """Read one meter frame end to end.

    Steps: rectify the frame, cut it into slot crops, embed the crops with
    DINOv2, run the digit classifier and the two Predictor90 heads, then
    merge both votes into one digit per slot.

    Args:
        img_bgr: Full camera frame, handed straight to `rectifier.rectify`.
        weights_dir: Directory holding `d4d5_predictor90.pt`,
            `d6d7_predictor90.pt` and `digit_classifier.pt`.
        device: Torch device string; when None, CUDA is used if available,
            otherwise CPU.

    Returns:
        dict with keys 'reading' (digit string), 'gallons' (reading / 10),
        'tight' (rectified strip), 'slots' (slot crops), 'per_slot' (list of
        per-slot vote dicts) and 'rectify_info'.

    Raises:
        RuntimeError: if the rectifier returns no strip.
    """
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'

    tight, info = rectifier.rectify(img_bgr)
    if tight is None:
        raise RuntimeError(f"rectifier failed: {info.get('error', 'unknown')}")

    slots = rectifier.tight_to_slots(tight)
    slots_arr = models.slot_crops_to_array(slots)
    dino = models.DinoV2(device=device)
    feats = dino.features(slots_arr)  # noted upstream as (8, 384)

    def _load_head(factory, filename):
        # Build the module on `device` in eval mode, then restore its weights.
        head = factory().to(device).eval()
        head.load_state_dict(torch.load(weights_dir / filename,
                                        map_location=device, weights_only=True))
        return head

    d4d5 = _load_head(models.Predictor90, 'd4d5_predictor90.pt')
    d6d7 = _load_head(models.Predictor90, 'd6d7_predictor90.pt')
    digit_cls = _load_head(models.SlotClassifier, 'digit_classifier.pt')

    with torch.no_grad():
        # The classifier scores every slot; the d4d5 head covers the first
        # six feature rows and the d6d7 head covers the last two.
        cls_probs_all = digit_cls(feats).softmax(dim=-1)
        cls_digits_all = cls_probs_all.argmax(dim=-1)
        d4d5_p90 = models.predictor90_decode(d4d5(feats[0:6]))
        d6d7_p90 = models.predictor90_decode(d6d7(feats[6:8]))

    per_slot = []
    for s in range(8):
        cla_d = int(cls_digits_all[s].item())
        cla_p = float(cls_probs_all[s, cla_d])

        # Pick the Predictor90 output bank for this slot and the row in it.
        bank, row = (d4d5_p90, s) if s <= 5 else (d6d7_p90, s - 6)
        p90_d = int(bank['digit'][row])
        p90_t = float(bank['theta_deg'][row])
        p90_p = float(bank['top1_prob'][row])
        p90_h = float(bank['entropy'][row])

        # Two-voter consensus: matching votes win outright with the larger
        # of the two probabilities. On a mismatch slot 0 trusts the
        # classifier and every other slot trusts Predictor90.
        if cla_d != p90_d:
            if s == 0:
                consensus, conf, source = cla_d, cla_p, 'classifier'
            else:
                consensus, conf, source = p90_d, p90_p, 'predictor90'
        else:
            consensus, conf, source = cla_d, max(cla_p, p90_p), 'agree'

        per_slot.append({
            'slot': s,
            'digit': consensus,
            'top1_prob': conf,
            'theta_deg': p90_t,
            'entropy': p90_h,
            'source': source,
            'cla_digit': cla_d, 'cla_prob': cla_p,
            'p90_digit': p90_d, 'p90_prob': p90_p,
        })

    reading_str = ''.join(str(entry['digit']) for entry in per_slot)
    return {
        'reading': reading_str,
        'gallons': int(reading_str) / 10.0,
        'tight': tight,
        'slots': slots,
        'per_slot': per_slot,
        'rectify_info': info,
    }
# ── rendering ─────────────────────────────────────────────────────────
# Layout constants — mirror the production renderer so the visual
# language is familiar (header → frame → rectified strip → voter cards).
# Dropped relative to production: SDR row, PREV-reading delta row, WARN
# row, VLM voter, cascade-gate state promotion. None of those apply to a
# standalone single-frame demo.
# Width of the composed output canvas, in pixels.
CANVAS_W = 940
# Vertical space reserved at the top of the canvas for the header text.
HEADER_H = 86
# Height cap for the down-scaled source frame.
SOURCE_H = 540
# Height cap for the scaled rectified strip.
STRIP_MAX_H = 200
# Height of each voter card.
CARD_PANEL_H = 168
# Number of card columns the canvas width is divided into.
NSLOTS = 8
# Horizontal gap between neighbouring cards.
CARD_GAP = 6
# Vertical padding between stacked sections.
PAD = 4
# Colours below are OpenCV-style BGR tuples.
BG = (18, 18, 18)
CARD_BG = (36, 36, 32)
CARD_BORDER = (64, 64, 60)
FG = (230, 230, 230)
DIM = (140, 140, 140)
NA = (90, 90, 90)
AGREE = (80, 200, 60) # green
DISAGREE = (60, 60, 240) # red (BGR)
EDGE_C = (60, 180, 230) # yellow
# State badge name (as returned by _state_for) -> accent colour.
STATE_COL = {'OK': AGREE, 'EDGE': EDGE_C, 'DISPUTED': DISAGREE}
# Font face used for every cv2.putText / cv2.getTextSize call in this file.
FONT = cv2.FONT_HERSHEY_SIMPLEX
def _state_for(conf: float | None) -> str:
"""Confidence β†’ state badge. Same thresholds as the production
`_display_conf` color picker (β‰₯0.85 green, β‰₯0.50 yellow, else red)."""
if conf is None: return 'OK'
if conf >= 0.85: return 'OK'
if conf >= 0.50: return 'EDGE'
return 'DISPUTED'
def _draw_voter_row(out, x0, y, label, vote, agree, informational=False):
    """Draw a single voter line (label plus vote) onto `out` in place.

    Rendering depends on the arguments:
      * `vote` is None      -> a grey '-' placeholder
      * `informational`     -> the vote in dim grey, with no chip or dot
      * otherwise           -> a filled chip behind the vote plus a dot,
                               both AGREE-coloured when `agree` is truthy
                               and DISAGREE-coloured when it is not
    """
    cv2.putText(out, label, (x0 + 8, y), FONT, 0.40, DIM, 1)

    if vote is None:
        cv2.putText(out, '-', (x0 + 44, y), FONT, 0.42, NA, 1)
    elif informational:
        cv2.putText(out, vote, (x0 + 43, y), FONT, 0.42, DIM, 1)
    else:
        chip_colour = AGREE if agree else DISAGREE
        cv2.rectangle(out, (x0 + 40, y - 10), (x0 + 56, y + 3), chip_colour, -1)
        # Dark text so the digit stays readable on the filled chip.
        cv2.putText(out, vote, (x0 + 43, y), FONT, 0.42, (15, 15, 15), 1)
        cv2.circle(out, (x0 + 70, y - 4), 4, chip_colour, -1)
def _draw_card(canvas, x0, y0, card_w, card_h, per_slot):
    """Paint one slot card onto `canvas` with its top-left corner at (x0, y0).

    `per_slot` is one entry of the 'per_slot' list built by
    infer_full_frame. The keys read here are 'digit', 'slot', 'cla_digit'
    and 'p90_digit' (required) plus 'top1_prob' and 'theta_deg' (optional).
    """
    committed = str(per_slot['digit'])
    conf = per_slot.get('top1_prob')
    state = _state_for(conf)
    accent = STATE_COL[state]
    right, bottom = x0 + card_w, y0 + card_h

    # Filled backdrop, thin outline, then a 3 px state stripe on the left.
    cv2.rectangle(canvas, (x0, y0), (right, bottom), CARD_BG, -1)
    cv2.rectangle(canvas, (x0, y0), (right, bottom), CARD_BORDER, 1)
    cv2.rectangle(canvas, (x0, y0), (x0 + 3, bottom), accent, -1)

    # Slot label in the top-left corner.
    cv2.putText(canvas, f"d{per_slot['slot']}", (x0 + 8, y0 + 14),
                FONT, 0.40, DIM, 1)

    # Large committed digit, centred horizontally; red when DISPUTED.
    digit_colour = FG if state != 'DISPUTED' else DISAGREE
    (text_w, text_h), _ = cv2.getTextSize(committed, FONT, 1.3, 2)
    digit_org = (x0 + (card_w - text_w) // 2, y0 + 14 + text_h + 4)
    cv2.putText(canvas, committed, digit_org, FONT, 1.3, digit_colour, 2)

    # Confidence percentage in the top-right corner. _state_for uses the
    # same 0.85 / 0.50 cut-offs, so the state accent is the right colour.
    if conf is not None:
        cv2.putText(canvas, f"{int(conf * 100)}%",
                    (right - 34, y0 + 14), FONT, 0.40, accent, 1)

    # Angle readout, drawn whenever 'theta_deg' is present and not None.
    theta = per_slot.get('theta_deg')
    if theta is not None:
        cv2.putText(canvas, f"{int(theta)}deg",
                    (x0 + card_w // 2 - 18, y0 + 80), FONT, 0.40, DIM, 1)

    # Voter rows, CLA then P90. A head whose vote equals the committed
    # digit gets the coloured chip; the other is shown in informational grey.
    row_y = y0 + 90
    for label, key in (('CLA', 'cla_digit'), ('P90', 'p90_digit')):
        vote = str(per_slot[key])
        matches = vote == committed
        _draw_voter_row(canvas, x0, row_y, label, vote, matches,
                        informational=not matches)
        row_y += 17
def render_result(img_bgr: np.ndarray, result: dict, out_path: Path):
    """Compose the annotated image and write it to `out_path`.

    Layout, top to bottom: header, original frame, rectified strip, one
    voter card per entry in result['per_slot'].

    Args:
        img_bgr: The original frame; scaled to fit CANVAS_W x SOURCE_H.
        result: Dict from infer_full_frame. Keys read here: 'tight',
            'rectify_info', 'reading', 'gallons', 'per_slot'.
        out_path: Destination image path; the extension selects the encoder.

    Raises:
        OSError: if cv2.imwrite reports that the file was not written.
    """
    canvas_w = CANVAS_W

    # Scale the source frame to canvas_w, capped at SOURCE_H tall. Sizes
    # are clamped to 1 px so cv2.resize never gets a zero dimension.
    src_h, src_w = img_bgr.shape[:2]
    src_scale = min(canvas_w / src_w, SOURCE_H / src_h)
    src_w_s = max(1, int(src_w * src_scale))
    src_h_s = max(1, int(src_h * src_scale))
    src_scaled = cv2.resize(img_bgr, (src_w_s, src_h_s),
                            interpolation=cv2.INTER_LANCZOS4)

    # Rectified strip: scale towards canvas_w, capped at STRIP_MAX_H tall.
    tight = result['tight']
    th, tw = tight.shape[:2]
    t_scale = min(canvas_w / tw, STRIP_MAX_H / th)
    tw_s = max(1, int(tw * t_scale))
    th_s = max(1, int(th * t_scale))
    strip_scaled = cv2.resize(tight, (tw_s, th_s),
                              interpolation=cv2.INTER_LANCZOS4)

    # Card panel geometry.
    card_w = (canvas_w - (NSLOTS - 1) * CARD_GAP) // NSLOTS
    card_h = CARD_PANEL_H
    total_h = HEADER_H + src_h_s + th_s + card_h + 4 * PAD
    canvas = np.full((total_h, canvas_w, 3), BG, dtype=np.uint8)

    # Header: reading and gallons on the left, rectifier stats
    # right-aligned on the first text line.
    info = result['rectify_info']
    cv2.putText(canvas, f"reading {result['reading']}",
                (12, 32), FONT, 0.85, FG, 2)
    cv2.putText(canvas, f"{result['gallons']:.1f} gallons",
                (12, 58), FONT, 0.50, DIM, 1)
    meta = (f"deskew {info['deskew_angle']:+.2f}deg | "
            f"{info['n_windows']} windows | "
            f"residual {info.get('mean_residual_px', 0):.2f}px")
    (mw, _), _ = cv2.getTextSize(meta, FONT, 0.42, 1)
    cv2.putText(canvas, meta, (canvas_w - mw - 12, 32),
                FONT, 0.42, DIM, 1)

    # Source frame, centred horizontally.
    y = HEADER_H
    src_x = (canvas_w - src_w_s) // 2
    canvas[y:y + src_h_s, src_x:src_x + src_w_s] = src_scaled
    y += src_h_s + PAD

    # Rectified strip, centred horizontally.
    strip_x = (canvas_w - tw_s) // 2
    canvas[y:y + th_s, strip_x:strip_x + tw_s] = strip_scaled
    y += th_s + PAD

    # Voter cards, laid out left to right.
    for i, ps in enumerate(result['per_slot']):
        cx = i * (card_w + CARD_GAP)
        _draw_card(canvas, cx, y, card_w, card_h, ps)

    # cv2.imwrite signals failure through its return value rather than an
    # exception, so check it before claiming the file was written.
    if not cv2.imwrite(str(out_path), canvas):
        raise OSError(f"cv2.imwrite failed to write {out_path}")
    print(f"[render] wrote {out_path} ({canvas.shape[1]}×{canvas.shape[0]})")
# ── pick a sample image from the HF dataset cache ─────────────────────
def pick_sample_image(cache_dir: Path | None = None) -> np.ndarray:
    """Download the dataset's captures parquet and decode one frame.

    The image bytes are read from the 'image_bytes' column of the chosen
    row, so only the parquet file itself is fetched.

    Args:
        cache_dir: Optional Hugging Face cache directory; when falsy the
            hub default is used.

    Returns:
        The decoded image as returned by cv2.imdecode with IMREAD_COLOR.

    Raises:
        RuntimeError: if the parquet has no rows, or the chosen row's
            bytes cannot be decoded as an image.
    """
    kwargs = {'repo_id': DATASET_ID, 'repo_type': 'dataset'}
    if cache_dir:
        kwargs['cache_dir'] = str(cache_dir)
    print(f"[hf] fetching {DATASET_ID}:captures.parquet")
    parquet_path = hf_hub_download(filename='captures.parquet', **kwargs)
    caps_df = pd.read_parquet(parquet_path)
    if caps_df.empty:
        # Without this guard the .iloc[0] below raises a bare IndexError.
        raise RuntimeError(f"{DATASET_ID}:captures.parquet contains no rows")

    # Prefer the test-split capture with the most known slots; if there
    # are no test rows, fall back to the first row of the file.
    test_rows = caps_df[caps_df['split'] == 'test']
    if len(test_rows):
        pick = test_rows.sort_values('n_slots_known', ascending=False).iloc[0]
    else:
        pick = caps_df.iloc[0]

    print(f"[hf] picked capture_id={pick.get('capture_id')} "
          f"captured_at={pick['captured_at']} "
          f"n_known={pick['n_slots_known']}")
    img = cv2.imdecode(np.frombuffer(pick['image_bytes'], np.uint8),
                       cv2.IMREAD_COLOR)
    if img is None:
        raise RuntimeError("failed to decode image_bytes for the chosen row")
    return img
# ── main ──────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: load a frame, run inference, render.

    With --image the frame is read from disk; otherwise one is pulled from
    the dataset via pick_sample_image. The reading and a per-slot summary
    are printed, then the annotated image is written to --out.
    """
    ap = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # __doc__ is None when docstrings are stripped (python -OO);
        # fall back to an empty description instead of crashing.
        description=textwrap.dedent(__doc__ or ''))
    ap.add_argument('--image', default=None,
                    help='Path to a 1920×1080 BGR meter capture; default = pick one from the dataset')
    ap.add_argument('--out', default=str(HERE / 'demo_output.jpg'))
    ap.add_argument('--cache-dir', default=None)
    ap.add_argument('--device', default='cuda' if torch.cuda.is_available() else 'cpu')
    args = ap.parse_args()

    if args.image:
        img_path = Path(args.image)
        img = cv2.imread(str(img_path))
        if img is None:
            # cv2.imread returns None rather than raising on a bad path.
            sys.exit(f"cannot read {img_path}")
        print(f"[demo] loaded {img.shape[1]}×{img.shape[0]} from {img_path}")
    else:
        img = pick_sample_image(
            Path(args.cache_dir) if args.cache_dir else None)
        print(f"[demo] loaded {img.shape[1]}×{img.shape[0]} from HF dataset row")

    t0 = time.time()
    result = infer_full_frame(img, WEIGHTS, device=args.device)
    print(f"[infer] reading={result['reading']} "
          f"({result['gallons']:.1f} gal) in {(time.time()-t0)*1000:.0f} ms")
    for ps in result['per_slot']:
        bits = f"d{ps['slot']}={ps['digit']}"
        if ps['theta_deg'] is not None:
            bits += f" θ={ps['theta_deg']:6.2f}° p={ps['top1_prob']:.3f}"
        bits += f" [{ps['source']}]"
        print(" ", bits)

    render_result(img, result, Path(args.out))


if __name__ == '__main__':
    main()