File size: 3,895 Bytes
4fcc331 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 | """Utility function to split a STAC collection into multiple STAC collections based on CRS.
Requires the "proj:epsg" property to be present in all the STAC items.
"""
import os
from pathlib import Path
from typing import Iterator, Union
import pystac
def _extract_epsg_from_stac_item(stac_item: pystac.Item) -> int:
    """
    Extract the EPSG code from a STAC item.

    The code is read from the item's "proj:epsg" property and returned as stored.

    Parameters:
    stac_item (pystac.Item): The STAC item.

    Returns:
    int: The EPSG code.

    Raises:
    KeyError: If the "proj:epsg" property is missing from the STAC item.
    """
    try:
        return stac_item.properties["proj:epsg"]
    except KeyError as err:
        # Chain explicitly so the traceback shows the failed lookup as the
        # direct cause instead of an incidental "during handling" context.
        raise KeyError(
            "The 'proj:epsg' property is missing from the STAC item."
        ) from err
def _get_items_by_epsg(
    collection: pystac.Collection,
) -> Iterator[tuple[int, pystac.Item]]:
    """
    Lazily pair every item of a collection with its EPSG code.

    Items are produced in the order returned by ``collection.get_all_items()``;
    no grouping or sorting is performed here.

    Parameters:
    collection (pystac.Collection): The STAC collection.

    Yields:
    tuple[int, pystac.Item]: EPSG code and corresponding STAC item.

    Raises:
    KeyError: If an item lacks the "proj:epsg" property (raised when that
        item is reached during iteration).
    """
    yield from (
        (_extract_epsg_from_stac_item(stac_item), stac_item)
        for stac_item in collection.get_all_items()
    )
def _create_collection_skeleton(
    collection: pystac.Collection, epsg: int
) -> pystac.Collection:
    """
    Create a skeleton for a new STAC collection with a given EPSG code.

    The skeleton takes its id (suffixed with the EPSG code), description,
    extent, summaries, license and STAC extensions from the original
    collection, plus the "item_assets" definitions when the original declares
    them. No items are added here.

    Parameters:
    collection (pystac.Collection): The original STAC collection.
    epsg (int): The EPSG code.

    Returns:
    pystac.Collection: The skeleton of the new STAC collection.
    """
    new_collection = pystac.Collection(
        id=f"{collection.id}_{epsg}",
        description=f"{collection.description} Containing only items with EPSG code {epsg}",
        extent=collection.extent.clone(),
        # Copy the mutable containers (as is already done for the extent) so
        # that changes to one derived collection cannot leak into the original
        # collection or into the skeletons built for other EPSG codes.
        summaries=collection.summaries.clone(),
        license=collection.license,
        stac_extensions=list(collection.stac_extensions or []),
    )

    if "item_assets" in collection.extra_fields:
        item_assets_cls = pystac.extensions.item_assets.ItemAssetsExtension
        source_ext = item_assets_cls.ext(collection)
        # add_if_missing registers the extension on the new collection; with
        # the list copy above this no longer touches the original's list.
        target_ext = item_assets_cls.ext(new_collection, add_if_missing=True)
        target_ext.item_assets = source_ext.item_assets

    return new_collection
def split_collection_by_epsg(
    collection: Union[str, Path, pystac.Collection], output_dir: Union[str, Path]
) -> None:
    """
    Split a STAC collection into multiple STAC collections based on EPSG code.

    One new collection is built per distinct "proj:epsg" value found among the
    items and saved under ``<output_dir>/collection-<epsg>``.

    Parameters
    ----------
    collection: Union[str, Path, pystac.Collection]
        A collection of STAC items or a path to a STAC collection.
    output_dir: Union[str, Path]
        The directory where the split STAC collections will be saved.
        It is created if it does not exist yet.

    Raises
    ------
    KeyError
        If an item lacks the "proj:epsg" property.

    Notes
    -----
    If the given path cannot be read as a STAC collection, a message is printed
    and the function returns without writing any collection.
    """
    # Normalise the output location regardless of how `collection` was given:
    # the write loop below joins paths with `/`, which requires a Path even
    # when the caller passes an in-memory collection and a plain string.
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    if not isinstance(collection, pystac.Collection):
        try:
            collection = pystac.read_file(Path(collection))
        except pystac.STACError:
            print("Please provide a path to a valid STAC collection.")
            return
        # read_file can return other STAC object types; only a collection
        # carries the extent/license/summaries the split relies on.
        if not isinstance(collection, pystac.Collection):
            print("Please provide a path to a valid STAC collection.")
            return

    collections_by_epsg: dict[int, pystac.Collection] = {}
    for epsg, item in _get_items_by_epsg(collection):
        if epsg not in collections_by_epsg:
            collections_by_epsg[epsg] = _create_collection_skeleton(collection, epsg)
        # Add item to the corresponding collection
        collections_by_epsg[epsg].add_item(item)

    # Write each collection to disk
    for epsg, new_collection in collections_by_epsg.items():
        # Update extent based on added items
        new_collection.update_extent_from_items()
        collection_path = output_dir / f"collection-{epsg}"
        new_collection.normalize_hrefs(str(collection_path))
        new_collection.save()
|