| """
|
| """
|
| import contextlib
|
| from contextvars import ContextVar
|
| from io import BytesIO
|
| from typing import Any
|
| from typing import cast
|
| from unittest.mock import patch
|
|
|
| import torch
|
| from torch._inductor.package.package import package_aoti
|
| from torch.export.pt2_archive._package import AOTICompiledModel
|
| from torch.export.pt2_archive._package_weights import Weights
|
|
|
|
|
# Inductor options that are forced for every compilation: ``aoti_compile``
# merges this mapping on the right-hand side of ``|``, so these values take
# precedence over any caller-supplied option with the same key.
INDUCTOR_CONFIGS_OVERRIDES: dict[str, bool] = {
    "aot_inductor.package_constants_in_so": False,
    "aot_inductor.package_constants_on_disk": True,
    "aot_inductor.package": True,
}
|
|
|
|
|
class ZeroGPUWeights:
    """Holder for a compiled model's ``name -> tensor`` constants.

    Pickling is customised through ``__reduce__``: every tensor is staged in
    a CPU buffer, and the reconstructed object is created with
    ``to_cuda=True`` so the tensors are sent to ``'cuda'`` again on rebuild.
    """

    def __init__(self, constants_map: dict[str, torch.Tensor], to_cuda: bool = False):
        """Store the constants, optionally transferring them to ``'cuda'``.

        Args:
            constants_map: Mapping from constant name to tensor.
            to_cuda: When true, a new mapping is built from
                ``tensor.to('cuda')`` for every entry; otherwise the mapping
                object is stored as passed, without copying.
        """
        if not to_cuda:
            self.constants_map = constants_map
            return
        self.constants_map = {key: value.to('cuda') for key, value in constants_map.items()}

    def __reduce__(self):
        """Return the pickle recipe ``(ZeroGPUWeights, (cpu_constants, True))``."""
        staged: dict[str, torch.Tensor] = {}
        for key, source in self.constants_map.items():
            # Pinned CPU buffer with the source's shape/dtype, filled in place.
            host_buffer = torch.empty_like(source, device='cpu').pin_memory()
            host_buffer.copy_(source)
            # Detach from autograd and move the storage to shared memory.
            staged[key] = host_buffer.detach().share_memory_()
        return ZeroGPUWeights, (staged, True)
|
|
|
|
|
class ZeroGPUCompiledModel:
    """Callable wrapper around an AOTI package and its separately held weights.

    The package is loaded on first call and the loaded model is cached in a
    ``ContextVar``, so each context loads (and then reuses) its own instance.
    """

    def __init__(self, archive_file: torch.types.FileLike, weights: ZeroGPUWeights):
        """Remember the package location/stream and the weights to load into it.

        Args:
            archive_file: The AOTI package, as accepted by
                ``torch._inductor.aoti_load_package``.
            weights: Constants passed to ``load_constants`` after loading.
        """
        self.archive_file = archive_file
        self.weights = weights
        self.compiled_model: ContextVar[AOTICompiledModel | None] = ContextVar('compiled_model', default=None)

    def __call__(self, *args, **kwargs):
        """Run the compiled model, loading it for the current context if needed."""
        if (compiled_model := self.compiled_model.get()) is None:
            compiled_model = cast(AOTICompiledModel, torch._inductor.aoti_load_package(self.archive_file))
            # Constants are loaded together with the model so that every
            # freshly loaded instance has its weights.
            compiled_model.load_constants(self.weights.constants_map, check_full_update=True, user_managed=True)
            self.compiled_model.set(compiled_model)
        return compiled_model(*args, **kwargs)

    def __reduce__(self):
        """Pickle as ``(archive_file, weights)``; the loaded model is not carried over."""
        return ZeroGPUCompiledModel, (self.archive_file, self.weights)

    def _read_archive_bytes(self) -> bytes:
        """Return the full content of ``archive_file`` without disturbing it."""
        archive = self.archive_file
        if hasattr(archive, "getvalue"):
            return archive.getvalue()
        if hasattr(archive, "read"):
            position = archive.tell()
            try:
                archive.seek(0)
                return archive.read()
            finally:
                # Restore the caller's position even if reading fails.
                archive.seek(position)
        # Neither a buffer nor a stream: treat it as a filesystem path
        # (``str`` / ``os.PathLike``).
        with open(archive, "rb") as handle:
            return handle.read()

    def to_serializable_dict(self) -> dict[str, Any]:
        """
        Stable representation that can be stored with torch.save and re-loaded later,
        without depending on runtime state.

        Returns:
            A dict with keys ``"format"`` (the literal ``"zerogpu_aoti_v1"``),
            ``"archive_bytes"`` (raw package bytes) and ``"constants_map"``
            (detached CPU copies of the weights).
        """
        archive_bytes = self._read_archive_bytes()

        constants_cpu = {k: v.detach().to("cpu") for k, v in self.weights.constants_map.items()}
        return {
            "format": "zerogpu_aoti_v1",
            "archive_bytes": archive_bytes,
            "constants_map": constants_cpu,
        }
|
|
|
|
|
|
|
| class ZeroGPUCompiledModelFromDict:
|
| """
|
| Rebuilds a callable AOTI model from a dict payload produced by ZeroGPUCompiledModel.to_serializable_dict().
|
|
|
| Important: constants are moved to CUDA on first call to avoid:
|
| - dtype/device mismatches (CPU bf16 weights vs CUDA bf16 inputs),
|
| - black outputs due to wrong device handling.
|
| """
|
| def __init__(self, payload: dict[str, Any], device: str = "cuda"):
|
| if payload.get("format") != "zerogpu_aoti_v1":
|
| raise ValueError(f"Unsupported payload format: {payload.get('format')}")
|
| self.archive_file = BytesIO(payload["archive_bytes"])
|
| constants = payload["constants_map"]
|
| constants = {k: v.to(device=device, dtype=torch.bfloat16).contiguous() for k, v in constants.items()}
|
| payload["constants_map"] = constants
|
| self.constants_map_cpu: dict[str, torch.Tensor] = payload["constants_map"]
|
| self.device = device
|
| self.compiled_model: ContextVar[AOTICompiledModel | None] = ContextVar("compiled_model", default=None)
|
| self._loaded_constants = False
|
|
|
| def __call__(self, *args, **kwargs):
|
| if (compiled_model := self.compiled_model.get()) is None:
|
| compiled_model = cast(AOTICompiledModel, torch._inductor.aoti_load_package(self.archive_file))
|
| self.compiled_model.set(compiled_model)
|
|
|
| if not self._loaded_constants:
|
|
|
| constants_map = {k: v.to(device="cuda", dtype=torch.bfloat16).contiguous() for k, v in self.constants_map_cpu.items()}
|
| compiled_model.load_constants(constants_map, check_full_update=True, user_managed=True)
|
| self._loaded_constants = True
|
|
|
| return compiled_model(*args, **kwargs)
|
|
|
|
|
def aoti_compile(
    exported_program: torch.export.ExportedProgram,
    inductor_configs: dict[str, Any] | None = None,
):
    """Compile an exported program with AOTInductor into a ``ZeroGPUCompiledModel``.

    Args:
        exported_program: Program to compile; its ``example_inputs`` are used
            as the compilation inputs and must not be ``None``.
        inductor_configs: Optional Inductor options. Entries of
            ``INDUCTOR_CONFIGS_OVERRIDES`` replace same-named keys.

    Returns:
        A ``ZeroGPUCompiledModel`` holding an in-memory archive of the
        string artifacts and the weights taken from the ``Weights`` artifact.

    Raises:
        AssertionError: If ``exported_program.example_inputs`` is ``None``.
        ValueError: If the artifacts do not contain exactly one ``Weights``.
    """
    user_options = inductor_configs or {}
    options = user_options | INDUCTOR_CONFIGS_OVERRIDES

    graph_module = cast(torch.fx.GraphModule, exported_program.module())
    assert exported_program.example_inputs is not None
    example_args, example_kwargs = exported_program.example_inputs
    artifacts = torch._inductor.aot_compile(
        graph_module,
        example_args,
        example_kwargs,
        options=options,
    )

    # Only the string artifacts go into the archive; the Weights artifact is
    # handled separately below.
    archive_file = BytesIO()
    package_files: list[str | Weights] = [item for item in artifacts if isinstance(item, str)]
    package_aoti(archive_file, package_files)

    # Single-element unpacking enforces "exactly one Weights artifact".
    (weights,) = (item for item in artifacts if isinstance(item, Weights))
    constants = {name: weights.get_weight(name)[0] for name in weights}
    return ZeroGPUCompiledModel(archive_file, ZeroGPUWeights(constants))
|
|
|
|
|
@contextlib.contextmanager
def capture_component_call(
    pipeline: Any,
    component_name: str,
    component_method='forward',
):
    """Record the arguments of the first call to a pipeline component method.

    While the context is active, ``getattr(pipeline, component_name)`` has
    its ``component_method`` replaced by a stub that raises a private
    exception carrying the call's arguments. The exception aborts the
    ``with`` body, is caught here, and its arguments are copied onto the
    yielded object. The original method is restored on exit.

    Args:
        pipeline: Object owning the component.
        component_name: Attribute name of the component on ``pipeline``.
        component_method: Name of the method to intercept.

    Yields:
        An object with ``args`` (tuple) and ``kwargs`` (dict) attributes,
        both empty until a call has been captured.
    """

    class CapturedCallException(Exception):
        """Private signal carrying the intercepted call's arguments."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args)
            self.kwargs = kwargs

    class CapturedCall:
        """Mutable record of the intercepted positional and keyword arguments."""

        def __init__(self):
            self.args: tuple[Any, ...] = ()
            self.kwargs: dict[str, Any] = {}

    def capture_call(*args, **kwargs):
        raise CapturedCallException(*args, **kwargs)

    target = getattr(pipeline, component_name)
    recorded = CapturedCall()
    interception = patch.object(target, component_method, new=capture_call)

    with interception:
        try:
            yield recorded
        except CapturedCallException as signal:
            # Swallow the signal: the caller only wants the arguments.
            recorded.args, recorded.kwargs = signal.args, signal.kwargs
|
|
|
|
|
| def drain_module_parameters(module: torch.nn.Module):
|
| state_dict_meta = {name: {'device': tensor.device, 'dtype': tensor.dtype} for name, tensor in module.state_dict().items()}
|
| state_dict = {name: torch.nn.Parameter(torch.empty_like(tensor, device='cpu')) for name, tensor in module.state_dict().items()}
|
| module.load_state_dict(state_dict, assign=True)
|
| for name, param in state_dict.items():
|
| meta = state_dict_meta[name]
|
| param.data = torch.Tensor([]).to(**meta) |