import torch
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import joblib
import os
from typing import Tuple, Dict
import warnings

warnings.filterwarnings('ignore')


class MultiHouseDataset(torch.utils.data.Dataset):
    """Sliding-window dataset over per-house energy CSVs.

    Each CSV holds a half-hourly time series (48 steps per day) with a
    'timestamp' column plus 'grid_usage' and 'solar_generation' channels.
    Every sample is a normalized window of shape (window_size, 4): the two
    raw channels plus a sin/cos encoding of the time of day.
    """

    def __init__(self, data_dir: str, window_size: int = 96, step_size: int = 1,
                 scaler_path: str = 'global_scaler.gz', cache_in_memory: bool = True,
                 dtype: torch.dtype = torch.float32, limit_to_one_year: bool = True):
        self.window_size = window_size
        self.step_size = step_size
        self.cache_in_memory = cache_in_memory
        self.dtype = dtype
        self.limit_to_one_year = limit_to_one_year

        all_files = sorted([f for f in os.listdir(data_dir) if f.endswith('.csv')])
        print(f"Found {len(all_files)} house files in '{data_dir}'.")
        self.num_houses = len(all_files)

        print("Reading house data...")
        if self.limit_to_one_year:
            print("INFO: Limiting data to the first year (17,520 samples) for each house.")

        data_per_house = []
        timestamps_per_house = []
        SAMPLES_PER_YEAR = 17520  # 365 days * 48 half-hour timesteps
        for filename in all_files:
            df = pd.read_csv(os.path.join(data_dir, filename), parse_dates=['timestamp'])
            # Truncate before extracting timestamps so the timestamp and value
            # arrays stay aligned.
            if self.limit_to_one_year:
                df = df.iloc[:SAMPLES_PER_YEAR]
            timestamps_per_house.append(df['timestamp'].values)
            time_series_values = df[['grid_usage', 'solar_generation']].values.astype(np.float32)

            # Cyclical time-of-day encoding: 48 half-hour slots per day mapped
            # onto the unit circle, so 23:30 sits next to 00:00. This assumes
            # each file starts at midnight.
            num_timesteps = len(time_series_values)
            timesteps_of_day = np.arange(num_timesteps) % 48
            sin_time = np.sin(2 * np.pi * timesteps_of_day / 48.0).astype(np.float32)
            cos_time = np.cos(2 * np.pi * timesteps_of_day / 48.0).astype(np.float32)
            time_series_values = np.concatenate([
                time_series_values,
                sin_time[:, np.newaxis],
                cos_time[:, np.newaxis]
            ], axis=1)
            data_per_house.append(time_series_values)

        # Fit one scaler across all houses (or reuse a previously saved one) so
        # every house shares the same normalization.
        if os.path.exists(scaler_path):
            scaler = joblib.load(scaler_path)
            print(f"Scaler loaded from {scaler_path}")
        else:
            print("Fitting global scaler...")
            combined_data = np.vstack(data_per_house)
            scaler = MinMaxScaler(feature_range=(-1, 1))
            scaler.fit(combined_data)
            joblib.dump(scaler, scaler_path)
            print(f"Scaler saved to {scaler_path}")

        if self.cache_in_memory:
            print("Caching normalized data...")
            self.normalized_data_per_house = []
            for series in data_per_house:
                normalized = scaler.transform(series)
                tensor_data = torch.from_numpy(normalized).to(dtype=self.dtype)
                self.normalized_data_per_house.append(tensor_data)
        else:
            # Store normalized numpy arrays; __getitem__ converts each window
            # to a tensor on demand.
            self.normalized_data_per_house = []
            for series in data_per_house:
                self.normalized_data_per_house.append(scaler.transform(series))
        del data_per_house  # the raw, unnormalized arrays are no longer needed
| print("Pre-computing mappings...") | |
| self.windows_per_house = [(len(d) - self.window_size) // self.step_size + 1 for d in self.normalized_data_per_house] | |
| self.cumulative_windows = np.cumsum([0] + self.windows_per_house) | |
| self.total_windows = self.cumulative_windows[-1] | |
| self.sample_to_house = np.empty(self.total_windows, dtype=np.int32) | |
| self.sample_to_local_idx = np.empty(self.total_windows, dtype=np.int32) | |
| self.sample_to_day_of_week = np.empty(self.total_windows, dtype=np.int32) | |
| self.sample_to_day_of_year = np.empty(self.total_windows, dtype=np.int32) | |
| for house_idx in range(self.num_houses): | |
| start_global_idx = self.cumulative_windows[house_idx] | |
| end_global_idx = self.cumulative_windows[house_idx + 1] | |
| num_windows_for_this_house = self.windows_per_house[house_idx] | |
| self.sample_to_house[start_global_idx:end_global_idx] = house_idx | |
| local_indices = np.arange(num_windows_for_this_house) * self.step_size | |
| self.sample_to_local_idx[start_global_idx:end_global_idx] = local_indices | |
| house_timestamps = pd.Series(timestamps_per_house[house_idx][local_indices]) | |
| self.sample_to_day_of_week[start_global_idx:end_global_idx] = house_timestamps.dt.dayofweek | |
| self.sample_to_day_of_year[start_global_idx:end_global_idx] = house_timestamps.dt.dayofyear - 1 | |
| print(f"Dataset initialized. Total windows: {self.total_windows} from {self.num_houses} houses.") | |
| memory_usage = sum(data.numel() * data.element_size() for data in self.normalized_data_per_house) / 1e6 if self.cache_in_memory else 0 | |
| print(f"Memory usage for cached tensors: {memory_usage:.1f} MB") | |

    def __len__(self) -> int:
        return self.total_windows

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        if idx < 0 or idx >= self.total_windows:
            raise IndexError(f"Index {idx} out of range for dataset of size {self.total_windows}")
        house_index = self.sample_to_house[idx]
        local_start_pos = self.sample_to_local_idx[idx]
        window_data = self.normalized_data_per_house[house_index][local_start_pos:local_start_pos + self.window_size]
        if not self.cache_in_memory:
            # The non-cached path stores numpy arrays, so convert the window
            # here; the cached path already holds tensors.
            window_data = torch.as_tensor(window_data, dtype=self.dtype)
        conditions = {
            "house_id": torch.tensor(house_index, dtype=torch.long),
            "day_of_week": torch.tensor(self.sample_to_day_of_week[idx], dtype=torch.long),
            "day_of_year": torch.tensor(self.sample_to_day_of_year[idx], dtype=torch.long),
        }
        return window_data, conditions

    def get_memory_usage(self) -> dict:
        if self.cache_in_memory:
            tensor_memory = sum(data.numel() * data.element_size()
                                for data in self.normalized_data_per_house) / 1e6
        else:
            tensor_memory = 0
        mapping_memory = (self.sample_to_house.nbytes + self.sample_to_local_idx.nbytes
                          + self.sample_to_day_of_week.nbytes + self.sample_to_day_of_year.nbytes) / 1e6
        return {
            'tensor_cache_mb': tensor_memory,
            'mapping_arrays_mb': mapping_memory,
            'total_mb': tensor_memory + mapping_memory
        }
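
# To map model outputs back to physical units, the saved global scaler can be
# inverted. A sketch (the `generated` array is hypothetical; it only needs the
# same 4-channel layout as the training windows):
#     scaler = joblib.load('global_scaler.gz')
#     physical = scaler.inverse_transform(generated.reshape(-1, 4))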


class LatentDataset(torch.utils.data.Dataset):
    """Pairs pre-computed latent vectors with their originating house IDs."""

    def __init__(self, latent_vectors: torch.Tensor, house_ids: torch.Tensor):
        assert len(latent_vectors) == len(house_ids), "Latent vectors and house IDs must have the same length"
        self.latent_vectors = latent_vectors.contiguous()
        self.house_ids = house_ids.contiguous()

    def __len__(self) -> int:
        return len(self.latent_vectors)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return self.latent_vectors[idx], self.house_ids[idx]
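
# Intended LatentDataset usage (sketch only; `encoder` is a hypothetical
# trained model, not defined in this module):
#     with torch.no_grad():
#         latents = encoder(windows)  # shape (N, latent_dim)
#     latent_ds = LatentDataset(latents, house_ids)
#     loader = torch.utils.data.DataLoader(latent_ds, batch_size=64, shuffle=True)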


if __name__ == "__main__":
    import time

    DATA_DIRECTORY = './data/per_house/'
    if os.path.exists(DATA_DIRECTORY):
        print("--- Testing Dataset Setup ---")
        start_time = time.time()
        dataset = MultiHouseDataset(data_dir=DATA_DIRECTORY, window_size=96, step_size=96)
        init_time = time.time() - start_time
        print(f"Dataset initialization: {init_time:.2f}s")
        print(f"Memory usage: {dataset.get_memory_usage()}")
        if len(dataset) > 0:
            first_sample, first_conditions = dataset[0]
            print(f"\nSample data shape: {first_sample.shape}")
            print(f"Sample conditions: {first_conditions}")
            print(f"Total houses: {dataset.num_houses}")
    else:
        print(f"ERROR: Data directory not found at '{DATA_DIRECTORY}'. Please create and populate this directory.")