GCC Code Coverage Report


Directory: libs/beast2/
File: include/boost/beast2/server/workers.hpp
Date: 2025-11-13 15:50:44
Exec Total Coverage
Lines: 0 85 0.0%
Functions: 0 13 0.0%
Branches: 0 66 0.0%

Line Branch Exec Source
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/beast2
8 //
9
10 #ifndef BOOST_BEAST2_SERVER_WORKERS_HPP
11 #define BOOST_BEAST2_SERVER_WORKERS_HPP
12
13 #include <boost/beast2/detail/config.hpp>
14 #include <boost/beast2/application.hpp>
15 #include <boost/beast2/log_service.hpp>
16 #include <boost/beast2/server/fixed_array.hpp>
17 #include <boost/asio/basic_stream_socket.hpp>
18 #include <boost/asio/basic_socket_acceptor.hpp>
19 #include <boost/asio/dispatch.hpp>
20 #include <boost/asio/prepend.hpp>
21
22 namespace boost {
23 namespace beast2 {
24
25 class BOOST_SYMBOL_VISIBLE
26 workers_base
27 {
28 public:
29 BOOST_BEAST2_DECL
30 virtual ~workers_base();
31
32 virtual application& app() noexcept = 0;
33 virtual void do_idle(void* worker) = 0;
34 };
35
36 /** A set of accepting sockets and their workers.
37
38 This implements a set of listening ports as a server service. An array of
39 workers created upon construction is used to accept incoming connections
40 and handle their sessions.
41
42 @par Worker exemplar
43 @code
44 template< class Executor >
45 struct Worker
46 {
47 using executor_type = Executor;
48 using protocol_type = asio::ip::tcp;
49 using socket_type = asio::basic_stream_socket<protocol_type, Executor>;
50 using acceptor_config = beast2::acceptor_config;
51
52 asio::basic_stream_socket<protocol_type, Executor>& socket() noexcept;
53 typename protocol_type::endpoint& endpoint() noexcept;
54
55 void on_accept();
56 };
57 @endcode
58
59 @tparam Executor The type of executor used by acceptor sockets.
60 @tparam Worker The type of worker to use.
61 */
62 template<class Worker>
63 class workers
64 : public workers_base
65 {
66 public:
67 using executor_type = typename Worker::executor_type;
68 using protocol_type = typename Worker::protocol_type;
69 using acceptor_type = asio::basic_socket_acceptor<protocol_type, executor_type>;
70 using acceptor_config = typename Worker::acceptor_config;
71 using socket_type = asio::basic_stream_socket<protocol_type, executor_type>;
72
73 ~workers()
74 {
75 }
76
77 /** Constructor
78
79 @param app The @ref application which holds this part
80 @param ex The executor to use for acceptor sockets
81 @param concurrency The number of threads calling io_context::run
82 @param num_workers The number of workers to construct
83 @param args Arguments forwarded to each worker's constructor
84 */
85 template<class Executor1, class... Args>
86 workers(
87 application& app,
88 Executor1 const& ex,
89 unsigned concurrency,
90 std::size_t num_workers,
91 Args&&... args);
92
93 /** Add an acceptor
94 */
95 template<class... Args>
96 void
97 emplace(
98 acceptor_config&& config,
99 Args&&... args)
100 {
101 va_.emplace_back(
102 concurrency_,
103 std::move(config),
104 acceptor_type(ex_,
105 std::forward<Args>(args)...));
106 }
107
108 void start();
109 void stop();
110
111 private:
112 struct acceptor;
113 struct worker;
114
115 application& app() noexcept override;
116 void do_idle(void*) override;
117 void do_accepts();
118 void on_accept(acceptor*, worker*,
119 system::error_code const&);
120 void do_stop();
121
122 application& app_;
123 section sect_;
124 executor_type ex_;
125 fixed_array<worker> vw_;
126 std::vector<acceptor> va_;
127 worker* idle_ = nullptr;
128 std::size_t n_idle_ = 0;
129 unsigned concurrency_;
130 bool stop_ = false;
131 };
132
133 //------------------------------------------------
134
135 template<class Worker>
136 struct workers<Worker>::
137 acceptor
138 {
139 template<class... Args>
140 explicit
141 acceptor(
142 std::size_t concurrency,
143 acceptor_config&& config_,
144 acceptor_type&& sock_)
145 : config(std::move(config_))
146 , sock(std::move(sock_))
147 , need(concurrency)
148 {
149 }
150
151 acceptor_config config;
152 asio::basic_socket_acceptor<
153 protocol_type, executor_type> sock;
154 std::size_t need; // number of accepts we need
155 };
156
157 template<class Worker>
158 struct workers<Worker>::
159 worker
160 {
161 worker* next;
162 Worker w;
163
164 template<class... Args>
165 explicit worker(
166 worker* next_, Args&&... args)
167 : next(next_)
168 , w(std::forward<Args>(args)...)
169 {
170 }
171 };
172
173 //------------------------------------------------
174
175 template<class Worker>
176 template<class Executor1, class... Args>
177 workers<Worker>::
178 workers(
179 application& app,
180 Executor1 const& ex,
181 unsigned concurrency,
182 std::size_t num_workers,
183 Args&&... args)
184 : app_(app)
185 , sect_(use_log_service(app).get_section("workers"))
186 , ex_(executor_type(ex))
187 , vw_(num_workers)
188 , concurrency_(concurrency)
189 {
190 while(! vw_.is_full())
191 idle_ = &vw_.emplace_back(idle_, *this,
192 std::forward<Args>(args)...);
193 n_idle_ = vw_.size();
194 }
195
196 template<class Worker>
197 application&
198 workers<Worker>::
199 app() noexcept
200 {
201 return app_;
202 }
203
204 template<class Worker>
205 void
206 workers<Worker>::
207 do_idle(void* pw)
208 {
209 asio::dispatch(ex_,
210 [this, pw]()
211 {
212 // recover the `worker` pointer without using offsetof
213 worker* w = vw_.data() + (
214 reinterpret_cast<std::uintptr_t>(pw) -
215 reinterpret_cast<std::uintptr_t>(vw_.data())) /
216 sizeof(worker);
217 // push
218 w->next = idle_;
219 idle_ = w;
220 ++n_idle_;
221 do_accepts();
222 });
223 }
224 template<class Worker>
225 void
226 workers<Worker>::
227 start()
228 {
229 asio::dispatch(ex_, call_mf(&workers::do_accepts, this));
230 }
231
232 template<class Worker>
233 void
234 workers<Worker>::
235 stop()
236 {
237 asio::dispatch(ex_, call_mf(&workers::do_stop, this));
238 }
239
240 template<class Worker>
241 void
242 workers<Worker>::
243 do_accepts()
244 {
245 BOOST_ASSERT(ex_.running_in_this_thread());
246 if(stop_)
247 return;
248 if(idle_)
249 {
250 for(auto& a : va_)
251 {
252 while(a.need > 0)
253 {
254 --a.need;
255 // pop
256 auto pw = idle_;
257 idle_ = idle_->next;
258 --n_idle_;
259 a.sock.async_accept(pw->w.socket(), pw->w.endpoint(),
260 asio::prepend(call_mf(&workers::on_accept, this), &a, pw));
261 if(! idle_)
262 goto busy;
263 }
264 }
265 return;
266 }
267 busy:
268 // all workers are busy
269 // VFALCO log to warn the server admin?
270 return;
271 }
272
273 template<class Worker>
274 void
275 workers<Worker>::
276 on_accept(
277 acceptor* pa,
278 worker* pw,
279 system::error_code const& ec)
280 {
281 BOOST_ASSERT(ex_.running_in_this_thread());
282 ++pa->need;
283 if(ec.failed())
284 {
285 // push
286 pw->next = idle_;
287 idle_ = pw;
288 ++n_idle_;
289 LOG_DBG(sect_)("async_accept: {}", ec.message());
290 return do_accepts();
291 }
292 do_accepts();
293 asio::dispatch(pw->w.socket().get_executor(), asio::prepend(
294 call_mf(&Worker::on_accept, &pw->w), &pa->config));
295 }
296
297 template<class Worker>
298 void
299 workers<Worker>::
300 do_stop()
301 {
302 stop_ = true;
303
304 for(auto& a : va_)
305 {
306 system::error_code ec;
307 a.sock.cancel(ec); // error ignored
308 }
309 for(auto& w : vw_)
310 w.w.cancel();
311 }
312
313 } // beast2
314 } // boost
315
316 #endif
317