Spaces:
Sleeping
Sleeping
| """ | |
| Lane detection module using OpenCV | |
| Advanced lane detection with multiple methods and GPU acceleration. | |
| This module contains the core lane detection logic without UI dependencies. | |
| """ | |
| import cv2 | |
| import numpy as np | |
# GPU acceleration setup - prioritize NVIDIA GPU.
# USE_GPU / GPU_TYPE are module-level flags read by process_frame_basic to
# decide whether to run the CUDA pre-processing path.
USE_GPU = False
GPU_TYPE = "none"
try:
    if cv2.cuda.getCudaEnabledDeviceCount() > 0:
        cv2.cuda.setDevice(0)  # Use first GPU
        print(f"โ NVIDIA CUDA enabled! Using GPU acceleration on device: {cv2.cuda.getDevice()}")
        # printShortCudaDeviceInfo writes the device summary itself and
        # returns None, so it must not be interpolated into the message.
        cv2.cuda.printShortCudaDeviceInfo(cv2.cuda.getDevice())
        # Only flip the flags once the device was selected successfully.
        USE_GPU = True
        GPU_TYPE = "nvidia"
    else:
        print("โน CUDA not available. Using CPU.")
except Exception as e:
    # Best-effort probe: any failure (OpenCV built without CUDA, driver
    # problems, ...) falls back to the CPU path with consistent flags.
    print(f"โน GPU acceleration not available: {e}. Using CPU.")
    USE_GPU = False
    GPU_TYPE = "none"
def region_of_interest(img, vertices):
    """
    Apply a region of interest mask to the image.

    Args:
        img: Image array (single- or multi-channel).
        vertices: Polygon(s) passed straight to ``cv2.fillPoly``.

    Returns:
        A copy of ``img`` in which everything outside the polygon(s) is zero.
    """
    # fillPoly expects one value per channel; a bare 255 would only fill the
    # first channel of a colour image and wipe the others in the AND below.
    if img.ndim > 2:
        fill_value = (255,) * img.shape[2]
    else:
        fill_value = 255

    mask = np.zeros_like(img)
    cv2.fillPoly(mask, vertices, fill_value)
    return cv2.bitwise_and(img, mask)
def draw_lines_basic(img, lines, color=(0, 255, 0), thickness=3):
    """
    Draw one averaged line per lane plus a filled lane area (Basic method).

    - Lane lines: red (BGR ``(0, 0, 255)``), drawn at ``thickness * 2``
    - Lane interior: green fill, blended semi-transparently over ``img``

    Args:
        img: Image to annotate.
        lines: Line segments shaped like ``cv2.HoughLinesP`` output (each
            entry wraps ``[x1, y1, x2, y2]``), or ``None``.
        color: Accepted for backward compatibility; the drawing colours are
            fixed and this value is not used.
        thickness: Base line thickness.

    Returns:
        ``img`` itself when ``lines`` is ``None``; otherwise a new blended
        image.
    """
    if lines is None:
        return img

    height = img.shape[0]
    overlay = np.zeros_like(img)

    # Split segments into left/right lanes by the sign of their slope;
    # shallow segments (|slope| <= 0.5) are discarded.
    left_segments = []
    right_segments = []
    for line in lines:
        x1, y1, x2, y2 = line[0]
        if x2 == x1:
            continue  # vertical segment: slope undefined
        slope = (y2 - y1) / (x2 - x1)
        if slope < -0.5:
            left_segments.append(line[0])
        elif slope > 0.5:
            right_segments.append(line[0])

    def average_line(segments):
        """Fit x = a*y + b through all endpoints; return [x1, y1, x2, y2]."""
        if len(segments) == 0:
            return None
        xs = []
        ys = []
        for sx1, sy1, sx2, sy2 in segments:
            xs.extend([sx1, sx2])
            ys.extend([sy1, sy2])
        fit = np.polyfit(ys, xs, 1)
        # Span the line from the bottom edge up to 60% of the image height.
        y_bottom = height
        y_top = int(height * 0.6)
        x_bottom = int(fit[0] * y_bottom + fit[1])
        x_top = int(fit[0] * y_top + fit[1])
        return [x_bottom, y_bottom, x_top, y_top]

    left_line = average_line(left_segments)
    right_line = average_line(right_segments)

    # Green fill between the two lanes (needs both sides).
    if left_line is not None and right_line is not None:
        lane_polygon = np.array([[
            (left_line[0], left_line[1]),    # Left bottom
            (left_line[2], left_line[3]),    # Left top
            (right_line[2], right_line[3]),  # Right top
            (right_line[0], right_line[1]),  # Right bottom
        ]], dtype=np.int32)
        cv2.fillPoly(overlay, lane_polygon, (0, 255, 0))

    # Red lane lines drawn on top of the fill.
    for lane in (left_line, right_line):
        if lane is not None:
            cv2.line(overlay, (lane[0], lane[1]), (lane[2], lane[3]),
                     (0, 0, 255), thickness * 2)

    # Blend with original image (make the overlay semi-transparent)
    return cv2.addWeighted(img, 0.8, overlay, 0.5, 0)
def draw_lines_segmented(img, lines, color=(0, 255, 0), thickness=3):
    """
    Draw every detected segment individually to approximate curved lanes.

    - Lane lines: red segments (BGR ``(0, 0, 255)``), ``thickness + 2`` wide
    - Lane interior: green fill between the left and right segment endpoints

    Args:
        img: Image to annotate.
        lines: Line segments shaped like ``cv2.HoughLinesP`` output (each
            entry wraps ``[x1, y1, x2, y2]``), or ``None``.
        color: Accepted for backward compatibility; the drawing colours are
            fixed and this value is not used.
        thickness: Base line thickness.

    Returns:
        ``img`` itself when ``lines`` is ``None``; otherwise a new blended
        image.
    """
    if lines is None:
        return img

    line_img = np.zeros_like(img)
    fill_img = np.zeros_like(img)

    # Split segments into left/right lanes by the sign of their slope.
    left_lines = []
    right_lines = []
    for line in lines:
        x1, y1, x2, y2 = line[0]
        if x2 == x1:
            continue  # vertical segment: slope undefined
        slope = (y2 - y1) / (x2 - x1)
        if slope < -0.5:
            left_lines.append(line[0])
        elif slope > 0.5:
            right_lines.append(line[0])

    def sorted_endpoints(segments):
        """Return (ys, xs) of all segment endpoints, ordered by y then x."""
        pairs = []
        for sx1, sy1, sx2, sy2 in segments:
            pairs.append((sy1, sx1))
            pairs.append((sy2, sx2))
        pairs.sort()
        return [p[0] for p in pairs], [p[1] for p in pairs]

    def boundary_samples(ys_sorted, xs_sorted, sample_ys):
        """Pick, for each sample row, the x of the first endpoint at/after it."""
        # Bounds are hoisted here instead of being recomputed per sample.
        y_lo, y_hi = ys_sorted[0], ys_sorted[-1]
        count = len(xs_sorted)
        points = []
        for y in sample_ys:
            if y_lo <= y <= y_hi:
                idx = np.searchsorted(ys_sorted, y)
                if 0 < idx < count:
                    points.append([xs_sorted[idx], y])
        return points

    left_y_sorted, left_x_sorted = sorted_endpoints(left_lines)
    right_y_sorted, right_x_sorted = sorted_endpoints(right_lines)

    # Draw all individual line segments for both lanes.
    for x1, y1, x2, y2 in left_lines:
        cv2.line(line_img, (x1, y1), (x2, y2), (0, 0, 255), thickness + 2)
    for x1, y1, x2, y2 in right_lines:
        cv2.line(line_img, (x1, y1), (x2, y2), (0, 0, 255), thickness + 2)

    # Fill the area between the lanes over the rows both sides cover.
    if len(left_y_sorted) > 0 and len(right_y_sorted) > 0:
        min_y = max(left_y_sorted[0], right_y_sorted[0])
        max_y = min(left_y_sorted[-1], right_y_sorted[-1])
        if max_y > min_y:
            # Sample every 10th row of the shared vertical range.
            y_range = np.arange(int(min_y), int(max_y), 10)
            # Left side top-to-bottom, right side bottom-to-top, so the
            # points trace the polygon outline in order.
            poly_points = boundary_samples(left_y_sorted, left_x_sorted, y_range)
            poly_points.extend(
                boundary_samples(right_y_sorted, right_x_sorted, y_range[::-1])
            )
            if len(poly_points) >= 3:
                poly_points = np.array(poly_points, dtype=np.int32)
                cv2.fillPoly(fill_img, [poly_points], (0, 255, 0))

    # Combine filled area and lines
    result_img = cv2.addWeighted(line_img, 0.6, fill_img, 0.7, 0)
    # Blend with original image
    return cv2.addWeighted(img, 0.8, result_img, 0.5, 0)
def process_frame_basic(frame, use_segmented=False):
    """
    Detect lanes in one frame with the basic Canny + Hough pipeline.

    Args:
        frame: BGR image to process.
        use_segmented: When True the detected segments are drawn individually
            (``draw_lines_segmented``); otherwise one averaged line per lane
            is drawn (``draw_lines_basic``).

    Returns:
        Annotated copy of ``frame``.
    """
    frame_h, frame_w = frame.shape[:2]

    if not (USE_GPU and GPU_TYPE == "nvidia"):
        # CPU path: grayscale -> 5x5 Gaussian blur -> Canny
        grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        smoothed = cv2.GaussianBlur(grayscale, (5, 5), 0)
        edge_map = cv2.Canny(smoothed, 50, 150)
    else:
        # CUDA path: same three steps, executed on the device
        device_frame = cv2.cuda_GpuMat()
        device_frame.upload(frame)
        device_gray = cv2.cuda.cvtColor(device_frame, cv2.COLOR_BGR2GRAY)
        blur_filter = cv2.cuda.createGaussianFilter(cv2.CV_8UC1, cv2.CV_8UC1, (5, 5), 0)
        device_blurred = blur_filter.apply(device_gray)
        canny_detector = cv2.cuda.createCannyEdgeDetector(50, 150)
        device_edges = canny_detector.detect(device_blurred)
        edge_map = device_edges.download()

    # Trapezoidal region of interest: bottom edge from 10% to 90% of the
    # width, top edge from 45% to 55% of the width at 60% of the height.
    roi_top = int(frame_h * 0.6)
    roi = np.array([[
        (int(frame_w * 0.1), frame_h),
        (int(frame_w * 0.45), roi_top),
        (int(frame_w * 0.55), roi_top),
        (int(frame_w * 0.9), frame_h)
    ]], dtype=np.int32)
    roi_edges = region_of_interest(edge_map, roi)

    # Probabilistic Hough transform on the masked edge map
    segments = cv2.HoughLinesP(
        roi_edges,
        rho=2,
        theta=np.pi / 180,
        threshold=50,
        minLineLength=40,
        maxLineGap=100
    )

    # Pick the renderer and draw on a copy so the input frame is untouched
    renderer = draw_lines_segmented if use_segmented else draw_lines_basic
    return renderer(frame.copy(), segments)
def calibrate_perspective(img):
    """
    Warp the image to a bird's eye view.

    A trapezoid in the input image is mapped onto a rectangle spanning the
    full output height between 25% and 75% of the width.

    Returns:
        Tuple ``(warped, M, Minv)``: the warped image, the forward transform
        matrix and the inverse transform matrix.
    """
    rows, cols = img.shape[:2]
    far_edge_y = rows * 0.65

    # Source trapezoid, listed in the same corner order as the destination.
    trapezoid = np.float32([
        [cols * 0.45, far_edge_y],  # upper-left corner (far end of the lane)
        [cols * 0.55, far_edge_y],  # upper-right corner
        [cols * 0.9, rows],         # lower-right corner (image bottom)
        [cols * 0.1, rows]          # lower-left corner
    ])

    # Destination rectangle in the bird's eye view.
    left_x = cols * 0.25
    right_x = cols * 0.75
    rectangle = np.float32([
        [left_x, 0],      # Top left
        [right_x, 0],     # Top right
        [right_x, rows],  # Bottom right
        [left_x, rows]    # Bottom left
    ])

    forward = cv2.getPerspectiveTransform(trapezoid, rectangle)
    inverse = cv2.getPerspectiveTransform(rectangle, trapezoid)
    birdseye = cv2.warpPerspective(img, forward, (cols, rows), flags=cv2.INTER_LINEAR)
    return birdseye, forward, inverse
def color_and_gradient_threshold(img, use_enhanced=True):
    """
    Apply color and gradient thresholding to isolate lane lines.

    Args:
        img: BGR image.
        use_enhanced: When True, use lower thresholds, CLAHE contrast
            enhancement, a gradient-direction filter and morphological
            clean-up. When False, use plain saturation / lightness /
            x-gradient thresholds.

    Returns:
        Binary image with lane pixels set to 255.
    """
    def _threshold(channel, low, high):
        """Return a 0/255 mask of pixels with low <= value <= high."""
        binary = np.zeros_like(channel)
        binary[(channel >= low) & (channel <= high)] = 255
        return binary

    def _scale_to_uint8(abs_gradient):
        """Rescale a non-negative gradient image to the 0-255 range."""
        peak = np.max(abs_gradient)
        if peak == 0:
            # Perfectly flat input: avoid 0/0 (NaN) and report "no edges".
            return np.zeros(abs_gradient.shape, dtype=np.uint8)
        return np.uint8(255 * abs_gradient / peak)

    # Convert to HLS color space and pull out lightness / saturation
    hls = cv2.cvtColor(img, cv2.COLOR_BGR2HLS)
    l_channel = hls[:, :, 1]
    s_channel = hls[:, :, 2]

    if use_enhanced:
        s_binary = _threshold(s_channel, 90, 255)
        l_binary = _threshold(l_channel, 180, 255)

        # CLAHE (Contrast Limited Adaptive Histogram Equalization) on lightness
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        l_channel_enhanced = clahe.apply(l_channel.astype(np.uint8))

        # x-gradient magnitude threshold on the enhanced channel
        abs_sobelx = np.absolute(cv2.Sobel(l_channel_enhanced, cv2.CV_64F, 1, 0, ksize=3))
        sobel_binary = _threshold(_scale_to_uint8(abs_sobelx), 15, 255)

        # Gradient direction filter: keep pixels whose direction
        # arctan2(|dy|, |dx|) lies between 0.7 and 1.3 rad.
        abs_sobely = np.absolute(cv2.Sobel(l_channel_enhanced, cv2.CV_64F, 0, 1, ksize=3))
        grad_dir = np.arctan2(abs_sobely, abs_sobelx)
        dir_binary = np.zeros(grad_dir.shape, dtype=np.uint8)
        dir_binary[(grad_dir >= 0.7) & (grad_dir <= 1.3)] = 255

        # Any colour/gradient hit counts, but only where the direction passes
        combined_binary = np.zeros_like(s_binary)
        combined_binary[
            ((s_binary == 255) | (l_binary == 255) | (sobel_binary == 255))
            & (dir_binary == 255)
        ] = 255

        # Close small gaps, then open to remove isolated noise
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
        combined_binary = cv2.morphologyEx(combined_binary, cv2.MORPH_CLOSE, kernel)
        combined_binary = cv2.morphologyEx(combined_binary, cv2.MORPH_OPEN, kernel)
    else:
        # Basic thresholding
        s_binary = _threshold(s_channel, 100, 255)
        l_binary = _threshold(l_channel, 200, 255)

        abs_sobelx = np.absolute(cv2.Sobel(l_channel, cv2.CV_64F, 1, 0, ksize=3))
        sobel_binary = _threshold(_scale_to_uint8(abs_sobelx), 20, 255)

        combined_binary = np.zeros_like(s_binary)
        combined_binary[(s_binary == 255) | (l_binary == 255) | (sobel_binary == 255)] = 255

    return combined_binary
def fit_polynomial_lanes(binary_warped, nwindows=9, margin=100, minpix=50):
    """
    Fit 2nd degree polynomials to lane lines using a sliding window search.

    The starting columns are the histogram peaks of the bottom half of the
    image (one per half-width). Windows are then stacked from the bottom of
    the image upwards and re-centred on the mean x of the pixels they caught.

    Args:
        binary_warped: 2-D binary image; non-zero pixels are lane candidates.
        nwindows: Number of stacked windows (must be >= 1).
        margin: Half-width of each window in pixels.
        minpix: A window is re-centred only if it caught more than this many
            pixels.

    Returns:
        ``(left_fit, right_fit)``; each is the coefficient array of
        ``x = a*y**2 + b*y + c`` or ``None`` when that side has pixels on
        fewer than three distinct rows (a parabola is under-determined then).

    Raises:
        ValueError: If ``nwindows`` is smaller than 1.
    """
    if nwindows < 1:
        raise ValueError("nwindows must be at least 1")

    def _fit_quadratic(ys, xs):
        """Fit x(y) with degree 2, or return None for under-determined data."""
        if np.unique(ys).size < 3:
            return None
        return np.polyfit(ys, xs, 2)

    height = binary_warped.shape[0]

    # Column histogram of the bottom half of the image
    histogram = np.sum(binary_warped[height // 2:, :], axis=0)
    midpoint = len(histogram) // 2
    if midpoint == 0:
        # Image too narrow to be split into a left and a right half.
        return None, None

    # Peak of each half is the starting x of the corresponding lane
    leftx_current = np.argmax(histogram[:midpoint])
    rightx_current = np.argmax(histogram[midpoint:]) + midpoint

    # Coordinates of all non-zero pixels
    nonzero = binary_warped.nonzero()
    nonzeroy = np.array(nonzero[0])
    nonzerox = np.array(nonzero[1])

    window_height = height // nwindows
    left_lane_inds = []
    right_lane_inds = []

    for window in range(nwindows):
        # Window row span, counted upwards from the bottom of the image
        win_y_high = height - window * window_height
        win_y_low = win_y_high - window_height
        in_rows = (nonzeroy >= win_y_low) & (nonzeroy < win_y_high)

        good_left_inds = (
            in_rows
            & (nonzerox >= leftx_current - margin)
            & (nonzerox < leftx_current + margin)
        ).nonzero()[0]
        good_right_inds = (
            in_rows
            & (nonzerox >= rightx_current - margin)
            & (nonzerox < rightx_current + margin)
        ).nonzero()[0]

        left_lane_inds.append(good_left_inds)
        right_lane_inds.append(good_right_inds)

        # Recenter the next window on the mean x of the pixels just found
        if len(good_left_inds) > minpix:
            leftx_current = int(np.mean(nonzerox[good_left_inds]))
        if len(good_right_inds) > minpix:
            rightx_current = int(np.mean(nonzerox[good_right_inds]))

    left_lane_inds = np.concatenate(left_lane_inds)
    right_lane_inds = np.concatenate(right_lane_inds)

    left_fit = _fit_quadratic(nonzeroy[left_lane_inds], nonzerox[left_lane_inds])
    right_fit = _fit_quadratic(nonzeroy[right_lane_inds], nonzerox[right_lane_inds])
    return left_fit, right_fit
def draw_poly_lines(img, binary_warped, left_fit, right_fit, Minv):
    """
    Draw polynomial lane lines on the original image using inverse perspective transform.

    Args:
        img: Original (un-warped) image to annotate.
        binary_warped: Bird's eye binary image; only its shape is used.
        left_fit: Coefficients of ``x = a*y**2 + b*y + c`` for the left lane,
            or ``None``.
        right_fit: Same for the right lane, or ``None``.
        Minv: Inverse perspective matrix mapping the bird's eye view back to
            ``img``.

    Returns:
        ``img`` itself when either fit is ``None``; otherwise a new image with
        a green lane area and red lane lines blended in.
    """
    if left_fit is None or right_fit is None:
        return img

    rows, cols = binary_warped.shape[:2]

    # Blank 3-channel canvas in bird's eye coordinates
    warp_zero = np.zeros_like(binary_warped).astype(np.uint8)
    color_warp = np.dstack((warp_zero, warp_zero, warp_zero))

    # Evaluate both polynomials on every row, clipped to the image width
    ploty = np.linspace(0, rows - 1, rows)
    left_fitx = left_fit[0] * ploty**2 + left_fit[1] * ploty + left_fit[2]
    right_fitx = right_fit[0] * ploty**2 + right_fit[1] * ploty + right_fit[2]
    left_fitx = np.clip(left_fitx, 0, cols - 1)
    right_fitx = np.clip(right_fitx, 0, cols - 1)

    # (N, 2) int32 point arrays: the dtype OpenCV drawing functions expect,
    # independent of the platform's default integer width.
    pts_left = np.column_stack((left_fitx, ploty)).astype(np.int32)
    pts_right = np.column_stack((right_fitx, ploty)).astype(np.int32)

    # Left side top-to-bottom followed by right side bottom-to-top traces
    # the outline of the lane area.
    lane_outline = np.vstack((pts_left, pts_right[::-1]))

    # Fill the lane area with green
    cv2.fillPoly(color_warp, [lane_outline], (0, 255, 0))
    # Draw the lane lines in red
    cv2.polylines(color_warp, [pts_left], isClosed=False, color=(0, 0, 255), thickness=15)
    cv2.polylines(color_warp, [pts_right], isClosed=False, color=(0, 0, 255), thickness=15)

    # Project the drawing back into the original camera perspective
    newwarp = cv2.warpPerspective(color_warp, Minv, (img.shape[1], img.shape[0]))
    return cv2.addWeighted(img, 0.8, newwarp, 0.5, 0)
def process_frame_yolop(frame):
    """
    YOLOP-inspired lane detection method.

    No neural network is involved: lanes are segmented with colour masks in
    HLS space (white by lightness, yellow by hue + saturation), cleaned with
    morphology, restricted to a trapezoidal ROI and grouped into left/right
    contours by the x position of their centroid.

    Args:
        frame: BGR image.

    Returns:
        New image with lane contours in red and, when both sides were found,
        a green polygon spanning them, blended over ``frame``.
    """
    height, width = frame.shape[:2]

    # Convert to HLS for better color segmentation
    hls = cv2.cvtColor(frame, cv2.COLOR_BGR2HLS)
    h_channel = hls[:, :, 0]
    l_channel = hls[:, :, 1]
    s_channel = hls[:, :, 2]

    # White lanes - high lightness
    white_mask = cv2.inRange(l_channel, 200, 255)
    # Yellow lanes - hue 15..35 with saturation of at least 80
    yellow_mask = cv2.inRange(h_channel, 15, 35) & cv2.inRange(s_channel, 80, 255)
    color_mask = cv2.bitwise_or(white_mask, yellow_mask)

    # Close small gaps, then open to remove isolated noise
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    color_mask = cv2.morphologyEx(color_mask, cv2.MORPH_CLOSE, kernel)
    color_mask = cv2.morphologyEx(color_mask, cv2.MORPH_OPEN, kernel)

    # Apply ROI
    vertices = np.array([[
        (int(width * 0.1), height),
        (int(width * 0.45), int(height * 0.6)),
        (int(width * 0.55), int(height * 0.6)),
        (int(width * 0.9), height)
    ]], dtype=np.int32)
    color_mask = region_of_interest(color_mask, vertices)

    # findContours returns (contours, hierarchy) on OpenCV 2.x/4.x but
    # (image, contours, hierarchy) on 3.x; the contour list is always the
    # second-to-last element.
    contours = cv2.findContours(color_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]

    overlay = np.zeros_like(frame)

    # Separate contours into left/right by centroid, ignoring tiny blobs
    left_contours = []
    right_contours = []
    midpoint = width // 2
    for contour in contours:
        if cv2.contourArea(contour) <= 100:
            continue
        moments = cv2.moments(contour)
        if moments["m00"] == 0:
            continue
        cx = int(moments["m10"] / moments["m00"])
        if cx < midpoint:
            left_contours.append(contour)
        else:
            right_contours.append(contour)

    # Fill lane area (needs contours on both sides)
    if len(left_contours) > 0 and len(right_contours) > 0:
        # Flatten the (N, 1, 2) contour arrays into (N, 2) point lists
        left_points = np.vstack(left_contours).reshape(-1, 2)
        right_points = np.vstack(right_contours).reshape(-1, 2)
        # Left side ordered by increasing y, right side by decreasing y
        left_points = left_points[left_points[:, 1].argsort()]
        right_points = right_points[right_points[:, 1].argsort()]
        poly_points = np.vstack([left_points, right_points[::-1]])
        cv2.fillPoly(overlay, [poly_points], (0, 255, 0))

    # Draw lane outlines in red on top of the fill
    for contour in left_contours:
        cv2.drawContours(overlay, [contour], -1, (0, 0, 255), 5)
    for contour in right_contours:
        cv2.drawContours(overlay, [contour], -1, (0, 0, 255), 5)

    # Blend with original
    return cv2.addWeighted(frame, 0.8, overlay, 0.5, 0)
def process_frame_ufld(frame):
    """
    UFLD-inspired (Ultra Fast Lane Detection) method.

    No neural network is involved: the frame is contrast-enhanced and
    adaptively thresholded, then a fixed number of rows is sampled. In each
    sampled row the rightmost bright pixel of the left half and the leftmost
    bright pixel of the right half become lane points, and a 2nd degree
    polynomial is fitted per side.

    Args:
        frame: BGR image.

    Returns:
        New image with the fitted lanes (red) and the area between them
        (green) blended over ``frame``.
    """
    height, width = frame.shape[:2]
    roi_top = int(height * 0.6)

    # Grayscale -> CLAHE contrast enhancement -> edge-preserving smoothing
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    enhanced = clahe.apply(gray)
    filtered = cv2.bilateralFilter(enhanced, 9, 75, 75)

    # Adaptive thresholding
    binary = cv2.adaptiveThreshold(
        filtered, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, 11, 2
    )

    # Apply ROI
    vertices = np.array([[
        (int(width * 0.1), height),
        (int(width * 0.45), roi_top),
        (int(width * 0.55), roi_top),
        (int(width * 0.9), height)
    ]], dtype=np.int32)
    binary = region_of_interest(binary, vertices)

    # Row-wise lane point detection
    row_samples = 18  # Number of rows to sample
    row_step = height // row_samples
    midpoint = width // 2
    left_lane_points = []
    right_lane_points = []
    for i in range(row_samples):
        y = height - i * row_step - row_step // 2
        # Skip rows above the ROI, and rows past the last image row (which
        # happens on very small frames where row_step is 0 or 1).
        if y < roi_top or y >= height:
            continue
        row = binary[y, :]
        left_peaks = np.where(row[:midpoint] > 200)[0]
        right_peaks = np.where(row[midpoint:] > 200)[0]
        if len(left_peaks) > 0:
            # Use the rightmost peak in left half
            left_lane_points.append([left_peaks[-1], y])
        if len(right_peaks) > 0:
            # Use the leftmost peak in right half
            right_lane_points.append([midpoint + right_peaks[0], y])

    # Rows at which the fitted curves are evaluated
    ploty = np.linspace(roi_top, height, 100)

    def _curve_points(points):
        """Fit x(y) with degree 2 and return (100, 2) int32 points, or None."""
        if len(points) < 3:
            return None
        pts = np.array(points)
        fit = np.polyfit(pts[:, 1], pts[:, 0], 2)
        fitx = fit[0] * ploty**2 + fit[1] * ploty + fit[2]
        fitx = np.clip(fitx, 0, width - 1)
        return np.column_stack((fitx, ploty)).astype(np.int32)

    left_curve = _curve_points(left_lane_points)
    right_curve = _curve_points(right_lane_points)

    overlay = np.zeros_like(frame)

    # Lane lines first ...
    for curve in (left_curve, right_curve):
        if curve is not None:
            cv2.polylines(overlay, [curve], False, (0, 0, 255), 8)

    # ... then the lane area, drawn over the inner half of the lines.
    if left_curve is not None and right_curve is not None:
        lane_outline = np.vstack((left_curve, right_curve[::-1]))
        cv2.fillPoly(overlay, [lane_outline], (0, 255, 0))

    # Blend
    return cv2.addWeighted(frame, 0.8, overlay, 0.5, 0)
def process_frame_scnn(frame):
    """
    SCNN-inspired (Spatial CNN) method.

    No neural network is involved: colour thresholds are combined with
    gradient magnitude and direction, "message passing" is approximated by
    morphological closing with a horizontal and then a vertical kernel, and
    lanes are fitted with a sliding window search.

    Args:
        frame: BGR image.

    Returns:
        New image with the overlay blended over ``frame``. The overlay stays
        empty when either lane has pixels on fewer than three distinct rows.
    """
    height, width = frame.shape[:2]

    # Preprocessing
    hls = cv2.cvtColor(frame, cv2.COLOR_BGR2HLS)
    l_channel = hls[:, :, 1]
    s_channel = hls[:, :, 2]

    # Enhanced preprocessing with CLAHE
    clahe = cv2.createCLAHE(clipLimit=2.5, tileGridSize=(8, 8))
    l_enhanced = clahe.apply(l_channel)

    # Gradients of the enhanced lightness channel
    sobel_x = cv2.Sobel(l_enhanced, cv2.CV_64F, 1, 0, ksize=5)
    sobel_y = cv2.Sobel(l_enhanced, cv2.CV_64F, 0, 1, ksize=5)

    # Gradient magnitude rescaled to 0-255
    magnitude = np.sqrt(sobel_x**2 + sobel_y**2)
    peak = np.max(magnitude)
    if peak > 0:
        magnitude = np.uint8(255 * magnitude / peak)
    else:
        # Perfectly flat input: avoid 0/0 (NaN) and report "no edges".
        magnitude = np.zeros(magnitude.shape, dtype=np.uint8)

    # Keep pixels whose gradient direction magnitude is in (0.6, 1.5) rad
    abs_direction = np.abs(np.arctan2(sobel_y, sobel_x))
    vertical_mask = np.zeros_like(magnitude)
    vertical_mask[(abs_direction > 0.6) & (abs_direction < 1.5)] = 255

    # Combine with color thresholding
    s_binary = cv2.inRange(s_channel, 90, 255)
    l_binary = cv2.inRange(l_enhanced, 180, 255)
    combined = cv2.bitwise_or(s_binary, l_binary)
    combined = cv2.bitwise_and(combined, magnitude)
    combined = cv2.bitwise_and(combined, vertical_mask)

    # Simulated spatial message passing: close horizontally, then vertically
    kernel_h = np.ones((1, 15), np.uint8)
    horizontal_pass = cv2.morphologyEx(combined, cv2.MORPH_CLOSE, kernel_h)
    kernel_v = np.ones((15, 1), np.uint8)
    spatial_features = cv2.morphologyEx(horizontal_pass, cv2.MORPH_CLOSE, kernel_v)

    # Apply ROI
    vertices = np.array([[
        (int(width * 0.1), height),
        (int(width * 0.45), int(height * 0.6)),
        (int(width * 0.55), int(height * 0.6)),
        (int(width * 0.9), height)
    ]], dtype=np.int32)
    spatial_features = region_of_interest(spatial_features, vertices)

    rows = spatial_features.shape[0]

    # Starting columns: histogram peaks of the bottom half, one per side
    histogram = np.sum(spatial_features[rows // 2:, :], axis=0)
    midpoint = len(histogram) // 2
    leftx_current = np.argmax(histogram[:midpoint])
    rightx_current = np.argmax(histogram[midpoint:]) + midpoint

    # Sliding window parameters
    nwindows = 12
    window_height = rows // nwindows
    margin = 80
    minpix = 40

    nonzero = spatial_features.nonzero()
    nonzeroy = np.array(nonzero[0])
    nonzerox = np.array(nonzero[1])

    left_lane_inds = []
    right_lane_inds = []
    for window in range(nwindows):
        # Window row span, counted upwards from the bottom of the image
        win_y_high = rows - window * window_height
        win_y_low = win_y_high - window_height
        in_rows = (nonzeroy >= win_y_low) & (nonzeroy < win_y_high)

        good_left_inds = (
            in_rows
            & (nonzerox >= leftx_current - margin)
            & (nonzerox < leftx_current + margin)
        ).nonzero()[0]
        good_right_inds = (
            in_rows
            & (nonzerox >= rightx_current - margin)
            & (nonzerox < rightx_current + margin)
        ).nonzero()[0]

        left_lane_inds.append(good_left_inds)
        right_lane_inds.append(good_right_inds)

        # Recenter the next window on the mean x of the pixels just found
        if len(good_left_inds) > minpix:
            leftx_current = int(np.mean(nonzerox[good_left_inds]))
        if len(good_right_inds) > minpix:
            rightx_current = int(np.mean(nonzerox[good_right_inds]))

    left_lane_inds = np.concatenate(left_lane_inds)
    right_lane_inds = np.concatenate(right_lane_inds)

    leftx = nonzerox[left_lane_inds]
    lefty = nonzeroy[left_lane_inds]
    rightx = nonzerox[right_lane_inds]
    righty = nonzeroy[right_lane_inds]

    overlay = np.zeros_like(frame)

    # A parabola needs samples on at least three distinct rows per side.
    if np.unique(lefty).size >= 3 and np.unique(righty).size >= 3:
        left_fit = np.polyfit(lefty, leftx, 2)
        right_fit = np.polyfit(righty, rightx, 2)

        ploty = np.linspace(0, rows - 1, rows)
        left_fitx = left_fit[0] * ploty**2 + left_fit[1] * ploty + left_fit[2]
        right_fitx = right_fit[0] * ploty**2 + right_fit[1] * ploty + right_fit[2]
        left_fitx = np.clip(left_fitx, 0, width - 1)
        right_fitx = np.clip(right_fitx, 0, width - 1)

        pts_left = np.column_stack((left_fitx, ploty)).astype(np.int32)
        pts_right = np.column_stack((right_fitx, ploty)).astype(np.int32)

        # Draw lane area, then the lane lines on top of it
        lane_outline = np.vstack((pts_left, pts_right[::-1]))
        cv2.fillPoly(overlay, [lane_outline], (0, 255, 0))
        cv2.polylines(overlay, [pts_left], False, (0, 0, 255), 12)
        cv2.polylines(overlay, [pts_right], False, (0, 0, 255), 12)

    return cv2.addWeighted(frame, 0.8, overlay, 0.5, 0)
def process_frame(frame, method="advanced", use_enhanced=True, use_segmented=False):
    """
    Process a single frame for lane detection.

    Args:
        frame: Image to process.
        method: One of "basic", "basic_standard", "basic_segmented",
            "advanced", "yolop", "ufld", "scnn".
        use_enhanced: Use enhanced thresholding for better accuracy
            (advanced method only).
        use_segmented: Use segmented lines for curve representation. Honoured
            for method "basic" only; "basic_standard" always draws averaged
            lines and "basic_segmented" always draws segments.

    Returns:
        The annotated frame produced by the selected method.

    Raises:
        ValueError: If ``method`` is not one of the names above.
    """
    if method == "basic":
        # Previously the flag was dropped here although it is documented as
        # applying to the basic method.
        return process_frame_basic(frame, use_segmented=use_segmented)
    if method == "basic_standard":
        return process_frame_basic(frame, use_segmented=False)
    if method == "basic_segmented":
        return process_frame_basic(frame, use_segmented=True)
    if method == "advanced":
        return process_frame_advanced(frame, use_enhanced)
    if method == "yolop":
        return process_frame_yolop(frame)
    if method == "ufld":
        return process_frame_ufld(frame)
    if method == "scnn":
        return process_frame_scnn(frame)
    raise ValueError(f"Unknown method: {method}. Use 'basic', 'basic_segmented', 'advanced', 'yolop', 'ufld', or 'scnn'")
def process_frame_advanced(frame, use_enhanced=True):
    """
    Run the advanced lane detection pipeline on one frame.

    Stages, in order:
    1. ``calibrate_perspective`` - warp to a bird's eye view
    2. ``color_and_gradient_threshold`` - build the binary lane mask
    3. ``fit_polynomial_lanes`` - sliding-window polynomial fit per lane
    4. ``draw_poly_lines`` - render the lanes back onto the original frame

    Args:
        frame: Image to process.
        use_enhanced: Forwarded to the thresholding stage.

    Returns:
        The annotated frame.
    """
    # The forward matrix is not needed here, only the inverse for drawing.
    birdseye, _forward, inverse_matrix = calibrate_perspective(frame)
    lane_mask = color_and_gradient_threshold(birdseye, use_enhanced)
    lane_fits = fit_polynomial_lanes(lane_mask)
    return draw_poly_lines(frame, lane_mask, lane_fits[0], lane_fits[1], inverse_matrix)
def process_video(input_path, output_path, method="advanced", use_enhanced=True, use_segmented=False, progress_callback=None):
    """
    Process the video and create side-by-side comparison.

    Each output frame holds the original frame on the left and the processed
    frame on the right, so the output is twice as wide as the input.

    Args:
        input_path: Path of the video to read.
        output_path: Path of the video to write.
        method: "basic", "basic_segmented", "advanced", "yolop", "ufld", "scnn"
        use_enhanced: Use enhanced thresholding for better accuracy
            (advanced method only).
        use_segmented: Use segmented lines for curve representation
            (basic method only).
        progress_callback: Optional callable ``(fraction, message)``; called
            every 10 frames with a fraction between 0 and 1, and once with
            1.0 on completion.

    Returns:
        True if at least one frame was processed, False if the input could
        not be opened, no codec was usable, or the video had no frames.
    """
    # Open the video
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        cap.release()
        return False

    out = None
    frame_count = 0
    try:
        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 would
        # change the playback speed of the output.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # NOTE(review): some sources report 0/NaN fps; 30 is an assumed
            # fallback so a writer can still be opened - confirm it suits
            # the expected inputs.
            fps = 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Video writer for output (side-by-side, so width is doubled).
        # Codecs are tried in order until one opens.
        for codec_name in ("H264", "h264", "mp4v", "XVID"):
            fourcc = cv2.VideoWriter_fourcc(*codec_name)
            writer = cv2.VideoWriter(output_path, fourcc, fps, (width * 2, height))
            if writer.isOpened():
                out = writer
                print(f"โ Using codec: {codec_name}")
                break
            # Release the failed writer before trying the next codec.
            writer.release()
            print(f"โ Codec {codec_name} not available, trying next...")

        if out is None:
            print("โ Error: No suitable video codec found!")
            return False

        print(f"Processing {total_frames} frames using {method} method...")
        if method == "advanced" and use_enhanced:
            print("Enhanced thresholding enabled for better accuracy")
        if method == "basic" and use_segmented:
            print("Segmented line mode enabled for better curve representation")

        # Process each frame
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            processed_frame = process_frame(frame, method, use_enhanced, use_segmented)

            # Original on left, processed on right
            combined = np.hstack((frame, processed_frame))
            out.write(combined)
            frame_count += 1

            # Progress: callback every 10 frames, otherwise console every 30
            if progress_callback and frame_count % 10 == 0:
                progress = frame_count / total_frames if total_frames > 0 else 0
                progress_callback(progress, f"Processing frame {frame_count}/{total_frames}")
            elif frame_count % 30 == 0:
                progress = (frame_count / total_frames) * 100 if total_frames > 0 else 0
                print(f"Progress: {frame_count}/{total_frames} frames ({progress:.1f}%)")
    finally:
        # Release resources on every exit path, including errors raised
        # while processing a frame.
        cap.release()
        if out is not None:
            out.release()

    if progress_callback:
        progress_callback(1.0, "Completed!")
    print(f"โ Completed! Processed {frame_count} frames using {method} method.")
    return frame_count > 0