| | """ |
| | Formatting many files at once via multiprocessing. Contains entrypoint and utilities. |
| | |
| | NOTE: this module is only imported if we need to format several files at once. |
| | """ |
| |
|
| | import asyncio |
| | import logging |
| | import os |
| | import signal |
| | import sys |
| | from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor |
| | from multiprocessing import Manager |
| | from pathlib import Path |
| | from typing import Any, Iterable, Optional, Set |
| |
|
| | from mypy_extensions import mypyc_attr |
| |
|
| | from black import WriteBack, format_file_in_place |
| | from black.cache import Cache, filter_cached, read_cache, write_cache |
| | from black.mode import Mode |
| | from black.output import err |
| | from black.report import Changed, Report |
| |
|
| |
|
def maybe_install_uvloop() -> None:
    """Install uvloop for asyncio when the package can be imported.

    Nothing happens if importing uvloop raises ImportError. This is meant
    to be called only from command-line entry points, so that a parent
    process using Black as a library is not interfered with.
    """
    try:
        import uvloop

        uvloop.install()
    except ImportError:
        # uvloop is not available here; keep whatever event loop asyncio
        # already provides.
        return
| |
|
| |
|
def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None:
    """Report the abort via `err`, then request cancellation of each task.

    Intended for use as an asyncio signal handler; `tasks` is iterated at
    the time the handler runs.
    """
    err("Aborted!")
    for pending_task in tasks:
        pending_task.cancel()
| |
|
| |
|
def shutdown(loop: asyncio.AbstractEventLoop) -> None:
    """Cancel every unfinished task on `loop`, wait for them, then close it.

    `loop` is closed even if cancelling or waiting raises. As a side
    effect, the "concurrent.futures" logger level is set to CRITICAL.
    """
    try:
        unfinished = [t for t in asyncio.all_tasks(loop) if not t.done()]
        if unfinished:
            for t in unfinished:
                t.cancel()
            # return_exceptions=True collects the tasks' outcomes (including
            # their cancellation) as results instead of raising them here.
            loop.run_until_complete(
                asyncio.gather(*unfinished, return_exceptions=True)
            )
    finally:
        # NOTE(review): presumably this silences log output from executor
        # futures that are still running when the loop is closed -- confirm.
        logging.getLogger("concurrent.futures").setLevel(logging.CRITICAL)
        loop.close()
| |
|
| |
|
| | |
| | |
@mypyc_attr(patchable=True)
def reformat_many(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: Mode,
    report: Report,
    workers: Optional[int],
) -> None:
    """Reformat multiple files using a ProcessPoolExecutor.

    Creates a fresh event loop, runs :func:`schedule_formatting` on it to
    completion, and then tears down both the loop and the executor.

    `sources`, `fast`, `write_back`, `mode` and `report` are forwarded
    unchanged to :func:`schedule_formatting`.

    `workers` is the number of worker processes; when it is None,
    ``os.cpu_count()`` is used (or 1 if that is unavailable).

    If the process pool cannot be created (ImportError, NotImplementedError
    or OSError), a single-worker ThreadPoolExecutor is used instead.
    """
    maybe_install_uvloop()

    executor: Executor
    if workers is None:
        workers = os.cpu_count() or 1
    if sys.platform == "win32":
        # ProcessPoolExecutor on Windows rejects max_workers above 61 (see
        # the concurrent.futures documentation); stay below that limit.
        workers = min(workers, 60)
    try:
        executor = ProcessPoolExecutor(max_workers=workers)
    except (ImportError, NotImplementedError, OSError):
        # A process pool could not be created here; fall back to a thread
        # pool with a single worker so formatting can still proceed.
        executor = ThreadPoolExecutor(max_workers=1)

    # The outer try/finally guarantees the executor is shut down even when
    # creating the loop or tearing it down raises.
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(
                schedule_formatting(
                    sources=sources,
                    fast=fast,
                    write_back=write_back,
                    mode=mode,
                    report=report,
                    loop=loop,
                    executor=executor,
                )
            )
        finally:
            try:
                shutdown(loop)
            finally:
                # Always detach the (now closed) loop from this thread.
                asyncio.set_event_loop(None)
    finally:
        executor.shutdown()
| |
|
| |
|
async def schedule_formatting(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: Mode,
    report: "Report",
    loop: asyncio.AbstractEventLoop,
    executor: "Executor",
) -> None:
    """Format every file in `sources` concurrently on `executor`.

    (Only a ProcessPoolExecutor gives actual parallelism.)

    Each file is handed to :func:`format_file_in_place` together with
    `fast`, `mode` and `write_back`; outcomes are recorded on `report`.
    Unless a diff is being produced, files already present in the cache are
    reported as cached and skipped, and the cache is updated afterwards.
    """
    emits_diff = write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF)

    cache: Cache = {}
    if not emits_diff:
        cache = read_cache(mode)
        sources, cached = filter_cached(cache, sources)
        for cached_src in sorted(cached):
            report.done(cached_src, Changed.CACHED)
    if not sources:
        return

    lock = None
    if emits_diff:
        # A manager-backed lock is passed to every worker in diff mode.
        # NOTE(review): presumably it keeps diff output from different
        # workers from interleaving -- confirm in format_file_in_place.
        # `manager` is kept as a local so the manager object stays
        # referenced for the whole call.
        manager = Manager()
        lock = manager.Lock()

    tasks = {
        asyncio.ensure_future(
            loop.run_in_executor(
                executor, format_file_in_place, path, fast, mode, write_back, lock
            )
        ): path
        for path in sorted(sources)
    }
    # Live view: it shrinks as finished tasks are popped from `tasks` below,
    # so the signal handlers only ever see tasks that are still outstanding.
    in_flight = tasks.keys()
    try:
        for signum in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(signum, cancel, in_flight)
    except NotImplementedError:
        # This event loop does not support signal handlers; carry on
        # without them.
        pass

    cancelled = []
    sources_to_cache = []
    while in_flight:
        finished, _ = await asyncio.wait(
            in_flight, return_when=asyncio.FIRST_COMPLETED
        )
        for task in finished:
            src = tasks.pop(task)
            if task.cancelled():
                cancelled.append(task)
                continue

            failure = task.exception()
            if failure:
                report.failed(src, str(failure))
                continue

            changed = Changed.YES if task.result() else Changed.NO
            # Cache files that were written back, and files that a check
            # found to need no changes.
            cacheable = write_back is WriteBack.YES or (
                write_back is WriteBack.CHECK and changed is Changed.NO
            )
            if cacheable:
                sources_to_cache.append(src)
            report.done(src, changed)

    if cancelled:
        await asyncio.gather(*cancelled, return_exceptions=True)
    if sources_to_cache:
        write_cache(cache, sources_to_cache, mode)
| |
|