HayLahav committed on
Commit
7fdeaa3
·
verified ·
1 Parent(s): 6d66207

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +167 -102
app.py CHANGED
@@ -1,32 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool
2
  import datetime
3
  import requests
4
  import pytz
5
  import yaml
6
- from tools.final_answer import FinalAnswerTool
7
- from ultralytics import YOLO # YOLOv8 model
8
- import cv2
9
- import numpy as np
10
- import os
11
  import tempfile
 
 
12
  import gradio as gr
13
- from Gradio_UI import GradioUI
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
 
15
  @tool
16
- def get_yolov8_coco_detections(video_path: str) -> str:
17
  """Detects objects in an MP4 video file using YOLOv8.
18
 
19
  Args:
20
  video_path: Path to the input video.
21
 
22
  Returns:
23
- Processed video file path with detections.
24
  """
25
  model = YOLO("yolov8s.pt") # Load pre-trained YOLOv8 model
26
  cap = cv2.VideoCapture(video_path) # Load video
27
 
28
  if not cap.isOpened():
29
- return f"Error: Could not open video file at {video_path}"
30
 
31
  width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
32
  height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
@@ -37,12 +90,14 @@ def get_yolov8_coco_detections(video_path: str) -> str:
37
  out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
38
 
39
  unique_detections = set()
 
40
 
41
  while cap.isOpened():
42
  ret, frame = cap.read()
43
  if not ret:
44
  break # End of video
45
-
 
46
  results = model(frame) # Run YOLOv8 inference
47
 
48
  for r in results:
@@ -71,13 +126,14 @@ def get_yolov8_coco_detections(video_path: str) -> str:
71
 
72
  return {
73
  "output_path": output_path,
74
- "detected_objects": [{"object": obj} for obj in detections_list]
 
75
  }
76
 
77
 
78
  @tool
79
  def detect_road_lanes(video_path: str) -> dict:
80
- """Detects lane markings in an MP4 video using YOLOv8-seg.
81
 
82
  Args:
83
  video_path: Path to the input video.
@@ -109,94 +165,96 @@ def detect_road_lanes(video_path: str) -> dict:
109
  # For lane detection specifically
110
  lane_count = 0
111
  detected_lanes = []
 
112
 
113
  while cap.isOpened():
114
  ret, frame = cap.read()
115
  if not ret:
116
  break
117
-
118
- # Run segmentation model for lane detection
119
- # YOLOv8-seg can identify roads and potentially lane markings
120
- results = model(frame, classes=[0, 1, 2, 3, 7]) # Focus on relevant classes like road, person, car
121
 
 
 
122
  # Create a visualization frame
123
  vis_frame = frame.copy()
124
 
125
- # Use the segmentation masks to help identify lanes
126
- if hasattr(results[0], 'masks') and results[0].masks is not None:
127
- masks = results[0].masks
128
-
129
- # Enhance lane detection with traditional computer vision
130
- gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
131
- blur = cv2.GaussianBlur(gray, (5, 5), 0)
132
- edges = cv2.Canny(blur, 50, 150)
133
-
134
- # Create a mask focused on the lower portion of the image (where lanes typically are)
135
- mask = np.zeros_like(edges)
136
- height, width = edges.shape
137
- polygon = np.array([[(0, height), (width, height), (width, height//2), (0, height//2)]], dtype=np.int32)
138
- cv2.fillPoly(mask, polygon, 255)
139
- masked_edges = cv2.bitwise_and(edges, mask)
140
-
141
- # Apply Hough transform to detect lines
142
- lines = cv2.HoughLinesP(masked_edges, 1, np.pi/180, 50, minLineLength=100, maxLineGap=50)
143
-
144
- current_lane_count = 0
145
- lane_lines = []
 
 
 
 
 
146
 
147
- if lines is not None:
148
- for line in lines:
149
- x1, y1, x2, y2 = line[0]
150
-
151
- # Filter out horizontal lines (not lanes)
152
- if abs(x2 - x1) > 0 and abs(y2 - y1) / abs(x2 - x1) > 0.5: # Slope threshold
153
- cv2.line(vis_frame, (x1, y1), (x2, y2), (0, 0, 255), 2) # Red lane markings
154
- lane_lines.append(((x1, y1), (x2, y2)))
 
155
 
156
- # Count lanes by clustering similar lines
157
- if lane_lines:
158
- # Simple clustering: group lines with similar slopes
159
- slopes = []
160
- for ((x1, y1), (x2, y2)) in lane_lines:
161
- # Avoid division by zero
162
- if x2 != x1:
163
- slope = (y2 - y1) / (x2 - x1)
164
- slopes.append(slope)
165
-
166
- # Cluster slopes to identify unique lanes
167
- unique_slopes = []
168
- for slope in slopes:
169
- is_new = True
170
- for us in unique_slopes:
171
- if abs(slope - us) < 0.2: # Threshold for considering slopes similar
172
- is_new = False
173
- break
174
- if is_new:
175
- unique_slopes.append(slope)
176
-
177
- current_lane_count = len(unique_slopes)
178
- lane_count = max(lane_count, current_lane_count)
179
-
180
- # Update detected lanes information
181
- detected_lanes = [{"lane_id": i, "slope": s} for i, s in enumerate(unique_slopes)]
182
-
183
  # Add lane count text
184
  cv2.putText(vis_frame, f"Detected lanes: {current_lane_count}", (50, 50),
185
  cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
186
 
187
- # Add segmentation visualization
188
- if hasattr(results[0], 'masks') and results[0].masks is not None:
189
- masks = results[0].masks
190
- for mask in masks:
191
- # Convert mask to binary image
192
- seg_mask = mask.data.cpu().numpy()[0].astype(np.uint8) * 255
193
- # Resize mask to frame size
194
- seg_mask = cv2.resize(seg_mask, (width, height))
195
- # Create colored overlay for the mask
196
- color_mask = np.zeros_like(vis_frame)
197
- color_mask[seg_mask > 0] = [0, 255, 255] # Yellow color for segmentation
198
- # Add the mask as semi-transparent overlay
199
- vis_frame = cv2.addWeighted(vis_frame, 1, color_mask, 0.3, 0)
 
 
 
 
 
 
 
200
 
201
  out.write(vis_frame)
202
 
@@ -206,7 +264,8 @@ def detect_road_lanes(video_path: str) -> dict:
206
  return {
207
  "output_path": output_path,
208
  "detected_lanes": detected_lanes,
209
- "lane_count": lane_count
 
210
  }
211
 
212
 
@@ -485,6 +544,16 @@ def get_current_time_in_timezone(timezone: str) -> str:
485
  # Setup FinalAnswerTool
486
  final_answer = FinalAnswerTool()
487
 
 
 
 
 
 
 
 
 
 
 
488
  # Setup model
489
  model = HfApiModel(
490
  max_tokens=2096,
@@ -493,18 +562,9 @@ model = HfApiModel(
493
  custom_role_conversions=None,
494
  )
495
 
496
- # Create or load prompts.yaml
497
- if not os.path.exists("prompts.yaml"):
498
- prompts = {
499
- "default": "You are an autonomous driving assistant that helps analyze road scenes and make driving decisions.",
500
- "prefix": "Analyze the following driving scenario: ",
501
- "suffix": "Provide a detailed analysis with safety recommendations."
502
- }
503
- with open("prompts.yaml", 'w') as file:
504
- yaml.dump(prompts, file)
505
- else:
506
- with open("prompts.yaml", 'r') as stream:
507
- prompt_templates = yaml.safe_load(stream)
508
 
509
  # Define agent
510
  agent = CodeAgent(
@@ -620,11 +680,16 @@ def create_gradio_interface():
620
 
621
  return demo
622
 
623
- # Try to use the GradioUI wrapper if it's available, otherwise use our custom interface
624
  try:
625
- # Launch using the GradioUI wrapper from the original code
626
- GradioUI(agent).launch()
 
 
 
 
 
627
  except Exception as e:
628
- print(f"Error using GradioUI wrapper: {e}")
629
  print("Launching custom Gradio interface instead")
630
  create_gradio_interface().launch()
 
1
+ # Install required packages first
2
+ import os
3
+ import sys
4
+ import subprocess
5
+
6
+ # Function to install packages if they are not already installed
7
+ def install_packages():
8
+ required_packages = [
9
+ 'ultralytics',
10
+ 'smolagents',
11
+ 'pytz',
12
+ 'pyyaml',
13
+ 'opencv-python',
14
+ 'numpy',
15
+ 'gradio'
16
+ ]
17
+
18
+ for package in required_packages:
19
+ try:
20
+ __import__(package)
21
+ print(f"{package} is already installed.")
22
+ except ImportError:
23
+ print(f"Installing {package}...")
24
+ subprocess.check_call([sys.executable, "-m", "pip", "install", package])
25
+ print(f"{package} has been installed.")
26
+
27
+ # Install required packages
28
+ print("Checking and installing required packages...")
29
+ install_packages()
30
+
31
+ # Now import the required modules
32
  from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool
33
  import datetime
34
  import requests
35
  import pytz
36
  import yaml
 
 
 
 
 
37
  import tempfile
38
+ import numpy as np
39
+ import cv2
40
  import gradio as gr
41
+ from ultralytics import YOLO # YOLOv8 model
42
+
43
# Create tools directory and FinalAnswerTool if they don't exist.
# textwrap.dedent strips the source-level indentation from the literal so
# the generated module is valid Python no matter how this block is indented.
import textwrap

os.makedirs("tools", exist_ok=True)
if not os.path.exists("tools/final_answer.py"):
    with open("tools/final_answer.py", "w") as f:
        f.write(textwrap.dedent("""
            class FinalAnswerTool:
                def __call__(self, answer):
                    return {"answer": answer}
            """))

# Import FinalAnswerTool — cwd must be on sys.path so the freshly written
# tools/ namespace package is importable.
sys.path.append(os.getcwd())
from tools.final_answer import FinalAnswerTool
56
+
57
# Seed a default prompts.yaml on first run so the later load step never
# hits a missing file.
if not os.path.exists("prompts.yaml"):
    default_prompts = {}
    default_prompts["default"] = "You are an autonomous driving assistant that helps analyze road scenes and make driving decisions."
    default_prompts["prefix"] = "Analyze the following driving scenario: "
    default_prompts["suffix"] = "Provide a detailed analysis with safety recommendations."
    with open("prompts.yaml", 'w') as fh:
        yaml.dump(default_prompts, fh)
66
+
67
 
68
  @tool
69
+ def get_yolov8_coco_detections(video_path: str) -> dict:
70
  """Detects objects in an MP4 video file using YOLOv8.
71
 
72
  Args:
73
  video_path: Path to the input video.
74
 
75
  Returns:
76
+ Dictionary with processed video path and detection results.
77
  """
78
  model = YOLO("yolov8s.pt") # Load pre-trained YOLOv8 model
79
  cap = cv2.VideoCapture(video_path) # Load video
80
 
81
  if not cap.isOpened():
82
+ return {"error": f"Could not open video file at {video_path}"}
83
 
84
  width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
85
  height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
 
90
  out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
91
 
92
  unique_detections = set()
93
+ frame_count = 0
94
 
95
  while cap.isOpened():
96
  ret, frame = cap.read()
97
  if not ret:
98
  break # End of video
99
+
100
+ frame_count += 1
101
  results = model(frame) # Run YOLOv8 inference
102
 
103
  for r in results:
 
126
 
127
  return {
128
  "output_path": output_path,
129
+ "detected_objects": [{"object": obj} for obj in detections_list],
130
+ "frames_processed": frame_count
131
  }
132
 
133
 
134
  @tool
135
  def detect_road_lanes(video_path: str) -> dict:
136
+ """Detects lane markings in an MP4 video using YOLOv8-seg and traditional CV techniques.
137
 
138
  Args:
139
  video_path: Path to the input video.
 
165
  # For lane detection specifically
166
  lane_count = 0
167
  detected_lanes = []
168
+ frame_count = 0
169
 
170
  while cap.isOpened():
171
  ret, frame = cap.read()
172
  if not ret:
173
  break
 
 
 
 
174
 
175
+ frame_count += 1
176
+
177
  # Create a visualization frame
178
  vis_frame = frame.copy()
179
 
180
+ # Enhance lane detection with traditional computer vision
181
+ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
182
+ blur = cv2.GaussianBlur(gray, (5, 5), 0)
183
+ edges = cv2.Canny(blur, 50, 150)
184
+
185
+ # Create a mask focused on the lower portion of the image (where lanes typically are)
186
+ mask = np.zeros_like(edges)
187
+ height, width = edges.shape
188
+ polygon = np.array([[(0, height), (width, height), (width, height//2), (0, height//2)]], dtype=np.int32)
189
+ cv2.fillPoly(mask, polygon, 255)
190
+ masked_edges = cv2.bitwise_and(edges, mask)
191
+
192
+ # Apply Hough transform to detect lines
193
+ lines = cv2.HoughLinesP(masked_edges, 1, np.pi/180, 50, minLineLength=100, maxLineGap=50)
194
+
195
+ current_lane_count = 0
196
+ lane_lines = []
197
+
198
+ if lines is not None:
199
+ for line in lines:
200
+ x1, y1, x2, y2 = line[0]
201
+
202
+ # Filter out horizontal lines (not lanes)
203
+ if abs(x2 - x1) > 0 and abs(y2 - y1) / abs(x2 - x1) > 0.5: # Slope threshold
204
+ cv2.line(vis_frame, (x1, y1), (x2, y2), (0, 0, 255), 2) # Red lane markings
205
+ lane_lines.append(((x1, y1), (x2, y2)))
206
 
207
+ # Count lanes by clustering similar lines
208
+ if lane_lines:
209
+ # Simple clustering: group lines with similar slopes
210
+ slopes = []
211
+ for ((x1, y1), (x2, y2)) in lane_lines:
212
+ # Avoid division by zero
213
+ if x2 != x1:
214
+ slope = (y2 - y1) / (x2 - x1)
215
+ slopes.append(slope)
216
 
217
+ # Cluster slopes to identify unique lanes
218
+ unique_slopes = []
219
+ for slope in slopes:
220
+ is_new = True
221
+ for us in unique_slopes:
222
+ if abs(slope - us) < 0.2: # Threshold for considering slopes similar
223
+ is_new = False
224
+ break
225
+ if is_new:
226
+ unique_slopes.append(slope)
227
+
228
+ current_lane_count = len(unique_slopes)
229
+ lane_count = max(lane_count, current_lane_count)
230
+
231
+ # Update detected lanes information
232
+ detected_lanes = [{"lane_id": i, "slope": s} for i, s in enumerate(unique_slopes)]
233
+
 
 
 
 
 
 
 
 
 
 
234
  # Add lane count text
235
  cv2.putText(vis_frame, f"Detected lanes: {current_lane_count}", (50, 50),
236
  cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
237
 
238
+ # Try running YOLOv8 segmentation if available
239
+ try:
240
+ # Run segmentation model for road detection
241
+ seg_results = model(frame, classes=[0, 1, 2, 3, 7]) # Focus on relevant classes
242
+
243
+ if hasattr(seg_results[0], 'masks') and seg_results[0].masks is not None:
244
+ masks = seg_results[0].masks
245
+ for seg_mask in masks:
246
+ # Convert mask to binary image
247
+ mask_data = seg_mask.data.cpu().numpy()[0].astype(np.uint8) * 255
248
+ # Resize mask to frame size
249
+ mask_data = cv2.resize(mask_data, (width, height))
250
+ # Create colored overlay for the mask
251
+ color_mask = np.zeros_like(vis_frame)
252
+ color_mask[mask_data > 0] = [0, 255, 255] # Yellow color for segmentation
253
+ # Add the mask as semi-transparent overlay
254
+ vis_frame = cv2.addWeighted(vis_frame, 1, color_mask, 0.3, 0)
255
+ except Exception as e:
256
+ print(f"Warning: YOLOv8 segmentation failed: {e}")
257
+ # Continue without segmentation - we still have traditional lane detection
258
 
259
  out.write(vis_frame)
260
 
 
264
  return {
265
  "output_path": output_path,
266
  "detected_lanes": detected_lanes,
267
+ "lane_count": lane_count,
268
+ "frames_processed": frame_count
269
  }
270
 
271
 
 
544
  # Setup FinalAnswerTool
545
  final_answer = FinalAnswerTool()
546
 
547
# Fallback stand-in for GradioUI, used when the real wrapper is absent.
class GradioUIPlaceholder:
    """Minimal drop-in replacement mirroring GradioUI's interface.

    Holds the agent and, on launch, defers to the custom Gradio
    interface defined elsewhere in this file.
    """

    def __init__(self, agent):
        # Keep the agent reference so the constructor signature matches
        # the real GradioUI(agent) wrapper.
        self.agent = agent

    def launch(self):
        # Announce the fallback, then hand off to the custom UI builder.
        print("Using placeholder GradioUI implementation")
        demo = create_gradio_interface()
        demo.launch()
555
+
556
+
557
  # Setup model
558
  model = HfApiModel(
559
  max_tokens=2096,
 
562
  custom_role_conversions=None,
563
  )
564
 
565
+ # Load prompts from YAML
566
+ with open("prompts.yaml", 'r') as stream:
567
+ prompt_templates = yaml.safe_load(stream)
 
 
 
 
 
 
 
 
 
568
 
569
  # Define agent
570
  agent = CodeAgent(
 
680
 
681
  return demo
682
 
683
# Main execution - prefer the original GradioUI when it was imported;
# otherwise, or on any launch failure, fall back to the custom interface.
# (The previous version raised an ImportError purely as control flow to
# reach the except branch; a plain if/else is clearer.)
if 'GradioUI' in globals():
    try:
        print("Using original GradioUI")
        GradioUI(agent).launch()
    except Exception as e:
        print(f"Error using original GradioUI: {e}")
        print("Launching custom Gradio interface instead")
        create_gradio_interface().launch()
else:
    # GradioUI was never imported — go straight to the custom interface.
    print("Launching custom Gradio interface instead")
    create_gradio_interface().launch()