#!/usr/bin/env python3 """ Dataset Verification Script for Road Anomaly Detection Checks dataset structure, validates annotations, and provides statistics """ import os import sys from pathlib import Path from collections import defaultdict import json try: from PIL import Image import yaml except ImportError: print("Installing required packages...") os.system("pip install pillow pyyaml") from PIL import Image import yaml def verify_image(img_path: Path) -> dict: """Verify a single image file""" result = {"valid": False, "error": None, "size": None} try: with Image.open(img_path) as img: result["valid"] = True result["size"] = img.size result["format"] = img.format result["mode"] = img.mode except Exception as e: result["error"] = str(e) return result def verify_label(lbl_path: Path, num_classes: int = 20) -> dict: """Verify a YOLO format label file""" result = { "valid": True, "errors": [], "num_objects": 0, "classes": defaultdict(int) } if not lbl_path.exists(): result["valid"] = False result["errors"].append("Label file not found") return result try: with open(lbl_path) as f: lines = f.readlines() for i, line in enumerate(lines): line = line.strip() if not line: continue parts = line.split() if len(parts) < 5: result["errors"].append(f"Line {i+1}: Invalid format (expected 5+ values)") result["valid"] = False continue try: cls_id = int(parts[0]) x_center = float(parts[1]) y_center = float(parts[2]) width = float(parts[3]) height = float(parts[4]) # Validate ranges if cls_id < 0 or cls_id >= num_classes: result["errors"].append(f"Line {i+1}: Class ID {cls_id} out of range") for val, name in [(x_center, "x_center"), (y_center, "y_center"), (width, "width"), (height, "height")]: if val < 0 or val > 1: result["errors"].append(f"Line {i+1}: {name}={val} out of range [0,1]") result["num_objects"] += 1 result["classes"][cls_id] += 1 except ValueError as e: result["errors"].append(f"Line {i+1}: {e}") result["valid"] = False except Exception as e: result["valid"] = False result["errors"].append(f"Failed to read file: {e}") return result def verify_dataset(dataset_path: Path, verbose: bool = True) -> dict: """Verify complete dataset structure""" print("=" * 60) print("ROAD ANOMALY DETECTION - DATASET VERIFICATION") print("=" * 60) print(f"\nDataset path: {dataset_path.resolve()}") # Check for data.yaml data_yaml = dataset_path / "data.yaml" class_names = None num_classes = 20 # Default if data_yaml.exists(): with open(data_yaml) as f: config = yaml.safe_load(f) class_names = config.get("names", []) num_classes = config.get("nc", len(class_names)) print(f"\n✓ Found data.yaml with {num_classes} classes:") for i, name in enumerate(class_names): print(f" {i}: {name}") else: print(f"\n⚠ No data.yaml found at {data_yaml}") results = { "valid": True, "splits": {}, "total_images": 0, "total_labels": 0, "total_objects": 0, "class_distribution": defaultdict(int), "errors": [] } # Check each split for split in ["train", "valid", "test"]: print(f"\n{'-' * 40}") print(f"Checking {split} split...") split_path = dataset_path / split img_dir = split_path / "images" lbl_dir = split_path / "labels" split_result = { "exists": False, "images": 0, "labels": 0, "objects": 0, "matched": 0, "missing_labels": [], "missing_images": [], "invalid_images": [], "invalid_labels": [], "class_distribution": defaultdict(int) } if not split_path.exists(): print(f" ⚠ Split directory not found: {split_path}") results["splits"][split] = split_result continue split_result["exists"] = True # Get all images images = [] for ext in ["*.jpg", "*.jpeg", "*.png", "*.bmp"]: images.extend(img_dir.glob(ext)) images.extend(img_dir.glob(ext.upper())) # Get all labels labels = list(lbl_dir.glob("*.txt")) if lbl_dir.exists() else [] split_result["images"] = len(images) split_result["labels"] = len(labels) print(f" Images: {len(images)}") print(f" Labels: {len(labels)}") # Create lookup sets img_stems = {img.stem for img in images} lbl_stems = {lbl.stem for lbl in labels} # Check for missing pairs missing_labels = img_stems - lbl_stems missing_images = lbl_stems - img_stems matched = img_stems & lbl_stems split_result["matched"] = len(matched) split_result["missing_labels"] = list(missing_labels)[:10] # First 10 split_result["missing_images"] = list(missing_images)[:10] if missing_labels: print(f" ⚠ {len(missing_labels)} images without labels") if verbose and len(missing_labels) <= 5: for name in list(missing_labels)[:5]: print(f" - {name}") if missing_images: print(f" ⚠ {len(missing_images)} labels without images") # Verify matched pairs (sample for speed) sample_size = min(100, len(matched)) sampled = list(matched)[:sample_size] for stem in sampled: # Find image img_path = None for ext in [".jpg", ".jpeg", ".png", ".bmp", ".JPG", ".JPEG", ".PNG"]: candidate = img_dir / f"{stem}{ext}" if candidate.exists(): img_path = candidate break if img_path: img_result = verify_image(img_path) if not img_result["valid"]: split_result["invalid_images"].append(stem) # Verify label lbl_path = lbl_dir / f"{stem}.txt" lbl_result = verify_label(lbl_path, num_classes) if not lbl_result["valid"]: split_result["invalid_labels"].append((stem, lbl_result["errors"])) split_result["objects"] += lbl_result["num_objects"] for cls_id, count in lbl_result["classes"].items(): split_result["class_distribution"][cls_id] += count results["class_distribution"][cls_id] += count # Extrapolate objects from sample if sample_size < len(matched): ratio = len(matched) / sample_size split_result["objects"] = int(split_result["objects"] * ratio) print(f" Objects (estimated): {split_result['objects']}") print(f" Matched pairs: {len(matched)}") if split_result["invalid_labels"]: print(f" ⚠ {len(split_result['invalid_labels'])} labels with issues") results["splits"][split] = split_result results["total_images"] += split_result["images"] results["total_labels"] += split_result["labels"] results["total_objects"] += split_result["objects"] # Summary print(f"\n{'=' * 60}") print("SUMMARY") print("=" * 60) print(f"\nTotal images: {results['total_images']}") print(f"Total labels: {results['total_labels']}") print(f"Total objects (estimated): {results['total_objects']}") # Class distribution if results["class_distribution"]: print(f"\nClass distribution:") for cls_id, count in sorted(results["class_distribution"].items()): name = class_names[cls_id] if class_names and cls_id < len(class_names) else f"class_{cls_id}" print(f" {cls_id} ({name}): {count} objects") # Validation status print(f"\n{'=' * 60}") # Check minimum requirements train_split = results["splits"].get("train", {}) valid_split = results["splits"].get("valid", {}) checks = [ ("Train split exists", train_split.get("exists", False)), ("Valid split exists", valid_split.get("exists", False)), ("Train has images", train_split.get("images", 0) > 0), ("Valid has images", valid_split.get("images", 0) > 0), ("Train has labels", train_split.get("labels", 0) > 0), ("Valid has labels", valid_split.get("labels", 0) > 0), ("Train images ≥ 100", train_split.get("images", 0) >= 100), ] all_passed = True for name, passed in checks: status = "✓" if passed else "✗" print(f" [{status}] {name}") if not passed: all_passed = False results["valid"] = all_passed if all_passed: print(f"\n✅ Dataset is valid and ready for training!") else: print(f"\n❌ Dataset has issues that need to be fixed.") return results def create_sample_data_yaml(dataset_path: Path, classes: list = None): """Create a sample data.yaml file""" if classes is None: classes = ["pothole", "crack", "bump", "obstacle", "road_damage"] yaml_path = dataset_path / "data.yaml" data = { "train": str((dataset_path / "train" / "images").resolve()), "val": str((dataset_path / "valid" / "images").resolve()), "test": str((dataset_path / "test" / "images").resolve()), "nc": len(classes), "names": classes } with open(yaml_path, "w") as f: yaml.dump(data, f, default_flow_style=False) print(f"Created: {yaml_path}") return yaml_path if __name__ == "__main__": # Default dataset path dataset_path = Path("./dataset") # Allow command line override if len(sys.argv) > 1: dataset_path = Path(sys.argv[1]) if not dataset_path.exists(): print(f"Dataset path not found: {dataset_path}") print("\nExpected structure:") print(" dataset/") print(" ├── data.yaml") print(" ├── train/") print(" │ ├── images/") print(" │ └── labels/") print(" ├── valid/") print(" │ ├── images/") print(" │ └── labels/") print(" └── test/") print(" ├── images/") print(" └── labels/") # Offer to create structure print(f"\nCreate sample structure at {dataset_path}? [y/N]: ", end="") try: response = input().strip().lower() if response == "y": for split in ["train", "valid", "test"]: (dataset_path / split / "images").mkdir(parents=True, exist_ok=True) (dataset_path / split / "labels").mkdir(parents=True, exist_ok=True) create_sample_data_yaml(dataset_path) print("✓ Created sample structure. Add your images and labels, then run again.") except: pass sys.exit(1) results = verify_dataset(dataset_path) # Save results results_file = dataset_path / "verification_results.json" # Convert defaultdicts to regular dicts for JSON serialization def convert_defaultdict(obj): if isinstance(obj, defaultdict): return dict(obj) elif isinstance(obj, dict): return {k: convert_defaultdict(v) for k, v in obj.items()} elif isinstance(obj, list): return [convert_defaultdict(i) for i in obj] return obj with open(results_file, "w") as f: json.dump(convert_defaultdict(results), f, indent=2) print(f"\nResults saved to: {results_file}")