Simplify the locking logic
This commit is contained in:
parent
ebac9f586b
commit
1557ff81bc
@ -6,8 +6,8 @@
|
|||||||
#include "storage/shared_datatype.hpp"
|
#include "storage/shared_datatype.hpp"
|
||||||
#include "storage/shared_memory.hpp"
|
#include "storage/shared_memory.hpp"
|
||||||
|
|
||||||
#include <boost/thread/shared_mutex.hpp>
|
|
||||||
#include <boost/thread/lock_types.hpp>
|
#include <boost/thread/lock_types.hpp>
|
||||||
|
#include <boost/thread/shared_mutex.hpp>
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
@ -47,31 +47,43 @@ class DataWatchdog
|
|||||||
const auto shared_timestamp =
|
const auto shared_timestamp =
|
||||||
static_cast<const storage::SharedDataTimestamp *>(shared_regions->Ptr());
|
static_cast<const storage::SharedDataTimestamp *>(shared_regions->Ptr());
|
||||||
|
|
||||||
return shared_timestamp->layout != current_timestamp.layout ||
|
// sanity check: if the timestamp is the same all other data needs to be the same as well
|
||||||
shared_timestamp->data != current_timestamp.data ||
|
BOOST_ASSERT(shared_timestamp->timestamp != current_timestamp.timestamp ||
|
||||||
shared_timestamp->timestamp != current_timestamp.timestamp;
|
(shared_timestamp->layout == current_timestamp.layout &&
|
||||||
|
shared_timestamp->data == current_timestamp.data));
|
||||||
|
|
||||||
|
return shared_timestamp->timestamp != current_timestamp.timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note this can still return an emptry pointer if this function got overtaken by another thread
|
// This will either update the contens of facade or just leave it as is
|
||||||
std::shared_ptr<datafacade::SharedDataFacade> MaybeLoadNewRegion()
|
// if the update was already done by another thread
|
||||||
|
void MaybeLoadNewRegion(std::shared_ptr<datafacade::BaseDataFacade> &facade)
|
||||||
{
|
{
|
||||||
const boost::lock_guard<boost::shared_mutex> lock(current_timestamp_mutex);
|
const boost::lock_guard<boost::shared_mutex> lock(current_timestamp_mutex);
|
||||||
|
|
||||||
const auto shared_timestamp =
|
const auto shared_timestamp =
|
||||||
static_cast<const storage::SharedDataTimestamp *>(shared_regions->Ptr());
|
static_cast<const storage::SharedDataTimestamp *>(shared_regions->Ptr());
|
||||||
|
|
||||||
|
// if more then one request tried to aquire the write lock
|
||||||
|
// we might get overtaken before we actually do the writing
|
||||||
|
// in that case we don't modify anthing
|
||||||
if (shared_timestamp->timestamp == current_timestamp.timestamp)
|
if (shared_timestamp->timestamp == current_timestamp.timestamp)
|
||||||
{
|
{
|
||||||
BOOST_ASSERT(shared_timestamp->layout == current_timestamp.layout);
|
BOOST_ASSERT(shared_timestamp->layout == current_timestamp.layout);
|
||||||
BOOST_ASSERT(shared_timestamp->data == current_timestamp.data);
|
BOOST_ASSERT(shared_timestamp->data == current_timestamp.data);
|
||||||
return std::shared_ptr<datafacade::SharedDataFacade>();
|
|
||||||
}
|
}
|
||||||
|
// this thread has won and can update the data
|
||||||
|
else
|
||||||
|
{
|
||||||
current_timestamp = *shared_timestamp;
|
current_timestamp = *shared_timestamp;
|
||||||
|
// TODO remove once we allow for more then one SharedMemoryFacade at the same time
|
||||||
return std::make_shared<datafacade::SharedDataFacade>(
|
// at this point no other query is allowed to reference this facade!
|
||||||
|
// the old facade will die exactly here
|
||||||
|
BOOST_ASSERT(!facade || facade.use_count() == 1);
|
||||||
|
facade = std::make_shared<datafacade::SharedDataFacade>(
|
||||||
current_timestamp.layout, current_timestamp.data, current_timestamp.timestamp);
|
current_timestamp.layout, current_timestamp.data, current_timestamp.timestamp);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// shared memory table containing pointers to all shared regions
|
// shared memory table containing pointers to all shared regions
|
||||||
|
@ -85,10 +85,8 @@ class Engine final
|
|||||||
std::unique_ptr<plugins::MatchPlugin> match_plugin;
|
std::unique_ptr<plugins::MatchPlugin> match_plugin;
|
||||||
std::unique_ptr<plugins::TilePlugin> tile_plugin;
|
std::unique_ptr<plugins::TilePlugin> tile_plugin;
|
||||||
|
|
||||||
// reading and setting this is protected by lock
|
// reading and setting this is protected by locking in the watchdog
|
||||||
mutable std::shared_ptr<datafacade::BaseDataFacade> query_data_facade;
|
mutable std::shared_ptr<datafacade::BaseDataFacade> query_data_facade;
|
||||||
// ensures that when we set facade we can do it without race conditions
|
|
||||||
mutable std::unique_ptr<std::mutex> facade_update_mutex;
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,6 @@ namespace
|
|||||||
template <typename ParameterT, typename PluginT, typename ResultT>
|
template <typename ParameterT, typename PluginT, typename ResultT>
|
||||||
osrm::engine::Status
|
osrm::engine::Status
|
||||||
RunQuery(const std::unique_ptr<osrm::storage::SharedBarriers> &lock,
|
RunQuery(const std::unique_ptr<osrm::storage::SharedBarriers> &lock,
|
||||||
std::mutex &facade_update_mutex,
|
|
||||||
osrm::engine::DataWatchdog& watchdog,
|
osrm::engine::DataWatchdog& watchdog,
|
||||||
std::shared_ptr<osrm::engine::datafacade::BaseDataFacade> &facade,
|
std::shared_ptr<osrm::engine::datafacade::BaseDataFacade> &facade,
|
||||||
const ParameterT ¶meters,
|
const ParameterT ¶meters,
|
||||||
@ -56,31 +55,9 @@ RunQuery(const std::unique_ptr<osrm::storage::SharedBarriers> &lock,
|
|||||||
boost::interprocess::sharable_lock<boost::interprocess::named_sharable_mutex> query_lock(
|
boost::interprocess::sharable_lock<boost::interprocess::named_sharable_mutex> query_lock(
|
||||||
lock->query_mutex);
|
lock->query_mutex);
|
||||||
|
|
||||||
{
|
|
||||||
// this lock ensures that we are never overtaken while creating a new
|
|
||||||
// facade and setting it.
|
|
||||||
// This is important since we need to ensure there is always exactly
|
|
||||||
// one facade per shared memory region.
|
|
||||||
// TODO: Remove this once the SharedDataFacade doesn't own the shared memory
|
|
||||||
// segment anymore.
|
|
||||||
std::lock_guard<std::mutex> update_lock(facade_update_mutex);
|
|
||||||
|
|
||||||
if (watchdog.HasNewRegion())
|
if (watchdog.HasNewRegion())
|
||||||
{
|
{
|
||||||
|
watchdog.MaybeLoadNewRegion(facade);
|
||||||
auto new_facade = watchdog.MaybeLoadNewRegion();
|
|
||||||
// for now the external locking will ensure that loading the new region
|
|
||||||
// will ways work. In the future we might allow being overtaken
|
|
||||||
// by other threads and they will also try to update.
|
|
||||||
if (new_facade)
|
|
||||||
{
|
|
||||||
// TODO remove once we allow for more then one SharedMemoryFacade at the same time
|
|
||||||
// at this point no other query is allowed to reference this facade!
|
|
||||||
// the old facade will die exactly here
|
|
||||||
BOOST_ASSERT(facade.use_count() == 1);
|
|
||||||
facade = std::move(new_facade);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
osrm::engine::Status status = plugin.HandleRequest(facade, parameters, result);
|
osrm::engine::Status status = plugin.HandleRequest(facade, parameters, result);
|
||||||
@ -111,7 +88,7 @@ Engine::Engine(const EngineConfig &config)
|
|||||||
watchdog = std::make_unique<DataWatchdog>();
|
watchdog = std::make_unique<DataWatchdog>();
|
||||||
// this will always either return a value or throw an exception
|
// this will always either return a value or throw an exception
|
||||||
// in the initial run
|
// in the initial run
|
||||||
query_data_facade = watchdog->MaybeLoadNewRegion();
|
watchdog->MaybeLoadNewRegion(query_data_facade);
|
||||||
BOOST_ASSERT(query_data_facade);
|
BOOST_ASSERT(query_data_facade);
|
||||||
BOOST_ASSERT(watchdog);
|
BOOST_ASSERT(watchdog);
|
||||||
BOOST_ASSERT(lock);
|
BOOST_ASSERT(lock);
|
||||||
@ -143,32 +120,32 @@ Engine &Engine::operator=(Engine &&) noexcept = default;
|
|||||||
|
|
||||||
Status Engine::Route(const api::RouteParameters ¶ms, util::json::Object &result) const
|
Status Engine::Route(const api::RouteParameters ¶ms, util::json::Object &result) const
|
||||||
{
|
{
|
||||||
return RunQuery(lock, *facade_update_mutex, *watchdog, query_data_facade, params, *route_plugin, result);
|
return RunQuery(lock, *watchdog, query_data_facade, params, *route_plugin, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
Status Engine::Table(const api::TableParameters ¶ms, util::json::Object &result) const
|
Status Engine::Table(const api::TableParameters ¶ms, util::json::Object &result) const
|
||||||
{
|
{
|
||||||
return RunQuery(lock, *facade_update_mutex, *watchdog, query_data_facade, params, *table_plugin, result);
|
return RunQuery(lock, *watchdog, query_data_facade, params, *table_plugin, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
Status Engine::Nearest(const api::NearestParameters ¶ms, util::json::Object &result) const
|
Status Engine::Nearest(const api::NearestParameters ¶ms, util::json::Object &result) const
|
||||||
{
|
{
|
||||||
return RunQuery(lock, *facade_update_mutex, *watchdog, query_data_facade, params, *nearest_plugin, result);
|
return RunQuery(lock, *watchdog, query_data_facade, params, *nearest_plugin, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
Status Engine::Trip(const api::TripParameters ¶ms, util::json::Object &result) const
|
Status Engine::Trip(const api::TripParameters ¶ms, util::json::Object &result) const
|
||||||
{
|
{
|
||||||
return RunQuery(lock, *facade_update_mutex, *watchdog, query_data_facade, params, *trip_plugin, result);
|
return RunQuery(lock, *watchdog, query_data_facade, params, *trip_plugin, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
Status Engine::Match(const api::MatchParameters ¶ms, util::json::Object &result) const
|
Status Engine::Match(const api::MatchParameters ¶ms, util::json::Object &result) const
|
||||||
{
|
{
|
||||||
return RunQuery(lock, *facade_update_mutex, *watchdog, query_data_facade, params, *match_plugin, result);
|
return RunQuery(lock, *watchdog, query_data_facade, params, *match_plugin, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
Status Engine::Tile(const api::TileParameters ¶ms, std::string &result) const
|
Status Engine::Tile(const api::TileParameters ¶ms, std::string &result) const
|
||||||
{
|
{
|
||||||
return RunQuery(lock, *facade_update_mutex, *watchdog, query_data_facade, params, *tile_plugin, result);
|
return RunQuery(lock, *watchdog, query_data_facade, params, *tile_plugin, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // engine ns
|
} // engine ns
|
||||||
|
Loading…
Reference in New Issue
Block a user