"""End-to-end inference demo on a single full-meter image.

    full BGR frame ─► deskew ─► rectify (175 × 736 strip)
                                   │
                                   ▼
                       8 × (105 × 86) slot crops
                                   │
                                   ▼
                   DINOv2-small CLS features (8, 384)
                                   │
                        ┌──────────┴──────────┐
                        ▼                     ▼
                digit_classifier     d4d5 / d6d7 Predictor90
                 (10-way digit)         (theta + entropy)
                        └──────────┬──────────┘
                                   ▼
               8-digit reading + per-slot annotation card

The same `digit_classifier` (trained pooled across slots 4+5+6+7 — the
slots in the dataset that exercise all 10 digit classes) is applied to
slots 0–3 at inference. On captures from the *source* meter, slots 0–3
will emit constants — that's because the upper drums of that specific
meter didn't move during data collection, so the trained classifier
sees an input that looks exactly like one digit class for those slots.

Usage:
    python demo.py                      # pick one sample capture
    python demo.py --image PATH         # supply your own 1920×1080 frame
    python demo.py --image PATH --out OUT.jpg
"""
| from __future__ import annotations |
|
|
| import argparse |
| import sys |
| import textwrap |
| import time |
| import os |
| from pathlib import Path |
|
|
| |
| |
| |
# Hugging Face Hub download tuning. setdefault() only fills in a value when
# the variable is not already present, so anything exported by the user wins.
# NOTE(review): these sit ahead of the `huggingface_hub` import below,
# presumably because the library reads them at import time — confirm before
# moving them.
os.environ.setdefault('HF_XET_HIGH_PERFORMANCE', '1')
os.environ.setdefault('HF_HUB_DOWNLOAD_TIMEOUT', '30')
|
|
| import cv2 |
| import numpy as np |
| import pandas as pd |
| import torch |
| from huggingface_hub import hf_hub_download |
|
|
| import models |
| import rectifier |
|
|
|
|
# Directory containing this script; bundled assets are resolved against it.
HERE = Path(__file__).parent
# Directory the checkpoint (.pt) files are loaded from at inference time.
WEIGHTS = HERE.joinpath('weights')
# Hugging Face dataset repository the default sample capture comes from.
DATASET_ID = 'S3CUR/badger-55-watermeter'
|
|
| |
def infer_full_frame(img_bgr: np.ndarray, weights_dir: Path,
                     device: str | None = None):
    """Read the meter shown in one full frame.

    Pipeline: rectify the frame into a tight strip, cut the strip into slot
    crops, embed the crops with DINOv2, then score every slot with both the
    digit classifier and a Predictor90 head and merge the two votes.

    Args:
        img_bgr: Full camera frame handed to ``rectifier.rectify``.
        weights_dir: Directory holding ``d4d5_predictor90.pt``,
            ``d6d7_predictor90.pt`` and ``digit_classifier.pt``.
        device: Torch device string. ``None`` selects CUDA when available,
            otherwise CPU.

    Returns:
        dict with keys ``reading`` (concatenated digit string), ``gallons``
        (``int(reading) / 10``), ``tight`` (rectified strip), ``slots`` (slot
        crops), ``per_slot`` (one dict per slot) and ``rectify_info``.

    Raises:
        RuntimeError: if the rectifier returns no strip.
    """
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'

    strip, rect_info = rectifier.rectify(img_bgr)
    if strip is None:
        raise RuntimeError(f"rectifier failed: {rect_info.get('error', 'unknown')}")
    crops = rectifier.tight_to_slots(strip)
    crop_batch = models.slot_crops_to_array(crops)

    backbone = models.DinoV2(device=device)
    feats = backbone.features(crop_batch)

    def _restore(module_cls, filename):
        # Build the module on `device` in eval mode, then load its checkpoint.
        net = module_cls().to(device).eval()
        state = torch.load(weights_dir / filename,
                           map_location=device, weights_only=True)
        net.load_state_dict(state)
        return net

    d4d5 = _restore(models.Predictor90, 'd4d5_predictor90.pt')
    d6d7 = _restore(models.Predictor90, 'd6d7_predictor90.pt')
    digit_cls = _restore(models.SlotClassifier, 'digit_classifier.pt')

    with torch.no_grad():
        # The classifier scores every slot; the d4d5 head scores slots 0-5
        # and the d6d7 head scores slots 6-7.
        digit_probs = digit_cls(feats).softmax(dim=-1)
        digit_top1 = digit_probs.argmax(dim=-1)
        p90_first = models.predictor90_decode(d4d5(feats[0:6]))
        p90_last = models.predictor90_decode(d6d7(feats[6:8]))

    per_slot = []
    for slot_idx in range(8):
        cla_d = int(digit_top1[slot_idx].item())
        cla_p = float(digit_probs[slot_idx, cla_d])

        # Pick the decoded Predictor90 batch covering this slot and the row
        # of the slot inside that batch.
        if slot_idx < 6:
            decoded, row = p90_first, slot_idx
        else:
            decoded, row = p90_last, slot_idx - 6
        p90_d = int(decoded['digit'][row])
        p90_t = float(decoded['theta_deg'][row])
        p90_p = float(decoded['top1_prob'][row])
        p90_h = float(decoded['entropy'][row])

        # Merge the two votes.
        # NOTE(review): on disagreement only slot 0 keeps the classifier's
        # digit; every other slot takes the Predictor90 digit. The rationale
        # is not visible here — confirm with the author.
        if cla_d != p90_d:
            if slot_idx == 0:
                verdict = (cla_d, cla_p, 'classifier')
            else:
                verdict = (p90_d, p90_p, 'predictor90')
        else:
            verdict = (cla_d, max(cla_p, p90_p), 'agree')
        consensus, conf, source = verdict

        per_slot.append({
            'slot': slot_idx,
            'digit': consensus,
            'top1_prob': conf,
            'theta_deg': p90_t,
            'entropy': p90_h,
            'source': source,
            'cla_digit': cla_d, 'cla_prob': cla_p,
            'p90_digit': p90_d, 'p90_prob': p90_p,
        })

    reading_str = ''.join(str(entry['digit']) for entry in per_slot)
    return {
        'reading': reading_str,
        'gallons': int(reading_str) / 10.0,
        'tight': strip,
        'slots': crops,
        'per_slot': per_slot,
        'rectify_info': rect_info,
    }
|
|
|
|
| |
| |
| |
| |
| |
| |
# --- Layout geometry (pixels) ---------------------------------------------
CANVAS_W = 940        # width of the composed output image
HEADER_H = 86         # height reserved for the header text at the top
SOURCE_H = 540        # maximum height of the scaled original frame
STRIP_MAX_H = 200     # maximum height of the scaled rectified strip
CARD_PANEL_H = 168    # height of each per-slot card
NSLOTS = 8            # number of cards the panel width is divided into
CARD_GAP = 6          # horizontal gap between neighbouring cards
PAD = 4               # vertical spacing between stacked sections

# --- Palette: 3-tuples handed to OpenCV drawing calls (B, G, R order) ------
BG = (18, 18, 18)             # canvas fill
CARD_BG = (36, 36, 32)        # card fill
CARD_BORDER = (64, 64, 60)    # card outline
FG = (230, 230, 230)          # primary text
DIM = (140, 140, 140)         # secondary text
NA = (90, 90, 90)             # '-' placeholder for a missing vote
AGREE = (80, 200, 60)         # green
DISAGREE = (60, 60, 240)      # red
EDGE_C = (60, 180, 230)       # yellow

# State badge name (as returned by `_state_for`) -> accent color.
STATE_COL = dict(OK=AGREE, EDGE=EDGE_C, DISPUTED=DISAGREE)
|
|
# Typeface passed to the cv2.putText / cv2.getTextSize calls in the drawing
# helpers below.
FONT = cv2.FONT_HERSHEY_SIMPLEX
|
|
|
|
def _state_for(conf: float | None) -> str:
    """Map a confidence value to its state badge name.

    Same thresholds as the production `_display_conf` color picker:
    >= 0.85 is 'OK' (green), >= 0.50 is 'EDGE' (yellow), anything lower is
    'DISPUTED' (red). A missing confidence (``None``) is reported as 'OK'.
    """
    if conf is None or conf >= 0.85:
        return 'OK'
    return 'EDGE' if conf >= 0.50 else 'DISPUTED'
|
|
|
|
def _draw_voter_row(out, x0, y, label, vote, agree, informational=False):
    """Paint a single voter line (label plus vote) inside a slot card.

    What gets drawn after the label depends on the arguments:
      * ``vote is None``  -> a grey '-' placeholder and nothing else.
      * ``informational`` -> the vote as dim text, without chip or dot.
      * otherwise         -> a filled chip behind the vote and a dot beside
                             it, colored AGREE or DISAGREE per ``agree``.

    Args:
        out: Image that is drawn on in place.
        x0: Left edge of the card, in pixels.
        y: Vertical text origin of this row, in pixels.
        label: Short voter name drawn at the left.
        vote: Text of the voter's digit, or ``None``.
        agree: Selects the chip/dot color in the default rendering.
        informational: Draw the vote without agreement coloring.
    """
    cv2.putText(out, label, (x0 + 8, y), FONT, 0.40, DIM, 1)

    if vote is None:
        cv2.putText(out, '-', (x0 + 44, y), FONT, 0.42, NA, 1)
    elif informational:
        cv2.putText(out, vote, (x0 + 43, y), FONT, 0.42, DIM, 1)
    else:
        tint = AGREE if agree else DISAGREE
        chip_top_left = (x0 + 40, y - 10)
        chip_bottom_right = (x0 + 56, y + 3)
        cv2.rectangle(out, chip_top_left, chip_bottom_right, tint, -1)
        # Dark text so the digit stays legible on the colored chip.
        cv2.putText(out, vote, (x0 + 43, y), FONT, 0.42, (15, 15, 15), 1)
        cv2.circle(out, (x0 + 70, y - 4), 4, tint, -1)
|
|
|
|
def _draw_card(canvas, x0, y0, card_w, card_h, per_slot):
    """Draw the annotation card for one slot onto ``canvas`` in place.

    The card shows, top to bottom: the slot label with the confidence
    percentage, the consensus digit in large type, the Predictor90 angle,
    and one voter row each for the classifier (CLA) and Predictor90 (P90).

    Args:
        canvas: Image that is drawn on in place.
        x0, y0: Top-left corner of the card, in pixels.
        card_w, card_h: Card size, in pixels.
        per_slot: Slot record; reads ``digit``, ``slot``, ``cla_digit`` and
            ``p90_digit``, plus the optional ``top1_prob`` and ``theta_deg``.
    """
    digit_s = str(per_slot['digit'])
    conf = per_slot.get('top1_prob')
    state = _state_for(conf)
    accent = STATE_COL[state]

    top_left = (x0, y0)
    bottom_right = (x0 + card_w, y0 + card_h)

    # Card body, outline, and a 3 px state-colored bar along the left edge.
    cv2.rectangle(canvas, top_left, bottom_right, CARD_BG, -1)
    cv2.rectangle(canvas, top_left, bottom_right, CARD_BORDER, 1)
    cv2.rectangle(canvas, top_left, (x0 + 3, y0 + card_h), accent, -1)

    # Slot label in the top-left corner.
    cv2.putText(canvas, f"d{per_slot['slot']}", (x0 + 8, y0 + 14),
                FONT, 0.40, DIM, 1)

    # Consensus digit, horizontally centred; red when the state is DISPUTED.
    if state == 'DISPUTED':
        digit_col = DISAGREE
    else:
        digit_col = FG
    (text_w, text_h), _ = cv2.getTextSize(digit_s, FONT, 1.3, 2)
    digit_org = (x0 + (card_w - text_w) // 2, y0 + 14 + text_h + 4)
    cv2.putText(canvas, digit_s, digit_org, FONT, 1.3, digit_col, 2)

    # Confidence percentage in the top-right corner. `accent` comes from the
    # same 0.85 / 0.50 thresholds, so it is the right color whenever a
    # confidence is present.
    if conf is not None:
        cv2.putText(canvas, f"{int(conf * 100)}%",
                    (x0 + card_w - 34, y0 + 14), FONT, 0.40, accent, 1)

    # Predictor90 angle, if the record carries one.
    theta = per_slot.get('theta_deg')
    if theta is not None:
        cv2.putText(canvas, f"{int(theta)}deg",
                    (x0 + card_w // 2 - 18, y0 + 80), FONT, 0.40, DIM, 1)

    # Voter rows: a voter matching the consensus digit gets the colored
    # chip; a voter that differs is shown as plain informational text.
    votes = (('CLA', str(per_slot['cla_digit'])),
             ('P90', str(per_slot['p90_digit'])))
    first_row_y = y0 + 90
    row_pitch = 17
    for row, (tag, vote) in enumerate(votes):
        matches = vote == digit_s
        _draw_voter_row(canvas, x0, first_row_y + row * row_pitch,
                        tag, vote, matches,
                        informational=not matches)
|
|
|
|
def render_result(img_bgr: np.ndarray, result: dict, out_path: Path):
    """Compose and save the annotated result image.

    Layout, top to bottom: header (reading, gallons, rectifier stats), the
    original frame scaled to fit, the rectified strip scaled to fit, and one
    voter card per entry in ``result['per_slot']``.

    Args:
        img_bgr: The frame that was passed to inference.
        result: Dict shaped like the return value of ``infer_full_frame``;
            reads ``tight``, ``rectify_info``, ``reading``, ``gallons`` and
            ``per_slot``.
        out_path: Destination image file.

    Raises:
        OSError: if OpenCV reports that the image could not be written.
    """
    canvas_w = CANVAS_W

    def _fit(image, max_w, max_h):
        # Uniformly scale `image` to fit inside max_w x max_h. Each side is
        # clamped to at least 1 px so an extreme aspect ratio cannot hand
        # cv2.resize an empty target size.
        h, w = image.shape[:2]
        scale = min(max_w / w, max_h / h)
        size = (max(1, int(w * scale)), max(1, int(h * scale)))
        return cv2.resize(image, size, interpolation=cv2.INTER_LANCZOS4)

    # Original frame and rectified strip, each scaled to its section.
    src_scaled = _fit(img_bgr, canvas_w, SOURCE_H)
    src_h_s, src_w_s = src_scaled.shape[:2]
    strip_scaled = _fit(result['tight'], canvas_w, STRIP_MAX_H)
    th_s, tw_s = strip_scaled.shape[:2]

    # Card width: what is left of the canvas after the gaps, split evenly.
    card_w = (canvas_w - (NSLOTS - 1) * CARD_GAP) // NSLOTS
    card_h = CARD_PANEL_H

    total_h = HEADER_H + src_h_s + th_s + card_h + 4 * PAD
    canvas = np.full((total_h, canvas_w, 3), BG, dtype=np.uint8)

    # Header: reading and gallons on the left, rectifier stats right-aligned.
    info = result['rectify_info']
    cv2.putText(canvas, f"reading {result['reading']}",
                (12, 32), FONT, 0.85, FG, 2)
    cv2.putText(canvas, f"{result['gallons']:.1f} gallons",
                (12, 58), FONT, 0.50, DIM, 1)
    meta = (f"deskew {info['deskew_angle']:+.2f}deg | "
            f"{info['n_windows']} windows | "
            f"residual {info.get('mean_residual_px', 0):.2f}px")
    (mw, _), _ = cv2.getTextSize(meta, FONT, 0.42, 1)
    cv2.putText(canvas, meta, (canvas_w - mw - 12, 32),
                FONT, 0.42, DIM, 1)

    # Original frame, horizontally centred.
    y = HEADER_H
    src_x = (canvas_w - src_w_s) // 2
    canvas[y:y + src_h_s, src_x:src_x + src_w_s] = src_scaled
    y += src_h_s + PAD

    # Rectified strip, horizontally centred.
    strip_x = (canvas_w - tw_s) // 2
    canvas[y:y + th_s, strip_x:strip_x + tw_s] = strip_scaled
    y += th_s + PAD

    # One card per slot, laid out left to right.
    for i, ps in enumerate(result['per_slot']):
        cx = i * (card_w + CARD_GAP)
        _draw_card(canvas, cx, y, card_w, card_h, ps)

    # cv2.imwrite signals failure through its return value rather than an
    # exception (e.g. when the target directory does not exist), so check it
    # instead of reporting a write that never happened.
    if not cv2.imwrite(str(out_path), canvas):
        raise OSError(f"failed to write annotated image to {out_path}")
    print(f"[render] wrote {out_path} ({canvas.shape[1]}×{canvas.shape[0]})")
|
|
|
|
| |
def pick_sample_image(cache_dir: Path | None = None) -> np.ndarray:
    """Download the dataset's captures parquet and decode one frame from it.

    Rows of the 'test' split are preferred, taking the one with the highest
    ``n_slots_known``; if that split has no rows, the first row of the file
    is used instead. The image bytes are stored inline in the parquet, so a
    single file download is all that is needed.

    Args:
        cache_dir: Optional Hugging Face cache directory for the download.

    Returns:
        The chosen capture decoded by OpenCV as a color image.

    Raises:
        RuntimeError: if the stored bytes cannot be decoded as an image.
    """
    download_args = dict(repo_id=DATASET_ID, repo_type='dataset')
    if cache_dir:
        download_args['cache_dir'] = str(cache_dir)

    print(f"[hf] fetching {DATASET_ID}:captures.parquet")
    local_parquet = hf_hub_download(filename='captures.parquet',
                                    **download_args)
    captures = pd.read_parquet(local_parquet)

    # Shortlist: test-split rows with the most known slots first.
    test_rows = captures[captures['split'] == 'test']
    shortlist = test_rows.sort_values('n_slots_known',
                                      ascending=False).head(10)
    if len(shortlist) == 0:
        shortlist = captures.head(10)
    chosen = shortlist.iloc[0]

    print(f"[hf] picked capture_id={chosen.get('capture_id')} "
          f"captured_at={chosen['captured_at']} "
          f"n_known={chosen['n_slots_known']}")

    raw = np.frombuffer(chosen['image_bytes'], np.uint8)
    decoded = cv2.imdecode(raw, cv2.IMREAD_COLOR)
    if decoded is None:
        raise RuntimeError("failed to decode image_bytes for the chosen row")
    return decoded
|
|
|
|
| |
def main():
    """Command-line entry point.

    Loads a frame (from ``--image`` or a sample row of the dataset), runs
    ``infer_full_frame`` on it, prints the reading and a per-slot breakdown,
    and writes the annotated image to ``--out``.
    """
    ap = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # __doc__ is None when docstrings are stripped (`python -OO`);
        # fall back to an empty description instead of crashing in dedent().
        description=textwrap.dedent(__doc__ or ''))
    ap.add_argument('--image', default=None,
                    help='Path to a 1920×1080 BGR meter capture; default = pick one from the dataset')
    ap.add_argument('--out', default=str(HERE / 'demo_output.jpg'))
    ap.add_argument('--cache-dir', default=None)
    ap.add_argument('--device', default='cuda' if torch.cuda.is_available() else 'cpu')
    args = ap.parse_args()

    if args.image:
        img_path = Path(args.image)
        img = cv2.imread(str(img_path))
        # cv2.imread returns None rather than raising on an unreadable file.
        if img is None:
            sys.exit(f"cannot read {img_path}")
        print(f"[demo] loaded {img.shape[1]}×{img.shape[0]} from {img_path}")
    else:
        img = pick_sample_image(
            Path(args.cache_dir) if args.cache_dir else None)
        print(f"[demo] loaded {img.shape[1]}×{img.shape[0]} from HF dataset row")

    # perf_counter() is monotonic, so the elapsed time cannot be skewed by a
    # wall-clock adjustment during inference.
    t0 = time.perf_counter()
    result = infer_full_frame(img, WEIGHTS, device=args.device)
    elapsed_ms = (time.perf_counter() - t0) * 1000
    print(f"[infer] reading={result['reading']} "
          f"({result['gallons']:.1f} gal) in {elapsed_ms:.0f} ms")
    for ps in result['per_slot']:
        bits = f"d{ps['slot']}={ps['digit']}"
        if ps['theta_deg'] is not None:
            bits += f" θ={ps['theta_deg']:6.2f}° p={ps['top1_prob']:.3f}"
        bits += f" [{ps['source']}]"
        print(" ", bits)

    render_result(img, result, Path(args.out))


# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|
|