Allow efficient operations on a shared queue by multiple clients acting concurrently.
We can model a queue by assigning increasing integers that encode the order of items. To minimize conflicts for concurrent operations, we combine the integers in a tuple with a random element to make the final key unique.
The ordering of keys preserves the FIFO order of items and therefore lets us identify the next item to be dequeued without maintaining a pointer to it.
We store each item in the queue within a subspace, which takes care of packing our integer indexes into byte strings.
queue = fdb.Subspace(('Q',))
As a first cut, we could store each item with a single key-value pair using increasing integer indexes for subsequent items:
tr[queue[index]] = value
However, this would leave concurrent enqueue operations vulnerable to conflicts. To minimize these conflicts, we can add a random integer to the key.
tr[queue[index][random_int]] = value
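With this data model, items enqueued concurrently may be assigned the same index, but their full keys remain distinct and ordered: items that share an index are ordered among themselves by the random component, which is to say arbitrarily. Because the last index is read with a snapshot read, that read adds no read conflict range, so concurrent enqueue operations do not conflict with one another.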
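The ordering argument can be checked without a database. The following is a minimal sketch in plain Python, assuming that Python's tuple comparison is an adequate stand-in for the ordering of the packed keys; `make_key` and `store` are hypothetical names used only for illustration and are not part of the FoundationDB API.

```python
import os

def make_key(index):
    """Build a hypothetical queue key: (index, random 20-byte suffix)."""
    return (index, os.urandom(20))

# An in-memory dict stands in for the key-value store (an assumption for illustration).
store = {}
store[make_key(0)] = "a"
store[make_key(1)] = "b"  # client 1 picks index 1
store[make_key(1)] = "c"  # client 2 concurrently picks index 1 as well
store[make_key(2)] = "d"

# Reading the keys in sorted order yields the dequeue order.
ordered = [store[k] for k in sorted(store)]
```

Both writes at index 1 survive because their random suffixes differ, and they sort between the items at index 0 and index 2. Their order relative to each other is arbitrary.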
To implement this model, we need an efficient way of finding the first and last index presently in use. FoundationDB’s range reads have limit and reverse options that let us accomplish this. Given the range of the subspace:
r = queue.range()
we can find the first and last key-value pairs in the range with:
tr.get_range(r.start, r.stop, limit=1)  # first
tr.get_range(r.start, r.stop, limit=1, reverse=True)  # last
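To illustrate how `limit` and `reverse` combine, here is a toy stand-in for a range read over an in-memory dict. It is a sketch under stated assumptions, not the FoundationDB implementation: the free function `get_range`, the `store` dict, and the tuple bounds `start` and `stop` are all hypothetical, and a `limit` of 0 is taken to mean "no limit" in this toy.

```python
def get_range(store, start, stop, limit=0, reverse=False):
    """Toy range read: return (key, value) pairs with start <= key < stop."""
    keys = sorted(k for k in store if start <= k < stop)
    if reverse:
        keys.reverse()        # walk the range from the end
    if limit:
        keys = keys[:limit]   # keep only the first `limit` results
    return [(k, store[k]) for k in keys]

# Keys are (index, random suffix) tuples, as in the data model above.
store = {(0, b"x"): "a", (1, b"q"): "b", (1, b"z"): "c", (2, b"m"): "d"}
start, stop = (0,), (float("inf"),)

first = get_range(store, start, stop, limit=1)
last = get_range(store, start, stop, limit=1, reverse=True)
```

With `limit=1`, the forward read returns only the smallest key in the range and the reverse read returns only the largest, so neither call has to scan the whole queue.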
High-Contention Dequeue Operations
To minimize conflicts during dequeue operations, we can use a staging technique to service the requests. If a dequeue operation doesn’t initially succeed, it registers a dequeue request in a semi-ordered set of such requests. It then enters a retry loop in which it attempts to fulfill outstanding requests.
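The staging idea can be sketched as a single-process model. This is an illustration only: it has no transactions or conflicts, and the class `StagedQueue`, its methods, and the `batch` parameter are hypothetical names rather than part of any FoundationDB library. It shows only the bookkeeping: requests are recorded in their own ordered set, and a fulfillment step pairs the oldest requests with the oldest items.

```python
import os

class StagedQueue:
    """Single-process toy model of staged dequeues (no real transactions)."""

    def __init__(self):
        self.items = {}     # (index, random) -> value
        self.requests = {}  # (index, random) -> request id
        self.results = {}   # request id -> value

    def enqueue(self, value):
        index = max((k[0] for k in self.items), default=-1) + 1
        self.items[(index, os.urandom(8))] = value

    def register_request(self):
        """Record a dequeue request that did not succeed on its first attempt."""
        index = max((k[0] for k in self.requests), default=-1) + 1
        request_id = os.urandom(8)
        self.requests[(index, request_id)] = request_id
        return request_id

    def fulfill_requests(self, batch=10):
        """Pair the oldest requests with the oldest items, up to `batch` pairs."""
        req_keys = sorted(self.requests)[:batch]
        item_keys = sorted(self.items)[:batch]
        for req_key, item_key in zip(req_keys, item_keys):
            self.results[self.requests.pop(req_key)] = self.items.pop(item_key)
        return min(len(req_keys), len(item_keys))

    def take_result(self, request_id):
        """Return the value staged for this request, or None if not yet fulfilled."""
        return self.results.pop(request_id, None)

q = StagedQueue()
for v in ["a", "b", "c"]:
    q.enqueue(v)
r1 = q.register_request()
r2 = q.register_request()
fulfilled = q.fulfill_requests()
first, second = q.take_result(r1), q.take_result(r2)
```

In a real deployment each of these steps would run inside a transaction, and any waiting client could run the fulfillment step on behalf of the others. The sketch leaves that out.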
The following is a simple implementation of the basic pattern:
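import os
import fdb

# Call fdb.api_version(...) with your installed API version before using the fdb module.

queue = fdb.Subspace(('Q',))

@fdb.transactional
def dequeue(tr):
    # Remove and return the item with the smallest key, or None if the queue is empty.
    item = first_item(tr)
    if item is None:
        return None
    del tr[item.key]
    return item.value

@fdb.transactional
def enqueue(tr, value):
    # The random suffix keeps concurrent enqueues at the same index from colliding.
    tr[queue[last_index(tr) + 1][os.urandom(20)]] = value

@fdb.transactional
def last_index(tr):
    # Snapshot read: adds no read conflict range for concurrent enqueues.
    r = queue.range()
    for key, _ in tr.snapshot.get_range(r.start, r.stop, limit=1, reverse=True):
        return queue.unpack(key)[0]  # unpack yields (index, random); keep the index
    return 0

@fdb.transactional
def first_item(tr):
    r = queue.range()
    for kv in tr.get_range(r.start, r.stop, limit=1):
        return kv
    return None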