Queues

Challenge

Allow efficient operations on a shared queue by multiple clients acting concurrently.

Explanation

We can model a queue by assigning increasing integers that encode the order of items. To minimize conflicts for concurrent operations, we combine the integers in a tuple with a random element to make the final key unique.

Ordering

The ordering of keys preserves the FIFO order of items and therefore lets us identify the next item to be dequeued without maintaining a pointer to it.
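A minimal sketch of the ordering argument, using plain Python tuples as stand-ins for the packed keys. It assumes the tuple layer's byte encoding sorts keys the same way Python compares these tuples (index first, then the random component). make_key is a hypothetical helper, not part of FoundationDB.

```python
import os
import random

def make_key(index):
    # Hypothetical helper: an increasing index plus a random suffix,
    # mirroring the (index, random) key layout described above.
    return (index, os.urandom(20))

keys = [make_key(i) for i in (1, 2, 3)]
items = dict(zip(keys, ["first", "second", "third"]))

# Storage order is irrelevant: sorting the keys, as an ordered
# key-value store does, recovers enqueue (FIFO) order.
shuffled = list(items)
random.shuffle(shuffled)
ordered = sorted(shuffled)

# The next item to dequeue is just the smallest key; no head pointer needed.
next_key = ordered[0]
print(items[next_key])  # first
```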

Pattern

We store each item in the queue within a subspace, which takes care of packing our integer indexes into byte strings.

queue = fdb.Subspace(('Q',))
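Packing matters because the database orders keys by their bytes. The sketch below only illustrates why an order-preserving encoding is needed; the fixed-width big-endian pack_index helper is a hypothetical example and is not the encoding the tuple layer actually uses.

```python
import struct

# Decimal strings do not sort numerically when compared as bytes.
naive = sorted(str(n).encode() for n in [2, 10, 9])
print(naive)  # [b'10', b'2', b'9']

def pack_index(n):
    # Hypothetical order-preserving encoding for non-negative integers:
    # 8-byte big-endian, so byte order matches numeric order.
    return struct.pack(">Q", n)

packed = sorted(pack_index(n) for n in [2, 10, 9])
print([struct.unpack(">Q", p)[0] for p in packed])  # [2, 9, 10]
```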

As a first cut, we could store each item with a single key-value pair using increasing integer indexes for subsequent items:

tr[queue[index]] = value

However, this would leave concurrent enqueue operations vulnerable to conflicts: two clients that read the same last index would write to the same key. To minimize these conflicts, we can add a random value to the key.

tr[queue[index][random_int]] = value

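With this data model, items enqueued concurrently may be assigned the same index, but their keys will still be distinct and ordered; items that share an index are ordered by their random component, so their relative order is arbitrary. By finding the last index with a snapshot read, an enqueue adds no read conflict on the queue's key range, so concurrent enqueue operations will not conflict with one another.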
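The scenario of two clients computing the same index can be simulated in memory. This is a hypothetical sketch: the dict stands in for the database and each snapshot is just a copy, so it models only the key layout (the random suffix keeps both writes), not FoundationDB's conflict detection.

```python
import os

# Existing queue contents: one item at index 1.
store = {(1, os.urandom(20)): "a"}

def last_index(snapshot):
    # Largest index present, or 0 for an empty queue.
    return max((k[0] for k in snapshot), default=0)

# Two clients read the queue at the same moment...
snap_a = dict(store)
snap_b = dict(store)

# ...so both compute index 2, but each adds its own random suffix.
key_a = (last_index(snap_a) + 1, os.urandom(20))
key_b = (last_index(snap_b) + 1, os.urandom(20))
store[key_a] = "from A"
store[key_b] = "from B"

# Neither write overwrites the other.
print(sorted(k[0] for k in store))  # [1, 2, 2]
```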

To implement this model, we need an efficient way of finding the first and last index presently in use. FoundationDB’s range reads have limit and reverse options that let us accomplish this. Given the range of the subspace:

r = queue.range()

we can find the first and last key-value pairs in the range with:

tr.get_range(r.start, r.stop, limit=1) # first
tr.get_range(r.start, r.stop, limit=1, reverse=True) # last
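To see what limit and reverse select, here is a hypothetical in-memory stand-in for a range read over sorted byte-string keys. The get_range function below is an illustration only, not the FoundationDB binding; the keys under the b"Q" prefix play the role of the queue subspace, and the b"P"/b"R" keys show that the range excludes neighbors.

```python
import bisect

def get_range(sorted_keys, start, stop, limit=0, reverse=False):
    # Hypothetical stand-in: keys in [start, stop), optionally reversed,
    # truncated to `limit` entries (0 means no limit in this sketch).
    lo = bisect.bisect_left(sorted_keys, start)
    hi = bisect.bisect_left(sorted_keys, stop)
    selected = sorted_keys[lo:hi]
    if reverse:
        selected = selected[::-1]
    return selected[:limit] if limit else selected

keys = sorted([b"P-other", b"Q\x01a", b"Q\x02b", b"Q\x03c", b"R-other"])

first = get_range(keys, b"Q", b"R", limit=1)               # front of queue
last = get_range(keys, b"Q", b"R", limit=1, reverse=True)  # back of queue
print(first, last)  # [b'Q\x01a'] [b'Q\x03c']
```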

Extensions

High-Contention Dequeue Operations

To minimize conflicts during dequeue operations, we can use a staging technique to service the requests. If a dequeue operation doesn’t initially succeed, it registers a dequeue request in a semi-ordered set of such requests. It then enters a retry loop in which it attempts to fulfill outstanding requests.
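The staging idea can be sketched with an in-memory model. Everything here is hypothetical (StagedQueue, register_request, fulfill, poll, staged_dequeue, and the caller-supplied order value, which stands in for whatever roughly increasing value a real implementation would use). It ignores transactions entirely and only shows how outstanding requests are paired, oldest first, with the oldest items; it is not FoundationDB's API.

```python
import os

class StagedQueue:
    """Hypothetical in-memory model of staged dequeues (not the FDB API)."""

    def __init__(self):
        self.items = {}     # (index, suffix) -> value
        self.requests = {}  # (order, suffix) -> None: semi-ordered requests
        self.results = {}   # request key -> value handed to that request
        self._next = 0

    def enqueue(self, value):
        self._next += 1
        self.items[(self._next, os.urandom(8))] = value

    def register_request(self, order):
        # Random suffix breaks ties between requests with the same order.
        key = (order, os.urandom(8))
        self.requests[key] = None
        return key

    def fulfill(self):
        # Pair the oldest outstanding requests with the oldest items.
        for req, item in zip(sorted(self.requests), sorted(self.items)):
            self.results[req] = self.items.pop(item)
            del self.requests[req]

    def poll(self, req):
        # The value assigned to this request, or None if still pending.
        return self.results.pop(req, None)

def staged_dequeue(q, order, attempts=3):
    # Register a request, then retry fulfilling outstanding requests.
    req = q.register_request(order)
    for _ in range(attempts):
        q.fulfill()
        value = q.poll(req)
        if value is not None:
            return value
    q.requests.pop(req, None)  # give up and withdraw the request
    return None
```

In this model a request only ever receives an item once it is among the oldest outstanding requests, which is how staging keeps dequeues roughly FIFO while letting any client do the fulfilling work.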

Code

The following is a simple implementation of the basic pattern:

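import os

import fdb

fdb.api_version(630)  # must be called before using fdb; use a version your cluster supports

queue = fdb.Subspace(('Q',))

@fdb.transactional
def dequeue(tr):
    # Remove and return the value at the front of the queue, or None if it is empty.
    item = first_item(tr)
    if item is None:
        return None
    del tr[item.key]
    return item.value

@fdb.transactional
def enqueue(tr, value):
    # Append after the current last index; the random suffix keeps keys
    # unique when concurrent enqueues compute the same index.
    tr[queue[last_index(tr) + 1][os.urandom(20)]] = value

@fdb.transactional
def last_index(tr):
    # Snapshot read: adds no read conflict, so concurrent enqueues don't conflict.
    r = queue.range()
    for key, _ in tr.snapshot.get_range(r.start, r.stop, limit=1, reverse=True):
        return queue.unpack(key)[0]
    return 0

@fdb.transactional
def first_item(tr):
    # Regular read: two transactions dequeuing the same item will conflict.
    r = queue.range()
    for kv in tr.get_range(r.start, r.stop, limit=1):
        return kv
    return None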
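To try the enqueue/dequeue logic without a cluster, here is a hypothetical dict-backed stand-in that mirrors the four functions above. FakeQueue is not part of FoundationDB, and it has no transactions, snapshot reads, or conflict detection, so it only demonstrates the FIFO behavior of the key layout.

```python
import os

class FakeQueue:
    """Hypothetical dict-backed stand-in for the recipe's functions."""

    def __init__(self):
        self.kv = {}  # (index, random suffix) -> value

    def last_index(self):
        # Like last_index(tr): largest index in use, or 0 if empty.
        return max((k[0] for k in self.kv), default=0)

    def first_item(self):
        # Like first_item(tr): smallest key and its value, or None.
        if not self.kv:
            return None
        key = min(self.kv)
        return key, self.kv[key]

    def enqueue(self, value):
        self.kv[(self.last_index() + 1, os.urandom(20))] = value

    def dequeue(self):
        item = self.first_item()
        if item is None:
            return None
        key, value = item
        del self.kv[key]
        return value

q = FakeQueue()
for v in ["a", "b", "c"]:
    q.enqueue(v)
print([q.dequeue() for _ in range(4)])  # ['a', 'b', 'c', None]
```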