Spaces:
Sleeping
Sleeping
File size: 6,888 Bytes
import io
from datetime import datetime, timedelta
import random
import requests
from pystac_client import Client
import os
from src.auth.auth import get_direct_access_token
from src.utils.image import extract_url_after_filename
class ProductDownloader:
    """
    Download products from the Copernicus Data Space Ecosystem.

    Wraps an S3 client and offers two ways to fetch an object from the
    configured bucket: as in-memory content (bytes) or as a file on disk.
    Both entry points accept either a bare S3 key or a full ``s3://`` URI.
    """

    def __init__(self, s3_client, bucket_name='eodata'):
        """
        Initialize the product downloader with an S3 client.

        Args:
            s3_client: Client object exposing ``download_fileobj`` and
                ``download_file`` (e.g. a boto3 S3 client).
            bucket_name (str): The S3 bucket name where products are stored
                (default: 'eodata').
        """
        self.s3_client = s3_client
        self.bucket_name = bucket_name

    @staticmethod
    def get_s3_key_from_href(href):
        """
        Convert a full ``s3://<bucket>/<key>`` URI into the bare object key.

        Args:
            href (str): S3 URI, or an already-bare S3 key.

        Returns:
            str: Everything after ``s3://<bucket>/``. ``href`` is returned
            unchanged when it does not start with ``s3://``.
        """
        scheme = 's3://'
        if not href.startswith(scheme):
            return href
        # The first path component after the scheme is the bucket name;
        # the remainder is the object key.
        _bucket, _, key = href[len(scheme):].partition('/')
        return key

    def get_product_content(self, product_path):
        """
        Download a product from the bucket into memory.

        Args:
            product_path (str): S3 key or full S3 URI to the product.

        Returns:
            tuple: ``(content, filename)`` where ``content`` is the object
            as ``bytes`` and ``filename`` is the last path component of the
            key.

        Raises:
            Exception: Whatever the S3 client raises; the error is printed
            and then re-raised unchanged.
        """
        # Extract S3 key if full URI is provided
        product_path = self.get_s3_key_from_href(product_path)

        # Extract the filename from the path
        _, filename = os.path.split(product_path)

        try:
            buffer = io.BytesIO()
            self.s3_client.download_fileobj(self.bucket_name, product_path, buffer)
            # getvalue() returns the whole buffer regardless of the current
            # stream position, so no seek is required.
            product_content = buffer.getvalue()
            print(f"Successfully downloaded product: {filename}")
            return product_content, filename
        except Exception as e:
            print(f"Error downloading product: {str(e)}")
            raise

    async def download_product(self, product_path, output_filename=None):
        """
        Download a product from the bucket to disk.

        NOTE: the S3 client's ``download_file`` is called directly (it is
        not awaited), so the coroutine does not yield while the transfer
        is in progress.

        Args:
            product_path (str): S3 key or full S3 URI to the product.
            output_filename (str, optional): Filename to save the product
                to. If None, uses the original filename.

        Returns:
            str: Path to the downloaded file.

        Raises:
            Exception: Whatever the S3 client raises; the error is printed
            and then re-raised unchanged.
        """
        # Extract S3 key if full URI is provided
        product_path = self.get_s3_key_from_href(product_path)

        # Extract the filename from the path
        _, filename = os.path.split(product_path)

        # Use custom filename if provided, otherwise use the original
        if output_filename is None:
            output_filename = filename

        try:
            self.s3_client.download_file(self.bucket_name, product_path, output_filename)
            print(f"Successfully downloaded product {filename} to {output_filename}")
            return output_filename
        except Exception as e:
            print(f"Error downloading product: {str(e)}")
            raise
from src.auth.auth import get_direct_access_token
from src.utils.image import extract_url_after_filename
def download_sentinel_image(username, password, start_date, end_date,
                            bbox=(-180, -90, 180, 90), limit=10, timeout=60):
    """
    Download a random Sentinel-2 true-colour image matching the criteria.

    A random one-day window is picked inside ``[start_date, end_date]``,
    the STAC catalogue is searched for SENTINEL-2 items in that window,
    one item is chosen at random, and its ``*_TCI_60m.jp2`` asset is
    downloaded.

    Args:
        username (str): DESTINE username
        password (str): DESTINE password
        start_date (str): First day of the search range, "YYYY-MM-DD"
        end_date (str): Last day of the search range, "YYYY-MM-DD"
        bbox (sequence): Bounding box coordinates [west, south, east, north]
        limit (int): Maximum number of results to return
        timeout (float): Seconds to wait for the download request before
            giving up (passed to ``requests.post``)

    Returns:
        tuple: (image_content or error_message, metadata). ``metadata`` is
        None whenever the first element is an error message.

    Raises:
        ValueError: If ``start_date`` or ``end_date`` is not "YYYY-MM-DD".
    """
    # Validate the date range first so bad input fails without any
    # network round-trip.
    range_start = datetime.strptime(start_date, "%Y-%m-%d")
    range_end = datetime.strptime(end_date, "%Y-%m-%d")
    days_between = (range_end - range_start).days
    if days_between < 0:
        return "Invalid date range: end_date is before start_date", None

    # Get access token
    token_result = get_direct_access_token(username=username, password=password)
    if not token_result:
        return "Failed to authenticate", None
    access_token = token_result["access_token"]

    # Set up STAC API client
    stac_base_url = "https://cachea.destine.eu"
    catalog = Client.open(f"{stac_base_url}/stac/api")

    # Keep a 7-day margin before the end of the range when it is long
    # enough; for shorter ranges fall back to the first day instead of
    # handing random.randint an empty interval.
    max_offset = max(days_between - 7, 0)
    window_start = range_start + timedelta(days=random.randint(0, max_offset))
    # Never let the one-day window run past the requested end date.
    window_end = min(window_start + timedelta(days=1), range_end)

    # Search for Sentinel-2 images. `limit` is only the page size in
    # pystac-client; `max_items` is what caps the total number of results.
    search = catalog.search(
        method="GET",
        collections=["SENTINEL-2"],
        bbox=bbox,
        datetime=f"{window_start:%Y-%m-%d}/{window_end:%Y-%m-%d}",
        limit=limit,
        max_items=limit,
    )
    items = list(search.items())
    if not items:
        return "No Sentinel-2 images found", None

    # Select a random item
    random_item = random.choice(items)

    # An item may carry no single datetime, so guard the formatting call.
    item_datetime = random_item.datetime
    metadata = {
        "id": random_item.id,
        "datetime": (item_datetime.strftime("%Y-%m-%d %H:%M:%S")
                     if item_datetime is not None else None),
        "bbox": random_item.bbox,
    }

    # Pick the first asset whose href ends with *_TCI_60m.jp2
    tci_href = next(
        (asset.href for asset in random_item.assets.values()
         if asset.href.endswith('_TCI_60m.jp2')),
        None,
    )
    if tci_href is None:
        return "No TCI assets found in the selected image", None

    filepath = extract_url_after_filename(tci_href)
    metadata["filename"] = os.path.basename(filepath)

    # Download the file
    url = f"{stac_base_url}/stac/download?filename={filepath}"
    headers = {
        'Authorization': f'Bearer {access_token}'
    }
    response = requests.post(url, headers=headers, data={}, timeout=timeout)
    if response.status_code != 200:
        return f"Failed to download the file. Status code: {response.status_code}", None
    return response.content, metadata
|