| from __future__ import annotations |
|
|
| import math |
|
|
|
|
def calc_chunk_sizes(
    chunk_size: int | tuple[int, int] | None,
    chunk_count: int | tuple[int, int] | None,
    total_chunk_count: int | None,
    ny: int,
    nx: int,
) -> tuple[int, int]:
    """Work out the (y, x) chunk sizes from whichever chunking option was supplied.

    Args:
        chunk_size (int or tuple(int, int), optional): Size of each chunk as (y, x), or a single
            value used for both directions. Negative values are rejected.
        chunk_count (int or tuple(int, int), optional): Number of chunks as (y, x), or a single
            value used for both directions. Each count is clamped to the range ``[1, n-1]``
            where ``n`` is the number of grid points in that direction.
        total_chunk_count (int, optional): Overall number of chunks, clamped to the range
            ``[1, (nx-1)*(ny-1)]``.
        ny (int): Number of grid points in y-direction.
        nx (int): Number of grid points in x-direction.

    Return:
        tuple(int, int): Chunk sizes (y_chunk_size, x_chunk_size). ``(0, 0)`` is returned when no
        option is specified or when a single chunk is requested via ``total_chunk_count``.

    Raises:
        ValueError: If more than one of ``chunk_size``, ``chunk_count`` and ``total_chunk_count``
            is specified, if ``ny`` or ``nx`` is less than 2, or if a chunk size is negative.
    """
    options_given = [opt is not None for opt in (chunk_size, chunk_count, total_chunk_count)]
    if sum(options_given) > 1:
        raise ValueError("Only one of chunk_size, chunk_count and total_chunk_count should be set")

    if nx < 2 or ny < 2:
        raise ValueError(f"(ny, nx) must be at least (2, 2), not ({ny}, {nx})")

    if total_chunk_count is not None:
        # Chunks are measured in grid intervals, so there are (n-1) of them per direction.
        upper_limit = (nx - 1) * (ny - 1)
        clamped_total = min(max(total_chunk_count, 1), upper_limit)
        if clamped_total == 1:
            chunk_size = 0
        elif clamped_total == upper_limit:
            chunk_size = (1, 1)
        else:
            # Convert the total into per-direction counts, giving the larger factor to y
            # only when the grid has more points in y than in x.
            pair = two_factors(clamped_total)
            chunk_count = pair if ny > nx else (pair[1], pair[0])

    if chunk_count is not None:
        if isinstance(chunk_count, tuple):
            count_y, count_x = chunk_count
        else:
            count_y = count_x = chunk_count
        count_x = min(max(count_x, 1), nx - 1)
        count_y = min(max(count_y, 1), ny - 1)
        chunk_size = (
            math.ceil((ny - 1) / count_y),
            math.ceil((nx - 1) / count_x),
        )

    if chunk_size is None:
        size_y = size_x = 0
    elif isinstance(chunk_size, tuple):
        size_y, size_x = chunk_size
    else:
        size_y = size_x = chunk_size

    if x_negative := size_x < 0:
        pass
    if x_negative or size_y < 0:
        raise ValueError("chunk_size cannot be negative")

    return size_y, size_x
|
|
|
|
def two_factors(n: int) -> tuple[int, int]:
    """Split an integer into two integer factors.

    The two factors will be as close as possible to the sqrt of n, and are returned in decreasing
    order. Worst case returns (n, 1).

    Args:
        n (int): The integer to factorize, must be positive.

    Return:
        tuple(int, int): The two factors of n, in decreasing order.

    Raises:
        ValueError: If ``n`` is not positive (zero or negative).
    """
    # Zero must be rejected as well as negatives: with n == 0 the search below would start at
    # i == 0 and fail with ZeroDivisionError on ``n % i``.
    if n < 1:
        raise ValueError(f"two_factors expects positive integer not {n}")

    # Search downwards from ceil(sqrt(n)) for the first divisor; terminates at i == 1 at worst.
    i = math.ceil(math.sqrt(n))
    while n % i != 0:
        i -= 1
    j = n // i

    # Either of i and j may be the larger one, so order them explicitly.
    return (i, j) if i > j else (j, i)
|
|