import os
import sys
from typing import Any, Dict, Optional, Union

import numpy as np
import paddle

from safetensors import numpy, deserialize, safe_open, serialize, serialize_file
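

# Version helper used by `load` / `load_file` below.
def _paddle_version_tuple():
    """Return ``paddle.__version__`` parsed into a tuple of ints.

    A plain string comparison misorders versions such as "3.10.0" and
    "3.2.0", so the version gates below compare numeric components instead.
    Only the leading digits of each component are used, which ignores
    suffixes such as "-rc1".
    """
    parts = []
    for piece in paddle.__version__.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    return tuple(parts)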


def save(
    tensors: Dict[str, paddle.Tensor], metadata: Optional[Dict[str, str]] = None
) -> bytes:
    """
    Saves a dictionary of tensors into raw bytes in safetensors format.

    Args:
        tensors (`Dict[str, paddle.Tensor]`):
            The incoming tensors. Tensors need to be contiguous and dense.
        metadata (`Dict[str, str]`, *optional*, defaults to `None`):
            Optional text only metadata you might want to save in your header.
            For instance it can be useful to specify more about the underlying
            tensors. This is purely informative and does not affect tensor loading.

    Returns:
        `bytes`: The raw bytes representing the format

    Example:

    ```python
    from safetensors.paddle import save
    import paddle

    tensors = {"embedding": paddle.zeros((512, 1024)), "attention": paddle.zeros((256, 256))}
    byte_data = save(tensors)
    ```
    """
    serialized = serialize(_flatten(tensors), metadata=metadata)
    result = bytes(serialized)
    return result


def save_file(
    tensors: Dict[str, paddle.Tensor],
    filename: Union[str, os.PathLike],
    metadata: Optional[Dict[str, str]] = None,
) -> None:
    """
    Saves a dictionary of tensors into a file in safetensors format.

    Args:
        tensors (`Dict[str, paddle.Tensor]`):
            The incoming tensors. Tensors need to be contiguous and dense.
        filename (`str`, or `os.PathLike`):
            The filename we're saving into.
        metadata (`Dict[str, str]`, *optional*, defaults to `None`):
            Optional text only metadata you might want to save in your header.
            For instance it can be useful to specify more about the underlying
            tensors. This is purely informative and does not affect tensor loading.

    Returns:
        `None`

    Example:

    ```python
    from safetensors.paddle import save_file
    import paddle

    tensors = {"embedding": paddle.zeros((512, 1024)), "attention": paddle.zeros((256, 256))}
    save_file(tensors, "model.safetensors")
    ```
    """
    serialize_file(_flatten(tensors), filename, metadata=metadata)


def load(data: bytes, device: str = "cpu") -> Dict[str, paddle.Tensor]:
    """
    Loads a safetensors file into paddle format from pure bytes.

    Args:
        data (`bytes`):
            The content of a safetensors file
        device (`str`, *optional*, defaults to `"cpu"`):
            The device where the tensors need to be located after load.
            Available options are all regular paddle device locations.

    Returns:
        `Dict[str, paddle.Tensor]`: dictionary that contains name as key, value as `paddle.Tensor`
        on the requested device

    Example:

    ```python
    from safetensors.paddle import load

    file_path = "./my_folder/bert.safetensors"
    with open(file_path, "rb") as f:
        data = f.read()

    loaded = load(data)
    ```
    """
    if _paddle_version_tuple() >= (3, 2, 0):
        flat = deserialize(data)
        return _view2paddle(flat, device)
    else:
        flat = numpy.load(data)
        return _np2paddle(flat, device)


def load_file(
    filename: Union[str, os.PathLike], device: str = "cpu"
) -> Dict[str, paddle.Tensor]:
    """
    Loads a safetensors file into paddle format.

    Args:
        filename (`str`, or `os.PathLike`):
            The name of the file which contains the tensors
        device (`str`, *optional*, defaults to `"cpu"`):
            The device where the tensors need to be located after load.
            Available options are all regular paddle device locations.

    Returns:
        `Dict[str, paddle.Tensor]`: dictionary that contains name as key, value as `paddle.Tensor`

    Example:

    ```python
    from safetensors.paddle import load_file

    file_path = "./my_folder/bert.safetensors"
    loaded = load_file(file_path)
    ```
    """
    result = {}
    if _paddle_version_tuple() >= (3, 2, 0):
        with safe_open(filename, framework="paddle", device=device) as f:
            for k in f.offset_keys():
                result[k] = f.get_tensor(k)
    else:
        flat = numpy.load_file(filename)
        result = _np2paddle(flat, device)
    return result


def _np2paddle(
    numpy_dict: Dict[str, np.ndarray], device: str = "cpu"
) -> Dict[str, paddle.Tensor]:
    for k, v in numpy_dict.items():
        numpy_dict[k] = paddle.to_tensor(v, place=device)
    return numpy_dict


def _paddle2np(paddle_dict: Dict[str, paddle.Tensor]) -> Dict[str, np.ndarray]:
    for k, v in paddle_dict.items():
        paddle_dict[k] = v.detach().cpu().numpy()
    return paddle_dict
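

# Byte width of each paddle dtype, used in `_tobytes` to compute how many
# bytes a tensor's buffer occupies.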
_SIZE = {
    paddle.int64: 8,
    paddle.float32: 4,
    paddle.int32: 4,
    paddle.bfloat16: 2,
    paddle.float16: 2,
    paddle.int16: 2,
    paddle.uint8: 1,
    paddle.int8: 1,
    paddle.bool: 1,
    paddle.float64: 8,
    paddle.float8_e4m3fn: 1,
    paddle.float8_e5m2: 1,
    paddle.complex64: 8,
}
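

# Safetensors dtype tags (as stored in the file header) mapped to paddle dtypes.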
_TYPES = {
    "F64": paddle.float64,
    "F32": paddle.float32,
    "F16": paddle.float16,
    "BF16": paddle.bfloat16,
    "I64": paddle.int64,
    "I32": paddle.int32,
    "I16": paddle.int16,
    "I8": paddle.int8,
    "U8": paddle.uint8,
    "BOOL": paddle.bool,
    "F8_E4M3": paddle.float8_e4m3fn,
    "F8_E5M2": paddle.float8_e5m2,
}
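

# Numpy stand-ins for paddle dtypes, only used in `_tobytes` to byteswap raw
# buffers on big-endian hosts; dtypes numpy lacks map to a type of equal width.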
NPDTYPES = {
    paddle.int64: np.int64,
    paddle.float32: np.float32,
    paddle.int32: np.int32,
    # numpy has no bfloat16; float16 has the same 2-byte width, which is all
    # the byteswap in `_tobytes` needs
    paddle.bfloat16: np.float16,
    paddle.float16: np.float16,
    paddle.int16: np.int16,
    paddle.uint8: np.uint8,
    paddle.int8: np.int8,
    paddle.bool: bool,
    paddle.float64: np.float64,
    # numpy has no float8 types; uint8 has the same 1-byte width
    paddle.float8_e4m3fn: np.uint8,
    paddle.float8_e5m2: np.uint8,
}


def _getdtype(dtype_str: str) -> paddle.dtype:
    return _TYPES[dtype_str]


def _view2paddle(safeview, device) -> Dict[str, paddle.Tensor]:
    result = {}
    for k, v in safeview:
        dtype = _getdtype(v["dtype"])
        if len(v["data"]) == 0:
            # An empty buffer can only come from a zero-sized tensor, and
            # `frombuffer` cannot handle it, so build the empty tensor directly.
            assert any(x == 0 for x in v["shape"])
            arr = paddle.empty(v["shape"], dtype=dtype)
        else:
            arr = paddle.base.core.frombuffer(v["data"], dtype).reshape(v["shape"])
        if device != "cpu":
            arr = arr.to(device)
        if sys.byteorder == "big":
            # Safetensors buffers are little-endian; swap bytes on big-endian hosts.
            arr = paddle.to_tensor(arr.numpy().byteswap(inplace=False), place=device)
        result[k] = arr
    return result


def _tobytes(tensor: paddle.Tensor, name: str) -> bytes:
    if not tensor.is_contiguous():
        raise ValueError(
            f"You are trying to save a non contiguous tensor: `{name}` which is not allowed. It either means you"
            " are trying to save tensors which are reference of each other in which case it's recommended to save"
            " only the full tensors, and reslice at load time, or simply call `.contiguous()` on your tensor to"
            " pack it before saving."
        )
    if not tensor.place.is_cpu_place():
        # Serialize from host memory.
        tensor = tensor.cpu()

    import ctypes

    # When the shape is empty (scalar tensor), np.prod returns a float,
    # so force an int for the byte-size arithmetic below.
    length = int(np.prod(tensor.shape).item())
    bytes_per_item = _SIZE[tensor.dtype]

    total_bytes = length * bytes_per_item

    ptr = tensor.data_ptr()
    if ptr == 0:
        return b""
    # View the tensor's memory as a flat array of bytes without copying.
    newptr = ctypes.cast(ptr, ctypes.POINTER(ctypes.c_ubyte))
    data = np.ctypeslib.as_array(newptr, (total_bytes,))
    if sys.byteorder == "big":
        # Safetensors stores little-endian data, so reinterpret the bytes with
        # a numpy dtype of matching width and swap them on big-endian hosts.
        npdtype = NPDTYPES[tensor.dtype]
        data = data.view(npdtype).byteswap(inplace=False)
    return data.tobytes()


def _flatten(tensors: Dict[str, paddle.Tensor]) -> Dict[str, Dict[str, Any]]:
    if not isinstance(tensors, dict):
        raise ValueError(
            f"Expected a dict of [str, paddle.Tensor] but received {type(tensors)}"
        )

    for k, v in tensors.items():
        if not isinstance(v, paddle.Tensor):
            raise ValueError(
                f"Key `{k}` is invalid, expected paddle.Tensor but received {type(v)}"
            )

    return {
        k: {
            "dtype": str(v.dtype).split(".")[-1],
            "shape": v.shape,
            "data": _tobytes(v, k),
        }
        for k, v in tensors.items()
    }