# resnet50nd-grayscale / models / ndlinear_util.py
# Uploaded by zhongfang-zhuang using huggingface_hub (commit f0f2d1e, verified).
import json
import time
import numpy as np
import torch
import torch.nn as nn
import wandb
from fvcore.nn import FlopCountAnalysis
from sklearn.metrics import roc_curve
from torchvision import models, transforms
from ndlinear import NdLinear
# Image preprocessing / augmentation pipeline.
# Each step is applied in the order listed; the result is a normalised tensor.
transform = transforms.Compose(
    [
        # Force a fixed 224x224 resolution before any augmentation.
        transforms.Resize(size=(224, 224)),
        # Random augmentations: flip, mild photometric jitter, small rotation,
        # and a random crop covering 80-100% of the area resized back to 224x224.
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.RandomRotation(degrees=10),
        transforms.RandomResizedCrop(size=(224, 224), scale=(0.8, 1.0)),
        transforms.ToTensor(),
        # Three-entry mean/std (the values commonly used with ImageNet-pretrained
        # torchvision models).
        # NOTE(review): this presumes 3-channel input at this point -- confirm
        # that grayscale images are expanded to 3 channels before this pipeline.
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)
class ReshapedNdLinear(torch.nn.Module):
    """Adapter that feeds a tensor to a wrapped layer with an extra trailing axis.

    The input gets a trailing singleton dimension appended, is passed through
    the wrapped layer, and the result is flattened to ``(batch, -1)`` so it can
    be used where a flat feature vector per sample is expected.

    Args:
        nd_linear_layer: The layer (or callable) applied to the reshaped input.
    """

    def __init__(self, nd_linear_layer):
        super().__init__()
        self.nd_linear = nd_linear_layer

    def forward(self, x):
        """Return the wrapped layer's output flattened to ``(x.size(0), -1)``."""
        # Append a trailing axis of size 1: shape (..., d) becomes (..., d, 1).
        x = x.reshape(*x.shape, 1)
        x = self.nd_linear(x)
        # `reshape` instead of `view`: `view` raises if the wrapped layer hands
        # back a non-contiguous tensor (e.g. after a transpose), whereas
        # `reshape` returns a view when possible and copies only when it must.
        return x.reshape(x.size(0), -1)
def print_cpu_layers(model):
    """Print the name of every sub-module that directly owns a CPU parameter.

    Only parameters registered on a module itself are inspected
    (``recurse=False``), so a container is listed for its own parameters only,
    not for those of its children. When no module qualifies, a single summary
    line is printed instead.
    """
    cpu_layer_names = [
        layer_name
        for layer_name, layer in model.named_modules()
        if any(
            weight.device.type == 'cpu'
            for weight in layer.parameters(recurse=False)
        )
    ]

    for layer_name in cpu_layer_names:
        print(f"Layer: {layer_name}, Device: CPU")

    if not cpu_layer_names:
        print("No layers are on the CPU.")
def calculate_flops(model, input_tensor):
    """Return the total FLOP count fvcore reports for ``model(input_tensor)``.

    The model is switched to eval mode (and left in it), and the input is
    moved to the device of the model's first parameter before analysis.

    Raises:
        StopIteration: If the model has no parameters, since the target device
            is taken from the first one.
    """
    model.eval()
    model_device = next(model.parameters()).device
    analysis = FlopCountAnalysis(model, input_tensor.to(model_device))
    return analysis.total()
def print_model_parameters(model):
    """Return the total number of scalar parameters in ``model``.

    Despite the name, nothing is printed; the count is only returned.
    """
    total = 0
    for weight in model.parameters():
        total += weight.numel()
    return total
def measure_latency_and_flops_cuda(model, input_tensor, warmup=10, runs=100):
    """Benchmark forward-pass latency on CUDA and count FLOPs for one sample.

    The model and input are moved to the CUDA device, the model is put in eval
    mode, and ``torch.backends.cudnn.benchmark`` is set to ``True`` (a global
    setting that is not restored afterwards). After ``warmup`` untimed forward
    passes, ``runs`` passes are timed individually, each followed by a CUDA
    synchronize so the measured interval covers the finished GPU work.

    Args:
        model: Module to benchmark.
        input_tensor: Batch fed to the model; FLOPs are counted on its first
            sample only (``input_tensor[:1]``).
        warmup: Number of untimed forward passes run first.
        runs: Number of timed forward passes; must be at least 1.

    Returns:
        Tuple ``(avg_latency, flops)``: mean seconds per forward pass of the
        full ``input_tensor`` and the FLOP count from ``calculate_flops``.

    Raises:
        AssertionError: If CUDA is not available.
        ValueError: If ``runs`` is smaller than 1.
    """
    # Explicit raise rather than `assert`: assertions are stripped under
    # `python -O`, which would let the function continue without a GPU. The
    # exception type is kept so existing callers see no difference.
    if not torch.cuda.is_available():
        raise AssertionError("CUDA is not available.")
    # Fail fast instead of hitting a ZeroDivisionError after the warm-up work.
    if runs < 1:
        raise ValueError(f"runs must be at least 1, got {runs}")

    device = torch.device('cuda')
    model.to(device)
    input_tensor = input_tensor.to(device)
    model.eval()
    torch.backends.cudnn.benchmark = True

    with torch.no_grad():
        for _ in range(warmup):
            model(input_tensor)
        # Drain the warm-up kernels so they do not leak into the first timing.
        torch.cuda.synchronize()

        timings = []
        for _ in range(runs):
            # perf_counter is monotonic and high-resolution; time.time() can
            # jump with wall-clock adjustments and is coarser on some systems.
            start = time.perf_counter()
            model(input_tensor)
            torch.cuda.synchronize()
            timings.append(time.perf_counter() - start)

    avg_latency = sum(timings) / len(timings)
    flops = calculate_flops(model, input_tensor[:1, ...])

    print(f"Average CUDA Latency over {runs} runs: {avg_latency * 1000:.3f} ms")
    # This is forward passes per second for the given batch, not per image.
    print(f"Approx. FPS: {1.0 / avg_latency:.2f}")
    print(f"Approx. Flops: {flops / 10 ** 9:.2f} GFlops")
    return avg_latency, flops
def modify_and_evaluate_backbone(model, cfg):
    """Swap the model's ``fc`` head for a dropout + NdLinear head and report it.

    The model is put in train mode, its ``fc`` attribute is replaced by
    ``Dropout(p=0.2)`` followed by a ``ReshapedNdLinear``-wrapped ``NdLinear``,
    the new head's parameters are marked trainable, the total parameter count
    is logged to wandb under ``"total_parameters"``, and the model is moved to
    CUDA when available (CPU otherwise). Any layers still on the CPU and the
    model structure are printed.

    Args:
        model: Backbone exposing an ``fc`` layer with an ``in_features``
            attribute.
        cfg: Object exposing an ``embedding_size`` attribute.

    Returns:
        The same ``model`` instance, modified in place.
    """
    target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.train()

    # NdLinear is built with input dims (in_features, 1) and output dims
    # (embedding_size // 32, 32); the wrapper flattens its output per sample.
    # NOTE(review): presumably embedding_size is a multiple of 32 -- the
    # integer division silently rounds down otherwise; confirm with callers.
    nd_head = NdLinear(
        (model.fc.in_features, 1),
        (cfg.embedding_size // 32, 32),
    )
    wrapped_head = ReshapedNdLinear(nd_head).to(target_device)

    # Add dropout to the student model's fully connected layer
    model.fc = nn.Sequential(nn.Dropout(p=0.2), wrapped_head)
    for head_param in model.fc.parameters():
        head_param.requires_grad = True

    wandb.log({"total_parameters": print_model_parameters(model)})

    model.to(target_device)
    print_cpu_layers(model)
    print(model)
    return model
def load_config(config_path='config.json'):
    """Load the JSON configuration, falling back to built-in defaults.

    Args:
        config_path: Path of the JSON file to read.

    Returns:
        The parsed JSON content, or a dict of default hyper-parameters when
        the file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON. Only a
            missing file triggers the fallback; other errors propagate.
    """
    try:
        # Explicit UTF-8: without it `open` uses the locale encoding, which
        # can mis-decode non-ASCII JSON text on some platforms.
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        # NOTE(review): the defaults carry no "embedding_size" entry -- confirm
        # whether consumers of this fallback need one.
        return {
            "learning_rate": 0.001,  # Adjusted learning rate
            "epochs": 1000,
            "batch_size": 32,
            "eval_batch_size": 512,
            "eval_every": 1000,
        }
def find_optimal_threshold(embeddings1, embeddings2, labels):
    """Return the ROC threshold that maximises Youden's J statistic.

    Each pair is scored by the row-wise sum of the element-wise product of
    the two embedding arrays (a dot product per row). An ROC curve is built
    from these scores and ``labels``, and the threshold with the largest
    ``TPR - FPR`` is returned.

    NOTE(review): the row-wise dot product equals cosine similarity only if
    both embedding arrays are already L2-normalised -- confirm with callers.
    """
    pair_scores = np.sum(embeddings1 * embeddings2, axis=1)
    false_pos_rate, true_pos_rate, cutoffs = roc_curve(labels, pair_scores)
    # Youden's J statistic: J = TPR - FPR; pick the cutoff where it peaks.
    best = np.argmax(true_pos_rate - false_pos_rate)
    return cutoffs[best]