Duplicate edges and turns generation code

This commit is contained in:
Michael Krasnyk 2017-11-08 23:04:32 -05:00
parent 1cbfbd34cf
commit 90ff725125

View File

@ -55,6 +55,9 @@ template <> struct hash<std::pair<NodeID, NodeID>>
};
}
// Number of TurnIndexBlock entries collected in turn_indexes_write_buffer before they are
// flushed, which reduces the number of write(v) syscalls. The constant is a std::size_t so
// that it can be passed to reserve() and compared against container sizes without mixing
// signed and unsigned operands.
constexpr std::size_t TURN_INDEX_WRITE_BUFFER_SIZE = 1000;
namespace osrm
{
namespace extractor
@ -483,8 +486,53 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
const NodeID node_count = m_node_based_graph.GetNumberOfNodes();
util::Percent progress(log, node_count);
// This counter is used to keep track of how far along we've made it
std::uint64_t nodes_completed = 0;
// Because we write TurnIndexBlock data as we go, we'll
// buffer them into groups of 1000 to reduce the syscall
// count by 1000x. This doesn't need much memory, but
// greatly reduces the syscall overhead of writing lots
// of small objects
std::vector<lookup::TurnIndexBlock> turn_indexes_write_buffer;
turn_indexes_write_buffer.reserve(TURN_INDEX_WRITE_BUFFER_SIZE);
// One generated edge-based edge bundled with the per-turn values that are stored next to it.
// `transfer_data` appends each member to its own output container, so keeping the members
// grouped lets entries that are collected during the pipeline be appended later as a unit.
// NOTE: this is an aggregate that is brace-initialized positionally; keep the member order.
struct EdgeWithData
{
// the edge appended to m_edge_based_edge_list
EdgeBasedEdge edge;
// from/intersection/to node triple queued in turn_indexes_write_buffer
lookup::TurnIndexBlock turn_index;
// value appended to turn_weight_penalties
TurnPenalty turn_weight_penalty;
// value appended to turn_duration_penalties
TurnPenalty turn_duration_penalty;
};
// Distributes the members of a single EdgeWithData entry over the global output containers:
// the edge list, both turn penalty vectors and the turn index write buffer.
const auto transfer_data = [&](const EdgeWithData &entry) {
    const auto &[generated_edge, index_block, weight_penalty, duration_penalty] = entry;

    m_edge_based_edge_list.push_back(generated_edge);
    turn_weight_penalties.push_back(weight_penalty);
    turn_duration_penalties.push_back(duration_penalty);
    turn_indexes_write_buffer.push_back(index_block);
};
// Per-range output of the edge-based-graph `processor_stage`; it is passed on to the
// `output_stage` through a shared pointer (EdgesPipelineBufferPtr).
struct EdgesPipelineBuffer
{
// size of the node range this buffer was produced from; feeds the progress counter
std::size_t nodes_processed = 0;
// entries transferred to the global containers when the buffer reaches the output stage
std::vector<EdgeWithData> continuous_data;
// entries only collected by the output stage and transferred after the pipeline has run
std::vector<EdgeWithData> delayed_data;
// conditionals gathered while generating the edges of this range
std::vector<Conditional> conditionals;
};
using EdgesPipelineBufferPtr = std::shared_ptr<EdgesPipelineBuffer>;
// Per-range output of the `guidance_stage`; it is passed on to the `guidance_output_stage`
// through a shared pointer (TurnsPipelineBufferPtr).
struct TurnsPipelineBuffer
{
// size of the node range this buffer was produced from; feeds the progress counter
std::size_t nodes_processed = 0;
// turn data appended to turn_data_container when the buffer reaches the output stage
std::vector<TurnData> continuous_turn_data;
// turn data only collected by the output stage and appended after the pipeline has run
std::vector<TurnData> delayed_turn_data;
};
using TurnsPipelineBufferPtr = std::shared_ptr<TurnsPipelineBuffer>;
// going over all nodes (which form the center of an intersection), we compute all possible
// turns along these intersections.
@ -499,7 +547,7 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
// First part of the pipeline generates iterator ranges of IDs in sets of
// GRAINSIZE
tbb::filter_t<void, tbb::blocked_range<NodeID>> generator_stage(
tbb::filter::serial_in_order, [&](tbb::flow_control &fc) -> tbb::blocked_range<NodeID> {
tbb::filter::serial_in_order, [&](tbb::flow_control &fc) {
if (current_node < node_count)
{
auto next_node = std::min(current_node + GRAINSIZE, node_count);
@ -514,36 +562,6 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
}
});
// This struct is the buffered output of the `processor_stage`. This data is appended to
// the various output arrays/files by the `output_stage`.
// The vectors are filled in lockstep: one element is pushed to each of them per generated edge.
struct IntersectionData
{
// turn index blocks, later queued in turn_indexes_write_buffer
std::vector<lookup::TurnIndexBlock> turn_indexes;
// generated edges, later appended to m_edge_based_edge_list
std::vector<EdgeBasedEdge> edges_list;
// per-edge weight penalties, later appended to the global turn_weight_penalties
std::vector<TurnPenalty> turn_weight_penalties;
// per-edge duration penalties, later appended to the global turn_duration_penalties
std::vector<TurnPenalty> turn_duration_penalties;
// per-edge turn data, later appended to the global turn_data_container
std::vector<TurnData> turn_data_container;
};
// Same content as IntersectionData, but grouped per edge so that the entries can be sorted
// after they have been created. Edges can be out of order.
// NOTE: this is an aggregate that is brace-initialized positionally; keep the member order.
struct EdgeWithData
{
// the generated edge
EdgeBasedEdge edge;
// from/intersection/to node triple of the turn
lookup::TurnIndexBlock turn_index;
// weight penalty of the turn
TurnPenalty turn_weight_penalty;
// duration penalty of the turn
TurnPenalty turn_duration_penalty;
// turn data stored next to the edge
TurnData turn_data;
};
// Per-range result object created by the `processor_stage` and consumed by the
// `output_stage`.
struct PipelineBuffer
{
// number of nodes in the processed range; added to the progress counter
std::size_t nodes_processed = 0;
// data appended to the global containers when the buffer reaches the output stage
IntersectionData continuous_data;
// per-edge entries that are kept separate from the continuous data
std::vector<EdgeWithData> delayed_data;
// conditionals gathered while generating the edges of this range
std::vector<Conditional> conditionals;
};
// Generate edges for either artificial nodes or the main graph
const auto generate_edge = [this,
&scripting_environment,
@ -680,24 +698,20 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
lookup::TurnIndexBlock turn_index_block = {from_node, intersection_node, to_node};
// insert data into the designated buffer
return std::make_pair(
EdgeWithData{
edge_based_edge, turn_index_block, weight_penalty, duration_penalty, turn_data},
return std::make_tuple(
EdgeWithData{edge_based_edge, turn_index_block, weight_penalty, duration_penalty},
turn_data,
conditional);
};
// Second part of the pipeline is where the intersection analysis is done for each
// intersection
tbb::filter_t<tbb::blocked_range<NodeID>, std::shared_ptr<PipelineBuffer>> processor_stage(
//
// Edge-based-graph stage
//
tbb::filter_t<tbb::blocked_range<NodeID>, EdgesPipelineBufferPtr> processor_stage(
tbb::filter::parallel, [&](const tbb::blocked_range<NodeID> &intersection_node_range) {
auto buffer = std::make_shared<PipelineBuffer>();
buffer->nodes_processed =
intersection_node_range.end() - intersection_node_range.begin();
// If we get fed a 0-length range for some reason, we can just return right away
if (buffer->nodes_processed == 0)
return buffer;
auto buffer = std::make_shared<EdgesPipelineBuffer>();
buffer->nodes_processed = intersection_node_range.size();
for (auto intersection_node = intersection_node_range.begin(),
end = intersection_node_range.end();
@ -916,20 +930,12 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
entry_class_id,
edge_geometries);
buffer->continuous_data.edges_list.push_back(
edge_with_data_and_condition.first.edge);
buffer->continuous_data.turn_indexes.push_back(
edge_with_data_and_condition.first.turn_index);
buffer->continuous_data.turn_weight_penalties.push_back(
edge_with_data_and_condition.first.turn_weight_penalty);
buffer->continuous_data.turn_duration_penalties.push_back(
edge_with_data_and_condition.first.turn_duration_penalty);
buffer->continuous_data.turn_data_container.push_back(
edge_with_data_and_condition.first.turn_data);
if (edge_with_data_and_condition.second)
buffer->continuous_data.push_back(
std::get<0>(edge_with_data_and_condition));
if (std::get<2>(edge_with_data_and_condition))
{
buffer->conditionals.push_back(
*edge_with_data_and_condition.second);
*std::get<2>(edge_with_data_and_condition));
}
}
@ -982,12 +988,11 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
edge_geometries);
buffer->delayed_data.push_back(
std::move(edge_with_data_and_condition.first));
if (edge_with_data_and_condition.second)
std::get<0>(edge_with_data_and_condition));
if (std::get<2>(edge_with_data_and_condition))
{
buffer->conditionals.push_back(
*edge_with_data_and_condition.second);
*std::get<2>(edge_with_data_and_condition));
}
// also add the conditions for the way
@ -1019,12 +1024,11 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
edge_geometries);
buffer->delayed_data.push_back(
std::move(edge_with_data_and_condition.first));
if (edge_with_data_and_condition.second)
std::get<0>(edge_with_data_and_condition));
if (std::get<2>(edge_with_data_and_condition))
{
buffer->conditionals.push_back(
*edge_with_data_and_condition.second);
*std::get<2>(edge_with_data_and_condition));
}
}
}
@ -1036,42 +1040,334 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
return buffer;
});
// Because we write TurnIndexBlock data as we go, we'll buffer them into groups of 1000 to
// reduce the syscall count by 1000x. This doesn't need much memory, but greatly reduces
// the syscall overhead of writing lots of small objects
const constexpr int TURN_INDEX_WRITE_BUFFER_SIZE = 1000;
std::vector<lookup::TurnIndexBlock> turn_indexes_write_buffer;
turn_indexes_write_buffer.reserve(TURN_INDEX_WRITE_BUFFER_SIZE);
//
// Guidance stage
//
// Parallel pipeline stage that analyses every intersection node of the incoming range and
// keeps only the TurnData element (tuple index 1) of each edge produced by `generate_edge`.
// Turns generated for the regular graph go to `continuous_turn_data`; turns generated for
// duplicated via-way nodes go to `delayed_turn_data`. The returned buffer is consumed by the
// stage that follows in the pipeline.
tbb::filter_t<tbb::blocked_range<NodeID>, TurnsPipelineBufferPtr> guidance_stage(
    tbb::filter::parallel, [&](const tbb::blocked_range<NodeID> &intersection_node_range) {
        auto buffer = std::make_shared<TurnsPipelineBuffer>();
        buffer->nodes_processed = intersection_node_range.size();

        for (auto intersection_node = intersection_node_range.begin(),
                  end = intersection_node_range.end();
             intersection_node < end;
             ++intersection_node)
        {
            const auto &incoming_edges =
                intersection::getIncomingEdges(m_node_based_graph, intersection_node);
            const auto &outgoing_edges =
                intersection::getOutgoingEdges(m_node_based_graph, intersection_node);

            const auto &edge_geometries_and_merged_edges =
                intersection::getIntersectionGeometries(m_node_based_graph,
                                                        m_compressed_edge_container,
                                                        m_coordinates,
                                                        mergable_road_detector,
                                                        intersection_node);
            const auto &edge_geometries = edge_geometries_and_merged_edges.first;
            const auto &merged_edge_ids = edge_geometries_and_merged_edges.second;

            // all nodes in the graph are connected in both directions. We check all
            // outgoing nodes to find the incoming edge. This is a larger search overhead,
            // but the cost we need to pay to generate edges here is worth the additional
            // search overhead.
            //
            // a -> b <-> c
            //      |
            //      v
            //      d
            //
            // will have:
            // a: b,rev=0
            // b: a,rev=1 c,rev=0 d,rev=0
            // c: b,rev=0
            //
            // From the flags alone, we cannot determine which nodes are connected to
            // `b` by an outgoing edge. Therefore, we have to search all connected edges for
            // edges entering `b`
            for (const auto &incoming_edge : incoming_edges)
            {
                // NOTE(review): this counter is incremented from a parallel stage. Confirm
                // that it tolerates concurrent updates and that counting here does not
                // duplicate increments made by another pipeline stage.
                ++node_based_edge_counter;

                const auto intersection_view =
                    convertToIntersectionView(m_node_based_graph,
                                              m_edge_based_node_container,
                                              node_restriction_map,
                                              m_barrier_nodes,
                                              edge_geometries,
                                              turn_lanes_data,
                                              incoming_edge,
                                              outgoing_edges,
                                              merged_edge_ids);

                auto intersection = turn_analysis.AssignTurnTypes(
                    incoming_edge.node, incoming_edge.edge, intersection_view);

                OSRM_ASSERT(intersection.valid(), m_coordinates[intersection_node]);
                intersection = turn_lane_handler.assignTurnLanes(
                    incoming_edge.node, incoming_edge.edge, std::move(intersection));

                // the entry class depends on the turn, so we have to classify the
                // intersection for every edge
                const auto turn_classification =
                    classifyIntersection(intersection, m_coordinates[intersection_node]);

                const auto entry_class_id =
                    entry_class_hash.ConcurrentFindOrAdd(turn_classification.first);

                const auto bearing_class_id =
                    bearing_class_hash.ConcurrentFindOrAdd(turn_classification.second);

                // Note - this is strictly speaking not thread safe, but we know we
                // should never be touching the same element twice, so we should
                // be fine.
                bearing_class_by_node_based_node[intersection_node] = bearing_class_id;

                // check if we are turning off a via way
                const auto turning_off_via_way =
                    way_restriction_map.IsViaWay(incoming_edge.node, intersection_node);

                // Save reversed incoming bearing to compute turn angles
                const auto reversed_incoming_bearing = util::bearing::reverse(
                    findEdgeBearing(edge_geometries, incoming_edge.edge));

                for (const auto &outgoing_edge : outgoing_edges)
                {
                    if (!intersection::isTurnAllowed(m_node_based_graph,
                                                     m_edge_based_node_container,
                                                     node_restriction_map,
                                                     m_barrier_nodes,
                                                     edge_geometries,
                                                     turn_lanes_data,
                                                     incoming_edge,
                                                     outgoing_edge))
                        continue;

                    const auto turn =
                        std::find_if(intersection.begin(),
                                     intersection.end(),
                                     [edge = outgoing_edge.edge](const auto &road) {
                                         return road.eid == edge;
                                     });

                    BOOST_ASSERT(turn != intersection.end());

                    std::vector<ExtractionTurnLeg> road_legs_on_the_right;
                    std::vector<ExtractionTurnLeg> road_legs_on_the_left;

                    // Describes one road connected to the intersection as an
                    // ExtractionTurnLeg.
                    auto get_connected_road_info = [&](const auto &connected_edge) {
                        const auto &edge_data =
                            m_node_based_graph.GetEdgeData(connected_edge.eid);
                        return ExtractionTurnLeg(
                            edge_data.flags.restricted,
                            edge_data.flags.road_classification.IsMotorwayClass(),
                            edge_data.flags.road_classification.IsLinkClass(),
                            edge_data.flags.road_classification.GetNumberOfLanes(),
                            edge_data.flags.highway_turn_classification,
                            edge_data.flags.access_turn_classification,
                            (static_cast<double>(intersection::findEdgeLength(
                                 edge_geometries, connected_edge.eid)) /
                             edge_data.duration) *
                                36,
                            !connected_edge.entry_allowed ||
                                (edge_data.flags.forward &&
                                 edge_data.flags.backward), // is incoming
                            connected_edge.entry_allowed);
                    };

                    // all connected roads on the right of a u turn
                    if (turn->instruction.IsUTurn())
                    {
                        if (turn != intersection.begin())
                        {
                            std::transform(intersection.begin() + 1,
                                           turn,
                                           std::back_inserter(road_legs_on_the_right),
                                           get_connected_road_info);
                        }

                        std::transform(turn + 1,
                                       intersection.end(),
                                       std::back_inserter(road_legs_on_the_right),
                                       get_connected_road_info);
                    }
                    else
                    {
                        if (intersection.begin() != turn)
                        {
                            std::transform(intersection.begin() + 1,
                                           turn,
                                           std::back_inserter(road_legs_on_the_right),
                                           get_connected_road_info);
                        }
                        std::transform(turn + 1,
                                       intersection.end(),
                                       std::back_inserter(road_legs_on_the_left),
                                       get_connected_road_info);
                    }

                    if (turn->instruction.IsUTurn() && turn != intersection.begin())
                    {
                        util::Log(logWARNING)
                            << "Turn is a u turn but not turning to the first connected "
                               "edge of the intersection. Node ID: "
                            << intersection_node << ", OSM link: "
                            << m_coordinates[intersection_node].toOSMLink();
                    }
                    else if (turn == intersection.begin() && !turn->instruction.IsUTurn())
                    {
                        util::Log(logWARNING)
                            << "Turn is a u turn but not classified as a u turn. Node ID: "
                            << intersection_node << ", OSM link: "
                            << m_coordinates[intersection_node].toOSMLink();
                    }

                    // In case a way restriction starts at a given location, add a turn onto
                    // every artificial node emanating here.
                    //
                    //     e - f
                    //     |
                    // a - b
                    //     |
                    //     c - d
                    //
                    // ab via bc to cd
                    // ab via be to ef
                    //
                    // has two artificial nodes (be/bc) with restrictions starting at `ab`.
                    // Since every restriction group (abc | abe) refers to the same
                    // artificial node, we simply have to find a single representative for
                    // the turn. Here we check whether the turn in question is the start of
                    // a via way restriction. If that should be the case, we switch
                    // the id of the edge-based-node for the target to the ID of the
                    // duplicated node associated with the turn. (e.g. ab via bc switches bc
                    // to bc_dup)
                    auto const target_id = way_restriction_map.RemapIfRestricted(
                        nbe_to_ebn_mapping[outgoing_edge.edge],
                        incoming_edge.node,
                        outgoing_edge.node,
                        m_node_based_graph.GetTarget(outgoing_edge.edge),
                        m_number_of_edge_based_nodes);

                    { // scope to forget edge_with_data after
                        const auto edge_with_data_and_condition =
                            generate_edge(nbe_to_ebn_mapping[incoming_edge.edge],
                                          target_id,
                                          incoming_edge.node,
                                          incoming_edge.edge,
                                          outgoing_edge.node,
                                          outgoing_edge.edge,
                                          reversed_incoming_bearing,
                                          *turn,
                                          road_legs_on_the_right,
                                          road_legs_on_the_left,
                                          entry_class_id,
                                          edge_geometries);

                        // only the turn data is kept by this stage
                        buffer->continuous_turn_data.push_back(
                            std::get<1>(edge_with_data_and_condition));
                    }

                    // when turning off a via-way turn restriction, we need to not only
                    // handle the normal edges for the way, but also add turns for every
                    // duplicated node. This process is integrated here to avoid doing the
                    // turn analysis multiple times.
                    if (turning_off_via_way)
                    {
                        const auto duplicated_nodes = way_restriction_map.DuplicatedNodeIDs(
                            incoming_edge.node, intersection_node);

                        // identical for every duplicated node, so it is looked up once
                        auto const node_at_end_of_turn =
                            m_node_based_graph.GetTarget(outgoing_edge.edge);

                        // next to the normal restrictions tracked in `entry_allowed`, via
                        // ways might introduce additional restrictions. These are handled
                        // here when turning off a via-way
                        for (auto duplicated_node_id : duplicated_nodes)
                        {
                            const auto from_id =
                                NodeID(m_number_of_edge_based_nodes -
                                       way_restriction_map.NumberOfDuplicatedNodes() +
                                       duplicated_node_id);

                            const auto is_way_restricted = way_restriction_map.IsRestricted(
                                duplicated_node_id, node_at_end_of_turn);

                            if (is_way_restricted)
                            {
                                auto const restriction = way_restriction_map.GetRestriction(
                                    duplicated_node_id, node_at_end_of_turn);

                                // a restriction without a condition produces no turn at
                                // all; a conditional one still gets its turn data below
                                if (restriction.condition.empty())
                                    continue;
                            }

                            // Restricted-with-condition and unrestricted turns are generated
                            // the same way, so both share this single code path. Entries are
                            // appended in iteration order.
                            const auto edge_with_data_and_condition =
                                generate_edge(from_id,
                                              nbe_to_ebn_mapping[outgoing_edge.edge],
                                              incoming_edge.node,
                                              incoming_edge.edge,
                                              outgoing_edge.node,
                                              outgoing_edge.edge,
                                              reversed_incoming_bearing,
                                              *turn,
                                              road_legs_on_the_right,
                                              road_legs_on_the_left,
                                              entry_class_id,
                                              edge_geometries);

                            // add into delayed data
                            buffer->delayed_turn_data.push_back(
                                std::get<1>(edge_with_data_and_condition));
                        }
                    }
                }
            }
        }

        return buffer;
    });
// Last part of the pipeline puts all the calculated data into the serial buffers
tbb::filter_t<std::shared_ptr<PipelineBuffer>, void> output_stage(
tbb::filter::serial_in_order, [&](const std::shared_ptr<PipelineBuffer> buffer) {
nodes_completed += buffer->nodes_processed;
progress.PrintStatus(nodes_completed);
std::uint64_t routing_nodes_completed = 0;
std::vector<EdgeWithData> delayed_data;
tbb::filter_t<EdgesPipelineBufferPtr, void> output_stage(
tbb::filter::serial_in_order, [&](auto buffer) {
// for readability
const auto &data = buffer->continuous_data;
// NOTE: potential overflow here if we hit 2^32 routable edges
m_edge_based_edge_list.append(data.edges_list.begin(), data.edges_list.end());
BOOST_ASSERT(m_edge_based_edge_list.size() <= std::numeric_limits<NodeID>::max());
turn_weight_penalties.insert(turn_weight_penalties.end(),
data.turn_weight_penalties.begin(),
data.turn_weight_penalties.end());
turn_duration_penalties.insert(turn_duration_penalties.end(),
data.turn_duration_penalties.begin(),
data.turn_duration_penalties.end());
turn_data_container.append(data.turn_data_container);
turn_indexes_write_buffer.insert(turn_indexes_write_buffer.end(),
data.turn_indexes.begin(),
data.turn_indexes.end());
routing_nodes_completed += buffer->nodes_processed;
progress.PrintStatus(routing_nodes_completed);
// Copy data from local buffers into global
// EBG data
std::for_each(
buffer->continuous_data.begin(), buffer->continuous_data.end(), transfer_data);
conditionals.insert(
conditionals.end(), buffer->conditionals.begin(), buffer->conditionals.end());
// NOTE: potential overflow here if we hit 2^32 routable edges
BOOST_ASSERT(m_edge_based_edge_list.size() <= std::numeric_limits<NodeID>::max());
// Buffer writes to reduce syscall count
if (turn_indexes_write_buffer.size() >= TURN_INDEX_WRITE_BUFFER_SIZE)
{
@ -1080,29 +1376,62 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
turn_indexes_write_buffer.clear();
}
// Copy via-way restrictions delayed data
delayed_data.insert(
delayed_data.end(), buffer->delayed_data.begin(), buffer->delayed_data.end());
});
// Now, execute the pipeline. The value of "5" here was chosen by experimentation on a
// 16-CPU machine and seemed to give the best performance. This value needs to be balanced
// with the GRAINSIZE above - ideally, the pipeline puts as much work as possible in the
// `intersection_handler` step so that those parallel workers don't get blocked too much by
// the slower (io-performing) `buffer_storage`
// Serial, in-order sink for the buffers carrying turn data: it advances the progress
// display, appends the continuous turn data to turn_data_container and sets the delayed
// turn data aside in delayed_turn_data.
std::uint64_t guidance_nodes_completed = 0;
std::vector<TurnData> delayed_turn_data;
tbb::filter_t<TurnsPipelineBufferPtr, void> guidance_output_stage(
    tbb::filter::serial_in_order, [&](auto turns_buffer) {
        guidance_nodes_completed += turns_buffer->nodes_processed;
        progress.PrintStatus(guidance_nodes_completed);

        // Guidance data
        for (const auto &turn_data : turns_buffer->continuous_turn_data)
        {
            turn_data_container.push_back(turn_data);
        }

        // Copy via-way restrictions delayed data
        const auto &postponed_turns = turns_buffer->delayed_turn_data;
        delayed_turn_data.insert(
            delayed_turn_data.end(), postponed_turns.begin(), postponed_turns.end());
    });
// Now, execute the pipeline. The value of "5" here was chosen by experimentation
// on a 16-CPU machine and seemed to give the best performance. This value needs
// to be balanced with the GRAINSIZE above - ideally, the pipeline puts as much work
// as possible in the `intersection_handler` step so that those parallel workers
// don't get blocked too much by the slower (io-performing) `buffer_storage`
current_node = 0;
tbb::parallel_pipeline(tbb::task_scheduler_init::default_num_threads() * 5,
generator_stage & processor_stage & output_stage);
std::sort(delayed_data.begin(), delayed_data.end(), [](auto const &lhs, auto const &rhs) {
return lhs.edge.source < rhs.edge.source;
});
auto const transfer_data = [&](auto const &edge_with_data) {
m_edge_based_edge_list.push_back(edge_with_data.edge);
turn_weight_penalties.push_back(edge_with_data.turn_weight_penalty);
turn_duration_penalties.push_back(edge_with_data.turn_duration_penalty);
turn_data_container.push_back(edge_with_data.turn_data);
turn_indexes_write_buffer.push_back(edge_with_data.turn_index);
};
current_node = 0;
tbb::parallel_pipeline(tbb::task_scheduler_init::default_num_threads() * 5,
generator_stage & guidance_stage & guidance_output_stage);
// TODO: remove sorting below
// std::sort(delayed_data.begin(), delayed_data.end(), [](auto const &lhs, auto
// const &rhs)
// {
// return lhs.edge.source < rhs.edge.source;
// });
// NOTE: buffer.delayed_data and buffer.delayed_turn_data have the same index
std::for_each(delayed_data.begin(), delayed_data.end(), transfer_data);
std::for_each(delayed_turn_data.begin(),
delayed_turn_data.end(),
[&turn_data_container](const auto &turn_data) {
turn_data_container.push_back(turn_data);
});
// Flush the turn_indexes_write_buffer if it's not empty
if (!turn_indexes_write_buffer.empty())