Merge branch 'thread-control' of https://github.com/TheMarex/Project-OSRM into TheMarex-thread-control

This commit is contained in:
Dennis Luxen 2014-06-15 12:35:15 +02:00
commit 0e1f6f50ea
4 changed files with 43 additions and 15 deletions

View File

@ -40,6 +40,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "../typedefs.h" #include "../typedefs.h"
#include <tbb/parallel_for.h> #include <tbb/parallel_for.h>
#include <tbb/task_scheduler_init.h>
#include <osrm/Coordinate.h> #include <osrm/Coordinate.h>
@ -51,9 +52,19 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
PBFParser::PBFParser(const char *fileName, PBFParser::PBFParser(const char *fileName,
ExtractorCallbacks *extractor_callbacks, ExtractorCallbacks *extractor_callbacks,
ScriptingEnvironment &scripting_environment) ScriptingEnvironment &scripting_environment,
unsigned num_threads)
: BaseParser(extractor_callbacks, scripting_environment) : BaseParser(extractor_callbacks, scripting_environment)
{ {
if (0 == num_threads)
{
num_parser_threads = tbb::task_scheduler_init::default_num_threads();
}
else
{
num_parser_threads = num_threads;
}
GOOGLE_PROTOBUF_VERIFY_VERSION; GOOGLE_PROTOBUF_VERIFY_VERSION;
// TODO: What is the bottleneck here? Filling the queue or reading the stuff from disk? // TODO: What is the bottleneck here? Filling the queue or reading the stuff from disk?
// NOTE: With Lua scripting, it is parsing the stuff. I/O is virtually for free. // NOTE: With Lua scripting, it is parsing the stuff. I/O is virtually for free.
@ -160,6 +171,8 @@ inline void PBFParser::ReadData()
inline void PBFParser::ParseData() inline void PBFParser::ParseData()
{ {
tbb::task_scheduler_init init(num_parser_threads);
while (true) while (true)
{ {
ParserThreadData *thread_data; ParserThreadData *thread_data;

View File

@ -63,7 +63,10 @@ class PBFParser : public BaseParser
}; };
public: public:
PBFParser(const char *file_name, ExtractorCallbacks *extractor_callbacks, ScriptingEnvironment &scripting_environment); PBFParser(const char *file_name,
ExtractorCallbacks *extractor_callbacks,
ScriptingEnvironment &scripting_environment,
unsigned num_parser_threads=0);
virtual ~PBFParser(); virtual ~PBFParser();
inline bool ReadHeader(); inline bool ReadHeader();
@ -94,6 +97,7 @@ class PBFParser : public BaseParser
std::fstream input; // the input stream to parse std::fstream input; // the input stream to parse
std::shared_ptr<ConcurrentQueue<ParserThreadData *>> thread_data_queue; std::shared_ptr<ConcurrentQueue<ParserThreadData *>> thread_data_queue;
unsigned num_parser_threads;
}; };
#endif /* PBFPARSER_H_ */ #endif /* PBFPARSER_H_ */

View File

@ -81,7 +81,7 @@ int main(int argc, char *argv[])
&profile_path)->default_value("profile.lua"), &profile_path)->default_value("profile.lua"),
"Path to LUA routing profile")( "Path to LUA routing profile")(
"threads,t", "threads,t",
boost::program_options::value<unsigned int>(&requested_num_threads)->default_value(8), boost::program_options::value<unsigned int>(&requested_num_threads)->default_value(tbb::task_scheduler_init::default_num_threads()),
"Number of threads to use"); "Number of threads to use");
// hidden options, will be allowed both on command line and in config file, but will not be // hidden options, will be allowed both on command line and in config file, but will not be
@ -166,15 +166,19 @@ int main(int argc, char *argv[])
return 1; return 1;
} }
unsigned hardware_threads = std::max(1u, std::thread::hardware_concurrency()); const unsigned recommended_num_threads = tbb::task_scheduler_init::default_num_threads();
unsigned real_num_threads = std::min(hardware_threads, requested_num_threads);
SimpleLogger().Write() << "Input file: " << input_path.filename().string(); SimpleLogger().Write() << "Input file: " << input_path.filename().string();
SimpleLogger().Write() << "Profile: " << profile_path.filename().string(); SimpleLogger().Write() << "Profile: " << profile_path.filename().string();
SimpleLogger().Write() << "Threads: " << real_num_threads << " (requested " SimpleLogger().Write() << "Threads: " << requested_num_threads;
<< requested_num_threads << ")"; if (recommended_num_threads != requested_num_threads)
{
SimpleLogger().Write(logWARNING) << "The recommended number of threads is "
<< recommended_num_threads
<< "! This setting may have performance side-effects.";
}
tbb::task_scheduler_init init(real_num_threads); tbb::task_scheduler_init init(requested_num_threads);
/*** Setup Scripting Environment ***/ /*** Setup Scripting Environment ***/
ScriptingEnvironment scripting_environment(profile_path.string().c_str()); ScriptingEnvironment scripting_environment(profile_path.string().c_str());
@ -229,7 +233,10 @@ int main(int argc, char *argv[])
BaseParser *parser; BaseParser *parser;
if (file_has_pbf_format) if (file_has_pbf_format)
{ {
parser = new PBFParser(input_path.string().c_str(), extractor_callbacks, scripting_environment); parser = new PBFParser(input_path.string().c_str(),
extractor_callbacks,
scripting_environment,
requested_num_threads);
} }
else else
{ {

View File

@ -99,7 +99,7 @@ int main(int argc, char *argv[])
->default_value("profile.lua"), ->default_value("profile.lua"),
"Path to LUA routing profile")( "Path to LUA routing profile")(
"threads,t", "threads,t",
boost::program_options::value<unsigned int>(&requested_num_threads)->default_value(8), boost::program_options::value<unsigned int>(&requested_num_threads)->default_value(tbb::task_scheduler_init::default_num_threads()),
"Number of threads to use"); "Number of threads to use");
// hidden options, will be allowed both on command line and in config file, but will not be // hidden options, will be allowed both on command line and in config file, but will not be
@ -178,16 +178,20 @@ int main(int argc, char *argv[])
return 1; return 1;
} }
unsigned int hardware_threads = std::max((unsigned int) 1, std::thread::hardware_concurrency()); const unsigned recommended_num_threads = tbb::task_scheduler_init::default_num_threads();
unsigned int real_num_threads = std::min(hardware_threads, requested_num_threads);
SimpleLogger().Write() << "Input file: " << input_path.filename().string(); SimpleLogger().Write() << "Input file: " << input_path.filename().string();
SimpleLogger().Write() << "Restrictions file: " << restrictions_path.filename().string(); SimpleLogger().Write() << "Restrictions file: " << restrictions_path.filename().string();
SimpleLogger().Write() << "Profile: " << profile_path.filename().string(); SimpleLogger().Write() << "Profile: " << profile_path.filename().string();
SimpleLogger().Write() << "Threads: " << real_num_threads << " (requested " SimpleLogger().Write() << "Threads: " << requested_num_threads;
<< requested_num_threads << ")"; if (recommended_num_threads != requested_num_threads)
{
SimpleLogger().Write(logWARNING) << "The recommended number of threads is "
<< recommended_num_threads
<< "! This setting may have performance side-effects.";
}
tbb::task_scheduler_init init(real_num_threads); tbb::task_scheduler_init init(requested_num_threads);
LogPolicy::GetInstance().Unmute(); LogPolicy::GetInstance().Unmute();
boost::filesystem::ifstream restriction_stream(restrictions_path, std::ios::binary); boost::filesystem::ifstream restriction_stream(restrictions_path, std::ios::binary);