// File size: 5,980 Bytes
// 7fc5a59
#ifndef OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP
#define OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP
#include <queue> // std::priority_queue
#include <openpose/core/common.hpp>
#include <openpose/thread/worker.hpp>
#include <openpose/utilities/pointerContainer.hpp>
namespace op
{
/**
 * Worker that holds back elements arriving ahead of their turn and hands them on once the element with the
 * expected (id, subId) pair has been seen.
 *
 * Elements that are not the expected one are parked in a priority queue. They leave the queue when they become
 * the expected element, when the queue grows beyond the configured maximum, or when tryStop() has been called.
 * NOTE(review): the queue is ordered with PointerContainerGreater, which presumably makes top() the element with
 * the lowest id -- confirm against pointerContainer.hpp.
 */
template<typename TDatums>
class WQueueOrderer : public Worker<TDatums>
{
public:
    /**
     * @param maxBufferSize Number of parked elements tolerated before work() releases the top of the buffer
     * regardless of whether it is the expected element.
     */
    explicit WQueueOrderer(const unsigned int maxBufferSize = 64u);

    virtual ~WQueueOrderer();

    /** Does nothing for this worker (empty implementation). */
    void initializationOnThread();

    /**
     * Forwards, parks or replaces the given element.
     * @param tDatums In/out handle: the received element on entry, the element to forward (or nullptr) on exit.
     */
    void work(TDatums& tDatums);

    /**
     * Stops the worker if the buffer is empty and, in any case, makes work() release every remaining element
     * from now on.
     */
    void tryStop();

private:
    // Buffer size above which the top element is released unconditionally
    const unsigned int mMaxBufferSize;
    // Set by tryStop(); once true, buffered elements are released without waiting for the expected one
    bool mStopWhenEmpty;
    // id of the element that should be forwarded next
    unsigned long long mNextExpectedId;
    // subId of the element that should be forwarded next (only advanced when subIdMax != 0)
    unsigned long long mNextExpectedSubId;
    // Elements received ahead of their turn
    std::priority_queue<TDatums, std::vector<TDatums>, PointerContainerGreater<TDatums>> mPriorityQueueBuffer;

    // NOTE(review): project macro, presumably removes the copy constructor/assignment -- confirm in common.hpp
    DELETE_COPY(WQueueOrderer);
};
}
// Implementation
namespace op
{
/**
 * Builds an orderer with an empty buffer that expects the element with id 0 and subId 0 first.
 * @param maxBufferSize Buffer size above which work() releases the top element unconditionally.
 */
template<typename TDatums>
WQueueOrderer<TDatums>::WQueueOrderer(const unsigned int maxBufferSize)
    : mMaxBufferSize(maxBufferSize),
      mStopWhenEmpty(false),
      mNextExpectedId(0),
      mNextExpectedSubId(0)
{
}
/** Nothing to release by hand: every member cleans up after itself. */
template<typename TDatums>
WQueueOrderer<TDatums>::~WQueueOrderer() = default;
/** This worker has no initialization step; the function is intentionally a no-op. */
template<typename TDatums>
void WQueueOrderer<TDatums>::initializationOnThread()
{
    // Intentionally empty
}
/**
 * Handles one iteration of the re-ordering logic.
 *
 * 1. A valid input that is the expected element is left in tDatums; any other valid input is parked in the
 *    buffer (and, if the buffer then exceeds mMaxBufferSize, the buffer's top element is taken out instead).
 * 2. If nothing is held in tDatums at that point, the buffer's top element is taken out when it is the expected
 *    element or when tryStop() has already been called.
 * 3. If an element is held in tDatums, the expected id/subId are moved to the element that follows it.
 * 4. With nothing to forward and the buffer less than half full, the thread sleeps for 1 ms.
 *
 * Any std::exception stops the worker, clears tDatums and is reported through error().
 *
 * NOTE(review): checkNoNullNorEmpty() is presumably true only for a non-null, non-empty container -- confirm.
 *
 * @param tDatums In/out handle. On entry: the element received (may be null). On exit: the element to forward,
 * or nullptr when the input was parked and nothing could be released.
 */
template<typename TDatums>
void WQueueOrderer<TDatums>::work(TDatums& tDatums)
{
    try
    {
        // Start the profiling timer and remember whether a non-null element was handed in
        const auto profilerKey = Profiler::timerInit(__LINE__, __FUNCTION__, __FILE__);
        const bool receivedInput = (tDatums != nullptr);

        // Stage 1: valid input -> keep it if it is the expected element, park it otherwise
        if (checkNoNullNorEmpty(tDatums))
        {
            auto& incoming = *tDatums;
            const bool isNextExpected =
                (incoming[0]->id == mNextExpectedId && incoming[0]->subId == mNextExpectedSubId);
            if (!isNextExpected)
            {
                // The buffer stores its own copy of the handle, so the in/out argument can be cleared
                mPriorityQueueBuffer.emplace(tDatums);
                tDatums = nullptr;
                // Buffer beyond its limit -> stop waiting and release the top element
                if (mPriorityQueueBuffer.size() > mMaxBufferSize)
                {
                    tDatums = mPriorityQueueBuffer.top();
                    mPriorityQueueBuffer.pop();
                }
            }
            // Expected element, single-view: only the id advances
            else if (incoming[0]->subIdMax == 0)
                ++mNextExpectedId;
            // Expected element, multi-view: advance the subId and roll over to the next id after the last view
            else
            {
                ++mNextExpectedSubId;
                if (mNextExpectedSubId > incoming[0]->subIdMax)
                {
                    mNextExpectedSubId = 0;
                    ++mNextExpectedId;
                }
            }
        }

        // Stage 2: nothing to forward yet -> see whether the buffer can provide the next element
        if (!checkNoNullNorEmpty(tDatums) && !mPriorityQueueBuffer.empty())
        {
            // After tryStop() every buffered element is released; otherwise only the expected one
            bool releaseTop = mStopWhenEmpty;
            if (!releaseTop)
            {
                const auto& head = *mPriorityQueueBuffer.top();
                releaseTop = (head[0]->id == mNextExpectedId && head[0]->subId == mNextExpectedSubId);
            }
            if (releaseTop)
            {
                tDatums = { mPriorityQueueBuffer.top() };
                mPriorityQueueBuffer.pop();
            }
        }

        // Stage 3: an element is about to be forwarded -> the one after it becomes the expected element
        if (checkNoNullNorEmpty(tDatums))
        {
            const auto& outgoing = *tDatums;
            const bool isSingleView = (outgoing[0]->subIdMax == 0);
            if (!isSingleView)
            {
                // Multi-view: next view of the same id, or view 0 of the following id after the last view
                mNextExpectedSubId = outgoing[0]->subId + 1;
                if (mNextExpectedSubId > outgoing[0]->subIdMax)
                {
                    mNextExpectedSubId = 0;
                    mNextExpectedId = outgoing[0]->id + 1;
                }
            }
            else
                mNextExpectedId = outgoing[0]->id + 1;
        }

        // Stage 4: idle iteration with a buffer less than half full -> yield briefly instead of spinning
        const bool nothingToForward = !checkNoNullNorEmpty(tDatums);
        if (nothingToForward && mPriorityQueueBuffer.size() < mMaxBufferSize / 2u)
            std::this_thread::sleep_for(std::chrono::milliseconds{1});

        // Stage 5: close the profiling sample only if an element came in and/or goes out
        if (receivedInput || tDatums != nullptr)
        {
            Profiler::timerEnd(profilerKey);
            Profiler::printAveragedTimeMsOnIterationX(profilerKey, __LINE__, __FUNCTION__, __FILE__);
            // Debugging log
            opLogIfDebug("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
        }
    }
    catch (const std::exception& e)
    {
        // Stop the worker and forward nothing before reporting the failure
        this->stop();
        tDatums = nullptr;
        error(e.what(), __LINE__, __FUNCTION__, __FILE__);
    }
}
/**
 * Requests the worker to finish.
 *
 * The worker is stopped right away only when no element is left in the buffer. In every case mStopWhenEmpty is
 * raised afterwards, which makes work() release the buffered elements without waiting for the expected one.
 * A std::exception raised here is reported through error().
 */
template<typename TDatums>
void WQueueOrderer<TDatums>::tryStop()
{
    try
    {
        // Only stop once every buffered element has been retrieved
        const bool bufferDrained = mPriorityQueueBuffer.empty();
        if (bufferDrained)
        {
            this->stop();
        }
        // From now on work() flushes the buffer unconditionally
        mStopWhenEmpty = true;
    }
    catch (const std::exception& e)
    {
        error(e.what(), __LINE__, __FUNCTION__, __FILE__);
    }
}
// NOTE(review): project macro defined outside this file; presumably declares the explicit instantiation of
// WQueueOrderer for the library's default datum container type -- confirm against its definition.
COMPILE_TEMPLATE_DATUM(WQueueOrderer);
}
#endif // OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP