From fce8d72895ba3919a1d1b63dd949f9f77b2acd76 Mon Sep 17 00:00:00 2001 From: Michael Krasnyk Date: Mon, 26 Dec 2016 01:00:37 +0100 Subject: [PATCH] Change to condvar signaling if data region swapped --- include/engine/data_watchdog.hpp | 136 ++++------- .../datafacade/shared_memory_datafacade.hpp | 49 +--- include/engine/engine.hpp | 2 +- include/storage/shared_barriers.hpp | 21 +- include/storage/shared_memory.hpp | 11 +- include/storage/storage.hpp | 11 +- src/engine/engine.cpp | 8 +- src/storage/storage.cpp | 218 +++++------------- src/tools/store.cpp | 45 +--- src/tools/unlock_all_mutexes.cpp | 4 +- 10 files changed, 143 insertions(+), 362 deletions(-) diff --git a/include/engine/data_watchdog.hpp b/include/engine/data_watchdog.hpp index 09cf16218..a22e23038 100644 --- a/include/engine/data_watchdog.hpp +++ b/include/engine/data_watchdog.hpp @@ -13,6 +13,7 @@ #include #include +#include namespace osrm { @@ -26,10 +27,17 @@ class DataWatchdog { public: DataWatchdog() - : shared_barriers{std::make_shared()}, - shared_regions(storage::makeSharedMemory(storage::CURRENT_REGION)), - current_timestamp{storage::REGION_NONE, 0} + : active(true) + , timestamp(0) { + watcher = std::thread(&DataWatchdog::Run, this); + } + + ~DataWatchdog() + { + active = false; + barrier.region_condition.notify_all(); + watcher.join(); } // Tries to connect to the shared memory containing the regions table @@ -38,102 +46,44 @@ class DataWatchdog return storage::SharedMemory::RegionExists(storage::CURRENT_REGION); } - using RegionsLock = - boost::interprocess::sharable_lock; - using LockAndFacade = std::pair>; - - // This will either update the contens of facade or just leave it as is - // if the update was already done by another thread - LockAndFacade GetDataFacade() + auto GetDataFacade() const { - const boost::interprocess::sharable_lock lock( - shared_barriers->current_region_mutex); - - const auto shared_timestamp = - static_cast(shared_regions->Ptr()); - - const auto get_locked_facade = [this, shared_timestamp]( - const std::shared_ptr &facade) { - if (current_timestamp.region == storage::REGION_1) - { - return std::make_pair(RegionsLock(shared_barriers->region_1_mutex), facade); - } - else - { - BOOST_ASSERT(current_timestamp.region == storage::REGION_2); - return std::make_pair(RegionsLock(shared_barriers->region_2_mutex), facade); - } - }; - - // this blocks handle the common case when there is no data update -> we will only need a - // shared lock - { - boost::shared_lock facade_lock(facade_mutex); - - if (shared_timestamp->timestamp == current_timestamp.timestamp) - { - if (auto facade = cached_facade.lock()) - { - BOOST_ASSERT(shared_timestamp->region == current_timestamp.region); - return get_locked_facade(facade); - } - } - } - - // if we reach this code there is a data update to be made. multiple - // requests can reach this, but only ever one goes through at a time. - boost::upgrade_lock facade_lock(facade_mutex); - - // we might get overtaken before we actually do the writing - // in that case we don't modify anything - if (shared_timestamp->timestamp == current_timestamp.timestamp) - { - if (auto facade = cached_facade.lock()) - { - BOOST_ASSERT(shared_timestamp->region == current_timestamp.region); - return get_locked_facade(facade); - } - } - - // this thread has won and can update the data - boost::upgrade_to_unique_lock unique_facade_lock(facade_lock); - - // if two threads try to enter this critical section one will loose - // and will find an up-to-date instance of the shared data facade - if (shared_timestamp->timestamp == current_timestamp.timestamp) - { - // if the thread that updated the facade finishes the query before - // we can aquire our handle here, we need to regenerate - if (auto facade = cached_facade.lock()) - { - BOOST_ASSERT(shared_timestamp->region == current_timestamp.region); - - return get_locked_facade(facade); - } - } - else - { - current_timestamp = *shared_timestamp; - } - - auto new_facade = std::make_shared( - shared_barriers, current_timestamp.region, current_timestamp.timestamp); - cached_facade = new_facade; - - return get_locked_facade(new_facade); + return facade; } private: - // mutexes should be mutable even on const objects: This enables - // marking functions as logical const and thread-safe. - std::shared_ptr shared_barriers; - // shared memory table containing pointers to all shared regions - std::unique_ptr shared_regions; + void Run() + { + boost::interprocess::scoped_lock + current_region_lock(barrier.region_mutex); - mutable boost::shared_mutex facade_mutex; - std::weak_ptr cached_facade; - storage::SharedDataTimestamp current_timestamp; + auto shared_memory = makeSharedMemory(storage::CURRENT_REGION); + auto current = static_cast(shared_memory->Ptr()); + + while (active) + { + if (timestamp != current->timestamp) + { + util::Log() << "updating facade to region " << storage::regionToString(current->region) + << " with timestamp " << current->timestamp; + facade = std::make_shared(current->region); + timestamp = current->timestamp; + } + + barrier.region_condition.wait(current_region_lock); + } + + facade.reset(); + + util::Log() << "DataWatchdog thread stopped"; + } + + storage::SharedBarriers barrier; + std::thread watcher; + bool active; + unsigned timestamp; + std::shared_ptr facade; }; } } diff --git a/include/engine/datafacade/shared_memory_datafacade.hpp b/include/engine/datafacade/shared_memory_datafacade.hpp index f233e8dd4..75229c17a 100644 --- a/include/engine/datafacade/shared_memory_datafacade.hpp +++ b/include/engine/datafacade/shared_memory_datafacade.hpp @@ -25,57 +25,16 @@ class SharedMemoryDataFacade : public ContiguousInternalMemoryDataFacadeBase protected: std::unique_ptr m_large_memory; - std::shared_ptr shared_barriers; storage::SharedDataType data_region; - unsigned shared_timestamp; SharedMemoryDataFacade() {} public: - // this function handle the deallocation of the shared memory it we can prove it will not be - // used anymore. We crash hard here if something goes wrong (noexcept). - virtual ~SharedMemoryDataFacade() noexcept + + SharedMemoryDataFacade(storage::SharedDataType data_region) + : data_region(data_region) { - // Now check if this is still the newest dataset - boost::interprocess::sharable_lock - current_regions_lock(shared_barriers->current_region_mutex, - boost::interprocess::defer_lock); - - boost::interprocess::scoped_lock exclusive_lock( - data_region == storage::REGION_1 ? shared_barriers->region_1_mutex - : shared_barriers->region_2_mutex, - boost::interprocess::defer_lock); - - // if this returns false this is still in use - if (current_regions_lock.try_lock() && exclusive_lock.try_lock()) - { - if (storage::SharedMemory::RegionExists(data_region)) - { - auto shared_region = storage::makeSharedMemory(storage::CURRENT_REGION); - const auto current_timestamp = - static_cast(shared_region->Ptr()); - - // check if the memory region referenced by this facade needs cleanup - if (current_timestamp->region == data_region) - { - util::Log(logDEBUG) << "Retaining data with shared timestamp " - << shared_timestamp; - } - else - { - storage::SharedMemory::Remove(data_region); - } - } - } - } - - SharedMemoryDataFacade(const std::shared_ptr &shared_barriers_, - storage::SharedDataType data_region_, - unsigned shared_timestamp_) - : shared_barriers(shared_barriers_), data_region(data_region_), - shared_timestamp(shared_timestamp_) - { - util::Log(logDEBUG) << "Loading new data with shared timestamp " << shared_timestamp; + util::Log(logDEBUG) << "Loading new data for region " << regionToString(data_region); BOOST_ASSERT(storage::SharedMemory::RegionExists(data_region)); m_large_memory = storage::makeSharedMemory(data_region); diff --git a/include/engine/engine.hpp b/include/engine/engine.hpp index 94a416539..33985ebc5 100644 --- a/include/engine/engine.hpp +++ b/include/engine/engine.hpp @@ -50,7 +50,7 @@ class Engine final Status Tile(const api::TileParameters ¶meters, std::string &result) const; private: - std::unique_ptr lock; + //std::unique_ptr lock; std::unique_ptr watchdog; const plugins::ViaRoutePlugin route_plugin; diff --git a/include/storage/shared_barriers.hpp b/include/storage/shared_barriers.hpp index db48cead9..a74991bd4 100644 --- a/include/storage/shared_barriers.hpp +++ b/include/storage/shared_barriers.hpp @@ -1,8 +1,8 @@ #ifndef SHARED_BARRIERS_HPP #define SHARED_BARRIERS_HPP -#include -#include +#include +#include namespace osrm { @@ -13,22 +13,19 @@ struct SharedBarriers { SharedBarriers() - : current_region_mutex(boost::interprocess::open_or_create, "current_region"), - region_1_mutex(boost::interprocess::open_or_create, "region_1"), - region_2_mutex(boost::interprocess::open_or_create, "region_2") + : region_mutex(boost::interprocess::open_or_create, "osrm-region") + , region_condition(boost::interprocess::open_or_create, "osrm-region-cv") { } - static void resetCurrentRegion() + static void remove() { - boost::interprocess::named_sharable_mutex::remove("current_region"); + boost::interprocess::named_mutex::remove("osrm-region"); + boost::interprocess::named_condition::remove("osrm-region-cv"); } - static void resetRegion1() { boost::interprocess::named_sharable_mutex::remove("region_1"); } - static void resetRegion2() { boost::interprocess::named_sharable_mutex::remove("region_2"); } - boost::interprocess::named_upgradable_mutex current_region_mutex; - boost::interprocess::named_sharable_mutex region_1_mutex; - boost::interprocess::named_sharable_mutex region_2_mutex; + boost::interprocess::named_mutex region_mutex; + boost::interprocess::named_condition region_condition; }; } } diff --git a/include/storage/shared_memory.hpp b/include/storage/shared_memory.hpp index 2e2c452d1..8aa2fc3f5 100644 --- a/include/storage/shared_memory.hpp +++ b/include/storage/shared_memory.hpp @@ -19,7 +19,6 @@ #include #endif -// #include #include #include @@ -150,25 +149,23 @@ class SharedMemory SharedMemory(const boost::filesystem::path &lock_file, const int id, - const uint64_t size = 0, - bool read_write = false) + const uint64_t size = 0) { sprintf(key, "%s.%d", "osrm.lock", id); - auto access = read_write ? boost::interprocess::read_write : boost::interprocess::read_only; if (0 == size) { // read_only shm = boost::interprocess::shared_memory_object( boost::interprocess::open_only, key, - read_write ? boost::interprocess::read_write : boost::interprocess::read_only); - region = boost::interprocess::mapped_region(shm, access); + boost::interprocess::read_only); + region = boost::interprocess::mapped_region(shm, boost::interprocess::read_only); } else { // writeable pointer shm = boost::interprocess::shared_memory_object( boost::interprocess::open_or_create, key, boost::interprocess::read_write); shm.truncate(size); - region = boost::interprocess::mapped_region(shm, access); + region = boost::interprocess::mapped_region(shm, boost::interprocess::read_write); util::Log(logDEBUG) << "writeable memory allocated " << size << " bytes"; } diff --git a/include/storage/storage.hpp b/include/storage/storage.hpp index 711dec816..000c49e8d 100644 --- a/include/storage/storage.hpp +++ b/include/storage/storage.hpp @@ -31,6 +31,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "storage/shared_datatype.hpp" #include "storage/storage_config.hpp" +#include #include #include @@ -44,20 +45,14 @@ class Storage public: Storage(StorageConfig config); - enum ReturnCode - { - Ok, - Error, - Retry - }; - - ReturnCode Run(int max_wait); + int Run(); void PopulateLayout(DataLayout &layout); void PopulateData(const DataLayout &layout, char *memory_ptr); private: StorageConfig config; + boost::interprocess::named_mutex datastore_mutex; }; } } diff --git a/src/engine/engine.cpp b/src/engine/engine.cpp index 9ebe9bf2a..5719f35b1 100644 --- a/src/engine/engine.cpp +++ b/src/engine/engine.cpp @@ -35,9 +35,9 @@ RunQuery(const std::unique_ptr &watchdog, if (watchdog) { BOOST_ASSERT(!facade); - auto lock_and_facade = watchdog->GetDataFacade(); + auto facade = watchdog->GetDataFacade(); - return plugin.HandleRequest(lock_and_facade.second, parameters, result); + return plugin.HandleRequest(facade, parameters, result); } BOOST_ASSERT(facade); @@ -53,9 +53,7 @@ namespace engine { Engine::Engine(const EngineConfig &config) - : lock(config.use_shared_memory ? std::make_unique() - : std::unique_ptr()), - route_plugin(config.max_locations_viaroute), // + : route_plugin(config.max_locations_viaroute), // table_plugin(config.max_locations_distance_table), // nearest_plugin(config.max_results_nearest), // trip_plugin(config.max_locations_trip), // diff --git a/src/storage/storage.cpp b/src/storage/storage.cpp index b2b2ee36a..f4ad41fa0 100644 --- a/src/storage/storage.cpp +++ b/src/storage/storage.cpp @@ -54,177 +54,85 @@ using RTreeNode = util::StaticRTree::vector, true>::TreeNode; using QueryGraph = util::StaticGraph; -Storage::Storage(StorageConfig config_) : config(std::move(config_)) {} +Storage::Storage(StorageConfig config_) + : config(std::move(config_)) + , datastore_mutex(boost::interprocess::open_or_create, "osrm-datastore") +{} -struct RegionsLayout -{ - SharedDataType current_data_region; - boost::interprocess::named_sharable_mutex ¤t_region_mutex; - SharedDataType old_data_region; - boost::interprocess::named_sharable_mutex &old_region_mutex; -}; - -RegionsLayout getRegionsLayout(SharedBarriers &barriers) -{ - if (SharedMemory::RegionExists(CURRENT_REGION)) - { - auto shared_region = makeSharedMemory(CURRENT_REGION); - const auto shared_timestamp = - static_cast(shared_region->Ptr()); - if (shared_timestamp->region == REGION_1) - { - return RegionsLayout{ - REGION_1, barriers.region_1_mutex, REGION_2, barriers.region_2_mutex}; - } - - BOOST_ASSERT(shared_timestamp->region == REGION_2); - } - - return RegionsLayout{REGION_2, barriers.region_2_mutex, REGION_1, barriers.region_1_mutex}; -} - -Storage::ReturnCode Storage::Run(int max_wait) +int Storage::Run() { BOOST_ASSERT_MSG(config.IsValid(), "Invalid storage config"); util::LogPolicy::GetInstance().Unmute(); + // Lock datastore for binary exclusive read and write access + boost::interprocess::scoped_lock + datastore_lock(datastore_mutex); + SharedBarriers barriers; - boost::interprocess::upgradable_lock - current_region_lock(barriers.current_region_mutex, boost::interprocess::defer_lock); - try { - if (!current_region_lock.try_lock()) - { - util::Log(logWARNING) << "A data update is in progress"; - return ReturnCode::Error; - } - } - // hard unlock in case of any exception. - catch (boost::interprocess::lock_exception &) - { - barriers.current_region_mutex.unlock_upgradable(); - // make sure we exit here because this is bad - throw; - } - #ifdef __linux__ - // try to disable swapping on Linux - const bool lock_flags = MCL_CURRENT | MCL_FUTURE; - if (-1 == mlockall(lock_flags)) - { - util::Log(logWARNING) << "Could not request RAM lock"; - } + // try to disable swapping on Linux + const bool lock_flags = MCL_CURRENT | MCL_FUTURE; + if (-1 == mlockall(lock_flags)) + { + util::Log(logWARNING) << "Could not request RAM lock"; + } #endif - auto regions_layout = getRegionsLayout(barriers); - const SharedDataType data_region = regions_layout.old_data_region; - - if (max_wait > 0) - { - util::Log() << "Waiting for " << max_wait - << " second for all queries on the old dataset to finish:"; - } - else - { - util::Log() << "Waiting for all queries on the old dataset to finish:"; - } - - boost::interprocess::scoped_lock regions_lock( - regions_layout.old_region_mutex, boost::interprocess::defer_lock); - - if (max_wait > 0) - { - if (!regions_lock.timed_lock(boost::posix_time::microsec_clock::universal_time() + - boost::posix_time::seconds(max_wait))) + // Get the next region ID and time stamp without locking shared barriers. + // Because of datastore_lock the only write operation can occur sequentially later. + auto next_region = REGION_1; + unsigned next_timestamp = 1; + if (SharedMemory::RegionExists(CURRENT_REGION)) { - util::Log(logWARNING) << "Queries did not finish in " << max_wait - << " seconds. Claiming the lock by force."; - // WARNING: if queries are still using the old dataset they might crash - if (regions_layout.old_data_region == REGION_1) - { - barriers.resetRegion1(); - } - else - { - BOOST_ASSERT(regions_layout.old_data_region == REGION_2); - barriers.resetRegion2(); - } - - return ReturnCode::Retry; - } - } - else - { - regions_lock.lock(); - } - util::Log() << "Ok."; - - // since we can't change the size of a shared memory regions we delete and reallocate - if (SharedMemory::RegionExists(data_region) && !SharedMemory::Remove(data_region)) - { - throw util::exception("Could not remove shared memory region " + - regionToString(data_region) + SOURCE_REF); - } - - // Populate a memory layout into stack memory - DataLayout layout; - PopulateLayout(layout); - - // Allocate shared memory block - auto regions_size = sizeof(layout) + layout.GetSizeOfLayout(); - util::Log() << "allocating shared memory of " << regions_size << " bytes"; - auto shared_memory = makeSharedMemory(data_region, regions_size); - - // Copy memory layout to shared memory and populate data - char *shared_memory_ptr = static_cast(shared_memory->Ptr()); - memcpy(shared_memory_ptr, &layout, sizeof(layout)); - PopulateData(layout, shared_memory_ptr + sizeof(layout)); - - auto data_type_memory = makeSharedMemory(CURRENT_REGION, sizeof(SharedDataTimestamp)); - SharedDataTimestamp *data_timestamp_ptr = - static_cast(data_type_memory->Ptr()); - - { - - boost::interprocess::scoped_lock - current_region_exclusive_lock; - - if (max_wait > 0) - { - util::Log() << "Waiting for " << max_wait << " seconds to write new dataset timestamp"; - auto end_time = boost::posix_time::microsec_clock::universal_time() + - boost::posix_time::seconds(max_wait); - current_region_exclusive_lock = - boost::interprocess::scoped_lock( - std::move(current_region_lock), end_time); - - if (!current_region_exclusive_lock.owns()) - { - util::Log(logWARNING) << "Aquiring the lock timed out after " << max_wait - << " seconds. Claiming the lock by force."; - current_region_lock.unlock(); - current_region_lock.release(); - storage::SharedBarriers::resetCurrentRegion(); - return ReturnCode::Retry; - } - } - else - { - util::Log() << "Waiting to write new dataset timestamp"; - current_region_exclusive_lock = - boost::interprocess::scoped_lock( - std::move(current_region_lock)); + auto shared_memory = makeSharedMemory(CURRENT_REGION); + auto current_region = static_cast(shared_memory->Ptr()); + next_region = current_region->region == REGION_1 ? REGION_2 : REGION_1; + next_timestamp = current_region->timestamp + 1; } - util::Log() << "Ok."; - data_timestamp_ptr->region = data_region; - data_timestamp_ptr->timestamp += 1; - } - util::Log() << "All data loaded."; + // SHMCTL(2): Mark the segment to be destroyed. The segment will actually be destroyed + // only after the last process detaches it. + if (storage::SharedMemory::RegionExists(next_region)) + { + storage::SharedMemory::Remove(next_region); + } - return ReturnCode::Ok; + // Populate a memory layout into stack memory + DataLayout layout; + PopulateLayout(layout); + + // Allocate shared memory block + auto regions_size = sizeof(layout) + layout.GetSizeOfLayout(); + util::Log() << "Allocating shared memory of " << regions_size << " bytes"; + auto data_memory = makeSharedMemory(next_region, regions_size); + + // Copy memory layout to shared memory and populate data + char *shared_memory_ptr = static_cast(data_memory->Ptr()); + memcpy(shared_memory_ptr, &layout, sizeof(layout)); + PopulateData(layout, shared_memory_ptr + sizeof(layout)); + + { // Lock for write access shared region mutex that protects CURRENT_REGION + boost::interprocess::scoped_lock + current_region_lock(barriers.region_mutex); + + // Get write access to CURRENT_REGION + auto shared_memory = makeSharedMemory(CURRENT_REGION, sizeof(SharedDataTimestamp)); + auto current_region = static_cast(shared_memory->Ptr()); + + // Update the current region ID and timestamp + current_region->region = next_region; + current_region->timestamp = next_timestamp; + } + + util::Log() << "All data loaded."; + } + + barriers.region_condition.notify_all(); + + return EXIT_SUCCESS; } /** diff --git a/src/tools/store.cpp b/src/tools/store.cpp index ada72e8ed..9a700f9ad 100644 --- a/src/tools/store.cpp +++ b/src/tools/store.cpp @@ -16,20 +16,15 @@ using namespace osrm; // generate boost::program_options object for the routing part bool generateDataStoreOptions(const int argc, const char *argv[], - boost::filesystem::path &base_path, - int &max_wait) + boost::filesystem::path &base_path) { // declare a group of options that will be allowed only on command line boost::program_options::options_description generic_options("Options"); - generic_options.add_options()("version,v", "Show version")("help,h", "Show this help message"); + generic_options.add_options()("version,v", "Show version")("help,h", "Show this help message")("remove-locks,r", "Remove locks"); // declare a group of options that will be allowed both on command line // as well as in a config file boost::program_options::options_description config_options("Configuration"); - config_options.add_options()( - "max-wait", - boost::program_options::value(&max_wait)->default_value(-1), - "Maximum number of seconds to wait on requests that use the old dataset."); // hidden options, will be allowed on command line but will not be shown to the user boost::program_options::options_description hidden_options("Hidden options"); @@ -86,6 +81,13 @@ bool generateDataStoreOptions(const int argc, return false; } + if (option_variables.count("remove-locks")) + { + osrm::storage::SharedBarriers::remove(); + boost::interprocess::named_mutex::remove("osrm-datastore"); + return false; + } + boost::program_options::notify(option_variables); return true; @@ -93,7 +95,7 @@ bool generateDataStoreOptions(const int argc, [[noreturn]] void CleanupSharedBarriers(int signum) { // Here the lock state of named mutexes is unknown, make a hard cleanup - osrm::storage::SharedBarriers::resetCurrentRegion(); + osrm::storage::SharedBarriers::remove(); std::_Exit(128 + signum); } @@ -108,8 +110,7 @@ int main(const int argc, const char *argv[]) try util::LogPolicy::GetInstance().Unmute(); boost::filesystem::path base_path; - int max_wait = -1; - if (!generateDataStoreOptions(argc, argv, base_path, max_wait)) + if (!generateDataStoreOptions(argc, argv, base_path)) { return EXIT_SUCCESS; } @@ -121,29 +122,7 @@ int main(const int argc, const char *argv[]) try } storage::Storage storage(std::move(config)); - // We will attempt to load this dataset to memory several times if we encounter - // an error we can recover from. This is needed when we need to clear mutexes - // that have been left dangling by other processes. - const constexpr unsigned MAX_RETRIES = 3; - unsigned retry_counter = 0; - storage::Storage::ReturnCode code = storage::Storage::ReturnCode::Retry; - while (code == storage::Storage::ReturnCode::Retry && retry_counter < MAX_RETRIES) - { - if (retry_counter > 0) - { - util::Log(logWARNING) << "Try number " << (retry_counter + 1) - << " to load the dataset."; - } - code = storage.Run(max_wait); - retry_counter++; - } - - if (code == storage::Storage::ReturnCode::Ok) - { - return EXIT_SUCCESS; - } - - return EXIT_FAILURE; + return storage.Run(); } catch (const std::bad_alloc &e) { diff --git a/src/tools/unlock_all_mutexes.cpp b/src/tools/unlock_all_mutexes.cpp index 4c3a534e6..cf93688ea 100644 --- a/src/tools/unlock_all_mutexes.cpp +++ b/src/tools/unlock_all_mutexes.cpp @@ -8,9 +8,7 @@ int main() osrm::util::LogPolicy::GetInstance().Unmute(); osrm::util::Log() << "Releasing all locks"; - osrm::storage::SharedBarriers::resetCurrentRegion(); - osrm::storage::SharedBarriers::resetRegion1(); - osrm::storage::SharedBarriers::resetRegion2(); + osrm::storage::SharedBarriers::remove(); return 0; }