Change to condvar signaling if data region swapped

This commit is contained in:
Michael Krasnyk
2016-12-26 01:00:37 +01:00
committed by Patrick Niklaus
parent 774b8688ca
commit fce8d72895
10 changed files with 143 additions and 362 deletions
+3 -5
View File
@@ -35,9 +35,9 @@ RunQuery(const std::unique_ptr<osrm::engine::DataWatchdog> &watchdog,
if (watchdog)
{
BOOST_ASSERT(!facade);
auto lock_and_facade = watchdog->GetDataFacade();
auto facade = watchdog->GetDataFacade();
return plugin.HandleRequest(lock_and_facade.second, parameters, result);
return plugin.HandleRequest(facade, parameters, result);
}
BOOST_ASSERT(facade);
@@ -53,9 +53,7 @@ namespace engine
{
Engine::Engine(const EngineConfig &config)
: lock(config.use_shared_memory ? std::make_unique<storage::SharedBarriers>()
: std::unique_ptr<storage::SharedBarriers>()),
route_plugin(config.max_locations_viaroute), //
: route_plugin(config.max_locations_viaroute), //
table_plugin(config.max_locations_distance_table), //
nearest_plugin(config.max_results_nearest), //
trip_plugin(config.max_locations_trip), //
+63 -155
View File
@@ -54,177 +54,85 @@ using RTreeNode =
util::StaticRTree<RTreeLeaf, util::ShM<util::Coordinate, true>::vector, true>::TreeNode;
using QueryGraph = util::StaticGraph<contractor::QueryEdge::EdgeData>;
Storage::Storage(StorageConfig config_) : config(std::move(config_)) {}
Storage::Storage(StorageConfig config_)
: config(std::move(config_))
, datastore_mutex(boost::interprocess::open_or_create, "osrm-datastore")
{}
struct RegionsLayout
{
SharedDataType current_data_region;
boost::interprocess::named_sharable_mutex &current_region_mutex;
SharedDataType old_data_region;
boost::interprocess::named_sharable_mutex &old_region_mutex;
};
RegionsLayout getRegionsLayout(SharedBarriers &barriers)
{
if (SharedMemory::RegionExists(CURRENT_REGION))
{
auto shared_region = makeSharedMemory(CURRENT_REGION);
const auto shared_timestamp =
static_cast<const SharedDataTimestamp *>(shared_region->Ptr());
if (shared_timestamp->region == REGION_1)
{
return RegionsLayout{
REGION_1, barriers.region_1_mutex, REGION_2, barriers.region_2_mutex};
}
BOOST_ASSERT(shared_timestamp->region == REGION_2);
}
return RegionsLayout{REGION_2, barriers.region_2_mutex, REGION_1, barriers.region_1_mutex};
}
Storage::ReturnCode Storage::Run(int max_wait)
int Storage::Run()
{
BOOST_ASSERT_MSG(config.IsValid(), "Invalid storage config");
util::LogPolicy::GetInstance().Unmute();
// Lock datastore for binary exclusive read and write access
boost::interprocess::scoped_lock<boost::interprocess::named_mutex>
datastore_lock(datastore_mutex);
SharedBarriers barriers;
boost::interprocess::upgradable_lock<boost::interprocess::named_upgradable_mutex>
current_region_lock(barriers.current_region_mutex, boost::interprocess::defer_lock);
try
{
if (!current_region_lock.try_lock())
{
util::Log(logWARNING) << "A data update is in progress";
return ReturnCode::Error;
}
}
// hard unlock in case of any exception.
catch (boost::interprocess::lock_exception &)
{
barriers.current_region_mutex.unlock_upgradable();
// make sure we exit here because this is bad
throw;
}
#ifdef __linux__
// try to disable swapping on Linux
const bool lock_flags = MCL_CURRENT | MCL_FUTURE;
if (-1 == mlockall(lock_flags))
{
util::Log(logWARNING) << "Could not request RAM lock";
}
// try to disable swapping on Linux
const bool lock_flags = MCL_CURRENT | MCL_FUTURE;
if (-1 == mlockall(lock_flags))
{
util::Log(logWARNING) << "Could not request RAM lock";
}
#endif
auto regions_layout = getRegionsLayout(barriers);
const SharedDataType data_region = regions_layout.old_data_region;
if (max_wait > 0)
{
util::Log() << "Waiting for " << max_wait
<< " second for all queries on the old dataset to finish:";
}
else
{
util::Log() << "Waiting for all queries on the old dataset to finish:";
}
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex> regions_lock(
regions_layout.old_region_mutex, boost::interprocess::defer_lock);
if (max_wait > 0)
{
if (!regions_lock.timed_lock(boost::posix_time::microsec_clock::universal_time() +
boost::posix_time::seconds(max_wait)))
// Get the next region ID and time stamp without locking shared barriers.
// Because of datastore_lock the only write operation can occur sequentially later.
auto next_region = REGION_1;
unsigned next_timestamp = 1;
if (SharedMemory::RegionExists(CURRENT_REGION))
{
util::Log(logWARNING) << "Queries did not finish in " << max_wait
<< " seconds. Claiming the lock by force.";
// WARNING: if queries are still using the old dataset they might crash
if (regions_layout.old_data_region == REGION_1)
{
barriers.resetRegion1();
}
else
{
BOOST_ASSERT(regions_layout.old_data_region == REGION_2);
barriers.resetRegion2();
}
return ReturnCode::Retry;
}
}
else
{
regions_lock.lock();
}
util::Log() << "Ok.";
// since we can't change the size of a shared memory regions we delete and reallocate
if (SharedMemory::RegionExists(data_region) && !SharedMemory::Remove(data_region))
{
throw util::exception("Could not remove shared memory region " +
regionToString(data_region) + SOURCE_REF);
}
// Populate a memory layout into stack memory
DataLayout layout;
PopulateLayout(layout);
// Allocate shared memory block
auto regions_size = sizeof(layout) + layout.GetSizeOfLayout();
util::Log() << "allocating shared memory of " << regions_size << " bytes";
auto shared_memory = makeSharedMemory(data_region, regions_size);
// Copy memory layout to shared memory and populate data
char *shared_memory_ptr = static_cast<char *>(shared_memory->Ptr());
memcpy(shared_memory_ptr, &layout, sizeof(layout));
PopulateData(layout, shared_memory_ptr + sizeof(layout));
auto data_type_memory = makeSharedMemory(CURRENT_REGION, sizeof(SharedDataTimestamp));
SharedDataTimestamp *data_timestamp_ptr =
static_cast<SharedDataTimestamp *>(data_type_memory->Ptr());
{
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>
current_region_exclusive_lock;
if (max_wait > 0)
{
util::Log() << "Waiting for " << max_wait << " seconds to write new dataset timestamp";
auto end_time = boost::posix_time::microsec_clock::universal_time() +
boost::posix_time::seconds(max_wait);
current_region_exclusive_lock =
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>(
std::move(current_region_lock), end_time);
if (!current_region_exclusive_lock.owns())
{
util::Log(logWARNING) << "Aquiring the lock timed out after " << max_wait
<< " seconds. Claiming the lock by force.";
current_region_lock.unlock();
current_region_lock.release();
storage::SharedBarriers::resetCurrentRegion();
return ReturnCode::Retry;
}
}
else
{
util::Log() << "Waiting to write new dataset timestamp";
current_region_exclusive_lock =
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>(
std::move(current_region_lock));
auto shared_memory = makeSharedMemory(CURRENT_REGION);
auto current_region = static_cast<SharedDataTimestamp *>(shared_memory->Ptr());
next_region = current_region->region == REGION_1 ? REGION_2 : REGION_1;
next_timestamp = current_region->timestamp + 1;
}
util::Log() << "Ok.";
data_timestamp_ptr->region = data_region;
data_timestamp_ptr->timestamp += 1;
}
util::Log() << "All data loaded.";
// SHMCTL(2): Mark the segment to be destroyed. The segment will actually be destroyed
// only after the last process detaches it.
if (storage::SharedMemory::RegionExists(next_region))
{
storage::SharedMemory::Remove(next_region);
}
return ReturnCode::Ok;
// Populate a memory layout into stack memory
DataLayout layout;
PopulateLayout(layout);
// Allocate shared memory block
auto regions_size = sizeof(layout) + layout.GetSizeOfLayout();
util::Log() << "Allocating shared memory of " << regions_size << " bytes";
auto data_memory = makeSharedMemory(next_region, regions_size);
// Copy memory layout to shared memory and populate data
char *shared_memory_ptr = static_cast<char *>(data_memory->Ptr());
memcpy(shared_memory_ptr, &layout, sizeof(layout));
PopulateData(layout, shared_memory_ptr + sizeof(layout));
{ // Lock for write access shared region mutex that protects CURRENT_REGION
boost::interprocess::scoped_lock<boost::interprocess::named_mutex>
current_region_lock(barriers.region_mutex);
// Get write access to CURRENT_REGION
auto shared_memory = makeSharedMemory(CURRENT_REGION, sizeof(SharedDataTimestamp));
auto current_region = static_cast<SharedDataTimestamp *>(shared_memory->Ptr());
// Update the current region ID and timestamp
current_region->region = next_region;
current_region->timestamp = next_timestamp;
}
util::Log() << "All data loaded.";
}
barriers.region_condition.notify_all();
return EXIT_SUCCESS;
}
/**
+12 -33
View File
@@ -16,20 +16,15 @@ using namespace osrm;
// generate boost::program_options object for the routing part
bool generateDataStoreOptions(const int argc,
const char *argv[],
boost::filesystem::path &base_path,
int &max_wait)
boost::filesystem::path &base_path)
{
// declare a group of options that will be allowed only on command line
boost::program_options::options_description generic_options("Options");
generic_options.add_options()("version,v", "Show version")("help,h", "Show this help message");
generic_options.add_options()("version,v", "Show version")("help,h", "Show this help message")("remove-locks,r", "Remove locks");
// declare a group of options that will be allowed both on command line
// as well as in a config file
boost::program_options::options_description config_options("Configuration");
config_options.add_options()(
"max-wait",
boost::program_options::value<int>(&max_wait)->default_value(-1),
"Maximum number of seconds to wait on requests that use the old dataset.");
// hidden options, will be allowed on command line but will not be shown to the user
boost::program_options::options_description hidden_options("Hidden options");
@@ -86,6 +81,13 @@ bool generateDataStoreOptions(const int argc,
return false;
}
if (option_variables.count("remove-locks"))
{
osrm::storage::SharedBarriers::remove();
boost::interprocess::named_mutex::remove("osrm-datastore");
return false;
}
boost::program_options::notify(option_variables);
return true;
@@ -93,7 +95,7 @@ bool generateDataStoreOptions(const int argc,
[[noreturn]] void CleanupSharedBarriers(int signum)
{ // Here the lock state of named mutexes is unknown, make a hard cleanup
osrm::storage::SharedBarriers::resetCurrentRegion();
osrm::storage::SharedBarriers::remove();
std::_Exit(128 + signum);
}
@@ -108,8 +110,7 @@ int main(const int argc, const char *argv[]) try
util::LogPolicy::GetInstance().Unmute();
boost::filesystem::path base_path;
int max_wait = -1;
if (!generateDataStoreOptions(argc, argv, base_path, max_wait))
if (!generateDataStoreOptions(argc, argv, base_path))
{
return EXIT_SUCCESS;
}
@@ -121,29 +122,7 @@ int main(const int argc, const char *argv[]) try
}
storage::Storage storage(std::move(config));
// We will attempt to load this dataset to memory several times if we encounter
// an error we can recover from. This is needed when we need to clear mutexes
// that have been left dangling by other processes.
const constexpr unsigned MAX_RETRIES = 3;
unsigned retry_counter = 0;
storage::Storage::ReturnCode code = storage::Storage::ReturnCode::Retry;
while (code == storage::Storage::ReturnCode::Retry && retry_counter < MAX_RETRIES)
{
if (retry_counter > 0)
{
util::Log(logWARNING) << "Try number " << (retry_counter + 1)
<< " to load the dataset.";
}
code = storage.Run(max_wait);
retry_counter++;
}
if (code == storage::Storage::ReturnCode::Ok)
{
return EXIT_SUCCESS;
}
return EXIT_FAILURE;
return storage.Run();
}
catch (const std::bad_alloc &e)
{
+1 -3
View File
@@ -8,9 +8,7 @@ int main()
osrm::util::LogPolicy::GetInstance().Unmute();
osrm::util::Log() << "Releasing all locks";
osrm::storage::SharedBarriers::resetCurrentRegion();
osrm::storage::SharedBarriers::resetRegion1();
osrm::storage::SharedBarriers::resetRegion2();
osrm::storage::SharedBarriers::remove();
return 0;
}