| | from __future__ import annotations |
| |
|
| | import collections.abc as cabc |
| | import enum |
| | import os |
| | import stat |
| | import sys |
| | import typing as t |
| | from datetime import datetime |
| | from gettext import gettext as _ |
| | from gettext import ngettext |
| |
|
| | from ._compat import _get_argv_encoding |
| | from ._compat import open_stream |
| | from .exceptions import BadParameter |
| | from .utils import format_filename |
| | from .utils import LazyFile |
| | from .utils import safecall |
| |
|
| | if t.TYPE_CHECKING: |
| | import typing_extensions as te |
| |
|
| | from .core import Context |
| | from .core import Parameter |
| | from .shell_completion import CompletionItem |
| |
|
# Type variable used to parameterize generic parameter types in this module
# (for example ``Choice``) over the type of value they produce.
ParamTypeValue = t.TypeVar("ParamTypeValue")
| |
|
| |
|
class ParamType:
    """Base class describing the type of a parameter. A type validates
    values coming from the command line or from Python and converts
    them into the correct type.

    A custom type subclasses this class and provides at least the
    following:

    -   The :attr:`name` class attribute must be set.
    -   Calling an instance of the type with ``None`` must return
        ``None``. The default :meth:`__call__` already does this.
    -   :meth:`convert` must convert string values to the correct type.
    -   :meth:`convert` must accept values that are already the correct
        type.
    -   Conversion must work when the ``ctx`` and ``param`` arguments
        are ``None``. This can occur when converting prompt input.
    """

    is_composite: t.ClassVar[bool] = False
    arity: t.ClassVar[int] = 1

    #: The descriptive name of this type.
    name: str

    #: Separator handed to :meth:`str.split` by
    #: :meth:`split_envvar_value`. ``None`` means the value is split on
    #: any whitespace.
    envvar_list_splitter: t.ClassVar[str | None] = None

    def to_info_dict(self) -> dict[str, t.Any]:
        """Collect information that may be useful to a tool generating
        user-facing documentation.

        Use :meth:`click.Context.to_info_dict` to traverse the entire
        CLI structure.

        .. versionadded:: 8.0
        """
        # Class name with everything from a "ParamType" / "ParameterType"
        # marker onwards removed.
        class_name = type(self).__name__
        stem = class_name.partition("ParamType")[0].partition("ParameterType")[0]

        # Fall back to the trimmed class name when no ``name`` attribute
        # has been set on the type.
        return {"param_type": stem, "name": getattr(self, "name", stem)}

    def __call__(
        self,
        value: t.Any,
        param: Parameter | None = None,
        ctx: Context | None = None,
    ) -> t.Any:
        """Convert ``value`` with :meth:`convert`, passing ``None``
        through untouched.
        """
        if value is None:
            return None

        return self.convert(value, param, ctx)

    def get_metavar(self, param: Parameter, ctx: Context) -> str | None:
        """Returns the metavar default for this param if it provides one."""
        return None

    def get_missing_message(self, param: Parameter, ctx: Context | None) -> str | None:
        """May return extra information about a missing parameter. The
        base implementation returns ``None``.

        .. versionadded:: 2.0
        """
        return None

    def convert(
        self, value: t.Any, param: Parameter | None, ctx: Context | None
    ) -> t.Any:
        """Convert the value to the correct type. This is not called if
        the value is ``None`` (the missing value).

        Implementations must accept string values from the command
        line as well as values that are already the correct type, and
        may also convert other compatible types.

        ``param`` and ``ctx`` may be ``None`` in certain situations,
        such as when converting prompt input.

        When the value cannot be converted, call :meth:`fail` with a
        descriptive message.

        :param value: The value to convert.
        :param param: The parameter that is using this type to convert
            its value. May be ``None``.
        :param ctx: The current context that arrived at this value. May
            be ``None``.
        """
        return value

    def split_envvar_value(self, rv: str) -> cabc.Sequence[str]:
        """Split a value read from an environment variable into chunks
        using :attr:`envvar_list_splitter`.

        If the splitter is ``None``, which means that whitespace splits,
        then leading and trailing whitespace is ignored. Otherwise,
        leading and trailing splitters usually lead to empty items being
        included.
        """
        text = rv if rv else ""
        return text.split(self.envvar_list_splitter)

    def fail(
        self,
        message: str,
        param: Parameter | None = None,
        ctx: Context | None = None,
    ) -> t.NoReturn:
        """Helper method to fail with an invalid value message.

        :raises BadParameter: Always.
        """
        raise BadParameter(message, ctx=ctx, param=param)

    def shell_complete(
        self, ctx: Context, param: Parameter, incomplete: str
    ) -> list[CompletionItem]:
        """Return a list of
        :class:`~click.shell_completion.CompletionItem` objects for the
        incomplete value. The base implementation offers no
        completions; types that can complete values override this, and
        custom types may do so as well.

        :param ctx: Invocation context for this command.
        :param param: The parameter that is requesting completion.
        :param incomplete: Value being completed. May be empty.

        .. versionadded:: 8.0
        """
        return []
| |
|
| |
|
class CompositeParamType(ParamType):
    """Base class for parameter types flagged as composite.

    Subclasses must provide :attr:`arity`; the property defined here
    only raises.
    """

    is_composite = True

    @property
    def arity(self) -> int:  # type: ignore
        """Must be overridden by subclasses.

        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
| |
|
| |
|
class FuncParamType(ParamType):
    """Parameter type that delegates conversion to a plain callable.

    The type's :attr:`name` is taken from the callable's ``__name__``.

    :param func: Callable invoked with the raw value; its return value
        is the converted value.
    """

    def __init__(self, func: t.Callable[[t.Any], t.Any]) -> None:
        self.name: str = func.__name__
        self.func = func

    def to_info_dict(self) -> dict[str, t.Any]:
        """Extend the base info dict with the wrapped callable under
        the ``"func"`` key.
        """
        return {**super().to_info_dict(), "func": self.func}

    def convert(
        self, value: t.Any, param: Parameter | None, ctx: Context | None
    ) -> t.Any:
        """Call the wrapped function with ``value``.

        A ``ValueError`` raised by the function is reported through
        :meth:`fail`, using the text form of the value as the message.
        """
        try:
            return self.func(value)
        except ValueError:
            # Build the message from the offending value itself; fall
            # back to a lossy decode if it cannot be turned into text.
            try:
                message = str(value)
            except UnicodeError:
                message = value.decode("utf-8", "replace")

            self.fail(message, param, ctx)
| |
|
| |
|
class UnprocessedParamType(ParamType):
    """Parameter type that hands every value back exactly as received."""

    name = "text"

    def convert(
        self, value: t.Any, param: Parameter | None, ctx: Context | None
    ) -> t.Any:
        """Return ``value`` unchanged; no validation or conversion is done."""
        return value

    def __repr__(self) -> str:
        return "UNPROCESSED"
| |
|
| |
|
class StringParamType(ParamType):
    """Parameter type producing ``str`` values."""

    name = "text"

    def convert(
        self, value: t.Any, param: Parameter | None, ctx: Context | None
    ) -> t.Any:
        """Return ``value`` as text.

        Non-bytes values go through ``str()``. Bytes are decoded with
        the argv encoding first, then with the filesystem encoding when
        that is a different one, and finally as UTF-8 with undecodable
        bytes replaced.
        """
        if not isinstance(value, bytes):
            return str(value)

        argv_enc = _get_argv_encoding()

        try:
            return value.decode(argv_enc)
        except UnicodeError:
            fs_enc = sys.getfilesystemencoding()

            # Retrying with the same encoding would fail the same way.
            if fs_enc != argv_enc:
                try:
                    return value.decode(fs_enc)
                except UnicodeError:
                    pass

            # Last resort: never fails, but may substitute characters.
            return value.decode("utf-8", "replace")

    def __repr__(self) -> str:
        return "STRING"
| |
|
| |
|
class Choice(ParamType, t.Generic[ParamTypeValue]):
    """The choice type allows a value to be checked against a fixed set
    of supported values.

    You may pass any iterable value which will be converted to a tuple
    and thus will only be iterated once.

    The resulting value will always be one of the originally passed choices.
    See :meth:`normalize_choice` for more info on the mapping of strings
    to choices. See :ref:`choice-opts` for an example.

    :param choices: The supported values. Each choice is used as a
        dictionary key internally, so it must be hashable.
    :param case_sensitive: Set to false to make choices case
        insensitive. Defaults to true.

    .. versionchanged:: 8.2.0
        Non-``str`` ``choices`` are now supported. It can additionally be any
        iterable. Before you were not recommended to pass anything but a list or
        tuple.

    .. versionadded:: 8.2.0
        Choice normalization can be overridden via :meth:`normalize_choice`.
    """

    name = "choice"

    def __init__(
        self, choices: cabc.Iterable[ParamTypeValue], case_sensitive: bool = True
    ) -> None:
        # Materialize once so generators and other one-shot iterables work.
        self.choices: cabc.Sequence[ParamTypeValue] = tuple(choices)
        self.case_sensitive = case_sensitive

    def to_info_dict(self) -> dict[str, t.Any]:
        """Extend the base info dict with ``choices`` and
        ``case_sensitive``.
        """
        info_dict = super().to_info_dict()
        info_dict["choices"] = self.choices
        info_dict["case_sensitive"] = self.case_sensitive
        return info_dict

    def _normalized_mapping(
        self, ctx: Context | None = None
    ) -> cabc.Mapping[ParamTypeValue, str]:
        """
        Returns mapping where keys are the original choices and the values are
        the normalized values that are accepted via the command line.

        This is a simple wrapper around :meth:`normalize_choice`, use that
        instead which is supported.
        """
        return {
            choice: self.normalize_choice(
                choice=choice,
                ctx=ctx,
            )
            for choice in self.choices
        }

    def normalize_choice(self, choice: ParamTypeValue, ctx: Context | None) -> str:
        """
        Normalize a choice value, used to map a passed string to a choice.
        Each choice must have a unique normalized value.

        Enum members are represented by their ``name``, every other
        value by ``str(choice)``. The result is then passed through
        :meth:`Context.token_normalize_func` when a context provides
        one and, if the type is not case sensitive, casefolded.

        .. versionadded:: 8.2.0
        """
        normed_value = choice.name if isinstance(choice, enum.Enum) else str(choice)

        if ctx is not None and ctx.token_normalize_func is not None:
            normed_value = ctx.token_normalize_func(normed_value)

        if not self.case_sensitive:
            normed_value = normed_value.casefold()

        return normed_value

    def get_metavar(self, param: Parameter, ctx: Context) -> str | None:
        """Build the metavar listing the accepted values.

        For an option with ``show_choices`` disabled, the upper-cased
        type names of the choices are shown instead of the choices
        themselves.
        """
        if param.param_type_name == "option" and not param.show_choices:
            choice_metavars = [
                convert_type(type(choice)).name.upper() for choice in self.choices
            ]
            # dict.fromkeys removes duplicates while keeping first-seen order.
            choices_str = "|".join([*dict.fromkeys(choice_metavars)])
        else:
            choices_str = "|".join(
                [str(i) for i in self._normalized_mapping(ctx=ctx).values()]
            )

        # Use curly braces to indicate a required argument.
        if param.required and param.param_type_name == "argument":
            return f"{{{choices_str}}}"

        # Use square braces to indicate an option or optional argument.
        return f"[{choices_str}]"

    def get_missing_message(self, param: Parameter, ctx: Context | None) -> str:
        """
        Message shown when no choice is passed.

        .. versionchanged:: 8.2.0 Added ``ctx`` argument.
        """
        return _("Choose from:\n\t{choices}").format(
            choices=",\n\t".join(self._normalized_mapping(ctx=ctx).values())
        )

    def convert(
        self, value: t.Any, param: Parameter | None, ctx: Context | None
    ) -> ParamTypeValue:
        """
        For a given value from the parser, normalize it and find its
        matching normalized value in the list of choices. Then return the
        matched "original" choice.

        Calls :meth:`fail` when no choice matches.
        """
        normed_value = self.normalize_choice(choice=value, ctx=ctx)
        normalized_mapping = self._normalized_mapping(ctx=ctx)

        try:
            return next(
                original
                for original, normalized in normalized_mapping.items()
                if normalized == normed_value
            )
        except StopIteration:
            self.fail(
                self.get_invalid_choice_message(value=value, ctx=ctx),
                param=param,
                ctx=ctx,
            )

    def get_invalid_choice_message(self, value: t.Any, ctx: Context | None) -> str:
        """Get the error message when the given choice is invalid.

        :param value: The invalid value.
        :param ctx: Context used to normalize the listed choices. May
            be ``None``.

        .. versionadded:: 8.2
        """
        choices_str = ", ".join(map(repr, self._normalized_mapping(ctx=ctx).values()))
        return ngettext(
            "{value!r} is not {choice}.",
            "{value!r} is not one of {choices}.",
            len(self.choices),
        ).format(value=value, choice=choices_str, choices=choices_str)

    def __repr__(self) -> str:
        return f"Choice({list(self.choices)})"

    def shell_complete(
        self, ctx: Context, param: Parameter, incomplete: str
    ) -> list[CompletionItem]:
        """Complete choices that start with the incomplete value.

        Enum members are offered by their ``name``, matching how
        :meth:`normalize_choice` identifies them, so that every
        suggested value is one :meth:`convert` can map back to a
        choice.

        :param ctx: Invocation context for this command.
        :param param: The parameter that is requesting completion.
        :param incomplete: Value being completed. May be empty.

        .. versionadded:: 8.0
        """
        from click.shell_completion import CompletionItem

        # ``str()`` of a plain Enum member is "ClassName.MEMBER", which
        # ``convert`` would reject; use the member name instead.
        str_choices = [
            choice.name if isinstance(choice, enum.Enum) else str(choice)
            for choice in self.choices
        ]

        if self.case_sensitive:
            matched = (c for c in str_choices if c.startswith(incomplete))
        else:
            incomplete = incomplete.lower()
            matched = (c for c in str_choices if c.lower().startswith(incomplete))

        return [CompletionItem(c) for c in matched]
| |
|
| |
|
class DateTime(ParamType):
    """The DateTime type converts date strings into `datetime` objects.

    The format strings to check are configurable; the defaults are a
    few common (non-timezone aware) ISO 8601 formats.

    When specifying *DateTime* formats, you should only pass a list or a tuple.
    Other iterables, like generators, may lead to surprising results.

    Format strings are handed to ``datetime.strptime``, which therefore
    defines what is allowed in them.

    Each format is tried in order and the first one that parses the
    value successfully is used.

    :param formats: A list or tuple of date format strings, in the order in
                    which they should be tried. Defaults to
                    ``'%Y-%m-%d'``, ``'%Y-%m-%dT%H:%M:%S'``,
                    ``'%Y-%m-%d %H:%M:%S'``.
    """

    name = "datetime"

    def __init__(self, formats: cabc.Sequence[str] | None = None):
        # ``None`` and an empty sequence both select the defaults.
        if not formats:
            formats = [
                "%Y-%m-%d",
                "%Y-%m-%dT%H:%M:%S",
                "%Y-%m-%d %H:%M:%S",
            ]

        self.formats: cabc.Sequence[str] = formats

    def to_info_dict(self) -> dict[str, t.Any]:
        """Extend the base info dict with the configured ``formats``."""
        return {**super().to_info_dict(), "formats": self.formats}

    def get_metavar(self, param: Parameter, ctx: Context) -> str | None:
        """Return the formats joined by ``|`` inside square brackets."""
        return "[" + "|".join(self.formats) + "]"

    def _try_to_convert_date(self, value: t.Any, format: str) -> datetime | None:
        """Parse ``value`` with a single format, returning ``None`` when
        ``strptime`` rejects it with a ``ValueError``.
        """
        try:
            parsed = datetime.strptime(value, format)
        except ValueError:
            return None

        return parsed

    def convert(
        self, value: t.Any, param: Parameter | None, ctx: Context | None
    ) -> t.Any:
        """Return ``value`` as a ``datetime``.

        ``datetime`` instances are returned as-is. Anything else is
        tried against each configured format in order; :meth:`fail` is
        called if none of them match.
        """
        if isinstance(value, datetime):
            return value

        for fmt in self.formats:
            parsed = self._try_to_convert_date(value, fmt)

            if parsed is not None:
                return parsed

        formats_str = ", ".join(repr(fmt) for fmt in self.formats)
        message = ngettext(
            "{value!r} does not match the format {format}.",
            "{value!r} does not match the formats {formats}.",
            len(self.formats),
        ).format(value=value, format=formats_str, formats=formats_str)
        self.fail(message, param, ctx)

    def __repr__(self) -> str:
        return "DateTime"
| |
|
| |
|
class _NumberParamTypeBase(ParamType):
    """Shared conversion logic for numeric parameter types.

    Subclasses set ``_number_class`` to the callable used to build the
    number, and ``name`` to the label used in error messages.
    """

    _number_class: t.ClassVar[type[t.Any]]

    def convert(
        self, value: t.Any, param: Parameter | None, ctx: Context | None
    ) -> t.Any:
        """Build a number from ``value`` with ``_number_class``; a
        ``ValueError`` is reported through :meth:`fail`.
        """
        try:
            return self._number_class(value)
        except ValueError:
            message = _("{value!r} is not a valid {number_type}.").format(
                value=value, number_type=self.name
            )
            self.fail(message, param, ctx)
| |
|
| |
|
class _NumberRangeBase(_NumberParamTypeBase):
    """Shared logic for numeric types restricted to a range.

    :param min: Lower bound, or ``None`` for no lower bound.
    :param max: Upper bound, or ``None`` for no upper bound.
    :param min_open: If true, ``min`` itself is outside the range.
    :param max_open: If true, ``max`` itself is outside the range.
    :param clamp: If true, out-of-range values are replaced using
        :meth:`_clamp` instead of failing.
    """

    def __init__(
        self,
        min: float | None = None,
        max: float | None = None,
        min_open: bool = False,
        max_open: bool = False,
        clamp: bool = False,
    ) -> None:
        self.min = min
        self.max = max
        self.min_open = min_open
        self.max_open = max_open
        self.clamp = clamp

    def to_info_dict(self) -> dict[str, t.Any]:
        """Extend the base info dict with the range configuration."""
        return {
            **super().to_info_dict(),
            "min": self.min,
            "max": self.max,
            "min_open": self.min_open,
            "max_open": self.max_open,
            "clamp": self.clamp,
        }

    def convert(
        self, value: t.Any, param: Parameter | None, ctx: Context | None
    ) -> t.Any:
        """Convert ``value`` to a number and enforce the range.

        A value outside the range is clamped when ``clamp`` is set and
        otherwise reported through :meth:`fail`.
        """
        rv = super().convert(value, param, ctx)

        # An open bound excludes the boundary value itself, so equality
        # already counts as out of range.
        below = False
        if self.min is not None:
            below = rv <= self.min if self.min_open else rv < self.min

        above = False
        if self.max is not None:
            above = rv >= self.max if self.max_open else rv > self.max

        if below or above:
            if self.clamp:
                if below:
                    return self._clamp(self.min, 1, self.min_open)

                return self._clamp(self.max, -1, self.max_open)

            message = _("{value} is not in the range {range}.").format(
                value=rv, range=self._describe_range()
            )
            self.fail(message, param, ctx)

        return rv

    def _clamp(self, bound: float, dir: t.Literal[1, -1], open: bool) -> float:
        """Find the valid value to clamp to bound in the given
        direction.

        :param bound: The boundary value.
        :param dir: 1 or -1 indicating the direction to move.
        :param open: If true, the range does not include the bound.
        """
        raise NotImplementedError

    def _describe_range(self) -> str:
        """Describe the range for use in help text."""
        if self.min is None:
            upper_op = "<" if self.max_open else "<="
            return f"x{upper_op}{self.max}"

        if self.max is None:
            lower_op = ">" if self.min_open else ">="
            return f"x{lower_op}{self.min}"

        left = "<" if self.min_open else "<="
        right = "<" if self.max_open else "<="
        return f"{self.min}{left}x{right}{self.max}"

    def __repr__(self) -> str:
        suffix = " clamped" if self.clamp else ""
        return f"<{type(self).__name__} {self._describe_range()}{suffix}>"
| |
|
| |
|
class IntParamType(_NumberParamTypeBase):
    """Numeric parameter type that builds values with ``int``."""

    name = "integer"
    _number_class = int

    def __repr__(self) -> str:
        return "INT"
| |
|
| |
|
class IntRange(_NumberRangeBase, IntParamType):
    """Restrict an :data:`click.INT` value to a range of accepted
    values. See :ref:`ranges`.

    When ``min`` or ``max`` is not passed, any value is accepted in
    that direction. When ``min_open`` or ``max_open`` is enabled, the
    corresponding boundary is not included in the range.

    If ``clamp`` is enabled, a value outside the range is clamped to the
    boundary instead of failing.

    .. versionchanged:: 8.0
        Added the ``min_open`` and ``max_open`` parameters.
    """

    name = "integer range"

    def _clamp(  # type: ignore
        self, bound: int, dir: t.Literal[1, -1], open: bool
    ) -> int:
        """Return the value to clamp to: the bound itself when it is
        included in the range, otherwise the bound moved one step in
        direction ``dir``.
        """
        return bound + dir if open else bound
| |
|
| |
|
class FloatParamType(_NumberParamTypeBase):
    """Numeric parameter type that builds values with ``float``."""

    name = "float"
    _number_class = float

    def __repr__(self) -> str:
        return "FLOAT"
| |
|
| |
|
class FloatRange(_NumberRangeBase, FloatParamType):
    """Restrict a :data:`click.FLOAT` value to a range of accepted
    values. See :ref:`ranges`.

    When ``min`` or ``max`` is not passed, any value is accepted in
    that direction. When ``min_open`` or ``max_open`` is enabled, the
    corresponding boundary is not included in the range.

    If ``clamp`` is enabled, a value outside the range is clamped to the
    boundary instead of failing. This is not supported if either
    boundary is marked ``open``.

    .. versionchanged:: 8.0
        Added the ``min_open`` and ``max_open`` parameters.
    """

    name = "float range"

    def __init__(
        self,
        min: float | None = None,
        max: float | None = None,
        min_open: bool = False,
        max_open: bool = False,
        clamp: bool = False,
    ) -> None:
        """Store the range configuration.

        :raises TypeError: If ``clamp`` is combined with an open bound.
        """
        super().__init__(
            min=min, max=max, min_open=min_open, max_open=max_open, clamp=clamp
        )

        if clamp and (min_open or max_open):
            raise TypeError("Clamping is not supported for open bounds.")

    def _clamp(self, bound: float, dir: t.Literal[1, -1], open: bool) -> float:
        """Return ``bound`` for a closed boundary.

        :raises RuntimeError: If the boundary is open; no neighbouring
            float is computed here.
        """
        if open:
            raise RuntimeError("Clamping is not supported for open bounds.")

        return bound
| |
|
| |
|
class BoolParamType(ParamType):
    """Parameter type producing ``bool`` values from a fixed set of
    recognized strings.
    """

    name = "boolean"

    bool_states: dict[str, bool] = {
        "1": True,
        "0": False,
        "yes": True,
        "no": False,
        "true": True,
        "false": False,
        "on": True,
        "off": False,
        "t": True,
        "f": False,
        "y": True,
        "n": False,
        # An empty string (absence of a value) counts as False.
        "": False,
    }
    """A mapping of string values to boolean states.

    Mapping is inspired by :py:attr:`configparser.ConfigParser.BOOLEAN_STATES`
    and extends it.

    .. caution::
        String values are lower-cased, as the ``str_to_bool`` comparison function
        below is case-insensitive.

    .. warning::
        The mapping is not exhaustive, and does not cover all possible boolean string
        representations. It will remain as it is to avoid endless bikeshedding.

        Future work may be considered to make this mapping user-configurable from the
        public API.
    """

    @staticmethod
    def str_to_bool(value: str | bool) -> bool | None:
        """Convert a string to a boolean value.

        A value that is already a boolean is returned as-is. A string
        is stripped of surrounding whitespace and lower-cased, then
        looked up in the `BoolParamType.bool_states` mapping above.

        Returns `None` if the value does not match any known boolean state.
        """
        if isinstance(value, bool):
            return value

        key = value.strip().lower()
        return BoolParamType.bool_states.get(key)

    def convert(
        self, value: t.Any, param: Parameter | None, ctx: Context | None
    ) -> bool:
        """Return the boolean for ``value``, calling :meth:`fail` with
        the list of recognized strings when it is not one of them.
        """
        result = self.str_to_bool(value)

        if result is None:
            known = ", ".join(sorted(self.bool_states))
            self.fail(
                _(
                    "{value!r} is not a valid boolean. Recognized values: {states}"
                ).format(value=value, states=known),
                param,
                ctx,
            )

        return result

    def __repr__(self) -> str:
        return "BOOL"
| |
|
| |
|
class UUIDParameterType(ParamType):
    """Parameter type producing :class:`uuid.UUID` values."""

    name = "uuid"

    def convert(
        self, value: t.Any, param: Parameter | None, ctx: Context | None
    ) -> t.Any:
        """Return ``value`` as a :class:`uuid.UUID`.

        ``UUID`` instances are returned as-is. Other values are
        stripped of surrounding whitespace and parsed; a ``ValueError``
        from the parser is reported through :meth:`fail`.
        """
        import uuid

        if isinstance(value, uuid.UUID):
            return value

        candidate = value.strip()

        try:
            return uuid.UUID(candidate)
        except ValueError:
            message = _("{value!r} is not a valid UUID.").format(value=candidate)
            self.fail(message, param, ctx)

    def __repr__(self) -> str:
        return "UUID"
| |
|
| |
|
class File(ParamType):
    """Declares a parameter to be a file for reading or writing. The file
    is automatically closed once the context tears down (after the command
    finished working).

    Files can be opened for reading or writing. The special value ``-``
    indicates stdin or stdout depending on the mode.

    By default, the file is opened for reading text data, but it can also be
    opened in binary mode or for writing. The encoding parameter can be used
    to force a specific encoding.

    The `lazy` flag controls if the file should be opened immediately or upon
    first IO. The default is to be non-lazy for standard input and output
    streams as well as files opened for reading, `lazy` otherwise. When opening a
    file lazily for reading, it is still opened temporarily for validation, but
    will not be held open until first IO. lazy is mainly useful when opening
    for writing to avoid creating the file until it is needed.

    Files can also be opened atomically in which case all writes go into a
    separate file in the same folder and upon completion the file will
    be moved over to the original location. This is useful if a file
    regularly read by other users is modified.

    See :ref:`file-args` for more information.

    .. versionchanged:: 2.0
        Added the ``atomic`` parameter.
    """

    name = "filename"
    envvar_list_splitter: t.ClassVar[str] = os.path.pathsep

    def __init__(
        self,
        mode: str = "r",
        encoding: str | None = None,
        errors: str | None = "strict",
        lazy: bool | None = None,
        atomic: bool = False,
    ) -> None:
        self.mode = mode
        self.encoding = encoding
        self.errors = errors
        self.lazy = lazy
        self.atomic = atomic

    def to_info_dict(self) -> dict[str, t.Any]:
        """Extend the base info dict with ``mode`` and ``encoding``."""
        return {
            **super().to_info_dict(),
            "mode": self.mode,
            "encoding": self.encoding,
        }

    def resolve_lazy_flag(self, value: str | os.PathLike[str]) -> bool:
        """Decide whether ``value`` should be opened lazily.

        An explicit ``lazy`` setting wins. Otherwise ``-`` is never
        lazy, and any other path is lazy exactly when the mode contains
        ``"w"``.
        """
        if self.lazy is not None:
            return self.lazy

        if os.fspath(value) == "-":
            return False

        return "w" in self.mode

    def convert(
        self,
        value: str | os.PathLike[str] | t.IO[t.Any],
        param: Parameter | None,
        ctx: Context | None,
    ) -> t.IO[t.Any]:
        """Return an open (or lazily opened) file for ``value``.

        Objects that already look like files are returned unchanged.
        An ``OSError`` raised while opening is reported through
        :meth:`fail`.
        """
        if _is_file_like(value):
            return value

        value = t.cast("str | os.PathLike[str]", value)

        try:
            if self.resolve_lazy_flag(value):
                lazy_file = LazyFile(
                    value, self.mode, self.encoding, self.errors, atomic=self.atomic
                )

                if ctx is not None:
                    ctx.call_on_close(lazy_file.close_intelligently)

                return t.cast("t.IO[t.Any]", lazy_file)

            stream, should_close = open_stream(
                value, self.mode, self.encoding, self.errors, atomic=self.atomic
            )

            # With a context, cleanup is registered to run when the
            # context closes: the stream is closed, or only flushed when
            # ``open_stream`` reports it should not be closed. Without a
            # context nothing is registered here, so the caller is left
            # to close the file.
            if ctx is not None:
                cleanup = stream.close if should_close else stream.flush
                ctx.call_on_close(safecall(cleanup))

            return stream
        except OSError as e:
            self.fail(f"'{format_filename(value)}': {e.strerror}", param, ctx)

    def shell_complete(
        self, ctx: Context, param: Parameter, incomplete: str
    ) -> list[CompletionItem]:
        """Return a special completion marker that tells the completion
        system to use the shell to provide file path completions.

        :param ctx: Invocation context for this command.
        :param param: The parameter that is requesting completion.
        :param incomplete: Value being completed. May be empty.

        .. versionadded:: 8.0
        """
        from click.shell_completion import CompletionItem

        marker = CompletionItem(incomplete, type="file")
        return [marker]
| |
|
| |
|
| | def _is_file_like(value: t.Any) -> te.TypeGuard[t.IO[t.Any]]: |
| | return hasattr(value, "read") or hasattr(value, "write") |
| |
|
| |
|
| | class Path(ParamType): |
| | """The ``Path`` type is similar to the :class:`File` type, but |
| | returns the filename instead of an open file. Various checks can be |
| | enabled to validate the type of file and permissions. |
| | |
| | :param exists: The file or directory needs to exist for the value to |
| | be valid. If this is not set to ``True``, and the file does not |
| | exist, then all further checks are silently skipped. |
| | :param file_okay: Allow a file as a value. |
| | :param dir_okay: Allow a directory as a value. |
| | :param readable: if true, a readable check is performed. |
| | :param writable: if true, a writable check is performed. |
| | :param executable: if true, an executable check is performed. |
| | :param resolve_path: Make the value absolute and resolve any |
| | symlinks. A ``~`` is not expanded, as this is supposed to be |
| | done by the shell only. |
| | :param allow_dash: Allow a single dash as a value, which indicates |
| | a standard stream (but does not open it). Use |
| | :func:`~click.open_file` to handle opening this value. |
| | :param path_type: Convert the incoming path value to this type. If |
| | ``None``, keep Python's default, which is ``str``. Useful to |
| | convert to :class:`pathlib.Path`. |
| | |
| | .. versionchanged:: 8.1 |
| | Added the ``executable`` parameter. |
| | |
| | .. versionchanged:: 8.0 |
| | Allow passing ``path_type=pathlib.Path``. |
| | |
| | .. versionchanged:: 6.0 |
| | Added the ``allow_dash`` parameter. |
| | """ |
| |
|
| | envvar_list_splitter: t.ClassVar[str] = os.path.pathsep |
| |
|
| | def __init__( |
| | self, |
| | exists: bool = False, |
| | file_okay: bool = True, |
| | dir_okay: bool = True, |
| | writable: bool = False, |
| | readable: bool = True, |
| | resolve_path: bool = False, |
| | allow_dash: bool = False, |
| | path_type: type[t.Any] | None = None, |
| | executable: bool = False, |
| | ): |
| | self.exists = exists |
| | self.file_okay = file_okay |
| | self.dir_okay = dir_okay |
| | self.readable = readable |
| | self.writable = writable |
| | self.executable = executable |
| | self.resolve_path = resolve_path |
| | self.allow_dash = allow_dash |
| | self.type = path_type |
| |
|
| | if self.file_okay and not self.dir_okay: |
| | self.name: str = _("file") |
| | elif self.dir_okay and not self.file_okay: |
| | self.name = _("directory") |
| | else: |
| | self.name = _("path") |
| |
|
| | def to_info_dict(self) -> dict[str, t.Any]: |
| | info_dict = super().to_info_dict() |
| | info_dict.update( |
| | exists=self.exists, |
| | file_okay=self.file_okay, |
| | dir_okay=self.dir_okay, |
| | writable=self.writable, |
| | readable=self.readable, |
| | allow_dash=self.allow_dash, |
| | ) |
| | return info_dict |
| |
|
| | def coerce_path_result( |
| | self, value: str | os.PathLike[str] |
| | ) -> str | bytes | os.PathLike[str]: |
| | if self.type is not None and not isinstance(value, self.type): |
| | if self.type is str: |
| | return os.fsdecode(value) |
| | elif self.type is bytes: |
| | return os.fsencode(value) |
| | else: |
| | return t.cast("os.PathLike[str]", self.type(value)) |
| |
|
| | return value |
| |
|
def convert(
    self,
    value: str | os.PathLike[str],
    param: Parameter | None,
    ctx: Context | None,
) -> str | bytes | os.PathLike[str]:
    """Validate ``value`` as a filesystem path and coerce the result.

    A literal dash (``"-"`` or ``b"-"``) skips every check when both
    ``file_okay`` and ``allow_dash`` are set. Any other value is
    optionally resolved with :func:`os.path.realpath` and then checked
    against the ``exists``, ``file_okay``, ``dir_okay``, ``readable``,
    ``writable`` and ``executable`` settings. Every filesystem check
    uses the same (possibly resolved) path; error messages show the
    value as it was originally given.

    :param value: The path to validate.
    :param param: The parameter being converted; forwarded to
        :meth:`fail`.
    :param ctx: The current context; forwarded to :meth:`fail`.
    :return: The (possibly resolved) path passed through
        :meth:`coerce_path_result`.

    Failed checks are reported through :meth:`fail`.
    """

    def reject(template: str) -> t.NoReturn:
        # Single place that formats and reports a failed check. The
        # message shows the value as supplied, not the resolved path.
        self.fail(
            template.format(
                name=self.name.title(), filename=format_filename(value)
            ),
            param,
            ctx,
        )

    rv = value

    is_dash = self.file_okay and self.allow_dash and rv in (b"-", "-")

    if not is_dash:
        if self.resolve_path:
            rv = os.path.realpath(rv)

        try:
            st = os.stat(rv)
        except OSError:
            # A path that cannot be stat'ed is accepted without any
            # further checks unless it is required to exist.
            if not self.exists:
                return self.coerce_path_result(rv)
            reject(_("{name} {filename!r} does not exist."))

        if not self.file_okay and stat.S_ISREG(st.st_mode):
            reject(_("{name} {filename!r} is a file."))

        if not self.dir_okay and stat.S_ISDIR(st.st_mode):
            reject(_("{name} {filename!r} is a directory."))

        if self.readable and not os.access(rv, os.R_OK):
            reject(_("{name} {filename!r} is not readable."))

        if self.writable and not os.access(rv, os.W_OK):
            reject(_("{name} {filename!r} is not writable."))

        # Check the same path as the stat/readable/writable checks
        # above (the resolved one when ``resolve_path`` is set) rather
        # than the raw input value.
        if self.executable and not os.access(rv, os.X_OK):
            reject(_("{name} {filename!r} is not executable."))

    return self.coerce_path_result(rv)
| |
|
def shell_complete(
    self, ctx: Context, param: Parameter, incomplete: str
) -> list[CompletionItem]:
    """Return a single marker item asking the shell itself to complete
    paths: directories only when directories are allowed and files
    are not, any path otherwise.

    :param ctx: Invocation context for this command.
    :param param: The parameter that is requesting completion.
    :param incomplete: Value being completed. May be empty.

    .. versionadded:: 8.0
    """
    from click.shell_completion import CompletionItem

    # Only restrict completion to directories when files are excluded.
    dirs_only = self.dir_okay and not self.file_okay
    marker = CompletionItem(incomplete, type="dir" if dirs_only else "file")
    return [marker]
| |
|
| |
|
class Tuple(CompositeParamType):
    """Composite type that applies a separate type to each item of a
    fixed-size group of values.

    Click normally applies one type to a value directly. When `nargs`
    is set to a fixed count and the items need different types, use
    :class:`Tuple`. It can only be used if `nargs` is set to a fixed
    number.

    For more information see :ref:`tuple-type`.

    This can be selected by using a Python tuple literal as a type.

    :param types: a list of types that should be used for the tuple items.
    """

    def __init__(self, types: cabc.Sequence[type[t.Any] | ParamType]) -> None:
        # Normalize every entry to a ParamType instance up front.
        self.types: cabc.Sequence[ParamType] = list(map(convert_type, types))

    def to_info_dict(self) -> dict[str, t.Any]:
        """Extend the base info dict with the info dict of each item type."""
        info = super().to_info_dict()
        info["types"] = [item_type.to_info_dict() for item_type in self.types]
        return info

    @property
    def name(self) -> str:
        """Names of the item types, space separated, in angle brackets."""
        joined = " ".join(item_type.name for item_type in self.types)
        return f"<{joined}>"

    @property
    def arity(self) -> int:
        """Number of values consumed: one per item type."""
        return len(self.types)

    def convert(
        self, value: t.Any, param: Parameter | None, ctx: Context | None
    ) -> t.Any:
        """Convert each item of ``value`` with the type at the same
        position and return the results as a tuple.

        Reports a failure through :meth:`fail` when the number of items
        differs from the number of configured types.
        """
        expected = len(self.types)
        received = len(value)

        if expected != received:
            message = ngettext(
                "{len_type} values are required, but {len_value} was given.",
                "{len_type} values are required, but {len_value} were given.",
                received,
            ).format(len_type=expected, len_value=received)
            self.fail(message, param=param, ctx=ctx)

        # Lengths are known to match here, so a non-strict zip is enough.
        pairs = zip(self.types, value, strict=False)
        return tuple(item_type(item, param, ctx) for item_type, item in pairs)
| |
|
| |
|
def convert_type(ty: t.Any | None, default: t.Any | None = None) -> ParamType:
    """Pick the :class:`ParamType` that best matches a Python type.

    When no type is given, it is guessed from ``default`` if one is
    provided. A guessed type that is not one of the directly supported
    builtins falls back to ``STRING``.
    """
    guessed_type = ty is None and default is not None

    if guessed_type:
        if not isinstance(default, (tuple, list)):
            ty = type(default)
        elif default:
            # An empty sequence leaves ``ty`` as None, which maps to
            # STRING below. Otherwise the first item decides.
            first = default[0]

            if isinstance(first, (tuple, list)):
                # A sequence of sequences: take the types of the inner
                # items so that a Tuple type is built below.
                ty = tuple(type(inner) for inner in first)
            else:
                ty = type(first)

    if isinstance(ty, tuple):
        return Tuple(ty)

    if isinstance(ty, ParamType):
        return ty

    if ty is None:
        return STRING

    # Builtin types that map directly onto a shared ParamType instance.
    for builtin, param_type in (
        (str, STRING),
        (int, INT),
        (float, FLOAT),
        (bool, BOOL),
    ):
        if ty is builtin:
            return param_type

    if guessed_type:
        return STRING

    if __debug__:
        try:
            uninstantiated = issubclass(ty, ParamType)
        except TypeError:
            # ``ty`` is not a class, so issubclass cannot be applied to
            # it; there is nothing to complain about.
            uninstantiated = False

        if uninstantiated:
            raise AssertionError(
                f"Attempted to use an uninstantiated parameter type ({ty})."
            )

    return FuncParamType(ty)
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
#: Shared module-level instance of :class:`UnprocessedParamType`.
UNPROCESSED = UnprocessedParamType()

#: Shared module-level instance of :class:`StringParamType`.
#: :func:`convert_type` returns it for ``str``, when no type can be
#: determined, and as the fallback for a type guessed from a default
#: value that is not ``int``, ``float`` or ``bool``.
STRING = StringParamType()

#: Shared module-level instance of :class:`IntParamType`.
#: :func:`convert_type` returns it for ``int``.
INT = IntParamType()

#: Shared module-level instance of :class:`FloatParamType`.
#: :func:`convert_type` returns it for ``float``.
FLOAT = FloatParamType()

#: Shared module-level instance of :class:`BoolParamType`.
#: :func:`convert_type` returns it for ``bool``.
BOOL = BoolParamType()

#: Shared module-level instance of :class:`UUIDParameterType`.
UUID = UUIDParameterType()
| |
|
| |
|
class OptionHelpExtra(t.TypedDict, total=False):
    """Typed dictionary in which every key is optional
    (``total=False``).

    ``envvars`` holds a tuple of strings; ``default``, ``range`` and
    ``required`` each hold a single string.

    NOTE(review): judging by the name, these are presumably the extra
    pieces shown next to an option's help text; confirm against the
    code that builds and consumes this dictionary.
    """

    envvars: tuple[str, ...]
    default: str
    range: str
    required: str
| |
|