# -*- coding: utf-8 -*-
from sys import stderr
from functools import wraps
from re import sub as re_sub
from collections import namedtuple
from shlex import split as shlex_split
from inspect import signature, Parameter
from tempfile import mkstemp as make_temp_file
from argparse import ArgumentParser, SUPPRESS
from webbrowser import open as open_in_browser
from urllib.parse import urlparse, urlunparse, parse_qsl
from logging import (
    DEBUG,
    INFO,
    WARNING,
    ERROR,
    CRITICAL,
    FATAL,
    getLogger,
    getLevelName,
)

from orjson import loads as json_loads, JSONDecodeError

from ._shell_signatures import Signatures_map
from scrapling import __version__
from scrapling.core.utils import log
from scrapling.parser import Selector, Selectors
from scrapling.core.custom_types import TextHandler
from scrapling.engines.toolbelt.custom import Response
from scrapling.core.utils._shell import _ParseHeaders, _CookieParser
from scrapling.core._types import (
    Callable,
    Dict,
    Any,
    cast,
    Optional,
    Generator,
    extraction_types,
)

_known_logging_levels = {
    "debug": DEBUG,
    "info": INFO,
    "warning": WARNING,
    "error": ERROR,
    "critical": CRITICAL,
    "fatal": FATAL,
}

# Define the structure for the parsed context - simplified for Fetcher args
Request = namedtuple(
    "Request",
    [
        "method",
        "url",
        "params",
        "data",  # Can be str, bytes, or dict (for urlencoded)
        "json_data",  # Python object (dict/list) for JSON payload
        "headers",
        "cookies",
        "proxy",
        "follow_redirects",  # Added for -L flag
    ],
)


# Suppress exit on error to handle parsing errors gracefully
class NoExitArgumentParser(ArgumentParser):  # pragma: no cover
    def error(self, message):
        log.error(f"Curl arguments parsing error: {message}")
        raise ValueError(f"Curl arguments parsing error: {message}")

    def exit(self, status=0, message=None):
        if message:
            log.error(f"Scrapling shell exited with status {status}: {message}")
            self._print_message(message, stderr)
        raise ValueError(f"Scrapling shell exited with status {status}: {message or 'Unknown reason'}")


class CurlParser:
    """Builds the argument parser for relevant curl flags from DevTools."""

    def __init__(self) -> None:
        from scrapling.fetchers import Fetcher as __Fetcher

        self.__fetcher = __Fetcher

        # Use argparse to parse the curl command directly instead of regex, focusing on the
        # flags that show up in curl commands copied from DevTools' network tab.
        _parser = NoExitArgumentParser(add_help=False)  # Disable default help

        # Basic curl arguments
        _parser.add_argument("curl_command_placeholder", nargs="?", help=SUPPRESS)
        _parser.add_argument("url")
        _parser.add_argument("-X", "--request", dest="method", default=None)
        _parser.add_argument("-H", "--header", action="append", default=[])
        _parser.add_argument(
            "-A", "--user-agent", help="Will be parsed from -H if present"
        )  # Note: DevTools usually includes this in -H

        # Data arguments (prioritizing types common from DevTools)
        _parser.add_argument("-d", "--data", default=None)
        _parser.add_argument("--data-raw", default=None)  # Often used by browsers for JSON body
        _parser.add_argument("--data-binary", default=None)
        # Keep urlencode for completeness, though less common from browser copy/paste
        _parser.add_argument("--data-urlencode", action="append", default=[])
        _parser.add_argument("-G", "--get", action="store_true")  # Use GET and put data in URL
        _parser.add_argument(
            "-b",
            "--cookie",
            default=None,
            help="Send cookies from string/file (string format used by DevTools)",
        )

        # Proxy
        _parser.add_argument("-x", "--proxy", default=None)
        _parser.add_argument("-U", "--proxy-user", default=None)  # Basic proxy auth

        # Connection/Security
        _parser.add_argument("-k", "--insecure", action="store_true")
        _parser.add_argument("--compressed", action="store_true")  # Very common from browsers

        # Other flags often included but may not map directly to request args
        _parser.add_argument("-i", "--include", action="store_true")
        _parser.add_argument("-s", "--silent", action="store_true")
        _parser.add_argument("-v", "--verbose", action="store_true")

        self.parser: NoExitArgumentParser = _parser
        self._supported_methods = ("get", "post", "put", "delete")
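
    # Illustrative sketch (hypothetical URL and values, not a test fixture): a command copied
    # from DevTools such as
    #   curl 'https://example.com/api' -H 'accept: application/json' --data-raw '{"id": 1}' --compressed
    # is expected to come out of `parse()` below roughly as
    #   Request(method='post', url='https://example.com/api', params={}, data=None,
    #           json_data={'id': 1}, headers={'accept': 'application/json'}, cookies={},
    #           proxy=None, follow_redirects=True)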

    # --- Main Parsing Logic ---
    def parse(self, curl_command: str) -> Optional[Request]:
        """Parses the curl command string into a structured context for Fetcher."""
        clean_command = curl_command.strip().lstrip("curl").strip().replace("\\\n", " ")

        try:
            tokens = shlex_split(clean_command)  # Split the string using shell-like syntax
        except ValueError as e:  # pragma: no cover
            log.error(f"Could not split command line: {e}")
            return None

        try:
            parsed_args, unknown = self.parser.parse_known_args(tokens)
            if unknown:
                raise AttributeError(f"Unknown/Unsupported curl arguments: {unknown}")
        except ValueError:  # pragma: no cover
            return None
        except AttributeError:
            raise
        except Exception as e:  # pragma: no cover
            log.error(f"An unexpected error occurred during curl arguments parsing: {e}")
            return None

        # --- Determine Method ---
        method = "get"  # Default
        if parsed_args.get:  # `-G` forces GET
            method = "get"
        elif parsed_args.method:
            method = parsed_args.method.strip().lower()
        # Infer POST if data is present (unless overridden by -X or -G)
        elif any(
            [
                parsed_args.data,
                parsed_args.data_raw,
                parsed_args.data_binary,
                parsed_args.data_urlencode,
            ]
        ):
            method = "post"

        headers, cookies = _ParseHeaders(parsed_args.header)

        if parsed_args.cookie:
            # We are focusing on the string format from DevTools.
            try:
                for key, value in _CookieParser(parsed_args.cookie):
                    # Update the cookie dict, potentially overwriting cookies with the same name from -H 'cookie:'
                    cookies[key] = value
                log.debug(f"Parsed cookies from -b argument: {list(cookies.keys())}")
            except Exception as e:  # pragma: no cover
                log.error(f"Could not parse cookie string from -b '{parsed_args.cookie}': {e}")

        # --- Process Data Payload ---
        params = dict()
        data_payload: Optional[str | bytes | Dict] = None
        json_payload: Optional[Any] = None  # DevTools often uses --data-raw for JSON bodies

        # Precedence: --data-binary > --data-raw / -d > --data-urlencode
        if parsed_args.data_binary is not None:  # pragma: no cover
            try:
                data_payload = parsed_args.data_binary.encode("utf-8")
                log.debug("Using data from --data-binary as bytes.")
            except Exception as e:
                log.warning(
                    f"Could not encode binary data '{parsed_args.data_binary}' as bytes: {e}. Using raw string."
                )
                data_payload = parsed_args.data_binary  # Fallback to string
        elif parsed_args.data_raw is not None:
            data_payload = parsed_args.data_raw.lstrip("$")
        elif parsed_args.data is not None:
            data_payload = parsed_args.data
        elif parsed_args.data_urlencode:  # pragma: no cover
            # Combine and parse urlencoded data
            combined_data = "&".join(parsed_args.data_urlencode)
            try:
                data_payload = dict(parse_qsl(combined_data, keep_blank_values=True))
            except Exception as e:
                log.warning(f"Could not parse urlencoded data '{combined_data}': {e}. Treating as raw string.")
                data_payload = combined_data
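
        # Illustrative outcomes (assumed, not exhaustive):
        #   --data-raw '{"id": 1}'   -> detected as JSON below and sent through the `json` argument
        #   -d 'a=1&b=2'             -> kept as a raw string in `data`
        #   --data-urlencode 'a=1' --data-urlencode 'b=2' -> parsed into {'a': '1', 'b': '2'}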
Treating as raw string.") data_payload = combined_data # Check if raw data looks like JSON, prefer 'json' param if so if isinstance(data_payload, str): try: maybe_json = json_loads(data_payload) if isinstance(maybe_json, (dict, list)): json_payload = maybe_json data_payload = None except JSONDecodeError: pass # Not JSON, keep it in data_payload # Handle `-G`: Move data to params if the method is GET if method == "get" and data_payload: # pragma: no cover if isinstance(data_payload, dict): # From --data-urlencode likely params.update(data_payload) elif isinstance(data_payload, str): try: params.update(dict(parse_qsl(data_payload, keep_blank_values=True))) except ValueError: log.warning(f"Could not parse data '{data_payload}' into GET parameters for -G.") if params: data_payload = None # Clear data as it's moved to params json_payload = None # Should not have JSON body with -G # --- Process Proxy --- proxies: Optional[Dict[str, str]] = None if parsed_args.proxy: proxy_url = f"http://{parsed_args.proxy}" if "://" not in parsed_args.proxy else parsed_args.proxy if parsed_args.proxy_user: user_pass = parsed_args.proxy_user parts = urlparse(proxy_url) netloc_parts = parts.netloc.split("@") netloc = f"{user_pass}@{netloc_parts[-1]}" if len(netloc_parts) > 1 else f"{user_pass}@{parts.netloc}" proxy_url = urlunparse( ( parts.scheme, netloc, parts.path, parts.params, parts.query, parts.fragment, ) ) # Standard proxy dict format proxies = {"http": proxy_url, "https": proxy_url} log.debug(f"Using proxy configuration: {proxies}") # --- Final Context --- return Request( method=method, url=parsed_args.url, params=params, data=data_payload, json_data=json_payload, headers=headers, cookies=cookies, proxy=proxies, follow_redirects=True, # Scrapling default is True ) def convert2fetcher(self, curl_command: Request | str) -> Optional[Response]: if isinstance(curl_command, (Request, str)): request = self.parse(curl_command) if isinstance(curl_command, str) else curl_command # Ensure request parsing was successful before proceeding if request is None: # pragma: no cover log.error("Failed to parse curl command, cannot convert to fetcher.") return None request_args = request._asdict() method = request_args.pop("method").strip().lower() if method in self._supported_methods: request_args["json"] = request_args.pop("json_data") # Ensure data/json are removed for non-POST/PUT methods if method not in ("post", "put"): _ = request_args.pop("data", None) _ = request_args.pop("json", None) try: return getattr(self.__fetcher, method)(**request_args) except Exception as e: # pragma: no cover log.error(f"Error calling Fetcher.{method}: {e}") return None else: # pragma: no cover log.error(f'Request method "{method}" isn\'t supported by Scrapling yet') return None else: # pragma: no cover log.error("Input must be a valid curl command string or a Request object.") return None def _unpack_signature(func, signature_name=None): """ Unpack TypedDict from Unpack[TypedDict] annotations in **kwargs and reconstruct the signature. This allows the interactive shell to show individual parameters instead of just **kwargs, similar to how IDEs display them. 
""" try: sig = signature(func) func_name = signature_name or getattr(func, "__name__", None) # Check if this function has known parameters if func_name not in Signatures_map: return sig new_params = [] for param in sig.parameters.values(): if param.kind == Parameter.VAR_KEYWORD: # Replace **kwargs with individual keyword-only parameters for field_name, field_type in Signatures_map[func_name].items(): new_params.append( Parameter(field_name, Parameter.KEYWORD_ONLY, default=Parameter.empty, annotation=field_type) ) else: new_params.append(param) # Reconstruct signature with unpacked parameters if len(new_params) != len(sig.parameters): return sig.replace(parameters=new_params) return sig except Exception: # pragma: no cover return signature(func) def show_page_in_browser(page: Selector): # pragma: no cover if not page or not isinstance(page, Selector): log.error("Input must be of type `Selector`") return try: fd, fname = make_temp_file(prefix="scrapling_view_", suffix=".html") with open(fd, "w", encoding=page.encoding) as f: f.write(page.html_content) open_in_browser(f"file://{fname}") except IOError as e: log.error(f"Failed to write temporary file for viewing: {e}") except Exception as e: log.error(f"An unexpected error occurred while viewing the page: {e}") class CustomShell: """A custom IPython shell with minimal dependencies""" def __init__(self, code, log_level="debug"): from IPython.terminal.embed import InteractiveShellEmbed as __InteractiveShellEmbed from scrapling.fetchers import ( Fetcher as __Fetcher, AsyncFetcher as __AsyncFetcher, FetcherSession as __FetcherSession, DynamicFetcher as __DynamicFetcher, DynamicSession as __DynamicSession, AsyncDynamicSession as __AsyncDynamicSession, StealthyFetcher as __StealthyFetcher, StealthySession as __StealthySession, AsyncStealthySession as __AsyncStealthySession, ) self.__InteractiveShellEmbed = __InteractiveShellEmbed self.__Fetcher = __Fetcher self.__AsyncFetcher = __AsyncFetcher self.__FetcherSession = __FetcherSession self.__DynamicFetcher = __DynamicFetcher self.__DynamicSession = __DynamicSession self.__AsyncDynamicSession = __AsyncDynamicSession self.__StealthyFetcher = __StealthyFetcher self.__StealthySession = __StealthySession self.__AsyncStealthySession = __AsyncStealthySession self.code = code self.page = None self.pages = Selectors([]) self._curl_parser = CurlParser() log_level = log_level.strip().lower() if _known_logging_levels.get(log_level): self.log_level = _known_logging_levels[log_level] else: # pragma: no cover log.warning(f'Unknown log level "{log_level}", defaulting to "DEBUG"') self.log_level = DEBUG self.shell = None # Initialize your application components self.init_components() def init_components(self): """Initialize application components""" # This is where you'd set up your application-specific objects if self.log_level: getLogger("scrapling").setLevel(self.log_level) settings = self.__Fetcher.display_config() settings.pop("storage", None) settings.pop("storage_args", None) log.info(f"Scrapling {__version__} shell started") log.info(f"Logging level is set to '{getLevelName(self.log_level)}'") log.info(f"Fetchers' parsing settings: {settings}") @staticmethod def banner(): """Create a custom banner for the shell""" return f""" -> Available Scrapling objects: - Fetcher/AsyncFetcher/FetcherSession - DynamicFetcher/DynamicSession/AsyncDynamicSession - StealthyFetcher/StealthySession/AsyncStealthySession - Selector -> Useful shortcuts: - {"get":<30} Shortcut for `Fetcher.get` - {"post":<30} Shortcut for 

    def update_page(self, result):  # pragma: no cover
        """Update the current page and add to pages history"""
        self.page = result
        if isinstance(result, (Response, Selector)):
            self.pages.append(result)
            if len(self.pages) > 5:
                self.pages.pop(0)  # Remove the oldest item

        # Update in IPython namespace too
        if self.shell:
            self.shell.user_ns["page"] = self.page
            self.shell.user_ns["response"] = self.page
            self.shell.user_ns["pages"] = self.pages

        return result

    def create_wrapper(
        self, func: Callable, get_signature: bool = True, signature_name: Optional[str] = None
    ) -> Callable:
        """Create a wrapper that preserves function signature but updates page"""

        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            result = func(*args, **kwargs)
            return self.update_page(result)

        if get_signature:
            # Explicitly preserve and unpack signature for IPython introspection and autocompletion
            setattr(wrapper, "__signature__", _unpack_signature(func, signature_name))
        else:
            setattr(wrapper, "__signature__", signature(func))

        return wrapper

    def get_namespace(self):
        """Create a namespace with application-specific objects"""
        # Create wrapped versions of fetch functions
        get = self.create_wrapper(self.__Fetcher.get)
        post = self.create_wrapper(self.__Fetcher.post)
        put = self.create_wrapper(self.__Fetcher.put)
        delete = self.create_wrapper(self.__Fetcher.delete)
        dynamic_fetch = self.create_wrapper(self.__DynamicFetcher.fetch)
        stealthy_fetch = self.create_wrapper(self.__StealthyFetcher.fetch, signature_name="stealthy_fetch")
        curl2fetcher = self.create_wrapper(self._curl_parser.convert2fetcher, get_signature=False)

        # Create the namespace dictionary
        return {
            "get": get,
            "post": post,
            "put": put,
            "delete": delete,
            "Fetcher": self.__Fetcher,
            "AsyncFetcher": self.__AsyncFetcher,
            "FetcherSession": self.__FetcherSession,
            "DynamicSession": self.__DynamicSession,
            "AsyncDynamicSession": self.__AsyncDynamicSession,
            "StealthySession": self.__StealthySession,
            "AsyncStealthySession": self.__AsyncStealthySession,
            "fetch": dynamic_fetch,
            "DynamicFetcher": self.__DynamicFetcher,
            "stealthy_fetch": stealthy_fetch,
            "StealthyFetcher": self.__StealthyFetcher,
            "Selector": Selector,
            "page": self.page,
            "response": self.page,
            "pages": self.pages,
            "view": show_page_in_browser,
            "uncurl": self._curl_parser.parse,
            "curl2fetcher": curl2fetcher,
            "help": self.show_help,
        }

    def show_help(self):  # pragma: no cover
        """Show help information"""
        print(self.banner())

    def start(self):  # pragma: no cover
        """Start the interactive shell"""
        # Get our namespace with application objects
        namespace = self.get_namespace()

        ipython_shell = self.__InteractiveShellEmbed(
            banner1=self.banner(),
            banner2="",
            enable_tip=False,
            exit_msg="Bye Bye",
            user_ns=namespace,
        )
        self.shell = ipython_shell

        # If a command was provided, execute it and exit
        if self.code:
            log.info(f"Executing provided code: {self.code}")
            try:
                ipython_shell.run_cell(self.code, store_history=False)
            except Exception as e:
                log.error(f"Error executing initial code: {e}")
            return

        ipython_shell()


class Convertor:
    """Utils for the extract shell command"""

    _extension_map: Dict[str, extraction_types] = {
        "md": "markdown",
        "html": "html",
        "txt": "text",
    }

    @classmethod
    def _convert_to_markdown(cls, body: TextHandler) -> str:
        """Convert HTML content to Markdown"""
        from markdownify import markdownify

        return markdownify(body)

    @classmethod
    def _extract_content(
        cls,
        page: Selector,
        extraction_type: extraction_types = "markdown",
        css_selector: Optional[str] = None,
        main_content_only: bool = False,
    ) -> Generator[str, None, None]:
        """Extract the content of a Selector"""
        if not page or not isinstance(page, Selector):  # pragma: no cover
            raise TypeError("Input must be of type `Selector`")
        elif not extraction_type or extraction_type not in cls._extension_map.values():
            raise ValueError(f"Unknown extraction type: {extraction_type}")
        else:
            if main_content_only:
                page = cast(Selector, page.css("body").first) or page

            pages = [page] if not css_selector else cast(Selectors, page.css(css_selector))
            for page in pages:
                match extraction_type:
                    case "markdown":
                        yield cls._convert_to_markdown(page.html_content)
                    case "html":
                        yield page.html_content
                    case "text":
                        txt_content = page.get_all_text(strip=True)
                        for s in (
                            "\n",
                            "\r",
                            "\t",
                            " ",
                        ):  # Remove consecutive white-spaces
                            txt_content = TextHandler(re_sub(f"[{s}]+", s, txt_content))
                        yield txt_content

            yield ""

    @classmethod
    def write_content_to_file(cls, page: Selector, filename: str, css_selector: Optional[str] = None) -> None:
        """Write a Selector's content to a file"""
        if not page or not isinstance(page, Selector):  # pragma: no cover
            raise TypeError("Input must be of type `Selector`")
        elif not filename or not isinstance(filename, str) or not filename.strip():
            raise ValueError("Filename must be provided")
        elif not filename.endswith((".md", ".html", ".txt")):
            raise ValueError("Unknown file type: filename must end with '.md', '.html', or '.txt'")
        else:
            with open(filename, "w", encoding=page.encoding) as f:
                extension = filename.split(".")[-1]
                f.write(
                    "".join(
                        cls._extract_content(
                            page,
                            cls._extension_map[extension],
                            css_selector=css_selector,
                        )
                    )
                )
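

# Illustrative usage of the Convertor helpers (hypothetical URL and filename):
#   page = Fetcher.get("https://example.com")
#   Convertor.write_content_to_file(page, "example.md", css_selector="article")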