6 #ifndef TURI_PARALLEL_LOCKFREE_PUSHBACK_HPP 7 #define TURI_PARALLEL_LOCKFREE_PUSHBACK_HPP 8 #include <core/parallel/atomic.hpp> 12 namespace lockfree_push_back_impl {
14 idx_ref(): reference_count(0), idx(0) { }
15 idx_ref(
size_t idx): reference_count(0), idx(idx) { }
17 volatile int reference_count;
23 inline void inc_ref() {
25 int curref = reference_count;
26 if ((curref & MAX_REF) == 0 &&
33 inline void wait_till_no_ref() {
34 while((reference_count & (MAX_REF - 1)) != 0);
37 inline void dec_ref() {
38 __sync_fetch_and_sub(&reference_count, 1);
41 inline void flag_ref() {
42 __sync_fetch_and_xor(&reference_count, MAX_REF);
45 inline size_t inc_idx() {
46 return idx.inc_ret_last();
49 inline size_t inc_idx(
size_t n) {
50 return idx.inc_ret_last(n);
69 template <
typename Container,
typename T =
typename Container::value_type>
73 lockfree_push_back_impl::idx_ref cur;
78 container(container),cur(startidx), scalefactor(scalefactor) { }
84 void set_size(
size_t s) {
88 template <
typename Iterator>
89 size_t push_back(Iterator begin, Iterator end) {
90 size_t numel = std::distance(begin, end);
91 size_t putpos = cur.inc_idx(numel);
92 size_t endidx = putpos + numel;
95 if (endidx <= container.size()) {
96 while(putpos < endidx) {
97 container[putpos] = (*begin);
110 cur.wait_till_no_ref();
112 if (endidx > container.size()) {
113 container.resize(std::max<size_t>(endidx, container.size() * scalefactor));
115 while(putpos < endidx) {
116 container[putpos] = (*begin);
128 bool query(
size_t item, T& value) {
131 if (item < cur.idx) {
132 value = container[item];
139 T* query(
size_t item) {
142 if (item < cur.idx) {
143 ret = &(container[item]);
149 bool query_unsafe(
size_t item, T& value) {
151 if (item < cur.idx) {
152 value = container[item];
158 T* query_unsafe(
size_t item) {
160 if (item < cur.idx) {
161 ret = &(container[item]);
167 size_t push_back(
const T& t) {
168 size_t putpos = cur.inc_idx();
171 if (putpos < container.size()) {
172 container[putpos] = t;
183 cur.wait_till_no_ref();
185 if (putpos >= container.size()) {
186 container.resize(std::max<size_t>(putpos + 1, container.size() * scalefactor));
188 container[putpos] = t;
bool atomic_compare_and_swap(T &a, T oldval, T newval)
void unlock() const
Releases a lock on the mutex.
bool try_lock() const
Non-blocking attempt to acquire a lock on the mutex.