diff --git a/Extractor/ExtractionWay.h b/Extractor/ExtractionWay.h index 740c4beaf..d42595a7b 100644 --- a/Extractor/ExtractionWay.h +++ b/Extractor/ExtractionWay.h @@ -40,6 +40,7 @@ struct ExtractionWay void Clear() { + id = -1; forward_speed = -1; backward_speed = -1; duration = -1; @@ -106,6 +107,7 @@ struct ExtractionWay void set_backward_mode(const TravelMode m) { backward_travel_mode = m; } const TravelMode get_backward_mode() const { return backward_travel_mode; } + int64_t id; double forward_speed; double backward_speed; double duration; diff --git a/Extractor/Extractor.cpp b/Extractor/Extractor.cpp index 00e2e615d..c442ad325 100644 --- a/Extractor/Extractor.cpp +++ b/Extractor/Extractor.cpp @@ -37,10 +37,12 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "../Util/GitDescription.h" #include "../Util/IniFileUtil.h" +#include "../DataStructures/ConcurrentQueue.h" #include "../Util/OSRMException.h" #include "../Util/simple_logger.hpp" #include "../Util/TimingUtil.h" #include "../Util/make_unique.hpp" + #include "../typedefs.h" #include @@ -49,16 +51,28 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include +#include + #include +#include #include #include #include #include #include +#include namespace { + +struct ResultBuffer +{ + std::vector> nodes; + std::vector> ways; + std::vector> restrictions; +}; + int lua_error_callback(lua_State *L) // This is so I can use my own function as an // exception handler, pcall_log() { @@ -147,7 +161,7 @@ int Extractor::Run(int argc, char *argv[]) } SimpleLogger().Write() << "input file generated by " << generator; - // TODO: write timestamp if non-empty + // write .timestamp data file std::string timestamp = header.get("osmosis_replication_timestamp"); if (timestamp.empty()) { @@ -164,12 +178,35 @@ int Extractor::Run(int argc, char *argv[]) RestrictionParser restriction_parser(scripting_environment); - ExtractionNode result_node; - ExtractionWay result_way; + // move to header + std::atomic_bool parsing_done {false}; + std::atomic_bool loading_done {false}; - while (osmium::memory::Buffer buffer = reader.read()) + ConcurrentQueue> parse_queue(128); + ConcurrentQueue> result_queue(128); + + std::thread loading_thread([&]{ + while (osmium::memory::Buffer buffer = reader.read()) + { + parse_queue.push(std::make_shared(std::move(buffer))); + } + loading_done = true; + }); + + // parsing threads + while (!loading_done || !parse_queue.empty()) { - for (osmium::OSMEntity &entity : buffer) + std::shared_ptr current_buffer; + if (!parse_queue.try_pop(current_buffer)) + { + continue; + } + + ExtractionNode result_node; + ExtractionWay result_way; + + std::shared_ptr result_buffer = std::make_shared();; + for (osmium::OSMEntity &entity : *current_buffer) { switch (entity.type()) { @@ -180,8 +217,7 @@ int Extractor::Run(int argc, char *argv[]) "node_function", boost::cref(static_cast(entity)), boost::ref(result_node)); - extractor_callbacks->ProcessNode(static_cast(entity), - result_node); + result_buffer->nodes.emplace_back(osmium::NodeRef{static_cast(entity).id(), static_cast(entity).location()}, result_node); break; case osmium::item_type::way: ++number_of_ways; @@ -190,19 +226,85 @@ int Extractor::Run(int argc, char *argv[]) "way_function", boost::cref(static_cast(entity)), boost::ref(result_way)); - extractor_callbacks->ProcessWay(static_cast(entity), result_way); + result_way.id = static_cast(entity).id(); + result_buffer->ways.emplace_back(osmium::WayNodeList{static_cast(entity).nodes()}, result_way); break; case osmium::item_type::relation: ++number_of_relations; - extractor_callbacks->ProcessRestriction( - restriction_parser.TryParse(static_cast(entity))); + result_buffer->restrictions.emplace_back(restriction_parser.TryParse(static_cast(entity))); break; default: ++number_of_others; break; } } + result_queue.push(result_buffer); + parsing_done = true; } + + while (!parsing_done || !result_queue.empty()) + { + std::shared_ptr current_buffer; + if (!result_queue.try_pop(current_buffer)) + { + for (const auto &node : current_buffer->nodes) + { + extractor_callbacks->ProcessNode(node.first, + node.second); + } + for (auto &way : current_buffer->ways) + { + extractor_callbacks->ProcessWay(way.first, + way.second); + } + for (const auto &restriction : current_buffer->restrictions) + { + extractor_callbacks->ProcessRestriction(restriction); + } + + } + } + + loading_thread.join(); + + // TODO: join parser threads + + // while (osmium::memory::Buffer buffer = reader.read()) + // { + // for (osmium::OSMEntity &entity : buffer) + // { + // switch (entity.type()) + // { + // case osmium::item_type::node: + // ++number_of_nodes; + // result_node.Clear(); + // luabind::call_function(lua_state, + // "node_function", + // boost::cref(static_cast(entity)), + // boost::ref(result_node)); + // extractor_callbacks->ProcessNode(static_cast(entity), + // result_node); + // break; + // case osmium::item_type::way: + // ++number_of_ways; + // result_way.Clear(); + // luabind::call_function(lua_state, + // "way_function", + // boost::cref(static_cast(entity)), + // boost::ref(result_way)); + // extractor_callbacks->ProcessWay(static_cast(entity), result_way); + // break; + // case osmium::item_type::relation: + // ++number_of_relations; + // extractor_callbacks->ProcessRestriction( + // restriction_parser.TryParse(static_cast(entity))); + // break; + // default: + // ++number_of_others; + // break; + // } + // } + // } TIMER_STOP(parsing); SimpleLogger().Write() << "Parsing finished after " << TIMER_SEC(parsing) << " seconds"; SimpleLogger().Write() << "Raw input contains " << number_of_nodes << " nodes, " diff --git a/Extractor/ExtractorCallbacks.cpp b/Extractor/ExtractorCallbacks.cpp index 170c4437a..a14f09a2b 100644 --- a/Extractor/ExtractorCallbacks.cpp +++ b/Extractor/ExtractorCallbacks.cpp @@ -48,13 +48,13 @@ ExtractorCallbacks::ExtractorCallbacks(ExtractionContainers &extraction_containe } /** warning: caller needs to take care of synchronization! */ -void ExtractorCallbacks::ProcessNode(const osmium::Node &osm_input_node, +void ExtractorCallbacks::ProcessNode(const osmium::NodeRef &osm_input_node, const ExtractionNode &result_node) { external_memory.all_nodes_list.push_back({ static_cast(osm_input_node.location().lat() * COORDINATE_PRECISION), static_cast(osm_input_node.location().lon() * COORDINATE_PRECISION), - static_cast(osm_input_node.id()), + static_cast(osm_input_node.ref()), result_node.barrier, result_node.traffic_lights }); @@ -69,7 +69,7 @@ void ExtractorCallbacks::ProcessRestriction( } } /** warning: caller needs to take care of synchronization! */ -void ExtractorCallbacks::ProcessWay(const osmium::Way &input_way, ExtractionWay &parsed_way) +void ExtractorCallbacks::ProcessWay(const osmium::WayNodeList &input_way, const ExtractionWay &parsed_way) { if (((0 >= parsed_way.forward_speed) || (TRAVEL_MODE_INACCESSIBLE == parsed_way.forward_travel_mode)) && @@ -80,15 +80,15 @@ void ExtractorCallbacks::ProcessWay(const osmium::Way &input_way, ExtractionWay return; } - if (input_way.nodes().size() <= 1) + if (input_way.size() <= 1) { // safe-guard against broken data return; } - if (std::numeric_limits::max() == input_way.id()) + if (std::numeric_limits::max() == parsed_way.id) { - SimpleLogger().Write(logDEBUG) << "found bogus way with id: " << input_way.id() - << " of size " << input_way.nodes().size(); + SimpleLogger().Write(logDEBUG) << "found bogus way with id: " << parsed_way.id + << " of size " << input_way.size(); return; } @@ -96,13 +96,13 @@ void ExtractorCallbacks::ProcessWay(const osmium::Way &input_way, ExtractionWay { // TODO: iterate all way segments and set duration corresponding to the length of each // segment - parsed_way.forward_speed = parsed_way.duration / (input_way.nodes().size() - 1); - parsed_way.backward_speed = parsed_way.duration / (input_way.nodes().size() - 1); + const_cast(parsed_way).forward_speed = parsed_way.duration / (input_way.size() - 1); + const_cast(parsed_way).backward_speed = parsed_way.duration / (input_way.size() - 1); } if (std::numeric_limits::epsilon() >= std::abs(-1. - parsed_way.forward_speed)) { - SimpleLogger().Write(logDEBUG) << "found way with bogus speed, id: " << input_way.id(); + SimpleLogger().Write(logDEBUG) << "found way with bogus speed, id: " << parsed_way.id; return; } @@ -151,27 +151,27 @@ void ExtractorCallbacks::ProcessWay(const osmium::Way &input_way, ExtractionWay const bool is_opposite_way = TRAVEL_MODE_INACCESSIBLE == parsed_way.forward_travel_mode; if (is_opposite_way) { - parsed_way.forward_travel_mode = parsed_way.backward_travel_mode; - parsed_way.backward_travel_mode = TRAVEL_MODE_INACCESSIBLE; + const_cast(parsed_way).forward_travel_mode = parsed_way.backward_travel_mode; + const_cast(parsed_way).backward_travel_mode = TRAVEL_MODE_INACCESSIBLE; osrm::for_each_pair( - input_way.nodes().crbegin(), input_way.nodes().crend(), pair_wise_segment_split); - external_memory.used_node_id_list.push_back(input_way.nodes().front().ref()); + input_way.crbegin(), input_way.crend(), pair_wise_segment_split); + external_memory.used_node_id_list.push_back(input_way.front().ref()); } else { osrm::for_each_pair( - input_way.nodes().cbegin(), input_way.nodes().cend(), pair_wise_segment_split); - external_memory.used_node_id_list.push_back(input_way.nodes().back().ref()); + input_way.cbegin(), input_way.cend(), pair_wise_segment_split); + external_memory.used_node_id_list.push_back(input_way.back().ref()); } // The following information is needed to identify start and end segments of restrictions // The following information is needed to identify start and end segments of restrictions external_memory.way_start_end_id_list.push_back( - {(EdgeID)input_way.id(), - (NodeID)input_way.nodes()[0].ref(), - (NodeID)input_way.nodes()[1].ref(), - (NodeID)input_way.nodes()[input_way.nodes().size() - 2].ref(), - (NodeID)input_way.nodes().back().ref()}); + {(EdgeID)parsed_way.id, + (NodeID)input_way[0].ref(), + (NodeID)input_way[1].ref(), + (NodeID)input_way[input_way.size() - 2].ref(), + (NodeID)input_way.back().ref()}); if (split_edge) { // Only true if the way should be split @@ -198,24 +198,24 @@ void ExtractorCallbacks::ProcessWay(const osmium::Way &input_way, ExtractionWay if (is_opposite_way) { // SimpleLogger().Write() << "opposite2"; - osrm::for_each_pair(input_way.nodes().crbegin(), - input_way.nodes().crend(), + osrm::for_each_pair(input_way.crbegin(), + input_way.crend(), pair_wise_segment_split_2); - external_memory.used_node_id_list.push_back(input_way.nodes().front().ref()); + external_memory.used_node_id_list.push_back(input_way.front().ref()); } else { - osrm::for_each_pair(input_way.nodes().cbegin(), - input_way.nodes().cend(), + osrm::for_each_pair(input_way.cbegin(), + input_way.cend(), pair_wise_segment_split_2); - external_memory.used_node_id_list.push_back(input_way.nodes().back().ref()); + external_memory.used_node_id_list.push_back(input_way.back().ref()); } external_memory.way_start_end_id_list.push_back( - {(EdgeID)input_way.id(), - (NodeID)input_way.nodes()[1].ref(), - (NodeID)input_way.nodes()[0].ref(), - (NodeID)input_way.nodes().back().ref(), - (NodeID)input_way.nodes()[input_way.nodes().size() - 2].ref()}); + {(EdgeID)parsed_way.id, + (NodeID)input_way[1].ref(), + (NodeID)input_way[0].ref(), + (NodeID)input_way.back().ref(), + (NodeID)input_way[input_way.size() - 2].ref()}); } } diff --git a/Extractor/ExtractorCallbacks.h b/Extractor/ExtractorCallbacks.h index e43b02b52..baf0f7b67 100644 --- a/Extractor/ExtractorCallbacks.h +++ b/Extractor/ExtractorCallbacks.h @@ -56,13 +56,13 @@ class ExtractorCallbacks std::unordered_map &string_map); // warning: caller needs to take care of synchronization! - void ProcessNode(const osmium::Node ¤t_node, const ExtractionNode &result_node); + void ProcessNode(const osmium::NodeRef ¤t_node, const ExtractionNode &result_node); // warning: caller needs to take care of synchronization! void ProcessRestriction(const mapbox::util::optional &restriction); // warning: caller needs to take care of synchronization! - void ProcessWay(const osmium::Way ¤t_way, ExtractionWay &result_way); + void ProcessWay(const osmium::WayNodeList ¤t_way, const ExtractionWay &result_way); }; #endif /* EXTRACTOR_CALLBACKS_H */