""" Lane detection module using OpenCV Advanced lane detection with multiple methods and GPU acceleration. This module contains the core lane detection logic without UI dependencies. """ import cv2 import numpy as np # GPU acceleration setup - prioritize NVIDIA GPU USE_GPU = False GPU_TYPE = "none" try: if cv2.cuda.getCudaEnabledDeviceCount() > 0: USE_GPU = True GPU_TYPE = "nvidia" cv2.cuda.setDevice(0) # Use first GPU print(f"✓ NVIDIA CUDA enabled! Using GPU acceleration on device: {cv2.cuda.printShortCudaDeviceInfo(cv2.cuda.getDevice())}") else: print("ℹ CUDA not available. Using CPU.") except Exception as e: print(f"ℹ GPU acceleration not available: {e}. Using CPU.") GPU_TYPE = "none" def region_of_interest(img, vertices): """ Apply a region of interest mask to the image. """ mask = np.zeros_like(img) cv2.fillPoly(mask, vertices, 255) masked_image = cv2.bitwise_and(img, mask) return masked_image def draw_lines_basic(img, lines, color=[0, 255, 0], thickness=3): """ Draw lines on the image with filled lane area (Basic method). - Lane lines: Red color - Lane interior: Green semi-transparent fill """ if lines is None: return img line_img = np.zeros_like(img) # Separate left and right lane lines left_lines = [] right_lines = [] for line in lines: x1, y1, x2, y2 = line[0] if x2 == x1: continue slope = (y2 - y1) / (x2 - x1) # Filter by slope to separate left and right lanes if slope < -0.5: # Left lane (negative slope) left_lines.append(line[0]) elif slope > 0.5: # Right lane (positive slope) right_lines.append(line[0]) # Average lines for left and right lanes def average_lines(lines, img_shape): if len(lines) == 0: return None x_coords = [] y_coords = [] for line in lines: x1, y1, x2, y2 = line x_coords.extend([x1, x2]) y_coords.extend([y1, y2]) # Fit a polynomial to the points poly = np.polyfit(y_coords, x_coords, 1) # Calculate line endpoints y1 = img_shape[0] y2 = int(img_shape[0] * 0.6) x1 = int(poly[0] * y1 + poly[1]) x2 = int(poly[0] * y2 + poly[1]) return [x1, y1, x2, y2] # Draw averaged lines left_line = average_lines(left_lines, img.shape) right_line = average_lines(right_lines, img.shape) # Fill the lane area with green color if left_line is not None and right_line is not None: # Create polygon points for the lane area lane_polygon = np.array([[ (left_line[0], left_line[1]), # Left bottom (left_line[2], left_line[3]), # Left top (right_line[2], right_line[3]), # Right top (right_line[0], right_line[1]) # Right bottom ]], dtype=np.int32) # Fill the lane area with green (semi-transparent) cv2.fillPoly(line_img, lane_polygon, (0, 255, 0)) # Draw the lane lines in red with thicker lines if left_line is not None: cv2.line(line_img, (left_line[0], left_line[1]), (left_line[2], left_line[3]), (0, 0, 255), thickness * 2) # Red color (BGR format) if right_line is not None: cv2.line(line_img, (right_line[0], right_line[1]), (right_line[2], right_line[3]), (0, 0, 255), thickness * 2) # Red color (BGR format) # Blend with original image (make the overlay semi-transparent) return cv2.addWeighted(img, 0.8, line_img, 0.5, 0) def draw_lines_segmented(img, lines, color=[0, 255, 0], thickness=3): """ Draw multiple short line segments to represent curves. Better curve representation with Hough Transform. - Lane lines: Red segmented lines - Lane interior: Green semi-transparent fill """ if lines is None: return img line_img = np.zeros_like(img) fill_img = np.zeros_like(img) # Separate left and right lane lines left_lines = [] right_lines = [] for line in lines: x1, y1, x2, y2 = line[0] if x2 == x1: continue slope = (y2 - y1) / (x2 - x1) # Filter by slope to separate left and right lanes if slope < -0.5: # Left lane (negative slope) left_lines.append(line[0]) elif slope > 0.5: # Right lane (positive slope) right_lines.append(line[0]) # Extract left and right lane boundaries left_x = [] left_y = [] right_x = [] right_y = [] for line in left_lines: x1, y1, x2, y2 = line left_x.extend([x1, x2]) left_y.extend([y1, y2]) for line in right_lines: x1, y1, x2, y2 = line right_x.extend([x1, x2]) right_y.extend([y1, y2]) # Initialize sorted lists left_x_sorted = [] left_y_sorted = [] right_x_sorted = [] right_y_sorted = [] # Sort by y coordinate to maintain order if len(left_x) > 0: left_coords = sorted(zip(left_y, left_x)) left_y_sorted = [c[0] for c in left_coords] left_x_sorted = [c[1] for c in left_coords] # Draw all individual line segments for left lane for line in left_lines: x1, y1, x2, y2 = line cv2.line(line_img, (x1, y1), (x2, y2), (0, 0, 255), thickness + 2) if len(right_x) > 0: right_coords = sorted(zip(right_y, right_x)) right_y_sorted = [c[0] for c in right_coords] right_x_sorted = [c[1] for c in right_coords] # Draw all individual line segments for right lane for line in right_lines: x1, y1, x2, y2 = line cv2.line(line_img, (x1, y1), (x2, y2), (0, 0, 255), thickness + 2) # Fill the area between left and right lanes if len(left_y_sorted) > 0 and len(right_y_sorted) > 0: # Create a polygon by combining left and right points min_y = max(min(left_y_sorted), min(right_y_sorted)) max_y = min(max(left_y_sorted), max(right_y_sorted)) if max_y > min_y: # Interpolate to get matching y-coordinates y_range = np.arange(int(min_y), int(max_y), 10) poly_points = [] # Left points for y in y_range: if y >= min(left_y_sorted) and y <= max(left_y_sorted): idx = np.searchsorted(left_y_sorted, y) if idx > 0 and idx < len(left_x_sorted): x = left_x_sorted[idx] poly_points.append([x, y]) # Right points (reverse order for polygon) for y in reversed(y_range): if y >= min(right_y_sorted) and y <= max(right_y_sorted): idx = np.searchsorted(right_y_sorted, y) if idx > 0 and idx < len(right_x_sorted): x = right_x_sorted[idx] poly_points.append([x, y]) if len(poly_points) >= 3: poly_points = np.array(poly_points, dtype=np.int32) cv2.fillPoly(fill_img, [poly_points], (0, 255, 0)) # Combine filled area and lines result_img = cv2.addWeighted(line_img, 0.6, fill_img, 0.7, 0) # Blend with original image return cv2.addWeighted(img, 0.8, result_img, 0.5, 0) def process_frame_basic(frame, use_segmented=False): """ Process a single frame for lane detection using basic Hough Transform method. use_segmented: If True, draw multiple line segments for better curve representation. If False, draw averaged single line (default). """ height, width = frame.shape[:2] if USE_GPU and GPU_TYPE == "nvidia": # Upload frame to GPU gpu_frame = cv2.cuda_GpuMat() gpu_frame.upload(frame) # Convert to grayscale on GPU gpu_gray = cv2.cuda.cvtColor(gpu_frame, cv2.COLOR_BGR2GRAY) # Apply Gaussian blur on GPU gpu_blur = cv2.cuda.createGaussianFilter(cv2.CV_8UC1, cv2.CV_8UC1, (5, 5), 0) gpu_blurred = gpu_blur.apply(gpu_gray) # Apply Canny edge detection on GPU gpu_canny = cv2.cuda.createCannyEdgeDetector(50, 150) gpu_edges = gpu_canny.detect(gpu_blurred) # Download edges from GPU edges = gpu_edges.download() else: # CPU processing gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) blur = cv2.GaussianBlur(gray, (5, 5), 0) edges = cv2.Canny(blur, 50, 150) # Define region of interest (ROI) vertices = np.array([[ (int(width * 0.1), height), (int(width * 0.45), int(height * 0.6)), (int(width * 0.55), int(height * 0.6)), (int(width * 0.9), height) ]], dtype=np.int32) # Apply ROI mask masked_edges = region_of_interest(edges, vertices) # Apply Hough transform to detect lines lines = cv2.HoughLinesP( masked_edges, rho=2, theta=np.pi / 180, threshold=50, minLineLength=40, maxLineGap=100 ) # Draw detected lanes on the original frame if use_segmented: result = draw_lines_segmented(frame.copy(), lines) else: result = draw_lines_basic(frame.copy(), lines) return result def calibrate_perspective(img): """ Apply perspective transform to get bird's eye view. Converts trapezoidal ROI to rectangular view for easier lane detection. """ height, width = img.shape[:2] # Define source points (trapezoid in original image) src = np.float32([ [width * 0.45, height * 0.65], # Bottom left [width * 0.55, height * 0.65], # Bottom right [width * 0.9, height], # Top right [width * 0.1, height] # Top left ]) # Define destination points (rectangle in bird's eye view) dst = np.float32([ [width * 0.25, 0], # Top left [width * 0.75, 0], # Top right [width * 0.75, height], # Bottom right [width * 0.25, height] # Bottom left ]) # Calculate perspective transform matrix M = cv2.getPerspectiveTransform(src, dst) # Calculate inverse perspective transform matrix Minv = cv2.getPerspectiveTransform(dst, src) # Apply perspective transform warped = cv2.warpPerspective(img, M, (width, height), flags=cv2.INTER_LINEAR) return warped, M, Minv def color_and_gradient_threshold(img, use_enhanced=True): """ Apply color and gradient thresholding to isolate lane lines. Enhanced version with better accuracy for various conditions. Returns binary image with lane pixels set to 255. """ # Convert to HLS color space hls = cv2.cvtColor(img, cv2.COLOR_BGR2HLS) # Extract channels h_channel = hls[:, :, 0] l_channel = hls[:, :, 1] s_channel = hls[:, :, 2] # Enhanced thresholding for better lane detection if use_enhanced: # Adaptive thresholding for saturation channel s_thresh = (90, 255) # Lower threshold for yellow lanes s_binary = np.zeros_like(s_channel) s_binary[(s_channel >= s_thresh[0]) & (s_channel <= s_thresh[1])] = 255 # Adaptive thresholding for lightness channel l_thresh = (180, 255) # Lower threshold for white lanes l_binary = np.zeros_like(l_channel) l_binary[(l_channel >= l_thresh[0]) & (l_channel <= l_thresh[1])] = 255 # Apply CLAHE (Contrast Limited Adaptive Histogram Equalization) for better contrast clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) l_channel_enhanced = clahe.apply(l_channel.astype(np.uint8)) # Apply Sobel operator on enhanced channel sobelx = cv2.Sobel(l_channel_enhanced, cv2.CV_64F, 1, 0, ksize=3) abs_sobelx = np.absolute(sobelx) scaled_sobel = np.uint8(255 * abs_sobelx / np.max(abs_sobelx)) # More sensitive gradient threshold sobel_thresh = (15, 255) # Lower threshold for better edge detection sobel_binary = np.zeros_like(scaled_sobel) sobel_binary[(scaled_sobel >= sobel_thresh[0]) & (scaled_sobel <= sobel_thresh[1])] = 255 # Direction threshold to focus on vertical edges sobely = cv2.Sobel(l_channel_enhanced, cv2.CV_64F, 0, 1, ksize=3) abs_sobely = np.absolute(sobely) scaled_sobely = np.uint8(255 * abs_sobely / np.max(abs_sobely)) # Calculate gradient direction grad_dir = np.arctan2(abs_sobely, abs_sobelx) dir_thresh = (0.7, 1.3) # Focus on near-vertical edges (lane lines) dir_binary = np.zeros_like(scaled_sobel) dir_binary[(grad_dir >= dir_thresh[0]) & (grad_dir <= dir_thresh[1])] = 255 # Combine all binary images with direction filter combined_binary = np.zeros_like(s_binary) combined_binary[((s_binary == 255) | (l_binary == 255) | (sobel_binary == 255)) & (dir_binary == 255)] = 255 # Apply morphological operations to reduce noise kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)) combined_binary = cv2.morphologyEx(combined_binary, cv2.MORPH_CLOSE, kernel) combined_binary = cv2.morphologyEx(combined_binary, cv2.MORPH_OPEN, kernel) else: # Basic thresholding s_thresh = (100, 255) s_binary = np.zeros_like(s_channel) s_binary[(s_channel >= s_thresh[0]) & (s_channel <= s_thresh[1])] = 255 l_thresh = (200, 255) l_binary = np.zeros_like(l_channel) l_binary[(l_channel >= l_thresh[0]) & (l_channel <= l_thresh[1])] = 255 sobelx = cv2.Sobel(l_channel, cv2.CV_64F, 1, 0, ksize=3) abs_sobelx = np.absolute(sobelx) scaled_sobel = np.uint8(255 * abs_sobelx / np.max(abs_sobelx)) sobel_thresh = (20, 255) sobel_binary = np.zeros_like(scaled_sobel) sobel_binary[(scaled_sobel >= sobel_thresh[0]) & (scaled_sobel <= sobel_thresh[1])] = 255 combined_binary = np.zeros_like(s_binary) combined_binary[(s_binary == 255) | (l_binary == 255) | (sobel_binary == 255)] = 255 return combined_binary def fit_polynomial_lanes(binary_warped): """ Fit 2nd degree polynomials to lane lines using sliding window approach. Returns left and right lane polynomial coefficients. """ # Take a histogram of the bottom half of the image histogram = np.sum(binary_warped[binary_warped.shape[0]//2:, :], axis=0) # Find the peak of the left and right halves of the histogram midpoint = len(histogram) // 2 leftx_base = np.argmax(histogram[:midpoint]) rightx_base = np.argmax(histogram[midpoint:]) + midpoint # Sliding window parameters nwindows = 9 window_height = binary_warped.shape[0] // nwindows margin = 100 minpix = 50 # Find nonzero pixels nonzero = binary_warped.nonzero() nonzeroy = np.array(nonzero[0]) nonzerox = np.array(nonzero[1]) # Current positions leftx_current = leftx_base rightx_current = rightx_base # Lists to store lane pixel indices left_lane_inds = [] right_lane_inds = [] # Step through windows for window in range(nwindows): # Window boundaries win_y_low = binary_warped.shape[0] - (window + 1) * window_height win_y_high = binary_warped.shape[0] - window * window_height win_xleft_low = leftx_current - margin win_xleft_high = leftx_current + margin win_xright_low = rightx_current - margin win_xright_high = rightx_current + margin # Find pixels within windows good_left_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & (nonzerox >= win_xleft_low) & (nonzerox < win_xleft_high)).nonzero()[0] good_right_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & (nonzerox >= win_xright_low) & (nonzerox < win_xright_high)).nonzero()[0] left_lane_inds.append(good_left_inds) right_lane_inds.append(good_right_inds) # Recenter windows if len(good_left_inds) > minpix: leftx_current = int(np.mean(nonzerox[good_left_inds])) if len(good_right_inds) > minpix: rightx_current = int(np.mean(nonzerox[good_right_inds])) # Concatenate indices left_lane_inds = np.concatenate(left_lane_inds) right_lane_inds = np.concatenate(right_lane_inds) # Extract pixel positions leftx = nonzerox[left_lane_inds] lefty = nonzeroy[left_lane_inds] rightx = nonzerox[right_lane_inds] righty = nonzeroy[right_lane_inds] # Fit polynomial (2nd degree) left_fit = None right_fit = None if len(leftx) > 0: left_fit = np.polyfit(lefty, leftx, 2) if len(rightx) > 0: right_fit = np.polyfit(righty, rightx, 2) return left_fit, right_fit def draw_poly_lines(img, binary_warped, left_fit, right_fit, Minv): """ Draw polynomial lane lines on the original image using inverse perspective transform. """ if left_fit is None or right_fit is None: return img # Create an image to draw on warp_zero = np.zeros_like(binary_warped).astype(np.uint8) color_warp = np.dstack((warp_zero, warp_zero, warp_zero)) # Generate y values ploty = np.linspace(0, binary_warped.shape[0] - 1, binary_warped.shape[0]) # Calculate x values using polynomial left_fitx = left_fit[0] * ploty**2 + left_fit[1] * ploty + left_fit[2] right_fitx = right_fit[0] * ploty**2 + right_fit[1] * ploty + right_fit[2] # Ensure values are within image bounds left_fitx = np.clip(left_fitx, 0, binary_warped.shape[1] - 1) right_fitx = np.clip(right_fitx, 0, binary_warped.shape[1] - 1) # Create points for the lane area pts_left = np.array([np.transpose(np.vstack([left_fitx, ploty]))]) pts_right = np.array([np.flipud(np.transpose(np.vstack([right_fitx, ploty])))]) pts = np.hstack((pts_left, pts_right)) # Fill the lane area with green cv2.fillPoly(color_warp, np.int_([pts]), (0, 255, 0)) # Draw the lane lines in red cv2.polylines(color_warp, np.int32([pts_left]), isClosed=False, color=(0, 0, 255), thickness=15) cv2.polylines(color_warp, np.int32([pts_right]), isClosed=False, color=(0, 0, 255), thickness=15) # Apply inverse perspective transform to project back to original image newwarp = cv2.warpPerspective(color_warp, Minv, (img.shape[1], img.shape[0])) # Combine with original image result = cv2.addWeighted(img, 0.8, newwarp, 0.5, 0) return result def process_frame_yolop(frame): """ YOLOP-inspired lane detection method. Simulates multi-task learning approach with semantic segmentation. Uses enhanced color-based segmentation with adaptive thresholding. """ height, width = frame.shape[:2] # Convert to HLS for better color segmentation hls = cv2.cvtColor(frame, cv2.COLOR_BGR2HLS) h_channel = hls[:, :, 0] l_channel = hls[:, :, 1] s_channel = hls[:, :, 2] # Multi-threshold approach for different lane colors # White lanes - high lightness white_mask = cv2.inRange(l_channel, 200, 255) # Yellow lanes - specific hue range yellow_mask = cv2.inRange(h_channel, 15, 35) & cv2.inRange(s_channel, 80, 255) # Combine masks color_mask = cv2.bitwise_or(white_mask, yellow_mask) # Apply morphological operations kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5)) color_mask = cv2.morphologyEx(color_mask, cv2.MORPH_CLOSE, kernel) color_mask = cv2.morphologyEx(color_mask, cv2.MORPH_OPEN, kernel) # Apply ROI vertices = np.array([[ (int(width * 0.1), height), (int(width * 0.45), int(height * 0.6)), (int(width * 0.55), int(height * 0.6)), (int(width * 0.9), height) ]], dtype=np.int32) color_mask = region_of_interest(color_mask, vertices) # Find contours for lane segments contours, _ = cv2.findContours(color_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) # Create output image result = frame.copy() overlay = np.zeros_like(frame) # Separate left and right lane contours left_contours = [] right_contours = [] midpoint = width // 2 for contour in contours: if cv2.contourArea(contour) > 100: M = cv2.moments(contour) if M["m00"] != 0: cx = int(M["m10"] / M["m00"]) if cx < midpoint: left_contours.append(contour) else: right_contours.append(contour) # Draw lane regions if len(left_contours) > 0 or len(right_contours) > 0: # Fill lane area if len(left_contours) > 0 and len(right_contours) > 0: # Get bounding points left_points = np.vstack(left_contours).squeeze() right_points = np.vstack(right_contours).squeeze() if len(left_points.shape) == 2 and len(right_points.shape) == 2: # Sort by y coordinate left_points = left_points[left_points[:, 1].argsort()] right_points = right_points[right_points[:, 1].argsort()] # Create polygon poly_points = np.vstack([left_points, right_points[::-1]]) cv2.fillPoly(overlay, [poly_points], (0, 255, 0)) # Draw lane lines for contour in left_contours: cv2.drawContours(overlay, [contour], -1, (0, 0, 255), 5) for contour in right_contours: cv2.drawContours(overlay, [contour], -1, (0, 0, 255), 5) # Blend with original result = cv2.addWeighted(result, 0.8, overlay, 0.5, 0) return result def process_frame_ufld(frame): """ UFLD-inspired (Ultra Fast Lane Detection) method. Uses row-wise classification approach with efficient feature extraction. Focuses on speed and accuracy for real-time applications. """ height, width = frame.shape[:2] # Convert to grayscale gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # Apply CLAHE for enhanced contrast clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) enhanced = clahe.apply(gray) # Apply bilateral filter to preserve edges while reducing noise filtered = cv2.bilateralFilter(enhanced, 9, 75, 75) # Adaptive thresholding binary = cv2.adaptiveThreshold( filtered, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2 ) # Apply ROI vertices = np.array([[ (int(width * 0.1), height), (int(width * 0.45), int(height * 0.6)), (int(width * 0.55), int(height * 0.6)), (int(width * 0.9), height) ]], dtype=np.int32) binary = region_of_interest(binary, vertices) # Row-wise lane point detection row_samples = 18 # Number of rows to sample row_step = height // row_samples left_lane_points = [] right_lane_points = [] midpoint = width // 2 for i in range(row_samples): y = height - i * row_step - row_step // 2 if y < int(height * 0.6): continue row = binary[y, :] # Find peaks in left and right halves left_half = row[:midpoint] right_half = row[midpoint:] # Find lane positions left_peaks = np.where(left_half > 200)[0] right_peaks = np.where(right_half > 200)[0] if len(left_peaks) > 0: # Use the rightmost peak in left half x = left_peaks[-1] left_lane_points.append([x, y]) if len(right_peaks) > 0: # Use the leftmost peak in right half x = midpoint + right_peaks[0] right_lane_points.append([x, y]) # Create result image result = frame.copy() overlay = np.zeros_like(frame) # Fit curves to lane points if len(left_lane_points) >= 3: left_lane_points = np.array(left_lane_points) left_fit = np.polyfit(left_lane_points[:, 1], left_lane_points[:, 0], 2) # Generate smooth curve ploty = np.linspace(int(height * 0.6), height, 100) left_fitx = left_fit[0] * ploty**2 + left_fit[1] * ploty + left_fit[2] left_fitx = np.clip(left_fitx, 0, width - 1) left_curve = np.array([np.transpose(np.vstack([left_fitx, ploty]))], dtype=np.int32) cv2.polylines(overlay, left_curve, False, (0, 0, 255), 8) if len(right_lane_points) >= 3: right_lane_points = np.array(right_lane_points) right_fit = np.polyfit(right_lane_points[:, 1], right_lane_points[:, 0], 2) # Generate smooth curve ploty = np.linspace(int(height * 0.6), height, 100) right_fitx = right_fit[0] * ploty**2 + right_fit[1] * ploty + right_fit[2] right_fitx = np.clip(right_fitx, 0, width - 1) right_curve = np.array([np.transpose(np.vstack([right_fitx, ploty]))], dtype=np.int32) cv2.polylines(overlay, right_curve, False, (0, 0, 255), 8) # Fill lane area if len(left_lane_points) >= 3 and len(right_lane_points) >= 3: ploty = np.linspace(int(height * 0.6), height, 100) left_fitx = left_fit[0] * ploty**2 + left_fit[1] * ploty + left_fit[2] right_fitx = right_fit[0] * ploty**2 + right_fit[1] * ploty + right_fit[2] left_fitx = np.clip(left_fitx, 0, width - 1) right_fitx = np.clip(right_fitx, 0, width - 1) pts_left = np.array([np.transpose(np.vstack([left_fitx, ploty]))]) pts_right = np.array([np.flipud(np.transpose(np.vstack([right_fitx, ploty])))]) pts = np.hstack((pts_left, pts_right)) cv2.fillPoly(overlay, np.int32([pts]), (0, 255, 0)) # Blend result = cv2.addWeighted(result, 0.8, overlay, 0.5, 0) return result def process_frame_scnn(frame): """ SCNN-inspired (Spatial CNN) method. Uses spatial message passing for lane detection. Implements slice-by-slice convolutions in four directions. """ height, width = frame.shape[:2] # Preprocessing hls = cv2.cvtColor(frame, cv2.COLOR_BGR2HLS) l_channel = hls[:, :, 1] s_channel = hls[:, :, 2] # Enhanced preprocessing with CLAHE clahe = cv2.createCLAHE(clipLimit=2.5, tileGridSize=(8, 8)) l_enhanced = clahe.apply(l_channel) # Multi-scale edge detection sobel_x = cv2.Sobel(l_enhanced, cv2.CV_64F, 1, 0, ksize=5) sobel_y = cv2.Sobel(l_enhanced, cv2.CV_64F, 0, 1, ksize=5) # Gradient magnitude and direction magnitude = np.sqrt(sobel_x**2 + sobel_y**2) magnitude = np.uint8(255 * magnitude / np.max(magnitude)) direction = np.arctan2(sobel_y, sobel_x) # Focus on near-vertical edges (lane lines) vertical_mask = np.zeros_like(magnitude) vertical_mask[(np.abs(direction) > 0.6) & (np.abs(direction) < 1.5)] = 255 # Combine with color thresholding s_binary = cv2.inRange(s_channel, 90, 255) l_binary = cv2.inRange(l_enhanced, 180, 255) combined = cv2.bitwise_or(s_binary, l_binary) combined = cv2.bitwise_and(combined, magnitude) combined = cv2.bitwise_and(combined, vertical_mask) # Simulate spatial message passing with directional filtering # Horizontal message passing (left-to-right and right-to-left) kernel_h = np.ones((1, 15), np.uint8) horizontal_pass = cv2.morphologyEx(combined, cv2.MORPH_CLOSE, kernel_h) # Vertical message passing (top-to-bottom and bottom-to-top) kernel_v = np.ones((15, 1), np.uint8) spatial_features = cv2.morphologyEx(horizontal_pass, cv2.MORPH_CLOSE, kernel_v) # Apply ROI vertices = np.array([[ (int(width * 0.1), height), (int(width * 0.45), int(height * 0.6)), (int(width * 0.55), int(height * 0.6)), (int(width * 0.9), height) ]], dtype=np.int32) spatial_features = region_of_interest(spatial_features, vertices) # Lane fitting with sliding window histogram = np.sum(spatial_features[spatial_features.shape[0]//2:, :], axis=0) midpoint = len(histogram) // 2 leftx_base = np.argmax(histogram[:midpoint]) rightx_base = np.argmax(histogram[midpoint:]) + midpoint # Sliding window parameters nwindows = 12 window_height = spatial_features.shape[0] // nwindows margin = 80 minpix = 40 nonzero = spatial_features.nonzero() nonzeroy = np.array(nonzero[0]) nonzerox = np.array(nonzero[1]) leftx_current = leftx_base rightx_current = rightx_base left_lane_inds = [] right_lane_inds = [] for window in range(nwindows): win_y_low = spatial_features.shape[0] - (window + 1) * window_height win_y_high = spatial_features.shape[0] - window * window_height win_xleft_low = leftx_current - margin win_xleft_high = leftx_current + margin win_xright_low = rightx_current - margin win_xright_high = rightx_current + margin good_left_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & (nonzerox >= win_xleft_low) & (nonzerox < win_xleft_high)).nonzero()[0] good_right_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & (nonzerox >= win_xright_low) & (nonzerox < win_xright_high)).nonzero()[0] left_lane_inds.append(good_left_inds) right_lane_inds.append(good_right_inds) if len(good_left_inds) > minpix: leftx_current = int(np.mean(nonzerox[good_left_inds])) if len(good_right_inds) > minpix: rightx_current = int(np.mean(nonzerox[good_right_inds])) left_lane_inds = np.concatenate(left_lane_inds) right_lane_inds = np.concatenate(right_lane_inds) leftx = nonzerox[left_lane_inds] lefty = nonzeroy[left_lane_inds] rightx = nonzerox[right_lane_inds] righty = nonzeroy[right_lane_inds] result = frame.copy() overlay = np.zeros_like(frame) if len(leftx) > 0 and len(rightx) > 0: left_fit = np.polyfit(lefty, leftx, 2) right_fit = np.polyfit(righty, rightx, 2) ploty = np.linspace(0, spatial_features.shape[0] - 1, spatial_features.shape[0]) left_fitx = left_fit[0] * ploty**2 + left_fit[1] * ploty + left_fit[2] right_fitx = right_fit[0] * ploty**2 + right_fit[1] * ploty + right_fit[2] left_fitx = np.clip(left_fitx, 0, width - 1) right_fitx = np.clip(right_fitx, 0, width - 1) # Draw lane area pts_left = np.array([np.transpose(np.vstack([left_fitx, ploty]))]) pts_right = np.array([np.flipud(np.transpose(np.vstack([right_fitx, ploty])))]) pts = np.hstack((pts_left, pts_right)) cv2.fillPoly(overlay, np.int32([pts]), (0, 255, 0)) # Draw lane lines cv2.polylines(overlay, np.int32([pts_left]), False, (0, 0, 255), 12) cv2.polylines(overlay, np.int32([pts_right]), False, (0, 0, 255), 12) result = cv2.addWeighted(result, 0.8, overlay, 0.5, 0) return result def process_frame(frame, method="advanced", use_enhanced=True, use_segmented=False): """ Process a single frame for lane detection. method: "basic", "basic_segmented", "advanced", "yolop", "ufld", "scnn" use_enhanced: Use enhanced thresholding for better accuracy (advanced method only) use_segmented: Use segmented lines for curve representation (basic method only) """ if method == "basic" or method == "basic_standard": return process_frame_basic(frame, use_segmented=False) elif method == "basic_segmented": return process_frame_basic(frame, use_segmented=True) elif method == "advanced": return process_frame_advanced(frame, use_enhanced) elif method == "yolop": return process_frame_yolop(frame) elif method == "ufld": return process_frame_ufld(frame) elif method == "scnn": return process_frame_scnn(frame) else: raise ValueError(f"Unknown method: {method}. Use 'basic', 'basic_segmented', 'advanced', 'yolop', 'ufld', or 'scnn'") def process_frame_advanced(frame, use_enhanced=True): """ Process a single frame for lane detection using advanced pipeline. 1. Perspective transform to bird's eye view 2. Enhanced color and gradient thresholding 3. Polynomial fitting with sliding windows 4. Draw lanes with inverse perspective transform """ # Step 1: Apply perspective transform to get bird's eye view warped, M, Minv = calibrate_perspective(frame) # Step 2: Apply enhanced color and gradient thresholding binary_warped = color_and_gradient_threshold(warped, use_enhanced) # Step 3: Fit polynomial lanes using sliding window approach left_fit, right_fit = fit_polynomial_lanes(binary_warped) # Step 4: Draw polynomial lines on original image result = draw_poly_lines(frame, binary_warped, left_fit, right_fit, Minv) return result def process_video(input_path, output_path, method="advanced", use_enhanced=True, use_segmented=False, progress_callback=None): """ Process the video and create side-by-side comparison. method: "basic", "basic_segmented", "advanced", "yolop", "ufld", "scnn" use_enhanced: Use enhanced thresholding for better accuracy (advanced method only) use_segmented: Use segmented lines for curve representation (basic method only) progress_callback: Optional callback function to report progress (value between 0 and 1) Returns True if successful, False otherwise. """ # Open the video cap = cv2.VideoCapture(input_path) if not cap.isOpened(): return False # Get video properties fps = int(cap.get(cv2.CAP_PROP_FPS)) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Video writer for output (side-by-side, so width is doubled) # Try different codecs for better browser compatibility codecs_to_try = [ ('H264', cv2.VideoWriter_fourcc(*'H264')), # H.264 - most compatible ('h264', cv2.VideoWriter_fourcc(*'h264')), # Alternative H.264 ('mp4v', cv2.VideoWriter_fourcc(*'mp4v')), # MPEG-4 Part 2 ('XVID', cv2.VideoWriter_fourcc(*'XVID')), # XVID ] out = None selected_codec = None for codec_name, fourcc in codecs_to_try: out = cv2.VideoWriter(output_path, fourcc, fps, (width * 2, height)) if out.isOpened(): selected_codec = codec_name print(f"✓ Using codec: {codec_name}") break else: print(f"⚠ Codec {codec_name} not available, trying next...") if out is None or not out.isOpened(): print("✗ Error: No suitable video codec found!") return False frame_count = 0 print(f"Processing {total_frames} frames using {method} method...") if method == "advanced" and use_enhanced: print("Enhanced thresholding enabled for better accuracy") if method == "basic" and use_segmented: print("Segmented line mode enabled for better curve representation") # Process each frame while True: ret, frame = cap.read() if not ret: break # Process frame for lane detection processed_frame = process_frame(frame, method, use_enhanced, use_segmented) # Create side-by-side comparison # Original on left, processed on right combined = np.hstack((frame, processed_frame)) # Write the combined frame out.write(combined) frame_count += 1 # Progress indicator if progress_callback and frame_count % 10 == 0: progress = frame_count / total_frames if total_frames > 0 else 0 progress_callback(progress, f"Processing frame {frame_count}/{total_frames}") elif frame_count % 30 == 0: progress = (frame_count / total_frames) * 100 if total_frames > 0 else 0 print(f"Progress: {frame_count}/{total_frames} frames ({progress:.1f}%)") # Release resources cap.release() out.release() if progress_callback: progress_callback(1.0, "Completed!") print(f"✓ Completed! Processed {frame_count} frames using {method} method.") return frame_count > 0