682 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			682 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
#ifndef OSMIUM_IO_DETAIL_PBF_INPUT_FORMAT_HPP
 | 
						|
#define OSMIUM_IO_DETAIL_PBF_INPUT_FORMAT_HPP
 | 
						|
 | 
						|
/*
 | 
						|
 | 
						|
This file is part of Osmium (http://osmcode.org/libosmium).
 | 
						|
 | 
						|
Copyright 2013,2014 Jochen Topf <jochen@topf.org> and others (see README).
 | 
						|
 | 
						|
Boost Software License - Version 1.0 - August 17th, 2003
 | 
						|
 | 
						|
Permission is hereby granted, free of charge, to any person or organization
 | 
						|
obtaining a copy of the software and accompanying documentation covered by
 | 
						|
this license (the "Software") to use, reproduce, display, distribute,
 | 
						|
execute, and transmit the Software, and to prepare derivative works of the
 | 
						|
Software, and to permit third-parties to whom the Software is furnished to
 | 
						|
do so, all subject to the following:
 | 
						|
 | 
						|
The copyright notices in the Software and this entire statement, including
 | 
						|
the above license grant, this restriction and the following disclaimer,
 | 
						|
must be included in all copies of the Software, in whole or in part, and
 | 
						|
all derivative works of the Software, unless such copies or derivative
 | 
						|
works are solely in the form of machine-executable object code generated by
 | 
						|
a source language processor.
 | 
						|
 | 
						|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 | 
						|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 | 
						|
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
 | 
						|
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
 | 
						|
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
 | 
						|
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 | 
						|
DEALINGS IN THE SOFTWARE.
 | 
						|
 | 
						|
*/
 | 
						|
 | 
						|
 | 
						|
#include <algorithm>
 | 
						|
#include <atomic>
 | 
						|
#include <cassert>
 | 
						|
#include <chrono>
 | 
						|
#include <cstddef>
 | 
						|
#include <cstdint>
 | 
						|
#include <cstring>
 | 
						|
#include <future>
 | 
						|
#include <memory>
 | 
						|
#include <ratio>
 | 
						|
#include <sstream>
 | 
						|
#include <stdexcept>
 | 
						|
#include <string>
 | 
						|
#include <thread>
 | 
						|
 | 
						|
#include <osmium/builder/osm_object_builder.hpp>
 | 
						|
#include <osmium/io/detail/input_format.hpp>
 | 
						|
#include <osmium/io/detail/pbf.hpp> // IWYU pragma: export
 | 
						|
#include <osmium/io/detail/zlib.hpp>
 | 
						|
#include <osmium/io/file_format.hpp>
 | 
						|
#include <osmium/io/header.hpp>
 | 
						|
#include <osmium/memory/buffer.hpp>
 | 
						|
#include <osmium/osm.hpp>
 | 
						|
#include <osmium/osm/box.hpp>
 | 
						|
#include <osmium/osm/entity_bits.hpp>
 | 
						|
#include <osmium/osm/location.hpp>
 | 
						|
#include <osmium/osm/object.hpp>
 | 
						|
#include <osmium/osm/timestamp.hpp>
 | 
						|
#include <osmium/thread/name.hpp>
 | 
						|
#include <osmium/thread/pool.hpp>
 | 
						|
#include <osmium/thread/queue.hpp>
 | 
						|
 | 
						|
namespace osmium {
 | 
						|
 | 
						|
    namespace io {
 | 
						|
 | 
						|
        class File;
 | 
						|
 | 
						|
        namespace detail {
 | 
						|
 | 
						|
            class PBFPrimitiveBlockParser {
 | 
						|
 | 
						|
                static constexpr size_t initial_buffer_size = 10 * 1024;
 | 
						|
 | 
						|
                const void* m_data;
 | 
						|
                const size_t m_size;
 | 
						|
 | 
						|
                const OSMPBF::StringTable* m_stringtable;
 | 
						|
                int64_t m_lon_offset;
 | 
						|
                int64_t m_lat_offset;
 | 
						|
                int64_t m_date_factor;
 | 
						|
                int32_t m_granularity;
 | 
						|
 | 
						|
                osmium::osm_entity_bits::type m_read_types;
 | 
						|
 | 
						|
                osmium::memory::Buffer m_buffer;
 | 
						|
 | 
						|
                PBFPrimitiveBlockParser(const PBFPrimitiveBlockParser&) = delete;
 | 
						|
                PBFPrimitiveBlockParser(PBFPrimitiveBlockParser&&) = delete;
 | 
						|
 | 
						|
                PBFPrimitiveBlockParser& operator=(const PBFPrimitiveBlockParser&) = delete;
 | 
						|
                PBFPrimitiveBlockParser& operator=(PBFPrimitiveBlockParser&&) = delete;
 | 
						|
 | 
						|
            public:
 | 
						|
 | 
						|
                explicit PBFPrimitiveBlockParser(const void* data, const size_t size, osmium::osm_entity_bits::type read_types) :
 | 
						|
                    m_data(data),
 | 
						|
                    m_size(size),
 | 
						|
                    m_stringtable(nullptr),
 | 
						|
                    m_lon_offset(0),
 | 
						|
                    m_lat_offset(0),
 | 
						|
                    m_date_factor(1000),
 | 
						|
                    m_granularity(100),
 | 
						|
                    m_read_types(read_types),
 | 
						|
                    m_buffer(initial_buffer_size) {
 | 
						|
                }
 | 
						|
 | 
						|
                ~PBFPrimitiveBlockParser() = default;
 | 
						|
 | 
						|
                osmium::memory::Buffer operator()() {
 | 
						|
                    OSMPBF::PrimitiveBlock pbf_primitive_block;
 | 
						|
                    if (!pbf_primitive_block.ParseFromArray(m_data, m_size)) {
 | 
						|
                        throw std::runtime_error("Failed to parse PrimitiveBlock.");
 | 
						|
                    }
 | 
						|
 | 
						|
                    m_stringtable = &pbf_primitive_block.stringtable();
 | 
						|
                    m_lon_offset  = pbf_primitive_block.lon_offset();
 | 
						|
                    m_lat_offset  = pbf_primitive_block.lat_offset();
 | 
						|
                    m_date_factor = pbf_primitive_block.date_granularity() / 1000;
 | 
						|
                    m_granularity = pbf_primitive_block.granularity();
 | 
						|
 | 
						|
                    for (int i=0; i < pbf_primitive_block.primitivegroup_size(); ++i) {
 | 
						|
                        const OSMPBF::PrimitiveGroup& group = pbf_primitive_block.primitivegroup(i);
 | 
						|
 | 
						|
                        if (group.has_dense())  {
 | 
						|
                            if (m_read_types & osmium::osm_entity_bits::node) parse_dense_node_group(group);
 | 
						|
                        } else if (group.ways_size() != 0) {
 | 
						|
                            if (m_read_types & osmium::osm_entity_bits::way) parse_way_group(group);
 | 
						|
                        } else if (group.relations_size() != 0) {
 | 
						|
                            if (m_read_types & osmium::osm_entity_bits::relation) parse_relation_group(group);
 | 
						|
                        } else if (group.nodes_size() != 0) {
 | 
						|
                            if (m_read_types & osmium::osm_entity_bits::node) parse_node_group(group);
 | 
						|
                        } else {
 | 
						|
                            throw std::runtime_error("Group of unknown type.");
 | 
						|
                        }
 | 
						|
                    }
 | 
						|
 | 
						|
                    return std::move(m_buffer);
 | 
						|
                }
 | 
						|
 | 
						|
            private:
 | 
						|
 | 
						|
                template <class TBuilder, class TPBFObject>
 | 
						|
                void parse_attributes(TBuilder& builder, const TPBFObject& pbf_object) {
 | 
						|
                    auto& object = builder.object();
 | 
						|
 | 
						|
                    object.id(pbf_object.id());
 | 
						|
 | 
						|
                    if (pbf_object.has_info()) {
 | 
						|
                        object.version(pbf_object.info().version())
 | 
						|
                            .changeset(pbf_object.info().changeset())
 | 
						|
                            .timestamp(pbf_object.info().timestamp() * m_date_factor)
 | 
						|
                            .uid_from_signed(pbf_object.info().uid());
 | 
						|
                        if (pbf_object.info().has_visible()) {
 | 
						|
                            object.visible(pbf_object.info().visible());
 | 
						|
                        }
 | 
						|
                        builder.add_user(m_stringtable->s(pbf_object.info().user_sid()).data());
 | 
						|
                    } else {
 | 
						|
                        builder.add_user("");
 | 
						|
                    }
 | 
						|
                }
 | 
						|
 | 
						|
                void parse_node_group(const OSMPBF::PrimitiveGroup& group) {
 | 
						|
                    for (int i=0; i < group.nodes_size(); ++i) {
 | 
						|
                        osmium::builder::NodeBuilder builder(m_buffer);
 | 
						|
                        const OSMPBF::Node& pbf_node = group.nodes(i);
 | 
						|
                        parse_attributes(builder, pbf_node);
 | 
						|
 | 
						|
                        if (builder.object().visible()) {
 | 
						|
                            builder.object().location(osmium::Location(
 | 
						|
                                              (pbf_node.lon() * m_granularity + m_lon_offset) / (OSMPBF::lonlat_resolution / osmium::Location::coordinate_precision),
 | 
						|
                                              (pbf_node.lat() * m_granularity + m_lat_offset) / (OSMPBF::lonlat_resolution / osmium::Location::coordinate_precision)));
 | 
						|
                        }
 | 
						|
 | 
						|
                        if (pbf_node.keys_size() > 0) {
 | 
						|
                            osmium::builder::TagListBuilder tl_builder(m_buffer, &builder);
 | 
						|
                            for (int tag=0; tag < pbf_node.keys_size(); ++tag) {
 | 
						|
                                tl_builder.add_tag(m_stringtable->s(static_cast<int>(pbf_node.keys(tag))).data(),
 | 
						|
                                                   m_stringtable->s(static_cast<int>(pbf_node.vals(tag))).data());
 | 
						|
                            }
 | 
						|
                        }
 | 
						|
 | 
						|
                        m_buffer.commit();
 | 
						|
                    }
 | 
						|
                }
 | 
						|
 | 
						|
                void parse_way_group(const OSMPBF::PrimitiveGroup& group) {
 | 
						|
                    for (int i=0; i < group.ways_size(); ++i) {
 | 
						|
                        osmium::builder::WayBuilder builder(m_buffer);
 | 
						|
                        const OSMPBF::Way& pbf_way = group.ways(i);
 | 
						|
                        parse_attributes(builder, pbf_way);
 | 
						|
 | 
						|
                        if (pbf_way.refs_size() > 0) {
 | 
						|
                            osmium::builder::WayNodeListBuilder wnl_builder(m_buffer, &builder);
 | 
						|
                            int64_t ref = 0;
 | 
						|
                            for (int n=0; n < pbf_way.refs_size(); ++n) {
 | 
						|
                                ref += pbf_way.refs(n);
 | 
						|
                                wnl_builder.add_node_ref(ref);
 | 
						|
                            }
 | 
						|
                        }
 | 
						|
 | 
						|
                        if (pbf_way.keys_size() > 0) {
 | 
						|
                            osmium::builder::TagListBuilder tl_builder(m_buffer, &builder);
 | 
						|
                            for (int tag=0; tag < pbf_way.keys_size(); ++tag) {
 | 
						|
                                tl_builder.add_tag(m_stringtable->s(static_cast<int>(pbf_way.keys(tag))).data(),
 | 
						|
                                                   m_stringtable->s(static_cast<int>(pbf_way.vals(tag))).data());
 | 
						|
                            }
 | 
						|
                        }
 | 
						|
 | 
						|
                        m_buffer.commit();
 | 
						|
                    }
 | 
						|
                }
 | 
						|
 | 
						|
                void parse_relation_group(const OSMPBF::PrimitiveGroup& group) {
 | 
						|
                    for (int i=0; i < group.relations_size(); ++i) {
 | 
						|
                        osmium::builder::RelationBuilder builder(m_buffer);
 | 
						|
                        const OSMPBF::Relation& pbf_relation = group.relations(i);
 | 
						|
                        parse_attributes(builder, pbf_relation);
 | 
						|
 | 
						|
                        if (pbf_relation.types_size() > 0) {
 | 
						|
                            osmium::builder::RelationMemberListBuilder rml_builder(m_buffer, &builder);
 | 
						|
                            int64_t ref = 0;
 | 
						|
                            for (int n=0; n < pbf_relation.types_size(); ++n) {
 | 
						|
                                ref += pbf_relation.memids(n);
 | 
						|
                                rml_builder.add_member(osmpbf_membertype_to_item_type(pbf_relation.types(n)), ref, m_stringtable->s(pbf_relation.roles_sid(n)).data());
 | 
						|
                            }
 | 
						|
                        }
 | 
						|
 | 
						|
                        if (pbf_relation.keys_size() > 0) {
 | 
						|
                            osmium::builder::TagListBuilder tl_builder(m_buffer, &builder);
 | 
						|
                            for (int tag=0; tag < pbf_relation.keys_size(); ++tag) {
 | 
						|
                                tl_builder.add_tag(m_stringtable->s(static_cast<int>(pbf_relation.keys(tag))).data(),
 | 
						|
                                                   m_stringtable->s(static_cast<int>(pbf_relation.vals(tag))).data());
 | 
						|
                            }
 | 
						|
                        }
 | 
						|
 | 
						|
                        m_buffer.commit();
 | 
						|
                    }
 | 
						|
                }
 | 
						|
 | 
						|
                int add_tags(const OSMPBF::DenseNodes& dense, int n, osmium::builder::NodeBuilder* builder) {
 | 
						|
                    if (n >= dense.keys_vals_size()) {
 | 
						|
                        return n;
 | 
						|
                    }
 | 
						|
 | 
						|
                    if (dense.keys_vals(n) == 0) {
 | 
						|
                        return n+1;
 | 
						|
                    }
 | 
						|
 | 
						|
                    osmium::builder::TagListBuilder tl_builder(m_buffer, builder);
 | 
						|
 | 
						|
                    while (n < dense.keys_vals_size()) {
 | 
						|
                        int tag_key_pos = dense.keys_vals(n++);
 | 
						|
 | 
						|
                        if (tag_key_pos == 0) {
 | 
						|
                            break;
 | 
						|
                        }
 | 
						|
 | 
						|
                        tl_builder.add_tag(m_stringtable->s(tag_key_pos).data(),
 | 
						|
                                           m_stringtable->s(dense.keys_vals(n)).data());
 | 
						|
 | 
						|
                        ++n;
 | 
						|
                    }
 | 
						|
 | 
						|
                    return n;
 | 
						|
                }
 | 
						|
 | 
						|
                void parse_dense_node_group(const OSMPBF::PrimitiveGroup& group) {
 | 
						|
                    int64_t last_dense_id        = 0;
 | 
						|
                    int64_t last_dense_latitude  = 0;
 | 
						|
                    int64_t last_dense_longitude = 0;
 | 
						|
                    int64_t last_dense_uid       = 0;
 | 
						|
                    int64_t last_dense_user_sid  = 0;
 | 
						|
                    int64_t last_dense_changeset = 0;
 | 
						|
                    int64_t last_dense_timestamp = 0;
 | 
						|
                    int     last_dense_tag       = 0;
 | 
						|
 | 
						|
                    const OSMPBF::DenseNodes& dense = group.dense();
 | 
						|
 | 
						|
                    for (int i=0; i < dense.id_size(); ++i) {
 | 
						|
                        bool visible = true;
 | 
						|
 | 
						|
                        last_dense_id        += dense.id(i);
 | 
						|
                        last_dense_latitude  += dense.lat(i);
 | 
						|
                        last_dense_longitude += dense.lon(i);
 | 
						|
 | 
						|
                        if (dense.has_denseinfo()) {
 | 
						|
                            last_dense_changeset += dense.denseinfo().changeset(i);
 | 
						|
                            last_dense_timestamp += dense.denseinfo().timestamp(i);
 | 
						|
                            last_dense_uid       += dense.denseinfo().uid(i);
 | 
						|
                            last_dense_user_sid  += dense.denseinfo().user_sid(i);
 | 
						|
                            if (dense.denseinfo().visible_size() > 0) {
 | 
						|
                                visible = dense.denseinfo().visible(i);
 | 
						|
                            }
 | 
						|
                            assert(last_dense_changeset >= 0);
 | 
						|
                            assert(last_dense_timestamp >= 0);
 | 
						|
                            assert(last_dense_uid >= -1);
 | 
						|
                            assert(last_dense_user_sid >= 0);
 | 
						|
                        }
 | 
						|
 | 
						|
                        osmium::builder::NodeBuilder builder(m_buffer);
 | 
						|
                        osmium::Node& node = builder.object();
 | 
						|
 | 
						|
                        node.id(last_dense_id);
 | 
						|
 | 
						|
                        if (dense.has_denseinfo()) {
 | 
						|
                            auto v = dense.denseinfo().version(i);
 | 
						|
                            assert(v > 0);
 | 
						|
                            node.version(static_cast<osmium::object_version_type>(v));
 | 
						|
                            node.changeset(last_dense_changeset);
 | 
						|
                            node.timestamp(last_dense_timestamp * m_date_factor);
 | 
						|
                            node.uid_from_signed(last_dense_uid);
 | 
						|
                            node.visible(visible);
 | 
						|
                            builder.add_user(m_stringtable->s(last_dense_user_sid).data());
 | 
						|
                        } else {
 | 
						|
                            builder.add_user("");
 | 
						|
                        }
 | 
						|
 | 
						|
                        if (visible) {
 | 
						|
                            builder.object().location(osmium::Location(
 | 
						|
                                              (last_dense_longitude * m_granularity + m_lon_offset) / (OSMPBF::lonlat_resolution / osmium::Location::coordinate_precision),
 | 
						|
                                              (last_dense_latitude  * m_granularity + m_lat_offset) / (OSMPBF::lonlat_resolution / osmium::Location::coordinate_precision)));
 | 
						|
                        }
 | 
						|
 | 
						|
                        last_dense_tag = add_tags(dense, last_dense_tag, &builder);
 | 
						|
                        m_buffer.commit();
 | 
						|
                    }
 | 
						|
                }
 | 
						|
 | 
						|
            }; // class PBFPrimitiveBlockParser
 | 
						|
 | 
						|
            typedef osmium::thread::Queue<std::future<osmium::memory::Buffer>> queue_type;
 | 
						|
 | 
						|
            class InputQueueReader {
 | 
						|
 | 
						|
                osmium::thread::Queue<std::string>& m_queue;
 | 
						|
                std::string m_buffer;
 | 
						|
 | 
						|
            public:
 | 
						|
 | 
						|
                InputQueueReader(osmium::thread::Queue<std::string>& queue) :
 | 
						|
                    m_queue(queue) {
 | 
						|
                }
 | 
						|
 | 
						|
                bool operator()(unsigned char* data, size_t size) {
 | 
						|
                    while (m_buffer.size() < size) {
 | 
						|
                        std::string new_data;
 | 
						|
                        m_queue.wait_and_pop(new_data);
 | 
						|
                        if (new_data.empty()) {
 | 
						|
                            return false;
 | 
						|
                        }
 | 
						|
                        m_buffer += new_data;
 | 
						|
                    }
 | 
						|
                    memcpy(data, m_buffer.data(), size);
 | 
						|
                    m_buffer.erase(0, size);
 | 
						|
                    return true;
 | 
						|
                }
 | 
						|
 | 
						|
            };
 | 
						|
 | 
						|
            template <class TDerived>
 | 
						|
            class BlobParser {
 | 
						|
 | 
						|
            protected:
 | 
						|
 | 
						|
                std::shared_ptr<unsigned char> m_input_buffer;
 | 
						|
                const int m_size;
 | 
						|
                const int m_blob_num;
 | 
						|
                InputQueueReader& m_input_queue_reader;
 | 
						|
 | 
						|
                BlobParser(const int size, const int blob_num, InputQueueReader& input_queue_reader) :
 | 
						|
                    m_input_buffer(new unsigned char[size], [](unsigned char* ptr) { delete[] ptr; }),
 | 
						|
                    m_size(size),
 | 
						|
                    m_blob_num(blob_num),
 | 
						|
                    m_input_queue_reader(input_queue_reader) {
 | 
						|
                    if (size < 0 || size > OSMPBF::max_uncompressed_blob_size) {
 | 
						|
                        std::ostringstream errmsg;
 | 
						|
                        errmsg << "invalid blob size: " << size;
 | 
						|
                        throw std::runtime_error(errmsg.str());
 | 
						|
                    }
 | 
						|
                    if (! input_queue_reader(m_input_buffer.get(), size)) {
 | 
						|
                        // EOF
 | 
						|
                        throw std::runtime_error("read error (EOF)");
 | 
						|
                    }
 | 
						|
                }
 | 
						|
 | 
						|
            public:
 | 
						|
 | 
						|
                void doit() {
 | 
						|
                    OSMPBF::Blob pbf_blob;
 | 
						|
                    if (!pbf_blob.ParseFromArray(m_input_buffer.get(), m_size)) {
 | 
						|
                        throw std::runtime_error("failed to parse blob");
 | 
						|
                    }
 | 
						|
 | 
						|
                    if (pbf_blob.has_raw()) {
 | 
						|
                        static_cast<TDerived*>(this)->handle_blob(pbf_blob.raw());
 | 
						|
                        return;
 | 
						|
                    } else if (pbf_blob.has_zlib_data()) {
 | 
						|
                        auto raw_size = pbf_blob.raw_size();
 | 
						|
                        assert(raw_size >= 0);
 | 
						|
                        assert(raw_size <= OSMPBF::max_uncompressed_blob_size);
 | 
						|
 | 
						|
                        std::string unpack_buffer { osmium::io::detail::zlib_uncompress(pbf_blob.zlib_data(), static_cast<unsigned long>(raw_size)) };
 | 
						|
                        static_cast<TDerived*>(this)->handle_blob(unpack_buffer);
 | 
						|
                        return;
 | 
						|
                    } else if (pbf_blob.has_lzma_data()) {
 | 
						|
                        throw std::runtime_error("lzma blobs not implemented");
 | 
						|
                    } else {
 | 
						|
                        throw std::runtime_error("Blob contains no data");
 | 
						|
                    }
 | 
						|
                }
 | 
						|
 | 
						|
                osmium::memory::Buffer operator()() {
 | 
						|
                    OSMPBF::Blob pbf_blob;
 | 
						|
                    if (!pbf_blob.ParseFromArray(m_input_buffer.get(), m_size)) {
 | 
						|
                        throw std::runtime_error("failed to parse blob");
 | 
						|
                    }
 | 
						|
 | 
						|
                    if (pbf_blob.has_raw()) {
 | 
						|
                        return static_cast<TDerived*>(this)->handle_blob(pbf_blob.raw());
 | 
						|
                    } else if (pbf_blob.has_zlib_data()) {
 | 
						|
                        auto raw_size = pbf_blob.raw_size();
 | 
						|
                        assert(raw_size >= 0);
 | 
						|
                        assert(raw_size <= OSMPBF::max_uncompressed_blob_size);
 | 
						|
 | 
						|
                        std::string unpack_buffer { osmium::io::detail::zlib_uncompress(pbf_blob.zlib_data(), static_cast<unsigned long>(raw_size)) };
 | 
						|
                        return static_cast<TDerived*>(this)->handle_blob(unpack_buffer);
 | 
						|
                    } else if (pbf_blob.has_lzma_data()) {
 | 
						|
                        throw std::runtime_error("lzma blobs not implemented");
 | 
						|
                    } else {
 | 
						|
                        throw std::runtime_error("Blob contains no data");
 | 
						|
                    }
 | 
						|
                }
 | 
						|
 | 
						|
            }; // class BlobParser;
 | 
						|
 | 
						|
            class HeaderBlobParser : public BlobParser<HeaderBlobParser> {
 | 
						|
 | 
						|
                osmium::io::Header& m_header;
 | 
						|
 | 
						|
                void handle_blob(const std::string& data) {
 | 
						|
                    OSMPBF::HeaderBlock pbf_header_block;
 | 
						|
                    if (!pbf_header_block.ParseFromArray(data.data(), data.size())) {
 | 
						|
                        throw std::runtime_error("Failed to parse HeaderBlock.");
 | 
						|
                    }
 | 
						|
 | 
						|
                    for (int i=0; i < pbf_header_block.required_features_size(); ++i) {
 | 
						|
                        const std::string& feature = pbf_header_block.required_features(i);
 | 
						|
 | 
						|
                        if (feature == "OsmSchema-V0.6") continue;
 | 
						|
                        if (feature == "DenseNodes") {
 | 
						|
                            m_header.set("pbf_dense_nodes", true);
 | 
						|
                            continue;
 | 
						|
                        }
 | 
						|
                        if (feature == "HistoricalInformation") {
 | 
						|
                            m_header.has_multiple_object_versions(true);
 | 
						|
                            continue;
 | 
						|
                        }
 | 
						|
 | 
						|
                        std::ostringstream errmsg;
 | 
						|
                        errmsg << "Required feature not supported: " << feature;
 | 
						|
                        throw std::runtime_error(errmsg.str());
 | 
						|
                    }
 | 
						|
 | 
						|
                    if (pbf_header_block.has_writingprogram()) {
 | 
						|
                        m_header.set("generator", pbf_header_block.writingprogram());
 | 
						|
                    }
 | 
						|
 | 
						|
                    if (pbf_header_block.has_bbox()) {
 | 
						|
                        const OSMPBF::HeaderBBox& pbf_bbox = pbf_header_block.bbox();
 | 
						|
                        const int64_t resolution_convert = OSMPBF::lonlat_resolution / osmium::Location::coordinate_precision;
 | 
						|
                        osmium::Box box;
 | 
						|
                        box.extend(osmium::Location(pbf_bbox.left()  / resolution_convert, pbf_bbox.bottom() / resolution_convert));
 | 
						|
                        box.extend(osmium::Location(pbf_bbox.right() / resolution_convert, pbf_bbox.top()    / resolution_convert));
 | 
						|
                        m_header.add_box(box);
 | 
						|
                    }
 | 
						|
 | 
						|
                    if (pbf_header_block.has_osmosis_replication_timestamp()) {
 | 
						|
                        m_header.set("osmosis_replication_timestamp", osmium::Timestamp(pbf_header_block.osmosis_replication_timestamp()).to_iso());
 | 
						|
                    }
 | 
						|
 | 
						|
                    if (pbf_header_block.has_osmosis_replication_sequence_number()) {
 | 
						|
                        m_header.set("osmosis_replication_sequence_number", std::to_string(pbf_header_block.osmosis_replication_sequence_number()));
 | 
						|
                    }
 | 
						|
 | 
						|
                    if (pbf_header_block.has_osmosis_replication_base_url()) {
 | 
						|
                        m_header.set("osmosis_replication_base_url", pbf_header_block.osmosis_replication_base_url());
 | 
						|
                    }
 | 
						|
                }
 | 
						|
 | 
						|
            public:
 | 
						|
 | 
						|
                friend class BlobParser<HeaderBlobParser>;
 | 
						|
 | 
						|
                HeaderBlobParser(const int size, InputQueueReader& input_queue_reader, osmium::io::Header& header) :
 | 
						|
                    BlobParser(size, 0, input_queue_reader),
 | 
						|
                    m_header(header) {
 | 
						|
                }
 | 
						|
 | 
						|
            }; // class HeaderBlobParser
 | 
						|
 | 
						|
            class DataBlobParser : public BlobParser<DataBlobParser> {
 | 
						|
 | 
						|
                osmium::osm_entity_bits::type m_read_types;
 | 
						|
 | 
						|
                osmium::memory::Buffer handle_blob(const std::string& data) {
 | 
						|
                    PBFPrimitiveBlockParser parser(data.data(), data.size(), m_read_types);
 | 
						|
                    return std::move(parser());
 | 
						|
                }
 | 
						|
 | 
						|
            public:
 | 
						|
 | 
						|
                friend class BlobParser<DataBlobParser>;
 | 
						|
 | 
						|
                DataBlobParser(const int size, const int blob_num, InputQueueReader& input_queue_reader, osmium::osm_entity_bits::type read_types) :
 | 
						|
                    BlobParser(size, blob_num, input_queue_reader),
 | 
						|
                    m_read_types(read_types) {
 | 
						|
                }
 | 
						|
 | 
						|
            }; // class DataBlobParser
 | 
						|
 | 
						|
            /**
 | 
						|
             * Class for parsing PBF files.
 | 
						|
             */
 | 
						|
            class PBFInputFormat : public osmium::io::detail::InputFormat {
 | 
						|
 | 
						|
                bool m_use_thread_pool;
 | 
						|
                queue_type m_queue;
 | 
						|
                const size_t m_max_work_queue_size;
 | 
						|
                const size_t m_max_buffer_queue_size;
 | 
						|
                std::atomic<bool> m_done;
 | 
						|
                std::thread m_reader;
 | 
						|
                OSMPBF::BlobHeader m_blob_header;
 | 
						|
                InputQueueReader m_input_queue_reader;
 | 
						|
 | 
						|
                /**
 | 
						|
                 * Read BlobHeader by first reading the size and then the BlobHeader.
 | 
						|
                 * The BlobHeader contains a type field (which is checked against
 | 
						|
                 * the expected type) and a size field.
 | 
						|
                 *
 | 
						|
                 * @param expected_type Expected type of data ("OSMHeader" or "OSMData").
 | 
						|
                 * @return Size of the data read from BlobHeader (0 on EOF).
 | 
						|
                 */
 | 
						|
                size_t read_blob_header(const char* expected_type) {
 | 
						|
                    uint32_t size_in_network_byte_order;
 | 
						|
 | 
						|
                    if (! m_input_queue_reader(reinterpret_cast<unsigned char*>(&size_in_network_byte_order), sizeof(size_in_network_byte_order))) {
 | 
						|
                        return 0; // EOF
 | 
						|
                    }
 | 
						|
 | 
						|
                    uint32_t size = ntohl(size_in_network_byte_order);
 | 
						|
                    if (size > static_cast<uint32_t>(OSMPBF::max_blob_header_size)) {
 | 
						|
                        throw std::runtime_error("Invalid BlobHeader size");
 | 
						|
                    }
 | 
						|
 | 
						|
                    unsigned char blob_header_buffer[OSMPBF::max_blob_header_size];
 | 
						|
                    if (! m_input_queue_reader(blob_header_buffer, size)) {
 | 
						|
                        throw std::runtime_error("Read error.");
 | 
						|
                    }
 | 
						|
 | 
						|
                    if (!m_blob_header.ParseFromArray(blob_header_buffer, static_cast<int>(size))) {
 | 
						|
                        throw std::runtime_error("Failed to parse BlobHeader.");
 | 
						|
                    }
 | 
						|
 | 
						|
                    if (std::strcmp(m_blob_header.type().c_str(), expected_type)) {
 | 
						|
                        throw std::runtime_error("Blob does not have expected type (OSMHeader in first Blob, OSMData in following Blobs).");
 | 
						|
                    }
 | 
						|
 | 
						|
                    return static_cast<size_t>(m_blob_header.datasize());
 | 
						|
                }
 | 
						|
 | 
						|
                void parse_osm_data(osmium::osm_entity_bits::type read_types) {
 | 
						|
                    osmium::thread::set_thread_name("_osmium_pbf_in");
 | 
						|
                    int n=0;
 | 
						|
                    while (size_t size = read_blob_header("OSMData")) {
 | 
						|
                        DataBlobParser data_blob_parser(size, n, m_input_queue_reader, read_types);
 | 
						|
 | 
						|
                        if (m_use_thread_pool) {
 | 
						|
                            m_queue.push(osmium::thread::Pool::instance().submit(data_blob_parser));
 | 
						|
 | 
						|
                            // if the work queue is getting too large, wait for a while
 | 
						|
                            while (!m_done && osmium::thread::Pool::instance().queue_size() >= m_max_work_queue_size) {
 | 
						|
                                std::this_thread::sleep_for(std::chrono::milliseconds(10));
 | 
						|
                            }
 | 
						|
                        } else {
 | 
						|
                            std::promise<osmium::memory::Buffer> promise;
 | 
						|
                            m_queue.push(promise.get_future());
 | 
						|
                            promise.set_value(data_blob_parser());
 | 
						|
                        }
 | 
						|
                        ++n;
 | 
						|
 | 
						|
                        // wait if the backlog of buffers with parsed data is too large
 | 
						|
                        while (!m_done && m_queue.size() > m_max_buffer_queue_size) {
 | 
						|
                            std::this_thread::sleep_for(std::chrono::milliseconds(10));
 | 
						|
                        }
 | 
						|
 | 
						|
                        if (m_done) {
 | 
						|
                            return;
 | 
						|
                        }
 | 
						|
                    }
 | 
						|
                    m_done = true;
 | 
						|
                }
 | 
						|
 | 
						|
            public:
 | 
						|
 | 
						|
                /**
 | 
						|
                 * Instantiate PBF Parser
 | 
						|
                 *
 | 
						|
                 * @param file osmium::io::File instance describing file to be read from.
 | 
						|
                 * @param read_which_entities Which types of OSM entities (nodes, ways, relations, changesets) should be parsed?
 | 
						|
                 * @param input_queue String queue where data is read from.
 | 
						|
                 */
 | 
						|
                PBFInputFormat(const osmium::io::File& file, osmium::osm_entity_bits::type read_which_entities, osmium::thread::Queue<std::string>& input_queue) :
 | 
						|
                    osmium::io::detail::InputFormat(file, read_which_entities, input_queue),
 | 
						|
                    m_use_thread_pool(true),
 | 
						|
                    m_queue(),
 | 
						|
                    m_max_work_queue_size(10), // XXX tune these settings
 | 
						|
                    m_max_buffer_queue_size(20), // XXX tune these settings
 | 
						|
                    m_done(false),
 | 
						|
                    m_input_queue_reader(input_queue) {
 | 
						|
                    GOOGLE_PROTOBUF_VERIFY_VERSION;
 | 
						|
 | 
						|
                    // handle OSMHeader
 | 
						|
                    size_t size = read_blob_header("OSMHeader");
 | 
						|
 | 
						|
                    {
 | 
						|
                        HeaderBlobParser header_blob_parser(size, m_input_queue_reader, m_header);
 | 
						|
                        header_blob_parser.doit();
 | 
						|
                    }
 | 
						|
 | 
						|
                    if (m_read_which_entities != osmium::osm_entity_bits::nothing) {
 | 
						|
                        m_reader = std::thread(&PBFInputFormat::parse_osm_data, this, m_read_which_entities);
 | 
						|
                    }
 | 
						|
                }
 | 
						|
 | 
						|
                ~PBFInputFormat() {
 | 
						|
                    m_done = true;
 | 
						|
                    if (m_reader.joinable()) {
 | 
						|
                        m_reader.join();
 | 
						|
                    }
 | 
						|
                }
 | 
						|
 | 
						|
                /**
 | 
						|
                 * Returns the next buffer with OSM data read from the PBF file.
 | 
						|
                 * Blocks if data is not available yet.
 | 
						|
                 * Returns an empty buffer at end of input.
 | 
						|
                 */
 | 
						|
                osmium::memory::Buffer read() override {
 | 
						|
                    if (!m_done || !m_queue.empty()) {
 | 
						|
                        std::future<osmium::memory::Buffer> buffer_future;
 | 
						|
                        m_queue.wait_and_pop(buffer_future);
 | 
						|
                        return std::move(buffer_future.get());
 | 
						|
                    }
 | 
						|
 | 
						|
                    return osmium::memory::Buffer();
 | 
						|
                }
 | 
						|
 | 
						|
            }; // class PBFInputFormat
 | 
						|
 | 
						|
            namespace {
 | 
						|
 | 
						|
                const bool registered_pbf_input = osmium::io::detail::InputFormatFactory::instance().register_input_format(osmium::io::file_format::pbf,
 | 
						|
                    [](const osmium::io::File& file, osmium::osm_entity_bits::type read_which_entities, osmium::thread::Queue<std::string>& input_queue) {
 | 
						|
                        return new osmium::io::detail::PBFInputFormat(file, read_which_entities, input_queue);
 | 
						|
                });
 | 
						|
 | 
						|
            } // anonymous namespace
 | 
						|
 | 
						|
        } // namespace detail
 | 
						|
 | 
						|
    } // namespace io
 | 
						|
 | 
						|
} // namespace osmium
 | 
						|
 | 
						|
#endif // OSMIUM_IO_DETAIL_PBF_INPUT_FORMAT_HPP
 |