EDL / EDL.py
Udbhav01's picture
Upload EDL.py
a1588af verified
import keras
from keras.datasets import mnist, cifar10, cifar100
from keras import layers
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K
import cv2
import tensorflow as tf
#import GPy
#import gpflow, gpflux
import time
from tensorflow.keras.applications import VGG16,ResNet50
from keras import regularizers
import numpy as np
import sklearn
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
import sklearn.gaussian_process as gp
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF, WhiteKernel
import matplotlib.pyplot as plt
# import official.nlp.modeling.layers as nlp_layers
# from official.nlp.modeling.layers import SpectralNormalization
import gp_layer
from sklearn.metrics import roc_auc_score
#%matplotlib inline
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
#Load training data
(X_train, y_train), (X_test, y_test) = cifar10.load_data()
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')
X_train /= 255
X_test /= 255
num_classes = 10
y_train_one_hot = keras.utils.to_categorical(y_train, num_classes)
y_test_one_hot = keras.utils.to_categorical(y_test, num_classes)
print('x_train shape:', X_train.shape)
print(X_train.shape[0], 'train samples')
print(X_test.shape[0], 'test samples')
# kernel = gpflow.kernels.SquaredExponential()
# inducing_variable = gpflow.inducing_variables.InducingPoints(
# np.linspace(0, 1, 128*100).reshape(-1, 128)
# )
# mean = gpflow.mean_functions.Zero()
# invlink = gpflow.likelihoods.RobustMax(10)
# likelihood = gpflow.likelihoods.MultiClass(10, invlink=invlink)
# likelihood_container = gpflux.layers.TrackableLayer()
# likelihood_container.likelihood = likelihood
# loss = gpflux.losses.LikelihoodLoss(likelihood)
gp_layer = gp_layer.RandomFeatureGaussianProcess(units=10,
num_inducing=2048,
normalize_input=True,
scale_random_features=False,
gp_cov_momentum=-1,
return_gp_cov=True)
def feature_extractor(inputs):
feature_extractor = tf.keras.applications.resnet.ResNet50(input_shape=(224, 224, 3),
include_top=False,
weights='imagenet')(inputs)
return feature_extractor
def classifier(inputs):
x = tf.keras.layers.GlobalAveragePooling2D()(inputs)
x = tf.keras.layers.Flatten()(x)
# x = tf.keras.layers.Dropout(0.3)(x)
# x = tf.keras.layers.Dense(256, activation="relu")(x)
# x = tf.keras.layers.Dense(128, activation="relu")(x)
# x = tf.keras.layers.Dropout(0.1)(x)
#x = tf.keras.layers.Dense(10, activation="softmax", name="classification")(x)
#x = tf.keras.layers.SpectralNormalization(tf.keras.layers.Dense(512, activation='relu'))(x)
x = (tf.keras.layers.Dense(256, activation='relu'))(x)
x = (tf.keras.layers.Dense(128, activation='relu'))(x)
x = (tf.keras.layers.Dense(10, activation='linear'))(x)
# outputs = gpflux.layers.GPLayer(mean_function=mean,
# kernel=kernel,
# inducing_variable=inducing_variable,
# num_data=X_train.shape[0],
# num_latent_gps=10)(x)
#outputs, sd = gp_layer(x)
return x
def final_model(inputs):
resize = tf.keras.layers.UpSampling2D(size=(7,7))(inputs)
resnet_feature_extractor = feature_extractor(resize)
classification_output = classifier(resnet_feature_extractor)
return classification_output
# lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
# 0.001,
# decay_steps=20*50,
# decay_rate=1,
# staircase=False)
# def get_optimizer():
# return tf.keras.optimizers.Adam(lr_schedule)
def define_compile_model():
inputs = tf.keras.layers.Input(shape=(32,32,3))
classification_output = final_model(inputs)
model = tf.keras.Model(inputs=inputs, outputs = classification_output)
# model.compile(optimizer=get_optimizer(),
# loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
# metrics = ['accuracy'])
return model
# inputs = tf.keras.Input(shape=(28, 28, 1))
# x = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
# x = tf.keras.layers.MaxPooling2D((1, 1))(x)
# x = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same')(x)
# x = tf.keras.layers.MaxPooling2D((2, 2))(x)
# x = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same')(x)
# x = tf.keras.layers.MaxPooling2D((2, 2))(x)
# x = tf.keras.layers.Conv2D(512, (3, 3), activation='relu', padding='same')(x)
# x = tf.keras.layers.MaxPooling2D((2, 2))(x)
# x = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same')(x)
# x = tf.keras.layers.MaxPooling2D((2, 2))(x)
# x = tf.keras.layers.Flatten()(x)
# #x = tf.keras.layers.Dropout(0.5)(x)
# x = tf.keras.layers.Dense(256, activation='linear')(x)
# #x = tf.keras.layers.Dense(128, activation='linear')(x)
# #l = tf.keras.layers.Dense(10, activation='linear')(x)
# gp_output, gp_std= gp_layer(x)
# model = tf.keras.Model(inputs=inputs, outputs=gp_output)
model = define_compile_model()
model.summary()
# t = tf.expand_dims(X_train[0], axis=0)
# model(t)[0]
# from tensorflow.keras.callbacks import ReduceLROnPlateau
# lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
# 0.001,
# decay_steps=20*50,
# decay_rate=1,
# staircase=False)
# def get_optimizer():
# return tf.keras.optimizers.Adam(lr_schedule)
# #Compiling the model
# model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), optimizer = get_optimizer(), metrics=['accuracy'])
# # early_stop = EarlyStopping(monitor='val_loss',patience=5)
# # checkpoint = ModelCheckpoint("./Best_model/",save_best_only=True,)
# rlrp = ReduceLROnPlateau(monitor='loss', factor=0.4, verbose=0, patience=2, min_lr=0.0000001)
# # # # Train the model
# model.fit(X_train, y_train, batch_size=32, epochs=20, validation_data=(X_test, y_test), callbacks=[rlrp])
# predictions = np.argmax(model.predict(X_test), axis=1)
# print(classification_report(y_test, predictions))
# print(model(X_train[0].reshape(1,32,32,3)))
#t = X_train[0].reshape(1,32,32,3)
#model.predict(t)
def relu_evidence(logits):
return tf.nn.relu(logits)
def exp_evidence(logits):
return tf.exp(tf.clip_by_value(logits, -10, 10))
def softplus_evidence(logits):
return tf.nn.softplus(((logits + 1)**2) / 2)
# # # def log_marginal_likelihood_gp_layer(model, X_train, y_train):
# # # """Compute the log marginal likelihood for a GP layer within the model."""
# # # gp_layer = model.layers[-1]
# # # kernel = gp_layer.kernel
# # # inducing_points = gp_layer.inducing_variable.Z.numpy()
# # # mean = gp_layer.mean_function
# # # y_train_subset = y_train[:inducing_points.shape[0]].astype(np.float64) # Ensure float64 dtype
# # # K = kernel.K(inducing_points)
# # # K += np.eye(inducing_points.shape[0]) * 1e-6
# # # L = tf.linalg.cholesky(K)
# # # alpha = tf.linalg.cholesky_solve(L, y_train_subset)
# # # log_likelihood = -0.5 * tf.reduce_sum(tf.matmul(tf.transpose(y_train_subset), alpha)) - tf.reduce_sum(tf.math.log(tf.linalg.diag_part(L))) - 0.5 * inducing_points.shape[0] * np.log(2 * np.pi)
# # # return tf.squeeze(log_likelihood)
def kl_divergence(alpha):
# KL divergence for Dirichlet distribution
beta = tf.ones_like(alpha)
S_alpha = tf.reduce_sum(alpha, axis=1, keepdims=True)
S_beta = tf.reduce_sum(beta, axis=1, keepdims=True)
lnB = tf.math.lgamma(S_alpha) - tf.reduce_sum(tf.math.lgamma(alpha), axis=1, keepdims=True)
lnB_uni = tf.reduce_sum(tf.math.lgamma(beta), axis=1, keepdims=True) - tf.math.lgamma(S_beta)
dg0 = tf.math.digamma(S_alpha)
dg1 = tf.math.digamma(alpha)
kl = tf.reduce_sum((alpha - beta) * (dg1 - dg0), axis=1, keepdims=True) + lnB + lnB_uni
return kl
def loglikelihood_loss(y, alpha):
S = tf.reduce_sum(alpha, axis=1, keepdims=True)
S = tf.cast(S, tf.float32)
y = tf.cast(y, tf.float32)
alpha = tf.cast(alpha, tf.float32)
loglikelihood_err = tf.reduce_sum(tf.square(y - (alpha / S)), axis=1, keepdims=True)
loglikelihood_var = tf.reduce_sum(alpha * (S - alpha) / (S * S * (S + 1)), axis=1, keepdims=True)
loglikelihood = loglikelihood_err + loglikelihood_var
return loglikelihood
def mse_loss(y, alpha, epoch_num, num_classes=10, annealing_step=10):
loglikelihood = loglikelihood_loss(y, alpha)
annealing_coef = tf.minimum(
tf.constant(1.0, dtype=tf.float32),
tf.cast(epoch_num / annealing_step, dtype=tf.float32),
)
kl_alpha = (alpha - 1) * (1 - y) + 1
kl_div = annealing_coef * kl_divergence(kl_alpha)
S = tf.reduce_sum(alpha, axis=1, keepdims=True)
vacuity = num_classes / tf.stop_gradient(S)
vacuity = tf.identity(vacuity, name="vacuity")
# gp_layer = model.layers[-1]
# ker = gp_layer.kernel
# ind = gp_layer.inducing_variable
# K = ker.K(inducing_variable.Z) # Kernel matrix at inducing points
# reg = tf.sqrt(tf.reduce_sum(tf.square(K))).numpy()*0.001
#reg = log_marginal_likelihood_gp_layer(model, X_train, y_train_one_hot)
#reg = tf.cast(reg, dtype=tf.float32)
return loglikelihood + kl_div, vacuity
# # # def edl_loss(func, y, alpha, epoch_num, num_classes, annealing_step, device=None):
# # # y = tf.convert_to_tensor(y, dtype=tf.float32)
# # # alpha = tf.convert_to_tensor(alpha, dtype=tf.float32)
# # # S = tf.reduce_sum(alpha, axis=1, keepdims=True)
# # # A = tf.reduce_sum(y * (func(S) - func(alpha)), axis=1, keepdims=True)
# # # annealing_coef = tf.minimum(
# # # tf.constant(1.0, dtype=tf.float32),
# # # tf.constant(epoch_num / annealing_step, dtype=tf.float32),
# # # )
# # # kl_alpha = (alpha - 1) * (1 - y) + 1
# # # kl_div = annealing_coef * kl_divergence(kl_alpha)
# # # S = tf.reduce_sum(alpha, axis=1, keepdims=True)
# # # with tf.GradientTape() as tape:
# # # vacuity = num_classes / tf.stop_gradient(S)
# # # return A + kl_div, vacuity
def compute_metrics(logits, Y, epoch, global_step, annealing_step, lmb=0.0005):
logits = tf.cast(logits, tf.float32)
evidence = exp_evidence(logits)
alpha = evidence + 1
alpha = tf.cast(alpha, tf.float32)
Y_onehot = tf.one_hot(Y, depth=10)
K = 10
if len(alpha.shape) == 1:
u = K / tf.reduce_sum(alpha)
else:
u = K / tf.reduce_sum(alpha, axis=1, keepdims=True)
#u = K / tf.reduce_sum(alpha, axis=1, keepdims=True) # uncertainty
prob = alpha / tf.reduce_sum(alpha, axis=1, keepdims=True)
mse_loss_val, vacuity = mse_loss(Y_onehot, alpha, epoch, num_classes, annealing_step)
loss = tf.reduce_mean(mse_loss_val)
output_correct = logits * Y_onehot
#print(vacuity * output_correct)
loss -= (tf.reduce_sum(vacuity * output_correct) / tf.cast(tf.shape(output_correct)[0], tf.float32))
#print(loss)
# loss, vacuity = mse_loss(Y_onehot, alpha, epoch)
# l2 = model.l2_loss_last_layers()
# loss = tf.reduce_mean(loss) + lmb * l2
return loss, u, prob
x_train = np.array(X_train)
y_train = np.array(y_train)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
model.compile(optimizer=optimizer)
num_epochs = 15
batch_size = 32
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=len(X_train)).batch(batch_size)
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))
test_dataset = test_dataset.shuffle(buffer_size=len(X_test)).batch(batch_size)
# # # def get_multiple_samples(model, inputs, num_samples=5):
# # # samples = [model(inputs, training=True) for _ in range(num_samples)]
# # # mean_output = tf.reduce_mean(samples, axis=0)
# # # return mean_output
for epoch in range(num_epochs):
total_loss = 0.0
correct = 0
total = 0
# indices = np.random.permutation(len(x_train))
# x_train_shuffled = x_train[indices]
# y_train_shuffled = y_train[indices]
for inputs, labels in train_dataset:
labels = tf.squeeze(labels)
# inputs = x_train_shuffled[i:i+batch_size]
# labels = y_train_shuffled[i:i+batch_size]
# inputs = tf.convert_to_tensor(inputs, dtype=tf.float32)
# labels = tf.convert_to_tensor(labels, dtype=tf.int32)
with tf.GradientTape() as tape:
outputs = model(inputs, training=True)
#outputs = outputs[0]
#outputs = get_multiple_samples(model, inputs, num_samples=5)
#print(outputs)
#gradient_penalty = calc_gradient_penalty(X_train, outputs)
loss, _, _ = compute_metrics(outputs, labels, epoch, global_step=epoch, annealing_step=10)
#print(loss)
gradients = tape.gradient(loss, model.trainable_variables)
# gradients_l2 = [tf.norm(grad) for grad in gradients]
# gradients_l2 = [0.000001*(grad_norm - 1)**2 for grad_norm in gradients_l2]
# # Penalize the loss with the L2 norm of gradients
# penalty_weight = 0.001 # Adjust this weight as needed
# penalty = tf.reduce_sum([tf.square(grad) for grad in gradients_l2])
# loss += penalty_weight * penalty
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
total_loss += loss.numpy()
predicted = tf.argmax(outputs, axis=1)
predicted = tf.cast(predicted, tf.int32)
total += labels.shape[0]
#labels = tf.squeeze(labels)
#print(predicted)
#print(labels)
correct += tf.reduce_sum(tf.cast(predicted == tf.cast(labels, tf.int32), tf.float32)).numpy()
#print(correct)
#print(len(x_train))
avg_loss = total_loss / (len(x_train) // batch_size)
accuracy = 100 * correct / len(x_train)
print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')
if avg_loss < 0.05:
print(f'Stopping training. Loss ({avg_loss:.4f}) is below threshold ({0.05}).')
break
predictions = np.argmax(model.predict(X_test), axis=1)
print(classification_report(y_test, predictions))
# # # #model.save('test_sngp.keras')
def test(model, test_dataset):
correct = 0
total = 0
all_predictions = []
all_uncertainties = []
for inputs, labels in test_dataset:
labels = tf.squeeze(labels)
outputs = model(inputs, training=False)
#outputs[0]
predicted = tf.argmax(outputs, axis=1)
predicted = tf.cast(predicted, tf.int32)
_, u, _ = compute_metrics(outputs, labels, epoch=0, global_step=0, annealing_step=10) # Calculate loss and uncertainty
all_predictions.append(predicted.numpy())
all_uncertainties.append(u.numpy())
total += labels.shape[0]
correct += tf.reduce_sum(tf.cast(predicted == tf.cast(labels, tf.int32), tf.float32)).numpy()
accuracy = 100 * correct / total
all_predictions = np.concatenate(all_predictions)
all_uncertainties = np.concatenate(all_uncertainties)
print(f'Test Accuracy: {accuracy:.2f}%')
print(f'Shape of predictions array: {all_predictions.shape}')
print(f'Shape of uncertainties array: {all_uncertainties.shape}')
np.save('predictions.npy', all_predictions)
np.save('uncertainties.npy', all_uncertainties)
return accuracy, all_predictions, all_uncertainties
# def add_gaussian_noise_to_image(image, noise_stddev=0.3):
# noise = tf.random.normal(shape=tf.shape(image), mean=0.0, stddev=noise_stddev)
# corrupted_image = tf.clip_by_value(image + noise, 0.0, 1.0) # Clip values to [0, 1]
# return corrupted_image
# # Corrupt the test dataset images with Gaussian noise
# corrupted_test_dataset = test_dataset.map(lambda x, y: (add_gaussian_noise_to_image(x), y))
# X, y = corrupted_test_dataset
# predictions = np.argmax(model.predict(X), axis=1)
# print(classification_report(y, predictions))
# _,u,_ = compute_metrics(predictions, y_test, 1, global_step=1, annealing_step=10)
test_accuracy, predictions_1, uncertainties = test(model, test_dataset)
TC_indices = [] # True Certainty (TC)
TU_indices = [] # True Uncertainty (TU)
FU_indices = [] # False Uncertainty (FU)
FC_indices = [] # False Certainty (FC)
for i in range(len(predictions)):
#p = y_pred_mc_dropout[i]
if (predictions[i] == y_test[i]):
if uncertainties[i] < 0.3:
# True certainty (TU): Correct and certain
TC_indices.append(i)
else:
# False certainty (FU): Correct and uncertain
FU_indices.append(i)
else:
# Certain prediction
if uncertainties[i] < 0.3:
# True Unertainty (TC): Incorrect and certain
FC_indices.append(i)
else:
# False Uncertainty (FC): Incorrect and uncertain
TU_indices.append(i)
print('USen:',len(TU_indices) / (len(TU_indices) + len(FC_indices)))
print('USpe:', len(TC_indices) / (len(TC_indices) + len(FU_indices)))
print('UPre:', len(TU_indices) / (len(TU_indices) + len(FU_indices)))
print('UAcc:', (len(TU_indices) + len(TC_indices)) / (len(TU_indices) + len(TC_indices) + len(FU_indices) + len(FC_indices)))
# def combine_images_with_padding(img_index_1, img_index_2, padding_type="top_bottom"):
# """
# Combines two CIFAR-10 images with padding and normalization.
# Args:
# img_index_1: Index of the first image in the dataset.
# img_index_2: Index of the second image in the dataset.
# padding_type: Type of padding to use ("top_bottom" or "left_right").
# Returns:
# A combined image tensor.
# """
# def combine_images_with_padding(img_index_1, img_index_2, padding_type):
# (train_images, train_labels), (test_images, test_labels) = cifar10.load_data()
# img_1 = tf.convert_to_tensor(test_images[img_index_1], dtype=tf.float32) / 255.0
# img_2 = tf.convert_to_tensor(test_images[img_index_2], dtype=tf.float32) / 255.0
# if padding_type == "top_bottom":
# padding_amount = (img_2.shape[0] - img_1.shape[0]) // 2
# top_bottom_padding = tf.zeros((padding_amount, img_1.shape[1], 3))
# padded_img_1 = tf.concat([top_bottom_padding, img_1, top_bottom_padding], axis=0)
# padded_img_2 = img_2
# elif padding_type == "left_right":
# padding_amount = (img_2.shape[1] - img_1.shape[1]) // 2
# left_right_padding = tf.zeros((img_1.shape[0], padding_amount, 3))
# padded_img_1 = tf.concat([left_right_padding, img_1, left_right_padding], axis=1)
# padded_img_2 = img_2
# else:
# raise ValueError("Invalid padding type. Choose 'top_bottom' or 'left_right'.")
# combined_img = tf.concat([padded_img_1, padded_img_2], axis=0)
# combined_img_resized = tf.image.resize(combined_img, [32, 32])
# return combined_img_resized
# img_index_1 = 50
# img_index_2 = 100
# padding_type = "top_bottom"
# combined_img = combine_images_with_padding(img_index_1, img_index_2, padding_type)
# combined_img = np.expand_dims(combined_img, axis=0)
# image1_index = 10
# image2_index = 21
# combined_img = np.zeros((32, 32))
# combined_img[:, :-6] += x_train[image1_index][:, 6:]
# combined_img[:, 14:] += x_train[image2_index][:, 5:19]
# combined_img /= combined_img.max()
# combined_img = combined_img.reshape(1, 32, 32, 3)
(train_images, _), (_, _) = mnist.load_data()
mnist_image = train_images[np.random.randint(0, train_images.shape[0])]
rescaled_image = cv2.resize(mnist_image, (32, 32))
rgb_image = cv2.cvtColor(rescaled_image, cv2.COLOR_GRAY2RGB)
rgb_image = np.expand_dims(rgb_image, axis=0)
# pred_unc = model(combined_img)
pred = model(X_test[0].reshape(1, 32, 32, 3))
#var = pred.variance().numpy()
pred_rgb = model(rgb_image)
#var_rgb = pred_rgb.variance().numpy()
# l_unc, u_unc, p_unc = compute_metrics(pred_unc, y_test[50], 0, global_step=0, annealing_step=10)
l, u, p = compute_metrics(pred, y_test[0], 0, global_step=0, annealing_step=10)
l_rgb, u_rgb, p_rgb = compute_metrics(pred_rgb, y_test[0], 0, global_step=0, annealing_step=10)
# print('u_unc:',u_unc)
# print('p_unc:',p_unc)
# print('preds:', pred_unc)
print('u:', u)
print('p:', p)
print('pred:', pred)
#print('sd:', var)
print('u_rgb:', u_rgb)
print('p_rgb:', p_rgb)
print('preds:', pred_rgb)
#print('sd_rgb:', var_rgb)
#----------------------------------------------------------------------------------------------------
#Variance based EDL
def uncertainty(alpha, reduce=True):
S = tf.reduce_sum(alpha, axis=1, keepdims=True)
p = alpha / S
variance = p - tf.square(p)
EU = (alpha / S) * (1 - alpha / S) / (S + 1)
AU = variance - EU
if reduce:
AU = tf.reduce_sum(AU) / alpha.shape[0]
EU = tf.reduce_sum(EU) / alpha.shape[0]
return AU, EU
pred_var = model(rgb_image)
pred_var = exp_evidence(pred_var)
unc_ale, unc_eps = uncertainty(pred_var)
print('u_ale:', unc_ale)
print('p_eps:', unc_eps)
y_pred_probs = model.predict(X_test)
y_pred = np.argmax(y_pred_probs, axis=1)
#-----------------------------------------------------------------------------------------------------
#-----------------------------------------------------------------------------------------------------
#Different Variance based unc
# def total_uncertainty_variance(probs):
# if isinstance(probs, tf.Tensor):
# mean = tf.reduce_mean(probs, axis=2)
# t_u = tf.reduce_sum(mean * (1 - mean), axis=1)
# else:
# probs = tf.convert_to_tensor(probs, dtype=tf.float32)
# mean = tf.reduce_mean(probs, axis=2)
# t_u = tf.reduce_sum(mean * (1 - mean), axis=1)
# return t_u
# def aleatoric_uncertainty_variance(probs):
# if isinstance(probs, tf.Tensor):
# a_u = tf.reduce_mean(tf.reduce_sum(probs * (1 - probs), axis=1), axis=1)
# else:
# probs = tf.convert_to_tensor(probs, dtype=tf.float32)
# a_u = tf.reduce_mean(tf.reduce_sum(probs * (1 - probs), axis=1), axis=1)
# return a_u
# def epistemic_uncertainty_variance(probs):
# if isinstance(probs, tf.Tensor):
# mean = tf.reduce_mean(probs, axis=2, keepdims=True)
# e_u = tf.reduce_mean(tf.reduce_sum(probs * (probs - mean), axis=1), axis=1)
# else:
# probs = tf.convert_to_tensor(probs, dtype=tf.float32)
# mean = tf.reduce_mean(probs, axis=2, keepdims=True)
# e_u = tf.reduce_mean(tf.reduce_sum(probs * (probs - mean), axis=1), axis=1)
# return e_u
# eu = epistemic_uncertainty_variance(pred_rgb)
# au = aleatoric_uncertainty_variance(pred_rgb)
# print('eu:', eu)
# print('au:', au)
#------------------------------------------------------------------------------------------------------
def softmax(vector):
e = np.exp(vector)
return e / e.sum()
def expected_calibration_error(samples, true_labels, M=5):
# uniform binning approach with M number of bins
bin_boundaries = np.linspace(0, 1, M + 1)
bin_lowers = bin_boundaries[:-1]
bin_uppers = bin_boundaries[1:]
#samples = softmax(samples)
# get max probability per sample i
confidences = np.max(samples, axis=1)
# get predictions from confidences (positional in this case)
predicted_label = np.argmax(samples, axis=1)
# get a boolean list of correct/false predictions
accuracies = predicted_label==true_labels
ece = np.zeros(1)
for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
# determine if sample is in bin m (between bin lower & upper)
in_bin = np.logical_and(confidences > bin_lower.item(), confidences <= bin_upper.item())
# can calculate the empirical probability of a sample falling into bin m: (|Bm|/n)
prob_in_bin = in_bin.mean()
if prob_in_bin.item() > 0:
# get the accuracy of bin m: acc(Bm)
accuracy_in_bin = accuracies[in_bin].mean()
# get the average confidence of bin m: conf(Bm)
avg_confidence_in_bin = confidences[in_bin].mean()
# calculate |acc(Bm) - conf(Bm)| * (|Bm|/n) for bin m and add to the total ECE
ece += np.abs(avg_confidence_in_bin - accuracy_in_bin) * prob_in_bin
return ece
ece = expected_calibration_error(y_pred_probs, y_test)
print("Expected Calibration Error:", ece)
# xtest = X_test[0]
# xtest = tf.convert_to_tensor([xtest])
# # Define the FGSM attack function
# def fgsm_attack(image, label, epsilon):
# with tf.GradientTape() as tape:
# tape.watch(image)
# prediction = model(image)
# prediction = exp_evidence(prediction) + 1
# loss,_ = mse_loss(label, prediction, epoch_num=1, num_classes=10, annealing_step=10)
# #loss = tf.keras.losses.sparse_categorical_crossentropy(label, prediction)
# gradient = tape.gradient(loss, image)
# signed_grad = tf.sign(gradient)
# adversarial_image = image + epsilon * signed_grad
# adversarial_image = tf.clip_by_value(adversarial_image, -1, 1)
# return adversarial_image
# # Create the adversarial image
# epsilon = 0.5
# label = tf.convert_to_tensor([y_test[0]], dtype=tf.int64)
# adversarial_image = fgsm_attack(xtest, label, epsilon)
# # Get the model predictions for both images
# original_pred = model(xtest)
# adversarial_pred = model(adversarial_image)
# l1, u1, p1 = compute_metrics(adversarial_pred, y_test[0], 0, global_step=0, annealing_step=10)
# print('u_rgb:', u1)
# print('p_rgb:', p1)
#print('preds:', pred_rgb)
# # # def plot_reliability_diagram(confidences, true_labels, M=5):
# # # """Plots the reliability diagram for the given data."""
# # # bin_boundaries = np.linspace(0, 1, M + 1)
# # # bin_centers = (bin_boundaries[:-1] + bin_boundaries[1:]) / 2
# # # # Get binned accuracy (average accuracy for each confidence bin)
# # # binned_accuracy = np.zeros(M)
# # # for i, bin_lower in enumerate(bin_boundaries[:-1]):
# # # bin_upper = bin_boundaries[i + 1]
# # # in_bin = np.logical_and(confidences >= bin_lower, confidences < bin_upper)
# # # if in_bin.sum() > 0:
# # # binned_accuracy[i] = true_labels[in_bin].mean()
# # # # Perfect calibration line (y = x)
# # # perfect_calibration = np.linspace(0, 1, M)
# # # plt.plot(bin_centers, binned_accuracy, 'o', label='Binned Accuracy')
# # # plt.plot(perfect_calibration, perfect_calibration, '-', label='Perfect Calibration')
# # # plt.xlabel('Predicted Probability')
# # # plt.ylabel('Observed Accuracy')
# # # plt.title('Reliability Diagram')
# # # plt.legend()
# # # plt.grid(True)
# # # plt.show()
# # #plot_reliability_diagram(y_pred_probs, y_test)
# # # def fgsm_attack(image, epsilon, data_grad):
# # # # Collect the element-wise sign of the data gradient
# # # sign_data_grad = tf.sign(data_grad)
# # # # Create the perturbed image by adjusting each pixel of the input image
# # # perturbed_image = image + epsilon * sign_data_grad
# # # # Adding clipping to maintain [0,1] range
# # # perturbed_image = tf.clip_by_value(perturbed_image, 0, 1)
# # # # Return the perturbed image
# # # return perturbed_image
# # # # Restores the tensors to their original scale
# # # def denorm(batch, mean=[0.1307], std=[0.3081]):
# # # mean = tf.convert_to_tensor(mean)
# # # std = tf.convert_to_tensor(std)
# # # return batch * std + mean
# # # def test(model, test_dataset, epsilon):
# # # # Accuracy counter
# # # correct = 0
# # # adv_examples = []
# # # # Loop over all examples in test set
# # # for data, target in test_dataset:
# # # # Send the data and label to the device
# # # data, target = data.numpy(), target.numpy()
# # # # Set requires_grad attribute of tensor. Important for Attack
# # # data = tf.convert_to_tensor(data, dtype=tf.float32)
# # # with tf.GradientTape() as tape:
# # # tape.watch(data)
# # # # Forward pass the data through the model
# # # output = model(data)
# # # init_pred = tf.argmax(output, axis=1, output_type=tf.int32)
# # # # If the initial prediction is wrong, don't bother attacking, just move on
# # # if not np.array_equal(init_pred.numpy(), target):
# # # continue
# # # # Calculate the loss
# # # loss, _, _ = compute_metrics(outputs, target, epoch=1, global_step=0, annealing_step=10)
# # # # Calculate gradients of model in backward pass
# # # data_grad = tape.gradient(loss, data)
# # # # Call FGSM Attack
# # # perturbed_data = fgsm_attack(data, epsilon, data_grad)
# # # # Re-classify the perturbed image
# # # output = model(perturbed_data)
# # # # Check for success
# # # final_pred = tf.argmax(output, axis=1, output_type=tf.int32)
# # # if np.array_equal(final_pred.numpy(), target):
# # # correct += 1
# # # # Special case for saving 0 epsilon examples
# # # if epsilon == 0 and len(adv_examples) < 5:
# # # adv_examples.append((init_pred.numpy()[0], final_pred.numpy()[0], perturbed_data.numpy()))
# # # else:
# # # # Save some adv examples for visualization later
# # # if len(adv_examples) < 5:
# # # adv_examples.append((init_pred.numpy()[0], final_pred.numpy()[0], perturbed_data.numpy()))
# # # # Calculate final accuracy for this epsilon
# # # final_acc = correct / float(len(test_dataset))
# # # print(f"Epsilon: {epsilon}\tTest Accuracy = {correct} / {len(test_dataset)} = {final_acc}")
# # # # Return the accuracy and adversarial examples
# # # return final_acc, adv_examples
# # # accuracies = []
# # # examples = []
# # # epsilons = [0,0.05, 0.1, 0.15,0.2,0.25,0.3]
# # # # Run test for each epsilon
# # # for eps in epsilons:
# # # acc, ex = test(model, test_dataset, eps)
# # # accuracies.append(acc)
# # # examples.append(ex)
# # # import matplotlib.pyplot as plt
# # # # Plot accuracy vs epsilon
# # # plt.figure(figsize=(5,5))
# # # plt.plot(epsilons, accuracies, "*-")
# # # plt.yticks(np.arange(0, 1.1, step=0.1))
# # # plt.xticks(np.arange(0, .35, step=0.05))
# # # plt.title("Accuracy vs Epsilon")
# # # plt.xlabel("Epsilon")
# # # plt.ylabel("Accuracy")
# # # plt.grid(True)
# # # plt.show()
# # # # Save the plot as a PNG file
# # # plt.savefig('accuracy_vs_epsilon.png')