Spaces:
Sleeping
Sleeping
| from smolagents import Tool | |
| import os | |
| import time | |
| import tempfile | |
| from transformers import pipeline | |
| from typing import List, Dict | |
| from PIL import Image | |
| import io | |
| # Import required browser automation libraries | |
| from selenium import webdriver | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.common.keys import Keys | |
| from selenium.common.exceptions import TimeoutException, NoSuchElementException | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| import helium | |
| class WebVideoAnalyzerTool(Tool): | |
| name = "web_video_analyzer" | |
| description = "Analyzes a video on a webpage (YouTube, Vimeo, etc.) by taking screenshots at intervals and counting objects of a specified type in each frame." | |
| inputs = { | |
| "url": { | |
| "type": "string", | |
| "description": "The URL of the web page containing the video to analyze.", | |
| }, | |
| "label": { | |
| "type": "string", | |
| "description": "The type of object to count (e.g., 'bird', 'person', 'car', 'dog'). Use common object names recognized by standard object detection models.", | |
| }, | |
| "duration": { | |
| "type": "integer", | |
| "description": "How many seconds of the video to analyze (default: 30)", | |
| "nullable": True, | |
| }, | |
| "interval": { | |
| "type": "integer", | |
| "description": "How often to take screenshots (in seconds, default: 1)", | |
| "nullable": True, | |
| }, | |
| } | |
| output_type = "string" | |
| def _setup_browser(self): | |
| """Initialize the browser with appropriate settings.""" | |
| if self.driver is not None: | |
| return self.driver | |
| print("Setting up browser...") | |
| # Configure Chrome options | |
| chrome_options = webdriver.ChromeOptions() | |
| chrome_options.add_argument("--force-device-scale-factor=1") | |
| chrome_options.add_argument("--window-size=1280,720") | |
| chrome_options.add_argument("--disable-pdf-viewer") | |
| chrome_options.add_argument("--window-position=0,0") | |
| chrome_options.add_argument("--autoplay-policy=no-user-gesture-required") | |
| # Initialize the driver | |
| self.driver = helium.start_chrome(headless=False, options=chrome_options) | |
| return self.driver | |
| def _navigate_to_video(self, url: str) -> bool: | |
| """Navigate to the video URL and prepare for playback.""" | |
| try: | |
| print(f"Navigating to {url}...") | |
| helium.go_to(url) | |
| # Wait for page to load | |
| time.sleep(3) | |
| # Handle YouTube-specific interactions | |
| if "youtube.com" in url: | |
| try: | |
| # Accept cookies if prompted | |
| if helium.Button("Accept all").exists(): | |
| helium.click("Accept all") | |
| elif helium.Button("I agree").exists(): | |
| helium.click("I agree") | |
| # Click on the video to ensure it's playing | |
| try: | |
| # Find the video player element | |
| video_element = WebDriverWait(self.driver, 10).until( | |
| EC.presence_of_element_located((By.TAG_NAME, "video")) | |
| ) | |
| video_element.click() | |
| # Ensure the video is playing by trying to click the play button if visible | |
| try: | |
| play_button = self.driver.find_element( | |
| By.CLASS_NAME, "ytp-play-button" | |
| ) | |
| if "Play" in play_button.get_attribute("aria-label"): | |
| play_button.click() | |
| except: | |
| pass | |
| except: | |
| print("Could not locate video element to click") | |
| except Exception as e: | |
| print(f"Error during YouTube setup: {str(e)}") | |
| # General approach - try to find and click on any video element | |
| else: | |
| try: | |
| # Try to find video element | |
| video_elements = self.driver.find_elements(By.TAG_NAME, "video") | |
| if video_elements: | |
| video_elements[0].click() | |
| except Exception as e: | |
| print(f"Could not find or click video element: {str(e)}") | |
| # Allow video to start | |
| time.sleep(2) | |
| return True | |
| except Exception as e: | |
| print(f"Error navigating to {url}: {str(e)}") | |
| return False | |
| def _close_popups(self): | |
| """Attempt to close any popups or overlays.""" | |
| try: | |
| # Try pressing Escape key to close general popups | |
| webdriver.ActionChains(self.driver).send_keys(Keys.ESCAPE).perform() | |
| # YouTube-specific: try to close any visible dialog or popup | |
| if "youtube.com" in self.driver.current_url: | |
| # Try to find and click close buttons on popups | |
| try: | |
| close_buttons = self.driver.find_elements( | |
| By.CSS_SELECTOR, | |
| "button.ytp-ad-overlay-close-button, button.ytp-ad-skip-button", | |
| ) | |
| for button in close_buttons: | |
| button.click() | |
| except: | |
| pass | |
| except Exception as e: | |
| print(f"Error closing popups: {str(e)}") | |
| def _take_screenshot(self) -> Image.Image: | |
| """Take a screenshot of the current browser window.""" | |
| png_bytes = self.driver.get_screenshot_as_png() | |
| return Image.open(io.BytesIO(png_bytes)) | |
| def _analyze_screenshot(self, image: Image.Image, label: str) -> int: | |
| """Count objects of the specified label in a screenshot.""" | |
| detector = pipeline("object-detection", model="facebook/detr-resnet-50") | |
| try: | |
| # Run detection on the image | |
| results = detector(image) | |
| # Count objects matching the label | |
| object_count = sum( | |
| 1 for result in results if label.lower() in result["label"].lower() | |
| ) | |
| # Debug: print detected classes | |
| detected_classes = [result["label"] for result in results] | |
| if detected_classes: | |
| print(f"Detected classes: {', '.join(detected_classes)}") | |
| return object_count | |
| except Exception as e: | |
| print(f"Error detecting objects in screenshot: {str(e)}") | |
| return 0 | |
| def _capture_video_frames( | |
| self, duration: int = 30, interval: int = 1, label: str = "" | |
| ) -> List[Dict]: | |
| """Capture frames from the video at regular intervals.""" | |
| results = [] | |
| print( | |
| f"Starting frame capture for {duration} seconds with {interval} second intervals..." | |
| ) | |
| temp_dir = tempfile.mkdtemp() | |
| for seconds_elapsed in range(0, duration, interval): | |
| # Take screenshot | |
| try: | |
| print(f"Capturing frame at {seconds_elapsed} seconds...") | |
| screenshot = self._take_screenshot() | |
| # Save screenshot for debugging (optional) | |
| screenshot_path = os.path.join(temp_dir, f"frame_{seconds_elapsed}.jpg") | |
| screenshot.save(screenshot_path) | |
| # Analyze screenshot | |
| object_count = self._analyze_screenshot(screenshot, label) | |
| # Store results | |
| results.append( | |
| { | |
| "time": seconds_elapsed, | |
| "object_count": object_count, | |
| "screenshot_path": screenshot_path, | |
| } | |
| ) | |
| # Wait for next interval | |
| if seconds_elapsed + interval < duration: | |
| time.sleep(interval) | |
| except Exception as e: | |
| print(f"Error capturing frame at {seconds_elapsed} seconds: {str(e)}") | |
| return results | |
| def forward( | |
| self, url: str, label: str, duration: int = 30, interval: int = 1 | |
| ) -> str: | |
| """ | |
| Analyzes a video on a webpage by taking screenshots and counting objects. | |
| Args: | |
| url (str): The URL of the webpage containing the video. | |
| label (str): The type of object to count (e.g., 'bird', 'person', 'car', 'dog'). | |
| duration (int): How many seconds of the video to analyze. | |
| interval (int): How often to take screenshots (in seconds). | |
| Returns: | |
| str: A detailed report of object counts over time. | |
| """ | |
| try: | |
| # Setup the browser | |
| self._setup_browser() | |
| # Navigate to the video | |
| if not self._navigate_to_video(url): | |
| return f"Error: Could not navigate to or play the video at {url}" | |
| # Close any popups or overlays | |
| self._close_popups() | |
| # Capture and analyze frames | |
| frame_results = self._capture_video_frames(duration, interval, label) | |
| # Calculate summary statistics | |
| if not frame_results: | |
| return f"Error: No frames were successfully captured and analyzed" | |
| total_objects = sum(result["object_count"] for result in frame_results) | |
| avg_objects = total_objects / len(frame_results) | |
| max_objects = max(frame_results, key=lambda x: x["object_count"]) | |
| # Generate a report | |
| report = [ | |
| f"# {label.title()} Count Analysis for Video", | |
| f"Video URL: {url}", | |
| f"Analysis duration: {duration} seconds", | |
| f"Screenshots taken: {len(frame_results)} (every {interval} second(s))", | |
| "", | |
| "## Summary", | |
| f"Total {label}s detected: {total_objects}", | |
| f"Average {label}s per screenshot: {avg_objects:.2f}", | |
| f"Maximum {label}s in a single screenshot: {max_objects['object_count']} (at {max_objects['time']} seconds)", | |
| "", | |
| "## Time-based Analysis", | |
| ] | |
| # Add frame-by-frame details | |
| for result in frame_results: | |
| report.append( | |
| f"Time {result['time']} seconds: {result['object_count']} {label}s" | |
| ) | |
| # Clean up | |
| try: | |
| helium.kill_browser() | |
| self.driver = None | |
| except: | |
| print("Warning: Could not properly close the browser") | |
| return "\n".join(report) | |
| except Exception as e: | |
| # Ensure browser is closed on error | |
| try: | |
| if self.driver: | |
| helium.kill_browser() | |
| self.driver = None | |
| except: | |
| pass | |
| return f"Error analyzing video: {str(e)}" | |