| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | #include <stdio.h> |
| | #ifdef __linux |
| | #include <pthread.h> |
| | #include <unistd.h> |
| | #endif |
| | #include <stdio.h> |
| | #include <stdlib.h> |
| | #include <errno.h> |
| | #include <thread> |
| |
|
| | #include "ThreadPool.h" |
| |
|
| | using namespace std; |
| |
|
| | namespace Moses2 |
| | { |
| |
|
| | #define handle_error_en(en, msg) \ |
| | do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0) |
| |
|
| | ThreadPool::ThreadPool(size_t numThreads, int cpuAffinityOffset, |
| | int cpuAffinityIncr) : |
| | m_stopped(false), m_stopping(false), m_queueLimit(numThreads*2) |
| | { |
| | #if defined(_WIN32) || defined(_WIN64) |
| | size_t numCPU = std::thread::hardware_concurrency(); |
| | #else |
| | size_t numCPU = sysconf(_SC_NPROCESSORS_ONLN); |
| | #endif |
| | |
| |
|
| | int cpuInd = cpuAffinityOffset % numCPU; |
| |
|
| | for (size_t i = 0; i < numThreads; ++i) { |
| | boost::thread *thread = m_threads.create_thread( |
| | boost::bind(&ThreadPool::Execute, this)); |
| |
|
| | #ifdef __linux |
| | if (cpuAffinityOffset >= 0) { |
| | int s; |
| |
|
| | boost::thread::native_handle_type handle = thread->native_handle(); |
| |
|
| | |
| | cpu_set_t cpuset; |
| | CPU_ZERO(&cpuset); |
| |
|
| | CPU_SET(cpuInd, &cpuset); |
| | cpuInd += cpuAffinityIncr; |
| | cpuInd = cpuInd % numCPU; |
| |
|
| | s = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &cpuset); |
| | if (s != 0) { |
| | handle_error_en(s, "pthread_setaffinity_np"); |
| | |
| | } |
| |
|
| | |
| | CPU_ZERO(&cpuset); |
| | s = pthread_getaffinity_np(handle, sizeof(cpu_set_t), &cpuset); |
| | cerr << "Set returned by pthread_getaffinity_np() contained:\n"; |
| | for (int j = 0; j < CPU_SETSIZE; j++) { |
| | if (CPU_ISSET(j, &cpuset)) { |
| | cerr << " CPU " << j << "\n"; |
| | } |
| | } |
| | } |
| | #endif |
| | } |
| | } |
| |
|
| | void ThreadPool::Execute() |
| | { |
| | do { |
| | boost::shared_ptr<Task> task; |
| | { |
| | |
| | boost::mutex::scoped_lock lock(m_mutex); |
| | if (m_tasks.empty() && !m_stopped) { |
| | m_threadNeeded.wait(lock); |
| | } |
| | if (!m_stopped && !m_tasks.empty()) { |
| | task = m_tasks.front(); |
| | m_tasks.pop(); |
| | } |
| | } |
| | |
| | if (task) { |
| | |
| | |
| | task->DeleteAfterExecution(); |
| | task->Run(); |
| | } |
| | m_threadAvailable.notify_all(); |
| | } while (!m_stopped); |
| | } |
| |
|
| | void ThreadPool::Submit(boost::shared_ptr<Task> task) |
| | { |
| | boost::mutex::scoped_lock lock(m_mutex); |
| | if (m_stopping) { |
| | throw runtime_error("ThreadPool stopping - unable to accept new jobs"); |
| | } |
| | while (m_queueLimit > 0 && m_tasks.size() >= m_queueLimit) { |
| | m_threadAvailable.wait(lock); |
| | } |
| | m_tasks.push(task); |
| | m_threadNeeded.notify_all(); |
| | } |
| |
|
| | void ThreadPool::Stop(bool processRemainingJobs) |
| | { |
| | { |
| | |
| | boost::mutex::scoped_lock lock(m_mutex); |
| | if (m_stopped) return; |
| | m_stopping = true; |
| | } |
| | if (processRemainingJobs) { |
| | boost::mutex::scoped_lock lock(m_mutex); |
| | |
| | while (!m_tasks.empty() && !m_stopped) { |
| | m_threadAvailable.wait(lock); |
| | } |
| | } |
| | |
| | { |
| | boost::mutex::scoped_lock lock(m_mutex); |
| | m_stopped = true; |
| | } |
| | m_threadNeeded.notify_all(); |
| |
|
| | m_threads.join_all(); |
| | } |
| |
|
| | } |
| |
|
| |
|