diff --git a/include/engine/data_watchdog.hpp b/include/engine/data_watchdog.hpp index 91349eb2a..fc6c428fd 100644 --- a/include/engine/data_watchdog.hpp +++ b/include/engine/data_watchdog.hpp @@ -32,20 +32,28 @@ template class DataWatchdogImpl; template class DataWatchdogImpl> final { - using mutex_type = typename storage::SharedMonitor::mutex_type; + using mutex_type = typename storage::SharedMonitor::mutex_type; using Facade = datafacade::ContiguousInternalMemoryDataFacade; public: - DataWatchdogImpl() : active(true), timestamp(0) + DataWatchdogImpl() : active(true) { // create the initial facade before launching the watchdog thread { boost::interprocess::scoped_lock current_region_lock(barrier.get_mutex()); + auto& shared_register = barrier.data(); + auto region_id = shared_register.Find("data"); + if (region_id == storage::SharedRegionRegister::INVALID_REGION_ID) + { + throw util::exception("Could not find shared memory region. Did you run osrm-datastore?"); + } + shared_region = &shared_register.GetRegion(region_id); + region = *shared_region; + facade_factory = DataFacadeFactory( - std::make_shared(barrier.data().region)); - timestamp = barrier.data().timestamp; + std::make_shared(region.shm_key)); } watcher = std::thread(&DataWatchdogImpl::Run, this); @@ -74,30 +82,30 @@ class DataWatchdogImpl current_region_lock(barrier.get_mutex()); - while (active && timestamp == barrier.data().timestamp) + while (active && region.timestamp == shared_region->timestamp) { barrier.wait(current_region_lock); } - if (timestamp != barrier.data().timestamp) + if (region.timestamp != shared_region->timestamp) { - auto region = barrier.data().region; + region = *shared_region; facade_factory = DataFacadeFactory( - std::make_shared(region)); - timestamp = barrier.data().timestamp; - util::Log() << "updated facade to region " << region << " with timestamp " - << timestamp; + std::make_shared(region.shm_key)); + util::Log() << "updated facade to region " << region.shm_key << " with timestamp " + << region.timestamp; } } util::Log() << "DataWatchdog thread stopped"; } - storage::SharedMonitor barrier; + storage::SharedMonitor barrier; std::thread watcher; bool active; - unsigned timestamp; + storage::SharedRegion region; + storage::SharedRegion *shared_region; DataFacadeFactory facade_factory; }; } diff --git a/include/engine/datafacade/shared_memory_allocator.hpp b/include/engine/datafacade/shared_memory_allocator.hpp index e201e4ee4..3734ff8db 100644 --- a/include/engine/datafacade/shared_memory_allocator.hpp +++ b/include/engine/datafacade/shared_memory_allocator.hpp @@ -23,7 +23,7 @@ namespace datafacade class SharedMemoryAllocator : public ContiguousBlockAllocator { public: - explicit SharedMemoryAllocator(storage::SharedDataType data_region); + explicit SharedMemoryAllocator(storage::SharedRegionRegister::ShmKey data_shm_key); ~SharedMemoryAllocator() override final; // interface to give access to the datafacades diff --git a/include/storage/shared_datatype.hpp b/include/storage/shared_datatype.hpp index 7c3e561d3..f18462897 100644 --- a/include/storage/shared_datatype.hpp +++ b/include/storage/shared_datatype.hpp @@ -6,13 +6,13 @@ #include "util/exception.hpp" #include "util/exception_utils.hpp" -#include "util/log.hpp" #include #include #include #include +#include #include namespace osrm @@ -160,40 +160,100 @@ class DataLayout std::map blocks; }; -enum SharedDataType +struct SharedRegion { - REGION_NONE, - REGION_1, - REGION_2 -}; + static constexpr const int MAX_NAME_LENGTH = 254; -struct SharedDataTimestamp -{ - explicit SharedDataTimestamp(SharedDataType region, unsigned timestamp) - : region(region), timestamp(timestamp) + SharedRegion() : name{0}, timestamp{0} {} + SharedRegion(const std::string &name_, std::uint64_t timestamp, std::uint8_t shm_key) + : name{0}, timestamp{timestamp}, shm_key{shm_key} { + std::copy_n(name_.begin(), std::min(MAX_NAME_LENGTH, name_.size()), name); } - SharedDataType region; - unsigned timestamp; + bool IsEmpty() const { return timestamp == 0; } + + char name[MAX_NAME_LENGTH + 1]; + std::uint64_t timestamp; + std::uint8_t shm_key; +}; + +// Keeps a list of all shared regions in a fixed-sized struct +// for fast access and deserialization. +struct SharedRegionRegister +{ + using RegionID = std::uint8_t; + static constexpr const RegionID INVALID_REGION_ID = std::numeric_limits::max(); + using ShmKey = decltype(SharedRegion::shm_key); + + // Returns the key of the region with the given name + RegionID Find(const std::string &name) const + { + auto iter = std::find_if(regions.begin(), regions.end(), [&](const auto ®ion) { + return std::strncmp(region.name, name.c_str(), SharedRegion::MAX_NAME_LENGTH) == 0; + }); + + if (iter == regions.end()) + { + return INVALID_REGION_ID; + } + else + { + return std::distance(regions.begin(), iter); + } + } + + RegionID Register(const std::string &name, ShmKey key) + { + auto iter = std::find_if( + regions.begin(), regions.end(), [&](const auto ®ion) { return region.IsEmpty(); }); + if (iter == regions.end()) + { + throw util::exception("No shared memory regions left. Could not register " + name + + "."); + } + else + { + constexpr std::uint32_t INITIAL_TIMESTAMP = 1; + *iter = SharedRegion{name, INITIAL_TIMESTAMP, key}; + RegionID key = std::distance(regions.begin(), iter); + return key; + } + } + + void Deregister(const RegionID key) { regions[key] = SharedRegion{}; } + + const auto &GetRegion(const RegionID key) const { return regions[key]; } + + auto &GetRegion(const RegionID key) { return regions[key]; } + + ShmKey ReserveKey() + { + auto free_key_iter = std::find(shm_key_in_use.begin(), shm_key_in_use.end(), false); + if (free_key_iter == shm_key_in_use.end()) + { + throw util::exception("Could not reserve a new SHM key. All keys are in use"); + } + + *free_key_iter = true; + return std::distance(shm_key_in_use.begin(), free_key_iter); + } + + void ReleaseKey(ShmKey key) { shm_key_in_use[key] = false; } + + static constexpr const std::uint8_t MAX_SHARED_REGIONS = + std::numeric_limits::max() - 1; + static_assert(MAX_SHARED_REGIONS < std::numeric_limits::max(), + "Number of shared memory regions needs to be less than the region id size."); + + static constexpr const std::uint8_t MAX_SHM_KEYS = std::numeric_limits::max() - 1; static constexpr const char *name = "osrm-region"; -}; -inline std::string regionToString(const SharedDataType region) -{ - switch (region) - { - case REGION_1: - return "REGION_1"; - case REGION_2: - return "REGION_2"; - case REGION_NONE: - return "REGION_NONE"; - default: - return "INVALID_REGION"; - } -} + private: + std::array regions; + std::array shm_key_in_use; +}; } } diff --git a/include/storage/shared_monitor.hpp b/include/storage/shared_monitor.hpp index b80d2d3c5..2d0e5d086 100644 --- a/include/storage/shared_monitor.hpp +++ b/include/storage/shared_monitor.hpp @@ -47,7 +47,7 @@ template struct SharedMonitor bi::offset_t size = 0; if (shmem.get_size(size) && size == 0) { - shmem.truncate(internal_size + sizeof(Data)); + shmem.truncate(rounded_internal_size + sizeof(Data)); region = bi::mapped_region(shmem, bi::read_write); new (&internal()) InternalData; new (&data()) Data(initial_data); @@ -65,11 +65,11 @@ template struct SharedMonitor shmem = bi::shared_memory_object(bi::open_only, Data::name, bi::read_write); bi::offset_t size = 0; - if (!shmem.get_size(size) || size != internal_size + sizeof(Data)) + if (!shmem.get_size(size) || size != rounded_internal_size + sizeof(Data)) { auto message = boost::format("Wrong shared memory block '%1%' size %2%, expected %3% bytes") % - (const char *)Data::name % size % (internal_size + sizeof(Data)); + (const char *)Data::name % size % (rounded_internal_size + sizeof(Data)); throw util::exception(message.str() + SOURCE_REF); } @@ -87,7 +87,7 @@ template struct SharedMonitor Data &data() const { auto region_pointer = reinterpret_cast(region.get_address()); - return *reinterpret_cast(region_pointer + internal_size); + return *reinterpret_cast(region_pointer + rounded_internal_size); } mutex_type &get_mutex() const { return internal().mutex; } @@ -120,15 +120,13 @@ template struct SharedMonitor private: #if USE_BOOST_INTERPROCESS_CONDITION - - static constexpr int internal_size = 128; - struct InternalData { mutex_type mutex; bi::interprocess_condition condition; }; + #else // Implement a conditional variable using a queue of semaphores. // OSX checks the virtual address of a mutex in pthread_cond_wait and fails with EINVAL @@ -137,7 +135,6 @@ template struct SharedMonitor // fail if a waiter is killed. static constexpr int buffer_size = 256; - static constexpr int internal_size = 4 * 4096; struct InternalData { @@ -210,9 +207,8 @@ template struct SharedMonitor static_assert(buffer_size >= 2, "buffer size is too small"); #endif - - static_assert(sizeof(InternalData) + sizeof(Data) <= internal_size, "not enough space"); - static_assert(sizeof(InternalData) % alignof(Data) == 0, "incorrect data alignment"); + static constexpr int rounded_internal_size = ((sizeof(InternalData) + alignof(Data) - 1) / alignof(Data)) * alignof(Data); + static_assert(rounded_internal_size < sizeof(InternalData) + sizeof(Data), "Data and internal data need to fit into shared memory"); InternalData &internal() const { diff --git a/src/engine/datafacade/shared_memory_allocator.cpp b/src/engine/datafacade/shared_memory_allocator.cpp index d6d8be0d2..8b8297a53 100644 --- a/src/engine/datafacade/shared_memory_allocator.cpp +++ b/src/engine/datafacade/shared_memory_allocator.cpp @@ -13,12 +13,12 @@ namespace engine namespace datafacade { -SharedMemoryAllocator::SharedMemoryAllocator(storage::SharedDataType data_region) +SharedMemoryAllocator::SharedMemoryAllocator(storage::SharedRegionRegister::ShmKey data_shm_key) { - util::Log(logDEBUG) << "Loading new data for region " << regionToString(data_region); + util::Log(logDEBUG) << "Loading new data for region " << data_shm_key; - BOOST_ASSERT(storage::SharedMemory::RegionExists(data_region)); - m_large_memory = storage::makeSharedMemory(data_region); + BOOST_ASSERT(storage::SharedMemory::RegionExists(data_shm_key)); + m_large_memory = storage::makeSharedMemory(data_shm_key); storage::io::BufferReader reader(reinterpret_cast(m_large_memory->Ptr()), m_large_memory->Size()); diff --git a/src/osrm/osrm.cpp b/src/osrm/osrm.cpp index 77da7b40e..06abf7b79 100644 --- a/src/osrm/osrm.cpp +++ b/src/osrm/osrm.cpp @@ -28,12 +28,6 @@ OSRM::OSRM(engine::EngineConfig &config) throw util::exception("Required files are missing, cannot continue. Have all the " "pre-processing steps been run?"); } - else if (config.use_shared_memory) - { - storage::SharedMonitor barrier; - using mutex_type = typename decltype(barrier)::mutex_type; - boost::interprocess::scoped_lock current_region_lock(barrier.get_mutex()); - } // Now, check that the algorithm requested can be used with the data // that's available. diff --git a/src/storage/storage.cpp b/src/storage/storage.cpp index 66aca9485..06ad9ca4d 100644 --- a/src/storage/storage.cpp +++ b/src/storage/storage.cpp @@ -61,7 +61,7 @@ inline void readBlocks(const boost::filesystem::path &path, DataLayout &layout) } } -using Monitor = SharedMonitor; +using Monitor = SharedMonitor; Storage::Storage(StorageConfig config_) : config(std::move(config_)) {} @@ -100,25 +100,24 @@ int Storage::Run(int max_wait) // Get the next region ID and time stamp without locking shared barriers. // Because of datastore_lock the only write operation can occur sequentially later. - Monitor monitor(SharedDataTimestamp{REGION_NONE, 0}); - auto in_use_region = monitor.data().region; - auto next_timestamp = monitor.data().timestamp + 1; - auto next_region = - in_use_region == REGION_2 || in_use_region == REGION_NONE ? REGION_1 : REGION_2; + Monitor monitor(SharedRegionRegister{}); + auto &shared_register = monitor.data(); + + // This is safe because we have an exclusive lock for all osrm-datastore processes. + auto shm_key = shared_register.ReserveKey(); // ensure that the shared memory region we want to write to is really removed // this is only needef for failure recovery because we actually wait for all clients // to detach at the end of the function - if (storage::SharedMemory::RegionExists(next_region)) + if (storage::SharedMemory::RegionExists(shm_key)) { - util::Log(logWARNING) << "Old shared memory region " << regionToString(next_region) - << " still exists."; + util::Log(logWARNING) << "Old shared memory region " << shm_key << " still exists."; util::UnbufferedLog() << "Retrying removal... "; - storage::SharedMemory::Remove(next_region); + storage::SharedMemory::Remove(shm_key); util::UnbufferedLog() << "ok."; } - util::Log() << "Loading data into " << regionToString(next_region); + util::Log() << "Loading data into " << shm_key; // Populate a memory layout into stack memory DataLayout layout; @@ -132,13 +131,16 @@ int Storage::Run(int max_wait) auto regions_size = encoded_layout.size() + layout.GetSizeOfLayout(); util::Log() << "Data layout has a size of " << encoded_layout.size() << " bytes"; util::Log() << "Allocating shared memory of " << regions_size << " bytes"; - auto data_memory = makeSharedMemory(next_region, regions_size); + auto data_memory = makeSharedMemory(shm_key, regions_size); // Copy memory layout to shared memory and populate data char *shared_memory_ptr = static_cast(data_memory->Ptr()); std::copy_n(encoded_layout.data(), encoded_layout.size(), shared_memory_ptr); PopulateData(layout, shared_memory_ptr + encoded_layout.size()); + std::uint32_t next_timestamp = 0; + std::uint8_t in_use_key = SharedRegionRegister::MAX_SHM_KEYS; + { // Lock for write access shared region mutex boost::interprocess::scoped_lock lock(monitor.get_mutex(), boost::interprocess::defer_lock); @@ -148,13 +150,10 @@ int Storage::Run(int max_wait) if (!lock.timed_lock(boost::posix_time::microsec_clock::universal_time() + boost::posix_time::seconds(max_wait))) { - util::Log(logWARNING) - << "Could not aquire current region lock after " << max_wait - << " seconds. Removing locked block and creating a new one. All currently " - "attached processes will not receive notifications and must be restarted"; - Monitor::remove(); - in_use_region = REGION_NONE; - monitor = Monitor(SharedDataTimestamp{REGION_NONE, 0}); + util::Log(logERROR) << "Could not aquire current region lock after " << max_wait + << " seconds. Data update failed."; + SharedMemory::Remove(shm_key); + return EXIT_FAILURE; } } else @@ -162,32 +161,45 @@ int Storage::Run(int max_wait) lock.lock(); } - // Update the current region ID and timestamp - monitor.data().region = next_region; - monitor.data().timestamp = next_timestamp; + auto region_id = shared_register.Find("data"); + if (region_id == SharedRegionRegister::INVALID_REGION_ID) + { + region_id = shared_register.Register("data", shm_key); + } + else + { + auto& shared_region = shared_register.GetRegion(region_id); + next_timestamp = shared_region.timestamp + 1; + in_use_key = shared_region.shm_key; + shared_region.shm_key = shm_key; + shared_region.timestamp = next_timestamp; + } } - util::Log() << "All data loaded. Notify all client about new data in " - << regionToString(next_region) << " with timestamp " << next_timestamp; + util::Log() << "All data loaded. Notify all client about new data in " << shm_key + << " with timestamp " << next_timestamp; monitor.notify_all(); // SHMCTL(2): Mark the segment to be destroyed. The segment will actually be destroyed // only after the last process detaches it. - if (in_use_region != REGION_NONE && storage::SharedMemory::RegionExists(in_use_region)) + if (in_use_key != SharedRegionRegister::MAX_SHM_KEYS && + storage::SharedMemory::RegionExists(in_use_key)) { - util::UnbufferedLog() << "Marking old shared memory region " - << regionToString(in_use_region) << " for removal... "; + util::UnbufferedLog() << "Marking old shared memory region " << in_use_key + << " for removal... "; // aquire a handle for the old shared memory region before we mark it for deletion // we will need this to wait for all users to detach - auto in_use_shared_memory = makeSharedMemory(in_use_region); + auto in_use_shared_memory = makeSharedMemory(in_use_key); - storage::SharedMemory::Remove(in_use_region); + storage::SharedMemory::Remove(in_use_key); util::UnbufferedLog() << "ok."; util::UnbufferedLog() << "Waiting for clients to detach... "; in_use_shared_memory->WaitForDetach(); util::UnbufferedLog() << " ok."; + + shared_register.ReleaseKey(in_use_key); } util::Log() << "All clients switched."; diff --git a/src/tools/store.cpp b/src/tools/store.cpp index e32f89d70..17d0a1dfd 100644 --- a/src/tools/store.cpp +++ b/src/tools/store.cpp @@ -15,14 +15,13 @@ using namespace osrm; -void removeLocks() { storage::SharedMonitor::remove(); } +void removeLocks() { storage::SharedMonitor::remove(); } -void deleteRegion(const storage::SharedDataType region) +void deleteRegion(const storage::SharedRegionRegister::ShmKey key) { - if (storage::SharedMemory::RegionExists(region) && !storage::SharedMemory::Remove(region)) + if (storage::SharedMemory::RegionExists(key) && !storage::SharedMemory::Remove(key)) { - util::Log(logWARNING) << "could not delete shared memory region " - << storage::regionToString(region); + util::Log(logWARNING) << "could not delete shared memory region " << key; } } @@ -43,8 +42,10 @@ void springClean() } else { - deleteRegion(storage::REGION_1); - deleteRegion(storage::REGION_2); + for (auto key : util::irange(0, storage::SharedRegionRegister::MAX_SHM_KEYS)) + { + deleteRegion(key); + } removeLocks(); } }