| | import numpy as np |
| | from scipy.sparse import dok_matrix, save_npz, load_npz |
| | from tqdm.auto import tqdm |
| | import json |
| | import os |
| |
|
| | class LonLatMapper: |
| | def __init__(self, x_corner, y_corner, cellsize, rows, cols): |
| | """ |
| | Khởi tạo bộ ánh xạ lon-lat sang x-y và ngược lại. |
| | - x_corner: Tọa độ x (kinh độ) của góc dưới-trái ô đầu tiên. |
| | - y_corner: Tọa độ y (vĩ độ) của góc dưới-trái ô đầu tiên. |
| | - cellsize: Kích thước ô lưới (độ hoặc mét). |
| | - rows: Số hàng của lưới. |
| | - cols: Số cột của lưới. |
| | """ |
| | self.x_corner = x_corner |
| | self.y_corner = y_corner |
| | self.cellsize = cellsize |
| | self.rows = rows |
| | self.cols = cols |
| | |
| | def lonlat2xy(self, lon, lat): |
| | """ |
| | Ánh xạ (lon, lat) sang chỉ số ô lưới (x, y). |
| | - lon: Kinh độ (hoặc tọa độ x trong hệ chiếu). |
| | - lat: Vĩ độ (hoặc tọa độ y trong hệ chiếu). |
| | - Trả về: (x, y) - chỉ số cột và hàng. |
| | """ |
| | x = int(round((lon - self.x_corner) / self.cellsize)) |
| | y = abs(int(round((lat + self.y_corner) / self.cellsize))) |
| | |
| | |
| | if not (0 <= x < self.cols and 0 <= y < self.rows): |
| | raise ValueError(f"Chỉ số (x={x}, y={y}) ngoài phạm vi lưới ({self.cols}x{self.rows})") |
| | |
| | return x, y |
| | |
| | def xy2lonlat(self, x, y): |
| | """ |
| | Ánh xạ chỉ số ô lưới (x, y) sang (lon, lat). |
| | - x: Chỉ số cột. |
| | - y: Chỉ số hàng. |
| | - Trả về: (lon, lat) - kinh độ và vĩ độ. |
| | """ |
| | |
| | if not (0 <= x < self.cols and 0 <= y < self.rows): |
| | raise ValueError(f"Chỉ số (x={x}, y={y}) ngoài phạm vi lưới ({self.cols}x{self.rows})") |
| | |
| | lon = self.x_corner + x * self.cellsize |
| | lat = abs(self.y_corner) - y * self.cellsize |
| | return lon, lat |
| | |
| | def lonlat2xy_batch(self, lons, lats): |
| | """ |
| | Ánh xạ hàng loạt (lon, lat) sang (x, y). |
| | - lons, lats: Mảng hoặc danh sách kinh độ và vĩ độ. |
| | - Trả về: Danh sách (x, y). |
| | """ |
| | lons = np.asarray(lons) |
| | lats = np.asarray(lats) |
| | x = np.round((lons - self.x_corner) / self.cellsize).astype(int) |
| | y = np.abs(np.round((lats + self.y_corner) / self.cellsize)).astype(int) |
| | |
| | |
| | valid = (x >= 0) & (x < self.cols) & (y >= 0) & (y < self.rows) |
| | if not valid.all(): |
| | raise ValueError("Một số tọa độ ngoài phạm vi lưới") |
| | |
| | return list(zip(x, y)) |
| | |
| | def xy2lonlat_batch(self, xs, ys): |
| | """ |
| | Ánh xạ hàng loạt (x, y) sang (lon, lat). |
| | - xs, ys: Mảng hoặc danh sách chỉ số cột và hàng. |
| | - Trả về: Danh sách (lon, lat). |
| | """ |
| | xs = np.asarray(xs) |
| | ys = np.asarray(ys) |
| | |
| | |
| | valid = (xs >= 0) & (xs < self.cols) & (ys >= 0) & (ys < self.rows) |
| | if not valid.all(): |
| | raise ValueError("Một số chỉ số ngoài phạm vi lưới") |
| | |
| | lons = self.x_corner + xs * self.cellsize |
| | lats = abs(self.y_corner) - ys * self.cellsize |
| | return list(zip(lons, lats)) |
| |
|
| |
|
| | class SolidData(): |
| | def __init__(self, M, rows, cols, x_corner, y_corner, cellsize, name, description="Unknown"): |
| | self.M, self.rows, self.cols, self.x_corner, self.y_corner, self.cellsize = M, rows, cols, x_corner, y_corner, cellsize |
| | self.name = name |
| | self.description = description |
| | self.mapper = LonLatMapper(self.x_corner, self.y_corner, self.cellsize, self.rows, self.cols) |
| |
|
| | def __getitem__(self, key): |
| | """ |
| | Truy cập ma trận thưa hoặc metadata qua toán tử []. |
| | - key là tuple (i,j): Trả về giá trị M[i,j] (hoặc 0 nếu không tồn tại). i, j là lon, lat |
| | """ |
| | if isinstance(key, tuple): |
| | if self.M is None: |
| | raise ValueError("Ma trận chưa được tải. Gọi load_data() trước.") |
| | x, y = self.mapper.lonlat2xy(key[0], key[1]) |
| | return self.M[y, x] |
| | else: |
| | raise TypeError("Khóa phải là tuple (lon, lat)") |
| | |
| | @staticmethod |
| | def read_asc(file_path, name, dtype='int', description="Unknown"): |
| | |
| | with open(file_path, "r") as file: |
| | cols = int(file.readline().split()[1]) |
| | rows = int(file.readline().split()[1]) |
| | x_corner = float(file.readline().split()[1]) |
| | y_corner = float(file.readline().split()[1]) |
| | cellsize = float(file.readline().split()[1]) |
| | nodata_val = float(file.readline().split()[1]) |
| |
|
| | |
| | data = np.loadtxt(file_path, skiprows=6, dtype=np.float64) |
| |
|
| | |
| | mask = (data != nodata_val) |
| | if dtype == 'int': |
| | data = data.astype(np.int64) |
| | elif dtype == 'float': |
| | data = data.astype(np.float64) |
| |
|
| | |
| | M = dok_matrix((rows, cols), dtype=data.dtype) |
| | rows_idx, cols_idx = np.where(mask) |
| | M[rows_idx, cols_idx] = data[rows_idx, cols_idx] |
| |
|
| | return SolidData(M, rows, cols, x_corner, y_corner, cellsize, name, description) |
| | |
| | def save_dok_matrix(self, output_dir): |
| | """ |
| | Lưu ma trận dok_matrix và metadata vào thư mục output_dir. |
| | - output_dir: Thư mục lưu tệp. |
| | - Tệp ma trận: <tên thư mục>.npz. |
| | - Tệp metadata: metadata.json. |
| | """ |
| | print("Bắt đầu lưu dữ liệu: ") |
| | |
| | folder_name = os.path.basename(os.path.normpath(output_dir)) |
| | |
| | |
| | os.makedirs(output_dir, exist_ok=True) |
| | |
| | |
| | matrix_path = os.path.join(output_dir, f"{folder_name}.npz") |
| | metadata_path = os.path.join(output_dir, "metadata.json") |
| | |
| | |
| | M_csr = self.M.tocsr() |
| | save_npz(matrix_path, M_csr) |
| | |
| | |
| | metadata = { |
| | 'name' : self.name, |
| | 'rows' : self.rows, |
| | 'cols' : self.cols, |
| | 'x_corner': self.x_corner, |
| | 'y_corner': self.y_corner, |
| | 'cellsize': self.cellsize, |
| | 'description' : self.description |
| | } |
| | with open(metadata_path, 'w') as f: |
| | json.dump(metadata, f) |
| | |
| | @staticmethod |
| | def load_dok_matrix(input_dir): |
| | """ |
| | Đọc ma trận dok_matrix và metadata từ thư mục input_dir. |
| | - input_dir: Thư mục chứa tệp <tên thư mục>.npz và metadata.json. |
| | - Trả về: (M, x_corner, y_corner, cellsize) |
| | """ |
| | |
| | folder_name = os.path.basename(os.path.normpath(input_dir)) |
| | |
| | |
| | matrix_path = os.path.join(input_dir, f"{folder_name}.npz") |
| | metadata_path = os.path.join(input_dir, "metadata.json") |
| | |
| | |
| | M_csr = load_npz(matrix_path) |
| | M = M_csr.todok() |
| | |
| | |
| | with open(metadata_path, 'r') as f: |
| | metadata = json.load(f) |
| | |
| | rows = metadata['rows'] |
| | cols = metadata['cols'] |
| | x_corner = metadata['x_corner'] |
| | y_corner = metadata['y_corner'] |
| | cellsize = metadata['cellsize'] |
| | description = metadata['description'] |
| | name = metadata['name'] |
| | data = SolidData(M, rows, cols, x_corner, y_corner, cellsize, name, description) |
| | return data |
| |
|
| | def sparse2dict(self): |
| | sparse_dict = { |
| | 'lon': [], |
| | 'lat': [], |
| | self.name: [], |
| | } |
| |
|
| | llmapper = LonLatMapper(self.x_corner, self.y_corner, self.cellsize, self.rows, self.cols) |
| | for i in tqdm(self.M.items()): |
| | y, x = i[0] |
| | lon, lat = llmapper.xy2lonlat(x, y) |
| | sparse_dict['lon'].append(lon) |
| | sparse_dict['lat'].append(lat) |
| | sparse_dict[self.name].append(i[1]) |
| | return sparse_dict |
| | |
| | def plot(self, output_file=None): |
| | import matplotlib.pyplot as plt |
| | import vaex |
| | |
| | data_dict = self.sparse2dict() |
| | df = vaex.from_dict(data_dict) |
| | df.viz.heatmap(df.lon, df.lat, what=vaex.stat.mean(getattr(df, self.name))) |
| | if output_file: |
| | print("Bắt đầu lưu biểu đồ") |
| | folder_name = os.path.dirname(output_file) |
| | |
| | os.makedirs(folder_name, exist_ok=True) |
| | plt.savefig(output_file) |
| | else: |
| | print("Bắt đầu plot biểu đồ:") |
| | plt.show() |
| | plt.close() |
| |
|
| |
|
| |
|