"""End-to-end inference demo on a single full-meter image. full BGR frame ─► deskew ─► rectify (175 × 736 strip) │ ▼ 8 × (105 × 86) slot crops │ ▼ DINOv2-small CLS features (8, 384) │ ┌──────────┴──────────┐ ▼ ▼ digit_classifier d4d5 / d6d7 Predictor90 (10-way digit) (theta + entropy) └──────────┬──────────┘ ▼ 8-digit reading + per-slot annotation card The same `digit_classifier` (trained pooled across slots 4+5+6+7 — the slots in the dataset that exercise all 10 digit classes) is applied to slots 0–4 at inference. On captures from the *source* meter, slots 0–3 will emit constants — that's because the upper drums of that specific meter didn't move during data collection, so the trained classifier sees an input that looks exactly like one digit class for those slots. Usage: python demo.py # pick one sample capture python demo.py --image PATH # supply your own 1920×1080 frame python demo.py --image PATH --out OUT.jpg """ from __future__ import annotations import argparse import sys import textwrap import time import os from pathlib import Path # Match train.py — turn on the high-perf downloader before importing # huggingface_hub. The default Python downloader silently throttles on # repeated single-file fetches against the same dataset. os.environ.setdefault('HF_XET_HIGH_PERFORMANCE', '1') os.environ.setdefault('HF_HUB_DOWNLOAD_TIMEOUT', '30') import cv2 import numpy as np import pandas as pd import torch from huggingface_hub import hf_hub_download import models import rectifier HERE = Path(__file__).parent WEIGHTS = HERE / 'weights' DATASET_ID = 'S3CUR/badger-55-watermeter' # ── inference ───────────────────────────────────────────────────────── def infer_full_frame(img_bgr: np.ndarray, weights_dir: Path, device: str | None = None): """Run the full pipeline. Returns a dict with the rectified strip, per-slot predictions, and rectify info.""" if device is None: device = 'cuda' if torch.cuda.is_available() else 'cpu' tight, info = rectifier.rectify(img_bgr) if tight is None: raise RuntimeError(f"rectifier failed: {info.get('error', 'unknown')}") slots = rectifier.tight_to_slots(tight) slots_arr = models.slot_crops_to_array(slots) dino = models.DinoV2(device=device) feats = dino.features(slots_arr) # (8, 384) # Heads d4d5 = models.Predictor90().to(device).eval() d4d5.load_state_dict(torch.load(weights_dir / 'd4d5_predictor90.pt', map_location=device, weights_only=True)) d6d7 = models.Predictor90().to(device).eval() d6d7.load_state_dict(torch.load(weights_dir / 'd6d7_predictor90.pt', map_location=device, weights_only=True)) digit_cls = models.SlotClassifier().to(device).eval() digit_cls.load_state_dict(torch.load(weights_dir / 'digit_classifier.pt', map_location=device, weights_only=True)) with torch.no_grad(): # Two heads on every applicable slot: classifier (10-way digit) # and predictor90 (90-bin angular). d4d5_* trained on slots {4,5} # but applies fine to slots 0..5 since the upper drums are visually # similar; d6d7_* covers slots {6,7}. cls_probs_all = digit_cls(feats).softmax(dim=-1) # (8, 10) cls_digits_all = cls_probs_all.argmax(dim=-1) d4d5_p90 = models.predictor90_decode(d4d5(feats[0:6])) d6d7_p90 = models.predictor90_decode(d6d7(feats[6:8])) per_slot = [] for s in range(8): cla_d = int(cls_digits_all[s].item()) cla_p = float(cls_probs_all[s, cla_d]) if s <= 5: p90_d = int(d4d5_p90['digit'][s]) p90_t = float(d4d5_p90['theta_deg'][s]) p90_p = float(d4d5_p90['top1_prob'][s]) p90_h = float(d4d5_p90['entropy'][s]) else: i = s - 6 p90_d = int(d6d7_p90['digit'][i]) p90_t = float(d6d7_p90['theta_deg'][i]) p90_p = float(d6d7_p90['top1_prob'][i]) p90_h = float(d6d7_p90['entropy'][i]) # ── Two-voter consensus ──────────────────────────────────────── # When CLA and P90 agree, take that. When they disagree, prefer # P90 for d1-d7 (its θ disambiguates upper-drum mid-roll cleanly # — empirically the right call when the slot ever sees motion). # For d0, defer to CLA: it's a hard constant on this meter, and # P90 on a constant slot is meaningless. if cla_d == p90_d: consensus, conf, source = cla_d, max(cla_p, p90_p), 'agree' elif s == 0: consensus, conf, source = cla_d, cla_p, 'classifier' else: consensus, conf, source = p90_d, p90_p, 'predictor90' per_slot.append({ 'slot': s, 'digit': consensus, 'top1_prob': conf, 'theta_deg': p90_t, 'entropy': p90_h, 'source': source, 'cla_digit': cla_d, 'cla_prob': cla_p, 'p90_digit': p90_d, 'p90_prob': p90_p, }) reading_str = ''.join(str(p['digit']) for p in per_slot) return { 'reading': reading_str, 'gallons': int(reading_str) / 10.0, 'tight': tight, 'slots': slots, 'per_slot': per_slot, 'rectify_info': info, } # ── rendering ───────────────────────────────────────────────────────── # Layout constants — mirror the production renderer so the visual # language is familiar (header → frame → rectified strip → voter cards). # Dropped relative to production: SDR row, PREV-reading delta row, WARN # row, VLM voter, cascade-gate state promotion. None of those apply to a # standalone single-frame demo. CANVAS_W = 940 HEADER_H = 86 SOURCE_H = 540 STRIP_MAX_H = 200 CARD_PANEL_H = 168 NSLOTS = 8 CARD_GAP = 6 PAD = 4 BG = (18, 18, 18) CARD_BG = (36, 36, 32) CARD_BORDER = (64, 64, 60) FG = (230, 230, 230) DIM = (140, 140, 140) NA = (90, 90, 90) AGREE = (80, 200, 60) # green DISAGREE = (60, 60, 240) # red (BGR) EDGE_C = (60, 180, 230) # yellow STATE_COL = {'OK': AGREE, 'EDGE': EDGE_C, 'DISPUTED': DISAGREE} FONT = cv2.FONT_HERSHEY_SIMPLEX def _state_for(conf: float | None) -> str: """Confidence → state badge. Same thresholds as the production `_display_conf` color picker (≥0.85 green, ≥0.50 yellow, else red).""" if conf is None: return 'OK' if conf >= 0.85: return 'OK' if conf >= 0.50: return 'EDGE' return 'DISPUTED' def _draw_voter_row(out, x0, y, label, vote, agree, informational=False): """One voter row inside a card. Three render states: - vote=None → '-' placeholder in grey - informational=True → grey digit, no chip/circle - else → colored chip + agreement circle""" cv2.putText(out, label, (x0 + 8, y), FONT, 0.40, DIM, 1) if vote is None: cv2.putText(out, '-', (x0 + 44, y), FONT, 0.42, NA, 1) return if informational: cv2.putText(out, vote, (x0 + 43, y), FONT, 0.42, DIM, 1) return col = AGREE if agree else DISAGREE cv2.rectangle(out, (x0 + 40, y - 10), (x0 + 56, y + 3), col, -1) cv2.putText(out, vote, (x0 + 43, y), FONT, 0.42, (15, 15, 15), 1) cv2.circle(out, (x0 + 70, y - 4), 4, col, -1) def _draw_card(canvas, x0, y0, card_w, card_h, per_slot): digit_s = str(per_slot['digit']) conf = per_slot.get('top1_prob') state = _state_for(conf) sc = STATE_COL[state] # Backdrop + border + state stripe cv2.rectangle(canvas, (x0, y0), (x0 + card_w, y0 + card_h), CARD_BG, -1) cv2.rectangle(canvas, (x0, y0), (x0 + card_w, y0 + card_h), CARD_BORDER, 1) cv2.rectangle(canvas, (x0, y0), (x0 + 3, y0 + card_h), sc, -1) # Slot label top-left cv2.putText(canvas, f"d{per_slot['slot']}", (x0 + 8, y0 + 14), FONT, 0.40, DIM, 1) # Big committed digit, centered big_color = DISAGREE if state == 'DISPUTED' else FG (tw, th), _ = cv2.getTextSize(digit_s, FONT, 1.3, 2) cv2.putText(canvas, digit_s, (x0 + (card_w - tw) // 2, y0 + 14 + th + 4), FONT, 1.3, big_color, 2) # Confidence % top-right if conf is not None: conf_col = (AGREE if conf >= 0.85 else EDGE_C if conf >= 0.50 else DISAGREE) cv2.putText(canvas, f"{int(conf * 100)}%", (x0 + card_w - 34, y0 + 14), FONT, 0.40, conf_col, 1) # θ for d4-d7 theta = per_slot.get('theta_deg') if theta is not None: cv2.putText(canvas, f"{int(theta)}deg", (x0 + card_w // 2 - 18, y0 + 80), FONT, 0.40, DIM, 1) # Voter rows — CLA + P90. Both heads run on every applicable slot; # the head whose vote MATCHES the consensus gets a colored chip, # the other shows its digit in informational grey. voters_y = y0 + 90 row_h = 17 cla_v = str(per_slot['cla_digit']) p90_v = str(per_slot['p90_digit']) _draw_voter_row(canvas, x0, voters_y, 'CLA', cla_v, cla_v == digit_s, informational=(cla_v != digit_s)) _draw_voter_row(canvas, x0, voters_y + row_h, 'P90', p90_v, p90_v == digit_s, informational=(p90_v != digit_s)) def render_result(img_bgr: np.ndarray, result: dict, out_path: Path): """Compose a production-style annotated image: header → original frame → rectified strip → 8 voter cards.""" canvas_w = CANVAS_W # Scale the source frame to canvas_w, capped at SOURCE_H tall src_h, src_w = img_bgr.shape[:2] src_scale = min(canvas_w / src_w, SOURCE_H / src_h) src_w_s = int(src_w * src_scale) src_h_s = int(src_h * src_scale) src_scaled = cv2.resize(img_bgr, (src_w_s, src_h_s), interpolation=cv2.INTER_LANCZOS4) # Rectified strip — scale up to canvas_w tight = result['tight'] th, tw = tight.shape[:2] t_scale = min(canvas_w / tw, STRIP_MAX_H / th) tw_s = int(tw * t_scale) th_s = int(th * t_scale) strip_scaled = cv2.resize(tight, (tw_s, th_s), interpolation=cv2.INTER_LANCZOS4) # Card panel card_w = (canvas_w - (NSLOTS - 1) * CARD_GAP) // NSLOTS card_h = CARD_PANEL_H total_h = HEADER_H + src_h_s + th_s + card_h + 4 * PAD canvas = np.full((total_h, canvas_w, 3), BG, dtype=np.uint8) # ── header ──────────────────────────────────────────────────────── info = result['rectify_info'] cv2.putText(canvas, f"reading {result['reading']}", (12, 32), FONT, 0.85, FG, 2) cv2.putText(canvas, f"{result['gallons']:.1f} gallons", (12, 58), FONT, 0.50, DIM, 1) meta = (f"deskew {info['deskew_angle']:+.2f}deg | " f"{info['n_windows']} windows | " f"residual {info.get('mean_residual_px', 0):.2f}px") (mw, _), _ = cv2.getTextSize(meta, FONT, 0.42, 1) cv2.putText(canvas, meta, (canvas_w - mw - 12, 32), FONT, 0.42, DIM, 1) # ── source frame ────────────────────────────────────────────────── y = HEADER_H src_x = (canvas_w - src_w_s) // 2 canvas[y:y + src_h_s, src_x:src_x + src_w_s] = src_scaled y += src_h_s + PAD # ── rectified strip ─────────────────────────────────────────────── strip_x = (canvas_w - tw_s) // 2 canvas[y:y + th_s, strip_x:strip_x + tw_s] = strip_scaled y += th_s + PAD # ── voter cards ─────────────────────────────────────────────────── for i, ps in enumerate(result['per_slot']): cx = i * (card_w + CARD_GAP) _draw_card(canvas, cx, y, card_w, card_h, ps) cv2.imwrite(str(out_path), canvas) print(f"[render] wrote {out_path} ({canvas.shape[1]}×{canvas.shape[0]})") # ── pick a sample image from the HF dataset cache ───────────────────── def pick_sample_image(cache_dir: Path | None = None) -> np.ndarray: """Fetch the captures parquet, pick one clean frame, decode bytes into a BGR numpy array. The dataset is self-contained in a single parquet — JPEG bytes are inline — so this is one HTTP request and no loose-file roulette.""" kwargs = {'repo_id': DATASET_ID, 'repo_type': 'dataset'} if cache_dir: kwargs['cache_dir'] = str(cache_dir) print(f"[hf] fetching {DATASET_ID}:captures.parquet") parquet_path = hf_hub_download(filename='captures.parquet', **kwargs) caps_df = pd.read_parquet(parquet_path) # Prefer test-split captures with the most human-tagged slots (cleanest # frames in the dataset). cands = caps_df[caps_df['split'] == 'test'].sort_values( 'n_slots_known', ascending=False).head(10) if not len(cands): cands = caps_df.head(10) pick = cands.iloc[0] print(f"[hf] picked capture_id={pick.get('capture_id')} " f"captured_at={pick['captured_at']} " f"n_known={pick['n_slots_known']}") img = cv2.imdecode(np.frombuffer(pick['image_bytes'], np.uint8), cv2.IMREAD_COLOR) if img is None: raise RuntimeError("failed to decode image_bytes for the chosen row") return img # ── main ────────────────────────────────────────────────────────────── def main(): ap = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, description=textwrap.dedent(__doc__)) ap.add_argument('--image', default=None, help='Path to a 1920×1080 BGR meter capture; default = pick one from the dataset') ap.add_argument('--out', default=str(HERE / 'demo_output.jpg')) ap.add_argument('--cache-dir', default=None) ap.add_argument('--device', default='cuda' if torch.cuda.is_available() else 'cpu') args = ap.parse_args() if args.image: img_path = Path(args.image) img = cv2.imread(str(img_path)) if img is None: sys.exit(f"cannot read {img_path}") print(f"[demo] loaded {img.shape[1]}×{img.shape[0]} from {img_path}") else: img = pick_sample_image( Path(args.cache_dir) if args.cache_dir else None) print(f"[demo] loaded {img.shape[1]}×{img.shape[0]} from HF dataset row") t0 = time.time() result = infer_full_frame(img, WEIGHTS, device=args.device) print(f"[infer] reading={result['reading']} " f"({result['gallons']:.1f} gal) in {(time.time()-t0)*1000:.0f} ms") for ps in result['per_slot']: bits = f"d{ps['slot']}={ps['digit']}" if ps['theta_deg'] is not None: bits += f" θ={ps['theta_deg']:6.2f}° p={ps['top1_prob']:.3f}" bits += f" [{ps['source']}]" print(" ", bits) render_result(img, result, Path(args.out)) if __name__ == '__main__': main()