# drywall-qa-clipseg / src/data/preprocess.py
# (Uploaded via huggingface_hub by youngPhilosopher; revision b891e61, verified.)
"""Inspect annotations, generate masks, create train/val/test splits."""
import json
import random
from pathlib import Path
import numpy as np
from PIL import Image
from pycocotools.coco import COCO
from pycocotools import mask as mask_utils
RAW_DIR = Path(__file__).resolve().parents[2] / "data" / "raw"
PROCESSED_DIR = Path(__file__).resolve().parents[2] / "data" / "processed"
SPLITS_DIR = Path(__file__).resolve().parents[2] / "data" / "splits"
def inspect_dataset(coco_json_path: str) -> dict:
    """Summarise what annotation types a COCO JSON file contains.

    An annotation counts as "segmentation" when it carries either a polygon
    with at least 3 points (6 coordinates) or an RLE dict; everything else is
    treated as bbox-only. The dominant kind is reported as `annotation_type`.
    """
    with open(coco_json_path) as fh:
        payload = json.load(fh)

    annotations = payload.get("annotations", [])
    seg_count = 0
    bbox_only_count = 0
    for ann in annotations:
        seg = ann.get("segmentation")
        is_polygon = bool(seg) and isinstance(seg, list) and len(seg) > 0 and len(seg[0]) >= 6
        is_rle = bool(seg) and isinstance(seg, dict)  # RLE format
        if is_polygon or is_rle:
            seg_count += 1
        else:
            bbox_only_count += 1

    return {
        "total_annotations": len(annotations),
        "total_images": len(payload.get("images", [])),
        "has_segmentation": seg_count,
        "has_bbox_only": bbox_only_count,
        "annotation_type": "segmentation" if seg_count > bbox_only_count else "bbox_only",
        "categories": [cat["name"] for cat in payload.get("categories", [])],
    }
def render_masks_from_coco(coco_json_path: str, images_dir: str, output_dir: str) -> list[dict]:
    """Render binary masks from COCO polygon/RLE annotations.

    All annotations of an image are merged (pixelwise OR) into a single 0/255
    grayscale PNG saved as ``<image stem>_mask.png`` under *output_dir*.
    Images with no annotations are skipped entirely.

    Returns list of {image_path, mask_path, image_id, width, height}.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    coco = COCO(coco_json_path)
    records = []
    for img_id in sorted(coco.getImgIds()):
        img_info = coco.loadImgs(img_id)[0]
        h, w = img_info["height"], img_info["width"]
        ann_ids = coco.getAnnIds(imgIds=img_id)
        anns = coco.loadAnns(ann_ids)
        if not anns:
            continue
        # Merge all annotations into one binary mask
        combined = np.zeros((h, w), dtype=np.uint8)
        for ann in anns:
            seg = ann.get("segmentation")
            # Skip annotations with empty or invalid segmentation: a polygon
            # needs at least 3 points (6 coordinates) to be decodable.
            if not seg:
                continue
            if isinstance(seg, list) and (len(seg) == 0 or (len(seg) > 0 and isinstance(seg[0], list) and len(seg[0]) < 6)):
                continue
            if isinstance(seg, list) and len(seg) > 0 and not isinstance(seg[0], list) and len(seg) < 6:
                continue
            try:
                rle = coco.annToRLE(ann)
                m = mask_utils.decode(rle)
                combined = np.maximum(combined, m)
            except (IndexError, ValueError):
                # Fall back to a filled bbox if segmentation decode fails.
                if "bbox" in ann:
                    x, y, bw, bh = [int(v) for v in ann["bbox"]]
                    # Clamp to >= 0: a negative coordinate would otherwise wrap
                    # around via numpy's negative indexing and paint the wrong
                    # region of the mask. Overshoot past h/w is clipped by
                    # numpy slicing automatically.
                    combined[max(y, 0):max(y + bh, 0), max(x, 0):max(x + bw, 0)] = 1
        mask_img = Image.fromarray(combined * 255, mode="L")
        mask_name = Path(img_info["file_name"]).stem + "_mask.png"
        mask_path = output_dir / mask_name
        mask_img.save(mask_path)
        image_path = Path(images_dir) / img_info["file_name"]
        records.append({
            "image_path": str(image_path),
            "mask_path": str(mask_path),
            "image_id": img_id,
            "width": w,
            "height": h,
        })
    return records
def render_masks_from_bboxes(coco_json_path: str, images_dir: str, output_dir: str) -> list[dict]:
    """Create filled-rectangle masks from bounding boxes (fallback when no segmentation).

    All bboxes of an image are merged into a single 0/255 grayscale PNG saved
    as ``<image stem>_mask.png`` under *output_dir*. Images without any
    annotations are skipped.

    Returns list of {image_path, mask_path, image_id, width, height}.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    with open(coco_json_path) as f:
        data = json.load(f)
    img_lookup = {img["id"]: img for img in data["images"]}
    # Group annotations by image so each image is rendered in one pass.
    anns_by_img: dict[int, list] = {}
    for ann in data["annotations"]:
        anns_by_img.setdefault(ann["image_id"], []).append(ann)
    records = []
    for img_id, img_info in sorted(img_lookup.items()):
        anns = anns_by_img.get(img_id, [])
        if not anns:
            continue
        h, w = img_info["height"], img_info["width"]
        combined = np.zeros((h, w), dtype=np.uint8)
        for ann in anns:
            x, y, bw, bh = [int(v) for v in ann["bbox"]]
            # Clamp to >= 0: a negative coordinate would otherwise wrap around
            # via numpy's negative indexing and paint the wrong region.
            # Overshoot past h/w is clipped by numpy slicing automatically.
            combined[max(y, 0):max(y + bh, 0), max(x, 0):max(x + bw, 0)] = 1
        mask_img = Image.fromarray(combined * 255, mode="L")
        mask_name = Path(img_info["file_name"]).stem + "_mask.png"
        mask_path = output_dir / mask_name
        mask_img.save(mask_path)
        image_path = Path(images_dir) / img_info["file_name"]
        records.append({
            "image_path": str(image_path),
            "mask_path": str(mask_path),
            "image_id": img_id,
            "width": w,
            "height": h,
        })
    return records
def find_coco_json(dataset_dir: Path) -> tuple[str, str] | None:
"""Find the COCO JSON and images directory in a Roboflow download."""
for split in ["train", "valid", "test"]:
json_path = dataset_dir / split / "_annotations.coco.json"
if json_path.exists():
return str(json_path), str(dataset_dir / split)
# Single-folder layout
for json_path in dataset_dir.rglob("_annotations.coco.json"):
return str(json_path), str(json_path.parent)
return None
def process_dataset(name: str, dataset_dir: Path, prompt_synonyms: list[str]) -> list[dict]:
    """Process one dataset: inspect annotations, render masks, attach prompts.

    Walks each split folder (train/valid/test from Roboflow) under
    *dataset_dir*, renders real segmentation masks when available or
    filled-rectangle bbox masks otherwise, and tags every record with the
    dataset name and its prompt synonyms.
    """
    all_records: list[dict] = []
    mask_root = PROCESSED_DIR / name / "masks"
    # Process each split folder (train/valid/test from Roboflow)
    for split_dir in sorted(dataset_dir.iterdir()):
        json_path = split_dir / "_annotations.coco.json"
        if not split_dir.is_dir() or not json_path.exists():
            continue
        print(f"\n Processing {name}/{split_dir.name}...")
        info = inspect_dataset(str(json_path))
        print(f" Images: {info['total_images']}, Annotations: {info['total_annotations']}")
        print(f" Type: {info['annotation_type']}, Categories: {info['categories']}")
        split_mask_dir = mask_root / split_dir.name
        if info["annotation_type"] != "segmentation":
            print(f" WARNING: bbox-only annotations, using filled rectangles")
            renderer = render_masks_from_bboxes
        else:
            renderer = render_masks_from_coco
        split_records = renderer(str(json_path), str(split_dir), str(split_mask_dir))
        for rec in split_records:
            rec["dataset"] = name
            rec["prompts"] = prompt_synonyms
        all_records.extend(split_records)
    return all_records
def create_splits(records: list[dict], ratios: tuple = (0.70, 0.15, 0.15), seed: int = 42):
    """Shuffle and split records into train/val/test, stratified by dataset.

    Each dataset's records are shuffled and split independently with the
    given ratios so every split keeps roughly the same dataset mix; the
    resulting lists are shuffled again and written to SPLITS_DIR as JSON.
    """
    random.seed(seed)
    grouped: dict[str, list] = {}
    for rec in records:
        grouped.setdefault(rec["dataset"], []).append(rec)

    splits: dict[str, list] = {"train": [], "val": [], "test": []}
    for recs in grouped.values():
        random.shuffle(recs)
        n_total = len(recs)
        n_train = int(n_total * ratios[0])
        n_val = int(n_total * ratios[1])
        splits["train"].extend(recs[:n_train])
        splits["val"].extend(recs[n_train:n_train + n_val])
        # Remainder goes to test so no record is dropped by rounding.
        splits["test"].extend(recs[n_train + n_val:])

    for bucket in splits.values():
        random.shuffle(bucket)

    SPLITS_DIR.mkdir(parents=True, exist_ok=True)
    for split_name, split_data in splits.items():
        path = SPLITS_DIR / f"{split_name}.json"
        with open(path, "w") as f:
            json.dump(split_data, f, indent=2)
        print(f" {split_name}: {len(split_data)} samples -> {path}")
    return splits
def run(config: dict):
    """Run the full preprocessing pipeline and return the split dict.

    Expects config["data"]["prompt_synonyms"] keyed by dataset name and
    config["data"]["split_ratios"]; missing raw dataset folders are skipped
    with a warning rather than failing.
    """
    synonyms = config["data"]["prompt_synonyms"]
    ratios = tuple(config["data"]["split_ratios"])
    banner = "=" * 60
    all_records: list[dict] = []
    for name in ("taping", "cracks"):
        dataset_dir = RAW_DIR / name
        if not dataset_dir.exists():
            print(f"WARNING: {dataset_dir} not found, skipping {name}")
            continue
        print(f"\n{banner}")
        print(f"Processing dataset: {name}")
        print(f"{banner}")
        records = process_dataset(name, dataset_dir, synonyms[name])
        all_records.extend(records)
        print(f" Total records for {name}: {len(records)}")
    print(f"\n{banner}")
    print(f"Creating splits (total: {len(all_records)} records)")
    print(f"{banner}")
    return create_splits(all_records, ratios=ratios, seed=config["seed"])
if __name__ == "__main__":
    import yaml

    # Resolve the training config relative to the repo root and run the pipeline.
    cfg_path = Path(__file__).resolve().parents[2] / "configs" / "train_config.yaml"
    with open(cfg_path) as fh:
        cfg = yaml.safe_load(fh)
    run(cfg)