# NOTE(review): the three lines below appear to be stray VCS metadata
# (author, commit message, commit hash) accidentally pasted into the file.
# As bare code they are a syntax error ("7734c01" is an invalid token),
# so they are commented out here rather than deleted.
# xinjie.wang
# update
# 7734c01
# Copyright (c) Meta Platforms, Inc. and affiliates.
from typing import Any, Iterable, Tuple, Union, Dict, Sequence, Mapping, Container
import optree
import torch
from collections.abc import Iterable
import inspect
import ast
import astor
from torch.utils import _pytree
# Path specs used to address values inside a nested batch:
# None = root, Iterable[Any] = path, Any = path of one
# NOTE(review): `Iterable` here is collections.abc.Iterable (the re-import
# above shadows typing.Iterable), and a Union containing Any collapses to Any
# for type checkers — these aliases are primarily documentation.
ChildPathType = Union[None, Iterable[Any], Any]  # one lookup path into a batch
ArgsType = Iterable[ChildPathType]  # one path per positional argument
KwargsType = Mapping[str, ChildPathType]  # keyword name -> path
ArgsKwargsType = Tuple[ArgsType, KwargsType]  # explicit (args, kwargs) pair
MappingType = Union[None, ArgsKwargsType, ArgsType, KwargsType]  # any accepted spec
def tree_transpose_level_one(
    structure,
    check_children=False,
    map_fn=None,
    is_leaf=None,
):
    """Swap the two outermost levels of a pytree via optree.tree_transpose.

    Args:
        structure: the pytree to transpose (e.g. a list of dicts becomes a
            dict of lists).
        check_children: when True, assert that every child of the root has
            the same tree structure as the first child.
        map_fn: optional function mapped over the transposed inner subtrees
            after the transpose.
        is_leaf: optional leaf predicate forwarded to optree.

    Returns:
        The transposed structure; returned unchanged (modulo map_fn) when the
        root has no children.
    """
    # Spec of the outermost container alone: every direct child of
    # `structure` is treated as a leaf (x is not structure).
    _, outer_spec = optree.tree_flatten(
        structure,
        is_leaf=lambda x: x is not structure,
        none_is_leaf=True,
    )
    spec = optree.tree_structure(structure, none_is_leaf=True, is_leaf=is_leaf)
    children_spec = spec.children()
    if len(children_spec) > 0:
        # Transpose uses the first child's structure as the inner spec.
        inner_spec = children_spec[0]
        if check_children:
            for child_spec in children_spec[1:]:
                assert (
                    inner_spec == child_spec
                ), f"one child was found having a different tree structure ({inner_spec} != {child_spec})"
        structure = optree.tree_transpose(outer_spec, inner_spec, structure)
    if map_fn is not None:
        # After the transpose, a "leaf" for mapping purposes is any subtree
        # whose structure equals the original outer container's spec.
        structure = optree.tree_map(
            map_fn,
            structure,
            is_leaf=lambda x: optree.tree_structure(
                x, is_leaf=is_leaf, none_is_leaf=True
            )
            == outer_spec,
            none_is_leaf=True,
        )
    return structure
def tree_tensor_map(fn, tree, *rest):
    """Map ``fn`` over every ``torch.Tensor`` leaf of ``tree`` (and ``rest``).

    Fix: removed a stray ``@staticmethod`` decorator. This is a module-level
    function (the surrounding definitions and imports are all module-level),
    and decorating it bound the name to a ``staticmethod`` object, which is
    not callable before Python 3.10.

    Args:
        fn: function applied to each tensor leaf.
        tree: pytree whose tensor leaves are mapped.
        *rest: additional trees with the same structure, forwarded to
            ``optree.tree_map``.

    Returns:
        A tree with the same structure and mapped tensor leaves.
    """
    return optree.tree_map(
        fn,
        tree,
        *rest,
        # Only tensors are leaves; None stays a regular (non-leaf) node.
        is_leaf=lambda x: isinstance(x, torch.Tensor),
        none_is_leaf=False,
    )
def to_device(obj, device):
    """Recursively move every tensor contained in ``obj`` to ``device``.

    Args:
        obj: a tensor, list, tuple, dict, or any nested combination thereof.
        device: target device (e.g. 'cuda', 'cpu', torch.device('cuda:0')).

    Returns:
        The same structure with all contained tensors moved to ``device``.
    """

    def move(leaf):
        return leaf.to(device)

    return optree.tree_map(move, obj, is_leaf=torch.is_tensor, none_is_leaf=False)
def expand_right(tensor, target_shape):
    """Expand ``tensor`` on the right to match ``target_shape``.

    e.g. a tensor of shape (a, b, c) is viewed as (a, b, c, 1, 1, ...) and
    then expanded so the trailing singleton dims take the sizes of
    ``target_shape``. Leading dims are kept as-is; if ``target_shape`` is not
    longer than ``tensor``'s shape, the tensor is returned unchanged in shape.
    """
    n_missing = len(target_shape) - tensor.dim()
    # Append singleton dims on the right (a pure view; no data copy needed).
    viewed = tensor.reshape(*tensor.shape, *([1] * max(n_missing, 0)))
    # -1 keeps each existing dim; the appended dims take the target sizes.
    sizes = [-1] * tensor.dim() + list(target_shape[tensor.dim():])
    return viewed.expand(*sizes)
def expand_as_right(tensor, target):
    """Expand ``tensor`` with trailing dims so its shape matches ``target``'s."""
    target_shape = target.shape
    return expand_right(tensor, target_shape)
def as_keys(path: ChildPathType):
    """Normalize a child-path spec into a tuple of lookup keys.

    None means the root (empty key tuple); a non-string iterable is a full
    path; anything else (including a string) is a single-key path.
    """
    if path is None:
        return ()
    if isinstance(path, str) or not isinstance(path, Iterable):
        return (path,)
    return tuple(path)
def get_child(obj: Any, *keys: Iterable[Any]):
    """Follow ``keys`` through nested containers and return the value found.

    With no keys, ``obj`` itself is returned. Raises whatever the container's
    ``[]`` raises (KeyError/IndexError) when a key is missing.
    """
    node = obj
    for step in keys:
        node = node[step]
    return node
def set_child(obj: Any, value: Any, *keys: Iterable[Any]):
    """Set ``value`` at the nested path ``keys`` inside ``obj``.

    Returns the value previously stored at the path. With no keys, nothing is
    mutated and ``value`` itself is returned. The full path — including the
    last key — must already exist, otherwise the container's lookup error
    propagates.
    """
    if not keys:
        return value
    container = obj
    for key in keys[:-1]:
        container = container[key]
    last = keys[-1]
    previous = container[last]  # lookup first: preserves KeyError on missing leaf
    container[last] = value
    return previous
def build_args_batch_extractor(args_mapping: ArgsType):
    """Build a ``batch -> tuple`` extractor of positional arguments.

    Each entry of ``args_mapping`` is a child-path spec resolved against the
    batch at call time.
    """

    def extract_fn(batch):
        values = []
        for path in args_mapping:
            values.append(get_child(batch, *as_keys(path)))
        return tuple(values)

    return extract_fn
def build_kwargs_batch_extractor(kwargs_mapping: KwargsType):
    """Build a ``batch -> dict`` extractor of keyword arguments.

    Each ``name -> path`` entry of ``kwargs_mapping`` is resolved against the
    batch at call time.
    """

    def extract_fn(batch):
        result = {}
        for name, path in kwargs_mapping.items():
            result[name] = get_child(batch, *as_keys(path))
        return result

    return extract_fn
# Sentinel spec for build_batch_extractor: extract no args and no kwargs.
empty_mapping = object()
# Sentinel spec for build_batch_extractor: pass the whole batch as **kwargs.
kwargs_identity_mapping = object()
def build_batch_extractor(mapping: MappingType):
    """Build a ``batch -> (args, kwargs)`` extractor from a mapping spec.

    Dispatch on the spec's identity/type, in order:
      - None: the whole batch becomes the single positional argument.
      - ``empty_mapping`` sentinel: no args, no kwargs.
      - ``kwargs_identity_mapping`` sentinel: the batch itself is the kwargs.
      - non-str Sequence: an (args, kwargs) pair when it is exactly
        [Sequence, dict]; otherwise a sequence of per-argument paths.
      - Mapping: per-keyword paths.
      - anything else: a single child path yielding one positional argument.
    """
    # Defaults: no positional args and no keyword args.
    extract_args_fn = lambda x: ()
    extract_kwargs_fn = lambda x: {}
    if mapping is None:
        def extract_args_fn(batch):
            return (batch,)
    elif mapping is empty_mapping:
        pass
    elif mapping is kwargs_identity_mapping:
        extract_kwargs_fn = lambda x: x
    elif isinstance(mapping, Sequence) and (not isinstance(mapping, str)):
        if (
            len(mapping) == 2
            and isinstance(mapping[0], Sequence)
            and isinstance(mapping[1], Dict)
        ):
            # Treated as an explicit (args, kwargs) pair.
            # NOTE(review): a bare string as mapping[0] would also pass this
            # Sequence check — confirm specs never use a string path there.
            extract_args_fn = build_args_batch_extractor(mapping[0])
            extract_kwargs_fn = build_kwargs_batch_extractor(mapping[1])
        else:
            extract_args_fn = build_args_batch_extractor(mapping)
    elif isinstance(mapping, Mapping):
        extract_kwargs_fn = build_kwargs_batch_extractor(mapping)
    else:
        # Single path spec -> one positional argument.
        def extract_args_fn(batch):
            return (get_child(batch, *as_keys(mapping)),)
    def extract_fn(batch):
        return extract_args_fn(batch), extract_kwargs_fn(batch)
    return extract_fn
# NOTE(review): stray "# >" marker — likely a leftover diff/merge artifact.
def right_broadcasting(arr, target):
    """Append singleton dims to ``arr`` so it right-broadcasts against ``target``.

    Returns a view of ``arr`` reshaped to ``arr.shape + (1,) * k`` where ``k``
    is how many dims ``target`` has beyond ``arr``.
    """
    padding = (1,) * (target.ndim - arr.ndim)
    return arr.reshape(arr.shape + padding)
def get_stats(tensor: torch.Tensor):
    """Return a dict of summary statistics for ``tensor``.

    Keys (in order): shape, min, max, mean, median, std. mean/std are
    computed on a float copy so integer tensors work too.
    """
    as_float = tensor.float()
    stats = {"shape": tuple(tensor.shape)}
    stats["min"] = tensor.min().item()
    stats["max"] = tensor.max().item()
    stats["mean"] = as_float.mean().item()
    # NOTE: torch.median returns the lower of the two middle values for
    # even-length input (not their average).
    stats["median"] = tensor.median().item()
    stats["std"] = as_float.std().item()
    return stats
def _get_caller_arg_name(argnum=0, parent_frame=1):
try:
frame = inspect.currentframe() # current frame
frame = inspect.getouterframes(frame)[1 + parent_frame] # parent frame
code = inspect.getframeinfo(frame[0]).code_context[0].strip() # get code line
tree = ast.parse(code)
for node in ast.walk(tree):
if isinstance(node, ast.Call):
args = node.args
break # only get the first parent call
# get first argument string (do not handle '=')
label = astor.to_source(args[argnum]).strip()
except:
# TODO(Pierre) log exception
label = "{label}"
return label
def print_stats(tensor, label=None):
    """Print ``get_stats(tensor)`` under a heading.

    When ``label`` is None, it is inferred from the caller's source line via
    ``_get_caller_arg_name`` (best-effort).
    """
    if label is None:
        label = _get_caller_arg_name(argnum=0)
    lines = [f"{label}:"]
    for key, val in get_stats(tensor).items():
        lines.append(f"- {key}: {val}")
    print("\n".join(lines))
def tree_reduce_unique(fn, tree, ensure_unique=True, **kwargs):
    """Map ``fn`` over the leaves of ``tree`` and return the first result.

    Args:
        fn: callable applied to every leaf.
        tree: pytree flattened with ``torch.utils._pytree.tree_flatten``.
        ensure_unique: when True, verify all mapped values equal the first.
        **kwargs: forwarded to ``_pytree.tree_flatten`` (e.g. ``is_leaf``).

    Returns:
        ``fn`` applied to the first leaf.

    Raises:
        ValueError: if ``tree`` has no leaves (previously this crashed with
            an opaque IndexError on ``values[0]``).
        RuntimeError: if ``ensure_unique`` and two mapped values differ.
    """
    leaves = _pytree.tree_flatten(tree, **kwargs)[0]
    if not leaves:
        raise ValueError("tree_reduce_unique() received a tree with no leaves")
    values = tuple(map(fn, leaves))
    first = values[0]
    if ensure_unique:
        for value in values[1:]:
            if value != first:
                raise RuntimeError(
                    f"different values found, {value} and {first} should be the same"
                )
    return first