import pyarrow.parquet as pq
import pandas as pd
import geopandas as gpd
from pathlib import Path
import urllib.request
import fsspec
from fsspec.parquet import open_parquet_file
from io import BytesIO
from PIL import Image
from rasterio.io import MemoryFile
from tqdm.notebook import tqdm
import os
from .sample_helpers import *  # provides read_tif_bytes used in read_row below


def metadata_from_url(access_url, local_url):
    """Downloads the Major-TOM metadata parquet and returns it as a
    geopandas GeoDataFrame with a point geometry at each sample centre.
    """
    local_url, response = urllib.request.urlretrieve(access_url, local_url)
    df = pq.read_table(local_url).to_pandas()
    df['timestamp'] = pd.to_datetime(df.timestamp)
    gdf = gpd.GeoDataFrame(
        df, geometry=gpd.points_from_xy(df.centre_lon, df.centre_lat), crs=df.crs.iloc[0]
    )
    return gdf
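
# A minimal usage sketch (the URL below is a placeholder, not a verified endpoint):
#
#   gdf = metadata_from_url(
#       "https://example.com/major-tom/metadata.parquet",  # hypothetical URL
#       "metadata.parquet",
#   )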


def filter_metadata(df,
                    region=None,
                    daterange=None,
                    cloud_cover=(0, 100),
                    nodata=(0, 1.0)
                    ):
    """Filters the Major-TOM dataframe based on several parameters

    Args:
        df (geopandas dataframe): Parent dataframe
        region (shapely geometry object): Region of interest
        daterange (tuple): Inclusive range of dates (example format: '2020-01-01')
        cloud_cover (tuple): Inclusive percentage range (0-100) of cloud cover
        nodata (tuple): Inclusive fraction (0.0-1.0) of no data allowed in a sample

    Returns:
        df: a filtered dataframe
    """
    # temporal filtering
    if daterange is not None:
        assert isinstance(daterange, (list, tuple)) and len(daterange) == 2
        df = df[df.timestamp >= daterange[0]]
        df = df[df.timestamp <= daterange[1]]
    # spatial filtering (via the geodataframe's spatial index)
    if region is not None:
        idxs = df.sindex.query(region)
        df = df.take(idxs)
    # cloud cover filtering
    if cloud_cover is not None:
        df = df[df.cloud_cover >= cloud_cover[0]]
        df = df[df.cloud_cover <= cloud_cover[1]]
    # nodata filtering
    if nodata is not None:
        df = df[df.nodata >= nodata[0]]
        df = df[df.nodata <= nodata[1]]
    return df
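
# A usage sketch, assuming `gdf` was loaded with metadata_from_url above;
# the bounding box is an arbitrary illustrative region in lon/lat:
#
#   from shapely.geometry import box
#   subset = filter_metadata(gdf,
#                            region=box(14.0, 50.0, 24.0, 55.0),
#                            daterange=('2020-01-01', '2020-12-31'),
#                            cloud_cover=(0, 10),
#                            nodata=(0, 0.0))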


def read_row(row, columns=["thumbnail"]):
    """Reads a single sample from the parquet file referenced by a metadata row

    Args:
        row (row from geopandas dataframe): The row of metadata
        columns (list): columns to be read from the file

    Returns:
        data (dict or PIL image): data from the requested columns, or the
            image itself when only the thumbnail is requested
    """
    # a generous footer sample lets the parquet metadata be fetched in one request
    with open_parquet_file(row.parquet_url, columns=columns, footer_sample_size=2000000) as f:
        with pq.ParquetFile(f) as pf:
            row_group = pf.read_row_group(row.parquet_row, columns=columns)
            if columns == ["thumbnail"]:
                stream = BytesIO(row_group['thumbnail'][0].as_py())
                return Image.open(stream)
            else:
                row_output = {}
                for col in columns:
                    col_bytes = row_group[col][0].as_py()  # avoids shadowing the built-in `bytes`
                    if col != 'thumbnail':
                        row_output[col] = read_tif_bytes(col_bytes)
                    else:
                        row_output[col] = Image.open(BytesIO(col_bytes))
                return row_output
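
# A usage sketch, assuming `subset` is a filtered metadata frame from above;
# with the default columns the call returns the PIL thumbnail directly:
#
#   sample = subset.iloc[0]
#   thumb = read_row(sample)                                   # PIL image
#   bands = read_row(sample, columns=['B04', 'B03', 'B02'])    # dict keyed by column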


def filter_download(df, local_dir, source_name, by_row=False, verbose=False, tif_columns=None):
    """Downloads and unpacks the data of Major-TOM based on a metadata dataframe

    Args:
        df (geopandas dataframe): Metadata dataframe
        local_dir (str or Path): Path to where the data is to be stored locally
        source_name (str): Name alias of the resulting dataset
        by_row (bool): If True, individual parquet rows are accessed via http - otherwise entire parquets are downloaded temporarily
        verbose (bool): option for internal state printing
        tif_columns (list of str): Optionally specified columns to be downloaded as .tifs, e.g. ['B04', 'B03', 'B02']

    Returns:
        None
    """
    if isinstance(local_dir, str):
        local_dir = Path(local_dir)
    temp_file = local_dir / 'temp.parquet'

    # identify all parquets that need to be downloaded (group rows by source file)
    urls = df.parquet_url.unique()
    if verbose:
        print('Starting download of {} parquet files.'.format(len(urls)))

    for url in tqdm(urls, desc='Downloading and unpacking...', disable=not verbose):
        # identify all relevant rows within this parquet
        rows = df[df.parquet_url == url].parquet_row.unique()
        if not by_row:
            # download the entire parquet to a temporary file
            temp_path, http_resp = urllib.request.urlretrieve(url, temp_file)
        else:
            # stream only the requested row groups via http
            remote = fsspec.open(url)
            temp_path = remote.open()

        # populate the bands
        with pq.ParquetFile(temp_path) as pf:
            for row_idx in rows:
                table = pf.read_row_group(row_idx)
                product_id = table['product_id'][0].as_py()
                grid_cell = table['grid_cell'][0].as_py()
                row = grid_cell.split('_')[0]
                dest = local_dir / Path("{}/{}/{}/{}".format(source_name, row, grid_cell, product_id))
                dest.mkdir(exist_ok=True, parents=True)
                # default to every band column ('B...') plus the cloud mask
                columns = ([col for col in table.column_names if col[0] == 'B'] + ['cloud_mask']) if tif_columns is None else tif_columns
                # tifs
                for col in columns:
                    with open(dest / "{}.tif".format(col), "wb") as out_f:
                        out_f.write(table[col][0].as_py())
                # thumbnail (png)
                with open(dest / "thumbnail.png", "wb") as out_f:
                    out_f.write(table['thumbnail'][0].as_py())

        if not by_row:
            # remove the temporary download
            os.remove(temp_path)
        else:
            # close the streamed remote file
            temp_path.close()
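
# A usage sketch, assuming `subset` is a filtered metadata frame from above;
# by_row=True avoids downloading whole parquet files when only a few rows
# are needed (argument values here are assumed examples):
#
#   filter_download(subset,
#                   local_dir='./major_tom_data',
#                   source_name='S2RGB',
#                   by_row=True,
#                   verbose=True,
#                   tif_columns=['B04', 'B03', 'B02'])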