Spaces:
Running
Running
| from __future__ import annotations | |
| import os | |
| import gc | |
| import base64 | |
| import io | |
| import time | |
| import shutil | |
| import numpy as np | |
| import torch | |
| import cv2 | |
| import ezdxf | |
| from ezdxf.addons.text2path import make_paths_from_str | |
| from ezdxf import path | |
| from ezdxf.addons import text2path | |
| from ezdxf.enums import TextEntityAlignment | |
| from ezdxf.fonts.fonts import FontFace, get_font_face | |
| import gradio as gr | |
| from PIL import Image, ImageEnhance | |
| from pathlib import Path | |
| from typing import List, Union | |
| from ultralytics import YOLOWorld, YOLO | |
| from ultralytics.engine.results import Results | |
| from ultralytics.utils.plotting import save_one_box | |
| from transformers import AutoModelForImageSegmentation | |
| from torchvision import transforms | |
| from scalingtestupdated import calculate_scaling_factor | |
| from shapely.geometry import Polygon, Point, MultiPolygon | |
| from scipy.interpolate import splprep, splev | |
| from scipy.ndimage import gaussian_filter1d | |
| from u2net import U2NETP | |
| # --------------------- | |
| # Create a cache folder for models | |
| # --------------------- | |
| CACHE_DIR = os.path.join(os.path.dirname(__file__), ".cache") | |
| os.makedirs(CACHE_DIR, exist_ok=True) | |
| # --------------------- | |
| # Custom Exceptions | |
| # --------------------- | |
| class DrawerNotDetectedError(Exception): | |
| """Raised when the drawer cannot be detected in the image""" | |
| pass | |
| class ReferenceBoxNotDetectedError(Exception): | |
| """Raised when the Reference coin cannot be detected in the image""" | |
| pass | |
| class BoundaryOverlapError(Exception): | |
| """The specified boundary dimensions are too small and overlap with the inner contours.Please provide larger value for boundary length and width.""" | |
| pass | |
| class TextOverlapError(Exception): | |
| """Raised when the text overlaps with the inner contours (with a margin of 0.75).Please provide larger value for boundary length and width.""" | |
| pass | |
| # --------------------- | |
| # Global Model Initialization with caching and print statements | |
| # --------------------- | |
| print("Loading YOLOWorld model...") | |
| start_time = time.time() | |
| yolo_model_path = os.path.join(CACHE_DIR, "yolov8x-worldv2.pt") | |
| if not os.path.exists(yolo_model_path): | |
| print("Caching YOLOWorld model to", yolo_model_path) | |
| shutil.copy("yolov8x-worldv2.pt", yolo_model_path) | |
| drawer_detector_global = YOLOWorld(yolo_model_path) | |
| drawer_detector_global.set_classes(["box"]) | |
| print("YOLOWorld model loaded in {:.2f} seconds".format(time.time() - start_time)) | |
| print("Loading YOLO reference model...") | |
| start_time = time.time() | |
| reference_model_path = os.path.join(CACHE_DIR, "coin_det.pt") | |
| if not os.path.exists(reference_model_path): | |
| print("Caching YOLO reference model to", reference_model_path) | |
| shutil.copy("coin_det.pt", reference_model_path) | |
| reference_detector_global = YOLO(reference_model_path) | |
| print("YOLO reference model loaded in {:.2f} seconds".format(time.time() - start_time)) | |
| print("Loading U²-Net model for reference background removal (U2NETP)...") | |
| start_time = time.time() | |
| u2net_model_path = os.path.join(CACHE_DIR, "u2netp.pth") | |
| if not os.path.exists(u2net_model_path): | |
| print("Caching U²-Net model to", u2net_model_path) | |
| shutil.copy("u2netp.pth", u2net_model_path) | |
| u2net_global = U2NETP(3, 1) | |
| u2net_global.load_state_dict(torch.load(u2net_model_path, map_location="cpu")) | |
| device = "cpu" | |
| u2net_global.to(device) | |
| u2net_global.eval() | |
| print("U²-Net model loaded in {:.2f} seconds".format(time.time() - start_time)) | |
| print("Loading BiRefNet model...") | |
| start_time = time.time() | |
| birefnet_global = AutoModelForImageSegmentation.from_pretrained( | |
| "zhengpeng7/BiRefNet", trust_remote_code=True, cache_dir=CACHE_DIR | |
| ) | |
| torch.set_float32_matmul_precision("high") | |
| birefnet_global.to(device) | |
| birefnet_global.eval() | |
| print("BiRefNet model loaded in {:.2f} seconds".format(time.time() - start_time)) | |
| # Define transform for BiRefNet | |
| transform_image_global = transforms.Compose([ | |
| transforms.Resize((1024, 1024)), | |
| transforms.ToTensor(), | |
| transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), | |
| ]) | |
| # --------------------- | |
| # Model Reload Function (if needed) | |
| # --------------------- | |
| def unload_and_reload_models(): | |
| global drawer_detector_global, reference_detector_global, birefnet_global, u2net_global | |
| print("Reloading models...") | |
| start_time = time.time() | |
| del drawer_detector_global, reference_detector_global, birefnet_global, u2net_global | |
| gc.collect() | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| gc.collect() | |
| new_drawer_detector = YOLOWorld(os.path.join(CACHE_DIR, "yolov8x-worldv2.pt")) | |
| new_drawer_detector.set_classes(["box"]) | |
| new_reference_detector = YOLO(os.path.join(CACHE_DIR, "coin_det.pt")) | |
| new_birefnet = AutoModelForImageSegmentation.from_pretrained( | |
| "zhengpeng7/BiRefNet", trust_remote_code=True, cache_dir=CACHE_DIR | |
| ) | |
| new_birefnet.to(device) | |
| new_birefnet.eval() | |
| new_u2net = U2NETP(3, 1) | |
| new_u2net.load_state_dict(torch.load(os.path.join(CACHE_DIR, "u2netp.pth"), map_location="cpu")) | |
| new_u2net.to(device) | |
| new_u2net.eval() | |
| drawer_detector_global = new_drawer_detector | |
| reference_detector_global = new_reference_detector | |
| birefnet_global = new_birefnet | |
| u2net_global = new_u2net | |
| print("Models reloaded in {:.2f} seconds".format(time.time() - start_time)) | |
| # --------------------- | |
| # Helper Function: resize_img (defined once) | |
| # --------------------- | |
| def resize_img(img: np.ndarray, resize_dim): | |
| return np.array(Image.fromarray(img).resize(resize_dim)) | |
| # --------------------- | |
| # Other Helper Functions for Detection & Processing | |
| # --------------------- | |
| def yolo_detect(image: Union[str, Path, int, Image.Image, list, tuple, np.ndarray, torch.Tensor]) -> np.ndarray: | |
| t = time.time() | |
| results: List[Results] = drawer_detector_global.predict(image) | |
| if not results or len(results) == 0 or len(results[0].boxes) == 0: | |
| raise DrawerNotDetectedError("Drawer not detected in the image.") | |
| print("Drawer detection completed in {:.2f} seconds".format(time.time() - t)) | |
| return save_one_box(results[0].cpu().boxes.xyxy, im=results[0].orig_img, save=False) | |
| def detect_reference_square(img: np.ndarray): | |
| t = time.time() | |
| res = reference_detector_global.predict(img, conf=0.15) | |
| if not res or len(res) == 0 or len(res[0].boxes) == 0: | |
| raise ReferenceBoxNotDetectedError("Reference Coin not detected in the image.") | |
| print("Reference coin detection completed in {:.2f} seconds".format(time.time() - t)) | |
| return ( | |
| save_one_box(res[0].cpu().boxes.xyxy, res[0].orig_img, save=False), | |
| res[0].cpu().boxes.xyxy[0] | |
| ) | |
| # Use U2NETP for reference background removal. | |
| def remove_bg_u2netp(image: np.ndarray) -> np.ndarray: | |
| t = time.time() | |
| image_pil = Image.fromarray(image) | |
| transform_u2netp = transforms.Compose([ | |
| transforms.Resize((320, 320)), | |
| transforms.ToTensor(), | |
| transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), | |
| ]) | |
| input_tensor = transform_u2netp(image_pil).unsqueeze(0).to("cpu") | |
| with torch.no_grad(): | |
| outputs = u2net_global(input_tensor) | |
| pred = outputs[0] | |
| pred = (pred - pred.min()) / (pred.max() - pred.min() + 1e-8) | |
| pred_np = pred.squeeze().cpu().numpy() | |
| pred_np = cv2.resize(pred_np, (image_pil.width, image_pil.height)) | |
| pred_np = (pred_np * 255).astype(np.uint8) | |
| print("U2NETP background removal completed in {:.2f} seconds".format(time.time() - t)) | |
| return pred_np | |
| # Use BiRefNet for main object background removal. | |
| def remove_bg(image: np.ndarray) -> np.ndarray: | |
| t = time.time() | |
| image_pil = Image.fromarray(image) | |
| input_images = transform_image_global(image_pil).unsqueeze(0).to("cpu") | |
| with torch.no_grad(): | |
| preds = birefnet_global(input_images)[-1].sigmoid().cpu() | |
| pred = preds[0].squeeze() | |
| pred_pil = transforms.ToPILImage()(pred) | |
| scale_ratio = 1024 / max(image_pil.size) | |
| scaled_size = (int(image_pil.size[0] * scale_ratio), int(image_pil.size[1] * scale_ratio)) | |
| result = np.array(pred_pil.resize(scaled_size)) | |
| print("BiRefNet background removal completed in {:.2f} seconds".format(time.time() - t)) | |
| return result | |
| def make_square(img: np.ndarray): | |
| height, width = img.shape[:2] | |
| max_dim = max(height, width) | |
| pad_height = (max_dim - height) // 2 | |
| pad_width = (max_dim - width) // 2 | |
| pad_height_extra = max_dim - height - 2 * pad_height | |
| pad_width_extra = max_dim - width - 2 * pad_width | |
| if len(img.shape) == 3: | |
| padded = np.pad(img, ((pad_height, pad_height + pad_height_extra), | |
| (pad_width, pad_width + pad_width_extra), | |
| (0, 0)), mode="edge") | |
| else: | |
| padded = np.pad(img, ((pad_height, pad_height + pad_height_extra), | |
| (pad_width, pad_width + pad_width_extra)), mode="edge") | |
| return padded | |
| def shrink_bbox(image: np.ndarray, shrink_factor: float): | |
| height, width = image.shape[:2] | |
| center_x, center_y = width // 2, height // 2 | |
| new_width = int(width * shrink_factor) | |
| new_height = int(height * shrink_factor) | |
| x1 = max(center_x - new_width // 2, 0) | |
| y1 = max(center_y - new_height // 2, 0) | |
| x2 = min(center_x + new_width // 2, width) | |
| y2 = min(center_y + new_height // 2, height) | |
| return image[y1:y2, x1:x2] | |
| def exclude_scaling_box(image: np.ndarray, bbox: np.ndarray, orig_size: tuple, processed_size: tuple, expansion_factor: float = 1.2) -> np.ndarray: | |
| x_min, y_min, x_max, y_max = map(int, bbox) | |
| scale_x = processed_size[1] / orig_size[1] | |
| scale_y = processed_size[0] / orig_size[0] | |
| x_min = int(x_min * scale_x) | |
| x_max = int(x_max * scale_x) | |
| y_min = int(y_min * scale_y) | |
| y_max = int(y_max * scale_y) | |
| box_width = x_max - x_min | |
| box_height = y_max - y_min | |
| expanded_x_min = max(0, int(x_min - (expansion_factor - 1) * box_width / 2)) | |
| expanded_x_max = min(image.shape[1], int(x_max + (expansion_factor - 1) * box_width / 2)) | |
| expanded_y_min = max(0, int(y_min - (expansion_factor - 1) * box_height / 2)) | |
| expanded_y_max = min(image.shape[0], int(y_max + (expansion_factor - 1) * box_height / 2)) | |
| image[expanded_y_min:expanded_y_max, expanded_x_min:expanded_x_max] = 0 | |
| return image | |
| # def resample_contour(contour): | |
| # num_points = 1000 | |
| # smoothing_factor = 5 | |
| # spline_degree = 3 | |
| # if len(contour) < spline_degree + 1: | |
| # raise ValueError(f"Contour must have at least {spline_degree + 1} points, but has {len(contour)} points.") | |
| # contour = contour[:, 0, :] | |
| # tck, _ = splprep([contour[:, 0], contour[:, 1]], s=smoothing_factor) | |
| # u = np.linspace(0, 1, num_points) | |
| # resampled_points = splev(u, tck) | |
| # smoothed_x = gaussian_filter1d(resampled_points[0], sigma=1) | |
| # smoothed_y = gaussian_filter1d(resampled_points[1], sigma=1) | |
| # return np.array([smoothed_x, smoothed_y]).T | |
| # # --------------------- | |
| # # Add the missing extract_outlines function | |
| # # --------------------- | |
| # def extract_outlines(binary_image: np.ndarray) -> (np.ndarray, list): | |
| # contours, _ = cv2.findContours(binary_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) | |
| # outline_image = np.zeros_like(binary_image) | |
| # cv2.drawContours(outline_image, contours, -1, (255), thickness=2) | |
| # return cv2.bitwise_not(outline_image), contours | |
| # # # --------------------- | |
| # # # Functions for Finger Cut Clearance | |
| # # # --------------------- | |
| # # def union_tool_and_circle(tool_polygon: Polygon, center_inch, circle_diameter=1.0): | |
| # # radius = circle_diameter / 2.0 | |
| # # circle_poly = Point(center_inch).buffer(radius, resolution=64) | |
| # # union_poly = tool_polygon.union(circle_poly) | |
| # # return union_poly | |
| # # def build_tool_polygon(points_inch): | |
| # # return Polygon(points_inch) | |
| # # def polygon_to_exterior_coords(poly: Polygon): | |
| # # if poly.geom_type == "MultiPolygon": | |
| # # biggest = max(poly.geoms, key=lambda g: g.area) | |
| # # poly = biggest | |
| # # if not poly.exterior: | |
| # # return [] | |
| # # return list(poly.exterior.coords) | |
| # # from shapely.geometry import Point, Polygon | |
| # # import numpy as np | |
| # # import random | |
| # # from shapely.geometry import Point, Polygon | |
| # # import numpy as np | |
| # # import random | |
| # # def place_finger_cut_adjusted( | |
| # # tool_polygon: Polygon, | |
| # # points_inch: list, | |
| # # existing_centers: list, | |
| # # all_polygons: list, | |
| # # circle_diameter: float = 1.0, | |
| # # min_gap: float = 0.5, | |
| # # max_attempts: int = 100 | |
| # # ) -> (Polygon, tuple): | |
| # # needed_center_distance = circle_diameter + min_gap | |
| # # radius = circle_diameter / 2.0 | |
| # # attempts = 0 | |
| # # timeout_secs = 10 # 100s timeout | |
| # # start_time = time.perf_counter() | |
| # # fallback_triggered = False | |
| # # # Randomize candidate points order. | |
| # # indices = list(range(len(points_inch))) | |
| # # random.shuffle(indices) | |
| # # while attempts < max_attempts and not fallback_triggered: | |
| # # for i in indices: | |
| # # if time.perf_counter() - start_time >= timeout_secs: | |
| # # fallback_triggered = True | |
| # # break | |
| # # cx, cy = points_inch[i] | |
| # # # Try small adjustments around the candidate point. | |
| # # for dx in np.linspace(-0.3, 0.3, 7): # adjust by ±0.3 inches in 7 steps | |
| # # for dy in np.linspace(-0.3, 0.3, 7): | |
| # # if time.perf_counter() - start_time >= timeout_secs: | |
| # # fallback_triggered = True | |
| # # break | |
| # # candidate_center = (cx + dx, cy + dy) | |
| # # # Ensure candidate center is not too close to any already placed centers. | |
| # # if any(np.hypot(candidate_center[0] - ex, candidate_center[1] - ey) < needed_center_distance | |
| # # for ex, ey in existing_centers): | |
| # # continue | |
| # # # Create candidate circle with a high resolution. | |
| # # candidate_circle = Point(candidate_center).buffer(radius, resolution=64) | |
| # # # Reject candidate if circle is completely inside the tool polygon. | |
| # # if tool_polygon.contains(candidate_circle): | |
| # # continue | |
| # # # Also reject candidate if circle does not intersect the tool at all. | |
| # # if not candidate_circle.intersects(tool_polygon): | |
| # # continue | |
| # # # Ensure that the candidate circle crosses the tool boundary. | |
| # # inter_area = candidate_circle.intersection(tool_polygon).area | |
| # # if inter_area <= 0 or inter_area >= candidate_circle.area: | |
| # # continue | |
| # # # Verify candidate circle is not too close to any neighboring tool polygons. | |
| # # too_close = False | |
| # # for other_poly in all_polygons: | |
| # # if other_poly.equals(tool_polygon): | |
| # # continue | |
| # # if candidate_circle.buffer(0.1).intersects(other_poly): | |
| # # too_close = True | |
| # # if other_poly.distance(candidate_circle) < min_gap: | |
| # # too_close = True | |
| # # break | |
| # # if too_close: | |
| # # continue | |
| # # # Attempt the union, using a buffering trick to fix potential geometry problems. | |
| # # try: | |
| # # union_poly = tool_polygon.union(candidate_circle) | |
| # # except Exception: | |
| # # union_poly = tool_polygon.buffer(0).union(candidate_circle.buffer(0)) | |
| # # # Verify that the union is a single contiguous polygon. | |
| # # if union_poly.geom_type == "MultiPolygon" and len(union_poly.geoms) > 1: | |
| # # continue | |
| # # # If the union did not change the polygon (no effective union), skip candidate. | |
| # # if union_poly.equals(tool_polygon): | |
| # # continue | |
| # # # We have found a valid candidate. | |
| # # existing_centers.append(candidate_center) | |
| # # return union_poly, candidate_center | |
| # # if fallback_triggered: | |
| # # break | |
| # # attempts += 1 | |
| # # if fallback_triggered: | |
| # # print("In fallback block") | |
| # # # Fallback: If no candidate was found after max_attempts, force a candidate from median of points. | |
| # # candidate_center = points_inch[len(points_inch) // 2] | |
| # # candidate_circle = Point(candidate_center).buffer(radius, resolution=64) | |
| # # try: | |
| # # too_close= False | |
| # # for other_poly in all_polygons: | |
| # # if other_poly.equals(tool_polygon): | |
| # # continue | |
| # # if candidate_circle.buffer(0.1).intersects(other_poly): | |
| # # too_close = True | |
| # # if other_poly.distance(candidate_circle) < min_gap: | |
| # # too_close = True | |
| # # if too_close: | |
| # # continue | |
| # # union_poly = tool_polygon.union(candidate_circle) | |
| # # except Exception: | |
| # # too_close= False | |
| # # for other_poly in all_polygons: | |
| # # if other_poly.equals(tool_polygon): | |
| # # continue | |
| # # if candidate_circle.buffer(0.1).intersects(other_poly): | |
| # # too_close = True | |
| # # if other_poly.distance(candidate_circle) < min_gap: | |
| # # too_close = True | |
| # # if too_close: | |
| # # continue | |
| # # union_poly = tool_polygon.buffer(0).union(candidate_circle.buffer(0)) | |
| # # existing_centers.append(candidate_center) | |
| # # return union_poly, candidate_center | |
| # # # --------------------- | |
| # # # DXF Spline and Boundary Functions | |
| # # # --------------------- | |
| # # def save_dxf_spline(inflated_contours, scaling_factor, height, finger_clearance=False): | |
| # degree = 3 | |
| # closed = True | |
| # doc = ezdxf.new(units=0) | |
| # doc.units = ezdxf.units.IN | |
| # doc.header["$INSUNITS"] = ezdxf.units.IN | |
| # msp = doc.modelspace() | |
| # finger_cut_centers = [] | |
| # final_polygons_inch = [] | |
| # for contour in inflated_contours: | |
| # try: | |
| # resampled_contour = resample_contour(contour) | |
| # points_inch = [(x * scaling_factor, (height - y) * scaling_factor) for x, y in resampled_contour] | |
| # if len(points_inch) < 3: | |
| # continue | |
| # if np.linalg.norm(np.array(points_inch[0]) - np.array(points_inch[-1])) > 1e-2: | |
| # points_inch.append(points_inch[0]) | |
| # tool_polygon = build_tool_polygon(points_inch) | |
| # if finger_clearance: | |
| # union_poly, center = place_finger_cut_adjusted(tool_polygon, points_inch, finger_cut_centers, final_polygons_inch, circle_diameter=1.0, min_gap=0.25, max_attempts=100) | |
| # if union_poly is not None: | |
| # tool_polygon = union_poly | |
| # exterior_coords = polygon_to_exterior_coords(tool_polygon) | |
| # if len(exterior_coords) < 3: | |
| # continue | |
| # msp.add_spline(exterior_coords, degree=degree, dxfattribs={"layer": "TOOLS"}) | |
| # final_polygons_inch.append(tool_polygon) | |
| # except ValueError as e: | |
| # print(f"Skipping contour: {e}") | |
| # return doc, final_polygons_inch | |
| # import random | |
| # import time | |
| # import numpy as np | |
| # from shapely.geometry import Point, Polygon | |
| # # --------------------- | |
| # # Utility functions | |
| # # --------------------- | |
| # def union_tool_and_circle(tool_polygon: Polygon, center_inch, circle_diameter=1.0): | |
| # radius = circle_diameter / 2.0 | |
| # circle_poly = Point(center_inch).buffer(radius, resolution=64) | |
| # union_poly = tool_polygon.union(circle_poly) | |
| # return union_poly | |
| # def build_tool_polygon(points_inch): | |
| # return Polygon(points_inch) | |
| # def polygon_to_exterior_coords(poly: Polygon): | |
| # if poly.geom_type == "MultiPolygon": | |
| # biggest = max(poly.geoms, key=lambda g: g.area) | |
| # poly = biggest | |
| # if not poly.exterior: | |
| # return [] | |
| # return list(poly.exterior.coords) | |
| # --------------------- | |
| # Main candidate placement function | |
| # --------------------- | |
| # def place_finger_cut_adjusted( | |
| # tool_polygon1: Polygon, | |
| # points_inch: list, | |
| # existing_centers: list, | |
| # all_polygons: list, | |
| # circle_diameter: float = 1.0, | |
| # min_gap: float = 0.5, | |
| # max_attempts: int = 100 | |
| # ) -> (Polygon, tuple): | |
| # """ | |
| # Adjust and union a candidate circle (finger cut) with the tool_polygon. | |
| # If a candidate meeting all conditions is found, update existing_centers | |
| # and return the union and candidate_center. | |
| # If no candidate is found after max_attempts (or if a timeout is reached), | |
| # use a fallback candidate (the median point from points_inch). | |
| # """ | |
| # needed_center_distance = circle_diameter + min_gap | |
| # radius = circle_diameter / 2.0 | |
| # attempts = 0 | |
| # timeout_secs = 0.1 # 100ms timeout | |
| # start_time = time.perf_counter() | |
| # fallback_triggered = False | |
| # tool_polygon= tool_polygon1 | |
| # # Randomize candidate points order. | |
| # indices = list(range(len(points_inch))) | |
| # random.shuffle(indices) | |
| # while attempts < max_attempts and not fallback_triggered: | |
| # for i in indices: | |
| # if time.perf_counter() - start_time >= timeout_secs: | |
| # fallback_triggered = True | |
| # break | |
| # cx, cy = points_inch[i] | |
| # # Try small adjustments around the candidate point. | |
| # for dx in np.linspace(-0.3, 0.3, 7): | |
| # for dy in np.linspace(-0.3, 0.3, 7): | |
| # if time.perf_counter() - start_time >= timeout_secs: | |
| # fallback_triggered = True | |
| # break | |
| # candidate_center = (cx + dx, cy + dy) | |
| # # Ensure candidate center is not too close to any already placed centers. | |
| # if any(np.hypot(candidate_center[0] - ex, candidate_center[1] - ey) < needed_center_distance | |
| # for ex, ey in existing_centers): | |
| # continue | |
| # # Create candidate circle with high resolution. | |
| # candidate_circle = Point(candidate_center).buffer(radius, resolution=64) | |
| # # Reject candidate if circle is completely inside the tool polygon. | |
| # if tool_polygon.contains(candidate_circle): | |
| # continue | |
| # # Reject candidate if circle does not intersect the tool at all. | |
| # if not candidate_circle.intersects(tool_polygon): | |
| # continue | |
| # # Ensure that the candidate circle crosses the tool boundary. | |
| # inter_area = candidate_circle.intersection(tool_polygon).area | |
| # if inter_area <= 0 or inter_area >= candidate_circle.area: | |
| # continue | |
| # # Verify candidate circle is not too close to any neighboring tool polygons. | |
| # too_close = False | |
| # for other_poly in all_polygons: | |
| # if other_poly.equals(tool_polygon): | |
| # continue | |
| # # Use a small buffer around the circle for safety. | |
| # if candidate_circle.buffer(0.1).intersects(other_poly): | |
| # too_close = True | |
| # if other_poly.distance(candidate_circle) < min_gap: | |
| # too_close = True | |
| # break | |
| # if too_close: | |
| # continue | |
| # # Attempt the union, using buffering to fix any potential geometry issues. | |
| # try: | |
| # union_poly = tool_polygon.union(candidate_circle) | |
| # except Exception: | |
| # union_poly = tool_polygon.buffer(0).union(candidate_circle.buffer(0)) | |
| # # Clean the unioned polygon. | |
| # union_poly = union_poly.buffer(0) | |
| # # Verify that the union is a single contiguous polygon. | |
| # if union_poly.geom_type == "MultiPolygon" and len(union_poly.geoms) > 1: | |
| # continue | |
| # # If the union did not change the tool polygon (no effective union), skip candidate. | |
| # if union_poly.equals(tool_polygon): | |
| # continue | |
| # # We have found a valid candidate. Update the centers list. | |
| # existing_centers.append(candidate_center) | |
| # return tool_polygon1, existing_centers[-1] | |
| # if fallback_triggered: | |
| # break | |
| # attempts += 1 | |
| # # Fallback: If no candidate is found (or timeout reached), use a fallback candidate. | |
| # if fallback_triggered: | |
| # print("Fallback triggered") | |
| # # Use a fallback candidate – here the median point is used. | |
| # xs = [p[0] for p in points_inch] | |
| # ys = [p[1] for p in points_inch] | |
| # candidate_center = (np.median(xs), np.median(ys)) | |
| # candidate_circle = Point(candidate_center).buffer(radius, resolution=64) | |
| # try: | |
| # union_poly = tool_polygon.union(candidate_circle) | |
| # except Exception: | |
| # union_poly = tool_polygon.buffer(0).union(candidate_circle.buffer(0)) | |
| # union_poly = union_poly.buffer(0) | |
| # # Add the fallback center to avoid duplicate placements later. | |
| # existing_centers.append(candidate_center) | |
| # return tool_polygon1, existing_centers[-1] | |
| # --------------------- | |
| # DXF Spline and Boundary Functions | |
| # --------------------- | |
| # def save_dxf_spline(inflated_contours, scaling_factor, height, finger_clearance=False): | |
| # import ezdxf # assuming ezdxf is installed | |
| # degree = 3 | |
| # closed = True | |
| # doc = ezdxf.new(units=0) | |
| # doc.units = ezdxf.units.IN | |
| # doc.header["$INSUNITS"] = ezdxf.units.IN | |
| # msp = doc.modelspace() | |
| # # Global shared lists for finger cut centers and final tool polygons. | |
| # finger_cut_centers = [] | |
| # final_polygons_inch = [] | |
| # for contour in inflated_contours: | |
| # try: | |
| # # resample_contour should be defined elsewhere; | |
| # # here it returns a list of (x, y) points. | |
| # resampled_contour = resample_contour(contour) | |
| # # Scale and flip Y coordinate according to height. | |
| # points_inch = [(x * scaling_factor, (height - y) * scaling_factor) for x, y in resampled_contour] | |
| # if len(points_inch) < 3: | |
| # continue | |
| # # Ensure the polygon is closed. | |
| # if np.linalg.norm(np.array(points_inch[0]) - np.array(points_inch[-1])) > 1e-2: | |
| # points_inch.append(points_inch[0]) | |
| # tool_polygon = build_tool_polygon(points_inch) | |
| # # Add finger clearance cuts if needed. | |
| # if finger_clearance: | |
| # tool, center = place_finger_cut_adjusted( | |
| # tool_polygon, points_inch, finger_cut_centers, final_polygons_inch, | |
| # circle_diameter=1.0, min_gap=0.25, max_attempts=100 | |
| # ) | |
| # union_poly=union_tool_and_circle(tool,center) | |
| # if union_poly is not None: | |
| # tool_polygon = union_poly | |
| # exterior_coords = polygon_to_exterior_coords(tool_polygon) | |
| # if len(exterior_coords) < 3: | |
| # continue | |
| # # Add the tool geometry to the DXF document as a spline. | |
| # msp.add_spline(exterior_coords, degree=degree, dxfattribs={"layer": "TOOLS"}) | |
| # final_polygons_inch.append(tool_polygon) | |
| # except ValueError as e: | |
| # print(f"Skipping contour: {e}") | |
| # return doc, final_polygons_inch | |
| import logging | |
| import time | |
| import signal | |
| import numpy as np | |
| import cv2 | |
| from scipy.interpolate import splprep, splev | |
| from scipy.ndimage import gaussian_filter1d | |
| from shapely.geometry import Point, Polygon | |
| import random | |
| import ezdxf | |
| import functools | |
| # Set up logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Custom TimeoutError class | |
| class TimeoutReachedError(Exception): | |
| pass | |
| # Timeout context manager | |
| class TimeoutContext: | |
| def __init__(self, seconds): | |
| self.seconds = seconds | |
| self.original_handler = None | |
| def timeout_handler(self, signum, frame): | |
| raise TimeoutReachedError(f"Function timed out after {self.seconds} seconds") | |
| def __enter__(self): | |
| if hasattr(signal, 'SIGALRM'): # Unix-like systems | |
| self.original_handler = signal.getsignal(signal.SIGALRM) | |
| signal.signal(signal.SIGALRM, self.timeout_handler) | |
| signal.alarm(self.seconds) | |
| self.start_time = time.time() | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| if hasattr(signal, 'SIGALRM'): # Unix-like systems | |
| signal.alarm(0) | |
| signal.signal(signal.SIGALRM, self.original_handler) | |
| if exc_type is TimeoutReachedError: | |
| logger.warning(f"Timeout reached: {exc_val}") | |
| return True # Suppress the exception | |
| return False | |
| def resample_contour(contour): | |
| logger.info(f"Starting resample_contour with contour of shape {contour.shape}") | |
| num_points = 1000 | |
| smoothing_factor = 5 | |
| spline_degree = 3 | |
| if len(contour) < spline_degree + 1: | |
| error_msg = f"Contour must have at least {spline_degree + 1} points, but has {len(contour)} points." | |
| logger.error(error_msg) | |
| raise ValueError(error_msg) | |
| try: | |
| contour = contour[:, 0, :] | |
| logger.debug(f"Reshaped contour to shape {contour.shape}") | |
| tck, _ = splprep([contour[:, 0], contour[:, 1]], s=smoothing_factor) | |
| logger.debug("Generated spline parameters") | |
| u = np.linspace(0, 1, num_points) | |
| resampled_points = splev(u, tck) | |
| logger.debug(f"Resampled to {num_points} points") | |
| smoothed_x = gaussian_filter1d(resampled_points[0], sigma=1) | |
| smoothed_y = gaussian_filter1d(resampled_points[1], sigma=1) | |
| result = np.array([smoothed_x, smoothed_y]).T | |
| logger.info(f"Completed resample_contour with result shape {result.shape}") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Error in resample_contour: {e}") | |
| raise | |
| def extract_outlines(binary_image: np.ndarray) -> (np.ndarray, list): | |
| logger.info(f"Starting extract_outlines with image shape {binary_image.shape}") | |
| try: | |
| contours, _ = cv2.findContours(binary_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) | |
| logger.debug(f"Found {len(contours)} contours") | |
| outline_image = np.zeros_like(binary_image) | |
| cv2.drawContours(outline_image, contours, -1, (255), thickness=2) | |
| result_image = cv2.bitwise_not(outline_image) | |
| logger.info(f"Completed extract_outlines with {len(contours)} contours") | |
| return result_image, contours | |
| except Exception as e: | |
| logger.error(f"Error in extract_outlines: {e}") | |
| raise | |
| def union_tool_and_circle(tool_polygon: Polygon, center_inch, circle_diameter=1.0): | |
| logger.info(f"Starting union_tool_and_circle with center at {center_inch}") | |
| try: | |
| radius = circle_diameter / 2.0 | |
| circle_poly = Point(center_inch).buffer(radius, resolution=64) | |
| logger.debug(f"Created circle with radius {radius} at {center_inch}") | |
| union_poly = tool_polygon.union(circle_poly) | |
| logger.info(f"Completed union_tool_and_circle, result area: {union_poly.area}") | |
| return union_poly | |
| except Exception as e: | |
| logger.error(f"Error in union_tool_and_circle: {e}") | |
| raise | |
| def build_tool_polygon(points_inch): | |
| logger.info(f"Starting build_tool_polygon with {len(points_inch)} points") | |
| try: | |
| polygon = Polygon(points_inch) | |
| logger.info(f"Completed build_tool_polygon, polygon area: {polygon.area}") | |
| return polygon | |
| except Exception as e: | |
| logger.error(f"Error in build_tool_polygon: {e}") | |
| raise | |
| # def polygon_to_exterior_coords(poly: Polygon): | |
| # logger.info(f"Starting polygon_to_exterior_coords with polygon type {poly.geom_type}") | |
| # try: | |
| # if poly.geom_type == "MultiPolygon": | |
| # logger.debug("Converting MultiPolygon to single Polygon") | |
| # biggest = max(poly.geoms, key=lambda g: g.area) | |
| # poly = biggest | |
| # if not poly.exterior: | |
| # logger.warning("Polygon has no exterior") | |
| # return [] | |
| # coords = list(poly.exterior.coords) | |
| # logger.info(f"Completed polygon_to_exterior_coords with {len(coords)} coordinates") | |
| # return coords | |
| # except Exception as e: | |
| # logger.error(f"Error in polygon_to_exterior_coords: {e}") | |
| # raise | |
| def polygon_to_exterior_coords(poly): | |
| logger.info(f"Starting polygon_to_exterior_coords with polygon type {poly.geom_type}") | |
| try: | |
| # Handle GeometryCollection case specifically | |
| if poly.geom_type == "GeometryCollection": | |
| logger.warning("Converting GeometryCollection to Polygon") | |
| # Find the largest geometry in the collection that has an exterior | |
| largest_area = 0 | |
| largest_geom = None | |
| for geom in poly.geoms: | |
| if hasattr(geom, 'area') and geom.area > largest_area: | |
| if hasattr(geom, 'exterior') or geom.geom_type == "MultiPolygon": | |
| largest_area = geom.area | |
| largest_geom = geom | |
| if largest_geom is None: | |
| logger.warning("No valid geometry found in GeometryCollection") | |
| return [] | |
| poly = largest_geom | |
| if poly.geom_type == "MultiPolygon": | |
| logger.debug("Converting MultiPolygon to single Polygon") | |
| biggest = max(poly.geoms, key=lambda g: g.area) | |
| poly = biggest | |
| if not hasattr(poly, 'exterior') or poly.exterior is None: | |
| logger.warning("Polygon has no exterior") | |
| return [] | |
| coords = list(poly.exterior.coords) | |
| logger.info(f"Completed polygon_to_exterior_coords with {len(coords)} coordinates") | |
| return coords | |
| except Exception as e: | |
| logger.error(f"Error in polygon_to_exterior_coords: {e}") | |
| # Return empty list as fallback | |
| return [] | |
| def place_finger_cut_adjusted( | |
| tool_polygon: Polygon, | |
| points_inch: list, | |
| existing_centers: list, | |
| all_polygons: list, | |
| circle_diameter: float = 1.0, | |
| min_gap: float = 0.5, | |
| max_attempts: int = 100 | |
| ) -> (Polygon, tuple): | |
| logger.info(f"Starting place_finger_cut_adjusted with {len(points_inch)} points") | |
| # Define fallback function for timeout case | |
| def fallback_solution(): | |
| logger.warning("Using fallback approach for finger cut placement") | |
| candidate_center = points_inch[len(points_inch) // 2] | |
| radius = circle_diameter / 2.0 | |
| candidate_circle = Point(candidate_center).buffer(radius, resolution=64) | |
| try: | |
| union_poly = tool_polygon.union(candidate_circle) | |
| except Exception as e: | |
| logger.warning(f"Fallback union failed, using buffer trick: {e}") | |
| union_poly = tool_polygon.buffer(0).union(candidate_circle.buffer(0)) | |
| existing_centers.append(candidate_center) | |
| logger.info(f"Used fallback finger cut at center {candidate_center}") | |
| return union_poly, candidate_center | |
| needed_center_distance = circle_diameter + min_gap | |
| radius = circle_diameter / 2.0 | |
| # Limit points to prevent timeout - use a subset for efficient processing | |
| if len(points_inch) > 100: | |
| logger.info(f"Limiting points from {len(points_inch)} to 100 for efficiency") | |
| step = len(points_inch) // 100 | |
| points_inch = points_inch[::step] | |
| # Randomize candidate points order | |
| indices = list(range(len(points_inch))) | |
| random.shuffle(indices) | |
| logger.debug(f"Shuffled {len(indices)} point indices") | |
| # Use a non-blocking timeout approach with explicit time checks | |
| start_time = time.time() | |
| timeout_seconds = 5 | |
| attempts = 0 | |
| try: | |
| while attempts < max_attempts: | |
| # Check if we're approaching the timeout | |
| current_time = time.time() | |
| if current_time - start_time > timeout_seconds - 0.1: # Leave 0.1s margin | |
| logger.warning(f"Approaching timeout after {attempts} attempts") | |
| return fallback_solution() | |
| # Process a batch of points to improve efficiency | |
| for i in indices: | |
| # Check timeout frequently | |
| if time.time() - start_time > timeout_seconds - 0.05: | |
| logger.warning("Timeout during point processing") | |
| return fallback_solution() | |
| cx, cy = points_inch[i] | |
| # Reduce the number of adjustments to speed up processing | |
| for dx, dy in [(0,0), (-0.2,0), (0.2,0), (0,0.2), (0,-0.2)]: | |
| candidate_center = (cx + dx, cy + dy) | |
| # Quick check for existing centers distance | |
| if any(np.hypot(candidate_center[0] - ex, candidate_center[1] - ey) < needed_center_distance | |
| for ex, ey in existing_centers): | |
| continue | |
| # Create candidate circle | |
| candidate_circle = Point(candidate_center).buffer(radius, resolution=32) # Reduced resolution | |
| # Quick geometric checks | |
| if tool_polygon.contains(candidate_circle) or not candidate_circle.intersects(tool_polygon): | |
| continue | |
| # Check intersection area - use simplified geometry for speed | |
| try: | |
| inter_area = candidate_circle.intersection(tool_polygon).area | |
| if inter_area <= 0 or inter_area >= candidate_circle.area: | |
| continue | |
| except Exception: | |
| continue | |
| # Quick distance check to other polygons | |
| too_close = False | |
| for other_poly in all_polygons: | |
| if other_poly.equals(tool_polygon): | |
| continue | |
| if other_poly.distance(candidate_circle) < min_gap: | |
| too_close = True | |
| break | |
| if too_close: | |
| continue | |
| # Attempt the union | |
| try: | |
| union_poly = tool_polygon.union(candidate_circle) | |
| # Check if we got a multi-polygon when we don't want one | |
| if union_poly.geom_type == "MultiPolygon" and len(union_poly.geoms) > 1: | |
| continue | |
| # Check if the union actually changed anything | |
| if union_poly.equals(tool_polygon): | |
| continue | |
| except Exception: | |
| continue | |
| # We found a valid candidate | |
| existing_centers.append(candidate_center) | |
| logger.info(f"Completed place_finger_cut_adjusted successfully at center {candidate_center}") | |
| return union_poly, candidate_center | |
| attempts += 1 | |
| # If we've made several attempts and are running out of time, use fallback | |
| if attempts >= max_attempts // 2 and (time.time() - start_time) > timeout_seconds * 0.8: | |
| logger.warning(f"Approaching timeout after {attempts} attempts") | |
| return fallback_solution() | |
| logger.debug(f"Completed attempt {attempts}/{max_attempts}") | |
| # If we reached max attempts without finding a solution | |
| logger.warning(f"No suitable finger cut found after {max_attempts} attempts, using fallback") | |
| return fallback_solution() | |
| except Exception as e: | |
| logger.error(f"Error in place_finger_cut_adjusted: {e}") | |
| return fallback_solution() | |
| def save_dxf_spline(offset_value,inflated_contours, scaling_factor, height, finger_clearance=False): | |
| logger.info(f"Starting save_dxf_spline with {len(inflated_contours)} contours") | |
| degree = 3 | |
| closed = True | |
| try: | |
| doc = ezdxf.new(units=0) | |
| doc.units = ezdxf.units.IN | |
| doc.header["$INSUNITS"] = ezdxf.units.IN | |
| msp = doc.modelspace() | |
| finger_cut_centers = [] | |
| final_polygons_inch = [] | |
| for idx, contour in enumerate(inflated_contours): | |
| logger.debug(f"Processing contour {idx+1}/{len(inflated_contours)}") | |
| try: | |
| resampled_contour = resample_contour(contour) | |
| points_inch = [(x * scaling_factor, (height - y) * scaling_factor) for x, y in resampled_contour] | |
| if len(points_inch) < 3: | |
| logger.warning(f"Skipping contour {idx}: insufficient points ({len(points_inch)})") | |
| continue | |
| if np.linalg.norm(np.array(points_inch[0]) - np.array(points_inch[-1])) > 1e-2: | |
| logger.debug("Closing contour by adding first point to end") | |
| points_inch.append(points_inch[0]) | |
| tool_polygon = build_tool_polygon(points_inch) | |
| if finger_clearance: | |
| logger.debug("Applying finger clearance") | |
| try: | |
| # Use a hard 5-second timeout for the entire finger cut operation | |
| start_time = time.time() | |
| union_poly, center = place_finger_cut_adjusted( | |
| tool_polygon, | |
| points_inch, | |
| finger_cut_centers, | |
| final_polygons_inch, | |
| circle_diameter=1.0, | |
| min_gap=(0.25+offset_value), | |
| max_attempts=100 | |
| ) | |
| # Check if we exceeded the timeout anyway | |
| if time.time() - start_time > 5: | |
| logger.warning(f"Finger cut took too long for contour {idx} ({time.time() - start_time:.2f}s)") | |
| if union_poly is not None: | |
| tool_polygon = union_poly | |
| logger.debug(f"Applied finger cut at {center}") | |
| except Exception as e: | |
| logger.warning(f"Finger cut failed for contour {idx}: {e}, using original polygon") | |
| exterior_coords = polygon_to_exterior_coords(tool_polygon) | |
| if len(exterior_coords) < 3: | |
| logger.warning(f"Skipping contour {idx}: insufficient exterior points ({len(exterior_coords)})") | |
| continue | |
| msp.add_spline(exterior_coords, degree=degree, dxfattribs={"layer": "TOOLS"}) | |
| final_polygons_inch.append(tool_polygon) | |
| logger.debug(f"Added spline for contour {idx}") | |
| except ValueError as e: | |
| logger.warning(f"Skipping contour {idx}: {e}") | |
| logger.info(f"Completed save_dxf_spline with {len(final_polygons_inch)} successful polygons") | |
| return doc, final_polygons_inch | |
| except Exception as e: | |
| logger.error(f"Error in save_dxf_spline: {e}") | |
| raise | |
| def add_rectangular_boundary(doc, polygons_inch, boundary_length, boundary_width, offset_unit, annotation_text="", image_height_in=None, image_width_in=None): | |
| msp = doc.modelspace() | |
| # Convert from mm if necessary | |
| if offset_unit.lower() == "mm": | |
| if boundary_length < 50: | |
| boundary_length = boundary_length * 25.4 | |
| if boundary_width < 50: | |
| boundary_width = boundary_width * 25.4 | |
| boundary_length_in = boundary_length / 25.4 | |
| boundary_width_in = boundary_width / 25.4 | |
| else: | |
| boundary_length_in = boundary_length | |
| boundary_width_in = boundary_width | |
| # Compute bounding box of inner contours | |
| min_x = float("inf") | |
| min_y = float("inf") | |
| max_x = -float("inf") | |
| max_y = -float("inf") | |
| for poly in polygons_inch: | |
| b = poly.bounds | |
| min_x = min(min_x, b[0]) | |
| min_y = min(min_y, b[1]) | |
| max_x = max(max_x, b[2]) | |
| max_y = max(max_y, b[3]) | |
| if min_x == float("inf"): | |
| print("No tool polygons found, skipping boundary.") | |
| return None | |
| # Compute inner bounding box dimensions | |
| inner_width = max_x - min_x | |
| inner_length = max_y - min_y | |
| # Set clearance margins | |
| clearance_side = 0.25 # left/right clearance | |
| clearance_tb = 0.25 # top/bottom clearance | |
| if annotation_text.strip(): | |
| clearance_tb = 0.75 | |
| # Calculate center of inner contours | |
| center_x = (min_x + max_x) / 2 | |
| center_y = (min_y + max_y) / 2 | |
| # Draw rectangle centered at (center_x, center_y) | |
| left = center_x - boundary_width_in / 2 | |
| right = center_x + boundary_width_in / 2 | |
| bottom = center_y - boundary_length_in / 2 | |
| top = center_y + boundary_length_in / 2 | |
| rect_coords = [(left, bottom), (right, bottom), (right, top), (left, top), (left, bottom)] | |
| from shapely.geometry import Polygon as ShapelyPolygon | |
| boundary_polygon = ShapelyPolygon(rect_coords) | |
| msp.add_lwpolyline(rect_coords, close=True, dxfattribs={"layer": "BOUNDARY"}) | |
| text_top = boundary_polygon.bounds[1] + 1 | |
| too_small = boundary_width_in < inner_width + 2 * clearance_side or boundary_length_in < inner_length + 2 * clearance_tb | |
| if too_small: | |
| raise BoundaryOverlapError("Error: The specified boundary dimensions are too small and overlap with the inner contours. Please provide larger value for boundary length and width.") | |
| if annotation_text.strip() and text_top > min_y - 0.75: | |
| raise TextOverlapError("Error: The text is too close to the inner contours. Please provide larger value for boundary length and width.") | |
| return boundary_polygon | |
| def draw_polygons_inch(polygons_inch, image_rgb, scaling_factor, image_height, color=(0,0,255), thickness=2): | |
| for poly in polygons_inch: | |
| if poly.geom_type == "MultiPolygon": | |
| for subpoly in poly.geoms: | |
| draw_single_polygon(subpoly, image_rgb, scaling_factor, image_height, color, thickness) | |
| else: | |
| draw_single_polygon(poly, image_rgb, scaling_factor, image_height, color, thickness) | |
| def draw_single_polygon(poly, image_rgb, scaling_factor, image_height, color=(0,0,255), thickness=2): | |
| ext = list(poly.exterior.coords) | |
| if len(ext) < 3: | |
| return | |
| pts_px = [] | |
| for (x_in, y_in) in ext: | |
| px = int(x_in / scaling_factor) | |
| py = int(image_height - (y_in / scaling_factor)) | |
| pts_px.append([px, py]) | |
| pts_px = np.array(pts_px, dtype=np.int32) | |
| cv2.polylines(image_rgb, [pts_px], isClosed=True, color=color, thickness=thickness, lineType=cv2.LINE_AA) | |
| # --------------------- | |
| # Main Predict Function with Finger Cut Clearance, Boundary Box, Annotation and Sharpness Enhancement | |
| # --------------------- | |
| def predict( | |
| image: Union[str, bytes, np.ndarray], | |
| offset_value: float, | |
| offset_unit: str, # "mm" or "inches" | |
| finger_clearance: str, # "Yes" or "No" | |
| add_boundary: str, # "Yes" or "No" | |
| boundary_length: float, | |
| boundary_width: float, | |
| annotation_text: str | |
| ): | |
| overall_start = time.time() | |
| # Convert image to NumPy array if needed | |
| if isinstance(image, str): | |
| if os.path.exists(image): | |
| image = np.array(Image.open(image).convert("RGB")) | |
| else: | |
| try: | |
| image = np.array(Image.open(io.BytesIO(base64.b64decode(image))).convert("RGB")) | |
| except Exception: | |
| raise ValueError("Invalid base64 image data") | |
| # Apply brightness and sharpness enhancement | |
| if isinstance(image, np.ndarray): | |
| pil_image = Image.fromarray(image) | |
| enhanced_image = ImageEnhance.Sharpness(pil_image).enhance(1.5) | |
| image = np.array(enhanced_image) | |
| # --------------------- | |
| # 1) Detect the drawer with YOLOWorld (or use original image if not detected) | |
| # --------------------- | |
| drawer_detected = True | |
| try: | |
| t = time.time() | |
| drawer_img = yolo_detect(image) | |
| print("Drawer detection completed in {:.2f} seconds".format(time.time() - t)) | |
| except DrawerNotDetectedError as e: | |
| print(f"Drawer not detected: {e}, using original image.") | |
| drawer_detected = False | |
| drawer_img = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) | |
| # Process the image (either cropped drawer or original) | |
| t = time.time() | |
| if drawer_detected: | |
| # For detected drawers: shrink and square | |
| shrunked_img = make_square(shrink_bbox(drawer_img, 0.95)) | |
| else: | |
| # For non-drawer images: keep original dimensions | |
| shrunked_img = drawer_img # Already in BGR format from above | |
| del drawer_img | |
| gc.collect() | |
| print("Image processing completed in {:.2f} seconds".format(time.time() - t)) | |
| # --------------------- | |
| # 2) Detect the reference box with YOLO (now works on either cropped or original image) | |
| # --------------------- | |
| try: | |
| t = time.time() | |
| reference_obj_img, scaling_box_coords = detect_reference_square(shrunked_img) | |
| print("Reference coin detection completed in {:.2f} seconds".format(time.time() - t)) | |
| except ReferenceBoxNotDetectedError as e: | |
| return None, None, None, None, f"Error: {str(e)}" | |
| # --------------------- | |
| # 3) Remove background of the reference box to compute scaling factor | |
| # --------------------- | |
| t = time.time() | |
| reference_obj_img = make_square(reference_obj_img) | |
| reference_square_mask = remove_bg_u2netp(reference_obj_img) | |
| reference_square_mask= resize_img(reference_square_mask,(reference_obj_img.shape[1],reference_obj_img.shape[0])) | |
| print("Reference image processing completed in {:.2f} seconds".format(time.time() - t)) | |
| t = time.time() | |
| try: | |
| cv2.imwrite("mask.jpg", cv2.cvtColor(reference_obj_img, cv2.COLOR_RGB2GRAY)) | |
| scaling_factor = calculate_scaling_factor( | |
| target_image=reference_square_mask, | |
| reference_obj_size_mm=0.955, | |
| feature_detector="ORB", | |
| ) | |
| except ZeroDivisionError: | |
| scaling_factor = None | |
| print("Error calculating scaling factor: Division by zero") | |
| except Exception as e: | |
| scaling_factor = None | |
| print(f"Error calculating scaling factor: {e}") | |
| if scaling_factor is None or scaling_factor == 0: | |
| scaling_factor = 0.7 | |
| print("Using default scaling factor of 0.7 due to calculation error") | |
| gc.collect() | |
| print("Scaling factor determined: {}".format(scaling_factor)) | |
| # --------------------- | |
| # 4) Optional boundary dimension checks (now without size limits) | |
| # --------------------- | |
| if add_boundary.lower() == "yes": | |
| if offset_unit.lower() == "mm": | |
| if boundary_length < 50: | |
| boundary_length = boundary_length * 25.4 | |
| if boundary_width < 50: | |
| boundary_width = boundary_width * 25.4 | |
| boundary_length_in = boundary_length / 25.4 | |
| boundary_width_in = boundary_width / 25.4 | |
| else: | |
| boundary_length_in = boundary_length | |
| boundary_width_in = boundary_width | |
| # --------------------- | |
| # 5) Remove background from the shrunked drawer image (main objects) | |
| # --------------------- | |
| if offset_unit.lower() == "mm": | |
| if offset_value < 1: | |
| offset_value = offset_value * 25.4 | |
| offset_inches = offset_value / 25.4 | |
| if offset_value==0: | |
| offset_value = offset_value * 25.4 | |
| offset_inches = offset_value / 25.4 | |
| offset_inches+=0.005 | |
| else: | |
| offset_inches = offset_value | |
| if offset_inches==0: | |
| offset_inches+=0.005 | |
| t = time.time() | |
| orig_size = shrunked_img.shape[:2] | |
| objects_mask = remove_bg(shrunked_img) | |
| processed_size = objects_mask.shape[:2] | |
| objects_mask = exclude_scaling_box(objects_mask, scaling_box_coords, orig_size, processed_size, expansion_factor=1.2) | |
| objects_mask = resize_img(objects_mask, (shrunked_img.shape[1], shrunked_img.shape[0])) | |
| del scaling_box_coords | |
| gc.collect() | |
| print("Object masking completed in {:.2f} seconds".format(time.time() - t)) | |
| # Dilate mask by offset_pixels | |
| t = time.time() | |
| offset_pixels = (offset_inches / scaling_factor) * 2 + 1 if scaling_factor != 0 else 1 | |
| dilated_mask = cv2.dilate(objects_mask, np.ones((int(offset_pixels), int(offset_pixels)), np.uint8)) | |
| del objects_mask | |
| gc.collect() | |
| print("Mask dilation completed in {:.2f} seconds".format(time.time() - t)) | |
| Image.fromarray(dilated_mask).save("./outputs/scaled_mask_new.jpg") | |
| # --------------------- | |
| # 6) Extract outlines from the mask and convert them to DXF splines | |
| # --------------------- | |
| t = time.time() | |
| outlines, contours = extract_outlines(dilated_mask) | |
| print("Outline extraction completed in {:.2f} seconds".format(time.time() - t)) | |
| output_img = shrunked_img.copy() | |
| del shrunked_img | |
| gc.collect() | |
| t = time.time() | |
| use_finger_clearance = True if finger_clearance.lower() == "yes" else False | |
| doc, final_polygons_inch = save_dxf_spline( | |
| offset_inches,contours, scaling_factor, processed_size[0], finger_clearance=use_finger_clearance | |
| ) | |
| del contours | |
| gc.collect() | |
| print("DXF generation completed in {:.2f} seconds".format(time.time() - t)) | |
| # --------------------- | |
| # Compute bounding box of inner tool contours BEFORE adding optional boundary | |
| # --------------------- | |
| inner_min_x = float("inf") | |
| inner_min_y = float("inf") | |
| inner_max_x = -float("inf") | |
| inner_max_y = -float("inf") | |
| for poly in final_polygons_inch: | |
| b = poly.bounds | |
| inner_min_x = min(inner_min_x, b[0]) | |
| inner_min_y = min(inner_min_y, b[1]) | |
| inner_max_x = max(inner_max_x, b[2]) | |
| inner_max_y = max(inner_max_y, b[3]) | |
| # --------------------- | |
| # 7) Add optional rectangular boundary | |
| # --------------------- | |
| boundary_polygon = None | |
| if add_boundary.lower() == "yes": | |
| boundary_polygon = add_rectangular_boundary( | |
| doc, | |
| final_polygons_inch, | |
| boundary_length, | |
| boundary_width, | |
| offset_unit, | |
| annotation_text, | |
| image_height_in=output_img.shape[0] * scaling_factor, | |
| image_width_in=output_img.shape[1] * scaling_factor | |
| ) | |
| if boundary_polygon is not None: | |
| final_polygons_inch.append(boundary_polygon) | |
| # --------------------- | |
| # 8) Add annotation text (if provided) in the DXF | |
| # --------------------- | |
| msp = doc.modelspace() | |
| if annotation_text.strip(): | |
| if boundary_polygon is not None: | |
| text_x = ((inner_min_x + inner_max_x) / 2.0) - (int(len(annotation_text.strip()) / 2.0)) | |
| text_height_dxf = 0.75 | |
| text_y_dxf = boundary_polygon.bounds[1] + 0.25 | |
| font = get_font_face("Arial") | |
| paths = text2path.make_paths_from_str( | |
| annotation_text.strip().upper(), | |
| font=font, # Use default font | |
| size=text_height_dxf, | |
| align=TextEntityAlignment.LEFT | |
| ) | |
| # Create a translation matrix | |
| translation = ezdxf.math.Matrix44.translate(text_x, text_y_dxf, 0) | |
| # Apply the translation to each path | |
| translated_paths = [p.transform(translation) for p in paths] | |
| # Render the paths as splines and polylines | |
| path.render_splines_and_polylines( | |
| msp, | |
| translated_paths, | |
| dxfattribs={"layer": "ANNOTATION", "color": 7} | |
| ) | |
| # Save the DXF | |
| dxf_filepath = os.path.join("./outputs", "out.dxf") | |
| doc.saveas(dxf_filepath) | |
| # --------------------- | |
| # 9) For the preview images, draw the polygons and place text similarly | |
| # --------------------- | |
| draw_polygons_inch(final_polygons_inch, output_img, scaling_factor, processed_size[0], color=(0, 0, 255), thickness=2) | |
| new_outlines = np.ones_like(output_img) * 255 | |
| draw_polygons_inch(final_polygons_inch, new_outlines, scaling_factor, processed_size[0], color=(0, 0, 255), thickness=2) | |
| if annotation_text.strip(): | |
| if boundary_polygon is not None: | |
| text_height_cv = 0.75 | |
| text_x_img = int(((inner_min_x + inner_max_x) / 2.0) / scaling_factor) | |
| text_y_in = boundary_polygon.bounds[1] + 0.25 | |
| text_y_img = int(processed_size[0] - (text_y_in / scaling_factor)) | |
| org = (text_x_img - int(len(annotation_text.strip()) * 6), text_y_img) | |
| # Method 2: Use two different thicknesses | |
| # Draw thicker outline | |
| temp_img = np.zeros_like(output_img) | |
| cv2.putText( | |
| temp_img, | |
| annotation_text.strip().upper(), | |
| org, | |
| cv2.FONT_HERSHEY_SIMPLEX, | |
| 2, | |
| (0, 0, 255), # Red color | |
| 4, # Thicker outline | |
| cv2.LINE_AA | |
| ) | |
| cv2.putText( | |
| temp_img, | |
| annotation_text.strip().upper(), | |
| org, | |
| cv2.FONT_HERSHEY_SIMPLEX, | |
| 2, | |
| (0, 0, 0), # Black to create hole | |
| 2, # Thinner inner part | |
| cv2.LINE_AA | |
| ) | |
| outline_mask = cv2.cvtColor(temp_img, cv2.COLOR_BGR2GRAY) | |
| _, outline_mask = cv2.threshold(outline_mask, 1, 255, cv2.THRESH_BINARY) | |
| output_img[outline_mask > 0] = temp_img[outline_mask > 0] | |
| cv2.putText( | |
| new_outlines, | |
| annotation_text.strip().upper(), | |
| org, | |
| cv2.FONT_HERSHEY_SIMPLEX, | |
| 2, | |
| (0, 0, 255), # Red color | |
| 4, # Thicker outline | |
| cv2.LINE_AA | |
| ) | |
| cv2.putText( | |
| new_outlines, | |
| annotation_text.strip().upper(), | |
| org, | |
| cv2.FONT_HERSHEY_SIMPLEX, | |
| 2, | |
| (255, 255, 255), # Inner text in white | |
| 2, # Thinner inner part | |
| cv2.LINE_AA | |
| ) | |
| else: | |
| text_height_cv = 0.75 | |
| text_x_img = int(((inner_min_x + inner_max_x) / 2.0) / scaling_factor) | |
| text_y_in = inner_min_y - 0.125 - text_height_cv | |
| text_y_img = int(processed_size[0] - (text_y_in / scaling_factor)) | |
| org = (text_x_img - int(len(annotation_text.strip()) * 6), text_y_img) | |
| cv2.putText( | |
| output_img, | |
| annotation_text.strip(), | |
| org, | |
| cv2.FONT_HERSHEY_SIMPLEX, | |
| 1.2, | |
| (0, 0, 255), | |
| 2, | |
| cv2.LINE_AA | |
| ) | |
| cv2.putText( | |
| new_outlines, | |
| annotation_text.strip(), | |
| org, | |
| cv2.FONT_HERSHEY_SIMPLEX, | |
| 1.2, | |
| (0, 0, 255), | |
| 2, | |
| cv2.LINE_AA | |
| ) | |
| outlines_color = cv2.cvtColor(new_outlines, cv2.COLOR_BGR2RGB) | |
| print("Total prediction time: {:.2f} seconds".format(time.time() - overall_start)) | |
| return ( | |
| cv2.cvtColor(output_img, cv2.COLOR_BGR2RGB), | |
| outlines_color, | |
| dxf_filepath, | |
| dilated_mask, | |
| str(scaling_factor) | |
| ) | |
| # --------------------- | |
| # Gradio Interface | |
| # --------------------- | |
| if __name__ == "__main__": | |
| os.makedirs("./outputs", exist_ok=True) | |
| def gradio_predict(img, offset, offset_unit, finger_clearance, add_boundary, boundary_length, boundary_width, annotation_text): | |
| try: | |
| return predict(img, offset, offset_unit, finger_clearance, add_boundary, boundary_length, boundary_width, annotation_text) | |
| except Exception as e: | |
| return None, None, None, None, f"Error: {str(e)}" | |
| iface = gr.Interface( | |
| fn=gradio_predict, | |
| inputs=[ | |
| gr.Image(label="Input Image"), | |
| gr.Number(label="Offset value for Mask", value=0.075), | |
| gr.Dropdown(label="Offset Unit", choices=["mm", "inches"], value="inches"), | |
| gr.Dropdown(label="Add Finger Clearance?", choices=["Yes", "No"], value="Yes"), | |
| gr.Dropdown(label="Add Rectangular Boundary?", choices=["Yes", "No"], value="Yes"), | |
| gr.Number(label="Boundary Length", value=50, precision=2), | |
| gr.Number(label="Boundary Width", value=50, precision=2), | |
| gr.Textbox(label="Annotation (max 20 chars)", max_length=20, placeholder="Type up to 20 characters") | |
| ], | |
| outputs=[ | |
| gr.Image(label="Output Image"), | |
| gr.Image(label="Outlines of Objects"), | |
| gr.File(label="DXF file"), | |
| gr.Image(label="Mask"), | |
| gr.Textbox(label="Scaling Factor (inches/pixel)") | |
| ], | |
| examples=[ | |
| ["./Test20.jpg", 0.075, "inches", "Yes", "No", 300.0, 200.0, "MyTool"], | |
| ["./Test21.jpg", 0.075, "inches", "Yes", "Yes", 300.0, 200.0, "Tool2"] | |
| ] | |
| ) | |
| iface.launch(share=True) | |