Spaces:
Sleeping
Sleeping
| import io | |
| from datetime import datetime, timedelta | |
| import random | |
| import requests | |
| from pystac_client import Client | |
| import os | |
| from src.auth.auth import get_direct_access_token | |
| from src.utils.image import extract_url_after_filename | |
| class ProductDownloader: | |
| """ | |
| Class for downloading products from Copernicus Data Space Ecosystem. | |
| This class provides methods to download products using an S3 client connection, | |
| either as in-memory content (bytes) or as files saved to disk. | |
| """ | |
| def __init__(self, s3_client, bucket_name='eodata'): | |
| """ | |
| Initialize the product downloader with an S3 client. | |
| Args: | |
| s3_client: The boto3 S3 client to use for downloads | |
| bucket_name (str): The S3 bucket name where products are stored (default: 'eodata') | |
| """ | |
| self.s3_client = s3_client | |
| self.bucket_name = bucket_name | |
| def get_product_content(self, product_path): | |
| """ | |
| Download a Sentinel-2 product directly from Copernicus Data Space Ecosystem as a bytes object. | |
| Args: | |
| product_path (str): S3 key or full S3 URI to the product | |
| Returns: | |
| bytes: The product content as bytes | |
| str: The filename of the product | |
| """ | |
| # Extract S3 key if full URI is provided | |
| # Extract the filename from the path | |
| _, filename = os.path.split(product_path) | |
| # Download the file to a bytes buffer | |
| try: | |
| # Create a bytes buffer | |
| buffer = io.BytesIO() | |
| # Download the file to the buffer using the client | |
| self.s3_client.download_fileobj(self.bucket_name, product_path, buffer) | |
| # Reset buffer position to the start | |
| buffer.seek(0) | |
| # Get the bytes | |
| product_content = buffer.getvalue() | |
| print(f"Successfully downloaded product: {filename}") | |
| # Return both the bytes and the filename | |
| return product_content, filename | |
| except Exception as e: | |
| print(f"Error downloading product: {str(e)}") | |
| raise | |
| async def download_product(self, product_path, output_filename=None): | |
| """ | |
| Download a Sentinel-2 product directly from Copernicus Data Space Ecosystem to disk. | |
| Args: | |
| product_path (str): S3 key or full S3 URI to the product | |
| output_filename (str, optional): Filename to save the product to. | |
| If None, uses the original filename. | |
| Returns: | |
| str: Path to the downloaded file | |
| """ | |
| # Extract S3 key if full URI is provided | |
| if product_path.startswith('s3://'): | |
| product_path = self.get_s3_key_from_href(product_path) | |
| # Extract the filename from the path | |
| _, filename = os.path.split(product_path) | |
| # Use custom filename if provided, otherwise use the original | |
| if output_filename is None: | |
| output_filename = filename | |
| # Download the file using the client | |
| try: | |
| self.s3_client.download_file(self.bucket_name, product_path, output_filename) | |
| print(f"Successfully downloaded product {filename} to {output_filename}") | |
| return output_filename | |
| except Exception as e: | |
| print(f"Error downloading product: {str(e)}") | |
| raise | |
| from src.auth.auth import get_direct_access_token | |
| from src.utils.image import extract_url_after_filename | |
| def download_sentinel_image(username, password, start_date, end_date, | |
| bbox=[-180, -90, 180, 90], limit=10): | |
| """ | |
| Download a random Sentinel-2 image based on criteria. | |
| Args: | |
| username (str): DESTINE username | |
| password (str): DESTINE password | |
| # date_range (str): Date range in format "YYYY-MM-DD/YYYY-MM-DD" | |
| cloud_cover (int, optional): Maximum cloud cover percentage | |
| bbox (list): Bounding box coordinates [west, south, east, north] | |
| limit (int): Maximum number of results to return | |
| Returns: | |
| tuple: (image_content or error_message, metadata) | |
| """ | |
| # Get access token | |
| token_result = get_direct_access_token(username=username, password=password) | |
| if not token_result: | |
| return "Failed to authenticate", None | |
| access_token = token_result["access_token"] | |
| # Set up STAC API client | |
| stac_base_url = "https://cachea.destine.eu" | |
| stac_url = f"{stac_base_url}/stac/api" | |
| catalog = Client.open(stac_url) | |
| start_date = datetime.strptime(start_date, "%Y-%m-%d") | |
| end_date = datetime.strptime(end_date, "%Y-%m-%d") | |
| days_between = (end_date - start_date).days | |
| random_start_day = random.randint(0, days_between - 7) # Ensure we have 7 days | |
| random_start_date = start_date + timedelta(days=random_start_day) | |
| random_end_date = random_start_date + timedelta(days=1) | |
| # Format dates for the API | |
| start_date_str = random_start_date.strftime("%Y-%m-%d") | |
| end_date_str = random_end_date.strftime("%Y-%m-%d") | |
| # Build search parameters | |
| search_params = { | |
| "method": "GET", | |
| "collections": ["SENTINEL-2"], | |
| "bbox": bbox, | |
| "datetime": f"{start_date_str}/{end_date_str}", | |
| "limit": limit | |
| } | |
| # Search for Sentinel-2 images | |
| search = catalog.search(**search_params) | |
| # Get a list of items | |
| items = list(search.items()) | |
| if not items: | |
| return "No Sentinel-2 images found", None | |
| # Select a random item | |
| random_item = random.choice(items) | |
| # Get metadata for the selected item | |
| metadata = { | |
| "id": random_item.id, | |
| "datetime": random_item.datetime.strftime("%Y-%m-%d %H:%M:%S"), | |
| "bbox": random_item.bbox, | |
| } | |
| # Get the assets of the random item | |
| assets = random_item.assets | |
| asset_keys = list(assets.keys()) | |
| # Filter the assets to get the one that ends with *_TCI_60m.jp2 | |
| tci_assets = [assets[key].href for key in asset_keys if assets[key].href.endswith('_TCI_60m.jp2')] | |
| if not tci_assets: | |
| return "No TCI assets found in the selected image", None | |
| filepath = extract_url_after_filename(tci_assets[0]) | |
| metadata["filename"] = os.path.basename(filepath) | |
| # Download the file | |
| url = f"{stac_base_url}/stac/download?filename={filepath}" | |
| headers = { | |
| 'Authorization': f'Bearer {access_token}' | |
| } | |
| response = requests.post(url, headers=headers, data={}) | |
| if response.status_code == 200: | |
| return response.content, metadata | |
| else: | |
| return f"Failed to download the file. Status code: {response.status_code}", None | |