/****************************************************************************** * Copyright (c) 2011, Duane Merrill. All rights reserved. * Copyright (c) 2011-2022, NVIDIA CORPORATION. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of the NVIDIA CORPORATION nor the * names of its contributors may be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * ******************************************************************************/ /** * @file cub::DeviceReduceByKey provides device-wide, parallel operations for * reducing segments of values residing within device-accessible memory. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include CUB_NAMESPACE_BEGIN /****************************************************************************** * Kernel entry points *****************************************************************************/ /** * @brief Multi-block reduce-by-key sweep kernel entry point * * @tparam AgentReduceByKeyPolicyT * Parameterized AgentReduceByKeyPolicyT tuning policy type * * @tparam KeysInputIteratorT * Random-access input iterator type for keys * * @tparam UniqueOutputIteratorT * Random-access output iterator type for keys * * @tparam ValuesInputIteratorT * Random-access input iterator type for values * * @tparam AggregatesOutputIteratorT * Random-access output iterator type for values * * @tparam NumRunsOutputIteratorT * Output iterator type for recording number of segments encountered * * @tparam ScanTileStateT * Tile status interface type * * @tparam EqualityOpT * KeyT equality operator type * * @tparam ReductionOpT * ValueT reduction operator type * * @tparam OffsetT * Signed integer type for global offsets * * @param d_keys_in * Pointer to the input sequence of keys * * @param d_unique_out * Pointer to the output sequence of unique keys (one key per run) * * @param d_values_in * Pointer to the input sequence of corresponding values * * @param d_aggregates_out * Pointer to the output sequence of value aggregates (one aggregate per run) * * @param d_num_runs_out * Pointer to total number of runs encountered * (i.e., the length of d_unique_out) * * @param tile_state * Tile status interface * * @param start_tile * The starting tile for the current grid * * @param equality_op * KeyT equality operator * * @param reduction_op * ValueT reduction operator * * @param num_items * Total number of items to select from */ template __launch_bounds__(int(ChainedPolicyT::ActivePolicy::ReduceByKeyPolicyT::BLOCK_THREADS)) __global__ void DeviceReduceByKeyKernel(KeysInputIteratorT d_keys_in, UniqueOutputIteratorT d_unique_out, ValuesInputIteratorT d_values_in, AggregatesOutputIteratorT d_aggregates_out, NumRunsOutputIteratorT d_num_runs_out, ScanTileStateT tile_state, int start_tile, EqualityOpT equality_op, ReductionOpT reduction_op, OffsetT num_items) { using AgentReduceByKeyPolicyT = typename ChainedPolicyT::ActivePolicy::ReduceByKeyPolicyT; // Thread block type for reducing tiles of value segments using AgentReduceByKeyT = AgentReduceByKey; // Shared memory for AgentReduceByKey __shared__ typename AgentReduceByKeyT::TempStorage temp_storage; // Process tiles AgentReduceByKeyT(temp_storage, d_keys_in, d_unique_out, d_values_in, d_aggregates_out, d_num_runs_out, equality_op, reduction_op) .ConsumeRange(num_items, tile_state, start_tile); } namespace detail { template struct device_reduce_by_key_policy_hub { static constexpr int MAX_INPUT_BYTES = CUB_MAX(sizeof(KeyOutputT), sizeof(AccumT)); static constexpr int COMBINED_INPUT_BYTES = sizeof(KeyOutputT) + sizeof(AccumT); /// SM35 struct Policy350 : ChainedPolicy<350, Policy350, Policy350> { static constexpr int NOMINAL_4B_ITEMS_PER_THREAD = 6; static constexpr int ITEMS_PER_THREAD = (MAX_INPUT_BYTES <= 8) ? 6 : CUB_MIN(NOMINAL_4B_ITEMS_PER_THREAD, CUB_MAX(1, ((NOMINAL_4B_ITEMS_PER_THREAD * 8) + COMBINED_INPUT_BYTES - 1) / COMBINED_INPUT_BYTES)); using ReduceByKeyPolicyT = AgentReduceByKeyPolicy<128, ITEMS_PER_THREAD, BLOCK_LOAD_DIRECT, LOAD_LDG, BLOCK_SCAN_WARP_SCANS, detail::default_reduce_by_key_delay_constructor_t>; }; using MaxPolicy = Policy350; }; } /****************************************************************************** * Dispatch ******************************************************************************/ /** * @brief Utility class for dispatching the appropriately-tuned kernels for * DeviceReduceByKey * * @tparam KeysInputIteratorT * Random-access input iterator type for keys * * @tparam UniqueOutputIteratorT * Random-access output iterator type for keys * * @tparam ValuesInputIteratorT * Random-access input iterator type for values * * @tparam AggregatesOutputIteratorT * Random-access output iterator type for values * * @tparam NumRunsOutputIteratorT * Output iterator type for recording number of segments encountered * * @tparam EqualityOpT * KeyT equality operator type * * @tparam ReductionOpT * ValueT reduction operator type * * @tparam OffsetT * Signed integer type for global offsets * * @tparam SelectedPolicy * Implementation detail, do not specify directly, requirements on the * content of this type are subject to breaking change. */ template , cub::detail::value_t>, typename SelectedPolicy = // detail::device_reduce_by_key_policy_hub< // AccumT, // cub::detail::non_void_value_t< // UniqueOutputIteratorT, // cub::detail::value_t>>> struct DispatchReduceByKey { //------------------------------------------------------------------------- // Types and constants //------------------------------------------------------------------------- // The input values type using ValueInputT = cub::detail::value_t; static constexpr int INIT_KERNEL_THREADS = 128; // Tile status descriptor interface type using ScanTileStateT = ReduceByKeyScanTileState; void *d_temp_storage; size_t &temp_storage_bytes; KeysInputIteratorT d_keys_in; UniqueOutputIteratorT d_unique_out; ValuesInputIteratorT d_values_in; AggregatesOutputIteratorT d_aggregates_out; NumRunsOutputIteratorT d_num_runs_out; EqualityOpT equality_op; ReductionOpT reduction_op; OffsetT num_items; cudaStream_t stream; CUB_RUNTIME_FUNCTION __forceinline__ DispatchReduceByKey(void *d_temp_storage, size_t &temp_storage_bytes, KeysInputIteratorT d_keys_in, UniqueOutputIteratorT d_unique_out, ValuesInputIteratorT d_values_in, AggregatesOutputIteratorT d_aggregates_out, NumRunsOutputIteratorT d_num_runs_out, EqualityOpT equality_op, ReductionOpT reduction_op, OffsetT num_items, cudaStream_t stream) : d_temp_storage(d_temp_storage) , temp_storage_bytes(temp_storage_bytes) , d_keys_in(d_keys_in) , d_unique_out(d_unique_out) , d_values_in(d_values_in) , d_aggregates_out(d_aggregates_out) , d_num_runs_out(d_num_runs_out) , equality_op(equality_op) , reduction_op(reduction_op) , num_items(num_items) , stream(stream) {} //--------------------------------------------------------------------- // Dispatch entrypoints //--------------------------------------------------------------------- template CUB_RUNTIME_FUNCTION __forceinline__ cudaError_t Invoke(ScanInitKernelT init_kernel, ReduceByKeyKernelT reduce_by_key_kernel) { using AgentReduceByKeyPolicyT = typename ActivePolicyT::ReduceByKeyPolicyT; const int block_threads = AgentReduceByKeyPolicyT::BLOCK_THREADS; const int items_per_thread = AgentReduceByKeyPolicyT::ITEMS_PER_THREAD; cudaError error = cudaSuccess; do { // Get device ordinal int device_ordinal; if (CubDebug(error = cudaGetDevice(&device_ordinal))) { break; } // Number of input tiles int tile_size = block_threads * items_per_thread; int num_tiles = static_cast(cub::DivideAndRoundUp(num_items, tile_size)); // Specify temporary storage allocation requirements size_t allocation_sizes[1]; if (CubDebug(error = ScanTileStateT::AllocationSize(num_tiles, allocation_sizes[0]))) { break; // bytes needed for tile status descriptors } // Compute allocation pointers into the single storage blob (or compute // the necessary size of the blob) void *allocations[1] = {}; if (CubDebug( error = AliasTemporaries(d_temp_storage, temp_storage_bytes, allocations, allocation_sizes))) { break; } if (d_temp_storage == nullptr) { // Return if the caller is simply requesting the size of the storage // allocation break; } // Construct the tile status interface ScanTileStateT tile_state; if (CubDebug(error = tile_state.Init(num_tiles, allocations[0], allocation_sizes[0]))) { break; } // Log init_kernel configuration int init_grid_size = CUB_MAX(1, cub::DivideAndRoundUp(num_tiles, INIT_KERNEL_THREADS)); #ifdef CUB_DETAIL_DEBUG_ENABLE_LOG _CubLog("Invoking init_kernel<<<%d, %d, 0, %lld>>>()\n", init_grid_size, INIT_KERNEL_THREADS, (long long)stream); #endif // Invoke init_kernel to initialize tile descriptors THRUST_NS_QUALIFIER::cuda_cub::launcher::triple_chevron(init_grid_size, INIT_KERNEL_THREADS, 0, stream) .doit(init_kernel, tile_state, num_tiles, d_num_runs_out); // Check for failure to launch if (CubDebug(error = cudaPeekAtLastError())) { break; } // Sync the stream if specified to flush runtime errors error = detail::DebugSyncStream(stream); if (CubDebug(error)) { break; } // Return if empty problem if (num_items == 0) { break; } // Get SM occupancy for reduce_by_key_kernel int reduce_by_key_sm_occupancy; if (CubDebug(error = MaxSmOccupancy(reduce_by_key_sm_occupancy, reduce_by_key_kernel, block_threads))) { break; } // Get max x-dimension of grid int max_dim_x; if (CubDebug( error = cudaDeviceGetAttribute(&max_dim_x, cudaDevAttrMaxGridDimX, device_ordinal))) { break; } // Run grids in epochs (in case number of tiles exceeds max x-dimension int scan_grid_size = CUB_MIN(num_tiles, max_dim_x); for (int start_tile = 0; start_tile < num_tiles; start_tile += scan_grid_size) { // Log reduce_by_key_kernel configuration #ifdef CUB_DETAIL_DEBUG_ENABLE_LOG _CubLog("Invoking %d reduce_by_key_kernel<<<%d, %d, 0, %lld>>>(), %d " "items per thread, %d SM occupancy\n", start_tile, scan_grid_size, block_threads, (long long)stream, items_per_thread, reduce_by_key_sm_occupancy); #endif // Invoke reduce_by_key_kernel THRUST_NS_QUALIFIER::cuda_cub::launcher::triple_chevron(scan_grid_size, block_threads, 0, stream) .doit(reduce_by_key_kernel, d_keys_in, d_unique_out, d_values_in, d_aggregates_out, d_num_runs_out, tile_state, start_tile, equality_op, reduction_op, num_items); // Check for failure to launch if (CubDebug(error = cudaPeekAtLastError())) { break; } // Sync the stream if specified to flush runtime errors error = detail::DebugSyncStream(stream); if (CubDebug(error)) { break; } } } while (0); return error; } template CUB_RUNTIME_FUNCTION __forceinline__ cudaError_t Invoke() { using MaxPolicyT = typename SelectedPolicy::MaxPolicy; return Invoke(DeviceCompactInitKernel, DeviceReduceByKeyKernel); } /** * Internal dispatch routine * @param[in] d_temp_storage * Device-accessible allocation of temporary storage. When `nullptr`, the * required allocation size is written to `temp_storage_bytes` and no * work is done. * * @param[in,out] temp_storage_bytes * Reference to size in bytes of `d_temp_storage` allocation * * @param[in] d_keys_in * Pointer to the input sequence of keys * * @param[out] d_unique_out * Pointer to the output sequence of unique keys (one key per run) * * @param[in] d_values_in * Pointer to the input sequence of corresponding values * * @param[out] d_aggregates_out * Pointer to the output sequence of value aggregates * (one aggregate per run) * * @param[out] d_num_runs_out * Pointer to total number of runs encountered * (i.e., the length of d_unique_out) * * @param[in] equality_op * KeyT equality operator * * @param[in] reduction_op * ValueT reduction operator * * @param[in] num_items * Total number of items to select from * * @param[in] stream * CUDA stream to launch kernels within. Default is stream0. */ CUB_RUNTIME_FUNCTION __forceinline__ static cudaError_t Dispatch(void *d_temp_storage, size_t &temp_storage_bytes, KeysInputIteratorT d_keys_in, UniqueOutputIteratorT d_unique_out, ValuesInputIteratorT d_values_in, AggregatesOutputIteratorT d_aggregates_out, NumRunsOutputIteratorT d_num_runs_out, EqualityOpT equality_op, ReductionOpT reduction_op, OffsetT num_items, cudaStream_t stream) { using MaxPolicyT = typename SelectedPolicy::MaxPolicy; cudaError error = cudaSuccess; do { // Get PTX version int ptx_version = 0; if (CubDebug(error = PtxVersion(ptx_version))) { break; } DispatchReduceByKey dispatch(d_temp_storage, temp_storage_bytes, d_keys_in, d_unique_out, d_values_in, d_aggregates_out, d_num_runs_out, equality_op, reduction_op, num_items, stream); // Dispatch if (CubDebug(error = MaxPolicyT::Invoke(ptx_version, dispatch))) { break; } } while (0); return error; } CUB_DETAIL_RUNTIME_DEBUG_SYNC_IS_NOT_SUPPORTED CUB_RUNTIME_FUNCTION __forceinline__ static cudaError_t Dispatch(void *d_temp_storage, size_t &temp_storage_bytes, KeysInputIteratorT d_keys_in, UniqueOutputIteratorT d_unique_out, ValuesInputIteratorT d_values_in, AggregatesOutputIteratorT d_aggregates_out, NumRunsOutputIteratorT d_num_runs_out, EqualityOpT equality_op, ReductionOpT reduction_op, OffsetT num_items, cudaStream_t stream, bool debug_synchronous) { CUB_DETAIL_RUNTIME_DEBUG_SYNC_USAGE_LOG return Dispatch(d_temp_storage, temp_storage_bytes, d_keys_in, d_unique_out, d_values_in, d_aggregates_out, d_num_runs_out, equality_op, reduction_op, num_items, stream); } }; CUB_NAMESPACE_END