Skip to content

Commit 789a9c8

Browse files
committed
mpmc: feature-gate crossbeam implementation
Adds a feature-gate for the `mpmc::Queue` crossbeam implementation.
1 parent cd3126b commit 789a9c8

File tree

3 files changed

+456
-353
lines changed

3 files changed

+456
-353
lines changed

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ alloc = []
5858

5959
nightly = []
6060

61+
# Implement MPMC using `crossbeam::ArrayQueue`
62+
mpmc_crossbeam = ["dep:crossbeam-utils"]
63+
6164
[dependencies]
6265
bytes = { version = "1", default-features = false, optional = true }
6366
portable-atomic = { version = "1.0", optional = true }
@@ -67,7 +70,7 @@ ufmt = { version = "0.2", optional = true }
6770
ufmt-write = { version = "0.1", optional = true }
6871
defmt = { version = "1.0.1", optional = true }
6972
zeroize = { version = "1.8", optional = true, default-features = false, features = ["derive"] }
70-
crossbeam-utils = "0.8"
73+
crossbeam-utils = { version = "0.8", optional = true }
7174

7275
# for the pool module
7376
[target.'cfg(any(target_arch = "arm", target_pointer_width = "32", target_pointer_width = "64"))'.dependencies]

src/mpmc.rs

Lines changed: 7 additions & 352 deletions
Original file line numberDiff line numberDiff line change
@@ -68,357 +68,12 @@
6868
//!
6969
//! [bounded MPMC queue]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
7070
71-
use core::{cell::UnsafeCell, mem::MaybeUninit};
72-
73-
#[cfg(not(feature = "portable-atomic"))]
74-
use core::sync::atomic;
75-
#[cfg(feature = "portable-atomic")]
76-
use portable_atomic as atomic;
77-
78-
use atomic::Ordering;
79-
80-
use crate::storage::{OwnedStorage, Storage, ViewStorage};
81-
71+
#[cfg(feature = "mpmc_crossbeam")]
8272
pub mod crossbeam_array_queue;
73+
#[cfg(feature = "mpmc_crossbeam")]
74+
pub use crossbeam_array_queue::*;
8375

84-
#[cfg(feature = "mpmc_large")]
85-
type AtomicTargetSize = atomic::AtomicUsize;
86-
#[cfg(not(feature = "mpmc_large"))]
87-
type AtomicTargetSize = atomic::AtomicU8;
88-
89-
#[cfg(feature = "mpmc_large")]
90-
type UintSize = usize;
91-
#[cfg(not(feature = "mpmc_large"))]
92-
type UintSize = u8;
93-
94-
#[cfg(feature = "mpmc_large")]
95-
type IntSize = isize;
96-
#[cfg(not(feature = "mpmc_large"))]
97-
type IntSize = i8;
98-
99-
/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`].
100-
///
101-
/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this
102-
/// struct if you want to write code that's generic over both.
103-
pub struct QueueInner<T, S: Storage> {
104-
dequeue_pos: AtomicTargetSize,
105-
enqueue_pos: AtomicTargetSize,
106-
buffer: UnsafeCell<S::Buffer<Cell<T>>>,
107-
}
108-
109-
/// A statically allocated multi-producer, multi-consumer queue with a capacity of `N` elements.
110-
///
111-
/// <div class="warning">
112-
///
113-
/// `N` must be a power of 2.
114-
///
115-
/// </div>
116-
///
117-
/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
118-
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
119-
120-
/// A [`Queue`] with dynamic capacity.
121-
///
122-
/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference.
123-
pub type QueueView<T> = QueueInner<T, ViewStorage>;
124-
125-
impl<T, const N: usize> Queue<T, N> {
126-
/// Creates an empty queue.
127-
pub const fn new() -> Self {
128-
const {
129-
assert!(N > 1);
130-
assert!(N.is_power_of_two());
131-
assert!(N < UintSize::MAX as usize);
132-
}
133-
134-
let mut cell_count = 0;
135-
136-
let mut result_cells: [Cell<T>; N] = [const { Cell::new(0) }; N];
137-
while cell_count != N {
138-
result_cells[cell_count] = Cell::new(cell_count);
139-
cell_count += 1;
140-
}
141-
142-
Self {
143-
buffer: UnsafeCell::new(result_cells),
144-
dequeue_pos: AtomicTargetSize::new(0),
145-
enqueue_pos: AtomicTargetSize::new(0),
146-
}
147-
}
148-
149-
/// Used in `Storage` implementation.
150-
pub(crate) fn as_view_private(&self) -> &QueueView<T> {
151-
self
152-
}
153-
/// Used in `Storage` implementation.
154-
pub(crate) fn as_view_mut_private(&mut self) -> &mut QueueView<T> {
155-
self
156-
}
157-
}
158-
159-
impl<T, S: Storage> QueueInner<T, S> {
160-
/// Returns the maximum number of elements the queue can hold.
161-
#[inline]
162-
pub fn capacity(&self) -> usize {
163-
S::len(self.buffer.get())
164-
}
165-
166-
/// Get a reference to the `Queue`, erasing the `N` const-generic.
167-
///
168-
///
169-
/// ```rust
170-
/// # use heapless::mpmc::{Queue, QueueView};
171-
/// let queue: Queue<u8, 2> = Queue::new();
172-
/// let view: &QueueView<u8> = queue.as_view();
173-
/// ```
174-
///
175-
/// It is often preferable to do the same through type coercion, since `Queue<T, N>` implements `Unsize<QueueView<T>>`:
176-
///
177-
/// ```rust
178-
/// # use heapless::mpmc::{Queue, QueueView};
179-
/// let queue: Queue<u8, 2> = Queue::new();
180-
/// let view: &QueueView<u8> = &queue;
181-
/// ```
182-
#[inline]
183-
pub fn as_view(&self) -> &QueueView<T> {
184-
S::as_mpmc_view(self)
185-
}
186-
187-
/// Get a mutable reference to the `Queue`, erasing the `N` const-generic.
188-
///
189-
/// ```rust
190-
/// # use heapless::mpmc::{Queue, QueueView};
191-
/// let mut queue: Queue<u8, 2> = Queue::new();
192-
/// let view: &mut QueueView<u8> = queue.as_mut_view();
193-
/// ```
194-
///
195-
/// It is often preferable to do the same through type coercion, since `Queue<T, N>` implements `Unsize<QueueView<T>>`:
196-
///
197-
/// ```rust
198-
/// # use heapless::mpmc::{Queue, QueueView};
199-
/// let mut queue: Queue<u8, 2> = Queue::new();
200-
/// let view: &mut QueueView<u8> = &mut queue;
201-
/// ```
202-
#[inline]
203-
pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
204-
S::as_mpmc_mut_view(self)
205-
}
206-
207-
fn mask(&self) -> UintSize {
208-
(S::len(self.buffer.get()) - 1) as _
209-
}
210-
211-
/// Returns the item in the front of the queue, or `None` if the queue is empty.
212-
pub fn dequeue(&self) -> Option<T> {
213-
unsafe { dequeue(S::as_ptr(self.buffer.get()), &self.dequeue_pos, self.mask()) }
214-
}
215-
216-
/// Adds an `item` to the end of the queue.
217-
///
218-
/// Returns back the `item` if the queue is full.
219-
pub fn enqueue(&self, item: T) -> Result<(), T> {
220-
unsafe {
221-
enqueue(
222-
S::as_ptr(self.buffer.get()),
223-
&self.enqueue_pos,
224-
self.mask(),
225-
item,
226-
)
227-
}
228-
}
229-
}
230-
231-
impl<T, const N: usize> Default for Queue<T, N> {
232-
fn default() -> Self {
233-
Self::new()
234-
}
235-
}
236-
237-
impl<T, S: Storage> Drop for QueueInner<T, S> {
238-
fn drop(&mut self) {
239-
// Drop all elements currently in the queue.
240-
while self.dequeue().is_some() {}
241-
}
242-
}
243-
244-
unsafe impl<T, S: Storage> Sync for QueueInner<T, S> where T: Send {}
245-
246-
struct Cell<T> {
247-
data: MaybeUninit<T>,
248-
sequence: AtomicTargetSize,
249-
}
250-
251-
impl<T> Cell<T> {
252-
const fn new(seq: usize) -> Self {
253-
Self {
254-
data: MaybeUninit::uninit(),
255-
sequence: AtomicTargetSize::new(seq as UintSize),
256-
}
257-
}
258-
}
259-
260-
unsafe fn dequeue<T>(
261-
buffer: *mut Cell<T>,
262-
dequeue_pos: &AtomicTargetSize,
263-
mask: UintSize,
264-
) -> Option<T> {
265-
let mut pos = dequeue_pos.load(Ordering::Relaxed);
266-
267-
let mut cell;
268-
loop {
269-
cell = buffer.add(usize::from(pos & mask));
270-
let seq = (*cell).sequence.load(Ordering::Acquire);
271-
let dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize);
272-
273-
match dif.cmp(&0) {
274-
core::cmp::Ordering::Equal => {
275-
if dequeue_pos
276-
.compare_exchange_weak(
277-
pos,
278-
pos.wrapping_add(1),
279-
Ordering::Relaxed,
280-
Ordering::Relaxed,
281-
)
282-
.is_ok()
283-
{
284-
break;
285-
}
286-
}
287-
core::cmp::Ordering::Less => {
288-
return None;
289-
}
290-
core::cmp::Ordering::Greater => {
291-
pos = dequeue_pos.load(Ordering::Relaxed);
292-
}
293-
}
294-
}
295-
296-
let data = (*cell).data.as_ptr().read();
297-
(*cell)
298-
.sequence
299-
.store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release);
300-
Some(data)
301-
}
302-
303-
unsafe fn enqueue<T>(
304-
buffer: *mut Cell<T>,
305-
enqueue_pos: &AtomicTargetSize,
306-
mask: UintSize,
307-
item: T,
308-
) -> Result<(), T> {
309-
let mut pos = enqueue_pos.load(Ordering::Relaxed);
310-
311-
let mut cell;
312-
loop {
313-
cell = buffer.add(usize::from(pos & mask));
314-
let seq = (*cell).sequence.load(Ordering::Acquire);
315-
let dif = (seq as IntSize).wrapping_sub(pos as IntSize);
316-
317-
match dif.cmp(&0) {
318-
core::cmp::Ordering::Equal => {
319-
if enqueue_pos
320-
.compare_exchange_weak(
321-
pos,
322-
pos.wrapping_add(1),
323-
Ordering::Relaxed,
324-
Ordering::Relaxed,
325-
)
326-
.is_ok()
327-
{
328-
break;
329-
}
330-
}
331-
core::cmp::Ordering::Less => {
332-
return Err(item);
333-
}
334-
core::cmp::Ordering::Greater => {
335-
pos = enqueue_pos.load(Ordering::Relaxed);
336-
}
337-
}
338-
}
339-
340-
(*cell).data.as_mut_ptr().write(item);
341-
(*cell)
342-
.sequence
343-
.store(pos.wrapping_add(1), Ordering::Release);
344-
Ok(())
345-
}
346-
347-
#[cfg(test)]
348-
mod tests {
349-
use static_assertions::assert_not_impl_any;
350-
351-
use super::Queue;
352-
353-
// Ensure a `Queue` containing `!Send` values stays `!Send` itself.
354-
assert_not_impl_any!(Queue<*const (), 4>: Send);
355-
356-
#[test]
357-
fn memory_leak() {
358-
droppable!();
359-
360-
let q = Queue::<_, 2>::new();
361-
q.enqueue(Droppable::new()).unwrap_or_else(|_| panic!());
362-
q.enqueue(Droppable::new()).unwrap_or_else(|_| panic!());
363-
drop(q);
364-
365-
assert_eq!(Droppable::count(), 0);
366-
}
367-
368-
#[test]
369-
fn sanity() {
370-
let q = Queue::<_, 2>::new();
371-
q.enqueue(0).unwrap();
372-
q.enqueue(1).unwrap();
373-
assert!(q.enqueue(2).is_err());
374-
375-
assert_eq!(q.dequeue(), Some(0));
376-
assert_eq!(q.dequeue(), Some(1));
377-
assert_eq!(q.dequeue(), None);
378-
}
379-
380-
#[test]
381-
fn drain_at_pos255() {
382-
let q = Queue::<_, 2>::new();
383-
for _ in 0..255 {
384-
assert!(q.enqueue(0).is_ok());
385-
assert_eq!(q.dequeue(), Some(0));
386-
}
387-
388-
// Queue is empty, this should not block forever.
389-
assert_eq!(q.dequeue(), None);
390-
}
391-
392-
#[test]
393-
fn full_at_wrapped_pos0() {
394-
let q = Queue::<_, 2>::new();
395-
for _ in 0..254 {
396-
assert!(q.enqueue(0).is_ok());
397-
assert_eq!(q.dequeue(), Some(0));
398-
}
399-
assert!(q.enqueue(0).is_ok());
400-
assert!(q.enqueue(0).is_ok());
401-
// this should not block forever
402-
assert!(q.enqueue(0).is_err());
403-
}
404-
405-
#[test]
406-
fn enqueue_full() {
407-
#[cfg(not(feature = "mpmc_large"))]
408-
const CAPACITY: usize = 128;
409-
410-
#[cfg(feature = "mpmc_large")]
411-
const CAPACITY: usize = 256;
412-
413-
let q: Queue<u8, CAPACITY> = Queue::new();
414-
415-
assert_eq!(q.capacity(), CAPACITY);
416-
417-
for _ in 0..CAPACITY {
418-
q.enqueue(0xAA).unwrap();
419-
}
420-
421-
// Queue is full, this should not block forever.
422-
q.enqueue(0x55).unwrap_err();
423-
}
424-
}
76+
#[cfg(not(feature = "mpmc_crossbeam"))]
77+
mod original;
78+
#[cfg(not(feature = "mpmc_crossbeam"))]
79+
pub use original::*;

0 commit comments

Comments
 (0)