diff --git a/include/engine/data_watchdog.hpp b/include/engine/data_watchdog.hpp index 8871d4e71..097687e82 100644 --- a/include/engine/data_watchdog.hpp +++ b/include/engine/data_watchdog.hpp @@ -3,9 +3,9 @@ #include "engine/datafacade/shared_datafacade.hpp" +#include "storage/shared_barriers.hpp" #include "storage/shared_datatype.hpp" #include "storage/shared_memory.hpp" -#include "storage/shared_barriers.hpp" #include #include @@ -102,8 +102,10 @@ class DataWatchdog boost::upgrade_to_unique_lock unique_facade_lock(facade_lock); current_timestamp = *shared_timestamp; - facade = std::make_shared( - current_timestamp.layout, current_timestamp.data, current_timestamp.timestamp); + facade = std::make_shared(shared_barriers, + current_timestamp.layout, + current_timestamp.data, + current_timestamp.timestamp); return get_locked_facade(); } diff --git a/include/engine/datafacade/shared_datafacade.hpp b/include/engine/datafacade/shared_datafacade.hpp index a85ff9055..12fbfbf73 100644 --- a/include/engine/datafacade/shared_datafacade.hpp +++ b/include/engine/datafacade/shared_datafacade.hpp @@ -5,6 +5,7 @@ #include "storage/shared_datatype.hpp" #include "storage/shared_memory.hpp" +#include "storage/shared_barriers.hpp" #include "engine/datafacade/datafacade_base.hpp" #include "extractor/compressed_edge_container.hpp" @@ -65,6 +66,7 @@ class SharedDataFacade final : public BaseDataFacade storage::SharedDataLayout *data_layout; char *shared_memory; + std::shared_ptr shared_barriers; storage::SharedDataType layout_region; storage::SharedDataType data_region; unsigned shared_timestamp; @@ -379,13 +381,44 @@ class SharedDataFacade final : public BaseDataFacade } public: - virtual ~SharedDataFacade() {} - SharedDataFacade(storage::SharedDataType layout_region_, + // this function handle the deallocation of the shared memory it we can prove it will not be used anymore + virtual ~SharedDataFacade() + { + boost::interprocess::scoped_lock + exclusive_lock(data_region == storage::DATA_1 ? shared_barriers->regions_1_mutex + : shared_barriers->regions_2_mutex, + boost::interprocess::defer_lock); + + // if this returns false this is still in use + if (exclusive_lock.try_lock()) + { + // Now check if this is still the newest dataset + const boost::interprocess::sharable_lock lock( + shared_barriers->current_regions_mutex); + + auto shared_regions = storage::makeSharedMemory(storage::CURRENT_REGIONS); + const auto current_timestamp = + static_cast(shared_regions->Ptr()); + + if (current_timestamp->timestamp == shared_timestamp) + { + util::SimpleLogger().Write(logDEBUG) << "Retaining data with shared timestamp " << shared_timestamp; + } + else + { + storage::SharedMemory::Remove(data_region); + storage::SharedMemory::Remove(layout_region); + } + } + } + + SharedDataFacade(const std::shared_ptr &shared_barriers_, + storage::SharedDataType layout_region_, storage::SharedDataType data_region_, unsigned shared_timestamp_) - : layout_region(layout_region_), data_region(data_region_), - shared_timestamp(shared_timestamp_) + : shared_barriers(shared_barriers_), layout_region(layout_region_), + data_region(data_region_), shared_timestamp(shared_timestamp_) { util::SimpleLogger().Write(logDEBUG) << "Loading new data with shared timestamp " << shared_timestamp;