| """Inspect annotations, generate masks, create train/val/test splits.""" |
|
|
import json
import random
from pathlib import Path

import numpy as np
from PIL import Image
from pycocotools import mask as mask_utils
from pycocotools.coco import COCO
|
|
# Project root: two directory levels above the directory that holds this file.
_PROJECT_ROOT = Path(__file__).resolve().parents[2]

# Data locations, all anchored at the project root.
RAW_DIR = _PROJECT_ROOT / "data" / "raw"              # downloaded datasets (read by run())
PROCESSED_DIR = _PROJECT_ROOT / "data" / "processed"  # rendered masks (used by process_dataset())
SPLITS_DIR = _PROJECT_ROOT / "data" / "splits"        # train/val/test JSON files (written by create_splits())
|
|
|
|
def _has_usable_segmentation(seg) -> bool:
    """Return True when *seg* holds segmentation data worth rendering.

    A non-empty dict (RLE) always counts. A list counts when its first
    polygon has at least 6 coordinates (3 points); a flat, non-nested list is
    judged on its own length. Anything else, including missing or empty
    values, does not count.
    """
    if not seg:
        return False
    if isinstance(seg, dict):
        return True
    if isinstance(seg, list):
        first = seg[0]
        if isinstance(first, list):
            return len(first) >= 6
        # Flat coordinate list: measure the list itself instead of calling
        # len() on a number, which would raise TypeError.
        return len(seg) >= 6
    return False


def inspect_dataset(coco_json_path: str) -> dict:
    """Check what annotation types exist in a COCO JSON file.

    Args:
        coco_json_path: Path to the COCO-format annotation file.

    Returns:
        A dict with the keys ``total_annotations``, ``total_images``,
        ``has_segmentation``, ``has_bbox_only``, ``annotation_type`` and
        ``categories``. ``annotation_type`` is ``"segmentation"`` only when
        annotations with usable segmentation strictly outnumber the rest;
        ties and empty files are reported as ``"bbox_only"``.
    """
    with open(coco_json_path, encoding="utf-8") as f:
        data = json.load(f)

    annotations = data.get("annotations", [])
    total = len(annotations)
    has_seg = sum(
        1 for ann in annotations if _has_usable_segmentation(ann.get("segmentation"))
    )
    has_bbox_only = total - has_seg

    return {
        "total_annotations": total,
        "total_images": len(data.get("images", [])),
        "has_segmentation": has_seg,
        "has_bbox_only": has_bbox_only,
        "annotation_type": "segmentation" if has_seg > has_bbox_only else "bbox_only",
        "categories": [c["name"] for c in data.get("categories", [])],
    }
|
|
|
|
def render_masks_from_coco(coco_json_path: str, images_dir: str, output_dir: str) -> list[dict]:
    """Render binary masks from COCO polygon/RLE annotations.

    For every image that has at least one annotation, all usable
    segmentations are merged into a single 0/255 mask that is written to
    ``<output_dir>/<image stem>_mask.png``. Images without annotations are
    skipped entirely.

    Args:
        coco_json_path: Path to the COCO annotation file.
        images_dir: Directory the image ``file_name`` entries are joined to.
        output_dir: Directory the mask PNGs are written to (created if missing).

    Returns:
        List of {image_path, mask_path, image_id, width, height}.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    coco = COCO(coco_json_path)
    records = []

    for img_id in sorted(coco.getImgIds()):
        img_info = coco.loadImgs(img_id)[0]
        h, w = img_info["height"], img_info["width"]

        anns = coco.loadAnns(coco.getAnnIds(imgIds=img_id))
        if not anns:
            continue

        combined = np.zeros((h, w), dtype=np.uint8)
        for ann in anns:
            seg = ann.get("segmentation")
            if not seg:
                continue
            if isinstance(seg, list):
                # A polygon needs at least 3 points (6 coordinates); judge a
                # nested list by its first polygon and a flat list by itself.
                coords = seg[0] if isinstance(seg[0], list) else seg
                if len(coords) < 6:
                    continue
            try:
                rle = coco.annToRLE(ann)
                combined = np.maximum(combined, mask_utils.decode(rle))
            except (IndexError, ValueError, TypeError):
                # Decoding failed: approximate the object with its bounding
                # box. TypeError is caught as well because a flat coordinate
                # list is presumably rejected that way by pycocotools
                # (TODO confirm against the installed version).
                if "bbox" in ann:
                    x, y, bw, bh = (int(v) for v in ann["bbox"])
                    # Clamp to >= 0: a negative start index would be read as
                    # "count from the end" and paint the wrong region.
                    x0, y0 = max(x, 0), max(y, 0)
                    x1, y1 = max(x + bw, 0), max(y + bh, 0)
                    combined[y0:y1, x0:x1] = 1

        # Scale {0, 1} to {0, 255} so the PNG is a viewable 8-bit mask.
        mask_img = Image.fromarray(combined * 255, mode="L")
        mask_name = Path(img_info["file_name"]).stem + "_mask.png"
        mask_path = output_dir / mask_name
        mask_img.save(mask_path)

        image_path = Path(images_dir) / img_info["file_name"]
        records.append({
            "image_path": str(image_path),
            "mask_path": str(mask_path),
            "image_id": img_id,
            "width": w,
            "height": h,
        })

    return records
|
|
|
|
def render_masks_from_bboxes(coco_json_path: str, images_dir: str, output_dir: str) -> list[dict]:
    """Create filled-rectangle masks from bounding boxes (fallback when no segmentation).

    For every image that has at least one annotation, each ``bbox``
    (``[x, y, width, height]``, truncated to ints) is painted as a filled
    rectangle into a single 0/255 mask that is written to
    ``<output_dir>/<image stem>_mask.png``. Images without annotations are
    skipped; annotations without a ``bbox`` contribute nothing.

    Args:
        coco_json_path: Path to the COCO annotation file.
        images_dir: Directory the image ``file_name`` entries are joined to.
        output_dir: Directory the mask PNGs are written to (created if missing).

    Returns:
        List of {image_path, mask_path, image_id, width, height}.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    with open(coco_json_path, encoding="utf-8") as f:
        data = json.load(f)

    img_lookup = {img["id"]: img for img in data["images"]}
    anns_by_img: dict[int, list] = {}
    for ann in data["annotations"]:
        anns_by_img.setdefault(ann["image_id"], []).append(ann)

    records = []
    for img_id, img_info in sorted(img_lookup.items()):
        anns = anns_by_img.get(img_id, [])
        if not anns:
            continue

        h, w = img_info["height"], img_info["width"]
        combined = np.zeros((h, w), dtype=np.uint8)

        for ann in anns:
            bbox = ann.get("bbox")
            if not bbox:
                continue
            x, y, bw, bh = (int(v) for v in bbox)
            # Clamp to >= 0: a negative start index would be read as "count
            # from the end" and paint the wrong region. Ends past the image
            # border are already clipped by slicing.
            x0, y0 = max(x, 0), max(y, 0)
            x1, y1 = max(x + bw, 0), max(y + bh, 0)
            combined[y0:y1, x0:x1] = 1

        # Scale {0, 1} to {0, 255} so the PNG is a viewable 8-bit mask.
        mask_img = Image.fromarray(combined * 255, mode="L")
        mask_name = Path(img_info["file_name"]).stem + "_mask.png"
        mask_path = output_dir / mask_name
        mask_img.save(mask_path)

        image_path = Path(images_dir) / img_info["file_name"]
        records.append({
            "image_path": str(image_path),
            "mask_path": str(mask_path),
            "image_id": img_id,
            "width": w,
            "height": h,
        })

    return records
|
|
|
|
| def find_coco_json(dataset_dir: Path) -> tuple[str, str] | None: |
| """Find the COCO JSON and images directory in a Roboflow download.""" |
| for split in ["train", "valid", "test"]: |
| json_path = dataset_dir / split / "_annotations.coco.json" |
| if json_path.exists(): |
| return str(json_path), str(dataset_dir / split) |
| |
| for json_path in dataset_dir.rglob("_annotations.coco.json"): |
| return str(json_path), str(json_path.parent) |
| return None |
|
|
|
|
def process_dataset(name: str, dataset_dir: Path, prompt_synonyms: list[str]) -> list[dict]:
    """Process a single dataset: inspect, render masks, return records with prompts.

    Every immediate sub-directory of ``dataset_dir`` that contains an
    ``_annotations.coco.json`` is treated as one split. Its masks are written
    to ``PROCESSED_DIR/<name>/masks/<split>``. Polygon/RLE rendering is used
    when ``inspect_dataset`` reports ``"segmentation"``; otherwise masks are
    filled rectangles built from the bounding boxes.

    Args:
        name: Dataset name, stored on each record under ``"dataset"``.
        dataset_dir: Directory holding the split sub-directories.
        prompt_synonyms: Prompts stored on each record under ``"prompts"``.

    Returns:
        The records of all splits, in sorted split-directory order.
    """
    records = []
    mask_dir = PROCESSED_DIR / name / "masks"

    for split_dir in sorted(dataset_dir.iterdir()):
        if not split_dir.is_dir():
            continue
        json_path = split_dir / "_annotations.coco.json"
        if not json_path.exists():
            continue

        print(f"\n Processing {name}/{split_dir.name}...")
        info = inspect_dataset(str(json_path))
        print(f" Images: {info['total_images']}, Annotations: {info['total_annotations']}")
        print(f" Type: {info['annotation_type']}, Categories: {info['categories']}")

        split_mask_dir = mask_dir / split_dir.name
        if info["annotation_type"] == "segmentation":
            split_records = render_masks_from_coco(
                str(json_path), str(split_dir), str(split_mask_dir)
            )
        else:
            print(" WARNING: bbox-only annotations, using filled rectangles")
            split_records = render_masks_from_bboxes(
                str(json_path), str(split_dir), str(split_mask_dir)
            )

        for r in split_records:
            r["dataset"] = name
            # Give each record its own copy so that editing one record's
            # prompts cannot silently change every other record (or the
            # caller's list).
            r["prompts"] = list(prompt_synonyms)
        records.extend(split_records)

    return records
|
|
|
|
| def create_splits(records: list[dict], ratios: tuple = (0.70, 0.15, 0.15), seed: int = 42): |
| """Split records into train/val/test, stratified by dataset.""" |
| random.seed(seed) |
|
|
| by_dataset: dict[str, list] = {} |
| for r in records: |
| by_dataset.setdefault(r["dataset"], []).append(r) |
|
|
| train, val, test = [], [], [] |
| for name, recs in by_dataset.items(): |
| random.shuffle(recs) |
| n = len(recs) |
| n_train = int(n * ratios[0]) |
| n_val = int(n * ratios[1]) |
| train.extend(recs[:n_train]) |
| val.extend(recs[n_train:n_train + n_val]) |
| test.extend(recs[n_train + n_val:]) |
|
|
| random.shuffle(train) |
| random.shuffle(val) |
| random.shuffle(test) |
|
|
| SPLITS_DIR.mkdir(parents=True, exist_ok=True) |
| for split_name, split_data in [("train", train), ("val", val), ("test", test)]: |
| path = SPLITS_DIR / f"{split_name}.json" |
| with open(path, "w") as f: |
| json.dump(split_data, f, indent=2) |
| print(f" {split_name}: {len(split_data)} samples -> {path}") |
|
|
| return {"train": train, "val": val, "test": test} |
|
|
|
|
def run(config: dict) -> dict:
    """Run full preprocessing pipeline.

    Processes the ``taping`` and ``cracks`` datasets found under ``RAW_DIR``
    (a missing directory is reported and skipped), then splits the combined
    records with ``create_splits``.

    Args:
        config: Must provide ``config["data"]["prompt_synonyms"]`` (a mapping
            from dataset name to its prompts), ``config["data"]["split_ratios"]``
            and ``config["seed"]``.

    Returns:
        Whatever ``create_splits`` returns for the combined records.
    """
    data_cfg = config["data"]
    synonyms = data_cfg["prompt_synonyms"]
    ratios = tuple(data_cfg["split_ratios"])
    banner = "=" * 60

    all_records = []
    for name in ("taping", "cracks"):
        dataset_dir = RAW_DIR / name
        if not dataset_dir.exists():
            print(f"WARNING: {dataset_dir} not found, skipping {name}")
            continue
        print(f"\n{banner}")
        print(f"Processing dataset: {name}")
        print(banner)
        records = process_dataset(name, dataset_dir, synonyms[name])
        all_records.extend(records)
        print(f" Total records for {name}: {len(records)}")

    print(f"\n{banner}")
    print(f"Creating splits (total: {len(all_records)} records)")
    print(banner)
    return create_splits(all_records, ratios=ratios, seed=config["seed"])
|
|
|
|
if __name__ == "__main__":
    # Script entry point: load the training config and run the pipeline.
    # yaml is imported here so importing this module does not require it.
    import yaml

    config_path = Path(__file__).resolve().parents[2] / "configs" / "train_config.yaml"
    with open(config_path, encoding="utf-8") as f:
        config = yaml.safe_load(f)
    run(config)
|
|