SDDC_Driver
Loading...
Searching...
No Matches
ringbuffer.h
1#pragma once
2
3#include <thread>
4#include <mutex>
5#include <condition_variable>
6#include <atomic>
7#include <array>
8#include <vector>
9
10#include "../config.h"
11
12namespace {
13 const int default_count = 64;
14 const int spin_count = 100;
15 #define ALIGN (8)
16};
17
18template<typename T, size_t max_count = default_count> class ringbuffer {
19 typedef T* TPtr;
20
21public:
22 ringbuffer():
23 read_index(0),
24 write_index(0),
25 blocks_available(0),
26 emptyCount(0),
27 fullCount(0),
28 writeCount(0),
29 stopped(false)
30 {
31 }
32
34 {
35 TracePrintln("ringbuffer", "");
36
37 Stop();
38 }
39
40 int getFullCount() const { return fullCount; }
41
42 int getEmptyCount() const { return emptyCount; }
43
44 int getWriteCount() const { return writeCount; }
45
46 void Start()
47 {
48 std::unique_lock<std::mutex> lk(mutex);
49 write_index = read_index = 0;
50 stopped = false;
51 }
52
53 void Stop()
54 {
55 std::unique_lock<std::mutex> lk(mutex);
56 read_index = 0;
57 stopped = true;
58 write_index = max_count / 2;
59 nonfullCV.notify_all();
60 nonemptyCV.notify_all();
61 }
62
63 void setBlockSize(int size)
64 {
65 TracePrintln("ringbuffer", "");
66
67 if (block_size != size)
68 {
69 block_size = size;
70
71 int aligned_block_size = (block_size + ALIGN - 1) & (~(ALIGN - 1));
72
73 DebugPrintln("ringbuffer", "New raw buffer size : %ld", max_count * aligned_block_size);
74
75 for(auto it = buffers.begin(); it < buffers.end(); it++)
76 {
77 it->resize(aligned_block_size);
78 }
79 }
80 }
81
82 T* peekWritePtr(int offset)
83 {
84 return buffers[(write_index.load() + max_count + offset) % max_count].data();
85 }
86
87 T* peekReadPtr(int offset)
88 {
89 return buffers[(read_index.load() + max_count + offset) % max_count].data();
90 }
91
92 void push(vector<T> arr)
93 {
94 WaitUntilNotFull();
95
96 std::unique_lock<std::mutex> lk(mutex);
97
98 buffers[write_index] = arr;
99
100 write_index = (write_index + 1) % max_count;
101 blocks_available++;
102
103 if (blocks_available == 1)
104 {
105 nonemptyCV.notify_all();
106 }
107
108 writeCount++;
109 }
110
111 vector<T> pop()
112 {
113 WaitUntilNotEmpty();
114
115 std::unique_lock<std::mutex> lk(mutex);
116
117 vector<T> vec = buffers[read_index];
118
119 read_index = (read_index + 1) % max_count;
120 blocks_available--;
121
122 if (blocks_available == max_count - 1)
123 {
124 nonfullCV.notify_all();
125 }
126
127 return vec;
128 }
129
130 int getBlockSize() const { return block_size; }
131
132 void WaitUntilNotEmpty()
133 {
134 if (stopped) return;
135
136 // if not empty
137 for (int i = 0; i < spin_count; i++)
138 {
139 if (blocks_available > 0)
140 return;
141 }
142
143 if(blocks_available <= 0)
144 {
145 std::unique_lock<std::mutex> lk(mutex);
146
147 emptyCount++;
148 nonemptyCV.wait(lk, [this] {
149 return blocks_available > 0;
150 });
151 }
152 }
153
154 void WaitUntilNotFull()
155 {
156 if (stopped) return;
157
158 for (int i = 0; i < spin_count; i++)
159 {
160 if (blocks_available < max_count)
161 return;
162 }
163
164 if (blocks_available >= max_count)
165 {
166 std::unique_lock<std::mutex> lk(mutex);
167 fullCount++;
168 nonfullCV.wait(lk, [this] {
169 return blocks_available < max_count;
170 });
171 }
172 }
173
174 volatile atomic<size_t> read_index;
175 volatile atomic<size_t> write_index;
176 volatile atomic<size_t> blocks_available;
177
178private:
179 int emptyCount;
180 int fullCount;
181 int writeCount;
182
183 std::mutex mutex;
184 bool stopped;
185 std::condition_variable nonemptyCV;
186 std::condition_variable nonfullCV;
187
188 int block_size = 0;
189
190 array<vector<T>, max_count> buffers;
191};
Definition ringbuffer.h:18