# TSEditor / utils / data_utils / real_datasets.py
# (listing header: PeterYu, commit 2875fe6)
import os
import torch
import numpy as np
import pandas as pd
from scipy import io
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset
from models.model_utils import normalize_to_neg_one_to_one, unnormalize_to_zero_to_one
from utils.masking_utils import noise_mask
class CustomDataset(Dataset):
    """Sliding-window dataset over one multivariate time-series CSV.

    Pipeline: read the CSV, min-max scale to [0, 1] (optionally rescaled to
    [-1, 1] when ``neg_one_to_one``), cut into overlapping windows of length
    ``window`` (stride 1), shuffle-split the windows into train/test portions,
    and — when ``save2npy`` — dump ground-truth and normalized copies of both
    splits into ``<output_dir>/samples``.  In "test" mode each item also
    carries a boolean observation mask for imputation (``missing_ratio``) or
    forecasting (``predict_length``).
    """

    def __init__(
        self,
        name,
        data_root,
        window=64,
        proportion=0.8,
        save2npy=True,
        neg_one_to_one=True,
        seed=123,
        period="train",
        output_dir="./OUTPUT",
        predict_length=None,
        missing_ratio=None,
        style="separate",
        distribution="geometric",
        mean_mask_length=3,
    ):
        """
        Args:
            name: dataset identifier used in the output .npy file names.
            data_root: path of the CSV file to read.
            window: length of each sliding window.
            proportion: fraction of windows assigned to the train split.
            save2npy: if True, dump split arrays under ``<output_dir>/samples``.
            neg_one_to_one: additionally map [0, 1]-scaled data to [-1, 1].
            seed: RNG seed for the split (and for the masks in test mode).
            period: "train" or "test".
            output_dir: directory that receives the "samples" subfolder.
            predict_length: forecast horizon — test mode only.
            missing_ratio: fraction of points to mask — test mode only.
            style, distribution, mean_mask_length: forwarded to ``noise_mask``
                when building imputation masks.
        """
        super(CustomDataset, self).__init__()
        assert period in ["train", "test"], "period must be train or test."
        if period == "train":
            # Forecasting/imputation settings only make sense at test time.
            assert not (predict_length is not None or missing_ratio is not None), ""
        self.period = period
        self.name = name
        self.pred_len = predict_length
        self.missing_ratio = missing_ratio
        self.style = style
        self.distribution = distribution
        self.mean_mask_length = mean_mask_length
        # rawdata: (T, var_num) array; scaler: MinMaxScaler fit on it.
        self.rawdata, self.scaler = self.read_data(data_root, self.name)
        self.dir = os.path.join(output_dir, "samples")
        os.makedirs(self.dir, exist_ok=True)
        self.window, self.period = window, period
        self.len, self.var_num = self.rawdata.shape[0], self.rawdata.shape[-1]
        # Number of stride-1 windows that fit in the series (0 if too short).
        self.sample_num_total = max(self.len - self.window + 1, 0)
        self.save2npy = save2npy
        self.auto_norm = neg_one_to_one
        self.data = self.__normalize(self.rawdata)
        train, inference = self.getsamples(self.data, proportion, seed)
        self.samples = train if period == "train" else inference
        if period == "test":
            if missing_ratio is not None:
                # Random imputation mask per sample.
                self.masking = self.mask_data(seed)
            elif predict_length is not None:
                # Forecasting mask: last `predict_length` steps unobserved.
                masks = np.ones(self.samples.shape)
                masks[:, -predict_length:, :] = 0
                self.masking = masks.astype(bool)
            else:
                raise NotImplementedError()
        self.sample_num = self.samples.shape[0]
        print(f"Dataset load from {data_root} with shape {self.samples.shape}")

    def getsamples(self, data, proportion, seed):
        """Window `data` with stride 1, split into train/test, optionally save.

        Returns:
            (train_data, test_data), each of shape (n, window, var_num).
        """
        x = np.zeros((self.sample_num_total, self.window, self.var_num))
        for i in range(self.sample_num_total):
            start = i
            end = i + self.window
            x[i, :, :] = data[start:end, :]
        train_data, test_data = self.divide(x, proportion, seed)
        if self.save2npy:
            # Only write test files when a test split actually exists.
            if 1 - proportion > 0:
                np.save(
                    os.path.join(
                        self.dir, f"{self.name}_ground_truth_{self.window}_test.npy"
                    ),
                    self.unnormalize(test_data),
                )
            np.save(
                os.path.join(
                    self.dir, f"{self.name}_ground_truth_{self.window}_train.npy"
                ),
                self.unnormalize(train_data),
            )
            if self.auto_norm:
                # Data lives in [-1, 1] here; map back to [0, 1] for the
                # "norm_truth" dumps via unnormalize_to_zero_to_one.
                if 1 - proportion > 0:
                    np.save(
                        os.path.join(
                            self.dir, f"{self.name}_norm_truth_{self.window}_test.npy"
                        ),
                        unnormalize_to_zero_to_one(test_data),
                    )
                np.save(
                    os.path.join(
                        self.dir, f"{self.name}_norm_truth_{self.window}_train.npy"
                    ),
                    unnormalize_to_zero_to_one(train_data),
                )
            else:
                # Already in [0, 1]; save as-is.
                if 1 - proportion > 0:
                    np.save(
                        os.path.join(
                            self.dir, f"{self.name}_norm_truth_{self.window}_test.npy"
                        ),
                        test_data,
                    )
                np.save(
                    os.path.join(
                        self.dir, f"{self.name}_norm_truth_{self.window}_train.npy"
                    ),
                    train_data,
                )
        return train_data, test_data

    def normalize(self, sq):
        """Scale raw windows `sq` exactly like the training data."""
        d = sq.reshape(-1, self.var_num)
        d = self.scaler.transform(d)
        if self.auto_norm:
            d = normalize_to_neg_one_to_one(d)
        return d.reshape(-1, self.window, self.var_num)

    def unnormalize(self, sq):
        """Map normalized windows `sq` back to the original data scale."""
        d = self.__unnormalize(sq.reshape(-1, self.var_num))
        return d.reshape(-1, self.window, self.var_num)

    def __normalize(self, rawdata):
        """Apply the fitted scaler ([0, 1]) and optionally map to [-1, 1]."""
        data = self.scaler.transform(rawdata)
        if self.auto_norm:
            data = normalize_to_neg_one_to_one(data)
        return data

    def __unnormalize(self, data):
        """Inverse of __normalize: undo the [-1, 1] map, then the scaler."""
        if self.auto_norm:
            data = unnormalize_to_zero_to_one(data)
        x = data
        return self.scaler.inverse_transform(x)

    @staticmethod
    def divide(data, ratio, seed=2023):
        """Randomly split `data` along axis 0 into ceil(size*ratio) / rest."""
        size = data.shape[0]
        # Store the state of the RNG to restore later.
        st0 = np.random.get_state()
        np.random.seed(seed)
        regular_train_num = int(np.ceil(size * ratio))
        id_rdm = np.random.permutation(size)
        # id_rdm = np.arange(size)
        regular_train_id = id_rdm[:regular_train_num]
        irregular_train_id = id_rdm[regular_train_num:]
        regular_data = data[regular_train_id, :]
        irregular_data = data[irregular_train_id, :]
        # Restore RNG.
        np.random.set_state(st0)
        return regular_data, irregular_data

    @staticmethod
    def read_data(filepath, name=""):
        """Reads a single .csv and fits a MinMaxScaler on its values."""
        df = pd.read_csv(filepath, header=0)
        if name == "etth":
            # ETTh files carry a leading date column that is not a feature.
            df.drop(df.columns[0], axis=1, inplace=True)
        data = df.values
        scaler = MinMaxScaler()
        scaler = scaler.fit(data)
        return data, scaler

    def mask_data(self, seed=2023):
        """Build a boolean observation mask per sample via ``noise_mask``."""
        masks = np.ones_like(self.samples)
        # Store the state of the RNG to restore later.
        st0 = np.random.get_state()
        np.random.seed(seed)
        for idx in range(self.samples.shape[0]):
            x = self.samples[idx, :, :]  # (seq_length, feat_dim) array
            mask = noise_mask(
                x,
                self.missing_ratio,
                self.mean_mask_length,
                self.style,
                self.distribution,
            )  # (seq_length, feat_dim) boolean array
            masks[idx, :, :] = mask
        if self.save2npy:
            np.save(
                os.path.join(self.dir, f"{self.name}_masking_{self.window}.npy"), masks
            )
        # Restore RNG.
        np.random.set_state(st0)
        return masks.astype(bool)

    def __getitem__(self, ind):
        """Return a float sample tensor (plus its bool mask in test mode)."""
        if self.period == "test":
            x = self.samples[ind, :, :]  # (seq_length, feat_dim) array
            m = self.masking[ind, :, :]  # (seq_length, feat_dim) boolean array
            return torch.from_numpy(x).float(), torch.from_numpy(m)
        x = self.samples[ind, :, :]  # (seq_length, feat_dim) array
        return torch.from_numpy(x).float()

    def __len__(self):
        return self.sample_num
class RevenueDataset(CustomDataset):
    """Per-app revenue time series: windows of (download, revenue, au).

    The CSV holds one row per app per day.  Each variable is min-max
    scaled within its own app, and the first ``window`` rows of every app
    are concatenated into one long (n_apps * window, 3) array before the
    inherited ``getsamples`` slices it into windows.
    """

    def __init__(
        self,
        name,
        data_root,
        window=64,
        proportion=0.8,
        save2npy=True,
        neg_one_to_one=True,
        seed=123,
        period="train",
        output_dir="./OUTPUT",
        predict_length=None,
        missing_ratio=None,
        style="separate",
        distribution="geometric",
        mean_mask_length=3,
    ):
        """See ``CustomDataset.__init__``; the parameters are identical."""
        # Deliberately skip CustomDataset.__init__ (it would run the generic
        # CSV reader before self.window exists); initialise the torch Dataset
        # base directly and redo the setup with the revenue reader below.
        super(CustomDataset, self).__init__()
        assert period in ["train", "test"], "period must be train or test."
        if period == "train":
            # Forecasting/imputation settings only make sense at test time.
            assert not (predict_length is not None or missing_ratio is not None), ""
        self.period = period
        self.name = name
        self.pred_len = predict_length
        self.missing_ratio = missing_ratio
        self.style = style
        self.distribution = distribution
        self.mean_mask_length = mean_mask_length
        self.dir = os.path.join(output_dir, "samples")
        os.makedirs(self.dir, exist_ok=True)
        self.window, self.period = window, period
        # read_data uses self.window, so it must run after the line above.
        self.rawdata, self.scaler = self.read_data(data_root, self.name)
        self.len = len(self.rawdata) // self.window
        self.var_num = 3  # download, revenue, au
        # NOTE(review): the inherited getsamples() slides stride-1 windows
        # over the concatenated per-app rows but keeps only the first
        # len // window of them, so windows after the first can straddle
        # two apps — confirm this is intended.
        self.sample_num_total = self.len
        self.save2npy = save2npy
        self.auto_norm = neg_one_to_one
        self.data = self.__normalize(self.rawdata)
        train, inference = self.getsamples(self.data, proportion, seed)
        self.samples = train if period == "train" else inference
        if period == "test":
            if missing_ratio is not None:
                self.masking = self.mask_data(seed)
            elif predict_length is not None:
                masks = np.ones(self.samples.shape)
                masks[:, -predict_length:, :] = 0
                self.masking = masks.astype(bool)
            else:
                raise NotImplementedError()
        self.sample_num = self.samples.shape[0]
        print(f"Dataset load from {data_root} with shape {self.samples.shape}")

    # Unlike the base reader this is an instance method: it needs self.window.
    def read_data(self, filepath, name=""):
        """Read the revenue CSV, scale per app, and fit a global scaler.

        Returns:
            (data, scaler): ``data`` is a (n_apps * window, 3) array with
            columns [download, revenue, au]; ``scaler`` is a MinMaxScaler
            fit on it.
        """
        df = pd.read_csv(filepath)

        def min_max_scale(series):
            # Per-app scaling to [0, 1]; a constant series would produce
            # NaN/inf — assumed not to occur in the source data.
            return (series - series.min()) / (series.max() - series.min())

        for variable in ["revenue", "download", "au"]:
            df[variable] = df.groupby("app_id")[variable].transform(min_max_scale)
        # First `window` days of each app (rows assumed sorted by date).
        data = (
            df.groupby("app_id").head(self.window)[["download", "revenue", "au"]].values
        )
        scaler = MinMaxScaler()
        scaler = scaler.fit(data)
        return data, scaler

    def __normalize(self, rawdata):
        """Apply the fitted scaler ([0, 1]) and optionally map to [-1, 1]."""
        data = self.scaler.transform(rawdata)
        if self.auto_norm:
            data = normalize_to_neg_one_to_one(data)
        return data

    def __unnormalize(self, data):
        """Inverse of __normalize: undo the [-1, 1] map, then the scaler."""
        if self.auto_norm:
            data = unnormalize_to_zero_to_one(data)
        x = data
        return self.scaler.inverse_transform(x)
class ControlRevenueDataset(RevenueDataset):
    """RevenueDataset variant that augments the train split with
    noise-shifted copies of itself (4 extra copies, clipped to [-1, 1])."""

    def getsamples(self, data, proportion, seed):
        """Window the series, split train/test, then augment the train split.

        Each of the four augmented copies adds Gaussian noise (one draw per
        timestep, broadcast over samples and channels) whose mean is shifted
        by ``delta``, then min-max rescales every channel back to [-1, 1];
        the concatenation with the original train data is clipped to [-1, 1].

        Returns:
            (train_data, test_data); train_data is 5x the split size.
        """
        x = np.zeros((self.sample_num_total, self.window, self.var_num))
        for i in range(self.sample_num_total):
            start = i
            end = i + self.window
            x[i, :, :] = data[start:end, :]
        train_data, test_data = self.divide(x, proportion, seed)

        # Data augmentation.
        # BUG FIX: the original seeded the stdlib `random` module but drew
        # the noise from np.random, so the augmentation (and the .npy files
        # saved below) changed on every run.  Seed NumPy's RNG instead,
        # saving/restoring its state as divide()/mask_data() do.
        st0 = np.random.get_state()
        np.random.seed(2023)
        aug_data = []
        for delta in np.linspace(-0.3, 0.3, 4):
            tmp = train_data.copy()
            # One noise value per timestep, broadcast over samples/channels.
            tmp[:, :, 0] += np.random.normal(delta, 2, tmp.shape[1]) / 10
            tmp[:, :, 1] += np.random.normal(delta, 0.15, tmp.shape[1]) / 10
            tmp[:, :, 2] += np.random.normal(delta, 0.1, tmp.shape[1]) / 10
            for c in range(3):
                # Min-max rescale the channel back into [-1, 1].
                tmp[:, :, c] = (
                    (tmp[:, :, c] - tmp[:, :, c].min())
                    / (tmp[:, :, c].max() - tmp[:, :, c].min())
                    - 0.5
                ) * 2
            aug_data.append(tmp)
        np.random.set_state(st0)
        train_data = np.concatenate([train_data] + aug_data, axis=0).clip(-1, 1)

        if self.save2npy:
            # Only write test files when a test split actually exists.
            if 1 - proportion > 0:
                np.save(
                    os.path.join(
                        self.dir, f"{self.name}_ground_truth_{self.window}_test.npy"
                    ),
                    self.unnormalize(test_data),
                )
            np.save(
                os.path.join(
                    self.dir, f"{self.name}_ground_truth_{self.window}_train.npy"
                ),
                self.unnormalize(train_data),
            )
            if self.auto_norm:
                # Data lives in [-1, 1]; map back to [0, 1] for the dump.
                if 1 - proportion > 0:
                    np.save(
                        os.path.join(
                            self.dir, f"{self.name}_norm_truth_{self.window}_test.npy"
                        ),
                        unnormalize_to_zero_to_one(test_data),
                    )
                np.save(
                    os.path.join(
                        self.dir, f"{self.name}_norm_truth_{self.window}_train.npy"
                    ),
                    unnormalize_to_zero_to_one(train_data),
                )
            else:
                if 1 - proportion > 0:
                    np.save(
                        os.path.join(
                            self.dir, f"{self.name}_norm_truth_{self.window}_test.npy"
                        ),
                        test_data,
                    )
                np.save(
                    os.path.join(
                        self.dir, f"{self.name}_norm_truth_{self.window}_train.npy"
                    ),
                    train_data,
                )
        return train_data, test_data

    # NOTE(review): the two methods below are never called — __init__ comes
    # from RevenueDataset, whose name mangling binds _RevenueDataset__normalize
    # there, not these.  Kept for interface parity with the parent.
    def __normalize(self, rawdata):
        """Apply the fitted scaler ([0, 1]) and optionally map to [-1, 1]."""
        data = self.scaler.transform(rawdata)
        if self.auto_norm:
            data = normalize_to_neg_one_to_one(data)
        return data

    def __unnormalize(self, data):
        """Inverse of __normalize: undo the [-1, 1] map, then the scaler."""
        if self.auto_norm:
            data = unnormalize_to_zero_to_one(data)
        x = data
        return self.scaler.inverse_transform(x)
class fMRIDataset(CustomDataset):
    """fMRI simulation dataset loaded from a MATLAB ``sim4.mat`` file.

    Defaults ``proportion`` to 1.0, i.e. every window goes to the train
    split; everything else follows ``CustomDataset``.
    """

    def __init__(self, proportion=1.0, **kwargs):
        """Forward to CustomDataset with ``proportion`` defaulting to 1.0."""
        super().__init__(proportion=proportion, **kwargs)

    @staticmethod
    def read_data(filepath, name=""):
        """Load the ``ts`` matrix from ``<filepath>/sim4.mat`` and fit a scaler.

        Fixes vs. the original: the docstring wrongly claimed a .csv was
        read, and the path was built by string concatenation — use
        ``os.path.join`` so a trailing separator in ``filepath`` is handled.
        """
        data = io.loadmat(os.path.join(filepath, "sim4.mat"))["ts"]
        scaler = MinMaxScaler()
        scaler = scaler.fit(data)
        return data, scaler