base_IIXIV / fla /utils.py
mainline777's picture
Duplicate from silx-ai/Quasar-Preview
41865df
Raw
History Blame Contribute Delete
20.6 kB
# Copyright (c) 2023-2025, Songlin Yang, Yu Zhang
import contextlib
import functools
import inspect
import logging
import os
import sys
import warnings
from collections.abc import Callable
from enum import Enum
from functools import lru_cache
from typing import TYPE_CHECKING, Any
import torch
import triton
from packaging import version
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from fla import __version__
FLA_CI_ENV = os.getenv("FLA_CI_ENV") == "1"
FLA_CACHE_RESULTS = os.getenv('FLA_CACHE_RESULTS', '1') == '1'
FLA_DISABLE_TENSOR_CACHE = os.getenv('FLA_DISABLE_TENSOR_CACHE', '0') == '1'
TRITON_ABOVE_3_4_0 = version.parse(triton.__version__) >= version.parse("3.4.0")
TRITON_ABOVE_3_5_1 = version.parse(triton.__version__) >= version.parse("3.5.1")
SUPPORTS_AUTOTUNE_CACHE = "cache_results" in inspect.signature(triton.autotune).parameters
autotune_cache_kwargs = {"cache_results": FLA_CACHE_RESULTS} if SUPPORTS_AUTOTUNE_CACHE else {}
@lru_cache(maxsize=1)
def check_environments():
"""
Checks the current operating system, Triton version, and Python version,
issuing warnings if they don't meet recommendations.
This function's body only runs once due to lru_cache.
"""
# Check Operating System
if sys.platform == 'win32':
# Check if triton-windows is installed
try:
from importlib.metadata import PackageNotFoundError, metadata
metadata('triton-windows')
# triton-windows is installed, no warning needed
except PackageNotFoundError:
logger.warning(
"Detected Windows operating system. Consider installing triton-windows "
"(https://github.com/triton-lang/triton-windows) for better compatibility. "
"Without it, some features may not work correctly.",
)
triton_version = version.parse(triton.__version__)
required_triton_version = version.parse("3.3.0")
if triton_version < required_triton_version:
logger.warning(
f"Current Triton version {triton_version} is below the recommended 3.3.0 version. "
"Errors may occur and these issues will not be fixed. "
"Please consider upgrading Triton.",
)
# Check Python version
py_version = version.parse(f"{sys.version_info.major}.{sys.version_info.minor}")
required_py_version = version.parse("3.11")
if py_version < required_py_version:
logger.warning(
f"Current Python version {py_version} is below the recommended 3.11 version. "
"It is recommended to upgrade to Python 3.11 or higher for the best experience.",
)
return None
check_environments()
def get_abs_err(x, y):
return (x.detach()-y.detach()).flatten().abs().max().item()
def get_err_ratio(x, y):
err = (x.detach()-y.detach()).flatten().square().mean().sqrt().item()
base = (x.detach()).flatten().square().mean().sqrt().item()
return err / (base + 1e-8)
def assert_close(prefix, ref, tri, ratio, warning=False, err_atol=1e-6):
abs_atol = get_abs_err(ref, tri)
msg = f"{prefix:>16} diff: {abs_atol:.6f} ratio: {get_err_ratio(ref, tri):.6f}"
logger.info(msg)
error_rate = get_err_ratio(ref, tri)
if abs_atol <= err_atol:
return
assert not torch.isnan(ref).any(), f"{prefix}: NaN detected in ref"
assert not torch.isnan(tri).any(), f"{prefix}: NaN detected in tri"
if warning or (FLA_CI_ENV and (error_rate < 0.01 or abs_atol <= 0.3)):
if error_rate > ratio:
warnings.warn(msg)
else:
assert error_rate < ratio, msg
def tensor_cache(
fn: Callable[..., torch.Tensor],
) -> Callable[..., torch.Tensor]:
"""
A decorator that caches the most recent result of a function with tensor inputs.
This decorator will store the output of the decorated function for the most recent set of input tensors.
If the function is called again with the same input tensors, it will return the cached result.
If FLA_DISABLE_TENSOR_CACHE environment variable is set to '1', caching is disabled.
Args:
fn (Callable[..., torch.Tensor]):
The function to be decorated. It should take tensor inputs and return tensor outputs.
Returns:
Callable[..., torch.Tensor]:
A wrapped version of the input function with single-entry caching.
"""
last_args: tuple | None = None
last_kwargs: dict | None = None
last_result: Any = None
@functools.wraps(fn)
def wrapper(*args: Any, **kwargs: Any) -> Any:
nonlocal last_args, last_kwargs, last_result
# Skip cache if FLA_DISABLE_TENSOR_CACHE is set
if FLA_DISABLE_TENSOR_CACHE:
return fn(*args, **kwargs)
if last_args is not None and last_kwargs is not None:
if len(args) == len(last_args) and len(kwargs) == len(last_kwargs):
if all(a is b for a, b in zip(args, last_args, strict=False)) and \
all(k in last_kwargs and v is last_kwargs[k] for k, v in kwargs.items()):
return last_result
result = fn(*args, **kwargs)
last_args, last_kwargs, last_result = args, kwargs, result
return result
return wrapper
def input_guard(
fn: Callable[..., torch.Tensor] | None = None,
*,
no_guard_contiguous: bool | list[str] = False,
) -> Callable[[Callable[..., torch.Tensor]], Callable[..., torch.Tensor]] | Callable[..., torch.Tensor]:
"""
A decorator to make sure all input tensors are contiguous and set the device based on input tensors.
Args:
no_guard_contiguous: If True, skip all contiguous checks. If a list of parameter names, skip contiguous check for those parameters.
"""
def decorator(fn: Callable[..., torch.Tensor]) -> Callable[..., torch.Tensor]:
# Get function signature for parameter name mapping
sig = inspect.signature(fn)
param_names = list(sig.parameters.keys())
@functools.wraps(fn)
def wrapper(*args, **kwargs):
# Convert no_guard_contiguous to list of parameter names if it's a list
skip_params = set()
if isinstance(no_guard_contiguous, list):
skip_params = set(no_guard_contiguous)
# Process args with parameter name mapping
processed_args = []
for i, arg in enumerate(args):
if i < len(param_names):
param_name = param_names[i]
else:
# For *args beyond signature, use position as name
param_name = f"__arg_{i}"
if isinstance(arg, torch.Tensor):
if no_guard_contiguous is True or param_name in skip_params:
processed_args.append(arg)
else:
processed_args.append(arg.contiguous())
else:
processed_args.append(arg)
# Process kwargs
processed_kwargs = {}
for k, v in kwargs.items():
if isinstance(v, torch.Tensor):
if no_guard_contiguous is True or k in skip_params:
processed_kwargs[k] = v
else:
processed_kwargs[k] = v.contiguous()
else:
processed_kwargs[k] = v
tensor = None
for arg in args:
if isinstance(arg, torch.Tensor):
tensor = arg
break
if tensor is None:
for value in kwargs.values():
if isinstance(value, torch.Tensor):
tensor = value
break
if tensor is not None:
ctx = custom_device_ctx(tensor.device.index)
else:
ctx = contextlib.nullcontext()
with ctx:
return fn(*processed_args, **processed_kwargs)
return wrapper
# Handle direct usage without parentheses: @input_guard
if fn is not None:
return decorator(fn)
return decorator
def contiguous(fn: Callable[..., torch.Tensor]) -> Callable[..., torch.Tensor]:
"""Alias for input_guard() without parameters."""
return input_guard(fn)
def require_version(version, hint):
"""
Perform a runtime check of the dependency versions, using the exact same syntax used by pip.
"""
def decorator(fn):
@functools.wraps(fn)
def wrapper(ctx, *args, **kwargs):
from transformers.utils.versions import require_version
require_version(version, hint)
return fn(ctx,
*(i if not isinstance(i, torch.Tensor) else i.contiguous() for i in args),
**{k: (v if not isinstance(v, torch.Tensor) else v.contiguous()) for k, v in kwargs.items()})
return wrapper
return decorator
class Action(Enum):
NONE = "none"
NOTIFY = "notify"
NOTIFY_ALWAYS = "notify_always"
RAISE = "raise"
def deprecate_kwarg(
old_name: str,
version: str,
new_name: str | None = None,
warn_if_greater_or_equal_version: bool = False,
raise_if_greater_or_equal_version: bool = False,
raise_if_both_names: bool = False,
additional_message: str | None = None,
):
"""
Decorator to notify users about deprecated keyword arguments, replacing them with a new name if specified.
This decorator allows you to:
- Notify users when a keyword argument is deprecated.
- Automatically replace deprecated keyword arguments with new ones.
- Raise an error if deprecated arguments are used, depending on the specified conditions.
By default, the decorator notifies the user about the deprecated argument while the `fla.__version__` < specified `version`
in the decorator. To keep notifications with any version `warn_if_greater_or_equal_version=True` can be set.
Args:
old_name (`str`):
Name of the deprecated keyword argument.
version (`str`):
The version in which the keyword argument was (or will be) deprecated.
new_name (`Optional[str]`, *optional*):
The new name for the deprecated keyword argument.
If specified, the deprecated keyword argument will be replaced with this new name.
warn_if_greater_or_equal_version (`bool`, *optional*, defaults to `False`):
Whether to show warning if current `fla` version is greater or equal to the deprecated version.
raise_if_greater_or_equal_version (`bool`, *optional*, defaults to `False`):
Whether to raise `ValueError` if current `fla` version is greater or equal to the deprecated version.
raise_if_both_names (`bool`, *optional*, defaults to `False`):
Whether to raise `ValueError` if both deprecated and new keyword arguments are set.
additional_message (`Optional[str]`, *optional*):
An additional message to append to the default deprecation message.
Raises:
ValueError:
If `raise_if_greater_or_equal_version` is `True` and the current version >= the deprecated one,
or if `raise_if_both_names` is `True` and both old and new keyword arguments are provided.
Returns:
Callable:
A wrapped function that handles the deprecated keyword arguments according to the specified parameters.
Example usage with renaming argument:
```python
@deprecate_kwarg("reduce_labels", new_name="do_reduce_labels", version="6.0.0")
def my_function(do_reduce_labels):
print(do_reduce_labels)
my_function(reduce_labels=True) # Will show a deprecation warning and use do_reduce_labels=True
```
Example usage without renaming argument:
```python
@deprecate_kwarg("max_size", version="6.0.0")
def my_function(max_size):
print(max_size)
my_function(max_size=1333) # Will show a deprecation warning
```
"""
deprecated_version = version.parse(version)
current_version = version.parse(__version__)
is_greater_or_equal_version = current_version >= deprecated_version
if is_greater_or_equal_version:
version_message = f"and removed starting from version {version}"
else:
version_message = f"and will be removed in version {version}"
def wrapper(func):
# Required for better warning message
sig = inspect.signature(func)
function_named_args = set(sig.parameters.keys())
is_instance_method = "self" in function_named_args
is_class_method = "cls" in function_named_args
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
# Get class + function name (just for better warning message)
func_name = func.__name__
if is_instance_method:
func_name = f"{args[0].__class__.__name__}.{func_name}"
elif is_class_method:
func_name = f"{args[0].__name__}.{func_name}"
minimum_action = Action.NONE
message = None
# deprecated kwarg and its new version are set for function call -> replace it with new name
if old_name in kwargs and new_name in kwargs:
minimum_action = Action.RAISE if raise_if_both_names else Action.NOTIFY_ALWAYS
message = (
f"Both `{old_name}` and `{new_name}` are set for `{func_name}`. "
f"Using `{new_name}={kwargs[new_name]}` and ignoring deprecated `{old_name}={kwargs[old_name]}`."
)
kwargs.pop(old_name)
# only deprecated kwarg is set for function call -> replace it with new name
elif old_name in kwargs and new_name is not None and new_name not in kwargs:
minimum_action = Action.NOTIFY
message = (
f"`{old_name}` is deprecated {version_message} for `{func_name}`. "
f"Use `{new_name}` instead."
)
kwargs[new_name] = kwargs.pop(old_name)
# deprecated kwarg is not set for function call and new name is not specified -> just notify
elif old_name in kwargs:
minimum_action = Action.NOTIFY
message = f"`{old_name}` is deprecated {version_message} for `{func_name}`."
if message is not None and additional_message is not None:
message = f"{message} {additional_message}"
# update minimum_action if argument is ALREADY deprecated (current version >= deprecated version)
if is_greater_or_equal_version:
# change to (NOTIFY, NOTIFY_ALWAYS) -> RAISE if specified
# in case we want to raise error for already deprecated arguments
if raise_if_greater_or_equal_version and minimum_action != Action.NONE:
minimum_action = Action.RAISE
# change to NOTIFY -> NONE if specified (NOTIFY_ALWAYS can't be changed to NONE)
# in case we want to ignore notifications for already deprecated arguments
elif not warn_if_greater_or_equal_version and minimum_action == Action.NOTIFY:
minimum_action = Action.NONE
# raise error or notify user
if minimum_action == Action.RAISE:
raise ValueError(message)
elif minimum_action in (Action.NOTIFY, Action.NOTIFY_ALWAYS):
# DeprecationWarning is ignored by default, so we use FutureWarning instead
warnings.warn(message, FutureWarning, stacklevel=2)
return func(*args, **kwargs)
return wrapped_func
return wrapper
def checkpoint(fn):
def wrapper(*args, **kwargs):
return torch.utils.checkpoint.checkpoint(fn, *args, **kwargs)
return wrapper
@functools.cache
def check_pytorch_version(version_s: str = '2.4') -> bool:
return version.parse(torch.__version__) >= version.parse(version_s)
def _cpu_device_warning():
warnings.warn(('Triton is not supported on current platform, roll back to CPU.'), stacklevel=1)
@functools.cache
def get_multiprocessor_count(tensor_idx: int = 0) -> int:
try:
return triton.runtime.driver.active.utils.get_device_properties(tensor_idx)['multiprocessor_count']
except BaseException:
# Maybe we use a NPU device.
if triton.runtime.driver.active.get_current_target().backend == 'npu':
return triton.runtime.driver.active.utils.get_device_properties(tensor_idx)['num_vectorcore']
else:
return 1
@functools.cache
def get_available_device() -> str:
try:
return triton.runtime.driver.active.get_current_target().backend
except BaseException:
_cpu_device_warning()
return 'cpu'
def map_triton_backend_to_torch_device() -> str:
backend = get_available_device() # 'cuda' | 'hip' | 'xpu' | 'cpu' | ...
return {'cuda': 'cuda', 'hip': 'cuda', 'xpu': 'xpu'}.get(backend, backend)
# Avoid CUDA/Triton driver probing at import time. Runtime kernels still launch
# on the tensors' devices, but importing FLA should not initialize CUDA.
device_platform = os.environ.get("FLA_DEVICE_PLATFORM", "cuda")
device_name = "cuda" if device_platform in {"cuda", "hip"} else device_platform
device = "cuda" if device_platform in {"cuda", "hip"} else device_platform
device_torch_lib = getattr(torch, device, torch.cuda)
IS_AMD = (device_platform == 'hip')
IS_INTEL = (device_platform == 'xpu')
IS_NVIDIA = (device_platform == 'cuda')
IS_INTEL_ALCHEMIST = False
IS_NVIDIA_HOPPER = IS_NVIDIA
IS_NVIDIA_BLACKWELL = False
USE_CUDA_GRAPH = False
IS_TF32_SUPPORTED = IS_NVIDIA
IS_GATHER_SUPPORTED = hasattr(triton.language, 'gather')
IS_TMA_SUPPORTED = False
if IS_NVIDIA and not IS_TF32_SUPPORTED:
# Make old card happy, since triton will use tf32 by default.
# This is a workaround for old nvidia card.
os.environ['TRITON_F32_DEFAULT'] = 'ieee'
if IS_TMA_SUPPORTED:
logger.info('TMA is supported, using TMA by default.')
def alloc_fn(size: int, alignment: int, stream: int | None):
return torch.empty(size, device=torch.device(device_name, device_torch_lib.current_device()), dtype=torch.int8)
triton.set_allocator(alloc_fn)
def get_all_max_shared_mem():
try:
return [
triton.runtime.driver.active.utils.get_device_properties(i)['max_shared_mem']
for i in range(device_torch_lib.device_count())
]
except BaseException:
_cpu_device_warning()
return [-1]
class Backend(Enum):
ADA = 101376 # RTX 4090
AMPERE = 166912 # A100
HOPPER = 232448 # H100
DEFAULT = 102400 # Default
@classmethod
def get_shared_memory(cls, arch: str) -> int:
try:
return cls[arch.upper()].value
except KeyError:
return cls.DEFAULT.value
@functools.cache
def check_shared_mem(arch: str = "none", tensor_idx: int = 0) -> bool:
try:
device_shared_mem_list = get_all_max_shared_mem()
max_shared_memory = device_shared_mem_list[tensor_idx]
return max_shared_memory >= Backend.get_shared_memory(arch)
except Exception:
return False
if check_pytorch_version('2.4'):
device = 'cuda' if device == 'cpu' else device
autocast_custom_fwd = functools.partial(torch.amp.custom_fwd, device_type=device)
autocast_custom_bwd = functools.partial(torch.amp.custom_bwd, device_type=device)
def custom_device_ctx(index: int):
return device_torch_lib.device(index)
else:
assert device == 'cuda', 'Only cuda device is supported for PyTorch version < 2.4.0.'
autocast_custom_fwd = device_torch_lib.amp.custom_fwd
autocast_custom_bwd = device_torch_lib.amp.custom_bwd
def custom_device_ctx(index: int):
return torch.cuda.device(index)
def _register_aliases():
current_module = sys.modules[__name__]
for key in (
'IS_AMD',
'IS_INTEL',
'IS_NVIDIA',
'IS_INTEL_ALCHEMIST',
'IS_NVIDIA_HOPPER',
'IS_NVIDIA_BLACKWELL',
'USE_CUDA_GRAPH',
'IS_TF32_SUPPORTED',
'IS_GATHER_SUPPORTED',
'IS_TMA_SUPPORTED',
):
if hasattr(current_module, key):
setattr(current_module, key.lower(), getattr(current_module, key))
_register_aliases()
del _register_aliases