Spaces:
Sleeping
Sleeping
| """ | |
| Module for connecting to Copernicus Data Space Ecosystem through S3 and STAC interfaces. | |
| """ | |
| import io | |
| import os | |
| from urllib.parse import urlparse | |
| import boto3 | |
| import pystac_client | |
| from dotenv import load_dotenv | |
| from PIL import Image | |
| # Load environment variables from .env file | |
| load_dotenv() | |
class S3Connector:
    """
    A client for connecting to S3-compatible storage services.

    This connector stores the endpoint and credentials, and lazily builds
    both a boto3 S3 client and a boto3 S3 resource from them.
    """

    def __init__(self, endpoint_url, access_key_id, secret_access_key, region_name='default'):
        """
        Initialize S3 connector with credentials and endpoint information.

        Parameters
        ----------
        endpoint_url : str
            The URL of the S3 endpoint
        access_key_id : str
            The access key for authentication
        secret_access_key : str
            The secret key for authentication
        region_name : str, optional
            The AWS region name, by default 'default'
        """
        self.endpoint_url = endpoint_url
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.region_name = region_name
        # Both handles start unset and are populated by connect().
        self.s3 = None
        self.s3_client = None

    def connect(self):
        """
        Establish connection to S3 service.

        Creates both the client and the resource object from the stored
        settings. Neither attribute is modified if creation fails.

        Returns
        -------
        bool
            True if connection was successful, False otherwise
        """
        connection_kwargs = {
            'endpoint_url': self.endpoint_url,
            'aws_access_key_id': self.access_key_id,
            'aws_secret_access_key': self.secret_access_key,
            'region_name': self.region_name,
        }
        try:
            client = boto3.client('s3', **connection_kwargs)
            resource = boto3.resource('s3', **connection_kwargs)
        except Exception as e:
            # Kept broad on purpose: callers rely on the boolean result
            # rather than on an exception.
            print(f"Connection failed: {e}")
            return False
        self.s3_client = client
        self.s3 = resource
        return True

    def get_s3(self):
        """
        Return the S3 resource object.

        If not already connected, this method will first establish a connection.

        Returns
        -------
        boto3.resources.factory.s3.ServiceResource
            The boto3 S3 resource object, or None if the connection failed
        """
        if self.s3 is None:
            self.connect()
        return self.s3

    def get_s3_client(self):
        """
        Return the S3 client object.

        If not already connected, this method will first establish a connection.

        Returns
        -------
        boto3.client.S3
            The boto3 S3 client object, or None if the connection failed
        """
        if self.s3_client is None:
            self.connect()
        return self.s3_client
def extract_s3_path_from_url(url):
    """
    Extracts the S3 object path from an S3 URI.

    For input starting with 's3://', the scheme and the bucket name (the
    URI's network-location part) are dropped and leading slashes are stripped
    from what remains. Any other input is returned unchanged.

    Args:
        url (str): The full S3 URI (e.g., 's3://eodata/path/to/file.jp2')
            or an already-extracted object path

    Returns:
        str: The S3 object path (without protocol, bucket name and leading slashes)
    """
    # Anything that is not an S3 URI is assumed to already be an object path.
    if not url.startswith('s3://'):
        return url
    # urlparse puts the bucket in .netloc, so .path is the object key
    # preceded by a slash.
    return urlparse(url).path.lstrip('/')
class ProductDownloader:
    """
    Class for downloading products from Copernicus Data Space Ecosystem.

    This class provides methods to download products using an S3 client connection,
    either as in-memory content (bytes) or as files saved to disk.
    """

    def __init__(self, s3_client, bucket_name='eodata'):
        """
        Initialize the product downloader with an S3 client.

        Args:
            s3_client: The boto3 S3 client to use for downloads
            bucket_name (str): The S3 bucket name where products are stored (default: 'eodata')
        """
        self.s3_client = s3_client
        self.bucket_name = bucket_name

    @staticmethod
    def get_s3_key_from_href(href):
        """
        Convert an S3 URI into an object key.

        Args:
            href (str): Either a full S3 URI ('s3://bucket/key') or a plain key

        Returns:
            str: The key with scheme, bucket name and leading slashes removed;
                input that does not start with 's3://' is returned unchanged
        """
        if not href.startswith('s3://'):
            return href
        # The bucket lands in .netloc, leaving the key (with a leading slash) in .path.
        return urlparse(href).path.lstrip('/')

    def get_product_content(self, product_path):
        """
        Download a product as a bytes object.

        Args:
            product_path (str): S3 key or full S3 URI to the product

        Returns:
            tuple: (bytes, str) - the product content and the product's filename

        Raises:
            Exception: Whatever the S3 client raises is logged and re-raised
        """
        # Extract S3 key if full URI is provided
        product_path = self.get_s3_key_from_href(product_path)
        filename = os.path.basename(product_path)

        buffer = io.BytesIO()
        try:
            self.s3_client.download_fileobj(self.bucket_name, product_path, buffer)
        except Exception as e:
            print(f"Error downloading product: {str(e)}")
            raise

        # getvalue() returns the whole buffer regardless of the stream position.
        product_content = buffer.getvalue()
        print(f"Successfully downloaded product: {filename}")
        return product_content, filename

    def download_product(self, product_path, output_filename=None):
        """
        Download a product to disk.

        Args:
            product_path (str): S3 key or full S3 URI to the product
            output_filename (str, optional): Filename to save the product to.
                If None, uses the original filename.

        Returns:
            str: Path to the downloaded file

        Raises:
            Exception: Whatever the S3 client raises is logged and re-raised
        """
        # Extract S3 key if full URI is provided
        product_path = self.get_s3_key_from_href(product_path)
        filename = os.path.basename(product_path)

        # Use custom filename if provided, otherwise use the original
        if output_filename is None:
            output_filename = filename

        try:
            self.s3_client.download_file(self.bucket_name, product_path, output_filename)
        except Exception as e:
            print(f"Error downloading product: {str(e)}")
            raise

        print(f"Successfully downloaded product {filename} to {output_filename}")
        return output_filename
if __name__ == "__main__":
    # Example run: search the STAC catalogue for Sentinel-2 L2A items in a
    # bounding box and download each item's 10 m true-colour image.

    # Get credentials from environment variables
    ACCESS_KEY_ID = os.environ.get("ACCESS_KEY_ID")
    SECRET_ACCESS_KEY = os.environ.get("SECRET_ACCESS_KEY")
    ENDPOINT_URL = 'https://eodata.dataspace.copernicus.eu'
    ENDPOINT_STAC = "https://stac.dataspace.copernicus.eu/v1/"

    # Initialize the connector
    s3_connector = S3Connector(
        endpoint_url=ENDPOINT_URL,
        access_key_id=ACCESS_KEY_ID,
        secret_access_key=SECRET_ACCESS_KEY
    )

    # Connect to S3; stop here on failure instead of failing later with a
    # missing client.
    if not s3_connector.connect():
        raise SystemExit("Could not connect to the S3 endpoint")
    s3_client = s3_connector.get_s3_client()

    catalog = pystac_client.Client.open(ENDPOINT_STAC)

    # Define bounding box coordinates [min_lon, min_lat, max_lon, max_lat]
    bbox = [150.47, -21.42, 151.47, -20.42]  # 1° box around LON=150.97, LAT=-20.92

    # Search for Sentinel-2 products within the bounding box
    items_txt = catalog.search(
        collections=['sentinel-2-l2a'],
        bbox=bbox,
        datetime="2024-05-01/2024-06-01",
        query=["eo:cloud_cover<50"]
    ).item_collection()

    # One downloader is enough for every item.
    handler = ProductDownloader(s3_client=s3_client, bucket_name='eodata')

    # NOTE(review): assumes the download is meant to run once per item
    # (i.e. inside this loop) - confirm against the original layout.
    for item in items_txt:
        product_url = extract_s3_path_from_url(item.assets['TCI_10m'].href)
        print(product_url)

        # Download the image to a file
        downloaded_file = handler.download_product(product_url)
        print(f"Downloaded file saved to {downloaded_file}")