# EarthEmbeddingExplorer/MajorTOM/metadata_helpers.py
import pyarrow.parquet as pq
import pandas as pd
import geopandas as gpd
from pathlib import Path
import urllib.request
import fsspec
from fsspec.parquet import open_parquet_file
from io import BytesIO
from PIL import Image
from rasterio.io import MemoryFile
from tqdm.notebook import tqdm
import os
from .sample_helpers import *  # provides read_tif_bytes, used below


def metadata_from_url(access_url, local_url):
    """Downloads a Major-TOM metadata parquet and returns it as a GeoDataFrame

    Args:
        access_url (str) : URL of the remote metadata parquet file
        local_url (str) : Local path where the downloaded file is stored

    Returns:
        gdf (geopandas dataframe): Metadata with a point geometry at each sample centre
    """
    local_url, response = urllib.request.urlretrieve(access_url, local_url)
    df = pq.read_table(local_url).to_pandas()
    df['timestamp'] = pd.to_datetime(df.timestamp)
    gdf = gpd.GeoDataFrame(
        df, geometry=gpd.points_from_xy(df.centre_lon, df.centre_lat), crs=df.crs.iloc[0]
    )
    return gdf
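
# A minimal usage sketch (kept as a comment so the module stays import-safe).
# The URL below points at the published Major-TOM Core-S2L2A metadata parquet
# on Hugging Face and is only illustrative - substitute the metadata file of
# the Major-TOM source you are actually working with:
#
#   gdf = metadata_from_url(
#       'https://huggingface.co/datasets/Major-TOM/Core-S2L2A/resolve/main/metadata.parquet',
#       'metadata.parquet',
#   )
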
def filter_metadata(df,
                    region=None,
                    daterange=None,
                    cloud_cover=(0, 100),
                    nodata=(0, 1.0)
                    ):
    """Filters the Major-TOM dataframe based on several parameters

    Args:
        df (geopandas dataframe): Parent dataframe
        region (shapely geometry object) : Region of interest
        daterange (tuple) : Inclusive range of dates (example format: '2020-01-01')
        cloud_cover (tuple) : Inclusive percentage range (0-100) of cloud cover
        nodata (tuple) : Inclusive fraction (0.0-1.0) of no data allowed in a sample

    Returns:
        df: a filtered dataframe
    """
    # temporal filtering
    if daterange is not None:
        assert isinstance(daterange, (list, tuple)) and len(daterange) == 2
        df = df[df.timestamp >= daterange[0]]
        df = df[df.timestamp <= daterange[1]]

    # spatial filtering
    if region is not None:
        idxs = df.sindex.query(region)
        df = df.take(idxs)

    # cloud cover filtering
    if cloud_cover is not None:
        df = df[df.cloud_cover >= cloud_cover[0]]
        df = df[df.cloud_cover <= cloud_cover[1]]

    # nodata filtering
    if nodata is not None:
        df = df[df.nodata >= nodata[0]]
        df = df[df.nodata <= nodata[1]]

    return df
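
# A minimal usage sketch, assuming `gdf` comes from metadata_from_url above;
# the shapely bounding box and thresholds are illustrative values:
#
#   from shapely.geometry import box
#
#   filtered = filter_metadata(
#       gdf,
#       region=box(-5.0, 40.0, 10.0, 55.0),       # (minx, miny, maxx, maxy) in degrees
#       daterange=('2020-01-01', '2020-12-31'),
#       cloud_cover=(0, 10),                      # at most 10% cloud cover
#       nodata=(0, 0.0),                          # no missing pixels allowed
#   )
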
def read_row(row, columns=["thumbnail"]):
    """Reads a row from a Major-TOM dataframe

    Args:
        row (row from geopandas dataframe): The row of metadata
        columns (list): columns to be read from the file

    Returns:
        data (dict): dictionary with returned data from requested columns
            (or a PIL Image directly, if only the thumbnail is requested)
    """
    with open_parquet_file(row.parquet_url, columns=columns, footer_sample_size=2000000) as f:
        with pq.ParquetFile(f) as pf:
            row_group = pf.read_row_group(row.parquet_row, columns=columns)

    if columns == ["thumbnail"]:
        stream = BytesIO(row_group['thumbnail'][0].as_py())
        return Image.open(stream)
    else:
        row_output = {}
        for col in columns:
            col_bytes = row_group[col][0].as_py()
            if col != 'thumbnail':
                # raster bands are stored as encoded .tif bytes
                row_output[col] = read_tif_bytes(col_bytes)
            else:
                stream = BytesIO(col_bytes)
                row_output[col] = Image.open(stream)
        return row_output
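
# A minimal usage sketch, assuming `filtered` is a non-empty dataframe produced
# by filter_metadata; the band names are illustrative Sentinel-2 columns:
#
#   sample = filtered.iloc[0]
#   thumb = read_row(sample)                                # PIL Image of the thumbnail
#   bands = read_row(sample, columns=['B04', 'B03', 'B02'])
#   red = bands['B04']                                      # decoded via read_tif_bytes
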
def filter_download(df, local_dir, source_name, by_row=False, verbose=False, tif_columns=None):
    """Downloads and unpacks the data of Major-TOM based on a metadata dataframe

    Args:
        df (geopandas dataframe): Metadata dataframe
        local_dir (str or Path) : Path to where the data is to be stored locally
        source_name (str) : Name alias of the resulting dataset
        by_row (bool) : If True, individual row groups of each parquet are accessed via HTTP - otherwise entire parquets are downloaded temporarily
        verbose (bool) : Option for internal state printing
        tif_columns (list of str) : Optionally specified columns to be downloaded as .tifs, e.g. ['B04', 'B03', 'B02']

    Returns:
        None
    """
    if isinstance(local_dir, str):
        local_dir = Path(local_dir)
    temp_file = local_dir / 'temp.parquet'

    # identify all parquets that need to be downloaded (group the rows by file)
    urls = df.parquet_url.unique()
    if verbose:
        print('Starting download of {} parquet files.'.format(len(urls)))

    for url in tqdm(urls, desc='Downloading and unpacking...', disable=not verbose):
        # identify all relevant rows
        rows = df[df.parquet_url == url].parquet_row.unique()

        if not by_row:  # (downloads entire parquet)
            # download to a temporary local file
            temp_path, http_resp = urllib.request.urlretrieve(url, temp_file)
        else:
            # open the remote parquet over HTTP without downloading it whole
            remote = fsspec.open(url)
            temp_path = remote.open()

        # unpack the bands of each requested row
        with pq.ParquetFile(temp_path) as pf:
            for row_idx in rows:
                table = pf.read_row_group(row_idx)
                product_id = table['product_id'][0].as_py()
                grid_cell = table['grid_cell'][0].as_py()
                row = grid_cell.split('_')[0]

                dest = local_dir / Path("{}/{}/{}/{}".format(source_name, row, grid_cell, product_id))
                dest.mkdir(exist_ok=True, parents=True)

                if tif_columns is None:
                    # default to all band columns plus the cloud mask
                    columns = [col for col in table.column_names if col[0] == 'B'] + ['cloud_mask']
                else:
                    columns = tif_columns

                # tifs
                for col in columns:
                    with open(dest / "{}.tif".format(col), "wb") as out_f:
                        # write the encoded raster bytes to file
                        out_f.write(table[col][0].as_py())

                # thumbnail (png)
                col = 'thumbnail'
                with open(dest / "{}.png".format(col), "wb") as out_f:
                    out_f.write(table[col][0].as_py())

        if not by_row:
            # remove the temporarily downloaded file
            os.remove(temp_path)
        else:
            # close the remote file handle
            temp_path.close()
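
# A minimal usage sketch, assuming `filtered` comes from filter_metadata above;
# files land in ./data/Core-S2L2A/<row>/<grid_cell>/<product_id>/ - the
# source_name alias and the band list are illustrative:
#
#   filter_download(
#       filtered,
#       local_dir='./data',
#       source_name='Core-S2L2A',
#       by_row=True,                     # fetch only the needed row groups over HTTP
#       verbose=True,
#       tif_columns=['B04', 'B03', 'B02'],
#   )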