Join LAYOUT and DATA shared memory blocks into REGION
This commit is contained in:
parent
7b11cd3a11
commit
b376c97db8
@ -32,15 +32,15 @@ class DataWatchdog
|
|||||||
public:
|
public:
|
||||||
DataWatchdog()
|
DataWatchdog()
|
||||||
: shared_barriers{std::make_shared<storage::SharedBarriers>()},
|
: shared_barriers{std::make_shared<storage::SharedBarriers>()},
|
||||||
shared_regions(storage::makeSharedMemory(storage::CURRENT_REGIONS)),
|
shared_regions(storage::makeSharedMemory(storage::CURRENT_REGION)),
|
||||||
current_timestamp{storage::LAYOUT_NONE, storage::DATA_NONE, 0}
|
current_timestamp{storage::REGION_NONE, 0}
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tries to connect to the shared memory containing the regions table
|
// Tries to connect to the shared memory containing the regions table
|
||||||
static bool TryConnect()
|
static bool TryConnect()
|
||||||
{
|
{
|
||||||
return storage::SharedMemory::RegionExists(storage::CURRENT_REGIONS);
|
return storage::SharedMemory::RegionExists(storage::CURRENT_REGION);
|
||||||
}
|
}
|
||||||
|
|
||||||
using RegionsLock =
|
using RegionsLock =
|
||||||
@ -52,22 +52,20 @@ class DataWatchdog
|
|||||||
LockAndFacade GetDataFacade()
|
LockAndFacade GetDataFacade()
|
||||||
{
|
{
|
||||||
const boost::interprocess::sharable_lock<boost::interprocess::named_upgradable_mutex> lock(
|
const boost::interprocess::sharable_lock<boost::interprocess::named_upgradable_mutex> lock(
|
||||||
shared_barriers->current_regions_mutex);
|
shared_barriers->current_region_mutex);
|
||||||
|
|
||||||
const auto shared_timestamp =
|
const auto shared_timestamp =
|
||||||
static_cast<const storage::SharedDataTimestamp *>(shared_regions->Ptr());
|
static_cast<const storage::SharedDataTimestamp *>(shared_regions->Ptr());
|
||||||
|
|
||||||
const auto get_locked_facade = [this, shared_timestamp]() {
|
const auto get_locked_facade = [this, shared_timestamp]() {
|
||||||
if (current_timestamp.data == storage::DATA_1)
|
if (current_timestamp.region == storage::REGION_1)
|
||||||
{
|
{
|
||||||
BOOST_ASSERT(current_timestamp.layout == storage::LAYOUT_1);
|
return std::make_pair(RegionsLock(shared_barriers->region_1_mutex), facade);
|
||||||
return std::make_pair(RegionsLock(shared_barriers->regions_1_mutex), facade);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
BOOST_ASSERT(current_timestamp.layout == storage::LAYOUT_2);
|
BOOST_ASSERT(current_timestamp.region == storage::REGION_2);
|
||||||
BOOST_ASSERT(current_timestamp.data == storage::DATA_2);
|
return std::make_pair(RegionsLock(shared_barriers->region_2_mutex), facade);
|
||||||
return std::make_pair(RegionsLock(shared_barriers->regions_2_mutex), facade);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -78,8 +76,7 @@ class DataWatchdog
|
|||||||
|
|
||||||
if (shared_timestamp->timestamp == current_timestamp.timestamp)
|
if (shared_timestamp->timestamp == current_timestamp.timestamp)
|
||||||
{
|
{
|
||||||
BOOST_ASSERT(shared_timestamp->layout == current_timestamp.layout);
|
BOOST_ASSERT(shared_timestamp->region == current_timestamp.region);
|
||||||
BOOST_ASSERT(shared_timestamp->data == current_timestamp.data);
|
|
||||||
return get_locked_facade();
|
return get_locked_facade();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -89,11 +86,10 @@ class DataWatchdog
|
|||||||
boost::upgrade_lock<boost::shared_mutex> facade_lock(facade_mutex);
|
boost::upgrade_lock<boost::shared_mutex> facade_lock(facade_mutex);
|
||||||
|
|
||||||
// we might get overtaken before we actually do the writing
|
// we might get overtaken before we actually do the writing
|
||||||
// in that case we don't modify anthing
|
// in that case we don't modify anything
|
||||||
if (shared_timestamp->timestamp == current_timestamp.timestamp)
|
if (shared_timestamp->timestamp == current_timestamp.timestamp)
|
||||||
{
|
{
|
||||||
BOOST_ASSERT(shared_timestamp->layout == current_timestamp.layout);
|
BOOST_ASSERT(shared_timestamp->region == current_timestamp.region);
|
||||||
BOOST_ASSERT(shared_timestamp->data == current_timestamp.data);
|
|
||||||
|
|
||||||
return get_locked_facade();
|
return get_locked_facade();
|
||||||
}
|
}
|
||||||
@ -103,8 +99,7 @@ class DataWatchdog
|
|||||||
|
|
||||||
current_timestamp = *shared_timestamp;
|
current_timestamp = *shared_timestamp;
|
||||||
facade = std::make_shared<datafacade::SharedMemoryDataFacade>(shared_barriers,
|
facade = std::make_shared<datafacade::SharedMemoryDataFacade>(shared_barriers,
|
||||||
current_timestamp.layout,
|
current_timestamp.region,
|
||||||
current_timestamp.data,
|
|
||||||
current_timestamp.timestamp);
|
current_timestamp.timestamp);
|
||||||
|
|
||||||
return get_locked_facade();
|
return get_locked_facade();
|
||||||
|
@ -24,10 +24,8 @@ class SharedMemoryDataFacade : public ContiguousInternalMemoryDataFacadeBase
|
|||||||
{
|
{
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
std::unique_ptr<storage::SharedMemory> m_layout_memory;
|
|
||||||
std::unique_ptr<storage::SharedMemory> m_large_memory;
|
std::unique_ptr<storage::SharedMemory> m_large_memory;
|
||||||
std::shared_ptr<storage::SharedBarriers> shared_barriers;
|
std::shared_ptr<storage::SharedBarriers> shared_barriers;
|
||||||
storage::SharedDataType layout_region;
|
|
||||||
storage::SharedDataType data_region;
|
storage::SharedDataType data_region;
|
||||||
unsigned shared_timestamp;
|
unsigned shared_timestamp;
|
||||||
|
|
||||||
@ -40,12 +38,12 @@ class SharedMemoryDataFacade : public ContiguousInternalMemoryDataFacadeBase
|
|||||||
{
|
{
|
||||||
// Now check if this is still the newest dataset
|
// Now check if this is still the newest dataset
|
||||||
boost::interprocess::sharable_lock<boost::interprocess::named_upgradable_mutex>
|
boost::interprocess::sharable_lock<boost::interprocess::named_upgradable_mutex>
|
||||||
current_regions_lock(shared_barriers->current_regions_mutex,
|
current_regions_lock(shared_barriers->current_region_mutex,
|
||||||
boost::interprocess::defer_lock);
|
boost::interprocess::defer_lock);
|
||||||
|
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex> exclusive_lock(
|
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex> exclusive_lock(
|
||||||
data_region == storage::DATA_1 ? shared_barriers->regions_1_mutex
|
data_region == storage::REGION_1 ? shared_barriers->region_1_mutex
|
||||||
: shared_barriers->regions_2_mutex,
|
: shared_barriers->region_2_mutex,
|
||||||
boost::interprocess::defer_lock);
|
boost::interprocess::defer_lock);
|
||||||
|
|
||||||
// if this returns false this is still in use
|
// if this returns false this is still in use
|
||||||
@ -53,45 +51,36 @@ class SharedMemoryDataFacade : public ContiguousInternalMemoryDataFacadeBase
|
|||||||
{
|
{
|
||||||
if (storage::SharedMemory::RegionExists(data_region))
|
if (storage::SharedMemory::RegionExists(data_region))
|
||||||
{
|
{
|
||||||
BOOST_ASSERT(storage::SharedMemory::RegionExists(layout_region));
|
auto shared_region = storage::makeSharedMemory(storage::CURRENT_REGION);
|
||||||
|
|
||||||
auto shared_regions = storage::makeSharedMemory(storage::CURRENT_REGIONS);
|
|
||||||
const auto current_timestamp =
|
const auto current_timestamp =
|
||||||
static_cast<const storage::SharedDataTimestamp *>(shared_regions->Ptr());
|
static_cast<const storage::SharedDataTimestamp *>(shared_region->Ptr());
|
||||||
|
|
||||||
// check if the memory region referenced by this facade needs cleanup
|
// check if the memory region referenced by this facade needs cleanup
|
||||||
if (current_timestamp->data == data_region)
|
if (current_timestamp->region == data_region)
|
||||||
{
|
{
|
||||||
BOOST_ASSERT(current_timestamp->layout == layout_region);
|
|
||||||
util::Log(logDEBUG) << "Retaining data with shared timestamp "
|
util::Log(logDEBUG) << "Retaining data with shared timestamp "
|
||||||
<< shared_timestamp;
|
<< shared_timestamp;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
storage::SharedMemory::Remove(data_region);
|
storage::SharedMemory::Remove(data_region);
|
||||||
storage::SharedMemory::Remove(layout_region);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SharedMemoryDataFacade(const std::shared_ptr<storage::SharedBarriers> &shared_barriers_,
|
SharedMemoryDataFacade(const std::shared_ptr<storage::SharedBarriers> &shared_barriers_,
|
||||||
storage::SharedDataType layout_region_,
|
|
||||||
storage::SharedDataType data_region_,
|
storage::SharedDataType data_region_,
|
||||||
unsigned shared_timestamp_)
|
unsigned shared_timestamp_)
|
||||||
: shared_barriers(shared_barriers_), layout_region(layout_region_),
|
: shared_barriers(shared_barriers_), data_region(data_region_), shared_timestamp(shared_timestamp_)
|
||||||
data_region(data_region_), shared_timestamp(shared_timestamp_)
|
|
||||||
{
|
{
|
||||||
util::Log(logDEBUG) << "Loading new data with shared timestamp " << shared_timestamp;
|
util::Log(logDEBUG) << "Loading new data with shared timestamp " << shared_timestamp;
|
||||||
|
|
||||||
BOOST_ASSERT(storage::SharedMemory::RegionExists(layout_region));
|
|
||||||
m_layout_memory = storage::makeSharedMemory(layout_region);
|
|
||||||
|
|
||||||
BOOST_ASSERT(storage::SharedMemory::RegionExists(data_region));
|
BOOST_ASSERT(storage::SharedMemory::RegionExists(data_region));
|
||||||
m_large_memory = storage::makeSharedMemory(data_region);
|
m_large_memory = storage::makeSharedMemory(data_region);
|
||||||
|
|
||||||
InitializeInternalPointers(*reinterpret_cast<storage::DataLayout *>(m_layout_memory->Ptr()),
|
InitializeInternalPointers(*reinterpret_cast<storage::DataLayout *>(m_large_memory->Ptr()),
|
||||||
reinterpret_cast<char *>(m_large_memory->Ptr()));
|
reinterpret_cast<char *>(m_large_memory->Ptr()) + sizeof(storage::DataLayout));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -13,22 +13,22 @@ struct SharedBarriers
|
|||||||
{
|
{
|
||||||
|
|
||||||
SharedBarriers()
|
SharedBarriers()
|
||||||
: current_regions_mutex(boost::interprocess::open_or_create, "current_regions"),
|
: current_region_mutex(boost::interprocess::open_or_create, "current_region"),
|
||||||
regions_1_mutex(boost::interprocess::open_or_create, "regions_1"),
|
region_1_mutex(boost::interprocess::open_or_create, "region_1"),
|
||||||
regions_2_mutex(boost::interprocess::open_or_create, "regions_2")
|
region_2_mutex(boost::interprocess::open_or_create, "region_2")
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
static void resetCurrentRegions()
|
static void resetCurrentRegion()
|
||||||
{
|
{
|
||||||
boost::interprocess::named_sharable_mutex::remove("current_regions");
|
boost::interprocess::named_sharable_mutex::remove("current_region");
|
||||||
}
|
}
|
||||||
static void resetRegions1() { boost::interprocess::named_sharable_mutex::remove("regions_1"); }
|
static void resetRegion1() { boost::interprocess::named_sharable_mutex::remove("region_1"); }
|
||||||
static void resetRegions2() { boost::interprocess::named_sharable_mutex::remove("regions_2"); }
|
static void resetRegion2() { boost::interprocess::named_sharable_mutex::remove("region_2"); }
|
||||||
|
|
||||||
boost::interprocess::named_upgradable_mutex current_regions_mutex;
|
boost::interprocess::named_upgradable_mutex current_region_mutex;
|
||||||
boost::interprocess::named_sharable_mutex regions_1_mutex;
|
boost::interprocess::named_sharable_mutex region_1_mutex;
|
||||||
boost::interprocess::named_sharable_mutex regions_2_mutex;
|
boost::interprocess::named_sharable_mutex region_2_mutex;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -192,19 +192,15 @@ struct DataLayout
|
|||||||
|
|
||||||
enum SharedDataType
|
enum SharedDataType
|
||||||
{
|
{
|
||||||
CURRENT_REGIONS,
|
CURRENT_REGION,
|
||||||
LAYOUT_1,
|
REGION_1,
|
||||||
DATA_1,
|
REGION_2,
|
||||||
LAYOUT_2,
|
REGION_NONE
|
||||||
DATA_2,
|
|
||||||
LAYOUT_NONE,
|
|
||||||
DATA_NONE
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SharedDataTimestamp
|
struct SharedDataTimestamp
|
||||||
{
|
{
|
||||||
SharedDataType layout;
|
SharedDataType region;
|
||||||
SharedDataType data;
|
|
||||||
unsigned timestamp;
|
unsigned timestamp;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -212,20 +208,14 @@ inline std::string regionToString(const SharedDataType region)
|
|||||||
{
|
{
|
||||||
switch (region)
|
switch (region)
|
||||||
{
|
{
|
||||||
case CURRENT_REGIONS:
|
case CURRENT_REGION:
|
||||||
return "CURRENT_REGIONS";
|
return "CURRENT_REGION";
|
||||||
case LAYOUT_1:
|
case REGION_1:
|
||||||
return "LAYOUT_1";
|
return "REGION_1";
|
||||||
case DATA_1:
|
case REGION_2:
|
||||||
return "DATA_1";
|
return "REGION_2";
|
||||||
case LAYOUT_2:
|
case REGION_NONE:
|
||||||
return "LAYOUT_2";
|
return "REGION_NONE";
|
||||||
case DATA_2:
|
|
||||||
return "DATA_2";
|
|
||||||
case LAYOUT_NONE:
|
|
||||||
return "LAYOUT_NONE";
|
|
||||||
case DATA_NONE:
|
|
||||||
return "DATA_NONE";
|
|
||||||
default:
|
default:
|
||||||
return "INVALID_REGION";
|
return "INVALID_REGION";
|
||||||
}
|
}
|
||||||
|
@ -58,38 +58,31 @@ Storage::Storage(StorageConfig config_) : config(std::move(config_)) {}
|
|||||||
|
|
||||||
struct RegionsLayout
|
struct RegionsLayout
|
||||||
{
|
{
|
||||||
SharedDataType current_layout_region;
|
|
||||||
SharedDataType current_data_region;
|
SharedDataType current_data_region;
|
||||||
boost::interprocess::named_sharable_mutex ¤t_regions_mutex;
|
boost::interprocess::named_sharable_mutex ¤t_region_mutex;
|
||||||
SharedDataType old_layout_region;
|
|
||||||
SharedDataType old_data_region;
|
SharedDataType old_data_region;
|
||||||
boost::interprocess::named_sharable_mutex &old_regions_mutex;
|
boost::interprocess::named_sharable_mutex &old_region_mutex;
|
||||||
};
|
};
|
||||||
|
|
||||||
RegionsLayout getRegionsLayout(SharedBarriers &barriers)
|
RegionsLayout getRegionsLayout(SharedBarriers &barriers)
|
||||||
{
|
{
|
||||||
if (SharedMemory::RegionExists(CURRENT_REGIONS))
|
if (SharedMemory::RegionExists(CURRENT_REGION))
|
||||||
{
|
{
|
||||||
auto shared_regions = makeSharedMemory(CURRENT_REGIONS);
|
auto shared_region = makeSharedMemory(CURRENT_REGION);
|
||||||
const auto shared_timestamp =
|
const auto shared_timestamp =
|
||||||
static_cast<const SharedDataTimestamp *>(shared_regions->Ptr());
|
static_cast<const SharedDataTimestamp *>(shared_region->Ptr());
|
||||||
if (shared_timestamp->data == DATA_1)
|
if (shared_timestamp->region == REGION_1)
|
||||||
{
|
{
|
||||||
BOOST_ASSERT(shared_timestamp->layout == LAYOUT_1);
|
return RegionsLayout{REGION_1,
|
||||||
return RegionsLayout{LAYOUT_1,
|
barriers.region_1_mutex,
|
||||||
DATA_1,
|
REGION_2,
|
||||||
barriers.regions_1_mutex,
|
barriers.region_2_mutex};
|
||||||
LAYOUT_2,
|
|
||||||
DATA_2,
|
|
||||||
barriers.regions_2_mutex};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
BOOST_ASSERT(shared_timestamp->data == DATA_2);
|
BOOST_ASSERT(shared_timestamp->region == REGION_2);
|
||||||
BOOST_ASSERT(shared_timestamp->layout == LAYOUT_2);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return RegionsLayout{
|
return RegionsLayout{REGION_2, barriers.region_2_mutex, REGION_1, barriers.region_1_mutex};
|
||||||
LAYOUT_2, DATA_2, barriers.regions_2_mutex, LAYOUT_1, DATA_1, barriers.regions_1_mutex};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Storage::ReturnCode Storage::Run(int max_wait)
|
Storage::ReturnCode Storage::Run(int max_wait)
|
||||||
@ -101,10 +94,10 @@ Storage::ReturnCode Storage::Run(int max_wait)
|
|||||||
SharedBarriers barriers;
|
SharedBarriers barriers;
|
||||||
|
|
||||||
boost::interprocess::upgradable_lock<boost::interprocess::named_upgradable_mutex>
|
boost::interprocess::upgradable_lock<boost::interprocess::named_upgradable_mutex>
|
||||||
current_regions_lock(barriers.current_regions_mutex, boost::interprocess::defer_lock);
|
current_region_lock(barriers.current_region_mutex, boost::interprocess::defer_lock);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (!current_regions_lock.try_lock())
|
if (!current_region_lock.try_lock())
|
||||||
{
|
{
|
||||||
util::Log(logWARNING) << "A data update is in progress";
|
util::Log(logWARNING) << "A data update is in progress";
|
||||||
return ReturnCode::Error;
|
return ReturnCode::Error;
|
||||||
@ -113,7 +106,7 @@ Storage::ReturnCode Storage::Run(int max_wait)
|
|||||||
// hard unlock in case of any exception.
|
// hard unlock in case of any exception.
|
||||||
catch (boost::interprocess::lock_exception &ex)
|
catch (boost::interprocess::lock_exception &ex)
|
||||||
{
|
{
|
||||||
barriers.current_regions_mutex.unlock_upgradable();
|
barriers.current_region_mutex.unlock_upgradable();
|
||||||
// make sure we exit here because this is bad
|
// make sure we exit here because this is bad
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
@ -128,7 +121,6 @@ Storage::ReturnCode Storage::Run(int max_wait)
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
auto regions_layout = getRegionsLayout(barriers);
|
auto regions_layout = getRegionsLayout(barriers);
|
||||||
const SharedDataType layout_region = regions_layout.old_layout_region;
|
|
||||||
const SharedDataType data_region = regions_layout.old_data_region;
|
const SharedDataType data_region = regions_layout.old_data_region;
|
||||||
|
|
||||||
if (max_wait > 0)
|
if (max_wait > 0)
|
||||||
@ -142,7 +134,7 @@ Storage::ReturnCode Storage::Run(int max_wait)
|
|||||||
}
|
}
|
||||||
|
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex> regions_lock(
|
boost::interprocess::scoped_lock<boost::interprocess::named_sharable_mutex> regions_lock(
|
||||||
regions_layout.old_regions_mutex, boost::interprocess::defer_lock);
|
regions_layout.old_region_mutex, boost::interprocess::defer_lock);
|
||||||
|
|
||||||
if (max_wait > 0)
|
if (max_wait > 0)
|
||||||
{
|
{
|
||||||
@ -152,16 +144,14 @@ Storage::ReturnCode Storage::Run(int max_wait)
|
|||||||
util::Log(logWARNING) << "Queries did not finish in " << max_wait
|
util::Log(logWARNING) << "Queries did not finish in " << max_wait
|
||||||
<< " seconds. Claiming the lock by force.";
|
<< " seconds. Claiming the lock by force.";
|
||||||
// WARNING: if queries are still using the old dataset they might crash
|
// WARNING: if queries are still using the old dataset they might crash
|
||||||
if (regions_layout.old_layout_region == LAYOUT_1)
|
if (regions_layout.old_data_region == REGION_1)
|
||||||
{
|
{
|
||||||
BOOST_ASSERT(regions_layout.old_data_region == DATA_1);
|
barriers.resetRegion1();
|
||||||
barriers.resetRegions1();
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
BOOST_ASSERT(regions_layout.old_layout_region == LAYOUT_2);
|
BOOST_ASSERT(regions_layout.old_data_region == REGION_2);
|
||||||
BOOST_ASSERT(regions_layout.old_data_region == DATA_2);
|
barriers.resetRegion2();
|
||||||
barriers.resetRegions2();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return ReturnCode::Retry;
|
return ReturnCode::Retry;
|
||||||
@ -174,70 +164,64 @@ Storage::ReturnCode Storage::Run(int max_wait)
|
|||||||
util::Log() << "Ok.";
|
util::Log() << "Ok.";
|
||||||
|
|
||||||
// since we can't change the size of a shared memory regions we delete and reallocate
|
// since we can't change the size of a shared memory regions we delete and reallocate
|
||||||
if (SharedMemory::RegionExists(layout_region) && !SharedMemory::Remove(layout_region))
|
|
||||||
{
|
|
||||||
throw util::exception("Could not remove shared memory region " +
|
|
||||||
regionToString(layout_region) + SOURCE_REF);
|
|
||||||
}
|
|
||||||
if (SharedMemory::RegionExists(data_region) && !SharedMemory::Remove(data_region))
|
if (SharedMemory::RegionExists(data_region) && !SharedMemory::Remove(data_region))
|
||||||
{
|
{
|
||||||
throw util::exception("Could not remove shared memory region " +
|
throw util::exception("Could not remove shared memory region " +
|
||||||
regionToString(data_region) + SOURCE_REF);
|
regionToString(data_region) + SOURCE_REF);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Allocate a memory layout in shared memory
|
// Populate a memory layout into stack memory
|
||||||
auto layout_memory = makeSharedMemory(layout_region, sizeof(DataLayout), true);
|
DataLayout layout;
|
||||||
auto shared_layout_ptr = new (layout_memory->Ptr()) DataLayout();
|
PopulateLayout(layout);
|
||||||
|
|
||||||
PopulateLayout(*shared_layout_ptr);
|
// Allocate shared memory block
|
||||||
|
auto regions_size = sizeof(layout) + layout.GetSizeOfLayout();
|
||||||
|
util::Log() << "allocating shared memory of " << regions_size << " bytes";
|
||||||
|
auto shared_memory = makeSharedMemory(data_region, regions_size, true);
|
||||||
|
|
||||||
// allocate shared memory block
|
// Copy memory layout to shared memory and populate data
|
||||||
util::Log() << "allocating shared memory of " << shared_layout_ptr->GetSizeOfLayout()
|
|
||||||
<< " bytes";
|
|
||||||
auto shared_memory = makeSharedMemory(data_region, shared_layout_ptr->GetSizeOfLayout(), true);
|
|
||||||
char *shared_memory_ptr = static_cast<char *>(shared_memory->Ptr());
|
char *shared_memory_ptr = static_cast<char *>(shared_memory->Ptr());
|
||||||
|
memcpy(shared_memory_ptr, &layout, sizeof(layout));
|
||||||
|
PopulateData(layout, shared_memory_ptr + sizeof(layout));
|
||||||
|
|
||||||
PopulateData(*shared_layout_ptr, shared_memory_ptr);
|
auto data_type_memory = makeSharedMemory(CURRENT_REGION, sizeof(SharedDataTimestamp), true);
|
||||||
|
|
||||||
auto data_type_memory = makeSharedMemory(CURRENT_REGIONS, sizeof(SharedDataTimestamp), true);
|
|
||||||
SharedDataTimestamp *data_timestamp_ptr =
|
SharedDataTimestamp *data_timestamp_ptr =
|
||||||
static_cast<SharedDataTimestamp *>(data_type_memory->Ptr());
|
static_cast<SharedDataTimestamp *>(data_type_memory->Ptr());
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>
|
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>
|
||||||
current_regions_exclusive_lock;
|
current_region_exclusive_lock;
|
||||||
|
|
||||||
if (max_wait > 0)
|
if (max_wait > 0)
|
||||||
{
|
{
|
||||||
util::Log() << "Waiting for " << max_wait << " seconds to write new dataset timestamp";
|
util::Log() << "Waiting for " << max_wait << " seconds to write new dataset timestamp";
|
||||||
auto end_time = boost::posix_time::microsec_clock::universal_time() +
|
auto end_time = boost::posix_time::microsec_clock::universal_time() +
|
||||||
boost::posix_time::seconds(max_wait);
|
boost::posix_time::seconds(max_wait);
|
||||||
current_regions_exclusive_lock =
|
current_region_exclusive_lock =
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>(
|
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>(
|
||||||
std::move(current_regions_lock), end_time);
|
std::move(current_region_lock), end_time);
|
||||||
|
|
||||||
if (!current_regions_exclusive_lock.owns())
|
if (!current_region_exclusive_lock.owns())
|
||||||
{
|
{
|
||||||
util::Log(logWARNING) << "Aquiring the lock timed out after " << max_wait
|
util::Log(logWARNING) << "Aquiring the lock timed out after " << max_wait
|
||||||
<< " seconds. Claiming the lock by force.";
|
<< " seconds. Claiming the lock by force.";
|
||||||
current_regions_lock.unlock();
|
current_region_lock.unlock();
|
||||||
current_regions_lock.release();
|
current_region_lock.release();
|
||||||
storage::SharedBarriers::resetCurrentRegions();
|
storage::SharedBarriers::resetCurrentRegion();
|
||||||
return ReturnCode::Retry;
|
return ReturnCode::Retry;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
util::Log() << "Waiting to write new dataset timestamp";
|
util::Log() << "Waiting to write new dataset timestamp";
|
||||||
current_regions_exclusive_lock =
|
current_region_exclusive_lock =
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>(
|
boost::interprocess::scoped_lock<boost::interprocess::named_upgradable_mutex>(
|
||||||
std::move(current_regions_lock));
|
std::move(current_region_lock));
|
||||||
}
|
}
|
||||||
|
|
||||||
util::Log() << "Ok.";
|
util::Log() << "Ok.";
|
||||||
data_timestamp_ptr->layout = layout_region;
|
data_timestamp_ptr->region = data_region;
|
||||||
data_timestamp_ptr->data = data_region;
|
|
||||||
data_timestamp_ptr->timestamp += 1;
|
data_timestamp_ptr->timestamp += 1;
|
||||||
}
|
}
|
||||||
util::Log() << "All data loaded.";
|
util::Log() << "All data loaded.";
|
||||||
|
@ -19,20 +19,14 @@ void deleteRegion(const SharedDataType region)
|
|||||||
const std::string name = [&] {
|
const std::string name = [&] {
|
||||||
switch (region)
|
switch (region)
|
||||||
{
|
{
|
||||||
case CURRENT_REGIONS:
|
case CURRENT_REGION:
|
||||||
return "CURRENT_REGIONS";
|
return "CURRENT_REGIONS";
|
||||||
case LAYOUT_1:
|
case REGION_1:
|
||||||
return "LAYOUT_1";
|
return "REGION_1";
|
||||||
case DATA_1:
|
case REGION_2:
|
||||||
return "DATA_1";
|
return "REGION_2";
|
||||||
case LAYOUT_2:
|
default: // REGION_NONE:
|
||||||
return "LAYOUT_2";
|
return "REGION_NONE";
|
||||||
case DATA_2:
|
|
||||||
return "DATA_2";
|
|
||||||
case LAYOUT_NONE:
|
|
||||||
return "LAYOUT_NONE";
|
|
||||||
default: // DATA_NONE:
|
|
||||||
return "DATA_NONE";
|
|
||||||
}
|
}
|
||||||
}();
|
}();
|
||||||
|
|
||||||
@ -44,11 +38,9 @@ void deleteRegion(const SharedDataType region)
|
|||||||
void springclean()
|
void springclean()
|
||||||
{
|
{
|
||||||
util::Log() << "spring-cleaning all shared memory regions";
|
util::Log() << "spring-cleaning all shared memory regions";
|
||||||
deleteRegion(DATA_1);
|
deleteRegion(REGION_1);
|
||||||
deleteRegion(LAYOUT_1);
|
deleteRegion(REGION_2);
|
||||||
deleteRegion(DATA_2);
|
deleteRegion(CURRENT_REGION);
|
||||||
deleteRegion(LAYOUT_2);
|
|
||||||
deleteRegion(CURRENT_REGIONS);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -93,7 +93,7 @@ bool generateDataStoreOptions(const int argc,
|
|||||||
|
|
||||||
[[ noreturn ]] void CleanupSharedBarriers(int signum)
|
[[ noreturn ]] void CleanupSharedBarriers(int signum)
|
||||||
{ // Here the lock state of named mutexes is unknown, make a hard cleanup
|
{ // Here the lock state of named mutexes is unknown, make a hard cleanup
|
||||||
osrm::storage::SharedBarriers::resetCurrentRegions();
|
osrm::storage::SharedBarriers::resetCurrentRegion();
|
||||||
std::_Exit(128 + signum);
|
std::_Exit(128 + signum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8,9 +8,9 @@ int main()
|
|||||||
osrm::util::LogPolicy::GetInstance().Unmute();
|
osrm::util::LogPolicy::GetInstance().Unmute();
|
||||||
osrm::util::Log() << "Releasing all locks";
|
osrm::util::Log() << "Releasing all locks";
|
||||||
|
|
||||||
osrm::storage::SharedBarriers::resetCurrentRegions();
|
osrm::storage::SharedBarriers::resetCurrentRegion();
|
||||||
osrm::storage::SharedBarriers::resetRegions1();
|
osrm::storage::SharedBarriers::resetRegion1();
|
||||||
osrm::storage::SharedBarriers::resetRegions2();
|
osrm::storage::SharedBarriers::resetRegion2();
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user