|
|
import cv2, os, json |
|
|
import numpy as np |
|
|
from math import atan2, cos, sin, sqrt, pi |
|
|
from sklearn.cluster import DBSCAN |
|
|
from sklearn.preprocessing import StandardScaler |
|
|
from sklearn.cluster import OPTICS, cluster_optics_dbscan |
|
|
from tqdm import tqdm |
|
|
from glob import glob |
|
|
|
|
|
def nms(boxes, thresh):
    """Greedy non-maximum suppression (Malisiewicz variant).

    Boxes are processed from largest ``y2`` downward; each kept box
    suppresses any remaining box whose intersection area, divided by that
    *remaining* box's own area (not IoU), exceeds ``thresh``.

    Parameters
    ----------
    boxes : array-like of shape (n, 4)
        Rows of ``[x1, y1, x2, y2]`` in inclusive pixel coordinates.
        Accepts any array-like (list of lists, ndarray, ...).
    thresh : float
        Overlap-ratio threshold above which a box is suppressed.

    Returns
    -------
    numpy.ndarray or list
        Surviving rows of ``boxes`` (empty list when ``boxes`` is empty).
    """
    # Generalize: the original indexing (boxes[:, 0]) required an ndarray;
    # coerce once so list input also works. No-op for ndarray input.
    boxes = np.asarray(boxes)
    if len(boxes) == 0:
        return []

    pick = []
    x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
    # +1 because coordinates are inclusive pixel indices.
    area = (x2 - x1 + 1) * (y2 - y1 + 1)
    idxs = np.argsort(y2)

    while len(idxs) > 0:
        # Take the box with the largest y2 among the remaining candidates.
        last = len(idxs) - 1
        i = idxs[last]
        pick.append(i)

        # Intersection of box i with every other remaining box.
        xx1 = np.maximum(x1[i], x1[idxs[:last]])
        yy1 = np.maximum(y1[i], y1[idxs[:last]])
        xx2 = np.minimum(x2[i], x2[idxs[:last]])
        yy2 = np.minimum(y2[i], y2[idxs[:last]])

        w = np.maximum(0, xx2 - xx1 + 1)
        h = np.maximum(0, yy2 - yy1 + 1)

        # Ratio of intersection to the *candidate* box's area (not IoU).
        overlap = (w * h) / area[idxs[:last]]
        # Drop the picked box plus everything it suppresses.
        idxs = np.delete(idxs, np.concatenate(([last], np.where(overlap > thresh)[0])))

    return boxes[pick]
|
|
|
|
|
def find_pts_on_line(og, slope, d):
    """Return the point lying distance ``d`` from ``og`` along the line of
    the given ``slope`` through ``og``, stepping in the negative-x direction.

    Parameters
    ----------
    og : tuple
        ``(x, y)`` origin point.
    slope : float
        Slope of the line through ``og``.
    d : float
        Euclidean distance to travel from ``og``.

    Returns
    -------
    tuple
        ``(x, y)`` of the displaced point as floats.
    """
    ox, oy = og
    # Horizontal step of length-d travel along a line with this slope.
    x_step = d / ((1 + slope * slope) ** 0.5)
    px = float(ox - x_step)
    # Point-slope form: y = oy + slope * (px - ox).
    py = float(oy + slope * (px - ox))
    return (px, py)
|
|
|
|
|
def cluster(img_path, im, save_dir):
    """Extract skeleton center points and their windows from a binary mask image.

    Pipeline: threshold the image, drop small connected components,
    morphologically skeletonize the remainder, place a fixed-size window on
    every skeleton pixel, NMS the windows, then take each surviving window's
    PCA mean as a center point. Debug images are written to ``save_dir``.

    Parameters
    ----------
    img_path : str
        Path to the mask image to process.
    im : str
        The image's file name; its stem is used in the output file names.
    save_dir : str
        Directory where the debug PNGs are written (created if missing).

    Returns
    -------
    tuple of (list, list)
        ``pts_group``: ``[x, y]`` integer center points that fell inside the
        image bounds; ``bbox_group``: the matching ``[x1, y1, x2, y2]``
        windows, in the same order.
    """
    img = cv2.imread(img_path)
    imgray = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)

    # Binarize at 127 (flag 0 == cv2.THRESH_BINARY).
    ret, binary_map = cv2.threshold(imgray, 127, 255, 0)

    # Connected components on the binary map; stats row 0 is the background,
    # hence the [1:] slice and the i + 1 label offset below.
    nlabels, labels, stats, centroids = cv2.connectedComponentsWithStats(binary_map, None, None, None, 8, cv2.CV_32S)
    areas = stats[1:, cv2.CC_STAT_AREA]
    result = np.zeros((labels.shape), np.uint8)
    for i in range(0, nlabels - 1):
        # Keep only components of at least 250 px to drop speckle noise.
        if areas[i] >= 250:
            result[labels == i + 1] = 255
    # Snapshot before skeletonization destroys `result` in the loop below.
    re_copy = result.copy()

    edgeimg = cv2.Canny(result, 10, 150)

    size = np.size(result)
    skel = np.zeros(result.shape, np.uint8)
    element = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))

    # Morphological skeletonization: repeatedly OR in (result - open(result))
    # while eroding `result`, until nothing remains.
    while True:
        open_ = cv2.morphologyEx(result, cv2.MORPH_OPEN, element)
        temp = cv2.subtract(result, open_)
        eroded = cv2.erode(result, element)
        skel = cv2.bitwise_or(skel, temp)
        result = eroded.copy()
        if cv2.countNonZero(result) == 0:
            break

    # Re-run connected components on the skeleton and drop fragments
    # smaller than 2 px.
    nlabels, labels, stats, centroids = cv2.connectedComponentsWithStats(skel, None, None, None, 8, cv2.CV_32S)
    areas = stats[1:, cv2.CC_STAT_AREA]
    skel = np.zeros((labels.shape), np.uint8)
    for i in range(0, nlabels - 1):
        if areas[i] >= 2:
            skel[labels == i + 1] = 255

    base_name = os.path.splitext(im)[0]
    os.makedirs(save_dir, exist_ok=True)
    cv2.imwrite(os.path.join(save_dir, f"Skeleton_{base_name}.png"), skel)

    # Skeleton overlaid with the Canny edges, for visual inspection.
    combined_img = cv2.addWeighted(skel, 1, edgeimg, 1, 0)
    cv2.imwrite(os.path.join(save_dir, f"SkeletonContour_{base_name}.png"), combined_img)

    # One 10x10 candidate window centered on every skeleton pixel.
    filter_size = (10, 10)
    white_pixels = np.where(skel == 255)
    # np.where returns (rows, cols) -> (y, x).
    x_coords, y_coords = white_pixels[1], white_pixels[0]

    x1 = x_coords - filter_size[0] // 2
    y1 = y_coords - filter_size[1] // 2
    x2 = x_coords + filter_size[0] // 2
    y2 = y_coords + filter_size[1] // 2

    # NMS with a low threshold thins the dense per-pixel windows to a
    # sparse set along the skeleton.
    white_regions = np.column_stack((x1, y1, x2, y2))
    white_regions = nms(white_regions, thresh=0.1)

    skeleton_image = cv2.cvtColor(skel.copy(), cv2.COLOR_GRAY2BGR)
    filtered_image = cv2.cvtColor(re_copy.copy(), cv2.COLOR_GRAY2BGR)
    center_points, directions = [], []

    def get_direction2(bbox_pixels):
        """Return (principal direction, mean point) of the nonzero pixels
        in a window via PCA; ((0, 0), (0, 0)) when fewer than 2 pixels."""
        nonzero_indices = np.column_stack(np.nonzero(bbox_pixels))
        nonzero_indices = np.float32(nonzero_indices)
        if len(nonzero_indices) >= 2:
            mean, eigenvectors = cv2.PCACompute(nonzero_indices, mean=None)
            # np.nonzero gives (row, col); swap to (x, y) for drawing.
            cntr = ((mean[0, 1]), (mean[0, 0]))
            return eigenvectors[0], cntr
        else:
            return (0, 0), (0, 0)

    for coor in white_regions:
        x1, y1, x2, y2 = coor
        bbox_pixels = skel[int(y1):int(y2), int(x1):int(x2)]
        direction, mean = get_direction2(bbox_pixels)
        directions.append(direction)
        # PCA mean is window-relative; shift back to image coordinates.
        center_points.append((mean[0] + x1, mean[1] + y1))
        cv2.rectangle(skeleton_image, (int(x1), int(y1)), (int(x2), int(y2)), (255, 255, 255), 1)
        cv2.circle(skeleton_image, (int(mean[0] + x1), int(mean[1] + y1)), 1, (255, 0, 0), -1)

    cv2.imwrite(os.path.join(save_dir, f"Result_{base_name}.png"), skeleton_image)

    # Keep only points strictly inside the image.
    # NOTE(review): bounds are hard-coded — assumes 640x480 masks; confirm
    # against the dataset's actual image size.
    pts_group, bbox_group = [], []
    for idx, pts in enumerate(center_points):
        if 640 > pts[0] > 0 and 480 > pts[1] > 0:
            pts_group.append([int(pts[0]), int(pts[1])])
            x1, y1, x2, y2 = white_regions[idx]
            bbox_group.append([int(x1), int(y1), int(x2), int(y2)])

    return pts_group, bbox_group
|
|
|
|
|
|
|
|
mask_dir = "datasets/seg_train"
save_json_dir = "datasets"
save_img_dir = os.path.join(save_json_dir, "output")


def main():
    """Run the skeleton/point extraction over every mask image in
    ``mask_dir`` and dump the collected points and boxes to two JSON files
    in ``save_json_dir``."""
    # Match common image extensions in both cases; set() removes the
    # duplicates that arise on case-insensitive filesystems where *.png
    # and *.PNG both match the same file.
    patterns = ['*.png', '*.jpg', '*.jpeg', '*.PNG', '*.JPG', '*.JPEG']
    files = []
    for p in patterns:
        files.extend(glob(os.path.join(mask_dir, p)))
    files = sorted(set(files))
    print(f"Found {len(files)} files in {mask_dir}")

    file_dict = {}
    bbox_dict = {}

    for filepath in tqdm(files):
        filename = os.path.basename(filepath)
        pts, bbox = cluster(filepath, filename, save_img_dir)
        # Images yielding no in-bounds points are omitted from both JSONs.
        if len(pts) != 0:
            file_dict[filename] = pts
            bbox_dict[filename] = bbox

    with open(os.path.join(save_json_dir, 'train_seg_points.json'), 'w') as json_file:
        json.dump(file_dict, json_file)

    with open(os.path.join(save_json_dir, 'train_bbox_points.json'), 'w') as json_file:
        json.dump(bbox_dict, json_file)


# Guard the entry point so importing this module for its helpers does not
# kick off the whole dataset pass.
if __name__ == "__main__":
    main()
|
|
|