From c69545c47ac5055af688128096e192d0c34eb9d2 Mon Sep 17 00:00:00 2001 From: Patrick Niklaus Date: Sat, 8 Oct 2016 01:52:47 +0200 Subject: [PATCH] Don't block all requests to switch data This switches the data even if there are requests still running on the old data. osrm-datastore then waits until all of these old requests have finished before freeing the old regions. This also means that osrm-datastore will return with an error if there is a data update currently in progress. --- include/engine/data_watchdog.hpp | 29 +++-- .../engine/datafacade/shared_datafacade.hpp | 34 ++++-- include/storage/shared_barriers.hpp | 16 +-- src/engine/engine.cpp | 34 ++---- src/storage/storage.cpp | 115 +++++++++++++----- src/tools/unlock_all_mutexes.cpp | 7 +- 6 files changed, 150 insertions(+), 85 deletions(-) diff --git a/include/engine/data_watchdog.hpp b/include/engine/data_watchdog.hpp index ea83b61c7..884dff4f1 100644 --- a/include/engine/data_watchdog.hpp +++ b/include/engine/data_watchdog.hpp @@ -6,6 +6,8 @@ #include "storage/shared_datatype.hpp" #include "storage/shared_memory.hpp" +#include +#include #include #include @@ -28,7 +30,8 @@ class DataWatchdog { public: DataWatchdog() - : shared_regions(storage::makeSharedMemoryView(storage::CURRENT_REGIONS)), + : shared_barriers{std::make_shared()}, + shared_regions(storage::makeSharedMemoryView(storage::CURRENT_REGIONS)), current_timestamp{storage::LAYOUT_NONE, storage::DATA_NONE, 0} { } @@ -42,7 +45,8 @@ class DataWatchdog // Check if it might be worth to try to aquire a exclusive lock bool HasNewRegion() const { - const boost::shared_lock lock(current_timestamp_mutex); + const boost::interprocess::sharable_lock lock( + shared_barriers->current_regions_mutex); const auto shared_timestamp = static_cast(shared_regions->Ptr()); @@ -59,11 +63,14 @@ class DataWatchdog // if the update was already done by another thread void MaybeLoadNewRegion(std::shared_ptr &facade) { - const boost::lock_guard lock(current_timestamp_mutex); + 
const boost::interprocess::sharable_lock lock( + shared_barriers->current_regions_mutex); const auto shared_timestamp = static_cast(shared_regions->Ptr()); + boost::upgrade_lock facade_lock(facade_mutex); + // if more then one request tried to aquire the write lock // we might get overtaken before we actually do the writing // in that case we don't modify anthing @@ -75,23 +82,29 @@ class DataWatchdog // this thread has won and can update the data else { + boost::upgrade_to_unique_lock unique_facade_lock(facade_lock); + current_timestamp = *shared_timestamp; // TODO remove once we allow for more then one SharedMemoryFacade at the same time // at this point no other query is allowed to reference this facade! // the old facade will die exactly here BOOST_ASSERT(!facade || facade.use_count() == 1); - facade = std::make_shared( - current_timestamp.layout, current_timestamp.data, current_timestamp.timestamp); + facade = std::make_shared(shared_barriers, + current_timestamp.layout, + current_timestamp.data, + current_timestamp.timestamp); } } private: + // mutexes should be mutable even on const objects: This enables + // marking functions as logical const and thread-safe. + std::shared_ptr shared_barriers; + // shared memory table containing pointers to all shared regions std::unique_ptr shared_regions; - // mutexes should be mutable even on const objects: This enables - // marking functions as logical const and thread-safe. 
- mutable boost::shared_mutex current_timestamp_mutex; + mutable boost::shared_mutex facade_mutex; storage::SharedDataTimestamp current_timestamp; }; } diff --git a/include/engine/datafacade/shared_datafacade.hpp b/include/engine/datafacade/shared_datafacade.hpp index b07e6e951..26ad03b7a 100644 --- a/include/engine/datafacade/shared_datafacade.hpp +++ b/include/engine/datafacade/shared_datafacade.hpp @@ -3,6 +3,7 @@ // implements all data storage when shared memory _IS_ used +#include "storage/shared_barriers.hpp" #include "storage/shared_datatype.hpp" #include "storage/shared_memory.hpp" #include "engine/datafacade/datafacade_base.hpp" @@ -24,8 +25,12 @@ #include "util/static_rtree.hpp" #include "util/typedefs.hpp" -#include +#include +#include +#include +#include +#include #include #include #include @@ -34,10 +39,6 @@ #include #include -#include -#include -#include -#include namespace osrm { @@ -65,9 +66,11 @@ class SharedDataFacade final : public BaseDataFacade storage::SharedDataLayout *data_layout; char *shared_memory; + std::shared_ptr shared_barriers; storage::SharedDataType layout_region; storage::SharedDataType data_region; unsigned shared_timestamp; + boost::interprocess::sharable_lock regions_lock; unsigned m_check_sum; std::unique_ptr m_query_graph; @@ -147,8 +150,7 @@ class SharedDataFacade final : public BaseDataFacade file_index_path = boost::filesystem::path(file_index_ptr); if (!boost::filesystem::exists(file_index_path)) { - util::SimpleLogger().Write(logDEBUG) << "Leaf file name " - << file_index_path.string(); + util::SimpleLogger().Write(logDEBUG) << "Leaf file name " << file_index_path.string(); throw util::exception("Could not load leaf index file. 
" "Is any data loaded into shared memory?"); } @@ -382,18 +384,26 @@ class SharedDataFacade final : public BaseDataFacade public: virtual ~SharedDataFacade() {} - SharedDataFacade(storage::SharedDataType layout_region_, storage::SharedDataType data_region_, unsigned shared_timestamp_) - : layout_region(layout_region_), data_region(data_region_), shared_timestamp(shared_timestamp_) + SharedDataFacade(const std::shared_ptr &shared_barriers_, + storage::SharedDataType layout_region_, + storage::SharedDataType data_region_, + unsigned shared_timestamp_) + : shared_barriers(shared_barriers_), + layout_region(layout_region_), data_region(data_region_), + shared_timestamp(shared_timestamp_), + regions_lock(layout_region == storage::LAYOUT_1 ? shared_barriers->regions_1_mutex + : shared_barriers->regions_2_mutex) { - util::SimpleLogger().Write(logDEBUG) << "Loading new data with shared timestamp " << shared_timestamp; + util::SimpleLogger().Write(logDEBUG) << "Loading new data with shared timestamp " + << shared_timestamp; BOOST_ASSERT(storage::SharedMemory::RegionExists(layout_region)); - m_layout_memory = storage::makeOwnedSharedMemoryView(layout_region); + m_layout_memory = storage::makeSharedMemoryView(layout_region); data_layout = static_cast(m_layout_memory->Ptr()); BOOST_ASSERT(storage::SharedMemory::RegionExists(data_region)); - m_large_memory = storage::makeOwnedSharedMemoryView(data_region); + m_large_memory = storage::makeSharedMemoryView(data_region); shared_memory = (char *)(m_large_memory->Ptr()); LoadGraph(); diff --git a/include/storage/shared_barriers.hpp b/include/storage/shared_barriers.hpp index 0cb7f84d7..d9bdb3aeb 100644 --- a/include/storage/shared_barriers.hpp +++ b/include/storage/shared_barriers.hpp @@ -1,27 +1,29 @@ #ifndef SHARED_BARRIERS_HPP #define SHARED_BARRIERS_HPP -#include -#include #include +#include namespace osrm { namespace storage { + struct SharedBarriers { SharedBarriers() - : 
pending_update_mutex(boost::interprocess::open_or_create, "pending_update"), - query_mutex(boost::interprocess::open_or_create, "query") + : current_regions_mutex(boost::interprocess::open_or_create, "current_regions"), + regions_1_mutex(boost::interprocess::open_or_create, "regions_1"), + regions_2_mutex(boost::interprocess::open_or_create, "regions_2") { } - // Mutex to protect access to the boolean variable - boost::interprocess::named_mutex pending_update_mutex; - boost::interprocess::named_sharable_mutex query_mutex; + boost::interprocess::named_upgradable_mutex current_regions_mutex; + boost::interprocess::named_sharable_mutex regions_1_mutex; + boost::interprocess::named_sharable_mutex regions_2_mutex; }; + } } diff --git a/src/engine/engine.cpp b/src/engine/engine.cpp index fdaafc6d9..50109c50a 100644 --- a/src/engine/engine.cpp +++ b/src/engine/engine.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include @@ -36,28 +35,15 @@ namespace // Works the same for every plugin. 
template osrm::engine::Status -RunQuery(const std::unique_ptr &lock, - osrm::engine::DataWatchdog& watchdog, +RunQuery(const std::unique_ptr& watchdog, std::shared_ptr &facade, const ParameterT &parameters, PluginT &plugin, ResultT &result) { - if (!lock) + if (watchdog && watchdog->HasNewRegion()) { - return plugin.HandleRequest(facade, parameters, result); - } - - BOOST_ASSERT(lock); - // this locks aquires shared ownership of the query mutex: other requets are allowed - // to run, but data updates need to wait for all queries to finish until they can aquire an - // exclusive lock - boost::interprocess::sharable_lock query_lock( - lock->query_mutex); - - if (watchdog.HasNewRegion()) - { - watchdog.MaybeLoadNewRegion(facade); + watchdog->MaybeLoadNewRegion(facade); } osrm::engine::Status status = plugin.HandleRequest(facade, parameters, result); @@ -84,14 +70,12 @@ Engine::Engine(const EngineConfig &config) "No shared memory blocks found, have you forgotten to run osrm-datastore?"); } - facade_update_mutex = std::make_unique(); watchdog = std::make_unique(); // this will always either return a value or throw an exception // in the initial run watchdog->MaybeLoadNewRegion(query_data_facade); BOOST_ASSERT(query_data_facade); BOOST_ASSERT(watchdog); - BOOST_ASSERT(lock); } else { @@ -120,32 +104,32 @@ Engine &Engine::operator=(Engine &&) noexcept = default; Status Engine::Route(const api::RouteParameters &params, util::json::Object &result) const { - return RunQuery(lock, *watchdog, query_data_facade, params, *route_plugin, result); + return RunQuery(watchdog, query_data_facade, params, *route_plugin, result); } Status Engine::Table(const api::TableParameters &params, util::json::Object &result) const { - return RunQuery(lock, *watchdog, query_data_facade, params, *table_plugin, result); + return RunQuery(watchdog, query_data_facade, params, *table_plugin, result); } Status Engine::Nearest(const api::NearestParameters &params, util::json::Object &result) const { - return 
RunQuery(lock, *watchdog, query_data_facade, params, *nearest_plugin, result); + return RunQuery(watchdog, query_data_facade, params, *nearest_plugin, result); } Status Engine::Trip(const api::TripParameters &params, util::json::Object &result) const { - return RunQuery(lock, *watchdog, query_data_facade, params, *trip_plugin, result); + return RunQuery(watchdog, query_data_facade, params, *trip_plugin, result); } Status Engine::Match(const api::MatchParameters &params, util::json::Object &result) const { - return RunQuery(lock, *watchdog, query_data_facade, params, *match_plugin, result); + return RunQuery(watchdog, query_data_facade, params, *match_plugin, result); } Status Engine::Tile(const api::TileParameters &params, std::string &result) const { - return RunQuery(lock, *watchdog, query_data_facade, params, *tile_plugin, result); + return RunQuery(watchdog, query_data_facade, params, *tile_plugin, result); } } // engine ns diff --git a/src/storage/storage.cpp b/src/storage/storage.cpp index c90c2a619..d6bad0cfc 100644 --- a/src/storage/storage.cpp +++ b/src/storage/storage.cpp @@ -1,4 +1,3 @@ -#include "storage/storage.hpp" #include "contractor/query_edge.hpp" #include "extractor/compressed_edge_container.hpp" #include "extractor/guidance/turn_instruction.hpp" @@ -9,6 +8,7 @@ #include "storage/shared_barriers.hpp" #include "storage/shared_datatype.hpp" #include "storage/shared_memory.hpp" +#include "storage/storage.hpp" #include "engine/datafacade/datafacade_base.hpp" #include "util/coordinate.hpp" #include "util/exception.hpp" @@ -27,6 +27,11 @@ #endif #include +#include +#include +#include +#include +#include #include #include @@ -78,12 +83,37 @@ using QueryGraph = util::StaticGraph; Storage::Storage(StorageConfig config_) : config(std::move(config_)) {} +bool regionsAvailable(SharedDataType layout, SharedDataType data) +{ + auto shared_regions = makeSharedMemoryView(CURRENT_REGIONS); + const auto shared_timestamp = static_cast(shared_regions->Ptr()); + return 
shared_timestamp->layout != layout && shared_timestamp->data != data; +} + int Storage::Run() { BOOST_ASSERT_MSG(config.IsValid(), "Invalid storage config"); util::LogPolicy::GetInstance().Unmute(); - SharedBarriers barrier; + SharedBarriers barriers; + + boost::interprocess::upgradable_lock + current_regions_lock(barriers.current_regions_mutex, boost::interprocess::defer_lock); + try + { + if (!current_regions_lock.try_lock()) + { + util::SimpleLogger().Write(logWARNING) << "A data update is in progress"; + return EXIT_FAILURE; + } + } + // hard unlock in case of any exception. + catch (boost::interprocess::lock_exception &ex) + { + barriers.current_regions_mutex.unlock_upgradable(); + // make sure we exit here because this is bad + throw; + } #ifdef __linux__ // try to disable swapping on Linux @@ -94,30 +124,34 @@ int Storage::Run() } #endif - try - { - boost::interprocess::scoped_lock pending_lock( - barrier.pending_update_mutex); - } - catch (...) - { - // hard unlock in case of any exception. - barrier.pending_update_mutex.unlock(); - } + const auto regions_1_available = regionsAvailable(LAYOUT_1, DATA_1); + const auto regions_2_available = regionsAvailable(LAYOUT_2, DATA_2); - // determine segment to use - bool segment2_in_use = SharedMemory::RegionExists(LAYOUT_2); - const storage::SharedDataType layout_region = [&] { - return segment2_in_use ? LAYOUT_1 : LAYOUT_2; + const SharedDataType layout_region = [&] { + if (regions_1_available) + { + return LAYOUT_1; + } + if (regions_2_available) + { + return LAYOUT_2; + } + throw util::exception("No shared memory region free!"); }(); - const storage::SharedDataType data_region = [&] { return segment2_in_use ? DATA_1 : DATA_2; }(); - const storage::SharedDataType previous_layout_region = [&] { - return segment2_in_use ? LAYOUT_2 : LAYOUT_1; - }(); - const storage::SharedDataType previous_data_region = [&] { - return segment2_in_use ? 
DATA_2 : DATA_1; + const SharedDataType data_region = [&] { + if (regions_1_available) + { + return DATA_1; + } + if (regions_2_available) + { + return DATA_2; + } + throw util::exception("No shared memory region free!"); }(); + BOOST_ASSERT(regions_1_available || regions_2_available); + // Allocate a memory layout in shared memory, deallocate previous auto layout_memory = makeSharedMemory(layout_region, sizeof(SharedDataLayout)); auto shared_layout_ptr = new (layout_memory->Ptr()) SharedDataLayout(); @@ -606,8 +640,8 @@ int Storage::Run() std::uint64_t *osmnodeid_ptr = shared_layout_ptr->GetBlockPtr( shared_memory_ptr, SharedDataLayout::OSM_NODE_ID_LIST); util::PackedVector osmnodeid_list; - osmnodeid_list.reset( - osmnodeid_ptr, shared_layout_ptr->num_entries[storage::SharedDataLayout::OSM_NODE_ID_LIST]); + osmnodeid_list.reset(osmnodeid_ptr, + shared_layout_ptr->num_entries[SharedDataLayout::OSM_NODE_ID_LIST]); extractor::QueryNode current_node; for (unsigned i = 0; i < coordinate_list_size; ++i) @@ -733,20 +767,41 @@ int Storage::Run() } // acquire lock - auto data_type_memory = makeSharedMemory(CURRENT_REGIONS, sizeof(SharedDataTimestamp), true, false, false); + auto data_type_memory = + makeSharedMemory(CURRENT_REGIONS, sizeof(SharedDataTimestamp), true, false, false); SharedDataTimestamp *data_timestamp_ptr = static_cast(data_type_memory->Ptr()); { - util::SimpleLogger().Write(logDEBUG) << "waiting for all queries to finish"; - boost::interprocess::scoped_lock query_lock(barrier.query_mutex); - util::SimpleLogger().Write(logDEBUG) << "all queries complete, switching over."; + boost::interprocess::scoped_lock + current_regions_exclusive_lock(std::move(current_regions_lock)); data_timestamp_ptr->layout = layout_region; data_timestamp_ptr->data = data_region; data_timestamp_ptr->timestamp += 1; - deleteRegion(previous_data_region); - deleteRegion(previous_layout_region); + + boost::interprocess::upgradable_lock + 
current_regions_upgradable_lock(std::move(current_regions_exclusive_lock)); + + util::SimpleLogger().Write(logDEBUG) << "waiting for server to switch dataset and request to finish"; + if (!regions_1_available) + { + BOOST_ASSERT(regions_2_available); + boost::interprocess::scoped_lock + regions_1_lock(barriers.regions_1_mutex); + util::SimpleLogger().Write(logDEBUG) << "switched. removing old regions 1"; + deleteRegion(DATA_1); + deleteRegion(LAYOUT_1); + } + else if (!regions_2_available) + { + BOOST_ASSERT(regions_1_available); + boost::interprocess::scoped_lock + regions_2_lock(barriers.regions_2_mutex); + util::SimpleLogger().Write(logDEBUG) << "switched. removing regions 2"; + deleteRegion(DATA_2); + deleteRegion(LAYOUT_2); + } } util::SimpleLogger().Write() << "all data loaded"; diff --git a/src/tools/unlock_all_mutexes.cpp b/src/tools/unlock_all_mutexes.cpp index abb2e065e..343d73061 100644 --- a/src/tools/unlock_all_mutexes.cpp +++ b/src/tools/unlock_all_mutexes.cpp @@ -7,8 +7,9 @@ int main() { osrm::util::LogPolicy::GetInstance().Unmute(); osrm::util::SimpleLogger().Write() << "Releasing all locks"; - osrm::storage::SharedBarriers barrier; - barrier.pending_update_mutex.unlock(); - barrier.query_mutex.unlock(); + osrm::storage::SharedBarriers barriers; + boost::interprocess::named_upgradable_mutex::remove("current_regions"); + boost::interprocess::named_sharable_mutex::remove("regions_1"); + boost::interprocess::named_sharable_mutex::remove("regions_2"); return 0; }