File size: 5,856 Bytes
88b9d97 d36fd45 88b9d97 d36fd45 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
"""
Inference module for counting wheat heads in field images using a DeepLabV3+ semantic
segmentation model trained on the GWFSS dataset.
The model performs multi-class segmentation (Background, Leaf, Stem, Head) to accurately
distinguish wheat heads from other plant organs, then uses connected component analysis
to count individual heads.
"""
import torch
import torchvision.transforms as transforms
from PIL import Image
import numpy as np
import segmentation_models_pytorch as smp
from scipy import ndimage
from skimage.feature import peak_local_max
# ImageNet normalisation constants
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]
# Mask colours for visualization
MASK_COLORS = [
(0, 0, 0), # Background: black
(214, 255, 50), # Leaf: yellow-green
(50, 132, 255), # Stem: blue
(50, 255, 132), # Head: cyan-green
]
class GWFSSModel:
def __init__(self, model_path, device=None):
if device is None:
if torch.cuda.is_available():
self.device = torch.device("cuda")
elif torch.backends.mps.is_available():
self.device = torch.device("mps")
else:
self.device = torch.device("cpu")
else:
self.device = device
# Load model architecture
self.model = smp.DeepLabV3Plus(
encoder_name="resnet50",
encoder_weights=None,
in_channels=3,
classes=4,
)
# Load trained weights
checkpoint = torch.load(model_path, map_location=self.device, weights_only=False)
self.model.load_state_dict(checkpoint['model_state_dict'])
self.model = self.model.to(self.device)
self.model.eval()
# Image preprocessing
self.transform = transforms.Compose([
transforms.Resize((512, 512), interpolation=transforms.InterpolationMode.BILINEAR),
transforms.ToTensor(),
transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)
])
def preprocess_image(self, image):
if isinstance(image, np.ndarray):
image = Image.fromarray(image)
if image.mode != 'RGB':
image = image.convert('RGB')
image_tensor = self.transform(image).unsqueeze(0)
return image_tensor.to(self.device)
def predict(self, image):
if isinstance(image, str):
image = Image.open(image)
image_tensor = self.preprocess_image(image)
with torch.no_grad():
logits = self.model(image_tensor)
predictions = torch.argmax(logits, dim=1).squeeze(0).cpu().numpy()
return predictions
def count_heads(self, predictions, min_distance=15):
head_mask = (predictions == 3).astype(np.uint8)
if head_mask.sum() == 0:
return 0
# Compute distance transform
distance = ndimage.distance_transform_edt(head_mask)
# Find local peaks (head centers)
coords = peak_local_max(distance, min_distance=min_distance, labels=head_mask)
# Count the peaks
num_heads = len(coords)
return num_heads
def create_colored_mask(self, predictions):
h, w = predictions.shape
mask_rgb = np.zeros((h, w, 3), dtype=np.uint8)
for class_id, color in enumerate(MASK_COLORS):
mask_rgb[predictions == class_id] = color
return Image.fromarray(mask_rgb)
def overlay_mask(self, image, predictions, alpha=0.5, heads_only=True):
if isinstance(image, np.ndarray):
image = Image.fromarray(image)
if image.size != (512, 512):
image = image.resize((512, 512), Image.Resampling.BILINEAR)
# Create mask
h, w = predictions.shape
mask_rgb = np.zeros((h, w, 3), dtype=np.uint8)
if heads_only:
# Only highlight heads
mask_rgb[predictions == 3] = (50, 255, 132)
else:
# Show all classes
for class_id, color in enumerate(MASK_COLORS):
mask_rgb[predictions == class_id] = color
mask_img = Image.fromarray(mask_rgb)
overlay = Image.blend(image.convert('RGB'), mask_img, alpha)
return overlay
def predict_and_overlay(self, image, alpha=0.5, heads_only=True):
predictions = self.predict(image)
overlay = self.overlay_mask(image, predictions, alpha=alpha, heads_only=heads_only)
return overlay
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python inference.py <image_path> [model_path]")
sys.exit(1)
image_path = sys.argv[1]
model_path = sys.argv[2] if len(sys.argv) > 2 else "cache/02_dice_stem.pth"
print(f"Loading model from {model_path}...")
model = GWFSSModel(model_path)
print(f"Processing image: {image_path}")
image = Image.open(image_path)
predictions = model.predict(image)
# Count heads
num_heads = model.count_heads(predictions)
print(f"\n🌾 {num_heads} heads detected")
# Create visualisations
print("\nGenerating visualisations...")
overlay_heads = model.overlay_mask(image, predictions, alpha=0.5, heads_only=True)
overlay_all = model.overlay_mask(image, predictions, alpha=0.5, heads_only=False)
# Save outputs
output_heads = image_path.rsplit('.', 1)[0] + '_heads_only.png'
output_all = image_path.rsplit('.', 1)[0] + '_all_classes.png'
overlay_heads.save(output_heads)
overlay_all.save(output_all)
print(f"✓ Saved head overlay to: {output_heads}")
print(f"✓ Saved full segmentation to: {output_all}")
|