Weights-ResidualsModels-MappingInference-SOCmapping / Archive /SimpleTimeModel /RandomForest /dataloader /dataloaderMapping.py
| import numpy as np | |
| import torch | |
| from torch.utils.data import Dataset, DataLoader | |
| import os | |
| from pathlib import Path | |
| import re | |
| import glob | |
| class RasterTensorDatasetMapping(Dataset): | |
| def __init__(self, base_path): | |
| """ | |
| Initialize the dataset | |
| Parameters: | |
| base_path: str, base path to RasterTensorData directory | |
| subfolder: str, name of the subfolder (e.g., 'Elevation') | |
| """ | |
| # Replace "OC_LUCAS_LFU_LfL_Coordinates" with "RasterTensorData" in the base path | |
| self.base_path = Path(base_path.replace("Coordinates1Mil", "RasterTensorData")) | |
| self.folder_path = self.base_path | |
| # Create ID to filename mapping | |
| self.id_to_file = self._create_id_mapping() | |
| # Load all numpy arrays into memory (optional, can be modified to load on demand) | |
| self.data_cache = {} | |
| for id_num, filepath in self.id_to_file.items(): | |
| self.data_cache[id_num] = np.load(filepath) | |
| def _create_id_mapping(self): | |
| """Create a dictionary mapping IDs to their corresponding file paths""" | |
| id_to_file = {} | |
| for file_path in self.folder_path.glob("*.npy"): | |
| # Extract ID number from filename | |
| match = re.search(r'ID(\d+)N', file_path.name) | |
| if match: | |
| id_num = int(match.group(1)) | |
| id_to_file[id_num] = file_path | |
| return id_to_file | |
| def get_tensor_by_location(self, id_num, x, y, window_size=17): | |
| """ | |
| Get a window_size x window_size square around the specified x,y coordinates | |
| Parameters: | |
| id_num: int, ID number from filename | |
| x: int, x coordinate | |
| y: int, y coordinate | |
| window_size: int, size of the square window (default 17) | |
| Returns: | |
| torch.Tensor: window_size x window_size tensor | |
| """ | |
| if id_num not in self.id_to_file: | |
| raise ValueError(f"ID {id_num} not found in dataset") | |
| # Get the data array | |
| if id_num in self.data_cache: | |
| data = self.data_cache[id_num] | |
| else: | |
| data = np.load(self.id_to_file[id_num]) | |
| # Calculate window boundaries | |
| half_window = window_size // 2 | |
| x_start = int(max(0, x - half_window)) | |
| x_end = int(min(data.shape[0], x + half_window + 1)) | |
| y_start = int(max(0, y - half_window)) | |
| y_end = int(min(data.shape[1], y + half_window + 1)) | |
| # Extract window | |
| window = data[x_start:x_end, y_start:y_end] | |
| # Pad if necessary | |
| if window.shape != (window_size, window_size): | |
| padded_window = np.zeros((window_size, window_size)) | |
| x_offset = half_window - (x - x_start) | |
| y_offset = half_window - (y - y_start) | |
| padded_window[ | |
| x_offset:x_offset+window.shape[0], | |
| y_offset:y_offset+window.shape[1] | |
| ] = window | |
| window = padded_window | |
| return torch.from_numpy(window).float() | |
| def __len__(self): | |
| return len(self.id_to_file) | |
| def __getitem__(self, idx): | |
| # This is a placeholder implementation | |
| # Modify according to your specific needs | |
| id_num = list(self.id_to_file.keys())[idx] | |
| return self.data_cache[id_num] | |
| # Example usage: | |
| """ | |
| # Initialize the dataset | |
| base_path = "/content/drive/MyDrive/Colab Notebooks/MappingSOC/Data/RasterTensorData" | |
| dataset = RasterTensorDataset(base_path, "Elevation") | |
| # Get the dictionary mapping IDs to filenames | |
| id_mapping = dataset.id_to_file | |
| print("ID to filename mapping:", id_mapping) | |
| # Get a 17x17 window for a specific location | |
| id_num = 10 # example ID | |
| x, y = 100, 100 # example coordinates | |
| window = dataset.get_tensor_by_location(id_num, x, y) | |
| print("Window shape:", window.shape) | |
| # Create a DataLoader if needed | |
| from torch.utils.data import DataLoader | |
| dataloader = DataLoader(dataset, batch_size=4, shuffle=True) | |
| """ | |
| class MultiRasterDatasetMapping(Dataset): | |
| def __init__(self, subfolders, dataframe): | |
| """ | |
| Parameters: | |
| subfolders: list of str, names of subfolders to include | |
| dataframe: pandas.DataFrame, contains columns GPS_LONG, GPS_LAT, and OC (target variable) | |
| """ | |
| self.subfolders = subfolders | |
| self.dataframe = dataframe | |
| self.datasets = { | |
| subfolder: RasterTensorDatasetMapping(subfolder) | |
| for subfolder in subfolders | |
| } | |
| self.coordinates = { | |
| subfolder: np.load(f"{subfolder}/coordinates.npy") | |
| for subfolder in subfolders | |
| } | |
| def find_coordinates_index(self, subfolder, longitude, latitude): | |
| """ | |
| Finds the index of the matching coordinates in the subfolder's coordinates.npy file. | |
| Parameters: | |
| subfolder: str, name of the subfolder | |
| longitude: float, longitude to match | |
| latitude: float, latitude to match | |
| Returns: | |
| tuple: (id_num, x, y) if match is found, otherwise raises an error | |
| """ | |
| coords = self.coordinates[subfolder] | |
| # Assuming the first two columns of `coordinates.npy` are longitude and latitude | |
| match = np.where((coords[:, 1] == longitude) & (coords[:, 0] == latitude))[0] | |
| if match.size == 0: | |
| raise ValueError(f"Coordinates ({longitude}, {latitude}) not found in {subfolder}") | |
| # Return id_num, x, y from the same row | |
| return coords[match[0], 2], coords[match[0], 3], coords[match[0], 4] | |
| def __getitem__(self, index): | |
| """ | |
| Retrieve tensor and target value for a given index. | |
| Parameters: | |
| index: int, index of the row in the dataframe | |
| Returns: | |
| tuple: (tensor, OC), where tensor is the data and OC is the target variable | |
| """ | |
| row = self.dataframe.iloc[index] | |
| longitude, latitude = row["longitude"], row["latitude"] | |
| tensors = {} | |
| for subfolder in self.subfolders: | |
| id_num, x, y = self.find_coordinates_index(subfolder, longitude, latitude) | |
| tensors[subfolder] = self.datasets[subfolder].get_tensor_by_location(id_num, x, y) | |
| return longitude, latitude, tensors | |
| def __len__(self): | |
| """ | |
| Return the number of samples in the dataset. | |
| """ | |
| return len(self.dataframe) | |
| def get_tensor_by_location(self, subfolder, id_num, x, y): | |
| """Get tensor from specific subfolder dataset""" | |
| return self.datasets[subfolder].get_tensor_by_location(id_num, x, y) | |