#ifndef OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP #define OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP #include // std::priority_queue #include #include #include namespace op { template class WQueueOrderer : public Worker { public: explicit WQueueOrderer(const unsigned int maxBufferSize = 64u); virtual ~WQueueOrderer(); void initializationOnThread(); void work(TDatums& tDatums); void tryStop(); private: const unsigned int mMaxBufferSize; bool mStopWhenEmpty; unsigned long long mNextExpectedId; unsigned long long mNextExpectedSubId; std::priority_queue, PointerContainerGreater> mPriorityQueueBuffer; DELETE_COPY(WQueueOrderer); }; } // Implementation namespace op { template WQueueOrderer::WQueueOrderer(const unsigned int maxBufferSize) : mMaxBufferSize{maxBufferSize}, mStopWhenEmpty{false}, mNextExpectedId{0}, mNextExpectedSubId{0} { } template WQueueOrderer::~WQueueOrderer() { } template void WQueueOrderer::initializationOnThread() { } template void WQueueOrderer::work(TDatums& tDatums) { try { // Profiling speed const auto profilerKey = Profiler::timerInit(__LINE__, __FUNCTION__, __FILE__); bool profileSpeed = (tDatums != nullptr); // Input TDatum -> enqueue or return it back if (checkNoNullNorEmpty(tDatums)) { // T* to T auto& tDatumsNoPtr = *tDatums; // tDatums is the next expected, update counter if (tDatumsNoPtr[0]->id == mNextExpectedId && tDatumsNoPtr[0]->subId == mNextExpectedSubId) { // If single-view if (tDatumsNoPtr[0]->subIdMax == 0) mNextExpectedId++; // If muilti-view system else { mNextExpectedSubId++; if (mNextExpectedSubId > tDatumsNoPtr[0]->subIdMax) { mNextExpectedSubId = 0; mNextExpectedId++; } } } // Else push it to our buffered queue else { // Enqueue current tDatums mPriorityQueueBuffer.emplace(tDatums); tDatums = nullptr; // Else if buffer full -> remove one tDatums if (mPriorityQueueBuffer.size() > mMaxBufferSize) { tDatums = mPriorityQueueBuffer.top(); mPriorityQueueBuffer.pop(); } } } // If input TDatum enqueued -> check if previously enqueued next desired frame and pop it if (!checkNoNullNorEmpty(tDatums)) { // Retrieve frame if next is desired frame or if we want to stop this worker if (!mPriorityQueueBuffer.empty() && (mStopWhenEmpty || ((*mPriorityQueueBuffer.top())[0]->id == mNextExpectedId && (*mPriorityQueueBuffer.top())[0]->subId == mNextExpectedSubId))) { tDatums = { mPriorityQueueBuffer.top() }; mPriorityQueueBuffer.pop(); } } // If TDatum ready to be returned -> updated next expected id if (checkNoNullNorEmpty(tDatums)) { const auto& tDatumsNoPtr = *tDatums; // If single-view if (tDatumsNoPtr[0]->subIdMax == 0) mNextExpectedId = tDatumsNoPtr[0]->id + 1; // If muilti-view system else { mNextExpectedSubId = tDatumsNoPtr[0]->subId + 1; if (mNextExpectedSubId > tDatumsNoPtr[0]->subIdMax) { mNextExpectedSubId = 0; mNextExpectedId = tDatumsNoPtr[0]->id + 1; } } } // Sleep if no new tDatums to either pop or push if (!checkNoNullNorEmpty(tDatums) && mPriorityQueueBuffer.size() < mMaxBufferSize / 2u) std::this_thread::sleep_for(std::chrono::milliseconds{1}); // If TDatum popped and/or pushed if (profileSpeed || tDatums != nullptr) { // Profiling speed Profiler::timerEnd(profilerKey); Profiler::printAveragedTimeMsOnIterationX(profilerKey, __LINE__, __FUNCTION__, __FILE__); // Debugging log opLogIfDebug("", Priority::Low, __LINE__, __FUNCTION__, __FILE__); } } catch (const std::exception& e) { this->stop(); tDatums = nullptr; error(e.what(), __LINE__, __FUNCTION__, __FILE__); } } template void WQueueOrderer::tryStop() { try { // Close if all frames were retrieved from the queue if (mPriorityQueueBuffer.empty()) this->stop(); mStopWhenEmpty = true; } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); } } COMPILE_TEMPLATE_DATUM(WQueueOrderer); } #endif // OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP