# -*- coding: utf-8 -*-
from sys import stderr
from functools import wraps
from re import sub as re_sub
from collections import namedtuple
from shlex import split as shlex_split
from inspect import signature, Parameter
from tempfile import mkstemp as make_temp_file
from argparse import ArgumentParser, SUPPRESS
from webbrowser import open as open_in_browser
from urllib.parse import urlparse, urlunparse, parse_qsl
from logging import (
    DEBUG,
    INFO,
    WARNING,
    ERROR,
    CRITICAL,
    FATAL,
    getLogger,
    getLevelName,
)

from orjson import loads as json_loads, JSONDecodeError

from ._shell_signatures import Signatures_map
from scrapling import __version__
from scrapling.core.utils import log
from scrapling.parser import Selector, Selectors
from scrapling.core.custom_types import TextHandler
from scrapling.engines.toolbelt.custom import Response
from scrapling.core.utils._shell import _ParseHeaders, _CookieParser
from scrapling.core._types import (
    Callable,
    Dict,
    Any,
    cast,
    Optional,
    Generator,
    extraction_types,
)

_known_logging_levels = {
    "debug": DEBUG,
    "info": INFO,
    "warning": WARNING,
    "error": ERROR,
    "critical": CRITICAL,
    "fatal": FATAL,
}
# Define the structure for parsed context - Simplified for Fetcher args
Request = namedtuple(
    "Request",
    [
        "method",
        "url",
        "params",
        "data",  # Can be str, bytes, or dict (for urlencoded)
        "json_data",  # Python object (dict/list) for JSON payload
        "headers",
        "cookies",
        "proxy",
        "follow_redirects",  # Added for -L flag
    ],
)
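
# Illustrative: a DevTools-copied command such as
#   curl 'https://example.com/api' -H 'accept: application/json' --data-raw '{"q": 1}'
# is expected to parse into roughly:
#   Request(method='post', url='https://example.com/api', params={}, data=None,
#           json_data={'q': 1}, headers={'accept': 'application/json'}, cookies={},
#           proxy=None, follow_redirects=True)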

# Suppress exit on error to handle parsing errors gracefully
class NoExitArgumentParser(ArgumentParser):  # pragma: no cover
    def error(self, message):
        log.error(f"Curl arguments parsing error: {message}")
        raise ValueError(f"Curl arguments parsing error: {message}")

    def exit(self, status=0, message=None):
        if message:
            log.error(f"Scrapling shell exited with status {status}: {message}")
            self._print_message(message, stderr)
        raise ValueError(f"Scrapling shell exited with status {status}: {message or 'Unknown reason'}")
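
# Illustrative failure mode this guards against: a stock ArgumentParser calls
# sys.exit(2) on bad arguments, which would kill the interactive shell; raising
# ValueError instead lets CurlParser.parse() catch it and return None.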

class CurlParser:
    """Builds the argument parser for relevant curl flags from DevTools."""

    def __init__(self) -> None:
        from scrapling.fetchers import Fetcher as __Fetcher

        self.__fetcher = __Fetcher

        # We use an argparse parser to parse the curl command directly instead of regex.
        # We focus on the flags that typically appear in curl commands copied from DevTools' Network tab.
        _parser = NoExitArgumentParser(add_help=False)  # Disable default help

        # Basic curl arguments
        _parser.add_argument("curl_command_placeholder", nargs="?", help=SUPPRESS)
        _parser.add_argument("url")
        _parser.add_argument("-X", "--request", dest="method", default=None)
        _parser.add_argument("-H", "--header", action="append", default=[])
        _parser.add_argument(
            "-A", "--user-agent", help="Will be parsed from -H if present"
        )  # Note: DevTools usually includes this in -H

        # Data arguments (prioritizing types common from DevTools)
        _parser.add_argument("-d", "--data", default=None)
        _parser.add_argument("--data-raw", default=None)  # Often used by browsers for JSON body
        _parser.add_argument("--data-binary", default=None)
        # Keep urlencode for completeness, though less common from browser copy/paste
        _parser.add_argument("--data-urlencode", action="append", default=[])
        _parser.add_argument("-G", "--get", action="store_true")  # Use GET and put data in URL

        _parser.add_argument(
            "-b",
            "--cookie",
            default=None,
            help="Send cookies from string/file (string format used by DevTools)",
        )

        # Proxy
        _parser.add_argument("-x", "--proxy", default=None)
        _parser.add_argument("-U", "--proxy-user", default=None)  # Basic proxy auth

        # Connection/Security
        _parser.add_argument("-k", "--insecure", action="store_true")
        _parser.add_argument("--compressed", action="store_true")  # Very common from browsers

        # Other flags often included but may not map directly to request args
        _parser.add_argument("-i", "--include", action="store_true")
        _parser.add_argument("-s", "--silent", action="store_true")
        _parser.add_argument("-v", "--verbose", action="store_true")

        self.parser: NoExitArgumentParser = _parser
        self._supported_methods = ("get", "post", "put", "delete")

    # --- Main Parsing Logic ---
    def parse(self, curl_command: str) -> Optional[Request]:
        """Parses the curl command string into a structured context for Fetcher."""
        clean_command = curl_command.strip().removeprefix("curl").strip().replace("\\\n", " ")
        try:
            tokens = shlex_split(clean_command)  # Split the string using shell-like syntax
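            # e.g. "'https://example.com' -H 'accept: text/html'" splits into
            #   ['https://example.com', '-H', 'accept: text/html']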
        except ValueError as e:  # pragma: no cover
            log.error(f"Could not split command line: {e}")
            return None

        try:
            parsed_args, unknown = self.parser.parse_known_args(tokens)
            if unknown:
                raise AttributeError(f"Unknown/Unsupported curl arguments: {unknown}")
        except ValueError:  # pragma: no cover
            return None
        except AttributeError:
            raise
        except Exception as e:  # pragma: no cover
            log.error(f"An unexpected error occurred during curl arguments parsing: {e}")
            return None

        # --- Determine Method ---
        method = "get"  # Default
        if parsed_args.get:  # `-G` forces GET
            method = "get"
        elif parsed_args.method:
            method = parsed_args.method.strip().lower()
        # Infer POST if data is present (unless overridden by -X or -G)
        elif any(
            [
                parsed_args.data,
                parsed_args.data_raw,
                parsed_args.data_binary,
                parsed_args.data_urlencode,
            ]
        ):
            method = "post"

        headers, cookies = _ParseHeaders(parsed_args.header)
        if parsed_args.cookie:
            # We are focusing on the string format from DevTools.
            try:
                for key, value in _CookieParser(parsed_args.cookie):
                    # Update the cookie dict, potentially overwriting cookies with the same name from -H 'cookie:'
                    cookies[key] = value
                log.debug(f"Parsed cookies from -b argument: {list(cookies.keys())}")
            except Exception as e:  # pragma: no cover
                log.error(f"Could not parse cookie string from -b '{parsed_args.cookie}': {e}")

        # --- Process Data Payload ---
        params = dict()
        data_payload: Optional[str | bytes | Dict] = None
        json_payload: Optional[Any] = None

        # DevTools often uses --data-raw for JSON bodies.
        # Precedence: --data-binary > --data-raw / -d > --data-urlencode
        if parsed_args.data_binary is not None:  # pragma: no cover
            try:
                data_payload = parsed_args.data_binary.encode("utf-8")
                log.debug("Using data from --data-binary as bytes.")
            except Exception as e:
                log.warning(
                    f"Could not encode binary data '{parsed_args.data_binary}' as bytes: {e}. Using raw string."
                )
                data_payload = parsed_args.data_binary  # Fallback to string
        elif parsed_args.data_raw is not None:
            data_payload = parsed_args.data_raw.lstrip("$")
        elif parsed_args.data is not None:
            data_payload = parsed_args.data
        elif parsed_args.data_urlencode:  # pragma: no cover
            # Combine and parse urlencoded data
            combined_data = "&".join(parsed_args.data_urlencode)
            try:
                data_payload = dict(parse_qsl(combined_data, keep_blank_values=True))
            except Exception as e:
                log.warning(f"Could not parse urlencoded data '{combined_data}': {e}. Treating as raw string.")
                data_payload = combined_data

        # Check if raw data looks like JSON, prefer 'json' param if so
        if isinstance(data_payload, str):
            try:
                maybe_json = json_loads(data_payload)
                if isinstance(maybe_json, (dict, list)):
                    json_payload = maybe_json
                    data_payload = None
            except JSONDecodeError:
                pass  # Not JSON, keep it in data_payload
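
        # Illustrative routing: -d 'a=1&b=2' stays in `data_payload` as a string, while
        # --data-raw '{"a": 1}' decodes as JSON and moves to `json_payload`, so Fetcher
        # sends a proper JSON body.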

        # Handle `-G`: Move data to params if the method is GET
        if method == "get" and data_payload:  # pragma: no cover
            if isinstance(data_payload, dict):  # From --data-urlencode likely
                params.update(data_payload)
            elif isinstance(data_payload, str):
                try:
                    params.update(dict(parse_qsl(data_payload, keep_blank_values=True)))
                except ValueError:
                    log.warning(f"Could not parse data '{data_payload}' into GET parameters for -G.")
            if params:
                data_payload = None  # Clear data as it's moved to params
                json_payload = None  # Should not have JSON body with -G

        # --- Process Proxy ---
        proxies: Optional[Dict[str, str]] = None
        if parsed_args.proxy:
            proxy_url = f"http://{parsed_args.proxy}" if "://" not in parsed_args.proxy else parsed_args.proxy
            if parsed_args.proxy_user:
                user_pass = parsed_args.proxy_user
                parts = urlparse(proxy_url)
                netloc_parts = parts.netloc.split("@")
                netloc = f"{user_pass}@{netloc_parts[-1]}" if len(netloc_parts) > 1 else f"{user_pass}@{parts.netloc}"
                proxy_url = urlunparse(
                    (
                        parts.scheme,
                        netloc,
                        parts.path,
                        parts.params,
                        parts.query,
                        parts.fragment,
                    )
                )

            # Standard proxy dict format
            proxies = {"http": proxy_url, "https": proxy_url}
            log.debug(f"Using proxy configuration: {proxies}")
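            # e.g. `-x 127.0.0.1:8080 -U user:pass` becomes
            #   "http://user:pass@127.0.0.1:8080" (values are placeholders)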

        # --- Final Context ---
        return Request(
            method=method,
            url=parsed_args.url,
            params=params,
            data=data_payload,
            json_data=json_payload,
            headers=headers,
            cookies=cookies,
            proxy=proxies,
            follow_redirects=True,  # Scrapling default is True
        )
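
    # Hedged example of the `-G` path (URL and data are placeholders):
    #   CurlParser().parse("curl 'https://httpbin.org/get' -G -d 'a=1&b=2'")
    # is expected to return a GET Request with params={'a': '1', 'b': '2'} and no body.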

    def convert2fetcher(self, curl_command: Request | str) -> Optional[Response]:
        if isinstance(curl_command, (Request, str)):
            request = self.parse(curl_command) if isinstance(curl_command, str) else curl_command

            # Ensure request parsing was successful before proceeding
            if request is None:  # pragma: no cover
                log.error("Failed to parse curl command, cannot convert to fetcher.")
                return None

            request_args = request._asdict()
            method = request_args.pop("method").strip().lower()

            if method in self._supported_methods:
                request_args["json"] = request_args.pop("json_data")

                # Ensure data/json are removed for non-POST/PUT methods
                if method not in ("post", "put"):
                    _ = request_args.pop("data", None)
                    _ = request_args.pop("json", None)

                try:
                    return getattr(self.__fetcher, method)(**request_args)
                except Exception as e:  # pragma: no cover
                    log.error(f"Error calling Fetcher.{method}: {e}")
                    return None
            else:  # pragma: no cover
                log.error(f'Request method "{method}" isn\'t supported by Scrapling yet')
                return None
        else:  # pragma: no cover
            log.error("Input must be a valid curl command string or a Request object.")
            return None
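
# Hedged usage sketch (the URL is a placeholder; both helpers return None on failure):
#   response = CurlParser().convert2fetcher("curl 'https://example.com' --compressed")
#   response.css("title::text")  # a scrapling `Response` on success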

def _unpack_signature(func, signature_name=None):
    """
    Unpack TypedDict from Unpack[TypedDict] annotations in **kwargs and reconstruct the signature.

    This allows the interactive shell to show individual parameters instead of just **kwargs, similar to how IDEs display them.
    """
    try:
        sig = signature(func)
        func_name = signature_name or getattr(func, "__name__", None)

        # Check if this function has known parameters
        if func_name not in Signatures_map:
            return sig

        new_params = []
        for param in sig.parameters.values():
            if param.kind == Parameter.VAR_KEYWORD:
                # Replace **kwargs with individual keyword-only parameters
                for field_name, field_type in Signatures_map[func_name].items():
                    new_params.append(
                        Parameter(field_name, Parameter.KEYWORD_ONLY, default=Parameter.empty, annotation=field_type)
                    )
            else:
                new_params.append(param)

        # Reconstruct signature with unpacked parameters
        if len(new_params) != len(sig.parameters):
            return sig.replace(parameters=new_params)
        return sig
    except Exception:  # pragma: no cover
        return signature(func)
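
# Illustrative effect (the exact fields come from the function's Signatures_map entry):
# a signature like `(url, **kwargs)` is rebuilt as `(url, *, timeout, headers, ...)`,
# so IPython's `get?` introspection lists concrete keyword arguments instead of **kwargs.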

def show_page_in_browser(page: Selector):  # pragma: no cover
    if not page or not isinstance(page, Selector):
        log.error("Input must be of type `Selector`")
        return

    try:
        fd, fname = make_temp_file(prefix="scrapling_view_", suffix=".html")
        with open(fd, "w", encoding=page.encoding) as f:
            f.write(page.html_content)

        open_in_browser(f"file://{fname}")
    except IOError as e:
        log.error(f"Failed to write temporary file for viewing: {e}")
    except Exception as e:
        log.error(f"An unexpected error occurred while viewing the page: {e}")

class CustomShell:
    """A custom IPython shell with minimal dependencies"""

    def __init__(self, code, log_level="debug"):
        from IPython.terminal.embed import InteractiveShellEmbed as __InteractiveShellEmbed
        from scrapling.fetchers import (
            Fetcher as __Fetcher,
            AsyncFetcher as __AsyncFetcher,
            FetcherSession as __FetcherSession,
            DynamicFetcher as __DynamicFetcher,
            DynamicSession as __DynamicSession,
            AsyncDynamicSession as __AsyncDynamicSession,
            StealthyFetcher as __StealthyFetcher,
            StealthySession as __StealthySession,
            AsyncStealthySession as __AsyncStealthySession,
        )

        self.__InteractiveShellEmbed = __InteractiveShellEmbed
        self.__Fetcher = __Fetcher
        self.__AsyncFetcher = __AsyncFetcher
        self.__FetcherSession = __FetcherSession
        self.__DynamicFetcher = __DynamicFetcher
        self.__DynamicSession = __DynamicSession
        self.__AsyncDynamicSession = __AsyncDynamicSession
        self.__StealthyFetcher = __StealthyFetcher
        self.__StealthySession = __StealthySession
        self.__AsyncStealthySession = __AsyncStealthySession
        self.code = code
        self.page = None
        self.pages = Selectors([])
        self._curl_parser = CurlParser()
        log_level = log_level.strip().lower()
        if log_level in _known_logging_levels:
            self.log_level = _known_logging_levels[log_level]
        else:  # pragma: no cover
            log.warning(f'Unknown log level "{log_level}", defaulting to "DEBUG"')
            self.log_level = DEBUG

        self.shell = None
        # Initialize your application components
        self.init_components()

    def init_components(self):
        """Initialize application components"""
        # This is where you'd set up your application-specific objects
        if self.log_level:
            getLogger("scrapling").setLevel(self.log_level)

        settings = self.__Fetcher.display_config()
        settings.pop("storage", None)
        settings.pop("storage_args", None)
        log.info(f"Scrapling {__version__} shell started")
        log.info(f"Logging level is set to '{getLevelName(self.log_level)}'")
        log.info(f"Fetchers' parsing settings: {settings}")

    @staticmethod
    def banner():
        """Create a custom banner for the shell"""
        return f"""
-> Available Scrapling objects:
   - Fetcher/AsyncFetcher/FetcherSession
   - DynamicFetcher/DynamicSession/AsyncDynamicSession
   - StealthyFetcher/StealthySession/AsyncStealthySession
   - Selector
-> Useful shortcuts:
   - {"get":<30} Shortcut for `Fetcher.get`
   - {"post":<30} Shortcut for `Fetcher.post`
   - {"put":<30} Shortcut for `Fetcher.put`
   - {"delete":<30} Shortcut for `Fetcher.delete`
   - {"fetch":<30} Shortcut for `DynamicFetcher.fetch`
   - {"stealthy_fetch":<30} Shortcut for `StealthyFetcher.fetch`
-> Useful commands:
   - {"page / response":<30} The response object of the last page you fetched
   - {"pages":<30} Selectors object of the last 5 response objects you fetched
   - {"uncurl('curl_command')":<30} Convert a curl command to a Request object. (Optimized for curl commands copied from the DevTools Network tab.)
   - {"curl2fetcher('curl_command')":<30} Convert a curl command and make the request with Fetcher. (Optimized for curl commands copied from the DevTools Network tab.)
   - {"view(page)":<30} View page in a browser
   - {"help()":<30} Show this help message (Shell help)
Type 'exit' or press Ctrl+D to exit.
"""

    def update_page(self, result):  # pragma: no cover
        """Update the current page and add to pages history"""
        self.page = result
        if isinstance(result, (Response, Selector)):
            self.pages.append(result)
            if len(self.pages) > 5:
                self.pages.pop(0)  # Remove the oldest item

        # Update in IPython namespace too
        if self.shell:
            self.shell.user_ns["page"] = self.page
            self.shell.user_ns["response"] = self.page
            self.shell.user_ns["pages"] = self.pages

        return result

    def create_wrapper(
        self, func: Callable, get_signature: bool = True, signature_name: Optional[str] = None
    ) -> Callable:
        """Create a wrapper that preserves function signature but updates page"""

        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            result = func(*args, **kwargs)
            return self.update_page(result)

        if get_signature:
            # Explicitly preserve and unpack the signature for IPython introspection and autocompletion
            setattr(wrapper, "__signature__", _unpack_signature(func, signature_name))
        else:
            setattr(wrapper, "__signature__", signature(func))
        return wrapper
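
    # Sketch of the resulting behavior (illustrative; `shell` is a CustomShell instance):
    #   get = shell.create_wrapper(Fetcher.get)
    #   get("https://example.com")  # fetches, then records the result in `page`/`pages`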

    def get_namespace(self):
        """Create a namespace with application-specific objects"""
        # Create wrapped versions of fetch functions
        get = self.create_wrapper(self.__Fetcher.get)
        post = self.create_wrapper(self.__Fetcher.post)
        put = self.create_wrapper(self.__Fetcher.put)
        delete = self.create_wrapper(self.__Fetcher.delete)
        dynamic_fetch = self.create_wrapper(self.__DynamicFetcher.fetch)
        stealthy_fetch = self.create_wrapper(self.__StealthyFetcher.fetch, signature_name="stealthy_fetch")
        curl2fetcher = self.create_wrapper(self._curl_parser.convert2fetcher, get_signature=False)

        # Create the namespace dictionary
        return {
            "get": get,
            "post": post,
            "put": put,
            "delete": delete,
            "Fetcher": self.__Fetcher,
            "AsyncFetcher": self.__AsyncFetcher,
            "FetcherSession": self.__FetcherSession,
            "DynamicSession": self.__DynamicSession,
            "AsyncDynamicSession": self.__AsyncDynamicSession,
            "StealthySession": self.__StealthySession,
            "AsyncStealthySession": self.__AsyncStealthySession,
            "fetch": dynamic_fetch,
            "DynamicFetcher": self.__DynamicFetcher,
            "stealthy_fetch": stealthy_fetch,
            "StealthyFetcher": self.__StealthyFetcher,
            "Selector": Selector,
            "page": self.page,
            "response": self.page,
            "pages": self.pages,
            "view": show_page_in_browser,
            "uncurl": self._curl_parser.parse,
            "curl2fetcher": curl2fetcher,
            "help": self.show_help,
        }

    def show_help(self):  # pragma: no cover
        """Show help information"""
        print(self.banner())

    def start(self):  # pragma: no cover
        """Start the interactive shell"""
        # Get our namespace with application objects
        namespace = self.get_namespace()

        ipython_shell = self.__InteractiveShellEmbed(
            banner1=self.banner(),
            banner2="",
            enable_tip=False,
            exit_msg="Bye Bye",
            user_ns=namespace,
        )
        self.shell = ipython_shell

        # If a command was provided, execute it and exit
        if self.code:
            log.info(f"Executing provided code: {self.code}")
            try:
                ipython_shell.run_cell(self.code, store_history=False)
            except Exception as e:
                log.error(f"Error executing initial code: {e}")
            return

        ipython_shell()

class Convertor:
    """Utils for the extract shell command"""

    _extension_map: Dict[str, extraction_types] = {
        "md": "markdown",
        "html": "html",
        "txt": "text",
    }

    @classmethod
    def _convert_to_markdown(cls, body: TextHandler) -> str:
        """Convert HTML content to Markdown"""
        from markdownify import markdownify

        return markdownify(body)

    @classmethod
    def _extract_content(
        cls,
        page: Selector,
        extraction_type: extraction_types = "markdown",
        css_selector: Optional[str] = None,
        main_content_only: bool = False,
    ) -> Generator[str, None, None]:
        """Extract the content of a Selector"""
        if not page or not isinstance(page, Selector):  # pragma: no cover
            raise TypeError("Input must be of type `Selector`")
        elif not extraction_type or extraction_type not in cls._extension_map.values():
            raise ValueError(f"Unknown extraction type: {extraction_type}")
        else:
            if main_content_only:
                page = cast(Selector, page.css("body").first) or page

            pages = [page] if not css_selector else cast(Selectors, page.css(css_selector))
            for page in pages:
                match extraction_type:
                    case "markdown":
                        yield cls._convert_to_markdown(page.html_content)
                    case "html":
                        yield page.html_content
                    case "text":
                        txt_content = page.get_all_text(strip=True)
                        for s in (
                            "\n",
                            "\r",
                            "\t",
                            " ",
                        ):
                            # Remove consecutive white-spaces
                            txt_content = TextHandler(re_sub(f"[{s}]+", s, txt_content))
                        yield txt_content
                yield ""

    @classmethod
    def write_content_to_file(cls, page: Selector, filename: str, css_selector: Optional[str] = None) -> None:
        """Write a Selector's content to a file"""
        if not page or not isinstance(page, Selector):  # pragma: no cover
            raise TypeError("Input must be of type `Selector`")
        elif not filename or not isinstance(filename, str) or not filename.strip():
            raise ValueError("Filename must be provided")
        elif not filename.endswith((".md", ".html", ".txt")):
            raise ValueError("Unknown file type: filename must end with '.md', '.html', or '.txt'")
        else:
            with open(filename, "w", encoding=page.encoding) as f:
                extension = filename.split(".")[-1]
                f.write(
                    "".join(
                        cls._extract_content(
                            page,
                            cls._extension_map[extension],
                            css_selector=css_selector,
                        )
                    )
                )
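
# Hedged usage sketch (filename and selector are placeholders):
#   Convertor.write_content_to_file(page, "article.md", css_selector="article")
# converts the matched node(s) to Markdown because the ".md" extension maps to the
# "markdown" extraction type in `_extension_map`.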