basic parallelization infrastructure
This commit is contained in:
parent
a0256dfc60
commit
344bdbb707
@ -40,6 +40,7 @@ struct ExtractionWay
|
|||||||
|
|
||||||
void Clear()
|
void Clear()
|
||||||
{
|
{
|
||||||
|
id = -1;
|
||||||
forward_speed = -1;
|
forward_speed = -1;
|
||||||
backward_speed = -1;
|
backward_speed = -1;
|
||||||
duration = -1;
|
duration = -1;
|
||||||
@ -106,6 +107,7 @@ struct ExtractionWay
|
|||||||
void set_backward_mode(const TravelMode m) { backward_travel_mode = m; }
|
void set_backward_mode(const TravelMode m) { backward_travel_mode = m; }
|
||||||
const TravelMode get_backward_mode() const { return backward_travel_mode; }
|
const TravelMode get_backward_mode() const { return backward_travel_mode; }
|
||||||
|
|
||||||
|
int64_t id;
|
||||||
double forward_speed;
|
double forward_speed;
|
||||||
double backward_speed;
|
double backward_speed;
|
||||||
double duration;
|
double duration;
|
||||||
|
@ -37,10 +37,12 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|||||||
|
|
||||||
#include "../Util/GitDescription.h"
|
#include "../Util/GitDescription.h"
|
||||||
#include "../Util/IniFileUtil.h"
|
#include "../Util/IniFileUtil.h"
|
||||||
|
#include "../DataStructures/ConcurrentQueue.h"
|
||||||
#include "../Util/OSRMException.h"
|
#include "../Util/OSRMException.h"
|
||||||
#include "../Util/simple_logger.hpp"
|
#include "../Util/simple_logger.hpp"
|
||||||
#include "../Util/TimingUtil.h"
|
#include "../Util/TimingUtil.h"
|
||||||
#include "../Util/make_unique.hpp"
|
#include "../Util/make_unique.hpp"
|
||||||
|
|
||||||
#include "../typedefs.h"
|
#include "../typedefs.h"
|
||||||
|
|
||||||
#include <luabind/luabind.hpp>
|
#include <luabind/luabind.hpp>
|
||||||
@ -49,16 +51,28 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|||||||
|
|
||||||
#include <tbb/task_scheduler_init.h>
|
#include <tbb/task_scheduler_init.h>
|
||||||
|
|
||||||
|
#include <variant/optional.hpp>
|
||||||
|
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
|
|
||||||
|
struct ResultBuffer
|
||||||
|
{
|
||||||
|
std::vector<std::pair<osmium::NodeRef, ExtractionNode>> nodes;
|
||||||
|
std::vector<std::pair<osmium::WayNodeList, ExtractionWay>> ways;
|
||||||
|
std::vector<mapbox::util::optional<InputRestrictionContainer>> restrictions;
|
||||||
|
};
|
||||||
|
|
||||||
int lua_error_callback(lua_State *L) // This is so I can use my own function as an
|
int lua_error_callback(lua_State *L) // This is so I can use my own function as an
|
||||||
// exception handler, pcall_log()
|
// exception handler, pcall_log()
|
||||||
{
|
{
|
||||||
@ -147,7 +161,7 @@ int Extractor::Run(int argc, char *argv[])
|
|||||||
}
|
}
|
||||||
SimpleLogger().Write() << "input file generated by " << generator;
|
SimpleLogger().Write() << "input file generated by " << generator;
|
||||||
|
|
||||||
// TODO: write timestamp if non-empty
|
// write .timestamp data file
|
||||||
std::string timestamp = header.get("osmosis_replication_timestamp");
|
std::string timestamp = header.get("osmosis_replication_timestamp");
|
||||||
if (timestamp.empty())
|
if (timestamp.empty())
|
||||||
{
|
{
|
||||||
@ -164,12 +178,35 @@ int Extractor::Run(int argc, char *argv[])
|
|||||||
|
|
||||||
RestrictionParser restriction_parser(scripting_environment);
|
RestrictionParser restriction_parser(scripting_environment);
|
||||||
|
|
||||||
|
// move to header
|
||||||
|
std::atomic_bool parsing_done {false};
|
||||||
|
std::atomic_bool loading_done {false};
|
||||||
|
|
||||||
|
ConcurrentQueue<std::shared_ptr<osmium::memory::Buffer>> parse_queue(128);
|
||||||
|
ConcurrentQueue<std::shared_ptr<ResultBuffer>> result_queue(128);
|
||||||
|
|
||||||
|
std::thread loading_thread([&]{
|
||||||
|
while (osmium::memory::Buffer buffer = reader.read())
|
||||||
|
{
|
||||||
|
parse_queue.push(std::make_shared<osmium::memory::Buffer>(std::move(buffer)));
|
||||||
|
}
|
||||||
|
loading_done = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
// parsing threads
|
||||||
|
while (!loading_done || !parse_queue.empty())
|
||||||
|
{
|
||||||
|
std::shared_ptr<osmium::memory::Buffer> current_buffer;
|
||||||
|
if (!parse_queue.try_pop(current_buffer))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
ExtractionNode result_node;
|
ExtractionNode result_node;
|
||||||
ExtractionWay result_way;
|
ExtractionWay result_way;
|
||||||
|
|
||||||
while (osmium::memory::Buffer buffer = reader.read())
|
std::shared_ptr<ResultBuffer> result_buffer = std::make_shared<ResultBuffer>();;
|
||||||
{
|
for (osmium::OSMEntity &entity : *current_buffer)
|
||||||
for (osmium::OSMEntity &entity : buffer)
|
|
||||||
{
|
{
|
||||||
switch (entity.type())
|
switch (entity.type())
|
||||||
{
|
{
|
||||||
@ -180,8 +217,7 @@ int Extractor::Run(int argc, char *argv[])
|
|||||||
"node_function",
|
"node_function",
|
||||||
boost::cref(static_cast<osmium::Node &>(entity)),
|
boost::cref(static_cast<osmium::Node &>(entity)),
|
||||||
boost::ref(result_node));
|
boost::ref(result_node));
|
||||||
extractor_callbacks->ProcessNode(static_cast<osmium::Node &>(entity),
|
result_buffer->nodes.emplace_back(osmium::NodeRef{static_cast<osmium::Node &>(entity).id(), static_cast<osmium::Node &>(entity).location()}, result_node);
|
||||||
result_node);
|
|
||||||
break;
|
break;
|
||||||
case osmium::item_type::way:
|
case osmium::item_type::way:
|
||||||
++number_of_ways;
|
++number_of_ways;
|
||||||
@ -190,19 +226,85 @@ int Extractor::Run(int argc, char *argv[])
|
|||||||
"way_function",
|
"way_function",
|
||||||
boost::cref(static_cast<osmium::Way &>(entity)),
|
boost::cref(static_cast<osmium::Way &>(entity)),
|
||||||
boost::ref(result_way));
|
boost::ref(result_way));
|
||||||
extractor_callbacks->ProcessWay(static_cast<osmium::Way &>(entity), result_way);
|
result_way.id = static_cast<osmium::Way &>(entity).id();
|
||||||
|
result_buffer->ways.emplace_back(osmium::WayNodeList{static_cast<osmium::Way &>(entity).nodes()}, result_way);
|
||||||
break;
|
break;
|
||||||
case osmium::item_type::relation:
|
case osmium::item_type::relation:
|
||||||
++number_of_relations;
|
++number_of_relations;
|
||||||
extractor_callbacks->ProcessRestriction(
|
result_buffer->restrictions.emplace_back(restriction_parser.TryParse(static_cast<osmium::Relation &>(entity)));
|
||||||
restriction_parser.TryParse(static_cast<osmium::Relation &>(entity)));
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
++number_of_others;
|
++number_of_others;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
result_queue.push(result_buffer);
|
||||||
|
parsing_done = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while (!parsing_done || !result_queue.empty())
|
||||||
|
{
|
||||||
|
std::shared_ptr<ResultBuffer> current_buffer;
|
||||||
|
if (!result_queue.try_pop(current_buffer))
|
||||||
|
{
|
||||||
|
for (const auto &node : current_buffer->nodes)
|
||||||
|
{
|
||||||
|
extractor_callbacks->ProcessNode(node.first,
|
||||||
|
node.second);
|
||||||
|
}
|
||||||
|
for (auto &way : current_buffer->ways)
|
||||||
|
{
|
||||||
|
extractor_callbacks->ProcessWay(way.first,
|
||||||
|
way.second);
|
||||||
|
}
|
||||||
|
for (const auto &restriction : current_buffer->restrictions)
|
||||||
|
{
|
||||||
|
extractor_callbacks->ProcessRestriction(restriction);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
loading_thread.join();
|
||||||
|
|
||||||
|
// TODO: join parser threads
|
||||||
|
|
||||||
|
// while (osmium::memory::Buffer buffer = reader.read())
|
||||||
|
// {
|
||||||
|
// for (osmium::OSMEntity &entity : buffer)
|
||||||
|
// {
|
||||||
|
// switch (entity.type())
|
||||||
|
// {
|
||||||
|
// case osmium::item_type::node:
|
||||||
|
// ++number_of_nodes;
|
||||||
|
// result_node.Clear();
|
||||||
|
// luabind::call_function<void>(lua_state,
|
||||||
|
// "node_function",
|
||||||
|
// boost::cref(static_cast<osmium::Node &>(entity)),
|
||||||
|
// boost::ref(result_node));
|
||||||
|
// extractor_callbacks->ProcessNode(static_cast<osmium::Node &>(entity),
|
||||||
|
// result_node);
|
||||||
|
// break;
|
||||||
|
// case osmium::item_type::way:
|
||||||
|
// ++number_of_ways;
|
||||||
|
// result_way.Clear();
|
||||||
|
// luabind::call_function<void>(lua_state,
|
||||||
|
// "way_function",
|
||||||
|
// boost::cref(static_cast<osmium::Way &>(entity)),
|
||||||
|
// boost::ref(result_way));
|
||||||
|
// extractor_callbacks->ProcessWay(static_cast<osmium::Way &>(entity), result_way);
|
||||||
|
// break;
|
||||||
|
// case osmium::item_type::relation:
|
||||||
|
// ++number_of_relations;
|
||||||
|
// extractor_callbacks->ProcessRestriction(
|
||||||
|
// restriction_parser.TryParse(static_cast<osmium::Relation &>(entity)));
|
||||||
|
// break;
|
||||||
|
// default:
|
||||||
|
// ++number_of_others;
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
TIMER_STOP(parsing);
|
TIMER_STOP(parsing);
|
||||||
SimpleLogger().Write() << "Parsing finished after " << TIMER_SEC(parsing) << " seconds";
|
SimpleLogger().Write() << "Parsing finished after " << TIMER_SEC(parsing) << " seconds";
|
||||||
SimpleLogger().Write() << "Raw input contains " << number_of_nodes << " nodes, "
|
SimpleLogger().Write() << "Raw input contains " << number_of_nodes << " nodes, "
|
||||||
|
@ -48,13 +48,13 @@ ExtractorCallbacks::ExtractorCallbacks(ExtractionContainers &extraction_containe
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** warning: caller needs to take care of synchronization! */
|
/** warning: caller needs to take care of synchronization! */
|
||||||
void ExtractorCallbacks::ProcessNode(const osmium::Node &osm_input_node,
|
void ExtractorCallbacks::ProcessNode(const osmium::NodeRef &osm_input_node,
|
||||||
const ExtractionNode &result_node)
|
const ExtractionNode &result_node)
|
||||||
{
|
{
|
||||||
external_memory.all_nodes_list.push_back({
|
external_memory.all_nodes_list.push_back({
|
||||||
static_cast<int>(osm_input_node.location().lat() * COORDINATE_PRECISION),
|
static_cast<int>(osm_input_node.location().lat() * COORDINATE_PRECISION),
|
||||||
static_cast<int>(osm_input_node.location().lon() * COORDINATE_PRECISION),
|
static_cast<int>(osm_input_node.location().lon() * COORDINATE_PRECISION),
|
||||||
static_cast<NodeID>(osm_input_node.id()),
|
static_cast<NodeID>(osm_input_node.ref()),
|
||||||
result_node.barrier,
|
result_node.barrier,
|
||||||
result_node.traffic_lights
|
result_node.traffic_lights
|
||||||
});
|
});
|
||||||
@ -69,7 +69,7 @@ void ExtractorCallbacks::ProcessRestriction(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
/** warning: caller needs to take care of synchronization! */
|
/** warning: caller needs to take care of synchronization! */
|
||||||
void ExtractorCallbacks::ProcessWay(const osmium::Way &input_way, ExtractionWay &parsed_way)
|
void ExtractorCallbacks::ProcessWay(const osmium::WayNodeList &input_way, const ExtractionWay &parsed_way)
|
||||||
{
|
{
|
||||||
if (((0 >= parsed_way.forward_speed) ||
|
if (((0 >= parsed_way.forward_speed) ||
|
||||||
(TRAVEL_MODE_INACCESSIBLE == parsed_way.forward_travel_mode)) &&
|
(TRAVEL_MODE_INACCESSIBLE == parsed_way.forward_travel_mode)) &&
|
||||||
@ -80,15 +80,15 @@ void ExtractorCallbacks::ProcessWay(const osmium::Way &input_way, ExtractionWay
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (input_way.nodes().size() <= 1)
|
if (input_way.size() <= 1)
|
||||||
{ // safe-guard against broken data
|
{ // safe-guard against broken data
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (std::numeric_limits<decltype(input_way.id())>::max() == input_way.id())
|
if (std::numeric_limits<decltype(parsed_way.id)>::max() == parsed_way.id)
|
||||||
{
|
{
|
||||||
SimpleLogger().Write(logDEBUG) << "found bogus way with id: " << input_way.id()
|
SimpleLogger().Write(logDEBUG) << "found bogus way with id: " << parsed_way.id
|
||||||
<< " of size " << input_way.nodes().size();
|
<< " of size " << input_way.size();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,13 +96,13 @@ void ExtractorCallbacks::ProcessWay(const osmium::Way &input_way, ExtractionWay
|
|||||||
{
|
{
|
||||||
// TODO: iterate all way segments and set duration corresponding to the length of each
|
// TODO: iterate all way segments and set duration corresponding to the length of each
|
||||||
// segment
|
// segment
|
||||||
parsed_way.forward_speed = parsed_way.duration / (input_way.nodes().size() - 1);
|
const_cast<ExtractionWay&>(parsed_way).forward_speed = parsed_way.duration / (input_way.size() - 1);
|
||||||
parsed_way.backward_speed = parsed_way.duration / (input_way.nodes().size() - 1);
|
const_cast<ExtractionWay&>(parsed_way).backward_speed = parsed_way.duration / (input_way.size() - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (std::numeric_limits<double>::epsilon() >= std::abs(-1. - parsed_way.forward_speed))
|
if (std::numeric_limits<double>::epsilon() >= std::abs(-1. - parsed_way.forward_speed))
|
||||||
{
|
{
|
||||||
SimpleLogger().Write(logDEBUG) << "found way with bogus speed, id: " << input_way.id();
|
SimpleLogger().Write(logDEBUG) << "found way with bogus speed, id: " << parsed_way.id;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -151,27 +151,27 @@ void ExtractorCallbacks::ProcessWay(const osmium::Way &input_way, ExtractionWay
|
|||||||
const bool is_opposite_way = TRAVEL_MODE_INACCESSIBLE == parsed_way.forward_travel_mode;
|
const bool is_opposite_way = TRAVEL_MODE_INACCESSIBLE == parsed_way.forward_travel_mode;
|
||||||
if (is_opposite_way)
|
if (is_opposite_way)
|
||||||
{
|
{
|
||||||
parsed_way.forward_travel_mode = parsed_way.backward_travel_mode;
|
const_cast<ExtractionWay&>(parsed_way).forward_travel_mode = parsed_way.backward_travel_mode;
|
||||||
parsed_way.backward_travel_mode = TRAVEL_MODE_INACCESSIBLE;
|
const_cast<ExtractionWay&>(parsed_way).backward_travel_mode = TRAVEL_MODE_INACCESSIBLE;
|
||||||
osrm::for_each_pair(
|
osrm::for_each_pair(
|
||||||
input_way.nodes().crbegin(), input_way.nodes().crend(), pair_wise_segment_split);
|
input_way.crbegin(), input_way.crend(), pair_wise_segment_split);
|
||||||
external_memory.used_node_id_list.push_back(input_way.nodes().front().ref());
|
external_memory.used_node_id_list.push_back(input_way.front().ref());
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
osrm::for_each_pair(
|
osrm::for_each_pair(
|
||||||
input_way.nodes().cbegin(), input_way.nodes().cend(), pair_wise_segment_split);
|
input_way.cbegin(), input_way.cend(), pair_wise_segment_split);
|
||||||
external_memory.used_node_id_list.push_back(input_way.nodes().back().ref());
|
external_memory.used_node_id_list.push_back(input_way.back().ref());
|
||||||
}
|
}
|
||||||
|
|
||||||
// The following information is needed to identify start and end segments of restrictions
|
// The following information is needed to identify start and end segments of restrictions
|
||||||
// The following information is needed to identify start and end segments of restrictions
|
// The following information is needed to identify start and end segments of restrictions
|
||||||
external_memory.way_start_end_id_list.push_back(
|
external_memory.way_start_end_id_list.push_back(
|
||||||
{(EdgeID)input_way.id(),
|
{(EdgeID)parsed_way.id,
|
||||||
(NodeID)input_way.nodes()[0].ref(),
|
(NodeID)input_way[0].ref(),
|
||||||
(NodeID)input_way.nodes()[1].ref(),
|
(NodeID)input_way[1].ref(),
|
||||||
(NodeID)input_way.nodes()[input_way.nodes().size() - 2].ref(),
|
(NodeID)input_way[input_way.size() - 2].ref(),
|
||||||
(NodeID)input_way.nodes().back().ref()});
|
(NodeID)input_way.back().ref()});
|
||||||
|
|
||||||
if (split_edge)
|
if (split_edge)
|
||||||
{ // Only true if the way should be split
|
{ // Only true if the way should be split
|
||||||
@ -198,24 +198,24 @@ void ExtractorCallbacks::ProcessWay(const osmium::Way &input_way, ExtractionWay
|
|||||||
if (is_opposite_way)
|
if (is_opposite_way)
|
||||||
{
|
{
|
||||||
// SimpleLogger().Write() << "opposite2";
|
// SimpleLogger().Write() << "opposite2";
|
||||||
osrm::for_each_pair(input_way.nodes().crbegin(),
|
osrm::for_each_pair(input_way.crbegin(),
|
||||||
input_way.nodes().crend(),
|
input_way.crend(),
|
||||||
pair_wise_segment_split_2);
|
pair_wise_segment_split_2);
|
||||||
external_memory.used_node_id_list.push_back(input_way.nodes().front().ref());
|
external_memory.used_node_id_list.push_back(input_way.front().ref());
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
osrm::for_each_pair(input_way.nodes().cbegin(),
|
osrm::for_each_pair(input_way.cbegin(),
|
||||||
input_way.nodes().cend(),
|
input_way.cend(),
|
||||||
pair_wise_segment_split_2);
|
pair_wise_segment_split_2);
|
||||||
external_memory.used_node_id_list.push_back(input_way.nodes().back().ref());
|
external_memory.used_node_id_list.push_back(input_way.back().ref());
|
||||||
}
|
}
|
||||||
|
|
||||||
external_memory.way_start_end_id_list.push_back(
|
external_memory.way_start_end_id_list.push_back(
|
||||||
{(EdgeID)input_way.id(),
|
{(EdgeID)parsed_way.id,
|
||||||
(NodeID)input_way.nodes()[1].ref(),
|
(NodeID)input_way[1].ref(),
|
||||||
(NodeID)input_way.nodes()[0].ref(),
|
(NodeID)input_way[0].ref(),
|
||||||
(NodeID)input_way.nodes().back().ref(),
|
(NodeID)input_way.back().ref(),
|
||||||
(NodeID)input_way.nodes()[input_way.nodes().size() - 2].ref()});
|
(NodeID)input_way[input_way.size() - 2].ref()});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -56,13 +56,13 @@ class ExtractorCallbacks
|
|||||||
std::unordered_map<std::string, NodeID> &string_map);
|
std::unordered_map<std::string, NodeID> &string_map);
|
||||||
|
|
||||||
// warning: caller needs to take care of synchronization!
|
// warning: caller needs to take care of synchronization!
|
||||||
void ProcessNode(const osmium::Node ¤t_node, const ExtractionNode &result_node);
|
void ProcessNode(const osmium::NodeRef ¤t_node, const ExtractionNode &result_node);
|
||||||
|
|
||||||
// warning: caller needs to take care of synchronization!
|
// warning: caller needs to take care of synchronization!
|
||||||
void ProcessRestriction(const mapbox::util::optional<InputRestrictionContainer> &restriction);
|
void ProcessRestriction(const mapbox::util::optional<InputRestrictionContainer> &restriction);
|
||||||
|
|
||||||
// warning: caller needs to take care of synchronization!
|
// warning: caller needs to take care of synchronization!
|
||||||
void ProcessWay(const osmium::Way ¤t_way, ExtractionWay &result_way);
|
void ProcessWay(const osmium::WayNodeList ¤t_way, const ExtractionWay &result_way);
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* EXTRACTOR_CALLBACKS_H */
|
#endif /* EXTRACTOR_CALLBACKS_H */
|
||||||
|
Loading…
Reference in New Issue
Block a user