#ifndef OPENPOSE_THREAD_THREAD_QUEUE_OUT_HPP #define OPENPOSE_THREAD_THREAD_QUEUE_OUT_HPP #include #include #include #include namespace op { template>, typename TQueue = Queue> class SubThreadQueueOut : public SubThread { public: SubThreadQueueOut(const std::vector& tWorkers, const std::shared_ptr& tQueueOut); virtual ~SubThreadQueueOut(); bool work(); private: std::shared_ptr spTQueueOut; DELETE_COPY(SubThreadQueueOut); }; } // Implementation namespace op { template SubThreadQueueOut::SubThreadQueueOut(const std::vector& tWorkers, const std::shared_ptr& tQueueOut) : SubThread{tWorkers}, spTQueueOut{tQueueOut} { spTQueueOut->addPusher(); } template SubThreadQueueOut::~SubThreadQueueOut() { } template bool SubThreadQueueOut::work() { try { // If output queue is closed -> close input queue if (!spTQueueOut->isRunning()) return false; else { // Don't work until next queue is not full // This reduces latency to half if (!spTQueueOut->isFull()) { // Process TDatums TDatums tDatums; const auto workersAreRunning = this->workTWorkers(tDatums, true); // Push/emplace tDatums if successfully processed if (workersAreRunning) { if (tDatums != nullptr) spTQueueOut->waitAndEmplace(tDatums); } // Close queue otherwise else spTQueueOut->stopPusher(); return workersAreRunning; } else { std::this_thread::sleep_for(std::chrono::microseconds{100}); return true; } } } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); spTQueueOut->stop(); return false; } } COMPILE_TEMPLATE_DATUM(SubThreadQueueOut); } #endif // OPENPOSE_THREAD_THREAD_QUEUE_OUT_HPP