Buckets:
| from __future__ import annotations | |
| import fnmatch | |
| import os | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| import typing as t | |
| from itertools import chain | |
| from pathlib import PurePath | |
| from ._internal import _log | |
# Interpreter installation prefixes. Inside a virtualenv the "base" values
# point at the system installation while the plain values point at the
# environment itself. Paths under the base prefixes are skipped by every
# reloader; the stat reloader additionally refuses to *scan* the
# environment prefixes and only tracks modules that were already imported
# from there.
_ignore_always = tuple({sys.base_prefix, sys.base_exec_prefix})

_scan_prefixes = {*_ignore_always, sys.prefix, sys.exec_prefix}
try:
    # Only set by virtualenv < 20.
    _scan_prefixes.add(sys.real_prefix)
except AttributeError:
    pass
_stat_ignore_scan = tuple(_scan_prefixes)
del _scan_prefixes

# Directory names that are never worth watching: bytecode caches (a change
# there always accompanies a change to the source or initial pyc file),
# version control internals, and common tool caches.
_ignore_common_dirs = {
    # bytecode cache
    "__pycache__",
    # version control
    ".git",
    ".hg",
    # tool caches / environments
    ".tox",
    ".nox",
    ".pytest_cache",
    ".mypy_cache",
}
def _iter_module_paths() -> t.Iterator[str]:
    """Yield the on-disk file backing each currently imported module.

    Modules without a ``__file__`` and modules located under the
    always-ignored prefixes are skipped. When ``__file__`` does not name
    a real file (for example a module inside a zip archive), parent
    paths are tried until an existing file is found.
    """
    # Snapshot the values: the app may import modules while we iterate.
    for mod in tuple(sys.modules.values()):
        candidate = getattr(mod, "__file__", None)

        if candidate is None or candidate.startswith(_ignore_always):
            continue

        found = os.path.isfile(candidate)

        while not found:
            # Strip the trailing component and try the parent path.
            parent = os.path.dirname(candidate)

            if parent == candidate:
                # Reached the top without finding a file; give up.
                break

            candidate = parent
            found = os.path.isfile(candidate)

        if found:
            yield candidate
def _remove_by_pattern(paths: set[str], exclude_patterns: set[str]) -> None:
    """Remove, in place, every entry of ``paths`` that matches any of the
    ``fnmatch``-style ``exclude_patterns``.
    """
    rejected: set[str] = set()

    for glob in exclude_patterns:
        rejected.update(fnmatch.filter(paths, glob))

    paths.difference_update(rejected)
def _find_stat_paths(
    extra_files: set[str], exclude_patterns: set[str]
) -> t.Iterable[str]:
    """Collect the file paths the stat reloader should poll.

    Every ``sys.path`` entry and every extra path is examined. Entries
    that are files are watched directly; directories are walked and
    their ``.py`` / ``.pyc`` files collected. The files of already
    imported modules are added afterwards, and finally anything matching
    ``exclude_patterns`` is removed.

    Directories under the interpreter prefixes and well-known cache or
    version control directories are never descended into.
    """
    watched: set[str] = set()

    for entry in chain(list(sys.path), extra_files):
        entry = os.path.abspath(entry)

        if os.path.isfile(entry):
            # zip file on sys.path, or extra file
            watched.add(entry)
            continue

        # Maps a directory to whether it directly held Python files. The
        # parent of the walk root is seeded as True so the root itself is
        # always descended into.
        held_py = {os.path.dirname(entry): True}

        for root, dirs, files in os.walk(entry):
            skip = (
                root.startswith(_stat_ignore_scan)
                or os.path.basename(root) in _ignore_common_dirs
            )

            if skip:
                # Emptying ``dirs`` in place stops os.walk from descending.
                dirs.clear()
                continue

            py_files = [f for f in files if f.endswith((".py", ".pyc"))]
            watched.update(os.path.join(root, f) for f in py_files)
            has_py = bool(py_files)

            if has_py or held_py[os.path.dirname(root)]:
                held_py[root] = has_py
            else:
                # Optimization: neither this directory nor its parent
                # contained Python files, so stop scanning below it.
                dirs.clear()

    watched.update(_iter_module_paths())
    _remove_by_pattern(watched, exclude_patterns)
    return watched
def _find_watchdog_paths(
    extra_files: set[str], exclude_patterns: set[str]
) -> t.Iterable[str]:
    """Find directories for the watchdog reloader to watch.

    Looks at the same sources as the stat reloader (``sys.path``
    entries, extra files, and imported modules), but collects the
    containing directories instead of individual files. Directories
    matching ``exclude_patterns`` are dropped, and the remainder is
    reduced to its common roots so nested directories are not returned
    alongside an ancestor.
    """
    dirs: set[str] = set()

    for name in chain(list(sys.path), extra_files):
        name = os.path.abspath(name)

        if os.path.isfile(name):
            # Watch the directory that holds the file.
            name = os.path.dirname(name)

        dirs.add(name)

    dirs.update(os.path.dirname(name) for name in _iter_module_paths())
    _remove_by_pattern(dirs, exclude_patterns)
    return _find_common_roots(dirs)
def _find_common_roots(paths: t.Iterable[str]) -> t.Iterable[str]:
    """Reduce ``paths`` to the set of paths that are not nested below
    another path in the input.

    The paths are split into components and inserted into a trie,
    longest first. Inserting a path empties the node it ends on, which
    discards any longer path previously recorded beneath it. The leaves
    that remain are the common roots.
    """
    trie: dict[str, dict[str, t.Any]] = {}
    split_paths = [PurePath(p).parts for p in paths]
    split_paths.sort(key=len, reverse=True)

    for parts in split_paths:
        branch = trie

        for part in parts:
            branch = branch.setdefault(part, {})

        branch.clear()

    roots: set[str] = set()
    pending: list[tuple[tuple[str, ...], dict[str, t.Any]]] = [((), trie)]

    while pending:
        trail, branch = pending.pop()

        if branch:
            pending.extend(((*trail, key), sub) for key, sub in branch.items())
        elif trail:
            # A leaf with an accumulated path is a root. The trail may be
            # empty if the "" entry is in sys.path.
            roots.add(os.path.join(*trail))

    return roots
def _get_args_for_reloading() -> list[str]:
    """Build the command line that re-runs the current program in a new
    process, based on how the current process was started.
    """
    if sys.version_info >= (3, 10):
        # sys.orig_argv (new in Python 3.10) holds the exact arguments
        # Python was invoked with. argv[0] is still replaced with
        # sys.executable for accuracy.
        return [sys.executable, *sys.orig_argv[1:]]

    script = sys.argv[0]
    # The main module tells us how the program was executed.
    main_module = sys.modules["__main__"]
    # __package__ indicates how Python was called. It may not exist if a
    # setuptools script is installed as an egg. It may be set incorrectly
    # for entry points created with pip on Windows.
    package = getattr(main_module, "__package__", None)
    on_windows = os.name == "nt"

    ran_as_file = package is None or (
        on_windows
        and package == ""
        and not os.path.exists(script)
        and os.path.exists(f"{script}.exe")
    )

    command = [sys.executable]

    if ran_as_file:
        # Executed a file, like "python app.py".
        script = os.path.abspath(script)

        if on_windows:
            # Windows entry points have ".exe" extension and should be
            # called directly.
            if not os.path.exists(script) and os.path.exists(f"{script}.exe"):
                script += ".exe"

            both_exe = (
                os.path.splitext(sys.executable)[1] == ".exe"
                and os.path.splitext(script)[1] == ".exe"
            )

            if both_exe:
                command.pop(0)

        command.append(script)
    else:
        # Executed a module, like "python -m werkzeug.serving".
        if os.path.isfile(script):
            # Rewritten by Python from "-m script" to "/path/to/script.py".
            module = t.cast(str, package)
            stem = os.path.splitext(os.path.basename(script))[0]

            if stem != "__main__":
                module += f".{stem}"
        else:
            # Incorrectly rewritten by pydevd debugger from "-m script"
            # to "script".
            module = script

        command.extend(("-m", module.lstrip(".")))

    command.extend(sys.argv[1:])
    return command
class ReloaderLoop:
    """Base class for reloader loops.

    Stores the watch configuration and provides the restart / reload
    plumbing. ``run_step`` does nothing here; subclasses override it to
    implement a watching strategy.
    """

    # Label used in the "Restarting with ..." log line.
    name = ""

    def __init__(
        self,
        extra_files: t.Iterable[str] | None = None,
        exclude_patterns: t.Iterable[str] | None = None,
        interval: int | float = 1,
    ) -> None:
        # Extra paths are stored as absolute paths.
        self.extra_files: set[str] = set(map(os.path.abspath, extra_files or ()))
        self.exclude_patterns: set[str] = {*(exclude_patterns or ())}
        self.interval = interval

    def __enter__(self) -> ReloaderLoop:
        """Run a single watch step so the initial filesystem state is
        recorded, then return the loop itself.
        """
        self.run_step()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):  # type: ignore
        """Release resources held by the reloader. Nothing to do here."""

    def run(self) -> None:
        """Loop forever: run a watch step, then sleep for ``interval``
        seconds.
        """
        while True:
            self.run_step()
            time.sleep(self.interval)

    def run_step(self) -> None:
        """Perform one watch step. Called once on entry to record the
        initial state and then repeatedly by ``run``.
        """

    def restart_with_reloader(self) -> int:
        """Run the program again in a child process with
        ``WERKZEUG_RUN_MAIN`` set to ``"true"``, restarting it for as
        long as it exits with code 3. Return the first other exit code.
        """
        exit_code = 3

        while exit_code == 3:
            _log("info", f" * Restarting with {self.name}")
            args = _get_args_for_reloading()
            child_env = os.environ.copy()
            child_env["WERKZEUG_RUN_MAIN"] = "true"
            exit_code = subprocess.call(args, env=child_env, close_fds=False)

        return exit_code

    def trigger_reload(self, filename: str) -> None:
        """Log the changed file and exit with code 3, the code
        ``restart_with_reloader`` treats as a restart request.
        """
        self.log_reload(filename)
        sys.exit(3)

    def log_reload(self, filename: str | bytes) -> None:
        """Log that a change to ``filename`` was detected."""
        filename = os.path.abspath(filename)
        _log("info", f" * Detected change in {filename!r}, reloading")
class StatReloaderLoop(ReloaderLoop):
    """Reloader that polls file modification times."""

    name = "stat"

    def __enter__(self) -> ReloaderLoop:
        # Last modification time recorded for each watched path.
        self.mtimes: dict[str, float] = {}
        return super().__enter__()

    def run_step(self) -> None:
        """Stat every watched path and trigger a reload for any whose
        modification time is newer than the recorded one. Paths seen for
        the first time are only recorded; paths that cannot be stat'ed
        are skipped.
        """
        for path in _find_stat_paths(self.extra_files, self.exclude_patterns):
            try:
                current = os.stat(path).st_mtime
            except OSError:
                continue

            # setdefault records a first-seen path and returns the stored
            # time otherwise; a fresh entry never compares as newer.
            if current > self.mtimes.setdefault(path, current):
                self.trigger_reload(path)
class WatchdogReloaderLoop(ReloaderLoop):
    """Reloader driven by filesystem events from the watchdog library."""

    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
        from watchdog.events import EVENT_TYPE_CLOSED
        from watchdog.events import EVENT_TYPE_CREATED
        from watchdog.events import EVENT_TYPE_DELETED
        from watchdog.events import EVENT_TYPE_MODIFIED
        from watchdog.events import EVENT_TYPE_MOVED
        from watchdog.events import FileModifiedEvent
        from watchdog.events import PatternMatchingEventHandler
        from watchdog.observers import Observer

        super().__init__(*args, **kwargs)
        notify = self.trigger_reload
        # Only these event types are treated as changes to a file.
        change_events = {
            EVENT_TYPE_CLOSED,
            EVENT_TYPE_CREATED,
            EVENT_TYPE_DELETED,
            EVENT_TYPE_MODIFIED,
            EVENT_TYPE_MOVED,
        }

        class EventHandler(PatternMatchingEventHandler):
            def on_any_event(self, event: FileModifiedEvent) -> None:  # type: ignore[override]
                if event.event_type in change_events:
                    notify(event.src_path)

        # Derive the display name from the observer class name, dropping
        # a trailing "observer" if present.
        observer_kind = Observer.__name__.lower()  # type: ignore[attr-defined]
        suffix = "observer"

        if observer_kind.endswith(suffix):
            observer_kind = observer_kind[: -len(suffix)]

        self.name = f"watchdog ({observer_kind})"
        self.observer = Observer()

        # Extra paths that are not directories are matched as patterns.
        file_patterns = [p for p in self.extra_files if not os.path.isdir(p)]
        ignored = [f"*/{d}/*" for d in _ignore_common_dirs]
        ignored.extend(self.exclude_patterns)
        self.event_handler = EventHandler(
            patterns=["*.py", "*.pyc", "*.zip", *file_patterns],
            ignore_patterns=ignored,
        )
        self.should_reload = threading.Event()

    def trigger_reload(self, filename: str | bytes) -> None:
        """Flag that a reload is needed and log the changed file."""
        # This is called inside an event handler, which means throwing
        # SystemExit has no effect.
        # https://github.com/gorakhargosh/watchdog/issues/294
        self.should_reload.set()
        self.log_reload(filename)

    def __enter__(self) -> ReloaderLoop:
        # Maps each watched path to its watch handle, or None when
        # scheduling that path failed.
        self.watches: dict[str, t.Any] = {}
        self.observer.start()
        return super().__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):  # type: ignore
        """Stop the observer and wait for it to finish."""
        self.observer.stop()
        self.observer.join()

    def run(self) -> None:
        """Refresh the watches every ``interval`` seconds until a reload
        has been flagged, then exit with code 3.
        """
        while not self.should_reload.wait(timeout=self.interval):
            self.run_step()

        sys.exit(3)

    def run_step(self) -> None:
        """Schedule watches for newly found paths and unschedule watches
        for paths that are no longer found.
        """
        stale = set(self.watches)

        for root in _find_watchdog_paths(self.extra_files, self.exclude_patterns):
            stale.discard(root)

            if root in self.watches:
                continue

            try:
                self.watches[root] = self.observer.schedule(
                    self.event_handler, root, recursive=True
                )
            except OSError:
                # Clear this path from list of watches. We don't want
                # the same error message showing again in the next
                # iteration.
                self.watches[root] = None

        for root in stale:
            handle = self.watches.pop(root, None)

            if handle is not None:
                self.observer.unschedule(handle)
# Registry of reloader implementations by name. "auto" is added below and
# resolves to the watchdog loop when watchdog can be imported, otherwise
# to the stat loop.
reloader_loops: dict[str, type[ReloaderLoop]] = {
    "stat": StatReloaderLoop,
    "watchdog": WatchdogReloaderLoop,
}

try:
    __import__("watchdog.observers")
except ImportError:
    reloader_loops["auto"] = StatReloaderLoop
else:
    reloader_loops["auto"] = WatchdogReloaderLoop
def ensure_echo_on() -> None:
    """Turn terminal echo back on if it is off. Tools such as PDB can
    leave echo disabled, which hurts usability after a reload.

    Does nothing when stdin is missing or not a tty, or when ``termios``
    cannot be imported.
    """
    # tcgetattr will fail if stdin isn't a tty
    if sys.stdin is None or not sys.stdin.isatty():
        return

    try:
        import termios
    except ImportError:
        return

    attrs = termios.tcgetattr(sys.stdin)

    # Index 3 holds the local-mode flags.
    if attrs[3] & termios.ECHO:
        return

    attrs[3] |= termios.ECHO
    termios.tcsetattr(sys.stdin, termios.TCSANOW, attrs)
def run_with_reloader(
    main_func: t.Callable[[], None],
    extra_files: t.Iterable[str] | None = None,
    exclude_patterns: t.Iterable[str] | None = None,
    interval: int | float = 1,
    reloader_type: str = "auto",
) -> None:
    """Run the given function in an independent Python interpreter.

    When ``WERKZEUG_RUN_MAIN`` is ``"true"`` in the environment, this
    process runs ``main_func`` in a daemon thread alongside the reloader
    loop. Otherwise it spawns that process via the reloader and exits
    with its exit code.
    """
    import signal

    signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
    loop_cls = reloader_loops[reloader_type]
    reloader = loop_cls(
        extra_files=extra_files, exclude_patterns=exclude_patterns, interval=interval
    )

    try:
        if os.environ.get("WERKZEUG_RUN_MAIN") != "true":
            sys.exit(reloader.restart_with_reloader())

        ensure_echo_on()
        app_thread = threading.Thread(target=main_func, args=())
        app_thread.daemon = True

        # Enter the reloader to set up initial state, then start
        # the app thread and reloader update loop.
        with reloader:
            app_thread.start()
            reloader.run()
    except KeyboardInterrupt:
        pass
Xet Storage Details
- Size:
- 15.1 kB
- Xet hash:
- d73782e6063d9d58ce30d28837ff0c610bcf17f84b8e242da0504c3ed6e50699
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.