diff --git a/include/extractor/scripting_environment.hpp b/include/extractor/scripting_environment.hpp index 865ee6b2e..58ba319fd 100644 --- a/include/extractor/scripting_environment.hpp +++ b/include/extractor/scripting_environment.hpp @@ -58,13 +58,12 @@ class ScriptingEnvironment virtual void ProcessTurn(ExtractionTurn &turn) = 0; virtual void ProcessSegment(ExtractionSegment &segment) = 0; - virtual void - ProcessElements(const std::vector &osm_elements, - const RestrictionParser &restriction_parser, - tbb::concurrent_vector> &resulting_nodes, - tbb::concurrent_vector> &resulting_ways, - tbb::concurrent_vector> - &resulting_restrictions) = 0; + virtual void ProcessElements( + const osmium::memory::Buffer &buffer, + const RestrictionParser &restriction_parser, + std::vector> &resulting_nodes, + std::vector> &resulting_ways, + std::vector> &resulting_restrictions) = 0; }; } } diff --git a/include/extractor/scripting_environment_lua.hpp b/include/extractor/scripting_environment_lua.hpp index 1378b97c7..42c53084e 100644 --- a/include/extractor/scripting_environment_lua.hpp +++ b/include/extractor/scripting_environment_lua.hpp @@ -65,13 +65,12 @@ class Sol2ScriptingEnvironment final : public ScriptingEnvironment void ProcessTurn(ExtractionTurn &turn) override; void ProcessSegment(ExtractionSegment &segment) override; - void - ProcessElements(const std::vector &osm_elements, - const RestrictionParser &restriction_parser, - tbb::concurrent_vector> &resulting_nodes, - tbb::concurrent_vector> &resulting_ways, - tbb::concurrent_vector> - &resulting_restrictions) override; + void ProcessElements( + const osmium::memory::Buffer &buffer, + const RestrictionParser &restriction_parser, + std::vector> &resulting_nodes, + std::vector> &resulting_ways, + std::vector> &resulting_restrictions) override; private: void InitContext(LuaScriptingContext &context); diff --git a/src/extractor/extractor.cpp b/src/extractor/extractor.cpp index d2fc771d2..232f02061 100644 --- a/src/extractor/extractor.cpp +++ b/src/extractor/extractor.cpp @@ -33,12 +33,13 @@ #include #include +#include #include #include #include -#include +#include #include #include @@ -252,11 +253,6 @@ std::vector Extractor::ParseOSMData(ScriptingEnvironment &scrip timestamp_file.WriteFrom(timestamp.c_str(), timestamp.length()); - // initialize vectors holding parsed objects - tbb::concurrent_vector> resulting_nodes; - tbb::concurrent_vector> resulting_ways; - tbb::concurrent_vector> resulting_restrictions; - std::vector restrictions = scripting_environment.GetRestrictions(); // setup restriction parser const RestrictionParser restriction_parser( @@ -264,46 +260,70 @@ std::vector Extractor::ParseOSMData(ScriptingEnvironment &scrip config.parse_conditionals, restrictions); - // create a vector of iterators into the buffer - for (std::vector osm_elements; - const osmium::memory::Buffer buffer = reader.read(); - osm_elements.clear()) + std::mutex process_mutex; + + using SharedBuffer = std::shared_ptr; + struct ParsedBuffer { - for (auto iter = std::begin(buffer), end = std::end(buffer); iter != end; ++iter) - { - osm_elements.push_back(iter); - } + SharedBuffer buffer; + std::vector> resulting_nodes; + std::vector> resulting_ways; + std::vector> resulting_restrictions; + }; - // clear resulting vectors - resulting_nodes.clear(); - resulting_ways.clear(); - resulting_restrictions.clear(); + tbb::filter_t buffer_reader( + tbb::filter::serial_in_order, [&](tbb::flow_control &fc) { + if (auto buffer = reader.read()) + { + return std::make_shared(std::move(buffer)); + } + else + { + fc.stop(); + return SharedBuffer{}; + } + }); + tbb::filter_t> buffer_transform( + tbb::filter::parallel, [&](const SharedBuffer buffer) { + if (!buffer) + return std::shared_ptr{}; - scripting_environment.ProcessElements(osm_elements, - restriction_parser, - resulting_nodes, - resulting_ways, - resulting_restrictions); + auto parsed_buffer = std::make_shared(); + parsed_buffer->buffer = buffer; + scripting_environment.ProcessElements(*buffer, + restriction_parser, + parsed_buffer->resulting_nodes, + parsed_buffer->resulting_ways, + parsed_buffer->resulting_restrictions); + return parsed_buffer; + }); + tbb::filter_t, void> buffer_storage( + tbb::filter::serial_in_order, [&](const std::shared_ptr parsed_buffer) { + if (!parsed_buffer) + return; + + number_of_nodes += parsed_buffer->resulting_nodes.size(); + // put parsed objects thru extractor callbacks + for (const auto &result : parsed_buffer->resulting_nodes) + { + extractor_callbacks->ProcessNode(result.first, result.second); + } + number_of_ways += parsed_buffer->resulting_ways.size(); + for (const auto &result : parsed_buffer->resulting_ways) + { + extractor_callbacks->ProcessWay(result.first, result.second); + } + number_of_relations += parsed_buffer->resulting_restrictions.size(); + for (const auto &result : parsed_buffer->resulting_restrictions) + { + extractor_callbacks->ProcessRestriction(result); + } + }); + + // Number of pipeline tokens that yielded the best speedup was about 1.5 * num_cores + tbb::parallel_pipeline(tbb::task_scheduler_init::default_num_threads() * 1.5, + buffer_reader & buffer_transform & buffer_storage); - number_of_nodes += resulting_nodes.size(); - // put parsed objects thru extractor callbacks - for (const auto &result : resulting_nodes) - { - extractor_callbacks->ProcessNode( - static_cast(*(osm_elements[result.first])), result.second); - } - number_of_ways += resulting_ways.size(); - for (const auto &result : resulting_ways) - { - extractor_callbacks->ProcessWay( - static_cast(*(osm_elements[result.first])), result.second); - } - number_of_relations += resulting_restrictions.size(); - for (const auto &result : resulting_restrictions) - { - extractor_callbacks->ProcessRestriction(result); - } - } TIMER_STOP(parsing); util::Log() << "Parsing finished after " << TIMER_SEC(parsing) << " seconds"; diff --git a/src/extractor/scripting_environment_lua.cpp b/src/extractor/scripting_environment_lua.cpp index 4755b3b08..926d2b9a4 100644 --- a/src/extractor/scripting_environment_lua.cpp +++ b/src/extractor/scripting_environment_lua.cpp @@ -489,61 +489,54 @@ LuaScriptingContext &Sol2ScriptingEnvironment::GetSol2Context() } void Sol2ScriptingEnvironment::ProcessElements( - const std::vector &osm_elements, + const osmium::memory::Buffer &buffer, const RestrictionParser &restriction_parser, - tbb::concurrent_vector> &resulting_nodes, - tbb::concurrent_vector> &resulting_ways, - tbb::concurrent_vector> &resulting_restrictions) + std::vector> &resulting_nodes, + std::vector> &resulting_ways, + std::vector> &resulting_restrictions) { - // parse OSM entities in parallel, store in resulting vectors - tbb::parallel_for( - tbb::blocked_range(0, osm_elements.size()), - [&](const tbb::blocked_range &range) { - ExtractionNode result_node; - ExtractionWay result_way; - std::vector result_res; - auto &local_context = this->GetSol2Context(); + ExtractionNode result_node; + ExtractionWay result_way; + std::vector result_res; + auto &local_context = this->GetSol2Context(); - for (auto x = range.begin(), end = range.end(); x != end; ++x) + for (auto entity = buffer.cbegin(), end = buffer.cend(); entity != end; ++entity) + { + switch (entity->type()) + { + case osmium::item_type::node: + result_node.clear(); + if (local_context.has_node_function && + (!static_cast(*entity).tags().empty() || + local_context.properties.call_tagless_node_function)) { - const auto entity = osm_elements[x]; - - switch (entity->type()) - { - case osmium::item_type::node: - result_node.clear(); - if (local_context.has_node_function && - (!static_cast(*entity).tags().empty() || - local_context.properties.call_tagless_node_function)) - { - local_context.ProcessNode(static_cast(*entity), - result_node); - } - resulting_nodes.push_back(std::make_pair(x, std::move(result_node))); - break; - case osmium::item_type::way: - result_way.clear(); - if (local_context.has_way_function) - { - local_context.ProcessWay(static_cast(*entity), - result_way); - } - resulting_ways.push_back(std::make_pair(x, std::move(result_way))); - break; - case osmium::item_type::relation: - result_res.clear(); - result_res = - restriction_parser.TryParse(static_cast(*entity)); - for (const InputRestrictionContainer &r : result_res) - { - resulting_restrictions.push_back(r); - } - break; - default: - break; - } + local_context.ProcessNode(static_cast(*entity), result_node); } - }); + resulting_nodes.push_back(std::pair( + static_cast(*entity), std::move(result_node))); + break; + case osmium::item_type::way: + result_way.clear(); + if (local_context.has_way_function) + { + local_context.ProcessWay(static_cast(*entity), result_way); + } + resulting_ways.push_back(std::pair( + static_cast(*entity), std::move(result_way))); + break; + case osmium::item_type::relation: + result_res.clear(); + result_res = + restriction_parser.TryParse(static_cast(*entity)); + for (const InputRestrictionContainer &r : result_res) + { + resulting_restrictions.push_back(r); + } + break; + default: + break; + } + } } std::vector Sol2ScriptingEnvironment::GetNameSuffixList()