Spaces:
Sleeping
Sleeping
| # Install required packages first | |
| import os | |
| import sys | |
| import subprocess | |
| # Function to install packages if they are not already installed | |
| def install_packages(): | |
| required_packages = [ | |
| 'ultralytics', | |
| 'smolagents', | |
| 'pytz', | |
| 'pyyaml', | |
| 'opencv-python', | |
| 'numpy', | |
| 'gradio' | |
| ] | |
| for package in required_packages: | |
| try: | |
| __import__(package) | |
| print(f"{package} is already installed.") | |
| except ImportError: | |
| print(f"Installing {package}...") | |
| subprocess.check_call([sys.executable, "-m", "pip", "install", package]) | |
| print(f"{package} has been installed.") | |
| # Install required packages | |
| print("Checking and installing required packages...") | |
| install_packages() | |
| # Now import the required modules | |
| from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool | |
| import datetime | |
| import requests | |
| import pytz | |
| import yaml | |
| import tempfile | |
| import numpy as np | |
| import cv2 | |
| import gradio as gr | |
| from ultralytics import YOLO # YOLOv8 model | |
| # Create tools directory and FinalAnswerTool if they don't exist | |
| os.makedirs("tools", exist_ok=True) | |
| if not os.path.exists("tools/final_answer.py"): | |
| with open("tools/final_answer.py", "w") as f: | |
| f.write(""" | |
| class FinalAnswerTool: | |
| def __call__(self, answer): | |
| return {"answer": answer} | |
| """) | |
| # Import FinalAnswerTool | |
| sys.path.append(os.getcwd()) | |
| from tools.final_answer import FinalAnswerTool | |
| # Create prompts.yaml if it doesn't exist | |
| if not os.path.exists("prompts.yaml"): | |
| prompts = { | |
| "default": "You are an autonomous driving assistant that helps analyze road scenes and make driving decisions.", | |
| "prefix": "Analyze the following driving scenario: ", | |
| "suffix": "Provide a detailed analysis with safety recommendations." | |
| } | |
| with open("prompts.yaml", 'w') as file: | |
| yaml.dump(prompts, file) | |
| def get_yolov8_coco_detections(video_path: str) -> dict: | |
| """Detects objects in an MP4 video file using YOLOv8. | |
| Args: | |
| video_path: Path to the input video. | |
| Returns: | |
| Dictionary with processed video path and detection results. | |
| """ | |
| model = YOLO("yolov8s.pt") # Load pre-trained YOLOv8 model | |
| cap = cv2.VideoCapture(video_path) # Load video | |
| if not cap.isOpened(): | |
| return {"error": f"Could not open video file at {video_path}"} | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for output video | |
| output_path = "output_video.mp4" # Save processed video | |
| out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) | |
| unique_detections = set() | |
| frame_count = 0 | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break # End of video | |
| frame_count += 1 | |
| results = model(frame) # Run YOLOv8 inference | |
| for r in results: | |
| boxes = r.boxes | |
| for box in boxes: | |
| x1, y1, x2, y2 = box.xyxy[0].tolist() | |
| conf = box.conf[0].item() | |
| cls = int(box.cls[0].item()) | |
| class_name = model.names[cls] | |
| unique_detections.add(f"{class_name}") | |
| # Draw bounding box | |
| cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2) | |
| # Add label | |
| label = f"{class_name} {conf:.2f}" | |
| cv2.putText(frame, label, (int(x1), int(y1)-10), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) | |
| out.write(frame) # Save frame to output video | |
| cap.release() | |
| out.release() | |
| detections_list = list(unique_detections) | |
| return { | |
| "output_path": output_path, | |
| "detected_objects": [{"object": obj} for obj in detections_list], | |
| "frames_processed": frame_count | |
| } | |
| def detect_road_lanes(video_path: str) -> dict: | |
| """Detects lane markings in an MP4 video using YOLOv8-seg and traditional CV techniques. | |
| Args: | |
| video_path: Path to the input video. | |
| Returns: | |
| Dictionary with processed video path and lane detection results. | |
| """ | |
| # Check if we already have downloaded the model, if not, download it | |
| model_path = "yolov8s-seg.pt" | |
| if not os.path.exists(model_path): | |
| # First, download YOLOv8 segmentation model | |
| model = YOLO("yolov8s-seg.pt") | |
| else: | |
| model = YOLO(model_path) | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| return {"error": f"Could not open video file at {video_path}"} | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| output_path = "lanes_output.mp4" | |
| out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) | |
| # For lane detection specifically | |
| lane_count = 0 | |
| detected_lanes = [] | |
| frame_count = 0 | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| frame_count += 1 | |
| # Create a visualization frame | |
| vis_frame = frame.copy() | |
| # Enhance lane detection with traditional computer vision | |
| gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
| blur = cv2.GaussianBlur(gray, (5, 5), 0) | |
| edges = cv2.Canny(blur, 50, 150) | |
| # Create a mask focused on the lower portion of the image (where lanes typically are) | |
| mask = np.zeros_like(edges) | |
| height, width = edges.shape | |
| polygon = np.array([[(0, height), (width, height), (width, height//2), (0, height//2)]], dtype=np.int32) | |
| cv2.fillPoly(mask, polygon, 255) | |
| masked_edges = cv2.bitwise_and(edges, mask) | |
| # Apply Hough transform to detect lines | |
| lines = cv2.HoughLinesP(masked_edges, 1, np.pi/180, 50, minLineLength=100, maxLineGap=50) | |
| current_lane_count = 0 | |
| lane_lines = [] | |
| if lines is not None: | |
| for line in lines: | |
| x1, y1, x2, y2 = line[0] | |
| # Filter out horizontal lines (not lanes) | |
| if abs(x2 - x1) > 0 and abs(y2 - y1) / abs(x2 - x1) > 0.5: # Slope threshold | |
| cv2.line(vis_frame, (x1, y1), (x2, y2), (0, 0, 255), 2) # Red lane markings | |
| lane_lines.append(((x1, y1), (x2, y2))) | |
| # Count lanes by clustering similar lines | |
| if lane_lines: | |
| # Simple clustering: group lines with similar slopes | |
| slopes = [] | |
| for ((x1, y1), (x2, y2)) in lane_lines: | |
| # Avoid division by zero | |
| if x2 != x1: | |
| slope = (y2 - y1) / (x2 - x1) | |
| slopes.append(slope) | |
| # Cluster slopes to identify unique lanes | |
| unique_slopes = [] | |
| for slope in slopes: | |
| is_new = True | |
| for us in unique_slopes: | |
| if abs(slope - us) < 0.2: # Threshold for considering slopes similar | |
| is_new = False | |
| break | |
| if is_new: | |
| unique_slopes.append(slope) | |
| current_lane_count = len(unique_slopes) | |
| lane_count = max(lane_count, current_lane_count) | |
| # Update detected lanes information | |
| detected_lanes = [{"lane_id": i, "slope": s} for i, s in enumerate(unique_slopes)] | |
| # Add lane count text | |
| cv2.putText(vis_frame, f"Detected lanes: {current_lane_count}", (50, 50), | |
| cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) | |
| # Try running YOLOv8 segmentation if available | |
| try: | |
| # Run segmentation model for road detection | |
| seg_results = model(frame, classes=[0, 1, 2, 3, 7]) # Focus on relevant classes | |
| if hasattr(seg_results[0], 'masks') and seg_results[0].masks is not None: | |
| masks = seg_results[0].masks | |
| for seg_mask in masks: | |
| # Convert mask to binary image | |
| mask_data = seg_mask.data.cpu().numpy()[0].astype(np.uint8) * 255 | |
| # Resize mask to frame size | |
| mask_data = cv2.resize(mask_data, (width, height)) | |
| # Create colored overlay for the mask | |
| color_mask = np.zeros_like(vis_frame) | |
| color_mask[mask_data > 0] = [0, 255, 255] # Yellow color for segmentation | |
| # Add the mask as semi-transparent overlay | |
| vis_frame = cv2.addWeighted(vis_frame, 1, color_mask, 0.3, 0) | |
| except Exception as e: | |
| print(f"Warning: YOLOv8 segmentation failed: {e}") | |
| # Continue without segmentation - we still have traditional lane detection | |
| out.write(vis_frame) | |
| cap.release() | |
| out.release() | |
| return { | |
| "output_path": output_path, | |
| "detected_lanes": detected_lanes, | |
| "lane_count": lane_count, | |
| "frames_processed": frame_count | |
| } | |
| def driving_situation_analyzer(video_path: str) -> dict: | |
| """Analyzes road conditions by integrating YOLOv8 object detections and lane information. | |
| Args: | |
| video_path: Path to the input video. | |
| Returns: | |
| A dictionary containing situation analysis. | |
| """ | |
| # Run object detection with YOLOv8 | |
| object_results = get_yolov8_coco_detections(video_path) | |
| if "error" in object_results: | |
| return {"error": object_results["error"]} | |
| # Run lane detection with YOLOv8 | |
| lane_results = detect_road_lanes(video_path) | |
| if "error" in lane_results: | |
| return {"error": lane_results["error"]} | |
| # Extract information from results | |
| detected_objects = object_results.get("detected_objects", []) | |
| detected_lanes = lane_results.get("detected_lanes", []) | |
| lane_count = lane_results.get("lane_count", 0) | |
| # Analyze the driving situation | |
| situation = [] | |
| if any(obj["object"] in ["car", "truck", "bus"] for obj in detected_objects): | |
| situation.append("Traffic detected ahead, maintain safe distance.") | |
| if any(obj["object"] == "person" for obj in detected_objects): | |
| situation.append("Pedestrian detected, be prepared to stop.") | |
| if any(obj["object"] == "traffic light" for obj in detected_objects): | |
| situation.append("Traffic light detected, slow down if red.") | |
| if any(obj["object"] == "stop sign" for obj in detected_objects): | |
| situation.append("Stop sign detected, prepare to stop.") | |
| if lane_count == 0: | |
| situation.append("Lane markings not detected, potential risk of veering.") | |
| elif lane_count == 1: | |
| situation.append("Single lane detected, ensure proper lane following.") | |
| elif lane_count >= 2: | |
| situation.append(f"{lane_count} lanes detected, stay within lane boundaries.") | |
| # Evaluate road complexity | |
| road_complexity = "LOW" | |
| if lane_count > 2: | |
| road_complexity = "MEDIUM" | |
| vehicle_count = sum(1 for obj in detected_objects if obj["object"] in ["car", "truck", "bus"]) | |
| if vehicle_count > 3: | |
| road_complexity = "HIGH" | |
| if any(obj["object"] == "person" for obj in detected_objects) and vehicle_count > 1: | |
| road_complexity = "HIGH" | |
| # Evaluate safety level | |
| safety_level = "HIGH" | |
| if "Pedestrian detected" in " ".join(situation): | |
| safety_level = "MEDIUM" | |
| if "Lane markings not detected" in " ".join(situation): | |
| safety_level = "LOW" | |
| if vehicle_count > 5: | |
| safety_level = "MEDIUM" | |
| return { | |
| "situation_summary": " | ".join(situation) if situation else "Road situation unclear, proceed with caution.", | |
| "detected_objects": detected_objects, | |
| "detected_lanes": detected_lanes, | |
| "lane_count": lane_count, | |
| "road_complexity": road_complexity, | |
| "safety_level": safety_level, | |
| "objects_video": object_results.get("output_path"), | |
| "lanes_video": lane_results.get("output_path") | |
| } | |
| def predict_trajectory(video_path: str) -> dict: | |
| """Predicts vehicle trajectory based on the driving situation analysis. | |
| Args: | |
| video_path: Path to the input video. | |
| Returns: | |
| A dictionary containing trajectory predictions. | |
| """ | |
| # First get the comprehensive analysis | |
| analysis = driving_situation_analyzer(video_path) | |
| if "error" in analysis: | |
| return {"error": analysis["error"]} | |
| # Extract key information for trajectory planning | |
| detected_objects = analysis.get("detected_objects", []) | |
| detected_lanes = analysis.get("detected_lanes", []) | |
| lane_count = analysis.get("lane_count", 0) | |
| road_complexity = analysis.get("road_complexity", "MEDIUM") | |
| safety_level = analysis.get("safety_level", "MEDIUM") | |
| summary = analysis.get("situation_summary", "") | |
| # Plan trajectory based on situation analysis | |
| trajectory = [] | |
| # Safety level affects overall driving strategy | |
| if safety_level == "LOW": | |
| trajectory.append("Reduce speed significantly, proceed with extreme caution.") | |
| elif safety_level == "MEDIUM": | |
| trajectory.append("Maintain moderate speed, be alert for changing conditions.") | |
| else: # HIGH | |
| trajectory.append("Normal driving conditions, maintain safe speed.") | |
| # Road complexity affects navigation approach | |
| if road_complexity == "HIGH": | |
| trajectory.append("Complex traffic environment, navigate with extra caution.") | |
| # Object-specific trajectory adjustments | |
| for obj in detected_objects: | |
| obj_name = obj["object"].lower() | |
| if "person" in obj_name: | |
| trajectory.append("Yield to pedestrians, prepare for potential stopping.") | |
| if obj_name in ["car", "truck", "bus"]: | |
| trajectory.append("Vehicle detected, maintain safe following distance.") | |
| if obj_name == "traffic light": | |
| trajectory.append("Approach intersection carefully, prepare to stop if light changes.") | |
| if obj_name == "stop sign": | |
| trajectory.append("Slow down and prepare to stop completely at the stop sign.") | |
| # Lane-specific trajectory planning | |
| if lane_count == 0: | |
| trajectory.append("No lanes detected, follow visual road boundaries carefully.") | |
| elif lane_count == 1: | |
| trajectory.append("Single lane detected, maintain centered position.") | |
| else: | |
| trajectory.append(f"{lane_count} lanes available, stay within current lane.") | |
| # Create video visualization of the predicted trajectory | |
| # Get last frame from the analysis video | |
| lane_video = analysis.get("lanes_video") | |
| if lane_video and os.path.exists(lane_video): | |
| cap = cv2.VideoCapture(lane_video) | |
| else: | |
| cap = cv2.VideoCapture(video_path) | |
| # Get the last frame for visualization | |
| frame = None | |
| while cap.isOpened(): | |
| ret, current_frame = cap.read() | |
| if not ret: | |
| break | |
| frame = current_frame | |
| cap.release() | |
| # Generate trajectory visualization if we have a frame | |
| if frame is not None: | |
| height, width = frame.shape[:2] | |
| # Create a copy of the frame for trajectory visualization | |
| trajectory_frame = frame.copy() | |
| # Draw starting point at bottom center | |
| start_x, start_y = width // 2, height - 50 | |
| cv2.circle(trajectory_frame, (start_x, start_y), 5, (255, 255, 0), -1) | |
| # Generate trajectory points based on analysis | |
| future_positions = [] | |
| x, y = start_x, start_y | |
| # Adjust trajectory based on safety level and road conditions | |
| lateral_variation = 5 # Default lateral variation | |
| if safety_level == "LOW": | |
| lateral_variation = 3 # Less lateral movement when unsafe | |
| elif road_complexity == "HIGH": | |
| lateral_variation = 8 # More potential variation in complex roads | |
| # Generate trajectory points | |
| for t in range(10): | |
| # Move forward (up in the image) | |
| y -= 30 # Move up by 30 pixels | |
| # Calculate lateral adjustment based on conditions | |
| if "pedestrian" in summary.lower(): | |
| # Move away from pedestrians (assume they're on the right) | |
| x -= np.random.uniform(0, lateral_variation) | |
| elif "traffic" in summary.lower() and t > 5: | |
| # Slight movement to adjust for traffic ahead | |
| x += np.random.uniform(-lateral_variation/2, lateral_variation/2) | |
| else: | |
| # Normal driving with slight randomness | |
| x += np.random.uniform(-lateral_variation/3, lateral_variation/3) | |
| # Ensure point is within frame and reasonable drivable area | |
| x = max(width * 0.2, min(width * 0.8, x)) # Keep within 20-80% of width | |
| y = max(0, min(height-1, y)) | |
| future_positions.append((int(x), int(y))) | |
| # Draw point on the frame | |
| cv2.circle(trajectory_frame, (int(x), int(y)), 5, (255, 255, 0), -1) | |
| # Connect points with lines | |
| for i in range(1, len(future_positions)): | |
| cv2.line(trajectory_frame, future_positions[i-1], future_positions[i], (255, 255, 0), 2) | |
| # Add trajectory recommendation text | |
| trajectory_text = " | ".join(trajectory) if trajectory else "Maintain current path." | |
| y_pos = 30 | |
| # Split text into multiple lines if too long | |
| for line in [trajectory_text[i:i+60] for i in range(0, len(trajectory_text), 60)]: | |
| cv2.putText(trajectory_frame, line, (30, y_pos), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) | |
| y_pos += 30 | |
| # Add safety level indicator | |
| safety_color = (0, 255, 0) if safety_level == "HIGH" else \ | |
| (0, 255, 255) if safety_level == "MEDIUM" else \ | |
| (0, 0, 255) # Red for LOW safety | |
| cv2.putText(trajectory_frame, f"Safety: {safety_level}", (width-150, 30), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.7, safety_color, 2) | |
| # Save the trajectory visualization | |
| output_path = "trajectory_output.mp4" | |
| # Create a short video showing the trajectory | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(output_path, fourcc, 1, (width, height)) | |
| # Write the frame several times to create a video | |
| for _ in range(3): | |
| out.write(trajectory_frame) | |
| out.release() | |
| else: | |
| future_positions = [] | |
| output_path = None | |
| return { | |
| "trajectory_recommendation": " | ".join(trajectory) if trajectory else "Maintain current path.", | |
| "future_positions": future_positions, | |
| "safety_level": safety_level, | |
| "road_complexity": road_complexity, | |
| "trajectory_video": output_path if os.path.exists("trajectory_output.mp4") else None, | |
| "analysis_videos": { | |
| "objects": analysis.get("objects_video"), | |
| "lanes": analysis.get("lanes_video") | |
| } | |
| } | |
| def get_current_time_in_timezone(timezone: str) -> str: | |
| """Fetches the current local time in a specified timezone. | |
| Args: | |
| timezone: A string representing a valid timezone (e.g., 'America/New_York', 'Europe/London', 'Asia/Tokyo'). | |
| Returns: | |
| A string showing the current local time in the specified timezone. | |
| """ | |
| try: | |
| tz = pytz.timezone(timezone) | |
| local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S") | |
| return f"The current local time in {timezone} is: {local_time}" | |
| except Exception as e: | |
| return f"Error fetching time for timezone '{timezone}': {str(e)}" | |
| # Setup FinalAnswerTool | |
| final_answer = FinalAnswerTool() | |
| # Create a placeholder for a GradioUI class if it doesn't exist | |
| class GradioUIPlaceholder: | |
| def __init__(self, agent): | |
| self.agent = agent | |
| def launch(self): | |
| print("Using placeholder GradioUI implementation") | |
| create_gradio_interface().launch() | |
| # Setup model | |
| model = HfApiModel( | |
| max_tokens=2096, | |
| temperature=0.5, | |
| model_id='Qwen/Qwen2.5-Coder-32B-Instruct', | |
| custom_role_conversions=None, | |
| ) | |
| # Load prompts from YAML | |
| with open("prompts.yaml", 'r') as stream: | |
| prompt_templates = yaml.safe_load(stream) | |
| # Define agent | |
| agent = CodeAgent( | |
| model=model, | |
| tools=[ | |
| final_answer, | |
| get_yolov8_coco_detections, | |
| detect_road_lanes, | |
| driving_situation_analyzer, | |
| predict_trajectory, | |
| get_current_time_in_timezone | |
| ], | |
| max_steps=6, | |
| verbosity_level=1, | |
| grammar=None, | |
| planning_interval=None, | |
| name=None, | |
| description=None, | |
| prompt_templates=prompt_templates | |
| ) | |
| # Define Gradio interface for testing without the GradioUI wrapper | |
| def create_gradio_interface(): | |
| with gr.Blocks(title="Autonomous Driving Video Analysis with YOLOv8") as demo: | |
| gr.Markdown("# Autonomous Driving Video Analysis with YOLOv8") | |
| with gr.Row(): | |
| with gr.Column(): | |
| video_input = gr.Video(label="Upload Driving Video") | |
| analysis_type = gr.Radio( | |
| ["Object Detection (YOLOv8)", "Lane Detection (YOLOv8)", "Situation Analysis", "Trajectory Prediction"], | |
| label="Analysis Type", | |
| value="Object Detection (YOLOv8)" | |
| ) | |
| analyze_btn = gr.Button("Analyze Video") | |
| with gr.Column(): | |
| result_text = gr.Textbox(label="Analysis Results", lines=10) | |
| with gr.Tabs(): | |
| with gr.TabItem("Object Detection"): | |
| object_video_output = gr.Video(label="Object Detection Results") | |
| with gr.TabItem("Lane Detection"): | |
| lane_video_output = gr.Video(label="Lane Detection Results") | |
| with gr.TabItem("Trajectory Prediction"): | |
| trajectory_video_output = gr.Video(label="Trajectory Prediction") | |
| def process_video(video_path, analysis_type): | |
| """Process the video based on the selected analysis type.""" | |
| if not video_path: | |
| return "Please upload a video file.", None, None, None | |
| # Save the uploaded video to a temporary file | |
| temp_dir = tempfile.gettempdir() | |
| temp_video_path = os.path.join(temp_dir, "input_video.mp4") | |
| # Save the uploaded video | |
| with open(temp_video_path, "wb") as f: | |
| if hasattr(video_path, "read"): | |
| # If video_path is a file-like object (from Gradio) | |
| f.write(video_path.read()) | |
| else: | |
| # If video_path is already a path | |
| with open(video_path, "rb") as source_file: | |
| f.write(source_file.read()) | |
| result = None | |
| object_video = None | |
| lane_video = None | |
| trajectory_video = None | |
| try: | |
| if analysis_type == "Object Detection (YOLOv8)": | |
| result = get_yolov8_coco_detections(temp_video_path) | |
| if isinstance(result, dict) and "output_path" in result and os.path.exists(result["output_path"]): | |
| object_video = result["output_path"] | |
| elif analysis_type == "Lane Detection (YOLOv8)": | |
| result = detect_road_lanes(temp_video_path) | |
| if isinstance(result, dict) and "output_path" in result and os.path.exists(result["output_path"]): | |
| lane_video = result["output_path"] | |
| elif analysis_type == "Situation Analysis": | |
| result = driving_situation_analyzer(temp_video_path) | |
| if isinstance(result, dict): | |
| if "objects_video" in result and result["objects_video"] and os.path.exists(result["objects_video"]): | |
| object_video = result["objects_video"] | |
| if "lanes_video" in result and result["lanes_video"] and os.path.exists(result["lanes_video"]): | |
| lane_video = result["lanes_video"] | |
| elif analysis_type == "Trajectory Prediction": | |
| result = predict_trajectory(temp_video_path) | |
| if isinstance(result, dict): | |
| if "analysis_videos" in result: | |
| videos = result["analysis_videos"] | |
| if "objects" in videos and videos["objects"] and os.path.exists(videos["objects"]): | |
| object_video = videos["objects"] | |
| if "lanes" in videos and videos["lanes"] and os.path.exists(videos["lanes"]): | |
| lane_video = videos["lanes"] | |
| if "trajectory_video" in result and result["trajectory_video"] and os.path.exists(result["trajectory_video"]): | |
| trajectory_video = result["trajectory_video"] | |
| except Exception as e: | |
| return f"Error processing video: {str(e)}", None, None, None | |
| return str(result), object_video, lane_video, trajectory_video | |
| analyze_btn.click( | |
| fn=process_video, | |
| inputs=[video_input, analysis_type], | |
| outputs=[result_text, object_video_output, lane_video_output, trajectory_video_output] | |
| ) | |
| return demo | |
| # Main execution - Try to use the original GradioUI if available, otherwise use our custom interface | |
| try: | |
| # Check if GradioUI is available in the global namespace | |
| if 'GradioUI' in globals(): | |
| print("Using original GradioUI") | |
| GradioUI(agent).launch() | |
| else: | |
| # Use our placeholder implementation if the original isn't available | |
| raise ImportError("Original GradioUI not found") | |
| except Exception as e: | |
| print(f"Error using original GradioUI: {e}") | |
| print("Launching custom Gradio interface instead") | |
| create_gradio_interface().launch() |