Only keep reader lock on shared memory during queries.

This commit is contained in:
Patrick Niklaus
2016-10-12 01:09:20 +02:00
committed by Patrick Niklaus
parent c69545c47a
commit 847f530c8e
7 changed files with 174 additions and 301 deletions
+21 -23
View File
@@ -1,8 +1,8 @@
#include "engine/engine.hpp"
#include "engine/api/route_parameters.hpp"
#include "engine/data_watchdog.hpp"
#include "engine/engine.hpp"
#include "engine/engine_config.hpp"
#include "engine/status.hpp"
#include "engine/data_watchdog.hpp"
#include "engine/plugins/match.hpp"
#include "engine/plugins/nearest.hpp"
@@ -34,21 +34,23 @@ namespace
// Abstracted away the query locking into a template function
// Works the same for every plugin.
template <typename ParameterT, typename PluginT, typename ResultT>
osrm::engine::Status
RunQuery(const std::unique_ptr<osrm::engine::DataWatchdog>& watchdog,
std::shared_ptr<osrm::engine::datafacade::BaseDataFacade> &facade,
const ParameterT &parameters,
PluginT &plugin,
ResultT &result)
osrm::engine::Status RunQuery(const std::unique_ptr<osrm::engine::DataWatchdog> &watchdog,
const std::shared_ptr<osrm::engine::datafacade::BaseDataFacade> &facade,
const ParameterT &parameters,
PluginT &plugin,
ResultT &result)
{
if (watchdog && watchdog->HasNewRegion())
if (watchdog)
{
watchdog->MaybeLoadNewRegion(facade);
BOOST_ASSERT(!facade);
auto lock_and_facade = watchdog->GetDataFacade();
return plugin.HandleRequest(lock_and_facade.second, parameters, result);
}
osrm::engine::Status status = plugin.HandleRequest(facade, parameters, result);
BOOST_ASSERT(facade);
return status;
return plugin.HandleRequest(facade, parameters, result);
}
} // anon. ns
@@ -71,10 +73,6 @@ Engine::Engine(const EngineConfig &config)
}
watchdog = std::make_unique<DataWatchdog>();
// this will always either return a value or throw an exception
// in the initial run
watchdog->MaybeLoadNewRegion(query_data_facade);
BOOST_ASSERT(query_data_facade);
BOOST_ASSERT(watchdog);
}
else
@@ -83,7 +81,7 @@ Engine::Engine(const EngineConfig &config)
{
throw util::exception("Invalid file paths given!");
}
query_data_facade = std::make_shared<datafacade::InternalDataFacade>(config.storage_config);
immutable_data_facade = std::make_shared<datafacade::InternalDataFacade>(config.storage_config);
}
// Register plugins
@@ -104,32 +102,32 @@ Engine &Engine::operator=(Engine &&) noexcept = default;
Status Engine::Route(const api::RouteParameters &params, util::json::Object &result) const
{
return RunQuery(watchdog, query_data_facade, params, *route_plugin, result);
return RunQuery(watchdog, immutable_data_facade, params, *route_plugin, result);
}
Status Engine::Table(const api::TableParameters &params, util::json::Object &result) const
{
return RunQuery(watchdog, query_data_facade, params, *table_plugin, result);
return RunQuery(watchdog, immutable_data_facade, params, *table_plugin, result);
}
Status Engine::Nearest(const api::NearestParameters &params, util::json::Object &result) const
{
return RunQuery(watchdog, query_data_facade, params, *nearest_plugin, result);
return RunQuery(watchdog, immutable_data_facade, params, *nearest_plugin, result);
}
Status Engine::Trip(const api::TripParameters &params, util::json::Object &result) const
{
return RunQuery(watchdog, query_data_facade, params, *trip_plugin, result);
return RunQuery(watchdog, immutable_data_facade, params, *trip_plugin, result);
}
Status Engine::Match(const api::MatchParameters &params, util::json::Object &result) const
{
return RunQuery(watchdog, query_data_facade, params, *match_plugin, result);
return RunQuery(watchdog, immutable_data_facade, params, *match_plugin, result);
}
Status Engine::Tile(const api::TileParameters &params, std::string &result) const
{
return RunQuery(watchdog, query_data_facade, params, *tile_plugin, result);
return RunQuery(watchdog, immutable_data_facade, params, *tile_plugin, result);
}
} // engine ns
+46 -86
View File
@@ -47,35 +47,6 @@ namespace osrm
namespace storage
{
// delete a shared memory region. report warning if it could not be deleted
void deleteRegion(const SharedDataType region)
{
if (SharedMemory::RegionExists(region) && !SharedMemory::Remove(region))
{
const std::string name = [&] {
switch (region)
{
case CURRENT_REGIONS:
return "CURRENT_REGIONS";
case LAYOUT_1:
return "LAYOUT_1";
case DATA_1:
return "DATA_1";
case LAYOUT_2:
return "LAYOUT_2";
case DATA_2:
return "DATA_2";
case LAYOUT_NONE:
return "LAYOUT_NONE";
default: // DATA_NONE:
return "DATA_NONE";
}
}();
util::SimpleLogger().Write(logWARNING) << "could not delete shared memory region " << name;
}
}
using RTreeLeaf = engine::datafacade::BaseDataFacade::RTreeLeaf;
using RTreeNode =
util::StaticRTree<RTreeLeaf, util::ShM<util::Coordinate, true>::vector, true>::TreeNode;
@@ -83,11 +54,33 @@ using QueryGraph = util::StaticGraph<contractor::QueryEdge::EdgeData>;
Storage::Storage(StorageConfig config_) : config(std::move(config_)) {}
bool regionsAvailable(SharedDataType layout, SharedDataType data)
struct RegionsLayout
{
auto shared_regions = makeSharedMemoryView(CURRENT_REGIONS);
const auto shared_timestamp = static_cast<const SharedDataTimestamp *>(shared_regions->Ptr());
return shared_timestamp->layout != layout && shared_timestamp->data != data;
SharedDataType current_layout_region;
SharedDataType current_data_region;
boost::interprocess::named_sharable_mutex &current_regions_mutex;
SharedDataType old_layout_region;
SharedDataType old_data_region;
boost::interprocess::named_sharable_mutex &old_regions_mutex;
};
RegionsLayout getRegionsLayout(SharedBarriers &barriers)
{
if (SharedMemory::RegionExists(CURRENT_REGIONS))
{
auto shared_regions = makeSharedMemory(CURRENT_REGIONS);
const auto shared_timestamp = static_cast<const SharedDataTimestamp *>(shared_regions->Ptr());
if (shared_timestamp->data == DATA_1)
{
BOOST_ASSERT(shared_timestamp->layout == LAYOUT_1);
return RegionsLayout {LAYOUT_1, DATA_1, barriers.regions_1_mutex, LAYOUT_2, DATA_2, barriers.regions_2_mutex};
}
BOOST_ASSERT(shared_timestamp->data == DATA_2);
BOOST_ASSERT(shared_timestamp->layout == LAYOUT_2);
}
return RegionsLayout {LAYOUT_2, DATA_2, barriers.regions_2_mutex, LAYOUT_1, DATA_1, barriers.regions_1_mutex};
}
int Storage::Run()
@@ -124,36 +117,27 @@ int Storage::Run()
}
#endif
const auto regions_1_available = regionsAvailable(LAYOUT_1, DATA_1);
const auto regions_2_available = regionsAvailable(LAYOUT_2, DATA_2);
auto regions_layout = getRegionsLayout(barriers);
const SharedDataType layout_region = regions_layout.old_layout_region;
const SharedDataType data_region = regions_layout.old_data_region;
const SharedDataType layout_region = [&] {
if (regions_1_available)
{
return LAYOUT_1;
}
if (regions_2_available)
{
return LAYOUT_2;
}
throw util::exception("No shared memory region free!");
}();
const SharedDataType data_region = [&] {
if (regions_1_available)
{
return DATA_1;
}
if (regions_2_available)
{
return DATA_2;
}
throw util::exception("No shared memory region free!");
}();
util::SimpleLogger().Write() << "Waiting for all queries on the old dataset to finish:";
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex>
layout_lock(regions_layout.old_regions_mutex);
util::SimpleLogger().Write() << "Ok.";
BOOST_ASSERT(regions_1_available || regions_2_available);
// since we can't change the size of a shared memory regions we delete and reallocate
if (SharedMemory::RegionExists(layout_region) && !SharedMemory::Remove(layout_region))
{
throw util::exception("Could not remove " + regionToString(layout_region));
}
if (SharedMemory::RegionExists(data_region) && !SharedMemory::Remove(data_region))
{
throw util::exception("Could not remove " + regionToString(data_region));
}
// Allocate a memory layout in shared memory, deallocate previous
auto layout_memory = makeSharedMemory(layout_region, sizeof(SharedDataLayout));
// Allocate a memory layout in shared memory
auto layout_memory = makeSharedMemory(layout_region, sizeof(SharedDataLayout), true);
auto shared_layout_ptr = new (layout_memory->Ptr()) SharedDataLayout();
auto absolute_file_index_path = boost::filesystem::absolute(config.file_index_path);
@@ -440,7 +424,7 @@ int Storage::Run()
// allocate shared memory block
util::SimpleLogger().Write() << "allocating shared memory of "
<< shared_layout_ptr->GetSizeOfLayout() << " bytes";
auto shared_memory = makeSharedMemory(data_region, shared_layout_ptr->GetSizeOfLayout());
auto shared_memory = makeSharedMemory(data_region, shared_layout_ptr->GetSizeOfLayout(), true);
char *shared_memory_ptr = static_cast<char *>(shared_memory->Ptr());
// read actual data into shared memory object //
@@ -766,9 +750,8 @@ int Storage::Run()
std::copy(entry_class_table.begin(), entry_class_table.end(), entry_class_ptr);
}
// acquire lock
auto data_type_memory =
makeSharedMemory(CURRENT_REGIONS, sizeof(SharedDataTimestamp), true, false, false);
makeSharedMemory(CURRENT_REGIONS, sizeof(SharedDataTimestamp), true);
SharedDataTimestamp *data_timestamp_ptr =
static_cast<SharedDataTimestamp *>(data_type_memory->Ptr());
@@ -779,29 +762,6 @@ int Storage::Run()
data_timestamp_ptr->layout = layout_region;
data_timestamp_ptr->data = data_region;
data_timestamp_ptr->timestamp += 1;
boost::interprocess::upgradable_lock<boost::interprocess::named_upgradable_mutex>
current_regions_upgradable_lock(std::move(current_regions_exclusive_lock));
util::SimpleLogger().Write(logDEBUG) << "waiting for server to switch dataset and request to finish";
if (!regions_1_available)
{
BOOST_ASSERT(regions_2_available);
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex>
regions_1_lock(barriers.regions_1_mutex);
util::SimpleLogger().Write(logDEBUG) << "switched. removing old regions 1";
deleteRegion(DATA_1);
deleteRegion(LAYOUT_1);
}
else if (!regions_2_available)
{
BOOST_ASSERT(regions_1_available);
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex>
regions_2_lock(barriers.regions_2_mutex);
util::SimpleLogger().Write(logDEBUG) << "switched. removing regions 2";
deleteRegion(DATA_2);
deleteRegion(LAYOUT_2);
}
}
util::SimpleLogger().Write() << "all data loaded";