Spaces:
Runtime error
Runtime error
| import numpy as np | |
| try: | |
| import tensorflow.compat.v1 as tf | |
| except ImportError as error: | |
| from logging import warning | |
| warning("{}: AdversarialDebiasing will be unavailable. To install, run:\n" | |
| "pip install 'aif360[AdversarialDebiasing]'".format(error)) | |
| from aif360.algorithms import Transformer | |
class AdversarialDebiasing(Transformer):
    """Adversarial debiasing is an in-processing technique that learns a
    classifier to maximize prediction accuracy and simultaneously reduce an
    adversary's ability to determine the protected attribute from the
    predictions [5]_. This approach leads to a fair classifier as the
    predictions cannot carry any group discrimination information that the
    adversary can exploit.

    References:
        .. [5] B. H. Zhang, B. Lemoine, and M. Mitchell, "Mitigating Unwanted
           Biases with Adversarial Learning," AAAI/ACM Conference on Artificial
           Intelligence, Ethics, and Society, 2018.
    """

    def __init__(self,
                 unprivileged_groups,
                 privileged_groups,
                 scope_name,
                 sess,
                 seed=None,
                 adversary_loss_weight=0.1,
                 num_epochs=50,
                 batch_size=128,
                 classifier_num_hidden_units=200,
                 debias=True):
        """
        Args:
            unprivileged_groups (tuple): Representation for unprivileged groups
            privileged_groups (tuple): Representation for privileged groups
            scope_name (str): scope name for the tensorflow variables
            sess (tf.Session): tensorflow session
            seed (int, optional): Seed to make `predict` repeatable.
            adversary_loss_weight (float, optional): Hyperparameter that chooses
                the strength of the adversarial loss.
            num_epochs (int, optional): Number of training epochs.
            batch_size (int, optional): Batch size.
            classifier_num_hidden_units (int, optional): Number of hidden units
                in the classifier model.
            debias (bool, optional): Learn a classifier with or without
                debiasing.

        Raises:
            ValueError: If more than one unprivileged or privileged group is
                given.
        """
        super(AdversarialDebiasing, self).__init__(
            unprivileged_groups=unprivileged_groups,
            privileged_groups=privileged_groups)

        self.scope_name = scope_name
        self.seed = seed

        self.unprivileged_groups = unprivileged_groups
        self.privileged_groups = privileged_groups
        if len(self.unprivileged_groups) > 1 or len(self.privileged_groups) > 1:
            raise ValueError("Only one unprivileged_group or privileged_group supported.")
        # The protected attribute is the first key of the single
        # unprivileged-group mapping.
        self.protected_attribute_name = list(self.unprivileged_groups[0].keys())[0]

        self.sess = sess
        self.adversary_loss_weight = adversary_loss_weight
        self.num_epochs = num_epochs
        self.batch_size = batch_size
        self.classifier_num_hidden_units = classifier_num_hidden_units
        self.debias = debias

        # Graph handles; all of them are created by `fit`.
        self.features_dim = None
        self.features_ph = None
        self.protected_attributes_ph = None
        self.true_labels_ph = None
        self.keep_prob = None
        self.pred_labels = None

    def _classifier_model(self, features, features_dim, keep_prob):
        """Compute the classifier predictions for the outcome variable.

        Builds a network with one ReLU hidden layer followed by dropout and a
        single-unit output layer.

        Args:
            features: Input tensor fed to the first matmul.
            features_dim (int): Number of input columns (rows of `W1`).
            keep_prob: Keep probability passed to the dropout op.

        Returns:
            tuple: `(pred_label, pred_logit)` where `pred_label` is the sigmoid
            of `pred_logit`.
        """
        with tf.variable_scope("classifier_model"):
            W1 = tf.get_variable('W1', [features_dim, self.classifier_num_hidden_units],
                                 initializer=tf.initializers.glorot_uniform(seed=self.seed1))
            b1 = tf.Variable(tf.zeros(shape=[self.classifier_num_hidden_units]), name='b1')

            h1 = tf.nn.relu(tf.matmul(features, W1) + b1)
            h1 = tf.nn.dropout(h1, keep_prob=keep_prob, seed=self.seed2)

            W2 = tf.get_variable('W2', [self.classifier_num_hidden_units, 1],
                                 initializer=tf.initializers.glorot_uniform(seed=self.seed3))
            b2 = tf.Variable(tf.zeros(shape=[1]), name='b2')

            pred_logit = tf.matmul(h1, W2) + b2
            pred_label = tf.sigmoid(pred_logit)

        return pred_label, pred_logit

    def _adversary_model(self, pred_logits, true_labels):
        """Compute the adversary predictions for the protected attribute.

        The adversary is a single linear layer over three inputs derived from
        the classifier logits: `s`, `s * y` and `s * (1 - y)`, where
        `s = sigmoid((1 + |c|) * logits)` and `c` is a learned scalar.

        Args:
            pred_logits: Classifier logits.
            true_labels: Ground-truth label tensor.

        Returns:
            tuple: `(pred_protected_attribute_label,
            pred_protected_attribute_logit)`, the former being the sigmoid of
            the latter.
        """
        with tf.variable_scope("adversary_model"):
            c = tf.get_variable('c', initializer=tf.constant(1.0))
            s = tf.sigmoid((1 + tf.abs(c)) * pred_logits)

            W2 = tf.get_variable('W2', [3, 1],
                                 initializer=tf.initializers.glorot_uniform(seed=self.seed4))
            b2 = tf.Variable(tf.zeros(shape=[1]), name='b2')

            adversary_inputs = tf.concat(
                [s, s * true_labels, s * (1.0 - true_labels)], axis=1)
            pred_protected_attribute_logit = tf.matmul(adversary_inputs, W2) + b2
            pred_protected_attribute_label = tf.sigmoid(pred_protected_attribute_logit)

        return pred_protected_attribute_label, pred_protected_attribute_logit

    def _batch_feed_dict(self, dataset, labels, batch_ids, protected_index, keep_prob):
        """Build the placeholder feed dict for one batch of `dataset` rows."""
        protected_column = dataset.protected_attributes[batch_ids][:, protected_index]
        return {self.features_ph: dataset.features[batch_ids],
                self.true_labels_ph: np.reshape(labels[batch_ids], [-1, 1]),
                self.protected_attributes_ph: np.reshape(protected_column, [-1, 1]),
                self.keep_prob: keep_prob}

    def fit(self, dataset):
        """Compute the model parameters of the fair classifier using gradient
        descent.

        Each epoch shuffles the rows and runs `num_samples // batch_size` full
        batches (a trailing partial batch is dropped). If the dataset has fewer
        rows than `batch_size`, a single batch containing every row is used so
        that training still takes place.

        Args:
            dataset (BinaryLabelDataset): Dataset containing true labels.

        Returns:
            AdversarialDebiasing: Returns self.

        Raises:
            RuntimeError: If TensorFlow is executing eagerly.
        """
        if tf.executing_eagerly():
            raise RuntimeError("AdversarialDebiasing does not work in eager "
                    "execution mode. To fix, add `tf.disable_eager_execution()`"
                    " to the top of the calling script.")

        if self.seed is not None:
            np.random.seed(self.seed)
        # Per-op seeds for the initializers and the dropout op.
        ii32 = np.iinfo(np.int32)
        self.seed1, self.seed2, self.seed3, self.seed4 = np.random.randint(
            ii32.min, ii32.max, size=4)

        # Map the dataset labels to 0 and 1.
        temp_labels = dataset.labels.copy()
        temp_labels[(dataset.labels == dataset.favorable_label).ravel(), 0] = 1.0
        temp_labels[(dataset.labels == dataset.unfavorable_label).ravel(), 0] = 0.0

        with tf.variable_scope(self.scope_name):
            num_train_samples, self.features_dim = np.shape(dataset.features)

            # Setup placeholders
            self.features_ph = tf.placeholder(tf.float32, shape=[None, self.features_dim])
            self.protected_attributes_ph = tf.placeholder(tf.float32, shape=[None, 1])
            self.true_labels_ph = tf.placeholder(tf.float32, shape=[None, 1])
            self.keep_prob = tf.placeholder(tf.float32)

            # Obtain classifier predictions and classifier loss
            self.pred_labels, pred_logits = self._classifier_model(
                self.features_ph, self.features_dim, self.keep_prob)
            pred_labels_loss = tf.reduce_mean(
                tf.nn.sigmoid_cross_entropy_with_logits(
                    labels=self.true_labels_ph, logits=pred_logits))

            if self.debias:
                # Obtain adversary predictions and adversary loss
                _, pred_protected_attributes_logits = self._adversary_model(
                    pred_logits, self.true_labels_ph)
                pred_protected_attributes_loss = tf.reduce_mean(
                    tf.nn.sigmoid_cross_entropy_with_logits(
                        labels=self.protected_attributes_ph,
                        logits=pred_protected_attributes_logits))

            # Setup optimizers with learning rates
            global_step = tf.Variable(0, trainable=False)
            starter_learning_rate = 0.001
            learning_rate = tf.train.exponential_decay(starter_learning_rate, global_step,
                                                       1000, 0.96, staircase=True)
            classifier_opt = tf.train.AdamOptimizer(learning_rate)
            if self.debias:
                adversary_opt = tf.train.AdamOptimizer(learning_rate)

            classifier_vars = [var for var in tf.trainable_variables(scope=self.scope_name)
                               if 'classifier_model' in var.name]
            if self.debias:
                adversary_vars = [var for var in tf.trainable_variables(scope=self.scope_name)
                                  if 'adversary_model' in var.name]
                # Gradients of the adversary loss w.r.t. the classifier
                # parameters, keyed by variable.
                adversary_grads = {
                    var: grad
                    for (grad, var) in adversary_opt.compute_gradients(
                        pred_protected_attributes_loss, var_list=classifier_vars)}

            def normalize(x):
                # `tiny` keeps the division finite when the norm is zero.
                return x / (tf.norm(x) + np.finfo(np.float32).tiny)

            # Update classifier parameters
            classifier_grads = []
            for (grad, var) in classifier_opt.compute_gradients(pred_labels_loss,
                                                                var_list=classifier_vars):
                if self.debias:
                    # Remove the component of the classifier gradient that lies
                    # along the adversary gradient, then step against the
                    # adversary gradient scaled by `adversary_loss_weight`.
                    unit_adversary_grad = normalize(adversary_grads[var])
                    grad -= tf.reduce_sum(grad * unit_adversary_grad) * unit_adversary_grad
                    grad -= self.adversary_loss_weight * adversary_grads[var]
                classifier_grads.append((grad, var))
            classifier_minimizer = classifier_opt.apply_gradients(
                classifier_grads, global_step=global_step)

            if self.debias:
                # Update adversary parameters
                with tf.control_dependencies([classifier_minimizer]):
                    adversary_minimizer = adversary_opt.minimize(
                        pred_protected_attributes_loss, var_list=adversary_vars)

            self.sess.run(tf.global_variables_initializer())
            self.sess.run(tf.local_variables_initializer())

            # Column of `protected_attributes` holding the attribute to debias.
            protected_index = dataset.protected_attribute_names.index(
                self.protected_attribute_name)

            num_batches = num_train_samples // self.batch_size
            if num_batches == 0 and num_train_samples > 0:
                # Fewer rows than one full batch: without this the loop below
                # would never run and the model would stay at its random
                # initialization. Train on one short batch instead.
                num_batches = 1

            # Begin training
            for epoch in range(self.num_epochs):
                shuffled_ids = np.random.choice(num_train_samples, num_train_samples,
                                                replace=False)
                for i in range(num_batches):
                    batch_ids = shuffled_ids[self.batch_size*i: self.batch_size*(i+1)]
                    # Dropout keep probability of 0.8 while training.
                    batch_feed_dict = self._batch_feed_dict(
                        dataset, temp_labels, batch_ids, protected_index, 0.8)
                    if self.debias:
                        _, _, pred_labels_loss_value, pred_protected_attributes_loss_value = self.sess.run(
                            [classifier_minimizer,
                             adversary_minimizer,
                             pred_labels_loss,
                             pred_protected_attributes_loss], feed_dict=batch_feed_dict)
                        if i % 200 == 0:
                            print("epoch %d; iter: %d; batch classifier loss: %f; batch adversarial loss: %f" % (
                                epoch, i, pred_labels_loss_value,
                                pred_protected_attributes_loss_value))
                    else:
                        _, pred_labels_loss_value = self.sess.run(
                            [classifier_minimizer,
                             pred_labels_loss], feed_dict=batch_feed_dict)
                        if i % 200 == 0:
                            print("epoch %d; iter: %d; batch classifier loss: %f" % (
                                epoch, i, pred_labels_loss_value))
        return self

    def predict(self, dataset):
        """Obtain the predictions for the provided dataset using the fair
        classifier learned.

        Args:
            dataset (BinaryLabelDataset): Dataset containing labels that needs
                to be transformed.

        Returns:
            dataset (BinaryLabelDataset): Transformed dataset. `scores` holds
            the classifier's sigmoid outputs and `labels` holds the
            favorable/unfavorable label chosen by thresholding them at 0.5.

        Raises:
            RuntimeError: If called before `fit` has built the model.
        """
        if self.pred_labels is None or self.features_ph is None:
            raise RuntimeError("AdversarialDebiasing.predict() was called before "
                               "fit(); call fit() to build and train the model "
                               "first.")

        if self.seed is not None:
            np.random.seed(self.seed)

        num_test_samples, _ = np.shape(dataset.features)
        protected_index = dataset.protected_attribute_names.index(
            self.protected_attribute_name)

        pred_labels = []
        for start in range(0, num_test_samples, self.batch_size):
            end = min(start + self.batch_size, num_test_samples)
            batch_ids = np.arange(start, end)
            # Keep probability 1.0 disables dropout at prediction time.
            batch_feed_dict = self._batch_feed_dict(
                dataset, dataset.labels, batch_ids, protected_index, 1.0)
            pred_labels += self.sess.run(
                self.pred_labels, feed_dict=batch_feed_dict)[:, 0].tolist()

        # Mutated, fairer dataset with new labels
        dataset_new = dataset.copy(deepcopy=True)
        dataset_new.scores = np.array(pred_labels, dtype=np.float64).reshape(-1, 1)
        dataset_new.labels = (np.array(pred_labels) > 0.5).astype(np.float64).reshape(-1, 1)

        # Map the dataset labels back to their original values.
        temp_labels = dataset_new.labels.copy()
        temp_labels[(dataset_new.labels == 1.0).ravel(), 0] = dataset.favorable_label
        temp_labels[(dataset_new.labels == 0.0).ravel(), 0] = dataset.unfavorable_label
        dataset_new.labels = temp_labels.copy()

        return dataset_new