Refactoring internal style of ConcurrentQueue

This commit is contained in:
Dennis Luxen 2013-08-16 17:09:04 +02:00
parent 161487d6c2
commit aba078a9d8

View File

@ -18,8 +18,10 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
or see http://www.gnu.org/licenses/agpl.txt. or see http://www.gnu.org/licenses/agpl.txt.
*/ */
#ifndef CONCURRENTQUEUE_H_INCLUDED #ifndef CONCURRENTQUEUE_H_
#define CONCURRENTQUEUE_H_INCLUDED #define CONCURRENTQUEUE_H_
#include "../typedefs.h"
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/circular_buffer.hpp> #include <boost/circular_buffer.hpp>
@ -27,57 +29,64 @@ or see http://www.gnu.org/licenses/agpl.txt.
#include <boost/thread/mutex.hpp> #include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp> #include <boost/thread/thread.hpp>
#include "../typedefs.h"
template<typename Data> template<typename Data>
class ConcurrentQueue { class ConcurrentQueue {
typedef typename boost::circular_buffer<Data>::size_type size_t;
public: public:
ConcurrentQueue(const size_t max_size) : internal_queue(max_size) { } ConcurrentQueue(const size_t max_size) : m_internal_queue(max_size) { }
inline void push(Data const& data) { inline void push(const Data & data) {
boost::mutex::scoped_lock lock(m_mutex); boost::mutex::scoped_lock lock(m_mutex);
m_not_full.wait(lock, boost::bind(&ConcurrentQueue<Data>::is_not_full, this)); m_not_full.wait(
internal_queue.push_back(data); lock,
boost::bind(&ConcurrentQueue<Data>::is_not_full, this)
);
m_internal_queue.push_back(data);
lock.unlock(); lock.unlock();
m_not_empty.notify_one(); m_not_empty.notify_one();
} }
inline bool empty() const { inline bool empty() const {
return internal_queue.empty(); return m_internal_queue.empty();
} }
inline void wait_and_pop(Data& popped_value) { inline void wait_and_pop(Data & popped_value) {
boost::mutex::scoped_lock lock(m_mutex); boost::mutex::scoped_lock lock(m_mutex);
m_not_empty.wait(lock, boost::bind(&ConcurrentQueue<Data>::is_not_empty, this)); m_not_empty.wait(
popped_value=internal_queue.front(); lock,
internal_queue.pop_front(); boost::bind(&ConcurrentQueue<Data>::is_not_empty, this)
);
popped_value = m_internal_queue.front();
m_internal_queue.pop_front();
lock.unlock(); lock.unlock();
m_not_full.notify_one(); m_not_full.notify_one();
} }
inline bool try_pop(Data& popped_value) { inline bool try_pop(Data& popped_value) {
boost::mutex::scoped_lock lock(m_mutex); boost::mutex::scoped_lock lock(m_mutex);
if(internal_queue.empty()) { if(m_internal_queue.empty()) {
return false; return false;
} }
popped_value=internal_queue.front(); popped_value=m_internal_queue.front();
internal_queue.pop_front(); m_internal_queue.pop_front();
lock.unlock(); lock.unlock();
m_not_full.notify_one(); m_not_full.notify_one();
return true; return true;
} }
private: private:
boost::circular_buffer<Data> internal_queue; inline bool is_not_empty() const {
boost::mutex m_mutex; return !m_internal_queue.empty();
boost::condition m_not_empty; }
boost::condition m_not_full;
inline bool is_not_empty() const { return internal_queue.size() > 0; } inline bool is_not_full() const {
inline bool is_not_full() const { return internal_queue.size() < internal_queue.capacity(); } return m_internal_queue.size() < m_internal_queue.capacity();
}
boost::circular_buffer<Data> m_internal_queue;
boost::mutex m_mutex;
boost::condition m_not_empty;
boost::condition m_not_full;
}; };
#endif //#ifndef CONCURRENTQUEUE_H_INCLUDED #endif /* CONCURRENTQUEUE_H_ */