File size: 3,825 Bytes
d8d559a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | #ifndef UTIL_THREAD_POOL_H
#define UTIL_THREAD_POOL_H
#include "pcqueue.hh"
#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <cstdlib>
namespace util {
template <class HandlerT> class Worker : boost::noncopyable {
public:
typedef HandlerT Handler;
typedef typename Handler::Request Request;
template <class Construct> Worker(PCQueue<Request> &in, Construct &construct, const Request &poison)
: in_(in), handler_(construct), poison_(poison), thread_(boost::ref(*this)) {}
// Only call from thread.
void operator()() {
Request request;
while (1) {
in_.Consume(request);
if (request == poison_) return;
try {
(*handler_)(request);
}
catch(const std::exception &e) {
std::cerr << "Handler threw " << e.what() << std::endl;
abort();
}
catch(...) {
std::cerr << "Handler threw an exception, dropping request" << std::endl;
abort();
}
}
}
void Join() {
thread_.join();
}
private:
PCQueue<Request> &in_;
boost::optional<Handler> handler_;
const Request poison_;
boost::thread thread_;
};
template <class HandlerT> class ThreadPool : boost::noncopyable {
public:
typedef HandlerT Handler;
typedef typename Handler::Request Request;
template <class Construct> ThreadPool(std::size_t queue_length, std::size_t workers, Construct handler_construct, Request poison) : in_(queue_length), poison_(poison) {
for (size_t i = 0; i < workers; ++i) {
workers_.push_back(new Worker<Handler>(in_, handler_construct, poison));
}
}
~ThreadPool() {
for (std::size_t i = 0; i < workers_.size(); ++i) {
Produce(poison_);
}
for (typename boost::ptr_vector<Worker<Handler> >::iterator i = workers_.begin(); i != workers_.end(); ++i) {
i->Join();
}
}
void Produce(const Request &request) {
in_.Produce(request);
}
// For adding to the queue.
PCQueue<Request> &In() { return in_; }
private:
PCQueue<Request> in_;
boost::ptr_vector<Worker<Handler> > workers_;
Request poison_;
};
template <class Handler> class RecyclingHandler {
public:
typedef typename Handler::Request Request;
template <class Construct> RecyclingHandler(PCQueue<Request> &recycling, Construct &handler_construct)
: inner_(handler_construct), recycling_(recycling) {}
void operator()(Request &request) {
inner_(request);
recycling_.Produce(request);
}
private:
Handler inner_;
PCQueue<Request> &recycling_;
};
template <class HandlerT> class RecyclingThreadPool : boost::noncopyable {
public:
typedef HandlerT Handler;
typedef typename Handler::Request Request;
// Remember to call PopulateRecycling afterwards in most cases.
template <class Construct> RecyclingThreadPool(std::size_t queue, std::size_t workers, Construct handler_construct, Request poison)
: recycling_(queue), pool_(queue, workers, RecyclingHandler<Handler>(recycling_, handler_construct), poison) {}
// Initialization: put stuff into the recycling queue. This could also be
// done by calling Produce without Consume, but it's often easier to
// initialize with PopulateRecycling then do a Consume/Produce loop.
void PopulateRecycling(const Request &request) {
recycling_.Produce(request);
}
Request Consume() {
return recycling_.Consume();
}
void Produce(const Request &request) {
pool_.Produce(request);
}
private:
PCQueue<Request> recycling_;
ThreadPool<RecyclingHandler<Handler> > pool_;
};
} // namespace util
#endif // UTIL_THREAD_POOL_H
|