Spaces:
Build error
Build error
| import numpy as np | |
| import cv2 | |
| import heapq | |
| import matplotlib.pyplot as plt | |
| from collections import deque | |
| # 1. Compute local minima as markers | |
| def get_local_minima(gray): | |
| kernel = np.ones((3, 3), np.uint8) | |
| eroded = cv2.erode(gray, kernel) | |
| minima = (gray == eroded) | |
| return minima.astype(np.uint8) | |
| # 2. Label each connected component (marker) | |
| def label_markers(minima): | |
| num_labels, markers = cv2.connectedComponents(minima) | |
| return markers, num_labels | |
| # 3. Watershed from scratch | |
| def watershed_from_scratch(gray, markers): | |
| h, w = gray.shape | |
| # Constants | |
| WATERSHED = -1 | |
| INIT = -2 | |
| # Initialize label and visited map | |
| label_map = np.full((h, w), INIT, dtype=np.int32) | |
| label_map[markers > 0] = markers[markers > 0] | |
| # Priority queue for pixels: (intensity, y, x) | |
| pq = [] | |
| # Populate queue with boundary of initial markers | |
| for y in range(h): | |
| for x in range(w): | |
| if markers[y, x] > 0: | |
| for dy in [-1, 0, 1]: | |
| for dx in [-1, 0, 1]: | |
| ny, nx = y + dy, x + dx | |
| if 0 <= ny < h and 0 <= nx < w: | |
| if markers[ny, nx] == 0 and label_map[ny, nx] == INIT: | |
| heapq.heappush(pq, (gray[ny, nx], ny, nx)) | |
| label_map[ny, nx] = 0 # Mark as in queue | |
| # Flooding | |
| while pq: | |
| intensity, y, x = heapq.heappop(pq) | |
| neighbor_labels = set() | |
| for dy in [-1, 0, 1]: | |
| for dx in [-1, 0, 1]: | |
| ny, nx = y + dy, x + dx | |
| if 0 <= ny < h and 0 <= nx < w: | |
| lbl = label_map[ny, nx] | |
| if lbl > 0: | |
| neighbor_labels.add(lbl) | |
| if len(neighbor_labels) == 1: | |
| label_map[y, x] = neighbor_labels.pop() | |
| elif len(neighbor_labels) > 1: | |
| label_map[y, x] = WATERSHED | |
| # Add unvisited neighbors to the queue | |
| for dy in [-1, 0, 1]: | |
| for dx in [-1, 0, 1]: | |
| ny, nx = y + dy, x + dx | |
| if 0 <= ny < h and 0 <= nx < w: | |
| if label_map[ny, nx] == INIT: | |
| heapq.heappush(pq, (gray[ny, nx], ny, nx)) | |
| label_map[ny, nx] = 0 # Mark as in queue | |
| return label_map | |
| import numpy as np | |
| import cv2 | |
| import heapq | |
| def improved_watershed(image_path): | |
| # Load and preprocess image | |
| original = cv2.imread(image_path) | |
| gray = cv2.cvtColor(original, cv2.COLOR_BGR2GRAY) | |
| blurred = cv2.GaussianBlur(gray, (9, 9), 2) | |
| # Step 1: Better marker detection using adaptive thresholding | |
| thresh = cv2.adaptiveThreshold(blurred, 255, | |
| cv2.ADAPTIVE_THRESH_GAUSSIAN_C, | |
| cv2.THRESH_BINARY_INV, 21, 4) | |
| # Step 2: Noise removal and sure background area | |
| kernel = np.ones((3,3), np.uint8) | |
| opening = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel, iterations=2) | |
| # Step 3: Distance transform for better foreground detection | |
| dist_transform = cv2.distanceTransform(opening, cv2.DIST_L2, 5) | |
| _, sure_fg = cv2.threshold(dist_transform, 0.5*dist_transform.max(), 255, 0) | |
| sure_fg = np.uint8(sure_fg) | |
| # Step 4: Create markers using connected components | |
| _, markers = cv2.connectedComponents(sure_fg) | |
| markers += 1 # Add 1 to all labels so background is 1 | |
| # Step 5: Apply custom watershed algorithm | |
| label_map = watershed_from_scratch(blurred, markers) | |
| # Enhanced visualization | |
| output = original.copy() | |
| boundaries = (label_map == -1).astype(np.uint8) * 255 | |
| contours, _ = cv2.findContours(boundaries, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| cv2.drawContours(output, contours, -1, (0,0,255), 1) | |
| # Create intermediate step visualization | |
| process_steps = { | |
| "Original": original, | |
| "Blurred": cv2.cvtColor(blurred, cv2.COLOR_GRAY2BGR), | |
| "Threshold": cv2.cvtColor(thresh, cv2.COLOR_GRAY2BGR), | |
| "Foreground Markers": cv2.cvtColor(sure_fg, cv2.COLOR_GRAY2BGR), | |
| "Final Segmentation": output | |
| } | |
| return process_steps | |
| def watershed_from_scratch(gray, markers): | |
| h, w = gray.shape | |
| WATERSHED = -1 | |
| INIT = -2 | |
| label_map = np.full((h, w), INIT, dtype=np.int32) | |
| label_map[markers > 1] = markers[markers > 1] # Skip background marker | |
| pq = [] | |
| # Initialize queue with marker boundaries | |
| for y in range(h): | |
| for x in range(w): | |
| if label_map[y, x] > 0: | |
| for dy in [-1, 0, 1]: | |
| for dx in [-1, 0, 1]: | |
| ny, nx = y+dy, x+dx | |
| if 0 <= ny < h and 0 <= nx < w: | |
| if label_map[ny, nx] == INIT: | |
| heapq.heappush(pq, (gray[ny, nx], ny, nx)) | |
| label_map[ny, nx] = 0 # Queued | |
| # Improved flooding with gradient consideration | |
| while pq: | |
| intensity, y, x = heapq.heappop(pq) | |
| neighbors = [] | |
| # Check 8 neighbors | |
| for dy in [-1, 0, 1]: | |
| for dx in [-1, 0, 1]: | |
| if dy == 0 and dx == 0: | |
| continue | |
| ny, nx = y+dy, x+dx | |
| if 0 <= ny < h and 0 <= nx < w: | |
| neighbors.append(label_map[ny, nx]) | |
| # Find unique labels excluding watershed and background | |
| unique = set(n for n in neighbors if n > 0) | |
| if len(unique) == 0: | |
| label_map[y, x] = 1 # Background | |
| elif len(unique) == 1: | |
| label_map[y, x] = unique.pop() | |
| else: | |
| label_map[y, x] = WATERSHED | |
| # Add neighbors to queue | |
| for dy in [-1, 0, 1]: | |
| for dx in [-1, 0, 1]: | |
| ny, nx = y+dy, x+dx | |
| if 0 <= ny < h and 0 <= nx < w: | |
| if label_map[ny, nx] == INIT: | |
| heapq.heappush(pq, (gray[ny, nx], ny, nx)) | |
| label_map[ny, nx] = 0 | |
| return label_map | |
| # Gradio integration would use: | |
| def generate_watershed(image_path): | |
| results = improved_watershed(image_path) | |
| return ( | |
| results["Original"], | |
| results["Blurred"], | |
| results["Threshold"], | |
| ) | |
| if __name__ == "__main__": | |
| # Run the process | |
| # Load grayscale image | |
| image = cv2.imread("/home/akshat/projects/CSL7360_Project/bird.jpeg", cv2.IMREAD_GRAYSCALE) | |
| image = cv2.GaussianBlur(image, (5, 5), 0) | |
| minima = get_local_minima(image) | |
| markers, num_labels = label_markers(minima) | |
| result = watershed_from_scratch(image, markers) | |
| # Visualization | |
| output = np.zeros((image.shape[0], image.shape[1], 3), dtype=np.uint8) | |
| output[result == -1] = [255, 0, 0] # Watershed lines in red | |
| output[result > 0] = [0, 255, 0] # Segments in green | |
| output[markers > 0] = [0, 0, 255] # Original minima in blue | |
| # Save the original grayscale and the output image | |
| cv2.imwrite("original_grayscale.png", image) | |
| cv2.imwrite("watershed_output.png", output) | |
| print("Images saved as 'original_grayscale.png' and 'watershed_output.png'") |