|
|
from __future__ import annotations |
|
|
|
|
|
import functools |
|
|
import sys |
|
|
from collections.abc import Generator, Iterable, Iterator |
|
|
from typing import Callable, Literal, TypeVar |
|
|
|
|
|
from pip._vendor.rich.progress import ( |
|
|
BarColumn, |
|
|
DownloadColumn, |
|
|
FileSizeColumn, |
|
|
MofNCompleteColumn, |
|
|
Progress, |
|
|
ProgressColumn, |
|
|
SpinnerColumn, |
|
|
TextColumn, |
|
|
TimeElapsedColumn, |
|
|
TimeRemainingColumn, |
|
|
TransferSpeedColumn, |
|
|
) |
|
|
|
|
|
from pip._internal.cli.spinners import RateLimiter |
|
|
from pip._internal.req.req_install import InstallRequirement |
|
|
from pip._internal.utils.logging import get_console, get_indentation |
|
|
|
|
|
T = TypeVar("T") |
|
|
ProgressRenderer = Callable[[Iterable[T]], Iterator[T]] |
|
|
BarType = Literal["on", "off", "raw"] |
|
|
|
|
|
|
|
|
def _rich_download_progress_bar( |
|
|
iterable: Iterable[bytes], |
|
|
*, |
|
|
bar_type: BarType, |
|
|
size: int | None, |
|
|
initial_progress: int | None = None, |
|
|
) -> Generator[bytes, None, None]: |
|
|
assert bar_type == "on", "This should only be used in the default mode." |
|
|
|
|
|
if not size: |
|
|
total = float("inf") |
|
|
columns: tuple[ProgressColumn, ...] = ( |
|
|
TextColumn("[progress.description]{task.description}"), |
|
|
SpinnerColumn("line", speed=1.5), |
|
|
FileSizeColumn(), |
|
|
TransferSpeedColumn(), |
|
|
TimeElapsedColumn(), |
|
|
) |
|
|
else: |
|
|
total = size |
|
|
columns = ( |
|
|
TextColumn("[progress.description]{task.description}"), |
|
|
BarColumn(), |
|
|
DownloadColumn(), |
|
|
TransferSpeedColumn(), |
|
|
TextColumn("{task.fields[time_description]}"), |
|
|
TimeRemainingColumn(elapsed_when_finished=True), |
|
|
) |
|
|
|
|
|
progress = Progress(*columns, refresh_per_second=5) |
|
|
task_id = progress.add_task( |
|
|
" " * (get_indentation() + 2), total=total, time_description="eta" |
|
|
) |
|
|
if initial_progress is not None: |
|
|
progress.update(task_id, advance=initial_progress) |
|
|
with progress: |
|
|
for chunk in iterable: |
|
|
yield chunk |
|
|
progress.update(task_id, advance=len(chunk)) |
|
|
progress.update(task_id, time_description="") |
|
|
|
|
|
|
|
|
def _rich_install_progress_bar( |
|
|
iterable: Iterable[InstallRequirement], *, total: int |
|
|
) -> Iterator[InstallRequirement]: |
|
|
columns = ( |
|
|
TextColumn("{task.fields[indent]}"), |
|
|
BarColumn(), |
|
|
MofNCompleteColumn(), |
|
|
TextColumn("{task.description}"), |
|
|
) |
|
|
console = get_console() |
|
|
|
|
|
bar = Progress(*columns, refresh_per_second=6, console=console, transient=True) |
|
|
|
|
|
|
|
|
task = bar.add_task("", total=total, indent=" " * get_indentation(), visible=False) |
|
|
with bar: |
|
|
for req in iterable: |
|
|
bar.update(task, description=rf"\[{req.name}]", visible=True) |
|
|
yield req |
|
|
bar.advance(task) |
|
|
|
|
|
|
|
|
def _raw_progress_bar( |
|
|
iterable: Iterable[bytes], |
|
|
*, |
|
|
size: int | None, |
|
|
initial_progress: int | None = None, |
|
|
) -> Generator[bytes, None, None]: |
|
|
def write_progress(current: int, total: int) -> None: |
|
|
sys.stdout.write(f"Progress {current} of {total}\n") |
|
|
sys.stdout.flush() |
|
|
|
|
|
current = initial_progress or 0 |
|
|
total = size or 0 |
|
|
rate_limiter = RateLimiter(0.25) |
|
|
|
|
|
write_progress(current, total) |
|
|
for chunk in iterable: |
|
|
current += len(chunk) |
|
|
if rate_limiter.ready() or current == total: |
|
|
write_progress(current, total) |
|
|
rate_limiter.reset() |
|
|
yield chunk |
|
|
|
|
|
|
|
|
def get_download_progress_renderer( |
|
|
*, bar_type: BarType, size: int | None = None, initial_progress: int | None = None |
|
|
) -> ProgressRenderer[bytes]: |
|
|
"""Get an object that can be used to render the download progress. |
|
|
|
|
|
Returns a callable, that takes an iterable to "wrap". |
|
|
""" |
|
|
if bar_type == "on": |
|
|
return functools.partial( |
|
|
_rich_download_progress_bar, |
|
|
bar_type=bar_type, |
|
|
size=size, |
|
|
initial_progress=initial_progress, |
|
|
) |
|
|
elif bar_type == "raw": |
|
|
return functools.partial( |
|
|
_raw_progress_bar, |
|
|
size=size, |
|
|
initial_progress=initial_progress, |
|
|
) |
|
|
else: |
|
|
return iter |
|
|
|
|
|
|
|
|
def get_install_progress_renderer( |
|
|
*, bar_type: BarType, total: int |
|
|
) -> ProgressRenderer[InstallRequirement]: |
|
|
"""Get an object that can be used to render the install progress. |
|
|
Returns a callable, that takes an iterable to "wrap". |
|
|
""" |
|
|
if bar_type == "on": |
|
|
return functools.partial(_rich_install_progress_bar, total=total) |
|
|
else: |
|
|
return iter |
|
|
|