Make SharedDataFacade immutable
This commit is contained in:
parent
9af00b1925
commit
ceaf362326
84
include/engine/data_watchdog.hpp
Normal file
84
include/engine/data_watchdog.hpp
Normal file
@ -0,0 +1,84 @@
|
||||
#include "engine/datafacade/shared_datafacade.hpp"
|
||||
|
||||
#include "storage/shared_datatype.hpp"
|
||||
#include "storage/shared_memory.hpp"
|
||||
|
||||
#include <boost/thread/shared_mutex.hpp>
|
||||
#include <boost/thread/lock_types.hpp>
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace osrm
|
||||
{
|
||||
namespace engine
|
||||
{
|
||||
|
||||
// This class monitors the shared memory region that contains the pointers to
|
||||
// the data and layout regions that should be used. This region is updated
|
||||
// once a new dataset arrives.
|
||||
//
|
||||
// TODO: This also needs a shared memory reader lock with other clients and
|
||||
// possibly osrm-datastore since updating the CURRENT_REGIONS data is not atomic.
|
||||
// Currently we enfore this by waiting that all queries have finished before
|
||||
// osrm-datastore writes to this section.
|
||||
class DataWatchdog
|
||||
{
|
||||
public:
|
||||
DataWatchdog()
|
||||
: shared_regions(storage::makeSharedMemory(
|
||||
storage::CURRENT_REGIONS, sizeof(storage::SharedDataTimestamp), false, false)),
|
||||
current_timestamp {storage::LAYOUT_NONE, storage::DATA_NONE, 0}
|
||||
{
|
||||
}
|
||||
|
||||
// Tries to connect to the shared memory containing the regions table
|
||||
static bool TryConnect()
|
||||
{
|
||||
return storage::SharedMemory::RegionExists(storage::CURRENT_REGIONS);
|
||||
}
|
||||
|
||||
// Check if it might be worth to try to aquire a exclusive lock
|
||||
bool HasNewRegion() const
|
||||
{
|
||||
const boost::shared_lock<boost::shared_mutex> lock(current_timestamp_mutex);
|
||||
|
||||
const auto shared_timestamp =
|
||||
static_cast<const storage::SharedDataTimestamp *>(shared_regions->Ptr());
|
||||
|
||||
return shared_timestamp->layout != current_timestamp.layout ||
|
||||
shared_timestamp->data != current_timestamp.data ||
|
||||
shared_timestamp->timestamp != current_timestamp.timestamp;
|
||||
}
|
||||
|
||||
// Note this can still return an emptry pointer if this function got overtaken by another thread
|
||||
std::shared_ptr<datafacade::SharedDataFacade> MaybeLoadNewRegion()
|
||||
{
|
||||
const boost::lock_guard<boost::shared_mutex> lock(current_timestamp_mutex);
|
||||
|
||||
const auto shared_timestamp =
|
||||
static_cast<const storage::SharedDataTimestamp *>(shared_regions->Ptr());
|
||||
|
||||
if (shared_timestamp->timestamp == current_timestamp.timestamp)
|
||||
{
|
||||
BOOST_ASSERT(shared_timestamp->layout == current_timestamp.layout);
|
||||
BOOST_ASSERT(shared_timestamp->data == current_timestamp.data);
|
||||
return std::shared_ptr<datafacade::SharedDataFacade>();
|
||||
}
|
||||
|
||||
current_timestamp = *shared_timestamp;
|
||||
|
||||
return std::make_shared<datafacade::SharedDataFacade>(
|
||||
current_timestamp.layout, current_timestamp.data, current_timestamp.timestamp);
|
||||
}
|
||||
|
||||
private:
|
||||
// shared memory table containing pointers to all shared regions
|
||||
std::unique_ptr<storage::SharedMemory> shared_regions;
|
||||
|
||||
// mutexes should be mutable even on const objects: This enables
|
||||
// marking functions as logical const and thread-safe.
|
||||
mutable boost::shared_mutex current_timestamp_mutex;
|
||||
storage::SharedDataTimestamp current_timestamp;
|
||||
};
|
||||
}
|
||||
}
|
@ -16,6 +16,7 @@
|
||||
#include "util/guidance/turn_lanes.hpp"
|
||||
|
||||
#include "engine/geospatial_query.hpp"
|
||||
#include "util/packed_vector.hpp"
|
||||
#include "util/range_table.hpp"
|
||||
#include "util/rectangle.hpp"
|
||||
#include "util/simple_logger.hpp"
|
||||
@ -63,11 +64,10 @@ class SharedDataFacade final : public BaseDataFacade
|
||||
|
||||
storage::SharedDataLayout *data_layout;
|
||||
char *shared_memory;
|
||||
storage::SharedDataTimestamp *data_timestamp_ptr;
|
||||
|
||||
storage::SharedDataType CURRENT_LAYOUT;
|
||||
storage::SharedDataType CURRENT_DATA;
|
||||
unsigned CURRENT_TIMESTAMP;
|
||||
storage::SharedDataType layout_region;
|
||||
storage::SharedDataType data_region;
|
||||
unsigned shared_timestamp;
|
||||
|
||||
unsigned m_check_sum;
|
||||
std::unique_ptr<QueryGraph> m_query_graph;
|
||||
@ -142,6 +142,17 @@ class SharedDataFacade final : public BaseDataFacade
|
||||
{
|
||||
BOOST_ASSERT_MSG(!m_coordinate_list.empty(), "coordinates must be loaded before r-tree");
|
||||
|
||||
const auto file_index_ptr = data_layout->GetBlockPtr<char>(
|
||||
shared_memory, storage::SharedDataLayout::FILE_INDEX_PATH);
|
||||
file_index_path = boost::filesystem::path(file_index_ptr);
|
||||
if (!boost::filesystem::exists(file_index_path))
|
||||
{
|
||||
util::SimpleLogger().Write(logDEBUG) << "Leaf file name "
|
||||
<< file_index_path.string();
|
||||
throw util::exception("Could not load leaf index file. "
|
||||
"Is any data loaded into shared memory?");
|
||||
}
|
||||
|
||||
auto tree_ptr = data_layout->GetBlockPtr<RTreeNode>(
|
||||
shared_memory, storage::SharedDataLayout::R_SEARCH_TREE);
|
||||
m_static_rtree.reset(
|
||||
@ -176,6 +187,11 @@ class SharedDataFacade final : public BaseDataFacade
|
||||
coordinate_list_ptr,
|
||||
data_layout->num_entries[storage::SharedDataLayout::COORDINATE_LIST]);
|
||||
|
||||
for (unsigned i = 0; i < m_coordinate_list.size(); ++i)
|
||||
{
|
||||
BOOST_ASSERT(GetCoordinateOfNode(i).IsValid());
|
||||
}
|
||||
|
||||
auto osmnodeid_list_ptr = data_layout->GetBlockPtr<std::uint64_t>(
|
||||
shared_memory, storage::SharedDataLayout::OSM_NODE_ID_LIST);
|
||||
m_osmnodeid_list.reset(
|
||||
@ -366,80 +382,20 @@ class SharedDataFacade final : public BaseDataFacade
|
||||
public:
|
||||
virtual ~SharedDataFacade() {}
|
||||
|
||||
boost::shared_mutex data_mutex;
|
||||
|
||||
SharedDataFacade()
|
||||
SharedDataFacade(storage::SharedDataType layout_region_, storage::SharedDataType data_region_, unsigned shared_timestamp_)
|
||||
: layout_region(layout_region_), data_region(data_region_), shared_timestamp(shared_timestamp_)
|
||||
{
|
||||
if (!storage::SharedMemory::RegionExists(storage::CURRENT_REGIONS))
|
||||
{
|
||||
throw util::exception(
|
||||
"No shared memory blocks found, have you forgotten to run osrm-datastore?");
|
||||
}
|
||||
data_timestamp_ptr = static_cast<storage::SharedDataTimestamp *>(
|
||||
storage::makeSharedMemory(
|
||||
storage::CURRENT_REGIONS, sizeof(storage::SharedDataTimestamp), false, false)
|
||||
->Ptr());
|
||||
CURRENT_LAYOUT = storage::LAYOUT_NONE;
|
||||
CURRENT_DATA = storage::DATA_NONE;
|
||||
CURRENT_TIMESTAMP = 0;
|
||||
util::SimpleLogger().Write(logDEBUG) << "Loading new data with shared timestamp " << shared_timestamp;
|
||||
|
||||
// load data
|
||||
CheckAndReloadFacade();
|
||||
}
|
||||
|
||||
void CheckAndReloadFacade()
|
||||
{
|
||||
if (CURRENT_LAYOUT != data_timestamp_ptr->layout ||
|
||||
CURRENT_DATA != data_timestamp_ptr->data ||
|
||||
CURRENT_TIMESTAMP != data_timestamp_ptr->timestamp)
|
||||
{
|
||||
// Get exclusive lock
|
||||
util::SimpleLogger().Write(logDEBUG) << "Updates available, getting exclusive lock";
|
||||
const boost::lock_guard<boost::shared_mutex> lock(data_mutex);
|
||||
|
||||
if (CURRENT_LAYOUT != data_timestamp_ptr->layout ||
|
||||
CURRENT_DATA != data_timestamp_ptr->data)
|
||||
{
|
||||
// release the previous shared memory segments
|
||||
storage::SharedMemory::Remove(CURRENT_LAYOUT);
|
||||
storage::SharedMemory::Remove(CURRENT_DATA);
|
||||
|
||||
CURRENT_LAYOUT = data_timestamp_ptr->layout;
|
||||
CURRENT_DATA = data_timestamp_ptr->data;
|
||||
CURRENT_TIMESTAMP = 0; // Force trigger a reload
|
||||
|
||||
util::SimpleLogger().Write(logDEBUG)
|
||||
<< "Current layout was different to new layout, swapping";
|
||||
}
|
||||
else
|
||||
{
|
||||
util::SimpleLogger().Write(logDEBUG)
|
||||
<< "Current layout was same to new layout, not swapping";
|
||||
}
|
||||
|
||||
if (CURRENT_TIMESTAMP != data_timestamp_ptr->timestamp)
|
||||
{
|
||||
CURRENT_TIMESTAMP = data_timestamp_ptr->timestamp;
|
||||
|
||||
util::SimpleLogger().Write(logDEBUG) << "Performing data reload";
|
||||
m_layout_memory.reset(storage::makeSharedMemory(CURRENT_LAYOUT));
|
||||
BOOST_ASSERT(storage::SharedMemory::RegionExists(layout_region));
|
||||
m_layout_memory.reset(storage::makeSharedMemory(layout_region));
|
||||
|
||||
data_layout = static_cast<storage::SharedDataLayout *>(m_layout_memory->Ptr());
|
||||
|
||||
m_large_memory.reset(storage::makeSharedMemory(CURRENT_DATA));
|
||||
BOOST_ASSERT(storage::SharedMemory::RegionExists(data_region));
|
||||
m_large_memory.reset(storage::makeSharedMemory(data_region));
|
||||
shared_memory = (char *)(m_large_memory->Ptr());
|
||||
|
||||
const auto file_index_ptr = data_layout->GetBlockPtr<char>(
|
||||
shared_memory, storage::SharedDataLayout::FILE_INDEX_PATH);
|
||||
file_index_path = boost::filesystem::path(file_index_ptr);
|
||||
if (!boost::filesystem::exists(file_index_path))
|
||||
{
|
||||
util::SimpleLogger().Write(logDEBUG) << "Leaf file name "
|
||||
<< file_index_path.string();
|
||||
throw util::exception("Could not load leaf index file. "
|
||||
"Is any data loaded into shared memory?");
|
||||
}
|
||||
|
||||
LoadGraph();
|
||||
LoadChecksum();
|
||||
LoadNodeAndEdgeInformation();
|
||||
@ -452,16 +408,6 @@ class SharedDataFacade final : public BaseDataFacade
|
||||
LoadProfileProperties();
|
||||
LoadRTree();
|
||||
LoadIntersectionClasses();
|
||||
|
||||
util::SimpleLogger().Write() << "number of geometries: "
|
||||
<< m_coordinate_list.size();
|
||||
for (unsigned i = 0; i < m_coordinate_list.size(); ++i)
|
||||
{
|
||||
BOOST_ASSERT(GetCoordinateOfNode(i).IsValid());
|
||||
}
|
||||
}
|
||||
util::SimpleLogger().Write(logDEBUG) << "Releasing exclusive lock";
|
||||
}
|
||||
}
|
||||
|
||||
// search graph access
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include "util/json_container.hpp"
|
||||
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
@ -53,6 +54,8 @@ namespace datafacade
|
||||
class BaseDataFacade;
|
||||
}
|
||||
|
||||
class DataWatchdog;
|
||||
|
||||
class Engine final
|
||||
{
|
||||
public:
|
||||
@ -73,6 +76,7 @@ class Engine final
|
||||
|
||||
private:
|
||||
std::unique_ptr<storage::SharedBarriers> lock;
|
||||
std::unique_ptr<DataWatchdog> watchdog;
|
||||
|
||||
std::unique_ptr<plugins::ViaRoutePlugin> route_plugin;
|
||||
std::unique_ptr<plugins::TablePlugin> table_plugin;
|
||||
@ -81,7 +85,10 @@ class Engine final
|
||||
std::unique_ptr<plugins::MatchPlugin> match_plugin;
|
||||
std::unique_ptr<plugins::TilePlugin> tile_plugin;
|
||||
|
||||
std::shared_ptr<datafacade::BaseDataFacade> query_data_facade;
|
||||
// reading and setting this is protected by lock
|
||||
mutable std::shared_ptr<datafacade::BaseDataFacade> query_data_facade;
|
||||
// ensures that when we set facade we can do it without race conditions
|
||||
mutable std::unique_ptr<std::mutex> facade_update_mutex;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include "engine/api/route_parameters.hpp"
|
||||
#include "engine/engine_config.hpp"
|
||||
#include "engine/status.hpp"
|
||||
#include "engine/data_watchdog.hpp"
|
||||
|
||||
#include "engine/plugins/match.hpp"
|
||||
#include "engine/plugins/nearest.hpp"
|
||||
@ -36,7 +37,9 @@ namespace
|
||||
template <typename ParameterT, typename PluginT, typename ResultT>
|
||||
osrm::engine::Status
|
||||
RunQuery(const std::unique_ptr<osrm::storage::SharedBarriers> &lock,
|
||||
const std::shared_ptr<osrm::engine::datafacade::BaseDataFacade> &facade,
|
||||
std::mutex &facade_update_mutex,
|
||||
osrm::engine::DataWatchdog& watchdog,
|
||||
std::shared_ptr<osrm::engine::datafacade::BaseDataFacade> &facade,
|
||||
const ParameterT ¶meters,
|
||||
PluginT &plugin,
|
||||
ResultT &result)
|
||||
@ -53,11 +56,32 @@ RunQuery(const std::unique_ptr<osrm::storage::SharedBarriers> &lock,
|
||||
boost::interprocess::sharable_lock<boost::interprocess::named_sharable_mutex> query_lock(
|
||||
lock->query_mutex);
|
||||
|
||||
auto &shared_facade = static_cast<osrm::engine::datafacade::SharedDataFacade &>(*facade);
|
||||
shared_facade.CheckAndReloadFacade();
|
||||
// Get a shared data lock so that other threads won't update
|
||||
// things while the query is running
|
||||
boost::shared_lock<boost::shared_mutex> data_lock{shared_facade.data_mutex};
|
||||
{
|
||||
// this lock ensures that we are never overtaken while creating a new
|
||||
// facade and setting it.
|
||||
// This is important since we need to ensure there is always exactly
|
||||
// one facade per shared memory region.
|
||||
// TODO: Remove this once the SharedDataFacade doesn't own the shared memory
|
||||
// segment anymore.
|
||||
std::lock_guard<std::mutex> update_lock(facade_update_mutex);
|
||||
|
||||
if (watchdog.HasNewRegion())
|
||||
{
|
||||
|
||||
auto new_facade = watchdog.MaybeLoadNewRegion();
|
||||
// for now the external locking will ensure that loading the new region
|
||||
// will ways work. In the future we might allow being overtaken
|
||||
// by other threads and they will also try to update.
|
||||
if (new_facade)
|
||||
{
|
||||
// TODO remove once we allow for more then one SharedMemoryFacade at the same time
|
||||
// at this point no other query is allowed to reference this facade!
|
||||
// the old facade will die exactly here
|
||||
BOOST_ASSERT(facade.use_count() == 1);
|
||||
facade = std::move(new_facade);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
osrm::engine::Status status = plugin.HandleRequest(facade, parameters, result);
|
||||
|
||||
@ -77,7 +101,20 @@ Engine::Engine(const EngineConfig &config)
|
||||
{
|
||||
if (config.use_shared_memory)
|
||||
{
|
||||
query_data_facade = std::make_shared<datafacade::SharedDataFacade>();
|
||||
if (!DataWatchdog::TryConnect())
|
||||
{
|
||||
throw util::exception(
|
||||
"No shared memory blocks found, have you forgotten to run osrm-datastore?");
|
||||
}
|
||||
|
||||
facade_update_mutex = std::make_unique<std::mutex>();
|
||||
watchdog = std::make_unique<DataWatchdog>();
|
||||
// this will always either return a value or throw an exception
|
||||
// in the initial run
|
||||
query_data_facade = watchdog->MaybeLoadNewRegion();
|
||||
BOOST_ASSERT(query_data_facade);
|
||||
BOOST_ASSERT(watchdog);
|
||||
BOOST_ASSERT(lock);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -106,32 +143,32 @@ Engine &Engine::operator=(Engine &&) noexcept = default;
|
||||
|
||||
Status Engine::Route(const api::RouteParameters ¶ms, util::json::Object &result) const
|
||||
{
|
||||
return RunQuery(lock, query_data_facade, params, *route_plugin, result);
|
||||
return RunQuery(lock, *facade_update_mutex, *watchdog, query_data_facade, params, *route_plugin, result);
|
||||
}
|
||||
|
||||
Status Engine::Table(const api::TableParameters ¶ms, util::json::Object &result) const
|
||||
{
|
||||
return RunQuery(lock, query_data_facade, params, *table_plugin, result);
|
||||
return RunQuery(lock, *facade_update_mutex, *watchdog, query_data_facade, params, *table_plugin, result);
|
||||
}
|
||||
|
||||
Status Engine::Nearest(const api::NearestParameters ¶ms, util::json::Object &result) const
|
||||
{
|
||||
return RunQuery(lock, query_data_facade, params, *nearest_plugin, result);
|
||||
return RunQuery(lock, *facade_update_mutex, *watchdog, query_data_facade, params, *nearest_plugin, result);
|
||||
}
|
||||
|
||||
Status Engine::Trip(const api::TripParameters ¶ms, util::json::Object &result) const
|
||||
{
|
||||
return RunQuery(lock, query_data_facade, params, *trip_plugin, result);
|
||||
return RunQuery(lock, *facade_update_mutex, *watchdog, query_data_facade, params, *trip_plugin, result);
|
||||
}
|
||||
|
||||
Status Engine::Match(const api::MatchParameters ¶ms, util::json::Object &result) const
|
||||
{
|
||||
return RunQuery(lock, query_data_facade, params, *match_plugin, result);
|
||||
return RunQuery(lock, *facade_update_mutex, *watchdog, query_data_facade, params, *match_plugin, result);
|
||||
}
|
||||
|
||||
Status Engine::Tile(const api::TileParameters ¶ms, std::string &result) const
|
||||
{
|
||||
return RunQuery(lock, query_data_facade, params, *tile_plugin, result);
|
||||
return RunQuery(lock, *facade_update_mutex, *watchdog, query_data_facade, params, *tile_plugin, result);
|
||||
}
|
||||
|
||||
} // engine ns
|
||||
|
Loading…
Reference in New Issue
Block a user