Spaces:
Sleeping
Sleeping
| import subprocess | |
| import tempfile | |
| import os | |
| from io import StringIO | |
| import re | |
| from functools import lru_cache | |
| from typing import Optional, Dict, Union | |
| from browserforge.headers import Browser, HeaderGenerator | |
| from tldextract import extract | |
| from markitdown import MarkItDown | |
| from markdown import Markdown | |
| import brotli | |
| import zstandard as zstd | |
| import gzip | |
| import zlib | |
| from urllib.parse import unquote | |
| from smolagents import tool | |
class Response:
    """Thin wrapper around an HTTP response object.

    Delegates every attribute to the wrapped response and adds lazily
    computed (and cached) Markdown / plain-text renderings of its
    ``content`` bytes, produced by the converter callables supplied at
    construction time.
    """

    def __init__(self, response, convert_to_markdown, convert_to_plain_text):
        self._response = response
        self._convert_to_markdown = convert_to_markdown
        self._convert_to_plain_text = convert_to_plain_text
        # Caches for the converted renderings; filled on first access.
        self._markdown = None
        self._plain_text = None

    def __getattr__(self, item):
        # Only reached for names not found on the wrapper itself, so all
        # unknown attribute access falls through to the wrapped response.
        return getattr(self._response, item)

    def markdown(self) -> str:
        """Return the response body rendered as Markdown (computed once)."""
        if self._markdown is None:
            self._markdown = self._convert_to_markdown(self._response.content)
        return self._markdown

    def plain_text(self) -> str:
        """Return the response body rendered as plain text (computed once)."""
        if self._plain_text is None:
            self._plain_text = self._convert_to_plain_text(self._response.content)
        return self._plain_text
def generate_headers() -> Dict[str, str]:
    """Generate a realistic set of desktop browser request headers.

    Uses browserforge's HeaderGenerator restricted to recent (>= v120)
    Chrome, Firefox and Edge desktop profiles.
    """
    supported = ('chrome', 'firefox', 'edge')
    browsers = [Browser(name=family, min_version=120) for family in supported]
    return HeaderGenerator(browser=browsers, device='desktop').generate()
def generate_convincing_referer(url: str) -> str:
    """Build a Google-search referer URL for the domain of *url*.

    Makes the request look like it arrived via a Google search for the
    site's registered domain name (e.g. 'example' for example.co.uk).
    """
    domain = extract(url).domain
    return 'https://www.google.com/search?q=' + domain
def headers_job(headers: Optional[Dict], url: str) -> Dict:
    """Merge generated browser headers and a referer into *headers*.

    Bug fix: the original called ``generate_headers()`` twice — once to set
    ``User-Agent`` and again for ``headers.update(...)`` — so the first
    (different, randomly generated) User-Agent was immediately clobbered.
    A single generated header set (which includes User-Agent) is now used.

    NOTE: generated headers overwrite any same-named caller-supplied
    headers, and the dict passed in is mutated in place — both preserved
    from the original behavior.

    :param headers: Optional base headers; ``None`` is treated as empty.
    :param url: Target URL, used to fabricate a plausible referer.
    :return: The merged header dict.
    """
    headers = headers or {}
    headers.update(generate_headers())
    headers['referer'] = generate_convincing_referer(url)
    return headers
def convert_to_markdown(content: bytes) -> str:
    """Convert raw response bytes to Markdown using MarkItDown.

    MarkItDown converts local files, so the bytes are written to a
    temporary file first; the temp file is always removed afterwards.
    The original ``except Exception as e: raise e`` was a no-op re-raise
    that only obscured the traceback — removed; exceptions propagate
    unchanged and cleanup still runs via ``finally``.

    :param content: Raw body bytes (HTML or any format MarkItDown accepts).
    :return: The extracted Markdown text.
    """
    md = MarkItDown()
    temp_path = None
    try:
        # delete=False so the file survives the 'with' and can be handed
        # to MarkItDown by path after it is closed.
        with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
            tmp_file.write(content)
            tmp_file.flush()
            temp_path = tmp_file.name
        return md.convert_local(temp_path).text_content
    finally:
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)
def convert_to_plain_text(content: bytes) -> str:
    """Convert raw response bytes to a single-line plain-text string.

    The bytes are first rendered to Markdown, then parsed with the
    ``markdown`` package using a custom 'plain' output format that strips
    all tags, and finally runs of newlines are collapsed to single spaces.
    """
    md_text = convert_to_markdown(content)

    def _render_plain(element, stream=None):
        # Depth-first walk over the ElementTree node: emit text, recurse
        # into children, then emit the tail text.
        if stream is None:
            stream = StringIO()
        if element.text:
            stream.write(element.text)
        for child in element:
            _render_plain(child, stream)
        if element.tail:
            stream.write(element.tail)
        return stream.getvalue()

    # Register (or overwrite) the 'plain' serializer on the Markdown class;
    # overwriting on every call is idempotent.
    Markdown.output_formats["plain"] = _render_plain
    renderer = Markdown(output_format="plain")
    renderer.stripTopLevelTags = False
    text = renderer.convert(md_text)
    return re.sub(r"\n+", " ", text)
| class BasicScraper: | |
| """Basic scraper class for making HTTP requests using curl.""" | |
| def __init__( | |
| self, | |
| proxy: Optional[str] = None, | |
| follow_redirects: bool = True, | |
| timeout: Optional[Union[int, float]] = None, | |
| retries: Optional[int] = 3 | |
| ): | |
| self.proxy = proxy | |
| self.timeout = timeout | |
| self.follow_redirects = bool(follow_redirects) | |
| self.retries = retries | |
| def _curl_get( | |
| self, | |
| url: str, | |
| headers: Dict[str, str], | |
| cookies: Optional[Dict], | |
| timeout: Optional[Union[int, float]], | |
| proxy: Optional[str], | |
| follow_redirects: bool | |
| ) -> bytes: | |
| # Use -i to include HTTP headers in the output. | |
| curl_command = ["curl", "-s", "-i"] | |
| if follow_redirects: | |
| curl_command.append("-L") | |
| if self.retries: | |
| curl_command.extend(["--retry", str(self.retries)]) | |
| # Add headers. | |
| for key, value in headers.items(): | |
| curl_command.extend(["-H", f"{key}: {value}"]) | |
| # Add cookies if provided. | |
| if cookies: | |
| cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()]) | |
| curl_command.extend(["--cookie", cookie_str]) | |
| # Set proxy if specified. | |
| if proxy: | |
| curl_command.extend(["--proxy", proxy]) | |
| # Set timeout options. | |
| if timeout: | |
| curl_command.extend(["--connect-timeout", str(timeout), "--max-time", str(timeout)]) | |
| curl_command.append(url) | |
| try: | |
| result = subprocess.run( | |
| curl_command, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| check=False | |
| ) | |
| if result.returncode != 0: | |
| raise Exception(f"Curl command failed: {result.stderr.decode('utf-8')}") | |
| raw_response = result.stdout | |
| # Split the response into header blocks and body. | |
| parts = raw_response.split(b'\r\n\r\n') | |
| if len(parts) >= 2: | |
| body = parts[-1] | |
| last_header_block = parts[-2] | |
| else: | |
| body = raw_response | |
| last_header_block = b"" | |
| # Look for a Content-Encoding header in the last header block. | |
| content_encoding = None | |
| for line in last_header_block.decode('utf-8', errors='ignore').splitlines(): | |
| if line.lower().startswith("content-encoding:"): | |
| content_encoding = line.split(":", 1)[1].strip().lower() | |
| break | |
| # Decode Brotli or Zstandard if needed. | |
| if content_encoding: | |
| try: | |
| if 'br' in content_encoding: | |
| body = brotli.decompress(body) | |
| elif 'zstd' in content_encoding: | |
| dctx = zstd.ZstdDecompressor() | |
| try: | |
| body = dctx.decompress(body) | |
| except zstd.ZstdError as e: | |
| # Fallback to streaming decompression if content size is unknown | |
| if "could not determine content size" in str(e): | |
| dctx_stream = zstd.ZstdDecompressor().decompressobj() | |
| body = dctx_stream.decompress(body) | |
| body += dctx_stream.flush() | |
| else: | |
| raise | |
| elif 'gzip' in content_encoding: | |
| body = gzip.decompress(body) | |
| elif 'deflate' in content_encoding: | |
| body = zlib.decompress(body) | |
| except Exception as e: | |
| raise Exception(f"Error decompressing content: {e}") | |
| return body | |
| except Exception as e: | |
| raise Exception(f"Error during curl request: {e}") | |
| def get( | |
| self, | |
| url: str, | |
| cookies: Optional[Dict] = None, | |
| timeout: Optional[Union[int, float]] = None, | |
| **kwargs: Dict | |
| ) -> Response: | |
| url = unquote(url).replace(" ", "+") | |
| hdrs = headers_job(kwargs.pop('headers', {}), url) | |
| effective_timeout = self.timeout if self.timeout is not None else timeout | |
| content = self._curl_get( | |
| url, | |
| headers=hdrs, | |
| cookies=cookies, | |
| timeout=effective_timeout, | |
| proxy=self.proxy, | |
| follow_redirects=self.follow_redirects | |
| ) | |
| # Create a dummy response object with a 'content' attribute. | |
| class DummyResponse: | |
| pass | |
| dummy = DummyResponse() | |
| dummy.content = content | |
| return Response( | |
| response=dummy, | |
| convert_to_markdown=convert_to_markdown, | |
| convert_to_plain_text=convert_to_plain_text | |
| ) |