# Install required packages first import os import sys import subprocess # Function to install packages if they are not already installed def install_packages(): required_packages = [ 'ultralytics', 'smolagents', 'pytz', 'pyyaml', 'opencv-python', 'numpy', 'gradio' ] for package in required_packages: try: __import__(package) print(f"{package} is already installed.") except ImportError: print(f"Installing {package}...") subprocess.check_call([sys.executable, "-m", "pip", "install", package]) print(f"{package} has been installed.") # Install required packages print("Checking and installing required packages...") install_packages() # Now import the required modules from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool import datetime import requests import pytz import yaml import tempfile import numpy as np import cv2 import gradio as gr from ultralytics import YOLO # YOLOv8 model # Create tools directory and FinalAnswerTool if they don't exist os.makedirs("tools", exist_ok=True) if not os.path.exists("tools/final_answer.py"): with open("tools/final_answer.py", "w") as f: f.write(""" class FinalAnswerTool: def __call__(self, answer): return {"answer": answer} """) # Import FinalAnswerTool sys.path.append(os.getcwd()) from tools.final_answer import FinalAnswerTool # Create prompts.yaml if it doesn't exist if not os.path.exists("prompts.yaml"): prompts = { "default": "You are an autonomous driving assistant that helps analyze road scenes and make driving decisions.", "prefix": "Analyze the following driving scenario: ", "suffix": "Provide a detailed analysis with safety recommendations." } with open("prompts.yaml", 'w') as file: yaml.dump(prompts, file) @tool def get_yolov8_coco_detections(video_path: str) -> dict: """Detects objects in an MP4 video file using YOLOv8. Args: video_path: Path to the input video. Returns: Dictionary with processed video path and detection results. """ model = YOLO("yolov8s.pt") # Load pre-trained YOLOv8 model cap = cv2.VideoCapture(video_path) # Load video if not cap.isOpened(): return {"error": f"Could not open video file at {video_path}"} width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for output video output_path = "output_video.mp4" # Save processed video out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) unique_detections = set() frame_count = 0 while cap.isOpened(): ret, frame = cap.read() if not ret: break # End of video frame_count += 1 results = model(frame) # Run YOLOv8 inference for r in results: boxes = r.boxes for box in boxes: x1, y1, x2, y2 = box.xyxy[0].tolist() conf = box.conf[0].item() cls = int(box.cls[0].item()) class_name = model.names[cls] unique_detections.add(f"{class_name}") # Draw bounding box cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2) # Add label label = f"{class_name} {conf:.2f}" cv2.putText(frame, label, (int(x1), int(y1)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) out.write(frame) # Save frame to output video cap.release() out.release() detections_list = list(unique_detections) return { "output_path": output_path, "detected_objects": [{"object": obj} for obj in detections_list], "frames_processed": frame_count } @tool def detect_road_lanes(video_path: str) -> dict: """Detects lane markings in an MP4 video using YOLOv8-seg and traditional CV techniques. Args: video_path: Path to the input video. Returns: Dictionary with processed video path and lane detection results. """ # Check if we already have downloaded the model, if not, download it model_path = "yolov8s-seg.pt" if not os.path.exists(model_path): # First, download YOLOv8 segmentation model model = YOLO("yolov8s-seg.pt") else: model = YOLO(model_path) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return {"error": f"Could not open video file at {video_path}"} width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) fourcc = cv2.VideoWriter_fourcc(*'mp4v') output_path = "lanes_output.mp4" out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) # For lane detection specifically lane_count = 0 detected_lanes = [] frame_count = 0 while cap.isOpened(): ret, frame = cap.read() if not ret: break frame_count += 1 # Create a visualization frame vis_frame = frame.copy() # Enhance lane detection with traditional computer vision gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) blur = cv2.GaussianBlur(gray, (5, 5), 0) edges = cv2.Canny(blur, 50, 150) # Create a mask focused on the lower portion of the image (where lanes typically are) mask = np.zeros_like(edges) height, width = edges.shape polygon = np.array([[(0, height), (width, height), (width, height//2), (0, height//2)]], dtype=np.int32) cv2.fillPoly(mask, polygon, 255) masked_edges = cv2.bitwise_and(edges, mask) # Apply Hough transform to detect lines lines = cv2.HoughLinesP(masked_edges, 1, np.pi/180, 50, minLineLength=100, maxLineGap=50) current_lane_count = 0 lane_lines = [] if lines is not None: for line in lines: x1, y1, x2, y2 = line[0] # Filter out horizontal lines (not lanes) if abs(x2 - x1) > 0 and abs(y2 - y1) / abs(x2 - x1) > 0.5: # Slope threshold cv2.line(vis_frame, (x1, y1), (x2, y2), (0, 0, 255), 2) # Red lane markings lane_lines.append(((x1, y1), (x2, y2))) # Count lanes by clustering similar lines if lane_lines: # Simple clustering: group lines with similar slopes slopes = [] for ((x1, y1), (x2, y2)) in lane_lines: # Avoid division by zero if x2 != x1: slope = (y2 - y1) / (x2 - x1) slopes.append(slope) # Cluster slopes to identify unique lanes unique_slopes = [] for slope in slopes: is_new = True for us in unique_slopes: if abs(slope - us) < 0.2: # Threshold for considering slopes similar is_new = False break if is_new: unique_slopes.append(slope) current_lane_count = len(unique_slopes) lane_count = max(lane_count, current_lane_count) # Update detected lanes information detected_lanes = [{"lane_id": i, "slope": s} for i, s in enumerate(unique_slopes)] # Add lane count text cv2.putText(vis_frame, f"Detected lanes: {current_lane_count}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) # Try running YOLOv8 segmentation if available try: # Run segmentation model for road detection seg_results = model(frame, classes=[0, 1, 2, 3, 7]) # Focus on relevant classes if hasattr(seg_results[0], 'masks') and seg_results[0].masks is not None: masks = seg_results[0].masks for seg_mask in masks: # Convert mask to binary image mask_data = seg_mask.data.cpu().numpy()[0].astype(np.uint8) * 255 # Resize mask to frame size mask_data = cv2.resize(mask_data, (width, height)) # Create colored overlay for the mask color_mask = np.zeros_like(vis_frame) color_mask[mask_data > 0] = [0, 255, 255] # Yellow color for segmentation # Add the mask as semi-transparent overlay vis_frame = cv2.addWeighted(vis_frame, 1, color_mask, 0.3, 0) except Exception as e: print(f"Warning: YOLOv8 segmentation failed: {e}") # Continue without segmentation - we still have traditional lane detection out.write(vis_frame) cap.release() out.release() return { "output_path": output_path, "detected_lanes": detected_lanes, "lane_count": lane_count, "frames_processed": frame_count } @tool def driving_situation_analyzer(video_path: str) -> dict: """Analyzes road conditions by integrating YOLOv8 object detections and lane information. Args: video_path: Path to the input video. Returns: A dictionary containing situation analysis. """ # Run object detection with YOLOv8 object_results = get_yolov8_coco_detections(video_path) if "error" in object_results: return {"error": object_results["error"]} # Run lane detection with YOLOv8 lane_results = detect_road_lanes(video_path) if "error" in lane_results: return {"error": lane_results["error"]} # Extract information from results detected_objects = object_results.get("detected_objects", []) detected_lanes = lane_results.get("detected_lanes", []) lane_count = lane_results.get("lane_count", 0) # Analyze the driving situation situation = [] if any(obj["object"] in ["car", "truck", "bus"] for obj in detected_objects): situation.append("Traffic detected ahead, maintain safe distance.") if any(obj["object"] == "person" for obj in detected_objects): situation.append("Pedestrian detected, be prepared to stop.") if any(obj["object"] == "traffic light" for obj in detected_objects): situation.append("Traffic light detected, slow down if red.") if any(obj["object"] == "stop sign" for obj in detected_objects): situation.append("Stop sign detected, prepare to stop.") if lane_count == 0: situation.append("Lane markings not detected, potential risk of veering.") elif lane_count == 1: situation.append("Single lane detected, ensure proper lane following.") elif lane_count >= 2: situation.append(f"{lane_count} lanes detected, stay within lane boundaries.") # Evaluate road complexity road_complexity = "LOW" if lane_count > 2: road_complexity = "MEDIUM" vehicle_count = sum(1 for obj in detected_objects if obj["object"] in ["car", "truck", "bus"]) if vehicle_count > 3: road_complexity = "HIGH" if any(obj["object"] == "person" for obj in detected_objects) and vehicle_count > 1: road_complexity = "HIGH" # Evaluate safety level safety_level = "HIGH" if "Pedestrian detected" in " ".join(situation): safety_level = "MEDIUM" if "Lane markings not detected" in " ".join(situation): safety_level = "LOW" if vehicle_count > 5: safety_level = "MEDIUM" return { "situation_summary": " | ".join(situation) if situation else "Road situation unclear, proceed with caution.", "detected_objects": detected_objects, "detected_lanes": detected_lanes, "lane_count": lane_count, "road_complexity": road_complexity, "safety_level": safety_level, "objects_video": object_results.get("output_path"), "lanes_video": lane_results.get("output_path") } @tool def predict_trajectory(video_path: str) -> dict: """Predicts vehicle trajectory based on the driving situation analysis. Args: video_path: Path to the input video. Returns: A dictionary containing trajectory predictions. """ # First get the comprehensive analysis analysis = driving_situation_analyzer(video_path) if "error" in analysis: return {"error": analysis["error"]} # Extract key information for trajectory planning detected_objects = analysis.get("detected_objects", []) detected_lanes = analysis.get("detected_lanes", []) lane_count = analysis.get("lane_count", 0) road_complexity = analysis.get("road_complexity", "MEDIUM") safety_level = analysis.get("safety_level", "MEDIUM") summary = analysis.get("situation_summary", "") # Plan trajectory based on situation analysis trajectory = [] # Safety level affects overall driving strategy if safety_level == "LOW": trajectory.append("Reduce speed significantly, proceed with extreme caution.") elif safety_level == "MEDIUM": trajectory.append("Maintain moderate speed, be alert for changing conditions.") else: # HIGH trajectory.append("Normal driving conditions, maintain safe speed.") # Road complexity affects navigation approach if road_complexity == "HIGH": trajectory.append("Complex traffic environment, navigate with extra caution.") # Object-specific trajectory adjustments for obj in detected_objects: obj_name = obj["object"].lower() if "person" in obj_name: trajectory.append("Yield to pedestrians, prepare for potential stopping.") if obj_name in ["car", "truck", "bus"]: trajectory.append("Vehicle detected, maintain safe following distance.") if obj_name == "traffic light": trajectory.append("Approach intersection carefully, prepare to stop if light changes.") if obj_name == "stop sign": trajectory.append("Slow down and prepare to stop completely at the stop sign.") # Lane-specific trajectory planning if lane_count == 0: trajectory.append("No lanes detected, follow visual road boundaries carefully.") elif lane_count == 1: trajectory.append("Single lane detected, maintain centered position.") else: trajectory.append(f"{lane_count} lanes available, stay within current lane.") # Create video visualization of the predicted trajectory # Get last frame from the analysis video lane_video = analysis.get("lanes_video") if lane_video and os.path.exists(lane_video): cap = cv2.VideoCapture(lane_video) else: cap = cv2.VideoCapture(video_path) # Get the last frame for visualization frame = None while cap.isOpened(): ret, current_frame = cap.read() if not ret: break frame = current_frame cap.release() # Generate trajectory visualization if we have a frame if frame is not None: height, width = frame.shape[:2] # Create a copy of the frame for trajectory visualization trajectory_frame = frame.copy() # Draw starting point at bottom center start_x, start_y = width // 2, height - 50 cv2.circle(trajectory_frame, (start_x, start_y), 5, (255, 255, 0), -1) # Generate trajectory points based on analysis future_positions = [] x, y = start_x, start_y # Adjust trajectory based on safety level and road conditions lateral_variation = 5 # Default lateral variation if safety_level == "LOW": lateral_variation = 3 # Less lateral movement when unsafe elif road_complexity == "HIGH": lateral_variation = 8 # More potential variation in complex roads # Generate trajectory points for t in range(10): # Move forward (up in the image) y -= 30 # Move up by 30 pixels # Calculate lateral adjustment based on conditions if "pedestrian" in summary.lower(): # Move away from pedestrians (assume they're on the right) x -= np.random.uniform(0, lateral_variation) elif "traffic" in summary.lower() and t > 5: # Slight movement to adjust for traffic ahead x += np.random.uniform(-lateral_variation/2, lateral_variation/2) else: # Normal driving with slight randomness x += np.random.uniform(-lateral_variation/3, lateral_variation/3) # Ensure point is within frame and reasonable drivable area x = max(width * 0.2, min(width * 0.8, x)) # Keep within 20-80% of width y = max(0, min(height-1, y)) future_positions.append((int(x), int(y))) # Draw point on the frame cv2.circle(trajectory_frame, (int(x), int(y)), 5, (255, 255, 0), -1) # Connect points with lines for i in range(1, len(future_positions)): cv2.line(trajectory_frame, future_positions[i-1], future_positions[i], (255, 255, 0), 2) # Add trajectory recommendation text trajectory_text = " | ".join(trajectory) if trajectory else "Maintain current path." y_pos = 30 # Split text into multiple lines if too long for line in [trajectory_text[i:i+60] for i in range(0, len(trajectory_text), 60)]: cv2.putText(trajectory_frame, line, (30, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) y_pos += 30 # Add safety level indicator safety_color = (0, 255, 0) if safety_level == "HIGH" else \ (0, 255, 255) if safety_level == "MEDIUM" else \ (0, 0, 255) # Red for LOW safety cv2.putText(trajectory_frame, f"Safety: {safety_level}", (width-150, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, safety_color, 2) # Save the trajectory visualization output_path = "trajectory_output.mp4" # Create a short video showing the trajectory fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, 1, (width, height)) # Write the frame several times to create a video for _ in range(3): out.write(trajectory_frame) out.release() else: future_positions = [] output_path = None return { "trajectory_recommendation": " | ".join(trajectory) if trajectory else "Maintain current path.", "future_positions": future_positions, "safety_level": safety_level, "road_complexity": road_complexity, "trajectory_video": output_path if os.path.exists("trajectory_output.mp4") else None, "analysis_videos": { "objects": analysis.get("objects_video"), "lanes": analysis.get("lanes_video") } } @tool def get_current_time_in_timezone(timezone: str) -> str: """Fetches the current local time in a specified timezone. Args: timezone: A string representing a valid timezone (e.g., 'America/New_York', 'Europe/London', 'Asia/Tokyo'). Returns: A string showing the current local time in the specified timezone. """ try: tz = pytz.timezone(timezone) local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S") return f"The current local time in {timezone} is: {local_time}" except Exception as e: return f"Error fetching time for timezone '{timezone}': {str(e)}" # Setup FinalAnswerTool final_answer = FinalAnswerTool() # Create a placeholder for a GradioUI class if it doesn't exist class GradioUIPlaceholder: def __init__(self, agent): self.agent = agent def launch(self): print("Using placeholder GradioUI implementation") create_gradio_interface().launch() # Setup model model = HfApiModel( max_tokens=2096, temperature=0.5, model_id='Qwen/Qwen2.5-Coder-32B-Instruct', custom_role_conversions=None, ) # Load prompts from YAML with open("prompts.yaml", 'r') as stream: prompt_templates = yaml.safe_load(stream) # Define agent agent = CodeAgent( model=model, tools=[ final_answer, get_yolov8_coco_detections, detect_road_lanes, driving_situation_analyzer, predict_trajectory, get_current_time_in_timezone ], max_steps=6, verbosity_level=1, grammar=None, planning_interval=None, name=None, description=None, prompt_templates=prompt_templates ) # Define Gradio interface for testing without the GradioUI wrapper def create_gradio_interface(): with gr.Blocks(title="Autonomous Driving Video Analysis with YOLOv8") as demo: gr.Markdown("# Autonomous Driving Video Analysis with YOLOv8") with gr.Row(): with gr.Column(): video_input = gr.Video(label="Upload Driving Video") analysis_type = gr.Radio( ["Object Detection (YOLOv8)", "Lane Detection (YOLOv8)", "Situation Analysis", "Trajectory Prediction"], label="Analysis Type", value="Object Detection (YOLOv8)" ) analyze_btn = gr.Button("Analyze Video") with gr.Column(): result_text = gr.Textbox(label="Analysis Results", lines=10) with gr.Tabs(): with gr.TabItem("Object Detection"): object_video_output = gr.Video(label="Object Detection Results") with gr.TabItem("Lane Detection"): lane_video_output = gr.Video(label="Lane Detection Results") with gr.TabItem("Trajectory Prediction"): trajectory_video_output = gr.Video(label="Trajectory Prediction") def process_video(video_path, analysis_type): """Process the video based on the selected analysis type.""" if not video_path: return "Please upload a video file.", None, None, None # Save the uploaded video to a temporary file temp_dir = tempfile.gettempdir() temp_video_path = os.path.join(temp_dir, "input_video.mp4") # Save the uploaded video with open(temp_video_path, "wb") as f: if hasattr(video_path, "read"): # If video_path is a file-like object (from Gradio) f.write(video_path.read()) else: # If video_path is already a path with open(video_path, "rb") as source_file: f.write(source_file.read()) result = None object_video = None lane_video = None trajectory_video = None try: if analysis_type == "Object Detection (YOLOv8)": result = get_yolov8_coco_detections(temp_video_path) if isinstance(result, dict) and "output_path" in result and os.path.exists(result["output_path"]): object_video = result["output_path"] elif analysis_type == "Lane Detection (YOLOv8)": result = detect_road_lanes(temp_video_path) if isinstance(result, dict) and "output_path" in result and os.path.exists(result["output_path"]): lane_video = result["output_path"] elif analysis_type == "Situation Analysis": result = driving_situation_analyzer(temp_video_path) if isinstance(result, dict): if "objects_video" in result and result["objects_video"] and os.path.exists(result["objects_video"]): object_video = result["objects_video"] if "lanes_video" in result and result["lanes_video"] and os.path.exists(result["lanes_video"]): lane_video = result["lanes_video"] elif analysis_type == "Trajectory Prediction": result = predict_trajectory(temp_video_path) if isinstance(result, dict): if "analysis_videos" in result: videos = result["analysis_videos"] if "objects" in videos and videos["objects"] and os.path.exists(videos["objects"]): object_video = videos["objects"] if "lanes" in videos and videos["lanes"] and os.path.exists(videos["lanes"]): lane_video = videos["lanes"] if "trajectory_video" in result and result["trajectory_video"] and os.path.exists(result["trajectory_video"]): trajectory_video = result["trajectory_video"] except Exception as e: return f"Error processing video: {str(e)}", None, None, None return str(result), object_video, lane_video, trajectory_video analyze_btn.click( fn=process_video, inputs=[video_input, analysis_type], outputs=[result_text, object_video_output, lane_video_output, trajectory_video_output] ) return demo # Main execution - Try to use the original GradioUI if available, otherwise use our custom interface try: # Check if GradioUI is available in the global namespace if 'GradioUI' in globals(): print("Using original GradioUI") GradioUI(agent).launch() else: # Use our placeholder implementation if the original isn't available raise ImportError("Original GradioUI not found") except Exception as e: print(f"Error using original GradioUI: {e}") print("Launching custom Gradio interface instead") create_gradio_interface().launch()