mapster_space-test / src /utils /stac_client.py
rodolphethinks1's picture
Update src/utils/stac_client.py
51fc298 verified
import io
from datetime import datetime, timedelta
import random
import requests
from pystac_client import Client
import os
from src.auth.auth import get_direct_access_token
from src.utils.image import extract_url_after_filename
def get_product_content(s3_client, bucket_name, object_url):
"""
Download the content of a product from S3 bucket.
Args:
s3_client: boto3 S3 client object
bucket_name (str): Name of the S3 bucket
object_url (str): Path to the object within the bucket
Returns:
bytes: Content of the downloaded file
"""
print(f"Downloading {object_url}")
try:
# Download the file from S3
response = s3_client.get_object(Bucket=bucket_name, Key=object_url)
content = response['Body'].read()
print(f"Successfully downloaded {object_url}")
except Exception as e:
print(f"Error downloading file: {str(e)}")
raise
return content
def get_product(s3_resource, bucket_name, object_url, output_path):
"""
Download a product from S3 bucket and create output directory if it doesn't exist.
Args:
s3_resource: boto3 S3 resource object
bucket_name (str): Name of the S3 bucket
object_url (str): Path to the object within the bucket
output_path (str): Local directory to save the file
Returns:
str: Path to the downloaded file
"""
# Create output directory if it doesn't exist
os.makedirs(output_path, exist_ok=True)
# Extract filename from the object URL
_, filename = os.path.split(object_url)
# Full path where the file will be saved
local_file_path = os.path.join(output_path, filename)
print(f"Downloading {object_url} to {local_file_path}...")
try:
# Download the file from S3
s3_resource.Bucket(bucket_name).download_file(object_url, local_file_path)
print(f"Successfully downloaded to {local_file_path}")
except Exception as e:
print(f"Error downloading file: {str(e)}")
raise
return local_file_path
def download_sentinel_image(username, password, start_date, end_date,
bbox=[-180, -90, 180, 90], limit=10):
"""
Download a random Sentinel-2 image based on criteria.
Args:
username (str): DESTINE username
password (str): DESTINE password
# date_range (str): Date range in format "YYYY-MM-DD/YYYY-MM-DD"
cloud_cover (int, optional): Maximum cloud cover percentage
bbox (list): Bounding box coordinates [west, south, east, north]
limit (int): Maximum number of results to return
Returns:
tuple: (image_content or error_message, metadata)
"""
# Get access token
token_result = get_direct_access_token(username=username, password=password)
if not token_result:
return "Failed to authenticate", None
access_token = token_result["access_token"]
# Set up STAC API client
stac_base_url = "https://cachea.destine.eu"
stac_url = f"{stac_base_url}/stac/api"
catalog = Client.open(stac_url)
start_date = datetime.strptime(start_date, "%Y-%m-%d")
end_date = datetime.strptime(end_date, "%Y-%m-%d")
days_between = (end_date - start_date).days
random_start_day = random.randint(0, days_between - 7) # Ensure we have 7 days
random_start_date = start_date + timedelta(days=random_start_day)
random_end_date = random_start_date + timedelta(days=1)
# Format dates for the API
start_date_str = random_start_date.strftime("%Y-%m-%d")
end_date_str = random_end_date.strftime("%Y-%m-%d")
# Build search parameters
search_params = {
"method": "GET",
"collections": ["SENTINEL-2"],
"bbox": bbox,
"datetime": f"{start_date_str}/{end_date_str}",
"limit": limit
}
# Search for Sentinel-2 images
search = catalog.search(**search_params)
# Get a list of items
items = list(search.items())
if not items:
return "No Sentinel-2 images found", None
# Select a random item
random_item = random.choice(items)
# Get metadata for the selected item
metadata = {
"id": random_item.id,
"datetime": random_item.datetime.strftime("%Y-%m-%d %H:%M:%S"),
"bbox": random_item.bbox,
}
# Get the assets of the random item
assets = random_item.assets
asset_keys = list(assets.keys())
# Filter the assets to get the one that ends with *_TCI_60m.jp2
tci_assets = [assets[key].href for key in asset_keys if assets[key].href.endswith('_TCI_60m.jp2')]
if not tci_assets:
return "No TCI assets found in the selected image", None
filepath = extract_url_after_filename(tci_assets[0])
metadata["filename"] = os.path.basename(filepath)
# Download the file
url = f"{stac_base_url}/stac/download?filename={filepath}"
headers = {
'Authorization': f'Bearer {access_token}'
}
response = requests.post(url, headers=headers, data={})
if response.status_code == 200:
return response.content, metadata
else:
return f"Failed to download the file. Status code: {response.status_code}", None