Spaces:
Build error
Build error
| import os | |
| import time | |
| import gradio as gr | |
| import numpy as np | |
| import io | |
| import random | |
| from PIL import Image | |
| from dotenv import load_dotenv | |
| import pystac_client | |
| from datetime import datetime | |
| from src.auth.auth import S3Connector | |
| from src.utils.utils import extract_s3_path_from_url | |
| from src.utils.stac_client import get_product_content | |
| # from src.utils.geo_guesser import get_random_land_point | |
| from src.utils.geo_guesser import get_random_land_point_rejection | |
| from folium.plugins import HeatMap | |
| # Assuming these utility modules are in the specified paths relative to the script | |
| # If not, adjust the import paths accordingly. | |
| # import geopandas as gpd # Not used in the provided snippet, can be removed if not needed elsewhere | |
| # from shapely.geometry import Point # Not used, can be removed | |
| # import geodatasets # Not used, can be removed | |
| import folium | |
| from gradio_folium import Folium | |
| import tempfile | |
| # Function to create a Folium map for a location | |
| def create_location_map(latitude, longitude): | |
| try: | |
| # Create a folium map centered at the point | |
| m = folium.Map(location=[latitude, longitude], width='100%', height='100%', zoom_start=7, tiles="Cartodb dark_matter") | |
| # Add a marker for the point | |
| folium.Marker( | |
| location=[latitude, longitude], | |
| popup=f"Image Location<br>Lon: {longitude:.2f}<br>Lat: {latitude:.2f}", | |
| icon=folium.Icon(color='red', icon='camera') | |
| ).add_to(m) | |
| return m | |
| except Exception as e: | |
| print(f"Error creating map: {e}") | |
| return folium.Map(location=[0, 0], zoom_start=2) | |
| # --- START MODIFICATION: Add Image Quality Check Function --- | |
| def check_image_quality(img: Image.Image, max_black_pct=15.0, max_white_pct=50.0): | |
| """ | |
| Checks if a PIL Image has excessive black or white pixels. | |
| Args: | |
| img: The PIL Image object. | |
| max_black_pct: Maximum allowed percentage of pure black pixels. | |
| max_white_pct: Maximum allowed percentage of pure white pixels. | |
| Returns: | |
| True if the image quality is problematic (too much black/white), False otherwise. | |
| """ | |
| try: | |
| img_array = np.array(img) | |
| # Ensure it's an RGB image for consistent checks | |
| if img_array.ndim != 3 or img_array.shape[2] != 3: | |
| print("Warning: Image is not in standard RGB format. Skipping quality check.") | |
| return False # Assume okay if not standard RGB | |
| total_pixels = img_array.shape[0] * img_array.shape[1] | |
| if total_pixels == 0: | |
| print("Warning: Image has zero pixels.") | |
| return True # Problematic if no pixels | |
| # Count black pixels (0, 0, 0) | |
| black_pixels = np.sum(np.all(img_array == [0, 0, 0], axis=2)) | |
| black_pct = (black_pixels / total_pixels) * 100 | |
| # Count white pixels (255, 255, 255) | |
| white_pixels = np.sum(np.all(img_array == [255, 255, 255], axis=2)) | |
| white_pct = (white_pixels / total_pixels) * 100 | |
| print(f"Image Quality Check - Black: {black_pct:.2f}%, White: {white_pct:.2f}%") | |
| if black_pct > max_black_pct: | |
| print(f"Image rejected: Exceeds black pixel threshold ({black_pct:.2f}% > {max_black_pct}%)") | |
| return True # Problematic | |
| if white_pct > max_white_pct: | |
| print(f"Image rejected: Exceeds white pixel threshold ({white_pct:.2f}% > {max_white_pct}%)") | |
| return True # Problematic | |
| return False # Image quality is acceptable | |
| except Exception as e: | |
| print(f"Error during image quality check: {e}") | |
| # Decide how to handle check errors, e.g., assume okay or problematic | |
| return False # Let's be lenient and assume okay if check fails | |
| # --- END MODIFICATION --- | |
| # Load environment variables | |
| load_dotenv() | |
| # Get credentials from environment variables | |
| ACCESS_KEY_ID = os.environ.get("ACCESS_KEY_ID") | |
| SECRET_ACCESS_KEY = os.environ.get("SECRET_ACCESS_KEY") | |
| ENDPOINT_URL = 'https://eodata.dataspace.copernicus.eu' | |
| ENDPOINT_STAC = "https://stac.dataspace.copernicus.eu/v1/" | |
| BUCKET_NAME = "eodata" | |
| # Initialize the connector | |
| # Ensure S3Connector is correctly defined in src.auth.auth | |
| try: | |
| connector = S3Connector( | |
| endpoint_url=ENDPOINT_URL, | |
| access_key_id=ACCESS_KEY_ID, | |
| secret_access_key=SECRET_ACCESS_KEY, | |
| region_name='default' # Adjust if a specific region is needed | |
| ) | |
| # Connect to S3 | |
| s3 = connector.get_s3_resource() | |
| s3_client = connector.get_s3_client() | |
| # buckets = connector.list_buckets() # Optional: Listing buckets might require different permissions | |
| # print("Available buckets:", buckets) # Comment out if not needed or causes issues | |
| except ImportError: | |
| print("Error: S3Connector class not found. Ensure src/auth/auth.py exists and is correct.") | |
| # Provide dummy clients if needed for Gradio interface to load without full functionality | |
| s3 = None | |
| s3_client = None | |
| except Exception as e: | |
| print(f"Error initializing S3 Connector: {e}") | |
| s3 = None | |
| s3_client = None | |
| # Initialize STAC Client | |
| try: | |
| catalog = pystac_client.Client.open(ENDPOINT_STAC) | |
| except Exception as e: | |
| print(f"Error initializing STAC Client: {e}") | |
| catalog = None | |
| # --- START MODIFICATION: Update fetch_sentinel_image --- | |
| def fetch_sentinel_image(longitude, latitude, date_from, date_to, cloud_cover): | |
| """Fetch a Sentinel image based on criteria, retrying if quality is poor.""" | |
| if not catalog or not s3_client: | |
| error_message = "STAC Catalog or S3 Client not initialized. Check credentials and endpoints." | |
| print(error_message) | |
| default_map = folium.Map(location=[0, 0], zoom_start=2) | |
| return None, error_message, default_map | |
| try: | |
| # Use the coordinates from inputs | |
| LON, LAT = float(longitude), float(latitude) | |
| # Use the date range from inputs | |
| date_range = f"{date_from}/{date_to}" | |
| cloud_query = f"eo:cloud_cover<{cloud_cover}" | |
| # Search for items | |
| search = catalog.search( | |
| collections=['sentinel-2-l2a'], | |
| intersects=dict(type="Point", coordinates=[LON, LAT]), | |
| datetime=date_range, | |
| query=[cloud_query], | |
| max_items=20 # Fetch a few items to have alternatives for quality check | |
| ) | |
| # It's often better to get items as a list directly if possible | |
| # Depending on pystac_client version, .items() or .item_collection() might be preferred | |
| # Using item_collection and converting to list for broader compatibility | |
| items_collection = search.item_collection() | |
| items_list = list(items_collection) | |
| if len(items_list) == 0: | |
| # Return a default map with no data | |
| default_map = create_location_map(LAT, LON) # Use helper function | |
| folium.Marker( | |
| location=[LAT, LON], | |
| popup=f"No images found at this location\nLon: {LON:.2f}, Lat: {LAT:.2f}\nwithin {date_from} to {date_to}\nand cloud cover < {cloud_cover}%", | |
| icon=folium.Icon(color='gray', icon='question-sign') | |
| ).add_to(default_map) | |
| return None, f"No images found for the specified criteria at coordinates ({LON}, {LAT}) with cloud cover < {cloud_cover}%.", default_map | |
| # Shuffle the list to try different items if multiple calls are made | |
| random.shuffle(items_list) | |
| MAX_QUALITY_ATTEMPTS = 20 # Max images to check for quality from the found list | |
| selected_item = None | |
| img = None | |
| product_url = None | |
| metadata = "Failed to retrieve a suitable quality image." # Default failure msg | |
| for attempt, item in enumerate(items_list): | |
| if attempt >= MAX_QUALITY_ATTEMPTS: | |
| print(f"Checked {MAX_QUALITY_ATTEMPTS} images, none passed quality criteria.") | |
| metadata = f"Found {len(items_list)} images, but the first {MAX_QUALITY_ATTEMPTS} checked failed quality check (Black > 15% or White > 50%)." | |
| break # Stop trying after max attempts | |
| print(f"Attempt {attempt + 1}/{min(MAX_QUALITY_ATTEMPTS, len(items_list))}: Trying item {item.id}") | |
| try: | |
| # Ensure 'TCI_60m' asset exists | |
| if 'TCI_60m' not in item.assets: | |
| print(f"Item {item.id} does not have a 'TCI_60m' asset. Skipping.") | |
| continue | |
| # Get the TCI_60m asset from the randomly selected item | |
| current_product_url = extract_s3_path_from_url(item.assets['TCI_60m'].href) | |
| # Ensure get_product_content is correctly defined in src.utils.stac_client | |
| product_content = get_product_content(s3_client=s3_client, bucket_name=BUCKET_NAME, | |
| object_url=current_product_url) | |
| print(f"Selected product URL: {current_product_url}") | |
| # Convert to PIL Image | |
| current_img = Image.open(io.BytesIO(product_content)) | |
| # Perform image quality check | |
| if not check_image_quality(current_img, max_black_pct=15.0, max_white_pct=50.0): | |
| # Quality is good, select this image | |
| selected_item = item | |
| img = current_img | |
| product_url = current_product_url | |
| print(f"Image {item.id} passed quality check.") | |
| break # Found a good image, exit the loop | |
| else: | |
| # Quality is bad, close image and loop continues | |
| print(f"Image {item.id} failed quality check. Trying next.") | |
| current_img.close() # Close the problematic image | |
| except (FileNotFoundError, KeyError) as asset_err: # Handle S3 errors or missing keys | |
| print(f"Error accessing asset for item {item.id}: {asset_err}. Skipping.") | |
| continue # Try the next item | |
| except Exception as proc_err: | |
| print(f"Error processing item {item.id}: {proc_err}. Skipping.") | |
| # Close image if it was opened before error | |
| if 'current_img' in locals() and hasattr(current_img, 'close'): | |
| current_img.close() | |
| continue # Try the next item | |
| # After the loop, check if a good image was found | |
| if selected_item and img: | |
| # Format datetime for readability | |
| datetime_str = selected_item.properties.get('datetime', 'N/A') | |
| try: | |
| # Handle potential timezone 'Z' | |
| if isinstance(datetime_str, str) and datetime_str.endswith('Z'): | |
| datetime_str = datetime_str[:-1] + '+00:00' | |
| dt = datetime.fromisoformat(datetime_str) | |
| formatted_date = dt.strftime('%Y-%m-%d %H:%M:%S UTC') | |
| except ValueError: | |
| formatted_date = datetime_str # Keep original if parsing fails | |
| except Exception as date_e: | |
| print(f"Date formatting error: {date_e}") | |
| formatted_date = datetime_str # Fallback | |
| # Extract metadata for display | |
| metadata = f""" | |
| ## Product Information | |
| - **Location**: {LAT:.4f}°N, {LON:.4f}°E | |
| - **Date**: {formatted_date} | |
| - **Cloud Cover**: {selected_item.properties.get('eo:cloud_cover', 'N/A')}% | |
| - **Cloud Cover Threshold**: < {cloud_cover}% | |
| - **Satellite**: {selected_item.properties.get('platform', 'N/A')} | |
| - **Product ID**: {selected_item.id} | |
| """ | |
| # Create a location map | |
| location_map = create_location_map(LAT, LON) | |
| return img, metadata, location_map | |
| else: | |
| # If loop finished without finding a good image | |
| default_map = create_location_map(LAT, LON) # Show location map | |
| folium.Marker( | |
| location=[LAT, LON], | |
| popup=f"Found {len(items_list)} images, but none passed quality check (checked up to {MAX_QUALITY_ATTEMPTS}).\nLon: {LON:.2f}, Lat: {LAT:.2f}", | |
| icon=folium.Icon(color='orange', icon='exclamation-sign') | |
| ).add_to(default_map) | |
| # Use the metadata message set earlier if loop failed | |
| return None, metadata, default_map | |
| except ValueError as ve: | |
| error_message = f"Invalid input: {str(ve)}. Please ensure longitude and latitude are valid numbers." | |
| print(error_message) | |
| default_map = folium.Map(location=[0, 0], zoom_start=2) | |
| return None, error_message, default_map | |
| except pystac_client.exceptions.APIError as api_err: | |
| error_message = f"STAC API Error: {api_err}. Check STAC endpoint and query parameters." | |
| print(error_message) | |
| default_map = folium.Map(location=[0,0], zoom_start=2) | |
| return None, error_message, default_map | |
| except Exception as e: | |
| error_message = f"An unexpected error occurred: {str(e)}" | |
| import traceback | |
| print(error_message) | |
| traceback.print_exc() # Print full traceback for debugging | |
| default_map = folium.Map(location=[0, 0], zoom_start=2) | |
| return None, error_message, default_map | |
| # --- END MODIFICATION --- | |
| # Function to handle random location and auto-fetch | |
| def random_location_and_fetch(date_from, date_to, cloud_cover): | |
| """Get a random land location and fetch an image from there.""" | |
| # Ensure get_random_land_point is correctly defined in src.utils.geo_guesser | |
| try: | |
| lon, lat = get_random_land_point_rejection() | |
| except ImportError: | |
| print("Error: get_random_land_point_rejection function not found. Ensure src/utils/geo_guesser.py exists.") | |
| return 0, 0, None, "Error: Cannot generate random point.", folium.Map(location=[0,0], zoom_start=2) | |
| except Exception as e: | |
| print(f"Error getting random land point: {e}") | |
| return 0, 0, None, f"Error generating random point: {e}", folium.Map(location=[0,0], zoom_start=2) | |
| print(f"Random land point selected: Longitude={lon}, Latitude={lat}") | |
| img, metadata, location_map = fetch_sentinel_image(lon, lat, date_from, date_to, cloud_cover) | |
| # --- START MODIFICATION: Adjust retry logic message --- | |
| # The retry for *location* is now less likely needed if fetch_sentinel_image | |
| # itself tries multiple images. We keep it as a fallback if an entire area | |
| # yields no images or only bad quality ones repeatedly. | |
| attempts = 1 | |
| max_attempts = 3 # Max attempts for *different random locations* | |
| while img is None and attempts < max_attempts: | |
| attempts += 1 | |
| print(f"Attempt {attempts}/{max_attempts}: No suitable image at ({lon:.2f}, {lat:.2f}). Trying another random location...") | |
| try: | |
| lon, lat = get_random_land_point_rejection() | |
| print(f"New random land point: Longitude={lon}, Latitude={lat}") | |
| img, metadata, location_map = fetch_sentinel_image(lon, lat, date_from, date_to, cloud_cover) | |
| except Exception as e: | |
| print(f"Error getting random land point on attempt {attempts}: {e}") | |
| # Decide if you want to stop or just report error and let loop continue | |
| metadata = f"Error getting random location on attempt {attempts}: {e}" | |
| # Keep trying if attempts remain | |
| if img is None: | |
| # Refine the message if no image was found after multiple location attempts | |
| metadata = f"Failed to find a suitable image after trying {max_attempts} random locations. The last attempt was at ({lon:.2f}, {lat:.2f}).\nDetails: {metadata}" # Append last failure reason | |
| # Ensure map shows the last attempted location | |
| if 'location_map' not in locals() or location_map is None: | |
| location_map = create_location_map(lat, lon) | |
| folium.Marker( | |
| location=[lat, lon], | |
| popup=f"Failed to find image after {max_attempts} attempts.\nLast try: Lon: {lon:.2f}, Lat: {lat:.2f}", | |
| icon=folium.Icon(color='red', icon='times') | |
| ).add_to(location_map) | |
| # Update the Gradio fields with the final lon/lat, even if fetching failed | |
| return lon, lat, img, metadata, location_map | |
| # --- END MODIFICATION --- | |
| # Create Gradio interface | |
| with gr.Blocks(title="Sentinel Product Viewer") as demo: | |
| gr.Markdown("# Sentinel-2 Product Viewer") | |
| gr.Markdown("Browse and view Sentinel-2 satellite product") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| # Location inputs | |
| with gr.Row(): | |
| longitude = gr.Number(label="Longitude", value=15.0, minimum=-180, maximum=180) | |
| latitude = gr.Number(label="Latitude", value=50.0, minimum=-90, maximum=90) | |
| # Date range inputs | |
| with gr.Row(): | |
| # Use gr.Date for better UX if Gradio version supports it well | |
| # Otherwise, stick to Textbox and rely on user format | |
| # date_from = gr.Date(label="Date From", value="2024-05-01") | |
| # date_to = gr.Date(label="Date To", value="2025-02-01") | |
| date_from = gr.Textbox(label="Date From (YYYY-MM-DD)", value="2024-05-01") | |
| date_to = gr.Textbox(label="Date To (YYYY-MM-DD)", value="2025-02-01") | |
| # Cloud cover slider | |
| cloud_cover = gr.Slider( | |
| label="Max Cloud Cover (%)", | |
| minimum=0, | |
| maximum=100, | |
| value=20, # Lowered default for potentially better initial results | |
| step=5 | |
| ) | |
| # Diverse landscape location buttons | |
| gr.Markdown("### Preset Locations") | |
| with gr.Row(): | |
| italy_btn = gr.Button("Italy") | |
| amazon_btn = gr.Button("Amazon Rainforest") | |
| with gr.Row(): | |
| tokyo_btn = gr.Button("Tokyo") | |
| great_barrier_btn = gr.Button("Great Barrier Reef") | |
| with gr.Row(): | |
| iceland_btn = gr.Button("Iceland Glacier") | |
| canada_btn = gr.Button("Baffin Island") | |
| fetch_btn = gr.Button("Fetch Image for Current Location", variant="secondary") | |
| # Add Random Earth Location button (with distinctive styling) | |
| gr.Markdown("### Random Discovery") | |
| random_earth_btn = gr.Button("🌍 Get Random Earth Location & Image", variant="primary") # Changed size attr | |
| metadata_output = gr.Markdown(label="Image Metadata") | |
| with gr.Column(scale=2): | |
| image_output = gr.Image( | |
| type="pil", | |
| label="Sentinel-2 Image (TCI 60m)", | |
| # Removed fixed height/width to allow natural aspect ratio | |
| # height=512, # Example: Set a moderate height if needed | |
| # width=512, | |
| show_download_button=True | |
| ) | |
| map_output = Folium( | |
| # Initialize with a default map view | |
| value=create_location_map(50.0, 15.0), # Use initial lat/lon | |
| label="Location Map", | |
| height=400, # Maintain a fixed height for the map | |
| ) | |
| # Button click handlers for diverse landscapes | |
| # These lambda functions only update the lat/lon input fields | |
| italy_btn.click(lambda: (12.39, 42.05), outputs=[longitude, latitude]) | |
| amazon_btn.click(lambda: (-64.7, -3.42), outputs=[longitude, latitude]) | |
| tokyo_btn.click(lambda: (139.70, 35.65), outputs=[longitude, latitude]) | |
| great_barrier_btn.click(lambda: (150.97, -20.92), outputs=[longitude, latitude]) | |
| iceland_btn.click(lambda: (-18.17, 64.61), outputs=[longitude, latitude]) | |
| canada_btn.click(lambda: (-71.56, 67.03), outputs=[longitude, latitude]) | |
| # Random Earth button - gets random location AND fetches image | |
| random_earth_btn.click( | |
| fn=random_location_and_fetch, | |
| # Inputs: date range and cloud cover from the UI | |
| inputs=[date_from, date_to, cloud_cover], | |
| # Outputs: Update lon/lat fields AND image, metadata, map | |
| outputs=[longitude, latitude, image_output, metadata_output, map_output] | |
| ) | |
| # Main search button - uses current lat/lon, date, cloud cover | |
| fetch_btn.click( | |
| fn=fetch_sentinel_image, | |
| inputs=[longitude, latitude, date_from, date_to, cloud_cover], | |
| outputs=[image_output, metadata_output, map_output] | |
| ) | |
| # Optional: Add back the About section if desired | |
| # gr.Markdown("## About") | |
| # ... (About text) ... | |
| if __name__ == "__main__": | |
| # Ensure necessary helper modules/functions are available | |
| # (S3Connector, extract_s3_path_from_url, get_product_content, get_random_land_point_rejection) | |
| if s3_client and catalog: | |
| print("S3 and STAC clients initialized. Launching Gradio app.") | |
| demo.launch(share=True) # Set share=False for local-only access | |
| else: | |
| print("Could not initialize S3 or STAC client. Ensure credentials and network access are correct.") | |
| print("Gradio app will launch with limited functionality.") | |
| # Optionally launch anyway, or exit | |
| demo.launch(share=True) # Or exit(1) if clients are essential | |