From 8a67f2a23812ebbfe6c455affd538825bae3f874 Mon Sep 17 00:00:00 2001 From: Kim Barrett Date: Fri, 14 Nov 2025 09:41:28 +0000 Subject: [PATCH] remove NonblockingQueue --- .../share/utilities/nonblockingQueue.hpp | 136 --------- .../utilities/nonblockingQueue.inline.hpp | 248 --------------- .../gtest/utilities/test_nonblockingQueue.cpp | 283 ------------------ 3 files changed, 667 deletions(-) delete mode 100644 src/hotspot/share/utilities/nonblockingQueue.hpp delete mode 100644 src/hotspot/share/utilities/nonblockingQueue.inline.hpp delete mode 100644 test/hotspot/gtest/utilities/test_nonblockingQueue.cpp diff --git a/src/hotspot/share/utilities/nonblockingQueue.hpp b/src/hotspot/share/utilities/nonblockingQueue.hpp deleted file mode 100644 index 1b7e4b8bac440..0000000000000 --- a/src/hotspot/share/utilities/nonblockingQueue.hpp +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - * - */ - -#ifndef SHARE_UTILITIES_NONBLOCKINGQUEUE_HPP -#define SHARE_UTILITIES_NONBLOCKINGQUEUE_HPP - -#include "memory/padded.hpp" -#include "utilities/globalDefinitions.hpp" -#include "utilities/pair.hpp" - -// The NonblockingQueue template provides a non-blocking FIFO. -// It has inner padding of one cache line between its two internal pointers. -// -// The queue is internally represented by a linked list of elements, with -// the link to the next element provided by a member of each element. -// Access to this member is provided by the next_ptr function. -// -// The queue has a special pseudo-element that marks the end of the list. -// Each queue has its own unique special element. A pointer to this element -// can be recognized using the is_end() function. Such a pointer must never -// be dereferenced. This end marker is the value of the next member of the -// last element in the queue, and possibly other elements while modifying -// the queue. -// -// A queue may temporarily appear to be empty even though elements have been -// added and not removed. For example, after running the following program, -// the value of r may be null. -// -// thread1: q.push(a); r = q.pop(); -// thread2: q.push(b); -// -// This can occur if the push of b started before the push of a, but didn't -// complete until after the pop. -// -// \tparam T is the class of the elements in the queue. -// -// \tparam next_ptr is a function pointer. Applying this function to -// an object of type T must return a pointer to the list entry member -// of the object associated with the NonblockingQueue type. -template -class NonblockingQueue { - T* volatile _head; - // Padding of one cache line to avoid false sharing. - DEFINE_PAD_MINUS_SIZE(1, DEFAULT_PADDING_SIZE, sizeof(T*)); - T* volatile _tail; - - NONCOPYABLE(NonblockingQueue); - - // Return the entry following node in the list used by the - // specialized NonblockingQueue class. - static inline T* next(const T& node); - - // Set the entry following node to new_next in the list used by the - // specialized NonblockingQueue class. Not thread-safe, as it cannot - // concurrently run with push or try_pop operations that modify this - // node. - static inline void set_next(T& node, T* new_next); - - // A unique pseudo-object pointer associated with this specific queue. - // The resulting pointer must not be dereferenced. - inline T* end_marker() const; - -public: - inline NonblockingQueue(); - inline ~NonblockingQueue() NOT_DEBUG(= default); - - // Return true if the queue is empty. - // Not thread-safe. There must be no concurrent modification while the - // queue is being tested. - inline bool empty() const; - - // Return the number of objects in the queue. - // Not thread-safe. There must be no concurrent modification while the - // length is being determined. - inline size_t length() const; - - // Thread-safe add the object to the end of the queue. - // Subject to ABA behavior; callers must ensure usage is safe. - inline void push(T& node) { append(node, node); } - - // Thread-safe add the objects from first to last to the end of the queue. - // Subject to ABA behavior; callers must ensure usage is safe. - inline void append(T& first, T& last); - - // Thread-safe attempt to remove and return the first object in the queue. - // Returns true if successful. If successful then *node_ptr is the former - // first object, or null if the queue was empty. If unsuccessful, because - // of contention with a concurrent modification, then returns false with - // the value of *node_ptr unspecified. Subject to ABA behavior; callers - // must ensure usage is safe. - inline bool try_pop(T** node_ptr); - - // Thread-safe remove and return the first object in the queue, or null - // if the queue was empty. This just iterates on try_pop() until it - // succeeds, returning the (possibly null) element obtained from that. - // Subject to ABA behavior; callers must ensure usage is safe. - inline T* pop(); - - // Take all the objects from the queue, leaving the queue empty. - // Not thread-safe. There must be no concurrent operations. - // Returns a pair of pointers to the current queue. - inline Pair take_all(); - - // Iteration support is provided by first() and is_end(). The queue must - // not be modified while iterating over its elements. - - // Return the first object in the queue, or an end marker (a pointer p for - // which is_end(p) is true) if the queue is empty. - inline T* first() const; - - // Test whether entry is an end marker for this queue. - inline bool is_end(const T* entry) const; -}; - -#endif // SHARE_UTILITIES_NONBLOCKINGQUEUE_HPP diff --git a/src/hotspot/share/utilities/nonblockingQueue.inline.hpp b/src/hotspot/share/utilities/nonblockingQueue.inline.hpp deleted file mode 100644 index d805eedb7a43e..0000000000000 --- a/src/hotspot/share/utilities/nonblockingQueue.inline.hpp +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Copyright (c) 2021, 2025, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - * - */ - -#ifndef SHARE_UTILITIES_NONBLOCKINGQUEUE_INLINE_HPP -#define SHARE_UTILITIES_NONBLOCKINGQUEUE_INLINE_HPP - -#include "utilities/nonblockingQueue.hpp" - -#include "runtime/atomicAccess.hpp" - -template -T* NonblockingQueue::next(const T& node) { - return AtomicAccess::load(next_ptr(const_cast(node))); -} - -template -void NonblockingQueue::set_next(T& node, T* new_next) { - AtomicAccess::store(next_ptr(node), new_next); -} - -template -NonblockingQueue::NonblockingQueue() : _head(nullptr), _tail(nullptr) {} - -#ifdef ASSERT -template -NonblockingQueue::~NonblockingQueue() { - assert(_head == nullptr, "precondition"); - assert(_tail == nullptr, "precondition"); -} -#endif - -// The end_marker must be uniquely associated with the specific queue, in -// case queue elements can make their way through multiple queues. A -// pointer to the queue itself (after casting) satisfies that requirement. -template -T* NonblockingQueue::end_marker() const { - return const_cast(reinterpret_cast(this)); -} - -template -T* NonblockingQueue::first() const { - T* head = AtomicAccess::load(&_head); - return head == nullptr ? end_marker() : head; -} - -template -bool NonblockingQueue::is_end(const T* entry) const { - return entry == end_marker(); -} - -template -bool NonblockingQueue::empty() const { - return AtomicAccess::load(&_head) == nullptr; -} - -template -size_t NonblockingQueue::length() const { - size_t result = 0; - for (T* cur = first(); !is_end(cur); cur = next(*cur)) { - ++result; - } - return result; -} - -// An append operation atomically exchanges the new tail with the queue tail. -// It then sets the "next" value of the old tail to the head of the list being -// appended. If the old tail is null then the queue was empty, then the -// head of the list being appended is instead stored in the queue head. -// -// This means there is a period between the exchange and the old tail update -// where the queue sequence is split into two parts, the list from the queue -// head to the old tail, and the list being appended. If there are concurrent -// push/append operations, each may introduce another such segment. But they -// all eventually get resolved by their respective updates of their old tail's -// "next" value. This also means that try_pop operation must handle an object -// differently depending on its "next" value. -// -// A push operation is just a degenerate append, where the object being pushed -// is both the head and the tail of the list being appended. -template -void NonblockingQueue::append(T& first, T& last) { - assert(next(last) == nullptr, "precondition"); - // Make last the new end of the queue. Any further push/appends will - // extend after last. We will try to extend from the previous end of - // queue. - set_next(last, end_marker()); - T* old_tail = AtomicAccess::xchg(&_tail, &last); - if (old_tail == nullptr) { - // If old_tail is null then the queue was empty, and _head must also be - // null. The correctness of this assertion depends on try_pop clearing - // first _head then _tail when taking the last entry. - assert(AtomicAccess::load(&_head) == nullptr, "invariant"); - // Fall through to common update of _head. - } else if (is_end(AtomicAccess::cmpxchg(next_ptr(*old_tail), end_marker(), &first))) { - // Successfully extended the queue list from old_tail to first. No - // other push/append could have competed with us, because we claimed - // old_tail for extension. We won any races with try_pop by changing - // away from end-marker. So we're done. - // - // Note that ABA is possible here. A concurrent try_pop could take - // old_tail before our update of old_tail's next_ptr, old_tail gets - // recycled and re-added to the end of this queue, and then we - // successfully cmpxchg, making the list in _tail circular. Callers - // must ensure this can't happen. - return; - } else { - // A concurrent try_pop has claimed old_tail, so it is no longer in the - // list. The queue was logically empty. _head is either null or - // old_tail, depending on how far try_pop operations have progressed. - DEBUG_ONLY(T* old_head = AtomicAccess::load(&_head);) - assert((old_head == nullptr) || (old_head == old_tail), "invariant"); - // Fall through to common update of _head. - } - // The queue was empty, and first should become the new _head. The queue - // will appear to be empty to any further try_pops until done. - AtomicAccess::store(&_head, &first); -} - -template -bool NonblockingQueue::try_pop(T** node_ptr) { - // We only need memory_order_consume. Upgrade it to "load_acquire" - // as the memory_order_consume API is not ready for use yet. - T* old_head = AtomicAccess::load_acquire(&_head); - if (old_head == nullptr) { - *node_ptr = nullptr; - return true; // Queue is empty. - } - - T* next_node = AtomicAccess::load_acquire(next_ptr(*old_head)); - if (!is_end(next_node)) { - // [Clause 1] - // There are several cases for next_node. - // (1) next_node is the extension of the queue's list. - // (2) next_node is null, because a competing try_pop took old_head. - // (3) next_node is the extension of some unrelated list, because a - // competing try_pop took old_head and put it in some other list. - // - // Attempt to advance the list, replacing old_head with next_node in - // _head. The success or failure of that attempt, along with the value - // of next_node, are used to partially determine which case we're in and - // how to proceed. In particular, advancement will fail for case (3). - if (old_head != AtomicAccess::cmpxchg(&_head, old_head, next_node)) { - // [Clause 1a] - // The cmpxchg to advance the list failed; a concurrent try_pop won - // the race and claimed old_head. This can happen for any of the - // next_node cases. - return false; - } else if (next_node == nullptr) { - // [Clause 1b] - // The cmpxchg to advance the list succeeded, but a concurrent try_pop - // has already claimed old_head (see [Clause 2] - old_head was the last - // entry in the list) by nulling old_head's next field. The advance set - // _head to null, "helping" the competing try_pop. _head will remain - // nullptr until a subsequent push/append. This is a lost race, and we - // report it as such for consistency, though we could report the queue - // was empty. We don't attempt to further help [Clause 2] by also - // trying to set _tail to nullptr, as that would just ensure that one or - // the other cmpxchg is a wasted failure. - return false; - } else { - // [Clause 1c] - // Successfully advanced the list and claimed old_head. next_node was - // in the extension of the queue's list. Return old_head after - // unlinking it from next_node. - set_next(*old_head, nullptr); - *node_ptr = old_head; - return true; - } - - } else if (is_end(AtomicAccess::cmpxchg(next_ptr(*old_head), next_node, (T*)nullptr))) { - // [Clause 2] - // Old_head was the last entry and we've claimed it by setting its next - // value to null. However, this leaves the queue in disarray. Fix up - // the queue, possibly in conjunction with other concurrent operations. - // Any further try_pops will consider the queue empty until a - // push/append completes by installing a new head. - - // The order of the two cmpxchgs doesn't matter algorithmically, but - // dealing with _head first gives a stronger invariant in append, and is - // also consistent with [Clause 1b]. - - // Attempt to change the queue head from old_head to null. Failure of - // the cmpxchg indicates a concurrent operation updated _head first. That - // could be either a push/append or a try_pop in [Clause 1b]. - AtomicAccess::cmpxchg(&_head, old_head, (T*)nullptr); - - // Attempt to change the queue tail from old_head to null. Failure of - // the cmpxchg indicates that a concurrent push/append updated _tail first. - // That operation will eventually recognize the old tail (our old_head) is - // no longer in the list and update _head from the list being appended. - AtomicAccess::cmpxchg(&_tail, old_head, (T*)nullptr); - - // The queue has been restored to order, and we can return old_head. - *node_ptr = old_head; - return true; - - } else { - // [Clause 3] - // Old_head was the last entry in the list, but either a concurrent - // try_pop claimed it first or a concurrent push/append extended the - // list from it. Either way, we lost the race to claim it. - return false; - } -} - -template -T* NonblockingQueue::pop() { - T* result = nullptr; - // Typically try_pop() will succeed without retrying many times, thus we - // omit SpinPause in the loop body. SpinPause or yield may be worthwhile - // in rare, highly contended cases, and client code could implement such - // with try_pop(). - while (!try_pop(&result)) {} - return result; -} - -template -Pair NonblockingQueue::take_all() { - T* tail = AtomicAccess::load(&_tail); - if (tail != nullptr) set_next(*tail, nullptr); // Clear end marker. - Pair result(AtomicAccess::load(&_head), tail); - AtomicAccess::store(&_head, (T*)nullptr); - AtomicAccess::store(&_tail, (T*)nullptr); - return result; -} - -#endif // SHARE_UTILITIES_NONBLOCKINGQUEUE_INLINE_HPP diff --git a/test/hotspot/gtest/utilities/test_nonblockingQueue.cpp b/test/hotspot/gtest/utilities/test_nonblockingQueue.cpp deleted file mode 100644 index ae299730f6e1a..0000000000000 --- a/test/hotspot/gtest/utilities/test_nonblockingQueue.cpp +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Copyright (c) 2021, 2025, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -#include "memory/allocation.inline.hpp" -#include "runtime/atomicAccess.hpp" -#include "utilities/globalDefinitions.hpp" -#include "utilities/nonblockingQueue.inline.hpp" -#include "utilities/pair.hpp" -#include "threadHelper.inline.hpp" -#include "unittest.hpp" - -#include - -class NonblockingQueueTestElement { - typedef NonblockingQueueTestElement Element; - - Element* volatile _entry; - Element* volatile _entry1; - size_t _id; - - static Element* volatile* entry_ptr(Element& e) { return &e._entry; } - static Element* volatile* entry1_ptr(Element& e) { return &e._entry1; } - -public: - using TestQueue = NonblockingQueue; - using TestQueue1 = NonblockingQueue; - - NonblockingQueueTestElement(size_t id = 0) : _entry(), _entry1(), _id(id) {} - size_t id() const { return _id; } - void set_id(size_t value) { _id = value; } - Element* next() { return _entry; } - Element* next1() { return _entry1; } -}; - -typedef NonblockingQueueTestElement Element; -typedef Element::TestQueue TestQueue; -typedef Element::TestQueue1 TestQueue1; - -static void initialize(Element* elements, size_t size, TestQueue* queue) { - for (size_t i = 0; i < size; ++i) { - elements[i].set_id(i); - } - ASSERT_TRUE(queue->empty()); - ASSERT_EQ(0u, queue->length()); - ASSERT_TRUE(queue->is_end(queue->first())); - ASSERT_TRUE(queue->pop() == nullptr); - - for (size_t id = 0; id < size; ++id) { - ASSERT_EQ(id, queue->length()); - Element* e = &elements[id]; - ASSERT_EQ(id, e->id()); - queue->push(*e); - ASSERT_FALSE(queue->empty()); - // first() is always the oldest element. - ASSERT_EQ(&elements[0], queue->first()); - } -} - -class NonblockingQueueTestBasics : public ::testing::Test { -public: - NonblockingQueueTestBasics(); - - static const size_t nelements = 10; - Element elements[nelements]; - TestQueue queue; -}; - -const size_t NonblockingQueueTestBasics::nelements; - -NonblockingQueueTestBasics::NonblockingQueueTestBasics() : queue() { - initialize(elements, nelements, &queue); -} - -TEST_F(NonblockingQueueTestBasics, pop) { - for (size_t i = 0; i < nelements; ++i) { - ASSERT_FALSE(queue.empty()); - ASSERT_EQ(nelements - i, queue.length()); - Element* e = queue.pop(); - ASSERT_TRUE(e != nullptr); - ASSERT_EQ(&elements[i], e); - ASSERT_EQ(i, e->id()); - } - ASSERT_TRUE(queue.empty()); - ASSERT_EQ(0u, queue.length()); - ASSERT_TRUE(queue.pop() == nullptr); -} - -TEST_F(NonblockingQueueTestBasics, append) { - TestQueue other_queue; - ASSERT_TRUE(other_queue.empty()); - ASSERT_EQ(0u, other_queue.length()); - ASSERT_TRUE(other_queue.is_end(other_queue.first())); - ASSERT_TRUE(other_queue.pop() == nullptr); - - Pair pair = queue.take_all(); - other_queue.append(*pair.first, *pair.second); - ASSERT_EQ(nelements, other_queue.length()); - ASSERT_TRUE(queue.empty()); - ASSERT_EQ(0u, queue.length()); - ASSERT_TRUE(queue.is_end(queue.first())); - ASSERT_TRUE(queue.pop() == nullptr); - - for (size_t i = 0; i < nelements; ++i) { - ASSERT_EQ(nelements - i, other_queue.length()); - Element* e = other_queue.pop(); - ASSERT_TRUE(e != nullptr); - ASSERT_EQ(&elements[i], e); - ASSERT_EQ(i, e->id()); - } - ASSERT_EQ(0u, other_queue.length()); - ASSERT_TRUE(other_queue.pop() == nullptr); -} - -TEST_F(NonblockingQueueTestBasics, two_queues) { - TestQueue1 queue1; - ASSERT_TRUE(queue1.pop() == nullptr); - - for (size_t id = 0; id < nelements; ++id) { - queue1.push(elements[id]); - } - ASSERT_EQ(nelements, queue1.length()); - Element* e0 = queue.first(); - Element* e1 = queue1.first(); - ASSERT_TRUE(e0 != nullptr); - ASSERT_TRUE(e1 != nullptr); - ASSERT_FALSE(queue.is_end(e0)); - ASSERT_FALSE(queue1.is_end(e1)); - while (!queue.is_end(e0) && !queue1.is_end(e1)) { - ASSERT_EQ(e0, e1); - e0 = e0->next(); - e1 = e1->next1(); - } - ASSERT_TRUE(queue.is_end(e0)); - ASSERT_TRUE(queue1.is_end(e1)); - - for (size_t i = 0; i < nelements; ++i) { - ASSERT_EQ(nelements - i, queue.length()); - ASSERT_EQ(nelements - i, queue1.length()); - - Element* e = queue.pop(); - ASSERT_TRUE(e != nullptr); - ASSERT_EQ(&elements[i], e); - ASSERT_EQ(i, e->id()); - - Element* e1 = queue1.pop(); - ASSERT_TRUE(e1 != nullptr); - ASSERT_EQ(&elements[i], e1); - ASSERT_EQ(i, e1->id()); - - ASSERT_EQ(e, e1); - } - ASSERT_EQ(0u, queue.length()); - ASSERT_EQ(0u, queue1.length()); - ASSERT_TRUE(queue.pop() == nullptr); - ASSERT_TRUE(queue1.pop() == nullptr); -} - -class NonblockingQueueTestThread : public JavaTestThread { - uint _id; - TestQueue* _from; - TestQueue* _to; - volatile size_t* _processed; - size_t _process_limit; - size_t _local_processed; - volatile bool _ready; - -public: - NonblockingQueueTestThread(Semaphore* post, - uint id, - TestQueue* from, - TestQueue* to, - volatile size_t* processed, - size_t process_limit) : - JavaTestThread(post), - _id(id), - _from(from), - _to(to), - _processed(processed), - _process_limit(process_limit), - _local_processed(0), - _ready(false) - {} - - virtual void main_run() { - AtomicAccess::release_store_fence(&_ready, true); - while (true) { - Element* e = _from->pop(); - if (e != nullptr) { - _to->push(*e); - AtomicAccess::inc(_processed); - ++_local_processed; - } else if (AtomicAccess::load_acquire(_processed) == _process_limit) { - tty->print_cr("thread %u processed %zu", _id, _local_processed); - return; - } - } - } - - bool ready() const { return AtomicAccess::load_acquire(&_ready); } -}; - -TEST_VM(NonblockingQueueTest, stress) { - Semaphore post; - TestQueue initial_queue; - TestQueue start_queue; - TestQueue middle_queue; - TestQueue final_queue; - volatile size_t stage1_processed = 0; - volatile size_t stage2_processed = 0; - - const size_t nelements = 10000; - Element* elements = NEW_C_HEAP_ARRAY(Element, nelements, mtOther); - for (size_t id = 0; id < nelements; ++id) { - ::new (&elements[id]) Element(id); - initial_queue.push(elements[id]); - } - ASSERT_EQ(nelements, initial_queue.length()); - - // - stage1 threads pop from start_queue and push to middle_queue. - // - stage2 threads pop from middle_queue and push to final_queue. - // - all threads in a stage count the number of elements processed in - // their corresponding stageN_processed counter. - - const uint stage1_threads = 2; - const uint stage2_threads = 2; - const uint nthreads = stage1_threads + stage2_threads; - NonblockingQueueTestThread* threads[nthreads] = {}; - - for (uint i = 0; i < ARRAY_SIZE(threads); ++i) { - TestQueue* from = &start_queue; - TestQueue* to = &middle_queue; - volatile size_t* processed = &stage1_processed; - if (i >= stage1_threads) { - from = &middle_queue; - to = &final_queue; - processed = &stage2_processed; - } - threads[i] = - new NonblockingQueueTestThread(&post, i, from, to, processed, nelements); - threads[i]->doit(); - while (!threads[i]->ready()) {} // Wait until ready to start test. - } - - // Transfer elements to start_queue to start test. - Pair pair = initial_queue.take_all(); - start_queue.append(*pair.first, *pair.second); - - // Wait for all threads to complete. - for (uint i = 0; i < nthreads; ++i) { - post.wait(); - } - - // Verify expected state. - ASSERT_EQ(nelements, stage1_processed); - ASSERT_EQ(nelements, stage2_processed); - ASSERT_EQ(0u, initial_queue.length()); - ASSERT_EQ(0u, start_queue.length()); - ASSERT_EQ(0u, middle_queue.length()); - ASSERT_EQ(nelements, final_queue.length()); - while (final_queue.pop() != nullptr) {} - - FREE_C_HEAP_ARRAY(Element, elements); -}