# Provenance note: renamed from test.py to notebook/test.py (commit 353e3b1, rodolphethinks1).
"""
Module for connecting to Copernicus Data Space Ecosystem through S3 and STAC interfaces.
"""
import io
import os
from urllib.parse import urlparse
import boto3
import pystac_client
from dotenv import load_dotenv
from PIL import Image
# Load environment variables from .env file
load_dotenv()
class S3Connector:
    """
    A client for connecting to S3-compatible storage services.

    This connector provides an interface to connect to an S3-compatible
    storage service and exposes both the boto3 resource object and the
    boto3 client object for the same endpoint and credentials.
    """

    def __init__(self, endpoint_url, access_key_id, secret_access_key, region_name='default'):
        """
        Initialize S3 connector with credentials and endpoint information.

        Parameters
        ----------
        endpoint_url : str
            The URL of the S3 endpoint
        access_key_id : str
            The access key for authentication
        secret_access_key : str
            The secret key for authentication
        region_name : str, optional
            The AWS region name, by default 'default'
        """
        self.endpoint_url = endpoint_url
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.region_name = region_name
        # Both handles are created lazily by connect(). Note: the original
        # code never assigned self.s3, so get_s3() always raised
        # AttributeError; initializing both here fixes that.
        self.s3 = None
        self.s3_client = None

    def connect(self):
        """
        Establish connection to S3 service.

        Creates both the boto3 resource (``self.s3``) and the boto3 client
        (``self.s3_client``) for the configured endpoint.

        Returns
        -------
        bool
            True if connection was successful, False otherwise
        """
        try:
            # Resource object: high-level interface used by get_s3().
            self.s3 = boto3.resource(
                's3',
                endpoint_url=self.endpoint_url,
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
                region_name=self.region_name,
            )
            # Client object: low-level interface used by get_s3_client().
            self.s3_client = boto3.client(
                's3',
                endpoint_url=self.endpoint_url,
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
                region_name=self.region_name,
            )
            return True
        except Exception as e:
            print(f"Connection failed: {e}")
            return False

    def get_s3(self):
        """
        Return the S3 resource object.

        If not already connected, this method will first establish a connection.

        Returns
        -------
        boto3.resources.factory.s3.ServiceResource
            The boto3 S3 resource object for interacting with S3 storage
        """
        if not self.s3:
            self.connect()
        return self.s3

    def get_s3_client(self):
        """
        Return the S3 client object.

        If not already connected, this method will first establish a connection.

        Returns
        -------
        boto3.client.S3
            The boto3 S3 client object for interacting with S3 storage
        """
        if not self.s3_client:
            self.connect()
        return self.s3_client
def extract_s3_path_from_url(url):
    """
    Extract the S3 object path from an S3 URL or URI.

    Parses an S3 URI and returns only the object-path portion, dropping
    the ``s3://`` protocol, the bucket name, and any leading slashes.
    Non-S3 strings pass through untouched.

    Args:
        url (str): The full S3 URI (e.g., 's3://eodata/path/to/file.jp2')

    Returns:
        str: The S3 object path (without protocol, bucket name and leading slashes)
    """
    # Anything that isn't an S3 URI is returned as-is.
    if not url.startswith('s3://'):
        return url
    parts = urlparse(url)
    # Defensive check; unreachable given the startswith() guard above,
    # kept for parity with the original contract.
    if parts.scheme != 's3':
        raise ValueError(f"URL {url} is not an S3 URL")
    # urlparse puts the bucket in netloc, so the path already excludes it;
    # strip any leading slashes to get a clean object key.
    object_path = parts.path
    while object_path.startswith('/'):
        object_path = object_path[1:]
    return object_path
class ProductDownloader:
    """
    Class for downloading products from Copernicus Data Space Ecosystem.

    This class provides methods to download products using an S3 client
    connection, either as in-memory content (bytes) or as files saved to disk.
    Both download methods accept either a bare S3 key or a full ``s3://`` URI.
    """

    def __init__(self, s3_client, bucket_name='eodata'):
        """
        Initialize the product downloader with an S3 client.

        Args:
            s3_client: The boto3 S3 client to use for downloads
            bucket_name (str): The S3 bucket name where products are stored (default: 'eodata')
        """
        self.s3_client = s3_client
        self.bucket_name = bucket_name

    @staticmethod
    def _resolve_key(product_path):
        """Return the bare S3 object key, stripping an s3:// URI prefix if present."""
        if product_path.startswith('s3://'):
            # urlparse puts the bucket in netloc, so .path is the key
            # (minus leading slashes).
            return urlparse(product_path).path.lstrip('/')
        return product_path

    def get_product_content(self, product_path):
        """
        Download a Sentinel-2 product from Copernicus Data Space Ecosystem as bytes.

        Args:
            product_path (str): S3 key or full S3 URI to the product

        Returns:
            bytes: The product content as bytes
            str: The filename of the product

        Raises:
            Exception: Re-raises any error from the underlying S3 download.
        """
        # Original code documented URI support here but never stripped the
        # s3:// prefix, so full URIs failed; _resolve_key fixes that.
        key = self._resolve_key(product_path)
        _, filename = os.path.split(key)
        try:
            buffer = io.BytesIO()
            self.s3_client.download_fileobj(self.bucket_name, key, buffer)
            # getvalue() returns the whole buffer regardless of position,
            # so no seek(0) is needed.
            product_content = buffer.getvalue()
            print(f"Successfully downloaded product: {filename}")
            return product_content, filename
        except Exception as e:
            print(f"Error downloading product: {str(e)}")
            raise

    def download_product(self, product_path, output_filename=None):
        """
        Download a Sentinel-2 product from Copernicus Data Space Ecosystem to disk.

        Args:
            product_path (str): S3 key or full S3 URI to the product
            output_filename (str, optional): Filename to save the product to.
                If None, uses the original filename.

        Returns:
            str: Path to the downloaded file

        Raises:
            Exception: Re-raises any error from the underlying S3 download.
        """
        # Original code called self.get_s3_key_from_href(), a method that
        # does not exist (guaranteed AttributeError for s3:// inputs);
        # _resolve_key replaces it.
        key = self._resolve_key(product_path)
        _, filename = os.path.split(key)
        if output_filename is None:
            output_filename = filename
        try:
            self.s3_client.download_file(self.bucket_name, key, output_filename)
            print(f"Successfully downloaded product {filename} to {output_filename}")
            return output_filename
        except Exception as e:
            print(f"Error downloading product: {str(e)}")
            raise
if __name__ == "__main__":
    # Credentials are read from the environment (.env was loaded at import
    # time by load_dotenv() above).
    ACCESS_KEY_ID = os.environ.get("ACCESS_KEY_ID")
    SECRET_ACCESS_KEY = os.environ.get("SECRET_ACCESS_KEY")
    # Fail fast with a clear message instead of an opaque auth error later.
    if not ACCESS_KEY_ID or not SECRET_ACCESS_KEY:
        raise SystemExit(
            "ACCESS_KEY_ID and SECRET_ACCESS_KEY environment variables are required"
        )

    ENDPOINT_URL = 'https://eodata.dataspace.copernicus.eu'
    ENDPOINT_STAC = "https://stac.dataspace.copernicus.eu/v1/"
    # NOTE(review): LON/LAT are unused since the search switched to the
    # bbox below; kept for reference.
    LON, LAT = 15, 50

    # Initialize the connector and connect to S3.
    s3_connector = S3Connector(
        endpoint_url=ENDPOINT_URL,
        access_key_id=ACCESS_KEY_ID,
        secret_access_key=SECRET_ACCESS_KEY,
    )
    s3_connector.connect()
    s3_client = s3_connector.get_s3_client()

    # Open the STAC catalog.
    catalog = pystac_client.Client.open(ENDPOINT_STAC)

    # Bounding box [min_lon, min_lat, max_lon, max_lat]:
    # a 1 degree box around LON=150.97, LAT=-20.92.
    bbox = [150.47, -21.42, 151.47, -20.42]

    # Search for Sentinel-2 L2A products within the bounding box.
    items_txt = catalog.search(
        collections=['sentinel-2-l2a'],
        bbox=bbox,
        datetime="2024-05-01/2024-06-01",
        query=["eo:cloud_cover<50"],
    ).item_collection()

    # Guard: without this, an empty result set left product_url undefined
    # and the download below raised NameError.
    if not items_txt:
        raise SystemExit("No matching Sentinel-2 products found")

    for item in items_txt:
        product_url = extract_s3_path_from_url(item.assets['TCI_10m'].href)
        print(product_url)

    # Download only the last product listed above (matches the original
    # post-loop behavior) using the S3 client.
    handler = ProductDownloader(s3_client=s3_client, bucket_name='eodata')
    downloaded_file = handler.download_product(product_url)
    print(f"Downloaded file saved to {downloaded_file}")