# app.py — Autonomous driving video analysis Space (author: HayLahav, commit d1217b2)
# Install required packages first
import os
import sys
import subprocess
# Function to install packages if they are not already installed
def install_packages():
    """Install any missing third-party dependencies via pip.

    Probes each dependency by importing its *module* name. The original code
    probed the pip *distribution* name, so 'pyyaml' (module: yaml) and
    'opencv-python' (module: cv2) always failed the import check and were
    re-installed via pip on every startup.
    """
    # pip distribution name -> importable module name (they differ for some packages)
    required_packages = {
        'ultralytics': 'ultralytics',
        'smolagents': 'smolagents',
        'pytz': 'pytz',
        'pyyaml': 'yaml',
        'opencv-python': 'cv2',
        'numpy': 'numpy',
        'gradio': 'gradio',
    }
    for package, module_name in required_packages.items():
        try:
            __import__(module_name)
            print(f"{package} is already installed.")
        except ImportError:
            print(f"Installing {package}...")
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
            print(f"{package} has been installed.")
# Ensure all third-party dependencies are present before the heavy imports below.
print("Checking and installing required packages...")
install_packages()
# Now import the required modules
from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool
import datetime
import requests
import pytz
import yaml
import tempfile
import numpy as np
import cv2
import gradio as gr
from ultralytics import YOLO # YOLOv8 model
# Create tools directory and FinalAnswerTool if they don't exist
os.makedirs("tools", exist_ok=True)
if not os.path.exists("tools/final_answer.py"):
with open("tools/final_answer.py", "w") as f:
f.write("""
class FinalAnswerTool:
def __call__(self, answer):
return {"answer": answer}
""")
# Import FinalAnswerTool
sys.path.append(os.getcwd())
from tools.final_answer import FinalAnswerTool
# Create prompts.yaml if it doesn't exist
if not os.path.exists("prompts.yaml"):
prompts = {
"default": "You are an autonomous driving assistant that helps analyze road scenes and make driving decisions.",
"prefix": "Analyze the following driving scenario: ",
"suffix": "Provide a detailed analysis with safety recommendations."
}
with open("prompts.yaml", 'w') as file:
yaml.dump(prompts, file)
@tool
def get_yolov8_coco_detections(video_path: str) -> dict:
    """Detects objects in an MP4 video file using YOLOv8.

    Runs frame-by-frame YOLOv8 inference, draws labelled bounding boxes on each
    frame, and writes the annotated frames to "output_video.mp4" in the working
    directory.

    Args:
        video_path: Path to the input video.
    Returns:
        Dictionary with processed video path and detection results, or an
        {"error": ...} dictionary if the video cannot be opened.
    """
    model = YOLO("yolov8s.pt")  # Load pre-trained YOLOv8 model — presumably auto-downloads weights on first use; confirm network access
    cap = cv2.VideoCapture(video_path)  # Load video
    if not cap.isOpened():
        return {"error": f"Could not open video file at {video_path}"}
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for output video
    output_path = "output_video.mp4"  # Save processed video
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    unique_detections = set()  # class names only; duplicates across frames collapse
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break  # End of video
        frame_count += 1
        results = model(frame)  # Run YOLOv8 inference
        for r in results:
            boxes = r.boxes
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                conf = box.conf[0].item()
                cls = int(box.cls[0].item())
                class_name = model.names[cls]
                unique_detections.add(f"{class_name}")
                # Draw bounding box
                cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                # Add label
                label = f"{class_name} {conf:.2f}"
                cv2.putText(frame, label, (int(x1), int(y1)-10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        out.write(frame)  # Save frame (with boxes drawn in place) to output video
    cap.release()
    out.release()
    detections_list = list(unique_detections)
    return {
        "output_path": output_path,
        "detected_objects": [{"object": obj} for obj in detections_list],
        "frames_processed": frame_count
    }
@tool
def detect_road_lanes(video_path: str) -> dict:
    """Detects lane markings in an MP4 video using YOLOv8-seg and traditional CV techniques.

    Lane lines come from Canny edges + probabilistic Hough transform restricted
    to the lower half of each frame; a YOLOv8 segmentation pass only adds a
    semi-transparent overlay and is skipped (with a printed warning) if it fails.
    Writes the annotated video to "lanes_output.mp4".

    Args:
        video_path: Path to the input video.
    Returns:
        Dictionary with processed video path and lane detection results, or an
        {"error": ...} dictionary if the video cannot be opened.
    """
    # Check if we already have downloaded the model, if not, download it
    model_path = "yolov8s-seg.pt"
    if not os.path.exists(model_path):
        # First, download YOLOv8 segmentation model
        model = YOLO("yolov8s-seg.pt")
    else:
        model = YOLO(model_path)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return {"error": f"Could not open video file at {video_path}"}
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    output_path = "lanes_output.mp4"
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    # For lane detection specifically
    lane_count = 0       # maximum per-frame unique-slope count seen so far
    detected_lanes = []  # lane info from the most recent frame that had any lane lines
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1
        # Create a visualization frame
        vis_frame = frame.copy()
        # Enhance lane detection with traditional computer vision
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        blur = cv2.GaussianBlur(gray, (5, 5), 0)
        edges = cv2.Canny(blur, 50, 150)
        # Create a mask focused on the lower portion of the image (where lanes typically are)
        mask = np.zeros_like(edges)
        # NOTE(review): this rebinds the outer `height`/`width`; the values equal
        # the frame dimensions, so the VideoWriter created above is unaffected.
        height, width = edges.shape
        polygon = np.array([[(0, height), (width, height), (width, height//2), (0, height//2)]], dtype=np.int32)
        cv2.fillPoly(mask, polygon, 255)
        masked_edges = cv2.bitwise_and(edges, mask)
        # Apply Hough transform to detect lines
        lines = cv2.HoughLinesP(masked_edges, 1, np.pi/180, 50, minLineLength=100, maxLineGap=50)
        current_lane_count = 0
        lane_lines = []
        if lines is not None:
            for line in lines:
                x1, y1, x2, y2 = line[0]
                # Filter out horizontal lines (not lanes)
                if abs(x2 - x1) > 0 and abs(y2 - y1) / abs(x2 - x1) > 0.5:  # Slope threshold
                    cv2.line(vis_frame, (x1, y1), (x2, y2), (0, 0, 255), 2)  # Red lane markings
                    lane_lines.append(((x1, y1), (x2, y2)))
        # Count lanes by clustering similar lines
        if lane_lines:
            # Simple clustering: group lines with similar slopes
            slopes = []
            for ((x1, y1), (x2, y2)) in lane_lines:
                # Avoid division by zero
                if x2 != x1:
                    slope = (y2 - y1) / (x2 - x1)
                    slopes.append(slope)
            # Cluster slopes to identify unique lanes
            unique_slopes = []
            for slope in slopes:
                is_new = True
                for us in unique_slopes:
                    if abs(slope - us) < 0.2:  # Threshold for considering slopes similar
                        is_new = False
                        break
                if is_new:
                    unique_slopes.append(slope)
            current_lane_count = len(unique_slopes)
            lane_count = max(lane_count, current_lane_count)
            # Update detected lanes information
            detected_lanes = [{"lane_id": i, "slope": s} for i, s in enumerate(unique_slopes)]
        # Add lane count text
        cv2.putText(vis_frame, f"Detected lanes: {current_lane_count}", (50, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        # Try running YOLOv8 segmentation if available
        try:
            # Run segmentation model for road detection
            seg_results = model(frame, classes=[0, 1, 2, 3, 7])  # Focus on relevant classes
            if hasattr(seg_results[0], 'masks') and seg_results[0].masks is not None:
                masks = seg_results[0].masks
                for seg_mask in masks:
                    # Convert mask to binary image
                    mask_data = seg_mask.data.cpu().numpy()[0].astype(np.uint8) * 255
                    # Resize mask to frame size
                    mask_data = cv2.resize(mask_data, (width, height))
                    # Create colored overlay for the mask
                    color_mask = np.zeros_like(vis_frame)
                    color_mask[mask_data > 0] = [0, 255, 255]  # Yellow color for segmentation
                    # Add the mask as semi-transparent overlay
                    vis_frame = cv2.addWeighted(vis_frame, 1, color_mask, 0.3, 0)
        except Exception as e:
            print(f"Warning: YOLOv8 segmentation failed: {e}")
            # Continue without segmentation - we still have traditional lane detection
        out.write(vis_frame)
    cap.release()
    out.release()
    return {
        "output_path": output_path,
        "detected_lanes": detected_lanes,
        "lane_count": lane_count,
        "frames_processed": frame_count
    }
@tool
def driving_situation_analyzer(video_path: str) -> dict:
    """Analyzes road conditions by integrating YOLOv8 object detections and lane information.

    Runs both detection tools on the video, then derives a textual situation
    summary plus coarse "road_complexity" and "safety_level" ratings.

    Args:
        video_path: Path to the input video.
    Returns:
        A dictionary containing situation analysis, or {"error": ...} if either
        underlying tool fails.
    """
    # Run object detection with YOLOv8
    object_results = get_yolov8_coco_detections(video_path)
    if "error" in object_results:
        return {"error": object_results["error"]}
    # Run lane detection with YOLOv8
    lane_results = detect_road_lanes(video_path)
    if "error" in lane_results:
        return {"error": lane_results["error"]}
    # Extract information from results
    detected_objects = object_results.get("detected_objects", [])
    detected_lanes = lane_results.get("detected_lanes", [])
    lane_count = lane_results.get("lane_count", 0)
    # Set of detected class names for O(1) membership checks below.
    object_names = {obj["object"] for obj in detected_objects}
    # Analyze the driving situation
    situation = []
    if object_names & {"car", "truck", "bus"}:
        situation.append("Traffic detected ahead, maintain safe distance.")
    if "person" in object_names:
        situation.append("Pedestrian detected, be prepared to stop.")
    if "traffic light" in object_names:
        situation.append("Traffic light detected, slow down if red.")
    if "stop sign" in object_names:
        situation.append("Stop sign detected, prepare to stop.")
    if lane_count == 0:
        situation.append("Lane markings not detected, potential risk of veering.")
    elif lane_count == 1:
        situation.append("Single lane detected, ensure proper lane following.")
    else:
        situation.append(f"{lane_count} lanes detected, stay within lane boundaries.")
    # Evaluate road complexity
    vehicle_count = sum(1 for obj in detected_objects if obj["object"] in ["car", "truck", "bus"])
    road_complexity = "LOW"
    if lane_count > 2:
        road_complexity = "MEDIUM"
    if vehicle_count > 3:
        road_complexity = "HIGH"
    if "person" in object_names and vehicle_count > 1:
        road_complexity = "HIGH"
    # Evaluate safety level
    safety_level = "HIGH"
    if "Pedestrian detected" in " ".join(situation):
        safety_level = "MEDIUM"
    if "Lane markings not detected" in " ".join(situation):
        safety_level = "LOW"
    # Bug fix: heavy traffic must never RAISE the safety level. The original
    # code unconditionally set MEDIUM here, which upgraded a LOW rating (from
    # missing lane markings) back to MEDIUM. Only downgrade HIGH -> MEDIUM.
    if vehicle_count > 5 and safety_level == "HIGH":
        safety_level = "MEDIUM"
    return {
        "situation_summary": " | ".join(situation) if situation else "Road situation unclear, proceed with caution.",
        "detected_objects": detected_objects,
        "detected_lanes": detected_lanes,
        "lane_count": lane_count,
        "road_complexity": road_complexity,
        "safety_level": safety_level,
        "objects_video": object_results.get("output_path"),
        "lanes_video": lane_results.get("output_path")
    }
@tool
def predict_trajectory(video_path: str) -> dict:
    """Predicts vehicle trajectory based on the driving situation analysis.

    Builds textual driving recommendations from driving_situation_analyzer(),
    then renders a short video ("trajectory_output.mp4") overlaying a synthetic
    10-point path (with random lateral jitter, so output is non-deterministic)
    on the last frame of the analysed video.

    Args:
        video_path: Path to the input video.
    Returns:
        A dictionary containing trajectory predictions, or {"error": ...} if
        the underlying analysis fails.
    """
    # First get the comprehensive analysis
    analysis = driving_situation_analyzer(video_path)
    if "error" in analysis:
        return {"error": analysis["error"]}
    # Extract key information for trajectory planning
    detected_objects = analysis.get("detected_objects", [])
    detected_lanes = analysis.get("detected_lanes", [])
    lane_count = analysis.get("lane_count", 0)
    road_complexity = analysis.get("road_complexity", "MEDIUM")
    safety_level = analysis.get("safety_level", "MEDIUM")
    summary = analysis.get("situation_summary", "")
    # Plan trajectory based on situation analysis
    trajectory = []
    # Safety level affects overall driving strategy
    if safety_level == "LOW":
        trajectory.append("Reduce speed significantly, proceed with extreme caution.")
    elif safety_level == "MEDIUM":
        trajectory.append("Maintain moderate speed, be alert for changing conditions.")
    else:  # HIGH
        trajectory.append("Normal driving conditions, maintain safe speed.")
    # Road complexity affects navigation approach
    if road_complexity == "HIGH":
        trajectory.append("Complex traffic environment, navigate with extra caution.")
    # Object-specific trajectory adjustments
    for obj in detected_objects:
        obj_name = obj["object"].lower()
        if "person" in obj_name:
            trajectory.append("Yield to pedestrians, prepare for potential stopping.")
        if obj_name in ["car", "truck", "bus"]:
            trajectory.append("Vehicle detected, maintain safe following distance.")
        if obj_name == "traffic light":
            trajectory.append("Approach intersection carefully, prepare to stop if light changes.")
        if obj_name == "stop sign":
            trajectory.append("Slow down and prepare to stop completely at the stop sign.")
    # Lane-specific trajectory planning
    if lane_count == 0:
        trajectory.append("No lanes detected, follow visual road boundaries carefully.")
    elif lane_count == 1:
        trajectory.append("Single lane detected, maintain centered position.")
    else:
        trajectory.append(f"{lane_count} lanes available, stay within current lane.")
    # Create video visualization of the predicted trajectory
    # Get last frame from the analysis video (prefer the annotated lanes video)
    lane_video = analysis.get("lanes_video")
    if lane_video and os.path.exists(lane_video):
        cap = cv2.VideoCapture(lane_video)
    else:
        cap = cv2.VideoCapture(video_path)
    # Get the last frame for visualization — reads the whole video sequentially
    frame = None
    while cap.isOpened():
        ret, current_frame = cap.read()
        if not ret:
            break
        frame = current_frame
    cap.release()
    # Generate trajectory visualization if we have a frame
    if frame is not None:
        height, width = frame.shape[:2]
        # Create a copy of the frame for trajectory visualization
        trajectory_frame = frame.copy()
        # Draw starting point at bottom center
        start_x, start_y = width // 2, height - 50
        cv2.circle(trajectory_frame, (start_x, start_y), 5, (255, 255, 0), -1)
        # Generate trajectory points based on analysis
        future_positions = []
        x, y = start_x, start_y
        # Adjust trajectory based on safety level and road conditions
        lateral_variation = 5  # Default lateral variation (pixels per step)
        if safety_level == "LOW":
            lateral_variation = 3  # Less lateral movement when unsafe
        elif road_complexity == "HIGH":
            lateral_variation = 8  # More potential variation in complex roads
        # Generate trajectory points
        for t in range(10):
            # Move forward (up in the image)
            y -= 30  # Move up by 30 pixels
            # Calculate lateral adjustment based on conditions
            if "pedestrian" in summary.lower():
                # Move away from pedestrians (assume they're on the right)
                x -= np.random.uniform(0, lateral_variation)
            elif "traffic" in summary.lower() and t > 5:
                # Slight movement to adjust for traffic ahead
                x += np.random.uniform(-lateral_variation/2, lateral_variation/2)
            else:
                # Normal driving with slight randomness
                x += np.random.uniform(-lateral_variation/3, lateral_variation/3)
            # Ensure point is within frame and reasonable drivable area
            x = max(width * 0.2, min(width * 0.8, x))  # Keep within 20-80% of width
            y = max(0, min(height-1, y))
            future_positions.append((int(x), int(y)))
            # Draw point on the frame
            cv2.circle(trajectory_frame, (int(x), int(y)), 5, (255, 255, 0), -1)
        # Connect points with lines
        for i in range(1, len(future_positions)):
            cv2.line(trajectory_frame, future_positions[i-1], future_positions[i], (255, 255, 0), 2)
        # Add trajectory recommendation text
        trajectory_text = " | ".join(trajectory) if trajectory else "Maintain current path."
        y_pos = 30
        # Split text into multiple lines if too long (hard wrap at 60 chars)
        for line in [trajectory_text[i:i+60] for i in range(0, len(trajectory_text), 60)]:
            cv2.putText(trajectory_frame, line, (30, y_pos),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            y_pos += 30
        # Add safety level indicator
        safety_color = (0, 255, 0) if safety_level == "HIGH" else \
                       (0, 255, 255) if safety_level == "MEDIUM" else \
                       (0, 0, 255)  # Red for LOW safety
        cv2.putText(trajectory_frame, f"Safety: {safety_level}", (width-150, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, safety_color, 2)
        # Save the trajectory visualization
        output_path = "trajectory_output.mp4"
        # Create a short video showing the trajectory (1 fps, 3 identical frames)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, 1, (width, height))
        # Write the frame several times to create a video
        for _ in range(3):
            out.write(trajectory_frame)
        out.release()
    else:
        future_positions = []
        output_path = None
    return {
        "trajectory_recommendation": " | ".join(trajectory) if trajectory else "Maintain current path.",
        "future_positions": future_positions,
        "safety_level": safety_level,
        "road_complexity": road_complexity,
        # NOTE(review): checks the literal filename rather than output_path, so a
        # stale file from an earlier call still yields output_path (possibly None).
        "trajectory_video": output_path if os.path.exists("trajectory_output.mp4") else None,
        "analysis_videos": {
            "objects": analysis.get("objects_video"),
            "lanes": analysis.get("lanes_video")
        }
    }
@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """Fetches the current local time in a specified timezone.

    Args:
        timezone: A string representing a valid timezone (e.g., 'America/New_York', 'Europe/London', 'Asia/Tokyo').
    Returns:
        A string showing the current local time in the specified timezone, or
        an error message when the timezone cannot be resolved.
    """
    try:
        zone = pytz.timezone(timezone)
        now_text = datetime.datetime.now(zone).strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        return f"Error fetching time for timezone '{timezone}': {str(e)}"
    return f"The current local time in {timezone} is: {now_text}"
# Setup FinalAnswerTool — the instance is passed into the agent's tool list.
final_answer = FinalAnswerTool()
# Create a placeholder for a GradioUI class if it doesn't exist
class GradioUIPlaceholder:
    """Minimal stand-in for the original GradioUI wrapper.

    Stores the agent for interface compatibility and delegates launching to
    the custom Gradio interface defined later in this module.
    """

    def __init__(self, agent):
        self.agent = agent

    def launch(self):
        print("Using placeholder GradioUI implementation")
        create_gradio_interface().launch()
# Setup model
# NOTE(review): HfApiModel presumably calls a hosted Hugging Face inference
# endpoint and needs credentials/network at runtime — confirm Space config.
model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id='Qwen/Qwen2.5-Coder-32B-Instruct',
    custom_role_conversions=None,
)
# Load prompts from YAML (created above with defaults if the file was missing)
with open("prompts.yaml", 'r') as stream:
    prompt_templates = yaml.safe_load(stream)
# Define agent: wire all video-analysis tools plus the time helper and the
# final-answer tool into a smolagents CodeAgent driven by the model above.
agent = CodeAgent(
    model=model,
    tools=[
        final_answer,
        get_yolov8_coco_detections,
        detect_road_lanes,
        driving_situation_analyzer,
        predict_trajectory,
        get_current_time_in_timezone
    ],
    max_steps=6,  # NOTE(review): presumably caps agent iterations — confirm in smolagents docs
    verbosity_level=1,
    grammar=None,
    planning_interval=None,
    name=None,
    description=None,
    prompt_templates=prompt_templates
)
# Define Gradio interface for testing without the GradioUI wrapper
def create_gradio_interface():
    """Build the standalone Gradio Blocks UI.

    Layout: video upload + analysis-type radio + button on the left; a results
    textbox and tabbed video players (objects / lanes / trajectory) on the
    right. Returns the (un-launched) gr.Blocks app.
    """
    with gr.Blocks(title="Autonomous Driving Video Analysis with YOLOv8") as demo:
        gr.Markdown("# Autonomous Driving Video Analysis with YOLOv8")
        with gr.Row():
            with gr.Column():
                video_input = gr.Video(label="Upload Driving Video")
                analysis_type = gr.Radio(
                    ["Object Detection (YOLOv8)", "Lane Detection (YOLOv8)", "Situation Analysis", "Trajectory Prediction"],
                    label="Analysis Type",
                    value="Object Detection (YOLOv8)"
                )
                analyze_btn = gr.Button("Analyze Video")
            with gr.Column():
                result_text = gr.Textbox(label="Analysis Results", lines=10)
                with gr.Tabs():
                    with gr.TabItem("Object Detection"):
                        object_video_output = gr.Video(label="Object Detection Results")
                    with gr.TabItem("Lane Detection"):
                        lane_video_output = gr.Video(label="Lane Detection Results")
                    with gr.TabItem("Trajectory Prediction"):
                        trajectory_video_output = gr.Video(label="Trajectory Prediction")

        def process_video(video_path, analysis_type):
            """Process the video based on the selected analysis type.

            Returns a 4-tuple: (result text, objects video, lanes video,
            trajectory video); slots not produced by the chosen analysis are None.
            """
            if not video_path:
                return "Please upload a video file.", None, None, None
            # Save the uploaded video to a temporary file
            temp_dir = tempfile.gettempdir()
            temp_video_path = os.path.join(temp_dir, "input_video.mp4")
            # Save the uploaded video
            with open(temp_video_path, "wb") as f:
                if hasattr(video_path, "read"):
                    # If video_path is a file-like object (from Gradio)
                    f.write(video_path.read())
                else:
                    # If video_path is already a path
                    with open(video_path, "rb") as source_file:
                        f.write(source_file.read())
            result = None
            object_video = None
            lane_video = None
            trajectory_video = None
            try:
                if analysis_type == "Object Detection (YOLOv8)":
                    result = get_yolov8_coco_detections(temp_video_path)
                    if isinstance(result, dict) and "output_path" in result and os.path.exists(result["output_path"]):
                        object_video = result["output_path"]
                elif analysis_type == "Lane Detection (YOLOv8)":
                    result = detect_road_lanes(temp_video_path)
                    if isinstance(result, dict) and "output_path" in result and os.path.exists(result["output_path"]):
                        lane_video = result["output_path"]
                elif analysis_type == "Situation Analysis":
                    result = driving_situation_analyzer(temp_video_path)
                    if isinstance(result, dict):
                        if "objects_video" in result and result["objects_video"] and os.path.exists(result["objects_video"]):
                            object_video = result["objects_video"]
                        if "lanes_video" in result and result["lanes_video"] and os.path.exists(result["lanes_video"]):
                            lane_video = result["lanes_video"]
                elif analysis_type == "Trajectory Prediction":
                    result = predict_trajectory(temp_video_path)
                    if isinstance(result, dict):
                        if "analysis_videos" in result:
                            videos = result["analysis_videos"]
                            if "objects" in videos and videos["objects"] and os.path.exists(videos["objects"]):
                                object_video = videos["objects"]
                            if "lanes" in videos and videos["lanes"] and os.path.exists(videos["lanes"]):
                                lane_video = videos["lanes"]
                        if "trajectory_video" in result and result["trajectory_video"] and os.path.exists(result["trajectory_video"]):
                            trajectory_video = result["trajectory_video"]
            except Exception as e:
                return f"Error processing video: {str(e)}", None, None, None
            # Raw dict stringified for display; videos routed to their tabs.
            return str(result), object_video, lane_video, trajectory_video

        analyze_btn.click(
            fn=process_video,
            inputs=[video_input, analysis_type],
            outputs=[result_text, object_video_output, lane_video_output, trajectory_video_output]
        )
    return demo
# Main execution — prefer the original GradioUI wrapper when it exists in this
# namespace, otherwise fall back to the custom interface. The original code
# raised ImportError purely as control flow to reach the fallback; a plain
# if/else branch expresses the same intent without the fake exception.
if 'GradioUI' in globals():
    print("Using original GradioUI")
    try:
        GradioUI(agent).launch()
    except Exception as e:
        # Preserve the old fallback behavior: any launch failure falls back
        # to the custom interface.
        print(f"Error using original GradioUI: {e}")
        print("Launching custom Gradio interface instead")
        create_gradio_interface().launch()
else:
    print("Launching custom Gradio interface instead")
    create_gradio_interface().launch()