This commit is contained in:
Siarhei Fedartsou 2022-11-26 22:31:13 +01:00
parent 14d3d50263
commit e6fcf465b7
14 changed files with 141 additions and 98 deletions

View File

@ -508,6 +508,9 @@ if(ENABLE_CONAN)
onetbb:shared=${TBB_SHARED} onetbb:shared=${TBB_SHARED}
boost:without_stacktrace=True # Apple Silicon cross-compilation fails without it boost:without_stacktrace=True # Apple Silicon cross-compilation fails without it
arrow:parquet=True arrow:parquet=True
arrow:with_snappy=True
arrow:with_brotli=True
arrow:with_zlib=True
BUILD missing BUILD missing
) )
# explicitly say Conan to use x86 dependencies if build for x86 platforms (https://github.com/conan-io/cmake-conan/issues/141) # explicitly say Conan to use x86 dependencies if build for x86 platforms (https://github.com/conan-io/cmake-conan/issues/141)
@ -569,6 +572,9 @@ else()
add_dependency_includes(${TBB_INCLUDE_DIR}) add_dependency_includes(${TBB_INCLUDE_DIR})
set(TBB_LIBRARIES TBB::tbb) set(TBB_LIBRARIES TBB::tbb)
find_package(Parquet REQUIRED)
set(ARROW_LIBRARIES Parquet::parquet_shared)
find_package(EXPAT REQUIRED) find_package(EXPAT REQUIRED)
add_dependency_includes(${EXPAT_INCLUDE_DIRS}) add_dependency_includes(${EXPAT_INCLUDE_DIRS})

View File

@ -1,9 +1,9 @@
#ifndef OSRM_UPDATER_CSV_FILE_PARSER_HPP #ifndef OSRM_UPDATER_CSV_FILE_PARSER_HPP
#define OSRM_UPDATER_CSV_FILE_PARSER_HPP #define OSRM_UPDATER_CSV_FILE_PARSER_HPP
#include "file_parser.hpp" #include "file_parser.hpp"
#include <arrow/io/file.h>
#include <parquet/arrow/reader.h> #include <parquet/arrow/reader.h>
#include <parquet/stream_reader.h> #include <parquet/stream_reader.h>
#include <arrow/io/file.h>
#include "updater/source.hpp" #include "updater/source.hpp"
@ -47,7 +47,8 @@ template <typename Key, typename Value> struct CSVFilesParser : public FilesPars
private: private:
// Parse a single CSV file and return result as a vector<Key, Value> // Parse a single CSV file and return result as a vector<Key, Value>
std::vector<std::pair<Key, Value>> ParseFile(const std::string &filename, std::size_t file_id) const final std::vector<std::pair<Key, Value>> ParseFile(const std::string &filename,
std::size_t file_id) const final
{ {
namespace qi = boost::spirit::qi; namespace qi = boost::spirit::qi;
@ -90,7 +91,7 @@ template <typename Key, typename Value> struct CSVFilesParser : public FilesPars
throw util::exception(message.str() + SOURCE_REF); throw util::exception(message.str() + SOURCE_REF);
} }
} }
const KeyRule key_rule; const KeyRule key_rule;
const ValueRule value_rule; const ValueRule value_rule;
}; };

View File

@ -10,8 +10,10 @@ namespace updater
{ {
namespace data namespace data
{ {
SegmentLookupTable readSegmentValues(const std::vector<std::string> &paths, SpeedAndTurnPenaltyFormat format); SegmentLookupTable readSegmentValues(const std::vector<std::string> &paths,
TurnLookupTable readTurnValues(const std::vector<std::string> &paths, SpeedAndTurnPenaltyFormat format); SpeedAndTurnPenaltyFormat format);
TurnLookupTable readTurnValues(const std::vector<std::string> &paths,
SpeedAndTurnPenaltyFormat format);
} // namespace data } // namespace data
} // namespace updater } // namespace updater
} // namespace osrm } // namespace osrm

View File

@ -1,8 +1,8 @@
#ifndef OSRM_UPDATER_FILE_PARSER_HPP #ifndef OSRM_UPDATER_FILE_PARSER_HPP
#define OSRM_UPDATER_FILE_PARSER_HPP #define OSRM_UPDATER_FILE_PARSER_HPP
#include <arrow/io/file.h>
#include <parquet/arrow/reader.h> #include <parquet/arrow/reader.h>
#include <parquet/stream_reader.h> #include <parquet/stream_reader.h>
#include <arrow/io/file.h>
#include "updater/source.hpp" #include "updater/source.hpp"
@ -88,7 +88,8 @@ template <typename Key, typename Value> struct FilesParser
protected: protected:
// Parse a single CSV file and return result as a vector<Key, Value> // Parse a single CSV file and return result as a vector<Key, Value>
virtual std::vector<std::pair<Key, Value>> ParseFile(const std::string &filename, std::size_t file_id) const; virtual std::vector<std::pair<Key, Value>> ParseFile(const std::string &filename,
std::size_t file_id) const;
}; };
} // namespace updater } // namespace updater
} // namespace osrm } // namespace osrm

View File

@ -1,11 +1,11 @@
#ifndef OSRM_UPDATER_PARQUET_FILE_PARSER_HPP #ifndef OSRM_UPDATER_PARQUET_FILE_PARSER_HPP
#define OSRM_UPDATER_PARQUET_FILE_PARSER_HPP #define OSRM_UPDATER_PARQUET_FILE_PARSER_HPP
#include "file_parser.hpp" #include "file_parser.hpp"
#include <arrow/io/file.h>
#include <optional> #include <optional>
#include <parquet/arrow/reader.h> #include <parquet/arrow/reader.h>
#include <parquet/exception.h> #include <parquet/exception.h>
#include <parquet/stream_reader.h> #include <parquet/stream_reader.h>
#include <arrow/io/file.h>
#include "updater/source.hpp" #include "updater/source.hpp"
@ -31,24 +31,26 @@ template <typename Key, typename Value> struct ParquetFilesParser : public Files
{ {
private: private:
// Parse a single Parquet file and return result as a vector<Key, Value> // Parse a single Parquet file and return result as a vector<Key, Value>
std::vector<std::pair<Key, Value>> ParseFile(const std::string &filename, std::size_t file_id) const final std::vector<std::pair<Key, Value>> ParseFile(const std::string &filename,
std::size_t file_id) const final
{ {
try { try
{
std::shared_ptr<arrow::io::ReadableFile> infile; std::shared_ptr<arrow::io::ReadableFile> infile;
PARQUET_ASSIGN_OR_THROW( PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(filename));
infile,
arrow::io::ReadableFile::Open(filename));
parquet::StreamReader os{parquet::ParquetFileReader::Open(infile)}; parquet::StreamReader os{parquet::ParquetFileReader::Open(infile)};
std::vector<std::pair<Key, Value>> result; std::vector<std::pair<Key, Value>> result;
while ( !os.eof() ) while (!os.eof())
{ {
result.emplace_back(ReadKeyValue(os, file_id)); result.emplace_back(ReadKeyValue(os, file_id));
} }
return result; return result;
} catch (const std::exception &e) { }
catch (const std::exception &e)
{
throw util::exception(e.what() + SOURCE_REF); throw util::exception(e.what() + SOURCE_REF);
} }
} }
@ -64,38 +66,31 @@ template <typename Key, typename Value> struct ParquetFilesParser : public Files
return {key, value}; return {key, value};
} }
void Read(parquet::StreamReader &os, Turn& turn) const { void Read(parquet::StreamReader &os, Turn &turn) const
int64_t from, via, to; {
os >> from >> via >> to; os >> turn.from >> turn.via >> turn.to;
turn.from = from;
turn.via = via;
turn.to = to;
} }
void Read(parquet::StreamReader &os, PenaltySource& penalty_source) const { void Read(parquet::StreamReader &os, PenaltySource &penalty_source) const
{
os >> penalty_source.duration >> penalty_source.weight; os >> penalty_source.duration >> penalty_source.weight;
} }
void Read(parquet::StreamReader &os, Segment& segment) const { void Read(parquet::StreamReader &os, Segment &segment) const
int64_t from; {
int64_t to; os >> segment.from >> segment.to;
os >> from >> to;
segment.from = from;
segment.to = to;
//std::cerr << from << " " << to<< std::endl;
//os >> segment.from >> segment.to >> parquet::EndRow;
} }
void Read(parquet::StreamReader &os, SpeedSource& speed_source) const { void Read(parquet::StreamReader &os, SpeedSource &speed_source) const
{
std::optional<double> rate; std::optional<double> rate;
os >> speed_source.speed >> rate; os >> speed_source.speed >> rate;
// TODO: boost::optional // TODO: boost::optional
if (rate) { if (rate)
{
speed_source.rate = *rate; speed_source.rate = *rate;
} }
} }
}; };
} // namespace updater } // namespace updater
} // namespace osrm } // namespace osrm

View File

@ -42,11 +42,11 @@ namespace osrm
namespace updater namespace updater
{ {
enum class SpeedAndTurnPenaltyFormat
enum class SpeedAndTurnPenaltyFormat { {
CSV, CSV,
PARQUET PARQUET
}; };
struct UpdaterConfig final : storage::IOConfig struct UpdaterConfig final : storage::IOConfig
{ {
@ -76,45 +76,51 @@ struct UpdaterConfig final : storage::IOConfig
double log_edge_updates_factor = 0.0; double log_edge_updates_factor = 0.0;
std::time_t valid_now; std::time_t valid_now;
SpeedAndTurnPenaltyFormat speed_and_turn_penalty_format = SpeedAndTurnPenaltyFormat::CSV; SpeedAndTurnPenaltyFormat speed_and_turn_penalty_format = SpeedAndTurnPenaltyFormat::CSV;
std::vector<std::string> segment_speed_lookup_paths; std::vector<std::string> segment_speed_lookup_paths;
std::vector<std::string> turn_penalty_lookup_paths; std::vector<std::string> turn_penalty_lookup_paths;
std::string tz_file_path; std::string tz_file_path;
}; };
inline std::istream &operator>>(std::istream &in, SpeedAndTurnPenaltyFormat &format)
inline std::istream& operator>> (std::istream &in, SpeedAndTurnPenaltyFormat& format) { {
std::string token; std::string token;
in >> token; in >> token;
std::transform(token.begin(), token.end(), token.begin(), [](auto c){ return std::tolower(c); }); std::transform(
token.begin(), token.end(), token.begin(), [](auto c) { return std::tolower(c); });
if (token == "csv") { if (token == "csv")
{
format = SpeedAndTurnPenaltyFormat::CSV; format = SpeedAndTurnPenaltyFormat::CSV;
} else if (token == "parquet") { }
else if (token == "parquet")
{
format = SpeedAndTurnPenaltyFormat::PARQUET; format = SpeedAndTurnPenaltyFormat::PARQUET;
} else { }
throw boost::program_options::validation_error{boost::program_options::validation_error::invalid_option_value}; else
{
throw boost::program_options::validation_error{
boost::program_options::validation_error::invalid_option_value};
} }
return in; return in;
} }
inline std::ostream &operator<<(std::ostream &out, SpeedAndTurnPenaltyFormat format)
inline std::ostream& operator<< (std::ostream &out, SpeedAndTurnPenaltyFormat format) { {
switch (format) { switch (format)
case SpeedAndTurnPenaltyFormat::CSV: {
out << "csv"; case SpeedAndTurnPenaltyFormat::CSV:
break; out << "csv";
case SpeedAndTurnPenaltyFormat::PARQUET: break;
out << "parquet"; case SpeedAndTurnPenaltyFormat::PARQUET:
break; out << "parquet";
break;
} }
return out; return out;
} }
} // namespace updater } // namespace updater
} // namespace osrm } // namespace osrm

View File

@ -49,11 +49,15 @@ return_code parseArguments(int argc,
"core,k", "core,k",
boost::program_options::value<double>(&contractor_config.core_factor)->default_value(1.0), boost::program_options::value<double>(&contractor_config.core_factor)->default_value(1.0),
"DEPRECATED: Will always be 1.0. Percentage of the graph (in vertices) to contract " "DEPRECATED: Will always be 1.0. Percentage of the graph (in vertices) to contract "
"[0..1].") ("speed-and-turn-penalty-format", boost::program_options::value<updater::SpeedAndTurnPenaltyFormat>(&contractor_config.updater_config.speed_and_turn_penalty_format)->default_value(updater::SpeedAndTurnPenaltyFormat::CSV))("segment-speed-file", "[0..1].")("speed-and-turn-penalty-format",
boost::program_options::value<std::vector<std::string>>( boost::program_options::value<updater::SpeedAndTurnPenaltyFormat>(
&contractor_config.updater_config.segment_speed_lookup_paths) &contractor_config.updater_config.speed_and_turn_penalty_format)
->composing(), ->default_value(updater::SpeedAndTurnPenaltyFormat::CSV))(
"Lookup files containing nodeA, nodeB, speed data to adjust edge weights")( "segment-speed-file",
boost::program_options::value<std::vector<std::string>>(
&contractor_config.updater_config.segment_speed_lookup_paths)
->composing(),
"Lookup files containing nodeA, nodeB, speed data to adjust edge weights")(
"turn-penalty-file", "turn-penalty-file",
boost::program_options::value<std::vector<std::string>>( boost::program_options::value<std::vector<std::string>>(
&contractor_config.updater_config.turn_penalty_lookup_paths) &contractor_config.updater_config.turn_penalty_lookup_paths)

View File

@ -21,7 +21,6 @@ enum class return_code : unsigned
exit exit
}; };
return_code parseArguments(int argc, return_code parseArguments(int argc,
char *argv[], char *argv[],
std::string &verbosity, std::string &verbosity,
@ -41,8 +40,11 @@ return_code parseArguments(int argc,
("threads,t", ("threads,t",
boost::program_options::value<unsigned int>(&customization_config.requested_num_threads) boost::program_options::value<unsigned int>(&customization_config.requested_num_threads)
->default_value(std::thread::hardware_concurrency()), ->default_value(std::thread::hardware_concurrency()),
"Number of threads to use") "Number of threads to use")(
("speed-and-turn-penalty-format", boost::program_options::value<updater::SpeedAndTurnPenaltyFormat>(&customization_config.updater_config.speed_and_turn_penalty_format)->default_value(updater::SpeedAndTurnPenaltyFormat::CSV))( "speed-and-turn-penalty-format",
boost::program_options::value<updater::SpeedAndTurnPenaltyFormat>(
&customization_config.updater_config.speed_and_turn_penalty_format)
->default_value(updater::SpeedAndTurnPenaltyFormat::CSV))(
"segment-speed-file", "segment-speed-file",
boost::program_options::value<std::vector<std::string>>( boost::program_options::value<std::vector<std::string>>(
&customization_config.updater_config.segment_speed_lookup_paths) &customization_config.updater_config.segment_speed_lookup_paths)

View File

@ -34,37 +34,45 @@ namespace updater
namespace data namespace data
{ {
namespace { namespace
std::unique_ptr<FilesParser<Segment, SpeedSource>> makeSegmentParser(SpeedAndTurnPenaltyFormat format) { {
switch (format) { std::unique_ptr<FilesParser<Segment, SpeedSource>>
case SpeedAndTurnPenaltyFormat::CSV: makeSegmentParser(SpeedAndTurnPenaltyFormat format)
{ {
static const auto value_if_blank = std::numeric_limits<double>::quiet_NaN(); switch (format)
const qi::real_parser<double, qi::ureal_policies<double>> unsigned_double; {
return std::make_unique<CSVFilesParser<Segment, SpeedSource>>(qi::ulong_long >> ',' >> qi::ulong_long, case SpeedAndTurnPenaltyFormat::CSV:
unsigned_double >> -(',' >> (qi::double_ | qi::attr(value_if_blank)))); {
} static const auto value_if_blank = std::numeric_limits<double>::quiet_NaN();
case SpeedAndTurnPenaltyFormat::PARQUET: const qi::real_parser<double, qi::ureal_policies<double>> unsigned_double;
return std::make_unique<ParquetFilesParser<Segment, SpeedSource>>(); return std::make_unique<CSVFilesParser<Segment, SpeedSource>>(
qi::ulong_long >> ',' >> qi::ulong_long,
unsigned_double >> -(',' >> (qi::double_ | qi::attr(value_if_blank))));
} }
} case SpeedAndTurnPenaltyFormat::PARQUET:
return std::make_unique<ParquetFilesParser<Segment, SpeedSource>>();
}
}
std::unique_ptr<FilesParser<Turn, PenaltySource>> makeTurnParser(SpeedAndTurnPenaltyFormat format) { std::unique_ptr<FilesParser<Turn, PenaltySource>> makeTurnParser(SpeedAndTurnPenaltyFormat format)
switch (format) { {
case SpeedAndTurnPenaltyFormat::CSV: switch (format)
{ {
return std::make_unique<CSVFilesParser<Turn, PenaltySource>>(qi::ulong_long >> ',' >> qi::ulong_long >> ',' >> case SpeedAndTurnPenaltyFormat::CSV:
qi::ulong_long, {
qi::double_ >> -(',' >> qi::double_)); return std::make_unique<CSVFilesParser<Turn, PenaltySource>>(
} qi::ulong_long >> ',' >> qi::ulong_long >> ',' >> qi::ulong_long,
case SpeedAndTurnPenaltyFormat::PARQUET: qi::double_ >> -(',' >> qi::double_));
return std::make_unique<ParquetFilesParser<Turn, PenaltySource>>();
} }
} case SpeedAndTurnPenaltyFormat::PARQUET:
return std::make_unique<ParquetFilesParser<Turn, PenaltySource>>();
}
}
} // namespace } // namespace
SegmentLookupTable readSegmentValues(const std::vector<std::string> &paths, SpeedAndTurnPenaltyFormat format) SegmentLookupTable readSegmentValues(const std::vector<std::string> &paths,
SpeedAndTurnPenaltyFormat format)
{ {
auto parser = makeSegmentParser(format); auto parser = makeSegmentParser(format);
@ -83,7 +91,8 @@ SegmentLookupTable readSegmentValues(const std::vector<std::string> &paths, Spee
return result; return result;
} }
TurnLookupTable readTurnValues(const std::vector<std::string> &paths, SpeedAndTurnPenaltyFormat format) TurnLookupTable readTurnValues(const std::vector<std::string> &paths,
SpeedAndTurnPenaltyFormat format)
{ {
auto parser = makeTurnParser(format); auto parser = makeTurnParser(format);
return (*parser)(paths); return (*parser)(paths);

View File

@ -618,7 +618,8 @@ Updater::LoadAndUpdateEdgeExpandedGraph(std::vector<extractor::EdgeBasedEdge> &e
tbb::concurrent_vector<GeometryID> updated_segments; tbb::concurrent_vector<GeometryID> updated_segments;
if (update_edge_weights) if (update_edge_weights)
{ {
auto segment_speed_lookup = data::readSegmentValues(config.segment_speed_lookup_paths, config.speed_and_turn_penalty_format); auto segment_speed_lookup = data::readSegmentValues(config.segment_speed_lookup_paths,
config.speed_and_turn_penalty_format);
TIMER_START(segment); TIMER_START(segment);
updated_segments = updateSegmentData(config, updated_segments = updateSegmentData(config,
@ -633,7 +634,8 @@ Updater::LoadAndUpdateEdgeExpandedGraph(std::vector<extractor::EdgeBasedEdge> &e
util::Log() << "Updating segment data took " << TIMER_MSEC(segment) << "ms."; util::Log() << "Updating segment data took " << TIMER_MSEC(segment) << "ms.";
} }
auto turn_penalty_lookup = data::readTurnValues(config.turn_penalty_lookup_paths, config.speed_and_turn_penalty_format); auto turn_penalty_lookup = data::readTurnValues(config.turn_penalty_lookup_paths,
config.speed_and_turn_penalty_format);
if (update_turn_penalties) if (update_turn_penalties)
{ {
auto updated_turn_penalties = updateTurnPenalties(config, auto updated_turn_penalties = updateTurnPenalties(config,

View File

@ -1,13 +1,28 @@
#include <arrow/array.h>
#include <arrow/io/file.h>
#include <arrow/table.h>
#include <arrow/type_fwd.h>
#include <boost/test/tools/old/interface.hpp> #include <boost/test/tools/old/interface.hpp>
#include <updater/data_source.hpp>
#include <boost/test/unit_test.hpp> #include <boost/test/unit_test.hpp>
#include <parquet/stream_writer.h>
#include <updater/data_source.hpp>
using namespace osrm; using namespace osrm;
using namespace osrm::updater; using namespace osrm::updater;
BOOST_AUTO_TEST_CASE(parquet_readSegmentValues) BOOST_AUTO_TEST_CASE(parquet_readSegmentValues)
{ {
boost::filesystem::path test_path(TEST_DATA_DIR "/speeds_file.parquet"); {
SegmentLookupTable segment_lookup_table = data::readSegmentValues({test_path.string()}, SpeedAndTurnPenaltyFormat::PARQUET); SegmentLookupTable segment_lookup_table = data::readSegmentValues(
BOOST_CHECK_EQUAL(segment_lookup_table.lookup.size(), 2); {boost::filesystem::path{TEST_DATA_DIR "/speed.parquet"}.string()},
SpeedAndTurnPenaltyFormat::PARQUET);
BOOST_CHECK_EQUAL(segment_lookup_table.lookup.size(), 100);
}
{
SegmentLookupTable segment_lookup_table = data::readSegmentValues(
{boost::filesystem::path{TEST_DATA_DIR "/speed_without_rate.parquet"}.string()},
SpeedAndTurnPenaltyFormat::PARQUET);
BOOST_CHECK_EQUAL(segment_lookup_table.lookup.size(), 100);
}
} }

Binary file not shown.

Binary file not shown.