From 94657a3258e58e43fe920acf336d409aa0aa2bc6 Mon Sep 17 00:00:00 2001 From: DennisOSRM Date: Mon, 19 Nov 2012 19:04:59 +0100 Subject: [PATCH] Implements a shared-memory parallel LUA scripting engine using OpenMP. See issue #506 --- DataStructures/ConcurrentQueue.h | 8 +- Extractor/BaseParser.h | 10 ++- Extractor/ExtractionHelperFunctions.h | 4 + Extractor/PBFParser.h | 72 +++++++++------ Extractor/ScriptingEnvironment.cpp | 122 ++++++++++++++++++++++++++ Extractor/ScriptingEnvironment.h | 48 ++++++++++ Extractor/XMLParser.h | 5 +- extractor.cpp | 89 ++----------------- 8 files changed, 240 insertions(+), 118 deletions(-) create mode 100644 Extractor/ScriptingEnvironment.cpp create mode 100644 Extractor/ScriptingEnvironment.h diff --git a/DataStructures/ConcurrentQueue.h b/DataStructures/ConcurrentQueue.h index abcbc5643..0466ab2fa 100644 --- a/DataStructures/ConcurrentQueue.h +++ b/DataStructures/ConcurrentQueue.h @@ -37,7 +37,7 @@ class ConcurrentQueue { public: ConcurrentQueue(const size_t max_size) : internal_queue(max_size) { } - void push(Data const& data) { + inline void push(Data const& data) { boost::mutex::scoped_lock lock(m_mutex); m_not_full.wait(lock, boost::bind(&ConcurrentQueue::is_not_full, this)); internal_queue.push_back(data); @@ -45,11 +45,11 @@ public: m_not_empty.notify_one(); } - bool empty() const { + inline bool empty() const { return internal_queue.empty(); } - void wait_and_pop(Data& popped_value) { + inline void wait_and_pop(Data& popped_value) { boost::mutex::scoped_lock lock(m_mutex); m_not_empty.wait(lock, boost::bind(&ConcurrentQueue::is_not_empty, this)); popped_value=internal_queue.front(); @@ -58,7 +58,7 @@ public: m_not_full.notify_one(); } - bool try_pop(Data& popped_value) { + inline bool try_pop(Data& popped_value) { boost::mutex::scoped_lock lock(m_mutex); if(internal_queue.empty()) { return false; diff --git a/Extractor/BaseParser.h b/Extractor/BaseParser.h index 8f0054fd3..f46b19fea 100644 --- a/Extractor/BaseParser.h +++ b/Extractor/BaseParser.h @@ -21,7 +21,13 @@ or see http://www.gnu.org/licenses/agpl.txt. #ifndef BASEPARSER_H_ #define BASEPARSER_H_ -#include +extern "C" { +#include +#include +#include +} + +#include "ScriptingEnvironment.h" template class BaseParser { @@ -29,7 +35,7 @@ public: virtual ~BaseParser() {} virtual bool Init() = 0; virtual void RegisterCallbacks(ExternalMemoryT * externalMemory) = 0; - virtual void RegisterLUAState(lua_State *myLuaState) = 0; + virtual void RegisterScriptingEnvironment(ScriptingEnvironment & _se) = 0; virtual bool Parse() = 0; void report_errors(lua_State *L, int status) { diff --git a/Extractor/ExtractionHelperFunctions.h b/Extractor/ExtractionHelperFunctions.h index 39ec3616a..14f3a6ec7 100644 --- a/Extractor/ExtractionHelperFunctions.h +++ b/Extractor/ExtractionHelperFunctions.h @@ -23,6 +23,10 @@ or see http://www.gnu.org/licenses/agpl.txt. #ifndef EXTRACTIONHELPERFUNCTIONS_H_ #define EXTRACTIONHELPERFUNCTIONS_H_ +#include +#include +#include + #include //TODO: Move into LUA diff --git a/Extractor/PBFParser.h b/Extractor/PBFParser.h index 8ae05e040..24f80c924 100644 --- a/Extractor/PBFParser.h +++ b/Extractor/PBFParser.h @@ -34,8 +34,10 @@ or see http://www.gnu.org/licenses/agpl.txt. #include "BaseParser.h" #include "ExtractorCallbacks.h" #include "ExtractorStructs.h" +#include "ScriptingEnvironment.h" #include "../DataStructures/HashTable.h" #include "../DataStructures/ConcurrentQueue.h" +#include "../Util/OpenMPWrapper.h" class PBFParser : public BaseParser { @@ -68,9 +70,10 @@ class PBFParser : public BaseParser >( 2500 ); /* Max 2500 items in queue, hardcoded. */ input.open(fileName, std::ios::in | std::ios::binary); @@ -88,8 +91,9 @@ public: externalMemory = em; } - void RegisterLUAState(lua_State *ml) { - myLuaState = ml; + //call by value, but who cares. It is done once. + void RegisterScriptingEnvironment(ScriptingEnvironment & _se) { + scriptingEnvironment = _se; } ~PBFParser() { @@ -210,8 +214,7 @@ private: int m_lastDenseLongitude = 0; ImportNode n; - - //TODO: abbruchschwellenwert initialisieren + std::vector nodesToParse; for(int i = 0, idSize = dense.id_size(); i < idSize; ++i) { n.Clear(); m_lastDenseID += dense.id( i ); @@ -232,25 +235,34 @@ private: n.keyVals.Add(key, value); denseTagIndex += 2; } + nodesToParse.push_back(n); + } - //TODO: store in container and process in bulk + unsigned endi_nodes = nodesToParse.size(); +#pragma omp parallel for schedule ( guided ) + for(unsigned i = 0; i < endi_nodes; ++i) { + ImportNode &n = nodesToParse[i]; /** Pass the unpacked node to the LUA call back **/ try { luabind::call_function( - myLuaState, + scriptingEnvironment.getLuaStateForThreadID(omp_get_thread_num()), "node_function", boost::ref(n) ); - if(!externalMemory->nodeFunction(n)) - std::cerr << "[PBFParser] dense node not parsed" << std::endl; } catch (const luabind::error &er) { - cerr << er.what() << endl; lua_State* Ler=er.state(); report_errors(Ler, -1); + ERR(er.what()); } - catch (...) { - ERR("Unknown error occurred during PBF dense node parsing!"); - } +// catch (...) { +// ERR("Unknown error occurred during PBF dense node parsing!"); +// } + } + + + BOOST_FOREACH(ImportNode &n, nodesToParse) { + if(!externalMemory->nodeFunction(n)) + std::cerr << "[PBFParser] dense node not parsed" << std::endl; } } @@ -338,9 +350,9 @@ private: } inline void parseWay(_ThreadData * threadData) { - if( threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways_size() > 0) { _Way w; - for(int i = 0; i < threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways_size(); ++i) { + std::vector<_Way> waysToParse(threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways_size()); + for(int i = 0, ways_size = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways_size(); i < ways_size; ++i) { w.Clear(); const OSMPBF::Way& inputWay = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways( i ); w.id = inputWay.id(); @@ -356,29 +368,37 @@ private: w.keyVals.Add(key, val); } + waysToParse.push_back(w); + } - //TODO: store in container and process in bulk + unsigned endi_ways = waysToParse.size(); +#pragma omp parallel for schedule ( guided ) + for(unsigned i = 0; i < endi_ways; ++i) { + _Way & w = waysToParse[i]; /** Pass the unpacked way to the LUA call back **/ try { luabind::call_function( - myLuaState, + scriptingEnvironment.getLuaStateForThreadID(omp_get_thread_num()), "way_function", boost::ref(w), w.path.size() ); - if(!externalMemory->wayFunction(w)) { - std::cerr << "[PBFParser] way not parsed" << std::endl; - } + } catch (const luabind::error &er) { lua_State* Ler=er.state(); report_errors(Ler, -1); ERR(er.what()); } - catch (...) { - ERR("Unknown error!"); +// catch (...) { +// ERR("Unknown error!"); +// } + } + + BOOST_FOREACH(_Way & w, waysToParse) { + if(!externalMemory->wayFunction(w)) { + std::cerr << "[PBFParser] way not parsed" << std::endl; } } - } } inline void loadGroup(_ThreadData * threadData) { @@ -412,7 +432,7 @@ private: threadData->currentEntityID = 0; } - /* Reverses Network Byte Order into something usable */ + /* Reverses Network Byte Order into something usable, compiles down to a bswap-mov combination */ inline unsigned swapEndian(unsigned x) const { if(getMachineEndianness() == LittleEndian) return ( (x>>24) | ((x<<8) & 0x00FF0000) | ((x>>8) & 0x0000FF00) | (x<<24) ); @@ -548,7 +568,7 @@ private: return true; } - //TODO: gets called multiple times, save information instead of recomputing. + //Is optimized to a single 'mov eax,1' on GCC, clang and icc using -O3 inline Endianness getMachineEndianness() const { int i(1); char *p = (char *) &i; @@ -574,7 +594,7 @@ private: /* ThreadData Queue */ boost::shared_ptr > threadDataQueue; - lua_State *myLuaState; + ScriptingEnvironment scriptingEnvironment; }; #endif /* PBFPARSER_H_ */ diff --git a/Extractor/ScriptingEnvironment.cpp b/Extractor/ScriptingEnvironment.cpp new file mode 100644 index 000000000..9a333f822 --- /dev/null +++ b/Extractor/ScriptingEnvironment.cpp @@ -0,0 +1,122 @@ +/* + open source routing machine + Copyright (C) Dennis Luxen, others 2010 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU AFFERO General Public License as published by + the Free Software Foundation; either version 3 of the License, or + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + or see http://www.gnu.org/licenses/agpl.txt. + */ + +extern "C" { +#include +#include +#include +} + +#include + +#include "ScriptingEnvironment.h" +#include "../typedefs.h" +#include "../Util/OpenMPWrapper.h" + +ScriptingEnvironment::ScriptingEnvironment() {} +ScriptingEnvironment::ScriptingEnvironment(const char * fileName) { + // Create a new lua state + for(unsigned i = 0; i < omp_get_max_threads(); ++i) + luaStateVector.push_back(luaL_newstate()); + + // Connect LuaBind to this lua state for all threads +#pragma omp parallel + { + lua_State * myLuaState = getLuaStateForThreadID(omp_get_thread_num()); + luabind::open(myLuaState); + //open utility libraries string library; + luaL_openlibs(myLuaState); + + // Add our function to the state's global scope + luabind::module(myLuaState) [ + luabind::def("print", LUA_print), + luabind::def("parseMaxspeed", parseMaxspeed), + luabind::def("durationIsValid", durationIsValid), + luabind::def("parseDuration", parseDuration) + ]; +//#pragma omp critical +// { +// if(0 != luaL_dostring( +// myLuaState, +// "print('Initializing LUA engine')\n" +// )) { +// ERR(lua_tostring(myLuaState,-1)<< " occured in scripting block"); +// } +// } + luabind::module(myLuaState) [ + luabind::class_ >("keyVals") + .def("Add", &HashTable::Add) + .def("Find", &HashTable::Find) + .def("Holds", &HashTable::Holds) + ]; + + luabind::module(myLuaState) [ + luabind::class_("Node") + .def(luabind::constructor<>()) + .def_readwrite("lat", &ImportNode::lat) + .def_readwrite("lon", &ImportNode::lon) + .def_readwrite("id", &ImportNode::id) + .def_readwrite("bollard", &ImportNode::bollard) + .def_readwrite("traffic_light", &ImportNode::trafficLight) + .def_readwrite("tags", &ImportNode::keyVals) + ]; + + luabind::module(myLuaState) [ + luabind::class_<_Way>("Way") + .def(luabind::constructor<>()) + .def_readwrite("name", &_Way::name) + .def_readwrite("speed", &_Way::speed) + .def_readwrite("type", &_Way::type) + .def_readwrite("access", &_Way::access) + .def_readwrite("roundabout", &_Way::roundabout) + .def_readwrite("is_duration_set", &_Way::isDurationSet) + .def_readwrite("is_access_restricted", &_Way::isAccessRestricted) + .def_readwrite("ignore_in_grid", &_Way::ignoreInGrid) + .def_readwrite("tags", &_Way::keyVals) + .def_readwrite("direction", &_Way::direction) + .enum_("constants") + [ + luabind::value("notSure", 0), + luabind::value("oneway", 1), + luabind::value("bidirectional", 2), + luabind::value("opposite", 3) + ] + ]; + + // Now call our function in a lua script +//#pragma omp critical +// { +// INFO("Parsing speedprofile from " << fileName ); +// } + if(0 != luaL_dofile(myLuaState, fileName) ) { + ERR(lua_tostring(myLuaState,-1)<< " occured in scripting block"); + } + } +} + +ScriptingEnvironment::~ScriptingEnvironment() { + for(unsigned i = 0; i < luaStateVector.size(); ++i) { + // luaStateVector[i]; + } +} + +lua_State * ScriptingEnvironment::getLuaStateForThreadID(const int id) { + return luaStateVector[id]; +} diff --git a/Extractor/ScriptingEnvironment.h b/Extractor/ScriptingEnvironment.h new file mode 100644 index 000000000..05528d408 --- /dev/null +++ b/Extractor/ScriptingEnvironment.h @@ -0,0 +1,48 @@ +/* + open source routing machine + Copyright (C) Dennis Luxen, others 2010 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU AFFERO General Public License as published by + the Free Software Foundation; either version 3 of the License, or + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + or see http://www.gnu.org/licenses/agpl.txt. + */ + +#ifndef SCRIPTINGENVIRONMENT_H_ +#define SCRIPTINGENVIRONMENT_H_ + +extern "C" { +#include +#include +#include +} +#include + +#include "ExtractionHelperFunctions.h" +#include "ExtractorStructs.h" +#include "LuaUtil.h" + +#include "../DataStructures/ImportNode.h" + +class ScriptingEnvironment { +public: + ScriptingEnvironment(); + ScriptingEnvironment(const char * fileName); + virtual ~ScriptingEnvironment(); + + lua_State * getLuaStateForThreadID(const int); + + std::vector luaStateVector; +}; + +#endif /* SCRIPTINGENVIRONMENT_H_ */ diff --git a/Extractor/XMLParser.h b/Extractor/XMLParser.h index 5069ec8ef..c9d665dc3 100644 --- a/Extractor/XMLParser.h +++ b/Extractor/XMLParser.h @@ -28,6 +28,7 @@ or see http://www.gnu.org/licenses/agpl.txt. #include "BaseParser.h" #include "ExtractorStructs.h" #include "ExtractorCallbacks.h" +#include "ScriptingEnvironment.h" #include "../DataStructures/HashTable.h" #include "../DataStructures/InputReaderFactory.h" @@ -43,8 +44,8 @@ public: externalMemory = em; } - void RegisterLUAState(lua_State *ml) { - myLuaState = ml; + void RegisterScriptingEnvironment(ScriptingEnvironment & _se) { + myLuaState = _se.getLuaStateForThreadID(0); } bool Init() { diff --git a/extractor.cpp b/extractor.cpp index e041b4d5c..eb4e643ba 100644 --- a/extractor.cpp +++ b/extractor.cpp @@ -23,19 +23,10 @@ or see http://www.gnu.org/licenses/agpl.txt. #include #include -extern "C" { -#include -#include -#include -} -#include - #include "typedefs.h" #include "Extractor/ExtractorCallbacks.h" #include "Extractor/ExtractionContainers.h" -#include "Extractor/ExtractionHelperFunctions.h" -#include "Extractor/ExtractorStructs.h" -#include "Extractor/LuaUtil.h" +#include "Extractor/ScriptingEnvironment.h" #include "Extractor/PBFParser.h" #include "Extractor/XMLParser.h" #include "Util/BaseConfiguration.h" @@ -76,79 +67,6 @@ int main (int argc, char *argv[]) { } } - /*** Setup Scripting Environment ***/ - - // Create a new lua state - lua_State *myLuaState = luaL_newstate(); - - // Connect LuaBind to this lua state - luabind::open(myLuaState); - - // Add our function to the state's global scope - luabind::module(myLuaState) [ - luabind::def("print", LUA_print), - luabind::def("parseMaxspeed", parseMaxspeed), - luabind::def("durationIsValid", durationIsValid), - luabind::def("parseDuration", parseDuration) - ]; - - if(0 != luaL_dostring( - myLuaState, - "print('Initializing LUA engine')\n" - )) { - ERR(lua_tostring(myLuaState,-1)<< " occured in scripting block"); - } - - luabind::module(myLuaState) [ - luabind::class_ >("keyVals") - .def("Add", &HashTable::Add) - .def("Find", &HashTable::Find) - .def("Holds", &HashTable::Holds) - ]; - - luabind::module(myLuaState) [ - luabind::class_("Node") - .def(luabind::constructor<>()) - .def_readwrite("lat", &ImportNode::lat) - .def_readwrite("lon", &ImportNode::lon) - .def_readwrite("id", &ImportNode::id) - .def_readwrite("bollard", &ImportNode::bollard) - .def_readwrite("traffic_light", &ImportNode::trafficLight) - .def_readwrite("tags", &ImportNode::keyVals) - ]; - - luabind::module(myLuaState) [ - luabind::class_<_Way>("Way") - .def(luabind::constructor<>()) - .def_readwrite("name", &_Way::name) - .def_readwrite("speed", &_Way::speed) - .def_readwrite("type", &_Way::type) - .def_readwrite("access", &_Way::access) - .def_readwrite("roundabout", &_Way::roundabout) - .def_readwrite("is_duration_set", &_Way::isDurationSet) - .def_readwrite("is_access_restricted", &_Way::isAccessRestricted) - .def_readwrite("ignore_in_grid", &_Way::ignoreInGrid) - .def_readwrite("tags", &_Way::keyVals) - .def_readwrite("direction", &_Way::direction) - .enum_("constants") - [ - luabind::value("notSure", 0), - luabind::value("oneway", 1), - luabind::value("bidirectional", 2), - luabind::value("opposite", 3) - ] - ]; - // Now call our function in a lua script - INFO("Parsing speedprofile from " << (argc > 2 ? argv[2] : "profile.lua") ); - if(0 != luaL_dofile(myLuaState, (argc > 2 ? argv[2] : "profile.lua") )) { - ERR(lua_tostring(myLuaState,-1)<< " occured in scripting block"); - } - - //open utility libraries string library; - luaL_openlibs(myLuaState); - - /*** End of Scripting Environment Setup; ***/ - unsigned amountOfRAM = 1; unsigned installedRAM = GetPhysicalmemory(); if(installedRAM < 2048264) { @@ -158,6 +76,9 @@ int main (int argc, char *argv[]) { StringMap stringMap; ExtractionContainers externalMemory; + /*** Setup Scripting Environment ***/ + ScriptingEnvironment scriptingEnvironment((argc > 2 ? argv[2] : "profile.lua")); + stringMap[""] = 0; extractCallBacks = new ExtractorCallbacks(&externalMemory, &stringMap); BaseParser * parser; @@ -167,7 +88,7 @@ int main (int argc, char *argv[]) { parser = new XMLParser(argv[1]); } parser->RegisterCallbacks(extractCallBacks); - parser->RegisterLUAState(myLuaState); + parser->RegisterScriptingEnvironment(scriptingEnvironment); if(!parser->Init()) ERR("Parser not initialized!");