#include "parallel.h"

#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
| |
|
| | |
| |
|
| | static std::vector<std::thread> threads; |
| | static bool shutdownThreads = false; |
| | struct ParallelForLoop; |
| | static ParallelForLoop *workList = nullptr; |
| | static std::mutex workListMutex; |
| |
|
| | struct ParallelForLoop { |
| | ParallelForLoop(std::function<void(int64_t)> func1D, int64_t maxIndex, int chunkSize) |
| | : func1D(std::move(func1D)), maxIndex(maxIndex), chunkSize(chunkSize) { |
| | } |
| | ParallelForLoop(const std::function<void(Vector2i)> &f, const Vector2i count) |
| | : func2D(f), maxIndex(count[0] * count[1]), chunkSize(1) { |
| | nX = count[0]; |
| | } |
| |
|
| | std::function<void(int64_t)> func1D; |
| | std::function<void(Vector2i)> func2D; |
| | const int64_t maxIndex; |
| | const int chunkSize; |
| | int64_t nextIndex = 0; |
| | int activeWorkers = 0; |
| | ParallelForLoop *next = nullptr; |
| | int nX = -1; |
| |
|
| | bool Finished() const { |
| | return nextIndex >= maxIndex && activeWorkers == 0; |
| | } |
| | }; |
| |
|
| | void Barrier::Wait() { |
| | std::unique_lock<std::mutex> lock(mutex); |
| | assert(count > 0); |
| | if (--count == 0) { |
| | |
| | |
| | cv.notify_all(); |
| | } else { |
| | |
| | |
| | cv.wait(lock, [this] { return count == 0; }); |
| | } |
| | } |
| |
|
| | static std::condition_variable workListCondition; |
| |
|
| | static void worker_thread_func(const int tIndex, std::shared_ptr<Barrier> barrier) { |
| | ThreadIndex = tIndex; |
| |
|
| | |
| | |
| | |
| | barrier->Wait(); |
| |
|
| | |
| | |
| | barrier.reset(); |
| |
|
| | std::unique_lock<std::mutex> lock(workListMutex); |
| | while (!shutdownThreads) { |
| | if (!workList) { |
| | |
| | workListCondition.wait(lock); |
| | } else { |
| | |
| | ParallelForLoop &loop = *workList; |
| |
|
| | |
| |
|
| | |
| | int64_t indexStart = loop.nextIndex; |
| | int64_t indexEnd = std::min(indexStart + loop.chunkSize, loop.maxIndex); |
| |
|
| | |
| | loop.nextIndex = indexEnd; |
| | if (loop.nextIndex == loop.maxIndex) |
| | workList = loop.next; |
| | loop.activeWorkers++; |
| |
|
| | |
| | lock.unlock(); |
| | for (int64_t index = indexStart; index < indexEnd; ++index) { |
| | if (loop.func1D) { |
| | loop.func1D(index); |
| | } |
| | |
| | else { |
| | assert(loop.func2D != nullptr); |
| | loop.func2D(Vector2i{int(index % loop.nX), |
| | int(index / loop.nX)}); |
| | } |
| | } |
| | lock.lock(); |
| |
|
| | |
| | loop.activeWorkers--; |
| | if (loop.Finished()) { |
| | workListCondition.notify_all(); |
| | } |
| | } |
| | } |
| | } |
| |
|
| | void parallel_for_host(const std::function<void(int64_t)> &func, |
| | int64_t count, |
| | int chunkSize) { |
| | |
| | if (threads.empty() || count < chunkSize) { |
| | for (int64_t i = 0; i < count; ++i) { |
| | func(i); |
| | } |
| | return; |
| | } |
| |
|
| | |
| | ParallelForLoop loop(func, count, chunkSize); |
| | workListMutex.lock(); |
| | loop.next = workList; |
| | workList = &loop; |
| | workListMutex.unlock(); |
| |
|
| | |
| | std::unique_lock<std::mutex> lock(workListMutex); |
| | workListCondition.notify_all(); |
| |
|
| | |
| | while (!loop.Finished()) { |
| | |
| |
|
| | |
| | int64_t indexStart = loop.nextIndex; |
| | int64_t indexEnd = std::min(indexStart + loop.chunkSize, loop.maxIndex); |
| |
|
| | |
| | loop.nextIndex = indexEnd; |
| | if (loop.nextIndex == loop.maxIndex) { |
| | workList = loop.next; |
| | } |
| | loop.activeWorkers++; |
| |
|
| | |
| | lock.unlock(); |
| | for (int64_t index = indexStart; index < indexEnd; ++index) { |
| | if (loop.func1D) { |
| | loop.func1D(index); |
| | } |
| | |
| | else { |
| | assert(loop.func2D != nullptr); |
| | loop.func2D(Vector2i{int(index % loop.nX), |
| | int(index / loop.nX)}); |
| | } |
| | } |
| | lock.lock(); |
| |
|
| | |
| | loop.activeWorkers--; |
| | } |
| | } |
| |
|
| | thread_local int ThreadIndex; |
| |
|
| | void parallel_for_host( |
| | std::function<void(Vector2i)> func, const Vector2i count) { |
| | |
| | if (threads.empty() || count.x * count.y <= 1) { |
| | for (int y = 0; y < count.y; ++y) { |
| | for (int x = 0; x < count.x; ++x) { |
| | func(Vector2i{x, y}); |
| | } |
| | } |
| | return; |
| | } |
| |
|
| | ParallelForLoop loop(std::move(func), count); |
| | { |
| | std::lock_guard<std::mutex> lock(workListMutex); |
| | loop.next = workList; |
| | workList = &loop; |
| | } |
| |
|
| | std::unique_lock<std::mutex> lock(workListMutex); |
| | workListCondition.notify_all(); |
| |
|
| | |
| | while (!loop.Finished()) { |
| | |
| |
|
| | |
| | int64_t indexStart = loop.nextIndex; |
| | int64_t indexEnd = std::min(indexStart + loop.chunkSize, loop.maxIndex); |
| |
|
| | |
| | loop.nextIndex = indexEnd; |
| | if (loop.nextIndex == loop.maxIndex) { |
| | workList = loop.next; |
| | } |
| | loop.activeWorkers++; |
| |
|
| | |
| | lock.unlock(); |
| | for (int64_t index = indexStart; index < indexEnd; ++index) { |
| | if (loop.func1D) { |
| | loop.func1D(index); |
| | } |
| | |
| | else { |
| | assert(loop.func2D != nullptr); |
| | loop.func2D(Vector2i{int(index % loop.nX), |
| | int(index / loop.nX)}); |
| | } |
| | } |
| | lock.lock(); |
| |
|
| | |
| | loop.activeWorkers--; |
| | } |
| | } |
| |
|
| | int num_system_cores() { |
| | |
| | int ret = std::thread::hardware_concurrency(); |
| | if (ret == 0) { |
| | return 16; |
| | } |
| | return ret; |
| | } |
| |
|
| | void parallel_init() { |
| | assert(threads.size() == 0); |
| | int nThreads = num_system_cores(); |
| | ThreadIndex = 0; |
| |
|
| | |
| | |
| | |
| | |
| | std::shared_ptr<Barrier> barrier = std::make_shared<Barrier>(nThreads); |
| |
|
| | |
| | |
| | for (int i = 0; i < nThreads - 1; ++i) { |
| | threads.push_back(std::thread(worker_thread_func, i + 1, barrier)); |
| | } |
| |
|
| | barrier->Wait(); |
| | } |
| |
|
| | void parallel_cleanup() { |
| | if (threads.empty()) { |
| | return; |
| | } |
| |
|
| | { |
| | std::lock_guard<std::mutex> lock(workListMutex); |
| | shutdownThreads = true; |
| | workListCondition.notify_all(); |
| | } |
| |
|
| | for (std::thread &thread : threads) { |
| | thread.join(); |
| | } |
| | threads.erase(threads.begin(), threads.end()); |
| | shutdownThreads = false; |
| | } |
| |
|