File size: 11,089 Bytes
50df397
 
968a67a
50df397
 
968a67a
50df397
968a67a
 
 
 
 
 
 
 
 
 
 
ca6fbc3
968a67a
 
 
50df397
 
 
968a67a
50df397
 
 
9ff7774
 
968a67a
 
 
ca6fbc3
968a67a
 
 
 
ca6fbc3
 
50df397
 
 
968a67a
 
 
 
ca6fbc3
968a67a
ca6fbc3
968a67a
 
 
 
 
 
 
ca6fbc3
968a67a
 
 
 
 
 
50df397
968a67a
 
ca6fbc3
968a67a
 
ca6fbc3
968a67a
 
 
 
 
 
 
 
ca6fbc3
968a67a
 
 
 
 
 
 
ca6fbc3
968a67a
 
ca6fbc3
 
 
968a67a
 
 
 
ca6fbc3
968a67a
 
ca6fbc3
968a67a
 
ca6fbc3
968a67a
 
 
 
 
 
 
 
 
ca6fbc3
968a67a
 
 
ca6fbc3
50df397
968a67a
 
9ff7774
968a67a
 
50df397
968a67a
 
ca6fbc3
968a67a
 
 
 
ca6fbc3
 
 
 
968a67a
 
 
 
 
 
9ff7774
968a67a
 
 
 
9ff7774
968a67a
 
 
ca6fbc3
968a67a
 
 
ca6fbc3
50df397
ca6fbc3
 
 
 
968a67a
 
 
 
ca6fbc3
50df397
ca6fbc3
50df397
968a67a
50df397
 
ca6fbc3
 
 
968a67a
 
ca6fbc3
 
 
 
968a67a
ca6fbc3
968a67a
 
 
 
 
ca6fbc3
968a67a
 
 
ca6fbc3
968a67a
 
ca6fbc3
968a67a
ca6fbc3
 
 
 
 
 
 
 
968a67a
 
 
ca6fbc3
968a67a
 
ca6fbc3
968a67a
 
ca6fbc3
 
 
50df397
968a67a
ca6fbc3
50df397
968a67a
50df397
968a67a
 
ca6fbc3
50df397
968a67a
50df397
 
968a67a
 
ca6fbc3
968a67a
 
 
ca6fbc3
968a67a
 
ca6fbc3
968a67a
 
ca6fbc3
968a67a
 
 
ca6fbc3
968a67a
 
 
ca6fbc3
50df397
 
968a67a
50df397
968a67a
 
 
50df397
968a67a
 
 
 
ca6fbc3
50df397
ca6fbc3
50df397
968a67a
ca6fbc3
 
 
 
968a67a
 
 
 
 
 
ca6fbc3
50df397
ca6fbc3
50df397
968a67a
 
 
 
 
 
 
ca6fbc3
50df397
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
from smolagents import Tool
import os
import time
import tempfile
from transformers import pipeline
from typing import List, Dict
from PIL import Image
import io

# Import required browser automation libraries
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import helium


class WebVideoAnalyzerTool(Tool):
    """Count objects of a given type in a web-hosted video via periodic screenshots.

    The tool opens the page in Chrome (through helium/Selenium), tries to start
    playback, screenshots the browser window every ``interval`` seconds for
    ``duration`` seconds, runs an object-detection pipeline on each screenshot
    and returns a plain-text report of the per-frame counts.
    """

    name = "web_video_analyzer"
    description = "Analyzes a video on a webpage (YouTube, Vimeo, etc.) by taking screenshots at intervals and counting objects of a specified type in each frame."
    inputs = {
        "url": {
            "type": "string",
            "description": "The URL of the web page containing the video to analyze.",
        },
        "label": {
            "type": "string",
            "description": "The type of object to count (e.g., 'bird', 'person', 'car', 'dog'). Use common object names recognized by standard object detection models.",
        },
        "duration": {
            "type": "integer",
            "description": "How many seconds of the video to analyze (default: 30)",
            "nullable": True,
        },
        "interval": {
            "type": "integer",
            "description": "How often to take screenshots (in seconds, default: 1)",
            "nullable": True,
        },
    }
    output_type = "string"

    # Active browser driver, or None when no browser is running. Declared at
    # class level so the attribute exists before the first _setup_browser() call.
    driver = None
    # Lazily created object-detection pipeline, shared by all frames.
    _detector = None

    def _get_detector(self):
        """Return the object-detection pipeline, creating it on first use.

        Loading the model is expensive, so it is done once per tool instance
        rather than once per analyzed frame.
        """
        if self._detector is None:
            self._detector = pipeline(
                "object-detection", model="facebook/detr-resnet-50"
            )
        return self._detector

    def _setup_browser(self):
        """Start Chrome with fixed window settings, or reuse the running driver."""
        if self.driver is not None:
            return self.driver

        print("Setting up browser...")

        # Configure Chrome options
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument("--force-device-scale-factor=1")
        chrome_options.add_argument("--window-size=1280,720")
        chrome_options.add_argument("--disable-pdf-viewer")
        chrome_options.add_argument("--window-position=0,0")
        chrome_options.add_argument("--autoplay-policy=no-user-gesture-required")

        # Initialize the driver
        self.driver = helium.start_chrome(headless=False, options=chrome_options)
        return self.driver

    def _shutdown_browser(self) -> None:
        """Close the browser if one is running and forget the driver."""
        if self.driver is None:
            return
        try:
            helium.kill_browser()
        except Exception:
            print("Warning: Could not properly close the browser")
        finally:
            # Never keep a reference to a driver that may already be dead;
            # the next call must start a fresh browser.
            self.driver = None

    def _start_youtube_playback(self) -> None:
        """Best-effort: dismiss the YouTube consent dialog and start the player."""
        # Accept cookies if prompted. A failure here must not prevent the
        # attempt to start playback below.
        try:
            if helium.Button("Accept all").exists():
                helium.click("Accept all")
            elif helium.Button("I agree").exists():
                helium.click("I agree")
        except Exception as e:
            print(f"Error during YouTube setup: {str(e)}")

        # Click on the video element to give the player focus.
        try:
            video_element = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "video"))
            )
            video_element.click()
        except Exception:
            print("Could not locate video element to click")

        # If the play button still advertises "Play", click it.
        try:
            play_button = self.driver.find_element(By.CLASS_NAME, "ytp-play-button")
            # get_attribute returns None when the attribute is absent.
            if "Play" in (play_button.get_attribute("aria-label") or ""):
                play_button.click()
        except NoSuchElementException:
            pass  # no player controls on this page; nothing to press
        except Exception as e:
            print(f"Could not press the play button: {str(e)}")

    def _start_generic_playback(self) -> None:
        """Best-effort: click the first ``<video>`` element on the page."""
        try:
            video_elements = self.driver.find_elements(By.TAG_NAME, "video")
            if video_elements:
                video_elements[0].click()
        except Exception as e:
            print(f"Could not find or click video element: {str(e)}")

    def _navigate_to_video(self, url: str) -> bool:
        """Open ``url`` and try to start video playback.

        Returns:
            bool: True once the page was opened (whether or not a video could be
            started), False if navigation itself raised.
        """
        try:
            print(f"Navigating to {url}...")
            helium.go_to(url)

            # Wait for page to load
            time.sleep(3)

            # Short links (youtu.be) land on the same player as youtube.com.
            if "youtube.com" in url or "youtu.be" in url:
                self._start_youtube_playback()
            else:
                self._start_generic_playback()

            # Allow video to start
            time.sleep(2)
            return True

        except Exception as e:
            print(f"Error navigating to {url}: {str(e)}")
            return False

    def _close_popups(self):
        """Attempt to close any popups or overlays (best effort, never raises)."""
        try:
            # Try pressing Escape key to close general popups
            webdriver.ActionChains(self.driver).send_keys(Keys.ESCAPE).perform()

            # YouTube-specific: try to close any visible ad overlay / skip button
            if "youtube.com" in self.driver.current_url:
                close_buttons = self.driver.find_elements(
                    By.CSS_SELECTOR,
                    "button.ytp-ad-overlay-close-button, button.ytp-ad-skip-button",
                )
                for button in close_buttons:
                    try:
                        button.click()
                    except Exception:
                        # Hidden or stale button: keep trying the others.
                        continue
        except Exception as e:
            print(f"Error closing popups: {str(e)}")

    def _take_screenshot(self) -> Image.Image:
        """Take a screenshot of the current browser window as an RGB image."""
        png_bytes = self.driver.get_screenshot_as_png()
        # The PNG may decode with an alpha channel; normalise to RGB so the
        # frame can be written as JPEG.
        return Image.open(io.BytesIO(png_bytes)).convert("RGB")

    @staticmethod
    def _label_matches(wanted: str, detected: str) -> bool:
        """Return True if ``wanted`` occurs in ``detected`` as whole word(s).

        Matching is case-insensitive. Whole-word matching keeps 'phone'
        matching 'cell phone' while preventing 'car' from matching 'carrot'.
        An empty ``wanted`` never matches.
        """
        needle = " ".join(wanted.lower().split())
        if not needle:
            return False
        haystack = " ".join(detected.lower().split())
        return f" {needle} " in f" {haystack} "

    def _analyze_screenshot(self, image: Image.Image, label: str) -> int:
        """Count objects of the specified label in a screenshot.

        Returns 0 if detection itself fails (the error is printed).
        """
        detector = self._get_detector()

        try:
            # Run detection on the image
            results = detector(image)

            # Count objects matching the label
            object_count = sum(
                1 for result in results if self._label_matches(label, result["label"])
            )

            # Debug: print detected classes
            detected_classes = [result["label"] for result in results]
            if detected_classes:
                print(f"Detected classes: {', '.join(detected_classes)}")

            return object_count

        except Exception as e:
            print(f"Error detecting objects in screenshot: {str(e)}")
            return 0

    def _capture_video_frames(
        self, duration: int = 30, interval: int = 1, label: str = ""
    ) -> List[Dict]:
        """Capture and analyze frames from the video at regular intervals.

        Args:
            duration: Total number of seconds to sample.
            interval: Seconds between consecutive screenshots; must be positive.
            label: Object type to count in each frame.

        Returns:
            One dict per successfully captured frame with the keys ``time``
            (scheduled offset in seconds), ``object_count`` and
            ``screenshot_path``.

        Raises:
            ValueError: If ``interval`` is not positive.
        """
        if interval <= 0:
            raise ValueError("interval must be a positive number of seconds")

        results = []

        print(
            f"Starting frame capture for {duration} seconds with {interval} second intervals..."
        )
        # Frames are kept on disk for debugging; the directory is not removed.
        temp_dir = tempfile.mkdtemp()
        print(f"Saving captured frames to {temp_dir}")

        start = time.monotonic()
        for seconds_elapsed in range(0, duration, interval):
            # Sleep until this frame's scheduled offset. Scheduling against a
            # fixed start keeps timestamps from drifting by the time spent on
            # detection, and keeps pacing intact when a frame fails.
            delay = start + seconds_elapsed - time.monotonic()
            if delay > 0:
                time.sleep(delay)

            try:
                print(f"Capturing frame at {seconds_elapsed} seconds...")
                screenshot = self._take_screenshot()

                # Save screenshot for debugging (optional)
                screenshot_path = os.path.join(temp_dir, f"frame_{seconds_elapsed}.jpg")
                screenshot.save(screenshot_path)

                # Analyze screenshot
                object_count = self._analyze_screenshot(screenshot, label)

                # Store results
                results.append(
                    {
                        "time": seconds_elapsed,
                        "object_count": object_count,
                        "screenshot_path": screenshot_path,
                    }
                )

            except Exception as e:
                print(f"Error capturing frame at {seconds_elapsed} seconds: {str(e)}")

        return results

    @staticmethod
    def _build_report(
        url: str, label: str, duration: int, interval: int, frame_results: List[Dict]
    ) -> str:
        """Format the per-frame counts and summary statistics as text.

        ``frame_results`` must be non-empty.
        """
        total_objects = sum(result["object_count"] for result in frame_results)
        avg_objects = total_objects / len(frame_results)
        max_objects = max(frame_results, key=lambda x: x["object_count"])

        report = [
            f"# {label.title()} Count Analysis for Video",
            f"Video URL: {url}",
            f"Analysis duration: {duration} seconds",
            f"Screenshots taken: {len(frame_results)} (every {interval} second(s))",
            "",
            "## Summary",
            f"Total {label}s detected: {total_objects}",
            f"Average {label}s per screenshot: {avg_objects:.2f}",
            f"Maximum {label}s in a single screenshot: {max_objects['object_count']} (at {max_objects['time']} seconds)",
            "",
            "## Time-based Analysis",
        ]

        # Add frame-by-frame details
        report.extend(
            f"Time {result['time']} seconds: {result['object_count']} {label}s"
            for result in frame_results
        )
        return "\n".join(report)

    def forward(
        self, url: str, label: str, duration: int = 30, interval: int = 1
    ) -> str:
        """
        Analyzes a video on a webpage by taking screenshots and counting objects.

        Args:
            url (str): The URL of the webpage containing the video.
            label (str): The type of object to count (e.g., 'bird', 'person', 'car', 'dog').
            duration (int): How many seconds of the video to analyze.
                ``None`` means the default of 30.
            interval (int): How often to take screenshots (in seconds).
                ``None`` means the default of 1.

        Returns:
            str: A detailed report of object counts over time, or a message
            starting with "Error" if the input is invalid or the analysis fails.
            This method does not raise; failures are reported in the returned text.
        """
        # Nullable inputs may be passed explicitly as None; fall back to the
        # documented defaults and coerce to int so range() accepts them.
        try:
            duration = 30 if duration is None else int(duration)
            interval = 1 if interval is None else int(interval)
        except (TypeError, ValueError):
            return "Error: 'duration' and 'interval' must be integers"
        if duration <= 0 or interval <= 0:
            return "Error: 'duration' and 'interval' must be positive integers"

        label = (label or "").strip()
        if not label:
            return "Error: 'label' must be a non-empty object name"

        try:
            # Load the detection model first so a model problem is reported
            # before a browser window is opened.
            self._get_detector()

            # Setup the browser
            self._setup_browser()

            # Navigate to the video
            if not self._navigate_to_video(url):
                return f"Error: Could not navigate to or play the video at {url}"

            # Close any popups or overlays
            self._close_popups()

            # Capture and analyze frames
            frame_results = self._capture_video_frames(duration, interval, label)

            if not frame_results:
                return "Error: No frames were successfully captured and analyzed"

            return self._build_report(url, label, duration, interval, frame_results)

        except Exception as e:
            return f"Error analyzing video: {str(e)}"

        finally:
            # Runs on every exit path (success, early return, exception) so the
            # browser is never left open.
            self._shutdown_browser()