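"""Dataset utilities for multi-house half-hourly energy time series.

MultiHouseDataset cuts sliding windows from per-house CSVs of grid usage and
solar generation, appends sin/cos time-of-day features, applies a shared
global MinMaxScaler, and attaches calendar conditions (house id, day of week,
day of year). LatentDataset pairs pre-computed latent vectors with house ids.
"""
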
import torch
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import joblib
import os
from typing import Tuple, Dict
import warnings
warnings.filterwarnings('ignore')
class MultiHouseDataset(torch.utils.data.Dataset):
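    """Sliding-window dataset over per-house half-hourly CSV files.

    Each CSV must contain 'timestamp', 'grid_usage', and 'solar_generation'
    columns sampled every 30 minutes. Windows of `window_size` consecutive
    timesteps are cut with a stride of `step_size`; sin/cos time-of-day
    channels are appended, and all four channels are scaled to [-1, 1] by a
    single MinMaxScaler shared across houses. Each sample is a
    (window, conditions) pair, where conditions holds the house id, day of
    week, and zero-based day of year at the window's first timestep.
    """
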
    def __init__(self, data_dir: str, window_size: int = 96, step_size: int = 1,
                 scaler_path: str = 'global_scaler.gz', cache_in_memory: bool = True,
                 dtype: torch.dtype = torch.float32, limit_to_one_year: bool = True):
        self.window_size = window_size
        self.step_size = step_size
        self.cache_in_memory = cache_in_memory
        self.dtype = dtype
        self.limit_to_one_year = limit_to_one_year

        all_files = sorted([f for f in os.listdir(data_dir) if f.endswith('.csv')])
        print(f"Found {len(all_files)} house files in '{data_dir}'.")
        self.num_houses = len(all_files)

        print("Reading house data...")
        if self.limit_to_one_year:
            print("INFO: Limiting data to the first year (17,520 samples) for each house.")

        data_per_house = []
        timestamps_per_house = []
        SAMPLES_PER_YEAR = 17520  # 365 days * 48 half-hour slots per day
        for filename in all_files:
            df = pd.read_csv(os.path.join(data_dir, filename), parse_dates=['timestamp'])
            time_series_values = df[['grid_usage', 'solar_generation']].values.astype(np.float32)
            if self.limit_to_one_year:
                time_series_values = time_series_values[:SAMPLES_PER_YEAR]
            # Keep timestamps aligned with the (possibly truncated) series.
            timestamps_per_house.append(df['timestamp'].values[:len(time_series_values)])

            # Encode time of day as sin/cos so midnight and 23:30 are adjacent.
            # Note: index % 48 assumes each file starts at the 00:00 slot.
            num_timesteps = len(time_series_values)
            timesteps_of_day = np.arange(num_timesteps) % 48
            sin_time = np.sin(2 * np.pi * timesteps_of_day / 48.0).astype(np.float32)
            cos_time = np.cos(2 * np.pi * timesteps_of_day / 48.0).astype(np.float32)
            time_series_values = np.concatenate([
                time_series_values,
                sin_time[:, np.newaxis],
                cos_time[:, np.newaxis]
            ], axis=1)
            data_per_house.append(time_series_values)
        # One scaler is fit across all houses so every series shares the same
        # scale; reuse it from disk if it already exists.
        if os.path.exists(scaler_path):
            scaler = joblib.load(scaler_path)
            print(f"Scaler loaded from {scaler_path}")
        else:
            print("Fitting global scaler...")
            combined_data = np.vstack(data_per_house)
            scaler = MinMaxScaler(feature_range=(-1, 1))
            scaler.fit(combined_data)
            joblib.dump(scaler, scaler_path)
            print(f"Scaler saved to {scaler_path}")
        if self.cache_in_memory:
            print("Caching normalized data...")
            self.normalized_data_per_house = []
            for series in data_per_house:
                normalized = scaler.transform(series)
                tensor_data = torch.from_numpy(normalized).to(dtype=self.dtype)
                self.normalized_data_per_house.append(tensor_data)
        else:
            # Keep numpy arrays; __getitem__ converts slices to tensors lazily.
            self.normalized_data_per_house = [scaler.transform(series) for series in data_per_house]
        del data_per_house
print("Pre-computing mappings...")
self.windows_per_house = [(len(d) - self.window_size) // self.step_size + 1 for d in self.normalized_data_per_house]
self.cumulative_windows = np.cumsum([0] + self.windows_per_house)
self.total_windows = self.cumulative_windows[-1]
self.sample_to_house = np.empty(self.total_windows, dtype=np.int32)
self.sample_to_local_idx = np.empty(self.total_windows, dtype=np.int32)
self.sample_to_day_of_week = np.empty(self.total_windows, dtype=np.int32)
self.sample_to_day_of_year = np.empty(self.total_windows, dtype=np.int32)
for house_idx in range(self.num_houses):
start_global_idx = self.cumulative_windows[house_idx]
end_global_idx = self.cumulative_windows[house_idx + 1]
num_windows_for_this_house = self.windows_per_house[house_idx]
self.sample_to_house[start_global_idx:end_global_idx] = house_idx
local_indices = np.arange(num_windows_for_this_house) * self.step_size
self.sample_to_local_idx[start_global_idx:end_global_idx] = local_indices
house_timestamps = pd.Series(timestamps_per_house[house_idx][local_indices])
self.sample_to_day_of_week[start_global_idx:end_global_idx] = house_timestamps.dt.dayofweek
self.sample_to_day_of_year[start_global_idx:end_global_idx] = house_timestamps.dt.dayofyear - 1
print(f"Dataset initialized. Total windows: {self.total_windows} from {self.num_houses} houses.")
memory_usage = sum(data.numel() * data.element_size() for data in self.normalized_data_per_house) / 1e6 if self.cache_in_memory else 0
print(f"Memory usage for cached tensors: {memory_usage:.1f} MB")
    def __len__(self) -> int:
        return self.total_windows

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        if idx < 0 or idx >= self.total_windows:
            raise IndexError("Index out of range")
        house_index = self.sample_to_house[idx]
        local_start_pos = self.sample_to_local_idx[idx]
        window_data = self.normalized_data_per_house[house_index][local_start_pos : local_start_pos + self.window_size]
        if not self.cache_in_memory:
            # Non-cached data is stored as numpy; convert the slice here so the
            # return type is a torch.Tensor in both configurations.
            window_data = torch.from_numpy(np.ascontiguousarray(window_data)).to(dtype=self.dtype)
        conditions = {
            "house_id": torch.tensor(house_index, dtype=torch.long),
            "day_of_week": torch.tensor(self.sample_to_day_of_week[idx], dtype=torch.long),
            "day_of_year": torch.tensor(self.sample_to_day_of_year[idx], dtype=torch.long),
        }
        return window_data, conditions

    def get_memory_usage(self) -> dict:
        if self.cache_in_memory:
            tensor_memory = sum(data.numel() * data.element_size() for data in self.normalized_data_per_house) / 1e6
        else:
            tensor_memory = 0
        mapping_memory = (self.sample_to_house.nbytes + self.sample_to_local_idx.nbytes) / 1e6
        return {
            'tensor_cache_mb': tensor_memory,
            'mapping_arrays_mb': mapping_memory,
            'total_mb': tensor_memory + mapping_memory
        }

class LatentDataset(torch.utils.data.Dataset):
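    """Pairs pre-computed latent vectors with their house ids.

    Intended for training on encodings produced elsewhere (e.g. by an encoder
    pass over MultiHouseDataset windows):

        latent_ds = LatentDataset(latent_vectors, house_ids)
        z, house_id = latent_ds[0]
    """
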
    def __init__(self, latent_vectors: torch.Tensor, house_ids: torch.Tensor):
        assert len(latent_vectors) == len(house_ids), "Latent vectors and house IDs must have same length"
        self.latent_vectors = latent_vectors.contiguous()
        self.house_ids = house_ids.contiguous()

    def __len__(self) -> int:
        return len(self.latent_vectors)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return self.latent_vectors[idx], self.house_ids[idx]

if __name__ == "__main__":
    import time

    DATA_DIRECTORY = './data/per_house/'
    if os.path.exists(DATA_DIRECTORY):
        print("--- Testing Dataset Setup ---")
        start_time = time.time()
        dataset = MultiHouseDataset(data_dir=DATA_DIRECTORY, window_size=96, step_size=96)
        init_time = time.time() - start_time
        print(f"Dataset initialization: {init_time:.2f}s")
        print(f"Memory usage: {dataset.get_memory_usage()}")
        if len(dataset) > 0:
            first_sample, first_conditions = dataset[0]
            print(f"\nSample data shape: {first_sample.shape}")
            print(f"Sample conditions: {first_conditions}")
            print(f"Total houses: {dataset.num_houses}")
    else:
        print(f"ERROR: Data directory not found at '{DATA_DIRECTORY}'. Please create and populate this directory.")