#ifndef OPENPOSE_THREAD_THREAD_QUEUE_IN_OUT_HPP #define OPENPOSE_THREAD_THREAD_QUEUE_IN_OUT_HPP #include #include #include #include namespace op { template>, typename TQueue = Queue> class SubThreadQueueInOut : public SubThread { public: SubThreadQueueInOut(const std::vector& tWorkers, const std::shared_ptr& tQueueIn, const std::shared_ptr& tQueueOut); virtual ~SubThreadQueueInOut(); bool work(); private: std::shared_ptr spTQueueIn; std::shared_ptr spTQueueOut; DELETE_COPY(SubThreadQueueInOut); }; } // Implementation namespace op { template SubThreadQueueInOut::SubThreadQueueInOut(const std::vector& tWorkers, const std::shared_ptr& tQueueIn, const std::shared_ptr& tQueueOut) : SubThread{tWorkers}, spTQueueIn{tQueueIn}, spTQueueOut{tQueueOut} { // spTQueueIn->addPopper(); spTQueueOut->addPusher(); } template SubThreadQueueInOut::~SubThreadQueueInOut() { } template bool SubThreadQueueInOut::work() { try { // If output queue is closed -> close input queue if (!spTQueueOut->isRunning()) { spTQueueIn->stop(); return false; } // If output queue running -> normal operation else { // Don't work until next queue is not full // This reduces latency to half if (!spTQueueOut->isFull()) { // Pop TDatums if (spTQueueIn->empty()) std::this_thread::sleep_for(std::chrono::microseconds{100}); TDatums tDatums; bool workersAreRunning = spTQueueIn->tryPop(tDatums); // Check queue not stopped if (!workersAreRunning) workersAreRunning = spTQueueIn->isRunning(); // Process TDatums workersAreRunning = this->workTWorkers(tDatums, workersAreRunning); // Push/emplace tDatums if successfully processed if (workersAreRunning) { if (tDatums != nullptr) spTQueueOut->waitAndEmplace(tDatums); } // Close both queues otherwise else { spTQueueIn->stop(); spTQueueOut->stopPusher(); } return workersAreRunning; } else { std::this_thread::sleep_for(std::chrono::microseconds{100}); return true; } } } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); spTQueueIn->stop(); spTQueueOut->stop(); return false; } } COMPILE_TEMPLATE_DATUM(SubThreadQueueInOut); } #endif // OPENPOSE_THREAD_THREAD_QUEUE_IN_OUT_HPP