Libthreadar 1.5.0
ratelier_scatter.hpp
Go to the documentation of this file.
1/*********************************************************************/
2// libthreadar - is a library providing several C++ classes to work with threads
3// Copyright (C) 2014-2024 Denis Corbin
4//
5// This file is part of libthreadar
6//
7// libthreadar is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// libthreadar is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Lesser General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19//
20//----
21// to contact the author: dar.linux@free.fr
22/*********************************************************************/
23
24#ifndef LIBTHREADAR_RATELIER_SCATTER_HPP
25#define LIBTHREADAR_RATELIER_SCATTER_HPP
26
33
#include "config.h"

    // C system headers
extern "C"
{
}
    // C++ standard headers
#include <vector>
#include <map>
#include <deque>
#include <memory>

    // libthreadar headers
#include "condition.hpp"
#include "mutex.hpp"
48
49
50namespace libthreadar
51{
53
62
63 template <class T> class ratelier_scatter
64 {
65 public:
66 ratelier_scatter(unsigned int size, signed int flag = 0);
67 ratelier_scatter(const ratelier_scatter & ref) = delete;
68 ratelier_scatter(ratelier_scatter && ref) = default;
69 ratelier_scatter & operator = (const ratelier_scatter & ref) = delete;
70 ratelier_scatter & operator = (ratelier_scatter && ref) noexcept = default;
71 virtual ~ratelier_scatter() = default;
72
81 void scatter(std::unique_ptr<T> & one, signed int flag = 0);
82
91 std::unique_ptr<T> worker_get_one(unsigned int & slot, signed int & flag);
92
94 void reset();
95
96 private:
97
98 static const unsigned int cond_empty = 0;
99 static const unsigned int cond_full = 1;
100
101 struct slot
102 {
103 std::unique_ptr<T> obj;
104 bool empty;
105 unsigned int index;
106 signed int flag;
107
108 slot(signed int val) { empty = true; flag = val; };
109 slot(const slot & ref) { obj.reset(); empty = ref.empty; index = ref.index; flag = ref.flag; };
110 };
111
112 unsigned int next_index;
113 unsigned int lowest_index;
114 std::vector<slot> table;
115 std::map<unsigned int, unsigned int> corres;
116 std::deque<unsigned int> empty_slot;
118 };
119
120 template <class T> ratelier_scatter<T>::ratelier_scatter(unsigned int size, signed int flag):
121 table(size, slot(flag)),
122 verrou(2)
123 {
124 next_index = 0;
125 lowest_index = 0;
126
127 for(unsigned int i = 0; i < size; ++i)
128 empty_slot.push_back(i);
129 }
130
131 template <class T> void ratelier_scatter<T>::scatter(std::unique_ptr<T> & one, signed int flag)
132 {
133 unsigned int tableindex;
134
135 verrou.lock();
136 try
137 {
138 while(empty_slot.empty()) // ratelier_scatter is full
139 verrou.wait(cond_full);
140
141 tableindex = empty_slot.back();
142
143 // sanity checks
144
145 if(tableindex >= table.size())
146 throw THREADAR_BUG;
147 if( ! table[tableindex].empty)
148 throw THREADAR_BUG;
149
150 // recording the change
151
152 table[tableindex].empty = false;
153 table[tableindex].obj = std::move(one);
154 table[tableindex].index = next_index;
155 table[tableindex].flag = flag;
156
157 corres[next_index] = tableindex;
158 ++next_index;
159
160 empty_slot.pop_back();
161 if(verrou.get_waiting_thread_count(cond_empty) > 0)
162 verrou.signal(cond_empty);
163 }
164 catch(...)
165 {
166 verrou.unlock();
167 verrou.broadcast(cond_empty);
168 verrou.broadcast(cond_full);
169 throw;
170 }
171 verrou.unlock();
172 }
173
174 template <class T> std::unique_ptr<T> ratelier_scatter<T>::worker_get_one(unsigned int & slot, signed int & flag)
175 {
176 std::unique_ptr<T> ret;
177
178 verrou.lock();
179 try
180 {
181 std::map<unsigned int, unsigned int>::iterator it = corres.begin();
182 // using sequential reading provides sorted scanning
183 // of the map, looking first for the lowest index available (oldest entries)
184
185 do
186 {
187 if(it != corres.end())
188 {
189 if(it->first < lowest_index) // overflooding occured
190 ++it; // ignoring this slot
191 else
192 {
193
194 // sanity checks
195
196 if(it->second >= table.size())
197 throw THREADAR_BUG;
198 if(table[it->second].empty)
199 throw THREADAR_BUG;
200 if( ! table[it->second].obj)
201 throw THREADAR_BUG;
202
203 // recording the change
204
205 ret = std::move(table[it->second].obj);
206 slot = table[it->second].index;
207 flag = table[it->second].flag;
208 table[it->second].empty = true;
209
210 if(lowest_index != slot)
211 throw THREADAR_BUG;
212 ++lowest_index;
213
214 // reusing quicker the last block used
215 // as the back() be used first
216 empty_slot.push_back(it->second);
217 corres.erase(it); // removing the correspondance
218
219 if(verrou.get_waiting_thread_count(cond_full) > 0)
220 verrou.signal(cond_full);
221 }
222 }
223 else
224 {
225 // ratelier_scatter is empty
226
227 verrou.wait(cond_empty);
228 it = corres.begin();
229 }
230 }
231 while( ! ret);
232 }
233 catch(...)
234 {
235 verrou.unlock();
236 verrou.broadcast(cond_empty);
237 verrou.broadcast(cond_full);
238 throw;
239 }
240 verrou.unlock();
241
242 return ret;
243 }
244
245 template <class T> void ratelier_scatter<T>::reset()
246 {
247 unsigned int size = table.size();
248 next_index = 0;
249 lowest_index = 0;
250 corres.clear();
251 empty_slot.clear();
252
253 for(unsigned int i = 0; i < size; ++i)
254 {
255 table[i].obj.reset();
256 table[i].empty = true;
257 empty_slot.push_back(i);
258 }
259
260 verrou.lock();
261 verrou.broadcast(cond_empty);
262 verrou.broadcast(cond_full);
263 verrou.unlock();
264 }
265
266} // end of namespace
267
268#endif
Wrapper around the Posix pthread_cond_t object and its associated mutex.
Definition: condition.hpp:46
the class ratelier_scatter has a fixed length range of slots of arbitrary defined object type
std::unique_ptr< T > worker_get_one(unsigned int &slot, signed int &flag)
void scatter(std::unique_ptr< T > &one, signed int flag=0)
void reset()
reset the object in its pristine state
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reach that statement.
Definition: exceptions.hpp:164
defines the mutex C++ class
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition: barrier.hpp:46