Implements a shared-memory parallel LUA scripting engine using OpenMP.

See issue #506
This commit is contained in:
DennisOSRM 2012-11-19 19:04:59 +01:00
parent 7e9eaaddc4
commit 94657a3258
8 changed files with 240 additions and 118 deletions

View File

@ -37,7 +37,7 @@ class ConcurrentQueue {
public: public:
ConcurrentQueue(const size_t max_size) : internal_queue(max_size) { } ConcurrentQueue(const size_t max_size) : internal_queue(max_size) { }
void push(Data const& data) { inline void push(Data const& data) {
boost::mutex::scoped_lock lock(m_mutex); boost::mutex::scoped_lock lock(m_mutex);
m_not_full.wait(lock, boost::bind(&ConcurrentQueue<Data>::is_not_full, this)); m_not_full.wait(lock, boost::bind(&ConcurrentQueue<Data>::is_not_full, this));
internal_queue.push_back(data); internal_queue.push_back(data);
@ -45,11 +45,11 @@ public:
m_not_empty.notify_one(); m_not_empty.notify_one();
} }
bool empty() const { inline bool empty() const {
return internal_queue.empty(); return internal_queue.empty();
} }
void wait_and_pop(Data& popped_value) { inline void wait_and_pop(Data& popped_value) {
boost::mutex::scoped_lock lock(m_mutex); boost::mutex::scoped_lock lock(m_mutex);
m_not_empty.wait(lock, boost::bind(&ConcurrentQueue<Data>::is_not_empty, this)); m_not_empty.wait(lock, boost::bind(&ConcurrentQueue<Data>::is_not_empty, this));
popped_value=internal_queue.front(); popped_value=internal_queue.front();
@ -58,7 +58,7 @@ public:
m_not_full.notify_one(); m_not_full.notify_one();
} }
bool try_pop(Data& popped_value) { inline bool try_pop(Data& popped_value) {
boost::mutex::scoped_lock lock(m_mutex); boost::mutex::scoped_lock lock(m_mutex);
if(internal_queue.empty()) { if(internal_queue.empty()) {
return false; return false;

View File

@ -21,7 +21,13 @@ or see http://www.gnu.org/licenses/agpl.txt.
#ifndef BASEPARSER_H_ #ifndef BASEPARSER_H_
#define BASEPARSER_H_ #define BASEPARSER_H_
#include <luabind/luabind.hpp> extern "C" {
#include <lua.h>
#include <lauxlib.h>
#include <lualib.h>
}
#include "ScriptingEnvironment.h"
template<class ExternalMemoryT, typename NodeT, typename RestrictionT, typename WayT> template<class ExternalMemoryT, typename NodeT, typename RestrictionT, typename WayT>
class BaseParser { class BaseParser {
@ -29,7 +35,7 @@ public:
virtual ~BaseParser() {} virtual ~BaseParser() {}
virtual bool Init() = 0; virtual bool Init() = 0;
virtual void RegisterCallbacks(ExternalMemoryT * externalMemory) = 0; virtual void RegisterCallbacks(ExternalMemoryT * externalMemory) = 0;
virtual void RegisterLUAState(lua_State *myLuaState) = 0; virtual void RegisterScriptingEnvironment(ScriptingEnvironment & _se) = 0;
virtual bool Parse() = 0; virtual bool Parse() = 0;
void report_errors(lua_State *L, int status) { void report_errors(lua_State *L, int status) {

View File

@ -23,6 +23,10 @@ or see http://www.gnu.org/licenses/agpl.txt.
#ifndef EXTRACTIONHELPERFUNCTIONS_H_ #ifndef EXTRACTIONHELPERFUNCTIONS_H_
#define EXTRACTIONHELPERFUNCTIONS_H_ #define EXTRACTIONHELPERFUNCTIONS_H_
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string_regex.hpp>
#include <boost/regex.hpp>
#include <climits> #include <climits>
//TODO: Move into LUA //TODO: Move into LUA

View File

@ -34,8 +34,10 @@ or see http://www.gnu.org/licenses/agpl.txt.
#include "BaseParser.h" #include "BaseParser.h"
#include "ExtractorCallbacks.h" #include "ExtractorCallbacks.h"
#include "ExtractorStructs.h" #include "ExtractorStructs.h"
#include "ScriptingEnvironment.h"
#include "../DataStructures/HashTable.h" #include "../DataStructures/HashTable.h"
#include "../DataStructures/ConcurrentQueue.h" #include "../DataStructures/ConcurrentQueue.h"
#include "../Util/OpenMPWrapper.h"
class PBFParser : public BaseParser<ExtractorCallbacks, _Node, _RawRestrictionContainer, _Way> { class PBFParser : public BaseParser<ExtractorCallbacks, _Node, _RawRestrictionContainer, _Way> {
@ -68,9 +70,10 @@ class PBFParser : public BaseParser<ExtractorCallbacks, _Node, _RawRestrictionCo
}; };
public: public:
PBFParser(const char * fileName) : externalMemory(NULL), myLuaState(NULL) { PBFParser(const char * fileName) : externalMemory(NULL){
GOOGLE_PROTOBUF_VERIFY_VERSION; GOOGLE_PROTOBUF_VERIFY_VERSION;
//TODO: What is the bottleneck here? Filling the queue or reading the stuff from disk? //TODO: What is the bottleneck here? Filling the queue or reading the stuff from disk?
//NOTE: With Lua scripting, it is parsing the stuff. I/O is virtually for free.
threadDataQueue = boost::make_shared<ConcurrentQueue<_ThreadData*> >( 2500 ); /* Max 2500 items in queue, hardcoded. */ threadDataQueue = boost::make_shared<ConcurrentQueue<_ThreadData*> >( 2500 ); /* Max 2500 items in queue, hardcoded. */
input.open(fileName, std::ios::in | std::ios::binary); input.open(fileName, std::ios::in | std::ios::binary);
@ -88,8 +91,9 @@ public:
externalMemory = em; externalMemory = em;
} }
void RegisterLUAState(lua_State *ml) { //call by value, but who cares. It is done once.
myLuaState = ml; void RegisterScriptingEnvironment(ScriptingEnvironment & _se) {
scriptingEnvironment = _se;
} }
~PBFParser() { ~PBFParser() {
@ -210,8 +214,7 @@ private:
int m_lastDenseLongitude = 0; int m_lastDenseLongitude = 0;
ImportNode n; ImportNode n;
std::vector<ImportNode> nodesToParse;
//TODO: abbruchschwellenwert initialisieren
for(int i = 0, idSize = dense.id_size(); i < idSize; ++i) { for(int i = 0, idSize = dense.id_size(); i < idSize; ++i) {
n.Clear(); n.Clear();
m_lastDenseID += dense.id( i ); m_lastDenseID += dense.id( i );
@ -232,25 +235,34 @@ private:
n.keyVals.Add(key, value); n.keyVals.Add(key, value);
denseTagIndex += 2; denseTagIndex += 2;
} }
nodesToParse.push_back(n);
}
//TODO: store in container and process in bulk unsigned endi_nodes = nodesToParse.size();
#pragma omp parallel for schedule ( guided )
for(unsigned i = 0; i < endi_nodes; ++i) {
ImportNode &n = nodesToParse[i];
/** Pass the unpacked node to the LUA call back **/ /** Pass the unpacked node to the LUA call back **/
try { try {
luabind::call_function<int>( luabind::call_function<int>(
myLuaState, scriptingEnvironment.getLuaStateForThreadID(omp_get_thread_num()),
"node_function", "node_function",
boost::ref(n) boost::ref(n)
); );
if(!externalMemory->nodeFunction(n))
std::cerr << "[PBFParser] dense node not parsed" << std::endl;
} catch (const luabind::error &er) { } catch (const luabind::error &er) {
cerr << er.what() << endl;
lua_State* Ler=er.state(); lua_State* Ler=er.state();
report_errors(Ler, -1); report_errors(Ler, -1);
ERR(er.what());
} }
catch (...) { // catch (...) {
ERR("Unknown error occurred during PBF dense node parsing!"); // ERR("Unknown error occurred during PBF dense node parsing!");
} // }
}
BOOST_FOREACH(ImportNode &n, nodesToParse) {
if(!externalMemory->nodeFunction(n))
std::cerr << "[PBFParser] dense node not parsed" << std::endl;
} }
} }
@ -338,9 +350,9 @@ private:
} }
inline void parseWay(_ThreadData * threadData) { inline void parseWay(_ThreadData * threadData) {
if( threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways_size() > 0) {
_Way w; _Way w;
for(int i = 0; i < threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways_size(); ++i) { std::vector<_Way> waysToParse(threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways_size());
for(int i = 0, ways_size = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways_size(); i < ways_size; ++i) {
w.Clear(); w.Clear();
const OSMPBF::Way& inputWay = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways( i ); const OSMPBF::Way& inputWay = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways( i );
w.id = inputWay.id(); w.id = inputWay.id();
@ -356,29 +368,37 @@ private:
w.keyVals.Add(key, val); w.keyVals.Add(key, val);
} }
waysToParse.push_back(w);
}
//TODO: store in container and process in bulk unsigned endi_ways = waysToParse.size();
#pragma omp parallel for schedule ( guided )
for(unsigned i = 0; i < endi_ways; ++i) {
_Way & w = waysToParse[i];
/** Pass the unpacked way to the LUA call back **/ /** Pass the unpacked way to the LUA call back **/
try { try {
luabind::call_function<int>( luabind::call_function<int>(
myLuaState, scriptingEnvironment.getLuaStateForThreadID(omp_get_thread_num()),
"way_function", "way_function",
boost::ref(w), boost::ref(w),
w.path.size() w.path.size()
); );
if(!externalMemory->wayFunction(w)) {
std::cerr << "[PBFParser] way not parsed" << std::endl;
}
} catch (const luabind::error &er) { } catch (const luabind::error &er) {
lua_State* Ler=er.state(); lua_State* Ler=er.state();
report_errors(Ler, -1); report_errors(Ler, -1);
ERR(er.what()); ERR(er.what());
} }
catch (...) { // catch (...) {
ERR("Unknown error!"); // ERR("Unknown error!");
// }
}
BOOST_FOREACH(_Way & w, waysToParse) {
if(!externalMemory->wayFunction(w)) {
std::cerr << "[PBFParser] way not parsed" << std::endl;
} }
} }
}
} }
inline void loadGroup(_ThreadData * threadData) { inline void loadGroup(_ThreadData * threadData) {
@ -412,7 +432,7 @@ private:
threadData->currentEntityID = 0; threadData->currentEntityID = 0;
} }
/* Reverses Network Byte Order into something usable */ /* Reverses Network Byte Order into something usable, compiles down to a bswap-mov combination */
inline unsigned swapEndian(unsigned x) const { inline unsigned swapEndian(unsigned x) const {
if(getMachineEndianness() == LittleEndian) if(getMachineEndianness() == LittleEndian)
return ( (x>>24) | ((x<<8) & 0x00FF0000) | ((x>>8) & 0x0000FF00) | (x<<24) ); return ( (x>>24) | ((x<<8) & 0x00FF0000) | ((x>>8) & 0x0000FF00) | (x<<24) );
@ -548,7 +568,7 @@ private:
return true; return true;
} }
//TODO: gets called multiple times, save information instead of recomputing. //Is optimized to a single 'mov eax,1' on GCC, clang and icc using -O3
inline Endianness getMachineEndianness() const { inline Endianness getMachineEndianness() const {
int i(1); int i(1);
char *p = (char *) &i; char *p = (char *) &i;
@ -574,7 +594,7 @@ private:
/* ThreadData Queue */ /* ThreadData Queue */
boost::shared_ptr<ConcurrentQueue < _ThreadData* > > threadDataQueue; boost::shared_ptr<ConcurrentQueue < _ThreadData* > > threadDataQueue;
lua_State *myLuaState; ScriptingEnvironment scriptingEnvironment;
}; };
#endif /* PBFPARSER_H_ */ #endif /* PBFPARSER_H_ */

View File

@ -0,0 +1,122 @@
/*
open source routing machine
Copyright (C) Dennis Luxen, others 2010
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU AFFERO General Public License as published by
the Free Software Foundation; either version 3 of the License, or
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
or see http://www.gnu.org/licenses/agpl.txt.
*/
extern "C" {
#include <lua.h>
#include <lauxlib.h>
#include <lualib.h>
}
#include <boost/foreach.hpp>
#include "ScriptingEnvironment.h"
#include "../typedefs.h"
#include "../Util/OpenMPWrapper.h"
ScriptingEnvironment::ScriptingEnvironment() {}
ScriptingEnvironment::ScriptingEnvironment(const char * fileName) {
// Create a new lua state
for(unsigned i = 0; i < omp_get_max_threads(); ++i)
luaStateVector.push_back(luaL_newstate());
// Connect LuaBind to this lua state for all threads
#pragma omp parallel
{
lua_State * myLuaState = getLuaStateForThreadID(omp_get_thread_num());
luabind::open(myLuaState);
//open utility libraries string library;
luaL_openlibs(myLuaState);
// Add our function to the state's global scope
luabind::module(myLuaState) [
luabind::def("print", LUA_print<std::string>),
luabind::def("parseMaxspeed", parseMaxspeed),
luabind::def("durationIsValid", durationIsValid),
luabind::def("parseDuration", parseDuration)
];
//#pragma omp critical
// {
// if(0 != luaL_dostring(
// myLuaState,
// "print('Initializing LUA engine')\n"
// )) {
// ERR(lua_tostring(myLuaState,-1)<< " occured in scripting block");
// }
// }
luabind::module(myLuaState) [
luabind::class_<HashTable<std::string, std::string> >("keyVals")
.def("Add", &HashTable<std::string, std::string>::Add)
.def("Find", &HashTable<std::string, std::string>::Find)
.def("Holds", &HashTable<std::string, std::string>::Holds)
];
luabind::module(myLuaState) [
luabind::class_<ImportNode>("Node")
.def(luabind::constructor<>())
.def_readwrite("lat", &ImportNode::lat)
.def_readwrite("lon", &ImportNode::lon)
.def_readwrite("id", &ImportNode::id)
.def_readwrite("bollard", &ImportNode::bollard)
.def_readwrite("traffic_light", &ImportNode::trafficLight)
.def_readwrite("tags", &ImportNode::keyVals)
];
luabind::module(myLuaState) [
luabind::class_<_Way>("Way")
.def(luabind::constructor<>())
.def_readwrite("name", &_Way::name)
.def_readwrite("speed", &_Way::speed)
.def_readwrite("type", &_Way::type)
.def_readwrite("access", &_Way::access)
.def_readwrite("roundabout", &_Way::roundabout)
.def_readwrite("is_duration_set", &_Way::isDurationSet)
.def_readwrite("is_access_restricted", &_Way::isAccessRestricted)
.def_readwrite("ignore_in_grid", &_Way::ignoreInGrid)
.def_readwrite("tags", &_Way::keyVals)
.def_readwrite("direction", &_Way::direction)
.enum_("constants")
[
luabind::value("notSure", 0),
luabind::value("oneway", 1),
luabind::value("bidirectional", 2),
luabind::value("opposite", 3)
]
];
// Now call our function in a lua script
//#pragma omp critical
// {
// INFO("Parsing speedprofile from " << fileName );
// }
if(0 != luaL_dofile(myLuaState, fileName) ) {
ERR(lua_tostring(myLuaState,-1)<< " occured in scripting block");
}
}
}
ScriptingEnvironment::~ScriptingEnvironment() {
for(unsigned i = 0; i < luaStateVector.size(); ++i) {
// luaStateVector[i];
}
}
lua_State * ScriptingEnvironment::getLuaStateForThreadID(const int id) {
return luaStateVector[id];
}

View File

@ -0,0 +1,48 @@
/*
open source routing machine
Copyright (C) Dennis Luxen, others 2010
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU AFFERO General Public License as published by
the Free Software Foundation; either version 3 of the License, or
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
or see http://www.gnu.org/licenses/agpl.txt.
*/
#ifndef SCRIPTINGENVIRONMENT_H_
#define SCRIPTINGENVIRONMENT_H_
extern "C" {
#include <lua.h>
#include <lauxlib.h>
#include <lualib.h>
}
#include <luabind/luabind.hpp>
#include "ExtractionHelperFunctions.h"
#include "ExtractorStructs.h"
#include "LuaUtil.h"
#include "../DataStructures/ImportNode.h"
class ScriptingEnvironment {
public:
ScriptingEnvironment();
ScriptingEnvironment(const char * fileName);
virtual ~ScriptingEnvironment();
lua_State * getLuaStateForThreadID(const int);
std::vector<lua_State *> luaStateVector;
};
#endif /* SCRIPTINGENVIRONMENT_H_ */

View File

@ -28,6 +28,7 @@ or see http://www.gnu.org/licenses/agpl.txt.
#include "BaseParser.h" #include "BaseParser.h"
#include "ExtractorStructs.h" #include "ExtractorStructs.h"
#include "ExtractorCallbacks.h" #include "ExtractorCallbacks.h"
#include "ScriptingEnvironment.h"
#include "../DataStructures/HashTable.h" #include "../DataStructures/HashTable.h"
#include "../DataStructures/InputReaderFactory.h" #include "../DataStructures/InputReaderFactory.h"
@ -43,8 +44,8 @@ public:
externalMemory = em; externalMemory = em;
} }
void RegisterLUAState(lua_State *ml) { void RegisterScriptingEnvironment(ScriptingEnvironment & _se) {
myLuaState = ml; myLuaState = _se.getLuaStateForThreadID(0);
} }
bool Init() { bool Init() {

View File

@ -23,19 +23,10 @@ or see http://www.gnu.org/licenses/agpl.txt.
#include <fstream> #include <fstream>
#include <string> #include <string>
extern "C" {
#include <lua.h>
#include <lauxlib.h>
#include <lualib.h>
}
#include <luabind/luabind.hpp>
#include "typedefs.h" #include "typedefs.h"
#include "Extractor/ExtractorCallbacks.h" #include "Extractor/ExtractorCallbacks.h"
#include "Extractor/ExtractionContainers.h" #include "Extractor/ExtractionContainers.h"
#include "Extractor/ExtractionHelperFunctions.h" #include "Extractor/ScriptingEnvironment.h"
#include "Extractor/ExtractorStructs.h"
#include "Extractor/LuaUtil.h"
#include "Extractor/PBFParser.h" #include "Extractor/PBFParser.h"
#include "Extractor/XMLParser.h" #include "Extractor/XMLParser.h"
#include "Util/BaseConfiguration.h" #include "Util/BaseConfiguration.h"
@ -76,79 +67,6 @@ int main (int argc, char *argv[]) {
} }
} }
/*** Setup Scripting Environment ***/
// Create a new lua state
lua_State *myLuaState = luaL_newstate();
// Connect LuaBind to this lua state
luabind::open(myLuaState);
// Add our function to the state's global scope
luabind::module(myLuaState) [
luabind::def("print", LUA_print<std::string>),
luabind::def("parseMaxspeed", parseMaxspeed),
luabind::def("durationIsValid", durationIsValid),
luabind::def("parseDuration", parseDuration)
];
if(0 != luaL_dostring(
myLuaState,
"print('Initializing LUA engine')\n"
)) {
ERR(lua_tostring(myLuaState,-1)<< " occured in scripting block");
}
luabind::module(myLuaState) [
luabind::class_<HashTable<std::string, std::string> >("keyVals")
.def("Add", &HashTable<std::string, std::string>::Add)
.def("Find", &HashTable<std::string, std::string>::Find)
.def("Holds", &HashTable<std::string, std::string>::Holds)
];
luabind::module(myLuaState) [
luabind::class_<ImportNode>("Node")
.def(luabind::constructor<>())
.def_readwrite("lat", &ImportNode::lat)
.def_readwrite("lon", &ImportNode::lon)
.def_readwrite("id", &ImportNode::id)
.def_readwrite("bollard", &ImportNode::bollard)
.def_readwrite("traffic_light", &ImportNode::trafficLight)
.def_readwrite("tags", &ImportNode::keyVals)
];
luabind::module(myLuaState) [
luabind::class_<_Way>("Way")
.def(luabind::constructor<>())
.def_readwrite("name", &_Way::name)
.def_readwrite("speed", &_Way::speed)
.def_readwrite("type", &_Way::type)
.def_readwrite("access", &_Way::access)
.def_readwrite("roundabout", &_Way::roundabout)
.def_readwrite("is_duration_set", &_Way::isDurationSet)
.def_readwrite("is_access_restricted", &_Way::isAccessRestricted)
.def_readwrite("ignore_in_grid", &_Way::ignoreInGrid)
.def_readwrite("tags", &_Way::keyVals)
.def_readwrite("direction", &_Way::direction)
.enum_("constants")
[
luabind::value("notSure", 0),
luabind::value("oneway", 1),
luabind::value("bidirectional", 2),
luabind::value("opposite", 3)
]
];
// Now call our function in a lua script
INFO("Parsing speedprofile from " << (argc > 2 ? argv[2] : "profile.lua") );
if(0 != luaL_dofile(myLuaState, (argc > 2 ? argv[2] : "profile.lua") )) {
ERR(lua_tostring(myLuaState,-1)<< " occured in scripting block");
}
//open utility libraries string library;
luaL_openlibs(myLuaState);
/*** End of Scripting Environment Setup; ***/
unsigned amountOfRAM = 1; unsigned amountOfRAM = 1;
unsigned installedRAM = GetPhysicalmemory(); unsigned installedRAM = GetPhysicalmemory();
if(installedRAM < 2048264) { if(installedRAM < 2048264) {
@ -158,6 +76,9 @@ int main (int argc, char *argv[]) {
StringMap stringMap; StringMap stringMap;
ExtractionContainers externalMemory; ExtractionContainers externalMemory;
/*** Setup Scripting Environment ***/
ScriptingEnvironment scriptingEnvironment((argc > 2 ? argv[2] : "profile.lua"));
stringMap[""] = 0; stringMap[""] = 0;
extractCallBacks = new ExtractorCallbacks(&externalMemory, &stringMap); extractCallBacks = new ExtractorCallbacks(&externalMemory, &stringMap);
BaseParser<ExtractorCallbacks, _Node, _RawRestrictionContainer, _Way> * parser; BaseParser<ExtractorCallbacks, _Node, _RawRestrictionContainer, _Way> * parser;
@ -167,7 +88,7 @@ int main (int argc, char *argv[]) {
parser = new XMLParser(argv[1]); parser = new XMLParser(argv[1]);
} }
parser->RegisterCallbacks(extractCallBacks); parser->RegisterCallbacks(extractCallBacks);
parser->RegisterLUAState(myLuaState); parser->RegisterScriptingEnvironment(scriptingEnvironment);
if(!parser->Init()) if(!parser->Init())
ERR("Parser not initialized!"); ERR("Parser not initialized!");