Spaces · Commit 928c611 · Parent: de6ab73
Remove all comments from code

Files changed (the commit replaces comment lines with blank lines; the removed comment text was not preserved in this capture, so deleted lines in the hunks below appear empty):
- app.py +2 -2
- download_model.py +1 -1
- main.py +1 -1
- src/autonomous_vision/detectors/depth_estimator.py +18 -18
- src/autonomous_vision/detectors/lane_detector.py +23 -23
- src/autonomous_vision/detectors/traffic_light_analyzer.py +11 -11
- src/autonomous_vision/detectors/traffic_sign_detector.py +36 -36
- src/autonomous_vision/detectors/yolo_detector.py +11 -11
- src/autonomous_vision/models/detections.py +9 -9
- src/autonomous_vision/pipeline/annotator.py +10 -10
- src/autonomous_vision/pipeline/processor.py +22 -22
app.py
CHANGED

@@ -1,14 +1,14 @@
 import os
 import sys
 
-
+
 sys.path.append(os.path.join(os.path.dirname(__file__), "src"))
 
 from autonomous_vision.app import create_demo, THEME, CUSTOM_CSS
 
 if __name__ == "__main__":
     demo = create_demo()
-
+
     demo.launch(
         allowed_paths=["."]
     )
download_model.py
CHANGED

@@ -9,7 +9,7 @@ hf_hub_download(
     local_dir="src/autonomous_vision/models",
     local_dir_use_symlinks=False
 )
-
+
 if os.path.exists("src/autonomous_vision/models/weights/best.pt"):
     os.rename("src/autonomous_vision/models/weights/best.pt", "src/autonomous_vision/models/yolov8_traffic.pt")
     os.rmdir("src/autonomous_vision/models/weights")
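For reference, a minimal sketch of the download step this hunk belongs to. The repo_id and filename below are placeholders (assumptions), since the head of the hf_hub_download(...) call is outside the captured hunk:

import os
from huggingface_hub import hf_hub_download

# Placeholder repo_id/filename -- the real values are not visible in this diff.
hf_hub_download(
    repo_id="some-user/some-traffic-model",
    filename="weights/best.pt",
    local_dir="src/autonomous_vision/models",
    local_dir_use_symlinks=False,
)

# Flatten the downloaded layout, exactly as the script does above.
if os.path.exists("src/autonomous_vision/models/weights/best.pt"):
    os.rename("src/autonomous_vision/models/weights/best.pt",
              "src/autonomous_vision/models/yolov8_traffic.pt")
    os.rmdir("src/autonomous_vision/models/weights")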
main.py
CHANGED

@@ -1,4 +1,4 @@
-
+
 """Entry point for Autonomous Vision System."""
 
 from autonomous_vision.app import main
src/autonomous_vision/detectors/depth_estimator.py
CHANGED

@@ -15,10 +15,10 @@ class DepthEstimator(BaseDetector):
     approximate distances to detected objects.
     """
 
-
-
-    FOCAL_LENGTH = 700
-    REFERENCE_DISTANCE = 10.0
+
+
+    FOCAL_LENGTH = 700
+    REFERENCE_DISTANCE = 10.0
 
     def __init__(
         self,
@@ -80,7 +80,7 @@ class DepthEstimator(BaseDetector):
        try:
            from PIL import Image
 
-
+
            h, w = frame.shape[:2]
            scale = 384 / max(h, w)
            new_h, new_w = int(h * scale), int(w * scale)
@@ -89,11 +89,11 @@ class DepthEstimator(BaseDetector):
            rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(rgb)
 
-
+
            result = self._pipeline(pil_img)
            depth_map = np.array(result['depth'])
 
-
+
            self._depth_map = cv2.resize(depth_map, (w, h))
 
            return self._depth_map
@@ -125,18 +125,18 @@ class DepthEstimator(BaseDetector):
        if not (0 <= x < w and 0 <= y < h):
            return None
 
-
+
        depth_value = depth_map[y, x]
 
        if depth_value <= 0:
            return 100.0
 
-
-
-
+
+
+
        distance = 400.0 / depth_value
 
-
+
        return min(max(distance, 1.5), 100.0)
 
    def get_distance_for_bbox(
@@ -160,7 +160,7 @@ class DepthEstimator(BaseDetector):
        x1, y1, x2, y2 = bbox
        h, w = depth_map.shape[:2]
 
-
+
        x1 = max(0, x1)
        y1 = max(0, y1)
        x2 = min(w, x2)
@@ -169,8 +169,8 @@ class DepthEstimator(BaseDetector):
        if x1 >= x2 or y1 >= y2:
            return None
 
-
-
+
+
        center_x1 = x1 + (x2 - x1) // 4
        center_x2 = x2 - (x2 - x1) // 4
        center_y1 = y1 + (y2 - y1) // 4
@@ -181,7 +181,7 @@ class DepthEstimator(BaseDetector):
        if depth_roi.size == 0:
            return None
 
-
+
        median_depth = np.median(depth_roi)
 
        return self.get_distance_at_point(0, 0, np.full((1, 1), median_depth))
@@ -198,11 +198,11 @@ class DepthEstimator(BaseDetector):
        Returns:
            Colorized depth visualization (BGR).
        """
-
+
        normalized = cv2.normalize(depth_map, None, 0, 255, cv2.NORM_MINMAX)
        normalized = normalized.astype(np.uint8)
 
-
+
        colored = cv2.applyColorMap(normalized, colormap)
 
        return colored
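The changed lines above sit inside the detector's distance heuristic: a relative depth reading is inverted with a fixed 400.0 constant, then clamped to a plausible range. A standalone sketch of that arithmetic (constants taken from the hunks; depth_value is assumed to be a positive relative-depth value, not metric depth):

def approx_distance(depth_value: float) -> float:
    # Mirrors get_distance_at_point: non-positive readings count as "far away",
    # otherwise invert the relative depth and clamp to 1.5-100.0 metres.
    if depth_value <= 0:
        return 100.0
    return min(max(400.0 / depth_value, 1.5), 100.0)

print(approx_distance(40.0))   # 10.0
print(approx_distance(300.0))  # 1.5 (clamped; raw value would be ~1.33)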
src/autonomous_vision/detectors/lane_detector.py
CHANGED

@@ -17,9 +17,9 @@ from autonomous_vision.models.detections import LaneInfo
 class LaneDetector(BaseDetector):
     """Lane detector using semantic segmentation."""
 
-
-    ROAD_CLASS = 0
-    SIDEWALK_CLASS = 1
+
+    ROAD_CLASS = 0
+    SIDEWALK_CLASS = 1
 
     def __init__(
         self,
@@ -62,7 +62,7 @@ class LaneDetector(BaseDetector):
        try:
            from PIL import Image
 
-
+
            h, w = frame.shape[:2]
            scale = 512 / max(h, w)
            new_h, new_w = int(h * scale), int(w * scale)
@@ -71,10 +71,10 @@ class LaneDetector(BaseDetector):
            rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(rgb)
 
-
+
            results = self._seg_pipeline(pil_img)
 
-
+
            road_mask = None
            for result in results:
                label = result.get('label', '').lower()
@@ -86,7 +86,7 @@ class LaneDetector(BaseDetector):
                    road_mask = cv2.bitwise_or(road_mask, mask)
 
            if road_mask is not None:
-
+
                road_mask = cv2.resize(road_mask, (w, h), interpolation=cv2.INTER_NEAREST)
 
            return road_mask
@@ -99,26 +99,26 @@ class LaneDetector(BaseDetector):
        """Extract left and right lane boundaries from road mask."""
        height, width = mask.shape[:2]
 
-
+
        if mask.dtype != np.uint8:
            mask = (mask > 0).astype(np.uint8) * 255
 
-
+
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
 
        if not contours:
            return None, None
 
-
+
        main_contour = max(contours, key=cv2.contourArea)
 
-
+
        left_points = []
        right_points = []
 
-
+
        for y in range(int(height * 0.55), height, 10):
-
+
            mask_row = mask[y, :]
            road_pixels = np.where(mask_row > 0)[0]
 
@@ -129,7 +129,7 @@ class LaneDetector(BaseDetector):
            left_points.append((int(left_x), int(y)))
            right_points.append((int(right_x), int(y)))
 
-        return left_points if len(left_points) >= 3 else None, \
+        return left_points if len(left_points) >= 3 else None, \
            right_points if len(right_points) >= 3 else None
 
    def _create_lane_mask_cv(self, frame: np.ndarray) -> np.ndarray:
@@ -140,22 +140,22 @@ class LaneDetector(BaseDetector):
        lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
 
-
+
        l_channel = hls[:, :, 1]
        white_mask = cv2.inRange(l_channel, 200, 255)
 
-
+
        b_channel = lab[:, :, 2]
        yellow_mask = cv2.inRange(b_channel, 145, 200)
 
        color_mask = cv2.bitwise_or(white_mask, yellow_mask)
 
-
+
        blur = cv2.GaussianBlur(gray, (5, 5), 0)
        edges = cv2.Canny(blur, 50, 150)
        combined = cv2.bitwise_or(color_mask, edges)
 
-
+
        roi_mask = np.zeros_like(combined)
        roi_pts = np.array([
            [int(width * 0.0), height],
@@ -219,7 +219,7 @@ class LaneDetector(BaseDetector):
        if not history:
            return current
 
-
+
        avg = np.mean(list(history), axis=0)
        return [(int(x), int(y)) for x, y in avg]
 
@@ -231,13 +231,13 @@ class LaneDetector(BaseDetector):
        left_points = None
        right_points = None
 
-
+
        if self._seg_available:
            road_mask = self._segment_road(frame)
            if road_mask is not None:
                left_points, right_points = self._extract_lanes_from_mask(road_mask)
 
-
+
        if left_points is None or right_points is None:
            cv_mask = self._create_lane_mask_cv(frame)
            cv_left, cv_right = self._detect_lanes_hough(cv_mask, width, height)
@@ -246,11 +246,11 @@ class LaneDetector(BaseDetector):
            if right_points is None:
                right_points = cv_right
 
-
+
        left_smooth = self._smooth_points(left_points, self._left_history)
        right_smooth = self._smooth_points(right_points, self._right_history)
 
-
+
        center_offset = 0.0
        if left_smooth and right_smooth:
            left_x = left_smooth[-1][0]
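_create_lane_mask_cv combines an HLS lightness threshold (white paint), a LAB b-channel threshold (yellow paint), and Canny edges. A minimal standalone version of that masking step with the same thresholds; the BGR2HLS conversion is an assumption, since the hunk uses hls but its definition is outside the diff:

import cv2
import numpy as np

def lane_mask(frame: np.ndarray) -> np.ndarray:
    hls = cv2.cvtColor(frame, cv2.COLOR_BGR2HLS)  # assumed source of `hls`
    lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    white_mask = cv2.inRange(hls[:, :, 1], 200, 255)   # bright pixels (L channel)
    yellow_mask = cv2.inRange(lab[:, :, 2], 145, 200)  # yellowish pixels (b channel)
    color_mask = cv2.bitwise_or(white_mask, yellow_mask)

    edges = cv2.Canny(cv2.GaussianBlur(gray, (5, 5), 0), 50, 150)
    return cv2.bitwise_or(color_mask, edges)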
src/autonomous_vision/detectors/traffic_light_analyzer.py
CHANGED

@@ -18,11 +18,11 @@ class TrafficLightAnalyzer(BaseDetector):
        """
        super().__init__(confidence_threshold)
 
-
+
        self.color_ranges = {
            TrafficLightState.RED: [
-                ((0, 100, 100), (10, 255, 255)),
-                ((160, 100, 100), (180, 255, 255)),
+                ((0, 100, 100), (10, 255, 255)),
+                ((160, 100, 100), (180, 255, 255)),
            ],
            TrafficLightState.YELLOW: [
                ((15, 100, 100), (35, 255, 255)),
@@ -65,10 +65,10 @@ class TrafficLightAnalyzer(BaseDetector):
        """
        self.ensure_initialized()
 
-
+
        x1, y1, x2, y2 = detection.bbox
 
-
+
        height, width = frame.shape[:2]
        pad = 5
        x1 = max(0, x1 - pad)
@@ -95,10 +95,10 @@ class TrafficLightAnalyzer(BaseDetector):
        if roi.size == 0:
            return TrafficLightState.UNKNOWN
 
-
+
        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
 
-
+
        color_scores: dict[TrafficLightState, float] = {}
 
        total_pixels = roi.shape[0] * roi.shape[1]
@@ -112,19 +112,19 @@ class TrafficLightAnalyzer(BaseDetector):
                color_mask = cv2.inRange(hsv, lower_np, upper_np)
                mask = cv2.bitwise_or(mask, color_mask)
 
-
+
            pixel_count = cv2.countNonZero(mask)
            color_scores[state] = pixel_count / total_pixels
 
-
+
        if not color_scores:
            return TrafficLightState.UNKNOWN
 
        max_state = max(color_scores, key=lambda x: color_scores[x])
        max_score = color_scores[max_state]
 
-
-        if max_score < 0.05:
+
+        if max_score < 0.05:
            return TrafficLightState.UNKNOWN
 
        return max_state
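The analyzer scores each light state by the fraction of ROI pixels that fall inside its HSV ranges and treats anything under 5% coverage as UNKNOWN. A standalone sketch of that scoring for the red ranges shown above (OpenCV hue runs 0-180, so red needs two ranges that wrap around):

import cv2
import numpy as np

RED_RANGES = [((0, 100, 100), (10, 255, 255)), ((160, 100, 100), (180, 255, 255))]

def red_fraction(roi_bgr: np.ndarray) -> float:
    hsv = cv2.cvtColor(roi_bgr, cv2.COLOR_BGR2HSV)
    mask = np.zeros(hsv.shape[:2], dtype=np.uint8)
    for lower, upper in RED_RANGES:
        mask = cv2.bitwise_or(mask, cv2.inRange(hsv, np.array(lower), np.array(upper)))
    return cv2.countNonZero(mask) / (roi_bgr.shape[0] * roi_bgr.shape[1])

# A solid red patch scores ~1.0; the analyzer requires at least 0.05.
print(red_fraction(np.full((20, 20, 3), (0, 0, 255), dtype=np.uint8)))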
src/autonomous_vision/detectors/traffic_sign_detector.py
CHANGED

@@ -15,7 +15,7 @@ from autonomous_vision.detectors.base import BaseDetector
 from autonomous_vision.models.detections import Detection
 
 
-
+
 GTSRB_CLASSES = {
     0: ("speed_limit_20", "Speed Limit 20"),
     1: ("speed_limit_30", "Speed Limit 30"),
@@ -62,7 +62,7 @@ GTSRB_CLASSES = {
     42: ("end_no_overtaking_trucks", "End No Overtaking Trucks"),
 }
 
-
+
 DK_CLASS_MAPPING = {
     0: "End All Limits", 1: "No Parking", 2: "No Stopping", 3: "Speed Limit 100", 4: "End Speed 100",
     5: "Speed Limit 110", 6: "End Speed 110", 7: "Speed Limit 120", 8: "Speed Limit 30", 9: "Speed Limit 40",
@@ -70,15 +70,15 @@ DK_CLASS_MAPPING = {
     15: "Speed Limit 70", 16: "End Speed 70", 17: "Speed Limit 90", 18: "End Speed 90"
 }
 
-
+
 SIGN_COLORS = {
-    "speed": (0, 165, 255),
+    "speed": (0, 165, 255),
     "limit": (0, 165, 255),
-    "no_": (0, 0, 255),
-    "stop": (0, 0, 200),
+    "no_": (0, 0, 255),
+    "stop": (0, 0, 200),
     "yield": (0, 0, 200),
-    "priority": (0, 100, 200),
-    "curve": (0, 200, 255),
+    "priority": (0, 100, 200),
+    "curve": (0, 200, 255),
     "bumpy": (0, 200, 255),
     "slippery": (0, 200, 255),
     "road_": (0, 200, 255),
@@ -89,12 +89,12 @@ SIGN_COLORS = {
     "ice": (0, 200, 255),
     "wild": (0, 200, 255),
     "general": (0, 200, 255),
-    "turn": (255, 100, 0),
+    "turn": (255, 100, 0),
     "ahead": (255, 100, 0),
     "straight": (255, 100, 0),
     "keep": (255, 100, 0),
     "roundabout": (255, 100, 0),
-    "end": (100, 100, 100),
+    "end": (100, 100, 100),
 }
 
 
@@ -104,7 +104,7 @@ def _get_color(class_name: str) -> tuple[int, int, int]:
    for key, color in SIGN_COLORS.items():
        if key in lower:
            return color
-    return (128, 0, 255)
+    return (128, 0, 255)
 
 
 class TrafficSignDetector(BaseDetector):
@@ -124,7 +124,7 @@ class TrafficSignDetector(BaseDetector):
 
    def initialize(self) -> None:
        """Initialize detection and classification models."""
-
+
        traffic_model_path = Path(__file__).parent.parent / "models" / "yolov8_traffic.pt"
 
        try:
@@ -138,7 +138,7 @@ class TrafficSignDetector(BaseDetector):
        except Exception as e:
            print(f"YOLO loading failed: {e}")
 
-
+
        try:
            from transformers import pipeline
            self._classifier = pipeline(
@@ -157,36 +157,36 @@ class TrafficSignDetector(BaseDetector):
        h, w = frame.shape[:2]
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
 
-
+
        red_mask1 = cv2.inRange(hsv, np.array([0, 70, 50]), np.array([10, 255, 255]))
        red_mask2 = cv2.inRange(hsv, np.array([160, 70, 50]), np.array([180, 255, 255]))
        red_mask = cv2.bitwise_or(red_mask1, red_mask2)
 
-
+
        blue_mask = cv2.inRange(hsv, np.array([100, 70, 50]), np.array([130, 255, 255]))
 
-
+
        yellow_mask = cv2.inRange(hsv, np.array([15, 70, 50]), np.array([35, 255, 255]))
 
-
+
        combined_mask = cv2.bitwise_or(red_mask, blue_mask)
        combined_mask = cv2.bitwise_or(combined_mask, yellow_mask)
 
-
+
        kernel = np.ones((3, 3), np.uint8)
        combined_mask = cv2.morphologyEx(combined_mask, cv2.MORPH_OPEN, kernel)
        combined_mask = cv2.morphologyEx(combined_mask, cv2.MORPH_CLOSE, kernel, iterations=2)
 
-
+
        contours, _ = cv2.findContours(combined_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
 
        for contour in contours:
            area = cv2.contourArea(contour)
-
+
            if 300 < area < (h * w * 0.2):
                x, y, bw, bh = cv2.boundingRect(contour)
                aspect = bw / bh if bh > 0 else 0
-
+
                if 0.5 < aspect < 2.5:
                    pad = int(max(bw, bh) * 0.15)
                    x1 = max(0, x - pad)
@@ -195,7 +195,7 @@ class TrafficSignDetector(BaseDetector):
                    y2 = min(h, y + bh + pad)
                    regions.append((x1, y1, x2, y2))
 
-        return regions[:15]
+        return regions[:15]
 
    def _classify_sign(self, roi: np.ndarray) -> Optional[tuple[str, str, float]]:
        """Classify a traffic sign ROI using GTSRB classifier."""
@@ -204,7 +204,7 @@ class TrafficSignDetector(BaseDetector):
 
        try:
            rgb = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
-
+
            if rgb.shape[0] < 16 or rgb.shape[1] < 16:
                return None
 
@@ -259,10 +259,10 @@ class TrafficSignDetector(BaseDetector):
        detections: list[Detection] = []
        covered_regions = []
 
-
+
        if self._yolo_model:
            try:
-
+
                yolo_results = self._yolo_model(frame, verbose=False, conf=0.4)
                for result in yolo_results:
                    for box in result.boxes:
@@ -273,14 +273,14 @@ class TrafficSignDetector(BaseDetector):
 
                        label_text = DK_CLASS_MAPPING.get(cls_id, "Traffic Sign")
 
-
-
+
+
                        verified_label = label_text
                        verified_color = _get_color(label_text)
 
-
+
                        h, w = frame.shape[:2]
-
+
                        pad_x = int((x2 - x1) * 0.1)
                        pad_y = int((y2 - y1) * 0.1)
                        c_x1 = max(0, x1 - pad_x)
@@ -293,14 +293,14 @@ class TrafficSignDetector(BaseDetector):
                        cls_result = self._classify_sign(roi)
                        if cls_result:
                            cls_name, cls_eng, cls_score = cls_result
-
-
-
+
+
+
                            if cls_score > 0.4:
                                verified_label = f"{cls_eng}"
                                verified_color = _get_color(cls_name)
 
-
+
                        detections.append(Detection(
                            class_name="traffic_sign",
                            confidence=conf,
@@ -312,11 +312,11 @@ class TrafficSignDetector(BaseDetector):
            except Exception as e:
                print(f"YOLO detect error: {e}")
 
-
+
        hsv_regions = self._find_sign_regions_hsv(frame)
 
        for region in hsv_regions:
-
+
            is_duplicate = False
            for covered in covered_regions:
                if self._iou(region, covered) > 0.3:
@@ -326,7 +326,7 @@ class TrafficSignDetector(BaseDetector):
            if is_duplicate:
                continue
 
-
+
            x1, y1, x2, y2 = region
            roi = frame[y1:y2, x1:x2]
 
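The dedup loop above calls self._iou, which this commit does not touch; a conventional intersection-over-union for (x1, y1, x2, y2) boxes would look like this sketch (an assumption about the helper, not the repo's actual code):

def iou(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> float:
    # Overlap rectangle, clamped to zero when the boxes are disjoint.
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
    union = (a[2] - a[0]) * (a[3] - a[1]) + (b[2] - b[0]) * (b[3] - b[1]) - inter
    return inter / union if union > 0 else 0.0

# HSV-proposed regions overlapping an existing detection by more than 0.3 are skipped.
print(iou((0, 0, 10, 10), (5, 0, 15, 10)))  # 0.333...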
src/autonomous_vision/detectors/yolo_detector.py
CHANGED

@@ -10,7 +10,7 @@ from autonomous_vision.detectors.base import BaseDetector
 from autonomous_vision.models.detections import Detection
 
 
-
+
 COCO_CLASSES = {
     0: "person",
     1: "bicycle",
@@ -22,19 +22,19 @@ COCO_CLASSES = {
     11: "stop_sign",
 }
 
-
+
 CLASS_COLORS = {
-    "person": (0, 255, 255),
-    "bicycle": (255, 165, 0),
-    "car": (255, 100, 0),
-    "motorcycle": (255, 0, 128),
-    "bus": (0, 128, 255),
-    "truck": (0, 100, 200),
-    "traffic_light": (0, 255, 0),
-    "stop_sign": (0, 0, 255),
+    "person": (0, 255, 255),
+    "bicycle": (255, 165, 0),
+    "car": (255, 100, 0),
+    "motorcycle": (255, 0, 128),
+    "bus": (0, 128, 255),
+    "truck": (0, 100, 200),
+    "traffic_light": (0, 255, 0),
+    "stop_sign": (0, 0, 255),
 }
 
-
+
 ENGLISH_LABELS = {
     "person": "Pedestrian",
     "bicycle": "Cyclist",
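The colour tuples above are BGR (OpenCV's channel order), so (0, 255, 255) is yellow rather than cyan. An illustrative sketch of how such lookup tables are typically consumed when mapping raw class ids to labels and colours; the detector's detect() method itself is outside this diff:

COCO_CLASSES = {0: "person", 1: "bicycle", 2: "car"}            # excerpt from above
CLASS_COLORS = {"person": (0, 255, 255), "car": (255, 100, 0)}
ENGLISH_LABELS = {"person": "Pedestrian", "bicycle": "Cyclist"}

cls_id = 0  # class index as a YOLO model would report it
name = COCO_CLASSES.get(cls_id)
if name is not None:  # ids outside the road-scene subset are ignored
    color = CLASS_COLORS.get(name, (0, 255, 0))
    label = ENGLISH_LABELS.get(name, name)
    print(name, label, color)  # person Pedestrian (0, 255, 255)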
src/autonomous_vision/models/detections.py
CHANGED

@@ -17,17 +17,17 @@ class TrafficLightState(Enum):
 class DetectionClass(Enum):
     """Detection class categories."""
 
-
+
     CAR = "car"
     TRUCK = "truck"
     BUS = "bus"
     MOTORCYCLE = "motorcycle"
     BICYCLE = "bicycle"
 
-
+
     PERSON = "person"
 
-
+
     TRAFFIC_LIGHT = "traffic_light"
     TRAFFIC_SIGN = "traffic_sign"
 
@@ -38,10 +38,10 @@ class Detection:
 
     class_name: str
     confidence: float
-    bbox: tuple[int, int, int, int]
-    label: str = ""
-    color: tuple[int, int, int] = (0, 255, 0)
-    distance: Optional[float] = None
+    bbox: tuple[int, int, int, int]
+    label: str = ""
+    color: tuple[int, int, int] = (0, 255, 0)
+    distance: Optional[float] = None
 
     @property
     def center(self) -> tuple[int, int]:
@@ -60,9 +60,9 @@
 class LaneInfo:
     """Information about detected lanes."""
 
-    left_lane: Optional[list[tuple[int, int]]] = None
+    left_lane: Optional[list[tuple[int, int]]] = None
     right_lane: Optional[list[tuple[int, int]]] = None
-    center_offset: float = 0.0
+    center_offset: float = 0.0
     lane_width: Optional[int] = None
 
 
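The field syntax with defaults implies Detection is a dataclass. An illustrative standalone mirror of those fields and the center property visible above; the real class lives in src/autonomous_vision/models/detections.py, and the center body here is an assumed midpoint, not the repo's code:

from dataclasses import dataclass
from typing import Optional

@dataclass
class DetectionSketch:
    class_name: str
    confidence: float
    bbox: tuple[int, int, int, int]            # (x1, y1, x2, y2) in pixels
    label: str = ""
    color: tuple[int, int, int] = (0, 255, 0)  # BGR
    distance: Optional[float] = None           # metres, filled by the depth estimator

    @property
    def center(self) -> tuple[int, int]:
        x1, y1, x2, y2 = self.bbox
        return (x1 + x2) // 2, (y1 + y2) // 2

det = DetectionSketch("car", 0.91, (120, 80, 360, 240), label="Car", distance=12.5)
print(det.center)  # (240, 160)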
src/autonomous_vision/pipeline/annotator.py
CHANGED

@@ -29,10 +29,10 @@ class FrameAnnotator:
        x1, y1, x2, y2 = detection.bbox
        color = detection.color
 
-
+
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, self.thickness)
 
-
+
        corner_len = min(20, (x2 - x1) // 4, (y2 - y1) // 4)
        cv2.line(frame, (x1, y1), (x1 + corner_len, y1), color, self.thickness + 1)
        cv2.line(frame, (x1, y1), (x1, y1 + corner_len), color, self.thickness + 1)
@@ -43,7 +43,7 @@ class FrameAnnotator:
        cv2.line(frame, (x2, y2), (x2 - corner_len, y2), color, self.thickness + 1)
        cv2.line(frame, (x2, y2), (x2, y2 - corner_len), color, self.thickness + 1)
 
-
+
        label = detection.label
        display_distance = detection.distance is not None and detection.class_name in {
            "car", "truck", "bus", "motorcycle"
@@ -220,24 +220,24 @@ class FrameAnnotator:
        panel_w = 200
        panel_h = 105
 
-
+
        overlay = frame.copy()
        cv2.rectangle(overlay, (panel_x, panel_y), (panel_x + panel_w, panel_y + panel_h), (30, 30, 30), -1)
        cv2.addWeighted(overlay, 0.85, frame, 0.15, 0, frame)
        cv2.rectangle(frame, (panel_x, panel_y), (panel_x + panel_w, panel_y + panel_h), (80, 80, 80), 1)
 
-
+
        cv2.putText(frame, "DETECTION STATS", (panel_x + 10, panel_y + 20), self.font, 0.5, (200, 200, 200), 1)
 
-
+
        stats = [
-            (f"Vehicles: {vehicle_count}", (255, 200, 100)),
-            (f"Pedestrians: {pedestrian_count}", (100, 255, 255)),
-            (f"Signs: {sign_count}", (255, 150, 255)),
+            (f"Vehicles: {vehicle_count}", (255, 200, 100)),
+            (f"Pedestrians: {pedestrian_count}", (100, 255, 255)),
+            (f"Signs: {sign_count}", (255, 150, 255)),
        ]
 
        if fps is not None:
-            stats.append((f"FPS: {fps:.1f}", (200, 200, 200)))
+            stats.append((f"FPS: {fps:.1f}", (200, 200, 200)))
 
        for i, (text, color) in enumerate(stats):
            cv2.putText(
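A standalone version of the corner-bracket accents drawn over the plain rectangle above, shown for the top-left corner only; the other three corners mirror it with the sign flips visible in the hunk:

import cv2
import numpy as np

frame = np.zeros((480, 640, 3), dtype=np.uint8)
x1, y1, x2, y2 = 100, 100, 300, 250
color, thickness = (255, 100, 0), 2

cv2.rectangle(frame, (x1, y1), (x2, y2), color, thickness)
corner_len = min(20, (x2 - x1) // 4, (y2 - y1) // 4)
cv2.line(frame, (x1, y1), (x1 + corner_len, y1), color, thickness + 1)  # top-left, horizontal
cv2.line(frame, (x1, y1), (x1, y1 + corner_len), color, thickness + 1)  # top-left, vertical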
src/autonomous_vision/pipeline/processor.py
CHANGED

@@ -86,25 +86,25 @@ class VideoProcessor:
        if not self._initialized:
            self.initialize()
 
-
+
        detections = self.yolo_detector.detect(frame)
 
-
+
        if self.traffic_sign_detector and self.traffic_sign_detector.is_available:
            sign_detections = self.traffic_sign_detector.detect(frame)
            detections.extend(sign_detections)
 
-
+
        lane_info = None
        if self.lane_detector:
            lane_info = self.lane_detector.detect(frame)
 
-
+
        depth_map = None
        if self.depth_estimator and self.depth_estimator.is_available:
            depth_map = self.depth_estimator.estimate_depth(frame)
 
-
+
        if depth_map is not None:
            for detection in detections:
                distance = self.depth_estimator.get_distance_for_bbox(
@@ -112,7 +112,7 @@ class VideoProcessor:
                )
                detection.distance = distance
 
-
+
        traffic_light_state = TrafficLightState.UNKNOWN
        if self.traffic_light_analyzer:
            for detection in detections:
@@ -120,7 +120,7 @@ class VideoProcessor:
                    traffic_light_state = self.traffic_light_analyzer.analyze_detection(
                        frame, detection
                    )
-
+
                    if traffic_light_state == TrafficLightState.RED:
                        detection.color = (0, 0, 255)
                        detection.label = "Sygnalizacja: Nie mozesz jechac"
@@ -130,7 +130,7 @@ class VideoProcessor:
                    elif traffic_light_state == TrafficLightState.GREEN:
                        detection.color = (0, 255, 0)
                        detection.label = "Sygnalizacja: Jedz"
-                    break
+                    break
 
        return FrameResult(
            frame_number=0,
@@ -185,12 +185,12 @@ class VideoProcessor:
        if not self._initialized:
            self.initialize()
 
-
+
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {input_path}")
 
-
+
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
@@ -199,18 +199,18 @@ class VideoProcessor:
        if max_frames:
            total_frames = min(total_frames, max_frames)
 
-
+
        if output_path is None:
            output_path = tempfile.mktemp(suffix=".mp4")
 
-
+
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
 
-
+
        analytics = VideoAnalytics(total_frames=total_frames)
 
-
+
        frame_times: list[float] = []
 
        try:
@@ -219,31 +219,31 @@ class VideoProcessor:
                if not ret:
                    break
 
-
+
                start_time = time.time()
 
-
+
                result = self.process_frame(frame)
                result.frame_number = frame_idx
 
-
+
                analytics.update_from_frame(result)
 
-
+
                process_time = time.time() - start_time
                frame_times.append(process_time)
 
-
+
                recent_times = frame_times[-30:] if len(frame_times) >= 30 else frame_times
                avg_fps = 1.0 / (sum(recent_times) / len(recent_times)) if recent_times else 0
 
-
+
                annotated = self.annotate_frame(frame, result, fps=avg_fps)
 
-
+
                writer.write(annotated)
 
-
+
                if progress_callback:
                    progress_callback(frame_idx + 1, total_frames)
 
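The loop smooths the displayed FPS by averaging the last 30 per-frame processing times before inverting; the same calculation in isolation:

frame_times: list[float] = []

def rolling_fps(process_time: float) -> float:
    # Mirrors the loop above: average the most recent 30 frame times, invert.
    frame_times.append(process_time)
    recent = frame_times[-30:] if len(frame_times) >= 30 else frame_times
    return 1.0 / (sum(recent) / len(recent)) if recent else 0.0

print(rolling_fps(0.05))  # a single 50 ms frame -> 20.0 FPS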