zeroFire / utils /data_utils.py
Neylton's picture
Rename data_utils.py to utils/data_utils.py
18c91e7 verified
"""
Data utilities for fire detection classification
Handles data loading, transformations, and dataset management
"""
import os
import torch
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms, datasets
from PIL import Image
import numpy as np
from typing import Tuple, Dict, List, Optional
from collections import Counter
import random
class FireDetectionDataset(Dataset):
    """
    Custom dataset for fire detection images.

    Images are discovered recursively under
    ``<data_dir>/<split>/<class_name>/`` and labelled by their class folder
    (``fire`` -> 0, ``no_fire`` -> 1). The ``'train'`` split uses a random
    augmentation pipeline; every other split uses a deterministic
    resize + normalize pipeline.
    """

    # File suffixes (matched case-insensitively) that count as images.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff')

    def __init__(self, data_dir: str, split: str = 'train', image_size: int = 224):
        """
        Initialize fire detection dataset

        Args:
            data_dir: Root directory containing train/val folders
            split: 'train' or 'val'
            image_size: Size to resize images to
        """
        self.data_dir = data_dir
        self.split = split
        self.image_size = image_size

        # Define class mapping
        self.classes = ['fire', 'no_fire']  # 0: fire, 1: no_fire
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}

        # Load image paths and labels
        self.samples = self._load_samples()

        # Define transforms
        self.transform = self._get_transforms()

        print(f"🔥 {split.upper()} Dataset loaded:")
        print(f" Total samples: {len(self.samples)}")
        print(f" Classes: {self.classes}")
        self._print_class_distribution()

    def _load_samples(self) -> List[Tuple[str, int]]:
        """
        Load image paths and corresponding labels.

        Returns:
            List of ``(image_path, class_index)`` pairs, grouped by class in
            ``self.classes`` order and sorted by path within each class so
            that a given index always refers to the same file.
        """
        samples = []
        split_dir = os.path.join(self.data_dir, self.split)

        for class_name in self.classes:
            class_dir = os.path.join(split_dir, class_name)
            if not os.path.exists(class_dir):
                # A missing class folder is reported but is not fatal.
                print(f"⚠️ Warning: {class_dir} not found")
                continue

            class_idx = self.class_to_idx[class_name]

            # Load all images from class directory and subdirectories
            for root, dirs, files in os.walk(class_dir):
                # os.walk yields entries in arbitrary filesystem order;
                # sorting `dirs` in place fixes the traversal order and
                # sorting `files` fixes the order within each directory.
                dirs.sort()
                for img_name in sorted(files):
                    if img_name.lower().endswith(self.IMAGE_EXTENSIONS):
                        img_path = os.path.join(root, img_name)
                        samples.append((img_path, class_idx))

        return samples

    def _print_class_distribution(self):
        """Print class distribution for the dataset"""
        class_counts = Counter(label for _, label in self.samples)
        for class_name, class_idx in self.class_to_idx.items():
            count = class_counts.get(class_idx, 0)
            print(f" {class_name}: {count} samples")

    def _get_transforms(self) -> transforms.Compose:
        """
        Get appropriate transforms for the split.

        Returns:
            The augmenting pipeline when ``self.split == 'train'``,
            otherwise a plain resize / to-tensor / normalize pipeline.
        """
        if self.split == 'train':
            return transforms.Compose([
                # Resize slightly larger than the target so the random crop
                # below has some margin to work with.
                transforms.Resize((self.image_size + 32, self.image_size + 32)),
                transforms.RandomResizedCrop(self.image_size, scale=(0.8, 1.0)),
                transforms.RandomHorizontalFlip(p=0.5),
                transforms.RandomRotation(degrees=10),
                transforms.ColorJitter(
                    brightness=0.2,
                    contrast=0.2,
                    saturation=0.2,
                    hue=0.1
                ),
                transforms.ToTensor(),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406],
                    std=[0.229, 0.224, 0.225]
                ),
                # Applied last because it operates on the tensor image.
                transforms.RandomErasing(p=0.1, scale=(0.02, 0.08))
            ])
        else:
            return transforms.Compose([
                transforms.Resize((self.image_size, self.image_size)),
                transforms.ToTensor(),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406],
                    std=[0.229, 0.224, 0.225]
                )
            ])

    def __len__(self) -> int:
        """Return the number of discovered samples."""
        return len(self.samples)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int]:
        """
        Get a sample from the dataset.

        Args:
            idx: Position in ``self.samples``

        Returns:
            Tuple of (transformed image, class index). If the file cannot
            be read, a black image is substituted and the label is kept.
        """
        img_path, label = self.samples[idx]

        # Load image
        try:
            # The context manager closes the underlying file as soon as the
            # RGB copy has been made.
            with Image.open(img_path) as source_image:
                image = source_image.convert('RGB')
        except Exception as e:
            # Deliberate best-effort: one unreadable file should not abort
            # an entire epoch.
            print(f"⚠️ Error loading image {img_path}: {e}")
            # Return a black image as fallback
            image = Image.new('RGB', (self.image_size, self.image_size), color='black')

        # Apply transforms
        if self.transform:
            image = self.transform(image)

        return image, label
def create_data_loaders(
    data_dir: str,
    batch_size: int = 16,
    num_workers: int = 4,
    image_size: int = 224,
    use_weighted_sampling: bool = True
) -> Tuple[DataLoader, DataLoader]:
    """
    Build the training and validation data loaders.

    Args:
        data_dir: Root directory containing train/val folders
        batch_size: Number of samples per batch for both loaders
        num_workers: Worker process count passed to both loaders
        image_size: Size images are resized to by the datasets
        use_weighted_sampling: When True and the training set is not
            empty, draw training samples through a class-balancing
            weighted sampler instead of plain shuffling

    Returns:
        Tuple of (train_loader, val_loader). The training loader drops its
        final incomplete batch; the validation loader keeps every sample
        and is never shuffled.
    """
    # Datasets are built train-first so their log output keeps that order.
    train_dataset = FireDetectionDataset(data_dir, 'train', image_size)
    val_dataset = FireDetectionDataset(data_dir, 'val', image_size)

    # A sampler and shuffle=True are mutually exclusive, so shuffling is
    # only requested when no sampler was built.
    wants_sampler = use_weighted_sampling and len(train_dataset) > 0
    train_sampler = create_weighted_sampler(train_dataset) if wants_sampler else None

    # Settings shared by both loaders.
    shared_kwargs = dict(
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=torch.cuda.is_available(),
    )

    train_loader = DataLoader(
        train_dataset,
        sampler=train_sampler,
        shuffle=train_sampler is None,
        drop_last=True,
        **shared_kwargs
    )
    val_loader = DataLoader(val_dataset, shuffle=False, **shared_kwargs)

    summary_lines = (
        f"📦 Data loaders created:",
        f" Batch size: {batch_size}",
        f" Num workers: {num_workers}",
        f" Train batches: {len(train_loader)}",
        f" Val batches: {len(val_loader)}",
        f" Weighted sampling: {use_weighted_sampling}",
    )
    for line in summary_lines:
        print(line)

    return train_loader, val_loader
def create_weighted_sampler(dataset: FireDetectionDataset) -> WeightedRandomSampler:
    """
    Build a sampler that draws each class with equal overall probability.

    Args:
        dataset: Dataset exposing ``samples`` (a list of (path, label)
            pairs) and ``class_to_idx`` (class name -> label index)

    Returns:
        WeightedRandomSampler that draws ``len(dataset.samples)`` indices
        with replacement, each weighted by the inverse frequency of its
        class
    """
    labels = [target for _, target in dataset.samples]
    per_class = Counter(labels)
    n_total = len(labels)

    # Inverse-frequency weight per class: rarer classes get larger weights.
    inverse_freq = {idx: n_total / n for idx, n in per_class.items()}

    # One weight per sample, aligned with dataset.samples.
    per_sample = [inverse_freq[target] for target in labels]

    balanced = WeightedRandomSampler(
        weights=per_sample,
        num_samples=n_total,
        replacement=True
    )

    print(f"⚖️ Weighted sampler created:")
    for class_name, class_idx in dataset.class_to_idx.items():
        # Classes with no samples are reported with count 0 and weight 0.
        n_in_class = per_class.get(class_idx, 0)
        class_weight = inverse_freq.get(class_idx, 0)
        print(f" {class_name}: {n_in_class} samples, weight: {class_weight:.2f}")

    return balanced
def get_inference_transform(image_size: int = 224) -> transforms.Compose:
    """
    Build the deterministic preprocessing pipeline used at prediction time.

    Args:
        image_size: Side length of the square the image is resized to

    Returns:
        Transform pipeline that resizes to (image_size, image_size),
        converts to a tensor, then normalizes each channel with fixed
        mean/std constants
    """
    target_shape = (image_size, image_size)
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    steps = [
        transforms.Resize(target_shape),
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
    return transforms.Compose(steps)
def prepare_image_for_inference(image: Image.Image, transform: transforms.Compose) -> torch.Tensor:
    """
    Turn a single image into a one-element batch for the model.

    Args:
        image: PIL Image
        transform: Transform pipeline applied to the image

    Returns:
        The transformed image with a new leading batch dimension of size 1
    """
    # Run the pipeline, then prepend the batch axis in one expression.
    return transform(image).unsqueeze(0)
def visualize_batch(data_loader: DataLoader, num_samples: int = 8) -> None:
    """
    Visualize a batch of images from the data loader.

    Takes the first batch, undoes the per-channel normalization, and shows
    up to ``num_samples`` images in a 4-column grid titled with their class.

    Args:
        data_loader: DataLoader to sample from; only its first batch is used
        num_samples: Maximum number of images to show (capped at the batch
            size). The grid gains rows as needed, so values above 8 work.
    """
    import matplotlib.pyplot as plt

    # Get a batch
    images, labels = next(iter(data_loader))
    shown = max(0, min(num_samples, len(images)))

    # Denormalize images: broadcastable (3, 1, 1) per-channel constants
    mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)

    # Create figure. At least two rows keeps the layout unchanged for up to
    # 8 images; larger requests get extra rows so no axis index runs past
    # the grid.
    cols = 4
    rows = max(2, (shown + cols - 1) // cols)
    # squeeze=False guarantees a 2-D array of axes for any grid shape.
    _, axes = plt.subplots(rows, cols, figsize=(15, 4 * rows), squeeze=False)
    axes = axes.flatten()

    class_names = ['Fire', 'No Fire']

    for i in range(shown):
        # Denormalize and clamp into the displayable [0, 1] range
        img = torch.clamp(images[i] * std + mean, 0, 1)

        # Convert to numpy: channels-first -> channels-last for imshow
        img_np = img.permute(1, 2, 0).numpy()

        # Plot
        axes[i].imshow(img_np)
        axes[i].set_title(f'{class_names[labels[i]]}')
        axes[i].axis('off')

    # Hide the grid cells that received no image.
    for unused_ax in axes[shown:]:
        unused_ax.axis('off')

    plt.tight_layout()
    plt.show()
def check_data_directory(data_dir: str) -> Dict[str, int]:
    """
    Report how many images each split/class folder contains.

    Args:
        data_dir: Directory expected to contain train/ and val/ folders,
            each with fire/ and no_fire/ subfolders

    Returns:
        Dictionary with ``"<split>_<class>"`` and ``"<split>_total"`` counts
        for every folder that exists, plus an overall ``"total"``. Empty
        when ``data_dir`` itself does not exist.
    """
    summary = {}

    if not os.path.exists(data_dir):
        print(f"❌ Data directory not found: {data_dir}")
        return summary

    print(f"📊 Data Directory Analysis: {data_dir}")
    print("=" * 50)

    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff')
    grand_total = 0

    for split in ('train', 'val'):
        split_path = os.path.join(data_dir, split)
        if not os.path.exists(split_path):
            # Missing splits are skipped without any entry in the result.
            continue

        print(f"\n{split.upper()} SET:")
        images_in_split = 0

        for class_name in ('fire', 'no_fire'):
            class_path = os.path.join(split_path, class_name)
            if not os.path.exists(class_path):
                continue

            # Count images recursively (suffix match is case-insensitive).
            n_images = sum(
                1
                for _, _, filenames in os.walk(class_path)
                for filename in filenames
                if filename.lower().endswith(image_suffixes)
            )

            print(f" {class_name}: {n_images} images")
            summary[f"{split}_{class_name}"] = n_images
            images_in_split += n_images

        print(f" Total {split}: {images_in_split}")
        summary[f"{split}_total"] = images_in_split
        grand_total += images_in_split

    print(f"\nOVERALL TOTAL: {grand_total} images")
    summary['total'] = grand_total
    print("=" * 50)

    return summary
def create_sample_data_structure():
    """
    Create the empty train/val folder skeleton under ./data for testing.

    Directories that already exist are left as they are. Paths are relative
    to the current working directory.
    """
    print("🔥 Creating sample fire detection data structure...")

    # Folder skeleton expected by the dataset loader.
    skeleton = (
        'data/train/fire',
        'data/train/no_fire',
        'data/val/fire',
        'data/val/no_fire',
    )
    for folder in skeleton:
        os.makedirs(folder, exist_ok=True)

    # Follow-up instructions for the user, printed one line at a time.
    guidance = (
        "✅ Sample data structure created",
        " Please add your fire detection images to the appropriate directories",
        " - data/train/fire/ (training fire images)",
        " - data/train/no_fire/ (training no-fire images)",
        " - data/val/fire/ (validation fire images)",
        " - data/val/no_fire/ (validation no-fire images)",
    )
    for line in guidance:
        print(line)