#ifndef OPENPOSE_THREAD_QUEUE_BASE_HPP #define OPENPOSE_THREAD_QUEUE_BASE_HPP #include #include #include // std::queue & std::priority_queue #include namespace op { template class QueueBase { public: explicit QueueBase(const long long maxSize = -1); virtual ~QueueBase(); bool forceEmplace(TDatums& tDatums); bool tryEmplace(TDatums& tDatums); bool waitAndEmplace(TDatums& tDatums); bool forcePush(const TDatums& tDatums); bool tryPush(const TDatums& tDatums); bool waitAndPush(const TDatums& tDatums); bool tryPop(TDatums& tDatums); bool tryPop(); bool waitAndPop(TDatums& tDatums); bool waitAndPop(); bool empty() const; void stop(); void stopPusher(); void addPopper(); void addPusher(); bool isRunning() const; bool isFull() const; size_t size() const; void clear(); virtual TDatums front() const = 0; protected: mutable std::mutex mMutex; long long mPoppers; long long mPushers; long long mMaxPoppersPushers; bool mPopIsStopped; bool mPushIsStopped; std::condition_variable mConditionVariable; TQueue mTQueue; virtual bool pop(TDatums& tDatums) = 0; unsigned long long getMaxSize() const; private: const long long mMaxSize; bool emplace(TDatums& tDatums); bool push(const TDatums& tDatums); bool pop(); void updateMaxPoppersPushers(); DELETE_COPY(QueueBase); }; } // Implementation #include #include namespace op { template QueueBase::QueueBase(const long long maxSize) : mPoppers{0ll}, mPushers{0ll}, mPopIsStopped{false}, mPushIsStopped{false}, mMaxSize{maxSize} { } // Virtual destructor template QueueBase::~QueueBase() { try { opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__); stop(); opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); } } template bool QueueBase::forceEmplace(TDatums& tDatums) { try { const std::lock_guard lock{mMutex}; if (mTQueue.size() >= getMaxSize()) mTQueue.pop(); return emplace(tDatums); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::tryEmplace(TDatums& tDatums) { try { const std::lock_guard lock{mMutex}; if (mTQueue.size() >= getMaxSize()) return false; return emplace(tDatums); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::waitAndEmplace(TDatums& tDatums) { try { std::unique_lock lock{mMutex}; mConditionVariable.wait(lock, [this]{return mTQueue.size() < getMaxSize() || mPushIsStopped; }); return emplace(tDatums); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::forcePush(const TDatums& tDatums) { try { const std::lock_guard lock{mMutex}; if (mTQueue.size() >= getMaxSize()) mTQueue.pop(); return push(tDatums); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::tryPush(const TDatums& tDatums) { try { const std::lock_guard lock{mMutex}; if (mTQueue.size() >= getMaxSize()) return false; return push(tDatums); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::waitAndPush(const TDatums& tDatums) { try { std::unique_lock lock{mMutex}; mConditionVariable.wait(lock, [this]{return mTQueue.size() < getMaxSize() || mPushIsStopped; }); return push(tDatums); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::tryPop(TDatums& tDatums) { try { const std::lock_guard lock{mMutex}; return pop(tDatums); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::tryPop() { try { const std::lock_guard lock{mMutex}; return pop(); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::waitAndPop(TDatums& tDatums) { try { std::unique_lock lock{mMutex}; mConditionVariable.wait(lock, [this]{return !mTQueue.empty() || mPopIsStopped; }); return pop(tDatums); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::waitAndPop() { try { std::unique_lock lock{mMutex}; mConditionVariable.wait(lock, [this]{return !mTQueue.empty() || mPopIsStopped; }); return pop(); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::empty() const { try { const std::lock_guard lock{mMutex}; return mTQueue.empty(); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template void QueueBase::stop() { try { opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__); const std::lock_guard lock{mMutex}; mPopIsStopped = {true}; mPushIsStopped = {true}; while (!mTQueue.empty()) mTQueue.pop(); mConditionVariable.notify_all(); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); } } template void QueueBase::stopPusher() { try { opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__); const std::lock_guard lock{mMutex}; mPushers--; if (mPushers == 0) { mPushIsStopped = {true}; if (mTQueue.empty()) mPopIsStopped = {true}; mConditionVariable.notify_all(); } } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); } } template void QueueBase::addPopper() { try { opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__); const std::lock_guard lock{mMutex}; mPoppers++; updateMaxPoppersPushers(); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); } } template void QueueBase::addPusher() { try { opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__); const std::lock_guard lock{mMutex}; mPushers++; updateMaxPoppersPushers(); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); } } template bool QueueBase::isRunning() const { try { const std::lock_guard lock{mMutex}; return !(mPushIsStopped && (mPopIsStopped || mTQueue.empty())); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return true; } } template bool QueueBase::isFull() const { try { // No mutex required because the size() and getMaxSize() are already thread-safe return size() == getMaxSize(); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template size_t QueueBase::size() const { try { const std::lock_guard lock{mMutex}; return mTQueue.size(); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return 0; } } template void QueueBase::clear() { try { const std::lock_guard lock{mMutex}; while (!mTQueue.empty()) mTQueue.pop(); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); } } template unsigned long long QueueBase::getMaxSize() const { try { return (mMaxSize > 0 ? mMaxSize : fastMax(1ll, mMaxPoppersPushers)); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::emplace(TDatums& tDatums) { try { if (mPushIsStopped) return false; mTQueue.emplace(tDatums); mConditionVariable.notify_all(); return true; } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::push(const TDatums& tDatums) { try { if (mPushIsStopped) return false; mTQueue.push(tDatums); mConditionVariable.notify_all(); return true; } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template bool QueueBase::pop() { try { if (mPopIsStopped || mTQueue.empty()) return false; mTQueue.pop(); mConditionVariable.notify_all(); return true; } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); return false; } } template void QueueBase::updateMaxPoppersPushers() { try { mMaxPoppersPushers = fastMax(mPoppers, mPushers); } catch (const std::exception& e) { error(e.what(), __LINE__, __FUNCTION__, __FILE__); } } extern template class QueueBase>; extern template class QueueBase< BASE_DATUMS_SH, std::priority_queue, std::greater>>; } #endif // OPENPOSE_THREAD_QUEUE_BASE_HPP