import math
import os

import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder

import kagglehub
from tqdm.auto import tqdm
from transformers import ViTFeatureExtractor, ViTForImageClassification
from transformers import get_scheduler
# Report whether CUDA is usable and, when it is, basic device diagnostics.
has_cuda = torch.cuda.is_available()
print("GPU available:", has_cuda)
if not has_cuda:
    print("No GPU available.")
else:
    print("GPU:", torch.cuda.get_device_name(0))
    print("GPU count:", torch.cuda.device_count())
    print("#memory avail:", torch.cuda.get_device_properties(0).total_memory / (1024 ** 3), "GB")
# Fetch the garbage-classification dataset and carve out a 20% validation split.
kaggle_path = kagglehub.dataset_download("mostafaabla/garbage-classification")
folder_root_path = kaggle_path + '/garbage_classification'
print("Path to dataset files:", kaggle_path)

ds = ImageFolder(folder_root_path)

# Random permutation of sample indices; last 20% become validation.
# NOTE(review): split is unseeded, so it differs across runs — confirm that's intended.
shuffled = torch.randperm(len(ds)).tolist()
n_val = math.floor(len(shuffled) * 0.20)
train_ds = torch.utils.data.Subset(ds, shuffled[:-n_val])
val_ds = torch.utils.data.Subset(ds, shuffled[-n_val:])
print(ds.classes)

# Forward and reverse label lookups; ids are kept as strings on both sides
# (the model config below consumes them in this form).
label2id = {class_name: str(i) for i, class_name in enumerate(ds.classes)}
id2label = {str(i): class_name for i, class_name in enumerate(ds.classes)}
class ImageClassificationCollator:
    """Collate a list of (image, label) pairs into a model-ready batch.

    The wrapped feature extractor turns the images into tensors; the integer
    labels are attached under the 'labels' key as a long tensor.
    """

    def __init__(self, feature_extractor):
        self.feature_extractor = feature_extractor

    def __call__(self, batch):
        images = [item[0] for item in batch]
        labels = [item[1] for item in batch]
        encodings = self.feature_extractor(images, return_tensors='pt')
        encodings['labels'] = torch.tensor(labels, dtype=torch.long)
        return encodings
# Pretrained ViT backbone with a fresh classification head sized to our classes.
# NOTE(review): ViTFeatureExtractor is deprecated upstream in favor of
# ViTImageProcessor; kept as-is here to avoid changing behavior.
feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224-in21k')
model = ViTForImageClassification.from_pretrained(
    'google/vit-base-patch16-224-in21k',
    num_labels=len(label2id),
    label2id=label2id,
    id2label=id2label,
)

collator = ImageClassificationCollator(feature_extractor)
train_loader = DataLoader(
    train_ds,
    batch_size=16,
    shuffle=True,
    num_workers=2,
    collate_fn=collator,
)
val_loader = DataLoader(
    val_ds,
    batch_size=16,
    num_workers=2,
    collate_fn=collator,
)
# Optimizer, linear LR decay over the whole run, and device placement.
optimizer = AdamW(model.parameters(), lr=5e-5)

num_epochs = 10
num_training_steps = len(train_loader) * num_epochs
lr_scheduler = get_scheduler(
    name="linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=num_training_steps,
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
progress_bar = tqdm(range(num_training_steps))
# Fine-tuning loop: one optimizer and LR-scheduler step per batch.
model.train()
for _epoch in range(num_epochs):
    for batch in train_loader:
        batch = {key: tensor.to(device) for key, tensor in batch.items()}
        loss = model(**batch).loss
        loss.backward()
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()
        progress_bar.update(1)
# Validation pass.
# Fix: the original computed `predictions` per batch and then discarded them —
# the evaluation produced no metric at all. Accumulate correct/total counts
# and report overall accuracy.
model.eval()
correct = 0
total = 0
for batch in val_loader:
    batch = {k: v.to(device) for k, v in batch.items()}
    with torch.no_grad():
        outputs = model(**batch)
    logits = outputs.logits
    predictions = torch.argmax(logits, dim=-1)
    correct += (predictions == batch['labels']).sum().item()
    total += batch['labels'].size(0)
# max(total, 1) guards against an empty validation loader.
print(f"Validation accuracy: {correct / max(total, 1):.4f} ({correct}/{total})")
# Persist the fine-tuned model alongside its preprocessor so the directory can
# be reloaded later with .from_pretrained().
# Fixes: mid-file `import os` hoisted to the top-level imports, and the racy
# exists-then-create check replaced with the idempotent exist_ok=True form.
save_directory = "./saved_model"
os.makedirs(save_directory, exist_ok=True)
model.save_pretrained(save_directory)
feature_extractor.save_pretrained(save_directory)
print(f"Model saved: {save_directory}")