|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#ifndef CONCURRENT_UNORDERED_MAP_CUH |
|
|
#define CONCURRENT_UNORDERED_MAP_CUH |
|
|
|
|
|
#include <cudf/detail/nvtx/ranges.hpp> |
|
|
#include <hash/hash_allocator.cuh> |
|
|
#include <hash/helper_functions.cuh> |
|
|
#include <hash/managed.cuh> |
|
|
#include <cudf/detail/utilities/hash_functions.cuh> |
|
|
#include <cudf/detail/utilities/device_atomics.cuh> |
|
|
#include <cudf/utilities/error.hpp> |
|
|
|
|
|
#include <thrust/execution_policy.h> |
|
|
|
|
|
#include <thrust/pair.h> |
|
|
#include <thrust/count.h> |
|
|
|
|
|
#include <functional> |
|
|
#include <memory> |
|
|
#include <cassert> |
|
|
#include <iostream> |
|
|
#include <iterator> |
|
|
#include <limits> |
|
|
#include <type_traits> |
|
|
|
|
|
namespace {
/**
 * @brief Maps a byte count to an unsigned integer type of exactly that size.
 *
 * The primary template yields `void`, signalling "no single integer type of
 * this size exists"; explicit specializations cover 4- and 8-byte sizes.
 */
template <std::size_t N>
struct packed {
  using type = void;
};

template <>
struct packed<sizeof(uint32_t)> {
  using type = uint32_t;
};

template <>
struct packed<sizeof(uint64_t)> {
  using type = uint64_t;
};

/// Integer type whose size equals `sizeof(pair_type)`, or `void` if none exists.
template <typename pair_type>
using packed_t = typename packed<sizeof(pair_type)>::type;

/**
 * @brief Tells whether a (key, value) pair can be stored in a single integer
 * of the same size, enabling a single atomicCAS on the whole pair.
 *
 * A pair is packable when both members are integral and an integer type of
 * the pair's exact size exists.
 *
 * @return true if the pair can be bitwise-packed into one integer
 */
template <typename pair_type,
          typename key_type   = typename pair_type::first_type,
          typename value_type = typename pair_type::second_type>
constexpr bool is_packable()
{
  return std::is_integral<key_type>::value && std::is_integral<value_type>::value &&
         !std::is_void<packed_t<pair_type>>::value;
}

/**
 * @brief Views a pair and its packed-integer representation through the same
 * storage, so the pair can be read/written atomically as one integer.
 *
 * Only defined for packable pairs (see `is_packable`).
 */
template <typename pair_type, typename Enable = void>
union pair_packer;

template <typename pair_type>
union pair_packer<pair_type, std::enable_if_t<is_packable<pair_type>()>> {
  using packed_type = packed_t<pair_type>;
  packed_type const packed;  ///< the pair reinterpreted as one integer
  pair_type const pair;      ///< the (key, value) pair itself

  __device__ pair_packer(packed_type _packed) : packed{_packed} {}

  __device__ pair_packer(pair_type _pair) : pair{_pair} {}
};
}  // namespace
|
|
|
|
|
/**
 * @brief Predicate functor that is true for occupied hash-table slots.
 *
 * A slot is considered "used" when its key differs from the reserved
 * unused-key sentinel. Intended for host or device algorithms (e.g.
 * `thrust::count_if`) over the table's storage.
 *
 * @tparam Key      key type stored in the table
 * @tparam Element  mapped value type stored in the table
 * @tparam Equality binary predicate comparing two keys for equality
 */
template <typename Key, typename Element, typename Equality> struct _is_used {
  using value_type = thrust::pair<Key, Element>;

  /**
   * @param unused sentinel key value marking an empty slot
   * @param equal  key-equality predicate
   */
  _is_used(Key const &unused, Equality const &equal)
      : m_unused_key(unused), m_equal(equal) {}

  /// @return true if slot `x` holds a real entry (its key is not the sentinel)
  // const-qualified: predicates passed by value to thrust algorithms should be
  // callable on const objects.
  __host__ __device__ bool operator()(value_type const &x) const {
    return !m_equal(x.first, m_unused_key);
  }

  Key const m_unused_key;  ///< sentinel key marking empty slots
  Equality const m_equal;  ///< key-equality predicate
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * @brief A fixed-capacity, open-addressing (linear probing) hash map that
 * supports concurrent device-side insert and find.
 *
 * Storage is a flat array of `thrust::pair<Key, Element>` allocated via
 * `Allocator`. Empty slots hold the sentinel pair (`unused_key`,
 * `unused_element`); therefore the sentinel key can never be inserted.
 * The table never grows: if it fills up, `insert` will probe forever.
 *
 * Construct instances only through `create()`, which returns a
 * `std::unique_ptr` whose deleter releases both the device storage and the
 * map object itself.
 *
 * @tparam Key       key type
 * @tparam Element   mapped value type
 * @tparam Hasher    unary functor hashing a Key to a size_t
 * @tparam Equality  binary functor comparing two Keys for equality
 * @tparam Allocator allocator for the slot array
 */
template <typename Key,
          typename Element,
          typename Hasher    = default_hash<Key>,
          typename Equality  = equal_to<Key>,
          typename Allocator = default_allocator<thrust::pair<Key, Element>>>
class concurrent_unordered_map {
 public:
  using size_type      = size_t;
  using hasher         = Hasher;
  using key_equal      = Equality;
  using allocator_type = Allocator;
  using key_type       = Key;
  using mapped_type    = Element;
  using value_type     = thrust::pair<Key, Element>;
  using iterator       = cycle_iterator_adapter<value_type*>;
  using const_iterator = const cycle_iterator_adapter<value_type*>;

 public:
  /**
   * @brief Factory: allocates and initializes a map and wraps it in a
   * `unique_ptr` whose deleter destroys the map on `stream`.
   *
   * @param capacity       number of slots (fixed for the map's lifetime)
   * @param unused_element sentinel mapped value marking an empty slot
   * @param unused_key     sentinel key marking an empty slot; this key can
   *                       never be inserted into the map
   * @param hash_function  hasher instance
   * @param equal          key-equality instance
   * @param allocator      allocator instance for the slot array
   * @param stream         CUDA stream used for allocation/initialization and,
   *                       later, destruction
   * @return owning pointer to the new map
   */
  static auto create(size_type capacity,
                     const mapped_type unused_element = std::numeric_limits<mapped_type>::max(),
                     const key_type unused_key        = std::numeric_limits<key_type>::max(),
                     const Hasher& hash_function      = hasher(),
                     const Equality& equal            = key_equal(),
                     const allocator_type& allocator  = allocator_type(),
                     cudaStream_t stream              = 0)
  {
    CUDF_FUNC_RANGE();
    using Self = concurrent_unordered_map<Key, Element, Hasher, Equality, Allocator>;

    // The deleter must free both the device storage and the map object, so it
    // routes through destroy() on the same stream the map was created on.
    auto deleter = [stream](Self* p) { p->destroy(stream); };

    return std::unique_ptr<Self, std::function<void(Self*)>>{
      new Self(capacity, unused_element, unused_key, hash_function, equal, allocator, stream),
      deleter};
  }

  /// @return iterator to the first slot of the table
  __device__ iterator begin()
  {
    return iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, m_hashtbl_values);
  }

  /// @return const iterator to the first slot of the table
  __device__ const_iterator begin() const
  {
    return const_iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, m_hashtbl_values);
  }

  /// @return iterator one past the last slot of the table
  __device__ iterator end()
  {
    return iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, m_hashtbl_values + m_capacity);
  }

  /// @return const iterator one past the last slot of the table
  __device__ const_iterator end() const
  {
    return const_iterator(
      m_hashtbl_values, m_hashtbl_values + m_capacity, m_hashtbl_values + m_capacity);
  }

  /// @return raw pointer to the slot array
  __host__ __device__ inline value_type* data() const { return m_hashtbl_values; }

  /// @return the sentinel key marking empty slots
  __host__ __device__ inline key_type get_unused_key() const { return m_unused_key; }

  /// @return the sentinel mapped value marking empty slots
  __host__ __device__ inline mapped_type get_unused_element() const { return m_unused_element; }

  /// @return the key-equality functor
  __host__ __device__ inline key_equal get_key_equal() const { return m_equal; }

  /// @return total number of slots (not the number of occupied entries)
  __host__ __device__ inline size_type capacity() const { return m_capacity; }

 private:
  /// Outcome of a single attempt to claim one slot during insertion.
  enum class insert_result {
    CONTINUE,  ///< slot taken by a different key; keep probing
    SUCCESS,   ///< slot claimed and pair written
    DUPLICATE  ///< slot already holds an equal key
  };

  /**
   * @brief Attempts to insert into one slot with a single atomicCAS on the
   * whole (key, value) pair (enabled when the pair packs into one integer).
   *
   * @param insert_location slot to try
   * @param insert_pair     pair to insert
   * @return SUCCESS, DUPLICATE, or CONTINUE (see `insert_result`)
   */
  template <typename pair_type = value_type>
  __device__ std::enable_if_t<is_packable<pair_type>(), insert_result> attempt_insert(
    value_type* insert_location, value_type const& insert_pair)
  {
    pair_packer<pair_type> const unused{thrust::make_pair(m_unused_key, m_unused_element)};
    pair_packer<pair_type> const new_pair{insert_pair};
    pair_packer<pair_type> const old{
      atomicCAS(reinterpret_cast<typename pair_packer<pair_type>::packed_type*>(insert_location),
                unused.packed,
                new_pair.packed)};

    if (old.packed == unused.packed) { return insert_result::SUCCESS; }

    if (m_equal(old.pair.first, insert_pair.first)) { return insert_result::DUPLICATE; }
    return insert_result::CONTINUE;
  }

  /**
   * @brief Attempts to insert into one slot for pairs that cannot be packed:
   * the key is claimed with atomicCAS, then the value is written separately.
   *
   * NOTE(review): the value store is not fenced, so a concurrent `find` that
   * observes the key may read the value before it is written — this matches
   * the original behavior; confirm callers synchronize before reading values.
   *
   * @param insert_location slot to try
   * @param insert_pair     pair to insert
   * @return SUCCESS, DUPLICATE, or CONTINUE (see `insert_result`)
   */
  template <typename pair_type = value_type>
  __device__ std::enable_if_t<not is_packable<pair_type>(), insert_result> attempt_insert(
    value_type* const __restrict__ insert_location, value_type const& insert_pair)
  {
    key_type const old_key{atomicCAS(&(insert_location->first), m_unused_key, insert_pair.first)};

    // CAS observed the sentinel: this thread owns the slot; write the value.
    if (m_equal(m_unused_key, old_key)) {
      insert_location->second = insert_pair.second;
      return insert_result::SUCCESS;
    }

    if (m_equal(old_key, insert_pair.first)) { return insert_result::DUPLICATE; }

    return insert_result::CONTINUE;
  }

 public:
  /**
   * @brief Inserts a (key, value) pair, probing linearly from the key's hash
   * slot until an empty slot is claimed or an equal key is found.
   *
   * @warning Never returns if the table is full and the key is absent.
   *
   * @param insert_pair the pair to insert; its key must not equal the
   *                    unused-key sentinel
   * @return (iterator to the slot holding the key, true if this call
   *         performed the insertion / false if the key already existed)
   */
  __device__ thrust::pair<iterator, bool> insert(value_type const& insert_pair)
  {
    const size_type key_hash{m_hf(insert_pair.first)};
    size_type index{key_hash % m_capacity};

    insert_result status{insert_result::CONTINUE};

    value_type* current_bucket{nullptr};

    while (status == insert_result::CONTINUE) {
      current_bucket = &m_hashtbl_values[index];
      status         = attempt_insert(current_bucket, insert_pair);
      index          = (index + 1) % m_capacity;  // wrap-around linear probe
    }

    bool const insert_success = (status == insert_result::SUCCESS);

    return thrust::make_pair(
      iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, current_bucket), insert_success);
  }

  /**
   * @brief Finds the slot holding key `k` using the map's own hasher and
   * key-equality functors.
   *
   * @param k the key to look up
   * @return const iterator to the matching slot, or `end()` if an empty slot
   *         is reached first (key absent)
   */
  __device__ const_iterator find(key_type const& k) const
  {
    size_type const key_hash = m_hf(k);
    size_type index          = key_hash % m_capacity;

    value_type* current_bucket = &m_hashtbl_values[index];

    while (true) {
      key_type const existing_key = current_bucket->first;

      // Hitting the sentinel means the probe chain ended without a match.
      if (m_equal(m_unused_key, existing_key)) { return this->end(); }

      if (m_equal(k, existing_key)) {
        return const_iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, current_bucket);
      }

      index          = (index + 1) % m_capacity;
      current_bucket = &m_hashtbl_values[index];
    }
  }

  /**
   * @brief Finds the slot holding key `k` (mutable overload).
   *
   * @param k the key to look up
   * @return iterator to the matching slot, or `end()` if the key is absent
   */
  __device__ iterator find(key_type const& k)
  {
    size_type const key_hash = m_hf(k);
    size_type index          = key_hash % m_capacity;

    value_type* current_bucket = &m_hashtbl_values[index];

    while (true) {
      key_type const existing_key = current_bucket->first;

      if (m_equal(m_unused_key, existing_key)) { return this->end(); }

      if (m_equal(k, existing_key)) {
        return iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, current_bucket);
      }

      index          = (index + 1) % m_capacity;
      current_bucket = &m_hashtbl_values[index];
    }
  }

  /**
   * @brief Finds the slot holding key `k` using caller-supplied hash and
   * equality functors; `f_hash`/`f_equal` must be consistent with the
   * functors used at insertion time for lookups to succeed.
   *
   * Empty-slot detection still uses the map's own `m_equal` against the
   * sentinel key.
   *
   * @param k       the key to look up
   * @param f_hash  hasher used to pick the starting slot
   * @param f_equal equality predicate used to match `k` against stored keys
   * @return const iterator to the matching slot, or `end()` if absent
   */
  template <typename find_hasher, typename find_key_equal>
  __device__ const_iterator find(key_type const& k,
                                 find_hasher f_hash,
                                 find_key_equal f_equal) const
  {
    size_type const key_hash = f_hash(k);
    size_type index          = key_hash % m_capacity;

    value_type* current_bucket = &m_hashtbl_values[index];

    while (true) {
      key_type const existing_key = current_bucket->first;

      if (m_equal(m_unused_key, existing_key)) { return this->end(); }

      if (f_equal(k, existing_key)) {
        return const_iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, current_bucket);
      }

      index          = (index + 1) % m_capacity;
      current_bucket = &m_hashtbl_values[index];
    }
  }

  /**
   * @brief Asynchronously copies another map's slot array into this one,
   * reallocating first if the other map is larger.
   *
   * NOTE(review): when shrinking, the existing (larger) allocation is kept
   * but `m_capacity` is reduced, so a later `deallocate` passes the smaller
   * count — harmless for allocators that ignore the count; verify for others.
   *
   * @param other  source map
   * @param stream CUDA stream for the (re)allocation and copy
   * @return GDF_SUCCESS on success
   */
  gdf_error assign_async(const concurrent_unordered_map& other, cudaStream_t stream = 0)
  {
    if (other.m_capacity <= m_capacity) {
      m_capacity = other.m_capacity;
    } else {
      m_allocator.deallocate(m_hashtbl_values, m_capacity, stream);
      m_capacity = other.m_capacity;

      m_hashtbl_values = m_allocator.allocate(m_capacity, stream);
    }
    CUDA_TRY(cudaMemcpyAsync(m_hashtbl_values,
                             other.m_hashtbl_values,
                             m_capacity * sizeof(value_type),
                             cudaMemcpyDefault,
                             stream));
    return GDF_SUCCESS;
  }

  /**
   * @brief Asynchronously resets every slot to the sentinel pair on `stream`.
   */
  void clear_async(cudaStream_t stream = 0)
  {
    constexpr int block_size = 128;
    init_hashtbl<<<((m_capacity + block_size - 1) / block_size), block_size, 0, stream>>>(
      m_hashtbl_values, m_capacity, m_unused_key, m_unused_element);
  }

  /**
   * @brief Prints every slot (index, key, value) to stdout. Debugging only;
   * dereferences the slot array from the host, so the storage must be
   * host-accessible (e.g. managed memory).
   */
  void print() const
  {
    for (size_type i = 0; i < m_capacity; ++i) {
      std::cout << i << ": " << m_hashtbl_values[i].first << "," << m_hashtbl_values[i].second
                << std::endl;
    }
  }

  /**
   * @brief Counts occupied slots by scanning the whole table on the device.
   *
   * Synchronous and O(capacity); do not call concurrently with inserts.
   *
   * @return number of entries currently in the map
   */
  size_t size() const
  {
    return thrust::count_if(thrust::device, m_hashtbl_values, m_hashtbl_values + m_capacity,
                            _is_used<Key, Element, Equality>(get_unused_key(), get_key_equal()));
  }

  /**
   * @brief Prefetches the slot array (if managed) and this object to the
   * given device to avoid page faults on first access.
   *
   * @param dev_id target device ordinal
   * @param stream CUDA stream for the prefetches
   * @return GDF_SUCCESS on success
   */
  gdf_error prefetch(const int dev_id, cudaStream_t stream = 0)
  {
    cudaPointerAttributes hashtbl_values_ptr_attributes;
    cudaError_t status = cudaPointerGetAttributes(&hashtbl_values_ptr_attributes, m_hashtbl_values);

    // Only managed allocations can be prefetched.
    if (cudaSuccess == status && isPtrManaged(hashtbl_values_ptr_attributes)) {
      CUDA_TRY(
        cudaMemPrefetchAsync(m_hashtbl_values, m_capacity * sizeof(value_type), dev_id, stream));
    }
    CUDA_TRY(cudaMemPrefetchAsync(this, sizeof(*this), dev_id, stream));

    return GDF_SUCCESS;
  }

  /**
   * @brief Frees the slot array on `stream` and deletes this object.
   * Invoked by the `unique_ptr` deleter installed by `create()`; do not call
   * on a stack-allocated or already-owned instance.
   */
  void destroy(cudaStream_t stream = 0)
  {
    m_allocator.deallocate(m_hashtbl_values, m_capacity, stream);
    delete this;
  }

  concurrent_unordered_map()                                           = delete;
  concurrent_unordered_map(concurrent_unordered_map const&)            = default;
  concurrent_unordered_map(concurrent_unordered_map&&)                 = default;
  concurrent_unordered_map& operator=(concurrent_unordered_map const&) = default;
  concurrent_unordered_map& operator=(concurrent_unordered_map&&)      = default;
  ~concurrent_unordered_map()                                          = default;

 private:
  hasher m_hf;                    ///< key hasher
  key_equal m_equal;              ///< key-equality predicate
  mapped_type m_unused_element;   ///< sentinel value marking empty slots
  key_type m_unused_key;          ///< sentinel key marking empty slots
  allocator_type m_allocator;     ///< allocator for the slot array
  size_type m_capacity;           ///< total number of slots
  value_type* m_hashtbl_values;   ///< flat slot array of (key, value) pairs

  /**
   * @brief Allocates the slot array, prefetches it when managed, and
   * initializes every slot to the sentinel pair on `stream`.
   *
   * Private: use `create()` so ownership and destruction are handled.
   */
  concurrent_unordered_map(size_type capacity,
                           const mapped_type unused_element,
                           const key_type unused_key,
                           const Hasher& hash_function,
                           const Equality& equal,
                           const allocator_type& allocator,
                           cudaStream_t stream = 0)
    // Initializer list follows member declaration order (the order in which
    // initialization actually happens), avoiding -Wreorder warnings.
    : m_hf(hash_function),
      m_equal(equal),
      m_unused_element(unused_element),
      m_unused_key(unused_key),
      m_allocator(allocator),
      m_capacity(capacity)
  {
    m_hashtbl_values         = m_allocator.allocate(m_capacity, stream);
    constexpr int block_size = 128;
    {
      cudaPointerAttributes hashtbl_values_ptr_attributes;
      cudaError_t status =
        cudaPointerGetAttributes(&hashtbl_values_ptr_attributes, m_hashtbl_values);

      if (cudaSuccess == status && isPtrManaged(hashtbl_values_ptr_attributes)) {
        int dev_id = 0;
        CUDA_TRY(cudaGetDevice(&dev_id));
        CUDA_TRY(
          cudaMemPrefetchAsync(m_hashtbl_values, m_capacity * sizeof(value_type), dev_id, stream));
      }
    }

    init_hashtbl<<<((m_capacity + block_size - 1) / block_size), block_size, 0, stream>>>(
      m_hashtbl_values, m_capacity, m_unused_key, m_unused_element);
    CUDA_TRY(cudaGetLastError());
  }
};
|
|
|
|
|
#endif |
|
|
|