"""
Module for connecting to Copernicus Data Space Ecosystem through S3 and STAC interfaces.
"""
import io
import os
from urllib.parse import urlparse
import boto3
import pystac_client
from dotenv import load_dotenv
from PIL import Image
# Load environment variables from .env file
load_dotenv()
class S3Connector:
    """
    A client for connecting to S3-compatible storage services.

    This connector provides an interface to connect to an S3-compatible
    storage service and retrieve either the boto3 S3 resource object or
    the lower-level S3 client object.
    """

    def __init__(self, endpoint_url, access_key_id, secret_access_key, region_name='default'):
        """
        Initialize S3 connector with credentials and endpoint information.

        Parameters
        ----------
        endpoint_url : str
            The URL of the S3 endpoint
        access_key_id : str
            The access key for authentication
        secret_access_key : str
            The secret key for authentication
        region_name : str, optional
            The AWS region name, by default 'default'
        """
        self.endpoint_url = endpoint_url
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.region_name = region_name
        # Both handles start unset; connect() populates them lazily.
        # (Bug fix: self.s3 was never initialized, so get_s3() raised
        # AttributeError before this change.)
        self.s3 = None
        self.s3_client = None

    def connect(self):
        """
        Establish connection to S3 service.

        Creates both the high-level resource object (self.s3) and the
        low-level client object (self.s3_client).

        Returns
        -------
        bool
            True if connection was successful, False otherwise
        """
        try:
            # High-level resource interface, as promised by get_s3().
            self.s3 = boto3.resource(
                's3',
                endpoint_url=self.endpoint_url,
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
                region_name=self.region_name
            )
            # Also create a client object for APIs that need it
            # (e.g. download_fileobj).
            self.s3_client = boto3.client(
                's3',
                endpoint_url=self.endpoint_url,
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
                region_name=self.region_name
            )
            return True
        except Exception as e:
            # Best-effort: report and signal failure rather than raising,
            # matching the documented bool return contract.
            print(f"Connection failed: {e}")
            return False

    def get_s3(self):
        """
        Return the S3 resource object.

        If not already connected, this method will first establish a connection.

        Returns
        -------
        boto3.resources.factory.s3.ServiceResource
            The boto3 S3 resource object for interacting with S3 storage
        """
        if not self.s3:
            self.connect()
        return self.s3

    def get_s3_client(self):
        """
        Return the S3 client object.

        If not already connected, this method will first establish a connection.

        Returns
        -------
        boto3.client.S3
            The boto3 S3 client object for interacting with S3 storage
        """
        if not self.s3_client:
            self.connect()
        return self.s3_client
def extract_s3_path_from_url(url):
    """
    Extracts the S3 object path from an S3 URL or URI.

    This function parses S3 URLs/URIs and returns just the object path portion,
    removing the protocol (s3://), bucket name, and any leading slashes.
    Non-S3 strings are returned unchanged.

    Args:
        url (str): The full S3 URI (e.g., 's3://eodata/path/to/file.jp2')

    Returns:
        str: The S3 object path (without protocol, bucket name and leading slashes)
    """
    # If it's not an S3 URI, return it unchanged.
    if not url.startswith('s3://'):
        return url
    # urlparse puts the bucket in netloc and the object key in path;
    # strip the leading slash(es) to get a bare key.
    # (The original also re-checked the scheme and raised ValueError,
    # but that branch was unreachable after the startswith guard above.)
    return urlparse(url).path.lstrip('/')
class ProductDownloader:
    """
    Class for downloading products from Copernicus Data Space Ecosystem.

    This class provides methods to download products using an S3 client connection,
    either as in-memory content (bytes) or as files saved to disk.
    """

    def __init__(self, s3_client, bucket_name='eodata'):
        """
        Initialize the product downloader with an S3 client.

        Args:
            s3_client: The boto3 S3 client to use for downloads
            bucket_name (str): The S3 bucket name where products are stored (default: 'eodata')
        """
        self.s3_client = s3_client
        self.bucket_name = bucket_name

    def get_product_content(self, product_path):
        """
        Download a Sentinel-2 product directly from Copernicus Data Space Ecosystem as a bytes object.

        Args:
            product_path (str): S3 key or full S3 URI to the product

        Returns:
            bytes: The product content as bytes
            str: The filename of the product

        Raises:
            Exception: re-raises any error from the underlying S3 download.
        """
        # Extract S3 key if a full URI is provided.
        # (Bug fix: the original had this comment but no code, so s3:// URIs
        # were passed to the client verbatim and failed to resolve.)
        if product_path.startswith('s3://'):
            product_path = extract_s3_path_from_url(product_path)
        # Extract the filename from the path
        _, filename = os.path.split(product_path)
        try:
            # Download into an in-memory buffer rather than to disk.
            buffer = io.BytesIO()
            self.s3_client.download_fileobj(self.bucket_name, product_path, buffer)
            buffer.seek(0)
            product_content = buffer.getvalue()
            print(f"Successfully downloaded product: {filename}")
            # Return both the bytes and the filename
            return product_content, filename
        except Exception as e:
            # Report, then propagate so callers can handle the failure.
            print(f"Error downloading product: {str(e)}")
            raise

    def download_product(self, product_path, output_filename=None):
        """
        Download a Sentinel-2 product directly from Copernicus Data Space Ecosystem to disk.

        Args:
            product_path (str): S3 key or full S3 URI to the product
            output_filename (str, optional): Filename to save the product to.
                                            If None, uses the original filename.

        Returns:
            str: Path to the downloaded file

        Raises:
            Exception: re-raises any error from the underlying S3 download.
        """
        # Extract S3 key if a full URI is provided.
        # (Bug fix: the original called self.get_s3_key_from_href, a method
        # that does not exist on this class — AttributeError on any s3:// path.)
        if product_path.startswith('s3://'):
            product_path = extract_s3_path_from_url(product_path)
        # Extract the filename from the path
        _, filename = os.path.split(product_path)
        # Use custom filename if provided, otherwise use the original
        if output_filename is None:
            output_filename = filename
        try:
            self.s3_client.download_file(self.bucket_name, product_path, output_filename)
            print(f"Successfully downloaded product {filename} to {output_filename}")
            return output_filename
        except Exception as e:
            # Report, then propagate so callers can handle the failure.
            print(f"Error downloading product: {str(e)}")
            raise
if __name__ == "__main__":
    # Demo/CLI entry point: search the Copernicus STAC catalog for Sentinel-2
    # L2A scenes and download their true-color (TCI) assets over S3.
    # Credentials come from the environment (populated by load_dotenv above);
    # they may be None if the .env file is missing — connect() would then fail.
    ACCESS_KEY_ID = os.environ.get("ACCESS_KEY_ID")
    SECRET_ACCESS_KEY = os.environ.get("SECRET_ACCESS_KEY")
    ENDPOINT_URL = 'https://eodata.dataspace.copernicus.eu'
    ENDPOINT_STAC = "https://stac.dataspace.copernicus.eu/v1/"
    LON, LAT = 15, 50
    # Initialize the connector
    s3_connector = S3Connector(
        endpoint_url=ENDPOINT_URL,
        access_key_id=ACCESS_KEY_ID,
        secret_access_key=SECRET_ACCESS_KEY
    )
    # Connect to S3
    s3_connector.connect()
    s3_client = s3_connector.get_s3_client()
    # Open the STAC catalog (network call).
    catalog = pystac_client.Client.open(ENDPOINT_STAC)
    # Search for Sentinel-2 products
    # items_txt = catalog.search(
    #     collections=['sentinel-2-l2a'],
    #     intersects=dict(type="Point", coordinates=[LON, LAT]),
    #     datetime="2024-05-01/2024-06-01",
    #     query=["eo:cloud_cover<50"]
    # ).item_collection()
    # Define bounding box coordinates [min_lon, min_lat, max_lon, max_lat]
    bbox = [150.47, -21.42, 151.47, -20.42]  # 1° box around LON=150.97, LAT=-20.92
    # Search for Sentinel-2 products within the bounding box
    items_txt = catalog.search(
        collections=['sentinel-2-l2a'],
        bbox=bbox,
        datetime="2024-05-01/2024-06-01",
        query=["eo:cloud_cover<50"]
    ).item_collection()
    # NOTE(review): product_url is reassigned on every iteration and only
    # consumed after the loop, so only the LAST matching item is downloaded.
    # If one download per item was intended, the handler/download calls
    # below belong inside this loop — confirm original intent.
    for item in items_txt:
        product_url = extract_s3_path_from_url(item.assets['TCI_10m'].href)
        print(product_url)
    # Initialize the handler with the S3 connector
    handler = ProductDownloader(s3_client=s3_client, bucket_name='eodata')
    # # Get the image content as bytes
    # image_content, filename = handler.get_product_content(product_url)
    # print(f"Downloaded (unknown), content size: {len(image_content)} bytes")
    # Download the image to a file
    downloaded_file = handler.download_product(product_url)
    print(f"Downloaded file saved to {downloaded_file}")
    # product_url = extract_s3_path_from_url(items_txt[0].assets['TCI_60m'].href)
    # print(product_url)
    # # Initialize the handler with the S3 connector
    # handler = ProductDownloader(s3_client=s3_client, bucket_name='eodata')
    # # Get the image content as bytes
    # image_content, filename = handler.get_product_content(product_url)
    # print(f"Downloaded (unknown), content size: {len(image_content)} bytes")
    # # Download the image to a file
    # downloaded_file = handler.download_product(product_url)
    # print(f"Downloaded file saved to {downloaded_file}")
    # from PIL import Image
    # image = Image.open(io.BytesIO(image_content))