import torch
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import joblib
import os
from typing import Tuple, Dict
import warnings

# NOTE(review): this silences *all* warnings process-wide for anything that
# imports this module; consider scoping it with warnings.catch_warnings().
warnings.filterwarnings('ignore')


class MultiHouseDataset(torch.utils.data.Dataset):
    """Sliding-window dataset over per-house CSV time series.

    Every ``*.csv`` file in ``data_dir`` is treated as one house and must contain
    ``timestamp``, ``grid_usage`` and ``solar_generation`` columns. For each house
    the two measurement columns are augmented with sine/cosine time-of-day
    features, normalized with one MinMaxScaler (range [-1, 1]) shared by all
    houses, and cut into windows of ``window_size`` rows starting every
    ``step_size`` rows.

    Args:
        data_dir: Directory containing one CSV file per house.
        window_size: Number of rows per returned window (must be > 0).
        step_size: Stride in rows between consecutive window starts (must be > 0).
        scaler_path: joblib file for the global scaler. Loaded if it exists,
            otherwise fitted on all houses and written there.
        cache_in_memory: If True, normalized series are stored as torch tensors
            of ``dtype``; otherwise they are stored as NumPy arrays and each
            window is converted to a tensor in ``__getitem__``. In both cases
            the full normalized data is held in memory.
        dtype: Torch dtype of the returned windows.
        limit_to_one_year: If True, keep only the first 17,520 rows per house.

    Raises:
        ValueError: If ``window_size``/``step_size`` are not positive, or if a
            scaler must be fitted but no CSV files were found.
    """

    _SAMPLES_PER_YEAR = 17520
    _MINUTES_PER_DAY = 1440

    def __init__(self, data_dir: str, window_size: int = 96, step_size: int = 1,
                 scaler_path: str = 'global_scaler.gz', cache_in_memory: bool = True,
                 dtype: torch.dtype = torch.float32, limit_to_one_year: bool = True):
        if window_size <= 0:
            raise ValueError(f"window_size must be positive, got {window_size}")
        if step_size <= 0:
            raise ValueError(f"step_size must be positive, got {step_size}")

        self.window_size = window_size
        self.step_size = step_size
        self.cache_in_memory = cache_in_memory
        self.dtype = dtype
        self.limit_to_one_year = limit_to_one_year

        all_files = sorted(f for f in os.listdir(data_dir) if f.endswith('.csv'))
        print(f"Found {len(all_files)} house files in '{data_dir}'.")
        self.num_houses = len(all_files)

        print("Reading house data...")
        if self.limit_to_one_year:
            print("INFO: Limiting data to the first year (17,520 samples) for each house.")
        data_per_house = []
        timestamps_per_house = []
        for filename in all_files:
            features, timestamps = self._read_house(os.path.join(data_dir, filename))
            data_per_house.append(features)
            timestamps_per_house.append(timestamps)

        scaler = self._load_or_fit_scaler(scaler_path, data_per_house)

        if self.cache_in_memory:
            print("Caching normalized data...")
            self.normalized_data_per_house = [
                torch.from_numpy(scaler.transform(series)).to(dtype=self.dtype)
                for series in data_per_house
            ]
        else:
            self.normalized_data_per_house = [scaler.transform(series) for series in data_per_house]
        del data_per_house

        self._build_index(timestamps_per_house)

        print(f"Dataset initialized. Total windows: {self.total_windows} from {self.num_houses} houses.")
        print(f"Memory usage for cached tensors: {self._tensor_cache_mb():.1f} MB")

    def _read_house(self, path: str) -> Tuple[np.ndarray, np.ndarray]:
        """Read one house CSV and return (features of shape (T, 4) float32, timestamps of length T)."""
        df = pd.read_csv(path, parse_dates=['timestamp'])
        if self.limit_to_one_year:
            df = df.iloc[:self._SAMPLES_PER_YEAR]

        timestamps = pd.to_datetime(df['timestamp'])
        values = df[['grid_usage', 'solar_generation']].values.astype(np.float32)

        # Derive the daily phase from the actual clock time rather than the row
        # index, so files that do not start at midnight or that contain gaps
        # still get correct time-of-day features.
        minutes_of_day = (timestamps.dt.hour * 60 + timestamps.dt.minute
                          + timestamps.dt.second / 60.0).to_numpy(dtype=np.float64)
        angle = 2 * np.pi * minutes_of_day / self._MINUTES_PER_DAY
        sin_time = np.sin(angle).astype(np.float32)
        cos_time = np.cos(angle).astype(np.float32)

        features = np.column_stack([values, sin_time, cos_time])
        return features, timestamps.values

    @staticmethod
    def _load_or_fit_scaler(scaler_path: str, data_per_house: list) -> MinMaxScaler:
        """Load the global scaler from ``scaler_path``, or fit it on all houses and save it."""
        if os.path.exists(scaler_path):
            # joblib.load unpickles the file: only point scaler_path at trusted files.
            scaler = joblib.load(scaler_path)
            print(f"Scaler loaded from {scaler_path}")
            return scaler

        if not data_per_house:
            raise ValueError("Cannot fit the global scaler: no house CSV files were found.")
        print("Fitting global scaler...")
        scaler = MinMaxScaler(feature_range=(-1, 1))
        scaler.fit(np.vstack(data_per_house))
        joblib.dump(scaler, scaler_path)
        print(f"Scaler saved to {scaler_path}")
        return scaler

    def _build_index(self, timestamps_per_house: list) -> None:
        """Precompute flat-index -> (house, start row, day-of-week, day-of-year) lookup arrays."""
        print("Pre-computing mappings...")
        # A series shorter than window_size yields zero windows; without the clamp
        # the formula goes negative and corrupts the cumulative offsets.
        self.windows_per_house = [
            max(0, (len(d) - self.window_size) // self.step_size + 1)
            for d in self.normalized_data_per_house
        ]
        self.cumulative_windows = np.cumsum([0] + self.windows_per_house)
        self.total_windows = int(self.cumulative_windows[-1])

        self.sample_to_house = np.empty(self.total_windows, dtype=np.int32)
        self.sample_to_local_idx = np.empty(self.total_windows, dtype=np.int32)
        self.sample_to_day_of_week = np.empty(self.total_windows, dtype=np.int32)
        self.sample_to_day_of_year = np.empty(self.total_windows, dtype=np.int32)

        for house_idx, num_windows in enumerate(self.windows_per_house):
            start = self.cumulative_windows[house_idx]
            end = self.cumulative_windows[house_idx + 1]
            local_indices = np.arange(num_windows) * self.step_size

            self.sample_to_house[start:end] = house_idx
            self.sample_to_local_idx[start:end] = local_indices

            # Calendar conditions are taken from the first timestamp of each window.
            window_starts = pd.Series(timestamps_per_house[house_idx][local_indices])
            self.sample_to_day_of_week[start:end] = window_starts.dt.dayofweek
            self.sample_to_day_of_year[start:end] = window_starts.dt.dayofyear - 1

    def _tensor_cache_mb(self) -> float:
        """Size of the cached tensors in MB (0 when ``cache_in_memory`` is False)."""
        if not self.cache_in_memory:
            return 0
        return sum(t.numel() * t.element_size() for t in self.normalized_data_per_house) / 1e6

    def __len__(self) -> int:
        return self.total_windows

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """Return ``(window, conditions)`` for flat window index ``idx``.

        ``window`` holds ``window_size`` rows of normalized features.
        ``conditions`` holds long tensors ``house_id``, ``day_of_week`` and
        ``day_of_year`` (0-based) for the window's first row.
        """
        if idx < 0 or idx >= self.total_windows:
            raise IndexError("Index out of range")

        house_index = int(self.sample_to_house[idx])
        local_start_pos = int(self.sample_to_local_idx[idx])
        window_data = self.normalized_data_per_house[house_index][
            local_start_pos:local_start_pos + self.window_size]
        if not isinstance(window_data, torch.Tensor):
            # Non-cached series are NumPy arrays; honour the declared return type and dtype.
            window_data = torch.as_tensor(window_data, dtype=self.dtype)

        conditions = {
            "house_id": torch.tensor(house_index, dtype=torch.long),
            "day_of_week": torch.tensor(self.sample_to_day_of_week[idx], dtype=torch.long),
            "day_of_year": torch.tensor(self.sample_to_day_of_year[idx], dtype=torch.long),
        }
        return window_data, conditions

    def get_memory_usage(self) -> dict:
        """Report approximate memory (MB) of the tensor cache and all index mapping arrays."""
        tensor_memory = self._tensor_cache_mb()
        mapping_memory = sum(
            arr.nbytes for arr in (
                self.sample_to_house,
                self.sample_to_local_idx,
                self.sample_to_day_of_week,
                self.sample_to_day_of_year,
            )
        ) / 1e6
        return {
            'tensor_cache_mb': tensor_memory,
            'mapping_arrays_mb': mapping_memory,
            'total_mb': tensor_memory + mapping_memory,
        }


class LatentDataset(torch.utils.data.Dataset):
    """Dataset pairing precomputed latent vectors with their house IDs, row by row.

    Raises:
        ValueError: If ``latent_vectors`` and ``house_ids`` differ in length.
    """

    def __init__(self, latent_vectors: torch.Tensor, house_ids: torch.Tensor):
        # Explicit check rather than `assert`, which is stripped under `python -O`.
        if len(latent_vectors) != len(house_ids):
            raise ValueError("Latent vectors and house IDs must have same length")
        self.latent_vectors = latent_vectors.contiguous()
        self.house_ids = house_ids.contiguous()

    def __len__(self) -> int:
        return len(self.latent_vectors)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return self.latent_vectors[idx], self.house_ids[idx]


if __name__ == "__main__":
    import time

    DATA_DIRECTORY = './data/per_house/'
    if os.path.exists(DATA_DIRECTORY):
        print("--- Testing Dataset Setup ---")
        start_time = time.time()
        dataset = MultiHouseDataset(data_dir=DATA_DIRECTORY, window_size=96, step_size=96)
        init_time = time.time() - start_time
        print(f"Dataset initialization: {init_time:.2f}s")
        print(f"Memory usage: {dataset.get_memory_usage()}")
        if len(dataset) > 0:
            first_sample, first_conditions = dataset[0]
            print(f"\nSample data shape: {first_sample.shape}")
            print(f"Sample conditions: {first_conditions}")
        print(f"Total houses: {dataset.num_houses}")
    else:
        print(f"ERROR: Data directory not found at '{DATA_DIRECTORY}'. Please create and populate this directory.")