Spaces:
Running
Running
| import os | |
| import shutil | |
| import json | |
| from collections import defaultdict | |
| import random | |
| from tqdm import tqdm | |
| from PIL import Image | |
| def convert_coco_to_yolo(coco_json_path, images_dir, output_dir, class_map, split='train'): | |
| """Convert COCO format annotations to YOLO format""" | |
| if not os.path.exists(coco_json_path): | |
| print(f"Warning: JSON file not found: {coco_json_path}") | |
| return set() | |
| if not os.path.exists(images_dir): | |
| print(f"Warning: Images directory not found: {images_dir}") | |
| return set() | |
| print(f"\nProcessing {split} split...") | |
| # Create output directories | |
| labels_dir = os.path.join(output_dir, 'labels', split) | |
| images_dir_out = os.path.join(output_dir, 'images', split) | |
| os.makedirs(labels_dir, exist_ok=True) | |
| os.makedirs(images_dir_out, exist_ok=True) | |
| # Load COCO annotations | |
| try: | |
| with open(coco_json_path, 'r') as f: | |
| coco = json.load(f) | |
| except json.JSONDecodeError: | |
| print(f"Error: Invalid JSON file: {coco_json_path}") | |
| return set() | |
| # Create id to filename mapping | |
| id_to_filename = {img['id']: img['file_name'] for img in coco['images']} | |
| # Group annotations by image | |
| img_to_anns = defaultdict(list) | |
| for ann in coco['annotations']: | |
| img_to_anns[ann['image_id']].append(ann) | |
| # Process each image | |
| processed_images = set() | |
| for img_id, anns in tqdm(img_to_anns.items(), desc=f"Converting {split} set"): | |
| img_file = id_to_filename[img_id] | |
| img_path = os.path.join(images_dir, img_file) | |
| if not os.path.exists(img_path): | |
| print(f"Warning: Image {img_path} not found, skipping...") | |
| continue | |
| try: | |
| # Copy image | |
| shutil.copy2(img_path, os.path.join(images_dir_out, img_file)) | |
| # Get image dimensions | |
| with Image.open(img_path) as im: | |
| w, h = im.size | |
| # Convert annotations | |
| label_lines = [] | |
| for ann in anns: | |
| cat_id = ann['category_id'] | |
| if cat_id not in class_map: | |
| print(f"Warning: Unknown category ID {cat_id} in {img_file}") | |
| continue | |
| yolo_cls = class_map[cat_id] | |
| # Convert segmentation points | |
| for seg in ann['segmentation']: | |
| coords = [str(x/w) if i%2==0 else str(x/h) for i,x in enumerate(seg)] | |
| label_lines.append(f"{yolo_cls} {' '.join(coords)}") | |
| # Write label file | |
| label_file = os.path.join(labels_dir, os.path.splitext(img_file)[0] + '.txt') | |
| with open(label_file, 'w') as f: | |
| f.write('\n'.join(label_lines)) | |
| processed_images.add(img_id) | |
| except (IOError, OSError) as e: | |
| print(f"Error processing {img_file}: {str(e)}") | |
| continue | |
| return processed_images | |
| def create_balanced_dataset(source_json, images_dir, output_dir, class_map, min_samples=50, split='train'): | |
| """Create balanced dataset by sampling equal number of images per class""" | |
| print(f"\nCreating balanced dataset for {split} split...") | |
| # Create output directories | |
| labels_dir = os.path.join(output_dir, 'labels', split) | |
| images_dir_out = os.path.join(output_dir, 'images', split) | |
| os.makedirs(labels_dir, exist_ok=True) | |
| os.makedirs(images_dir_out, exist_ok=True) | |
| # Load COCO annotations | |
| with open(source_json, 'r') as f: | |
| coco = json.load(f) | |
| # Group images by parts they contain | |
| images_by_part = defaultdict(set) | |
| image_to_anns = defaultdict(list) | |
| for ann in coco['annotations']: | |
| img_id = ann['image_id'] | |
| cat_id = ann['category_id'] | |
| images_by_part[cat_id].add(img_id) | |
| image_to_anns[img_id].append(ann) | |
| # Sample images for balanced dataset | |
| selected_images = set() | |
| for part_images in images_by_part.values(): | |
| sample_size = min(min_samples, len(part_images)) | |
| selected_images.update(random.sample(list(part_images), sample_size)) | |
| # Convert selected images to YOLO format | |
| id_to_filename = {img['id']: img['file_name'] for img in coco['images']} | |
| print(f"Processing {len(selected_images)} images for balanced {split} set...") | |
| for img_id in tqdm(selected_images): | |
| img_file = id_to_filename[img_id] | |
| img_path = os.path.join(images_dir, img_file) | |
| if not os.path.exists(img_path): | |
| print(f"Warning: Image {img_path} not found, skipping...") | |
| continue | |
| # Copy image | |
| shutil.copy2(img_path, os.path.join(images_dir_out, img_file)) | |
| # Get image dimensions | |
| with Image.open(img_path) as im: | |
| w, h = im.size | |
| # Convert annotations | |
| label_lines = [] | |
| for ann in image_to_anns[img_id]: | |
| cat_id = ann['category_id'] | |
| yolo_cls = class_map[cat_id] | |
| # Convert segmentation points | |
| for seg in ann['segmentation']: | |
| coords = [str(x/w) if i%2==0 else str(x/h) for i,x in enumerate(seg)] | |
| label_lines.append(f"{yolo_cls} {' '.join(coords)}") | |
| # Write label file | |
| label_file = os.path.join(labels_dir, os.path.splitext(img_file)[0] + '.txt') | |
| with open(label_file, 'w') as f: | |
| f.write('\n'.join(label_lines)) | |
| def main(): | |
| base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| source_dir = os.path.join(base_dir, 'damage_detection_dataset') | |
| if not os.path.exists(source_dir): | |
| print(f"Error: Source directory not found: {source_dir}") | |
| return | |
| # Set up output directories | |
| car_damage_dir = os.path.join(base_dir, 'data', 'data_yolo_for_training', 'car_damage_dataset') | |
| car_parts_dir = os.path.join(base_dir, 'data', 'data_yolo_for_training', 'car_parts_damage_dataset') | |
| # Class mappings | |
| damage_class_map = {1: 0} # Assuming damage is class 1 in COCO format | |
| parts_class_map = {1: 0, 2: 1, 3: 2, 4: 3, 5: 4} # headlamp, front_bumper, hood, door, rear_bumper | |
| # Process car damage dataset (full dataset) | |
| print("\nProcessing Car Damage Dataset...") | |
| for split in ['train', 'val', 'test']: | |
| json_name = 'COCO_train_annos.json' if split == 'train' else 'COCO_val_annos.json' | |
| json_path = os.path.join(source_dir, split, json_name) | |
| images_dir = os.path.join(source_dir, split) | |
| if os.path.exists(json_path): | |
| convert_coco_to_yolo( | |
| json_path, | |
| images_dir, | |
| car_damage_dir, | |
| damage_class_map, | |
| split | |
| ) | |
| else: | |
| print(f"Warning: JSON file not found for {split} split: {json_path}") | |
| # Process car parts dataset (balanced training, original val/test) | |
| print("\nProcessing Car Parts Dataset...") | |
| # Training set - balanced | |
| train_json = os.path.join(source_dir, 'train', 'COCO_mul_train_annos.json') | |
| if os.path.exists(train_json): | |
| create_balanced_dataset( | |
| train_json, | |
| os.path.join(source_dir, 'train'), | |
| car_parts_dir, | |
| parts_class_map, | |
| min_samples=50, | |
| split='train' | |
| ) | |
| else: | |
| print(f"Warning: Training JSON file not found: {train_json}") | |
| # Validation and test sets - original | |
| for split in ['val', 'test']: | |
| json_path = os.path.join(source_dir, split, 'COCO_mul_val_annos.json') | |
| images_dir = os.path.join(source_dir, split) | |
| if os.path.exists(json_path): | |
| convert_coco_to_yolo( | |
| json_path, | |
| images_dir, | |
| car_parts_dir, | |
| parts_class_map, | |
| split | |
| ) | |
| else: | |
| print(f"Warning: JSON file not found for {split} split: {json_path}") | |
| if __name__ == '__main__': | |
| main() | |