18template<
typename T,
size_t max_count = default_count>
class ringbuffer {
35 TracePrintln(
"ringbuffer",
"");
40 int getFullCount()
const {
return fullCount; }
42 int getEmptyCount()
const {
return emptyCount; }
44 int getWriteCount()
const {
return writeCount; }
48 std::unique_lock<std::mutex> lk(mutex);
49 write_index = read_index = 0;
55 std::unique_lock<std::mutex> lk(mutex);
58 write_index = max_count / 2;
59 nonfullCV.notify_all();
60 nonemptyCV.notify_all();
63 void setBlockSize(
int size)
65 TracePrintln(
"ringbuffer",
"");
67 if (block_size != size)
71 int aligned_block_size = (block_size + ALIGN - 1) & (~(ALIGN - 1));
73 DebugPrintln(
"ringbuffer",
"New raw buffer size : %ld", max_count * aligned_block_size);
75 for(
auto it = buffers.begin(); it < buffers.end(); it++)
77 it->resize(aligned_block_size);
82 T* peekWritePtr(
int offset)
84 return buffers[(write_index.load() + max_count + offset) % max_count].data();
87 T* peekReadPtr(
int offset)
89 return buffers[(read_index.load() + max_count + offset) % max_count].data();
92 void push(vector<T> arr)
96 std::unique_lock<std::mutex> lk(mutex);
98 buffers[write_index] = arr;
100 write_index = (write_index + 1) % max_count;
103 if (blocks_available == 1)
105 nonemptyCV.notify_all();
115 std::unique_lock<std::mutex> lk(mutex);
117 vector<T> vec = buffers[read_index];
119 read_index = (read_index + 1) % max_count;
122 if (blocks_available == max_count - 1)
124 nonfullCV.notify_all();
130 int getBlockSize()
const {
return block_size; }
132 void WaitUntilNotEmpty()
137 for (
int i = 0; i < spin_count; i++)
139 if (blocks_available > 0)
143 if(blocks_available <= 0)
145 std::unique_lock<std::mutex> lk(mutex);
148 nonemptyCV.wait(lk, [
this] {
149 return blocks_available > 0;
154 void WaitUntilNotFull()
158 for (
int i = 0; i < spin_count; i++)
160 if (blocks_available < max_count)
164 if (blocks_available >= max_count)
166 std::unique_lock<std::mutex> lk(mutex);
168 nonfullCV.wait(lk, [
this] {
169 return blocks_available < max_count;
174 volatile atomic<size_t> read_index;
175 volatile atomic<size_t> write_index;
176 volatile atomic<size_t> blocks_available;
185 std::condition_variable nonemptyCV;
186 std::condition_variable nonfullCV;
190 array<vector<T>, max_count> buffers;