| | import logging |
| | import shutil |
| | import sys |
| | import textwrap |
| | import xmlrpc.client |
| | from collections import OrderedDict |
| | from optparse import Values |
| | from typing import TYPE_CHECKING, Dict, List, Optional |
| |
|
| | from pip._vendor.packaging.version import parse as parse_version |
| |
|
| | from pip._internal.cli.base_command import Command |
| | from pip._internal.cli.req_command import SessionCommandMixin |
| | from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS |
| | from pip._internal.exceptions import CommandError |
| | from pip._internal.metadata import get_default_environment |
| | from pip._internal.models.index import PyPI |
| | from pip._internal.network.xmlrpc import PipXmlrpcTransport |
| | from pip._internal.utils.logging import indent_log |
| | from pip._internal.utils.misc import write_output |
| |
|
| | if TYPE_CHECKING: |
| | from typing import TypedDict |
| |
|
| | class TransformedHit(TypedDict): |
| | name: str |
| | summary: str |
| | versions: List[str] |
| |
|
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | class SearchCommand(Command, SessionCommandMixin): |
| | """Search for PyPI packages whose name or summary contains <query>.""" |
| |
|
| | usage = """ |
| | %prog [options] <query>""" |
| | ignore_require_venv = True |
| |
|
| | def add_options(self) -> None: |
| | self.cmd_opts.add_option( |
| | "-i", |
| | "--index", |
| | dest="index", |
| | metavar="URL", |
| | default=PyPI.pypi_url, |
| | help="Base URL of Python Package Index (default %default)", |
| | ) |
| |
|
| | self.parser.insert_option_group(0, self.cmd_opts) |
| |
|
| | def run(self, options: Values, args: List[str]) -> int: |
| | if not args: |
| | raise CommandError("Missing required argument (search query).") |
| | query = args |
| | pypi_hits = self.search(query, options) |
| | hits = transform_hits(pypi_hits) |
| |
|
| | terminal_width = None |
| | if sys.stdout.isatty(): |
| | terminal_width = shutil.get_terminal_size()[0] |
| |
|
| | print_results(hits, terminal_width=terminal_width) |
| | if pypi_hits: |
| | return SUCCESS |
| | return NO_MATCHES_FOUND |
| |
|
| | def search(self, query: List[str], options: Values) -> List[Dict[str, str]]: |
| | index_url = options.index |
| |
|
| | session = self.get_default_session(options) |
| |
|
| | transport = PipXmlrpcTransport(index_url, session) |
| | pypi = xmlrpc.client.ServerProxy(index_url, transport) |
| | try: |
| | hits = pypi.search({"name": query, "summary": query}, "or") |
| | except xmlrpc.client.Fault as fault: |
| | message = "XMLRPC request failed [code: {code}]\n{string}".format( |
| | code=fault.faultCode, |
| | string=fault.faultString, |
| | ) |
| | raise CommandError(message) |
| | assert isinstance(hits, list) |
| | return hits |
| |
|
| |
|
| | def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]: |
| | """ |
| | The list from pypi is really a list of versions. We want a list of |
| | packages with the list of versions stored inline. This converts the |
| | list from pypi into one we can use. |
| | """ |
| | packages: Dict[str, "TransformedHit"] = OrderedDict() |
| | for hit in hits: |
| | name = hit["name"] |
| | summary = hit["summary"] |
| | version = hit["version"] |
| |
|
| | if name not in packages.keys(): |
| | packages[name] = { |
| | "name": name, |
| | "summary": summary, |
| | "versions": [version], |
| | } |
| | else: |
| | packages[name]["versions"].append(version) |
| |
|
| | |
| | if version == highest_version(packages[name]["versions"]): |
| | packages[name]["summary"] = summary |
| |
|
| | return list(packages.values()) |
| |
|
| |
|
| | def print_dist_installation_info(name: str, latest: str) -> None: |
| | env = get_default_environment() |
| | dist = env.get_distribution(name) |
| | if dist is not None: |
| | with indent_log(): |
| | if dist.version == latest: |
| | write_output("INSTALLED: %s (latest)", dist.version) |
| | else: |
| | write_output("INSTALLED: %s", dist.version) |
| | if parse_version(latest).pre: |
| | write_output( |
| | "LATEST: %s (pre-release; install" |
| | " with `pip install --pre`)", |
| | latest, |
| | ) |
| | else: |
| | write_output("LATEST: %s", latest) |
| |
|
| |
|
| | def print_results( |
| | hits: List["TransformedHit"], |
| | name_column_width: Optional[int] = None, |
| | terminal_width: Optional[int] = None, |
| | ) -> None: |
| | if not hits: |
| | return |
| | if name_column_width is None: |
| | name_column_width = ( |
| | max( |
| | [ |
| | len(hit["name"]) + len(highest_version(hit.get("versions", ["-"]))) |
| | for hit in hits |
| | ] |
| | ) |
| | + 4 |
| | ) |
| |
|
| | for hit in hits: |
| | name = hit["name"] |
| | summary = hit["summary"] or "" |
| | latest = highest_version(hit.get("versions", ["-"])) |
| | if terminal_width is not None: |
| | target_width = terminal_width - name_column_width - 5 |
| | if target_width > 10: |
| | |
| | summary_lines = textwrap.wrap(summary, target_width) |
| | summary = ("\n" + " " * (name_column_width + 3)).join(summary_lines) |
| |
|
| | name_latest = f"{name} ({latest})" |
| | line = f"{name_latest:{name_column_width}} - {summary}" |
| | try: |
| | write_output(line) |
| | print_dist_installation_info(name, latest) |
| | except UnicodeEncodeError: |
| | pass |
| |
|
| |
|
| | def highest_version(versions: List[str]) -> str: |
| | return max(versions, key=parse_version) |
| |
|