Spaces:
Runtime error
Runtime error
| import ee | |
| import numpy as np | |
| from PIL import Image | |
| import io | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| import requests | |
| import os | |
| import logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| service_account = "deploy-1@firepatternrecognition.iam.gserviceaccount.com" | |
| credentials = ee.ServiceAccountCredentials(service_account, '2AB9532A8B7759C26D6980A570EF6EEE8B49DE78_firepatternrecognition-c10928272b24.json') | |
| ee.Initialize(credentials) | |
| # Initialize the Earth Engine module. | |
| width, height = 158,292 | |
| nw_corner_list = [98.00, 20.15] | |
| ne_corner_list = [99.58, 20.15] | |
| sw_corner_list = [98.00, 17.23] | |
| se_corner_list = [99.58, 17.23] | |
| # Define the coordinates of the new area. | |
| nw_corner = ee.Geometry.Point(nw_corner_list) | |
| ne_corner = ee.Geometry.Point(ne_corner_list) | |
| sw_corner = ee.Geometry.Point(sw_corner_list) | |
| se_corner = ee.Geometry.Point(se_corner_list) | |
| # Create a rectangle from the corners. | |
| area = ee.Geometry.Polygon([ | |
| [nw_corner.coordinates(), ne_corner.coordinates(), se_corner.coordinates(), sw_corner.coordinates(), nw_corner.coordinates()] | |
| ]) | |
| def calculate_ndvi(image): | |
| """Calculate NDVI for an image.""" | |
| ndvi = image.normalizedDifference(['B8', 'B4']).rename('NDVI') | |
| return image.addBands(ndvi) | |
| def create_image_ndvi(date1, date2, new_area=area): | |
| logging.info("Starting create image ndvi") | |
| date_obj = datetime.strptime(date2, '%Y-%m-%d') | |
| new_date_obj = date_obj + timedelta(days=1) | |
| new_date_str = new_date_obj.strftime('%Y-%m-%d') | |
| # Create an ImageCollection and filter it by date and bounds | |
| image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \ | |
| .filterDate(date1, new_date_str) \ | |
| .filterBounds(new_area) | |
| # Select bands 4 and 8 and calculate NDVI for each image in the collection | |
| image_collection = image_collection.select(['B4', 'B8']).map(calculate_ndvi) | |
| # Get the median NDVI image and clip it to the area of interest | |
| ndvi_image = image_collection.select('NDVI').max().clip(new_area) | |
| ndvi_vis_params = { | |
| 'min': -1, | |
| 'max': 1, | |
| 'palette': ['white', 'black'] | |
| } | |
| # Get the URL for the NDVI image | |
| url = ndvi_image.getThumbURL({ | |
| 'min': ndvi_vis_params['min'], | |
| 'max': ndvi_vis_params['max'], | |
| 'palette': ','.join(ndvi_vis_params['palette']), | |
| 'region': new_area, | |
| 'dimensions': [width, height], | |
| 'format': 'png' | |
| }) | |
| response = requests.get(url) | |
| image = Image.open(io.BytesIO(response.content)).convert("L") | |
| ndvi_array = np.array(image) | |
| ndvi_array = ndvi_array / 255.0 | |
| logging.info("End create image ndvi") | |
| return ndvi_array,image | |
| def calculate_ndwi(image): | |
| ndwi = image.normalizedDifference(['B3', 'B8']).rename('NDWI') | |
| return image.addBands(ndwi) | |
| def create_image_ndwi(date1, date2, new_area=area): | |
| logging.info("Starting create image ndwi") | |
| date_obj = datetime.strptime(date2, '%Y-%m-%d') | |
| new_date_obj = date_obj + timedelta(days=1) | |
| new_date_str = new_date_obj.strftime('%Y-%m-%d') | |
| image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \ | |
| .filterDate(date1, new_date_str) \ | |
| .filterBounds(new_area) | |
| # Select bands 3 and 8 and calculate NDWI for each image in the collection | |
| image_collection = image_collection.select(['B3', 'B8']).map(calculate_ndwi) | |
| # Get the mean NDWI image and clip it to the area of interest | |
| ndwi_image = image_collection.select('NDWI').max().clip(new_area) | |
| # Define visualization parameters for NDWI | |
| ndwi_vis_params = { | |
| 'min': -1, | |
| 'max': 1, | |
| 'palette': ['white', 'black'] | |
| } | |
| # Get the URL for the NDWI image | |
| url = ndwi_image.getThumbURL({ | |
| 'min': ndwi_vis_params['min'], | |
| 'max': ndwi_vis_params['max'], | |
| 'palette': ','.join(ndwi_vis_params['palette']), | |
| 'region': new_area, | |
| 'dimensions': [width, height], | |
| 'format': 'png' | |
| }) | |
| # Request the image from the URL | |
| response = requests.get(url) | |
| image = Image.open(io.BytesIO(response.content)).convert("L") | |
| # Convert the image to a NumPy array | |
| ndwi_array = np.array(image) | |
| ndwi_array = ndwi_array / 255 | |
| logging.info("End create image ndwi") | |
| return ndwi_array,image | |
| def calculate_ndmi(image): | |
| ndmi = image.normalizedDifference(['B8', 'B11']).rename('NDMI') | |
| return image.addBands(ndmi) | |
| def create_image_ndmi(date1, date2, new_area=area): | |
| logging.info("Starting create image ndmi") | |
| date_obj = datetime.strptime(date2, '%Y-%m-%d') | |
| new_date_obj = date_obj + timedelta(days=1) | |
| new_date_str = new_date_obj.strftime('%Y-%m-%d') | |
| # Create an ImageCollection and filter it by date and bounds | |
| image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \ | |
| .filterDate(date1, new_date_str) \ | |
| .filterBounds(new_area) | |
| # Select bands 8 and 11 and calculate NDMI for each image in the collection | |
| image_collection = image_collection.select(['B8', 'B11']).map(calculate_ndmi) | |
| # Get the mean NDMI image and clip it to the area of interest | |
| ndmi_image = image_collection.select('NDMI').max().clip(new_area) | |
| # Define visualization parameters for NDMI | |
| ndmi_vis_params = { | |
| 'min': -1, | |
| 'max': 1, | |
| 'palette': ['white', 'black'] | |
| } | |
| # Get the URL for the NDMI image | |
| url = ndmi_image.getThumbURL({ | |
| 'min': ndmi_vis_params['min'], | |
| 'max': ndmi_vis_params['max'], | |
| 'palette': ','.join(ndmi_vis_params['palette']), | |
| 'region': new_area, | |
| 'dimensions': [width, height], | |
| 'format': 'png' | |
| }) | |
| # Request the image from the URL | |
| response = requests.get(url) | |
| image = Image.open(io.BytesIO(response.content)).convert("L") | |
| # Convert the image to a NumPy array | |
| ndmi_array = np.array(image) | |
| ndmi_array = ndmi_array/255 | |
| logging.info("End create image ndmi") | |
| return ndmi_array,image | |
| def calculate_nddi(image): | |
| ndvi = image.select('NDVI') | |
| ndwi = image.select('NDWI') | |
| # nddi = ndvi.subtract(ndwi).divide(ndvi.add(ndwi)).rename('NDDI') | |
| nddi = ndvi.subtract(ndwi).rename('NDDI') | |
| return nddi | |
| def create_image_nddi(date1, date2, new_area=area): | |
| logging.info("Starting create image nddi") | |
| # Parse the end date and add one day | |
| date_obj = datetime.strptime(date2, '%Y-%m-%d') | |
| new_date_obj = date_obj + timedelta(days=1) | |
| new_date_str = new_date_obj.strftime('%Y-%m-%d') | |
| # Create an ImageCollection and filter it by date and bounds | |
| image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \ | |
| .filterDate(date1, new_date_str) \ | |
| .filterBounds(new_area) | |
| # Select bands 8, 4, and 3 and calculate NDVI and NDWI for each image in the collection | |
| image_collection = image_collection.select(['B8', 'B4', 'B3']).map(calculate_ndvi).map(calculate_ndwi) | |
| # Calculate NDDI for each image in the collection | |
| # image_collection = image_collection.map(lambda image: image.addBands(calculate_nddi(image))) | |
| image_collection = image_collection.map(calculate_nddi) | |
| # Get the mean NDDI image and clip it to the area of interest | |
| nddi_image = image_collection.select('NDDI').max().clip(new_area) | |
| # Define visualization parameters for NDDI | |
| nddi_vis_params = { | |
| 'min': -1, | |
| 'max': 1, | |
| 'palette': ['white', 'black'] | |
| } | |
| # Get the URL for the NDDI image | |
| url = nddi_image.getThumbURL({ | |
| 'min': nddi_vis_params['min'], | |
| 'max': nddi_vis_params['max'], | |
| 'palette': ','.join(nddi_vis_params['palette']), | |
| 'region': new_area, | |
| 'dimensions': [width, height], | |
| 'format': 'png' | |
| }) | |
| # Request the image from the URL | |
| response = requests.get(url) | |
| image = Image.open(io.BytesIO(response.content)).convert("L") | |
| # Convert the image to a NumPy array | |
| nddi_array = np.array(image) | |
| nddi_array = nddi_array /255 | |
| logging.info("End create image nddi") | |
| return nddi_array,image | |
| def calculate_savi(image): | |
| red = image.select('B4') | |
| nir = image.select('B8') | |
| # Define the soil adjustment factor (L), typically between 0 to 1 | |
| L = 0.5 | |
| savi = nir.subtract(red).multiply(1 + L).divide(nir.add(red).add(L)).rename('SAVI') | |
| return image.addBands(savi) | |
| def create_image_savi(date1, date2, new_area=area): | |
| logging.info("Starting create image savi") | |
| # Parse the end date and add one day | |
| date_obj = datetime.strptime(date2, '%Y-%m-%d') | |
| new_date_obj = date_obj + timedelta(days=1) | |
| new_date_str = new_date_obj.strftime('%Y-%m-%d') | |
| # Create an ImageCollection and filter it by date and bounds | |
| image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \ | |
| .filterDate(date1, new_date_str) \ | |
| .filterBounds(new_area) | |
| # Select bands 4, 8, and 12 and calculate SAVI for each image in the collection | |
| image_collection = image_collection.select(['B4', 'B8', 'B12']).map(calculate_savi) | |
| # Get the mean SAVI image and clip it to the area of interest | |
| savi_image = image_collection.select('SAVI').max().clip(new_area) | |
| # Define visualization parameters for SAVI | |
| savi_vis_params = { | |
| 'min': -1, | |
| 'max': 1, | |
| 'palette': ['white', 'black'] | |
| } | |
| # Get the URL for the SAVI image | |
| url = savi_image.getThumbURL({ | |
| 'min': savi_vis_params['min'], | |
| 'max': savi_vis_params['max'], | |
| 'palette': ','.join(savi_vis_params['palette']), | |
| 'region': new_area, | |
| 'dimensions': [158, 292], | |
| 'format': 'png' | |
| }) | |
| # Request the image from the URL | |
| response = requests.get(url) | |
| image = Image.open(io.BytesIO(response.content)).convert("L") | |
| # Convert the image to a NumPy array | |
| savi_array = np.array(image) | |
| # Normalize the values to be between -1 and 1 | |
| savi_array = savi_array / 255 | |
| logging.info("End create image savi") | |
| return savi_array,image | |
| def date_create(today_date,new_area=area): | |
| # today_date = datetime.today().strftime('%Y-%m-%d') | |
| today_date = datetime.strptime(today_date, '%Y-%m-%d') | |
| seven_days_ago = (today_date - timedelta(days=7)).strftime('%Y-%m-%d') | |
| image_collection = ee.ImageCollection('COPERNICUS/S2_SR') | |
| image_collection_filter = image_collection.filterDate(seven_days_ago, today_date).filterBounds(new_area) | |
| image_info = image_collection_filter.aggregate_array('system:index').getInfo() | |
| image_dates = image_collection_filter.aggregate_array('system:time_start').map(lambda d: ee.Date(d).format('YYYY-MM-dd')).getInfo() | |
| df_day = pd.DataFrame({'Image ID': image_info, 'Date': image_dates}) | |
| df_du = pd.unique(df_day['Date']) | |
| df_latest_2day = df_du[-2:] | |
| return df_latest_2day | |
| def load_and_process_file(file_path): | |
| logging.info(f"Loading file: {file_path}") | |
| # Check if the file exists | |
| if not os.path.isfile(file_path): | |
| raise FileNotFoundError(f"The file '{file_path}' does not exist.") | |
| # Check the file extension | |
| file_extension = os.path.splitext(file_path)[1].lower() | |
| if file_extension == '.csv': | |
| # Load CSV file | |
| df = pd.read_csv(file_path) | |
| elif file_extension in ['.xls', '.xlsx']: | |
| # Load Excel file | |
| df = pd.read_excel(file_path) | |
| else: | |
| raise ValueError("The file must be a CSV or Excel file with extensions '.csv', '.xls', or '.xlsx'.") | |
| # Convert column names to lowercase | |
| df.columns = df.columns.str.lower() | |
| df = df[["latitude","longitude","acq_date"]] | |
| return df | |
| def filter_latest_date(df, date_column='date'): | |
| lat_min = sw_corner_list[1] | |
| lat_max = nw_corner_list[1] | |
| lon_min = sw_corner_list[0] | |
| lon_max = se_corner_list[0] | |
| df = df[(df['latitude'] >= lat_min) & (df['latitude'] <= lat_max) & | |
| (df['longitude'] >= lon_min) & (df['longitude'] <= lon_max)] | |
| # Convert the date column to datetime format | |
| df[date_column] = pd.to_datetime(df[date_column]) | |
| # Find the latest date | |
| latest_date = df[date_column].max() | |
| # Filter the DataFrame to only include rows with the latest date | |
| filtered_df = df[df[date_column] == latest_date] | |
| filtered_df = filtered_df.reset_index(drop=True) | |
| logging.info(f"Loading file: {str(filtered_df[date_column][0].date())}") | |
| return filtered_df | |
| def create_image_from_coordinates(df): | |
| logging.info("Start create_image_from_coordinates") | |
| # Create a blank (black) image array | |
| image_array = np.zeros((height, width), dtype=np.uint8) | |
| # Function to convert lat/lon to pixel coordinates | |
| def latlon_to_pixel(lat, lon): | |
| lat_range = nw_corner_list[1] - sw_corner_list[1] | |
| lon_range = ne_corner_list[0] - nw_corner_list[0] | |
| y = int(height * (nw_corner_list[1] - lat) / lat_range) | |
| x = int(width * (lon - nw_corner_list[0]) / lon_range) | |
| return x, y | |
| # Iterate through each coordinate in the dataframe | |
| for index, row in df.iterrows(): | |
| lat, lon = row['latitude'], row['longitude'] | |
| x, y = latlon_to_pixel(lat, lon) | |
| # Paint a 11x11 pixel square centered on the hotspot coordinate if within bounds | |
| for dx in range(-2, 3): | |
| for dy in range(-2, 3): | |
| new_x = x + dx | |
| new_y = y + dy | |
| if 0 <= new_x < width and 0 <= new_y < height: | |
| image_array[new_y, new_x] = 255 | |
| # Convert array to PIL Image | |
| pil_image = Image.fromarray(image_array).convert("L") | |
| logging.info("End create_image_from_coordinates") | |
| # Return both the array and the PIL Image | |
| return image_array |