Port Contractor to TBB

Patrick Niklaus 2014-05-20 23:57:48 +02:00
parent 77641a9fce
commit f487845e9d
2 changed files with 120 additions and 79 deletions


@@ -179,6 +179,7 @@ target_link_libraries(osrm-extract ${CMAKE_THREAD_LIBS_INIT})
 find_package(TBB REQUIRED)
 target_link_libraries(osrm-extract ${TBB_LIBRARIES})
+target_link_libraries(osrm-prepare ${TBB_LIBRARIES})
 include_directories(${TBB_INCLUDE_DIR})
 find_package(Lua52)


@@ -38,9 +38,13 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #include "../Util/OpenMPWrapper.h"
 #include "../Util/SimpleLogger.h"
 #include "../Util/StringUtil.h"
+#include "../Util/TimingUtil.h"
 #include <boost/assert.hpp>
+#include <tbb/enumerable_thread_specific.h>
+#include <tbb/parallel_for.h>
 #include <algorithm>
 #include <limits>
 #include <vector>
@@ -125,6 +129,28 @@ class Contractor
         bool is_independent : 1;
     };
 
+    struct ThreadDataContainer
+    {
+        ThreadDataContainer(int number_of_nodes) : number_of_nodes(number_of_nodes) {}
+
+        inline ContractorThreadData *getThreadData()
+        {
+            bool exists = false;
+            auto &ref = data.local(exists);
+            if (!exists)
+            {
+                ref = std::make_shared<ContractorThreadData>(number_of_nodes);
+            }
+
+            return ref.get();
+        }
+
+        int number_of_nodes;
+        typedef tbb::enumerable_thread_specific<std::shared_ptr<ContractorThreadData>>
+            EnumerableThreadData;
+        EnumerableThreadData data;
+    };
+
   public:
     template <class ContainerT> Contractor(int nodes, ContainerT &input_edge_list)
     {
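Editor's note: the ThreadDataContainer added above replaces a fixed-size vector of per-thread objects (deleted further down). A condensed, compilable sketch of the same lazy-construction idiom follows; HeavyState is an invented stand-in for ContractorThreadData. tbb::enumerable_thread_specific gives every worker thread its own slot, and local(exists) reports whether the slot was just default-constructed, so the expensive state is built at most once per thread, on first use.

    #include <tbb/enumerable_thread_specific.h>
    #include <memory>

    struct HeavyState
    {
        explicit HeavyState(int n) : number_of_nodes(n) {}
        int number_of_nodes; // stand-in for per-thread heaps etc.
    };

    struct PerThread
    {
        explicit PerThread(int n) : n(n) {}

        HeavyState *get()
        {
            // local(exists) returns this thread's slot; exists tells us
            // whether it already held a value from an earlier call.
            bool exists = false;
            auto &ref = data.local(exists);
            if (!exists)
            {
                ref = std::make_shared<HeavyState>(n);
            }
            return ref.get();
        }

        int n;
        tbb::enumerable_thread_specific<std::shared_ptr<HeavyState>> data;
    };

Storing a shared_ptr rather than the state itself keeps default construction of each slot cheap and lets a later data.clear() release all per-thread heaps in one call, which the flush logic below relies on.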
@@ -262,39 +288,51 @@ class Contractor
     void Run()
     {
+        // for the preparation we can use a big grain size, which is much faster (probably cache)
+        constexpr size_t InitGrainSize = 100000;
+        constexpr size_t PQGrainSize = 100000;
+        // auto_partitioner will automatically increase the blocksize if we have
+        // a lot of data. It is *important* for the last loop iterations
+        // (which have a very small dataset) that it is divisible.
+        constexpr size_t IndependentGrainSize = 1;
+        constexpr size_t ContractGrainSize = 1;
+        constexpr size_t NeighboursGrainSize = 1;
+        constexpr size_t DeleteGrainSize = 1;
+
         const NodeID number_of_nodes = contractor_graph->GetNumberOfNodes();
         Percent p(number_of_nodes);
 
-        const unsigned thread_count = omp_get_max_threads();
-        std::vector<ContractorThreadData *> thread_data_list;
-        for (unsigned thread_id = 0; thread_id < thread_count; ++thread_id)
-        {
-            thread_data_list.push_back(new ContractorThreadData(number_of_nodes));
-        }
-        std::cout << "Contractor is using " << thread_count << " threads" << std::endl;
+        ThreadDataContainer thread_data_list(number_of_nodes);
 
         NodeID number_of_contracted_nodes = 0;
         std::vector<RemainingNodeData> remaining_nodes(number_of_nodes);
         std::vector<float> node_priorities(number_of_nodes);
         std::vector<NodePriorityData> node_data(number_of_nodes);
 
         // initialize priorities in parallel
-#pragma omp parallel for schedule(guided)
-        for (int x = 0; x < (int)number_of_nodes; ++x)
-        {
-            remaining_nodes[x].id = x;
-        }
+        tbb::parallel_for(tbb::blocked_range<int>(0, number_of_nodes, InitGrainSize),
+            [&remaining_nodes](const tbb::blocked_range<int> &range)
+            {
+                for (int x = range.begin(); x != range.end(); ++x)
+                {
+                    remaining_nodes[x].id = x;
+                }
+            }
+        );
 
         std::cout << "initializing elimination PQ ..." << std::flush;
-#pragma omp parallel
-        {
-            ContractorThreadData *data = thread_data_list[omp_get_thread_num()];
-#pragma omp parallel for schedule(guided)
-            for (int x = 0; x < (int)number_of_nodes; ++x)
-            {
-                node_priorities[x] = EvaluateNodePriority(data, &node_data[x], x);
-            }
-        }
+        tbb::parallel_for(tbb::blocked_range<int>(0, number_of_nodes, PQGrainSize),
+            [this, &node_priorities, &node_data, &thread_data_list](const tbb::blocked_range<int> &range)
+            {
+                ContractorThreadData *data = thread_data_list.getThreadData();
+                for (int x = range.begin(); x != range.end(); ++x)
+                {
+                    node_priorities[x] = this->EvaluateNodePriority(data, &node_data[x], x);
+                }
+            }
+        );
         std::cout << "ok" << std::endl << "preprocessing " << number_of_nodes << " nodes ..."
                   << std::flush;
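Editor's note: a minimal, self-contained sketch of the blocked_range/grain-size pattern used in the loops above (all names here are illustrative). The third constructor argument caps how finely TBB may split the range, and with parallel_for's default auto_partitioner larger blocks are formed automatically when there is enough work, which is why a grain size of 100000 pays off for cheap loop bodies like the initialization above.

    #include <tbb/blocked_range.h>
    #include <tbb/parallel_for.h>
    #include <vector>

    int main()
    {
        std::vector<int> ids(1000000);

        // Chunks handed to a worker contain at least grain_size elements;
        // cheap bodies want large chunks to cut scheduling overhead and
        // keep cache locality.
        const std::size_t grain_size = 100000;
        tbb::parallel_for(tbb::blocked_range<int>(0, static_cast<int>(ids.size()), grain_size),
                          [&ids](const tbb::blocked_range<int> &range)
                          {
                              for (int i = range.begin(); i != range.end(); ++i)
                              {
                                  ids[i] = i;
                              }
                          });
        return 0;
    }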
@@ -309,11 +347,7 @@ class Contractor
                 std::cout << " [flush " << number_of_contracted_nodes << " nodes] " << std::flush;
 
                 // Delete old heap data to free memory that we need for the coming operations
-                for (ContractorThreadData *data : thread_data_list)
-                {
-                    delete data;
-                }
-                thread_data_list.clear();
+                thread_data_list.data.clear();
 
                 // Create new priority array
                 std::vector<float> new_node_priority(remaining_nodes.size());
@@ -396,59 +430,67 @@ class Contractor
                 // INFO: MAKE SURE THIS IS THE LAST OPERATION OF THE FLUSH!
                 // reinitialize heaps and ThreadData objects with appropriate size
-                for (unsigned thread_id = 0; thread_id < thread_count; ++thread_id)
-                {
-                    thread_data_list.push_back(
-                        new ContractorThreadData(contractor_graph->GetNumberOfNodes()));
-                }
+                thread_data_list.number_of_nodes = contractor_graph->GetNumberOfNodes();
             }
 
             const int last = (int)remaining_nodes.size();
-#pragma omp parallel
-            {
-                // determine independent node set
-                ContractorThreadData *const data = thread_data_list[omp_get_thread_num()];
-#pragma omp for schedule(guided)
-                for (int i = 0; i < last; ++i)
-                {
-                    const NodeID node = remaining_nodes[i].id;
-                    remaining_nodes[i].is_independent =
-                        IsNodeIndependent(node_priorities, data, node);
-                }
-            }
+            // determine independent node set
+            tbb::parallel_for(tbb::blocked_range<int>(0, last, IndependentGrainSize),
+                [this, &node_priorities, &remaining_nodes, &thread_data_list](const tbb::blocked_range<int> &range)
+                {
+                    ContractorThreadData *data = thread_data_list.getThreadData();
+                    for (int i = range.begin(); i != range.end(); ++i)
+                    {
+                        const NodeID node = remaining_nodes[i].id;
+                        remaining_nodes[i].is_independent =
+                            this->IsNodeIndependent(node_priorities, data, node);
+                    }
+                }
+            );
             const auto first = stable_partition(remaining_nodes.begin(),
                                                 remaining_nodes.end(),
                                                 [](RemainingNodeData node_data)
                                                 { return !node_data.is_independent; });
             const int first_independent_node = first - remaining_nodes.begin();
+
             // contract independent nodes
-#pragma omp parallel
-            {
-                ContractorThreadData *data = thread_data_list[omp_get_thread_num()];
-#pragma omp for schedule(guided) nowait
-                for (int position = first_independent_node; position < last; ++position)
-                {
-                    NodeID x = remaining_nodes[position].id;
-                    ContractNode<false>(data, x);
-                }
-                std::sort(data->inserted_edges.begin(), data->inserted_edges.end());
-            }
-#pragma omp parallel
-            {
-                ContractorThreadData *data = thread_data_list[omp_get_thread_num()];
-#pragma omp for schedule(guided) nowait
-                for (int position = first_independent_node; position < last; ++position)
-                {
-                    NodeID x = remaining_nodes[position].id;
-                    DeleteIncomingEdges(data, x);
-                }
-            }
-            // insert new edges
-            for (unsigned thread_id = 0; thread_id < thread_count; ++thread_id)
-            {
-                ContractorThreadData &data = *thread_data_list[thread_id];
-                for (const ContractorEdge &edge : data.inserted_edges)
+            tbb::parallel_for(tbb::blocked_range<int>(first_independent_node, last, ContractGrainSize),
+                [this, &remaining_nodes, &thread_data_list](const tbb::blocked_range<int> &range)
+                {
+                    ContractorThreadData *data = thread_data_list.getThreadData();
+                    for (int position = range.begin(); position != range.end(); ++position)
+                    {
+                        NodeID x = remaining_nodes[position].id;
+                        this->ContractNode<false>(data, x);
+                    }
+                }
+            );
+            // make sure we really sort each block
+            tbb::parallel_for(thread_data_list.data.range(),
+                [&](const ThreadDataContainer::EnumerableThreadData::range_type &range)
+                {
+                    for (auto &data : range)
+                        std::sort(data->inserted_edges.begin(),
+                                  data->inserted_edges.end());
+                }
+            );
+            tbb::parallel_for(tbb::blocked_range<int>(first_independent_node, last, DeleteGrainSize),
+                [this, &remaining_nodes, &thread_data_list](const tbb::blocked_range<int> &range)
+                {
+                    ContractorThreadData *data = thread_data_list.getThreadData();
+                    for (int position = range.begin(); position != range.end(); ++position)
+                    {
+                        NodeID x = remaining_nodes[position].id;
+                        this->DeleteIncomingEdges(data, x);
+                    }
+                }
+            );
+
+            // insert new edges
+            for (auto &data : thread_data_list.data)
+            {
+                for (const ContractorEdge &edge : data->inserted_edges)
                 {
                     auto current_edge_ID = contractor_graph->FindEdge(edge.source, edge.target);
                     if (current_edge_ID < contractor_graph->EndEdges(edge.source))
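Editor's note: the "make sure we really sort each block" loop in the hunk above uses another enumerable_thread_specific feature worth calling out: range() exposes the set of thread-local instances as a splittable range, so post-processing each thread's buffer can itself run in parallel. A self-contained sketch under invented names:

    #include <tbb/enumerable_thread_specific.h>
    #include <tbb/parallel_for.h>
    #include <algorithm>
    #include <vector>

    typedef tbb::enumerable_thread_specific<std::vector<int>> BufferSet;

    // Sort every thread-local buffer in parallel; each chunk of the
    // range covers some subset of the per-thread instances, and the
    // range-based for visits the instances inside this chunk.
    void sort_all(BufferSet &buffers)
    {
        tbb::parallel_for(buffers.range(),
                          [](const BufferSet::range_type &range)
                          {
                              for (auto &buffer : range)
                              {
                                  std::sort(buffer.begin(), buffer.end());
                              }
                          });
    }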
@@ -466,19 +508,21 @@ class Contractor
                     }
                     contractor_graph->InsertEdge(edge.source, edge.target, edge.data);
                 }
-                data.inserted_edges.clear();
+                data->inserted_edges.clear();
             }
 
             // update priorities
-#pragma omp parallel
-            {
-                ContractorThreadData *data = thread_data_list[omp_get_thread_num()];
-#pragma omp for schedule(guided) nowait
-                for (int position = first_independent_node; position < last; ++position)
-                {
-                    NodeID x = remaining_nodes[position].id;
-                    UpdateNodeNeighbours(node_priorities, node_data, data, x);
-                }
-            }
+            tbb::parallel_for(tbb::blocked_range<int>(first_independent_node, last, NeighboursGrainSize),
+                [this, &remaining_nodes, &node_priorities, &node_data, &thread_data_list](const tbb::blocked_range<int> &range)
+                {
+                    ContractorThreadData *data = thread_data_list.getThreadData();
+                    for (int position = range.begin(); position != range.end(); ++position)
+                    {
+                        NodeID x = remaining_nodes[position].id;
+                        this->UpdateNodeNeighbours(node_priorities, node_data, data, x);
+                    }
+                }
+            );
 
             // remove contracted nodes from the pool
             number_of_contracted_nodes += last - first_independent_node;
             remaining_nodes.resize(first_independent_node);
@@ -510,11 +554,8 @@ class Contractor
             p.printStatus(number_of_contracted_nodes);
         }
 
-        for (ContractorThreadData *data : thread_data_list)
-        {
-            delete data;
-        }
-        thread_data_list.clear();
+        thread_data_list.data.clear();
     }
 
     template <class Edge> inline void GetEdges(DeallocatingVector<Edge> &edges)
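Editor's note: in contrast to the parallel sort, the "insert new edges" step earlier in Run() iterates thread_data_list.data on a single thread, because it mutates the shared contractor_graph. enumerable_thread_specific also offers combine_each for exactly this serial-drain pattern; a hypothetical sketch with invented names:

    #include <tbb/enumerable_thread_specific.h>
    #include <vector>

    tbb::enumerable_thread_specific<std::vector<int>> inserted;

    // Visit every per-thread buffer on the calling thread; writes to
    // the shared output therefore stay single-threaded.
    void drain(std::vector<int> &out)
    {
        inserted.combine_each([&out](std::vector<int> &local)
                              {
                                  out.insert(out.end(), local.begin(), local.end());
                                  local.clear();
                              });
    }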
@@ -769,7 +810,6 @@ class Contractor
                                          true,
                                          true,
                                          false);
-            ;
             inserted_edges.push_back(new_edge);
             std::swap(new_edge.source, new_edge.target);
             new_edge.data.forward = false;