diff --git a/CMakeLists.txt b/CMakeLists.txt index ec780882a..bd70ddabf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -508,6 +508,9 @@ if(ENABLE_CONAN) onetbb:shared=${TBB_SHARED} boost:without_stacktrace=True # Apple Silicon cross-compilation fails without it arrow:parquet=True + arrow:with_snappy=True + arrow:with_brotli=True + arrow:with_zlib=True BUILD missing ) # explicitly say Conan to use x86 dependencies if build for x86 platforms (https://github.com/conan-io/cmake-conan/issues/141) @@ -569,6 +572,9 @@ else() add_dependency_includes(${TBB_INCLUDE_DIR}) set(TBB_LIBRARIES TBB::tbb) + find_package(Parquet REQUIRED) + set(ARROW_LIBRARIES Parquet::parquet_shared) + find_package(EXPAT REQUIRED) add_dependency_includes(${EXPAT_INCLUDE_DIRS}) diff --git a/include/updater/csv_file_parser.hpp b/include/updater/csv_file_parser.hpp index 8ab3f3dfb..2a92bac33 100644 --- a/include/updater/csv_file_parser.hpp +++ b/include/updater/csv_file_parser.hpp @@ -1,9 +1,9 @@ #ifndef OSRM_UPDATER_CSV_FILE_PARSER_HPP #define OSRM_UPDATER_CSV_FILE_PARSER_HPP #include "file_parser.hpp" +#include #include #include -#include #include "updater/source.hpp" @@ -47,7 +47,8 @@ template struct CSVFilesParser : public FilesPars private: // Parse a single CSV file and return result as a vector - std::vector> ParseFile(const std::string &filename, std::size_t file_id) const final + std::vector> ParseFile(const std::string &filename, + std::size_t file_id) const final { namespace qi = boost::spirit::qi; @@ -90,7 +91,7 @@ template struct CSVFilesParser : public FilesPars throw util::exception(message.str() + SOURCE_REF); } } - + const KeyRule key_rule; const ValueRule value_rule; }; diff --git a/include/updater/data_source.hpp b/include/updater/data_source.hpp index 54551a675..4bfe1fa24 100644 --- a/include/updater/data_source.hpp +++ b/include/updater/data_source.hpp @@ -10,8 +10,10 @@ namespace updater { namespace data { -SegmentLookupTable readSegmentValues(const std::vector &paths, SpeedAndTurnPenaltyFormat format); -TurnLookupTable readTurnValues(const std::vector &paths, SpeedAndTurnPenaltyFormat format); +SegmentLookupTable readSegmentValues(const std::vector &paths, + SpeedAndTurnPenaltyFormat format); +TurnLookupTable readTurnValues(const std::vector &paths, + SpeedAndTurnPenaltyFormat format); } // namespace data } // namespace updater } // namespace osrm diff --git a/include/updater/file_parser.hpp b/include/updater/file_parser.hpp index 2ed2449f7..830ed23b6 100644 --- a/include/updater/file_parser.hpp +++ b/include/updater/file_parser.hpp @@ -1,8 +1,8 @@ #ifndef OSRM_UPDATER_FILE_PARSER_HPP #define OSRM_UPDATER_FILE_PARSER_HPP +#include #include #include -#include #include "updater/source.hpp" @@ -88,7 +88,8 @@ template struct FilesParser protected: // Parse a single CSV file and return result as a vector - virtual std::vector> ParseFile(const std::string &filename, std::size_t file_id) const; + virtual std::vector> ParseFile(const std::string &filename, + std::size_t file_id) const; }; } // namespace updater } // namespace osrm diff --git a/include/updater/parquet_file_parser.hpp b/include/updater/parquet_file_parser.hpp index 7e46b3431..308451162 100644 --- a/include/updater/parquet_file_parser.hpp +++ b/include/updater/parquet_file_parser.hpp @@ -1,11 +1,11 @@ #ifndef OSRM_UPDATER_PARQUET_FILE_PARSER_HPP #define OSRM_UPDATER_PARQUET_FILE_PARSER_HPP #include "file_parser.hpp" +#include #include #include #include #include -#include #include "updater/source.hpp" @@ -31,24 +31,26 @@ template struct ParquetFilesParser : public Files { private: // Parse a single Parquet file and return result as a vector - std::vector> ParseFile(const std::string &filename, std::size_t file_id) const final + std::vector> ParseFile(const std::string &filename, + std::size_t file_id) const final { - try { + try + { std::shared_ptr infile; - PARQUET_ASSIGN_OR_THROW( - infile, - arrow::io::ReadableFile::Open(filename)); + PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(filename)); parquet::StreamReader os{parquet::ParquetFileReader::Open(infile)}; std::vector> result; - while ( !os.eof() ) + while (!os.eof()) { result.emplace_back(ReadKeyValue(os, file_id)); } return result; - } catch (const std::exception &e) { + } + catch (const std::exception &e) + { throw util::exception(e.what() + SOURCE_REF); } } @@ -64,38 +66,31 @@ template struct ParquetFilesParser : public Files return {key, value}; } - void Read(parquet::StreamReader &os, Turn& turn) const { - int64_t from, via, to; - os >> from >> via >> to; - turn.from = from; - turn.via = via; - turn.to = to; + void Read(parquet::StreamReader &os, Turn &turn) const + { + os >> turn.from >> turn.via >> turn.to; } - void Read(parquet::StreamReader &os, PenaltySource& penalty_source) const { + void Read(parquet::StreamReader &os, PenaltySource &penalty_source) const + { os >> penalty_source.duration >> penalty_source.weight; } - void Read(parquet::StreamReader &os, Segment& segment) const { - int64_t from; - int64_t to; - os >> from >> to; - - segment.from = from; - segment.to = to; - //std::cerr << from << " " << to<< std::endl; - //os >> segment.from >> segment.to >> parquet::EndRow; + void Read(parquet::StreamReader &os, Segment &segment) const + { + os >> segment.from >> segment.to; } - void Read(parquet::StreamReader &os, SpeedSource& speed_source) const { + void Read(parquet::StreamReader &os, SpeedSource &speed_source) const + { std::optional rate; os >> speed_source.speed >> rate; // TODO: boost::optional - if (rate) { + if (rate) + { speed_source.rate = *rate; } } - }; } // namespace updater } // namespace osrm diff --git a/include/updater/updater_config.hpp b/include/updater/updater_config.hpp index dcc46c810..b32c221cd 100644 --- a/include/updater/updater_config.hpp +++ b/include/updater/updater_config.hpp @@ -42,11 +42,11 @@ namespace osrm namespace updater { - - enum class SpeedAndTurnPenaltyFormat { - CSV, - PARQUET - }; +enum class SpeedAndTurnPenaltyFormat +{ + CSV, + PARQUET +}; struct UpdaterConfig final : storage::IOConfig { @@ -76,45 +76,51 @@ struct UpdaterConfig final : storage::IOConfig double log_edge_updates_factor = 0.0; std::time_t valid_now; -SpeedAndTurnPenaltyFormat speed_and_turn_penalty_format = SpeedAndTurnPenaltyFormat::CSV; + SpeedAndTurnPenaltyFormat speed_and_turn_penalty_format = SpeedAndTurnPenaltyFormat::CSV; std::vector segment_speed_lookup_paths; std::vector turn_penalty_lookup_paths; std::string tz_file_path; }; - -inline std::istream& operator>> (std::istream &in, SpeedAndTurnPenaltyFormat& format) { +inline std::istream &operator>>(std::istream &in, SpeedAndTurnPenaltyFormat &format) +{ std::string token; in >> token; - std::transform(token.begin(), token.end(), token.begin(), [](auto c){ return std::tolower(c); }); + std::transform( + token.begin(), token.end(), token.begin(), [](auto c) { return std::tolower(c); }); - if (token == "csv") { + if (token == "csv") + { format = SpeedAndTurnPenaltyFormat::CSV; - } else if (token == "parquet") { + } + else if (token == "parquet") + { format = SpeedAndTurnPenaltyFormat::PARQUET; - } else { - throw boost::program_options::validation_error{boost::program_options::validation_error::invalid_option_value}; + } + else + { + throw boost::program_options::validation_error{ + boost::program_options::validation_error::invalid_option_value}; } return in; } - -inline std::ostream& operator<< (std::ostream &out, SpeedAndTurnPenaltyFormat format) { - switch (format) { - case SpeedAndTurnPenaltyFormat::CSV: - out << "csv"; - break; - case SpeedAndTurnPenaltyFormat::PARQUET: - out << "parquet"; - break; +inline std::ostream &operator<<(std::ostream &out, SpeedAndTurnPenaltyFormat format) +{ + switch (format) + { + case SpeedAndTurnPenaltyFormat::CSV: + out << "csv"; + break; + case SpeedAndTurnPenaltyFormat::PARQUET: + out << "parquet"; + break; } return out; } - - } // namespace updater } // namespace osrm diff --git a/src/tools/contract.cpp b/src/tools/contract.cpp index 2288ab0f3..e6b7701c0 100644 --- a/src/tools/contract.cpp +++ b/src/tools/contract.cpp @@ -49,11 +49,15 @@ return_code parseArguments(int argc, "core,k", boost::program_options::value(&contractor_config.core_factor)->default_value(1.0), "DEPRECATED: Will always be 1.0. Percentage of the graph (in vertices) to contract " - "[0..1].") ("speed-and-turn-penalty-format", boost::program_options::value(&contractor_config.updater_config.speed_and_turn_penalty_format)->default_value(updater::SpeedAndTurnPenaltyFormat::CSV))("segment-speed-file", - boost::program_options::value>( - &contractor_config.updater_config.segment_speed_lookup_paths) - ->composing(), - "Lookup files containing nodeA, nodeB, speed data to adjust edge weights")( + "[0..1].")("speed-and-turn-penalty-format", + boost::program_options::value( + &contractor_config.updater_config.speed_and_turn_penalty_format) + ->default_value(updater::SpeedAndTurnPenaltyFormat::CSV))( + "segment-speed-file", + boost::program_options::value>( + &contractor_config.updater_config.segment_speed_lookup_paths) + ->composing(), + "Lookup files containing nodeA, nodeB, speed data to adjust edge weights")( "turn-penalty-file", boost::program_options::value>( &contractor_config.updater_config.turn_penalty_lookup_paths) diff --git a/src/tools/customize.cpp b/src/tools/customize.cpp index 4ae8ad6fb..7d14f8834 100644 --- a/src/tools/customize.cpp +++ b/src/tools/customize.cpp @@ -21,7 +21,6 @@ enum class return_code : unsigned exit }; - return_code parseArguments(int argc, char *argv[], std::string &verbosity, @@ -41,8 +40,11 @@ return_code parseArguments(int argc, ("threads,t", boost::program_options::value(&customization_config.requested_num_threads) ->default_value(std::thread::hardware_concurrency()), - "Number of threads to use") - ("speed-and-turn-penalty-format", boost::program_options::value(&customization_config.updater_config.speed_and_turn_penalty_format)->default_value(updater::SpeedAndTurnPenaltyFormat::CSV))( + "Number of threads to use")( + "speed-and-turn-penalty-format", + boost::program_options::value( + &customization_config.updater_config.speed_and_turn_penalty_format) + ->default_value(updater::SpeedAndTurnPenaltyFormat::CSV))( "segment-speed-file", boost::program_options::value>( &customization_config.updater_config.segment_speed_lookup_paths) diff --git a/src/updater/data_source.cpp b/src/updater/data_source.cpp index 456cd8ccb..a74d3de16 100644 --- a/src/updater/data_source.cpp +++ b/src/updater/data_source.cpp @@ -34,37 +34,45 @@ namespace updater namespace data { -namespace { -std::unique_ptr> makeSegmentParser(SpeedAndTurnPenaltyFormat format) { - switch (format) { - case SpeedAndTurnPenaltyFormat::CSV: - { - static const auto value_if_blank = std::numeric_limits::quiet_NaN(); - const qi::real_parser> unsigned_double; - return std::make_unique>(qi::ulong_long >> ',' >> qi::ulong_long, - unsigned_double >> -(',' >> (qi::double_ | qi::attr(value_if_blank)))); - } - case SpeedAndTurnPenaltyFormat::PARQUET: - return std::make_unique>(); +namespace +{ +std::unique_ptr> +makeSegmentParser(SpeedAndTurnPenaltyFormat format) +{ + switch (format) + { + case SpeedAndTurnPenaltyFormat::CSV: + { + static const auto value_if_blank = std::numeric_limits::quiet_NaN(); + const qi::real_parser> unsigned_double; + return std::make_unique>( + qi::ulong_long >> ',' >> qi::ulong_long, + unsigned_double >> -(',' >> (qi::double_ | qi::attr(value_if_blank)))); } -} + case SpeedAndTurnPenaltyFormat::PARQUET: + return std::make_unique>(); + } +} -std::unique_ptr> makeTurnParser(SpeedAndTurnPenaltyFormat format) { - switch (format) { - case SpeedAndTurnPenaltyFormat::CSV: - { - return std::make_unique>(qi::ulong_long >> ',' >> qi::ulong_long >> ',' >> - qi::ulong_long, - qi::double_ >> -(',' >> qi::double_)); - } - case SpeedAndTurnPenaltyFormat::PARQUET: - return std::make_unique>(); +std::unique_ptr> makeTurnParser(SpeedAndTurnPenaltyFormat format) +{ + switch (format) + { + case SpeedAndTurnPenaltyFormat::CSV: + { + return std::make_unique>( + qi::ulong_long >> ',' >> qi::ulong_long >> ',' >> qi::ulong_long, + qi::double_ >> -(',' >> qi::double_)); } -} + case SpeedAndTurnPenaltyFormat::PARQUET: + return std::make_unique>(); + } +} } // namespace -SegmentLookupTable readSegmentValues(const std::vector &paths, SpeedAndTurnPenaltyFormat format) +SegmentLookupTable readSegmentValues(const std::vector &paths, + SpeedAndTurnPenaltyFormat format) { auto parser = makeSegmentParser(format); @@ -83,7 +91,8 @@ SegmentLookupTable readSegmentValues(const std::vector &paths, Spee return result; } -TurnLookupTable readTurnValues(const std::vector &paths, SpeedAndTurnPenaltyFormat format) +TurnLookupTable readTurnValues(const std::vector &paths, + SpeedAndTurnPenaltyFormat format) { auto parser = makeTurnParser(format); return (*parser)(paths); diff --git a/src/updater/updater.cpp b/src/updater/updater.cpp index 6b8ea8919..c3d4ad29c 100644 --- a/src/updater/updater.cpp +++ b/src/updater/updater.cpp @@ -618,7 +618,8 @@ Updater::LoadAndUpdateEdgeExpandedGraph(std::vector &e tbb::concurrent_vector updated_segments; if (update_edge_weights) { - auto segment_speed_lookup = data::readSegmentValues(config.segment_speed_lookup_paths, config.speed_and_turn_penalty_format); + auto segment_speed_lookup = data::readSegmentValues(config.segment_speed_lookup_paths, + config.speed_and_turn_penalty_format); TIMER_START(segment); updated_segments = updateSegmentData(config, @@ -633,7 +634,8 @@ Updater::LoadAndUpdateEdgeExpandedGraph(std::vector &e util::Log() << "Updating segment data took " << TIMER_MSEC(segment) << "ms."; } - auto turn_penalty_lookup = data::readTurnValues(config.turn_penalty_lookup_paths, config.speed_and_turn_penalty_format); + auto turn_penalty_lookup = data::readTurnValues(config.turn_penalty_lookup_paths, + config.speed_and_turn_penalty_format); if (update_turn_penalties) { auto updated_turn_penalties = updateTurnPenalties(config, diff --git a/unit_tests/updater/parquet.cpp b/unit_tests/updater/parquet.cpp index 35a99ef2c..07a93d75c 100644 --- a/unit_tests/updater/parquet.cpp +++ b/unit_tests/updater/parquet.cpp @@ -1,13 +1,28 @@ +#include +#include +#include +#include #include -#include #include +#include +#include using namespace osrm; using namespace osrm::updater; BOOST_AUTO_TEST_CASE(parquet_readSegmentValues) { - boost::filesystem::path test_path(TEST_DATA_DIR "/speeds_file.parquet"); - SegmentLookupTable segment_lookup_table = data::readSegmentValues({test_path.string()}, SpeedAndTurnPenaltyFormat::PARQUET); - BOOST_CHECK_EQUAL(segment_lookup_table.lookup.size(), 2); + { + SegmentLookupTable segment_lookup_table = data::readSegmentValues( + {boost::filesystem::path{TEST_DATA_DIR "/speed.parquet"}.string()}, + SpeedAndTurnPenaltyFormat::PARQUET); + BOOST_CHECK_EQUAL(segment_lookup_table.lookup.size(), 100); + } + + { + SegmentLookupTable segment_lookup_table = data::readSegmentValues( + {boost::filesystem::path{TEST_DATA_DIR "/speed_without_rate.parquet"}.string()}, + SpeedAndTurnPenaltyFormat::PARQUET); + BOOST_CHECK_EQUAL(segment_lookup_table.lookup.size(), 100); + } } \ No newline at end of file diff --git a/unit_tests/updater/speed.parquet b/unit_tests/updater/speed.parquet new file mode 100644 index 000000000..cec25b6a5 Binary files /dev/null and b/unit_tests/updater/speed.parquet differ diff --git a/unit_tests/updater/speed_without_rate.parquet b/unit_tests/updater/speed_without_rate.parquet new file mode 100644 index 000000000..4902ee956 Binary files /dev/null and b/unit_tests/updater/speed_without_rate.parquet differ diff --git a/unit_tests/updater/speeds_file.parquet b/unit_tests/updater/speeds_file.parquet deleted file mode 100644 index bb71d8e9a..000000000 Binary files a/unit_tests/updater/speeds_file.parquet and /dev/null differ