From 82acd59a8e6c23813ad72747b74ddf715f0ea5ee Mon Sep 17 00:00:00 2001 From: Michael Krasnyk Date: Fri, 3 Feb 2017 13:39:44 +0100 Subject: [PATCH] shared monitor implementation with a conditional variable or a ring buffer with semaphores --- CHANGELOG.md | 5 + features/step_definitions/options.js | 8 +- features/testbot/load.feature | 16 ++ features/testbot/via.feature | 21 ++- include/engine/data_watchdog.hpp | 34 ++-- include/storage/shared_barrier.hpp | 131 --------------- include/storage/shared_datatype.hpp | 7 + include/storage/shared_monitor.hpp | 229 +++++++++++++++++++++++++++ src/storage/storage.cpp | 25 +-- src/tools/store.cpp | 4 +- 10 files changed, 310 insertions(+), 170 deletions(-) delete mode 100644 include/storage/shared_barrier.hpp create mode 100644 include/storage/shared_monitor.hpp diff --git a/CHANGELOG.md b/CHANGELOG.md index cdb1a3b31..2cbaf2078 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 5.7.0 + - Changes from 5.6 + - Internals + - Shared memory notification via conditional variables on Linux or semaphore queue on OS X and Windows with a limit of 128 OSRM Engine instances + # 5.6.0 - Changes from 5.5 - Bugfixes diff --git a/features/step_definitions/options.js b/features/step_definitions/options.js index 47273aa16..7f23c1101 100644 --- a/features/step_definitions/options.js +++ b/features/step_definitions/options.js @@ -12,7 +12,7 @@ module.exports = function () { }; this.runAndSafeOutput = (binary, options, callback) => { - this.runBin(binary, this.expandOptions(options), this.environment, (err, stdout, stderr) => { + return this.runBin(binary, this.expandOptions(options), this.environment, (err, stdout, stderr) => { this.stdout = stdout; this.stderr = stderr; this.exitCode = err && err.code || 0; @@ -53,8 +53,10 @@ module.exports = function () { this.runAndSafeOutput('osrm-contract', options, () => { callback(); }); }); - this.When(/^I run "osrm\-datastore\s?(.*?)"$/, (options, callback) => { - this.runAndSafeOutput('osrm-datastore', options, callback); + this.When(/^I run "osrm\-datastore\s?(.*?)"(?: with input "([^"]*)")?$/, (options, input, callback) => { + let child = this.runAndSafeOutput('osrm-datastore', options, callback); + if (input !== undefined) + child.stdin.write(input); }); this.Then(/^it should exit successfully$/, () => { diff --git a/features/testbot/load.feature b/features/testbot/load.feature index 0c472ae95..f195e8b35 100644 --- a/features/testbot/load.feature +++ b/features/testbot/load.feature @@ -22,6 +22,22 @@ Feature: Ways of loading data | a | b | ab,ab | | b | a | ab,ab | + Scenario: osrm-datastore - Remove shared control block + When I run "osrm-datastore --remove-locks" + Then stderr should be empty + And it should exit successfully + + Scenario: osrm-datastore - Remove shared memory blocks + When I run "osrm-datastore --spring-clean" with input "Y" + Then stderr should be empty + And it should exit successfully + + Scenario: osrm-datastore - Fail if no shared memory blocks are loaded + When I run "osrm-datastore --spring-clean" with input "Y" + And I try to run "osrm-routed --shared-memory=1" + Then stderr should contain "No shared memory block" + And it should exit with an error + Scenario: Load data directly - st Given data is loaded directly Given the node map diff --git a/features/testbot/via.feature b/features/testbot/via.feature index bc5ba7348..90da6191d 100644 --- a/features/testbot/via.feature +++ b/features/testbot/via.feature @@ -22,18 +22,27 @@ Feature: Via points Given the contract extra arguments "--core 0.8" Given the node map """ - a b c + a b c d + e f g + h i + j """ And the ways | nodes | - | abc | + | abcd | + | efg | + | hi | + | be | + | cfh | + | dgij | When I route I should get - | waypoints | route | - | a,b,c | abc,abc,abc,abc | - | c,b,a | abc,abc,abc,abc | - | c,b,a | abc,abc,abc,abc | + | waypoints | route | + | a,b,c | abcd,abcd,abcd,abcd | + | c,b,a | abcd,abcd,abcd,abcd | + | a,d,j | abcd,abcd,dgij,dgij | + | j,d,a | dgij,dgij,abcd,abcd | Scenario: Via point at a dead end Given the node map diff --git a/include/engine/data_watchdog.hpp b/include/engine/data_watchdog.hpp index dbeb31efd..9894565f4 100644 --- a/include/engine/data_watchdog.hpp +++ b/include/engine/data_watchdog.hpp @@ -4,9 +4,9 @@ #include "engine/datafacade/contiguous_internalmem_datafacade.hpp" #include "engine/datafacade/shared_memory_allocator.hpp" -#include "storage/shared_barrier.hpp" #include "storage/shared_datatype.hpp" #include "storage/shared_memory.hpp" +#include "storage/shared_monitor.hpp" #include #include @@ -26,17 +26,18 @@ namespace engine // once a new dataset arrives. class DataWatchdog { + using mutex_type = typename storage::SharedMonitor::mutex_type; + public: - DataWatchdog() : barrier(boost::interprocess::open_only), active(true), timestamp(0) + DataWatchdog() : active(true), timestamp(0) { // create the initial facade before launching the watchdog thread { - boost::interprocess::scoped_lock - current_region_lock(barrier.GetMutex()); + boost::interprocess::scoped_lock current_region_lock(barrier.get_mutex()); facade = std::make_shared( - std::make_unique(barrier.GetRegion())); - timestamp = barrier.GetTimestamp(); + std::make_unique(barrier.data().region)); + timestamp = barrier.data().timestamp; } watcher = std::thread(&DataWatchdog::Run, this); @@ -45,7 +46,7 @@ class DataWatchdog ~DataWatchdog() { active = false; - barrier.NotifyAll(); + barrier.notify_all(); watcher.join(); } @@ -56,21 +57,20 @@ class DataWatchdog { while (active) { - boost::interprocess::scoped_lock - current_region_lock(barrier.GetMutex()); + boost::interprocess::scoped_lock current_region_lock(barrier.get_mutex()); - while (active && timestamp == barrier.GetTimestamp()) + while (active && timestamp == barrier.data().timestamp) { - barrier.Wait(current_region_lock); + barrier.wait(current_region_lock); } - if (timestamp != barrier.GetTimestamp()) + if (timestamp != barrier.data().timestamp) { + auto region = barrier.data().region; facade = std::make_shared( - std::make_unique(barrier.GetRegion())); - timestamp = barrier.GetTimestamp(); - util::Log() << "updated facade to region " - << storage::regionToString(barrier.GetRegion()) << " with timestamp " + std::make_unique(region)); + timestamp = barrier.data().timestamp; + util::Log() << "updated facade to region " << region << " with timestamp " << timestamp; } } @@ -78,7 +78,7 @@ class DataWatchdog util::Log() << "DataWatchdog thread stopped"; } - storage::SharedBarrier barrier; + storage::SharedMonitor barrier; std::thread watcher; bool active; unsigned timestamp; diff --git a/include/storage/shared_barrier.hpp b/include/storage/shared_barrier.hpp deleted file mode 100644 index ef0ac0923..000000000 --- a/include/storage/shared_barrier.hpp +++ /dev/null @@ -1,131 +0,0 @@ -#ifndef SHARED_BARRIER_HPP -#define SHARED_BARRIER_HPP - -#include "storage/shared_datatype.hpp" - -#include -#include -#include -#include -#include - -#include - -namespace osrm -{ -namespace storage -{ - -namespace -{ -namespace bi = boost::interprocess; -} - -struct SharedBarrier -{ - struct internal_data - { - typedef bi::interprocess_mutex mutex_type; - typedef bi::interprocess_condition condvar_type; - - internal_data() : region(REGION_NONE), timestamp(0){}; - - mutex_type mutex; - condvar_type condvar; - SharedDataType region; - unsigned timestamp; - }; - - typedef internal_data::mutex_type mutex_type; - typedef internal_data::condvar_type condvar_type; - - SharedBarrier(bi::open_or_create_t) - { - shmem = bi::shared_memory_object(bi::open_or_create, block_name, bi::read_write); - - const void *address = reinterpret_cast(block_address); - bi::offset_t size = 0; - if (!shmem.get_size(size) || size != block_size) - { - shmem.truncate(block_size); - region = bi::mapped_region(shmem, bi::read_write, 0, block_size, address, block_flags); - new (region.get_address()) internal_data; - } - else - { - region = bi::mapped_region(shmem, bi::read_write, 0, block_size, address, block_flags); - } - } - - SharedBarrier(bi::open_only_t) - { - try - { - shmem = bi::shared_memory_object(bi::open_only, block_name, bi::read_write); - - bi::offset_t size = 0; - if (!shmem.get_size(size) || size != block_size) - { - auto message = - boost::format("Wrong shared memory block size %1%, expected %2% bytes") % size % - static_cast(block_size); - throw util::exception(message.str() + SOURCE_REF); - } - } - catch (const bi::interprocess_exception &exception) - { - throw util::exception( - std::string( - "No shared memory blocks found, have you forgotten to run osrm-datastore?") + - SOURCE_REF); - } - - const void *address = reinterpret_cast(block_address); - region = bi::mapped_region(shmem, bi::read_write, 0, block_size, address, block_flags); - } - - auto &GetMutex() const { return internal()->mutex; } - - template void Wait(L &lock) const { internal()->condvar.wait(lock); } - - void NotifyAll() const { internal()->condvar.notify_all(); } - - void SetRegion(SharedDataType region) const - { - internal()->region = region; - internal()->timestamp += 1; - } - - auto GetRegion() const { return internal()->region; } - - auto GetTimestamp() const { return internal()->timestamp; } - - static void Remove() { bi::shared_memory_object::remove(block_name); } - - private: - bi::shared_memory_object shmem; - bi::mapped_region region; - - static constexpr const char *const block_name = "osrm-region"; - static constexpr std::size_t block_size = sizeof(internal_data); -#if defined(__APPLE__) - // OSX pthread_cond_wait implementation internally checks address of the mutex - // so a fixed mapping address is needed to share control memory block between - // processes and engine instances in one process - // https://github.com/Project-OSRM/osrm-backend/issues/3619 - static constexpr std::intptr_t block_address = 0x00000caffee00000; - static constexpr bi::map_options_t block_flags = MAP_FIXED; -#else - static constexpr std::intptr_t block_address = 0x0; - static constexpr bi::map_options_t block_flags = bi::default_map_options; -#endif - - inline internal_data *internal() const - { - return reinterpret_cast(region.get_address()); - } -}; -} -} - -#endif // SHARED_BARRIER_HPP diff --git a/include/storage/shared_datatype.hpp b/include/storage/shared_datatype.hpp index 416d36e90..ef0884f55 100644 --- a/include/storage/shared_datatype.hpp +++ b/include/storage/shared_datatype.hpp @@ -207,8 +207,15 @@ enum SharedDataType struct SharedDataTimestamp { + explicit SharedDataTimestamp(SharedDataType region, unsigned timestamp) + : region(region), timestamp(timestamp) + { + } + SharedDataType region; unsigned timestamp; + + static constexpr const char *name = "osrm-region"; }; inline std::string regionToString(const SharedDataType region) diff --git a/include/storage/shared_monitor.hpp b/include/storage/shared_monitor.hpp new file mode 100644 index 000000000..5aa1a5104 --- /dev/null +++ b/include/storage/shared_monitor.hpp @@ -0,0 +1,229 @@ +#ifndef SHARED_MONITOR_HPP +#define SHARED_MONITOR_HPP + +#include "storage/shared_datatype.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#if defined(__linux__) +#define USE_BOOST_INTERPROCESS_CONDITION 1 +#endif + +namespace osrm +{ +namespace storage +{ + +namespace +{ +namespace bi = boost::interprocess; + +template class InvertedLock +{ + Lock &lock; + + public: + InvertedLock(Lock &lock) : lock(lock) { lock.unlock(); } + ~InvertedLock() { lock.lock(); } +}; +} + +// The shared monitor implementation based on a semaphore and mutex +template struct SharedMonitor +{ + using mutex_type = bi::interprocess_mutex; + + SharedMonitor(const Data &initial_data) + { + shmem = bi::shared_memory_object(bi::open_or_create, Data::name, bi::read_write); + + bi::offset_t size = 0; + if (shmem.get_size(size) && size == 0) + { + shmem.truncate(internal_size + sizeof(Data)); + region = bi::mapped_region(shmem, bi::read_write); + new (&internal()) InternalData; + new (&data()) Data(initial_data); + } + else + { + region = bi::mapped_region(shmem, bi::read_write); + } + } + + SharedMonitor() + { + try + { + shmem = bi::shared_memory_object(bi::open_only, Data::name, bi::read_write); + + bi::offset_t size = 0; + if (!shmem.get_size(size) || size != internal_size + sizeof(Data)) + { + auto message = + boost::format("Wrong shared memory block '%1%' size %2%, expected %3% bytes") % + (const char *)Data::name % size % (internal_size + sizeof(Data)); + throw util::exception(message.str() + SOURCE_REF); + } + + region = bi::mapped_region(shmem, bi::read_write); + } + catch (const bi::interprocess_exception &exception) + { + auto message = boost::format("No shared memory block '%1%' found, have you forgotten " + "to run osrm-datastore?") % + (const char *)Data::name; + throw util::exception(message.str() + SOURCE_REF); + } + } + + Data &data() const + { + auto region_pointer = reinterpret_cast(region.get_address()); + return *reinterpret_cast(region_pointer + internal_size); + } + + mutex_type &get_mutex() const { return internal().mutex; } + +#if USE_BOOST_INTERPROCESS_CONDITION + template void wait(Lock &lock) { internal().condition.wait(lock); } + + void notify_all() { internal().condition.notify_all(); } +#else + template void wait(Lock &lock) + { + auto semaphore = internal().enqueue_semaphore(); + { + InvertedLock inverted_lock(lock); + semaphore->wait(); + } + } + + void notify_all() + { + bi::scoped_lock lock(internal().mutex); + while (!internal().empty()) + { + internal().dequeue_semaphore()->post(); + } + } +#endif + + static void remove() { bi::shared_memory_object::remove(Data::name); } + + private: +#if USE_BOOST_INTERPROCESS_CONDITION + + static constexpr int internal_size = 128; + + struct InternalData + { + mutex_type mutex; + bi::interprocess_condition condition; + }; + +#else + // Implement a conditional variable using a queue of semaphores. + // OSX checks the virtual address of a mutex in pthread_cond_wait and fails with EINVAL + // if the shared block is used by different processes. Solutions based on waiters counting + // like two-turnstile reusable barrier or boost/interprocess/sync/spin/condition.hpp + // fail if a waiter is killed. + + static constexpr int buffer_size = 256; + static constexpr int internal_size = 4 * 4096; + + struct InternalData + { + InternalData() : head(0), tail(0) + { + for (int index = 0; index < buffer_size; ++index) + { + invalidate_semaphore(get_semaphore(index)); + } + }; + + auto size() const { return head >= tail ? head - tail : buffer_size + head - tail; } + + auto empty() const { return tail == head; } + + auto enqueue_semaphore() + { + auto semaphore = get_semaphore(head); + head = (head + 1) % buffer_size; + + if (size() >= buffer_size / 2) + throw util::exception(std::string("ring buffer is too small") + SOURCE_REF); + + if (is_semaphore_valid(semaphore)) + { + // SEM_DESTROY(3): Destroying a semaphore that other processes or threads + // are currently blocked on (in sem_wait(3)) produces undefined behavior. + // To prevent undefined behavior number of active semaphores is limited + // to a half of buffer size. It is not a strong guarantee of well-defined behavior, + // but minimizes a risk to destroy a semaphore that is still in use. + semaphore->~interprocess_semaphore(); + } + + return new (semaphore) bi::interprocess_semaphore(0); + } + + auto dequeue_semaphore() + { + auto semaphore = get_semaphore(tail); + tail = (tail + 1) % buffer_size; + return semaphore; + } + + public: + mutex_type mutex; + + private: + auto get_semaphore(std::size_t index) + { + return reinterpret_cast( + buffer + index * sizeof(bi::interprocess_semaphore)); + } + + void invalidate_semaphore(void *semaphore) const + { + std::memset(semaphore, 0xff, sizeof(bi::interprocess_semaphore)); + } + + bool is_semaphore_valid(void *semaphore) const + { + char invalid[sizeof(bi::interprocess_semaphore)]; + invalidate_semaphore(invalid); + return std::memcmp(semaphore, invalid, sizeof(invalid)) != 0; + } + + std::size_t head, tail; + char buffer[buffer_size * sizeof(bi::interprocess_semaphore)]; + }; + + static_assert(buffer_size >= 2, "buffer size is too small"); + +#endif + + static_assert(sizeof(InternalData) + sizeof(Data) <= internal_size, "not enough space"); + static_assert(sizeof(InternalData) % alignof(Data) == 0, "incorrect data alignment"); + + InternalData &internal() const + { + return *reinterpret_cast(reinterpret_cast(region.get_address())); + } + + bi::shared_memory_object shmem; + bi::mapped_region region; +}; +} +} + +#undef USE_BOOST_INTERPROCESS_CONDITION + +#endif // SHARED_MONITOR_HPP diff --git a/src/storage/storage.cpp b/src/storage/storage.cpp index 4611aa578..21aca1501 100644 --- a/src/storage/storage.cpp +++ b/src/storage/storage.cpp @@ -8,9 +8,9 @@ #include "extractor/travel_mode.hpp" #include "storage/io.hpp" #include "storage/serialization.hpp" -#include "storage/shared_barrier.hpp" #include "storage/shared_datatype.hpp" #include "storage/shared_memory.hpp" +#include "storage/shared_monitor.hpp" #include "engine/datafacade/datafacade_base.hpp" #include "util/coordinate.hpp" #include "util/exception.hpp" @@ -53,6 +53,8 @@ using RTreeNode = util::StaticRTree::vector, true>::TreeNode; using QueryGraph = util::StaticGraph; +using Monitor = SharedMonitor; + Storage::Storage(StorageConfig config_) : config(std::move(config_)) {} int Storage::Run(int max_wait) @@ -90,8 +92,9 @@ int Storage::Run(int max_wait) // Get the next region ID and time stamp without locking shared barriers. // Because of datastore_lock the only write operation can occur sequentially later. - SharedBarrier barrier(boost::interprocess::open_or_create); - auto in_use_region = barrier.GetRegion(); + Monitor monitor(SharedDataTimestamp{REGION_NONE, 0}); + auto in_use_region = monitor.data().region; + auto next_timestamp = monitor.data().timestamp + 1; auto next_region = in_use_region == REGION_2 || in_use_region == REGION_NONE ? REGION_1 : REGION_2; @@ -124,8 +127,8 @@ int Storage::Run(int max_wait) PopulateData(layout, shared_memory_ptr + sizeof(layout)); { // Lock for write access shared region mutex - boost::interprocess::scoped_lock lock( - barrier.GetMutex(), boost::interprocess::defer_lock); + boost::interprocess::scoped_lock lock(monitor.get_mutex(), + boost::interprocess::defer_lock); if (max_wait >= 0) { @@ -136,9 +139,9 @@ int Storage::Run(int max_wait) << "Could not aquire current region lock after " << max_wait << " seconds. Removing locked block and creating a new one. All currently " "attached processes will not receive notifications and must be restarted"; - SharedBarrier::Remove(); + Monitor::remove(); in_use_region = REGION_NONE; - barrier = SharedBarrier(boost::interprocess::open_or_create); + monitor = Monitor(SharedDataTimestamp{REGION_NONE, 0}); } } else @@ -147,13 +150,13 @@ int Storage::Run(int max_wait) } // Update the current region ID and timestamp - barrier.SetRegion(next_region); + monitor.data().region = next_region; + monitor.data().timestamp = next_timestamp; } util::Log() << "All data loaded. Notify all client about new data in " - << regionToString(barrier.GetRegion()) << " with timestamp " - << barrier.GetTimestamp(); - barrier.NotifyAll(); + << regionToString(next_region) << " with timestamp " << next_timestamp; + monitor.notify_all(); // SHMCTL(2): Mark the segment to be destroyed. The segment will actually be destroyed // only after the last process detaches it. diff --git a/src/tools/store.cpp b/src/tools/store.cpp index dd9483b8e..24ee46fef 100644 --- a/src/tools/store.cpp +++ b/src/tools/store.cpp @@ -1,5 +1,5 @@ -#include "storage/shared_barrier.hpp" #include "storage/shared_memory.hpp" +#include "storage/shared_monitor.hpp" #include "storage/storage.hpp" #include "util/exception.hpp" #include "util/log.hpp" @@ -14,7 +14,7 @@ using namespace osrm; -void removeLocks() { osrm::storage::SharedBarrier::Remove(); } +void removeLocks() { osrm::storage::SharedMonitor::remove(); } void deleteRegion(const storage::SharedDataType region) {