diff --git a/include/updater/csv_file_parser.hpp b/include/updater/csv_file_parser.hpp index a0c987d93..8ab3f3dfb 100644 --- a/include/updater/csv_file_parser.hpp +++ b/include/updater/csv_file_parser.hpp @@ -1,5 +1,6 @@ #ifndef OSRM_UPDATER_CSV_FILE_PARSER_HPP #define OSRM_UPDATER_CSV_FILE_PARSER_HPP +#include "file_parser.hpp" #include #include #include @@ -33,77 +34,20 @@ namespace updater // Key and Value structures must be a model of Random Access Sequence. // Also the Value structure must have source member that will be filled // with the corresponding file index in the CSV filenames vector. -template struct CSVFilesParser +template struct CSVFilesParser : public FilesParser { using Iterator = boost::iostreams::mapped_file_source::iterator; using KeyRule = boost::spirit::qi::rule; using ValueRule = boost::spirit::qi::rule; - CSVFilesParser(std::size_t start_index, const KeyRule &key_rule, const ValueRule &value_rule) - : start_index(start_index), key_rule(key_rule), value_rule(value_rule) + CSVFilesParser(const KeyRule &key_rule, const ValueRule &value_rule) + : key_rule(key_rule), value_rule(value_rule) { } - // Operator returns a lambda function that maps input Key to boost::optional. 
- auto operator()(const std::vector &csv_filenames) const - { - -std::shared_ptr infile; - - PARQUET_ASSIGN_OR_THROW( - infile, - arrow::io::ReadableFile::Open("test.parquet")); - - parquet::StreamReader os{parquet::ParquetFileReader::Open(infile)}; - (void)os; - - try - { - tbb::spin_mutex mutex; - std::vector> lookup; - tbb::parallel_for(std::size_t{0}, csv_filenames.size(), [&](const std::size_t idx) { - auto local = ParseCSVFile(csv_filenames[idx], start_index + idx); - - { // Merge local CSV results into a flat global vector - tbb::spin_mutex::scoped_lock _{mutex}; - lookup.insert(end(lookup), - std::make_move_iterator(begin(local)), - std::make_move_iterator(end(local))); - } - }); - - // With flattened map-ish view of all the files, make a stable sort on key and source - // and unique them on key to keep only the value with the largest file index - // and the largest line number in a file. - // The operands order is swapped to make descending ordering on (key, source) - tbb::parallel_sort(begin(lookup), end(lookup), [](const auto &lhs, const auto &rhs) { - return std::tie(rhs.first, rhs.second.source) < - std::tie(lhs.first, lhs.second.source); - }); - - // Unique only on key to take the source precedence into account and remove duplicates. - const auto it = - std::unique(begin(lookup), end(lookup), [](const auto &lhs, const auto &rhs) { - return lhs.first == rhs.first; - }); - lookup.erase(it, end(lookup)); - - util::Log() << "In total loaded " << csv_filenames.size() << " file(s) with a total of " - << lookup.size() << " unique values"; - - return LookupTable{lookup}; - } - catch (const std::exception &e) - // TBB should capture to std::exception_ptr and automatically rethrow in this thread. 
- // https://software.intel.com/en-us/node/506317 - { - throw util::exception(e.what() + SOURCE_REF); - } - } - private: // Parse a single CSV file and return result as a vector - auto ParseCSVFile(const std::string &filename, std::size_t file_id) const + std::vector> ParseFile(const std::string &filename, std::size_t file_id) const final { namespace qi = boost::spirit::qi; @@ -146,8 +90,7 @@ std::shared_ptr infile; throw util::exception(message.str() + SOURCE_REF); } } - - const std::size_t start_index; + const KeyRule key_rule; const ValueRule value_rule; }; diff --git a/include/updater/file_parser.hpp b/include/updater/file_parser.hpp new file mode 100644 index 000000000..09f61a372 --- /dev/null +++ b/include/updater/file_parser.hpp @@ -0,0 +1,106 @@ +#ifndef OSRM_UPDATER_FILE_PARSER_HPP +#define OSRM_UPDATER_FILE_PARSER_HPP +#include +#include +#include + +#include "updater/source.hpp" + +#include "util/exception.hpp" +#include "util/exception_utils.hpp" +#include "util/log.hpp" + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace osrm +{ +namespace updater +{ + +// Base functor that parses a list of input files into a key/value lookup table. +// Key and Value structures must be a model of Random Access Sequence. +// Also the Value structure must have source member that will be filled +// with the corresponding file index in the input filenames vector. +template struct FilesParser +{ + + virtual ~FilesParser() = default; + + // Operator parses every given file and returns a LookupTable from Key to Value. 
+ auto operator()(const std::vector &csv_filenames) const + { + +// std::shared_ptr infile; + +// PARQUET_ASSIGN_OR_THROW( +// infile, +// arrow::io::ReadableFile::Open("test.parquet")); + +// parquet::StreamReader os{parquet::ParquetFileReader::Open(infile)}; +// (void)os; + + try + { + tbb::spin_mutex mutex; + std::vector> lookup; + tbb::parallel_for(std::size_t{0}, csv_filenames.size(), [&](const std::size_t idx) { + // TODO: do we need this `1 + ` here? + auto local = ParseFile(csv_filenames[idx], 1 + idx); + + { // Merge local CSV results into a flat global vector + tbb::spin_mutex::scoped_lock _{mutex}; + lookup.insert(end(lookup), + std::make_move_iterator(begin(local)), + std::make_move_iterator(end(local))); + } + }); + + // With flattened map-ish view of all the files, make a stable sort on key and source + // and unique them on key to keep only the value with the largest file index + // and the largest line number in a file. + // The operands order is swapped to make descending ordering on (key, source) + tbb::parallel_sort(begin(lookup), end(lookup), [](const auto &lhs, const auto &rhs) { + return std::tie(rhs.first, rhs.second.source) < + std::tie(lhs.first, lhs.second.source); + }); + + // Unique only on key to take the source precedence into account and remove duplicates. + const auto it = + std::unique(begin(lookup), end(lookup), [](const auto &lhs, const auto &rhs) { + return lhs.first == rhs.first; + }); + lookup.erase(it, end(lookup)); + + util::Log() << "In total loaded " << csv_filenames.size() << " file(s) with a total of " + << lookup.size() << " unique values"; + + return LookupTable{lookup}; + } + catch (const std::exception &e) + // TBB should capture to std::exception_ptr and automatically rethrow in this thread. 
+ // https://software.intel.com/en-us/node/506317 + { + throw util::exception(e.what() + SOURCE_REF); + } + } + + protected: + // Parse a single input file and return the result as a key/value vector + virtual std::vector> ParseFile(const std::string &filename, std::size_t file_id) const; +}; +} // namespace updater +} // namespace osrm + +#endif diff --git a/include/updater/parquet_file_parser.hpp b/include/updater/parquet_file_parser.hpp new file mode 100644 index 000000000..ce52d34c4 --- /dev/null +++ b/include/updater/parquet_file_parser.hpp @@ -0,0 +1,100 @@ +#ifndef OSRM_UPDATER_PARQUET_FILE_PARSER_HPP +#define OSRM_UPDATER_PARQUET_FILE_PARSER_HPP +#include "file_parser.hpp" +#include +#include +#include +#include + +#include "updater/source.hpp" + +#include "util/exception.hpp" +#include "util/exception_utils.hpp" +#include "util/log.hpp" + +#include +#include +#include +#include + +#include +#include +#include + +namespace osrm
{ +namespace updater +{ + +// Functor to parse a list of Parquet files holding key and value columns. +// Key and Value structures must be a model of Random Access Sequence. +// Also the Value structure must have source member that will be filled +// with the corresponding file index in the Parquet filenames vector. +template struct ParquetFilesParser : public FilesParser +{ + private: + // Parse a single Parquet file and return result as a vector + std::vector> ParseFile(const std::string &filename, std::size_t file_id) const final + { + // TODO: error handling + std::shared_ptr infile; + + PARQUET_ASSIGN_OR_THROW( + infile, + arrow::io::ReadableFile::Open(filename)); + + parquet::StreamReader os{parquet::ParquetFileReader::Open(infile)}; + + std::vector> result; + while ( !os.eof() ) + { + result.emplace_back(ReadKey(os), ReadValue(os, file_id)); + // ... 
+ } + return result; +// (void)os; + + } + + Key ReadKey(parquet::StreamReader &os) const + { + Key key; + ReadKey(os, key); + return key; + } + + Value ReadValue(parquet::StreamReader &os, std::size_t file_id) const + { + Value value; + ReadValue(os, value); + value.source = file_id; + return value; + } + + void ReadKey(parquet::StreamReader &os, Turn& turn) const { + os >> turn.from >> turn.via >> turn.to >> parquet::EndRow; + } + + void ReadValue(parquet::StreamReader &os, PenaltySource& penalty_source) const { + os >> penalty_source.duration >> penalty_source.weight >> parquet::EndRow; + } + + void ReadKey(parquet::StreamReader &os, Segment& segment) const { + os >> segment.from >> segment.to >> parquet::EndRow; + } + + void ReadValue(parquet::StreamReader &os, SpeedSource& speed_source) const { + std::optional rate; + os >> speed_source.speed >> rate >> parquet::EndRow; + // TODO: boost::optional + if (rate) { + speed_source.rate = *rate; + } + // os >> turn.weight >> turn.duration >> turn.pre_turn_bearing >> turn.post_turn_bearing >> turn.source >> parquet::EndRow; + } + +}; +} // namespace updater +} // namespace osrm + +#endif diff --git a/src/updater/csv_source.cpp b/src/updater/csv_source.cpp index fe13a7477..6469c5e3f 100644 --- a/src/updater/csv_source.cpp +++ b/src/updater/csv_source.cpp @@ -1,6 +1,7 @@ #include "updater/csv_source.hpp" #include "updater/csv_file_parser.hpp" +#include "updater/parquet_file_parser.hpp" #include #include @@ -33,12 +34,9 @@ namespace csv { SegmentLookupTable readSegmentValues(const std::vector &paths) { - static const auto value_if_blank = std::numeric_limits::quiet_NaN(); - const qi::real_parser> unsigned_double; - CSVFilesParser parser( - 1, - qi::ulong_long >> ',' >> qi::ulong_long, - unsigned_double >> -(',' >> (qi::double_ | qi::attr(value_if_blank)))); + //static const auto value_if_blank = std::numeric_limits::quiet_NaN(); + //const qi::real_parser> unsigned_double; + ParquetFilesParser parser; // Check consistency 
of keys in the result lookup table auto result = parser(paths); @@ -57,10 +55,7 @@ SegmentLookupTable readSegmentValues(const std::vector &paths) TurnLookupTable readTurnValues(const std::vector &paths) { - CSVFilesParser parser(1, - qi::ulong_long >> ',' >> qi::ulong_long >> ',' >> - qi::ulong_long, - qi::double_ >> -(',' >> qi::double_)); + ParquetFilesParser parser; return parser(paths); } } // namespace csv