| import os |
| import rasterio |
| import geopandas as gpd |
| from shapely.geometry import box |
| from rasterio.mask import mask |
| from PIL import Image |
| from PIL import ImageOps |
| import numpy as np |
| import warnings |
| from rasterio.errors import NodataShadowWarning |
| import sys |
|
|
| warnings.filterwarnings("ignore", category=NodataShadowWarning) |
|
|
| def cut_trees(output_dir, geojson_path, tif_path): |
| |
| if not os.path.exists(output_dir): |
| os.makedirs(output_dir) |
|
|
|
|
| |
|
|
| gdf = gpd.read_file(geojson_path) |
|
|
|
|
| |
| |
|
|
| |
| with rasterio.open(tif_path) as src: |
| |
| tif_bounds = box(*src.bounds) |
| tif_bounds = gpd.GeoDataFrame(geometry=[tif_bounds], crs=gdf.crs) |
| tif_bounds = tif_bounds['geometry'].iloc[0] |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| N = len(gdf) |
| n = int(N/10) |
| print(f"Processing {N} polygons...") |
| image_counter = 0 |
| for idx, row in gdf.iterrows(): |
| if idx % n == 0: |
| progress = f"{round(idx/N*100)} % complete --> {idx}/{N}" |
| sys.stdout.write('\r' + progress) |
| sys.stdout.flush() |
|
|
| |
| geom = row['geometry'] |
| name = row['id'] |
|
|
| |
| |
| |
| if geom.intersects(tif_bounds): |
| |
| out_image, out_transform = mask(src, [geom], crop=True) |
| |
| |
| out_image = out_image.transpose(1, 2, 0) |
|
|
| |
| if out_image.size == 0: |
| print("Empty image") |
| gdf.drop(idx, inplace=True) |
| message = f"{round(idx/N*100)} % complete --> {idx}/{N} | Polygon {idx} resulted in an empty image and will be skipped." |
| sys.stdout.write('\r' + message) |
| sys.stdout.flush() |
| continue |
| |
| |
| mask_array = (out_image[:, :, 0] != src.nodata) |
| non_zero_rows = np.any(mask_array, axis=1) |
| non_zero_cols = np.any(mask_array, axis=0) |
|
|
| |
| if not np.any(non_zero_rows) or not np.any(non_zero_cols): |
| print("Non zero rows or columns") |
| gdf.drop(idx, inplace=True) |
| message = f"{round(idx/N*100)} % complete --> {idx}/{N} | Polygon {idx} resulted in an invalid image area and will be skipped." |
| sys.stdout.write('\r' + message) |
| sys.stdout.flush() |
| continue |
|
|
| out_image = out_image[non_zero_rows][:, non_zero_cols] |
|
|
| |
| out_image = Image.fromarray(out_image.astype(np.uint8)) |
| output_path = os.path.join(output_dir, f'tree_{name}.png') |
| out_image.save(output_path) |
| image_counter += 1 |
| else: |
| gdf.drop(idx, inplace=True) |
| print("Does not intersect") |
| message = f"{round(idx/N*100)} % complete --> {idx}/{N} | Polygon {idx} is outside the image bounds and will be skipped." |
| sys.stdout.write('\r' + message) |
| sys.stdout.flush() |
| print(len(gdf)) |
| gdf.to_file(geojson_path, driver='GeoJSON') |
| print(f'\n {image_counter}/{N} Tree images have been successfully saved in the "detected_trees" folder.') |
|
|
|
|
| def resize_images(input_folder, output_folder, target_size): |
| |
| if not os.path.exists(output_folder): |
| os.makedirs(output_folder) |
| |
| counter = 0 |
| |
| for filename in os.listdir(input_folder): |
| if filename.endswith('.png'): |
| |
| with Image.open(os.path.join(input_folder, filename)) as img: |
| |
| |
|
|
| img.thumbnail(target_size, Image.LANCZOS) |
|
|
| if img.size[0] < target_size[0] or img.size[1] < target_size[1]: |
| |
| |
| pad_width = target_size[0] - img.size[0] |
| pad_height = target_size[1] - img.size[1] |
| |
| |
| padding = (pad_width // 2, pad_height // 2, pad_width - (pad_width // 2), pad_height - (pad_height // 2)) |
| |
| |
| img = ImageOps.expand(img, padding, fill=(0, 0, 0)) |
| |
| |
|
|
|
|
| |
| paste_pos = ((target_size[0] - img.size[0]) // 2, (target_size[1] - img.size[1]) // 2) |
| |
| |
| new_img = Image.new("RGBA", target_size, (0, 0, 0, 255)) |
| img = img.convert("RGBA") |
| |
| new_img.paste(img, paste_pos, img) |
| |
| new_img = new_img.convert("RGB") |
| |
| new_img.save(os.path.join(output_folder, filename)) |
| |
| counter += 1 |
| |
| if counter % 100 == 0: |
| message = f"Processed {counter} images" |
| print(message, end='\r') |
| |
| |
| print(f"Processed a total of {counter} images.") |
|
|
|
|
| |
| def generate_tree_images(geojson_path, tif_path, target_size = (224, 224)): |
| """ |
| INPUT: geojson path, tif_path that contain the trees, optional target_size of the resulting images |
| |
| RETURNS: nothing |
| |
| Action: It creates two folders: + "detected trees" --> the cut tree images |
| + "tree_images" --> the processed cut tree images, ready to use for species recognition |
| """ |
|
|
|
|
| |
| folder_cut_trees = "detected_trees" |
| folder_finished_images = "tree_images" |
| |
| cut_trees(geojson_path = geojson_path, tif_path = tif_path, output_dir = folder_cut_trees) |
| resize_images(input_folder = folder_cut_trees, output_folder = folder_finished_images, target_size = target_size) |
|
|
| |