From 7983063d0566273cd8c8ebdcfe6fe499b9bc1c91 Mon Sep 17 00:00:00 2001 From: DennisOSRM Date: Fri, 6 Apr 2012 14:33:15 +0200 Subject: [PATCH] Reimplemented ConcurrentQueue with a circular buffer from boost library. Gives about 10% faster raw parsing performance. --- DataStructures/ConcurrentQueue.h | 77 ++++++++++++++------------------ DataStructures/PBFParser.h | 5 ++- 2 files changed, 37 insertions(+), 45 deletions(-) diff --git a/DataStructures/ConcurrentQueue.h b/DataStructures/ConcurrentQueue.h index 78c6604e7..9ebfc705b 100644 --- a/DataStructures/ConcurrentQueue.h +++ b/DataStructures/ConcurrentQueue.h @@ -21,77 +21,68 @@ or see http://www.gnu.org/licenses/agpl.txt. #ifndef CONCURRENTQUEUE_H_INCLUDED #define CONCURRENTQUEUE_H_INCLUDED -#include -#include +#include +#include +#include +#include +#include +#include +#include #include "../typedefs.h" -/* - Concurrent Queue written by Anthony Williams: - http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html -*/ template class ConcurrentQueue { + typedef typename boost::circular_buffer::size_type size_t; + public: - ConcurrentQueue(const size_t max_size) - : max_queue_size(max_size) { - } + ConcurrentQueue(const size_t max_size) : internal_queue(max_size) { } void push(Data const& data) { - if (size_exceeded()) { - boost::mutex::scoped_lock qf_lock(queue_full_mutex); - queue_full_cv.wait(qf_lock); - } - - boost::mutex::scoped_lock lock(queue_mutex); - internal_queue.push(data); + boost::mutex::scoped_lock lock(m_mutex); + m_not_full.wait(lock, boost::bind(&ConcurrentQueue::is_not_full, this)); + internal_queue.push_back(data); lock.unlock(); - queue_cv.notify_one(); + m_not_empty.notify_one(); } bool empty() const { return internal_queue.empty(); } + void wait_and_pop(Data& popped_value) { + boost::mutex::scoped_lock lock(m_mutex); + m_not_empty.wait(lock, boost::bind(&ConcurrentQueue::is_not_empty, this)); + popped_value=internal_queue.front(); + internal_queue.pop_front(); + lock.unlock(); + m_not_full.notify_one(); + } + bool try_pop(Data& popped_value) { - boost::mutex::scoped_lock lock(queue_mutex); + boost::mutex::scoped_lock lock(m_mutex); if(internal_queue.empty()) { return false; } - + popped_value=internal_queue.front(); - internal_queue.pop(); - queue_full_cv.notify_one(); + internal_queue.pop_front(); + lock.unlock(); + m_not_full.notify_one(); return true; } - void wait_and_pop(Data& popped_value) { - boost::mutex::scoped_lock lock(queue_mutex); - while(internal_queue.empty()) { - queue_cv.wait(lock); - } - - popped_value=internal_queue.front(); - internal_queue.pop(); - queue_full_cv.notify_one(); - } - int size() const { - return static_cast(internal_queue.size()); - } private: - std::queue internal_queue; - mutable boost::mutex queue_mutex; - mutable boost::mutex queue_full_mutex; - boost::condition_variable queue_cv; - boost::condition_variable queue_full_cv; - const size_t max_queue_size; + boost::circular_buffer internal_queue; + boost::mutex m_mutex; + boost::condition m_not_empty; + boost::condition m_not_full; - bool size_exceeded() const { - return internal_queue.size() >= max_queue_size; - } + inline bool is_not_empty() const { return internal_queue.size() > 0; } + inline bool is_not_full() const { return internal_queue.size() < internal_queue.capacity(); } }; diff --git a/DataStructures/PBFParser.h b/DataStructures/PBFParser.h index 63fcab54c..300709443 100644 --- a/DataStructures/PBFParser.h +++ b/DataStructures/PBFParser.h @@ -62,9 +62,10 @@ class PBFParser : public BaseParser<_Node, _RawRestrictionContainer, _Way> { }; public: - PBFParser(const char * fileName) { /* Max 25 items in queue */ + PBFParser(const char * fileName) { GOOGLE_PROTOBUF_VERIFY_VERSION; - threadDataQueue.reset( new ConcurrentQueue<_ThreadData*>(250) ); + //TODO: What is the bottleneck here? Filling the queue or reading the stuff from disk? + threadDataQueue.reset( new ConcurrentQueue<_ThreadData*>(2500) ); /* Max 2500 items in queue, hardcoded. */ input.open(fileName, std::ios::in | std::ios::binary); if (!input) {