FibrilSegNet / training-model /train_fibril_segment.py
himanshuch8055's picture
Implement DeepLabV3+ with EfficientNet-B3 for fibril segmentation; add GPU selection, data preparation, and training loop
4971505
# =============== Fibril Segmentation — UNet++ with a ResNeSt-101e encoder ===============
import os, random, subprocess
from glob import glob
import numpy as np
from PIL import Image
from tqdm import tqdm
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
import torch
torch.cuda.empty_cache()
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
import segmentation_models_pytorch as smp
import json
from sklearn.utils import shuffle
import os
import subprocess
# ─── GPU Selection Function ───────────────────────────────
def get_free_gpu(threshold_mb=1000):
    """Return the index of the first GPU with more than ``threshold_mb`` free memory.

    Runs ``nvidia-smi`` and reads one ``memory.used,memory.total`` pair per
    output line; the position of the line is used as the GPU index.

    Args:
        threshold_mb: Minimum ``total - used`` value (in the units printed by
            ``nvidia-smi``) a GPU must exceed to be selected.

    Returns:
        The GPU index as a string (ready for ``CUDA_VISIBLE_DEVICES``), or
        ``None`` when ``nvidia-smi`` cannot be run, exits with an error, or no
        GPU clears the threshold.
    """
    query = [
        "nvidia-smi",
        "--query-gpu=memory.used,memory.total",
        "--format=csv,nounits,noheader",
    ]
    try:
        result = subprocess.run(query, stdout=subprocess.PIPE, text=True)
    except (OSError, subprocess.SubprocessError) as e:
        # Typically: nvidia-smi is not installed / not on PATH.
        print("GPU check failed:", e)
        return None

    if result.returncode != 0:
        print("GPU check failed:", f"nvidia-smi exited with status {result.returncode}")
        return None

    for idx, line in enumerate(result.stdout.splitlines()):
        try:
            used, total = (int(field) for field in line.split(","))
        except ValueError:
            # Skip a line that is not two integers instead of abandoning the
            # whole scan; enumerate() keeps later indices aligned.
            continue
        if total - used > threshold_mb:
            return str(idx)
    return None
# ─── Find Free GPU BEFORE Defining Config ────────────────
# Probe the GPUs first so the chosen index can be stored with the other settings.
free_gpu_id = get_free_gpu()

# ─── Configurations ───────────────────────────────────────
# Every tunable for this run lives in this one mapping.
config = dict(
    seed=42,             # RNG seed for seeding and the train/val shuffle
    img_size=512,        # images and masks are resized to img_size x img_size
    batch_size=2,        # training batch size
    num_workers=4,       # DataLoader worker processes
    epochs=100,
    lr=1e-4,             # initial Adam learning rate
    train_img_dir="./alldataset/images",
    train_mask_dir="./alldataset/masks",
    save_path="./trained-models/encoder_resnest101e_decoder_UnetPlusPlus_fibril_seg_model.pth",
    gpu_id=free_gpu_id,  # str index, or None when no GPU was selected
)
# ─── GPU Setup ────────────────────────────────────────────
# Restrict this process to the selected GPU. This only takes effect if it runs
# before the first call that initialises CUDA.
if config["gpu_id"] is not None:
    # The index was read from nvidia-smi, which lists devices in PCI bus
    # order. CUDA enumerates fastest-first by default, so pin the ordering or
    # the same index may refer to a different physical GPU.
    os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
    os.environ["CUDA_VISIBLE_DEVICES"] = config["gpu_id"]
    print(f"✅ Using GPU ID: {config['gpu_id']}")
else:
    print("⚠️ No free GPU detected — training may use default device or fail")
# ─── Reproducibility ───────────────────────────────────────
def seed_everything(seed=42):
    """Seed the Python, NumPy and PyTorch (CPU and CUDA) RNGs with ``seed``.

    Also switches cuDNN to deterministic mode and turns off its benchmark
    auto-tuner.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False
# Seed every RNG once, up front, using the configured seed.
seed_everything(config["seed"])
# ─── Dataset ───────────────────────────────────────────────
class FibrilSegmentationDataset(torch.utils.data.Dataset):
    """Dataset of (grayscale image, binary mask) pairs read from parallel path lists.

    Args:
        image_paths: Sequence of image file paths.
        mask_paths: Sequence of mask file paths; ``mask_paths[i]`` is the mask
            for ``image_paths[i]``.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``"image"`` and ``"mask"`` entries.
            When omitted, the image is scaled to ``[0, 1]`` float32 instead.
    """

    def __init__(self, image_paths, mask_paths, transform=None):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, mask)`` tensors for sample ``idx``; the mask gains a leading channel axis."""
        # Both files are forced to single-channel ("L"); the context managers
        # close the file handles as soon as the pixels are copied out.
        with Image.open(self.image_paths[idx]) as img:
            image = np.array(img.convert("L"))
        with Image.open(self.mask_paths[idx]) as msk:
            # Pixels above mid-gray are foreground (1.0), the rest background (0.0).
            mask = (np.array(msk.convert("L")) > 127).astype(np.float32)

        # `is not None` rather than truthiness: a transform object that defines
        # __len__ can be falsy and would otherwise be silently skipped.
        if self.transform is not None:
            aug = self.transform(image=image, mask=mask)
            image, mask = aug["image"], aug["mask"]
        else:
            image = image.astype(np.float32) / 255.0

        # The transform may or may not have produced tensors already; make
        # sure both outputs are tensors so `.unsqueeze` below cannot fail.
        if not torch.is_tensor(image):
            image = torch.from_numpy(np.ascontiguousarray(image))
            if image.ndim == 2:
                image = image.unsqueeze(0)  # H x W -> 1 x H x W
        mask = torch.as_tensor(mask)
        return image, mask.unsqueeze(0)
# ─── Image-Mask Matcher ────────────────────────────────────
def match_images_and_masks(img_dir, mask_dir, img_exts=("jpg", "jpeg", "png"), mask_exts=("jpg", "png")):
    """Pair every image in ``img_dir`` with its ``<stem>-vectors.<ext>`` mask.

    Args:
        img_dir: Directory scanned (non-recursively) for images.
        mask_dir: Directory expected to hold the masks.
        img_exts: Image extensions to scan, without the dot, in scan order.
        mask_exts: Mask extensions to try, in priority order; the first
            existing file wins.

    Returns:
        ``(image_paths, mask_paths)`` — two parallel lists. Images with no
        matching mask are left out. Results are grouped by image extension
        (in ``img_exts`` order) and sorted by path within each group.
    """
    image_paths, mask_paths = [], []
    for ext in img_exts:
        # glob() yields files in filesystem order, which differs between
        # machines; sorting keeps any seeded shuffle of the result reproducible.
        for img_path in sorted(glob(f"{img_dir}/*.{ext}")):
            base = os.path.splitext(os.path.basename(img_path))[0]
            candidates = (os.path.join(mask_dir, f"{base}-vectors.{mext}") for mext in mask_exts)
            mask_path = next((c for c in candidates if os.path.exists(c)), None)
            if mask_path is not None:
                image_paths.append(img_path)
                mask_paths.append(mask_path)
    return image_paths, mask_paths
# ─── Loss Function ─────────────────────────────────────────
class DiceBCELoss(nn.Module):
    """Soft Dice loss plus binary cross-entropy, both computed from raw logits.

    Args:
        smooth: Constant added to the Dice numerator and denominator so the
            ratio stays defined when prediction and target are both empty.
    """

    def __init__(self, smooth=1e-6):
        super().__init__()
        self.smooth = smooth
        self.bce = nn.BCEWithLogitsLoss()

    def forward(self, inputs, targets):
        """Return ``dice_loss + bce_loss`` as a scalar tensor.

        Args:
            inputs: Raw model logits (no sigmoid applied).
            targets: Ground-truth mask with the same shape as ``inputs``.
        """
        # BCEWithLogitsLoss applies the sigmoid itself, so it must be given
        # the logits, not the probabilities computed below.
        bce_loss = self.bce(inputs, targets)

        # Dice is computed on probabilities, pooled over the whole batch.
        probs = torch.sigmoid(inputs)
        intersection = (probs * targets).sum()
        dice = (2.0 * intersection + self.smooth) / (probs.sum() + targets.sum() + self.smooth)
        return (1 - dice) + bce_loss
# ─── Metrics ───────────────────────────────────────────────
@torch.no_grad()
def dice_coeff(pred, target, smooth=1e-6):
    """Dice coefficient between thresholded predictions and ``target``.

    ``pred`` holds raw logits; they are passed through a sigmoid and cut at
    0.5 to obtain a hard mask. All elements are pooled into a single score.
    """
    hard_mask = torch.sigmoid(pred).gt(0.5).float()
    overlap = torch.sum(hard_mask * target)
    total = hard_mask.sum() + target.sum() + smooth
    return (2. * overlap + smooth) / total
@torch.no_grad()
def iou_score(pred, target, smooth=1e-6):
    """Intersection-over-union between thresholded predictions and ``target``.

    ``pred`` holds raw logits; they are passed through a sigmoid and cut at
    0.5 to obtain a hard mask. All elements are pooled into a single score.
    """
    hard_mask = torch.sigmoid(pred).gt(0.5).float()
    overlap = torch.sum(hard_mask * target)
    # |A ∪ B| = |A| + |B| - |A ∩ B|
    combined = hard_mask.sum() + target.sum() - overlap
    return (overlap + smooth) / (combined + smooth)
# ─── Data Preparation ──────────────────────────────────────
# image_paths, mask_paths = match_images_and_masks(config["train_img_dir"], config["train_mask_dir"])
# split = int(0.8 * len(image_paths))
# train_imgs, val_imgs = image_paths[:split], image_paths[split:]
# train_masks, val_masks = mask_paths[:split], mask_paths[split:]
# ─── Data Preparation with persistent train/val split ──────
# The 80/20 split is written to disk on the first run and reused afterwards,
# so every later run trains and validates on the same files.
# NOTE: the cache is keyed only on the file's existence. Delete the JSON file
# to regenerate the split after the dataset directories change.
split_path = "train_val_split.json"
if os.path.exists(split_path):
    print(f"Loading saved train/val split from {split_path}")
    with open(split_path, "r", encoding="utf-8") as f:
        split_data = json.load(f)
    train_imgs = split_data["train_images"]
    train_masks = split_data["train_masks"]
    val_imgs = split_data["val_images"]
    val_masks = split_data["val_masks"]
else:
    print("Creating new train/val split and saving it...")
    image_paths, mask_paths = match_images_and_masks(config["train_img_dir"], config["train_mask_dir"])
    if not image_paths:
        # Without this guard the unzip below fails with an opaque
        # "not enough values to unpack" error.
        raise ValueError(
            f"No image/mask pairs found in {config['train_img_dir']!r} / "
            f"{config['train_mask_dir']!r}; masks must be named '<image stem>-vectors.<ext>'."
        )

    # Shuffle images and masks together so the pairs stay aligned.
    train_val = list(zip(image_paths, mask_paths))
    random.seed(config["seed"])
    random.shuffle(train_val)
    image_paths, mask_paths = zip(*train_val)

    split = int(0.8 * len(image_paths))  # first 80% train, remainder validation
    train_imgs = list(image_paths[:split])
    train_masks = list(mask_paths[:split])
    val_imgs = list(image_paths[split:])
    val_masks = list(mask_paths[split:])
    split_data = {
        "train_images": train_imgs,
        "train_masks": train_masks,
        "val_images": val_imgs,
        "val_masks": val_masks,
    }
    with open(split_path, "w", encoding="utf-8") as f:
        json.dump(split_data, f, indent=2)
# Normalisation shared by the training and validation pipelines.
common_norm = A.Normalize(mean=(0.5,), std=(0.5,))

# Training pipeline: resize, random geometric and photometric augmentation,
# then normalise and convert to tensors.
# Probabilities are passed by keyword on purpose. Positionally,
# RandomBrightnessContrast(0.3) sets brightness_limit rather than p, and what
# the first positional slot of the flip/rotate transforms means depends on
# the installed albumentations version.
train_tf = A.Compose([
    A.Resize(config["img_size"], config["img_size"]),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.RandomRotate90(p=0.5),
    A.Affine(scale=(0.9, 1.1), translate_percent=0.05, rotate=(-30, 30), shear=(-5, 5), p=0.5),
    A.RandomBrightnessContrast(p=0.3),
    A.ElasticTransform(alpha=1.0, sigma=50.0, approximate=True, p=0.2),
    A.Blur(blur_limit=3, p=0.2),
    common_norm,
    ToTensorV2(),
])

# Validation pipeline: deterministic — resize, normalise, convert to tensors.
val_tf = A.Compose([
    A.Resize(config["img_size"], config["img_size"]),
    common_norm,
    ToTensorV2(),
])
# Wrap each split in a dataset, then in a loader.
train_ds = FibrilSegmentationDataset(train_imgs, train_masks, train_tf)
val_ds = FibrilSegmentationDataset(val_imgs, val_masks, val_tf)

# Training batches are reshuffled every epoch.
train_loader = DataLoader(
    train_ds,
    batch_size=config["batch_size"],
    shuffle=True,
    num_workers=config["num_workers"],
)
# Validation runs one sample at a time, in a fixed order.
val_loader = DataLoader(
    val_ds,
    batch_size=1,
    shuffle=False,
    num_workers=config["num_workers"],
)

print(f"Train samples: {len(train_imgs)} | Batch size: {config['batch_size']}")
print(f"Steps/epoch: {int(np.ceil(len(train_imgs) / config['batch_size']))}")
# ─── Model Setup ──────────────────────────────────────────
# Use CUDA when PyTorch reports it as available; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# device = torch.device("cpu")
# model = smp.Unet(
# encoder_name="resnet34",
# encoder_weights="imagenet",
# in_channels=1, # grayscale
# classes=1 # binary segmentation
# ).to(device)
# model = smp.Unet(
# encoder_name="efficientnet-b3",
# encoder_weights="imagenet",
# in_channels=1,
# classes=1
# ).to(device)
# model = smp.DeepLabV3Plus(
# encoder_name='efficientnet-b3',
# encoder_depth=5,
# encoder_weights='imagenet',
# decoder_use_norm='batchnorm',
# decoder_channels=(256, 128, 64, 32, 16),
# decoder_attention_type=None,
# decoder_interpolation='nearest',
# in_channels=1,
# classes=1,
# activation=None,
# aux_params=None
# ).to(device)
# model = smp.Unet(
# encoder_name="mobilenet_v2", # much lighter than resnet34
# encoder_weights="imagenet",
# in_channels=1, # grayscale input
# classes=1 # binary mask
# ).to(device)
# model = smp.UnetPlusPlus(
# encoder_name='resnet34',
# encoder_depth=5,
# encoder_weights='imagenet',
# decoder_use_norm='batchnorm',
# decoder_channels=(256, 128, 64, 32, 16),
# decoder_attention_type=None,
# decoder_interpolation='nearest',
# in_channels=1,
# classes=1,
# activation=None,
# aux_params=None
# ).to(device)
# Active architecture: UNet++ decoder on a ResNeSt-101e encoder.
# NOTE(review): segmentation_models_pytorch appears to register its ResNeSt
# encoders under a "timm-" prefix (e.g. "timm-resnest101e") — confirm that the
# installed version accepts the bare name used here.
model = smp.UnetPlusPlus(
    encoder_name="resnest101e",
    encoder_depth=5,
    encoder_weights="imagenet",
    decoder_use_norm="batchnorm",
    decoder_channels=(256, 128, 64, 32, 16),
    decoder_attention_type=None,
    decoder_interpolation="nearest",
    in_channels=1,    # the dataset loads every image as single-channel "L"
    classes=1,        # one output channel: binary mask
    activation=None,  # raw logits; the loss and metrics apply the sigmoid
    aux_params=None,
)
model = model.to(device)
# model = smp.UnetPlusPlus(
# encoder_name='efficientnet-b3', # Lightweight, solid performance
# encoder_depth=5, # Standard depth
# encoder_weights='imagenet', # Useful even for grayscale (see note below)
# decoder_use_norm='batchnorm', # Recommended for stability
# decoder_channels=(256, 128, 64, 32, 16), # Deep decoder, good for details
# decoder_attention_type=None, # Optional, can add SE or SCSE for boost
# decoder_interpolation='nearest', # Good, avoids checkerboard artifacts
# in_channels=1, # Correct for grayscale (e.g., EM images)
# classes=1, # Binary segmentation (fibrils vs background)
# activation=None, # No activation for logits output
# aux_params=None # No classification head
# ).to(device)
# Objective: Dice + BCE on the raw logits.
loss_fn = DiceBCELoss()

# Adam over every model parameter, starting at the configured learning rate.
optimizer = torch.optim.Adam(params=model.parameters(), lr=config["lr"])

# Halve the learning rate once the monitored value (the validation loss passed
# to scheduler.step) has not improved for 5 epochs.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer=optimizer,
    mode="min",
    factor=0.5,
    patience=5,
)
# ─── Training Loop ─────────────────────────────────────────
def _train_one_epoch(epoch):
    """Run one optimisation pass over ``train_loader``.

    Returns the per-batch mean of the loss and of the Dice score.
    """
    model.train()
    loss_sum, dice_sum = 0, 0
    for imgs, masks in tqdm(train_loader, desc=f"Epoch {epoch} - Train"):
        imgs = imgs.to(device)
        masks = masks.to(device)

        preds = model(imgs)
        loss = loss_fn(preds, masks)

        optimizer.zero_grad()
        loss.backward()
        # Cap the global gradient norm before the parameter update.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        loss_sum += loss.item()
        dice_sum += dice_coeff(preds, masks).item()

    n_batches = len(train_loader)
    return loss_sum / n_batches, dice_sum / n_batches


def _validate():
    """Evaluate on ``val_loader``; return per-batch mean (loss, Dice, IoU)."""
    model.eval()
    loss_sum, dice_sum, iou_sum = 0, 0, 0
    with torch.no_grad():
        for imgs, masks in val_loader:
            imgs = imgs.to(device)
            masks = masks.to(device)
            preds = model(imgs)
            loss_sum += loss_fn(preds, masks).item()
            dice_sum += dice_coeff(preds, masks).item()
            iou_sum += iou_score(preds, masks).item()

    n_batches = len(val_loader)
    return loss_sum / n_batches, dice_sum / n_batches, iou_sum / n_batches


# Best validation Dice seen so far; a checkpoint is written only on improvement.
best_dice = 0.0
os.makedirs(os.path.dirname(config["save_path"]), exist_ok=True)

for epoch in range(1, config["epochs"] + 1):
    avg_loss, avg_dice = _train_one_epoch(epoch)
    print(f"[Train] Epoch {epoch} | Loss: {avg_loss:.4f} | Dice: {avg_dice:.4f}")

    # ─── Validation ────────────────────────────────────────
    val_loss, val_dice, val_iou = _validate()
    # The LR schedule follows validation loss; checkpointing follows validation Dice.
    scheduler.step(val_loss)
    print(f"[Val] Epoch {epoch} | Loss: {val_loss:.4f} | Dice: {val_dice:.4f} | IoU: {val_iou:.4f}")

    if val_dice > best_dice:
        best_dice = val_dice
        torch.save(model.state_dict(), config["save_path"])
        print(f"✅ Saved Best Model (Epoch {epoch} - Dice: {val_dice:.4f})")
# import os
# import random
# import subprocess
# from glob import glob
# import numpy as np
# from PIL import Image
# from tqdm import tqdm
# import torch
# import torch.nn as nn
# from torch.utils.data import Dataset, DataLoader
# from torch.cuda.amp import autocast, GradScaler
# import albumentations as A
# from albumentations.pytorch import ToTensorV2
# import segmentation_models_pytorch as smp
# # ─── Select Free GPU ──────────────────────────────────────
# def get_free_gpu(threshold_mb=500):
# try:
# result = subprocess.run(
# ["nvidia-smi", "--query-gpu=memory.used,memory.total", "--format=csv,nounits,noheader"],
# stdout=subprocess.PIPE, text=True
# )
# for idx, line in enumerate(result.stdout.strip().split("\n")):
# used, total = map(int, line.strip().split(","))
# if total - used > threshold_mb:
# return str(idx)
# except Exception as e:
# print("GPU check failed:", e)
# return None
# free_gpu = get_free_gpu()
# if free_gpu is not None:
# os.environ["CUDA_VISIBLE_DEVICES"] = free_gpu
# print(f"Using GPU {free_gpu}")
# else:
# print("No free GPU found — training may fail due to lack of memory")
# # ─── Seed Everything ──────────────────────────────────────
# def seed_everything(seed=42):
# random.seed(seed)
# np.random.seed(seed)
# torch.manual_seed(seed)
# torch.cuda.manual_seed_all(seed)
# torch.backends.cudnn.deterministic = True
# torch.backends.cudnn.benchmark = False
# seed_everything()
# # ─── Dataset ──────────────────────────────────────────────
# class FibrilSegmentationDataset(Dataset):
# def __init__(self, image_paths, mask_paths, transform=None):
# self.image_paths = image_paths
# self.mask_paths = mask_paths
# self.transform = transform
# def __len__(self):
# return len(self.image_paths)
# def __getitem__(self, idx):
# image = Image.open(self.image_paths[idx]).convert("L")
# mask = Image.open(self.mask_paths[idx]).convert("L")
# image = np.array(image)
# mask = (np.array(mask) > 127).astype(np.float32)
# if self.transform:
# augmented = self.transform(image=image, mask=mask)
# image = augmented['image']
# mask = augmented['mask']
# return image, mask.unsqueeze(0) # [1, H, W]
# # ─── Match Image-Mask ─────────────────────────────────────
# def match_images_and_masks(img_dir, mask_dir, img_exts=("jpg", "jpeg", "png"), mask_exts=("jpg", "png")):
# image_paths, mask_paths = [], []
# for ext in img_exts:
# for img_path in glob(f"{img_dir}/*.{ext}"):
# base_name = os.path.splitext(os.path.basename(img_path))[0]
# for mask_ext in mask_exts:
# possible_mask = os.path.join(mask_dir, f"{base_name}-vectors.{mask_ext}")
# if os.path.exists(possible_mask):
# image_paths.append(img_path)
# mask_paths.append(possible_mask)
# break
# return image_paths, mask_paths
# # ─── Loss Function ────────────────────────────────────────
# class DiceBCELoss(nn.Module):
# def __init__(self):
# super().__init__()
# self.bce = nn.BCEWithLogitsLoss()
# def forward(self, inputs, targets):
# smooth = 1e-6
# inputs = torch.sigmoid(inputs)
# intersection = (inputs * targets).sum()
# dice = (2.*intersection + smooth)/(inputs.sum() + targets.sum() + smooth)
# return 1 - dice + self.bce(inputs, targets)
# # ─── Data ─────────────────────────────────────────────────
# image_paths, mask_paths = match_images_and_masks("./dataset4/images", "./dataset4/masks")
# split = int(0.8 * len(image_paths))
# train_imgs, val_imgs = image_paths[:split], image_paths[split:]
# train_masks, val_masks = mask_paths[:split], mask_paths[split:]
# common_normalization = A.Normalize(mean=(0.5,), std=(0.5,))
# train_transform = A.Compose([
# A.Resize(512, 512),
# A.HorizontalFlip(p=0.5),
# A.VerticalFlip(p=0.5),
# A.RandomRotate90(p=0.5),
# A.Affine(scale=(0.9, 1.1), translate_percent=(0.05, 0.05), rotate=(-30, 30), shear=(-5, 5), p=0.5),
# A.RandomBrightnessContrast(p=0.3),
# A.ElasticTransform(alpha=1.0, sigma=50.0, approximate=True, p=0.2),
# A.Blur(blur_limit=3, p=0.2),
# common_normalization,
# ToTensorV2()
# ])
# val_transform = A.Compose([
# A.Resize(512, 512),
# common_normalization,
# ToTensorV2()
# ])
# train_ds = FibrilSegmentationDataset(train_imgs, train_masks, train_transform)
# val_ds = FibrilSegmentationDataset(val_imgs, val_masks, val_transform)
# train_loader = DataLoader(train_ds, batch_size=2, shuffle=True, num_workers=4)
# val_loader = DataLoader(val_ds, batch_size=1, shuffle=False, num_workers=4)
# # ─── Model ────────────────────────────────────────────────
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = smp.DeepLabV3Plus(
# encoder_name="efficientnet-b3",
# encoder_weights="imagenet",
# in_channels=1,
# classes=1
# ).to(device)
# loss_fn = DiceBCELoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
# scaler = GradScaler()
# # ─── Metrics ───────────────────────────────────────────────
# def dice_coeff(pred, target, smooth=1e-6):
# pred = torch.sigmoid(pred)
# pred = (pred > 0.5).float()
# intersection = (pred * target).sum()
# return (2. * intersection + smooth) / (pred.sum() + target.sum() + smooth)
# def iou_score(pred, target, smooth=1e-6):
# pred = torch.sigmoid(pred)
# pred = (pred > 0.5).float()
# intersection = (pred * target).sum()
# union = pred.sum() + target.sum() - intersection
# return (intersection + smooth) / (union + smooth)
# # ─── Training ──────────────────────────────────────────────
# best_dice = 0.0
# os.makedirs("./trained-models", exist_ok=True)
# for epoch in range(1, 101):
# model.train()
# total_loss, total_dice = 0, 0
# for imgs, masks in tqdm(train_loader, desc=f"Epoch {epoch} - Train"):
# imgs, masks = imgs.to(device), masks.to(device)
# optimizer.zero_grad()
# with autocast():
# preds = model(imgs)
# loss = loss_fn(preds, masks)
# scaler.scale(loss).backward()
# nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# scaler.step(optimizer)
# scaler.update()
# total_loss += loss.item()
# total_dice += dice_coeff(preds, masks).item()
# avg_loss = total_loss / len(train_loader)
# avg_dice = total_dice / len(train_loader)
# print(f"[Train] Epoch {epoch} | Loss: {avg_loss:.4f} | Dice: {avg_dice:.4f}")
# model.eval()
# val_loss, val_dice, val_iou = 0, 0, 0
# with torch.no_grad():
# for imgs, masks in val_loader:
# imgs, masks = imgs.to(device), masks.to(device)
# preds = model(imgs)
# val_loss += loss_fn(preds, masks).item()
# val_dice += dice_coeff(preds, masks).item()
# val_iou += iou_score(preds, masks).item()
# val_loss /= len(val_loader)
# val_dice /= len(val_loader)
# val_iou /= len(val_loader)
# scheduler.step(val_loss)
# print(f"[Val] Epoch {epoch} | Loss: {val_loss:.4f} | Dice: {val_dice:.4f} | IoU: {val_iou:.4f}")
# if val_dice > best_dice:
# best_dice = val_dice
# torch.save(model.state_dict(), f"./trained-models/fibril_epoch{epoch}_dice{val_dice:.4f}.pth")
# print(f"✅ Saved Best Model (Epoch {epoch} - Dice: {val_dice:.4f})")
# # # =============== Working fine with gray images (UNet model with ResNet34 as the encoder) ===================
# # # =============== Encoder (ResNet34) and Decoder (UNet)==============
# import os
# import random
# from glob import glob
# import numpy as np
# from PIL import Image
# from tqdm import tqdm
# from itertools import chain
# import torch
# import torch.nn as nn
# from torch.utils.data import Dataset, DataLoader
# import albumentations as A
# from albumentations.pytorch import ToTensorV2
# import segmentation_models_pytorch as smp
# import subprocess
# import os
# # Force GPU selection if available
# # import os
# # os.environ["CUDA_VISIBLE_DEVICES"] = "3" # Change '3' to any free GPU ID
# def get_free_gpu(threshold_mb=500):
# try:
# result = subprocess.run(
# ["nvidia-smi", "--query-gpu=memory.used,memory.total", "--format=csv,nounits,noheader"],
# stdout=subprocess.PIPE, text=True
# )
# for idx, line in enumerate(result.stdout.strip().split("\n")):
# used, total = map(int, line.strip().split(","))
# if total - used > threshold_mb:
# return str(idx)
# except Exception as e:
# print("GPU check failed:", e)
# return None
# # free_gpu = get_free_gpu()
# free_gpu = "5"
# if free_gpu is not None:
# os.environ["CUDA_VISIBLE_DEVICES"] = free_gpu
# print(f"Using GPU {free_gpu}")
# else:
# print("No free GPU found — training may fail due to lack of memory")
# # ─── Seed for Reproducibility ─────────────────────────────
# def seed_everything(seed=42):
# random.seed(seed)
# np.random.seed(seed)
# torch.manual_seed(seed)
# torch.cuda.manual_seed_all(seed)
# torch.backends.cudnn.deterministic = True
# torch.backends.cudnn.benchmark = False
# seed_everything()
# # ─── Dataset ──────────────────────────────────────────────
# class FibrilSegmentationDataset(Dataset):
# def __init__(self, image_paths, mask_paths, transform=None):
# self.image_paths = image_paths
# self.mask_paths = mask_paths
# self.transform = transform
# def __len__(self):
# return len(self.image_paths)
# def __getitem__(self, idx):
# image = Image.open(self.image_paths[idx]).convert("L")
# mask = Image.open(self.mask_paths[idx]).convert("L")
# image = np.array(image)
# mask = (np.array(mask) > 127).astype(np.float32)
# if self.transform:
# augmented = self.transform(image=image, mask=mask)
# image = augmented['image']
# mask = augmented['mask']
# return image, mask.unsqueeze(0) # [1, H, W]
# # ─── Utility to Match Image-Mask Pairs ─────────────────────
# def match_images_and_masks(img_dir, mask_dir, img_exts=("jpg", "jpeg", "png"), mask_exts=("jpg", "png")):
# image_paths, mask_paths = [], []
# for ext in img_exts:
# for img_path in glob(f"{img_dir}/*.{ext}"):
# base_name = os.path.splitext(os.path.basename(img_path))[0]
# for mask_ext in mask_exts:
# # possible_mask = os.path.join(mask_dir, f"{base_name}_mask.{mask_ext}")
# possible_mask = os.path.join(mask_dir, f"{base_name}-vectors.{mask_ext}")
# if os.path.exists(possible_mask):
# image_paths.append(img_path)
# mask_paths.append(possible_mask)
# break # Stop after first match
# return image_paths, mask_paths
# class DiceBCELoss(nn.Module):
# def __init__(self):
# super().__init__()
# self.bce = nn.BCEWithLogitsLoss()
# def forward(self, inputs, targets):
# smooth = 1e-6
# inputs = torch.sigmoid(inputs)
# intersection = (inputs * targets).sum()
# dice = (2.*intersection + smooth)/(inputs.sum() + targets.sum() + smooth)
# return 1 - dice + self.bce(inputs, targets)
# # ─── Load Dataset ──────────────────────────────────────────
# image_paths, mask_paths = match_images_and_masks("./dataset4/images", "./dataset4/masks")
# split = int(0.8 * len(image_paths))
# train_imgs, val_imgs = image_paths[:split], image_paths[split:]
# train_masks, val_masks = mask_paths[:split], mask_paths[split:]
# # ─── Transformations ──────────────────────────────────────
# common_normalization = A.Normalize(mean=(0.5,), std=(0.5,))
# train_transform = A.Compose([
# A.Resize(512, 512),
# A.HorizontalFlip(p=0.5),
# A.VerticalFlip(p=0.5),
# A.RandomRotate90(p=0.5),
# A.Affine(scale=(0.9, 1.1), translate_percent=(0.05, 0.05), rotate=(-30, 30), shear=(-5, 5), p=0.5),
# A.RandomBrightnessContrast(p=0.3),
# A.ElasticTransform(alpha=1.0, sigma=50.0, approximate=True, p=0.2),
# A.Blur(blur_limit=3, p=0.2),
# common_normalization,
# ToTensorV2()
# ])
# val_transform = A.Compose([
# A.Resize(512, 512),
# common_normalization,
# ToTensorV2()
# ])
# # ─── Datasets & Loaders ───────────────────────────────────
# train_ds = FibrilSegmentationDataset(train_imgs, train_masks, train_transform)
# val_ds = FibrilSegmentationDataset(val_imgs, val_masks, val_transform)
# # train_loader = DataLoader(train_ds, batch_size=8, shuffle=True, num_workers=4)
# # train_loader = DataLoader(train_ds, batch_size=4, shuffle=True, num_workers=4)
# # For training (20 samples):
# train_loader = DataLoader(train_ds, batch_size=2, shuffle=True, num_workers=4)
# print(f"Train samples: {len(train_ds)}")
# print(f"Batch size: {train_loader.batch_size}")
# print(f"Expected steps per epoch: {int(np.ceil(len(train_ds)/train_loader.batch_size))}")
# # val_loader = DataLoader(val_ds, batch_size=8, num_workers=4)
# # For validation (5 samples):
# val_loader = DataLoader(val_ds, batch_size=1, shuffle=False, num_workers=4)
# # ─── Model Setup ──────────────────────────────────────────
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# # device = torch.device("cpu")
# # model = smp.Unet(
# # encoder_name="resnet34",
# # encoder_weights="imagenet",
# # in_channels=1, # grayscale
# # classes=1 # binary segmentation
# # ).to(device)
# # model = smp.Unet(
# # encoder_name="efficientnet-b3",
# # encoder_weights="imagenet",
# # in_channels=1,
# # classes=1
# # ).to(device)
# model = smp.DeepLabV3Plus(
# encoder_name="efficientnet-b3",
# encoder_weights="imagenet",
# in_channels=1,
# classes=1
# ).to(device)
# # model = smp.Unet(
# # encoder_name="mobilenet_v2", # much lighter than resnet34
# # encoder_weights="imagenet",
# # in_channels=1, # grayscale input
# # classes=1 # binary mask
# # ).to(device)
# # loss_fn = nn.BCEWithLogitsLoss()
# loss_fn = DiceBCELoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
# # ─── Metrics ───────────────────────────────────────────────
# def dice_coeff(pred, target, smooth=1e-6):
# pred = torch.sigmoid(pred)
# pred = (pred > 0.5).float()
# intersection = (pred * target).sum()
# return (2. * intersection + smooth) / (pred.sum() + target.sum() + smooth)
# def iou_score(pred, target, smooth=1e-6):
# pred = torch.sigmoid(pred)
# pred = (pred > 0.5).float()
# intersection = (pred * target).sum()
# union = pred.sum() + target.sum() - intersection
# return (intersection + smooth) / (union + smooth)
# # ─── Training Loop ─────────────────────────────────────────
# best_dice = 0.0
# os.makedirs("./trained-models", exist_ok=True)
# for epoch in range(1, 101):
# model.train()
# total_loss, total_dice = 0, 0
# for imgs, masks in tqdm(train_loader, desc=f"Epoch {epoch} - Train"):
# imgs, masks = imgs.to(device), masks.to(device)
# preds = model(imgs)
# loss = loss_fn(preds, masks)
# optimizer.zero_grad()
# loss.backward()
# nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# optimizer.step()
# total_loss += loss.item()
# total_dice += dice_coeff(preds, masks).item()
# avg_loss = total_loss / len(train_loader)
# avg_dice = total_dice / len(train_loader)
# print(f"[Train] Epoch {epoch} | Loss: {avg_loss:.4f} | Dice: {avg_dice:.4f}")
# # Validation
# model.eval()
# val_loss, val_dice, val_iou = 0, 0, 0
# with torch.no_grad():
# for imgs, masks in val_loader:
# imgs, masks = imgs.to(device), masks.to(device)
# preds = model(imgs)
# val_loss += loss_fn(preds, masks).item()
# val_dice += dice_coeff(preds, masks).item()
# val_iou += iou_score(preds, masks).item()
# val_loss /= len(val_loader)
# val_dice /= len(val_loader)
# val_iou /= len(val_loader)
# scheduler.step(val_loss)
# print(f"[Val] Epoch {epoch} | Loss: {val_loss:.4f} | Dice: {val_dice:.4f} | IoU: {val_iou:.4f}")
# # Save best model
# if val_dice > best_dice:
# best_dice = val_dice
# torch.save(model.state_dict(), "./trained-models/amalesh_encoder_efficientnet-b3_decoder_DeepLabV3Plus_fibril_seg_model.pth")
# print(f"✅ Saved Best Model (Epoch {epoch} - Dice: {val_dice:.4f})")
# # Working on the gray images fine
# # =============== Working fine with gray images (UNet model with ResNet34 as the encoder) ===================
# # =============== Encoder (ResNet34) and Decoder (UNet)==============
# import os
# from glob import glob
# import numpy as np
# from PIL import Image
# from tqdm import tqdm
# import torch
# import torch.nn as nn
# from torch.utils.data import Dataset, DataLoader
# import albumentations as A
# from albumentations.pytorch import ToTensorV2
# import segmentation_models_pytorch as smp
# # ─── Dataset ────────────────────────────
# class FibrilSegmentationDataset(Dataset):
# def __init__(self, image_paths, mask_paths, transform=None):
# self.image_paths = image_paths
# self.mask_paths = mask_paths
# self.transform = transform
# def __len__(self):
# return len(self.image_paths)
# def __getitem__(self, idx):
# # Load grayscale image and mask
# image = Image.open(self.image_paths[idx]).convert("L")
# mask = Image.open(self.mask_paths[idx]).convert("L")
# image = image.resize((512, 512))
# mask = mask.resize((512, 512))
# image = np.array(image)
# mask = np.array(mask)
# # Binarize mask
# mask = (mask > 127).astype(np.float32)
# if self.transform:
# augmented = self.transform(image=image, mask=mask)
# image = augmented["image"]
# mask = augmented["mask"]
# # image shape: [1, H, W], mask shape: [H, W]
# return image, mask.unsqueeze(0)
# # ─── Paths ─────────────────────────────
# image_paths = sorted(glob("./dataset/images/*.jpg"))
# mask_paths = sorted(glob("./dataset/masks/*.jpg"))
# split = int(0.8 * len(image_paths))
# train_imgs, val_imgs = image_paths[:split], image_paths[split:]
# train_masks, val_masks = mask_paths[:split], mask_paths[split:]
# # ─── Augmentations ─────────────────────
# train_transform = A.Compose([
# A.Resize(512, 512),
# A.HorizontalFlip(p=0.5),
# A.VerticalFlip(p=0.5),
# A.RandomRotate90(p=0.5),
# A.Affine(
# scale=(0.9, 1.1),
# translate_percent=(0.05, 0.05),
# rotate=(-30, 30),
# shear=(-5, 5),
# p=0.5
# ),
# A.RandomBrightnessContrast(
# brightness_limit=0.2,
# contrast_limit=0.2,
# p=0.3
# ),
# A.ElasticTransform(
# alpha=1.0,
# sigma=50.0,
# approximate=True,
# p=0.2
# ),
# A.Blur(blur_limit=3, p=0.2),
# A.Normalize(mean=(0.5,), std=(0.5,)),
# ToTensorV2()
# ])
# val_transform = A.Compose([
# A.Resize(512, 512),
# A.Normalize(mean=(0.5,), std=(0.5,)),
# ToTensorV2()
# ])
# train_ds = FibrilSegmentationDataset(train_imgs, train_masks, transform=train_transform)
# val_ds = FibrilSegmentationDataset(val_imgs, val_masks, transform=val_transform)
# train_loader = DataLoader(train_ds, batch_size=4, shuffle=True, num_workers=4)
# val_loader = DataLoader(val_ds, batch_size=4, num_workers=4)
# # ─── Model ───────────────────────────────
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = smp.Unet(
# encoder_name="resnet34",
# encoder_weights="imagenet",
# in_channels=1, # grayscale input
# classes=1 # binary segmentation
# ).to(device)
# loss_fn = nn.BCEWithLogitsLoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# # ─── Metrics ─────────────────────────────
# def dice_coeff(pred, target, smooth=1e-6):
# pred = torch.sigmoid(pred)
# pred = (pred > 0.5).float()
# intersection = (pred * target).sum()
# return (2. * intersection + smooth) / (pred.sum() + target.sum() + smooth)
# # ─── Train Loop ──────────────────────────
# for epoch in range(1, 100):
# model.train()
# total_loss = 0
# total_dice = 0
# for imgs, masks in tqdm(train_loader, desc=f"Epoch {epoch} - Train"):
# imgs, masks = imgs.to(device), masks.to(device)
# preds = model(imgs)
# loss = loss_fn(preds, masks)
# optimizer.zero_grad()
# loss.backward()
# optimizer.step()
# total_loss += loss.item()
# total_dice += dice_coeff(preds, masks).item()
# avg_loss = total_loss / len(train_loader)
# avg_dice = total_dice / len(train_loader)
# print(f"Epoch {epoch} - Train Loss: {avg_loss:.4f}, Dice: {avg_dice:.4f}")
# # Validation
# model.eval()
# val_loss = 0
# val_dice = 0
# with torch.no_grad():
# for imgs, masks in val_loader:
# imgs, masks = imgs.to(device), masks.to(device)
# preds = model(imgs)
# loss = loss_fn(preds, masks)
# val_loss += loss.item()
# val_dice += dice_coeff(preds, masks).item()
# val_loss /= len(val_loader)
# val_dice /= len(val_loader)
# print(f"Epoch {epoch} - Val Loss: {val_loss:.4f}, Val Dice: {val_dice:.4f}")
# torch.save(model.state_dict(), "./trained-models/fibril_seg_model.pth")
# print("✅ Model saved as fibril_seg_model.pth")