#!/usr/bin/env python3
"""Analysis & regression script for calibration dataset.

Performs:
  1. px/cm stability analysis
  2. A vs B repeatability
  3. Ground truth sanity check (π×diameter vs circumference)
  4. Scatter plot & bias analysis
  5. Linear regression with leave-one-person-out cross-validation
"""

import json
import math
import os
from pathlib import Path

import numpy as np

# Optional: matplotlib for plots (skip gracefully if missing)
try:
    import matplotlib
    matplotlib.use("Agg")
    import matplotlib.pyplot as plt
    HAS_PLT = True
except ImportError:
    HAS_PLT = False
    print("Warning: matplotlib not available, skipping plots")


def load_results(path: str) -> list[dict]:
    """Load the batch results (a JSON list of per-measurement records)."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def section(title: str) -> None:
    """Print a banner that separates one report section from the next."""
    print(f"\n{'='*60}")
    print(f" {title}")
    print(f"{'='*60}")


def _has_both_diameters(record: dict) -> bool:
    """Return True when the record has truthy CV and ground-truth diameters.

    Truthiness is used on purpose: both ``None`` and ``0`` are treated as
    "no measurement", exactly like the inline filters this helper replaces.
    """
    return bool(record["cv_diameter_cm"] and record["gt_diameter_cm"])


def analyze_scale_stability(results: list[dict]) -> tuple[float, float]:
    """Phase 3a: px/cm stability across all images.

    Reads ``image`` and ``cv_scale_px_per_cm`` from each record and keeps the
    first truthy scale seen per image.

    Returns:
        ``(mean, std)`` of the per-image scale, or ``(nan, nan)`` when no
        record carries a scale (the section is then skipped instead of
        crashing on an empty ``min()``).
    """
    section("1. Card Detection / px/cm Stability")

    scales = {}
    for r in results:
        img = r["image"]
        if img not in scales and r["cv_scale_px_per_cm"]:
            scales[img] = r["cv_scale_px_per_cm"]

    if not scales:
        print(" No images with a px/cm scale; skipping.")
        return float("nan"), float("nan")

    vals = list(scales.values())
    mean_s = np.mean(vals)
    std_s = np.std(vals)
    cv_pct = (std_s / mean_s) * 100

    print(f" Images analyzed: {len(scales)}")
    print(f" Mean px/cm: {mean_s:.2f}")
    print(f" Std px/cm: {std_s:.2f}")
    print(f" CV%: {cv_pct:.2f}%")
    print(f" Range: {min(vals):.2f} — {max(vals):.2f}")
    print(f" Max spread: {max(vals)-min(vals):.2f} px/cm ({(max(vals)-min(vals))/mean_s*100:.2f}%)")

    # Per-image table
    print(f"\n {'Image':<16} {'px/cm':>8} {'Δ from mean':>12}")
    print(f" {'-'*38}")
    for img, s in sorted(scales.items()):
        print(f" {img:<16} {s:>8.2f} {s-mean_s:>+12.2f}")

    return mean_s, std_s


def analyze_repeatability(results: list[dict]) -> list[float]:
    """Phase 3b: A vs B repeatability.

    Groups records by ``(person, finger_en)`` and compares the
    ``cv_diameter_cm`` of shot "A" against shot "B".

    Returns:
        The absolute A/B differences (cm), one per pair that has both shots
        with a truthy diameter; empty when there is no such pair.
    """
    section("2. A vs B Repeatability")

    # Group by (person, finger)
    pairs = {}
    for r in results:
        key = (r["person"], r["finger_en"])
        pairs.setdefault(key, {})[r["shot"]] = r["cv_diameter_cm"]

    diffs = []
    print(f" {'Person':<10} {'Finger':<8} {'Shot A':>8} {'Shot B':>8} {'Δ(B-A)':>8} {'%diff':>7}")
    print(f" {'-'*53}")
    for (person, finger), shots in sorted(pairs.items()):
        a = shots.get("A")
        b = shots.get("B")
        if a and b:
            d = b - a
            pct = abs(d) / ((a + b) / 2) * 100
            diffs.append(abs(d))
            print(f" {person:<10} {finger:<8} {a:>8.3f} {b:>8.3f} {d:>+8.3f} {pct:>6.1f}%")

    if diffs:
        print(f"\n Mean |A-B| difference: {np.mean(diffs):.4f} cm")
        print(f" Max |A-B| difference: {max(diffs):.4f} cm")
        print(f" Std |A-B| difference: {np.std(diffs):.4f} cm")
        print(f" 95th percentile: {np.percentile(diffs, 95):.4f} cm")

    return diffs


def check_ground_truth_sanity(results: list[dict]) -> None:
    """Phase 3c: Check π×diameter ≈ circumference.

    Uses the first record seen for each ``(person, finger_cn)`` and compares
    ``gt_circumference_cm`` with ``π × gt_diameter_cm``.
    """
    section("3. Ground Truth Sanity (π×diameter vs circumference)")

    seen = set()
    diffs = []
    print(f" {'Person':<10} {'Finger':<6} {'Diam':>6} {'Circ':>6} {'π×D':>6} {'Δ':>7} {'%err':>6}")
    print(f" {'-'*55}")
    for r in results:
        key = (r["person"], r["finger_cn"])
        if key in seen:
            continue
        seen.add(key)
        d = r["gt_diameter_cm"]
        c = r["gt_circumference_cm"]
        if d and c:
            pi_d = math.pi * d
            diff = c - pi_d
            pct = diff / c * 100
            diffs.append(diff)
            print(f" {r['person']:<10} {r['finger_cn']:<6} {d:>6.2f} {c:>6.1f} {pi_d:>6.2f} {diff:>+7.2f} {pct:>+5.1f}%")

    if diffs:
        mean_diff = np.mean(diffs)
        print(f"\n Mean (circ - π×diam): {mean_diff:+.3f} cm")
        # Only claim the result is "expected" when the sign actually matches
        # the explanation; otherwise the note would contradict the number.
        if mean_diff > 0:
            print(f" This is expected: circumference > π×diameter because")
            print(f" fingers are not perfect circles (slightly oval/flattened).")
        else:
            print(" Note: circumference <= π×diameter on average; check the ground truth.")


def bias_analysis(results: list[dict]) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Phase 4a: Scatter plot and bias analysis.

    Uses only records where both ``cv_diameter_cm`` and ``gt_diameter_cm``
    are truthy.

    Returns:
        ``(cv, gt, errors)`` arrays, with ``errors = cv - gt``.

    Raises:
        ValueError: if no record has both diameters.
    """
    section("4. Accuracy & Bias Analysis")

    valid = [r for r in results if _has_both_diameters(r)]
    if not valid:
        raise ValueError("no results with both cv_diameter_cm and gt_diameter_cm")

    cv = np.array([r["cv_diameter_cm"] for r in valid])
    gt = np.array([r["gt_diameter_cm"] for r in valid])
    errors = cv - gt
    pct_errors = errors / gt * 100

    print(f" N = {len(valid)} measurements")
    print(f" Mean error (CV-GT): {np.mean(errors):+.4f} cm")
    print(f" Median error: {np.median(errors):+.4f} cm")
    print(f" Std of error: {np.std(errors):.4f} cm")
    print(f" Mean % error: {np.mean(pct_errors):+.1f}%")
    print(f" MAE (absolute): {np.mean(np.abs(errors)):.4f} cm")
    print(f" Max error: {np.max(np.abs(errors)):.4f} cm")
    print(f" RMSE: {np.sqrt(np.mean(errors**2)):.4f} cm")

    # Correlation
    corr = np.corrcoef(cv, gt)[0, 1]
    print(f" Pearson r: {corr:.4f}")
    print(f" R²: {corr**2:.4f}")

    return cv, gt, errors


def linear_regression(cv: np.ndarray, gt: np.ndarray, results: list[dict]):
    """Phase 4b: OLS regression + leave-one-person-out CV.

    Fits ``gt = a * cv + b`` by least squares on all measurements, then
    repeats the fit once per person with that person's records held out and
    reports the error on the held-out records.

    Args:
        cv: Measured diameters.
        gt: Ground-truth diameters, aligned with ``cv``.
        results: Raw records; used to split the data by ``person``.

    Returns:
        ``(a, b)``: slope and intercept of the fit on all measurements.

    Raises:
        ValueError: if ``cv`` is empty.
    """
    section("5. Linear Regression Calibration")

    if len(cv) == 0:
        raise ValueError("cannot fit a calibration line without measurements")

    # Fit: gt = a * cv + b
    design = np.vstack([cv, np.ones(len(cv))]).T
    (a, b), _, _, _ = np.linalg.lstsq(design, gt, rcond=None)

    calibrated = a * cv + b
    cal_errors = calibrated - gt

    print(f" Model: actual = {a:.4f} × measured + {b:.4f}")
    print(f" (i.e., slope={a:.4f}, intercept={b:.4f})")
    print(f"\n After calibration:")
    print(f" Mean error: {np.mean(cal_errors):+.4f} cm")
    print(f" MAE: {np.mean(np.abs(cal_errors)):.4f} cm")
    print(f" Max error: {np.max(np.abs(cal_errors)):.4f} cm")
    print(f" RMSE: {np.sqrt(np.mean(cal_errors**2)):.4f} cm")

    # Leave-one-person-out cross-validation
    section("6. Leave-One-Person-Out Cross-Validation")

    # Filter once; every fold below only splits this list by person.
    valid = [r for r in results if _has_both_diameters(r)]
    persons = sorted({r["person"] for r in valid})
    all_cv_errors = []
    all_cv_cal_errors = []

    print(f" {'Person':<10} {'N':>3} {'a':>7} {'b':>7} {'MAE_raw':>8} {'MAE_cal':>8} {'Max_cal':>8}")
    print(f" {'-'*57}")

    for holdout in persons:
        # Train on all except holdout
        train = [r for r in valid if r["person"] != holdout]
        test = [r for r in valid if r["person"] == holdout]

        if not train:
            # Happens when the dataset holds a single person: there is
            # nothing to fit on, so the fold carries no information.
            print(f" {holdout:<10} {len(test):>3} (skipped: no training data)")
            continue

        train_cv = np.array([r["cv_diameter_cm"] for r in train])
        train_gt = np.array([r["gt_diameter_cm"] for r in train])
        test_cv = np.array([r["cv_diameter_cm"] for r in test])
        test_gt = np.array([r["gt_diameter_cm"] for r in test])

        design_train = np.vstack([train_cv, np.ones(len(train_cv))]).T
        (a_fold, b_fold), _, _, _ = np.linalg.lstsq(design_train, train_gt, rcond=None)

        test_cal = a_fold * test_cv + b_fold
        raw_errors = np.abs(test_cv - test_gt)
        cal_errors_fold = np.abs(test_cal - test_gt)

        all_cv_errors.extend(raw_errors.tolist())
        all_cv_cal_errors.extend(cal_errors_fold.tolist())

        print(f" {holdout:<10} {len(test):>3} {a_fold:>7.4f} {b_fold:>+7.4f} "
              f"{np.mean(raw_errors):>8.4f} {np.mean(cal_errors_fold):>8.4f} "
              f"{np.max(cal_errors_fold):>8.4f}")

    if not all_cv_errors:
        print("\n No cross-validation folds could be evaluated.")
        return a, b

    all_cv_errors = np.array(all_cv_errors)
    all_cv_cal_errors = np.array(all_cv_cal_errors)
    raw_mae = np.mean(all_cv_errors)
    cal_mae = np.mean(all_cv_cal_errors)

    print(f"\n Cross-validated results (all holdout predictions):")
    print(f" Raw MAE: {raw_mae:.4f} cm")
    print(f" Cal MAE: {cal_mae:.4f} cm")
    print(f" Raw RMSE: {np.sqrt(np.mean(all_cv_errors**2)):.4f} cm")
    print(f" Cal RMSE: {np.sqrt(np.mean(all_cv_cal_errors**2)):.4f} cm")
    if raw_mae > 0:
        print(f" Improvement: {(1 - cal_mae/raw_mae)*100:.1f}% reduction in MAE")
    else:
        # A zero raw error leaves no baseline to express a reduction against.
        print(" Improvement: n/a (raw MAE is zero)")

    return a, b


def generate_plots(cv: np.ndarray, gt: np.ndarray, a: float, b: float, out_dir: str) -> None:
    """Generate scatter plot and residual plot.

    Writes ``calibration_analysis.png`` into ``out_dir`` (created if
    missing). Does nothing when matplotlib could not be imported.
    """
    if not HAS_PLT:
        return

    section("7. Generating Plots")

    calibrated = a * cv + b
    errors = cv - gt
    cal_errors = calibrated - gt

    fig, axes = plt.subplots(1, 3, figsize=(16, 5))

    # 1. Scatter: CV vs GT with regression line
    ax = axes[0]
    ax.scatter(cv, gt, alpha=0.6, s=30, label="Measurements")
    lim = [min(cv.min(), gt.min()) - 0.1, max(cv.max(), gt.max()) + 0.1]
    ax.plot(lim, lim, "k--", alpha=0.3, label="y=x (perfect)")
    x_fit = np.linspace(lim[0], lim[1], 100)
    ax.plot(x_fit, a * x_fit + b, "r-", linewidth=2, label=f"Fit: y={a:.3f}x{b:+.3f}")
    ax.set_xlabel("CV Measured Diameter (cm)")
    ax.set_ylabel("Actual Diameter (cm)")
    ax.set_title("CV Measured vs Actual (Caliper)")
    ax.legend(fontsize=8)
    ax.set_aspect("equal")
    ax.grid(True, alpha=0.3)

    # 2. Error distribution
    ax = axes[1]
    ax.hist(errors, bins=15, alpha=0.5, label=f"Raw (μ={np.mean(errors):+.3f})")
    ax.hist(cal_errors, bins=15, alpha=0.5, label=f"Calibrated (μ={np.mean(cal_errors):+.3f})")
    ax.axvline(0, color="k", linestyle="--", alpha=0.3)
    ax.set_xlabel("Error (cm)")
    ax.set_ylabel("Count")
    ax.set_title("Error Distribution: Before vs After Calibration")
    ax.legend(fontsize=8)
    ax.grid(True, alpha=0.3)

    # 3. Residuals vs predicted
    ax = axes[2]
    ax.scatter(calibrated, cal_errors, alpha=0.6, s=30)
    ax.axhline(0, color="k", linestyle="--", alpha=0.3)
    ax.set_xlabel("Calibrated Diameter (cm)")
    ax.set_ylabel("Residual (cm)")
    ax.set_title("Residuals After Calibration")
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    os.makedirs(out_dir, exist_ok=True)
    plot_path = os.path.join(out_dir, "calibration_analysis.png")
    plt.savefig(plot_path, dpi=150)
    print(f" Saved plot: {plot_path}")
    plt.close()


def main() -> None:
    """Run every analysis phase and save the calibration coefficients.

    Reads ``output/batch/batch_results.json`` two directories above this
    file and writes the plot and ``calibration_coefficients.json`` next to it.
    """
    base_dir = Path(__file__).resolve().parent.parent
    results_path = base_dir / "output" / "batch" / "batch_results.json"
    out_dir = str(base_dir / "output" / "batch")

    results = load_results(str(results_path))
    print(f"Loaded {len(results)} results")

    # Phase 3
    analyze_scale_stability(results)
    analyze_repeatability(results)
    check_ground_truth_sanity(results)

    # Phase 4
    cv, gt, errors = bias_analysis(results)
    a, b = linear_regression(cv, gt, results)
    generate_plots(cv, gt, a, b, out_dir)

    # Summary
    section("SUMMARY — Calibration Coefficients")
    print(f" actual_diameter = {a:.6f} × measured_diameter + ({b:.6f})")
    print(f" slope (a) = {a:.6f}")
    print(f" offset (b) = {b:.6f}")
    print(f"\n Save these to the pipeline configuration.")

    # Write coefficients to file
    coeff_path = os.path.join(out_dir, "calibration_coefficients.json")
    with open(coeff_path, "w", encoding="utf-8") as f:
        json.dump({
            # Plain floats keep the JSON independent of numpy scalar types.
            "slope": round(float(a), 6),
            "intercept": round(float(b), 6),
            "description": "actual = slope * measured + intercept",
            "n_samples": len(cv),
            "dataset": "input/sample (10 people × 3 fingers × 2 shots)",
        }, f, indent=2)
    print(f" Saved coefficients: {coeff_path}")


if __name__ == "__main__":
    main()