|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#include <stdio.h> |
|
|
#ifdef __linux |
|
|
#include <pthread.h> |
|
|
#include <unistd.h> |
|
|
#endif |
|
|
#include <stdio.h> |
|
|
#include <stdlib.h> |
|
|
#include <errno.h> |
|
|
#include <thread> |
|
|
|
|
|
#include "ThreadPool.h" |
|
|
|
|
|
using namespace std; |
|
|
|
|
|
namespace Moses2 |
|
|
{ |
|
|
|
|
|
// Report a failed call that returns its error code directly (pthread style)
// and terminate the process: `en` is stored in errno so that perror() prints
// `msg` followed by the matching error description, then exit(EXIT_FAILURE)
// is called. Wrapped in do/while(0) so it can be used as a single statement.
#define handle_error_en(en, msg) \
  do {                           \
    errno = en;                  \
    perror(msg);                 \
    exit(EXIT_FAILURE);          \
  } while (0)
|
|
|
|
|
/**
 * Start a pool of worker threads, each running ThreadPool::Execute().
 *
 * @param numThreads         Number of worker threads to create. The task
 *                           queue limit is set to twice this value.
 * @param cpuAffinityOffset  Index of the CPU the first worker is pinned to.
 *                           A negative value disables pinning. Pinning is
 *                           only compiled in when __linux is defined.
 * @param cpuAffinityIncr    Step added to the CPU index for every further
 *                           worker; the index wraps around the number of
 *                           online CPUs.
 *
 * If pthread_setaffinity_np() fails, the error is printed and the process
 * exits with EXIT_FAILURE.
 */
ThreadPool::ThreadPool(size_t numThreads, int cpuAffinityOffset,
                       int cpuAffinityIncr) :
  m_stopped(false), m_stopping(false), m_queueLimit(numThreads*2)
{
#if defined(_WIN32) || defined(_WIN64)
  // hardware_concurrency() returns 0 when the count cannot be determined.
  const long detectedCPUs =
    static_cast<long>(std::thread::hardware_concurrency());
#else
  // sysconf() returns -1 on error.
  const long detectedCPUs = sysconf(_SC_NPROCESSORS_ONLN);
#endif

  // Never let the CPU count be zero or negative: it is used as a modulus
  // below. All index arithmetic is done in signed int so that negative
  // offsets/increments are not silently converted to huge unsigned values.
  const int numCPU = detectedCPUs > 0 ? static_cast<int>(detectedCPUs) : 1;

  // CPU index for the next worker; only meaningful when pinning is enabled.
  int cpuInd = cpuAffinityOffset >= 0 ? cpuAffinityOffset % numCPU : 0;

  for (size_t i = 0; i < numThreads; ++i) {
    boost::thread *thread = m_threads.create_thread(
                              boost::bind(&ThreadPool::Execute, this));

#ifdef __linux
    if (cpuAffinityOffset >= 0) {
      boost::thread::native_handle_type handle = thread->native_handle();

      // Pin this worker to exactly one CPU.
      cpu_set_t cpuset;
      CPU_ZERO(&cpuset);
      CPU_SET(cpuInd, &cpuset);

      // Advance to the CPU for the next worker, wrapping into [0, numCPU)
      // even when the increment is negative.
      cpuInd = (cpuInd + cpuAffinityIncr % numCPU + numCPU) % numCPU;

      int s = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &cpuset);
      if (s != 0) {
        handle_error_en(s, "pthread_setaffinity_np");
      }

      // Read the affinity back and report it on stderr as a diagnostic.
      CPU_ZERO(&cpuset);
      s = pthread_getaffinity_np(handle, sizeof(cpu_set_t), &cpuset);
      if (s != 0) {
        // The read-back is informational only, so report and carry on
        // instead of printing a misleading empty set.
        errno = s;
        perror("pthread_getaffinity_np");
      } else {
        std::cerr << "Set returned by pthread_getaffinity_np() contained:\n";
        for (int j = 0; j < CPU_SETSIZE; j++) {
          if (CPU_ISSET(j, &cpuset)) {
            std::cerr << " CPU " << j << "\n";
          }
        }
      }
    }
#else
    // Affinity is not set on this platform; avoid unused-variable warnings.
    (void) thread;
    (void) cpuInd;
#endif
  }
}
|
|
|
|
|
void ThreadPool::Execute() |
|
|
{ |
|
|
do { |
|
|
boost::shared_ptr<Task> task; |
|
|
{ |
|
|
|
|
|
boost::mutex::scoped_lock lock(m_mutex); |
|
|
if (m_tasks.empty() && !m_stopped) { |
|
|
m_threadNeeded.wait(lock); |
|
|
} |
|
|
if (!m_stopped && !m_tasks.empty()) { |
|
|
task = m_tasks.front(); |
|
|
m_tasks.pop(); |
|
|
} |
|
|
} |
|
|
|
|
|
if (task) { |
|
|
|
|
|
|
|
|
task->DeleteAfterExecution(); |
|
|
task->Run(); |
|
|
} |
|
|
m_threadAvailable.notify_all(); |
|
|
} while (!m_stopped); |
|
|
} |
|
|
|
|
|
/**
 * Add a task to the queue and wake the worker threads.
 *
 * Blocks while a non-zero queue limit is set and the queue has reached it.
 *
 * @param task  the task to enqueue.
 * @throws runtime_error if the pool is already stopping.
 */
void ThreadPool::Submit(boost::shared_ptr<Task> task)
{
  boost::mutex::scoped_lock guard(m_mutex);

  if (m_stopping) {
    throw std::runtime_error("ThreadPool stopping - unable to accept new jobs");
  }

  // Wait for room in the queue; a limit of 0 means "unbounded".
  for (;;) {
    const bool queueFull = m_queueLimit > 0 && m_tasks.size() >= m_queueLimit;
    if (!queueFull) {
      break;
    }
    m_threadAvailable.wait(guard);
  }

  m_tasks.push(task);
  m_threadNeeded.notify_all();
}
|
|
|
|
|
void ThreadPool::Stop(bool processRemainingJobs) |
|
|
{ |
|
|
{ |
|
|
|
|
|
boost::mutex::scoped_lock lock(m_mutex); |
|
|
if (m_stopped) return; |
|
|
m_stopping = true; |
|
|
} |
|
|
if (processRemainingJobs) { |
|
|
boost::mutex::scoped_lock lock(m_mutex); |
|
|
|
|
|
while (!m_tasks.empty() && !m_stopped) { |
|
|
m_threadAvailable.wait(lock); |
|
|
} |
|
|
} |
|
|
|
|
|
{ |
|
|
boost::mutex::scoped_lock lock(m_mutex); |
|
|
m_stopped = true; |
|
|
} |
|
|
m_threadNeeded.notify_all(); |
|
|
|
|
|
m_threads.join_all(); |
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|