From 2a6275cf38d49c99dc5eb5232917078da5f014bc Mon Sep 17 00:00:00 2001 From: Sasa Ivetic Date: Fri, 14 Oct 2011 09:18:57 -0500 Subject: [PATCH 1/2] Split PBF Parser into a single Read and a single Parse thread to increase parsing speed. --- DataStructures/PBFParser.h | 219 +++++++++++++++++++++++++------------ 1 file changed, 149 insertions(+), 70 deletions(-) diff --git a/DataStructures/PBFParser.h b/DataStructures/PBFParser.h index 09d11d053..99729ada0 100644 --- a/DataStructures/PBFParser.h +++ b/DataStructures/PBFParser.h @@ -32,6 +32,84 @@ or see http://www.gnu.org/licenses/agpl.txt. #include "ExtractorStructs.h" +/* + Concurrent Queue written by Anthony Williams: + http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html +*/ +template +class concurrent_queue +{ +private: + std::queue internal_queue; + mutable boost::mutex queue_mutex; + mutable boost::mutex queue_full_mutex; + boost::condition_variable queue_cv; + boost::condition_variable queue_full_cv; + const size_t max_queue_size; + + bool size_exceeded() const { + boost::mutex::scoped_lock lock(queue_mutex); + return internal_queue.size() >= max_queue_size; + } + +public: + concurrent_queue(const size_t max_size) + : max_queue_size(max_size) { + } + + void push(Data const& data) + { + if (size_exceeded()) { + boost::mutex::scoped_lock qf_lock(queue_full_mutex); + queue_full_cv.wait(qf_lock); + } + + boost::mutex::scoped_lock lock(queue_mutex); + internal_queue.push(data); + lock.unlock(); + queue_cv.notify_one(); + } + + bool empty() const + { + boost::mutex::scoped_lock lock(queue_mutex); + return internal_queue.empty(); + } + + bool try_pop(Data& popped_value) + { + boost::mutex::scoped_lock lock(queue_mutex); + if(internal_queue.empty()) + { + return false; + } + + popped_value=internal_queue.front(); + internal_queue.pop(); + queue_full_cv.notify_one(); + return true; + } + + void wait_and_pop(Data& popped_value) + { + boost::mutex::scoped_lock lock(queue_mutex); + while(internal_queue.empty()) + { + queue_cv.wait(lock); + } + + popped_value=internal_queue.front(); + internal_queue.pop(); + queue_full_cv.notify_one(); + } + + int size() const { + boost::mutex::scoped_lock lock(queue_mutex); + return static_cast(internal_queue.size()); + } +}; + + class PBFParser : public BaseParser<_Node, _RawRestrictionContainer, _Way> { enum EntityType { @@ -61,9 +139,9 @@ class PBFParser : public BaseParser<_Node, _RawRestrictionContainer, _Way> { }; public: - PBFParser(const char * fileName) { + PBFParser(const char * fileName) + : threadDataQueue( new concurrent_queue<_ThreadData*>(25) ) { /* Max 25 items in queue */ GOOGLE_PROTOBUF_VERIFY_VERSION; - omp_set_num_threads(1); input.open(fileName, std::ios::in | std::ios::binary); if (!input) { @@ -86,10 +164,12 @@ public: if(input.is_open()) input.close(); - unsigned maxThreads = omp_get_max_threads(); - for ( unsigned threadNum = 0; threadNum < maxThreads; ++threadNum ) { - delete threadDataVector[threadNum]; + // Clean up any leftover ThreadData objects in the queue + _ThreadData* td; + while (threadDataQueue->try_pop(td)) { + delete td; } + delete threadDataQueue; google::protobuf::ShutdownProtobufLibrary(); @@ -100,12 +180,6 @@ public: } bool Init() { - /** Init Vector with ThreadData Objects */ - unsigned maxThreads = omp_get_max_threads(); - for ( unsigned threadNum = 0; threadNum < maxThreads; ++threadNum ) { - threadDataVector.push_back( new _ThreadData( ) ); - } - _ThreadData initData; /** read Header */ if(!readPBFBlobHeader(input, &initData)) { @@ -137,36 +211,58 @@ public: return true; } - bool Parse() { -#pragma omp parallel - { - _ThreadData * threadData = threadDataVector[omp_get_thread_num()]; - //parse through all Blocks - bool keepRunning = true; - // while(readNextBlock(input)) { - do{ -#pragma omp critical - { - keepRunning = readNextBlock(input, threadData); - } - if(keepRunning) { - loadBlock(threadData); - for(int i = 0; i < threadData->PBFprimitiveBlock.primitivegroup_size(); i++) { - threadData->currentGroupID = i; - loadGroup(threadData); + void ReadData() { + bool keepRunning = true; + do { + _ThreadData *threadData = new _ThreadData(); + keepRunning = readNextBlock(input, threadData); - if(threadData->entityTypeIndicator == TypeNode) - parseNode(threadData); - if(threadData->entityTypeIndicator == TypeWay) - parseWay(threadData); - if(threadData->entityTypeIndicator == TypeRelation) - parseRelation(threadData); - if(threadData->entityTypeIndicator == TypeDenseNode) - parseDenseNode(threadData); - } - } - }while(keepRunning); + if (keepRunning) + threadDataQueue->push(threadData); + else + threadDataQueue->push(NULL); // No more data to read, parse stops when NULL encountered + } while(keepRunning); + } + + void ParseData() { + while (1) { + _ThreadData *threadData; + threadDataQueue->wait_and_pop(threadData); + if (threadData == NULL) { + cout << "Parse Data Thread Finished" << endl; + threadDataQueue->push(NULL); // Signal end of data for other threads + break; + } + + loadBlock(threadData); + for(int i = 0; i < threadData->PBFprimitiveBlock.primitivegroup_size(); i++) { + threadData->currentGroupID = i; + loadGroup(threadData); + + if(threadData->entityTypeIndicator == TypeNode) + parseNode(threadData); + if(threadData->entityTypeIndicator == TypeWay) + parseWay(threadData); + if(threadData->entityTypeIndicator == TypeRelation) + parseRelation(threadData); + if(threadData->entityTypeIndicator == TypeDenseNode) + parseDenseNode(threadData); + } + + delete threadData; + threadData = NULL; } + } + + bool Parse() { + // Start the read and parse threads + boost::thread readThread(boost::bind(&PBFParser::ReadData, this)); + boost::thread parseThread(boost::bind(&PBFParser::ParseData, this)); + + // Wait for the threads to finish + readThread.join(); + parseThread.join(); + return true; } @@ -200,27 +296,18 @@ private: keyVals.Add(key, value); denseTagIndex += 2; } -#pragma omp critical - { - if(!(*addressCallback)(n, keyVals)) - std::cerr << "[PBFParser] adress not parsed" << std::endl; - } - -#pragma omp critical - { - if(!(*nodeCallback)(n)) - std::cerr << "[PBFParser] dense node not parsed" << std::endl; - } + if(!(*addressCallback)(n, keyVals)) + std::cerr << "[PBFParser] adress not parsed" << std::endl; + + if(!(*nodeCallback)(n)) + std::cerr << "[PBFParser] dense node not parsed" << std::endl; } } void parseNode(_ThreadData * threadData) { _Node n; -#pragma omp critical - { - if(!(*nodeCallback)(n)) - std::cerr << "[PBFParser] simple node not parsed" << std::endl; - } + if(!(*nodeCallback)(n)) + std::cerr << "[PBFParser] simple node not parsed" << std::endl; } void parseRelation(_ThreadData * threadData) { @@ -291,11 +378,8 @@ private: // cout << "node " << currentRestriction.viaNode; // cout << " to " << currentRestriction.to << endl; // } -#pragma omp critical - { - if(!(*restrictionCallback)(currentRestrictionContainer)) - std::cerr << "[PBFParser] relation not parsed" << std::endl; - } + if(!(*restrictionCallback)(currentRestrictionContainer)) + std::cerr << "[PBFParser] relation not parsed" << std::endl; } } } @@ -317,18 +401,15 @@ private: const std::string val = threadData->PBFprimitiveBlock.stringtable().s(inputWay.vals(i)); w.keyVals.Add(key, val); } -#pragma omp critical - { - if(!(*wayCallback)(w)) { - std::cerr << "[PBFParser] way not parsed" << std::endl; - } + + if(!(*wayCallback)(w)) { + std::cerr << "[PBFParser] way not parsed" << std::endl; } } } } void loadGroup(_ThreadData * threadData) { -#pragma omp atomic groupCount++; const OSMPBF::PrimitiveGroup& group = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ); @@ -350,7 +431,6 @@ private: } void loadBlock(_ThreadData * threadData) { -#pragma omp critical blockCount++; threadData->currentGroupID = 0; threadData->currentEntityID = 0; @@ -522,9 +602,8 @@ private: /* the input stream to parse */ std::fstream input; - /* ThreadData Array */ - std::vector < _ThreadData* > threadDataVector; - + /* ThreadData Queue */ + concurrent_queue < _ThreadData* >* threadDataQueue; }; #endif /* PBFPARSER_H_ */ From 63e8793c7f641ddeee53496925b2e9dc0ab2a233 Mon Sep 17 00:00:00 2001 From: Sasa Ivetic Date: Fri, 14 Oct 2011 12:02:40 -0500 Subject: [PATCH 2/2] Moved Concurrent Queue class into its own file --- DataStructures/ConcurrentQueue.h | 103 +++++++++++++++++++++++++++++++ DataStructures/PBFParser.h | 80 +----------------------- vcprojects/extractor.vcproj | 4 ++ 3 files changed, 108 insertions(+), 79 deletions(-) create mode 100644 DataStructures/ConcurrentQueue.h diff --git a/DataStructures/ConcurrentQueue.h b/DataStructures/ConcurrentQueue.h new file mode 100644 index 000000000..dd6c84be0 --- /dev/null +++ b/DataStructures/ConcurrentQueue.h @@ -0,0 +1,103 @@ +/* + open source routing machine + Copyright (C) Dennis Luxen, others 2010 + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU AFFERO General Public License as published by +the Free Software Foundation; either version 3 of the License, or +any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +or see http://www.gnu.org/licenses/agpl.txt. + */ + +#ifndef CONCURRENTQUEUE_H_INCLUDED +#define CONCURRENTQUEUE_H_INCLUDED + +#include "typedefs.h" + +/* + Concurrent Queue written by Anthony Williams: + http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html +*/ +template +class concurrent_queue +{ +private: + std::queue internal_queue; + mutable boost::mutex queue_mutex; + mutable boost::mutex queue_full_mutex; + boost::condition_variable queue_cv; + boost::condition_variable queue_full_cv; + const size_t max_queue_size; + + bool size_exceeded() const { + boost::mutex::scoped_lock lock(queue_mutex); + return internal_queue.size() >= max_queue_size; + } + +public: + concurrent_queue(const size_t max_size) + : max_queue_size(max_size) { + } + + void push(Data const& data) + { + if (size_exceeded()) { + boost::mutex::scoped_lock qf_lock(queue_full_mutex); + queue_full_cv.wait(qf_lock); + } + + boost::mutex::scoped_lock lock(queue_mutex); + internal_queue.push(data); + lock.unlock(); + queue_cv.notify_one(); + } + + bool empty() const + { + boost::mutex::scoped_lock lock(queue_mutex); + return internal_queue.empty(); + } + + bool try_pop(Data& popped_value) + { + boost::mutex::scoped_lock lock(queue_mutex); + if(internal_queue.empty()) + { + return false; + } + + popped_value=internal_queue.front(); + internal_queue.pop(); + queue_full_cv.notify_one(); + return true; + } + + void wait_and_pop(Data& popped_value) + { + boost::mutex::scoped_lock lock(queue_mutex); + while(internal_queue.empty()) + { + queue_cv.wait(lock); + } + + popped_value=internal_queue.front(); + internal_queue.pop(); + queue_full_cv.notify_one(); + } + + int size() const { + boost::mutex::scoped_lock lock(queue_mutex); + return static_cast(internal_queue.size()); + } +}; + +#endif //#ifndef CONCURRENTQUEUE_H_INCLUDED \ No newline at end of file diff --git a/DataStructures/PBFParser.h b/DataStructures/PBFParser.h index 99729ada0..ee48885c0 100644 --- a/DataStructures/PBFParser.h +++ b/DataStructures/PBFParser.h @@ -30,85 +30,7 @@ or see http://www.gnu.org/licenses/agpl.txt. #include "../typedefs.h" #include "HashTable.h" #include "ExtractorStructs.h" - - -/* - Concurrent Queue written by Anthony Williams: - http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html -*/ -template -class concurrent_queue -{ -private: - std::queue internal_queue; - mutable boost::mutex queue_mutex; - mutable boost::mutex queue_full_mutex; - boost::condition_variable queue_cv; - boost::condition_variable queue_full_cv; - const size_t max_queue_size; - - bool size_exceeded() const { - boost::mutex::scoped_lock lock(queue_mutex); - return internal_queue.size() >= max_queue_size; - } - -public: - concurrent_queue(const size_t max_size) - : max_queue_size(max_size) { - } - - void push(Data const& data) - { - if (size_exceeded()) { - boost::mutex::scoped_lock qf_lock(queue_full_mutex); - queue_full_cv.wait(qf_lock); - } - - boost::mutex::scoped_lock lock(queue_mutex); - internal_queue.push(data); - lock.unlock(); - queue_cv.notify_one(); - } - - bool empty() const - { - boost::mutex::scoped_lock lock(queue_mutex); - return internal_queue.empty(); - } - - bool try_pop(Data& popped_value) - { - boost::mutex::scoped_lock lock(queue_mutex); - if(internal_queue.empty()) - { - return false; - } - - popped_value=internal_queue.front(); - internal_queue.pop(); - queue_full_cv.notify_one(); - return true; - } - - void wait_and_pop(Data& popped_value) - { - boost::mutex::scoped_lock lock(queue_mutex); - while(internal_queue.empty()) - { - queue_cv.wait(lock); - } - - popped_value=internal_queue.front(); - internal_queue.pop(); - queue_full_cv.notify_one(); - } - - int size() const { - boost::mutex::scoped_lock lock(queue_mutex); - return static_cast(internal_queue.size()); - } -}; - +#include "ConcurrentQueue.h" class PBFParser : public BaseParser<_Node, _RawRestrictionContainer, _Way> { diff --git a/vcprojects/extractor.vcproj b/vcprojects/extractor.vcproj index 6c7600b75..57b9eb2fc 100644 --- a/vcprojects/extractor.vcproj +++ b/vcprojects/extractor.vcproj @@ -250,6 +250,10 @@ RelativePath="..\DataStructures\BinaryHeap.h" > + +