"""
Dataset Verification Script for Road Anomaly Detection
Checks dataset structure, validates annotations, and provides statistics
"""
|
|
import os
import sys
from pathlib import Path
from collections import defaultdict
import json


# Third-party dependencies: Pillow (image validation) and PyYAML (data.yaml).
# If either is missing, attempt a best-effort install and retry the imports.
# NOTE(review): `os.system("pip install ...")` may install into a different
# interpreter than the one running this script — `sys.executable -m pip`
# would be safer; confirm intent before changing.
try:
    from PIL import Image
    import yaml
except ImportError:
    print("Installing required packages...")
    os.system("pip install pillow pyyaml")
    from PIL import Image
    import yaml
|
|
|
|
def verify_image(img_path: Path) -> dict:
    """Verify a single image file.

    Opens the file with Pillow and, on success, records its dimensions,
    format, and color mode. Any exception is captured rather than raised.

    Args:
        img_path: Path to the image file to check.

    Returns:
        dict with keys ``valid`` (bool), ``error`` (str | None), ``size``
        ((width, height) tuple | None), plus ``format`` and ``mode`` when
        the image opened successfully.
    """
    info = {"valid": False, "error": None, "size": None}
    try:
        # Context manager guarantees the file handle is released.
        with Image.open(img_path) as image:
            info["valid"] = True
            info["size"] = image.size
            info["format"] = image.format
            info["mode"] = image.mode
    except Exception as exc:
        # Record the failure reason instead of propagating it; callers
        # inspect the "valid" flag.
        info["error"] = str(exc)
    return info
|
|
|
|
def verify_label(lbl_path: Path, num_classes: int = 20) -> dict:
    """Verify a YOLO format label file.

    Each non-empty line must be ``<class_id> <x_center> <y_center> <width>
    <height>`` with ``class_id`` in ``[0, num_classes)`` and all four box
    values normalized to ``[0, 1]``.

    Args:
        lbl_path: Path to the ``.txt`` label file.
        num_classes: Exclusive upper bound for valid class IDs.

    Returns:
        dict with keys:
            valid (bool): True only when the file exists, is readable, and
                no line produced an error.
            errors (list[str]): One human-readable message per problem.
            num_objects (int): Count of parseable annotation lines.
            classes (defaultdict[int, int]): Per-class object counts.
    """
    result = {
        "valid": True,
        "errors": [],
        "num_objects": 0,
        "classes": defaultdict(int)
    }

    if not lbl_path.exists():
        result["valid"] = False
        result["errors"].append("Label file not found")
        return result

    try:
        with open(lbl_path) as f:
            lines = f.readlines()

        for i, line in enumerate(lines):
            line = line.strip()
            if not line:
                # Blank lines are tolerated in YOLO label files.
                continue

            parts = line.split()
            if len(parts) < 5:
                result["errors"].append(f"Line {i+1}: Invalid format (expected 5+ values)")
                result["valid"] = False
                continue

            try:
                cls_id = int(parts[0])
                x_center = float(parts[1])
                y_center = float(parts[2])
                width = float(parts[3])
                height = float(parts[4])

                # BUG FIX: out-of-range class IDs and coordinates previously
                # appended to "errors" but left "valid" True, so callers
                # checking only the flag silently accepted broken labels.
                if cls_id < 0 or cls_id >= num_classes:
                    result["errors"].append(f"Line {i+1}: Class ID {cls_id} out of range")
                    result["valid"] = False

                for val, name in [(x_center, "x_center"), (y_center, "y_center"),
                                  (width, "width"), (height, "height")]:
                    if val < 0 or val > 1:
                        result["errors"].append(f"Line {i+1}: {name}={val} out of range [0,1]")
                        result["valid"] = False

                # Count the object regardless, so statistics reflect the
                # file's actual contents even when it has range errors.
                result["num_objects"] += 1
                result["classes"][cls_id] += 1

            except ValueError as e:
                # Non-numeric field on this line.
                result["errors"].append(f"Line {i+1}: {e}")
                result["valid"] = False

    except Exception as e:
        result["valid"] = False
        result["errors"].append(f"Failed to read file: {e}")

    return result
|
|
|
|
def verify_dataset(dataset_path: Path, verbose: bool = True) -> dict:
    """Verify complete dataset structure.

    Walks the train/valid/test splits under *dataset_path*, checks that
    images and YOLO label files pair up by filename stem, spot-checks up to
    100 matched pairs per split, prints a human-readable report, and
    returns aggregate statistics.

    Args:
        dataset_path: Dataset root (expects data.yaml plus split folders).
        verbose: When True, list a few example images that lack labels.

    Returns:
        dict with an overall "valid" flag, per-split results, image/label/
        object totals, a class-distribution histogram, and error notes.
    """

    print("=" * 60)
    print("ROAD ANOMALY DETECTION - DATASET VERIFICATION")
    print("=" * 60)
    print(f"\nDataset path: {dataset_path.resolve()}")

    # Read class names / class count from data.yaml when present;
    # otherwise fall back to a default of 20 classes.
    data_yaml = dataset_path / "data.yaml"
    class_names = None
    num_classes = 20

    if data_yaml.exists():
        with open(data_yaml) as f:
            config = yaml.safe_load(f)
        class_names = config.get("names", [])
        # "nc" takes precedence; len(names) is the fallback.
        num_classes = config.get("nc", len(class_names))
        print(f"\nβ Found data.yaml with {num_classes} classes:")
        for i, name in enumerate(class_names):
            print(f" {i}: {name}")
    else:
        print(f"\nβ No data.yaml found at {data_yaml}")

    # Aggregate results across all splits.
    results = {
        "valid": True,
        "splits": {},
        "total_images": 0,
        "total_labels": 0,
        "total_objects": 0,
        "class_distribution": defaultdict(int),
        "errors": []
    }

    for split in ["train", "valid", "test"]:
        print(f"\n{'-' * 40}")
        print(f"Checking {split} split...")

        split_path = dataset_path / split
        img_dir = split_path / "images"
        lbl_dir = split_path / "labels"

        # Fresh per-split accumulator; stored even when the split is missing
        # so downstream checks can use .get() uniformly.
        split_result = {
            "exists": False,
            "images": 0,
            "labels": 0,
            "objects": 0,
            "matched": 0,
            "missing_labels": [],
            "missing_images": [],
            "invalid_images": [],
            "invalid_labels": [],
            "class_distribution": defaultdict(int)
        }

        if not split_path.exists():
            print(f" β Split directory not found: {split_path}")
            results["splits"][split] = split_result
            continue

        split_result["exists"] = True

        # Collect images in common formats, both lower- and upper-case
        # extensions. NOTE(review): on case-insensitive filesystems the two
        # glob passes may match the same file twice — confirm target OS.
        images = []
        for ext in ["*.jpg", "*.jpeg", "*.png", "*.bmp"]:
            images.extend(img_dir.glob(ext))
            images.extend(img_dir.glob(ext.upper()))

        labels = list(lbl_dir.glob("*.txt")) if lbl_dir.exists() else []

        split_result["images"] = len(images)
        split_result["labels"] = len(labels)

        print(f" Images: {len(images)}")
        print(f" Labels: {len(labels)}")

        # Pair images with labels by filename stem (name without extension).
        img_stems = {img.stem for img in images}
        lbl_stems = {lbl.stem for lbl in labels}

        missing_labels = img_stems - lbl_stems
        missing_images = lbl_stems - img_stems
        matched = img_stems & lbl_stems

        split_result["matched"] = len(matched)
        # Keep only the first 10 names so the JSON report stays small.
        split_result["missing_labels"] = list(missing_labels)[:10]
        split_result["missing_images"] = list(missing_images)[:10]

        if missing_labels:
            print(f" β {len(missing_labels)} images without labels")
            if verbose and len(missing_labels) <= 5:
                for name in list(missing_labels)[:5]:
                    print(f" - {name}")

        if missing_images:
            print(f" β {len(missing_images)} labels without images")

        # Spot-check at most 100 matched pairs per split (full validation of
        # large datasets would be slow).
        sample_size = min(100, len(matched))
        sampled = list(matched)[:sample_size]

        for stem in sampled:
            # Resolve the image path by trying each known extension.
            img_path = None
            for ext in [".jpg", ".jpeg", ".png", ".bmp", ".JPG", ".JPEG", ".PNG"]:
                candidate = img_dir / f"{stem}{ext}"
                if candidate.exists():
                    img_path = candidate
                    break

            if img_path:
                img_result = verify_image(img_path)
                if not img_result["valid"]:
                    split_result["invalid_images"].append(stem)

            # Labels are checked even if the image path was not resolved.
            lbl_path = lbl_dir / f"{stem}.txt"
            lbl_result = verify_label(lbl_path, num_classes)
            if not lbl_result["valid"]:
                split_result["invalid_labels"].append((stem, lbl_result["errors"]))

            split_result["objects"] += lbl_result["num_objects"]
            for cls_id, count in lbl_result["classes"].items():
                split_result["class_distribution"][cls_id] += count
                results["class_distribution"][cls_id] += count

        # Extrapolate the sampled object count to the full split size.
        if sample_size < len(matched):
            ratio = len(matched) / sample_size
            split_result["objects"] = int(split_result["objects"] * ratio)

        print(f" Objects (estimated): {split_result['objects']}")
        print(f" Matched pairs: {len(matched)}")

        if split_result["invalid_labels"]:
            print(f" β {len(split_result['invalid_labels'])} labels with issues")

        results["splits"][split] = split_result
        results["total_images"] += split_result["images"]
        results["total_labels"] += split_result["labels"]
        results["total_objects"] += split_result["objects"]

    # ---- Summary section ----
    print(f"\n{'=' * 60}")
    print("SUMMARY")
    print("=" * 60)
    print(f"\nTotal images: {results['total_images']}")
    print(f"Total labels: {results['total_labels']}")
    print(f"Total objects (estimated): {results['total_objects']}")

    if results["class_distribution"]:
        print(f"\nClass distribution:")
        for cls_id, count in sorted(results["class_distribution"].items()):
            # Fall back to a synthetic name when data.yaml was absent or
            # does not cover this class ID.
            name = class_names[cls_id] if class_names and cls_id < len(class_names) else f"class_{cls_id}"
            print(f" {cls_id} ({name}): {count} objects")

    print(f"\n{'=' * 60}")

    # Readiness checklist: dataset is "valid" only if every check passes.
    train_split = results["splits"].get("train", {})
    valid_split = results["splits"].get("valid", {})

    checks = [
        ("Train split exists", train_split.get("exists", False)),
        ("Valid split exists", valid_split.get("exists", False)),
        ("Train has images", train_split.get("images", 0) > 0),
        ("Valid has images", valid_split.get("images", 0) > 0),
        ("Train has labels", train_split.get("labels", 0) > 0),
        ("Valid has labels", valid_split.get("labels", 0) > 0),
        ("Train images β₯ 100", train_split.get("images", 0) >= 100),
    ]

    all_passed = True
    for name, passed in checks:
        status = "β" if passed else "β"
        print(f" [{status}] {name}")
        if not passed:
            all_passed = False

    results["valid"] = all_passed

    if all_passed:
        print(f"\nβ Dataset is valid and ready for training!")
    else:
        print(f"\nβ Dataset has issues that need to be fixed.")

    return results
|
|
|
|
def create_sample_data_yaml(dataset_path: Path, classes: list = None):
    """Write a starter data.yaml into *dataset_path*.

    Args:
        dataset_path: Root directory of the dataset.
        classes: Class names to record; when omitted, a default set of
            common road-anomaly labels is used.

    Returns:
        Path to the file that was written.
    """
    if classes is None:
        classes = ["pothole", "crack", "bump", "obstacle", "road_damage"]

    yaml_path = dataset_path / "data.yaml"

    # Absolute paths so training tools resolve the splits regardless of cwd.
    config = {
        "train": str((dataset_path / "train" / "images").resolve()),
        "val": str((dataset_path / "valid" / "images").resolve()),
        "test": str((dataset_path / "test" / "images").resolve()),
        "nc": len(classes),
        "names": classes,
    }

    with open(yaml_path, "w") as handle:
        yaml.dump(config, handle, default_flow_style=False)

    print(f"Created: {yaml_path}")
    return yaml_path
|
|
|
|
if __name__ == "__main__":
    # Default dataset location; the first CLI argument overrides it.
    dataset_path = Path("./dataset")

    if len(sys.argv) > 1:
        dataset_path = Path(sys.argv[1])

    if not dataset_path.exists():
        print(f"Dataset path not found: {dataset_path}")
        print("\nExpected structure:")
        print(" dataset/")
        print(" βββ data.yaml")
        print(" βββ train/")
        print(" β βββ images/")
        print(" β βββ labels/")
        print(" βββ valid/")
        print(" β βββ images/")
        print(" β βββ labels/")
        print(" βββ test/")
        print(" βββ images/")
        print(" βββ labels/")

        # Offer to scaffold the expected directory layout interactively.
        print(f"\nCreate sample structure at {dataset_path}? [y/N]: ", end="")
        try:
            response = input().strip().lower()
            if response == "y":
                for split in ["train", "valid", "test"]:
                    (dataset_path / split / "images").mkdir(parents=True, exist_ok=True)
                    (dataset_path / split / "labels").mkdir(parents=True, exist_ok=True)
                create_sample_data_yaml(dataset_path)
                print("β Created sample structure. Add your images and labels, then run again.")
        except (EOFError, KeyboardInterrupt):
            # BUG FIX: was a bare `except:` which would also swallow
            # SystemExit and mask real errors. Only the "no interactive
            # stdin / user aborted" cases should be ignored.
            pass
        sys.exit(1)

    results = verify_dataset(dataset_path)

    # Persist the full results next to the dataset for later inspection.
    results_file = dataset_path / "verification_results.json"

    def convert_defaultdict(obj):
        """Recursively convert defaultdicts to plain dicts for JSON output."""
        # BUG FIX: the old defaultdict special case returned dict(obj)
        # WITHOUT recursing into its values; since defaultdict subclasses
        # dict, this single branch handles both types and recurses fully.
        if isinstance(obj, dict):
            return {k: convert_defaultdict(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [convert_defaultdict(i) for i in obj]
        return obj

    with open(results_file, "w") as f:
        json.dump(convert_defaultdict(results), f, indent=2)
    print(f"\nResults saved to: {results_file}")
|
|