Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """Batch measurement script for calibration dataset. | |
| Runs measure_finger.py on all sample images × 3 fingers, | |
| collects results, and writes to CSV + JSON. | |
| """ | |
| import csv | |
| import json | |
| import os | |
| import subprocess | |
| import sys | |
| from pathlib import Path | |
| # Finger name mapping (Chinese → CLI arg) | |
| FINGER_MAP = { | |
| "食指": "index", | |
| "中指": "middle", | |
| "无名指": "ring", | |
| } | |
| # People to exclude (no ground truth) | |
| EXCLUDE = {"谢峰", "空白"} | |
| def run_measurement(image_path: str, finger: str, output_json: str) -> dict: | |
| """Run measure_finger.py and return parsed JSON result.""" | |
| cmd = [ | |
| sys.executable, "measure_finger.py", | |
| "--input", image_path, | |
| "--output", output_json, | |
| "--finger-index", finger, | |
| "--edge-method", "sobel", | |
| ] | |
| try: | |
| proc = subprocess.run( | |
| cmd, capture_output=True, text=True, timeout=120 | |
| ) | |
| if os.path.exists(output_json): | |
| with open(output_json) as f: | |
| return json.load(f) | |
| else: | |
| return {"fail_reason": f"no output file; stderr={proc.stderr[-200:]}"} | |
| except subprocess.TimeoutExpired: | |
| return {"fail_reason": "timeout"} | |
| except Exception as e: | |
| return {"fail_reason": str(e)} | |
| def load_ground_truth(csv_path: str) -> list[dict]: | |
| """Load ground truth CSV.""" | |
| rows = [] | |
| with open(csv_path, encoding="utf-8-sig") as f: | |
| reader = csv.DictReader(f) | |
| for row in reader: | |
| rows.append(row) | |
| return rows | |
| def main(): | |
| base_dir = Path(__file__).resolve().parent.parent | |
| os.chdir(base_dir) | |
| jpg_dir = base_dir / "input" / "sample" / "jpg" | |
| csv_path = base_dir / "input" / "sample" / "finger-size.csv" | |
| out_dir = base_dir / "output" / "batch" | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| # Load ground truth | |
| gt_rows = load_ground_truth(str(csv_path)) | |
| print(f"Loaded {len(gt_rows)} ground truth rows") | |
| # Build name→rows lookup | |
| gt_by_name = {} | |
| for row in gt_rows: | |
| name = row["姓名"] | |
| if name not in gt_by_name: | |
| gt_by_name[name] = {} | |
| finger_cn = row["手指"] | |
| gt_by_name[name][finger_cn] = row | |
| # Find all person images (exclude 谢峰, 空白) | |
| images = sorted([ | |
| f for f in jpg_dir.glob("*.jpg") | |
| if not any(ex in f.stem for ex in EXCLUDE) | |
| ]) | |
| print(f"Found {len(images)} images to process") | |
| all_results = [] | |
| total = len(images) * 3 # 3 fingers per image | |
| done = 0 | |
| for img_path in images: | |
| stem = img_path.stem # e.g. "S01A" | |
| person = stem[:-1] # e.g. "S01" | |
| shot = stem[-1] # e.g. "A" | |
| if person not in gt_by_name: | |
| print(f" SKIP {stem}: no ground truth for {person}") | |
| continue | |
| for finger_cn, finger_en in FINGER_MAP.items(): | |
| done += 1 | |
| gt_row = gt_by_name[person].get(finger_cn) | |
| gt_diameter = float(gt_row["直径(cm)"]) if gt_row else None | |
| gt_circumference = float(gt_row["周长(cm)"]) if gt_row else None | |
| gt_ring_size = gt_row.get("指环尺寸", "") if gt_row else "" | |
| out_json = str(out_dir / f"{stem}_{finger_en}.json") | |
| print(f"[{done}/{total}] {stem} / {finger_cn} ({finger_en})...", end=" ", flush=True) | |
| result = run_measurement(str(img_path), finger_en, out_json) | |
| cv_diameter = result.get("finger_outer_diameter_cm") | |
| cv_confidence = result.get("confidence") | |
| cv_scale = result.get("scale_px_per_cm") | |
| fail = result.get("fail_reason") | |
| if cv_diameter and gt_diameter: | |
| error = cv_diameter - gt_diameter | |
| pct = error / gt_diameter * 100 | |
| print(f"CV={cv_diameter:.3f} GT={gt_diameter:.3f} Δ={error:+.3f} ({pct:+.1f}%) scale={cv_scale}") | |
| elif fail: | |
| print(f"FAILED: {fail[:80]}") | |
| else: | |
| print(f"CV={cv_diameter} (no GT)") | |
| all_results.append({ | |
| "person": person, | |
| "shot": shot, | |
| "finger_cn": finger_cn, | |
| "finger_en": finger_en, | |
| "image": img_path.name, | |
| "gt_diameter_cm": gt_diameter, | |
| "gt_circumference_cm": gt_circumference, | |
| "gt_ring_size": gt_ring_size, | |
| "cv_diameter_cm": cv_diameter, | |
| "cv_confidence": cv_confidence, | |
| "cv_scale_px_per_cm": cv_scale, | |
| "fail_reason": fail, | |
| "edge_method": result.get("edge_method_used"), | |
| }) | |
| # Save full results JSON | |
| results_json = str(out_dir / "batch_results.json") | |
| with open(results_json, "w", encoding="utf-8") as f: | |
| json.dump(all_results, f, indent=2, ensure_ascii=False) | |
| print(f"\nSaved {len(all_results)} results to {results_json}") | |
| # Save summary CSV | |
| results_csv = str(out_dir / "batch_results.csv") | |
| if all_results: | |
| keys = all_results[0].keys() | |
| with open(results_csv, "w", encoding="utf-8", newline="") as f: | |
| writer = csv.DictWriter(f, fieldnames=keys) | |
| writer.writeheader() | |
| writer.writerows(all_results) | |
| print(f"Saved CSV to {results_csv}") | |
| # Quick stats | |
| valid = [r for r in all_results if r["cv_diameter_cm"] and r["gt_diameter_cm"]] | |
| failed = [r for r in all_results if r["fail_reason"]] | |
| if valid: | |
| errors = [r["cv_diameter_cm"] - r["gt_diameter_cm"] for r in valid] | |
| mean_err = sum(errors) / len(errors) | |
| scales = [r["cv_scale_px_per_cm"] for r in valid if r["cv_scale_px_per_cm"]] | |
| mean_scale = sum(scales) / len(scales) if scales else 0 | |
| print(f"\n--- Quick Stats ---") | |
| print(f"Valid measurements: {len(valid)}/{len(all_results)}") | |
| print(f"Failed: {len(failed)}") | |
| print(f"Mean error (CV - GT): {mean_err:+.4f} cm") | |
| print(f"Mean scale: {mean_scale:.2f} px/cm") | |
| print(f"Scale range: {min(scales):.2f} - {max(scales):.2f} px/cm") | |
| if __name__ == "__main__": | |
| main() | |