| import keras | |
| from keras.datasets import mnist, cifar10, cifar100 | |
| from keras import layers | |
| from keras.models import Sequential | |
| from keras.layers import Dense, Dropout, Flatten | |
| from keras.layers import Conv2D, MaxPooling2D | |
| from keras import backend as K | |
| import cv2 | |
| import tensorflow as tf | |
| #import GPy | |
| #import gpflow, gpflux | |
| import time | |
| from tensorflow.keras.applications import VGG16,ResNet50 | |
| from keras import regularizers | |
| import numpy as np | |
| import sklearn | |
| from sklearn.metrics import classification_report | |
| from sklearn.metrics import accuracy_score | |
| import sklearn.gaussian_process as gp | |
| from sklearn.gaussian_process import GaussianProcessClassifier | |
| from sklearn.gaussian_process.kernels import RBF, WhiteKernel | |
| import matplotlib.pyplot as plt | |
| # import official.nlp.modeling.layers as nlp_layers | |
| # from official.nlp.modeling.layers import SpectralNormalization | |
| import gp_layer | |
| from sklearn.metrics import roc_auc_score | |
| #%matplotlib inline | |
| import os | |
| os.environ["CUDA_VISIBLE_DEVICES"] = "0" | |
# Load CIFAR-10 and rescale the pixel values by 1/255 as float32.
(X_train, y_train), (X_test, y_test) = cifar10.load_data()
X_train = X_train.astype('float32') / 255
X_test = X_test.astype('float32') / 255

# One-hot copies of the integer labels (the integer labels are kept as well).
num_classes = 10
y_train_one_hot = keras.utils.to_categorical(y_train, num_classes)
y_test_one_hot = keras.utils.to_categorical(y_test, num_classes)

# Quick sanity report of the loaded split sizes.
print('x_train shape:', X_train.shape)
print(len(X_train), 'train samples')
print(len(X_test), 'test samples')
| # kernel = gpflow.kernels.SquaredExponential() | |
| # inducing_variable = gpflow.inducing_variables.InducingPoints( | |
| # np.linspace(0, 1, 128*100).reshape(-1, 128) | |
| # ) | |
| # mean = gpflow.mean_functions.Zero() | |
| # invlink = gpflow.likelihoods.RobustMax(10) | |
| # likelihood = gpflow.likelihoods.MultiClass(10, invlink=invlink) | |
| # likelihood_container = gpflux.layers.TrackableLayer() | |
| # likelihood_container.likelihood = likelihood | |
| # loss = gpflux.losses.LikelihoodLoss(likelihood) | |
# Instantiate the random-feature GP output layer from the local `gp_layer` module.
# NOTE(review): this assignment rebinds the name `gp_layer` from the imported
# module to the layer instance, so the module is no longer reachable under that
# name afterwards. In the visible code the instance is only referenced from
# commented-out lines; confirm it is still needed before keeping it.
gp_layer = gp_layer.RandomFeatureGaussianProcess(units=10,
                                                 num_inducing=2048,
                                                 normalize_input=True,
                                                 scale_random_features=False,
                                                 gp_cov_momentum=-1,
                                                 return_gp_cov=True)
def feature_extractor(inputs):
    """Apply a headless, ImageNet-initialised ResNet50 to `inputs`.

    Args:
        inputs: Tensor fed to the backbone; the backbone is declared with
            input_shape=(224, 224, 3).

    Returns:
        The backbone's output tensor (no classification top).
    """
    backbone = tf.keras.applications.resnet.ResNet50(
        input_shape=(224, 224, 3),
        include_top=False,
        weights='imagenet',
    )
    return backbone(inputs)
def classifier(inputs):
    """Map backbone feature maps to 10 unnormalised class scores.

    The head is: global average pooling -> flatten -> Dense(256, relu)
    -> Dense(128, relu) -> Dense(10, linear). No softmax is applied, so the
    result is a tensor of raw logits.

    Args:
        inputs: Feature-map tensor produced by the feature extractor.

    Returns:
        Tensor of logits with 10 units in the last dimension.
    """
    x = tf.keras.layers.GlobalAveragePooling2D()(inputs)
    x = tf.keras.layers.Flatten()(x)
    # (units, activation) for each fully connected layer, applied in order.
    dense_stack = ((256, 'relu'), (128, 'relu'), (10, 'linear'))
    for units, activation in dense_stack:
        x = tf.keras.layers.Dense(units, activation=activation)(x)
    return x
def final_model(inputs):
    """Chain upsampling, the ResNet50 backbone and the classification head.

    Args:
        inputs: Image tensor; it is upsampled by a factor of 7 along both
            spatial axes before entering the backbone (32 * 7 = 224).

    Returns:
        The logits tensor produced by `classifier`.
    """
    upsampled = tf.keras.layers.UpSampling2D(size=(7, 7))(inputs)
    return classifier(feature_extractor(upsampled))
| # lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay( | |
| # 0.001, | |
| # decay_steps=20*50, | |
| # decay_rate=1, | |
| # staircase=False) | |
| # def get_optimizer(): | |
| # return tf.keras.optimizers.Adam(lr_schedule) | |
def define_compile_model():
    """Build the end-to-end Keras model for 32x32 RGB inputs.

    Despite the name, the model is returned uncompiled: the optimizer and the
    loss are handled by the custom training loop further down the script.

    Returns:
        A `tf.keras.Model` mapping (32, 32, 3) images to class logits.
    """
    image_input = tf.keras.layers.Input(shape=(32, 32, 3))
    logits = final_model(image_input)
    return tf.keras.Model(inputs=image_input, outputs=logits)
| # inputs = tf.keras.Input(shape=(28, 28, 1)) | |
| # x = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(inputs) | |
| # x = tf.keras.layers.MaxPooling2D((1, 1))(x) | |
| # x = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same')(x) | |
| # x = tf.keras.layers.MaxPooling2D((2, 2))(x) | |
| # x = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same')(x) | |
| # x = tf.keras.layers.MaxPooling2D((2, 2))(x) | |
| # x = tf.keras.layers.Conv2D(512, (3, 3), activation='relu', padding='same')(x) | |
| # x = tf.keras.layers.MaxPooling2D((2, 2))(x) | |
| # x = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same')(x) | |
| # x = tf.keras.layers.MaxPooling2D((2, 2))(x) | |
| # x = tf.keras.layers.Flatten()(x) | |
| # #x = tf.keras.layers.Dropout(0.5)(x) | |
| # x = tf.keras.layers.Dense(256, activation='linear')(x) | |
| # #x = tf.keras.layers.Dense(128, activation='linear')(x) | |
| # #l = tf.keras.layers.Dense(10, activation='linear')(x) | |
| # gp_output, gp_std= gp_layer(x) | |
| # model = tf.keras.Model(inputs=inputs, outputs=gp_output) | |
# Build the network at module level and print its layer summary.
# `define_compile_model` returns the model without calling `compile`.
model = define_compile_model()
model.summary()
| # t = tf.expand_dims(X_train[0], axis=0) | |
| # model(t)[0] | |
| # from tensorflow.keras.callbacks import ReduceLROnPlateau | |
| # lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay( | |
| # 0.001, | |
| # decay_steps=20*50, | |
| # decay_rate=1, | |
| # staircase=False) | |
| # def get_optimizer(): | |
| # return tf.keras.optimizers.Adam(lr_schedule) | |
| # #Compiling the model | |
| # model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), optimizer = get_optimizer(), metrics=['accuracy']) | |
| # # early_stop = EarlyStopping(monitor='val_loss',patience=5) | |
| # # checkpoint = ModelCheckpoint("./Best_model/",save_best_only=True,) | |
| # rlrp = ReduceLROnPlateau(monitor='loss', factor=0.4, verbose=0, patience=2, min_lr=0.0000001) | |
| # # # # Train the model | |
| # model.fit(X_train, y_train, batch_size=32, epochs=20, validation_data=(X_test, y_test), callbacks=[rlrp]) | |
| # predictions = np.argmax(model.predict(X_test), axis=1) | |
| # print(classification_report(y_test, predictions)) | |
| # print(model(X_train[0].reshape(1,32,32,3))) | |
| #t = X_train[0].reshape(1,32,32,3) | |
| #model.predict(t) | |
def relu_evidence(logits):
    """Turn logits into non-negative evidence by applying ReLU."""
    evidence = tf.nn.relu(logits)
    return evidence
def exp_evidence(logits):
    """Turn logits into positive evidence via a clipped exponential.

    The logits are clipped to [-10, 10] before exponentiation, which bounds
    the evidence to [exp(-10), exp(10)].
    """
    bounded = tf.clip_by_value(logits, clip_value_min=-10, clip_value_max=10)
    return tf.exp(bounded)
def softplus_evidence(logits):
    """Turn logits into positive evidence: softplus((logits + 1)^2 / 2)."""
    half_squared_shift = ((logits + 1) ** 2) / 2
    return tf.nn.softplus(half_squared_shift)
| # # # def log_marginal_likelihood_gp_layer(model, X_train, y_train): | |
| # # # """Compute the log marginal likelihood for a GP layer within the model.""" | |
| # # # gp_layer = model.layers[-1] | |
| # # # kernel = gp_layer.kernel | |
| # # # inducing_points = gp_layer.inducing_variable.Z.numpy() | |
| # # # mean = gp_layer.mean_function | |
| # # # y_train_subset = y_train[:inducing_points.shape[0]].astype(np.float64) # Ensure float64 dtype | |
| # # # K = kernel.K(inducing_points) | |
| # # # K += np.eye(inducing_points.shape[0]) * 1e-6 | |
| # # # L = tf.linalg.cholesky(K) | |
| # # # alpha = tf.linalg.cholesky_solve(L, y_train_subset) | |
| # # # log_likelihood = -0.5 * tf.reduce_sum(tf.matmul(tf.transpose(y_train_subset), alpha)) - tf.reduce_sum(tf.math.log(tf.linalg.diag_part(L))) - 0.5 * inducing_points.shape[0] * np.log(2 * np.pi) | |
| # # # return tf.squeeze(log_likelihood) | |
def kl_divergence(alpha):
    """KL divergence from Dirichlet(alpha) to the all-ones Dirichlet.

    Args:
        alpha: Tensor of Dirichlet concentration parameters; reductions are
            taken over axis 1.

    Returns:
        Tensor with the per-row KL value, axis 1 kept with size one.
    """
    def row_sum(t):
        # All reductions in this function keep the reduced axis.
        return tf.reduce_sum(t, axis=1, keepdims=True)

    uniform = tf.ones_like(alpha)
    alpha_total = row_sum(alpha)
    uniform_total = row_sum(uniform)

    # Log-normaliser terms of the two Dirichlet distributions.
    log_norm_alpha = tf.math.lgamma(alpha_total) - row_sum(tf.math.lgamma(alpha))
    log_norm_uniform = row_sum(tf.math.lgamma(uniform)) - tf.math.lgamma(uniform_total)

    digamma_total = tf.math.digamma(alpha_total)
    digamma_alpha = tf.math.digamma(alpha)
    cross_term = row_sum((alpha - uniform) * (digamma_alpha - digamma_total))

    return cross_term + log_norm_alpha + log_norm_uniform
def loglikelihood_loss(y, alpha):
    """Squared-error plus variance term under a Dirichlet with parameters `alpha`.

    Args:
        y: Target tensor (cast to float32); compared against alpha / sum(alpha).
        alpha: Dirichlet concentration parameters; reductions use axis 1.

    Returns:
        Per-row loss tensor (axis 1 kept with size one), equal to the squared
        error of the expected probabilities plus their summed variance.
    """
    # The total is taken before casting alpha, mirroring the order of the casts.
    strength = tf.cast(tf.reduce_sum(alpha, axis=1, keepdims=True), tf.float32)
    target = tf.cast(y, tf.float32)
    conc = tf.cast(alpha, tf.float32)

    squared_error = tf.reduce_sum(
        tf.square(target - (conc / strength)), axis=1, keepdims=True
    )
    variance = tf.reduce_sum(
        conc * (strength - conc) / (strength * strength * (strength + 1)),
        axis=1,
        keepdims=True,
    )
    return squared_error + variance
def mse_loss(y, alpha, epoch_num, num_classes=10, annealing_step=10):
    """Evidential MSE loss with an annealed KL regulariser, plus vacuity.

    Args:
        y: One-hot targets.
        alpha: Dirichlet concentration parameters.
        epoch_num: Current epoch; drives the KL annealing coefficient.
        num_classes: Numerator used for the vacuity term.
        annealing_step: Number of epochs over which the KL weight ramps to 1.

    Returns:
        Tuple `(loss, vacuity)`: the per-row loss (likelihood term plus the
        weighted KL term) and `num_classes / sum(alpha)` with the gradient
        through the sum stopped.
    """
    fit_term = loglikelihood_loss(y, alpha)

    # The KL weight grows linearly with the epoch and is capped at 1.
    ramp = tf.cast(epoch_num / annealing_step, dtype=tf.float32)
    annealing_coef = tf.minimum(tf.constant(1.0, dtype=tf.float32), ramp)

    # Reset the target-class entry to 1 so only the other classes are penalised.
    non_target_alpha = (alpha - 1) * (1 - y) + 1
    kl_term = annealing_coef * kl_divergence(non_target_alpha)

    strength = tf.reduce_sum(alpha, axis=1, keepdims=True)
    vacuity = tf.identity(num_classes / tf.stop_gradient(strength), name="vacuity")

    return fit_term + kl_term, vacuity
| # # # def edl_loss(func, y, alpha, epoch_num, num_classes, annealing_step, device=None): | |
| # # # y = tf.convert_to_tensor(y, dtype=tf.float32) | |
| # # # alpha = tf.convert_to_tensor(alpha, dtype=tf.float32) | |
| # # # S = tf.reduce_sum(alpha, axis=1, keepdims=True) | |
| # # # A = tf.reduce_sum(y * (func(S) - func(alpha)), axis=1, keepdims=True) | |
| # # # annealing_coef = tf.minimum( | |
| # # # tf.constant(1.0, dtype=tf.float32), | |
| # # # tf.constant(epoch_num / annealing_step, dtype=tf.float32), | |
| # # # ) | |
| # # # kl_alpha = (alpha - 1) * (1 - y) + 1 | |
| # # # kl_div = annealing_coef * kl_divergence(kl_alpha) | |
| # # # S = tf.reduce_sum(alpha, axis=1, keepdims=True) | |
| # # # with tf.GradientTape() as tape: | |
| # # # vacuity = num_classes / tf.stop_gradient(S) | |
| # # # return A + kl_div, vacuity | |
def compute_metrics(logits, Y, epoch, global_step, annealing_step, lmb=0.0005):
    """Compute the evidential loss, vacuity-style uncertainty and class probabilities.

    Args:
        logits: Raw model outputs, shape (batch, classes) or (classes,) for a
            single sample.
        Y: Integer class labels; any shape with one label per sample
            (e.g. (batch,), (batch, 1) or a scalar for a single sample).
        epoch: Current epoch, forwarded to `mse_loss` for KL annealing.
        global_step: Unused; kept for interface compatibility.
        annealing_step: Annealing horizon forwarded to `mse_loss`.
        lmb: Unused; kept for interface compatibility.

    Returns:
        Tuple `(loss, u, prob)`:
            loss: scalar training loss (mean evidential loss minus the
                vacuity-weighted logit of the labelled class, averaged over
                the batch).
            u: `classes / sum(alpha)`, shape (batch, 1); a scalar for a
                single-sample (rank-1) input.
            prob: `alpha / sum(alpha)`, same leading shape as `logits`.

    Raises:
        ValueError: If the number of classes cannot be read from the static
            shape of `logits`.
    """
    logits = tf.cast(logits, tf.float32)

    # Promote a single sample to a batch of one so every reduction below can
    # use axis=1; the outputs are squeezed back at the end.
    single_sample = logits.shape.rank == 1
    if single_sample:
        logits = tf.expand_dims(logits, axis=0)

    # Take the class count from the logits instead of a hard-coded 10 so the
    # one-hot depth, the uncertainty and the loss always agree.
    class_count = logits.shape[-1]
    if class_count is None:
        raise ValueError("compute_metrics needs logits with a known class dimension")

    # Flatten the labels so (batch, 1) inputs do not yield a rank-3 one-hot.
    labels = tf.reshape(Y, [-1])
    Y_onehot = tf.one_hot(labels, depth=class_count)

    alpha = exp_evidence(logits) + 1
    strength = tf.reduce_sum(alpha, axis=1, keepdims=True)
    u = class_count / strength
    prob = alpha / strength

    mse_loss_val, vacuity = mse_loss(Y_onehot, alpha, epoch, class_count, annealing_step)
    loss = tf.reduce_mean(mse_loss_val)

    # Subtract the vacuity-weighted logit of the labelled class, averaged over the batch.
    output_correct = logits * Y_onehot
    batch = tf.cast(tf.shape(output_correct)[0], tf.float32)
    loss -= tf.reduce_sum(vacuity * output_correct) / batch

    if single_sample:
        u = tf.squeeze(u)
        prob = tf.squeeze(prob, axis=0)
    return loss, u, prob
# Training configuration and input pipelines for the custom training loop.
x_train = np.array(X_train)
y_train = np.array(y_train)

optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
model.compile(optimizer=optimizer)

num_epochs = 15
batch_size = 32

# Training batches are reshuffled over the full training set.
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=len(X_train)).batch(batch_size)

# The test set is deliberately NOT shuffled: later code compares per-sample
# outputs against `y_test` by index, which needs the original sample order.
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))
test_dataset = test_dataset.batch(batch_size)
| # # # def get_multiple_samples(model, inputs, num_samples=5): | |
| # # # samples = [model(inputs, training=True) for _ in range(num_samples)] | |
| # # # mean_output = tf.reduce_mean(samples, axis=0) | |
| # # # return mean_output | |
# Custom training loop: one optimizer step per batch, with per-epoch loss and
# accuracy reporting and an early stop once the mean loss is small enough.
loss_threshold = 0.05
for epoch in range(num_epochs):
    total_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0
    for inputs, labels in train_dataset:
        # Flatten (batch, 1) labels to (batch,). Unlike tf.squeeze, this keeps
        # a batch of one as a rank-1 tensor.
        labels = tf.reshape(labels, [-1])
        with tf.GradientTape() as tape:
            outputs = model(inputs, training=True)
            loss, _, _ = compute_metrics(outputs, labels, epoch, global_step=epoch, annealing_step=10)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        total_loss += loss.numpy()
        num_batches += 1
        predicted = tf.cast(tf.argmax(outputs, axis=1), tf.int32)
        total += int(labels.shape[0])
        correct += tf.reduce_sum(tf.cast(predicted == tf.cast(labels, tf.int32), tf.float32)).numpy()

    # Average over the batches and samples actually seen, so a final partial
    # batch is counted correctly.
    avg_loss = total_loss / max(num_batches, 1)
    accuracy = 100 * correct / max(total, 1)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')
    if avg_loss < loss_threshold:
        print(f'Stopping training. Loss ({avg_loss:.4f}) is below threshold ({loss_threshold}).')
        break

# Classification report on the test set, in the original sample order.
predictions = np.argmax(model.predict(X_test), axis=1)
print(classification_report(y_test, predictions))
| # # # #model.save('test_sngp.keras') | |
def test(model, test_dataset):
    """Evaluate `model` on `test_dataset`, collecting predictions and uncertainties.

    Side effects: prints the accuracy and array shapes, and writes
    'predictions.npy' and 'uncertainties.npy' to the working directory.

    Args:
        model: Callable model returning class logits for a batch of inputs.
        test_dataset: Iterable of `(inputs, labels)` batches. Outputs follow
            the iteration order of this dataset.

    Returns:
        Tuple `(accuracy, all_predictions, all_uncertainties)`: the accuracy
        in percent, the concatenated predicted class ids, and the concatenated
        per-sample uncertainties returned by `compute_metrics`.

    Raises:
        ValueError: If `test_dataset` yields no samples.
    """
    correct = 0
    total = 0
    all_predictions = []
    all_uncertainties = []
    for inputs, labels in test_dataset:
        # Flatten (batch, 1) labels to (batch,). Unlike tf.squeeze, this keeps
        # a batch of one as a rank-1 tensor.
        labels = tf.reshape(labels, [-1])
        outputs = model(inputs, training=False)
        predicted = tf.cast(tf.argmax(outputs, axis=1), tf.int32)
        # Only the uncertainty is needed here; the loss and probabilities are discarded.
        _, u, _ = compute_metrics(outputs, labels, epoch=0, global_step=0, annealing_step=10)
        all_predictions.append(predicted.numpy())
        all_uncertainties.append(u.numpy())
        total += int(labels.shape[0])
        correct += tf.reduce_sum(tf.cast(predicted == tf.cast(labels, tf.int32), tf.float32)).numpy()

    if total == 0:
        raise ValueError("test_dataset yielded no samples")

    accuracy = 100 * correct / total
    all_predictions = np.concatenate(all_predictions)
    all_uncertainties = np.concatenate(all_uncertainties)
    print(f'Test Accuracy: {accuracy:.2f}%')
    print(f'Shape of predictions array: {all_predictions.shape}')
    print(f'Shape of uncertainties array: {all_uncertainties.shape}')
    np.save('predictions.npy', all_predictions)
    np.save('uncertainties.npy', all_uncertainties)
    return accuracy, all_predictions, all_uncertainties
| # def add_gaussian_noise_to_image(image, noise_stddev=0.3): | |
| # noise = tf.random.normal(shape=tf.shape(image), mean=0.0, stddev=noise_stddev) | |
| # corrupted_image = tf.clip_by_value(image + noise, 0.0, 1.0) # Clip values to [0, 1] | |
| # return corrupted_image | |
| # # Corrupt the test dataset images with Gaussian noise | |
| # corrupted_test_dataset = test_dataset.map(lambda x, y: (add_gaussian_noise_to_image(x), y)) | |
| # X, y = corrupted_test_dataset | |
| # predictions = np.argmax(model.predict(X), axis=1) | |
| # print(classification_report(y, predictions)) | |
| # _,u,_ = compute_metrics(predictions, y_test, 1, global_step=1, annealing_step=10) | |
# Evaluate on an explicitly unshuffled pass over the test set so that index i
# refers to the same sample in the predictions, the uncertainties and y_test.
ordered_test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(batch_size)
test_accuracy, predictions_1, uncertainties = test(model, ordered_test_dataset)

# A sample counts as "certain" when its uncertainty is below this threshold.
uncertainty_threshold = 0.3
is_correct = predictions_1 == np.reshape(y_test, -1)
is_certain = np.reshape(uncertainties, -1) < uncertainty_threshold

TC_indices = np.flatnonzero(is_correct & is_certain).tolist()    # True Certainty: correct and certain
FU_indices = np.flatnonzero(is_correct & ~is_certain).tolist()   # False Uncertainty: correct but uncertain
FC_indices = np.flatnonzero(~is_correct & is_certain).tolist()   # False Certainty: incorrect but certain
TU_indices = np.flatnonzero(~is_correct & ~is_certain).tolist()  # True Uncertainty: incorrect and uncertain


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or NaN when the denominator is zero."""
    return numerator / denominator if denominator else float('nan')


n_tc, n_tu, n_fu, n_fc = len(TC_indices), len(TU_indices), len(FU_indices), len(FC_indices)
print('USen:', _safe_ratio(n_tu, n_tu + n_fc))
print('USpe:', _safe_ratio(n_tc, n_tc + n_fu))
print('UPre:', _safe_ratio(n_tu, n_tu + n_fu))
print('UAcc:', _safe_ratio(n_tu + n_tc, n_tu + n_tc + n_fu + n_fc))
| # def combine_images_with_padding(img_index_1, img_index_2, padding_type="top_bottom"): | |
| # """ | |
| # Combines two CIFAR-10 images with padding and normalization. | |
| # Args: | |
| # img_index_1: Index of the first image in the dataset. | |
| # img_index_2: Index of the second image in the dataset. | |
| # padding_type: Type of padding to use ("top_bottom" or "left_right"). | |
| # Returns: | |
| # A combined image tensor. | |
| # """ | |
| # def combine_images_with_padding(img_index_1, img_index_2, padding_type): | |
| # (train_images, train_labels), (test_images, test_labels) = cifar10.load_data() | |
| # img_1 = tf.convert_to_tensor(test_images[img_index_1], dtype=tf.float32) / 255.0 | |
| # img_2 = tf.convert_to_tensor(test_images[img_index_2], dtype=tf.float32) / 255.0 | |
| # if padding_type == "top_bottom": | |
| # padding_amount = (img_2.shape[0] - img_1.shape[0]) // 2 | |
| # top_bottom_padding = tf.zeros((padding_amount, img_1.shape[1], 3)) | |
| # padded_img_1 = tf.concat([top_bottom_padding, img_1, top_bottom_padding], axis=0) | |
| # padded_img_2 = img_2 | |
| # elif padding_type == "left_right": | |
| # padding_amount = (img_2.shape[1] - img_1.shape[1]) // 2 | |
| # left_right_padding = tf.zeros((img_1.shape[0], padding_amount, 3)) | |
| # padded_img_1 = tf.concat([left_right_padding, img_1, left_right_padding], axis=1) | |
| # padded_img_2 = img_2 | |
| # else: | |
| # raise ValueError("Invalid padding type. Choose 'top_bottom' or 'left_right'.") | |
| # combined_img = tf.concat([padded_img_1, padded_img_2], axis=0) | |
| # combined_img_resized = tf.image.resize(combined_img, [32, 32]) | |
| # return combined_img_resized | |
| # img_index_1 = 50 | |
| # img_index_2 = 100 | |
| # padding_type = "top_bottom" | |
| # combined_img = combine_images_with_padding(img_index_1, img_index_2, padding_type) | |
| # combined_img = np.expand_dims(combined_img, axis=0) | |
| # image1_index = 10 | |
| # image2_index = 21 | |
| # combined_img = np.zeros((32, 32)) | |
| # combined_img[:, :-6] += x_train[image1_index][:, 6:] | |
| # combined_img[:, 14:] += x_train[image2_index][:, 5:19] | |
| # combined_img /= combined_img.max() | |
| # combined_img = combined_img.reshape(1, 32, 32, 3) | |
# Out-of-distribution probe: compare the model's output on one CIFAR-10 test
# image with its output on a randomly chosen MNIST digit.
(train_images, _), (_, _) = mnist.load_data()
mnist_image = train_images[np.random.randint(0, train_images.shape[0])]
rescaled_image = cv2.resize(mnist_image, (32, 32))
rgb_image = cv2.cvtColor(rescaled_image, cv2.COLOR_GRAY2RGB)
# Apply the same preprocessing as the CIFAR-10 inputs (float32 scaled by
# 1/255) so the probe differs from the training data in content, not in range.
rgb_image = np.expand_dims(rgb_image, axis=0).astype('float32') / 255

pred = model(X_test[0].reshape(1, 32, 32, 3))
pred_rgb = model(rgb_image)

# The label only affects the returned loss. `u` and `p` depend on the logits
# alone, so reusing y_test[0] for the MNIST image does not change them.
l, u, p = compute_metrics(pred, y_test[0], 0, global_step=0, annealing_step=10)
l_rgb, u_rgb, p_rgb = compute_metrics(pred_rgb, y_test[0], 0, global_step=0, annealing_step=10)

print('u:', u)
print('p:', p)
print('pred:', pred)
print('u_rgb:', u_rgb)
print('p_rgb:', p_rgb)
print('preds:', pred_rgb)
| #---------------------------------------------------------------------------------------------------- | |
| #Variance based EDL | |
def uncertainty(alpha, reduce=True):
    """Split the predictive variance of a Dirichlet into two parts.

    Args:
        alpha: Dirichlet concentration parameters; sums are taken over axis 1.
        reduce: When true, each part is summed over all elements and divided
            by the size of the first dimension of `alpha`.

    Returns:
        Tuple `(AU, EU)`: EU is `p * (1 - p) / (S + 1)` and AU is
        `p - p^2 - EU`, where `p = alpha / S` and `S` is the row sum of `alpha`.
    """
    strength = tf.reduce_sum(alpha, axis=1, keepdims=True)
    expected_p = alpha / strength
    total_variance = expected_p - tf.square(expected_p)

    EU = expected_p * (1 - expected_p) / (strength + 1)
    AU = total_variance - EU

    if reduce:
        batch = alpha.shape[0]
        AU, EU = tf.reduce_sum(AU) / batch, tf.reduce_sum(EU) / batch
    return AU, EU
# `uncertainty` expects Dirichlet parameters, i.e. alpha = evidence + 1 (the
# same convention `compute_metrics` uses), not the bare evidence.
pred_var = model(rgb_image)
pred_var = exp_evidence(pred_var) + 1
unc_ale, unc_eps = uncertainty(pred_var)
print('u_ale:', unc_ale)
print('p_eps:', unc_eps)

# The model emits raw logits. Convert them to the expected class probabilities
# of the Dirichlet (alpha / sum(alpha)) so that `y_pred_probs` really holds
# probabilities in [0, 1], as the calibration code below requires.
test_logits = model.predict(X_test)
test_alpha = exp_evidence(test_logits).numpy() + 1
y_pred_probs = test_alpha / test_alpha.sum(axis=1, keepdims=True)
y_pred = np.argmax(test_logits, axis=1)
| #----------------------------------------------------------------------------------------------------- | |
| #----------------------------------------------------------------------------------------------------- | |
| #Different Variance based unc | |
| # def total_uncertainty_variance(probs): | |
| # if isinstance(probs, tf.Tensor): | |
| # mean = tf.reduce_mean(probs, axis=2) | |
| # t_u = tf.reduce_sum(mean * (1 - mean), axis=1) | |
| # else: | |
| # probs = tf.convert_to_tensor(probs, dtype=tf.float32) | |
| # mean = tf.reduce_mean(probs, axis=2) | |
| # t_u = tf.reduce_sum(mean * (1 - mean), axis=1) | |
| # return t_u | |
| # def aleatoric_uncertainty_variance(probs): | |
| # if isinstance(probs, tf.Tensor): | |
| # a_u = tf.reduce_mean(tf.reduce_sum(probs * (1 - probs), axis=1), axis=1) | |
| # else: | |
| # probs = tf.convert_to_tensor(probs, dtype=tf.float32) | |
| # a_u = tf.reduce_mean(tf.reduce_sum(probs * (1 - probs), axis=1), axis=1) | |
| # return a_u | |
| # def epistemic_uncertainty_variance(probs): | |
| # if isinstance(probs, tf.Tensor): | |
| # mean = tf.reduce_mean(probs, axis=2, keepdims=True) | |
| # e_u = tf.reduce_mean(tf.reduce_sum(probs * (probs - mean), axis=1), axis=1) | |
| # else: | |
| # probs = tf.convert_to_tensor(probs, dtype=tf.float32) | |
| # mean = tf.reduce_mean(probs, axis=2, keepdims=True) | |
| # e_u = tf.reduce_mean(tf.reduce_sum(probs * (probs - mean), axis=1), axis=1) | |
| # return e_u | |
| # eu = epistemic_uncertainty_variance(pred_rgb) | |
| # au = aleatoric_uncertainty_variance(pred_rgb) | |
| # print('eu:', eu) | |
| # print('au:', au) | |
| #------------------------------------------------------------------------------------------------------ | |
def softmax(vector):
    """Return the softmax of `vector`, normalised over all of its elements.

    The maximum is subtracted before exponentiating. This leaves the result
    unchanged mathematically but prevents overflow to inf/NaN on large inputs.

    Args:
        vector: Array-like of scores.

    Returns:
        NumPy array of the same shape whose elements sum to 1. An empty input
        yields an empty array.
    """
    values = np.asarray(vector)
    if values.size == 0:
        return np.exp(values)
    e = np.exp(values - values.max())
    return e / e.sum()
def expected_calibration_error(samples, true_labels, M=5):
    """Expected calibration error with M equal-width confidence bins on (0, 1].

    Args:
        samples: Array of shape (n_samples, n_classes) with per-class
            probabilities; the row maximum is used as the confidence.
        true_labels: Integer class labels, one per sample. Any shape holding
            n_samples values is accepted (e.g. (n,) or (n, 1)); it is flattened
            so the comparison with the predictions stays element-wise.
        M: Number of bins.

    Returns:
        NumPy array of shape (1,) holding the sum over bins of
        |mean confidence - accuracy| * (fraction of samples in the bin).

    Raises:
        ValueError: If the number of labels differs from the number of samples.
    """
    samples = np.asarray(samples)
    # Flatten the labels: an (n, 1) column compared with (n,) predictions would
    # broadcast into an (n, n) matrix and corrupt the per-bin accuracy.
    true_labels = np.asarray(true_labels).reshape(-1)

    # Uniform bin edges; bin m covers (edge[m], edge[m + 1]].
    bin_boundaries = np.linspace(0, 1, M + 1)

    confidences = np.max(samples, axis=1)
    predicted_label = np.argmax(samples, axis=1)
    if true_labels.shape[0] != predicted_label.shape[0]:
        raise ValueError(
            f"got {true_labels.shape[0]} labels for {predicted_label.shape[0]} samples"
        )
    accuracies = predicted_label == true_labels

    ece = np.zeros(1)
    for bin_lower, bin_upper in zip(bin_boundaries[:-1], bin_boundaries[1:]):
        in_bin = (confidences > bin_lower) & (confidences <= bin_upper)
        # Empirical probability of a sample falling into this bin: |Bm| / n.
        prob_in_bin = in_bin.mean()
        if prob_in_bin > 0:
            accuracy_in_bin = accuracies[in_bin].mean()
            avg_confidence_in_bin = confidences[in_bin].mean()
            ece += np.abs(avg_confidence_in_bin - accuracy_in_bin) * prob_in_bin
    return ece
# y_test has shape (n, 1). Flatten it so the labels line up element-wise with
# the per-sample predictions inside expected_calibration_error.
ece = expected_calibration_error(y_pred_probs, np.ravel(y_test))
print("Expected Calibration Error:", ece)
| # xtest = X_test[0] | |
| # xtest = tf.convert_to_tensor([xtest]) | |
| # # Define the FGSM attack function | |
| # def fgsm_attack(image, label, epsilon): | |
| # with tf.GradientTape() as tape: | |
| # tape.watch(image) | |
| # prediction = model(image) | |
| # prediction = exp_evidence(prediction) + 1 | |
| # loss,_ = mse_loss(label, prediction, epoch_num=1, num_classes=10, annealing_step=10) | |
| # #loss = tf.keras.losses.sparse_categorical_crossentropy(label, prediction) | |
| # gradient = tape.gradient(loss, image) | |
| # signed_grad = tf.sign(gradient) | |
| # adversarial_image = image + epsilon * signed_grad | |
| # adversarial_image = tf.clip_by_value(adversarial_image, -1, 1) | |
| # return adversarial_image | |
| # # Create the adversarial image | |
| # epsilon = 0.5 | |
| # label = tf.convert_to_tensor([y_test[0]], dtype=tf.int64) | |
| # adversarial_image = fgsm_attack(xtest, label, epsilon) | |
| # # Get the model predictions for both images | |
| # original_pred = model(xtest) | |
| # adversarial_pred = model(adversarial_image) | |
| # l1, u1, p1 = compute_metrics(adversarial_pred, y_test[0], 0, global_step=0, annealing_step=10) | |
| # print('u_rgb:', u1) | |
| # print('p_rgb:', p1) | |
| #print('preds:', pred_rgb) | |
| # # # def plot_reliability_diagram(confidences, true_labels, M=5): | |
| # # # """Plots the reliability diagram for the given data.""" | |
| # # # bin_boundaries = np.linspace(0, 1, M + 1) | |
| # # # bin_centers = (bin_boundaries[:-1] + bin_boundaries[1:]) / 2 | |
| # # # # Get binned accuracy (average accuracy for each confidence bin) | |
| # # # binned_accuracy = np.zeros(M) | |
| # # # for i, bin_lower in enumerate(bin_boundaries[:-1]): | |
| # # # bin_upper = bin_boundaries[i + 1] | |
| # # # in_bin = np.logical_and(confidences >= bin_lower, confidences < bin_upper) | |
| # # # if in_bin.sum() > 0: | |
| # # # binned_accuracy[i] = true_labels[in_bin].mean() | |
| # # # # Perfect calibration line (y = x) | |
| # # # perfect_calibration = np.linspace(0, 1, M) | |
| # # # plt.plot(bin_centers, binned_accuracy, 'o', label='Binned Accuracy') | |
| # # # plt.plot(perfect_calibration, perfect_calibration, '-', label='Perfect Calibration') | |
| # # # plt.xlabel('Predicted Probability') | |
| # # # plt.ylabel('Observed Accuracy') | |
| # # # plt.title('Reliability Diagram') | |
| # # # plt.legend() | |
| # # # plt.grid(True) | |
| # # # plt.show() | |
| # # #plot_reliability_diagram(y_pred_probs, y_test) | |
| # # # def fgsm_attack(image, epsilon, data_grad): | |
| # # # # Collect the element-wise sign of the data gradient | |
| # # # sign_data_grad = tf.sign(data_grad) | |
| # # # # Create the perturbed image by adjusting each pixel of the input image | |
| # # # perturbed_image = image + epsilon * sign_data_grad | |
| # # # # Adding clipping to maintain [0,1] range | |
| # # # perturbed_image = tf.clip_by_value(perturbed_image, 0, 1) | |
| # # # # Return the perturbed image | |
| # # # return perturbed_image | |
| # # # # Restores the tensors to their original scale | |
| # # # def denorm(batch, mean=[0.1307], std=[0.3081]): | |
| # # # mean = tf.convert_to_tensor(mean) | |
| # # # std = tf.convert_to_tensor(std) | |
| # # # return batch * std + mean | |
| # # # def test(model, test_dataset, epsilon): | |
| # # # # Accuracy counter | |
| # # # correct = 0 | |
| # # # adv_examples = [] | |
| # # # # Loop over all examples in test set | |
| # # # for data, target in test_dataset: | |
| # # # # Send the data and label to the device | |
| # # # data, target = data.numpy(), target.numpy() | |
| # # # # Set requires_grad attribute of tensor. Important for Attack | |
| # # # data = tf.convert_to_tensor(data, dtype=tf.float32) | |
| # # # with tf.GradientTape() as tape: | |
| # # # tape.watch(data) | |
| # # # # Forward pass the data through the model | |
| # # # output = model(data) | |
| # # # init_pred = tf.argmax(output, axis=1, output_type=tf.int32) | |
| # # # # If the initial prediction is wrong, don't bother attacking, just move on | |
| # # # if not np.array_equal(init_pred.numpy(), target): | |
| # # # continue | |
| # # # # Calculate the loss | |
| # # # loss, _, _ = compute_metrics(outputs, target, epoch=1, global_step=0, annealing_step=10) | |
| # # # # Calculate gradients of model in backward pass | |
| # # # data_grad = tape.gradient(loss, data) | |
| # # # # Call FGSM Attack | |
| # # # perturbed_data = fgsm_attack(data, epsilon, data_grad) | |
| # # # # Re-classify the perturbed image | |
| # # # output = model(perturbed_data) | |
| # # # # Check for success | |
| # # # final_pred = tf.argmax(output, axis=1, output_type=tf.int32) | |
| # # # if np.array_equal(final_pred.numpy(), target): | |
| # # # correct += 1 | |
| # # # # Special case for saving 0 epsilon examples | |
| # # # if epsilon == 0 and len(adv_examples) < 5: | |
| # # # adv_examples.append((init_pred.numpy()[0], final_pred.numpy()[0], perturbed_data.numpy())) | |
| # # # else: | |
| # # # # Save some adv examples for visualization later | |
| # # # if len(adv_examples) < 5: | |
| # # # adv_examples.append((init_pred.numpy()[0], final_pred.numpy()[0], perturbed_data.numpy())) | |
| # # # # Calculate final accuracy for this epsilon | |
| # # # final_acc = correct / float(len(test_dataset)) | |
| # # # print(f"Epsilon: {epsilon}\tTest Accuracy = {correct} / {len(test_dataset)} = {final_acc}") | |
| # # # # Return the accuracy and adversarial examples | |
| # # # return final_acc, adv_examples | |
| # # # accuracies = [] | |
| # # # examples = [] | |
| # # # epsilons = [0,0.05, 0.1, 0.15,0.2,0.25,0.3] | |
| # # # # Run test for each epsilon | |
| # # # for eps in epsilons: | |
| # # # acc, ex = test(model, test_dataset, eps) | |
| # # # accuracies.append(acc) | |
| # # # examples.append(ex) | |
| # # # import matplotlib.pyplot as plt | |
| # # # # Plot accuracy vs epsilon | |
| # # # plt.figure(figsize=(5,5)) | |
| # # # plt.plot(epsilons, accuracies, "*-") | |
| # # # plt.yticks(np.arange(0, 1.1, step=0.1)) | |
| # # # plt.xticks(np.arange(0, .35, step=0.05)) | |
| # # # plt.title("Accuracy vs Epsilon") | |
| # # # plt.xlabel("Epsilon") | |
| # # # plt.ylabel("Accuracy") | |
| # # # plt.grid(True) | |
| # # # plt.show() | |
| # # # # Save the plot as a PNG file | |
| # # # plt.savefig('accuracy_vs_epsilon.png') | |