File size: 47,402 Bytes
9dd3461 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 |
import contextlib
from dataclasses import dataclass
from enum import auto, Enum
from itertools import accumulate, chain
from typing import (
Any,
cast,
Dict,
Generator,
Iterator,
List,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Union,
)
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensor
from ._fsdp_extensions import _ext_post_unflatten_transform, _ext_pre_flatten_transform
from ._utils import _alloc_storage, _free_storage, _set_fsdp_flattened, p_assert
__all__ = [
"FlatParameter",
"FlatParamHandle",
"FlatParamShardMetadata",
"ParamInfo",
"SharedParamInfo",
"HandleConfig",
"HandleShardingStrategy",
"HandleTrainingState",
]
class ParamInfo(NamedTuple):
"""Information for an original module parameter."""
param_name: str # unprefixed
module: nn.Module
module_name: str
class SharedParamInfo(NamedTuple):
"""
Additional information for a shared parameter.
For each shared parameter, we designate one module and its parameter
variable to be the primary owner, determined as the first one encountered
in the parameter walk. These are prefixed with "prim". The primary module
and parameter do not have their own :class:`SharedParamInfo` instance.
"""
param_name: str # unprefixed
module: nn.Module
module_name: str
prim_param_name: str # unprefixed
prim_module: nn.Module
prim_module_name: str
class FlatParamShardMetadata(NamedTuple):
"""
This holds metadata specific to this rank's shard of the flattened
parameter.
Attributes:
param_names (Tuple[str, ...]): Prefixed parameter names of this rank's
shard of the parameters; see :class:`FlatParameter`.
param_shapes (Tuple[torch.Size, ...]): Parameter shapes of this rank's
shard of the parameters; see :class:`FlatParameter`.
param_numels (Tuple[int, ...]): Parameter numels of this rank's shard
of the parameters; see :class:`FlatParameter`.
param_offsets (Tuple[Tuple[int, int], ...]): [start, end] offsets (in
units of numels) giving this rank's part of each flattened
original module parameter.
"""
param_names: Tuple[str, ...]
param_shapes: Tuple[torch.Size, ...]
param_numels: Tuple[int, ...]
param_offsets: Tuple[Tuple[int, int], ...]
# TODO (awgu): Prefix these with "Handle" for now to avoid circular imports and
# inadvertent misuses; coalesce with those in fully_sharded_data_parallel.py
# later
class HandleShardingStrategy(Enum):
FULL_SHARD = auto()
SHARD_GRAD_OP = auto()
NO_SHARD = auto()
class HandleTrainingState(Enum):
IDLE = auto()
FORWARD = auto()
BACKWARD_PRE = auto()
BACKWARD_POST = auto()
SUMMON_FULL_PARAMS = auto()
@dataclass
class HandleConfig:
sharding_strategy: HandleShardingStrategy
offload_params: bool
param_dtype: Optional[torch.dtype]
reduce_dtype: Optional[torch.dtype]
keep_low_precision_grads: Optional[bool] = False
class FlatParameter(nn.Parameter):
"""
This is the flattened parameter used by :class:`FullyShardedDataParallel`.
It is comprised of one or more original parameters, which are flattened
and concatenated to construct the flattened parameter.
Under the current design, this parameter logically represents both the
unsharded and sharded flattened parameter, and its data changes storages
dynamically.
- In the :class:`FullyShardedDataParallel` constructor, the parameter
is initialized as unsharded and then sharded in-place.
- At runtime, the parameter is lazily (re)-initialized. The sharded
parameter data is saved in ``self._local_shard``, and a new ``Tensor``
``self._full_param_padded`` is created, which is the all-gather
destination and owns the unsharded parameter storage thereafter. (See
:meth:`FullyShardedDataParallel._init_param_attributes`.)
- Throughout runtime, the parameter data changes storages as needed,
e.g. to the sharded flattened parameter, reduced-precision sharded
flattened parameter, or the unsharded flattened parameter.
Attributes:
_unpadded_unsharded_size (torch.Size): Unsharded flattened parameter's
size without padding.
_padded_unsharded_size (torch.Size): Unsharded flattened parameter's
size with padding. This is only set for sharded strategies since
they require padding for the all-gather.
_param_infos (Tuple[ParamInfo, ...]): Each parameter's parameter info
entry; see :class:`ParamInfo`.
_numels (Tuple[int, ...]): Each parameter's numel.
_shapes (Tuple[torch.Size, ...]): Each parameter's shape.
_prefixed_param_names (Tuple[str, ...]): Each parameter's name prefixed
with the parent module names starting from the module passed to
construct this flattened parameter via :class:`FlatParamHandle`;
the prefixed names are guaranteed to be unique within the subtree
rooted in that module.
_num_params (int): Number of original parameters flattened into this
flattened parameter; this is the length of ``_param_infos``,
``_numels``, ``_shapes``, and ``_prefixed_param_names``.
_shared_param_infos (Tuple[SharedParamInfo, ...]): Shared parameter
info entries; see :class:`SharedParamInfo`.
_param_extensions (Tuple[Optional[Any], ...]): Parameter extensions
(i.e. some per-parameter state) used to customize pre-flatten and
post-unflatten behavior. This is experimental, and users should not
depend on its existence in the future.
_shard_param_offsets (List[Tuple[int, int])): [start, end] offsets (in
units of numel) giving this rank's part of each flattened original
module parameter; for any parameter ``p`` that is not sharded
across ranks, this will be [0, ``p.numel()``-1].
_shard_indices (Tuple[int, int]): [start, end] indices (in units of
parameters) for this rank's shard of the original model parameters,
where the parameters follow the order in which they were originally
flattened; this indexes appropriately into any data structure that
follows the flattening order (e.g. ``_param_infos``, ``_numels``,
etc.).
_shard_numel_padded (int): Numel padded for this rank's sharded
flattened parameter.
_local_shard (Tensor): Sharded flattened parameter with padding if
using a sharded strategy. If using ``NO_SHARD``, then this is the
unpadded unsharded flattened parameter, and there is no notion of a
sharded flattened parameter or padded unsharded flattened
parameter.
_full_param_padded (Tensor): Unsharded flattened parameter with
padding. This is not defined for ``NO_SHARD``. When using mixed
precision for parameters, this has the low precision.
_full_prec_full_param_padded (Tensor): Full precision unsharded
flattened parameter with padding. This is used for unsharding
outside of computation when using mixed precision for parameters.
This is never defined for ``NO_SHARD``.
_post_backward_hook_state (Tuple[AccumulateGrad, RemovableHandle]):
Flattened parameter's :class:`AccumulateGrad` object and
post-backward hook handle.
_mp_shard (Tensor): Low precision sharded flattened parameter with
padding. This is only defined when parameter mixed precision is
enabled. For ``NO_SHARD``, this is used for computation.
_cpu_grad (Tensor): Sharded gradient with padding stored on CPU.
This is only defined when offloading parameters is enabled.
_saved_grad_shard (Tensor): Sharded gradient with padding from previous
iterations for gradient accumulation without :meth:`no_sync`.
"""
def _init_metadata(
self,
param_infos: List[ParamInfo],
numels: List[int],
shapes: List[torch.Size],
prefixed_param_names: List[str],
shared_param_infos: List[SharedParamInfo],
param_extensions: List[Any],
) -> None:
"""
Initializes attributes holding metadata about the original parameters
comprising the flattened parameter.
We expose this method separate from the constructor to keep the
constructor only responsible for the flattened parameter's tensor data.
This method should only be called once per model, while the constructor
may be called multiple times, e.g. when reloading from a checkpoint, in
which case only the tensor data needs to be passed to the constructor.
Since :meth:`load_state_dict` is implemented via :meth:`copy_`, the
metadata is correctly assumed to be unchanged.
Args:
See the Attributes in the class docstring.
"""
assert len(param_infos) == len(numels)
assert len(param_infos) == len(shapes)
assert len(param_infos) == len(prefixed_param_names)
assert len(param_infos) == len(param_extensions)
self._num_params = len(param_infos)
self._param_infos = tuple(param_infos)
self._numels = tuple(numels)
self._shapes = tuple(shapes)
self._prefixed_param_names = tuple(prefixed_param_names)
self._shared_param_infos = tuple(shared_param_infos)
self._param_extensions = tuple(param_extensions)
self._unpadded_unsharded_size = self.size()
_set_fsdp_flattened(self)
class FlatParamHandle:
"""
This handle manages a flattened parameter (:class:`FlatParameter`). This
includes sharding and view management.
Args:
params (Sequence[nn.Parameter]): The parameters to use for the
flattened parameter.
module (nn.Module): A module that is the root of the subtree containing
all parameters in ``params``; for non-recursive wrapping, this must
be the top-level module, while for recursive wrapping, this may not
necessarily be the top-level module.
device (torch.device): The compute and communication device, which
should be a non-CPU device. We refer to it as the compute device.
config (HandleConfig): A config customizing the handle based on FSDP's
available features.
"""
##################
# INITIALIZATION #
##################
def __init__(
self,
params: Sequence[nn.Parameter],
module: nn.Module,
device: torch.device,
config: HandleConfig,
) -> None:
super().__init__()
self.device = device
self._config = config
self._training_state = HandleTrainingState.IDLE
self._init_flat_param(params, module)
self._unflatten(as_params=False)
def _init_flat_param(
self,
params: Sequence[Optional[nn.Parameter]],
module: nn.Module,
) -> None:
"""
Initializes the flattened parameter ``self.flat_param`` by flattening
the parameters in ``params`` into a single :class:`FlatParameter` and
saves relevant metadata. Shared parameters are only included in the
flattened parameter once.
This checks that all comprising parameters have the same dtype and
``requires_grad`` and does not support nested construction of
:class:`FlatParameter` s.
Args:
See the Args in the class docstring.
"""
params_set = set(params)
params_set.discard(None)
assert (
len(params_set) > 0
), "Cannot initialize a `FlatParameter` from an empty parameter list"
param_infos: List[ParamInfo] = []
numels: List[int] = []
shapes: List[torch.Size] = []
prefixed_param_names: List[str] = []
shared_param_infos: List[SharedParamInfo] = []
shared_param_memo: Dict[nn.Parameter, Tuple[nn.Module, str, str]] = {}
params_to_flatten: List[nn.Parameter] = []
param_extensions: List[Any] = []
dtype: Optional[torch.dtype] = None
requires_grad: Optional[bool] = None
for submodule_name, submodule in module.named_modules():
for param_name, param in submodule.named_parameters(recurse=False):
if param not in params_set:
continue
if param in shared_param_memo:
prim_module, prim_module_name, prim_param_name = shared_param_memo[
param
]
shared_param_infos.append(
SharedParamInfo(
param_name,
submodule,
submodule_name,
prim_param_name,
prim_module,
prim_module_name,
)
)
else:
if type(param) is FlatParameter:
raise ValueError("`FlatParameter` does not support nesting")
if dtype is not None and param.dtype != dtype:
raise ValueError(
"`FlatParameter` requires uniform dtype but got "
f"{dtype} and {param.dtype}"
)
if dtype is None and not param.is_floating_point():
raise ValueError("Integer parameters are unsupported")
if (
requires_grad is not None
and param.requires_grad != requires_grad
):
raise ValueError(
"`FlatParameter` requires uniform `requires_grad`"
)
param, extension = _ext_pre_flatten_transform(param)
param_extensions.append(extension)
dtype = param.dtype
requires_grad = param.requires_grad
shared_param_memo[param] = (submodule, submodule_name, param_name)
params_to_flatten.append(param)
param_infos.append(ParamInfo(param_name, submodule, submodule_name))
numels.append(param.numel())
shapes.append(param.shape)
prefixed_param_name = (
submodule_name + "." + param_name
if submodule_name
else param_name
)
prefixed_param_names.append(prefixed_param_name)
assert requires_grad is not None
self.flat_param = FlatParamHandle.flatten_params(
params_to_flatten, requires_grad
)
self.flat_param._init_metadata(
param_infos,
numels,
shapes,
prefixed_param_names,
shared_param_infos,
param_extensions,
)
@staticmethod
def flatten_params(
params: Sequence[torch.Tensor],
requires_grad: bool,
) -> FlatParameter:
"""
Flattens the parameters in ``params`` into a single
:class:`FlatParameter`. This should be the only way used to construct
:class:`FlatParameter` s.
We expose this factory method for checkpointing (e.g. sharded state
dict). The flattened parameter's metadata should only be initialized
once (see :meth:`_init_metadata`), but its tensor data may be reloaded.
"""
with torch.no_grad():
flat_params = [
p.detach().reshape(-1) if isinstance(p, nn.Parameter) else p.reshape(-1)
for p in params
]
flat_param_data = torch.cat(flat_params, dim=0)
flat_param = FlatParameter(flat_param_data, requires_grad=requires_grad)
return flat_param
###################################
# SHARD INITIALIZATION & METADATA #
###################################
@torch.no_grad()
def shard(self, process_group: dist.ProcessGroup):
"""
Shards the handle's ``FlatParameter``. In terms of memory, this
allocates new memory for the sharded flattened parameter and frees the
unsharded flattened parameter's storage.
Postcondition: ``self.flat_param`` is the sharded flattened parameter.
``process_group``, ``rank``, and ``world_size`` attributes are set.
TODO (awgu): Once we retire ``FlattenParamsWrapper``, we should pass
the process group directly to the ``FlatParamHandle`` constructor. For
now, we decouple ``FlattenParamsWrapper` from a process group, but this
makes the process-group-related attributes not necessarily defined.
"""
if not self.uses_sharded_strategy:
return
flat_param = self.flat_param
self.process_group = process_group
self.rank = process_group.rank()
self.world_size = process_group.size()
assert (
flat_param.storage_offset() == 0
), "The `FlatParameter` is not the sole occupant of its storage"
orig_storage = flat_param.storage()
local_shard, numel_padded = FlatParamHandle._get_shard(
flat_param, self.rank, self.world_size
)
flat_param.set_(local_shard) # type: ignore[call-overload]
self._init_shard_metadata(local_shard.numel(), numel_padded, self.rank)
if orig_storage.size() > 0:
orig_storage.resize_(0)
def _init_shard_metadata(
self,
sharded_flat_param_numel: int,
numel_padded: int,
rank: int,
) -> None:
"""
Initializes shard-related metadata for this rank's shard of the
flattened parameter: ``_shard_param_offsets``, ``_shard_indices``, and
``_shard_numel_padded``.
Args:
sharded_flat_param_numel (int): Numel of each rank's sharded
flattened parameter with padding (i.e. including
``numel_padded``).
numel_padded (int): Numel padded for this rank's sharded flattened
parameter.
rank (int): Caller's rank.
"""
if numel_padded > sharded_flat_param_numel:
raise ValueError(
f"Sharded flattened parameter with {sharded_flat_param_numel} "
f"numel cannot have {numel_padded} numel padded"
)
start = sharded_flat_param_numel * rank
end = sharded_flat_param_numel * (rank + 1) - 1 # inclusive
(
self.flat_param._shard_param_offsets, # type: ignore[attr-defined]
self.flat_param._shard_indices, # type: ignore[attr-defined]
) = self._get_shard_metadata(start, end)
self.flat_param._shard_numel_padded = numel_padded # type: ignore[attr-defined]
def _get_shard_metadata(
self,
start: int,
end: int,
) -> Tuple[Tuple[Tuple[int, int], ...], Tuple[int, int]]:
"""
Computes the shard metadata based on ``start`` and ``end``, which give
the closed interval of the unsharded flattened parameter specifying the
shard.
Args:
start (int): Start index (in units of numel) of this rank's shard
of the flattened parameter.
end (int): End index (in units of numel and inclusive) of this
rank's shard of the flattened parameter.
Return:
Tuple[Tuple[Tuple[int, int], ...], Tuple[int, int]]: See
``_shard_param_offsets`` and ``_shard_indices`` in
:class:`FlatParameter` 's docstring.
"""
flat_param_offsets = self._get_flat_param_offsets()
# Indices of the original parameters in this rank's sharded flattened
# parameter
shard_param_indices_range = [] # elements will be consecutive
# [start, end] offsets giving this rank's part of the flattened
# original module parameter (which will be [0, `p.numel()`-1] for any
# parameter that is not sharded across ranks)
shard_param_offsets = []
for i, (param_start, param_end) in enumerate(flat_param_offsets):
if start > param_end or end < param_start:
continue
if start <= param_start:
intra_param_start = 0
else:
intra_param_start = start - param_start
intra_param_end = min(param_end, end) - param_start
shard_param_indices_range.append(i)
shard_param_offsets.append(
(intra_param_start, intra_param_end)
) # both inclusive
if len(shard_param_indices_range) == 0:
shard_param_indices = (0, 0)
assert len(shard_param_offsets) == 0
else:
shard_param_indices = (
shard_param_indices_range[0],
shard_param_indices_range[-1],
)
assert (
len(shard_param_offsets)
== shard_param_indices[-1] - shard_param_indices[0] + 1
)
return tuple(shard_param_offsets), shard_param_indices
@staticmethod
def _get_unpadded_shard(
tensor: Tensor,
rank: int,
world_size: int,
) -> Tuple[Tensor, int]:
"""
Returns the shard of ``tensor`` without any padding for the given
``rank`` and ``world_size`` and the numel to pad for that shard.
If ``tensor`` is already flattened or may be viewed in the flattened
shape (which is true in the expected usage), then this method does not
allocate any new tensor memory.
"""
chunks = torch.flatten(tensor).chunk(world_size)
if len(chunks) < (rank + 1):
# This rank gets an empty chunk fully padded with zeros since there
# are not enough chunks across ranks
chunk = chunks[0].new_empty(0)
else:
chunk = chunks[rank]
numel_to_pad = chunks[0].numel() - chunk.numel()
assert (
numel_to_pad >= 0
), "Chunk's size should be at most the first chunk's size"
return chunk, numel_to_pad
@staticmethod
def _get_shard(
tensor: Tensor,
rank: int,
world_size: int,
) -> Tuple[Tensor, int]:
"""
Returns the shard of ``tensor`` with padding for the given ``rank`` and
``world_size`` and the numel padded for that shard.
This method allocates new memory (via :meth:`clone`) since the
unsharded ``tensor`` may be deallocated after this method returns.
"""
chunk, numel_to_pad = FlatParamHandle._get_unpadded_shard(
tensor, rank, world_size
)
shard = chunk.clone()
if numel_to_pad > 0:
shard = F.pad(shard, [0, numel_to_pad])
return shard, numel_to_pad
@staticmethod
def _get_sharded_size(tensor: Tensor, rank: int, world_size: int) -> torch.Size:
"""
Returns the shape of ``tensor`` after sharding including padding. This
requires ``tensor`` to have 1D shape and ensures that the returned
shape is 1D.
"""
assert len(tensor.shape) == 1, f"{tensor.shape}"
unpadded_sharded_tensor, numel_to_pad = FlatParamHandle._get_unpadded_shard(
tensor, rank, world_size
)
unpadded_sharded_size = unpadded_sharded_tensor.size()
assert len(unpadded_sharded_size) == 1, f"{unpadded_sharded_size}"
return torch.Size([unpadded_sharded_size[0] + numel_to_pad])
def _get_flat_param_offsets(self) -> List[Tuple[int, int]]:
"""Returns [start, end] offsets of each original parameter's flattened
data in the unsharded flattened parameter (without padding)."""
cumulative_sum = list(accumulate(self.flat_param._numels))
starts = [0] + cumulative_sum[:-1]
ends = [end - 1 for end in cumulative_sum] # inclusive
param_offsets = list(zip(starts, ends))
return param_offsets
def shard_metadata(
self,
) -> FlatParamShardMetadata:
"""Returns shard-related metadata specific to this rank's shard of the
flattened parameter."""
assert hasattr(self.flat_param, "_shard_indices") and hasattr(
self.flat_param, "_shard_param_offsets"
), "Shard metadata has not been initialized"
shard_param_start_index = self.flat_param._shard_indices[0] # type: ignore[attr-defined]
shard_param_end_index = self.flat_param._shard_indices[1] # type: ignore[attr-defined]
sl = (
slice(shard_param_start_index, shard_param_end_index + 1)
if shard_param_start_index <= shard_param_end_index
else slice(0, 0)
)
return FlatParamShardMetadata(
self.flat_param._prefixed_param_names[sl],
self.flat_param._shapes[sl],
self.flat_param._numels[sl],
self.flat_param._shard_param_offsets[:], # type: ignore[attr-defined]
)
###################
# UNSHARD/RESHARD #
###################
def pre_unshard(self) -> bool:
"""
Returns: ``False`` if this is a no-op and ``True`` otherwise.
Postcondition: ``self.flat_param`` 's data is on the device for
communication and is what should be all-gathered. This means that it
matches the dtype of the expected unsharded parameter.
"""
ret = False
if (
self.uses_sharded_strategy
and not self._config.offload_params
and not self.needs_unshard()
):
pass # no-op
elif self._uses_param_mixed_precision and not self._force_full_precision:
self._use_low_precision_shard()
ret = True
elif self._config.offload_params and self.flat_param.device != self.device:
# NOTE: This creates a new tensor distinct from any attributes.
self._flat_param_to(self.device, non_blocking=True)
ret = True
self._check_on_compute_device(self.flat_param)
return ret
def _use_low_precision_shard(self):
"""
Allocates the low precision shard directly on the compute device and
switches to using the low precision sharded flattened parameter.
"""
self._check_low_precision_shard()
flat_param = self.flat_param
_alloc_storage(
flat_param._mp_shard, flat_param._local_shard.size() # type: ignore[attr-defined]
)
# `copy_()` implicitly casts to the low precision
flat_param._mp_shard.copy_( # type: ignore[attr-defined]
flat_param._local_shard.to( # type: ignore[attr-defined]
self.device, non_blocking=True
)
)
# Invariant: `_mp_shard` is always on the compute device.
flat_param.data = flat_param._mp_shard # type: ignore[attr-defined]
def unshard(self):
"""
Runs the unshard logic. This includes all-gathering the flattened
parameter and switching to using the unsharded flattened parameter. If
the handle does not need unsharding, then this only switches to using
the unsharded flattened parameter. For ``NO_SHARD``, this is a no-op.
If FSDP is in :meth:`summon_full_params` and the handle uses parameter
mixed precision, then the parameter is forced to full precision.
"""
if not self.needs_unshard():
if self.uses_sharded_strategy:
# The handle may have been resharded without freeing the padded
# unsharded flattened parameter, in which case we need to
# switch to using the unsharded parameter
unsharded_flat_param = self._get_padded_unsharded_flat_param()
self._use_unsharded_flat_param(unsharded_flat_param)
return
unsharded_flat_param = self._alloc_padded_unsharded_flat_param()
self._all_gather_flat_param(unsharded_flat_param)
def needs_unshard(self) -> bool:
"""Returns if the handle's flattened parameter needs to be unsharded."""
if not self.uses_sharded_strategy:
return False
unsharded_flat_param = self._get_padded_unsharded_flat_param()
already_unsharded = (
unsharded_flat_param.storage().size() == unsharded_flat_param.numel()
)
return not already_unsharded
def _alloc_padded_unsharded_flat_param(self):
"""
Allocates the *padded* unsharded flattened parameter. The unpadded
unsharded flattened parameter is always a view into the padded one.
This padded parameter is saved to a different attribute on the
``FlatParameter`` depending on if we force full precision.
"""
self._check_sharded_strategy()
flat_param = self.flat_param
unsharded_flat_param = self._get_padded_unsharded_flat_param()
self._check_storage_freed(unsharded_flat_param)
_alloc_storage(unsharded_flat_param, flat_param._padded_unsharded_size) # type: ignore[attr-defined]
return unsharded_flat_param
def _get_padded_unsharded_flat_param(self) -> torch.Tensor:
"""
Returns a reference to the padded unsharded flattened parameter
depending on the calling context. This should only be called if using a
sharded strategy.
"""
self._check_sharded_strategy()
flat_param = self.flat_param
if self._force_full_precision:
# When parameter mixed precision is enabled, we use a different
# tensor as the all-gather destination to preserve the invariant
# that `_full_param_padded` is in the low precision
unsharded_flat_param = flat_param._full_prec_full_param_padded # type: ignore[attr-defined]
p_assert(
unsharded_flat_param.dtype != self._config.param_dtype,
f"Expects full precision but got {self._config.param_dtype}",
)
else:
unsharded_flat_param = flat_param._full_param_padded # type: ignore[attr-defined]
return unsharded_flat_param
def _all_gather_flat_param(
self,
padded_unsharded_flat_param: Tensor,
) -> None:
"""
All-gathers the handle's flattened parameter to the destination
``padded_unsharded_flat_param``, and switches to using the all-gathered
tensor.
"""
p_assert(
hasattr(self, "process_group") and hasattr(self, "world_size"),
"Expects a process group and world size to have been set via `shard()`",
)
sharded_flat_param = self.flat_param.data
expected_numel = sharded_flat_param.numel() * self.world_size
p_assert(
padded_unsharded_flat_param.numel() == expected_numel,
f"Expects {expected_numel} numel but got {padded_unsharded_flat_param.numel()}",
)
dist._all_gather_base(
padded_unsharded_flat_param,
sharded_flat_param,
self.process_group,
)
self._use_unsharded_flat_param(padded_unsharded_flat_param)
def _use_unsharded_flat_param(
self,
padded_unsharded_flat_param: torch.Tensor,
) -> None:
"""
Switches to using the *unpadded* unsharded flattened parameter, which
is a view into the *padded* unsharded flattened parameter.
"""
unsharded_size = self.flat_param._unpadded_unsharded_size
self.flat_param.data = padded_unsharded_flat_param[
: unsharded_size.numel()
].view(unsharded_size)
def post_unshard(self):
"""
Runs the post-unshard logic. This includes freeing the low precision
shard if needed.
"""
if self._uses_param_mixed_precision and self.uses_sharded_strategy:
self._free_low_precision_sharded_param()
self._check_on_compute_device(self.flat_param)
def _free_low_precision_sharded_param(self):
"""Frees the low precision sharded flattened parameter."""
self._check_low_precision_shard()
_free_storage(self.flat_param._mp_shard) # type: ignore[attr-defined]
def prepare_gradient(self):
"""
Prepares the gradient for the backward computation by saving and
clearing any existing sharded gradient in ``.grad`` to enable computing
a new unsharded gradient.
"""
p_assert(
self._training_state
in (HandleTrainingState.BACKWARD_PRE, HandleTrainingState.IDLE),
"Expects to be in `BACKWARD_PRE` or `IDLE` (if prefetching)",
)
flat_param = self.flat_param
if flat_param.grad is not None and (
flat_param.grad.size() != flat_param._unpadded_unsharded_size
or flat_param.grad.device != flat_param.device # grad on CPU
):
self._check_on_compute_device(self.flat_param)
grad_offloaded = flat_param.grad.device != self.device
p_assert(
not grad_offloaded or self._config.offload_params,
f"Expects the sharded gradient to be on {self.device} "
f"but got {flat_param.grad.device}",
)
prev_iter_synced_gradients = (
flat_param.grad.size()
== flat_param._local_shard.size() # type: ignore[attr-defined]
)
if prev_iter_synced_gradients:
# TODO (awgu): Gradient accumulation outside `no_sync()`
# does not work with CPU offloading. The issue should be
# that, in the post-backward hook, we cannot do an addition
# between a CPU tensor (the existing sharded gradient) and
# a GPU tensor (the new sharded gradient).
if not grad_offloaded:
flat_param._saved_grad_shard = flat_param.grad.data # type: ignore[attr-defined]
# If we're using mixed precision with keeping grads
# casted, gradient here might still be of the reduced
# dtype if we didn't clear / set the gradients to None
# after previous backward. In that case, make sure
# p._saved_grad_shard is cast to the full precision type
# so that we can accumulate in full precision in
# _post_backward_hook and assign back in full precision
# in _wait_for_post_backward.
if (
self._config.keep_low_precision_grads
and flat_param._saved_grad_shard.dtype # type: ignore[attr-defined]
!= flat_param._local_shard.dtype # type: ignore[attr-defined]
):
flat_param._saved_grad_shard = flat_param._saved_grad_shard.to( # type: ignore[attr-defined]
flat_param._local_shard.dtype # type: ignore[attr-defined]
)
else:
padded_unsharded_size = flat_param._padded_unsharded_size # type: ignore[attr-defined]
p_assert(
flat_param.grad.size() == padded_unsharded_size,
"Expects `.grad` to be the unsharded gradient in "
f"`no_sync()` with size {padded_unsharded_size} "
f"but got size {flat_param.grad.size()}",
)
flat_param.grad = None
@contextlib.contextmanager
def to_cpu(self):
"""
Moves the unpadded unsharded flattened parameter to CPU while in the
context and moves it back to the previous device upon exit. For now,
this assumes the ``FlatParameter`` is the unpadded unsharded flattened
parameter since (1) there is no reason to include the padding in the
copy and (2) there is no use case for the sharded flattened parameter.
Precondition: ``self.flat_param`` 's data is the unpadded unsharded
flattened parameter on the compute device, and the handle uses a
sharded strategy.
Postcondition: Same as the precondition.
"""
self._check_sharded_strategy()
p_assert(
self.flat_param.size() == self.flat_param._unpadded_unsharded_size,
f"Expects size {self.flat_param._unpadded_unsharded_size} but got {self.flat_param.size()}",
)
self._check_on_compute_device(self.flat_param)
# Check that the unpadded unsharded flattened parameter is a view into
# the padded unsharded flattened parameter as expected
# NOTE: This check is not strictly needed for correctness but is a
# useful sanity check since the tensor should only be used internally.
unpadded_storage_ptr = self.flat_param.storage().data_ptr()
padded_storage_ptr = (
self._get_padded_unsharded_flat_param().storage().data_ptr()
)
p_assert(
unpadded_storage_ptr == padded_storage_ptr,
"Expects the unpadded parameter to be a view into the padded parameter",
)
self._flat_param_to(torch.device("cpu"))
self._free_unsharded_flat_param()
try:
yield
finally:
p_assert(
self.flat_param.size() == self.flat_param._unpadded_unsharded_size,
f"Expects size {self.flat_param._unpadded_unsharded_size} but got {self.flat_param.size()}",
)
padded_unsharded_flat_param = self._alloc_padded_unsharded_flat_param()
# Copy from CPU to the compute device
padded_unsharded_flat_param[: self.flat_param.numel()].copy_(
self.flat_param
)
self._use_unsharded_flat_param(padded_unsharded_flat_param)
def reshard(self, free_unsharded_flat_param: bool):
"""
Runs the reshard logic. This includes freeing the unsharded flattened
parameter if ``free_unsharded_flat_param`` and switching to using the
sharded flattened parameter.
"""
if free_unsharded_flat_param:
self._free_unsharded_flat_param()
self._use_sharded_flat_param()
def post_reshard(self):
"""
Runs the post-reshard logic. This includes freeing any memory that
can now be freed given that the ``FlatParameter`` points to the full
precision sharded flattened parameter.
Precondition: ``self.flat_param`` 's data points to the full precision
sharded flattened parameter.
"""
# For `NO_SHARD`, `_mp_shard` is not freed in the post-unshard since
# it is also the low precision *unsharded* flattened parameter. Hence,
# we delay the free until the reshard.
if (
self._uses_param_mixed_precision
and not self.uses_sharded_strategy
and not self._force_full_precision # did not use the low precision shard
):
self._free_low_precision_sharded_param()
def _free_unsharded_flat_param(self):
"""
Frees the padded unsharded flattened parameter. The tensor to free
depends on the calling context since the unshard may have forced full
precision, in which case a different tensor is used.
"""
self._check_sharded_strategy()
unsharded_flat_param = self._get_padded_unsharded_flat_param()
self._check_storage_allocated(unsharded_flat_param)
self._check_on_compute_device(unsharded_flat_param)
# Do not free the memory until all ops in the current stream finish
unsharded_flat_param.record_stream(
cast(torch._C.Stream, torch.cuda.current_stream())
)
_free_storage(unsharded_flat_param)
def _use_sharded_flat_param(self) -> None:
"""Switches to using the sharded flattened parameter."""
flat_param = self.flat_param
if self._config.offload_params:
device = flat_param._local_shard.device # type: ignore[attr-defined]
p_assert(
device == torch.device("cpu"),
f"Expects the local shard to be on CPU but got {device}",
)
flat_param.data = flat_param._local_shard # type: ignore[attr-defined]
#########
# VIEWS #
#########
@staticmethod
def _get_unflat_views(
flat_param: FlatParameter,
tensor: Optional[torch.Tensor] = None,
) -> Iterator[Tensor]:
"""
Returns unflattened ``Tensor`` views into ``tensor`` if it is not
``None`` or ``flat_param`` otherwise, where the unflattening is based
on ``flat_param`` 's metadata.
In other words, to get views into the unsharded flattened parameter,
pass ``tensor`` as ``None``, but to get views into tensor optimizer
state, pass ``tensor`` as the optimizer state tensor.
"""
if tensor is None:
tensor = flat_param
p_assert(
tensor.numel() == flat_param._unpadded_unsharded_size.numel(),
f"Expects {flat_param._unpadded_unsharded_size.numel()} numel but got "
f"{tensor.numel()} numel",
)
views = (
_ext_post_unflatten_transform(subtensor.view(shape), param_extension)
for (subtensor, shape, param_extension) in zip(
torch.split(tensor, flat_param._numels, dim=0), # type: ignore[arg-type]
flat_param._shapes, flat_param._param_extensions,
)
)
return views
def _unflatten(self, as_params: bool) -> None:
"""
Unflattens the unsharded flattened parameter by setting the original
module parameter variables to be views into it.
Args:
as_params (bool): If ``True``, then registers the original
parameters as ``nn.Parameter`` s; if ``False``, then registers
the original parameters only as ``Tensor`` s. ``False`` should
be used during forward/backward computation and when hiding the
original parameters from :meth:`nn.Module.named_parameters`.
"""
views = self._get_unflat_views(self.flat_param)
for view, (param_name, module, _) in zip(views, self.flat_param._param_infos):
if hasattr(module, param_name):
delattr(module, param_name)
if as_params:
module.register_parameter(param_name, nn.Parameter(view))
else:
setattr(module, param_name, view)
for (
param_name,
module,
_,
prim_param_name,
prim_module,
_,
) in self.flat_param._shared_param_infos:
if hasattr(module, param_name):
delattr(module, param_name)
assert hasattr(prim_module, prim_param_name)
param: Union[Tensor, nn.Parameter] = getattr(prim_module, prim_param_name)
if as_params:
assert isinstance(param, nn.Parameter)
module.register_parameter(param_name, param)
else:
setattr(module, param_name, param)
@contextlib.contextmanager
def unflatten_as_params(self) -> Generator:
"""
Assumes the flattened parameter is unsharded. When in the context,
unflattens the original parameters as ``nn.Parameter`` views into the
flattened parameter, and after the context, restores the original
parameters as ``Tensor`` views into the flattened parameter.
"""
self._unflatten(as_params=True)
try:
yield
finally:
self._unflatten(as_params=False)
###########
# HELPERS #
###########
def _flat_param_to(self, *args, **kwargs):
"""Wraps an in-place call to ``.to()`` for ``self.flat_param``."""
self.flat_param.data = self.flat_param.to(*args, **kwargs)
def _get_modules(self) -> Set[nn.Module]:
"""Returns a :class:`set` of the modules whose parameters are included
in this handle's flattened parameter."""
return set(pi.module for pi in self.flat_param._param_infos).union(
set(spi.module for spi in self.flat_param._shared_param_infos)
)
def parameter_module_names(self) -> Iterator[Tuple[str, str]]:
shared_param_infos = [
ParamInfo(param_name, module, module_name)
for (
param_name,
module,
module_name,
_,
_,
_,
) in self.flat_param._shared_param_infos
]
for param_name, _, module_name in chain(
self.flat_param._param_infos, shared_param_infos
):
yield (param_name, module_name)
#######################
# CHECKS & INVARIANTS #
#######################
def _check_sharded_strategy(self):
p_assert(self.uses_sharded_strategy, "Expects sharded strategy")
def _check_on_compute_device(self, tensor: Tensor):
p_assert(
tensor.device == self.device,
f"Expects tensor to be on the compute device {self.device}",
)
@staticmethod
def _check_storage_freed(tensor: Tensor):
storage_size: int = tensor.storage().size()
p_assert(
storage_size == 0,
f"Expects storage to be freed but got storage with size {storage_size}",
)
@staticmethod
def _check_storage_allocated(tensor: Tensor):
storage_size: int = tensor.storage().size()
p_assert(storage_size > 0, "Expects storage to be allocated")
def _check_low_precision_shard(self):
p_assert(
self._uses_param_mixed_precision,
"Not using low precision for parameters",
)
p_assert(
getattr(self.flat_param, "_mp_shard", None) is not None,
"Expects `_mp_shard` to exist",
)
device = self.flat_param._mp_shard.device # type: ignore[attr-defined]
p_assert(
device == self.device,
f"Expects the low precision shard to be on {self.device} but got {device}",
)
##############
# PROPERTIES #
##############
@property
def uses_sharded_strategy(self) -> bool:
return self._config.sharding_strategy != HandleShardingStrategy.NO_SHARD
@property
def _uses_param_mixed_precision(self) -> bool:
return self._config.param_dtype is not None
@property
def _force_full_precision(self) -> bool:
return (
self._training_state == HandleTrainingState.SUMMON_FULL_PARAMS
and self._uses_param_mixed_precision
)
|