Turi Create 4.0
queued_rwlock.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef QUEUED_RWLOCK_HPP
#define QUEUED_RWLOCK_HPP

#include <sched.h>   // sched_yield
#include <cassert>   // assert
#include <cstdint>   // uint16_t, uint32_t

// atomic<T> and atomic_compare_and_swap come from the library's own atomic
// headers (atomic_compare_and_swap is defined in atomic_ops.hpp).

namespace turi {

#define QUEUED_RW_LOCK_REQUEST_READ 0
#define QUEUED_RW_LOCK_REQUEST_WRITE 1
#define QUEUED_RW_LOCK_REQUEST_NONE 2

/**
 * Fair rw-lock with local-only spinning, implemented and modified from
 * "Scalable Reader-Writer Synchronization for Shared-Memory Multiprocessors",
 * John M. Mellor-Crummey and Michael L. Scott.
 *
 * \ingroup threading
 */
class queued_rw_lock {
 public:

  // Lock-node state, packed into 32 bits so that (successor_class, blocked)
  // can be read and updated together with a single atomic compare-and-swap.
  union state_union {
    volatile uint32_t stateu;
    struct {
      volatile uint16_t successor_class;
      volatile bool blocked;
    } state;
  };

  // One queue node per acquisition; the caller owns the node and must pass
  // the same node to the matching unlock.
  struct request {
    void* id;
    volatile request* volatile next;
    volatile state_union s;
    volatile char lockclass;
  };
 private:
  request* volatile tail;        // most recent request in the queue
  atomic<size_t> reader_count;   // readers currently holding the lock
  request* volatile next_writer; // writer to wake when reader_count reaches 0

 public:
  queued_rw_lock() : tail(NULL), reader_count(0), next_writer(NULL) { }

  inline void writelock(request *I) {
    I->lockclass = QUEUED_RW_LOCK_REQUEST_WRITE;
    I->next = NULL;
    I->s.stateu = 0;
    I->s.state.blocked = true;
    I->s.state.successor_class = QUEUED_RW_LOCK_REQUEST_NONE;
    __sync_synchronize();
    // append ourselves to the queue, picking up the previous tail
    request* predecessor = __sync_lock_test_and_set(&tail, I);

    if (predecessor == NULL) {
      // queue was empty: we may enter once all current readers have left
      next_writer = I;
      __sync_synchronize();
      if (reader_count.value == 0) {
        if (__sync_lock_test_and_set(&next_writer, (request*)NULL) == I) {
          I->s.state.blocked = false;
        }
      }
    } else {
      // announce that a writer follows, then link into the queue
      predecessor->s.state.successor_class = QUEUED_RW_LOCK_REQUEST_WRITE;
      __sync_synchronize();
      predecessor->next = I;
    }
    // spin on our own node until a predecessor clears the blocked flag
    volatile state_union& is = I->s;
    while (is.state.blocked) sched_yield();
    assert(reader_count.value == 0);
  }

  inline void wrunlock(request *I) {
    __sync_synchronize();
    if (I->next != NULL ||
        !__sync_bool_compare_and_swap(&tail, I, (request*)NULL)) {
      // a successor exists (or is linking in concurrently): wait for the link
      while (I->next == NULL) sched_yield();
      __sync_synchronize();

      if (I->next->lockclass == QUEUED_RW_LOCK_REQUEST_READ) {
        reader_count.inc();
      }
      // hand the lock to the successor
      I->next->s.state.blocked = false;
    }
  }

  inline void readlock(request *I) {
    I->lockclass = QUEUED_RW_LOCK_REQUEST_READ;
    I->next = NULL;
    I->s.stateu = 0;
    I->s.state.successor_class = QUEUED_RW_LOCK_REQUEST_NONE;
    I->s.state.blocked = true;
    __sync_synchronize();
    request* predecessor = __sync_lock_test_and_set(&tail, I);
    if (predecessor == NULL) {
      // queue was empty: a reader may enter immediately
      reader_count.inc();
      I->s.state.blocked = false;
    } else {
      // try to flip the predecessor's state from (NONE, blocked) to
      // (READ, blocked); zero the whole word first so the padding byte
      // compares equal in the 32-bit CAS
      state_union tempold, tempnew;
      tempold.stateu = 0;
      tempold.state.blocked = true;
      tempold.state.successor_class = QUEUED_RW_LOCK_REQUEST_NONE;
      tempnew.stateu = 0;
      tempnew.state.blocked = true;
      tempnew.state.successor_class = QUEUED_RW_LOCK_REQUEST_READ;
      __sync_synchronize();
      if (predecessor->lockclass == QUEUED_RW_LOCK_REQUEST_WRITE ||
          atomic_compare_and_swap(predecessor->s.stateu,
                                  tempold.stateu,
                                  tempnew.stateu)) {
        // predecessor is a writer, or a reader that is itself still
        // blocked: link in and wait to be released
        predecessor->next = I;
        __sync_synchronize();
        volatile state_union& is = I->s;
        while (is.state.blocked) sched_yield();
      } else {
        // predecessor is a reader that already holds the lock: enter
        // alongside it
        reader_count.inc();
        predecessor->next = I;
        __sync_synchronize();
        I->s.state.blocked = false;
      }
    }
    __sync_synchronize();
    // release any reader queued directly behind us
    if (I->s.state.successor_class == QUEUED_RW_LOCK_REQUEST_READ) {
      while (I->next == NULL) sched_yield();
      reader_count.inc();
      I->next->s.state.blocked = false;
    }
  }

  inline void rdunlock(request *I) {
    __sync_synchronize();
    if (I->next != NULL ||
        !__sync_bool_compare_and_swap(&tail, I, (request*)NULL)) {
      while (I->next == NULL) sched_yield();
      if (I->s.state.successor_class == QUEUED_RW_LOCK_REQUEST_WRITE) {
        // a writer is queued behind us; it runs when the last reader leaves
        next_writer = (request*)(I->next);
        __sync_synchronize();
      }
    }
    if (reader_count.dec() == 0) {
      // last reader out wakes the pending writer, if any
      __sync_synchronize();
      request* w = __sync_lock_test_and_set(&next_writer, (request*)NULL);
      if (w != NULL) {
        w->s.state.blocked = false;
        __sync_synchronize();
      }
    }
  }
};

} // namespace turi

#endif
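
Usage: each acquire/release pair communicates through a caller-supplied
request node, and the same node must be passed to the matching unlock. Below
is a minimal sketch; the include path, the two thread functions, and
shared_value are illustrative assumptions, not part of this header.

#include <core/parallel/queued_rwlock.hpp> // assumed install path

turi::queued_rw_lock lock;
size_t shared_value = 0;

void reader() {
  turi::queued_rw_lock::request r;  // queue node, owned by this thread
  lock.readlock(&r);
  size_t observed = shared_value;   // many readers may be here concurrently
  (void)observed;
  lock.rdunlock(&r);                // same node that acquired the lock
}

void writer() {
  turi::queued_rw_lock::request w;
  lock.writelock(&w);               // blocks until earlier requests drain
  ++shared_value;                   // exclusive access
  lock.wrunlock(&w);
}

Because admission is granted in queue order, a writer queued behind active
readers also blocks any later readers, which is what makes the lock fair.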