| | import email.message |
| | import email.parser |
| | import logging |
| | import os |
| | import zipfile |
| | from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional |
| |
|
| | from pip._vendor import pkg_resources |
| | from pip._vendor.packaging.requirements import Requirement |
| | from pip._vendor.packaging.utils import NormalizedName, canonicalize_name |
| | from pip._vendor.packaging.version import parse as parse_version |
| |
|
| | from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel |
| | from pip._internal.utils.egg_link import egg_link_path_from_location |
| | from pip._internal.utils.misc import display_path, normalize_path |
| | from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file |
| |
|
| | from .base import ( |
| | BaseDistribution, |
| | BaseEntryPoint, |
| | BaseEnvironment, |
| | DistributionVersion, |
| | InfoPath, |
| | Wheel, |
| | ) |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | class EntryPoint(NamedTuple): |
| | name: str |
| | value: str |
| | group: str |
| |
|
| |
|
| | class InMemoryMetadata: |
| | """IMetadataProvider that reads metadata files from a dictionary. |
| | |
| | This also maps metadata decoding exceptions to our internal exception type. |
| | """ |
| |
|
| | def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None: |
| | self._metadata = metadata |
| | self._wheel_name = wheel_name |
| |
|
| | def has_metadata(self, name: str) -> bool: |
| | return name in self._metadata |
| |
|
| | def get_metadata(self, name: str) -> str: |
| | try: |
| | return self._metadata[name].decode() |
| | except UnicodeDecodeError as e: |
| | |
| | raise UnsupportedWheel( |
| | f"Error decoding metadata for {self._wheel_name}: {e} in {name} file" |
| | ) |
| |
|
| | def get_metadata_lines(self, name: str) -> Iterable[str]: |
| | return pkg_resources.yield_lines(self.get_metadata(name)) |
| |
|
| | def metadata_isdir(self, name: str) -> bool: |
| | return False |
| |
|
| | def metadata_listdir(self, name: str) -> List[str]: |
| | return [] |
| |
|
| | def run_script(self, script_name: str, namespace: str) -> None: |
| | pass |
| |
|
| |
|
| | class Distribution(BaseDistribution): |
| | def __init__(self, dist: pkg_resources.Distribution) -> None: |
| | self._dist = dist |
| |
|
| | @classmethod |
| | def from_directory(cls, directory: str) -> BaseDistribution: |
| | dist_dir = directory.rstrip(os.sep) |
| |
|
| | |
| | base_dir, dist_dir_name = os.path.split(dist_dir) |
| | metadata = pkg_resources.PathMetadata(base_dir, dist_dir) |
| |
|
| | |
| | if dist_dir.endswith(".egg-info"): |
| | dist_cls = pkg_resources.Distribution |
| | dist_name = os.path.splitext(dist_dir_name)[0] |
| | else: |
| | assert dist_dir.endswith(".dist-info") |
| | dist_cls = pkg_resources.DistInfoDistribution |
| | dist_name = os.path.splitext(dist_dir_name)[0].split("-")[0] |
| |
|
| | dist = dist_cls(base_dir, project_name=dist_name, metadata=metadata) |
| | return cls(dist) |
| |
|
| | @classmethod |
| | def from_metadata_file_contents( |
| | cls, |
| | metadata_contents: bytes, |
| | filename: str, |
| | project_name: str, |
| | ) -> BaseDistribution: |
| | metadata_dict = { |
| | "METADATA": metadata_contents, |
| | } |
| | dist = pkg_resources.DistInfoDistribution( |
| | location=filename, |
| | metadata=InMemoryMetadata(metadata_dict, filename), |
| | project_name=project_name, |
| | ) |
| | return cls(dist) |
| |
|
| | @classmethod |
| | def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution: |
| | try: |
| | with wheel.as_zipfile() as zf: |
| | info_dir, _ = parse_wheel(zf, name) |
| | metadata_dict = { |
| | path.split("/", 1)[-1]: read_wheel_metadata_file(zf, path) |
| | for path in zf.namelist() |
| | if path.startswith(f"{info_dir}/") |
| | } |
| | except zipfile.BadZipFile as e: |
| | raise InvalidWheel(wheel.location, name) from e |
| | except UnsupportedWheel as e: |
| | raise UnsupportedWheel(f"{name} has an invalid wheel, {e}") |
| | dist = pkg_resources.DistInfoDistribution( |
| | location=wheel.location, |
| | metadata=InMemoryMetadata(metadata_dict, wheel.location), |
| | project_name=name, |
| | ) |
| | return cls(dist) |
| |
|
| | @property |
| | def location(self) -> Optional[str]: |
| | return self._dist.location |
| |
|
| | @property |
| | def installed_location(self) -> Optional[str]: |
| | egg_link = egg_link_path_from_location(self.raw_name) |
| | if egg_link: |
| | location = egg_link |
| | elif self.location: |
| | location = self.location |
| | else: |
| | return None |
| | return normalize_path(location) |
| |
|
| | @property |
| | def info_location(self) -> Optional[str]: |
| | return self._dist.egg_info |
| |
|
| | @property |
| | def installed_by_distutils(self) -> bool: |
| | |
| | |
| | |
| | try: |
| | return bool(self._dist._provider.path) |
| | except AttributeError: |
| | return False |
| |
|
| | @property |
| | def canonical_name(self) -> NormalizedName: |
| | return canonicalize_name(self._dist.project_name) |
| |
|
| | @property |
| | def version(self) -> DistributionVersion: |
| | return parse_version(self._dist.version) |
| |
|
| | def is_file(self, path: InfoPath) -> bool: |
| | return self._dist.has_metadata(str(path)) |
| |
|
| | def iter_distutils_script_names(self) -> Iterator[str]: |
| | yield from self._dist.metadata_listdir("scripts") |
| |
|
| | def read_text(self, path: InfoPath) -> str: |
| | name = str(path) |
| | if not self._dist.has_metadata(name): |
| | raise FileNotFoundError(name) |
| | content = self._dist.get_metadata(name) |
| | if content is None: |
| | raise NoneMetadataError(self, name) |
| | return content |
| |
|
| | def iter_entry_points(self) -> Iterable[BaseEntryPoint]: |
| | for group, entries in self._dist.get_entry_map().items(): |
| | for name, entry_point in entries.items(): |
| | name, _, value = str(entry_point).partition("=") |
| | yield EntryPoint(name=name.strip(), value=value.strip(), group=group) |
| |
|
| | def _metadata_impl(self) -> email.message.Message: |
| | """ |
| | :raises NoneMetadataError: if the distribution reports `has_metadata()` |
| | True but `get_metadata()` returns None. |
| | """ |
| | if isinstance(self._dist, pkg_resources.DistInfoDistribution): |
| | metadata_name = "METADATA" |
| | else: |
| | metadata_name = "PKG-INFO" |
| | try: |
| | metadata = self.read_text(metadata_name) |
| | except FileNotFoundError: |
| | if self.location: |
| | displaying_path = display_path(self.location) |
| | else: |
| | displaying_path = repr(self.location) |
| | logger.warning("No metadata found in %s", displaying_path) |
| | metadata = "" |
| | feed_parser = email.parser.FeedParser() |
| | feed_parser.feed(metadata) |
| | return feed_parser.close() |
| |
|
| | def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]: |
| | if extras: |
| | extras = frozenset(extras).intersection(self._dist.extras) |
| | return self._dist.requires(extras) |
| |
|
| | def iter_provided_extras(self) -> Iterable[str]: |
| | return self._dist.extras |
| |
|
| |
|
| | class Environment(BaseEnvironment): |
| | def __init__(self, ws: pkg_resources.WorkingSet) -> None: |
| | self._ws = ws |
| |
|
| | @classmethod |
| | def default(cls) -> BaseEnvironment: |
| | return cls(pkg_resources.working_set) |
| |
|
| | @classmethod |
| | def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment: |
| | return cls(pkg_resources.WorkingSet(paths)) |
| |
|
| | def _iter_distributions(self) -> Iterator[BaseDistribution]: |
| | for dist in self._ws: |
| | yield Distribution(dist) |
| |
|
| | def _search_distribution(self, name: str) -> Optional[BaseDistribution]: |
| | """Find a distribution matching the ``name`` in the environment. |
| | |
| | This searches from *all* distributions available in the environment, to |
| | match the behavior of ``pkg_resources.get_distribution()``. |
| | """ |
| | canonical_name = canonicalize_name(name) |
| | for dist in self.iter_all_distributions(): |
| | if dist.canonical_name == canonical_name: |
| | return dist |
| | return None |
| |
|
| | def get_distribution(self, name: str) -> Optional[BaseDistribution]: |
| | |
| | dist = self._search_distribution(name) |
| | if dist: |
| | return dist |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | try: |
| | |
| | |
| | self._ws.require(name) |
| | except pkg_resources.DistributionNotFound: |
| | return None |
| | return self._search_distribution(name) |
| |
|