diff --git a/include/engine/datafacade/shared_datafacade.hpp b/include/engine/datafacade/shared_datafacade.hpp index 12fbfbf73..f0d1e844d 100644 --- a/include/engine/datafacade/shared_datafacade.hpp +++ b/include/engine/datafacade/shared_datafacade.hpp @@ -150,7 +150,7 @@ class SharedDataFacade final : public BaseDataFacade if (!boost::filesystem::exists(file_index_path)) { util::SimpleLogger().Write(logDEBUG) << "Leaf file name " << file_index_path.string(); - throw util::exception("Could not load leaf index file. " + throw util::exception("Could not load " + file_index_path.string() + "Is any data loaded into shared memory?"); } diff --git a/include/storage/shared_barriers.hpp b/include/storage/shared_barriers.hpp index d9bdb3aeb..0a72c4774 100644 --- a/include/storage/shared_barriers.hpp +++ b/include/storage/shared_barriers.hpp @@ -19,11 +19,23 @@ struct SharedBarriers { } + static void resetCurrentRegions() + { + boost::interprocess::named_sharable_mutex::remove("current_regions"); + } + static void resetRegions1() + { + boost::interprocess::named_sharable_mutex::remove("regions_1"); + } + static void resetRegions2() + { + boost::interprocess::named_sharable_mutex::remove("regions_2"); + } + boost::interprocess::named_upgradable_mutex current_regions_mutex; boost::interprocess::named_sharable_mutex regions_1_mutex; boost::interprocess::named_sharable_mutex regions_2_mutex; }; - } } diff --git a/include/storage/storage.hpp b/include/storage/storage.hpp index f5934ddf4..e914e5872 100644 --- a/include/storage/storage.hpp +++ b/include/storage/storage.hpp @@ -42,7 +42,15 @@ class Storage { public: Storage(StorageConfig config); - int Run(); + + enum ReturnCode + { + Ok, + Error, + Retry + }; + + ReturnCode Run(int max_wait); private: StorageConfig config; diff --git a/src/storage/storage.cpp b/src/storage/storage.cpp index 9c08aa402..76e0ddd4f 100644 --- a/src/storage/storage.cpp +++ b/src/storage/storage.cpp @@ -26,6 +26,7 @@ #include #endif +#include #include #include #include @@ -69,25 +70,33 @@ RegionsLayout getRegionsLayout(SharedBarriers &barriers) if (SharedMemory::RegionExists(CURRENT_REGIONS)) { auto shared_regions = makeSharedMemory(CURRENT_REGIONS); - const auto shared_timestamp = static_cast(shared_regions->Ptr()); + const auto shared_timestamp = + static_cast(shared_regions->Ptr()); if (shared_timestamp->data == DATA_1) { BOOST_ASSERT(shared_timestamp->layout == LAYOUT_1); - return RegionsLayout {LAYOUT_1, DATA_1, barriers.regions_1_mutex, LAYOUT_2, DATA_2, barriers.regions_2_mutex}; + return RegionsLayout{LAYOUT_1, + DATA_1, + barriers.regions_1_mutex, + LAYOUT_2, + DATA_2, + barriers.regions_2_mutex}; } BOOST_ASSERT(shared_timestamp->data == DATA_2); BOOST_ASSERT(shared_timestamp->layout == LAYOUT_2); } - return RegionsLayout {LAYOUT_2, DATA_2, barriers.regions_2_mutex, LAYOUT_1, DATA_1, barriers.regions_1_mutex}; + return RegionsLayout{ + LAYOUT_2, DATA_2, barriers.regions_2_mutex, LAYOUT_1, DATA_1, barriers.regions_1_mutex}; } -int Storage::Run() +Storage::ReturnCode Storage::Run(int max_wait) { BOOST_ASSERT_MSG(config.IsValid(), "Invalid storage config"); util::LogPolicy::GetInstance().Unmute(); + SharedBarriers barriers; boost::interprocess::upgradable_lock @@ -97,7 +106,7 @@ int Storage::Run() if (!current_regions_lock.try_lock()) { util::SimpleLogger().Write(logWARNING) << "A data update is in progress"; - return EXIT_FAILURE; + return ReturnCode::Error; } } // hard unlock in case of any exception. @@ -121,9 +130,46 @@ int Storage::Run() const SharedDataType layout_region = regions_layout.old_layout_region; const SharedDataType data_region = regions_layout.old_data_region; - util::SimpleLogger().Write() << "Waiting for all queries on the old dataset to finish:"; - boost::interprocess::scoped_lock - layout_lock(regions_layout.old_regions_mutex); + if (max_wait > 0) + { + util::SimpleLogger().Write() << "Waiting for " << max_wait + << " second for all queries on the old dataset to finish:"; + } + else + { + util::SimpleLogger().Write() << "Waiting for all queries on the old dataset to finish:"; + } + + boost::interprocess::scoped_lock regions_lock( + regions_layout.old_regions_mutex, boost::interprocess::defer_lock); + + if (max_wait > 0) + { + if (!regions_lock.timed_lock(boost::posix_time::microsec_clock::universal_time() + + boost::posix_time::seconds(max_wait))) + { + util::SimpleLogger().Write(logWARNING) << "Queries did not finish in " << max_wait + << " seconds. Claiming the lock by force."; + // WARNING: if queries are still using the old dataset they might crash + if (regions_layout.old_layout_region == LAYOUT_1) + { + BOOST_ASSERT(regions_layout.old_data_region == DATA_1); + barriers.resetRegions1(); + } + else + { + BOOST_ASSERT(regions_layout.old_layout_region == LAYOUT_2); + BOOST_ASSERT(regions_layout.old_data_region == DATA_2); + barriers.resetRegions2(); + } + + return ReturnCode::Retry; + } + } + else + { + regions_lock.lock(); + } util::SimpleLogger().Write() << "Ok."; // since we can't change the size of a shared memory regions we delete and reallocate @@ -158,7 +204,6 @@ int Storage::Run() shared_layout_ptr->SetBlockSize(SharedDataLayout::NAME_OFFSETS, name_blocks); shared_layout_ptr->SetBlockSize::BlockT>( SharedDataLayout::NAME_BLOCKS, name_blocks); - util::SimpleLogger().Write() << "name offsets size: " << name_blocks; BOOST_ASSERT_MSG(0 != name_blocks, "name file broken"); unsigned number_of_chars = 0; @@ -593,9 +638,6 @@ int Storage::Run() shared_memory_ptr, SharedDataLayout::DATASOURCE_NAME_DATA); if (shared_layout_ptr->GetBlockSize(SharedDataLayout::DATASOURCE_NAME_DATA) > 0) { - util::SimpleLogger().Write() - << "Copying " << (m_datasource_name_data.end() - m_datasource_name_data.begin()) - << " chars into name data ptr"; std::copy( m_datasource_name_data.begin(), m_datasource_name_data.end(), datasource_name_data_ptr); } @@ -750,22 +792,49 @@ int Storage::Run() std::copy(entry_class_table.begin(), entry_class_table.end(), entry_class_ptr); } - auto data_type_memory = - makeSharedMemory(CURRENT_REGIONS, sizeof(SharedDataTimestamp), true); + auto data_type_memory = makeSharedMemory(CURRENT_REGIONS, sizeof(SharedDataTimestamp), true); SharedDataTimestamp *data_timestamp_ptr = static_cast(data_type_memory->Ptr()); { - boost::interprocess::scoped_lock - current_regions_exclusive_lock(std::move(current_regions_lock)); + + boost::interprocess::scoped_lock + current_regions_exclusive_lock; + + if (max_wait > 0) + { + util::SimpleLogger().Write() << "Waiting for " << max_wait << " seconds to write new dataset timestamp"; + auto end_time = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::seconds(max_wait); + current_regions_exclusive_lock = + boost::interprocess::scoped_lock( + std::move(current_regions_lock), end_time); + + if (!current_regions_exclusive_lock.owns()) + { + util::SimpleLogger().Write(logWARNING) << "Aquiring the lock timed out after " << max_wait << " seconds. Claiming the lock by force."; + current_regions_lock.unlock(); + current_regions_lock.release(); + storage::SharedBarriers::resetCurrentRegions(); + return ReturnCode::Retry; + } + } + else + { + util::SimpleLogger().Write() << "Waiting to write new dataset timestamp"; + current_regions_exclusive_lock = + boost::interprocess::scoped_lock( + std::move(current_regions_lock)); + } + + util::SimpleLogger().Write() << "Ok."; data_timestamp_ptr->layout = layout_region; data_timestamp_ptr->data = data_region; data_timestamp_ptr->timestamp += 1; } - util::SimpleLogger().Write() << "all data loaded"; + util::SimpleLogger().Write() << "All data loaded."; - return EXIT_SUCCESS; + return ReturnCode::Ok; } } } diff --git a/src/tools/store.cpp b/src/tools/store.cpp index 1d4bbb1f4..2e8278ad5 100644 --- a/src/tools/store.cpp +++ b/src/tools/store.cpp @@ -12,7 +12,8 @@ using namespace osrm; // generate boost::program_options object for the routing part bool generateDataStoreOptions(const int argc, const char *argv[], - boost::filesystem::path &base_path) + boost::filesystem::path &base_path, + int &max_wait) { // declare a group of options that will be allowed only on command line boost::program_options::options_description generic_options("Options"); @@ -21,6 +22,9 @@ bool generateDataStoreOptions(const int argc, // declare a group of options that will be allowed both on command line // as well as in a config file boost::program_options::options_description config_options("Configuration"); + config_options.add_options()("max-wait", + boost::program_options::value(&max_wait)->default_value(-1), + "Maximum number of seconds to wait on requests that use the old dataset."); // hidden options, will be allowed on command line but will not be shown to the user boost::program_options::options_description hidden_options("Hidden options"); @@ -87,7 +91,8 @@ int main(const int argc, const char *argv[]) try util::LogPolicy::GetInstance().Unmute(); boost::filesystem::path base_path; - if (!generateDataStoreOptions(argc, argv, base_path)) + int max_wait = -1; + if (!generateDataStoreOptions(argc, argv, base_path, max_wait)) { return EXIT_SUCCESS; } @@ -98,7 +103,29 @@ int main(const int argc, const char *argv[]) try return EXIT_FAILURE; } storage::Storage storage(std::move(config)); - return storage.Run(); + + // We will attempt to load this dataset to memory several times if we encounter + // an error we can recover from. This is needed when we need to clear mutexes + // that have been left dangling by other processes. + const constexpr unsigned MAX_RETRIES = 3; + unsigned retry_counter = 0; + storage::Storage::ReturnCode code = storage::Storage::ReturnCode::Retry; + while(code == storage::Storage::ReturnCode::Retry && retry_counter < MAX_RETRIES) + { + if (retry_counter > 0) + { + util::SimpleLogger().Write(logWARNING) << "Try number " << (retry_counter+1) << " to load the dataset."; + } + code = storage.Run(max_wait); + retry_counter++; + } + + if (code == storage::Storage::ReturnCode::Ok) + { + return EXIT_SUCCESS; + } + + return EXIT_FAILURE; } catch (const std::bad_alloc &e) { diff --git a/src/tools/unlock_all_mutexes.cpp b/src/tools/unlock_all_mutexes.cpp index 343d73061..508777d61 100644 --- a/src/tools/unlock_all_mutexes.cpp +++ b/src/tools/unlock_all_mutexes.cpp @@ -7,9 +7,10 @@ int main() { osrm::util::LogPolicy::GetInstance().Unmute(); osrm::util::SimpleLogger().Write() << "Releasing all locks"; - osrm::storage::SharedBarriers barriers; - boost::interprocess::named_upgradable_mutex::remove("current_regions"); - boost::interprocess::named_sharable_mutex::remove("regions_1"); - boost::interprocess::named_sharable_mutex::remove("regions_2"); + + osrm::storage::SharedBarriers::resetCurrentRegions(); + osrm::storage::SharedBarriers::resetRegions1(); + osrm::storage::SharedBarriers::resetRegions2(); + return 0; }