parallelize with a vector of iterators

This commit is contained in:
Dennis Luxen 2014-11-18 16:54:48 +01:00
parent 710e74219a
commit 428bb36ec6
4 changed files with 96 additions and 42 deletions

View File

@ -49,12 +49,14 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <osmium/io/any_input.hpp> #include <osmium/io/any_input.hpp>
#include <tbb/parallel_for.h>
#include <tbb/task_scheduler_init.h> #include <tbb/task_scheduler_init.h>
#include <variant/optional.hpp> #include <variant/optional.hpp>
#include <cstdlib> #include <cstdlib>
#include <algorithm>
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <fstream> #include <fstream>
@ -116,16 +118,22 @@ int Extractor::Run(int argc, char *argv[])
SimpleLogger().Write() << "Input file: " << extractor_config.input_path.filename().string(); SimpleLogger().Write() << "Input file: " << extractor_config.input_path.filename().string();
SimpleLogger().Write() << "Profile: " << extractor_config.profile_path.filename().string(); SimpleLogger().Write() << "Profile: " << extractor_config.profile_path.filename().string();
SimpleLogger().Write() << "Threads: " << extractor_config.requested_num_threads; SimpleLogger().Write() << "Threads: " << extractor_config.requested_num_threads;
if (recommended_num_threads != extractor_config.requested_num_threads) // if (recommended_num_threads != extractor_config.requested_num_threads)
{ // {
SimpleLogger().Write(logWARNING) << "The recommended number of threads is " // SimpleLogger().Write(logWARNING) << "The recommended number of threads is "
<< recommended_num_threads // << recommended_num_threads
<< "! This setting may have performance side-effects."; // << "! This setting may have performance side-effects.";
} // }
tbb::task_scheduler_init init(extractor_config.requested_num_threads); auto number_of_threads = std::max(1,
std::min(static_cast<int>(recommended_num_threads), static_cast<int>(extractor_config.requested_num_threads)) );
/*** Setup Scripting Environment ***/ tbb::task_scheduler_init init(number_of_threads);
SimpleLogger().Write() << "requested_num_threads: " << extractor_config.requested_num_threads;
SimpleLogger().Write() << "number_of_threads: " << number_of_threads;
// setup scripting environment
ScriptingEnvironment scripting_environment(extractor_config.profile_path.string().c_str()); ScriptingEnvironment scripting_environment(extractor_config.profile_path.string().c_str());
std::unordered_map<std::string, NodeID> string_map; std::unordered_map<std::string, NodeID> string_map;
@ -166,48 +174,94 @@ int Extractor::Run(int argc, char *argv[])
timestamp_out.write(timestamp.c_str(), timestamp.length()); timestamp_out.write(timestamp.c_str(), timestamp.length());
timestamp_out.close(); timestamp_out.close();
lua_State *lua_state = scripting_environment.getLuaState(); // lua_State *lua_state = scripting_environment.getLuaState();
luabind::set_pcall_callback(&lua_error_callback); luabind::set_pcall_callback(&lua_error_callback);
RestrictionParser restriction_parser(scripting_environment); // initialize vectors holding parsed objects
tbb::concurrent_vector<std::pair<std::size_t, ExtractionNode>> resulting_nodes;
ExtractionNode result_node; tbb::concurrent_vector<std::pair<std::size_t, ExtractionWay>> resulting_ways;
ExtractionWay result_way; tbb::concurrent_vector<mapbox::util::optional<InputRestrictionContainer>> resulting_restrictions;
while (osmium::memory::Buffer buffer = reader.read()) while (osmium::memory::Buffer buffer = reader.read())
{ {
for (osmium::OSMEntity &entity : buffer) // create a vector of iterators into the buffer
std::vector<osmium::memory::Buffer::iterator> elements;
osmium::memory::Buffer::iterator iter = std::begin(buffer);
while(iter != std::end(buffer))
{ {
switch (entity.type()) elements.push_back(iter);
iter = std::next(iter);
}
// clear resulting vectors
resulting_nodes.clear();
resulting_ways.clear();
resulting_restrictions.clear();
// SimpleLogger().Write(logDEBUG) << "elements count: " << elements.size();
// parse OSM entities in parallel, store in resulting vectors
tbb::parallel_for(tbb::blocked_range<std::size_t>(0, elements.size()),
[&](const tbb::blocked_range<std::size_t>& range)
{
for (auto x = range.begin(); x != range.end(); ++x)
{
auto entity = elements[x];
ExtractionNode result_node;
ExtractionWay result_way;
// RestrictionParser restriction_parser(scripting_environment);
switch (entity->type())
{ {
case osmium::item_type::node: case osmium::item_type::node:
++number_of_nodes; ++number_of_nodes;
result_node.Clear(); result_node.Clear();
luabind::call_function<void>(lua_state, luabind::call_function<void>(scripting_environment.getLuaState(),
"node_function", "node_function",
boost::cref(static_cast<osmium::Node &>(entity)), boost::cref(static_cast<osmium::Node &>(*entity)),
boost::ref(result_node)); boost::ref(result_node));
extractor_callbacks->ProcessNode(static_cast<osmium::Node &>(entity), resulting_nodes.emplace_back(x, result_node);
result_node); // extractor_callbacks->ProcessNode(static_cast<osmium::Node &>(*entity),
// result_node);
break; break;
case osmium::item_type::way: case osmium::item_type::way:
++number_of_ways; ++number_of_ways;
result_way.Clear(); result_way.Clear();
luabind::call_function<void>(lua_state, luabind::call_function<void>(scripting_environment.getLuaState(),
"way_function", "way_function",
boost::cref(static_cast<osmium::Way &>(entity)), boost::cref(static_cast<osmium::Way &>(*entity)),
boost::ref(result_way)); boost::ref(result_way));
extractor_callbacks->ProcessWay(static_cast<osmium::Way &>(entity), result_way); resulting_ways.emplace_back(x, result_way);
// extractor_callbacks->ProcessWay(static_cast<osmium::Way &>(*entity), result_way);
break; break;
case osmium::item_type::relation: case osmium::item_type::relation:
++number_of_relations; ++number_of_relations;
extractor_callbacks->ProcessRestriction( // resulting_restrictions.emplace_back(restriction_parser.TryParse(static_cast<osmium::Relation &>(*entity)));
restriction_parser.TryParse(static_cast<osmium::Relation &>(entity))); // extractor_callbacks->ProcessRestriction(restriction_parser.TryParse(static_cast<osmium::Relation &>(*entity)));
break; break;
default: default:
++number_of_others; ++number_of_others;
break; break;
} }
}
}
);
// put parsed objects thru extractor callbacks
for (const auto &result : resulting_nodes)
{
extractor_callbacks->ProcessNode(static_cast<osmium::Node &>(*(elements[result.first])),
result.second);
}
for (const auto &result : resulting_ways)
{
extractor_callbacks->ProcessWay(static_cast<osmium::Way &>(*(elements[result.first])),
result.second);
}
for (const auto &result : resulting_restrictions)
{
extractor_callbacks->ProcessRestriction(result);
} }
} }
TIMER_STOP(parsing); TIMER_STOP(parsing);

View File

@ -66,10 +66,10 @@ void ExtractorCallbacks::ProcessRestriction(
if (restriction) if (restriction)
{ {
external_memory.restrictions_list.push_back(restriction.get()); external_memory.restrictions_list.push_back(restriction.get());
SimpleLogger().Write() << "from: " << restriction.get().restriction.from.node << // SimpleLogger().Write() << "from: " << restriction.get().restriction.from.node <<
",via: " << restriction.get().restriction.via.node << // ",via: " << restriction.get().restriction.via.node <<
", to: " << restriction.get().restriction.to.node << // ", to: " << restriction.get().restriction.to.node <<
", only: " << (restriction.get().restriction.flags.is_only ? "y" : "n"); // ", only: " << (restriction.get().restriction.flags.is_only ? "y" : "n");
} }
} }
/** warning: caller needs to take care of synchronization! */ /** warning: caller needs to take care of synchronization! */

View File

@ -51,18 +51,18 @@ int lua_error_callback(lua_State *L)
} }
} }
RestrictionParser::RestrictionParser(ScriptingEnvironment &scripting_environment) RestrictionParser::RestrictionParser(lua_State *lua_state)
: lua_state(scripting_environment.getLuaState()), use_turn_restrictions(true) : /*lua_state(scripting_environment.getLuaState()),*/ use_turn_restrictions(true)
{ {
ReadUseRestrictionsSetting(); ReadUseRestrictionsSetting(lua_state);
if (use_turn_restrictions) if (use_turn_restrictions)
{ {
ReadRestrictionExceptions(); ReadRestrictionExceptions(lua_state);
} }
} }
void RestrictionParser::ReadUseRestrictionsSetting() void RestrictionParser::ReadUseRestrictionsSetting(lua_State *lua_state)
{ {
if (0 == luaL_dostring(lua_state, "return use_turn_restrictions\n")) if (0 == luaL_dostring(lua_state, "return use_turn_restrictions\n"))
{ {
@ -82,7 +82,7 @@ void RestrictionParser::ReadUseRestrictionsSetting()
} }
} }
void RestrictionParser::ReadRestrictionExceptions() void RestrictionParser::ReadRestrictionExceptions(lua_State *lua_state)
{ {
if (lua_function_exists(lua_state, "get_exceptions")) if (lua_function_exists(lua_state, "get_exceptions"))
{ {
@ -104,7 +104,7 @@ void RestrictionParser::ReadRestrictionExceptions()
} }
} }
mapbox::util::optional<InputRestrictionContainer> RestrictionParser::TryParse(osmium::Relation &relation) const mapbox::util::optional<InputRestrictionContainer> RestrictionParser::TryParse(lua_State *lua_state, osmium::Relation &relation) const
{ {
// return if turn restrictions should be ignored // return if turn restrictions should be ignored
if (!use_turn_restrictions) if (!use_turn_restrictions)

View File

@ -44,16 +44,16 @@ class ScriptingEnvironment;
class RestrictionParser class RestrictionParser
{ {
public: public:
RestrictionParser(ScriptingEnvironment &scripting_environment); // RestrictionParser(ScriptingEnvironment &scripting_environment);
RestrictionParser(lua_State *lua_state);
mapbox::util::optional<InputRestrictionContainer> TryParse(lua_State *lua_state, osmium::Relation& relation) const;
mapbox::util::optional<InputRestrictionContainer> TryParse(osmium::Relation& relation) const;
void ReadUseRestrictionsSetting();
void ReadRestrictionExceptions();
private: private:
bool ShouldIgnoreRestriction(const std::string &except_tag_string) const; void ReadUseRestrictionsSetting(lua_State *lua_state);
void ReadRestrictionExceptions(lua_State *lua_state);
bool ShouldIgnoreRestriction(lua_State *lua_state, const std::string &except_tag_string) const;
lua_State *lua_state; // lua_State *lua_state;
std::vector<std::string> restriction_exceptions; std::vector<std::string> restriction_exceptions;
bool use_turn_restrictions; bool use_turn_restrictions;
}; };