diff --git a/CHANGELOG.md b/CHANGELOG.md index e064f4d7e..7e1d7bb38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,9 +10,9 @@ - Removed the `./profile.lua -> ./profiles/car.lua` symlink. Use specific profiles from the `profiles` directory. - Infrastructure - Disabled link-time optimized (LTO) builds by default. Enable by passing `-DENABLE_LTO=ON` to `cmake` if you need the performance and know what you are doing. - - File handling - Datafile versioning is now based on OSRM semver values, rather than source code checksums. Datafiles are compatible between patch levels, but incompatible between minor version or higher bumps. + - libOSRM now creates its own watcher thread when used in shared memory mode to listen for data updates # 5.5.1 - Changes from 5.5.0 diff --git a/features/lib/osrm_loader.js b/features/lib/osrm_loader.js index 09b92811a..197e4b5a2 100644 --- a/features/lib/osrm_loader.js +++ b/features/lib/osrm_loader.js @@ -99,7 +99,6 @@ class OSRMDatastoreLoader extends OSRMBaseLoader { if (err) return callback(err); if (!this.osrmIsRunning()) this.launch(callback); else { - // some osrm-routed output prior this line can appear in the prevoius log file this.scope.setupOutputLog(this.child, fs.createWriteStream(this.scope.scenarioLogFile, {'flags': 'a'})); callback(); } @@ -109,10 +108,7 @@ class OSRMDatastoreLoader extends OSRMBaseLoader { loadData (callback) { this.scope.runBin('osrm-datastore', this.inputFile, this.scope.environment, (err) => { if (err) return callback(new Error('*** osrm-datastore exited with ' + err.code + ': ' + err)); - // in case of false positive results with a dummy 1ms delay an stdout monitor - // this.child.stdout.on('data', facade_monitor) must be added and wait for - // "updated facade to region" in stdout - setTimeout(callback, 1); + callback(); }); } diff --git a/include/engine/data_watchdog.hpp b/include/engine/data_watchdog.hpp index 595ca4d92..437af10de 100644 --- a/include/engine/data_watchdog.hpp +++
b/include/engine/data_watchdog.hpp @@ -26,10 +26,20 @@ namespace engine class DataWatchdog { public: - DataWatchdog() - : active(true) - , timestamp(0) + DataWatchdog() : active(true), timestamp(0) { + // create the initial facade before launching the watchdog thread + { + boost::interprocess::scoped_lock current_region_lock( + barrier.region_mutex); + + auto shared_memory = makeSharedMemory(storage::CURRENT_REGION); + auto current = static_cast(shared_memory->Ptr()); + + facade = std::make_shared(current->region); + timestamp = current->timestamp; + } + watcher = std::thread(&DataWatchdog::Run, this); } @@ -46,36 +56,34 @@ class DataWatchdog return storage::SharedMemory::RegionExists(storage::CURRENT_REGION); } - auto GetDataFacade() const - { - return facade; - } + auto GetDataFacade() const { return facade; } private: - void Run() { - boost::interprocess::scoped_lock - current_region_lock(barrier.region_mutex); - auto shared_memory = makeSharedMemory(storage::CURRENT_REGION); auto current = static_cast(shared_memory->Ptr()); while (active) { + boost::interprocess::scoped_lock current_region_lock( + barrier.region_mutex); + + while (active && timestamp == current->timestamp) + { + barrier.region_condition.wait(current_region_lock); + } + if (timestamp != current->timestamp) { facade = std::make_shared(current->region); timestamp = current->timestamp; - util::Log() << "updated facade to region " << storage::regionToString(current->region) - << " with timestamp " << current->timestamp; + util::Log() << "updated facade to region " + << storage::regionToString(current->region) << " with timestamp " + << current->timestamp; } - - barrier.region_condition.wait(current_region_lock); } - facade.reset(); - util::Log() << "DataWatchdog thread stopped"; } diff --git a/include/engine/datafacade/shared_memory_datafacade.hpp b/include/engine/datafacade/shared_memory_datafacade.hpp index 75229c17a..20bb8cd90 100644 --- a/include/engine/datafacade/shared_memory_datafacade.hpp 
+++ b/include/engine/datafacade/shared_memory_datafacade.hpp @@ -30,9 +30,7 @@ class SharedMemoryDataFacade : public ContiguousInternalMemoryDataFacadeBase SharedMemoryDataFacade() {} public: - - SharedMemoryDataFacade(storage::SharedDataType data_region) - : data_region(data_region) + SharedMemoryDataFacade(storage::SharedDataType data_region) : data_region(data_region) { util::Log(logDEBUG) << "Loading new data for region " << regionToString(data_region); diff --git a/include/storage/shared_barriers.hpp b/include/storage/shared_barriers.hpp index a74991bd4..1873d49e2 100644 --- a/include/storage/shared_barriers.hpp +++ b/include/storage/shared_barriers.hpp @@ -4,6 +4,8 @@ #include #include +#include "util/retry_lock.hpp" + namespace osrm { namespace storage @@ -13,8 +15,8 @@ struct SharedBarriers { SharedBarriers() - : region_mutex(boost::interprocess::open_or_create, "osrm-region") - , region_condition(boost::interprocess::open_or_create, "osrm-region-cv") + : region_mutex(boost::interprocess::open_or_create, "osrm-region"), + region_condition(boost::interprocess::open_or_create, "osrm-region-cv") { } @@ -24,6 +26,11 @@ struct SharedBarriers boost::interprocess::named_condition::remove("osrm-region-cv"); } + static RetryLock getLockWithRetry(int timeout_seconds) + { + return RetryLock(timeout_seconds, "osrm-region"); + } + boost::interprocess::named_mutex region_mutex; boost::interprocess::named_condition region_condition; }; diff --git a/include/storage/shared_memory.hpp b/include/storage/shared_memory.hpp index 8aa2fc3f5..1c2564bd0 100644 --- a/include/storage/shared_memory.hpp +++ b/include/storage/shared_memory.hpp @@ -23,6 +23,7 @@ #include #include +#include namespace osrm { @@ -106,6 +107,20 @@ class SharedMemory return Remove(key); } + void WaitForDetach() + { + auto shmid = shm.get_shmid(); + ::shmid_ds xsi_ds; + do + { + int ret = ::shmctl(shmid, IPC_STAT, &xsi_ds); + (void)ret; // no unused warning + BOOST_ASSERT(ret >= 0); + + 
std::this_thread::sleep_for(std::chrono::microseconds(100)); + } while (xsi_ds.shm_nattch > 1); + } + private: static bool RegionExists(const boost::interprocess::xsi_key &key) { @@ -147,17 +162,13 @@ class SharedMemory public: void *Ptr() const { return region.get_address(); } - SharedMemory(const boost::filesystem::path &lock_file, - const int id, - const uint64_t size = 0) + SharedMemory(const boost::filesystem::path &lock_file, const int id, const uint64_t size = 0) { sprintf(key, "%s.%d", "osrm.lock", id); if (0 == size) { // read_only shm = boost::interprocess::shared_memory_object( - boost::interprocess::open_only, - key, - boost::interprocess::read_only); + boost::interprocess::open_only, key, boost::interprocess::read_only); region = boost::interprocess::mapped_region(shm, boost::interprocess::read_only); } else @@ -225,8 +236,7 @@ class SharedMemory #endif template -std::unique_ptr -makeSharedMemory(const IdentifierT &id, const uint64_t size = 0) +std::unique_ptr makeSharedMemory(const IdentifierT &id, const uint64_t size = 0) { try { diff --git a/include/storage/storage.hpp b/include/storage/storage.hpp index 000c49e8d..efa98dbcd 100644 --- a/include/storage/storage.hpp +++ b/include/storage/storage.hpp @@ -31,7 +31,6 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
#include "storage/shared_datatype.hpp" #include "storage/storage_config.hpp" -#include #include #include @@ -45,14 +44,13 @@ class Storage public: Storage(StorageConfig config); - int Run(); + int Run(int max_wait); void PopulateLayout(DataLayout &layout); void PopulateData(const DataLayout &layout, char *memory_ptr); private: StorageConfig config; - boost::interprocess::named_mutex datastore_mutex; }; } } diff --git a/include/util/retry_lock.hpp b/include/util/retry_lock.hpp new file mode 100644 index 000000000..5a7b43fae --- /dev/null +++ b/include/util/retry_lock.hpp @@ -0,0 +1,49 @@ +#ifndef OSRM_RETRY_TIMED_LOCK_HPP +#define OSRM_RETRY_TIMED_LOCK_HPP + +#include +#include +#include +#include + +class RetryLock +{ + public: + RetryLock(int timeout_seconds, const char *name) + : timeout_seconds(timeout_seconds), name(name), + mutex(std::make_unique( + boost::interprocess::open_or_create, name)), + internal_lock(*mutex, boost::interprocess::defer_lock) + { + } + + bool TryLock() + { + if (timeout_seconds >= 0) + { + return internal_lock.timed_lock(boost::posix_time::microsec_clock::universal_time() + + boost::posix_time::seconds(timeout_seconds)); + } + else + { + internal_lock.lock(); + return true; + } + } + + void ForceLock() + { + mutex.reset(); + boost::interprocess::named_mutex::remove(name); + mutex = std::make_unique( + boost::interprocess::open_or_create, name); + } + + private: + int timeout_seconds; + const char *name; + std::unique_ptr mutex; + boost::interprocess::scoped_lock internal_lock; +}; + +#endif diff --git a/scripts/format.sh b/scripts/format.sh index df5c49fa4..eaeca05ad 100755 --- a/scripts/format.sh +++ b/scripts/format.sh @@ -27,7 +27,7 @@ elif type clang-format 2> /dev/null ; then V=$(clang-format --version) if [[ $V != *3.8* ]] ; then echo "clang-format is not 3.8 (returned ${V})" - exit 1 + #exit 1 fi else echo "No appropriate clang-format found (expected clang-format-3.8, or clang-format)" diff --git a/src/storage/storage.cpp 
b/src/storage/storage.cpp index 2b36c377f..805bf1f00 100644 --- a/src/storage/storage.cpp +++ b/src/storage/storage.cpp @@ -29,12 +29,9 @@ #include #endif -#include -#include -#include -#include -#include -#include +#include +#include +#include #include @@ -54,86 +51,130 @@ using RTreeNode = util::StaticRTree::vector, true>::TreeNode; using QueryGraph = util::StaticGraph; -Storage::Storage(StorageConfig config_) - : config(std::move(config_)) - , datastore_mutex(boost::interprocess::open_or_create, "osrm-datastore") -{} +Storage::Storage(StorageConfig config_) : config(std::move(config_)) {} -int Storage::Run() +int Storage::Run(int max_wait) { BOOST_ASSERT_MSG(config.IsValid(), "Invalid storage config"); util::LogPolicy::GetInstance().Unmute(); - // Lock datastore for binary exclusive read and write access - boost::interprocess::scoped_lock - datastore_lock(datastore_mutex); - - SharedBarriers barriers; - + boost::filesystem::path lock_path = + boost::filesystem::temp_directory_path() / "osrm-datastore.lock"; + if (!boost::filesystem::exists(lock_path)) { -#ifdef __linux__ - // try to disable swapping on Linux - const bool lock_flags = MCL_CURRENT | MCL_FUTURE; - if (-1 == mlockall(lock_flags)) - { - util::Log(logWARNING) << "Could not request RAM lock"; - } -#endif - - // Get the next region ID and time stamp without locking shared barriers. - // Because of datastore_lock the only write operation can occur sequentially later. - auto next_region = REGION_1; - unsigned next_timestamp = 1; - if (SharedMemory::RegionExists(CURRENT_REGION)) - { - auto shared_memory = makeSharedMemory(CURRENT_REGION); - auto current_region = static_cast(shared_memory->Ptr()); - next_region = current_region->region == REGION_1 ? REGION_2 : REGION_1; - next_timestamp = current_region->timestamp + 1; - } - - // SHMCTL(2): Mark the segment to be destroyed. The segment will actually be destroyed - // only after the last process detaches it. 
- if (storage::SharedMemory::RegionExists(next_region)) - { - storage::SharedMemory::Remove(next_region); - } - - util::Log() << "Loading data into " << regionToString(next_region) << " timestamp " << next_timestamp; - - // Populate a memory layout into stack memory - DataLayout layout; - PopulateLayout(layout); - - // Allocate shared memory block - auto regions_size = sizeof(layout) + layout.GetSizeOfLayout(); - util::Log() << "Allocating shared memory of " << regions_size << " bytes"; - auto data_memory = makeSharedMemory(next_region, regions_size); - - // Copy memory layout to shared memory and populate data - char *shared_memory_ptr = static_cast(data_memory->Ptr()); - memcpy(shared_memory_ptr, &layout, sizeof(layout)); - PopulateData(layout, shared_memory_ptr + sizeof(layout)); - - { // Lock for write access shared region mutex that protects CURRENT_REGION - boost::interprocess::scoped_lock - current_region_lock(barriers.region_mutex); - - // Get write access to CURRENT_REGION - auto shared_memory = makeSharedMemory(CURRENT_REGION, sizeof(SharedDataTimestamp)); - auto current_region = static_cast(shared_memory->Ptr()); - - // Update the current region ID and timestamp - current_region->region = next_region; - current_region->timestamp = next_timestamp; - } - - util::Log() << "All data loaded."; + boost::filesystem::ofstream ofs(lock_path); } + boost::interprocess::file_lock file_lock(lock_path.c_str()); + boost::interprocess::scoped_lock datastore_lock( + file_lock, boost::interprocess::defer_lock); + + if (!datastore_lock.try_lock()) + { + util::UnbufferedLog(logWARNING) << "Data update in progress, waiting until it finishes... 
"; + datastore_lock.lock(); + util::UnbufferedLog(logWARNING) << "ok."; + } + +#ifdef __linux__ + // try to disable swapping on Linux + const bool lock_flags = MCL_CURRENT | MCL_FUTURE; + if (-1 == mlockall(lock_flags)) + { + util::Log(logWARNING) << "Could not request RAM lock"; + } +#endif + + // Get the next region ID and time stamp without locking shared barriers. + // Because of datastore_lock the only write operation can occur sequentially later. + auto next_region = REGION_1; + unsigned next_timestamp = 1; + auto in_use_region = REGION_NONE; + unsigned in_use_timestamp = 0; + if (SharedMemory::RegionExists(CURRENT_REGION)) + { + auto shared_memory = makeSharedMemory(CURRENT_REGION); + auto current_region = static_cast(shared_memory->Ptr()); + in_use_region = current_region->region; + in_use_timestamp = current_region->timestamp; + next_region = in_use_region == REGION_1 ? REGION_2 : REGION_1; + next_timestamp = in_use_timestamp + 1; + } + + // ensure that the shared memory region we want to write to is really removed + // this is only needef for failure recovery because we actually wait for all clients + // to detach at the end of the function + if (storage::SharedMemory::RegionExists(next_region)) + { + util::Log(logWARNING) << "Old shared memory region " << regionToString(next_region) + << " still exists."; + util::UnbufferedLog() << "Retrying removal... 
"; + storage::SharedMemory::Remove(next_region); + util::UnbufferedLog() << "ok."; + } + + util::Log() << "Loading data into " << regionToString(next_region) << " timestamp " + << next_timestamp; + + // Populate a memory layout into stack memory + DataLayout layout; + PopulateLayout(layout); + + // Allocate shared memory block + auto regions_size = sizeof(layout) + layout.GetSizeOfLayout(); + util::Log() << "Allocating shared memory of " << regions_size << " bytes"; + auto data_memory = makeSharedMemory(next_region, regions_size); + + // Copy memory layout to shared memory and populate data + char *shared_memory_ptr = static_cast(data_memory->Ptr()); + memcpy(shared_memory_ptr, &layout, sizeof(layout)); + PopulateData(layout, shared_memory_ptr + sizeof(layout)); + + SharedBarriers barriers; + { // Lock for write access shared region mutex that protects CURRENT_REGION + RetryLock current_region_lock = barriers.getLockWithRetry(max_wait); + + if (!current_region_lock.TryLock()) + { + util::Log(logWARNING) << "Could not aquire current region lock after " << max_wait + << " seconds. Claiming the lock by force."; + current_region_lock.ForceLock(); + } + + // Get write access to CURRENT_REGION + auto shared_memory = makeSharedMemory(CURRENT_REGION, sizeof(SharedDataTimestamp)); + auto current_region = static_cast(shared_memory->Ptr()); + + // Update the current region ID and timestamp + current_region->region = next_region; + current_region->timestamp = next_timestamp; + } + + util::Log() << "All data loaded. Notify all client... "; barriers.region_condition.notify_all(); + // SHMCTL(2): Mark the segment to be destroyed. The segment will actually be destroyed + // only after the last process detaches it. + if (in_use_region != REGION_NONE && storage::SharedMemory::RegionExists(in_use_region)) + { + util::UnbufferedLog() << "Marking old shared memory region " + << regionToString(in_use_region) << " for removal... 
"; + + // aquire a handle for the old shared memory region before we mark it for deletion + // we will need this to wait for all users to detach + auto in_use_shared_memory = makeSharedMemory(in_use_region); + + storage::SharedMemory::Remove(in_use_region); + util::UnbufferedLog() << "ok."; + + util::UnbufferedLog() << "Waiting for clients to detach... "; + in_use_shared_memory->WaitForDetach(); + util::UnbufferedLog() << " ok."; + } + + util::Log() << "All clients switched."; + return EXIT_SUCCESS; } diff --git a/src/tools/store.cpp b/src/tools/store.cpp index 9a700f9ad..19fffc496 100644 --- a/src/tools/store.cpp +++ b/src/tools/store.cpp @@ -16,15 +16,21 @@ using namespace osrm; // generate boost::program_options object for the routing part bool generateDataStoreOptions(const int argc, const char *argv[], - boost::filesystem::path &base_path) + boost::filesystem::path &base_path, + int &max_wait) { // declare a group of options that will be allowed only on command line boost::program_options::options_description generic_options("Options"); - generic_options.add_options()("version,v", "Show version")("help,h", "Show this help message")("remove-locks,r", "Remove locks"); + generic_options.add_options()("version,v", "Show version")("help,h", "Show this help message")( + "remove-locks,r", "Remove locks"); // declare a group of options that will be allowed both on command line // as well as in a config file boost::program_options::options_description config_options("Configuration"); + config_options.add_options()("max-wait", + boost::program_options::value(&max_wait)->default_value(-1), + "Maximum number of seconds to wait on a running data update " + "before aquiring the lock by force."); // hidden options, will be allowed on command line but will not be shown to the user boost::program_options::options_description hidden_options("Hidden options"); @@ -110,7 +116,8 @@ int main(const int argc, const char *argv[]) try util::LogPolicy::GetInstance().Unmute(); 
boost::filesystem::path base_path; - if (!generateDataStoreOptions(argc, argv, base_path)) + int max_wait = -1; + if (!generateDataStoreOptions(argc, argv, base_path, max_wait)) { return EXIT_SUCCESS; } @@ -122,7 +129,7 @@ int main(const int argc, const char *argv[]) try } storage::Storage storage(std::move(config)); - return storage.Run(); + return storage.Run(max_wait); } catch (const std::bad_alloc &e) { diff --git a/src/util/coordinate_calculation.cpp b/src/util/coordinate_calculation.cpp index cae9a34ce..015aec0d3 100644 --- a/src/util/coordinate_calculation.cpp +++ b/src/util/coordinate_calculation.cpp @@ -1,5 +1,5 @@ -#include "util/coordinate.hpp" #include "util/coordinate_calculation.hpp" +#include "util/coordinate.hpp" #include "util/trigonometry_table.hpp" #include "util/web_mercator.hpp"