Spaces:
Sleeping
Sleeping
File size: 11,717 Bytes
9cf599c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 | import cv2
import numpy as np
import os
# Directory containing filtered images
dir_path = 'api_inference_filtered_pipeline'
output_dir = 'api_inference_labeled_boxes_pipeline'
os.makedirs(output_dir, exist_ok=True)
# IOU function for non-max suppression
def iou(boxA, boxB):
xA = max(boxA[0], boxB[0])
yA = max(boxA[1], boxB[1])
xB = min(boxA[0]+boxA[2], boxB[0]+boxB[2])
yB = min(boxA[1]+boxA[3], boxB[1]+boxB[3])
interW = max(0, xB - xA)
interH = max(0, yB - yA)
interArea = interW * interH
boxAArea = boxA[2] * boxA[3]
boxBArea = boxB[2] * boxB[3]
iou = interArea / float(boxAArea + boxBArea - interArea + 1e-6)
return iou
# Merge close bounding boxes (same label, centers within dist_thresh)
def merge_close_boxes(boxes, labels, dist_thresh=20):
merged = []
merged_labels = []
used = [False]*len(boxes)
for i in range(len(boxes)):
if used[i]:
continue
x1, y1, w1, h1 = boxes[i]
label1 = labels[i]
x2, y2, w2, h2 = x1, y1, w1, h1
for j in range(i+1, len(boxes)):
if used[j]:
continue
bx, by, bw, bh = boxes[j]
# If boxes are close (distance between centers < dist_thresh)
cx1, cy1 = x1 + w1//2, y1 + h1//2
cx2, cy2 = bx + bw//2, by + bh//2
if abs(cx1-cx2) < dist_thresh and abs(cy1-cy2) < dist_thresh and label1 == labels[j]:
# Merge boxes
x2 = min(x2, bx)
y2 = min(y2, by)
w2 = max(x1+w1, bx+bw) - x2
h2 = max(y1+h1, by+bh) - y2
used[j] = True
merged.append((x2, y2, w2, h2))
merged_labels.append(label1)
used[i] = True
return merged, merged_labels
# Non-max suppression using IOU
def non_max_suppression_iou(boxes, labels, iou_thresh=0.4):
if len(boxes) == 0:
return [], []
idxs = np.argsort([w*h for (x, y, w, h) in boxes])[::-1]
keep = []
keep_labels = []
while len(idxs) > 0:
i = idxs[0]
keep.append(boxes[i])
keep_labels.append(labels[i])
remove = [0]
for j in range(1, len(idxs)):
if iou(boxes[i], boxes[idxs[j]]) > iou_thresh:
remove.append(j)
idxs = np.delete(idxs, remove)
return keep, keep_labels
# Filter out potential boxes that contain a faulty box inside
def filter_faulty_inside_potential(boxes, labels):
filtered_boxes = []
filtered_labels = []
for i, (box, label) in enumerate(zip(boxes, labels)):
if label == 'Point Overload (Potential)':
# Check if any faulty box is inside this potential box
keep = True
for j, (fbox, flabel) in enumerate(zip(boxes, labels)):
if flabel == 'Point Overload (Faulty)':
# Check if faulty box is inside potential box
x, y, w, h = box
fx, fy, fw, fh = fbox
if fx >= x and fy >= y and fx+fw <= x+w and fy+fh <= y+h:
keep = False
break
if keep:
filtered_boxes.append(box)
filtered_labels.append(label)
else:
filtered_boxes.append(box)
filtered_labels.append(label)
return filtered_boxes, filtered_labels
# Remove potential boxes that overlap with a faulty box (not just inside)
def filter_faulty_overlapping_potential(boxes, labels):
# Remove potential boxes that overlap at all with a faulty box (any intersection)
filtered_boxes = []
filtered_labels = []
def is_overlapping(boxA, boxB):
xA = max(boxA[0], boxB[0])
yA = max(boxA[1], boxB[1])
xB = min(boxA[0]+boxA[2], boxB[0]+boxB[2])
yB = min(boxA[1]+boxA[3], boxB[1]+boxB[3])
return (xB > xA) and (yB > yA)
for i, (box, label) in enumerate(zip(boxes, labels)):
if label == 'Point Overload (Potential)':
keep = True
for j, (fbox, flabel) in enumerate(zip(boxes, labels)):
if flabel == 'Point Overload (Faulty)':
if is_overlapping(box, fbox):
keep = False
break
if keep:
filtered_boxes.append(box)
filtered_labels.append(label)
else:
filtered_boxes.append(box)
filtered_labels.append(label)
return filtered_boxes, filtered_labels
# Heuristic classification function
def classify_image(img_path):
img = cv2.imread(img_path)
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
# Color masks
blue_mask = cv2.inRange(hsv, (90, 50, 20), (130, 255, 255))
black_mask = cv2.inRange(hsv, (0, 0, 0), (180, 255, 50))
yellow_mask = cv2.inRange(hsv, (20, 130, 130), (35, 255, 255)) # increased threshold
orange_mask = cv2.inRange(hsv, (10, 100, 100), (25, 255, 255))
red_mask1 = cv2.inRange(hsv, (0, 100, 100), (10, 255, 255))
red_mask2 = cv2.inRange(hsv, (160, 100, 100), (180, 255, 255))
red_mask = cv2.bitwise_or(red_mask1, red_mask2)
total = img.shape[0] * img.shape[1]
blue_count = np.sum(blue_mask > 0)
black_count = np.sum(black_mask > 0)
yellow_count = np.sum(yellow_mask > 0)
orange_count = np.sum(orange_mask > 0)
red_count = np.sum(red_mask > 0)
label = 'Unknown'
box_list = []
label_list = []
# Full image checks
if (blue_count + black_count) / total > 0.8:
label = 'Normal'
elif (red_count + orange_count) / total > 0.5:
label = 'Full Wire Overload'
elif (yellow_count) / total > 0.5:
label = 'Full Wire Overload'
# Check for full wire overload (entire image reddish or yellowish)
full_wire_thresh = 0.7 # 70% of image is reddish or yellowish
if (red_count + orange_count + yellow_count) / total > full_wire_thresh:
label = 'Full Wire Overload'
# Add a box covering the whole image
box_list.append((0, 0, img.shape[1], img.shape[0]))
label_list.append(label)
else:
# Small spot checks (improved: filter tiny spots, merge overlapping boxes)
min_area_faulty = 120 # increased min area for red/orange (faulty)
min_area_potential = 1000 # much higher min area for yellow (potential)
max_area = 0.05 * total
# Faulty (red/orange) spots
for mask, spot_label, min_a in [
(red_mask, 'Point Overload (Faulty)', min_area_faulty),
(yellow_mask, 'Point Overload (Potential)', min_area_potential)
]:
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for cnt in contours:
area = cv2.contourArea(cnt)
if min_a < area < max_area:
x, y, w, h = cv2.boundingRect(cnt)
box_list.append((x, y, w, h))
label_list.append(spot_label)
# Middle area checks
h, w = img.shape[:2]
center = img[h//4:3*h//4, w//4:3*w//4]
center_hsv = cv2.cvtColor(center, cv2.COLOR_BGR2HSV)
center_yellow = cv2.inRange(center_hsv, (20, 130, 130), (35, 255, 255))
center_orange = cv2.inRange(center_hsv, (10, 100, 100), (25, 255, 255))
center_red1 = cv2.inRange(center_hsv, (0, 100, 100), (10, 255, 255))
center_red2 = cv2.inRange(center_hsv, (160, 100, 100), (180, 255, 255))
center_red = cv2.bitwise_or(center_red1, center_red2)
if np.sum(center_red > 0) + np.sum(center_orange > 0) > 0.1 * center.size:
label = 'Loose Joint (Faulty)'
box_list.append((w//4, h//4, w//2, h//2))
label_list.append(label)
elif np.sum(center_yellow > 0) > 0.1 * center.size:
label = 'Loose Joint (Potential)'
box_list.append((w//4, h//4, w//2, h//2))
label_list.append(label)
# Always check for tiny spots, even if image is labeled as Normal
min_area_tiny = 10
max_area_tiny = 30
for mask, spot_label in [
(red_mask, 'Tiny Faulty Spot'),
(yellow_mask, 'Tiny Potential Spot')
]:
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for cnt in contours:
area = cv2.contourArea(cnt)
if min_area_tiny < area < max_area_tiny:
x, y, w, h = cv2.boundingRect(cnt)
box_list.append((x, y, w, h))
label_list.append(spot_label)
# Detect wire-shaped (long, thin) regions for wire overloads only
aspect_ratio_thresh = 5
min_strip_area = 0.01 * total
wire_boxes = []
wire_labels = []
for mask, strip_label in [
(red_mask, 'Wire Overload (Red Strip)'),
(yellow_mask, 'Wire Overload (Yellow Strip)'),
(orange_mask, 'Wire Overload (Orange Strip)')
]:
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for cnt in contours:
area = cv2.contourArea(cnt)
if area > min_strip_area:
x, y, w, h = cv2.boundingRect(cnt)
aspect_ratio = max(w, h) / (min(w, h) + 1e-6)
if aspect_ratio > aspect_ratio_thresh:
wire_boxes.append((x, y, w, h))
wire_labels.append(strip_label)
# Add wire overloads to box_list/label_list
box_list = wire_boxes[:]
label_list = wire_labels[:]
# For point overloads, do not require wire shape
min_area_faulty = 120
min_area_potential = 1000
max_area = 0.05 * total
for mask, spot_label, min_a in [
(red_mask, 'Point Overload (Faulty)', min_area_faulty),
(yellow_mask, 'Point Overload (Potential)', min_area_potential)
]:
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for cnt in contours:
area = cv2.contourArea(cnt)
if min_a < area < max_area:
x, y, w, h = cv2.boundingRect(cnt)
box_list.append((x, y, w, h))
label_list.append(spot_label)
# Remove overlapping boxes using IOU
box_list, label_list = non_max_suppression_iou(box_list, label_list, iou_thresh=0.4)
box_list, label_list = filter_faulty_inside_potential(box_list, label_list)
box_list, label_list = filter_faulty_overlapping_potential(box_list, label_list)
box_list, label_list = merge_close_boxes(box_list, label_list, dist_thresh=100)
return label, box_list, label_list, img
# Batch process all images in the directory
for fname in os.listdir(dir_path):
if not fname.lower().endswith(('.jpg', '.jpeg', '.png')):
continue
label, box_list, label_list, img = classify_image(os.path.join(dir_path, fname))
# Load the original (unfiltered) image for drawing boxes
orig_dir = 'api_inference_pred_masks' # or the directory with original images
orig_img_path = os.path.join(orig_dir, fname)
if os.path.exists(orig_img_path):
draw_img = cv2.imread(orig_img_path)
if draw_img is None:
draw_img = img.copy()
else:
draw_img = img.copy()
# Draw bounding boxes and labels on the original image
for (x, y, w, h), l in zip(box_list, label_list):
cv2.rectangle(draw_img, (x, y), (x+w, y+h), (0, 0, 255), 2)
cv2.putText(draw_img, l, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
if not box_list:
cv2.putText(draw_img, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)
out_path = os.path.join(output_dir, fname)
cv2.imwrite(out_path, draw_img)
print(f"{fname}: {label} (saved with boxes on original image)")
|