import json
import time
import numpy as np
import torch
import torch.nn as nn
import wandb
from fvcore.nn import FlopCountAnalysis
from sklearn.metrics import roc_curve
from torchvision import models, transforms
from ndlinear import NdLinear
# Training-time augmentation pipeline with standard ImageNet
# normalization statistics.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop((224, 224), scale=(0.8, 1.0)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])
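
# The pipeline above is for training. A hedged sketch of the matching
# deterministic transform one would typically use at evaluation time
# (not part of the original file; the name `eval_transform` is ours):
# eval_transform = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406],
#                          std=[0.229, 0.224, 0.225]),
# ])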

class ReshapedNdLinear(nn.Module):
    """Wrap an NdLinear layer so it accepts and returns flat 2-D tensors."""

    def __init__(self, nd_linear_layer):
        super().__init__()
        self.nd_linear = nd_linear_layer

    def forward(self, x):
        # (B, F) -> (B, F, 1) so NdLinear sees the expected input dims.
        x = x.reshape(*x.shape, 1)
        x = self.nd_linear(x)
        # Flatten the NdLinear output back to (B, embedding_size).
        return x.view(x.size(0), -1)
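
# Shape walkthrough (a sketch; assumes the layer was built as
# NdLinear((F, 1), (E // 32, 32)) with E the target embedding size):
#   x: (B, F) --reshape--> (B, F, 1) --NdLinear--> (B, E // 32, 32)
#     --view--> (B, E)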

def print_cpu_layers(model):
    """Report any sub-modules whose parameters still live on the CPU."""
    found_cpu_layer = False
    for name, module in model.named_modules():
        if any(p.device.type == 'cpu' for p in module.parameters(recurse=False)):
            print(f"Layer: {name}, Device: CPU")
            found_cpu_layer = True
    if not found_cpu_layer:
        print("No layers are on the CPU.")

def calculate_flops(model, input_tensor):
    """Count forward-pass FLOPs with fvcore's FlopCountAnalysis."""
    model.eval()
    device = next(model.parameters()).device
    input_tensor = input_tensor.to(device)
    flops_analysis = FlopCountAnalysis(model, input_tensor)
    return flops_analysis.total()
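
# Example usage (a sketch, not from the original file):
#   net = models.resnet18()
#   gflops = calculate_flops(net, torch.randn(1, 3, 224, 224)) / 1e9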

def count_model_parameters(model):
    """Return the total parameter count of the model."""
    return sum(p.numel() for p in model.parameters())

def measure_latency_and_flops_cuda(model, input_tensor, warmup=10, runs=100):
    """Benchmark average CUDA latency and per-sample FLOPs."""
    assert torch.cuda.is_available(), "CUDA is not available."
    device = torch.device('cuda')
    model.to(device)
    input_tensor = input_tensor.to(device)
    model.eval()
    torch.backends.cudnn.benchmark = True
    # Warm-up runs let cuDNN pick kernels before timing starts.
    with torch.no_grad():
        for _ in range(warmup):
            _ = model(input_tensor)
        torch.cuda.synchronize()
    timings = []
    with torch.no_grad():
        for _ in range(runs):
            start = time.time()
            _ = model(input_tensor)
            # Synchronize so the wall-clock stop includes all queued GPU work.
            torch.cuda.synchronize()
            timings.append(time.time() - start)
    avg_latency = sum(timings) / len(timings)
    flops = calculate_flops(model, input_tensor[:1, ...])
    print(f"Average CUDA Latency over {runs} runs: {avg_latency * 1000:.3f} ms")
    print(f"Approx. FPS: {1.0 / avg_latency:.2f}")
    print(f"Approx. FLOPs: {flops / 10 ** 9:.2f} GFLOPs")
    return avg_latency, flops
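
# Example usage (a sketch; assumes a CUDA device is present):
#   net = models.resnet18()
#   latency, flops = measure_latency_and_flops_cuda(
#       net, torch.randn(32, 3, 224, 224))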

def modify_and_evaluate_backbone(model, cfg):
    """Swap the backbone's fc head for a dropout + NdLinear projection."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.train()
    in_features = model.fc.in_features
    # Factorized projection: (in_features, 1) -> (embedding_size // 32, 32).
    fc_nd = NdLinear((in_features, 1), (cfg.embedding_size // 32, 32))
    reshaped_fc = ReshapedNdLinear(fc_nd).to(device)
    # Add dropout in front of the student model's fully connected layer.
    model.fc = nn.Sequential(
        nn.Dropout(p=0.2),
        reshaped_fc
    )
    for param in model.fc.parameters():
        param.requires_grad = True
    total_params = count_model_parameters(model)
    wandb.log({"total_parameters": total_params})
    model.to(device)
    print_cpu_layers(model)
    print(model)
    return model
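
# Example usage (a sketch; assumes wandb.init() was called and that
# cfg exposes an `embedding_size` attribute divisible by 32, e.g.
# cfg = wandb.config):
#   backbone = models.resnet50(weights=None)
#   backbone = modify_and_evaluate_backbone(backbone, cfg)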

def load_config(config_path='config.json'):
    """Load hyper-parameters from JSON, falling back to built-in defaults."""
    try:
        with open(config_path, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        # Defaults used when no config.json is present.
        return {
            "learning_rate": 0.001,
            "epochs": 1000,
            "batch_size": 32,
            "eval_batch_size": 512,
            "eval_every": 1000
        }
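
# Expected config.json shape (illustrative; keys mirror the defaults
# above, and an `embedding_size` entry is additionally assumed by
# modify_and_evaluate_backbone):
# {
#   "learning_rate": 0.001,
#   "epochs": 1000,
#   "batch_size": 32,
#   "eval_batch_size": 512,
#   "eval_every": 1000
# }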

def find_optimal_threshold(embeddings1, embeddings2, labels):
    """Pick the cosine-similarity threshold maximizing Youden's J (TPR - FPR)."""
    # Row-wise dot product; this equals cosine similarity only if the
    # embeddings are already L2-normalized.
    cosine_sim = np.sum(embeddings1 * embeddings2, axis=1)
    fpr, tpr, thresholds = roc_curve(labels, cosine_sim)
    # Youden's J statistic: J = TPR - FPR.
    j_scores = tpr - fpr
    optimal_idx = np.argmax(j_scores)
    return thresholds[optimal_idx]
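
# Example usage (a sketch; embeddings should be L2-normalized first):
#   e1 = e1 / np.linalg.norm(e1, axis=1, keepdims=True)
#   e2 = e2 / np.linalg.norm(e2, axis=1, keepdims=True)
#   thr = find_optimal_threshold(e1, e2, labels)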