Recover from stale mutexes with force unlocking and retry

This commit is contained in:
Patrick Niklaus 2016-10-12 21:26:59 +02:00 committed by Patrick Niklaus
parent caa7c994a0
commit 81c5cba0e5
6 changed files with 145 additions and 28 deletions

View File

@ -150,7 +150,7 @@ class SharedDataFacade final : public BaseDataFacade
if (!boost::filesystem::exists(file_index_path))
{
util::SimpleLogger().Write(logDEBUG) << "Leaf file name " << file_index_path.string();
throw util::exception("Could not load leaf index file. "
throw util::exception("Could not load " + file_index_path.string() +
"Is any data loaded into shared memory?");
}

View File

@ -19,11 +19,23 @@ struct SharedBarriers
{
}
static void resetCurrentRegions()
{
boost::interprocess::named_sharable_mutex::remove("current_regions");
}
static void resetRegions1()
{
boost::interprocess::named_sharable_mutex::remove("regions_1");
}
static void resetRegions2()
{
boost::interprocess::named_sharable_mutex::remove("regions_2");
}
boost::interprocess::named_upgradable_mutex current_regions_mutex;
boost::interprocess::named_sharable_mutex regions_1_mutex;
boost::interprocess::named_sharable_mutex regions_2_mutex;
};
}
}

View File

@ -42,7 +42,15 @@ class Storage
{
public:
Storage(StorageConfig config);
int Run();
enum ReturnCode
{
Ok,
Error,
Retry
};
ReturnCode Run(int max_wait);
private:
StorageConfig config;

View File

@ -26,6 +26,7 @@
#include <sys/mman.h>
#endif
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/interprocess/exceptions.hpp>
#include <boost/interprocess/sync/named_sharable_mutex.hpp>
@ -69,25 +70,33 @@ RegionsLayout getRegionsLayout(SharedBarriers &barriers)
if (SharedMemory::RegionExists(CURRENT_REGIONS))
{
auto shared_regions = makeSharedMemory(CURRENT_REGIONS);
const auto shared_timestamp = static_cast<const SharedDataTimestamp *>(shared_regions->Ptr());
const auto shared_timestamp =
static_cast<const SharedDataTimestamp *>(shared_regions->Ptr());
if (shared_timestamp->data == DATA_1)
{
BOOST_ASSERT(shared_timestamp->layout == LAYOUT_1);
return RegionsLayout {LAYOUT_1, DATA_1, barriers.regions_1_mutex, LAYOUT_2, DATA_2, barriers.regions_2_mutex};
return RegionsLayout{LAYOUT_1,
DATA_1,
barriers.regions_1_mutex,
LAYOUT_2,
DATA_2,
barriers.regions_2_mutex};
}
BOOST_ASSERT(shared_timestamp->data == DATA_2);
BOOST_ASSERT(shared_timestamp->layout == LAYOUT_2);
}
return RegionsLayout {LAYOUT_2, DATA_2, barriers.regions_2_mutex, LAYOUT_1, DATA_1, barriers.regions_1_mutex};
return RegionsLayout{
LAYOUT_2, DATA_2, barriers.regions_2_mutex, LAYOUT_1, DATA_1, barriers.regions_1_mutex};
}
int Storage::Run()
Storage::ReturnCode Storage::Run(int max_wait)
{
BOOST_ASSERT_MSG(config.IsValid(), "Invalid storage config");
util::LogPolicy::GetInstance().Unmute();
SharedBarriers barriers;
boost::interprocess::upgradable_lock<boost::interprocess::named_upgradable_mutex>
@ -97,7 +106,7 @@ int Storage::Run()
if (!current_regions_lock.try_lock())
{
util::SimpleLogger().Write(logWARNING) << "A data update is in progress";
return EXIT_FAILURE;
return ReturnCode::Error;
}
}
// hard unlock in case of any exception.
@ -121,9 +130,46 @@ int Storage::Run()
const SharedDataType layout_region = regions_layout.old_layout_region;
const SharedDataType data_region = regions_layout.old_data_region;
if (max_wait > 0)
{
util::SimpleLogger().Write() << "Waiting for " << max_wait
<< " second for all queries on the old dataset to finish:";
}
else
{
util::SimpleLogger().Write() << "Waiting for all queries on the old dataset to finish:";
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex>
layout_lock(regions_layout.old_regions_mutex);
}
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex> regions_lock(
regions_layout.old_regions_mutex, boost::interprocess::defer_lock);
if (max_wait > 0)
{
if (!regions_lock.timed_lock(boost::posix_time::microsec_clock::universal_time() +
boost::posix_time::seconds(max_wait)))
{
util::SimpleLogger().Write(logWARNING) << "Queries did not finish in " << max_wait
<< " seconds. Claiming the lock by force.";
// WARNING: if queries are still using the old dataset they might crash
if (regions_layout.old_layout_region == LAYOUT_1)
{
BOOST_ASSERT(regions_layout.old_data_region == DATA_1);
barriers.resetRegions1();
}
else
{
BOOST_ASSERT(regions_layout.old_layout_region == LAYOUT_2);
BOOST_ASSERT(regions_layout.old_data_region == DATA_2);
barriers.resetRegions2();
}
return ReturnCode::Retry;
}
}
else
{
regions_lock.lock();
}
util::SimpleLogger().Write() << "Ok.";
// since we can't change the size of a shared memory regions we delete and reallocate
@ -158,7 +204,6 @@ int Storage::Run()
shared_layout_ptr->SetBlockSize<unsigned>(SharedDataLayout::NAME_OFFSETS, name_blocks);
shared_layout_ptr->SetBlockSize<typename util::RangeTable<16, true>::BlockT>(
SharedDataLayout::NAME_BLOCKS, name_blocks);
util::SimpleLogger().Write() << "name offsets size: " << name_blocks;
BOOST_ASSERT_MSG(0 != name_blocks, "name file broken");
unsigned number_of_chars = 0;
@ -593,9 +638,6 @@ int Storage::Run()
shared_memory_ptr, SharedDataLayout::DATASOURCE_NAME_DATA);
if (shared_layout_ptr->GetBlockSize(SharedDataLayout::DATASOURCE_NAME_DATA) > 0)
{
util::SimpleLogger().Write()
<< "Copying " << (m_datasource_name_data.end() - m_datasource_name_data.begin())
<< " chars into name data ptr";
std::copy(
m_datasource_name_data.begin(), m_datasource_name_data.end(), datasource_name_data_ptr);
}
@ -750,22 +792,49 @@ int Storage::Run()
std::copy(entry_class_table.begin(), entry_class_table.end(), entry_class_ptr);
}
auto data_type_memory =
makeSharedMemory(CURRENT_REGIONS, sizeof(SharedDataTimestamp), true);
auto data_type_memory = makeSharedMemory(CURRENT_REGIONS, sizeof(SharedDataTimestamp), true);
SharedDataTimestamp *data_timestamp_ptr =
static_cast<SharedDataTimestamp *>(data_type_memory->Ptr());
{
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>
current_regions_exclusive_lock(std::move(current_regions_lock));
current_regions_exclusive_lock;
if (max_wait > 0)
{
util::SimpleLogger().Write() << "Waiting for " << max_wait << " seconds to write new dataset timestamp";
auto end_time = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::seconds(max_wait);
current_regions_exclusive_lock =
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>(
std::move(current_regions_lock), end_time);
if (!current_regions_exclusive_lock.owns())
{
util::SimpleLogger().Write(logWARNING) << "Aquiring the lock timed out after " << max_wait << " seconds. Claiming the lock by force.";
current_regions_lock.unlock();
current_regions_lock.release();
storage::SharedBarriers::resetCurrentRegions();
return ReturnCode::Retry;
}
}
else
{
util::SimpleLogger().Write() << "Waiting to write new dataset timestamp";
current_regions_exclusive_lock =
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>(
std::move(current_regions_lock));
}
util::SimpleLogger().Write() << "Ok.";
data_timestamp_ptr->layout = layout_region;
data_timestamp_ptr->data = data_region;
data_timestamp_ptr->timestamp += 1;
}
util::SimpleLogger().Write() << "all data loaded";
util::SimpleLogger().Write() << "All data loaded.";
return EXIT_SUCCESS;
return ReturnCode::Ok;
}
}
}

View File

@ -12,7 +12,8 @@ using namespace osrm;
// generate boost::program_options object for the routing part
bool generateDataStoreOptions(const int argc,
const char *argv[],
boost::filesystem::path &base_path)
boost::filesystem::path &base_path,
int &max_wait)
{
// declare a group of options that will be allowed only on command line
boost::program_options::options_description generic_options("Options");
@ -21,6 +22,9 @@ bool generateDataStoreOptions(const int argc,
// declare a group of options that will be allowed both on command line
// as well as in a config file
boost::program_options::options_description config_options("Configuration");
config_options.add_options()("max-wait",
boost::program_options::value<int>(&max_wait)->default_value(-1),
"Maximum number of seconds to wait on requests that use the old dataset.");
// hidden options, will be allowed on command line but will not be shown to the user
boost::program_options::options_description hidden_options("Hidden options");
@ -87,7 +91,8 @@ int main(const int argc, const char *argv[]) try
util::LogPolicy::GetInstance().Unmute();
boost::filesystem::path base_path;
if (!generateDataStoreOptions(argc, argv, base_path))
int max_wait = -1;
if (!generateDataStoreOptions(argc, argv, base_path, max_wait))
{
return EXIT_SUCCESS;
}
@ -98,7 +103,29 @@ int main(const int argc, const char *argv[]) try
return EXIT_FAILURE;
}
storage::Storage storage(std::move(config));
return storage.Run();
// We will attempt to load this dataset to memory several times if we encounter
// an error we can recover from. This is needed when we need to clear mutexes
// that have been left dangling by other processes.
const constexpr unsigned MAX_RETRIES = 3;
unsigned retry_counter = 0;
storage::Storage::ReturnCode code = storage::Storage::ReturnCode::Retry;
while(code == storage::Storage::ReturnCode::Retry && retry_counter < MAX_RETRIES)
{
if (retry_counter > 0)
{
util::SimpleLogger().Write(logWARNING) << "Try number " << (retry_counter+1) << " to load the dataset.";
}
code = storage.Run(max_wait);
retry_counter++;
}
if (code == storage::Storage::ReturnCode::Ok)
{
return EXIT_SUCCESS;
}
return EXIT_FAILURE;
}
catch (const std::bad_alloc &e)
{

View File

@ -7,9 +7,10 @@ int main()
{
osrm::util::LogPolicy::GetInstance().Unmute();
osrm::util::SimpleLogger().Write() << "Releasing all locks";
osrm::storage::SharedBarriers barriers;
boost::interprocess::named_upgradable_mutex::remove("current_regions");
boost::interprocess::named_sharable_mutex::remove("regions_1");
boost::interprocess::named_sharable_mutex::remove("regions_2");
osrm::storage::SharedBarriers::resetCurrentRegions();
osrm::storage::SharedBarriers::resetRegions1();
osrm::storage::SharedBarriers::resetRegions2();
return 0;
}