|
|
import numpy as np |
|
|
import math |
|
|
import pandas as pd |
|
|
import geopandas as gpd |
|
|
from shapely.geometry import LineString, Polygon |
|
|
from tqdm import tqdm |
|
|
import re |
|
|
|
|
|
|
|
|
|
|
|
class Grid():
    """
    Global grid of points laid out row by row along lines of latitude.

    Rows are spaced at most ``dist`` km apart along a meridian and, within
    each row, columns are spaced at most ``dist`` km apart along that
    parallel.  Both spacings are computed on a sphere of radius
    ``RADIUS_EQUATOR``.

    Rows are named ``'0U', '1U', ...`` from the equator northwards and
    ``'1D', '2D', ...`` southwards; columns are named ``'0R', '1R', ...``
    from longitude 0 eastwards and ``'1L', '2L', ...`` westwards.  Every grid
    point is the bottom-left (south-west) corner of its cell.

    Attributes
    ----------
    dist : number
        Target cell size in km.
    latitude_range, longitude_range : tuple
        Inclusive (min, max) bounds, in degrees, applied to the grid points.
    utm_definition : str
        ``'bottomleft'`` or ``'center'``: which location of the cell decides
        its UTM zone.
    rows, lats : numpy.ndarray
        Row names and the matching row latitudes, sorted south to north.
    points : GeoDataFrame
        All grid points.  It is the plain concatenation of ``points_by_row``,
        so index labels restart at 0 in every row.
    points_by_row : list of GeoDataFrame
        One frame per row, in the same order as ``rows``.
    """

    RADIUS_EQUATOR = 6378.137  # km

    def __init__(self, dist, latitude_range=(-85, 85), longitude_range=(-180, 180), utm_definition='bottomleft'):
        self.dist = dist
        self.latitude_range = latitude_range
        self.longitude_range = longitude_range
        self.utm_definition = utm_definition
        self.rows, self.lats = self.get_rows()
        self.points, self.points_by_row = self.get_points()

    def get_rows(self):
        """
        Compute the row names and latitudes of the grid.

        Returns
        -------
        rows : numpy.ndarray of str
            Row names, south to north, restricted to ``latitude_range``.
        latitudes : numpy.ndarray of float
            Latitude of each row in degrees, same order as ``rows``.
        """
        # Length of a meridian from pole to pole, split into steps <= dist.
        arc_pole_to_pole = math.pi * self.RADIUS_EQUATOR
        num_divisions = math.ceil(arc_pole_to_pole / self.dist)

        # Evenly spaced latitudes, shifted so that one row sits exactly on
        # the equator (the first sample, -90, is mapped onto 0).
        latitudes = np.linspace(-90, 90, num_divisions + 1)[:-1]
        latitudes = np.mod(latitudes, 180) - 90
        latitudes = np.sort(latitudes)

        # Position of the equator row; rows below it count down, rows from it
        # upwards count up.
        zeroth_row = int(np.searchsorted(latitudes, 0))
        names = [f'{k}D' for k in range(zeroth_row, 0, -1)]
        names += [f'{k}U' for k in range(len(latitudes) - zeroth_row)]

        in_range = (latitudes >= self.latitude_range[0]) & (latitudes <= self.latitude_range[1])
        rows, latitudes = np.array(names), np.array(latitudes)

        return rows[in_range], latitudes[in_range]

    def get_circumference_at_latitude(self, lat):
        """
        Return the length in km of the parallel at latitude ``lat`` (degrees).
        """
        radius_at_lat = self.RADIUS_EQUATOR * math.cos(lat * math.pi / 180)
        circumference = 2 * math.pi * radius_at_lat

        return circumference

    def subdivide_circumference(self, lat, return_cols=False):
        """
        Split the parallel at ``lat`` into columns at most ``dist`` km wide.

        Parameters
        ----------
        lat : float
            Latitude of the row in degrees.
        return_cols : bool
            When true, also return the column names.

        Returns
        -------
        numpy.ndarray
            Sorted longitudes of the columns, if ``return_cols`` is false.
        tuple of numpy.ndarray
            ``(cols, longitudes)`` if ``return_cols`` is true.
        """
        circumference = self.get_circumference_at_latitude(lat)
        num_divisions = math.ceil(circumference / self.dist)

        # Same construction as for the rows: one column sits exactly on 0.
        longitudes = np.linspace(-180, 180, num_divisions + 1)[:-1]
        longitudes = np.mod(longitudes, 360) - 180
        longitudes = np.sort(longitudes)

        if return_cols:
            zeroth_idx = int(np.where(longitudes == 0)[0][0])
            cols = [f'{k}L' for k in range(zeroth_idx, 0, -1)]
            cols += [f'{k}R' for k in range(len(longitudes) - zeroth_idx)]
            return np.array(cols), np.array(longitudes)

        return np.array(longitudes)

    def get_points(self):
        """
        Build the grid points for every row.

        Returns
        -------
        points : GeoDataFrame
            Concatenation of all rows (index labels are not reset).
        points_by_row : list of GeoDataFrame
            One frame per row with the columns ``name``, ``row``, ``col``,
            ``row_idx``, ``col_idx``, ``utm_zone``, ``epsg`` and the point
            geometry (x = longitude, y = latitude).

        Raises
        ------
        ValueError
            If ``utm_definition`` is neither ``'bottomleft'`` nor ``'center'``.
        """
        # Fail once, up front, rather than on the first generated point.
        if self.utm_definition not in ('bottomleft', 'center'):
            raise ValueError(f'Invalid utm_definition {self.utm_definition}')

        points_by_row = [None] * len(self.rows)
        for r_idx, (r, lat) in enumerate(zip(self.rows, self.lats)):
            cols, lons = self.subdivide_circumference(lat, return_cols=True)
            cols, lons = self.filter_longitude(cols, lons)

            utm_zones = []
            for lon in lons:
                if self.utm_definition == 'bottomleft':
                    utm_zones.append(get_utm_zone_from_latlng([lat, lon]))
                else:
                    # Shift by half a cell (dist is in km, 111_120 m per
                    # degree) to evaluate the zone at the cell centre.
                    center_lat = lat + (1000 * self.dist / 2) / 111_120
                    center_lon = lon + (1000 * self.dist / 2) / (111_120 * math.cos(center_lat * math.pi / 180))
                    utm_zones.append(get_utm_zone_from_latlng([center_lat, center_lon]))

            n_cols = len(cols)
            points_by_row[r_idx] = gpd.GeoDataFrame({
                'name': [f'{r}_{c}' for c in cols],
                'row': [r] * n_cols,
                'col': list(cols),
                'row_idx': [r_idx] * n_cols,
                'col_idx': list(range(n_cols)),
                'utm_zone': utm_zones,
                'epsg': [f'EPSG:{zone}' for zone in utm_zones],
            }, geometry=gpd.points_from_xy(list(lons), [lat] * n_cols))

        points = gpd.GeoDataFrame(pd.concat(points_by_row))

        return points, points_by_row

    def group_points_by_row(self):
        """
        Split ``self.points`` into one frame per entry of ``self.rows``.
        """
        return [self.points[self.points.row == row] for row in self.rows]

    def filter_longitude(self, cols, lons):
        """
        Keep only the columns whose longitude lies inside ``longitude_range``.

        Both bounds are inclusive.  ``cols`` and ``lons`` must be numpy arrays
        of equal length; the filtered pair is returned in the same order.
        """
        keep = (lons >= self.longitude_range[0]) & (lons <= self.longitude_range[1])
        return cols[keep], lons[keep]

    @staticmethod
    def _cell_index(edges, values):
        """
        Index of the last entry of sorted ``edges`` that is <= each value.

        ``side='right'`` makes a value lying exactly on an edge belong to the
        cell that starts there.  Values below ``edges[0]`` give -1.
        """
        return np.searchsorted(edges, values, side='right') - 1

    def latlon2rowcol(self, lats, lons, return_idx=False, integer=False):
        """
        Convert latitude and longitude to row and column number from the grid

        Each location is assigned to the cell whose bottom-left grid point is
        the closest one at or below/left of it, so a location exactly on a
        grid point maps to that point's own cell.

        Parameters
        ----------
        lats, lons : scalar or array-like of float
            Coordinates in degrees, paired element by element.
        return_idx : bool
            Also return, for every location, the index label of the matching
            point in ``self.points``.
        integer : bool
            Return signed integers instead of names (``'3D'`` -> -3,
            ``'2R'`` -> 2).

        Returns
        -------
        list
            ``[rows, cols]`` or ``[rows, cols, idx]``, each a list with one
            entry per input location.

        Raises
        ------
        ValueError
            If a latitude lies south of the southernmost grid row.
        """
        lats = np.atleast_1d(np.asarray(lats, dtype=float))
        lons = np.atleast_1d(np.asarray(lons, dtype=float))

        row_idxs = self._cell_index(self.lats, lats)
        if (row_idxs < 0).any():
            # Index -1 would silently select the northernmost row.
            raise ValueError('latitudes south of the southernmost grid row cannot be mapped to a cell')

        # Longitudes and column names of each row, extracted once per row
        # rather than once per query location.
        row_cache = {}
        cols = []
        for row_idx, lon in zip(row_idxs, lons):
            if row_idx not in row_cache:
                row_points = self.points_by_row[row_idx]
                row_cache[row_idx] = (row_points.geometry.x.values, row_points['col'].values)
            row_lons, row_cols = row_cache[row_idx]
            # A longitude west of the row's first point yields -1, i.e. the
            # row's last column; on a full-longitude grid that is the cell
            # reaching across the antimeridian.
            cols.append(row_cols[self._cell_index(row_lons, lon)])
        rows = self.rows[row_idxs].tolist()

        outputs = [rows, cols]
        if return_idx:
            # Index label in self.points of each (row, col) pair.
            idx = [self.points[(self.points.row == row) & (self.points.col == col)].index.values[0] for row, col in zip(rows, cols)]
            outputs.append(idx)

        if integer:
            outputs[0] = [int(el[:-1]) if el[-1] == 'U' else -int(el[:-1]) for el in outputs[0]]
            outputs[1] = [int(el[:-1]) if el[-1] == 'R' else -int(el[:-1]) for el in outputs[1]]

        return outputs

    def rowcol2latlon(self, rows, cols):
        """
        Return the latitudes and longitudes of the grid points named by
        the paired ``rows`` and ``cols``.

        An ``IndexError`` is raised if a (row, col) pair is not in the grid.
        """
        point_geoms = [self.points.loc[(self.points.row == row) & (self.points.col == col), 'geometry'].values[0] for row, col in zip(rows, cols)]
        lats = [point.y for point in point_geoms]
        lons = [point.x for point in point_geoms]
        return lats, lons

    def get_bounded_footprint(self, point, buffer_ratio=0):
        """
        Return the lon/lat rectangle covered by the cell of a grid point.

        The cell reaches from the point (its bottom-left corner) to the next
        row and the next column.  For the last row or last column, where no
        neighbour exists, the extent of the previous cell is reused.

        Parameters
        ----------
        point : row of ``self.points``
            Must provide ``geometry``, ``row_idx`` and ``col_idx``.
        buffer_ratio : float
            Fraction of the cell width/height added on every side.

        Returns
        -------
        shapely.geometry.Polygon
            Rectangle with vertices in (longitude, latitude) order.
        """
        bottom, left = point.geometry.y, point.geometry.x
        row_idx = point.row_idx
        col_idx = point.col_idx
        row_points = self.points_by_row[row_idx]

        # NOTE: on a grid with a single row (or a row with a single column)
        # the "previous" index -1 is the point itself, giving a zero extent.
        if row_idx + 1 >= len(self.lats):
            top = self.lats[row_idx] + (self.lats[row_idx] - self.lats[row_idx - 1])
        else:
            top = self.lats[row_idx + 1]

        max_col = len(row_points.col_idx) - 1
        if col_idx + 1 > max_col:
            own_lon = row_points.iloc[col_idx].geometry.x
            right = own_lon + (own_lon - row_points.iloc[col_idx - 1].geometry.x)
        else:
            right = row_points.iloc[col_idx + 1].geometry.x

        width = right - left
        height = top - bottom

        buffer_horizontal = width * buffer_ratio
        buffer_vertical = height * buffer_ratio

        new_left = left - buffer_horizontal
        new_right = right + buffer_horizontal
        new_bottom = bottom - buffer_vertical
        new_top = top + buffer_vertical

        bbox = Polygon([(new_left, new_bottom), (new_left, new_top), (new_right, new_top), (new_right, new_bottom)])

        return bbox
|
|
|
|
|
def get_utm_zone_from_latlng(latlng):
    """
    Return the EPSG code of the UTM zone that contains a lat/lng pair.

    Parameters
    ----------
    latlng : List[Union[int, float]]
        ``[latitude, longitude]`` in degrees; a tuple is accepted too.

    Returns
    -------
    str
        ``"326ZZ"`` for latitudes >= 0 and ``"327ZZ"`` for latitudes < 0,
        where ``ZZ`` is the zero-padded zone number.

    Raises
    ------
    AssertionError
        If ``latlng`` is not a list or tuple.
    ValueError
        If the resulting code does not look like a valid UTM EPSG code.
    """
    assert isinstance(latlng, (list, tuple)), "latlng must be in the form of a list or tuple."

    longitude = latlng[1]
    latitude = latlng[0]

    # Regular zones are 6 degrees wide, numbered from 1 at longitude -180;
    # the modulo folds longitude 180 back onto zone 1.
    zone_number = math.floor((longitude + 180) / 6) % 60 + 1

    # Irregular zones: one widened zone for 56 <= lat < 64 and four
    # widened zones for 72 <= lat < 84.
    widened_high_latitude_zones = (
        (0.0, 9.0, 31),
        (9.0, 21.0, 33),
        (21.0, 33.0, 35),
        (33.0, 42.0, 37),
    )
    if 56.0 <= latitude < 64.0 and 3.0 <= longitude < 12.0:
        zone_number = 32
    elif 72.0 <= latitude < 84.0:
        for west, east, zone in widened_high_latitude_zones:
            if west <= longitude < east:
                zone_number = zone
                break

    # 327xx is used for negative latitudes, 326xx otherwise.
    hemisphere_prefix = "327" if latitude < 0 else "326"
    epsg_code = f"{hemisphere_prefix}{zone_number:02d}"

    looks_valid = re.match(r"32[6-7](0[1-9]|[1-5][0-9]|60)", epsg_code)
    if not looks_valid:
        print(f"latlng: {latlng}, epsg_code: {epsg_code}")
        raise ValueError("out of bound latlng resulted in incorrect EPSG code for the point")

    return epsg_code
|
|
|
|
|
|
|
|
if __name__ == '__main__':

    # Self-checks of the UTM zone lookup on a few known locations.
    expected_epsg = (
        ([-1, -174.34], "32701"),
        ([48, -4], "32630"),
        ([78, 13], "32633"),
        ([-34, 19.7], "32734"),
        ([-36, 175.7], "32760"),
    )
    for latlng, code in expected_epsg:
        assert get_utm_zone_from_latlng(latlng) == code

    # Build a 100 km grid and map random locations onto it and back.
    dist = 100
    grid = Grid(dist)

    np.random.seed(0)
    # The modulo folds the negative draws into the 160..180 range.
    test_lons = np.random.uniform(-20, 20, size=(1000)) % 180
    test_lats = np.random.uniform(-20, 68, size=(1000))

    test_rows, test_cols = grid.latlon2rowcol(test_lats, test_lons)
    test_lats2, test_lons2 = grid.rowcol2latlon(test_rows, test_cols)

    for preview in (test_lons, test_lats, test_rows, test_cols):
        print(preview[:10])

    # One segment per sample, from the sampled location to the grid point of
    # the cell it was assigned to.
    segments = [
        LineString([(lon, lat), (grid_lon, grid_lat)])
        for lon, lat, grid_lon, grid_lat in zip(test_lons, test_lats, test_lons2, test_lats2)
    ]
    lines = gpd.GeoDataFrame(geometry=gpd.GeoSeries(segments))

    lines.to_file(f'testlines_{dist}km.geojson', driver='GeoJSON')
    grid.points.to_file(f'testgrid_{dist}km.geojson', driver='GeoJSON')
|
|
|