Restore --max-wait and file_lock for osrm-datastore

This commit is contained in:
Patrick Niklaus
2017-01-05 22:38:48 +00:00
committed by Patrick Niklaus
parent a7bb26f2d6
commit 104e23abf3
12 changed files with 234 additions and 120 deletions
+116 -75
View File
@@ -29,12 +29,9 @@
#include <sys/mman.h>
#endif
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/interprocess/exceptions.hpp>
#include <boost/interprocess/sync/named_sharable_mutex.hpp>
#include <boost/interprocess/sync/named_upgradable_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/upgradable_lock.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/interprocess/sync/file_lock.hpp>
#include <cstdint>
@@ -54,86 +51,130 @@ using RTreeNode =
util::StaticRTree<RTreeLeaf, util::ShM<util::Coordinate, true>::vector, true>::TreeNode;
using QueryGraph = util::StaticGraph<contractor::QueryEdge::EdgeData>;
Storage::Storage(StorageConfig config_)
: config(std::move(config_))
, datastore_mutex(boost::interprocess::open_or_create, "osrm-datastore")
{}
Storage::Storage(StorageConfig config_) : config(std::move(config_)) {}
int Storage::Run()
int Storage::Run(int max_wait)
{
BOOST_ASSERT_MSG(config.IsValid(), "Invalid storage config");
util::LogPolicy::GetInstance().Unmute();
// Lock datastore for binary exclusive read and write access
boost::interprocess::scoped_lock<boost::interprocess::named_mutex>
datastore_lock(datastore_mutex);
SharedBarriers barriers;
boost::filesystem::path lock_path =
boost::filesystem::temp_directory_path() / "osrm-datastore.lock";
if (!boost::filesystem::exists(lock_path))
{
#ifdef __linux__
// try to disable swapping on Linux
const bool lock_flags = MCL_CURRENT | MCL_FUTURE;
if (-1 == mlockall(lock_flags))
{
util::Log(logWARNING) << "Could not request RAM lock";
}
#endif
// Get the next region ID and time stamp without locking shared barriers.
// Because of datastore_lock the only write operation can occur sequentially later.
auto next_region = REGION_1;
unsigned next_timestamp = 1;
if (SharedMemory::RegionExists(CURRENT_REGION))
{
auto shared_memory = makeSharedMemory(CURRENT_REGION);
auto current_region = static_cast<SharedDataTimestamp *>(shared_memory->Ptr());
next_region = current_region->region == REGION_1 ? REGION_2 : REGION_1;
next_timestamp = current_region->timestamp + 1;
}
// SHMCTL(2): Mark the segment to be destroyed. The segment will actually be destroyed
// only after the last process detaches it.
if (storage::SharedMemory::RegionExists(next_region))
{
storage::SharedMemory::Remove(next_region);
}
util::Log() << "Loading data into " << regionToString(next_region) << " timestamp " << next_timestamp;
// Populate a memory layout into stack memory
DataLayout layout;
PopulateLayout(layout);
// Allocate shared memory block
auto regions_size = sizeof(layout) + layout.GetSizeOfLayout();
util::Log() << "Allocating shared memory of " << regions_size << " bytes";
auto data_memory = makeSharedMemory(next_region, regions_size);
// Copy memory layout to shared memory and populate data
char *shared_memory_ptr = static_cast<char *>(data_memory->Ptr());
memcpy(shared_memory_ptr, &layout, sizeof(layout));
PopulateData(layout, shared_memory_ptr + sizeof(layout));
{ // Lock for write access shared region mutex that protects CURRENT_REGION
boost::interprocess::scoped_lock<boost::interprocess::named_mutex>
current_region_lock(barriers.region_mutex);
// Get write access to CURRENT_REGION
auto shared_memory = makeSharedMemory(CURRENT_REGION, sizeof(SharedDataTimestamp));
auto current_region = static_cast<SharedDataTimestamp *>(shared_memory->Ptr());
// Update the current region ID and timestamp
current_region->region = next_region;
current_region->timestamp = next_timestamp;
}
util::Log() << "All data loaded.";
boost::filesystem::ofstream ofs(lock_path);
}
boost::interprocess::file_lock file_lock(lock_path.c_str());
boost::interprocess::scoped_lock<boost::interprocess::file_lock> datastore_lock(
file_lock, boost::interprocess::defer_lock);
if (!datastore_lock.try_lock())
{
util::UnbufferedLog(logWARNING) << "Data update in progress, waiting until it finishes... ";
datastore_lock.lock();
util::UnbufferedLog(logWARNING) << "ok.";
}
#ifdef __linux__
// try to disable swapping on Linux
const bool lock_flags = MCL_CURRENT | MCL_FUTURE;
if (-1 == mlockall(lock_flags))
{
util::Log(logWARNING) << "Could not request RAM lock";
}
#endif
// Get the next region ID and time stamp without locking shared barriers.
// Because of datastore_lock the only write operation can occur sequentially later.
auto next_region = REGION_1;
unsigned next_timestamp = 1;
auto in_use_region = REGION_NONE;
unsigned in_use_timestamp = 0;
if (SharedMemory::RegionExists(CURRENT_REGION))
{
auto shared_memory = makeSharedMemory(CURRENT_REGION);
auto current_region = static_cast<SharedDataTimestamp *>(shared_memory->Ptr());
in_use_region = current_region->region;
in_use_timestamp = current_region->timestamp;
next_region = in_use_region == REGION_1 ? REGION_2 : REGION_1;
next_timestamp = in_use_timestamp + 1;
}
// ensure that the shared memory region we want to write to is really removed
// this is only needef for failure recovery because we actually wait for all clients
// to detach at the end of the function
if (storage::SharedMemory::RegionExists(next_region))
{
util::Log(logWARNING) << "Old shared memory region " << regionToString(next_region)
<< " still exists.";
util::UnbufferedLog() << "Retrying removal... ";
storage::SharedMemory::Remove(next_region);
util::UnbufferedLog() << "ok.";
}
util::Log() << "Loading data into " << regionToString(next_region) << " timestamp "
<< next_timestamp;
// Populate a memory layout into stack memory
DataLayout layout;
PopulateLayout(layout);
// Allocate shared memory block
auto regions_size = sizeof(layout) + layout.GetSizeOfLayout();
util::Log() << "Allocating shared memory of " << regions_size << " bytes";
auto data_memory = makeSharedMemory(next_region, regions_size);
// Copy memory layout to shared memory and populate data
char *shared_memory_ptr = static_cast<char *>(data_memory->Ptr());
memcpy(shared_memory_ptr, &layout, sizeof(layout));
PopulateData(layout, shared_memory_ptr + sizeof(layout));
SharedBarriers barriers;
{ // Lock for write access shared region mutex that protects CURRENT_REGION
RetryLock current_region_lock = barriers.getLockWithRetry(max_wait);
if (!current_region_lock.TryLock())
{
util::Log(logWARNING) << "Could not aquire current region lock after " << max_wait
<< " seconds. Claiming the lock by force.";
current_region_lock.ForceLock();
}
// Get write access to CURRENT_REGION
auto shared_memory = makeSharedMemory(CURRENT_REGION, sizeof(SharedDataTimestamp));
auto current_region = static_cast<SharedDataTimestamp *>(shared_memory->Ptr());
// Update the current region ID and timestamp
current_region->region = next_region;
current_region->timestamp = next_timestamp;
}
util::Log() << "All data loaded. Notify all client... ";
barriers.region_condition.notify_all();
// SHMCTL(2): Mark the segment to be destroyed. The segment will actually be destroyed
// only after the last process detaches it.
if (in_use_region != REGION_NONE && storage::SharedMemory::RegionExists(in_use_region))
{
util::UnbufferedLog() << "Marking old shared memory region "
<< regionToString(in_use_region) << " for removal... ";
// aquire a handle for the old shared memory region before we mark it for deletion
// we will need this to wait for all users to detach
auto in_use_shared_memory = makeSharedMemory(in_use_region);
storage::SharedMemory::Remove(in_use_region);
util::UnbufferedLog() << "ok.";
util::UnbufferedLog() << "Waiting for clients to detach... ";
in_use_shared_memory->WaitForDetach();
util::UnbufferedLog() << " ok.";
}
util::Log() << "All clients switched.";
return EXIT_SUCCESS;
}
+11 -4
View File
@@ -16,15 +16,21 @@ using namespace osrm;
// generate boost::program_options object for the routing part
bool generateDataStoreOptions(const int argc,
const char *argv[],
boost::filesystem::path &base_path)
boost::filesystem::path &base_path,
int &max_wait)
{
// declare a group of options that will be allowed only on command line
boost::program_options::options_description generic_options("Options");
generic_options.add_options()("version,v", "Show version")("help,h", "Show this help message")("remove-locks,r", "Remove locks");
generic_options.add_options()("version,v", "Show version")("help,h", "Show this help message")(
"remove-locks,r", "Remove locks");
// declare a group of options that will be allowed both on command line
// as well as in a config file
boost::program_options::options_description config_options("Configuration");
config_options.add_options()("max-wait",
boost::program_options::value<int>(&max_wait)->default_value(-1),
"Maximum number of seconds to wait on a running data update "
"before aquiring the lock by force.");
// hidden options, will be allowed on command line but will not be shown to the user
boost::program_options::options_description hidden_options("Hidden options");
@@ -110,7 +116,8 @@ int main(const int argc, const char *argv[]) try
util::LogPolicy::GetInstance().Unmute();
boost::filesystem::path base_path;
if (!generateDataStoreOptions(argc, argv, base_path))
int max_wait = -1;
if (!generateDataStoreOptions(argc, argv, base_path, max_wait))
{
return EXIT_SUCCESS;
}
@@ -122,7 +129,7 @@ int main(const int argc, const char *argv[]) try
}
storage::Storage storage(std::move(config));
return storage.Run();
return storage.Run(max_wait);
}
catch (const std::bad_alloc &e)
{
+1 -1
View File
@@ -1,5 +1,5 @@
#include "util/coordinate.hpp"
#include "util/coordinate_calculation.hpp"
#include "util/coordinate.hpp"
#include "util/trigonometry_table.hpp"
#include "util/web_mercator.hpp"