Restructure ParseOSMData method

This commit is contained in:
Michael Krasnyk 2017-09-22 12:21:32 +02:00
parent 12b2242ad5
commit 2059f7234a

View File

@ -41,7 +41,7 @@
#include <boost/scope_exit.hpp>
#include <osmium/handler/node_locations_for_ways.hpp>
#include <osmium/index/map/sparse_mem_map.hpp>
#include <osmium/index/map/flex_mem.hpp>
#include <osmium/io/any_input.hpp>
#include <osmium/thread/pool.hpp>
#include <osmium/visitor.hpp>
@ -293,28 +293,12 @@ Extractor::ParseOSMData(ScriptingEnvironment &scripting_environment,
const osmium::io::File input_file(config.input_path.string());
osmium::thread::Pool pool(number_of_threads);
std::unique_ptr<osmium::io::Reader> reader(new osmium::io::Reader(
input_file,
osmium::osm_entity_bits::relation,
(config.use_metadata ? osmium::io::read_meta::yes : osmium::io::read_meta::no)));
osmium::io::Header header = reader->header();
unsigned number_of_nodes = 0;
unsigned number_of_ways = 0;
unsigned number_of_relations = 0;
util::Log() << "Parsing in progress..";
TIMER_START(parsing);
ExtractionContainers extraction_containers;
ExtractorCallbacks::ClassesMap classes_map;
guidance::LaneDescriptionMap turn_lane_map;
auto extractor_callbacks =
std::make_unique<ExtractorCallbacks>(extraction_containers,
classes_map,
turn_lane_map,
scripting_environment.GetProfileProperties());
{ // Parse OSM header
osmium::io::Reader reader(input_file, osmium::osm_entity_bits::nothing);
osmium::io::Header header = reader.header();
std::string generator = header.get("generator");
if (generator.empty())
@ -335,6 +319,17 @@ Extractor::ParseOSMData(ScriptingEnvironment &scripting_environment,
storage::io::FileWriter::GenerateFingerprint);
timestamp_file.WriteFrom(timestamp.c_str(), timestamp.length());
}
// Extraction containers and restriction parser
ExtractionContainers extraction_containers;
ExtractorCallbacks::ClassesMap classes_map;
guidance::LaneDescriptionMap turn_lane_map;
auto extractor_callbacks =
std::make_unique<ExtractorCallbacks>(extraction_containers,
classes_map,
turn_lane_map,
scripting_environment.GetProfileProperties());
ExtractionRelationContainer relations;
std::vector<std::string> restrictions = scripting_environment.GetRestrictions();
@ -344,6 +339,7 @@ Extractor::ParseOSMData(ScriptingEnvironment &scripting_environment,
config.parse_conditionals,
restrictions);
// OSM data reader
using SharedBuffer = std::shared_ptr<osmium::memory::Buffer>;
struct ParsedBuffer
{
@ -354,10 +350,10 @@ Extractor::ParseOSMData(ScriptingEnvironment &scripting_environment,
std::vector<InputConditionalTurnRestriction> resulting_restrictions;
};
// Read OSM data
tbb::filter_t<void, SharedBuffer> buffer_reader(
tbb::filter::serial_in_order, [&](tbb::flow_control &fc) {
if (auto buffer = reader->read())
const auto buffer_reader = [](osmium::io::Reader &reader) {
return tbb::filter_t<void, SharedBuffer>(
tbb::filter::serial_in_order, [&reader](tbb::flow_control &fc) {
if (auto buffer = reader.read())
{
return std::make_shared<osmium::memory::Buffer>(std::move(buffer));
}
@ -367,10 +363,11 @@ Extractor::ParseOSMData(ScriptingEnvironment &scripting_environment,
return SharedBuffer{};
}
});
};
// Cache node locations (assumes nodes are placed before ways)
// Node locations cache (assumes nodes are placed before ways)
using osmium_index_type =
osmium::index::map::SparseMemMap<osmium::unsigned_object_id_type, osmium::Location>;
osmium::index::map::FlexMem<osmium::unsigned_object_id_type, osmium::Location>;
using osmium_location_handler_type = osmium::handler::NodeLocationsForWays<osmium_index_type>;
osmium_index_type location_cache;
@ -382,50 +379,48 @@ Extractor::ParseOSMData(ScriptingEnvironment &scripting_environment,
return buffer;
});
// Process OSM elements
tbb::filter_t<SharedBuffer, std::shared_ptr<ParsedBuffer>> buffer_transform(
// OSM elements Lua parser
tbb::filter_t<SharedBuffer, ParsedBuffer> buffer_transformer(
tbb::filter::parallel, [&](const SharedBuffer buffer) {
if (!buffer)
return std::shared_ptr<ParsedBuffer>{};
auto parsed_buffer = std::make_shared<ParsedBuffer>();
parsed_buffer->buffer = buffer;
ParsedBuffer parsed_buffer;
parsed_buffer.buffer = buffer;
scripting_environment.ProcessElements(*buffer,
restriction_parser,
relations,
parsed_buffer->resulting_nodes,
parsed_buffer->resulting_ways,
parsed_buffer->resulting_relations,
parsed_buffer->resulting_restrictions);
parsed_buffer.resulting_nodes,
parsed_buffer.resulting_ways,
parsed_buffer.resulting_relations,
parsed_buffer.resulting_restrictions);
return parsed_buffer;
});
//
tbb::filter_t<std::shared_ptr<ParsedBuffer>, void> buffer_storage(
tbb::filter::serial_in_order, [&](const std::shared_ptr<ParsedBuffer> parsed_buffer) {
if (!parsed_buffer)
return;
// Parsed nodes and ways handler
unsigned number_of_nodes = 0;
unsigned number_of_ways = 0;
tbb::filter_t<ParsedBuffer, void> buffer_storage(
tbb::filter::serial_in_order, [&](const ParsedBuffer &parsed_buffer) {
number_of_nodes += parsed_buffer->resulting_nodes.size();
number_of_nodes += parsed_buffer.resulting_nodes.size();
// put parsed objects thru extractor callbacks
for (const auto &result : parsed_buffer->resulting_nodes)
for (const auto &result : parsed_buffer.resulting_nodes)
{
extractor_callbacks->ProcessNode(result.first, result.second);
}
number_of_ways += parsed_buffer->resulting_ways.size();
for (const auto &result : parsed_buffer->resulting_ways)
number_of_ways += parsed_buffer.resulting_ways.size();
for (const auto &result : parsed_buffer.resulting_ways)
{
extractor_callbacks->ProcessWay(result.first, result.second);
}
});
tbb::filter_t<std::shared_ptr<ParsedBuffer>, void> buffer_storage_relation(
tbb::filter::serial_in_order, [&](const std::shared_ptr<ParsedBuffer> parsed_buffer) {
if (!parsed_buffer)
return;
// Parsed relations handler
unsigned number_of_relations = 0;
tbb::filter_t<ParsedBuffer, void> buffer_storage_relation(
tbb::filter::serial_in_order, [&](const ParsedBuffer &parsed_buffer) {
number_of_relations += parsed_buffer->resulting_relations.size();
for (const auto &result : parsed_buffer->resulting_relations)
number_of_relations += parsed_buffer.resulting_relations.size();
for (const auto &result : parsed_buffer.resulting_relations)
{
/// TODO: add restriction processing
if (result.second.is_restriction)
@ -434,33 +429,35 @@ Extractor::ParseOSMData(ScriptingEnvironment &scripting_environment,
relations.AddRelation(result.second);
}
for (const auto &result : parsed_buffer->resulting_restrictions)
for (const auto &result : parsed_buffer.resulting_restrictions)
{
extractor_callbacks->ProcessRestriction(result);
}
});
/* Main trick that we can use the same pipeline. It just receive relation objects
* from osmium. So other containers would be empty and doesn't process anything
*/
util::Log() << "Parse relations ...";
tbb::parallel_pipeline(tbb::task_scheduler_init::default_num_threads() * 1.5,
buffer_reader & buffer_transform & buffer_storage_relation);
reader->close();
/* At this step we just filter ways and nodes from osmium, so any relation wouldn't be
* processed there.
*/
util::Log() << "Parse ways and nodes ...";
reader.reset(new osmium::io::Reader(
input_file,
osmium::osm_entity_bits::node | osmium::osm_entity_bits::way,
(config.use_metadata ? osmium::io::read_meta::yes : osmium::io::read_meta::no)));
// Parse OSM elements with parallel transformer
// Number of pipeline tokens that yielded the best speedup was about 1.5 * num_cores
const auto num_threads = tbb::task_scheduler_init::default_num_threads() * 1.5;
const auto read_meta =
config.use_metadata ? osmium::io::read_meta::yes : osmium::io::read_meta::no;
{ // Relations reading pipeline
util::Log() << "Parse relations ...";
osmium::io::Reader reader(input_file, osmium::osm_entity_bits::relation, read_meta);
tbb::parallel_pipeline(
num_threads, buffer_reader(reader) & buffer_transformer & buffer_storage_relation);
}
{ // Nodes and ways reading pipeline
util::Log() << "Parse ways and nodes ...";
osmium::io::Reader reader(
input_file, osmium::osm_entity_bits::node | osmium::osm_entity_bits::way, read_meta);
// TODO: make location_cacher conditional
tbb::parallel_pipeline(tbb::task_scheduler_init::default_num_threads() * 1.5,
buffer_reader & location_cacher & buffer_transform & buffer_storage);
const auto pipeline =
buffer_reader(reader) & location_cacher & buffer_transformer & buffer_storage;
tbb::parallel_pipeline(num_threads, pipeline);
}
TIMER_STOP(parsing);
util::Log() << "Parsing finished after " << TIMER_SEC(parsing) << " seconds";