wip
parent 4046f9dfd3
commit 47488700c4

include/updater/csv_file_parser.hpp
@@ -1,5 +1,6 @@
 #ifndef OSRM_UPDATER_CSV_FILE_PARSER_HPP
 #define OSRM_UPDATER_CSV_FILE_PARSER_HPP
+#include "file_parser.hpp"
 #include <parquet/arrow/reader.h>
 #include <parquet/stream_reader.h>
 #include <arrow/io/file.h>
@@ -33,77 +34,20 @@ namespace updater
 // Key and Value structures must be a model of Random Access Sequence.
 // Also the Value structure must have source member that will be filled
 // with the corresponding file index in the CSV filenames vector.
-template <typename Key, typename Value> struct CSVFilesParser
+template <typename Key, typename Value> struct CSVFilesParser : public FilesParser<Key, Value>
 {
     using Iterator = boost::iostreams::mapped_file_source::iterator;
     using KeyRule = boost::spirit::qi::rule<Iterator, Key()>;
     using ValueRule = boost::spirit::qi::rule<Iterator, Value()>;

-    CSVFilesParser(std::size_t start_index, const KeyRule &key_rule, const ValueRule &value_rule)
-        : start_index(start_index), key_rule(key_rule), value_rule(value_rule)
+    CSVFilesParser(const KeyRule &key_rule, const ValueRule &value_rule)
+        : key_rule(key_rule), value_rule(value_rule)
     {
     }

-    // Operator returns a lambda function that maps input Key to boost::optional<Value>.
-    auto operator()(const std::vector<std::string> &csv_filenames) const
-    {
-        std::shared_ptr<arrow::io::ReadableFile> infile;
-
-        PARQUET_ASSIGN_OR_THROW(
-            infile,
-            arrow::io::ReadableFile::Open("test.parquet"));
-
-        parquet::StreamReader os{parquet::ParquetFileReader::Open(infile)};
-        (void)os;
-
-        try
-        {
-            tbb::spin_mutex mutex;
-            std::vector<std::pair<Key, Value>> lookup;
-            tbb::parallel_for(std::size_t{0}, csv_filenames.size(), [&](const std::size_t idx) {
-                auto local = ParseCSVFile(csv_filenames[idx], start_index + idx);
-
-                { // Merge local CSV results into a flat global vector
-                    tbb::spin_mutex::scoped_lock _{mutex};
-                    lookup.insert(end(lookup),
-                                  std::make_move_iterator(begin(local)),
-                                  std::make_move_iterator(end(local)));
-                }
-            });
-
-            // With flattened map-ish view of all the files, make a stable sort on key and source
-            // and unique them on key to keep only the value with the largest file index
-            // and the largest line number in a file.
-            // The operands order is swapped to make descending ordering on (key, source)
-            tbb::parallel_sort(begin(lookup), end(lookup), [](const auto &lhs, const auto &rhs) {
-                return std::tie(rhs.first, rhs.second.source) <
-                       std::tie(lhs.first, lhs.second.source);
-            });
-
-            // Unique only on key to take the source precedence into account and remove duplicates.
-            const auto it =
-                std::unique(begin(lookup), end(lookup), [](const auto &lhs, const auto &rhs) {
-                    return lhs.first == rhs.first;
-                });
-            lookup.erase(it, end(lookup));
-
-            util::Log() << "In total loaded " << csv_filenames.size() << " file(s) with a total of "
-                        << lookup.size() << " unique values";
-
-            return LookupTable<Key, Value>{lookup};
-        }
-        catch (const std::exception &e)
-        // TBB should capture to std::exception_ptr and automatically rethrow in this thread.
-        // https://software.intel.com/en-us/node/506317
-        {
-            throw util::exception(e.what() + SOURCE_REF);
-        }
-    }
-
   private:
     // Parse a single CSV file and return result as a vector<Key, Value>
-    auto ParseCSVFile(const std::string &filename, std::size_t file_id) const
+    std::vector<std::pair<Key, Value>> ParseFile(const std::string &filename, std::size_t file_id) const final
     {
         namespace qi = boost::spirit::qi;

@@ -146,8 +90,7 @@
             throw util::exception(message.str() + SOURCE_REF);
         }
     }

-    const std::size_t start_index;
     const KeyRule key_rule;
     const ValueRule value_rule;
 };
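Note on the constructor change: with start_index removed, call sites now pass only the two Spirit rules, and the file index is derived inside the base class. A minimal sketch of a hypothetical call site (the helper name parseSpeeds and the simplified value grammar are illustrative, not part of this commit; the real grammars appear in csv_source.cpp below):

    #include "updater/csv_file_parser.hpp"
    #include "updater/source.hpp"

    #include <boost/spirit/include/qi.hpp>

    #include <string>
    #include <vector>

    namespace qi = boost::spirit::qi;

    // Hypothetical helper: build a Segment -> SpeedSource table from CSV paths.
    osrm::updater::SegmentLookupTable parseSpeeds(const std::vector<std::string> &paths)
    {
        using osrm::updater::CSVFilesParser;
        using osrm::updater::Segment;
        using osrm::updater::SpeedSource;

        // Key rule: "from,to"; value rule: "speed[,rate]" (simplified here).
        CSVFilesParser<Segment, SpeedSource> parser(qi::ulong_long >> ',' >> qi::ulong_long,
                                                    qi::double_ >> -(',' >> qi::double_));
        return parser(paths); // the file index is now computed internally as 1 + idx
    }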
include/updater/file_parser.hpp (new file, 106 lines)
@@ -0,0 +1,106 @@
+#ifndef OSRM_UPDATER_FILE_PARSER_HPP
+#define OSRM_UPDATER_FILE_PARSER_HPP
+#include <parquet/arrow/reader.h>
+#include <parquet/stream_reader.h>
+#include <arrow/io/file.h>
+
+#include "updater/source.hpp"
+
+#include "util/exception.hpp"
+#include "util/exception_utils.hpp"
+#include "util/log.hpp"
+
+#include <tbb/parallel_for.h>
+#include <tbb/parallel_sort.h>
+#include <tbb/spin_mutex.h>
+
+#include <boost/exception/diagnostic_information.hpp>
+#include <boost/filesystem.hpp>
+#include <boost/iostreams/device/mapped_file.hpp>
+#include <boost/spirit/include/phoenix.hpp>
+#include <boost/spirit/include/qi.hpp>
+
+#include <exception>
+#include <stdexcept>
+#include <vector>
+
+namespace osrm
+{
+namespace updater
+{
+
+// Base functor to parse a list of files into a lookup table.
+// Key and Value structures must be a model of Random Access Sequence.
+// Also the Value structure must have a source member that will be filled
+// with the corresponding file index in the filenames vector.
+template <typename Key, typename Value> struct FilesParser
+{
+    virtual ~FilesParser() = default;
+
+    // Returns a LookupTable that maps an input Key to boost::optional<Value>.
+    auto operator()(const std::vector<std::string> &csv_filenames) const
+    {
+        // std::shared_ptr<arrow::io::ReadableFile> infile;
+
+        // PARQUET_ASSIGN_OR_THROW(
+        //     infile,
+        //     arrow::io::ReadableFile::Open("test.parquet"));
+
+        // parquet::StreamReader os{parquet::ParquetFileReader::Open(infile)};
+        // (void)os;
+
+        try
+        {
+            tbb::spin_mutex mutex;
+            std::vector<std::pair<Key, Value>> lookup;
+            tbb::parallel_for(std::size_t{0}, csv_filenames.size(), [&](const std::size_t idx) {
+                // TODO: do we need this `1 + ` here?
+                auto local = ParseFile(csv_filenames[idx], 1 + idx);
+
+                { // Merge local results into a flat global vector
+                    tbb::spin_mutex::scoped_lock _{mutex};
+                    lookup.insert(end(lookup),
+                                  std::make_move_iterator(begin(local)),
+                                  std::make_move_iterator(end(local)));
+                }
+            });
+
+            // With the flattened map-ish view of all the files, make a stable sort on key and
+            // source and unique them on key to keep only the value with the largest file index
+            // and the largest line number in a file.
+            // The operand order is swapped to get a descending ordering on (key, source).
+            tbb::parallel_sort(begin(lookup), end(lookup), [](const auto &lhs, const auto &rhs) {
+                return std::tie(rhs.first, rhs.second.source) <
+                       std::tie(lhs.first, lhs.second.source);
+            });
+
+            // Unique only on key to take the source precedence into account and remove duplicates.
+            const auto it =
+                std::unique(begin(lookup), end(lookup), [](const auto &lhs, const auto &rhs) {
+                    return lhs.first == rhs.first;
+                });
+            lookup.erase(it, end(lookup));
+
+            util::Log() << "In total loaded " << csv_filenames.size() << " file(s) with a total of "
+                        << lookup.size() << " unique values";
+
+            return LookupTable<Key, Value>{lookup};
+        }
+        catch (const std::exception &e)
+        // TBB should capture to std::exception_ptr and automatically rethrow in this thread.
+        // https://software.intel.com/en-us/node/506317
+        {
+            throw util::exception(e.what() + SOURCE_REF);
+        }
+    }
+
+  protected:
+    // Parse a single file and return the result as a vector of (Key, Value) pairs.
+    // Pure virtual: a declaration without a definition would not link once the
+    // vtable is emitted, so each format-specific parser must override this.
+    virtual std::vector<std::pair<Key, Value>> ParseFile(const std::string &filename,
+                                                         std::size_t file_id) const = 0;
+};
+
+} // namespace updater
+} // namespace osrm
+
+#endif
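The sort-then-unique precedence logic above is subtle, so here is a small self-contained sketch (plain std::sort standing in for tbb::parallel_sort; the Value struct is illustrative) showing how descending (key, source) order plus unique-on-key keeps only the value from the latest file:

    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <tuple>
    #include <utility>
    #include <vector>

    struct Value
    {
        double speed;
        std::size_t source; // file index; a larger index means a later file wins
    };

    int main()
    {
        using Key = int;
        std::vector<std::pair<Key, Value>> lookup = {
            {7, {30.0, 1}}, {9, {50.0, 1}}, {7, {45.0, 2}}};

        // Descending on (key, source): the highest source comes first within each key.
        std::sort(begin(lookup), end(lookup), [](const auto &lhs, const auto &rhs) {
            return std::tie(rhs.first, rhs.second.source) <
                   std::tie(lhs.first, lhs.second.source);
        });

        // unique keeps the first element of each key run, i.e. the highest source.
        lookup.erase(std::unique(begin(lookup),
                                 end(lookup),
                                 [](const auto &lhs, const auto &rhs)
                                 { return lhs.first == rhs.first; }),
                     end(lookup));

        for (const auto &[key, value] : lookup)
            std::cout << key << " -> " << value.speed << " (file " << value.source << ")\n";
        // prints: 9 -> 50 (file 1), then 7 -> 45 (file 2): key 7 took the later file's value
    }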
include/updater/parquet_file_parser.hpp (new file, 100 lines)
@@ -0,0 +1,100 @@
+#ifndef OSRM_UPDATER_PARQUET_FILE_PARSER_HPP
+#define OSRM_UPDATER_PARQUET_FILE_PARSER_HPP
+#include "file_parser.hpp"
+#include <optional>
+#include <parquet/arrow/reader.h>
+#include <parquet/stream_reader.h>
+#include <arrow/io/file.h>
+
+#include "updater/source.hpp"
+
+#include "util/exception.hpp"
+#include "util/exception_utils.hpp"
+#include "util/log.hpp"
+
+#include <parquet/stream_writer.h>
+#include <tbb/parallel_for.h>
+#include <tbb/parallel_sort.h>
+#include <tbb/spin_mutex.h>
+
+#include <exception>
+#include <stdexcept>
+#include <vector>
+
+namespace osrm
+{
+namespace updater
+{
+
+// Functor to parse a list of Parquet files whose rows hold key columns followed
+// by value columns. Key and Value structures must be a model of Random Access
+// Sequence. Also the Value structure must have a source member that will be
+// filled with the corresponding file index in the filenames vector.
+template <typename Key, typename Value> struct ParquetFilesParser : public FilesParser<Key, Value>
+{
+  private:
+    // Parse a single Parquet file and return the result as a vector of (Key, Value) pairs.
+    std::vector<std::pair<Key, Value>> ParseFile(const std::string &filename,
+                                                 std::size_t file_id) const final
+    {
+        // TODO: error handling
+        std::shared_ptr<arrow::io::ReadableFile> infile;
+
+        PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(filename));
+
+        parquet::StreamReader os{parquet::ParquetFileReader::Open(infile)};
+
+        std::vector<std::pair<Key, Value>> result;
+        while (!os.eof())
+        {
+            // Sequence the two reads explicitly: both consume columns of the same
+            // row, and the evaluation order of function arguments is unspecified.
+            auto key = ReadKey(os);
+            auto value = ReadValue(os, file_id);
+            result.emplace_back(std::move(key), std::move(value));
+        }
+        return result;
+    }
+
+    Key ReadKey(parquet::StreamReader &os) const
+    {
+        Key key;
+        ReadKey(os, key);
+        return key;
+    }
+
+    Value ReadValue(parquet::StreamReader &os, std::size_t file_id) const
+    {
+        Value value;
+        ReadValue(os, value);
+        value.source = file_id;
+        return value;
+    }
+
+    // The key readers consume the leading columns of a row; only the value
+    // readers finish the row with parquet::EndRow.
+    void ReadKey(parquet::StreamReader &os, Turn &turn) const
+    {
+        os >> turn.from >> turn.via >> turn.to;
+    }
+
+    void ReadValue(parquet::StreamReader &os, PenaltySource &penalty_source) const
+    {
+        os >> penalty_source.duration >> penalty_source.weight >> parquet::EndRow;
+    }
+
+    void ReadKey(parquet::StreamReader &os, Segment &segment) const
+    {
+        os >> segment.from >> segment.to;
+    }
+
+    void ReadValue(parquet::StreamReader &os, SpeedSource &speed_source) const
+    {
+        // TODO: boost::optional -- parquet::StreamReader reads optional columns
+        // into std::optional, so copy over to the boost::optional member.
+        std::optional<double> rate;
+        os >> speed_source.speed >> rate >> parquet::EndRow;
+        if (rate)
+        {
+            speed_source.rate = *rate;
+        }
+    }
+};
+} // namespace updater
+} // namespace osrm
+
+#endif
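For testing the reader, a matching file can be produced with parquet::StreamWriter (already pulled in above via parquet/stream_writer.h). A sketch under the assumption that Segment's from/to map to unsigned 64-bit columns and SpeedSource to a required double plus an optional double; the column names and the output path are illustrative, since StreamReader matches columns by position and physical type:

    #include <arrow/io/file.h>
    #include <parquet/exception.h>
    #include <parquet/schema.h>
    #include <parquet/stream_writer.h>

    #include <cstdint>
    #include <memory>
    #include <optional>

    int main()
    {
        std::shared_ptr<arrow::io::FileOutputStream> outfile;
        PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open("speeds.parquet"));

        // Two UINT64 key columns, then a required speed and an optional rate,
        // matching the column order ReadKey/ReadValue consume above.
        parquet::schema::NodeVector fields{
            parquet::schema::PrimitiveNode::Make("from", parquet::Repetition::REQUIRED,
                                                 parquet::Type::INT64,
                                                 parquet::ConvertedType::UINT_64),
            parquet::schema::PrimitiveNode::Make("to", parquet::Repetition::REQUIRED,
                                                 parquet::Type::INT64,
                                                 parquet::ConvertedType::UINT_64),
            parquet::schema::PrimitiveNode::Make("speed", parquet::Repetition::REQUIRED,
                                                 parquet::Type::DOUBLE),
            parquet::schema::PrimitiveNode::Make("rate", parquet::Repetition::OPTIONAL,
                                                 parquet::Type::DOUBLE)};
        auto schema = std::static_pointer_cast<parquet::schema::GroupNode>(
            parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));

        parquet::StreamWriter os{parquet::ParquetFileWriter::Open(outfile, schema)};

        // One row: segment 100 -> 200 at 42.5, with the optional rate left null.
        os << std::uint64_t{100} << std::uint64_t{200} << 42.5 << std::optional<double>{}
           << parquet::EndRow;
        return 0;
    }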
src/updater/csv_source.cpp
@@ -1,6 +1,7 @@
#include "updater/csv_source.hpp"
|
#include "updater/csv_source.hpp"
|
||||||
|
|
||||||
#include "updater/csv_file_parser.hpp"
|
#include "updater/csv_file_parser.hpp"
|
||||||
|
#include "updater/parquet_file_parser.hpp"
|
||||||
|
|
||||||
#include <boost/fusion/adapted/std_pair.hpp>
|
#include <boost/fusion/adapted/std_pair.hpp>
|
||||||
#include <boost/fusion/include/adapt_adt.hpp>
|
#include <boost/fusion/include/adapt_adt.hpp>
|
||||||
@ -33,12 +34,9 @@ namespace csv
|
|||||||
{
|
{
|
||||||
SegmentLookupTable readSegmentValues(const std::vector<std::string> &paths)
|
SegmentLookupTable readSegmentValues(const std::vector<std::string> &paths)
|
||||||
{
|
{
|
||||||
static const auto value_if_blank = std::numeric_limits<double>::quiet_NaN();
|
//static const auto value_if_blank = std::numeric_limits<double>::quiet_NaN();
|
||||||
const qi::real_parser<double, qi::ureal_policies<double>> unsigned_double;
|
//const qi::real_parser<double, qi::ureal_policies<double>> unsigned_double;
|
||||||
CSVFilesParser<Segment, SpeedSource> parser(
|
ParquetFilesParser<Segment, SpeedSource> parser;
|
||||||
1,
|
|
||||||
qi::ulong_long >> ',' >> qi::ulong_long,
|
|
||||||
unsigned_double >> -(',' >> (qi::double_ | qi::attr(value_if_blank))));
|
|
||||||
|
|
||||||
// Check consistency of keys in the result lookup table
|
// Check consistency of keys in the result lookup table
|
||||||
auto result = parser(paths);
|
auto result = parser(paths);
|
||||||
@ -57,10 +55,7 @@ SegmentLookupTable readSegmentValues(const std::vector<std::string> &paths)
|
|||||||
|
|
||||||
TurnLookupTable readTurnValues(const std::vector<std::string> &paths)
|
TurnLookupTable readTurnValues(const std::vector<std::string> &paths)
|
||||||
{
|
{
|
||||||
CSVFilesParser<Turn, PenaltySource> parser(1,
|
ParquetFilesParser<Turn, PenaltySource> parser;
|
||||||
qi::ulong_long >> ',' >> qi::ulong_long >> ',' >>
|
|
||||||
qi::ulong_long,
|
|
||||||
qi::double_ >> -(',' >> qi::double_));
|
|
||||||
return parser(paths);
|
return parser(paths);
|
||||||
}
|
}
|
||||||
} // namespace csv
|
} // namespace csv
|
||||||
|
|||||||
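End-to-end, the public entry points are unchanged; only the backing parser differs. A hypothetical caller, assuming the Segment and LookupTable interfaces from updater/source.hpp (LookupTable maps a Key to boost::optional<Value>):

    #include "updater/csv_source.hpp"

    #include <iostream>

    int main()
    {
        using namespace osrm::updater;

        // readSegmentValues now routes through ParquetFilesParser
        // instead of the Spirit CSV grammar.
        auto table = csv::readSegmentValues({"speeds.parquet"});

        // Look up the speed for the segment between two OSM node ids.
        Segment segment{100, 200};
        if (auto speed = table(segment))
            std::cout << "speed: " << speed->speed << '\n';
    }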