Parallize scripting on osmium::Buffer granularity

Fixes #3447 and reduces parsing time by about 15%.
This commit is contained in:
Patrick Niklaus
2017-06-08 21:04:48 +00:00
committed by Patrick Niklaus
parent cd8fb82215
commit 27ed69b08f
4 changed files with 117 additions and 106 deletions
+62 -42
View File
@@ -33,12 +33,13 @@
#include <boost/filesystem.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/iterator/function_input_iterator.hpp>
#include <boost/optional/optional.hpp>
#include <boost/scope_exit.hpp>
#include <osmium/io/any_input.hpp>
#include <tbb/concurrent_vector.h>
#include <tbb/pipeline.h>
#include <tbb/task_scheduler_init.h>
#include <cstdlib>
@@ -252,11 +253,6 @@ std::vector<TurnRestriction> Extractor::ParseOSMData(ScriptingEnvironment &scrip
timestamp_file.WriteFrom(timestamp.c_str(), timestamp.length());
// initialize vectors holding parsed objects
tbb::concurrent_vector<std::pair<std::size_t, ExtractionNode>> resulting_nodes;
tbb::concurrent_vector<std::pair<std::size_t, ExtractionWay>> resulting_ways;
tbb::concurrent_vector<boost::optional<InputRestrictionContainer>> resulting_restrictions;
std::vector<std::string> restrictions = scripting_environment.GetRestrictions();
// setup restriction parser
const RestrictionParser restriction_parser(
@@ -264,46 +260,70 @@ std::vector<TurnRestriction> Extractor::ParseOSMData(ScriptingEnvironment &scrip
config.parse_conditionals,
restrictions);
// create a vector of iterators into the buffer
for (std::vector<osmium::memory::Buffer::const_iterator> osm_elements;
const osmium::memory::Buffer buffer = reader.read();
osm_elements.clear())
std::mutex process_mutex;
using SharedBuffer = std::shared_ptr<const osmium::memory::Buffer>;
struct ParsedBuffer
{
for (auto iter = std::begin(buffer), end = std::end(buffer); iter != end; ++iter)
{
osm_elements.push_back(iter);
}
SharedBuffer buffer;
std::vector<std::pair<const osmium::Node &, ExtractionNode>> resulting_nodes;
std::vector<std::pair<const osmium::Way &, ExtractionWay>> resulting_ways;
std::vector<boost::optional<InputRestrictionContainer>> resulting_restrictions;
};
// clear resulting vectors
resulting_nodes.clear();
resulting_ways.clear();
resulting_restrictions.clear();
tbb::filter_t<void, SharedBuffer> buffer_reader(
tbb::filter::serial_in_order, [&](tbb::flow_control &fc) {
if (auto buffer = reader.read())
{
return std::make_shared<const osmium::memory::Buffer>(std::move(buffer));
}
else
{
fc.stop();
return SharedBuffer{};
}
});
tbb::filter_t<SharedBuffer, std::shared_ptr<ParsedBuffer>> buffer_transform(
tbb::filter::parallel, [&](const SharedBuffer buffer) {
if (!buffer)
return std::shared_ptr<ParsedBuffer>{};
scripting_environment.ProcessElements(osm_elements,
restriction_parser,
resulting_nodes,
resulting_ways,
resulting_restrictions);
auto parsed_buffer = std::make_shared<ParsedBuffer>();
parsed_buffer->buffer = buffer;
scripting_environment.ProcessElements(*buffer,
restriction_parser,
parsed_buffer->resulting_nodes,
parsed_buffer->resulting_ways,
parsed_buffer->resulting_restrictions);
return parsed_buffer;
});
tbb::filter_t<std::shared_ptr<ParsedBuffer>, void> buffer_storage(
tbb::filter::serial_in_order, [&](const std::shared_ptr<ParsedBuffer> parsed_buffer) {
if (!parsed_buffer)
return;
number_of_nodes += parsed_buffer->resulting_nodes.size();
// put parsed objects thru extractor callbacks
for (const auto &result : parsed_buffer->resulting_nodes)
{
extractor_callbacks->ProcessNode(result.first, result.second);
}
number_of_ways += parsed_buffer->resulting_ways.size();
for (const auto &result : parsed_buffer->resulting_ways)
{
extractor_callbacks->ProcessWay(result.first, result.second);
}
number_of_relations += parsed_buffer->resulting_restrictions.size();
for (const auto &result : parsed_buffer->resulting_restrictions)
{
extractor_callbacks->ProcessRestriction(result);
}
});
// Number of pipeline tokens that yielded the best speedup was about 1.5 * num_cores
tbb::parallel_pipeline(tbb::task_scheduler_init::default_num_threads() * 1.5,
buffer_reader & buffer_transform & buffer_storage);
number_of_nodes += resulting_nodes.size();
// put parsed objects thru extractor callbacks
for (const auto &result : resulting_nodes)
{
extractor_callbacks->ProcessNode(
static_cast<const osmium::Node &>(*(osm_elements[result.first])), result.second);
}
number_of_ways += resulting_ways.size();
for (const auto &result : resulting_ways)
{
extractor_callbacks->ProcessWay(
static_cast<const osmium::Way &>(*(osm_elements[result.first])), result.second);
}
number_of_relations += resulting_restrictions.size();
for (const auto &result : resulting_restrictions)
{
extractor_callbacks->ProcessRestriction(result);
}
}
TIMER_STOP(parsing);
util::Log() << "Parsing finished after " << TIMER_SEC(parsing) << " seconds";