Simple Scalable Unbounded Queue

Created: 2022-11-22T12:36:19-06:00

This card pertains to a resource available on the internet.

Oliver Giersch and Jörg Nolte published a paper [..] called "Fast and Portable Concurrent FIFO Queues With Deterministic Memory Reclamation" [ref]. In it, they note that fetch_add (atomic "fetch and add", or FAA) scales better than looping with [..] "compare and swap" (CAS).

Use FAA on the producer to get the current buffer while also reserving an index into it atomically. If the reserved index is outside the buffer's bounds, install a new buffer on the producer. This is a good starting point for a concurrent queue, but it has a few edge cases that need to be addressed.
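
To make the FAA versus CAS contrast concrete, here is a minimal Rust sketch (not taken from the paper). The helper names `reserve_with_faa` and `reserve_with_cas` are hypothetical. It only shows index reservation on a plain counter: a `fetch_add` always succeeds in one step, while a CAS loop has to retry whenever another thread wins the race.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: each of `threads` workers reserves `per_thread`
/// indices from one shared counter, using one fetch_add per reservation.
/// Returns every reserved index, sorted.
fn reserve_with_faa(threads: usize, per_thread: usize) -> Vec<usize> {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                (0..per_thread)
                    // fetch_add returns the previous value, so each call
                    // hands out a unique index without a retry loop.
                    .map(|_| counter.fetch_add(1, Ordering::Relaxed))
                    .collect::<Vec<usize>>()
            })
        })
        .collect();

    let mut all: Vec<usize> = handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect();
    all.sort_unstable();
    all
}

/// The CAS-loop equivalent, for contrast: a failed compare_exchange means
/// another thread changed the counter first, so this thread must retry.
fn reserve_with_cas(counter: &AtomicUsize) -> usize {
    let mut cur = counter.load(Ordering::Relaxed);
    loop {
        match counter.compare_exchange_weak(cur, cur + 1, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(prev) => return prev,
            Err(actual) => cur = actual,
        }
    }
}
```

Both helpers hand out each index exactly once. The difference is that the CAS version can spin under contention, whereas the FAA version does a bounded amount of work per reservation.
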
type Slot(T):
    value: Uninit(T)
    ready: Atomic(bool)

    read() -> ?T:
        if not LOAD(&ready, Acquire): return null
        return READ(&value)

    write(v: T):
        WRITE(&value, v)
        STORE(&ready, true, Release)

@align(4096)
type Buffer(T):
    slots: [buffer_size]Slot(T)
    next: Atomic(?*Buffer(T))
    pending: Atomic(isize)

    // basic refcount stuff
    unref(count: isize):
        p = ADD(&pending, count, Release)
        if (p + count != 0) return

        FENCE(Acquire)
        free(this)

type Queue(T):
    producer: Atomic(?*Buffer(T))
    consumer: Atomic(?*Buffer(T))

    push(value: T):
        cached_buf = null
        defer if (cached_buf != null) free(cached_buf)

        loop:
            // fast path
            (buf, idx) = decode(ADD(&producer, 1, Acquire))
            if (buf != null) and (idx < buffer_size):
                return buf.slots[idx].write(value)

            // find where to register & link next buffer
            prev_link = if (buf != null) &buf.next else &consumer
            next = LOAD(prev_link, Acquire)

            if (next == null):
                // cache the malloc
                if (cached_buf == null) cached_buf = malloc(Buffer(T))
                next = cached_buf

                match CAS(prev_link, null, next, Release, Acquire):
                    Ok(_): cached_buf = null // registered so dont free it
                    Err(updated): next = updated

            p = LOAD(&producer, Relaxed)
            loop:
                // re-decode every iteration so a failed CAS sees the updated value
                (cur_buf, cur_idx) = decode(p)

                // retry FAA if failed to install
                if (buf != cur_buf):
                    if (buf != null) buf.unref(-1)
                    break

                // install new buffer + reserve slot 0 in it
                if Err(updated) = CAS(&producer, p, encode(next, 1), Release, Relaxed):
                    p = updated
                    continue

                (old_buf, inc) = (buf, cur_idx - buffer_size)
                if (buf == null):
                    (old_buf, inc) = (next, 1) // account for consumer

                old_buf.unref(inc)
                return next.slots[0].write(value)

    pop() -> ?T:
        (buf, idx) = decode(LOAD(&consumer, Acquire))
        if (buf == null): return null

        if (idx == buffer_size):
            next = LOAD(&buf.next, Acquire)
            if (next == null): return null

            buf.unref(-1)
            (buf, idx) = (next, 0)
            STORE(&consumer, encode(buf, idx), Unordered)

        value = buf.slots[idx].read()
        if (value != null):
            STORE(&consumer, encode(buf, idx + 1), Unordered)
        return value
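
The pseudocode uses encode and decode without defining them. One plausible reading, sketched below in Rust, is that the 4096-byte buffer alignment leaves the low 12 bits of a buffer address free to carry the slot index, so a single FAA on the packed word returns both. This is an assumption, not something the card states. The names `ALIGN`, `INDEX_MASK`, and `reserve` are hypothetical, and buffer addresses are modelled as plain `usize` values to keep the sketch free of unsafe code.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Assumption: buffers are 4096-byte aligned (as with @align(4096)), so the
// low 12 bits of a buffer address are always zero and can carry the index.
const ALIGN: usize = 4096;
const INDEX_MASK: usize = ALIGN - 1;

/// Pack a buffer address and a slot index into one word.
fn encode(buf_addr: usize, idx: usize) -> usize {
    debug_assert_eq!(buf_addr & INDEX_MASK, 0, "buffer must be 4096-aligned");
    debug_assert!(idx <= INDEX_MASK, "index must fit in the low bits");
    buf_addr | idx
}

/// Split a packed word back into (buffer address, slot index).
/// A zero address stands in for the null buffer.
fn decode(word: usize) -> (usize, usize) {
    (word & !INDEX_MASK, word & INDEX_MASK)
}

/// One fetch_add both reads the current buffer and reserves the next index,
/// mirroring decode(ADD(&producer, 1, Acquire)) in the pseudocode.
fn reserve(producer: &AtomicUsize) -> (usize, usize) {
    decode(producer.fetch_add(1, Ordering::Acquire))
}
```

Under this packing, indices at or past buffer_size are still valid encodings, which is what lets push detect an out-of-bounds reservation. It only holds while the number of overshooting FAAs stays below 4096 minus buffer_size, since anything beyond that would carry into the address bits.
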