| | from __future__ import annotations |
| |
|
| | import ipaddress |
| | import os |
| | import re |
| | import typing |
| | from urllib.request import getproxies |
| |
|
| | from ._types import PrimitiveData |
| |
|
| | if typing.TYPE_CHECKING: |
| | from ._urls import URL |
| |
|
| |
|
def primitive_value_to_str(value: PrimitiveData) -> str:
    """
    Render a primitive value as a string.

    Booleans become the JSON-style literals 'true' / 'false', `None`
    becomes the empty string, and anything else goes through `str()`.
    """
    if value is None:
        return ""
    # `bool` cannot be subclassed, so this matches exactly True and False
    # (and never the integers 1 / 0).
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)
| |
|
| |
|
def get_environment_proxies() -> dict[str, str | None]:
    """
    Build a proxy mount mapping from `urllib.request.getproxies()`.

    Keys are URL patterns ("http://", "all://example.com", ...). A string
    value is the proxy URL to use for that pattern; a `None` value marks a
    pattern taken from the "no" entry, i.e. one that has no proxy assigned.
    """
    env_proxies = getproxies()
    mounts: dict[str, str | None] = {}

    for scheme in ("http", "https", "all"):
        proxy_url = env_proxies.get(scheme)
        if not proxy_url:
            continue
        # A bare "host:port" value gets an explicit "http://" scheme.
        if "://" not in proxy_url:
            proxy_url = f"http://{proxy_url}"
        mounts[f"{scheme}://"] = proxy_url

    bypass_hosts = [
        entry.strip() for entry in env_proxies.get("no", "").split(",")
    ]

    # A "*" entry anywhere in the list discards every mount, including the
    # proxies collected above.
    if "*" in bypass_hosts:
        return {}

    for host in bypass_hosts:
        if not host:
            # Empty entries come from an unset value or stray commas.
            continue

        if "://" in host:
            # Already a full URL pattern; use it verbatim.
            key = host
        elif is_ipv4_hostname(host):
            key = f"all://{host}"
        elif is_ipv6_hostname(host):
            # IPv6 literals are wrapped in brackets inside a URL.
            key = f"all://[{host}]"
        elif host.lower() == "localhost":
            key = f"all://{host}"
        else:
            # Leading "*" so the pattern covers the domain and its subdomains.
            key = f"all://*{host}"
        mounts[key] = None

    return mounts
| |
|
| |
|
def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes:
    """Return `value` as bytes, encoding it with `encoding` if it is a str."""
    if isinstance(value, str):
        return value.encode(encoding)
    return value
| |
|
| |
|
def to_str(value: str | bytes, encoding: str = "utf-8") -> str:
    """Return `value` as a str, decoding it with `encoding` if it is not one."""
    if not isinstance(value, str):
        return value.decode(encoding)
    return value
| |
|
| |
|
def to_bytes_or_str(value: str, match_type_of: typing.AnyStr) -> typing.AnyStr:
    """
    Return `value` in the same string type as `match_type_of`.

    `value` is returned unchanged when `match_type_of` is a str; otherwise it
    is encoded with the default `str.encode()` encoding.
    """
    if isinstance(match_type_of, str):
        return value
    return value.encode()
| |
|
| |
|
def unquote(value: str) -> str:
    """
    Strip one pair of surrounding double quotes from `value`, if present.

    The value is returned unchanged when it is not wrapped in a matching
    pair of double quotes. That includes the empty string and a lone `"`
    character, which cannot be both the opening and the closing quote.
    """
    # Requiring two characters avoids indexing into an empty string and
    # stops a single quote character from being treated as a quoted value.
    if len(value) >= 2 and value[0] == value[-1] == '"':
        return value[1:-1]
    return value
| |
|
| |
|
def peek_filelike_length(stream: typing.Any) -> int | None:
    """
    Determine the byte length of a file-like object without reading it.

    Returns `None` when neither of the two strategies below works.
    """
    # First choice: ask the OS for the size of the underlying descriptor.
    try:
        return os.fstat(stream.fileno()).st_size
    except (AttributeError, OSError):
        pass

    # Fallback for objects without a usable descriptor: seek to the end to
    # learn the size, then restore the original position.
    try:
        position = stream.tell()
        size = stream.seek(0, os.SEEK_END)
        stream.seek(position)
    except (AttributeError, OSError):
        return None
    return size
| |
|
| |
|
class URLPattern:
    """
    A utility class currently used for making lookups against proxy keys...

    # Wildcard matching...
    >>> pattern = URLPattern("all://")
    >>> pattern.matches(httpx.URL("http://example.com"))
    True

    # With scheme matching...
    >>> pattern = URLPattern("https://")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False

    # With domain matching...
    >>> pattern = URLPattern("https://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # Wildcard scheme, with domain matching...
    >>> pattern = URLPattern("all://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    True
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # With port matching...
    >>> pattern = URLPattern("https://example.com:1234")
    >>> pattern.matches(httpx.URL("https://example.com:1234"))
    True
    >>> pattern.matches(httpx.URL("https://example.com"))
    False
    """

    def __init__(self, pattern: str) -> None:
        """
        Parse `pattern` into scheme, host and port components.

        Raises `ValueError` for a non-empty pattern without a ":" (for
        example a plain "http" rather than "http://").
        """
        from ._urls import URL

        if pattern and ":" not in pattern:
            raise ValueError(
                "Proxy keys should use proper URL forms rather "
                "than plain scheme strings. "
                f'Instead of "{pattern}", use "{pattern}://"'
            )

        parsed = URL(pattern)
        host = parsed.host
        any_host = host == "*"

        self.pattern = pattern
        # An "all" scheme and a "*" host are stored as empty strings, which
        # `matches()` treats as "no constraint".
        self.scheme = "" if parsed.scheme == "all" else parsed.scheme
        self.host = "" if any_host else host
        self.port = parsed.port

        host_regex: typing.Pattern[str] | None = None
        if host and not any_host:
            if host.startswith("*."):
                # "*.example.com": subdomains only, not the bare domain.
                escaped = re.escape(host[2:])
                host_regex = re.compile(f"^.+\\.{escaped}$")
            elif host.startswith("*"):
                # "*example.com": the domain itself and any subdomain.
                escaped = re.escape(host[1:])
                host_regex = re.compile(f"^(.+\\.)?{escaped}$")
            else:
                # "example.com": exact host match.
                escaped = re.escape(host)
                host_regex = re.compile(f"^{escaped}$")
        self.host_regex = host_regex

    def matches(self, other: URL) -> bool:
        """Return True if `other` satisfies every constraint of this pattern."""
        if self.scheme and other.scheme != self.scheme:
            return False
        if self.host and self.host_regex is not None:
            if not self.host_regex.match(other.host):
                return False
        return self.port is None or self.port == other.port

    @property
    def priority(self) -> tuple[int, int, int]:
        """
        Sort key ordering patterns from most specific to least specific.

        Tuples compare ascending, so smaller values sort first.
        """
        return (
            # Patterns with an explicit port come before those without.
            int(self.port is None),
            # Longer hosts come before shorter ones.
            -len(self.host),
            # Longer schemes come before shorter ones.
            -len(self.scheme),
        )

    def __hash__(self) -> int:
        # Hash on the raw pattern so it stays consistent with `__eq__`.
        return hash(self.pattern)

    def __lt__(self, other: URLPattern) -> bool:
        return self.priority < other.priority

    def __eq__(self, other: typing.Any) -> bool:
        if not isinstance(other, URLPattern):
            return False
        return self.pattern == other.pattern
| |
|
| |
|
def is_ipv4_hostname(hostname: str) -> bool:
    """
    Return True if `hostname` is an IPv4 address.

    Anything from the first "/" onwards is ignored, so a value such as
    "192.168.0.0/16" is judged on its address part alone.
    """
    try:
        address, _, _ = hostname.partition("/")
        ipaddress.IPv4Address(address)
    except Exception:
        return False
    else:
        return True
| |
|
| |
|
def is_ipv6_hostname(hostname: str) -> bool:
    """
    Return True if `hostname` is an IPv6 address.

    Anything from the first "/" onwards is ignored, so a value such as
    "fe80::/10" is judged on its address part alone.
    """
    try:
        address, _, _ = hostname.partition("/")
        ipaddress.IPv6Address(address)
    except Exception:
        return False
    else:
        return True
| |
|