Turi Create  4.0
buffer_pool.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_BUFFER_POOL_HPP
7 #define TURI_SFRAME_BUFFER_POOL_HPP
8 #include <vector>
9 #include <memory>
10 #include <stack>
11 #include <core/parallel/pthread_tools.hpp>
12 
13 namespace turi {
14 
15 /**
16  * \ingroup util
17  * Implements a buffer pool around collections of T.
18  * The buffer is lazily allocated; but only up to 2 * buffer_size entries can
19  * exist.
20  */
21 template <typename T>
22 class buffer_pool {
23  public:
24  explicit inline buffer_pool(size_t buffer_size = 128) {
25  init(buffer_size);
26  }
27 
28  /**
29  * Initializes the buffer pool to a certain capacity.
30  * Can be called in parallel
31  */
32  inline void init(size_t buffer_size) {
33  m_buffer_size = buffer_size;
34  }
35 
36  /**
37  * Returns a new buffer from the buffer pool
38  * Can be called in parallel
39  */
40  inline std::shared_ptr<T> get_new_buffer() {
41  if (m_free_buffers.empty()) {
42  std::lock_guard<turi::mutex> guard(m_buffer_lock);
43  // no free buffers. Loop through the buffer pool in search of unique buffer
44  for (size_t i = 0;i < m_buffer_pool.size(); ++i) {
45  if (m_buffer_pool[i].unique()) m_free_buffers.push(m_buffer_pool[i]);
46  }
47  }
48  if (!m_free_buffers.empty()) {
49  std::lock_guard<turi::mutex> guard(m_buffer_lock);
50  if (!m_free_buffers.empty()) {
51  auto ret = m_free_buffers.top();
52  m_free_buffers.pop();
53  return ret;
54  }
55  }
56  // allocate a new buffer
57  std::shared_ptr<T> new_buffer = std::make_shared<T>();
58  std::lock_guard<turi::mutex> guard(m_buffer_lock);
59  if (m_buffer_pool.size() < m_buffer_size) m_buffer_pool.push_back(new_buffer);
60  return new_buffer;
61  }
62 
63  /**
64  * Releases a buffer back to the pool
65  * Can be called in parallel
66  */
67  inline void release_buffer(std::shared_ptr<T>&& buffer) {
68  const size_t BUFFER_CAPACITY_LIMIT = 1024 * 1024;
69  if (buffer) {
70  buffer->clear();
71  if (buffer->capacity() >= BUFFER_CAPACITY_LIMIT)
72  buffer->shrink_to_fit();
73  if (m_buffer_pool.size() + m_free_buffers.size() < m_buffer_size) {
74  std::lock_guard<turi::mutex> guard(m_buffer_lock);
75  m_free_buffers.push(std::move(buffer));
76  }
77  buffer.reset();
78  }
79  }
80 
81  private:
82  /// Lock for m_buffer_pool
83  turi::mutex m_buffer_lock;
84  size_t m_buffer_size;
85  //
86  /**
87  * additional buffers used for returning stuff, decompression, etc.
88  * Here we are using a free-list mechanism.
89  * When m_free_buffers go empty, we loop through m_buffer_pool
90  * in search of all "unique" pointers which can then be added to the
91  * free-list. This allows buffer release to be optional. Though, actively
92  * releasing has performance benefits.
93  */
94  std::vector<std::shared_ptr<T> > m_buffer_pool;
95  std::stack<std::shared_ptr<T> > m_free_buffers;
96 };
97 }
98 #endif
void release_buffer(std::shared_ptr< T > &&buffer)
Definition: buffer_pool.hpp:67
void init(size_t buffer_size)
Definition: buffer_pool.hpp:32
std::shared_ptr< T > get_new_buffer()
Definition: buffer_pool.hpp:40