| from transformers import PreTrainedModel, PretrainedConfig |
| |
|
|
| import pandas as pd |
| import numpy as np |
| import os |
| import torch |
| import torch.nn as nn |
|
|
|
|
| |
| |
| |
|
|
| |
class Encoder(nn.Module):
    """Feed-forward encoder mapping an input vector to a latent representation.

    Architecture: Linear -> BatchNorm -> ReLU -> Dropout(0.3) -> Linear ->
    ReLU -> BatchNorm.
    """

    def __init__(self, input_dim: int, hidden_dim: int, latent_dim: int):
        super(Encoder, self).__init__()
        stages = [
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, latent_dim),
            nn.ReLU(),
            nn.BatchNorm1d(latent_dim),
        ]
        self.encoder = nn.Sequential(*stages)

    def forward(self, x):
        """Return the latent encoding of ``x`` (shape: batch x latent_dim)."""
        return self.encoder(x)
|
|
|
|
| |
| |
| |
|
|
| |
class Decoder(nn.Module):
    """Feed-forward decoder projecting a latent vector back to target space.

    Architecture: Linear -> Dropout(0.3) -> Linear (no final activation).
    """

    def __init__(self, latent_dim: int, hidden_dim: int, target_dim: int):
        super(Decoder, self).__init__()
        stages = [
            nn.Linear(latent_dim, hidden_dim),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, target_dim),
        ]
        self.decoder = nn.Sequential(*stages)

    def forward(self, x):
        """Return the reconstruction of ``x`` (shape: batch x target_dim)."""
        return self.decoder(x)
| |
| |
| |
| |
|
|
|
|
class DomainClassifier(nn.Module):
    """Distinguish the domain of the input.

    A two-layer MLP mapping a feature vector to one raw score per domain.
    """

    def __init__(self, input_dim: int, n_class: int):
        super(DomainClassifier, self).__init__()
        stages = [
            nn.Linear(input_dim, input_dim),
            nn.ReLU(),
            nn.Linear(input_dim, n_class),
        ]
        self.domain_classifier = nn.Sequential(*stages)

    def forward(self, x):
        """Return unnormalised domain logits for ``x`` (batch x n_class)."""
        return self.domain_classifier(x)
| |
| |
| |
| |
|
|
class GradientReversalFunction(torch.autograd.Function):
    """Identity in the forward pass; multiplies gradients by ``-lambd`` in backward.

    The standard domain-adversarial (DANN) trick: modules below this function
    receive reversed gradients from the domain classifier above it.
    """

    @staticmethod
    def forward(ctx, x, lambd=1.0):
        # Stash the scaling factor for the backward pass.
        ctx.lambd = lambd
        # view_as keeps the tensor tracked by autograd without copying data.
        return x.view_as(x)

    @staticmethod
    def backward(ctx, grad_output):
        # Reversed, scaled gradient for x; no gradient w.r.t. lambd.
        return -ctx.lambd * grad_output, None


class GradientReversalLayer(nn.Module):
    """``nn.Module`` wrapper around :class:`GradientReversalFunction`."""

    def __init__(self, lambd=1.0):
        super(GradientReversalLayer, self).__init__()
        # Strength of the gradient reversal applied on backward.
        self.lambd = lambd

    def forward(self, x):
        """Pass ``x`` through unchanged; its gradient is reversed on backward."""
        return GradientReversalFunction.apply(x, self.lambd)
| |
| |
| |
| |
|
|
class Params:
    """Hyper-parameter and I/O-path container for the GAIN training loop.

    NOTE(review): the ``output_folder`` default is evaluated once, when this
    class is defined (``os.getcwd()`` at import time), not per call — confirm
    that is the intended behaviour.
    """

    def __init__(
        self,
        input=None,
        output="imputed",
        ref=None,
        output_folder=f"{os.getcwd()}/results/",
        header=None,
        num_iterations=2001,
        batch_size=128,
        alpha=10,
        miss_rate=0.1,
        hint_rate=0.9,
        lr_D=0.001,
        lr_G=0.001,
        override=0,
        output_all=0,
    ):
        # I/O configuration.
        self.input = input
        self.output = output
        self.ref = ref
        self.output_folder = output_folder
        self.header = header
        # Training schedule.
        self.num_iterations = num_iterations
        self.batch_size = batch_size
        # GAIN-specific hyper-parameters.
        self.alpha = alpha
        self.miss_rate = miss_rate
        self.hint_rate = hint_rate
        self.lr_D = lr_D
        self.lr_G = lr_G
        # Behaviour flags.
        self.override = override
        self.output_all = output_all
|
|
|
|
| |
| |
| |
|
|
class Metrics:
    """Pre-allocated per-iteration metric buffers for training and evaluation.

    Each series holds one slot per training iteration; the ``*_evaluate``
    variants store the corresponding values from evaluation passes.
    """

    # Names of all per-iteration series allocated in __init__.
    _SERIES = (
        "loss_D", "loss_D_evaluate",
        "loss_G", "loss_G_evaluate",
        "loss_MSE_train", "loss_MSE_train_evaluate",
        "loss_MSE_test",
        "cpu", "cpu_evaluate",
        "ram", "ram_evaluate",
        "ram_percentage", "ram_percentage_evaluate",
    )

    def __init__(self, hypers: Params):
        self.hypers = hypers

        # One zero-initialised float slot per training iteration.
        n = hypers.num_iterations
        for series in self._SERIES:
            setattr(self, series, np.zeros(n))

        # Populated after imputation completes.
        self.data_imputed = None
        self.ref_data_imputed = None
|
|
|
|
| |
| |
| |
|
|
def generate_hint(mask, hint_rate):
    """Build GAIN's hint matrix from an observed-value mask.

    A fresh Bernoulli(hint_rate) reveal mask is sampled (via ``generate_mask``
    with miss probability ``1 - hint_rate``) and intersected with ``mask``, so
    each observed entry is revealed to the discriminator with probability
    ``hint_rate``.
    """
    reveal = generate_mask(mask, 1 - hint_rate)
    return mask * reveal
|
|
|
|
def generate_mask(data, miss_rate):
    """Sample a binary keep-mask with the same 2-D shape as ``data``.

    Each entry is 1.0 with probability ``1 - miss_rate`` (kept) and 0.0
    otherwise, drawn i.i.d. from the global NumPy RNG.
    """
    rows, cols = data.shape[0], data.shape[1]
    draws = np.random.uniform(0.0, 1.0, size=(rows, cols))
    return (draws > miss_rate) * 1.0
|
|
| |
| |
| |
|
|
class Network:
    """Bundles the GAIN generator/discriminator pair with their optimizers.

    On construction every weight matrix in both networks is re-initialised
    with Xavier-normal initialisation, and one Adam optimizer per network is
    created with the learning rates taken from ``hypers``.
    """

    def __init__(self, hypers: Params, net_G, net_D, metrics: Metrics):
        # Xavier-normal init applies only to >=2-D tensors; the dim() guard
        # skips 1-D "weight" parameters (e.g. norm layers) instead of crashing.
        # For the Linear-only nets used in this file this is a no-op change.
        for net in (net_D, net_G):
            for name, param in net.named_parameters():
                if "weight" in name and param.dim() > 1:
                    nn.init.xavier_normal_(param)

        self.hypers = hypers
        self.net_G = net_G
        self.net_D = net_D
        self.metrics = metrics

        self.optimizer_D = torch.optim.Adam(net_D.parameters(), lr=hypers.lr_D)
        self.optimizer_G = torch.optim.Adam(net_G.parameters(), lr=hypers.lr_G)

    def generate_sample(self, data, mask):
        """Run the generator on ``data`` with missing entries replaced by noise.

        Fix: the first parameter was named ``cls`` although this is an
        ordinary instance method (it is invoked as ``self.gain.generate_sample``
        elsewhere in this file); renamed to ``self`` for clarity.

        Args:
            data: (batch, dim) tensor of values; missing slots may hold anything.
            mask: (batch, dim) tensor with 1 where observed, 0 where missing.

        Returns:
            Generator output for the noise-filled input concatenated with the
            mask (generator input width is 2 * dim).
        """
        dim = data.shape[1]
        size = data.shape[0]

        # Small uniform noise stands in for the missing values.
        Z = torch.rand((size, dim)) * 0.01
        missing_data_with_noise = mask * data + (1 - mask) * Z
        input_G = torch.cat((missing_data_with_noise, mask), 1).float()

        return self.net_G(input_G)
|
|
| |
| |
| |
|
|
class GAIN_DANN(nn.Module):
    """GAIN imputation network with a DANN-style domain-adversarial branch.

    Pipeline: encode -> gradient-reversed features feed a GAIN generator that
    imputes missing latent entries -> decode back to input space; a domain
    classifier runs on the (non-reversed) encoding.

    NOTE(review): the imputation step mixes ``x_encoded`` (batch, latent_dim)
    with ``mask`` (batch, input_dim); this is only shape-consistent when
    latent_dim == input_dim — confirm against the config defaults.
    """

    def __init__(self, input_dim: int, latent_dim: int, n_class: int, params: Params, metrics: Metrics, hint_rate=0.9):
        super(GAIN_DANN, self).__init__()
        self.encoder = Encoder(input_dim=input_dim, hidden_dim=128, latent_dim=latent_dim)
        self.grl = GradientReversalLayer()
        self.domain_classifier = DomainClassifier(latent_dim, n_class=n_class)
        # Fix: hint_rate was accepted but silently dropped; keep it available
        # to the training loop. (Debug print statements also removed.)
        self.hint_rate = hint_rate

        # Generator and discriminator both take [values ; mask], width 2 * latent_dim.
        self.gain = Network(
            hypers=params,
            net_G=nn.Sequential(
                nn.Linear(latent_dim * 2, latent_dim),
                nn.ReLU(),
                nn.Linear(latent_dim, latent_dim),
                nn.ReLU(),
                nn.Linear(latent_dim, latent_dim),
                nn.Sigmoid(),
            ),
            net_D=nn.Sequential(
                nn.Linear(latent_dim * 2, latent_dim),
                nn.ReLU(),
                nn.Linear(latent_dim, latent_dim),
                nn.ReLU(),
                nn.Linear(latent_dim, latent_dim),
                nn.Sigmoid(),
            ),
            metrics=metrics,
        )
        self.decoder = Decoder(latent_dim=latent_dim, hidden_dim=128, target_dim=input_dim)

    def forward(self, x):
        """
        Forward pass for GAIN_DANN.
        Handles missing values (NaNs) by replacing them with noise and using a mask.

        Args:
            x: (batch, input_dim) tensor; missing entries are NaN.

        Returns:
            Tuple of (x_reconstructed, x_domain): the decoded reconstruction
            and the argmax domain index per sample.
        """
        # Zero-fill NaNs so the encoder never sees them.
        x_filled = x.clone()
        x_filled[torch.isnan(x_filled)] = 0

        # 1 where observed, 0 where missing.
        mask = (~torch.isnan(x)).float()

        x_encoded = self.encoder(x_filled)
        x_grl = self.grl(x_encoded)

        # Impute the missing latent entries with the GAIN generator.
        sample = self.gain.generate_sample(x_grl, mask)
        x_imputed = x_encoded * mask + sample * (1 - mask)

        # NOTE(review): argmax is non-differentiable, so no gradient flows
        # into the domain classifier from this output; a DANN loss usually
        # needs the raw logits — confirm the training loop's expectation.
        x_domain = self.domain_classifier(x_encoded)
        x_domain = torch.argmax(x_domain, dim=1)

        x_reconstructed = self.decoder(x_imputed)

        return x_reconstructed, x_domain
| |
|
|
|
|
| |
| |
| |
|
|
|
|
class GainDANNConfig(PretrainedConfig):
    """Hugging Face configuration object holding GAIN-DANN hyper-parameters."""

    model_type = "gain_dann"

    def __init__(self, input_dim=3013, latent_dim=3013, n_class=17, hint_rate=0.9, lr_D=0.001, lr_G=0.001,
                 num_iterations=2001, batch_size=128, alpha=10, miss_rate=0.1, override=0, output_all=0, **kwargs):
        super().__init__(**kwargs)
        # Record every hyper-parameter verbatim on the config instance.
        hyperparams = (
            ("input_dim", input_dim),
            ("latent_dim", latent_dim),
            ("n_class", n_class),
            ("hint_rate", hint_rate),
            ("lr_D", lr_D),
            ("lr_G", lr_G),
            ("num_iterations", num_iterations),
            ("batch_size", batch_size),
            ("alpha", alpha),
            ("miss_rate", miss_rate),
            ("override", override),
            ("output_all", output_all),
        )
        for attr, value in hyperparams:
            setattr(self, attr, value)
|
|
|
|
|
|
|
|
class GainDANN(PreTrainedModel):
    """PreTrainedModel wrapper exposing GAIN_DANN through the HF model API."""

    config_class = GainDANNConfig

    def __init__(self, config):
        super().__init__(config)
        # getattr fallbacks keep older configs without these fields loadable.
        hyper_kwargs = dict(
            lr_D=config.lr_D,
            lr_G=config.lr_G,
            hint_rate=config.hint_rate,
            num_iterations=getattr(config, "num_iterations", 2001),
            batch_size=getattr(config, "batch_size", 128),
            alpha=getattr(config, "alpha", 10),
            miss_rate=getattr(config, "miss_rate", 0.1),
            override=getattr(config, "override", 0),
            output_all=getattr(config, "output_all", 0),
        )
        params = Params(**hyper_kwargs)
        metrics = Metrics(params)
        self.model = GAIN_DANN(
            input_dim=config.input_dim,
            latent_dim=config.latent_dim,
            n_class=config.n_class,
            params=params,
            metrics=metrics,
            hint_rate=config.hint_rate,
        )

    def forward(self, x):
        """Delegate straight to the wrapped GAIN_DANN module."""
        return self.model(x)
| |
|
|
| |
|
|