HayLahav commited on
Commit
287cfc9
·
verified ·
1 Parent(s): ae7a494

Update app.py

Browse files

YOLOv5 with OpenPilot Trajectory Prediction

Files changed (1) hide show
  1. app.py +122 -29
app.py CHANGED
@@ -1,61 +1,155 @@
1
- from smolagents import CodeAgent,DuckDuckGoSearchTool, HfApiModel,load_tool,tool
2
  import datetime
3
  import requests
4
  import pytz
5
  import yaml
6
  from tools.final_answer import FinalAnswerTool
7
-
 
 
8
  from Gradio_UI import GradioUI
9
 
10
- # Below is an example of a tool that does nothing. Amaze us with your creativity !
11
  @tool
12
- def my_custom_tool(arg1:str, arg2:int)-> str: #it's import to specify the return type
13
- #Keep this format for the description / args / args description but feel free to modify the tool
14
- """A tool that does nothing yet
15
- Args:
16
- arg1: the first argument
17
- arg2: the second argument
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
  """
19
- return "What magic will you build ?"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
 
21
  @tool
22
  def get_current_time_in_timezone(timezone: str) -> str:
23
- """A tool that fetches the current local time in a specified timezone.
24
- Args:
25
- timezone: A string representing a valid timezone (e.g., 'America/New_York').
26
- """
27
  try:
28
- # Create timezone object
29
  tz = pytz.timezone(timezone)
30
- # Get current time in that timezone
31
  local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
32
  return f"The current local time in {timezone} is: {local_time}"
33
  except Exception as e:
34
  return f"Error fetching time for timezone '{timezone}': {str(e)}"
35
 
36
-
37
  final_answer = FinalAnswerTool()
38
 
39
- # If the agent does not answer, the model is overloaded, please use another model or the following Hugging Face Endpoint that also contains qwen2.5 coder:
40
- # model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud'
41
-
42
  model = HfApiModel(
43
- max_tokens=2096,
44
- temperature=0.5,
45
- model_id='Qwen/Qwen2.5-Coder-32B-Instruct',# it is possible that this model may be overloaded
46
- custom_role_conversions=None,
47
  )
48
 
49
-
50
- # Import tool from Hub
51
  image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)
52
 
53
  with open("prompts.yaml", 'r') as stream:
54
  prompt_templates = yaml.safe_load(stream)
55
-
56
  agent = CodeAgent(
57
  model=model,
58
- tools=[final_answer], ## add your tools here (don't remove final answer)
 
 
 
 
 
 
59
  max_steps=6,
60
  verbosity_level=1,
61
  grammar=None,
@@ -65,5 +159,4 @@ agent = CodeAgent(
65
  prompt_templates=prompt_templates
66
  )
67
 
68
-
69
  GradioUI(agent).launch()
 
1
+ from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool
2
  import datetime
3
  import requests
4
  import pytz
5
  import yaml
6
  from tools.final_answer import FinalAnswerTool
7
+ from ultralytics import YOLO # YOLOv5 model
8
+ import cv2
9
+ import numpy as np
10
  from Gradio_UI import GradioUI
11
 
 
12
@tool
def get_yolov5_coco_detections(image_path: str) -> dict:
    """Detects objects in an image using a YOLOv5 model trained on COCO.

    Args:
        image_path: Path to the image file to run detection on.

    Returns:
        A dict with key "detected_objects" mapping to a list of
        {"object": name, "confidence": score, "bbox": (x1, y1, x2, y2)}
        entries (empty list when nothing is detected or the image is
        unreadable).
    """
    model = YOLO("yolov5s.pt")
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread returns None for a missing/unreadable file instead of raising.
        return {"detected_objects": []}
    results = model(image)

    detections = []
    # ultralytics returns a list of Results objects; detections live in
    # results[0].boxes (.xyxy/.conf/.cls), not the legacy torch.hub
    # `.pred` attribute, which does not exist here.
    for box in results[0].boxes:
        x1, y1, x2, y2 = (float(v) for v in box.xyxy[0].tolist())
        conf = float(box.conf[0])
        cls = int(box.cls[0])
        detections.append({
            "object": model.names[cls],
            "confidence": conf,
            "bbox": (x1, y1, x2, y2),
        })
    return {"detected_objects": detections}
27
+
28
@tool
def detect_road_lanes(image_path: str) -> dict:
    """Detects road lanes using a YOLOv5 model fine-tuned for lane detection.

    Args:
        image_path: Path to the image file to run lane detection on.

    Returns:
        A dict with key "detected_lanes" mapping to a list of
        {"lane": label, "confidence": score, "bbox": (x1, y1, x2, y2)}
        entries (empty list when no lanes are found or the image is
        unreadable).
    """
    model = YOLO("yolov5-lane.pt")
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread returns None for a missing/unreadable file instead of raising.
        return {"detected_lanes": []}
    results = model(image)

    lane_detections = []
    # ultralytics returns a list of Results objects; detections live in
    # results[0].boxes, not the legacy torch.hub `.pred` attribute.
    for box in results[0].boxes:
        x1, y1, x2, y2 = (float(v) for v in box.xyxy[0].tolist())
        conf = float(box.conf[0])
        cls = int(box.cls[0])
        lane_detections.append({
            "lane": f"Lane {cls}",
            "confidence": conf,
            "bbox": (x1, y1, x2, y2),
        })
    return {"detected_lanes": lane_detections}
42
+
43
@tool
def driving_situation_analyzer(image_path: str) -> dict:
    """Analyzes road conditions by combining object and lane detections.

    Args:
        image_path: Path to the road-scene image to analyze.

    Returns:
        A dict with "situation_summary" (human-readable advisory string),
        "detected_objects" (from get_yolov5_coco_detections), and
        "detected_lanes" (from detect_road_lanes).
    """
    objects_info = get_yolov5_coco_detections(image_path)
    lanes_info = detect_road_lanes(image_path)

    detected_objects = objects_info.get("detected_objects", [])
    detected_lanes = lanes_info.get("detected_lanes", [])

    situation = []

    if any(obj["object"] in ("car", "truck", "bus") for obj in detected_objects):
        situation.append("Traffic detected ahead, maintain safe distance.")

    # COCO's class label for people is "person", not "pedestrian" — the
    # original check could never match with the stock yolov5s COCO model.
    # Accept both so a custom model labelling "pedestrian" still works.
    if any(obj["object"] in ("person", "pedestrian") for obj in detected_objects):
        situation.append("Pedestrian detected, be prepared to stop.")

    if any(obj["object"] == "traffic light" for obj in detected_objects):
        situation.append("Traffic light detected, slow down if red.")

    if not detected_lanes:
        situation.append("Lane markings not detected, potential risk of veering.")
    elif len(detected_lanes) == 1:
        situation.append("Single lane detected, ensure proper lane following.")
    else:  # two or more lanes
        situation.append("Multiple lanes detected, stay within lane boundaries.")

    return {
        "situation_summary": " | ".join(situation) if situation else "Road situation unclear, proceed with caution.",
        "detected_objects": detected_objects,
        "detected_lanes": detected_lanes,
    }
77
+
78
@tool
def predict_trajectory(image_path: str) -> dict:
    """Predicts vehicle trajectory based on the driving situation analysis.

    Uses OpenPilot-style motion prediction based on detected lanes,
    objects, and road conditions.

    Args:
        image_path: Path to the road-scene image to analyze.

    Returns:
        A dict with "trajectory_recommendation" (advisory string) and
        "future_positions" (list of (x, y) points over 10 forward steps).
    """
    analysis = driving_situation_analyzer(image_path)
    detected_lanes = analysis["detected_lanes"]
    summary = analysis["situation_summary"]

    trajectory = []

    # Derive driving advice from the summary text produced by
    # driving_situation_analyzer (string matching on its fixed phrases).
    if "Traffic detected" in summary:
        trajectory.append("Reduce speed, maintain a safe distance.")
    if "Pedestrian detected" in summary:
        trajectory.append("Prepare for sudden braking or yielding.")
    if "Traffic light detected" in summary:
        trajectory.append("Adjust speed based on light status.")
    if "Lane markings not detected" in summary:
        trajectory.append("Risk of lane departure, drive cautiously.")
    if len(detected_lanes) >= 2:
        trajectory.append("Stay centered in the lane, adjust for merging vehicles.")

    # HACK: mocked trajectory points — random lateral jitter plus constant
    # forward motion. Placeholder until a real motion planner is wired in.
    future_positions = []
    x, y = 0.0, 0.0
    for _ in range(10):  # predict 10 future steps
        x += np.random.uniform(-0.5, 0.5)  # small lateral deviation
        y += 1  # move forward one unit per step
        future_positions.append((x, y))

    return {
        "trajectory_recommendation": " | ".join(trajectory) if trajectory else "Maintain current path.",
        "future_positions": future_positions,
    }
119
 
120
@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """Fetches the current local time in a specified timezone.

    Args:
        timezone: A string representing a valid IANA timezone
            (e.g., 'America/New_York').

    Returns:
        A human-readable sentence with the current local time, or an
        error message when the timezone name is invalid.
    """
    try:
        tz = pytz.timezone(timezone)
        local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
        return f"The current local time in {timezone} is: {local_time}"
    except Exception as e:
        return f"Error fetching time for timezone '{timezone}': {str(e)}"
129
 
 
130
# Tool the agent uses to emit its final response (required by CodeAgent).
final_answer = FinalAnswerTool()

# Hosted inference model backing the agent.
model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id='Qwen/Qwen2.5-Coder-32B-Instruct',
    custom_role_conversions=None,
)

# Remote text-to-image tool loaded from the Hub.
# NOTE(review): loaded here but not added to the agent's tool list — confirm intended.
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)

# Prompt templates (system/planning prompts) consumed by CodeAgent below.
with open("prompts.yaml", 'r') as stream:
    prompt_templates = yaml.safe_load(stream)
143
+
144
  agent = CodeAgent(
145
  model=model,
146
+ tools=[
147
+ final_answer,
148
+ get_yolov5_coco_detections,
149
+ detect_road_lanes,
150
+ driving_situation_analyzer,
151
+ predict_trajectory
152
+ ],
153
  max_steps=6,
154
  verbosity_level=1,
155
  grammar=None,
 
159
  prompt_templates=prompt_templates
160
  )
161
 
 
162
# Start the Gradio web UI wrapping the agent.
GradioUI(agent).launch()