# mapster_space/src/utils/stac_client.py
# Uploaded by rodolphethinks1 in commit fca871f (verified): "Create utils/stac_client.py"
import io
from datetime import datetime, timedelta
import random
import requests
from pystac_client import Client
import os
from src.auth.auth import get_direct_access_token
from src.utils.image import extract_url_after_filename
class ProductDownloader:
    """
    Class for downloading products from Copernicus Data Space Ecosystem.

    This class provides methods to download products using an S3 client
    connection, either as in-memory content (bytes) or as files saved to disk.
    Both download methods accept a plain S3 key or a full
    ``s3://<bucket>/<key>`` URI.
    """

    def __init__(self, s3_client, bucket_name='eodata'):
        """
        Initialize the product downloader with an S3 client.

        Args:
            s3_client: The boto3 S3 client to use for downloads
            bucket_name (str): The S3 bucket name where products are stored
                (default: 'eodata')
        """
        self.s3_client = s3_client
        self.bucket_name = bucket_name

    @staticmethod
    def get_s3_key_from_href(href):
        """
        Convert a full S3 URI into the object key expected by the S3 client.

        The bucket component of the URI is dropped; downloads always use the
        bucket configured on the instance (``self.bucket_name``).

        Args:
            href (str): Either ``s3://<bucket>/<key>`` or an already-bare key

        Returns:
            str: The object key without scheme and bucket. Input that does not
                start with ``s3://`` is returned unchanged.
        """
        scheme = 's3://'
        if not href.startswith(scheme):
            return href
        # "<bucket>/<key>" -> keep only what follows the first slash.
        _, _, key = href[len(scheme):].partition('/')
        return key

    def get_product_content(self, product_path):
        """
        Download a product as a bytes object held in memory.

        Args:
            product_path (str): S3 key or full S3 URI to the product

        Returns:
            tuple: ``(product_content, filename)`` where ``product_content``
                is the object's bytes and ``filename`` is the last path
                component of the key.

        Raises:
            Exception: Whatever the S3 client raises is printed and re-raised.
        """
        # Extract S3 key if full URI is provided (the original only promised
        # this in a comment and passed the raw URI to the client as the key).
        product_path = self.get_s3_key_from_href(product_path)
        # Extract the filename from the path
        _, filename = os.path.split(product_path)
        try:
            buffer = io.BytesIO()
            self.s3_client.download_fileobj(self.bucket_name, product_path, buffer)
            # getvalue() returns the whole buffer regardless of the current
            # stream position, so no seek is required.
            product_content = buffer.getvalue()
            print(f"Successfully downloaded product: {filename}")
            return product_content, filename
        except Exception as e:
            print(f"Error downloading product: {str(e)}")
            raise

    async def download_product(self, product_path, output_filename=None):
        """
        Download a product to disk.

        NOTE: this is a coroutine for interface compatibility, but the
        underlying ``download_file`` call on the S3 client is made
        synchronously inside it.

        Args:
            product_path (str): S3 key or full S3 URI to the product
            output_filename (str, optional): Filename to save the product to.
                If None, uses the original filename.

        Returns:
            str: Path to the downloaded file

        Raises:
            Exception: Whatever the S3 client raises is printed and re-raised.
        """
        # Extract S3 key if full URI is provided
        product_path = self.get_s3_key_from_href(product_path)
        # Extract the filename from the path
        _, filename = os.path.split(product_path)
        # Use custom filename if provided, otherwise use the original
        if output_filename is None:
            output_filename = filename
        try:
            self.s3_client.download_file(self.bucket_name, product_path, output_filename)
            print(f"Successfully downloaded product {filename} to {output_filename}")
            return output_filename
        except Exception as e:
            print(f"Error downloading product: {str(e)}")
            raise
from src.auth.auth import get_direct_access_token
from src.utils.image import extract_url_after_filename
def _pick_random_day_window(start_date, end_date):
    """Pick a random one-day search window inside [start_date, end_date]."""
    days_between = (end_date - start_date).days
    if days_between < 0:
        raise ValueError(
            f"end_date ({end_date:%Y-%m-%d}) must not be earlier than "
            f"start_date ({start_date:%Y-%m-%d})"
        )
    # The start offset keeps 7 days of headroom before end_date, as before.
    # Clamp at 0 so ranges shorter than a week no longer make randint() fail
    # on an empty range.
    last_start_offset = max(days_between - 7, 0)
    window_start = start_date + timedelta(days=random.randint(0, last_start_offset))
    # Never search past the caller's end date (only matters for ranges
    # shorter than one day).
    window_end = min(window_start + timedelta(days=1), end_date)
    return window_start, window_end


def download_sentinel_image(username, password, start_date, end_date,
                            bbox=None, limit=10):
    """
    Download a random Sentinel-2 true-colour (TCI, 60 m) image.

    A random one-day window is chosen inside ``start_date``..``end_date``,
    the STAC catalogue is searched for SENTINEL-2 items in that window, one
    item is picked at random and its ``*_TCI_60m.jp2`` asset is downloaded.

    Args:
        username (str): DESTINE username
        password (str): DESTINE password
        start_date (str): First day of the date range, format "YYYY-MM-DD"
        end_date (str): Last day of the date range, format "YYYY-MM-DD"
        bbox (list, optional): Bounding box coordinates
            [west, south, east, north]. Defaults to the whole globe
            ([-180, -90, 180, 90]).
        limit (int): Value passed as ``limit`` to the STAC search

    Returns:
        tuple: ``(image_content, metadata)`` on success, where
            ``image_content`` is the response body as bytes and ``metadata``
            is a dict with the keys "id", "datetime", "bbox" and "filename";
            ``(error_message, None)`` when authentication fails, nothing is
            found, or the download returns a non-200 status.

    Raises:
        ValueError: If a date does not match "YYYY-MM-DD" or ``end_date`` is
            earlier than ``start_date``.
    """
    # A None default avoids sharing one mutable list between calls.
    if bbox is None:
        bbox = [-180, -90, 180, 90]

    # Validate the dates before any network traffic so bad input fails fast.
    range_start = datetime.strptime(start_date, "%Y-%m-%d")
    range_end = datetime.strptime(end_date, "%Y-%m-%d")
    window_start, window_end = _pick_random_day_window(range_start, range_end)

    # Get access token
    token_result = get_direct_access_token(username=username, password=password)
    if not token_result:
        return "Failed to authenticate", None
    access_token = token_result["access_token"]

    # Set up STAC API client
    stac_base_url = "https://cachea.destine.eu"
    stac_url = f"{stac_base_url}/stac/api"
    catalog = Client.open(stac_url)

    # Format dates for the API
    start_date_str = window_start.strftime("%Y-%m-%d")
    end_date_str = window_end.strftime("%Y-%m-%d")

    # Search for Sentinel-2 images
    search = catalog.search(
        method="GET",
        collections=["SENTINEL-2"],
        bbox=bbox,
        datetime=f"{start_date_str}/{end_date_str}",
        limit=limit,
    )
    items = list(search.items())
    if not items:
        return "No Sentinel-2 images found", None

    # Select a random item
    random_item = random.choice(items)

    # An item's datetime can be absent; report None rather than crash.
    item_datetime = random_item.datetime
    metadata = {
        "id": random_item.id,
        "datetime": (
            item_datetime.strftime("%Y-%m-%d %H:%M:%S")
            if item_datetime is not None else None
        ),
        "bbox": random_item.bbox,
    }

    # Keep only the assets whose href ends with *_TCI_60m.jp2
    tci_assets = [
        asset.href
        for asset in random_item.assets.values()
        if asset.href.endswith('_TCI_60m.jp2')
    ]
    if not tci_assets:
        return "No TCI assets found in the selected image", None

    filepath = extract_url_after_filename(tci_assets[0])
    metadata["filename"] = os.path.basename(filepath)

    # Download the file
    url = f"{stac_base_url}/stac/download?filename={filepath}"
    headers = {
        'Authorization': f'Bearer {access_token}'
    }
    # (connect, read) timeouts in seconds: without them a stalled server
    # would block this call indefinitely.
    response = requests.post(url, headers=headers, data={}, timeout=(10, 120))
    if response.status_code == 200:
        return response.content, metadata
    return f"Failed to download the file. Status code: {response.status_code}", None