Use a shared (!) reader writer lock to protect CURRENT_REGIONS

This fixes issue #3016.
This commit is contained in:
Patrick Niklaus 2016-10-07 16:08:08 +02:00 committed by Patrick Niklaus
parent 036475afd0
commit 9eb7fc03ce
5 changed files with 30 additions and 94 deletions

View File

@ -1,7 +1,6 @@
#ifndef ENGINE_HPP #ifndef ENGINE_HPP
#define ENGINE_HPP #define ENGINE_HPP
#include "storage/shared_barriers.hpp"
#include "engine/status.hpp" #include "engine/status.hpp"
#include "util/json_container.hpp" #include "util/json_container.hpp"
@ -20,6 +19,11 @@ struct Object;
} }
} }
namespace storage
{
struct SharedBarriers;
}
// Fwd decls // Fwd decls
namespace engine namespace engine
{ {
@ -52,9 +56,6 @@ class BaseDataFacade;
class Engine final class Engine final
{ {
public: public:
// Needs to be public
struct EngineLock;
explicit Engine(const EngineConfig &config); explicit Engine(const EngineConfig &config);
Engine(Engine &&) noexcept; Engine(Engine &&) noexcept;
@ -71,7 +72,7 @@ class Engine final
Status Tile(const api::TileParameters &parameters, std::string &result) const; Status Tile(const api::TileParameters &parameters, std::string &result) const;
private: private:
std::unique_ptr<EngineLock> lock; std::unique_ptr<storage::SharedBarriers> lock;
std::unique_ptr<plugins::ViaRoutePlugin> route_plugin; std::unique_ptr<plugins::ViaRoutePlugin> route_plugin;
std::unique_ptr<plugins::TablePlugin> table_plugin; std::unique_ptr<plugins::TablePlugin> table_plugin;

View File

@ -3,6 +3,7 @@
#include <boost/interprocess/sync/named_condition.hpp> #include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/sync/named_mutex.hpp> #include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_sharable_mutex.hpp>
namespace osrm namespace osrm
{ {
@ -13,25 +14,13 @@ struct SharedBarriers
SharedBarriers() SharedBarriers()
: pending_update_mutex(boost::interprocess::open_or_create, "pending_update"), : pending_update_mutex(boost::interprocess::open_or_create, "pending_update"),
update_mutex(boost::interprocess::open_or_create, "update"), query_mutex(boost::interprocess::open_or_create, "query")
query_mutex(boost::interprocess::open_or_create, "query"),
no_running_queries_condition(boost::interprocess::open_or_create, "no_running_queries"),
update_ongoing(false), number_of_queries(0)
{ {
} }
// Mutex to protect access to the boolean variable // Mutex to protect access to the boolean variable
boost::interprocess::named_mutex pending_update_mutex; boost::interprocess::named_mutex pending_update_mutex;
boost::interprocess::named_mutex update_mutex; boost::interprocess::named_sharable_mutex query_mutex;
boost::interprocess::named_mutex query_mutex;
// Condition that no update is running
boost::interprocess::named_condition no_running_queries_condition;
// Is there an ongoing update?
bool update_ongoing;
// Is there any query?
int number_of_queries;
}; };
} }
} }

View File

@ -1,5 +1,5 @@
#include "engine/engine.hpp"
#include "engine/api/route_parameters.hpp" #include "engine/api/route_parameters.hpp"
#include "engine/engine.hpp"
#include "engine/engine_config.hpp" #include "engine/engine_config.hpp"
#include "engine/status.hpp" #include "engine/status.hpp"
@ -20,6 +20,7 @@
#include <boost/assert.hpp> #include <boost/assert.hpp>
#include <boost/interprocess/sync/named_condition.hpp> #include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp> #include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/sharable_lock.hpp>
#include <boost/thread/lock_types.hpp> #include <boost/thread/lock_types.hpp>
#include <algorithm> #include <algorithm>
@ -27,65 +28,13 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
namespace osrm
{
namespace engine
{
struct Engine::EngineLock
{
// will only be initialized if shared memory is used
storage::SharedBarriers barrier;
// decrease number of concurrent queries
void DecreaseQueryCount();
// increase number of concurrent queries
void IncreaseQueryCount();
};
// decrease number of concurrent queries
void Engine::EngineLock::DecreaseQueryCount()
{
// lock query
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> query_lock(
barrier.query_mutex);
// decrement query count
--(barrier.number_of_queries);
BOOST_ASSERT_MSG(0 <= barrier.number_of_queries, "invalid number of queries");
// notify all processes that were waiting for this condition
if (0 == barrier.number_of_queries)
{
barrier.no_running_queries_condition.notify_all();
}
}
// increase number of concurrent queries
void Engine::EngineLock::IncreaseQueryCount()
{
// lock update pending
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> pending_lock(
barrier.pending_update_mutex);
// lock query
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> query_lock(
barrier.query_mutex);
// unlock update pending
pending_lock.unlock();
// increment query count
++(barrier.number_of_queries);
}
} // ns engine
} // ns osrm
namespace namespace
{ {
// Abstracted away the query locking into a template function // Abstracted away the query locking into a template function
// Works the same for every plugin. // Works the same for every plugin.
template <typename ParameterT, typename PluginT, typename ResultT> template <typename ParameterT, typename PluginT, typename ResultT>
osrm::engine::Status osrm::engine::Status
RunQuery(const std::unique_ptr<osrm::engine::Engine::EngineLock> &lock, RunQuery(const std::unique_ptr<osrm::storage::SharedBarriers> &lock,
const std::shared_ptr<osrm::engine::datafacade::BaseDataFacade> &facade, const std::shared_ptr<osrm::engine::datafacade::BaseDataFacade> &facade,
const ParameterT &parameters, const ParameterT &parameters,
PluginT &plugin, PluginT &plugin,
@ -97,7 +46,10 @@ RunQuery(const std::unique_ptr<osrm::engine::Engine::EngineLock> &lock,
} }
BOOST_ASSERT(lock); BOOST_ASSERT(lock);
lock->IncreaseQueryCount(); // this locks aquires shared ownership of the query mutex: other requets are allowed
// to run, but data updates need to wait for all queries to finish until they can aquire an exclusive lock
boost::interprocess::sharable_lock<boost::interprocess::named_sharable_mutex> query_lock(
lock->query_mutex);
auto &shared_facade = static_cast<osrm::engine::datafacade::SharedDataFacade &>(*facade); auto &shared_facade = static_cast<osrm::engine::datafacade::SharedDataFacade &>(*facade);
shared_facade.CheckAndReloadFacade(); shared_facade.CheckAndReloadFacade();
@ -107,7 +59,6 @@ RunQuery(const std::unique_ptr<osrm::engine::Engine::EngineLock> &lock,
osrm::engine::Status status = plugin.HandleRequest(facade, parameters, result); osrm::engine::Status status = plugin.HandleRequest(facade, parameters, result);
lock->DecreaseQueryCount();
return status; return status;
} }
@ -119,10 +70,11 @@ namespace engine
{ {
Engine::Engine(const EngineConfig &config) Engine::Engine(const EngineConfig &config)
: lock(config.use_shared_memory ? std::make_unique<storage::SharedBarriers>()
: std::unique_ptr<storage::SharedBarriers>())
{ {
if (config.use_shared_memory) if (config.use_shared_memory)
{ {
lock = std::make_unique<EngineLock>();
query_data_facade = std::make_shared<datafacade::SharedDataFacade>(); query_data_facade = std::make_shared<datafacade::SharedDataFacade>();
} }
else else

View File

@ -42,11 +42,6 @@ namespace osrm
namespace storage namespace storage
{ {
using RTreeLeaf = engine::datafacade::BaseDataFacade::RTreeLeaf;
using RTreeNode =
util::StaticRTree<RTreeLeaf, util::ShM<util::Coordinate, true>::vector, true>::TreeNode;
using QueryGraph = util::StaticGraph<contractor::QueryEdge::EdgeData>;
// delete a shared memory region. report warning if it could not be deleted // delete a shared memory region. report warning if it could not be deleted
void deleteRegion(const SharedDataType region) void deleteRegion(const SharedDataType region)
{ {
@ -76,6 +71,11 @@ void deleteRegion(const SharedDataType region)
} }
} }
using RTreeLeaf = engine::datafacade::BaseDataFacade::RTreeLeaf;
using RTreeNode =
util::StaticRTree<RTreeLeaf, util::ShM<util::Coordinate, true>::vector, true>::TreeNode;
using QueryGraph = util::StaticGraph<contractor::QueryEdge::EdgeData>;
Storage::Storage(StorageConfig config_) : config(std::move(config_)) {} Storage::Storage(StorageConfig config_) : config(std::move(config_)) {}
int Storage::Run() int Storage::Run()
@ -738,20 +738,15 @@ int Storage::Run()
SharedDataTimestamp *data_timestamp_ptr = SharedDataTimestamp *data_timestamp_ptr =
static_cast<SharedDataTimestamp *>(data_type_memory->Ptr()); static_cast<SharedDataTimestamp *>(data_type_memory->Ptr());
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> query_lock(
barrier.query_mutex);
// notify all processes that were waiting for this condition
if (0 < barrier.number_of_queries)
{ {
barrier.no_running_queries_condition.wait(query_lock); boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex> query_lock(barrier.query_mutex);
}
data_timestamp_ptr->layout = layout_region; data_timestamp_ptr->layout = layout_region;
data_timestamp_ptr->data = data_region; data_timestamp_ptr->data = data_region;
data_timestamp_ptr->timestamp += 1; data_timestamp_ptr->timestamp += 1;
deleteRegion(previous_data_region); deleteRegion(previous_data_region);
deleteRegion(previous_layout_region); deleteRegion(previous_layout_region);
}
util::SimpleLogger().Write() << "all data loaded"; util::SimpleLogger().Write() << "all data loaded";
return EXIT_SUCCESS; return EXIT_SUCCESS;

View File

@ -10,6 +10,5 @@ int main()
osrm::storage::SharedBarriers barrier; osrm::storage::SharedBarriers barrier;
barrier.pending_update_mutex.unlock(); barrier.pending_update_mutex.unlock();
barrier.query_mutex.unlock(); barrier.query_mutex.unlock();
barrier.update_mutex.unlock();
return 0; return 0;
} }