File size: 5,133 Bytes
18b382b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# Copyright (c) 2025 Delanoe Pirard and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Zero-copy utilities for efficient tensor operations.

Provides utilities to minimize memory copies between NumPy and PyTorch,
especially for CPU→GPU transfers.
"""

from __future__ import annotations

import numpy as np
import torch


def numpy_to_torch_zerocopy(arr: np.ndarray, dtype: torch.dtype | None = None, device: str | torch.device = "cpu") -> torch.Tensor:
    """
    Wrap a NumPy array as a PyTorch tensor, copying only when unavoidable.

    The returned tensor shares its buffer with ``arr`` when all of these hold:
    1. ``arr`` is already C-contiguous (otherwise a contiguous copy is made)
    2. ``dtype`` is None or equals the dtype inferred from ``arr``
    3. ``device`` is the string/device that prints as ``"cpu"``

    Any other device triggers a ``Tensor.to(device, non_blocking=True)`` call
    after the optional dtype conversion, so the cast happens on the CPU side.

    Args:
        arr: Input NumPy array
        dtype: Target PyTorch dtype (if None, keep the dtype inferred from arr)
        device: Target device ('cpu', 'cuda', 'mps')

    Returns:
        PyTorch tensor on the requested device

    Example:
        >>> arr = np.arange(6, dtype=np.float32).reshape(2, 3)
        >>> tensor = numpy_to_torch_zerocopy(arr)
        >>> arr[0, 0] = 42.0  # visible through `tensor` because memory is shared
    """
    # Only non-C-contiguous input is copied before wrapping.
    source = arr if arr.flags.c_contiguous else np.ascontiguousarray(arr)

    # from_numpy wraps the existing buffer instead of duplicating it.
    result = torch.from_numpy(source)

    needs_cast = dtype is not None and result.dtype != dtype
    if needs_cast:
        result = result.to(dtype)

    # Nothing more to do when the caller asked for the plain CPU device.
    if str(device) == "cpu":
        return result

    # Request an asynchronous transfer to the target device.
    return result.to(device, non_blocking=True)


def ensure_pinned_memory(arr: np.ndarray) -> np.ndarray:
    """
    Return a NumPy array backed by pinned (page-locked) memory when CUDA exists.

    When ``torch.cuda.is_available()`` is False the input is handed back
    untouched. Otherwise the array is wrapped as a tensor, pinned via
    ``Tensor.pin_memory()``, and exposed again as a NumPy array through
    ``Tensor.numpy()``.

    Args:
        arr: Input NumPy array

    Returns:
        ``arr`` itself without CUDA, else a NumPy view over the pinned tensor

    Note:
        Pinned memory is a limited resource. Only use for frequently transferred data.
        For CUDA devices only (no effect on MPS/CPU).
    """
    if torch.cuda.is_available():
        # Wrap, pin, then expose the pinned tensor as a NumPy array again.
        pinned = torch.from_numpy(arr).pin_memory()
        return pinned.numpy()

    # Without CUDA there is nothing to pin; hand the input back as-is.
    return arr


def stack_arrays_zerocopy(arrays: list[np.ndarray], dtype: np.dtype | None = None) -> np.ndarray:
    """
    Stack a list of arrays along a new leading axis with minimal copying.

    Each input is normalized to the target dtype and C-contiguous layout
    before stacking. Inputs that already satisfy both are passed through
    without a copy; the stack itself performs one allocation and copy.

    Args:
        arrays: List of NumPy arrays to stack (all must share the same shape)
        dtype: Target dtype (if None, use arrays[0].dtype)

    Returns:
        Stacked array of shape ``(len(arrays), *arrays[0].shape)``

    Raises:
        ValueError: If ``arrays`` is empty, or (from ``np.stack``) if the
            input shapes differ.

    Note:
        Scalars (0-d arrays) are supported: stacking ``n`` of them yields a
        1-d array of length ``n`` regardless of whether a dtype conversion
        was needed.
    """
    if not arrays:
        raise ValueError("Cannot stack empty list")

    if dtype is None:
        dtype = arrays[0].dtype

    # np.asarray(..., order="C") returns the input unchanged when it already
    # has the right dtype and layout, and copies otherwise. Unlike
    # np.ascontiguousarray it does not promote 0-d arrays to shape (1,), so
    # converted and unconverted inputs keep identical shapes.
    normalized = [np.asarray(arr, dtype=dtype, order="C") for arr in arrays]

    # Stack (single memory allocation + copy)
    return np.stack(normalized, axis=0)


def batch_to_device(
    tensors: list[torch.Tensor] | tuple[torch.Tensor, ...],
    device: str | torch.device,
    non_blocking: bool = True
) -> list[torch.Tensor]:
    """
    Transfer every tensor in a sequence to the same device.

    Entries that are ``None`` are not transferred; they are kept as ``None``
    at the same position in the result.

    Args:
        tensors: List/tuple of tensors to move (``None`` entries are allowed)
        device: Target device
        non_blocking: Forwarded to ``Tensor.to`` to request an async transfer
            (default: True)

    Returns:
        New list, in input order, holding the moved tensors (or ``None``)

    Example:
        >>> tensors = [torch.rand(100), None, torch.rand(200)]
        >>> moved = batch_to_device(tensors, 'cpu')
        >>> moved[1] is None
        True
    """
    moved = []
    for item in tensors:
        if item is None:
            # Preserve placeholders so positions line up with the input.
            moved.append(None)
            continue
        moved.append(item.to(device, non_blocking=non_blocking))
    return moved


def get_optimal_pin_memory() -> bool:
    """
    Report whether a DataLoader should be created with ``pin_memory=True``.

    The decision is simply whether PyTorch reports CUDA as available.

    Returns:
        True when ``torch.cuda.is_available()`` is True, otherwise False

    Usage:
        >>> DataLoader(dataset, pin_memory=get_optimal_pin_memory())
    """
    cuda_present = torch.cuda.is_available()
    return cuda_present