# Image_Segmentation_/experiments/watershed_segmenter.py
# Uploaded by AJain1234 via huggingface_hub (commit 0f9608b, verified)
import numpy as np
import cv2
import heapq
import matplotlib.pyplot as plt
from collections import deque
# 1. Compute local minima as markers
def get_local_minima(gray):
    """Mark pixels that equal the minimum of their 3x3 neighbourhood.

    Grayscale erosion with a 3x3 structuring element replaces each pixel
    by its local minimum, so a pixel left unchanged by erosion is itself
    a local minimum.

    gray: 2-D grayscale image.
    Returns a uint8 mask of the same shape (1 = local minimum, 0 = not).
    """
    structuring = np.ones((3, 3), np.uint8)
    local_min = cv2.erode(gray, structuring)
    return (gray == local_min).astype(np.uint8)
# 2. Label each connected component (marker)
def label_markers(minima):
    """Give every connected minima region its own integer label.

    minima: uint8 binary mask (nonzero = minimum pixel).
    Returns (markers, num_labels): markers is an int label image whose
    background is 0 and whose components carry labels 1..num_labels-1.
    """
    count, labeled = cv2.connectedComponents(minima)
    return labeled, count
# 3. Watershed from scratch
def watershed_from_scratch(gray, markers):
    """Priority-flood watershed segmentation.

    Seeds are the positive labels in `markers`; they grow outward in order
    of increasing pixel intensity.  A pixel touched by two different labels
    becomes a watershed ridge.

    gray:    2-D intensity image (H, W).
    markers: int label image of the same shape; >0 marks seed regions,
             0 means unlabeled.
    Returns an int32 label map: grown seed labels, -1 on ridge pixels;
    pixels queued but never claimed by any region keep the sentinel 0.
    """
    rows, cols = gray.shape
    RIDGE = -1    # boundary between two competing basins
    UNSEEN = -2   # pixel not yet touched
    # 3x3 neighbourhood offsets (includes (0,0); self never carries a
    # positive label at claim time, so it is harmless).
    offsets = [(dy, dx) for dy in (-1, 0, 1) for dx in (-1, 0, 1)]

    labels = np.full((rows, cols), UNSEEN, dtype=np.int32)
    seeded = markers > 0
    labels[seeded] = markers[seeded]

    heap = []

    def enqueue(py, px):
        # 0 doubles as the "already in the queue" sentinel.
        heapq.heappush(heap, (gray[py, px], py, px))
        labels[py, px] = 0

    # Seed the frontier: every unlabeled pixel bordering a marker.
    for r in range(rows):
        for c in range(cols):
            if not seeded[r, c]:
                continue
            for dy, dx in offsets:
                nr, nc = r + dy, c + dx
                if 0 <= nr < rows and 0 <= nc < cols:
                    if markers[nr, nc] == 0 and labels[nr, nc] == UNSEEN:
                        enqueue(nr, nc)

    # Flood in order of increasing intensity (ties broken by row, column).
    while heap:
        _, r, c = heapq.heappop(heap)
        claims = {labels[r + dy, c + dx]
                  for dy, dx in offsets
                  if 0 <= r + dy < rows and 0 <= c + dx < cols
                  and labels[r + dy, c + dx] > 0}
        if len(claims) == 1:
            labels[r, c] = claims.pop()
        elif claims:
            labels[r, c] = RIDGE
        # Grow the frontier through any untouched neighbour.
        for dy, dx in offsets:
            nr, nc = r + dy, c + dx
            if 0 <= nr < rows and 0 <= nc < cols and labels[nr, nc] == UNSEEN:
                enqueue(nr, nc)
    return labels
# NOTE(review): redundant re-imports — numpy, cv2, and heapq are already
# imported at the top of this file; these lines are harmless no-ops and
# could be removed.
import numpy as np
import cv2
import heapq
def improved_watershed(image_path):
    """Segment an image with a marker-based custom watershed.

    Pipeline: grayscale -> Gaussian blur -> adaptive threshold ->
    morphological opening -> distance transform -> connected-component
    markers -> custom watershed -> red contour overlay.

    image_path: path to an image readable by cv2.imread.
    Returns a dict mapping stage names to 3-channel BGR images
    ("Original", "Blurred", "Threshold", "Foreground Markers",
    "Final Segmentation").
    Raises FileNotFoundError if the image cannot be read.
    """
    # cv2.imread signals failure by returning None instead of raising;
    # fail loudly here rather than crashing later inside cvtColor.
    original = cv2.imread(image_path)
    if original is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    gray = cv2.cvtColor(original, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (9, 9), 2)

    # Step 1: marker detection via adaptive thresholding (robust to
    # uneven illumination; block size 21, constant 4).
    thresh = cv2.adaptiveThreshold(blurred, 255,
                                   cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                   cv2.THRESH_BINARY_INV, 21, 4)

    # Step 2: morphological opening removes speckle noise from the mask.
    kernel = np.ones((3, 3), np.uint8)
    opening = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel, iterations=2)

    # Step 3: distance transform — pixels far from any background pixel
    # are confidently foreground (threshold at 50% of the max distance).
    dist_transform = cv2.distanceTransform(opening, cv2.DIST_L2, 5)
    _, sure_fg = cv2.threshold(dist_transform, 0.5 * dist_transform.max(), 255, 0)
    sure_fg = np.uint8(sure_fg)

    # Step 4: one marker label per connected foreground blob; +1 shifts
    # labels so background becomes 1 (0 stays reserved for "unknown").
    _, markers = cv2.connectedComponents(sure_fg)
    markers += 1  # Add 1 to all labels so background is 1

    # Step 5: custom watershed flood.  NOTE(review): this resolves to the
    # SECOND watershed_from_scratch definition in this file, which treats
    # label 1 as background — consistent with the +1 shift above.
    label_map = watershed_from_scratch(blurred, markers)

    # Draw the watershed ridge lines (-1) as 1-px red contours.
    output = original.copy()
    boundaries = (label_map == -1).astype(np.uint8) * 255
    contours, _ = cv2.findContours(boundaries, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cv2.drawContours(output, contours, -1, (0, 0, 255), 1)

    # Stage-by-stage visualization, everything converted to 3-channel BGR.
    process_steps = {
        "Original": original,
        "Blurred": cv2.cvtColor(blurred, cv2.COLOR_GRAY2BGR),
        "Threshold": cv2.cvtColor(thresh, cv2.COLOR_GRAY2BGR),
        "Foreground Markers": cv2.cvtColor(sure_fg, cv2.COLOR_GRAY2BGR),
        "Final Segmentation": output
    }
    return process_steps
def watershed_from_scratch(gray, markers):
    """Priority-flood watershed for markers produced by improved_watershed.

    NOTE(review): this REDEFINES the watershed_from_scratch declared earlier
    in this file; Python binds the last definition, so every caller —
    including the __main__ block, whose markers from label_markers use
    label 1 as a real component, not background — gets THIS version, which
    skips label 1 as background.  Confirm that shadowing is intended.

    gray:    2-D intensity image (H, W).
    markers: int label image of the same shape; assumes background carries
             label 1 and objects carry labels >= 2 (the
             cv2.connectedComponents(...) + 1 convention) — TODO confirm
             for any new caller.
    Returns an int32 label map: object labels >= 2, background 1,
    watershed ridge pixels -1.
    """
    h, w = gray.shape
    WATERSHED = -1  # ridge between two competing basins
    INIT = -2       # pixel not yet touched
    label_map = np.full((h, w), INIT, dtype=np.int32)
    label_map[markers > 1] = markers[markers > 1]  # Skip background marker
    pq = []  # min-heap of (intensity, y, x): lowest intensity floods first
    # Initialize queue with marker boundaries
    for y in range(h):
        for x in range(w):
            if label_map[y, x] > 0:
                for dy in [-1, 0, 1]:
                    for dx in [-1, 0, 1]:
                        ny, nx = y+dy, x+dx
                        if 0 <= ny < h and 0 <= nx < w:
                            if label_map[ny, nx] == INIT:
                                heapq.heappush(pq, (gray[ny, nx], ny, nx))
                                label_map[ny, nx] = 0  # Queued
    # Flood in increasing-intensity order.
    # NOTE(review): despite the original "gradient consideration" comment,
    # no gradient is computed — ordering is by raw pixel intensity only.
    while pq:
        intensity, y, x = heapq.heappop(pq)
        neighbors = []
        # Check 8 neighbors
        for dy in [-1, 0, 1]:
            for dx in [-1, 0, 1]:
                if dy == 0 and dx == 0:
                    continue
                ny, nx = y+dy, x+dx
                if 0 <= ny < h and 0 <= nx < w:
                    neighbors.append(label_map[ny, nx])
        # Collect positive labels claimed by neighbours (this includes the
        # flooded background label 1 once it has started spreading).
        unique = set(n for n in neighbors if n > 0)
        if len(unique) == 0:
            label_map[y, x] = 1  # Background
        elif len(unique) == 1:
            label_map[y, x] = unique.pop()  # single claimant wins the pixel
        else:
            label_map[y, x] = WATERSHED  # contested pixel becomes a ridge
        # Add neighbors to queue
        for dy in [-1, 0, 1]:
            for dx in [-1, 0, 1]:
                ny, nx = y+dy, x+dx
                if 0 <= ny < h and 0 <= nx < w:
                    if label_map[ny, nx] == INIT:
                        heapq.heappush(pq, (gray[ny, nx], ny, nx))
                        label_map[ny, nx] = 0
    return label_map
# Gradio integration would use:
def generate_watershed(image_path):
    """Run the improved watershed pipeline and return the first three
    visualization stages (original, blurred, threshold) as a tuple,
    e.g. for display in a Gradio interface."""
    steps = improved_watershed(image_path)
    return tuple(steps[name] for name in ("Original", "Blurred", "Threshold"))
if __name__ == "__main__":
    # Demo: minima-seeded watershed on a sample image.
    # NOTE(review): watershed_from_scratch resolves to the SECOND definition
    # in this file, which skips marker label 1 as background — yet
    # label_markers produces label 1 as a real component.  Confirm intended.
    image_path = "/home/akshat/projects/CSL7360_Project/bird.jpeg"  # TODO: hard-coded path; make configurable
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if image is None:
        # cv2.imread returns None instead of raising on a missing/unreadable file.
        raise FileNotFoundError(f"Could not read image: {image_path}")
    image = cv2.GaussianBlur(image, (5, 5), 0)
    minima = get_local_minima(image)
    markers, num_labels = label_markers(minima)
    result = watershed_from_scratch(image, markers)
    # Visualization on a BGR canvas; later assignments overwrite earlier
    # ones where masks overlap (minima win over segments).
    output = np.zeros((image.shape[0], image.shape[1], 3), dtype=np.uint8)
    output[result == -1] = [255, 0, 0]  # Watershed lines (blue in BGR)
    output[result > 0] = [0, 255, 0]    # Segments (green)
    output[markers > 0] = [0, 0, 255]   # Original minima (red in BGR)
    # Save the original grayscale and the output image
    cv2.imwrite("original_grayscale.png", image)
    cv2.imwrite("watershed_output.png", output)
    print("Images saved as 'original_grayscale.png' and 'watershed_output.png'")