Merge commit 'babbda98a6ea1d53a8bc5015ef5dfb313c47186a' into libosmium-2.10.0
This commit is contained in:
+40
-17
@@ -51,7 +51,7 @@ namespace osmium {
|
||||
|
||||
namespace thread {
|
||||
|
||||
static const std::chrono::milliseconds full_queue_sleep_duration { 10 }; // XXX
|
||||
static const std::chrono::milliseconds max_wait{10};
|
||||
|
||||
/**
|
||||
* A thread-safe queue.
|
||||
@@ -70,9 +70,12 @@ namespace osmium {
|
||||
|
||||
std::queue<T> m_queue;
|
||||
|
||||
/// Used to signal readers when data is available in the queue.
|
||||
/// Used to signal consumers when data is available in the queue.
|
||||
std::condition_variable m_data_available;
|
||||
|
||||
/// Used to signal producers when queue is not full.
|
||||
std::condition_variable m_space_available;
|
||||
|
||||
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
|
||||
/// The largest size the queue has been so far.
|
||||
size_t m_largest_size;
|
||||
@@ -109,7 +112,8 @@ namespace osmium {
|
||||
m_name(name),
|
||||
m_mutex(),
|
||||
m_queue(),
|
||||
m_data_available()
|
||||
m_data_available(),
|
||||
m_space_available()
|
||||
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
|
||||
,
|
||||
m_largest_size(0),
|
||||
@@ -123,13 +127,20 @@ namespace osmium {
|
||||
|
||||
~Queue() {
|
||||
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
|
||||
std::cerr << "queue '" << m_name << "' with max_size=" << m_max_size << " had largest size " << m_largest_size << " and was full " << m_full_counter << " times in " << m_push_counter << " push() calls and was empty " << m_empty_counter << " times in " << m_pop_counter << " pop() calls\n";
|
||||
std::cerr << "queue '" << m_name
|
||||
<< "' with max_size=" << m_max_size
|
||||
<< " had largest size " << m_largest_size
|
||||
<< " and was full " << m_full_counter
|
||||
<< " times in " << m_push_counter
|
||||
<< " push() calls and was empty " << m_empty_counter
|
||||
<< " times in " << m_pop_counter
|
||||
<< " pop() calls\n";
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
* Push an element onto the queue. If the queue has a max size, this
|
||||
* call will block if the queue is full.
|
||||
* Push an element onto the queue. If the queue has a max size,
|
||||
* this call will block if the queue is full.
|
||||
*/
|
||||
void push(T value) {
|
||||
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
|
||||
@@ -137,13 +148,16 @@ namespace osmium {
|
||||
#endif
|
||||
if (m_max_size) {
|
||||
while (size() >= m_max_size) {
|
||||
std::this_thread::sleep_for(full_queue_sleep_duration);
|
||||
std::unique_lock<std::mutex> lock{m_mutex};
|
||||
m_space_available.wait_for(lock, max_wait, [this] {
|
||||
return m_queue.size() < m_max_size;
|
||||
});
|
||||
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
|
||||
++m_full_counter;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
std::lock_guard<std::mutex> lock{m_mutex};
|
||||
m_queue.push(std::move(value));
|
||||
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
|
||||
if (m_largest_size < m_queue.size()) {
|
||||
@@ -157,7 +171,7 @@ namespace osmium {
|
||||
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
|
||||
++m_pop_counter;
|
||||
#endif
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
std::unique_lock<std::mutex> lock{m_mutex};
|
||||
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
|
||||
if (m_queue.empty()) {
|
||||
++m_empty_counter;
|
||||
@@ -169,6 +183,10 @@ namespace osmium {
|
||||
if (!m_queue.empty()) {
|
||||
value = std::move(m_queue.front());
|
||||
m_queue.pop();
|
||||
lock.unlock();
|
||||
if (m_max_size) {
|
||||
m_space_available.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,25 +194,30 @@ namespace osmium {
|
||||
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
|
||||
++m_pop_counter;
|
||||
#endif
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
if (m_queue.empty()) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_mutex};
|
||||
if (m_queue.empty()) {
|
||||
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
|
||||
++m_empty_counter;
|
||||
++m_empty_counter;
|
||||
#endif
|
||||
return false;
|
||||
return false;
|
||||
}
|
||||
value = std::move(m_queue.front());
|
||||
m_queue.pop();
|
||||
}
|
||||
if (m_max_size) {
|
||||
m_space_available.notify_one();
|
||||
}
|
||||
value = std::move(m_queue.front());
|
||||
m_queue.pop();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool empty() const {
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
std::lock_guard<std::mutex> lock{m_mutex};
|
||||
return m_queue.empty();
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
std::lock_guard<std::mutex> lock{m_mutex};
|
||||
return m_queue.size();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user