| | import io |
| | import logging |
| | import os |
| | import pathlib |
| | import shutil |
| | import sys |
| | import tempfile |
| | from collections import OrderedDict |
| | from contextlib import contextmanager |
| | from typing import (IO, Dict, Iterable, Iterator, Mapping, Optional, Tuple, |
| | Union) |
| |
|
| | from .parser import Binding, parse_stream |
| | from .variables import parse_variables |
| |
|
| | |
| | |
| | |
| | |
# Type alias for the filesystem paths accepted throughout this module: either
# a `str` or an `os.PathLike` that yields `str`. Values of this type are
# passed to `open()`, `os.path` helpers and `shutil.move()` further down.
StrPath = Union[str, 'os.PathLike[str]']

# Module-level logger; all warnings and informational messages emitted by the
# functions below go through it.
logger = logging.getLogger(__name__)
| |
|
| |
|
def with_warn_for_invalid_lines(mappings: Iterator[Binding]) -> Iterator[Binding]:
    """Pass every binding through unchanged, warning about the invalid ones.

    A binding whose ``error`` attribute is truthy triggers a log warning that
    mentions ``original.line``; the binding is still yielded afterwards, so
    the output sequence always mirrors the input sequence.
    """
    for binding in mappings:
        if not binding.error:
            yield binding
            continue

        logger.warning(
            "Python-dotenv could not parse statement starting at line %s",
            binding.original.line,
        )
        yield binding
| |
|
| |
|
class DotEnv:
    """View of a dotenv source, read either from a file path or a text stream.

    The parsed (and optionally interpolated) key/value pairs are computed on
    the first call to `dict()` and cached on the instance afterwards.
    """

    def __init__(
        self,
        dotenv_path: Optional[StrPath],
        stream: Optional[IO[str]] = None,
        verbose: bool = False,
        encoding: Optional[str] = None,
        interpolate: bool = True,
        override: bool = True,
    ) -> None:
        """
        Store the configuration; nothing is read until the data is requested.

        Parameters:
            dotenv_path: Path of the file to read. Takes precedence over
                `stream` when it is truthy and points to an existing file.
            stream: Text stream used when `dotenv_path` cannot be used.
            verbose: Whether to log when no source is found or when `get()`
                is asked for a missing key.
            encoding: Encoding passed to `open()` when reading `dotenv_path`.
            interpolate: Whether values go through `resolve_variables()`.
            override: Whether the file's values win over already existing
                environment variables (both for interpolation and in
                `set_as_environment_variables()`).
        """
        self.dotenv_path: Optional[StrPath] = dotenv_path
        self.stream: Optional[IO[str]] = stream
        # Cache filled by `dict()`; `None` means "not parsed yet".
        self._dict: Optional[Dict[str, Optional[str]]] = None
        self.verbose: bool = verbose
        self.encoding: Optional[str] = encoding
        self.interpolate: bool = interpolate
        self.override: bool = override

    @contextmanager
    def _get_stream(self) -> Iterator[IO[str]]:
        """Yield the text stream to parse.

        Order of preference: the file at `dotenv_path` (opened here and
        closed on exit), then the user-supplied `stream` (left open), then an
        empty in-memory stream. In the last case an informational message is
        logged when `verbose` is set.
        """
        if self.dotenv_path and os.path.isfile(self.dotenv_path):
            with open(self.dotenv_path, encoding=self.encoding) as stream:
                yield stream
        elif self.stream is not None:
            yield self.stream
        else:
            if self.verbose:
                logger.info(
                    "Python-dotenv could not find configuration file %s.",
                    self.dotenv_path or '.env',
                )
            yield io.StringIO('')

    def dict(self) -> Dict[str, Optional[str]]:
        """Return dotenv as dict"""
        # Compare against None rather than relying on truthiness: an empty
        # result is a valid cached value and must not trigger a re-parse
        # (which would re-log messages and re-read a consumed stream).
        if self._dict is not None:
            return self._dict

        raw_values = self.parse()

        if self.interpolate:
            self._dict = OrderedDict(resolve_variables(raw_values, override=self.override))
        else:
            self._dict = OrderedDict(raw_values)

        return self._dict

    def parse(self) -> Iterator[Tuple[str, Optional[str]]]:
        """Yield `(key, value)` for every parsed binding that has a key.

        Bindings flagged as invalid are reported through
        `with_warn_for_invalid_lines()`; bindings without a key are skipped.
        """
        with self._get_stream() as stream:
            for mapping in with_warn_for_invalid_lines(parse_stream(stream)):
                if mapping.key is not None:
                    yield mapping.key, mapping.value

    def set_as_environment_variables(self) -> bool:
        """
        Load the current dotenv as system environment variable.

        Returns `False` when the dotenv contains no entries at all, `True`
        otherwise. Keys already present in `os.environ` are left untouched
        unless `override` is set; keys without a value are never exported.
        """
        values = self.dict()
        if not values:
            return False

        for k, v in values.items():
            if k in os.environ and not self.override:
                continue
            if v is not None:
                os.environ[k] = v

        return True

    def get(self, key: str) -> Optional[str]:
        """
        Return the value stored for `key`, or `None` when the key is absent.

        A key that is present without a value also yields `None`. A warning
        is logged for an absent key when `verbose` is set.
        """
        data = self.dict()

        # Explicit membership test: a present key may legitimately map to
        # None, and only a truly missing key should produce the warning.
        if key in data:
            return data[key]

        if self.verbose:
            logger.warning("Key %s not found in %s.", key, self.dotenv_path)

        return None
| |
|
| |
|
def get_key(
    dotenv_path: StrPath,
    key_to_get: str,
    encoding: Optional[str] = "utf-8",
) -> Optional[str]:
    """
    Look up a single key in the given .env file.

    The file is read through a verbose `DotEnv`, so a missing file or key is
    reported via the module logger. Returns `None` if the key isn't found or
    doesn't have a value.
    """
    dotenv = DotEnv(dotenv_path, verbose=True, encoding=encoding)
    return dotenv.get(key_to_get)
| |
|
| |
|
@contextmanager
def rewrite(
    path: StrPath,
    encoding: Optional[str],
) -> Iterator[Tuple[IO[str], IO[str]]]:
    """Context manager for rewriting the file at `path` via a temporary file.

    Yields `(source, dest)`: `source` is `path` opened for reading and `dest`
    is a named temporary file opened for writing. `path` is created first if
    it does not exist. When the `with` body finishes normally the temporary
    file is moved over `path`; when it raises, the temporary file is deleted
    and the exception propagates unchanged.

    Parameters:
        path: File to rewrite.
        encoding: Encoding used for both reading `path` and writing the
            temporary file.
    """
    # Guarantees that `open(path)` below succeeds even for a new file.
    pathlib.Path(path).touch()

    # NOTE(review): the temporary file is created in the default temp
    # directory and replaces `path` as-is, so the result presumably carries
    # the temporary file's permissions rather than the original's - confirm
    # whether that is intended.
    with tempfile.NamedTemporaryFile(mode="w", encoding=encoding, delete=False) as dest:
        error = None
        try:
            with open(path, encoding=encoding) as source:
                yield (source, dest)
        except BaseException as err:
            # Defer handling until `dest` has been closed by the enclosing
            # `with`, so the temporary file can be removed afterwards.
            error = err

    if error is None:
        shutil.move(dest.name, path)
    else:
        os.unlink(dest.name)
        # Re-raise the very same exception without `from None`: that form
        # would overwrite `__cause__`/`__suppress_context__` on the caller's
        # exception and discard any chaining it was raised with.
        raise error
| |
|
| |
|
def set_key(
    dotenv_path: StrPath,
    key_to_set: str,
    value_to_set: str,
    quote_mode: str = "always",
    export: bool = False,
    encoding: Optional[str] = "utf-8",
) -> Tuple[Optional[bool], str, str]:
    """
    Add or update a key/value pair in the given .env file.

    The file is rewritten through `rewrite()`, which creates it when it does
    not exist yet. Every existing binding for `key_to_set` is replaced by the
    new line; if there is none, the new line is appended (after a newline if
    the last written binding did not end with one).

    Parameters:
        dotenv_path: Path of the .env file to modify.
        key_to_set: Name of the variable.
        value_to_set: Value of the variable.
        quote_mode: `"always"` single-quotes the value, `"auto"` quotes it
            only when it is not purely alphanumeric, `"never"` writes it
            verbatim.
        export: Whether to prefix the line with `export `.
        encoding: Encoding used to read and write the file.

    Returns:
        The tuple `(True, key_to_set, value_to_set)`.

    Raises:
        ValueError: If `quote_mode` is not one of the supported modes.
    """
    if quote_mode not in ("always", "auto", "never"):
        raise ValueError(f"Unknown quote_mode: {quote_mode}")

    if quote_mode == "always":
        needs_quotes = True
    elif quote_mode == "auto":
        needs_quotes = not value_to_set.isalnum()
    else:
        needs_quotes = False

    if needs_quotes:
        # Single quotes inside the value are backslash-escaped.
        rendered = "'{}'".format(value_to_set.replace("'", "\\'"))
    else:
        rendered = value_to_set

    line_out = (
        f'export {key_to_set}={rendered}\n'
        if export
        else f"{key_to_set}={rendered}\n"
    )

    with rewrite(dotenv_path, encoding=encoding) as (source, dest):
        found = False
        needs_newline = False
        for binding in with_warn_for_invalid_lines(parse_stream(source)):
            if binding.key != key_to_set:
                # Unrelated binding: copy it verbatim and remember whether
                # the output currently ends without a line terminator.
                dest.write(binding.original.string)
                needs_newline = not binding.original.string.endswith("\n")
                continue
            dest.write(line_out)
            found = True

        if not found:
            if needs_newline:
                dest.write("\n")
            dest.write(line_out)

    return True, key_to_set, value_to_set
| |
|
| |
|
def unset_key(
    dotenv_path: StrPath,
    key_to_unset: str,
    quote_mode: str = "always",
    encoding: Optional[str] = "utf-8",
) -> Tuple[Optional[bool], str]:
    """
    Remove every binding of a given key from the given `.env` file.

    Returns `(True, key_to_unset)` on success. If the file does not exist,
    or the key is not present in it, a warning is logged and
    `(None, key_to_unset)` is returned instead. `quote_mode` is accepted but
    not used by this function.
    """
    if not os.path.exists(dotenv_path):
        logger.warning("Can't delete from %s - it doesn't exist.", dotenv_path)
        return None, key_to_unset

    found = False
    with rewrite(dotenv_path, encoding=encoding) as (source, dest):
        for binding in with_warn_for_invalid_lines(parse_stream(source)):
            if binding.key != key_to_unset:
                # Keep everything that is not the key being removed.
                dest.write(binding.original.string)
            else:
                found = True

    if found:
        return found, key_to_unset

    logger.warning("Key %s not removed from %s - key doesn't exist.", key_to_unset, dotenv_path)
    return None, key_to_unset
| |
|
| |
|
def resolve_variables(
    values: Iterable[Tuple[str, Optional[str]]],
    override: bool,
) -> Mapping[str, Optional[str]]:
    """Expand variable references in the given `(name, value)` pairs.

    Pairs are processed in order, so a value may refer to names resolved
    earlier in the same sequence as well as to `os.environ`. When both define
    a name, earlier pairs win if `override` is true and the environment wins
    otherwise. A `None` value is kept as `None`.
    """
    resolved: Dict[str, Optional[str]] = {}

    for name, value in values:
        if value is None:
            resolved[name] = None
            continue

        atoms = parse_variables(value)
        # Later entries of the merged mapping take precedence.
        env: Dict[str, Optional[str]]
        if override:
            env = {**os.environ, **resolved}
        else:
            env = {**resolved, **os.environ}
        resolved[name] = "".join(atom.resolve(env) for atom in atoms)

    return resolved
| |
|
| |
|
| | def _walk_to_root(path: str) -> Iterator[str]: |
| | """ |
| | Yield directories starting from the given directory up to the root |
| | """ |
| | if not os.path.exists(path): |
| | raise IOError('Starting path not found') |
| |
|
| | if os.path.isfile(path): |
| | path = os.path.dirname(path) |
| |
|
| | last_dir = None |
| | current_dir = os.path.abspath(path) |
| | while last_dir != current_dir: |
| | yield current_dir |
| | parent_dir = os.path.abspath(os.path.join(current_dir, os.path.pardir)) |
| | last_dir, current_dir = current_dir, parent_dir |
| |
|
| |
|
def find_dotenv(
    filename: str = '.env',
    raise_error_if_not_found: bool = False,
    usecwd: bool = False,
) -> str:
    """
    Search in increasingly higher folders for the given file

    The search starts in the current working directory when `usecwd` is set,
    when running interactively, or when `sys.frozen` is set; otherwise it
    starts in the directory of the first calling file that exists on disk
    and is not this module.

    Parameters:
        filename: Name of the file to look for in each directory.
        raise_error_if_not_found: Raise `IOError` instead of returning an
            empty string when the file is not found.
        usecwd: Force the search to start in the current working directory.

    Returns path to the file if found, or an empty string otherwise

    Raises:
        IOError: If the file is not found and `raise_error_if_not_found` is
            set, or if the starting directory does not exist.
    """

    def _is_interactive():
        """ Decide whether this is running in a REPL or IPython notebook """
        try:
            main = __import__('__main__', None, None, fromlist=['__file__'])
        except ModuleNotFoundError:
            return False
        return not hasattr(main, '__file__')

    path = None
    if not (usecwd or _is_interactive() or getattr(sys, 'frozen', False)):
        # Walk up the call stack to the first frame whose code lives in a
        # real file other than this module.
        current_file = __file__
        frame = sys._getframe()
        while frame is not None:
            frame_filename = frame.f_code.co_filename
            if frame_filename != current_file and os.path.exists(frame_filename):
                path = os.path.dirname(os.path.abspath(frame_filename))
                break
            frame = frame.f_back

    if path is None:
        # Either requested explicitly or no usable caller file was found
        # (the stack was exhausted); an `assert` used to guard the latter,
        # which is stripped under `-O`. Fall back to the working directory.
        path = os.getcwd()

    for dirname in _walk_to_root(path):
        check_path = os.path.join(dirname, filename)
        if os.path.isfile(check_path):
            return check_path

    if raise_error_if_not_found:
        raise IOError('File not found')

    return ''
| |
|
| |
|
def load_dotenv(
    dotenv_path: Optional[StrPath] = None,
    stream: Optional[IO[str]] = None,
    verbose: bool = False,
    override: bool = False,
    interpolate: bool = True,
    encoding: Optional[str] = "utf-8",
) -> bool:
    """Parse a .env file and export the variables found to the environment.

    Parameters:
        dotenv_path: Absolute or relative path to the .env file.
        stream: Text stream (such as `io.StringIO`) with .env content, used
            if `dotenv_path` is `None`.
        verbose: Whether to log a message when the .env file is missing.
        override: Whether variables from the .env file replace environment
            variables that already exist.
        interpolate: Whether variable references inside values are expanded.
        encoding: Encoding to be used to read the file.
    Returns:
        Bool: False if the parsed .env content has no entries, True otherwise.

    If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to
    find the .env file.
    """
    nothing_given = dotenv_path is None and stream is None
    if nothing_given:
        dotenv_path = find_dotenv()

    return DotEnv(
        dotenv_path=dotenv_path,
        stream=stream,
        verbose=verbose,
        interpolate=interpolate,
        override=override,
        encoding=encoding,
    ).set_as_environment_variables()
| |
|
| |
|
def dotenv_values(
    dotenv_path: Optional[StrPath] = None,
    stream: Optional[IO[str]] = None,
    verbose: bool = False,
    interpolate: bool = True,
    encoding: Optional[str] = "utf-8",
) -> Dict[str, Optional[str]]:
    """
    Parse a .env file and return its content as a dict, without touching the
    environment.

    Keys without a value in the .env file map to `None`: `foo=bar` results in
    `{"foo": "bar"}` whereas `foo` alone results in `{"foo": None}`.

    Parameters:
        dotenv_path: Absolute or relative path to the .env file.
        stream: Text stream with .env content, used if `dotenv_path` is `None`.
        verbose: Whether to log a message when the .env file is missing.
        interpolate: Whether variable references inside values are expanded.
        encoding: Encoding to be used to read the file.

    If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to
    find the .env file.
    """
    nothing_given = dotenv_path is None and stream is None
    if nothing_given:
        dotenv_path = find_dotenv()

    dotenv = DotEnv(
        dotenv_path=dotenv_path,
        stream=stream,
        verbose=verbose,
        interpolate=interpolate,
        override=True,
        encoding=encoding,
    )
    return dotenv.dict()
| |
|