import numpy as np import torch from torch.utils.data import Dataset, DataLoader import os from pathlib import Path import re import glob class RasterTensorDatasetMapping(Dataset): def __init__(self, base_path): """ Initialize the dataset Parameters: base_path: str, base path to RasterTensorData directory subfolder: str, name of the subfolder (e.g., 'Elevation') """ # Replace "OC_LUCAS_LFU_LfL_Coordinates" with "RasterTensorData" in the base path self.base_path = Path(base_path.replace("Coordinates1Mil", "RasterTensorData")) self.folder_path = self.base_path # Create ID to filename mapping self.id_to_file = self._create_id_mapping() # Load all numpy arrays into memory (optional, can be modified to load on demand) self.data_cache = {} for id_num, filepath in self.id_to_file.items(): self.data_cache[id_num] = np.load(filepath) def _create_id_mapping(self): """Create a dictionary mapping IDs to their corresponding file paths""" id_to_file = {} for file_path in self.folder_path.glob("*.npy"): # Extract ID number from filename match = re.search(r'ID(\d+)N', file_path.name) if match: id_num = int(match.group(1)) id_to_file[id_num] = file_path return id_to_file def get_tensor_by_location(self, id_num, x, y, window_size=17): """ Get a window_size x window_size square around the specified x,y coordinates Parameters: id_num: int, ID number from filename x: int, x coordinate y: int, y coordinate window_size: int, size of the square window (default 17) Returns: torch.Tensor: window_size x window_size tensor """ if id_num not in self.id_to_file: raise ValueError(f"ID {id_num} not found in dataset") # Get the data array if id_num in self.data_cache: data = self.data_cache[id_num] else: data = np.load(self.id_to_file[id_num]) # Calculate window boundaries half_window = window_size // 2 x_start = int(max(0, x - half_window)) x_end = int(min(data.shape[0], x + half_window + 1)) y_start = int(max(0, y - half_window)) y_end = int(min(data.shape[1], y + half_window + 1)) # Extract window window = data[x_start:x_end, y_start:y_end] # Pad if necessary if window.shape != (window_size, window_size): padded_window = np.zeros((window_size, window_size)) x_offset = half_window - (x - x_start) y_offset = half_window - (y - y_start) padded_window[ x_offset:x_offset+window.shape[0], y_offset:y_offset+window.shape[1] ] = window window = padded_window return torch.from_numpy(window).float() def __len__(self): return len(self.id_to_file) def __getitem__(self, idx): # This is a placeholder implementation # Modify according to your specific needs id_num = list(self.id_to_file.keys())[idx] return self.data_cache[id_num] # Example usage: """ # Initialize the dataset base_path = "/content/drive/MyDrive/Colab Notebooks/MappingSOC/Data/RasterTensorData" dataset = RasterTensorDataset(base_path, "Elevation") # Get the dictionary mapping IDs to filenames id_mapping = dataset.id_to_file print("ID to filename mapping:", id_mapping) # Get a 17x17 window for a specific location id_num = 10 # example ID x, y = 100, 100 # example coordinates window = dataset.get_tensor_by_location(id_num, x, y) print("Window shape:", window.shape) # Create a DataLoader if needed from torch.utils.data import DataLoader dataloader = DataLoader(dataset, batch_size=4, shuffle=True) """ class MultiRasterDatasetMapping(Dataset): def __init__(self, subfolders, dataframe): """ Parameters: subfolders: list of str, names of subfolders to include dataframe: pandas.DataFrame, contains columns GPS_LONG, GPS_LAT, and OC (target variable) """ self.subfolders = subfolders self.dataframe = dataframe self.datasets = { subfolder: RasterTensorDatasetMapping(subfolder) for subfolder in subfolders } self.coordinates = { subfolder: np.load(f"{subfolder}/coordinates.npy") for subfolder in subfolders } def find_coordinates_index(self, subfolder, longitude, latitude): """ Finds the index of the matching coordinates in the subfolder's coordinates.npy file. Parameters: subfolder: str, name of the subfolder longitude: float, longitude to match latitude: float, latitude to match Returns: tuple: (id_num, x, y) if match is found, otherwise raises an error """ coords = self.coordinates[subfolder] # Assuming the first two columns of `coordinates.npy` are longitude and latitude match = np.where((coords[:, 1] == longitude) & (coords[:, 0] == latitude))[0] if match.size == 0: raise ValueError(f"Coordinates ({longitude}, {latitude}) not found in {subfolder}") # Return id_num, x, y from the same row return coords[match[0], 2], coords[match[0], 3], coords[match[0], 4] def __getitem__(self, index): """ Retrieve tensor and target value for a given index. Parameters: index: int, index of the row in the dataframe Returns: tuple: (tensor, OC), where tensor is the data and OC is the target variable """ row = self.dataframe.iloc[index] longitude, latitude = row["longitude"], row["latitude"] tensors = {} for subfolder in self.subfolders: id_num, x, y = self.find_coordinates_index(subfolder, longitude, latitude) tensors[subfolder] = self.datasets[subfolder].get_tensor_by_location(id_num, x, y) return longitude, latitude, tensors def __len__(self): """ Return the number of samples in the dataset. """ return len(self.dataframe) def get_tensor_by_location(self, subfolder, id_num, x, y): """Get tensor from specific subfolder dataset""" return self.datasets[subfolder].get_tensor_by_location(id_num, x, y)