File size: 7,078 Bytes
b39a019
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import os
import yaml
from pathlib import Path
import shutil
from PIL import Image
import math
import numpy as np
from shapely.geometry import Polygon
import matplotlib.pyplot as plt
from matplotlib.patches import Polygon as PltPolygon
import random

# Should eventually be moved to a settings class
zebra_labels = "zebra_annotations/txt_annotations"
zebra_images = "zebra_annotations/zebra_images"
save_dir = "zebra_annotations/classification_data"
segments = 4

# Each box has format (x_1, y_1, x_2, y_2) - this does mean this is an estimate.
# box_1 is the bounding box of the crosswalk, and box_2 is that of the tile.
def check_box_intersection(box_1, box_2, threshold=0.6):
    # Threshold is the min IoU required to consider it as having a crosswalk - the minimum percent area of the crosswalk that must be in the tile
    formatted_box_1 = [[box_1[0], box_1[1]], [box_1[2], box_1[1]], [box_1[2], box_1[3]], [box_1[0], box_1[3]]]  # Formatting follows shapely clockwise system

    poly_1 = Polygon(formatted_box_1)
    poly_2 = Polygon(box_2)
    try:
        iou = poly_1.intersection(poly_2).area / poly_1.union(poly_2).area
        scaled_iou = iou * ((poly_1.area) / (poly_2.area))
        # print("--------------")
        # print(iou, scaled_iou)
        return (scaled_iou > threshold)
    except ZeroDivisionError:
        # In the case of a zero division error
        print("ZERO DIVISION ERROR", poly_1.area, poly_2.area)
        return False

def load_yaml_database(yaml_path):
    config_file = None

    with open(yaml_path, "r") as file:
        config_file = yaml.safe_load(file)
    
    root = Path(config_file['path'])
    train_dir = root / config_file['train']  # Training data
    valid_dir = root / config_file['val']  # Validation data
    test_dir = root / config_file['test']  # Testing data
    image_size = config_file['img_size']
    classes = config_file['names']

    # Returns in format (directories, label_description)
    return (train_dir, valid_dir, test_dir), (image_size, classes)


# Takes in a entity database, labelled for bounding box regression, and breaks down the images into
# smaller images, with labels for classification training
def convert_database_to_segments(image_dir, label_dir, dst_dir, overwrite=False):
    if not os.path.exists(image_dir) or not os.path.exists(label_dir):
        print("Error: Image or label directories do not exist")
        return
    
    # If there isn't a directory created to save the new data to, create it
    dst_dir = Path(dst_dir)
    try:
        if overwrite and dst_dir.exists() and dst_dir.is_dir():
            shutil.rmtree(dst_dir)
            dst_dir.mkdir(parents=True, exist_ok=True)
        else:
            dst_dir.mkdir(parents=True, exist_ok=False)
    except FileExistsError:
        # If the conversion has already been made, don't do anything
        return
    
    # Some images are unlabelled and will be ignored
    image_files = {Path(f).stem for f in os.listdir(image_dir) if f.endswith(('.jpg', '.jpeg', '.png'))}

    file_base = 0
    iterations = 0

    for label in os.listdir(label_dir)[1:]:
        if label.endswith('.txt'):
            image_name = Path(label).stem

            if image_name in image_files:
                image_path = os.path.join(image_dir, image_name)
                label_path = os.path.join(label_dir, label)
                # Saves the broken down segments to the destination directory - no need to return
                file_base = breakdown(image_path, label_path, dst_dir, file_base)

        if iterations < 10:
            print(file_base, end=" ")  # To make sure that it is progressing - a progress bar of sorts
            iterations += 1
        else:
            print(file_base, end= '\r',)  # This doesn't work on the VS code terminal unfortunately 
            iterations = 0

def breakdown(image_path, label, dst_dir, file_base, segment=segments, targ_size = None):
    with Image.open(image_path + ".jpg") as image:
        img_size, take_size = image.size[0], None

        # Can segment images by quantity (how many images you want) or by quantity (how large do you want the images)
        if targ_size is None:
            take_size = math.floor(img_size / segment)

        else:
            segment = math.floor(img_size / targ_size)
            take_size = img_size / segment

        # Particular to the Zebra label format
        label_data = []
        with open(label, 'r') as label_file:
            for line in label_file.readlines():
                if line[0] == '0':
                    parsed = list(map(float, line.split()[1:]))
                    entity_box = np.array([(parsed[i], parsed[i + 1]) for i in range(0, len(parsed), 2)])
                    label_data.append(entity_box * img_size)


        img = np.array(image)
        for i in range(segment):
            for j in range(segment):
                box_coordinates = [i * take_size, j * take_size, min((i + 1) * take_size, len(img[0])), min((j + 1) * take_size, len(img))]
                new_img = img[box_coordinates[1]: box_coordinates[3], box_coordinates[0]: box_coordinates[2]]
                
                # Some images do not load so have size zero, which causes crashes when loading them from the database during training
                # because tensor conversions cannot take empty lists, For that reason we threshold at an arbitrarily low number here.
                if new_img.size <= 32:
                    continue

                new_image = Image.fromarray(new_img)

                crosswalk_intersection = False
                for crosswalk_box in label_data:
                    if check_box_intersection(box_coordinates, crosswalk_box):
                        # 1 means an image that contains a crosswalk (a significant portion of it)
                        crosswalk_intersection = True
                    else:
                        # 0 means background image (does not contain any significant portion of a crosswalk)
                        pass
                
                if crosswalk_intersection: 
                    with open(os.path.join(dst_dir, str(file_base)) + ".txt", 'w') as new_label_file:
                        new_label_file.write("1")
                        new_image.save(str(os.path.join(dst_dir, str(file_base))) + ".png")

                # As the crosswalks are sparse - this improves the balance of positive to negative cases for training
                else:
                    if random.randint(0, 4) >= 1:
                        with open(os.path.join(dst_dir, str(file_base)) + ".txt", 'w') as new_label_file:
                            new_label_file.write("0")
                            new_image.save(str(os.path.join(dst_dir, str(file_base))) + ".png")
                    
                # This ensures we don't overwrite previous segment files
                file_base += 1

    return file_base

convert_database_to_segments("zebra_annotations/zebra_images", "zebra_annotations/txt_annotations", "zebra_annotations/classification_data")