shared monitor implementation with a conditional variable

or a ring buffer with semaphores
This commit is contained in:
Michael Krasnyk
2017-02-03 13:39:44 +01:00
committed by Patrick Niklaus
parent 438aa66b4c
commit 82acd59a8e
10 changed files with 310 additions and 170 deletions
+17 -17
View File
@@ -4,9 +4,9 @@
#include "engine/datafacade/contiguous_internalmem_datafacade.hpp"
#include "engine/datafacade/shared_memory_allocator.hpp"
#include "storage/shared_barrier.hpp"
#include "storage/shared_datatype.hpp"
#include "storage/shared_memory.hpp"
#include "storage/shared_monitor.hpp"
#include <boost/interprocess/sync/named_upgradable_mutex.hpp>
#include <boost/thread/lock_types.hpp>
@@ -26,17 +26,18 @@ namespace engine
// once a new dataset arrives.
class DataWatchdog
{
using mutex_type = typename storage::SharedMonitor<storage::SharedDataTimestamp>::mutex_type;
public:
DataWatchdog() : barrier(boost::interprocess::open_only), active(true), timestamp(0)
DataWatchdog() : active(true), timestamp(0)
{
// create the initial facade before launching the watchdog thread
{
boost::interprocess::scoped_lock<storage::SharedBarrier::mutex_type>
current_region_lock(barrier.GetMutex());
boost::interprocess::scoped_lock<mutex_type> current_region_lock(barrier.get_mutex());
facade = std::make_shared<datafacade::ContiguousInternalMemoryDataFacade>(
std::make_unique<datafacade::SharedMemoryAllocator>(barrier.GetRegion()));
timestamp = barrier.GetTimestamp();
std::make_unique<datafacade::SharedMemoryAllocator>(barrier.data().region));
timestamp = barrier.data().timestamp;
}
watcher = std::thread(&DataWatchdog::Run, this);
@@ -45,7 +46,7 @@ class DataWatchdog
~DataWatchdog()
{
active = false;
barrier.NotifyAll();
barrier.notify_all();
watcher.join();
}
@@ -56,21 +57,20 @@ class DataWatchdog
{
while (active)
{
boost::interprocess::scoped_lock<storage::SharedBarrier::mutex_type>
current_region_lock(barrier.GetMutex());
boost::interprocess::scoped_lock<mutex_type> current_region_lock(barrier.get_mutex());
while (active && timestamp == barrier.GetTimestamp())
while (active && timestamp == barrier.data().timestamp)
{
barrier.Wait(current_region_lock);
barrier.wait(current_region_lock);
}
if (timestamp != barrier.GetTimestamp())
if (timestamp != barrier.data().timestamp)
{
auto region = barrier.data().region;
facade = std::make_shared<datafacade::ContiguousInternalMemoryDataFacade>(
std::make_unique<datafacade::SharedMemoryAllocator>(barrier.GetRegion()));
timestamp = barrier.GetTimestamp();
util::Log() << "updated facade to region "
<< storage::regionToString(barrier.GetRegion()) << " with timestamp "
std::make_unique<datafacade::SharedMemoryAllocator>(region));
timestamp = barrier.data().timestamp;
util::Log() << "updated facade to region " << region << " with timestamp "
<< timestamp;
}
}
@@ -78,7 +78,7 @@ class DataWatchdog
util::Log() << "DataWatchdog thread stopped";
}
storage::SharedBarrier barrier;
storage::SharedMonitor<storage::SharedDataTimestamp> barrier;
std::thread watcher;
bool active;
unsigned timestamp;
-131
View File
@@ -1,131 +0,0 @@
#ifndef SHARED_BARRIER_HPP
#define SHARED_BARRIER_HPP
#include "storage/shared_datatype.hpp"
#include <boost/format.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <iostream>
namespace osrm
{
namespace storage
{
namespace
{
namespace bi = boost::interprocess;
}
struct SharedBarrier
{
struct internal_data
{
typedef bi::interprocess_mutex mutex_type;
typedef bi::interprocess_condition condvar_type;
internal_data() : region(REGION_NONE), timestamp(0){};
mutex_type mutex;
condvar_type condvar;
SharedDataType region;
unsigned timestamp;
};
typedef internal_data::mutex_type mutex_type;
typedef internal_data::condvar_type condvar_type;
SharedBarrier(bi::open_or_create_t)
{
shmem = bi::shared_memory_object(bi::open_or_create, block_name, bi::read_write);
const void *address = reinterpret_cast<void *>(block_address);
bi::offset_t size = 0;
if (!shmem.get_size(size) || size != block_size)
{
shmem.truncate(block_size);
region = bi::mapped_region(shmem, bi::read_write, 0, block_size, address, block_flags);
new (region.get_address()) internal_data;
}
else
{
region = bi::mapped_region(shmem, bi::read_write, 0, block_size, address, block_flags);
}
}
SharedBarrier(bi::open_only_t)
{
try
{
shmem = bi::shared_memory_object(bi::open_only, block_name, bi::read_write);
bi::offset_t size = 0;
if (!shmem.get_size(size) || size != block_size)
{
auto message =
boost::format("Wrong shared memory block size %1%, expected %2% bytes") % size %
static_cast<const std::size_t>(block_size);
throw util::exception(message.str() + SOURCE_REF);
}
}
catch (const bi::interprocess_exception &exception)
{
throw util::exception(
std::string(
"No shared memory blocks found, have you forgotten to run osrm-datastore?") +
SOURCE_REF);
}
const void *address = reinterpret_cast<void *>(block_address);
region = bi::mapped_region(shmem, bi::read_write, 0, block_size, address, block_flags);
}
auto &GetMutex() const { return internal()->mutex; }
template <typename L> void Wait(L &lock) const { internal()->condvar.wait(lock); }
void NotifyAll() const { internal()->condvar.notify_all(); }
void SetRegion(SharedDataType region) const
{
internal()->region = region;
internal()->timestamp += 1;
}
auto GetRegion() const { return internal()->region; }
auto GetTimestamp() const { return internal()->timestamp; }
static void Remove() { bi::shared_memory_object::remove(block_name); }
private:
bi::shared_memory_object shmem;
bi::mapped_region region;
static constexpr const char *const block_name = "osrm-region";
static constexpr std::size_t block_size = sizeof(internal_data);
#if defined(__APPLE__)
// OSX pthread_cond_wait implementation internally checks address of the mutex
// so a fixed mapping address is needed to share control memory block between
// processes and engine instances in one process
// https://github.com/Project-OSRM/osrm-backend/issues/3619
static constexpr std::intptr_t block_address = 0x00000caffee00000;
static constexpr bi::map_options_t block_flags = MAP_FIXED;
#else
static constexpr std::intptr_t block_address = 0x0;
static constexpr bi::map_options_t block_flags = bi::default_map_options;
#endif
inline internal_data *internal() const
{
return reinterpret_cast<internal_data *>(region.get_address());
}
};
}
}
#endif // SHARED_BARRIER_HPP
+7
View File
@@ -207,8 +207,15 @@ enum SharedDataType
struct SharedDataTimestamp
{
explicit SharedDataTimestamp(SharedDataType region, unsigned timestamp)
: region(region), timestamp(timestamp)
{
}
SharedDataType region;
unsigned timestamp;
static constexpr const char *name = "osrm-region";
};
inline std::string regionToString(const SharedDataType region)
+229
View File
@@ -0,0 +1,229 @@
#ifndef SHARED_MONITOR_HPP
#define SHARED_MONITOR_HPP
#include "storage/shared_datatype.hpp"
#include <boost/format.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#if defined(__linux__)
#define USE_BOOST_INTERPROCESS_CONDITION 1
#endif
namespace osrm
{
namespace storage
{
namespace
{
namespace bi = boost::interprocess;
template <class Lock> class InvertedLock
{
Lock &lock;
public:
InvertedLock(Lock &lock) : lock(lock) { lock.unlock(); }
~InvertedLock() { lock.lock(); }
};
}
// The shared monitor implementation based on a semaphore and mutex
template <typename Data> struct SharedMonitor
{
using mutex_type = bi::interprocess_mutex;
SharedMonitor(const Data &initial_data)
{
shmem = bi::shared_memory_object(bi::open_or_create, Data::name, bi::read_write);
bi::offset_t size = 0;
if (shmem.get_size(size) && size == 0)
{
shmem.truncate(internal_size + sizeof(Data));
region = bi::mapped_region(shmem, bi::read_write);
new (&internal()) InternalData;
new (&data()) Data(initial_data);
}
else
{
region = bi::mapped_region(shmem, bi::read_write);
}
}
SharedMonitor()
{
try
{
shmem = bi::shared_memory_object(bi::open_only, Data::name, bi::read_write);
bi::offset_t size = 0;
if (!shmem.get_size(size) || size != internal_size + sizeof(Data))
{
auto message =
boost::format("Wrong shared memory block '%1%' size %2%, expected %3% bytes") %
(const char *)Data::name % size % (internal_size + sizeof(Data));
throw util::exception(message.str() + SOURCE_REF);
}
region = bi::mapped_region(shmem, bi::read_write);
}
catch (const bi::interprocess_exception &exception)
{
auto message = boost::format("No shared memory block '%1%' found, have you forgotten "
"to run osrm-datastore?") %
(const char *)Data::name;
throw util::exception(message.str() + SOURCE_REF);
}
}
Data &data() const
{
auto region_pointer = reinterpret_cast<char *>(region.get_address());
return *reinterpret_cast<Data *>(region_pointer + internal_size);
}
mutex_type &get_mutex() const { return internal().mutex; }
#if USE_BOOST_INTERPROCESS_CONDITION
template <typename Lock> void wait(Lock &lock) { internal().condition.wait(lock); }
void notify_all() { internal().condition.notify_all(); }
#else
template <typename Lock> void wait(Lock &lock)
{
auto semaphore = internal().enqueue_semaphore();
{
InvertedLock<Lock> inverted_lock(lock);
semaphore->wait();
}
}
void notify_all()
{
bi::scoped_lock<mutex_type> lock(internal().mutex);
while (!internal().empty())
{
internal().dequeue_semaphore()->post();
}
}
#endif
static void remove() { bi::shared_memory_object::remove(Data::name); }
private:
#if USE_BOOST_INTERPROCESS_CONDITION
static constexpr int internal_size = 128;
struct InternalData
{
mutex_type mutex;
bi::interprocess_condition condition;
};
#else
// Implement a conditional variable using a queue of semaphores.
// OSX checks the virtual address of a mutex in pthread_cond_wait and fails with EINVAL
// if the shared block is used by different processes. Solutions based on waiters counting
// like two-turnstile reusable barrier or boost/interprocess/sync/spin/condition.hpp
// fail if a waiter is killed.
static constexpr int buffer_size = 256;
static constexpr int internal_size = 4 * 4096;
struct InternalData
{
InternalData() : head(0), tail(0)
{
for (int index = 0; index < buffer_size; ++index)
{
invalidate_semaphore(get_semaphore(index));
}
};
auto size() const { return head >= tail ? head - tail : buffer_size + head - tail; }
auto empty() const { return tail == head; }
auto enqueue_semaphore()
{
auto semaphore = get_semaphore(head);
head = (head + 1) % buffer_size;
if (size() >= buffer_size / 2)
throw util::exception(std::string("ring buffer is too small") + SOURCE_REF);
if (is_semaphore_valid(semaphore))
{
// SEM_DESTROY(3): Destroying a semaphore that other processes or threads
// are currently blocked on (in sem_wait(3)) produces undefined behavior.
// To prevent undefined behavior number of active semaphores is limited
// to a half of buffer size. It is not a strong guarantee of well-defined behavior,
// but minimizes a risk to destroy a semaphore that is still in use.
semaphore->~interprocess_semaphore();
}
return new (semaphore) bi::interprocess_semaphore(0);
}
auto dequeue_semaphore()
{
auto semaphore = get_semaphore(tail);
tail = (tail + 1) % buffer_size;
return semaphore;
}
public:
mutex_type mutex;
private:
auto get_semaphore(std::size_t index)
{
return reinterpret_cast<bi::interprocess_semaphore *>(
buffer + index * sizeof(bi::interprocess_semaphore));
}
void invalidate_semaphore(void *semaphore) const
{
std::memset(semaphore, 0xff, sizeof(bi::interprocess_semaphore));
}
bool is_semaphore_valid(void *semaphore) const
{
char invalid[sizeof(bi::interprocess_semaphore)];
invalidate_semaphore(invalid);
return std::memcmp(semaphore, invalid, sizeof(invalid)) != 0;
}
std::size_t head, tail;
char buffer[buffer_size * sizeof(bi::interprocess_semaphore)];
};
static_assert(buffer_size >= 2, "buffer size is too small");
#endif
static_assert(sizeof(InternalData) + sizeof(Data) <= internal_size, "not enough space");
static_assert(sizeof(InternalData) % alignof(Data) == 0, "incorrect data alignment");
InternalData &internal() const
{
return *reinterpret_cast<InternalData *>(reinterpret_cast<char *>(region.get_address()));
}
bi::shared_memory_object shmem;
bi::mapped_region region;
};
}
}
#undef USE_BOOST_INTERPROCESS_CONDITION
#endif // SHARED_MONITOR_HPP