"""Inspect annotations, generate masks, create train/val/test splits."""

import json
import random
from pathlib import Path

import numpy as np
from PIL import Image
from pycocotools.coco import COCO
from pycocotools import mask as mask_utils

RAW_DIR = Path(__file__).resolve().parents[2] / "data" / "raw"
PROCESSED_DIR = Path(__file__).resolve().parents[2] / "data" / "processed"
SPLITS_DIR = Path(__file__).resolve().parents[2] / "data" / "splits"


def _has_segmentation(seg) -> bool:
    """Return True if *seg* is an RLE dict or a nested polygon list.

    A polygon list counts only when its first element is itself a list of
    at least 6 numbers (three x/y pairs). Anything else -- empty values, or
    a list whose first element is not a list (e.g. a flat list of numbers)
    -- is reported as "no segmentation" instead of raising ``TypeError``
    from ``len()`` on a number.
    """
    if not seg:
        return False
    if isinstance(seg, dict):  # RLE format
        return True
    if isinstance(seg, list):
        first = seg[0]
        return isinstance(first, list) and len(first) >= 6
    return False


def inspect_dataset(coco_json_path: str) -> dict:
    """Check what annotation types exist in a COCO JSON file.

    Args:
        coco_json_path: Path to a COCO-format annotation JSON file.

    Returns:
        A dict with the keys ``total_annotations``, ``total_images``,
        ``has_segmentation``, ``has_bbox_only``, ``annotation_type``
        (``"segmentation"`` only when segmentation annotations strictly
        outnumber the rest, otherwise ``"bbox_only"``) and ``categories``
        (list of category names).
    """
    with open(coco_json_path, encoding="utf-8") as f:
        data = json.load(f)

    annotations = data.get("annotations", [])
    has_seg = sum(
        1 for ann in annotations if _has_segmentation(ann.get("segmentation"))
    )
    has_bbox_only = len(annotations) - has_seg

    return {
        "total_annotations": len(annotations),
        "total_images": len(data.get("images", [])),
        "has_segmentation": has_seg,
        "has_bbox_only": has_bbox_only,
        "annotation_type": "segmentation" if has_seg > has_bbox_only else "bbox_only",
        "categories": [c["name"] for c in data.get("categories", [])],
    }


def _fill_bbox(mask: np.ndarray, bbox) -> None:
    """Set the pixels covered by an ``[x, y, width, height]`` box to 1, in place.

    The box is clipped to the mask bounds first. Without clipping, a
    negative ``x`` or ``y`` would be read by NumPy as an index counted from
    the opposite edge, so a box hanging off the top/left border would be
    dropped or drawn in the wrong place.
    """
    height, width = mask.shape
    x, y, bw, bh = [int(v) for v in bbox]
    x0, y0 = max(x, 0), max(y, 0)
    x1, y1 = min(x + bw, width), min(y + bh, height)
    if x1 > x0 and y1 > y0:
        mask[y0:y1, x0:x1] = 1


def _is_degenerate_segmentation(seg) -> bool:
    """Return True for segmentations that the polygon renderer should skip.

    Skipped: empty values, a nested polygon list whose first polygon has
    fewer than 6 numbers, and a flat list with fewer than 6 entries.
    Only the first polygon of a nested list is inspected.
    """
    if not seg:
        return True
    if isinstance(seg, list):
        first = seg[0]
        if isinstance(first, list):
            return len(first) < 6
        return len(seg) < 6
    return False


def _save_mask_record(
    combined: np.ndarray,
    img_info: dict,
    img_id,
    images_dir: str,
    output_dir: Path,
) -> dict:
    """Write *combined* (values 0/1) as a 0/255 PNG and return its record.

    The mask file is named ``<image file stem>_mask.png`` inside
    *output_dir*; the returned dict has the keys ``image_path``,
    ``mask_path``, ``image_id``, ``width`` and ``height``.
    """
    mask_img = Image.fromarray(combined * 255, mode="L")
    mask_name = Path(img_info["file_name"]).stem + "_mask.png"
    mask_path = output_dir / mask_name
    mask_img.save(mask_path)

    image_path = Path(images_dir) / img_info["file_name"]
    return {
        "image_path": str(image_path),
        "mask_path": str(mask_path),
        "image_id": img_id,
        "width": img_info["width"],
        "height": img_info["height"],
    }


def render_masks_from_coco(coco_json_path: str, images_dir: str, output_dir: str) -> list[dict]:
    """Render binary masks from COCO polygon/RLE annotations.

    All annotations of an image are merged into a single mask. Images
    without any annotation are skipped; annotations with an empty or
    too-short segmentation are ignored. If decoding a segmentation raises
    ``IndexError`` or ``ValueError`` the annotation's bounding box (when
    present) is filled instead.

    Args:
        coco_json_path: Path to the COCO annotation JSON.
        images_dir: Directory that holds the image files.
        output_dir: Directory to write mask PNGs to (created if missing).

    Returns:
        List of {image_path, mask_path, image_id, width, height}.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    coco = COCO(coco_json_path)
    records = []

    for img_id in sorted(coco.getImgIds()):
        img_info = coco.loadImgs(img_id)[0]
        h, w = img_info["height"], img_info["width"]

        anns = coco.loadAnns(coco.getAnnIds(imgIds=img_id))
        if not anns:
            continue

        # Merge all annotations into one binary mask
        combined = np.zeros((h, w), dtype=np.uint8)
        for ann in anns:
            if _is_degenerate_segmentation(ann.get("segmentation")):
                continue
            try:
                rle = coco.annToRLE(ann)
                m = mask_utils.decode(rle)
                combined = np.maximum(combined, m)
            except (IndexError, ValueError):
                # Fall back to bbox if segmentation decode fails
                if "bbox" in ann:
                    _fill_bbox(combined, ann["bbox"])

        records.append(
            _save_mask_record(combined, img_info, img_id, images_dir, output_dir)
        )

    return records


def render_masks_from_bboxes(coco_json_path: str, images_dir: str, output_dir: str) -> list[dict]:
    """Create filled-rectangle masks from bounding boxes (fallback when no segmentation).

    Every annotation's ``bbox`` is drawn, clipped to the image, into one
    mask per image. Images without annotations are skipped.

    Args:
        coco_json_path: Path to the COCO annotation JSON.
        images_dir: Directory that holds the image files.
        output_dir: Directory to write mask PNGs to (created if missing).

    Returns:
        List of {image_path, mask_path, image_id, width, height}.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    with open(coco_json_path, encoding="utf-8") as f:
        data = json.load(f)

    img_lookup = {img["id"]: img for img in data["images"]}
    anns_by_img: dict[int, list] = {}
    for ann in data["annotations"]:
        anns_by_img.setdefault(ann["image_id"], []).append(ann)

    records = []
    for img_id, img_info in sorted(img_lookup.items()):
        anns = anns_by_img.get(img_id, [])
        if not anns:
            continue

        h, w = img_info["height"], img_info["width"]
        combined = np.zeros((h, w), dtype=np.uint8)
        for ann in anns:
            _fill_bbox(combined, ann["bbox"])

        records.append(
            _save_mask_record(combined, img_info, img_id, images_dir, output_dir)
        )

    return records


def find_coco_json(dataset_dir: Path) -> tuple[str, str] | None:
    """Find the COCO JSON and images directory in a Roboflow download.

    Looks for ``_annotations.coco.json`` in the ``train``, ``valid`` and
    ``test`` sub-folders (in that order), then anywhere below
    *dataset_dir*.

    Returns:
        ``(json_path, images_dir)`` as strings for the first match, or
        ``None`` when no annotation file exists.
    """
    for split in ["train", "valid", "test"]:
        json_path = dataset_dir / split / "_annotations.coco.json"
        if json_path.exists():
            return str(json_path), str(dataset_dir / split)

    # Single-folder layout
    first_match = next(dataset_dir.rglob("_annotations.coco.json"), None)
    if first_match is None:
        return None
    return str(first_match), str(first_match.parent)


def process_dataset(name: str, dataset_dir: Path, prompt_synonyms: list[str]) -> list[dict]:
    """Process a single dataset: inspect, render masks, return records with prompts.

    Each direct sub-directory of *dataset_dir* that contains an
    ``_annotations.coco.json`` is treated as one split folder. Masks are
    written to ``PROCESSED_DIR / name / "masks" / <split folder name>``.

    Args:
        name: Dataset name; stored on every record under ``"dataset"``.
        dataset_dir: Root directory of the downloaded dataset.
        prompt_synonyms: Prompts stored on every record under ``"prompts"``.
            The same list object is attached to all records (not copied).

    Returns:
        The mask records of all split folders, concatenated.
    """
    records = []
    mask_dir = PROCESSED_DIR / name / "masks"

    # Process each split folder (train/valid/test from Roboflow)
    for split_dir in sorted(dataset_dir.iterdir()):
        if not split_dir.is_dir():
            continue
        json_path = split_dir / "_annotations.coco.json"
        if not json_path.exists():
            continue

        print(f"\n Processing {name}/{split_dir.name}...")
        info = inspect_dataset(str(json_path))
        print(f" Images: {info['total_images']}, Annotations: {info['total_annotations']}")
        print(f" Type: {info['annotation_type']}, Categories: {info['categories']}")

        split_mask_dir = mask_dir / split_dir.name
        if info["annotation_type"] == "segmentation":
            split_records = render_masks_from_coco(
                str(json_path), str(split_dir), str(split_mask_dir)
            )
        else:
            print(" WARNING: bbox-only annotations, using filled rectangles")
            split_records = render_masks_from_bboxes(
                str(json_path), str(split_dir), str(split_mask_dir)
            )

        for r in split_records:
            r["dataset"] = name
            r["prompts"] = prompt_synonyms

        records.extend(split_records)

    return records


def create_splits(
    records: list[dict],
    ratios: tuple = (0.70, 0.15, 0.15),
    seed: int = 42,
) -> dict[str, list[dict]]:
    """Split records into train/val/test, stratified by dataset.

    Records are grouped by their ``"dataset"`` key, shuffled per group and
    cut by ``ratios[0]`` (train) and ``ratios[1]`` (val); the test split
    receives whatever remains, so ``ratios[2]`` is not read. Each split is
    shuffled again and written to ``SPLITS_DIR / "<split>.json"``.

    Args:
        records: Records carrying a ``"dataset"`` key.
        ratios: Train/val/test fractions.
        seed: Seed for the shuffles.

    Returns:
        ``{"train": [...], "val": [...], "test": [...]}``.
    """
    # A private generator gives the same shuffle sequence as seeding the
    # module-level one, without resetting random state for the whole process.
    rng = random.Random(seed)

    by_dataset: dict[str, list] = {}
    for r in records:
        by_dataset.setdefault(r["dataset"], []).append(r)

    train, val, test = [], [], []
    for recs in by_dataset.values():
        rng.shuffle(recs)
        n = len(recs)
        n_train = int(n * ratios[0])
        n_val = int(n * ratios[1])

        train.extend(recs[:n_train])
        val.extend(recs[n_train:n_train + n_val])
        test.extend(recs[n_train + n_val:])

    rng.shuffle(train)
    rng.shuffle(val)
    rng.shuffle(test)

    SPLITS_DIR.mkdir(parents=True, exist_ok=True)
    for split_name, split_data in [("train", train), ("val", val), ("test", test)]:
        path = SPLITS_DIR / f"{split_name}.json"
        with open(path, "w", encoding="utf-8") as f:
            json.dump(split_data, f, indent=2)
        print(f" {split_name}: {len(split_data)} samples -> {path}")

    return {"train": train, "val": val, "test": test}


def run(config: dict) -> dict[str, list[dict]]:
    """Run full preprocessing pipeline.

    Processes the ``taping`` and ``cracks`` datasets found under
    ``RAW_DIR`` (missing ones are skipped with a warning) and writes the
    combined train/val/test splits.

    Args:
        config: Must provide ``config["data"]["prompt_synonyms"]`` (indexed
            by dataset name), ``config["data"]["split_ratios"]`` and
            ``config["seed"]``.

    Returns:
        The dict returned by ``create_splits``.
    """
    synonyms = config["data"]["prompt_synonyms"]
    ratios = tuple(config["data"]["split_ratios"])

    all_records = []
    for name in ["taping", "cracks"]:
        dataset_dir = RAW_DIR / name
        if not dataset_dir.exists():
            print(f"WARNING: {dataset_dir} not found, skipping {name}")
            continue

        print(f"\n{'='*60}")
        print(f"Processing dataset: {name}")
        print(f"{'='*60}")

        records = process_dataset(name, dataset_dir, synonyms[name])
        all_records.extend(records)
        print(f" Total records for {name}: {len(records)}")

    print(f"\n{'='*60}")
    print(f"Creating splits (total: {len(all_records)} records)")
    print(f"{'='*60}")

    splits = create_splits(all_records, ratios=ratios, seed=config["seed"])
    return splits


if __name__ == "__main__":
    import yaml

    config_path = Path(__file__).resolve().parents[2] / "configs" / "train_config.yaml"
    with open(config_path, encoding="utf-8") as f:
        config = yaml.safe_load(f)
    run(config)