Turi Create 4.0
deferred_rwlock.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef DEFERRED_RWLOCK_HPP
#define DEFERRED_RWLOCK_HPP
#include <core/parallel/pthread_tools.hpp>
#include <core/parallel/queued_rwlock.hpp>
#include <core/logging/assertions.hpp>
namespace turi {
/**
 * \ingroup threading
 * The deferred rwlock is a variant of a regular rwlock which is completely
 * non-blocking.
 *
 * The user creates request objects:
 * \code
 * // we use new here, but it is preferable that you have a big statically
 * // allocated pool of request objects
 * deferred_rwlock::request* request = new deferred_rwlock::request;
 * // request->id can be used here to associate additional information with the
 * // request. Note that request->id is a 62-bit integer
 * \endcode
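 *
 * For instance, such a pool could be as simple as a statically sized array
 * (a sketch; the size is arbitrary):
 * \code
 * static deferred_rwlock::request request_pool[1024];
 * \endcode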
 *
 * After which, all lock acquisition / release operations return released
 * requests through an output argument. For instance, to acquire a readlock:
 * \code
 * deferred_rwlock::request* released;
 * rwlock.readlock(request, released);
 * \endcode
 *
 * or to complete a readlock:
 * \code
 * deferred_rwlock::request* released;
 * rwlock.complete_rdlock(released);
 * \endcode
 *
 * All operations are \b non-blocking, meaning that the lock request you
 * issued or completed need not be among the requests that are satisfied.
 * The released list returned is a linked list of all the requests
 * that were satisfied at the completion of the operation.
 *
 * For instance:
 * \code
 * deferred_rwlock::request writelock;
 * deferred_rwlock::request readlocks[4];
 * deferred_rwlock::request* released;
 *
 * // we are going to acquire a write lock
 * // this will be successful if rwlock is a fresh lock
 * bool success = rwlock.writelock(&writelock);
 *
 * // acquire 4 read locks consecutively. Note that this does not block,
 * // but since a write lock has already been acquired, all of these will
 * // return num_released = 0
 * size_t num_released;
 * num_released = rwlock.readlock(&readlocks[0], released);
 * num_released = rwlock.readlock(&readlocks[1], released);
 * num_released = rwlock.readlock(&readlocks[2], released);
 * num_released = rwlock.readlock(&readlocks[3], released);
 *
 * // release the write lock we acquired earlier.
 * num_released = rwlock.wrunlock(released);
 *
 * // since the write lock is released, all readers can proceed and
 * // num_released will be 4 here. released is now a linked list of read requests.
 * // For instance, the following may be true:
 * released == &(readlocks[0]);
 * released->next == &(readlocks[1]);
 * released->next->next == &(readlocks[2]);
 * released->next->next->next == &(readlocks[3]);
 * released->next->next->next->next == nullptr;
 * // strictly the implementation need not have this particular linked list
 * // ordering, though that is indeed currently the case since it maintains
 * // a strict queue.
 *
 * // At some point in the future you do need to call rdunlock() as many times
 * // as there are read requests completed.
 * rwlock.rdunlock(released);
 * rwlock.rdunlock(released);
 * rwlock.rdunlock(released);
 * rwlock.rdunlock(released);
 * \endcode
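 *
 * In general, every operation that returns a released list requires you to
 * walk the list and act on each satisfied request. A minimal sketch of such
 * a drain loop (the per-request work in the branches is purely illustrative):
 * \code
 * void drain(deferred_rwlock::request* released, size_t num_released) {
 *   for (size_t i = 0; i < num_released; ++i) {
 *     deferred_rwlock::request* r = released;
 *     released = released->next;
 *     if (r->lockclass == QUEUED_RW_LOCK_REQUEST_READ) {
 *       // the read lock identified by r->id is now held;
 *       // do the read work, then eventually call rdunlock()
 *     } else {
 *       // the write lock identified by r->id is now held;
 *       // do the write work, then eventually call wrunlock()
 *     }
 *   }
 * }
 * \endcode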
 *
 * This deferred_rwlock is tricky and not easy to use. You need to manage
 * the request objects carefully, and it is easy to get into inconsistent
 * states.
 */
class deferred_rwlock {
 public:

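  /**
   * A lock request. The caller owns the request object and must keep it
   * alive until the request is handed back through a released list.
   */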
  struct request {
    char lockclass : 2;  // QUEUED_RW_LOCK_REQUEST_READ or QUEUED_RW_LOCK_REQUEST_WRITE
    __attribute__((may_alias)) uint64_t id : 62;  // user data associated with the request
    request* next;       // next request in the wait queue / released list
  };
 private:
  request* head;          // head of the wait queue
  request* tail;          // tail of the wait queue
  uint16_t reader_count;  // number of readers currently holding the lock
  bool writer;            // whether a writer currently holds the lock
  simple_spinlock lock;   // protects all of the members above
 public:

  deferred_rwlock(): head(NULL),
                     tail(NULL), reader_count(0), writer(false) { }

  // debugging purposes only
  inline size_t get_reader_count() {
    __sync_synchronize();
    return reader_count;
  }

  // debugging purposes only
  inline bool has_waiters() {
    return head != NULL || tail != NULL;
  }

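  /**
   * \internal
   * Appends a request to the tail of the wait queue.
   * The spinlock must be held.
   */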
  inline void insert_queue(request *I) {
    if (head == NULL) {
      head = I;
      tail = I;
    }
    else {
      tail->next = I;
      tail = I;
    }
  }
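  /**
   * \internal
   * Inserts a request at the head of the wait queue; used by the
   * priority lock variants. The spinlock must be held.
   */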
  inline void insert_queue_head(request *I) {
    if (head == NULL) {
      head = I;
      tail = I;
    }
    else {
      I->next = head;
      head = I;
    }
  }

  /**
   * Tries to acquire a high priority writelock.
   * Returns true if the write lock is available immediately,
   * false otherwise, in which case the request object may later be
   * returned in a released list by another lock operation.
   */
  inline bool writelock_priority(request *I) {
    I->next = NULL;
    I->lockclass = QUEUED_RW_LOCK_REQUEST_WRITE;
    lock.lock();
    if (reader_count == 0 && writer == false) {
      // fastpath
      writer = true;
      lock.unlock();
      return true;
    }
    else {
      insert_queue_head(I);
      lock.unlock();
      return false;
    }
  }

  /**
   * Tries to acquire a writelock.
   * Returns true if the write lock is available immediately,
   * false otherwise, in which case the request object may later be
   * returned in a released list by another lock operation.
   */
  inline bool writelock(request *I) {
    I->next = NULL;
    I->lockclass = QUEUED_RW_LOCK_REQUEST_WRITE;
    lock.lock();
    if (reader_count == 0 && writer == false) {
      // fastpath
      writer = true;
      lock.unlock();
      return true;
    }
    else {
      insert_queue(I);
      lock.unlock();
      return false;
    }
  }

  /**
   * \internal
   * Completes the write lock at the head of the queue. The spinlock must be
   * held and the head must be a write request.
   */
  inline void complete_wrlock() {
    // ASSERT_EQ(reader_count.value, 0);
    head = head->next;
    if (head == NULL) tail = NULL;
    writer = true;
  }

  /**
   * \internal
   * Completes the read lock at the head of the queue. The spinlock must be
   * held and the head must be a read request.
   */
  inline size_t complete_rdlock(request* &released) {
    released = head;
    size_t numcompleted = 1;
    head = head->next;
    request* readertail = released;
    while (head != NULL && head->lockclass == QUEUED_RW_LOCK_REQUEST_READ) {
      readertail = head;
      head = head->next;
      numcompleted++;
    }
    reader_count += numcompleted;
    if (head == NULL) tail = NULL;

    // now released is the head of a reader list and head is the head of a
    // list that begins with a writer. Go through that list and extract all
    // the readers; this essentially splits the list into two sections, one
    // containing only readers, and one containing only writers.
    // (reader biased locking)
    if (head != NULL) {
      request* latestwriter = head;
      request* cur = head->next;
      while (cur != NULL) {  // guards against a lone writer at the head
        if (cur->lockclass == QUEUED_RW_LOCK_REQUEST_WRITE) {
          latestwriter = cur;
        }
        else {
          readertail->next = cur;
          readertail = cur;
          reader_count++;
          numcompleted++;
          latestwriter->next = cur->next;
        }
        if (cur == tail) break;
        cur = cur->next;
      }
    }
    return numcompleted;
  }

  /**
   * Releases a currently acquired write lock.
   * Returns the number of lock requests satisfied as a result; the output
   * argument 'released' contains a linked list of the satisfied requests.
   */
  inline size_t wrunlock(request* &released) {
    released = NULL;
    lock.lock();
    writer = false;
    size_t ret = 0;
    if (head != NULL) {
      if (head->lockclass == QUEUED_RW_LOCK_REQUEST_READ) {
        ret = complete_rdlock(released);
        if (ret == 2) assert(released->next != NULL);
      }
      else {
        writer = true;
        released = head;
        complete_wrlock();
        ret = 1;
      }
    }
    lock.unlock();
    return ret;
  }

  /**
   * Tries to acquire a readlock.
   * Returns the number of lock requests satisfied (including this one, if
   * it succeeds immediately); the output argument 'released' contains a
   * linked list of the satisfied requests.
   */
  inline size_t readlock(request *I, request* &released) {
    released = NULL;
    size_t ret = 0;
    I->next = NULL;
    I->lockclass = QUEUED_RW_LOCK_REQUEST_READ;
    lock.lock();
    // no one is waiting and no one is writing
    if (head == NULL && writer == false) {
      // fast path
      ++reader_count;
      lock.unlock();
      released = I;
      return 1;
    }
    else {
      // slow path. Insert into queue
      insert_queue(I);
      if (head->lockclass == QUEUED_RW_LOCK_REQUEST_READ && writer == false) {
        ret = complete_rdlock(released);
      }
      lock.unlock();
      return ret;
    }
  }

  /**
   * Tries to acquire a high priority readlock.
   * Returns the number of lock requests satisfied (including this one, if
   * it succeeds immediately); the output argument 'released' contains a
   * linked list of the satisfied requests.
   */
  inline size_t readlock_priority(request *I, request* &released) {
    released = NULL;
    size_t ret = 0;
    I->next = NULL;
    I->lockclass = QUEUED_RW_LOCK_REQUEST_READ;
    lock.lock();
    // no one is waiting and no one is writing
    if (head == NULL && writer == false) {
      // fast path
      ++reader_count;
      lock.unlock();
      released = I;
      return 1;
    }
    else {
      // slow path. Insert at the head of the queue
      insert_queue_head(I);
      if (head->lockclass == QUEUED_RW_LOCK_REQUEST_READ && writer == false) {
        ret = complete_rdlock(released);
      }
      lock.unlock();
      return ret;
    }
  }

  /**
   * Releases a currently acquired read lock.
   * Returns the number of lock requests satisfied as a result; the output
   * argument 'released' contains a linked list of the satisfied requests.
   */
  inline size_t rdunlock(request* &released) {
    released = NULL;
    lock.lock();
    --reader_count;
    if (reader_count == 0) {
      size_t ret = 0;
      if (head != NULL) {
        if (head->lockclass == QUEUED_RW_LOCK_REQUEST_READ) {
          ret = complete_rdlock(released);
        }
        else {
          writer = true;
          released = head;
          complete_wrlock();
          ret = 1;
        }
      }
      lock.unlock();
      return ret;
    }
    else {
      lock.unlock();
      return 0;
    }
  }
};

} // namespace turi
#endif