"""
Module for connecting to Copernicus Data Space Ecosystem through S3 and STAC interfaces.
"""

import io
import os
from typing import Optional, Tuple
from urllib.parse import urlparse

import boto3
import pystac_client
from dotenv import load_dotenv
from PIL import Image  # noqa: F401  (handy for decoding bytes from get_product_content)

# Load environment variables from .env file
load_dotenv()


class S3Connector:
    """
    A client for connecting to S3-compatible storage services.

    The connector stores the endpoint and credentials and lazily creates both
    a boto3 S3 *resource* (``get_s3``) and a boto3 S3 *client*
    (``get_s3_client``) from them.
    """

    def __init__(self, endpoint_url, access_key_id, secret_access_key,
                 region_name='default'):
        """
        Initialize S3 connector with credentials and endpoint information.

        Args:
            endpoint_url (str): The URL of the S3 endpoint
            access_key_id (str): The access key for authentication
            secret_access_key (str): The secret key for authentication
            region_name (str, optional): The region name passed to boto3,
                by default 'default'
        """
        self.endpoint_url = endpoint_url
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.region_name = region_name
        # Both handles are created by connect(); None means "not connected yet".
        self.s3 = None
        self.s3_client = None

    def connect(self) -> bool:
        """
        Create the boto3 S3 resource and client objects.

        Any exception raised while building the boto3 objects is printed and
        reported through the return value instead of being propagated. The
        previously stored handles are left untouched when creation fails.

        Returns:
            bool: True if both objects were created, False otherwise
        """
        connection_kwargs = dict(
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
            region_name=self.region_name,
        )
        try:
            resource = boto3.resource('s3', **connection_kwargs)
            client = boto3.client('s3', **connection_kwargs)
        except Exception as e:
            print(f"Connection failed: {e}")
            return False

        # Assign only after both succeeded so the two handles stay in sync.
        self.s3 = resource
        self.s3_client = client
        return True

    def get_s3(self):
        """
        Return the S3 resource object.

        If not already connected, this method will first call ``connect()``.

        Returns:
            The boto3 S3 resource object, or None if connecting failed
        """
        if self.s3 is None:
            self.connect()
        return self.s3

    def get_s3_client(self):
        """
        Return the S3 client object.

        If not already connected, this method will first call ``connect()``.

        Returns:
            The boto3 S3 client object, or None if connecting failed
        """
        if self.s3_client is None:
            self.connect()
        return self.s3_client


def extract_s3_path_from_url(url: str) -> str:
    """
    Extract the S3 object path from an S3 URI.

    For a string starting with ``s3://`` the scheme, the bucket name and any
    leading slashes of the path are removed. Any other string is returned
    unchanged, so plain object keys can be passed through safely.

    Args:
        url (str): The full S3 URI (e.g., 's3://eodata/path/to/file.jp2')
            or an object key

    Returns:
        str: The S3 object path (without protocol, bucket name and leading
            slashes), or ``url`` itself when it is not an ``s3://`` URI
    """
    # If it's not an S3 URI, return it unchanged
    if not url.startswith('s3://'):
        return url

    # urlparse puts the bucket into ``netloc``; ``path`` is the object key
    # with a leading slash.
    return urlparse(url).path.lstrip('/')


class ProductDownloader:
    """
    Class for downloading products from Copernicus Data Space Ecosystem.

    This class provides methods to download products using an S3 client
    connection, either as in-memory content (bytes) or as files saved to disk.
    """

    def __init__(self, s3_client, bucket_name='eodata'):
        """
        Initialize the product downloader with an S3 client.

        Args:
            s3_client: The boto3 S3 client to use for downloads
            bucket_name (str): The S3 bucket name where products are stored
                (default: 'eodata')
        """
        self.s3_client = s3_client
        self.bucket_name = bucket_name

    @staticmethod
    def get_s3_key_from_href(href: str) -> str:
        """
        Convert an ``s3://`` URI into an object key.

        Thin wrapper around ``extract_s3_path_from_url``; non-``s3://`` input
        is returned unchanged. The bucket named in the URI is discarded:
        downloads always use ``self.bucket_name``.

        Args:
            href (str): S3 URI or object key

        Returns:
            str: The object key
        """
        return extract_s3_path_from_url(href)

    def get_product_content(self, product_path: str) -> Tuple[bytes, str]:
        """
        Download a product from the configured bucket as a bytes object.

        Args:
            product_path (str): S3 key or full S3 URI to the product

        Returns:
            tuple: ``(content, filename)`` where ``content`` is the product
                as bytes and ``filename`` is the last component of the key

        Raises:
            Exception: Whatever the S3 client raises; the error is printed
                and re-raised unchanged.
        """
        # Extract S3 key if full URI is provided
        product_path = self.get_s3_key_from_href(product_path)
        filename = os.path.basename(product_path)

        buffer = io.BytesIO()
        try:
            self.s3_client.download_fileobj(self.bucket_name, product_path, buffer)
        except Exception as e:
            print(f"Error downloading product: {str(e)}")
            raise

        print(f"Successfully downloaded product: {filename}")
        # getvalue() returns the whole buffer regardless of the stream position.
        return buffer.getvalue(), filename

    def download_product(self, product_path: str,
                         output_filename: Optional[str] = None) -> str:
        """
        Download a product from the configured bucket to disk.

        Args:
            product_path (str): S3 key or full S3 URI to the product
            output_filename (str, optional): Filename to save the product to.
                If None, uses the original filename (saved relative to the
                current working directory).

        Returns:
            str: Path to the downloaded file

        Raises:
            Exception: Whatever the S3 client raises; the error is printed
                and re-raised unchanged.
        """
        # Extract S3 key if full URI is provided
        product_path = self.get_s3_key_from_href(product_path)
        filename = os.path.basename(product_path)

        # Use custom filename if provided, otherwise use the original
        if output_filename is None:
            output_filename = filename

        try:
            self.s3_client.download_file(self.bucket_name, product_path, output_filename)
        except Exception as e:
            print(f"Error downloading product: {str(e)}")
            raise

        print(f"Successfully downloaded product {filename} to {output_filename}")
        return output_filename


def main() -> None:
    """
    Example run: search Sentinel-2 L2A items via STAC and download their
    10 m true-colour images (asset 'TCI_10m') to the working directory.
    """
    # Get credentials from environment variables
    access_key_id = os.environ.get("ACCESS_KEY_ID")
    secret_access_key = os.environ.get("SECRET_ACCESS_KEY")
    endpoint_url = 'https://eodata.dataspace.copernicus.eu'
    endpoint_stac = "https://stac.dataspace.copernicus.eu/v1/"

    # Initialize the connector and connect to S3
    s3_connector = S3Connector(
        endpoint_url=endpoint_url,
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
    )
    if not s3_connector.connect():
        # connect() has already printed the reason.
        raise SystemExit(1)
    s3_client = s3_connector.get_s3_client()

    catalog = pystac_client.Client.open(endpoint_stac)

    # Bounding box coordinates [min_lon, min_lat, max_lon, max_lat]:
    # a 1° box around LON=150.97, LAT=-20.92.
    # To search around a single point instead, replace ``bbox=`` with
    # ``intersects=dict(type="Point", coordinates=[lon, lat])``.
    bbox = [150.47, -21.42, 151.47, -20.42]

    # Search for Sentinel-2 products within the bounding box
    items = catalog.search(
        collections=['sentinel-2-l2a'],
        bbox=bbox,
        datetime="2024-05-01/2024-06-01",
        query=["eo:cloud_cover<50"],
    ).item_collection()

    # One downloader is enough for all items.
    handler = ProductDownloader(s3_client=s3_client, bucket_name='eodata')

    for item in items:
        product_url = extract_s3_path_from_url(item.assets['TCI_10m'].href)
        print(product_url)

        # Download the image to a file.
        # For in-memory use, call ``handler.get_product_content(product_url)``
        # and open the bytes with ``Image.open(io.BytesIO(content))``.
        downloaded_file = handler.download_product(product_url)
        print(f"Downloaded file saved to {downloaded_file}")


if __name__ == "__main__":
    main()