Turi Create  4.0
safe_circular_char_buffer.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef SAFE_CIRCULAR_CHAR_BUFFER_HPP
7 #define SAFE_CIRCULAR_CHAR_BUFFER_HPP
8 #include <core/parallel/pthread_tools.hpp>
9 #include <core/logging/assertions.hpp>
10 
11 namespace turi {
12 
13 /**
14 \ingroup util
15 A non-resizing circular char buffer
16 with thread-safe write operations and a single reader
17 */
19  public:
20  safe_circular_char_buffer(std::streamsize bufsize = 10485760 /*10 MB */);
21 
23 
24  /**
25  * Stops the buffer and signals any blocking calls.
26  */
27  void stop_reader();
28 
29 
30  /**
31  * Determine if the buffer is empty
32  */
33  bool empty() const;
34 
35  inline bool is_done() const { // Returns the current value of the 'done' member flag.
36  return done; // NOTE(review): presumably set by stop_reader() -- confirm in the implementation file.
37  }
38 
39  inline bool reader_is_blocked() const { // Returns the current value of the 'iswaiting' member flag.
40  return iswaiting; // NOTE(review): presumably true while the reader waits in blocking_introspective_read() -- confirm.
41  }
42  /**
43  * Get the total contents currently stored in the buffer.
44  */
45  std::streamsize size() const;
46 
47  /**
48  * Get the amount of free space remaining in the buffer
49  */
50  std::streamsize free_space() const;
51 
52  /** Returns the allocated buffer size minus one (bufsize - 1).
53  \note: The original note states that the usable space is reserved_size() - 1; since this function already subtracts one from bufsize, that may be off by one -- TODO confirm against the implementation. */
54  inline std::streamsize reserved_size() const {
55  return bufsize - 1;
56  }
57 
58 
59  /**
60  * Returns 0 if the write doesn't fit
61  *
62  * This function acquires the critical section
63  * to perform the write
64  */
65  std::streamsize write(const char* c, std::streamsize clen);
66 
67  /**
68  * Returns 0 if the write doesn't fit
69  *
70  * This does the same as write(), but does not acquire the critical
71  * section. The caller should ensure safety
72  */
73  std::streamsize write_unsafe(const char* c, std::streamsize clen);
74 
75 
76  /**
77  * Returns a pointer (through s) and a length of the read. This
78  * pointer is a direct pointer into the internal buffer of this
79  * data structure. The pointer is valid as long as no other
80  * operations are performed on this structure. The length of the
81  * introspective_read may be less than the number of bytes
82  * requested. Multiple calls to introspective_read may be necessary
83  * to read all data in the buffer. If the function returns 0, the
84  * buffer is empty.
85  *
86  * No locks are acquired on this call.
87  */
88  std::streamsize introspective_read(char* &s, std::streamsize clen);
89 
90 
91  /**
92  * Same as introspective_read(), but blocks until there is something to read.
93  * This function does not acquire a critical section.
94  */
95  std::streamsize blocking_introspective_read(char* &s,
96  std::streamsize clen);
97 
98 
99  void advance_head(const std::streamsize advance_len);
100 
101 
102  /** Acquires the internal mutex (mut). When begin_critical_section() returns, it is
103  guaranteed that no other writer will be touching
104  the tail of the queue. Release with end_critical_section(). */
105  inline void begin_critical_section() {
106  mut.lock();
107  }
108 
109  /** Releases a critical section acquired by begin_critical_section() by unlocking the internal mutex (mut). Does not signal the condition variable. */
110  inline void end_critical_section() {
111  mut.unlock();
112  }
113 
114  /** Releases a critical section acquired by begin_critical_section,
115  and signals the reader to begin reading if the reader is blocked */
117  cond.signal();
118  mut.unlock();
119  }
120 
121 
122  private:
123  char* buffer;
124  std::streamsize bufsize; // current size of the buffer
125 
126  /**
127  * points to the head of the queue. Reader reads from here
128  */
129  std::streamsize head;
130 
131  /**
132  * points to one past the end of the queue. writer writes to
133  * here. if tail == head, buffer must be empty
134  */
135  std::streamsize tail;
136 
137  mutex mut;
138  conditional cond;
139 
140  volatile bool done; // Once
141  volatile bool iswaiting;
142 };
143 
144 }
145 
146 #endif
std::streamsize introspective_read(char *&s, std::streamsize clen)
std::streamsize write_unsafe(const char *c, std::streamsize clen)
std::streamsize size() const
std::streamsize write(const char *c, std::streamsize clen)
void lock() const
Acquires a lock on the mutex.
Definition: mutex.hpp:64
std::streamsize free_space() const
void unlock() const
Releases a lock on the mutex.
Definition: mutex.hpp:73
std::streamsize blocking_introspective_read(char *&s, std::streamsize clen)
void signal() const
Signals one waiting thread to wake up.