""" Inference script: detect immunogold particles in new images. Usage: python predict.py --image path/to/image.tif --checkpoint checkpoints/fold_S1_seed42/phase3_best.pth python predict.py --fold S1 --checkpoint checkpoints/fold_S1_seed42/phase3_best.pth --config config/config.yaml """ import argparse from pathlib import Path import numpy as np import torch import yaml from src.heatmap import extract_peaks from src.model import ImmunogoldCenterNet from src.postprocess import apply_structural_mask_filter, cross_class_nms from src.preprocessing import load_image, load_mask from src.ensemble import sliding_window_inference, d4_tta_predict from src.visualize import overlay_annotations def parse_args(): parser = argparse.ArgumentParser(description="Predict immunogold particles") parser.add_argument("--image", type=str, help="Path to single image") parser.add_argument("--mask", type=str, help="Path to mask (optional)") parser.add_argument("--fold", type=str, help="Fold synapse ID for evaluation") parser.add_argument("--checkpoint", type=str, required=True, help="Path to model checkpoint") parser.add_argument("--config", type=str, default="config/config.yaml") parser.add_argument("--device", type=str, default="auto") parser.add_argument("--tta", action="store_true", help="Enable D4 TTA") parser.add_argument("--conf-threshold", type=float, default=0.3) parser.add_argument("--output-dir", type=str, default="results/predictions") return parser.parse_args() def main(): args = parse_args() with open(args.config) as f: cfg = yaml.safe_load(f) device = torch.device( "cuda" if args.device == "auto" and torch.cuda.is_available() else args.device if args.device != "auto" else "cpu" ) # Load model model = ImmunogoldCenterNet( bifpn_channels=cfg["model"]["bifpn_channels"], bifpn_rounds=cfg["model"]["bifpn_rounds"], num_classes=cfg["model"]["num_classes"], ) ckpt = torch.load(args.checkpoint, map_location="cpu", weights_only=False) model.load_state_dict(ckpt["model_state_dict"]) model.to(device) model.eval() print(f"Loaded checkpoint from epoch {ckpt.get('epoch', '?')}, " f"val_f1={ckpt.get('val_f1_mean', '?')}") # Load image if args.fold: from src.preprocessing import discover_synapse_data, load_synapse records = discover_synapse_data(cfg["data"]["root"], cfg["data"]["synapse_ids"]) record = [r for r in records if r.synapse_id == args.fold][0] data = load_synapse(record) image = data["image"] preprocessed = data["image"] mask = data["mask"] annotations = data["annotations"] name = args.fold else: image = load_image(Path(args.image)) preprocessed = image mask = load_mask(Path(args.mask)) if args.mask else None annotations = {"6nm": np.empty((0, 2)), "12nm": np.empty((0, 2))} name = Path(args.image).stem # Inference if args.tta: print("Running D4 TTA inference...") heatmap_np, offset_np = d4_tta_predict(model, preprocessed, device) else: print("Running sliding window inference...") heatmap_np, offset_np = sliding_window_inference( model, preprocessed, patch_size=cfg["data"]["patch_size"], device=device, ) # Extract detections heatmap_t = torch.from_numpy(heatmap_np) offset_t = torch.from_numpy(offset_np) detections = extract_peaks( heatmap_t, offset_t, stride=cfg["data"]["stride"], conf_threshold=args.conf_threshold, nms_kernel_sizes=cfg["postprocessing"]["nms_kernel_size"], ) # Post-processing if mask is not None: detections = apply_structural_mask_filter( detections, mask, margin_px=cfg["postprocessing"]["mask_filter_margin_px"], ) detections = cross_class_nms( detections, cfg["postprocessing"]["cross_class_nms_distance_px"], ) # Print results n_6nm = sum(1 for d in detections if d["class"] == "6nm") n_12nm = sum(1 for d in detections if d["class"] == "12nm") print(f"\nDetections: {n_6nm} 6nm, {n_12nm} 12nm ({len(detections)} total)") # Evaluate if GT available if annotations and (len(annotations["6nm"]) > 0 or len(annotations["12nm"]) > 0): from src.evaluate import match_detections_to_gt results = match_detections_to_gt( detections, annotations["6nm"], annotations["12nm"], {k: float(v) for k, v in cfg["evaluation"]["match_radii_px"].items()}, ) for cls in ["6nm", "12nm", "overall"]: r = results[cls] print(f" {cls}: F1={r['f1']:.3f}, P={r['precision']:.3f}, " f"R={r['recall']:.3f} (TP={r['tp']}, FP={r['fp']}, FN={r['fn']})") # Save visualization output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) overlay_annotations( image, annotations, title=f"{name} — {n_6nm} 6nm, {n_12nm} 12nm detected", save_path=output_dir / f"{name}_predictions.png", predictions=detections, ) print(f"Saved overlay to {output_dir / f'{name}_predictions.png'}") if __name__ == "__main__": main()