Reimplemented ConcurrentQueue with a circular buffer from boost library.
Gives about 10% faster raw parsing performance.
This commit is contained in:
		
							parent
							
								
									e412366bb2
								
							
						
					
					
						commit
						7983063d05
					
				| @ -21,77 +21,68 @@ or see http://www.gnu.org/licenses/agpl.txt. | ||||
| #ifndef CONCURRENTQUEUE_H_INCLUDED | ||||
| #define CONCURRENTQUEUE_H_INCLUDED | ||||
| 
 | ||||
| #include <queue> | ||||
| #include <boost/signals2/mutex.hpp> | ||||
| #include <boost/circular_buffer.hpp> | ||||
| #include <boost/thread/mutex.hpp> | ||||
| #include <boost/thread/condition.hpp> | ||||
| #include <boost/thread/thread.hpp> | ||||
| #include <boost/call_traits.hpp> | ||||
| #include <boost/progress.hpp> | ||||
| #include <boost/bind.hpp> | ||||
| 
 | ||||
| #include "../typedefs.h" | ||||
| 
 | ||||
| /* 
 | ||||
|    Concurrent Queue written by Anthony Williams:  | ||||
|    http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html  
 | ||||
| */ | ||||
| template<typename Data> | ||||
| class ConcurrentQueue { | ||||
| 
 | ||||
|     typedef typename boost::circular_buffer<Data>::size_type size_t; | ||||
| 
 | ||||
| public: | ||||
|     ConcurrentQueue(const size_t max_size)  | ||||
|         : max_queue_size(max_size) { | ||||
|     } | ||||
|     ConcurrentQueue(const size_t max_size) : internal_queue(max_size) { } | ||||
| 
 | ||||
|     void push(Data const& data) { | ||||
|         if (size_exceeded()) { | ||||
|             boost::mutex::scoped_lock qf_lock(queue_full_mutex); | ||||
|             queue_full_cv.wait(qf_lock); | ||||
|         } | ||||
| 
 | ||||
|         boost::mutex::scoped_lock lock(queue_mutex); | ||||
|         internal_queue.push(data); | ||||
|         boost::mutex::scoped_lock lock(m_mutex); | ||||
|         m_not_full.wait(lock, boost::bind(&ConcurrentQueue<Data>::is_not_full, this)); | ||||
|         internal_queue.push_back(data); | ||||
|         lock.unlock(); | ||||
|         queue_cv.notify_one(); | ||||
|         m_not_empty.notify_one(); | ||||
|     } | ||||
| 
 | ||||
|     bool empty() const { | ||||
|         return internal_queue.empty(); | ||||
|     } | ||||
| 
 | ||||
|     void wait_and_pop(Data& popped_value) { | ||||
|         boost::mutex::scoped_lock lock(m_mutex); | ||||
|         m_not_empty.wait(lock, boost::bind(&ConcurrentQueue<Data>::is_not_empty, this)); | ||||
|         popped_value=internal_queue.front(); | ||||
|         internal_queue.pop_front(); | ||||
|         lock.unlock(); | ||||
|         m_not_full.notify_one(); | ||||
|     } | ||||
| 
 | ||||
|     bool try_pop(Data& popped_value) { | ||||
|         boost::mutex::scoped_lock lock(queue_mutex); | ||||
|         boost::mutex::scoped_lock lock(m_mutex); | ||||
|         if(internal_queue.empty()) { | ||||
|             return false; | ||||
|         } | ||||
|          | ||||
| 
 | ||||
|         popped_value=internal_queue.front(); | ||||
|         internal_queue.pop(); | ||||
|         queue_full_cv.notify_one(); | ||||
|         internal_queue.pop_front(); | ||||
|         lock.unlock(); | ||||
|         m_not_full.notify_one(); | ||||
|         return true; | ||||
|     } | ||||
| 
 | ||||
|     void wait_and_pop(Data& popped_value) { | ||||
|         boost::mutex::scoped_lock lock(queue_mutex); | ||||
|         while(internal_queue.empty()) { | ||||
|             queue_cv.wait(lock); | ||||
|         } | ||||
|          | ||||
|         popped_value=internal_queue.front(); | ||||
|         internal_queue.pop(); | ||||
|         queue_full_cv.notify_one(); | ||||
|     } | ||||
| 
 | ||||
|     int size() const { | ||||
|         return static_cast<int>(internal_queue.size()); | ||||
|     } | ||||
| 
 | ||||
| private: | ||||
|     std::queue<Data> internal_queue; | ||||
|     mutable boost::mutex queue_mutex; | ||||
|     mutable boost::mutex queue_full_mutex; | ||||
|     boost::condition_variable queue_cv; | ||||
|     boost::condition_variable queue_full_cv; | ||||
|     const size_t max_queue_size; | ||||
|     boost::circular_buffer<Data> internal_queue; | ||||
|     boost::mutex m_mutex; | ||||
|     boost::condition m_not_empty; | ||||
|     boost::condition m_not_full; | ||||
| 
 | ||||
|     bool size_exceeded() const { | ||||
|         return internal_queue.size() >= max_queue_size; | ||||
|     } | ||||
|     inline bool is_not_empty() const { return internal_queue.size() > 0; } | ||||
|     inline bool is_not_full() const { return internal_queue.size() < internal_queue.capacity(); } | ||||
| 
 | ||||
| }; | ||||
| 
 | ||||
|  | ||||
| @ -62,9 +62,10 @@ class PBFParser : public BaseParser<_Node, _RawRestrictionContainer, _Way> { | ||||
|     }; | ||||
| 
 | ||||
| public: | ||||
|     PBFParser(const char * fileName) { /* Max 25 items in queue */ | ||||
|     PBFParser(const char * fileName) { | ||||
|         GOOGLE_PROTOBUF_VERIFY_VERSION; | ||||
|         threadDataQueue.reset( new ConcurrentQueue<_ThreadData*>(250) ); | ||||
|         //TODO: What is the bottleneck here? Filling the queue or reading the stuff from disk?
 | ||||
|         threadDataQueue.reset( new ConcurrentQueue<_ThreadData*>(2500) ); /* Max 2500 items in queue, hardcoded. */ | ||||
|         input.open(fileName, std::ios::in | std::ios::binary); | ||||
| 
 | ||||
|         if (!input) { | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user