Turi Create  4.0
lockfree_push_back.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_PARALLEL_LOCKFREE_PUSHBACK_HPP
7 #define TURI_PARALLEL_LOCKFREE_PUSHBACK_HPP
#include <algorithm>
#include <cstddef>
#include <iterator>

#include <core/parallel/atomic.hpp>
9 
10 namespace turi {
11 
12 namespace lockfree_push_back_impl {
13  struct idx_ref {
14  idx_ref(): reference_count(0), idx(0) { }
15  idx_ref(size_t idx): reference_count(0), idx(idx) { }
16 
17  volatile int reference_count;
18  atomic<size_t> idx;
19  enum {
20  MAX_REF = 65536
21  };
22 
23  inline void inc_ref() {
24  while (1) {
25  int curref = reference_count;
26  if ((curref & MAX_REF) == 0 &&
27  atomic_compare_and_swap(reference_count, curref, curref + 1)) {
28  break;
29  }
30  }
31  }
32 
33  inline void wait_till_no_ref() {
34  while((reference_count & (MAX_REF - 1)) != 0);
35  }
36 
37  inline void dec_ref() {
38  __sync_fetch_and_sub(&reference_count, 1);
39  }
40 
41  inline void flag_ref() {
42  __sync_fetch_and_xor(&reference_count, MAX_REF);
43  }
44 
45  inline size_t inc_idx() {
46  return idx.inc_ret_last();
47  }
48 
49  inline size_t inc_idx(size_t n) {
50  return idx.inc_ret_last(n);
51  }
52  };
53 } // lockfree_push_back_impl
54 
55 /**
56  * Provides a lock free way to insert elements to the end
57  * of a container. Container must provide 3 functions.
58  * - T& operator[](size_t idx)
59  * - void resize(size_t len)
60  * - size_t size()
61  *
62  * resize(n) must guarantee that size() >= n.
63  * T& operator[](size_t idx) must succeed for idx < size() and must be
64  * safely executeable in parallel.
65  * size() must be safely executeable in parallel with resize().
66  *
67  * \ingroup threading
68  */
69 template <typename Container, typename T = typename Container::value_type>
71  private:
72  Container& container;
73  lockfree_push_back_impl::idx_ref cur;
74  mutex mut;
75  float scalefactor;
76  public:
77  lockfree_push_back(Container& container, size_t startidx, float scalefactor = 2):
78  container(container),cur(startidx), scalefactor(scalefactor) { }
79 
80  size_t size() const {
81  return cur.idx.value;
82  }
83 
84  void set_size(size_t s) {
85  cur.idx.value = s;
86  }
87 
88  template <typename Iterator>
89  size_t push_back(Iterator begin, Iterator end) {
90  size_t numel = std::distance(begin, end);
91  size_t putpos = cur.inc_idx(numel);
92  size_t endidx = putpos + numel;
93  while(1) {
94  cur.inc_ref();
95  if (endidx <= container.size()) {
96  while(putpos < endidx) {
97  container[putpos] = (*begin);
98  ++putpos; ++begin;
99  }
100  cur.dec_ref();
101  break;
102  }
103  else {
104  cur.dec_ref();
105 
106  if (mut.try_lock()) {
107  // ok. we need to resize
108  // flag the reference and wait till there are no more references
109  cur.flag_ref();
110  cur.wait_till_no_ref();
111  // we are exclusive here. resize
112  if (endidx > container.size()) {
113  container.resize(std::max<size_t>(endidx, container.size() * scalefactor));
114  }
115  while(putpos < endidx) {
116  container[putpos] = (*begin);
117  ++putpos; ++begin;
118  }
119  cur.flag_ref();
120  mut.unlock();
121  break;
122  }
123  }
124  }
125  return putpos;
126  }
127 
128  bool query(size_t item, T& value) {
129  bool ret = false;
130  cur.inc_ref();
131  if (item < cur.idx) {
132  value = container[item];
133  ret = true;
134  }
135  cur.dec_ref();
136  return ret;
137  }
138 
139  T* query(size_t item) {
140  T* ret = NULL;
141  cur.inc_ref();
142  if (item < cur.idx) {
143  ret = &(container[item]);
144  }
145  cur.dec_ref();
146  return ret;
147  }
148 
149  bool query_unsafe(size_t item, T& value) {
150  bool ret = false;
151  if (item < cur.idx) {
152  value = container[item];
153  ret = true;
154  }
155  return ret;
156  }
157 
158  T* query_unsafe(size_t item) {
159  T* ret = NULL;
160  if (item < cur.idx) {
161  ret = &(container[item]);
162  }
163  return ret;
164  }
165 
166 
167  size_t push_back(const T& t) {
168  size_t putpos = cur.inc_idx();
169  while(1) {
170  cur.inc_ref();
171  if (putpos < container.size()) {
172  container[putpos] = t;
173  cur.dec_ref();
174  break;
175  }
176  else {
177  cur.dec_ref();
178 
179  if (mut.try_lock()) {
180  // ok. we need to resize
181  // flag the reference and wait till there are no more references
182  cur.flag_ref();
183  cur.wait_till_no_ref();
184  // we are exclusive here. resize
185  if (putpos >= container.size()) {
186  container.resize(std::max<size_t>(putpos + 1, container.size() * scalefactor));
187  }
188  container[putpos] = t;
189  cur.flag_ref();
190  mut.unlock();
191  break;
192  }
193  }
194  }
195  return putpos;
196  }
197 };
198 
199 } // namespace turi
200 #endif
bool atomic_compare_and_swap(T &a, T oldval, T newval)
Definition: atomic_ops.hpp:27
void unlock() const
Releases a lock on the mutex.
Definition: mutex.hpp:73
bool try_lock() const
Non-blocking attempt to acquire a lock on the mutex.
Definition: mutex.hpp:82