Don't block all requests to switch data
This switchtes the data even if there are requests still running on the old data. osrm-datastore then waits until all of these old requests have finished before freeing the old regions. This also means that osrm-datastore will return with an error if there is a data update currenlty in progress.
This commit is contained in:
committed by
Patrick Niklaus
parent
1557ff81bc
commit
c69545c47a
+9
-25
@@ -22,7 +22,6 @@
|
||||
#include <boost/interprocess/sync/named_condition.hpp>
|
||||
#include <boost/interprocess/sync/scoped_lock.hpp>
|
||||
#include <boost/interprocess/sync/sharable_lock.hpp>
|
||||
#include <boost/thread/lock_types.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
#include <fstream>
|
||||
@@ -36,28 +35,15 @@ namespace
|
||||
// Works the same for every plugin.
|
||||
template <typename ParameterT, typename PluginT, typename ResultT>
|
||||
osrm::engine::Status
|
||||
RunQuery(const std::unique_ptr<osrm::storage::SharedBarriers> &lock,
|
||||
osrm::engine::DataWatchdog& watchdog,
|
||||
RunQuery(const std::unique_ptr<osrm::engine::DataWatchdog>& watchdog,
|
||||
std::shared_ptr<osrm::engine::datafacade::BaseDataFacade> &facade,
|
||||
const ParameterT ¶meters,
|
||||
PluginT &plugin,
|
||||
ResultT &result)
|
||||
{
|
||||
if (!lock)
|
||||
if (watchdog && watchdog->HasNewRegion())
|
||||
{
|
||||
return plugin.HandleRequest(facade, parameters, result);
|
||||
}
|
||||
|
||||
BOOST_ASSERT(lock);
|
||||
// this locks aquires shared ownership of the query mutex: other requets are allowed
|
||||
// to run, but data updates need to wait for all queries to finish until they can aquire an
|
||||
// exclusive lock
|
||||
boost::interprocess::sharable_lock<boost::interprocess::named_sharable_mutex> query_lock(
|
||||
lock->query_mutex);
|
||||
|
||||
if (watchdog.HasNewRegion())
|
||||
{
|
||||
watchdog.MaybeLoadNewRegion(facade);
|
||||
watchdog->MaybeLoadNewRegion(facade);
|
||||
}
|
||||
|
||||
osrm::engine::Status status = plugin.HandleRequest(facade, parameters, result);
|
||||
@@ -84,14 +70,12 @@ Engine::Engine(const EngineConfig &config)
|
||||
"No shared memory blocks found, have you forgotten to run osrm-datastore?");
|
||||
}
|
||||
|
||||
facade_update_mutex = std::make_unique<std::mutex>();
|
||||
watchdog = std::make_unique<DataWatchdog>();
|
||||
// this will always either return a value or throw an exception
|
||||
// in the initial run
|
||||
watchdog->MaybeLoadNewRegion(query_data_facade);
|
||||
BOOST_ASSERT(query_data_facade);
|
||||
BOOST_ASSERT(watchdog);
|
||||
BOOST_ASSERT(lock);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -120,32 +104,32 @@ Engine &Engine::operator=(Engine &&) noexcept = default;
|
||||
|
||||
Status Engine::Route(const api::RouteParameters ¶ms, util::json::Object &result) const
|
||||
{
|
||||
return RunQuery(lock, *watchdog, query_data_facade, params, *route_plugin, result);
|
||||
return RunQuery(watchdog, query_data_facade, params, *route_plugin, result);
|
||||
}
|
||||
|
||||
Status Engine::Table(const api::TableParameters ¶ms, util::json::Object &result) const
|
||||
{
|
||||
return RunQuery(lock, *watchdog, query_data_facade, params, *table_plugin, result);
|
||||
return RunQuery(watchdog, query_data_facade, params, *table_plugin, result);
|
||||
}
|
||||
|
||||
Status Engine::Nearest(const api::NearestParameters ¶ms, util::json::Object &result) const
|
||||
{
|
||||
return RunQuery(lock, *watchdog, query_data_facade, params, *nearest_plugin, result);
|
||||
return RunQuery(watchdog, query_data_facade, params, *nearest_plugin, result);
|
||||
}
|
||||
|
||||
Status Engine::Trip(const api::TripParameters ¶ms, util::json::Object &result) const
|
||||
{
|
||||
return RunQuery(lock, *watchdog, query_data_facade, params, *trip_plugin, result);
|
||||
return RunQuery(watchdog, query_data_facade, params, *trip_plugin, result);
|
||||
}
|
||||
|
||||
Status Engine::Match(const api::MatchParameters ¶ms, util::json::Object &result) const
|
||||
{
|
||||
return RunQuery(lock, *watchdog, query_data_facade, params, *match_plugin, result);
|
||||
return RunQuery(watchdog, query_data_facade, params, *match_plugin, result);
|
||||
}
|
||||
|
||||
Status Engine::Tile(const api::TileParameters ¶ms, std::string &result) const
|
||||
{
|
||||
return RunQuery(lock, *watchdog, query_data_facade, params, *tile_plugin, result);
|
||||
return RunQuery(watchdog, query_data_facade, params, *tile_plugin, result);
|
||||
}
|
||||
|
||||
} // engine ns
|
||||
|
||||
+85
-30
@@ -1,4 +1,3 @@
|
||||
#include "storage/storage.hpp"
|
||||
#include "contractor/query_edge.hpp"
|
||||
#include "extractor/compressed_edge_container.hpp"
|
||||
#include "extractor/guidance/turn_instruction.hpp"
|
||||
@@ -9,6 +8,7 @@
|
||||
#include "storage/shared_barriers.hpp"
|
||||
#include "storage/shared_datatype.hpp"
|
||||
#include "storage/shared_memory.hpp"
|
||||
#include "storage/storage.hpp"
|
||||
#include "engine/datafacade/datafacade_base.hpp"
|
||||
#include "util/coordinate.hpp"
|
||||
#include "util/exception.hpp"
|
||||
@@ -27,6 +27,11 @@
|
||||
#endif
|
||||
|
||||
#include <boost/filesystem/fstream.hpp>
|
||||
#include <boost/interprocess/exceptions.hpp>
|
||||
#include <boost/interprocess/sync/named_sharable_mutex.hpp>
|
||||
#include <boost/interprocess/sync/named_upgradable_mutex.hpp>
|
||||
#include <boost/interprocess/sync/scoped_lock.hpp>
|
||||
#include <boost/interprocess/sync/upgradable_lock.hpp>
|
||||
#include <boost/iostreams/seek.hpp>
|
||||
|
||||
#include <cstdint>
|
||||
@@ -78,12 +83,37 @@ using QueryGraph = util::StaticGraph<contractor::QueryEdge::EdgeData>;
|
||||
|
||||
Storage::Storage(StorageConfig config_) : config(std::move(config_)) {}
|
||||
|
||||
bool regionsAvailable(SharedDataType layout, SharedDataType data)
|
||||
{
|
||||
auto shared_regions = makeSharedMemoryView(CURRENT_REGIONS);
|
||||
const auto shared_timestamp = static_cast<const SharedDataTimestamp *>(shared_regions->Ptr());
|
||||
return shared_timestamp->layout != layout && shared_timestamp->data != data;
|
||||
}
|
||||
|
||||
int Storage::Run()
|
||||
{
|
||||
BOOST_ASSERT_MSG(config.IsValid(), "Invalid storage config");
|
||||
|
||||
util::LogPolicy::GetInstance().Unmute();
|
||||
SharedBarriers barrier;
|
||||
SharedBarriers barriers;
|
||||
|
||||
boost::interprocess::upgradable_lock<boost::interprocess::named_upgradable_mutex>
|
||||
current_regions_lock(barriers.current_regions_mutex, boost::interprocess::defer_lock);
|
||||
try
|
||||
{
|
||||
if (!current_regions_lock.try_lock())
|
||||
{
|
||||
util::SimpleLogger().Write(logWARNING) << "A data update is in progress";
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
}
|
||||
// hard unlock in case of any exception.
|
||||
catch (boost::interprocess::lock_exception &ex)
|
||||
{
|
||||
barriers.current_regions_mutex.unlock_upgradable();
|
||||
// make sure we exit here because this is bad
|
||||
throw;
|
||||
}
|
||||
|
||||
#ifdef __linux__
|
||||
// try to disable swapping on Linux
|
||||
@@ -94,30 +124,34 @@ int Storage::Run()
|
||||
}
|
||||
#endif
|
||||
|
||||
try
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> pending_lock(
|
||||
barrier.pending_update_mutex);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// hard unlock in case of any exception.
|
||||
barrier.pending_update_mutex.unlock();
|
||||
}
|
||||
const auto regions_1_available = regionsAvailable(LAYOUT_1, DATA_1);
|
||||
const auto regions_2_available = regionsAvailable(LAYOUT_2, DATA_2);
|
||||
|
||||
// determine segment to use
|
||||
bool segment2_in_use = SharedMemory::RegionExists(LAYOUT_2);
|
||||
const storage::SharedDataType layout_region = [&] {
|
||||
return segment2_in_use ? LAYOUT_1 : LAYOUT_2;
|
||||
const SharedDataType layout_region = [&] {
|
||||
if (regions_1_available)
|
||||
{
|
||||
return LAYOUT_1;
|
||||
}
|
||||
if (regions_2_available)
|
||||
{
|
||||
return LAYOUT_2;
|
||||
}
|
||||
throw util::exception("No shared memory region free!");
|
||||
}();
|
||||
const storage::SharedDataType data_region = [&] { return segment2_in_use ? DATA_1 : DATA_2; }();
|
||||
const storage::SharedDataType previous_layout_region = [&] {
|
||||
return segment2_in_use ? LAYOUT_2 : LAYOUT_1;
|
||||
}();
|
||||
const storage::SharedDataType previous_data_region = [&] {
|
||||
return segment2_in_use ? DATA_2 : DATA_1;
|
||||
const SharedDataType data_region = [&] {
|
||||
if (regions_1_available)
|
||||
{
|
||||
return DATA_1;
|
||||
}
|
||||
if (regions_2_available)
|
||||
{
|
||||
return DATA_2;
|
||||
}
|
||||
throw util::exception("No shared memory region free!");
|
||||
}();
|
||||
|
||||
BOOST_ASSERT(regions_1_available || regions_2_available);
|
||||
|
||||
// Allocate a memory layout in shared memory, deallocate previous
|
||||
auto layout_memory = makeSharedMemory(layout_region, sizeof(SharedDataLayout));
|
||||
auto shared_layout_ptr = new (layout_memory->Ptr()) SharedDataLayout();
|
||||
@@ -606,8 +640,8 @@ int Storage::Run()
|
||||
std::uint64_t *osmnodeid_ptr = shared_layout_ptr->GetBlockPtr<std::uint64_t, true>(
|
||||
shared_memory_ptr, SharedDataLayout::OSM_NODE_ID_LIST);
|
||||
util::PackedVector<OSMNodeID, true> osmnodeid_list;
|
||||
osmnodeid_list.reset(
|
||||
osmnodeid_ptr, shared_layout_ptr->num_entries[storage::SharedDataLayout::OSM_NODE_ID_LIST]);
|
||||
osmnodeid_list.reset(osmnodeid_ptr,
|
||||
shared_layout_ptr->num_entries[SharedDataLayout::OSM_NODE_ID_LIST]);
|
||||
|
||||
extractor::QueryNode current_node;
|
||||
for (unsigned i = 0; i < coordinate_list_size; ++i)
|
||||
@@ -733,20 +767,41 @@ int Storage::Run()
|
||||
}
|
||||
|
||||
// acquire lock
|
||||
auto data_type_memory = makeSharedMemory(CURRENT_REGIONS, sizeof(SharedDataTimestamp), true, false, false);
|
||||
auto data_type_memory =
|
||||
makeSharedMemory(CURRENT_REGIONS, sizeof(SharedDataTimestamp), true, false, false);
|
||||
SharedDataTimestamp *data_timestamp_ptr =
|
||||
static_cast<SharedDataTimestamp *>(data_type_memory->Ptr());
|
||||
|
||||
{
|
||||
util::SimpleLogger().Write(logDEBUG) << "waiting for all queries to finish";
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex> query_lock(barrier.query_mutex);
|
||||
util::SimpleLogger().Write(logDEBUG) << "all queries complete, switching over.";
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>
|
||||
current_regions_exclusive_lock(std::move(current_regions_lock));
|
||||
|
||||
data_timestamp_ptr->layout = layout_region;
|
||||
data_timestamp_ptr->data = data_region;
|
||||
data_timestamp_ptr->timestamp += 1;
|
||||
deleteRegion(previous_data_region);
|
||||
deleteRegion(previous_layout_region);
|
||||
|
||||
boost::interprocess::upgradable_lock<boost::interprocess::named_upgradable_mutex>
|
||||
current_regions_upgradable_lock(std::move(current_regions_exclusive_lock));
|
||||
|
||||
util::SimpleLogger().Write(logDEBUG) << "waiting for server to switch dataset and request to finish";
|
||||
if (!regions_1_available)
|
||||
{
|
||||
BOOST_ASSERT(regions_2_available);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex>
|
||||
regions_1_lock(barriers.regions_1_mutex);
|
||||
util::SimpleLogger().Write(logDEBUG) << "switched. removing old regions 1";
|
||||
deleteRegion(DATA_1);
|
||||
deleteRegion(LAYOUT_1);
|
||||
}
|
||||
else if (!regions_2_available)
|
||||
{
|
||||
BOOST_ASSERT(regions_1_available);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex>
|
||||
regions_2_lock(barriers.regions_2_mutex);
|
||||
util::SimpleLogger().Write(logDEBUG) << "switched. removing regions 2";
|
||||
deleteRegion(DATA_2);
|
||||
deleteRegion(LAYOUT_2);
|
||||
}
|
||||
}
|
||||
util::SimpleLogger().Write() << "all data loaded";
|
||||
|
||||
|
||||
@@ -7,8 +7,9 @@ int main()
|
||||
{
|
||||
osrm::util::LogPolicy::GetInstance().Unmute();
|
||||
osrm::util::SimpleLogger().Write() << "Releasing all locks";
|
||||
osrm::storage::SharedBarriers barrier;
|
||||
barrier.pending_update_mutex.unlock();
|
||||
barrier.query_mutex.unlock();
|
||||
osrm::storage::SharedBarriers barriers;
|
||||
boost::interprocess::named_upgradable_mutex::remove("current_regions");
|
||||
boost::interprocess::named_sharable_mutex::remove("regions_1");
|
||||
boost::interprocess::named_sharable_mutex::remove("regions_2");
|
||||
return 0;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user