| from __future__ import annotations |
|
|
| import json |
| import os |
| from typing import Optional |
|
|
| import requests |
| from smolagents.tools import Tool |
|
|
|
|
class BrightDataScraperTool(Tool):
    """Tool that fetches a webpage as Markdown through the Bright Data request API.

    The target URL is sent to ``https://api.brightdata.com/request`` together
    with an unlocker zone. Credentials and zone are read from the environment
    on every call:

    * ``BRIGHT_DATA_API_TOKEN`` -- required bearer token.
    * ``BRIGHT_DATA_UNLOCKER_ZONE`` -- optional, defaults to ``"web_unlocker1"``.
    """

    name = "brightdata_web_scraper"
    description = """
    Scrape any webpage and return content in Markdown format.
    This tool can bypass bot detection and CAPTCHAs.
    Use this when you need to extract content from websites.
    """
    output_type = "string"

    def __init__(self) -> None:
        """Declare the single ``url`` input, then run the base initializer."""
        # ``inputs`` is assigned before ``super().__init__()`` so it already
        # exists when the base class initializer runs.
        self.inputs = {
            "url": {
                "type": "string",
                "description": "The URL of the webpage to scrape",
            }
        }
        super().__init__()

    def forward(self, url) -> str:
        """Scrape ``url`` and return the page content.

        Args:
            url: A URL string, a dict carrying an ``orig_name`` or ``url``
                key, or the string form of such a dict.

        Returns:
            The response body text on success. On an unusable ``url`` or a
            failed HTTP request, a JSON string with an ``"error"`` key (plus
            ``"details"`` holding the response body for request failures).

        Raises:
            ValueError: If ``BRIGHT_DATA_API_TOKEN`` is not set.
        """
        url_str = self._coerce_url_input(url)
        if not url_str:
            return json.dumps({"error": "No valid URL provided"})

        api_token = os.getenv("BRIGHT_DATA_API_TOKEN")
        unlocker_zone = os.getenv("BRIGHT_DATA_UNLOCKER_ZONE", "web_unlocker1")
        if not api_token:
            raise ValueError("BRIGHT_DATA_API_TOKEN not found in environment variables")

        api_url = "https://api.brightdata.com/request"
        headers = {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        }
        payload = {
            "url": url_str,
            "zone": unlocker_zone,
            "format": "raw",
            "data_format": "markdown",
        }

        try:
            response = requests.post(api_url, json=payload, headers=headers, timeout=30)
            response.raise_for_status()
        except requests.exceptions.RequestException as exc:
            # Connection-level failures carry no response; HTTP errors do.
            failed = getattr(exc, "response", None)
            details = failed.text if failed is not None else ""
            return json.dumps({"error": str(exc), "details": details})
        return response.text

    def _coerce_url_input(self, raw) -> Optional[str]:
        """Normalize the accepted input shapes into one URL string.

        Args:
            raw: A plain URL string, a dict with ``orig_name`` and/or ``url``
                keys, or a string containing such a dict.

        Returns:
            A URL with an ``http(s)://`` scheme, or ``None`` when no usable
            URL can be extracted (including empty or whitespace-only input).
        """
        if isinstance(raw, str):
            text = raw.strip()
            if not text:
                # Without this guard "" would turn into the bare "https://",
                # which is truthy and would be sent to the API.
                return None
            if not (text.startswith("{") and "orig_name" in text):
                return self._ensure_scheme(text)
            parsed = self._parse_file_dict_string(text)
            if not parsed:
                # Not a parseable dict after all; treat the text as a URL.
                return self._ensure_scheme(text)
            raw = parsed

        if not isinstance(raw, dict):
            return None

        orig_name = raw.get("orig_name")
        if isinstance(orig_name, str) and orig_name.strip():
            return self._ensure_scheme(orig_name.strip())

        # Unlike ``orig_name``, a ``url`` value is only accepted when it is
        # already an absolute http(s) URL; no scheme is added to it.
        url_value = raw.get("url")
        if isinstance(url_value, str):
            candidate = url_value.strip()
            if candidate.lower().startswith(("http://", "https://")):
                return candidate
        return None

    def _ensure_scheme(self, url: str) -> str:
        """Return ``url`` unchanged if it has an http(s) scheme, else prefix ``https://``.

        The scheme check is case-insensitive, so ``HTTP://host`` is left as is.
        """
        if url.lower().startswith(("http://", "https://")):
            return url
        return f"https://{url}"

    def _parse_file_dict_string(self, value: str) -> Optional[dict]:
        """Parse the string form of a dict, returning ``None`` on any failure.

        A Python literal is tried first; if that fails, the text is tried as
        JSON so that ``true``/``false``/``null`` values are also accepted.

        Args:
            value: Text expected to contain a dict literal or JSON object.

        Returns:
            The parsed dict, or ``None`` if parsing fails or yields a non-dict.
        """
        import ast

        try:
            parsed = ast.literal_eval(value)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # literal_eval is documented to raise any of these on malformed
            # input (e.g. TypeError for an unhashable dict key).
            try:
                parsed = json.loads(value)
            except (ValueError, RecursionError):
                return None
        return parsed if isinstance(parsed, dict) else None
|
|