"""Helpers for fetching Sentinel-2 data via an S3 client or the DestinE STAC cache."""

import io
import os
import random
from datetime import datetime, timedelta

import requests
from pystac_client import Client

from src.auth.auth import get_direct_access_token
from src.utils.image import extract_url_after_filename


class ProductDownloader:
    """
    Class for downloading products from Copernicus Data Space Ecosystem.

    This class provides methods to download products using an S3 client
    connection, either as in-memory content (bytes) or as files saved to disk.
    """

    def __init__(self, s3_client, bucket_name='eodata'):
        """
        Initialize the product downloader with an S3 client.

        Args:
            s3_client: The boto3 S3 client to use for downloads
            bucket_name (str): The S3 bucket name where products are stored
                (default: 'eodata')
        """
        self.s3_client = s3_client
        self.bucket_name = bucket_name

    @staticmethod
    def get_s3_key_from_href(href):
        """
        Convert an ``s3://bucket/key`` URI into the bare object key.

        Args:
            href (str): Either a full S3 URI or an already-bare S3 key

        Returns:
            str: Everything after the bucket component of the URI, or ``href``
                unchanged when it does not start with ``s3://``
        """
        scheme = 's3://'
        if not href.startswith(scheme):
            return href
        # "bucket/some/key" -> ("bucket", "/", "some/key"); only the key is needed.
        _bucket, _sep, key = href[len(scheme):].partition('/')
        return key

    def get_product_content(self, product_path):
        """
        Download a Sentinel-2 product from Copernicus Data Space Ecosystem
        into memory.

        Args:
            product_path (str): S3 key or full S3 URI to the product

        Returns:
            tuple: ``(product_content, filename)`` where ``product_content`` is
                the downloaded object as bytes and ``filename`` is the last
                path component of the key

        Raises:
            Exception: Whatever the S3 client raises is logged to stdout and
                re-raised unchanged.
        """
        # Extract S3 key if full URI is provided (bare keys pass through as-is)
        product_path = self.get_s3_key_from_href(product_path)

        # Extract the filename from the path
        filename = os.path.basename(product_path)

        try:
            # Let the client stream the object into an in-memory buffer
            buffer = io.BytesIO()
            self.s3_client.download_fileobj(self.bucket_name, product_path, buffer)

            # getvalue() returns the whole buffer regardless of stream position
            product_content = buffer.getvalue()
            print(f"Successfully downloaded product: {filename}")

            return product_content, filename
        except Exception as e:
            print(f"Error downloading product: {str(e)}")
            raise

    async def download_product(self, product_path, output_filename=None):
        """
        Download a Sentinel-2 product from Copernicus Data Space Ecosystem
        to disk.

        NOTE(review): this coroutine contains no ``await``; the S3 call below
        runs synchronously. It is kept ``async`` so existing callers that
        await it keep working.

        Args:
            product_path (str): S3 key or full S3 URI to the product
            output_filename (str, optional): Filename to save the product to.
                If None, uses the original filename.

        Returns:
            str: Path to the downloaded file

        Raises:
            Exception: Whatever the S3 client raises is logged to stdout and
                re-raised unchanged.
        """
        # Extract S3 key if full URI is provided
        if product_path.startswith('s3://'):
            product_path = self.get_s3_key_from_href(product_path)

        # Extract the filename from the path
        filename = os.path.basename(product_path)

        # Use custom filename if provided, otherwise use the original
        if output_filename is None:
            output_filename = filename

        # Download the file using the client
        try:
            self.s3_client.download_file(self.bucket_name, product_path, output_filename)
            print(f"Successfully downloaded product {filename} to {output_filename}")
            return output_filename
        except Exception as e:
            print(f"Error downloading product: {str(e)}")
            raise


def download_sentinel_image(username, password, start_date, end_date,
                            bbox=None, limit=10):
    """
    Download a random Sentinel-2 true-colour (TCI, 60 m) image.

    A random one-day window is picked inside the requested date range, the
    STAC catalogue is searched for that window, a random item is chosen from
    the results and its ``*_TCI_60m.jp2`` asset is downloaded.

    Args:
        username (str): DESTINE username
        password (str): DESTINE password
        start_date (str): Start of the date range, formatted "YYYY-MM-DD"
        end_date (str): End of the date range, formatted "YYYY-MM-DD"
        bbox (list, optional): Bounding box coordinates
            [west, south, east, north]. Defaults to the whole globe,
            [-180, -90, 180, 90].
        limit (int): Value passed as the search ``limit`` parameter

    Returns:
        tuple: ``(image_content, metadata)`` on success, where
            ``image_content`` is the response body as bytes and ``metadata``
            is a dict with the keys "id", "datetime", "bbox" and "filename";
            ``(error_message, None)`` on failure.

    Raises:
        ValueError: If a date does not match "YYYY-MM-DD" or ``end_date`` is
            earlier than ``start_date``.
    """
    if bbox is None:
        # Built per call so the default is never a shared mutable object
        bbox = [-180, -90, 180, 90]

    # Get access token
    token_result = get_direct_access_token(username=username, password=password)
    if not token_result:
        return "Failed to authenticate", None
    access_token = token_result["access_token"]

    # Set up STAC API client
    stac_base_url = "https://cachea.destine.eu"
    stac_url = f"{stac_base_url}/stac/api"
    catalog = Client.open(stac_url)

    range_start = datetime.strptime(start_date, "%Y-%m-%d")
    range_end = datetime.strptime(end_date, "%Y-%m-%d")
    days_between = (range_end - range_start).days
    if days_between < 0:
        raise ValueError(
            f"end_date ({end_date}) must not be earlier than start_date ({start_date})"
        )

    # A 7-day margin is kept before end_date (its purpose is not evident here,
    # since the search window is one day - TODO confirm). Clamp to 0 so that
    # ranges shorter than 7 days do not hand randint an empty interval.
    max_offset = max(0, days_between - 7)
    random_start_date = range_start + timedelta(days=random.randint(0, max_offset))
    random_end_date = random_start_date + timedelta(days=1)

    # Format dates for the API
    start_date_str = random_start_date.strftime("%Y-%m-%d")
    end_date_str = random_end_date.strftime("%Y-%m-%d")

    # Build search parameters
    search_params = {
        "method": "GET",
        "collections": ["SENTINEL-2"],
        "bbox": bbox,
        "datetime": f"{start_date_str}/{end_date_str}",
        "limit": limit,
    }

    # Search for Sentinel-2 images
    search = catalog.search(**search_params)
    items = list(search.items())
    if not items:
        return "No Sentinel-2 images found", None

    # Select a random item
    random_item = random.choice(items)

    # Get metadata for the selected item
    metadata = {
        "id": random_item.id,
        "datetime": random_item.datetime.strftime("%Y-%m-%d %H:%M:%S"),
        "bbox": random_item.bbox,
    }

    # Keep only the asset hrefs that end with _TCI_60m.jp2
    tci_assets = [
        asset.href
        for asset in random_item.assets.values()
        if asset.href.endswith('_TCI_60m.jp2')
    ]
    if not tci_assets:
        return "No TCI assets found in the selected image", None

    filepath = extract_url_after_filename(tci_assets[0])
    metadata["filename"] = os.path.basename(filepath)

    # Download the file
    url = f"{stac_base_url}/stac/download?filename={filepath}"
    headers = {
        'Authorization': f'Bearer {access_token}'
    }
    response = requests.post(url, headers=headers, data={})

    if response.status_code == 200:
        return response.content, metadata
    return f"Failed to download the file. Status code: {response.status_code}", None