Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/beast2
8 : //
9 :
10 : #ifndef BOOST_BEAST2_SERVER_WORKERS_HPP
11 : #define BOOST_BEAST2_SERVER_WORKERS_HPP
12 :
13 : #include <boost/beast2/detail/config.hpp>
14 : #include <boost/beast2/application.hpp>
15 : #include <boost/beast2/log_service.hpp>
16 : #include <boost/beast2/server/fixed_array.hpp>
17 : #include <boost/asio/basic_stream_socket.hpp>
18 : #include <boost/asio/basic_socket_acceptor.hpp>
19 : #include <boost/asio/dispatch.hpp>
20 : #include <boost/asio/prepend.hpp>
21 :
22 : namespace boost {
23 : namespace beast2 {
24 :
25 : class BOOST_SYMBOL_VISIBLE
26 : workers_base
27 : {
28 : public:
29 : BOOST_BEAST2_DECL
30 : virtual ~workers_base();
31 :
32 : virtual application& app() noexcept = 0;
33 : virtual void do_idle(void* worker) = 0;
34 : };
35 :
36 : /** A set of accepting sockets and their workers.
37 :
38 : This implements a set of listening ports as a server service. An array of
39 : workers created upon construction is used to accept incoming connections
40 : and handle their sessions.
41 :
42 : @par Worker exemplar
43 : @code
44 : template< class Executor >
45 : struct Worker
46 : {
47 : using executor_type = Executor;
48 : using protocol_type = asio::ip::tcp;
49 : using socket_type = asio::basic_stream_socket<protocol_type, Executor>;
50 : using acceptor_config = beast2::acceptor_config;
51 :
52 : asio::basic_stream_socket<protocol_type, Executor>& socket() noexcept;
53 : typename protocol_type::endpoint& endpoint() noexcept;
54 :
55 : void on_accept();
56 : };
57 : @endcode
58 :
59 : @tparam Executor The type of executor used by acceptor sockets.
60 : @tparam Worker The type of worker to use.
61 : */
62 : template<class Worker>
63 : class workers
64 : : public workers_base
65 : {
66 : public:
67 : using executor_type = typename Worker::executor_type;
68 : using protocol_type = typename Worker::protocol_type;
69 : using acceptor_type = asio::basic_socket_acceptor<protocol_type, executor_type>;
70 : using acceptor_config = typename Worker::acceptor_config;
71 : using socket_type = asio::basic_stream_socket<protocol_type, executor_type>;
72 :
73 0 : ~workers()
74 : {
75 0 : }
76 :
77 : /** Constructor
78 :
79 : @param app The @ref application which holds this part
80 : @param ex The executor to use for acceptor sockets
81 : @param concurrency The number of threads calling io_context::run
82 : @param num_workers The number of workers to construct
83 : @param args Arguments forwarded to each worker's constructor
84 : */
85 : template<class Executor1, class... Args>
86 : workers(
87 : application& app,
88 : Executor1 const& ex,
89 : unsigned concurrency,
90 : std::size_t num_workers,
91 : Args&&... args);
92 :
93 : /** Add an acceptor
94 : */
95 : template<class... Args>
96 : void
97 0 : emplace(
98 : acceptor_config&& config,
99 : Args&&... args)
100 : {
101 0 : va_.emplace_back(
102 0 : concurrency_,
103 0 : std::move(config),
104 0 : acceptor_type(ex_,
105 0 : std::forward<Args>(args)...));
106 0 : }
107 :
108 : void start();
109 : void stop();
110 :
111 : private:
112 : struct acceptor;
113 : struct worker;
114 :
115 : application& app() noexcept override;
116 : void do_idle(void*) override;
117 : void do_accepts();
118 : void on_accept(acceptor*, worker*,
119 : system::error_code const&);
120 : void do_stop();
121 :
122 : application& app_;
123 : section sect_;
124 : executor_type ex_;
125 : fixed_array<worker> vw_;
126 : std::vector<acceptor> va_;
127 : worker* idle_ = nullptr;
128 : std::size_t n_idle_ = 0;
129 : unsigned concurrency_;
130 : bool stop_ = false;
131 : };
132 :
133 : //------------------------------------------------
134 :
135 : template<class Worker>
136 : struct workers<Worker>::
137 : acceptor
138 : {
139 : template<class... Args>
140 : explicit
141 0 : acceptor(
142 : std::size_t concurrency,
143 : acceptor_config&& config_,
144 : acceptor_type&& sock_)
145 0 : : config(std::move(config_))
146 0 : , sock(std::move(sock_))
147 0 : , need(concurrency)
148 : {
149 0 : }
150 :
151 : acceptor_config config;
152 : asio::basic_socket_acceptor<
153 : protocol_type, executor_type> sock;
154 : std::size_t need; // number of accepts we need
155 : };
156 :
157 : template<class Worker>
158 : struct workers<Worker>::
159 : worker
160 : {
161 : worker* next;
162 : Worker w;
163 :
164 : template<class... Args>
165 0 : explicit worker(
166 : worker* next_, Args&&... args)
167 0 : : next(next_)
168 0 : , w(std::forward<Args>(args)...)
169 : {
170 0 : }
171 : };
172 :
173 : //------------------------------------------------
174 :
175 : template<class Worker>
176 : template<class Executor1, class... Args>
177 0 : workers<Worker>::
178 : workers(
179 : application& app,
180 : Executor1 const& ex,
181 : unsigned concurrency,
182 : std::size_t num_workers,
183 : Args&&... args)
184 0 : : app_(app)
185 0 : , sect_(use_log_service(app).get_section("workers"))
186 0 : , ex_(executor_type(ex))
187 0 : , vw_(num_workers)
188 0 : , concurrency_(concurrency)
189 : {
190 0 : while(! vw_.is_full())
191 0 : idle_ = &vw_.emplace_back(idle_, *this,
192 : std::forward<Args>(args)...);
193 0 : n_idle_ = vw_.size();
194 0 : }
195 :
196 : template<class Worker>
197 : application&
198 0 : workers<Worker>::
199 : app() noexcept
200 : {
201 0 : return app_;
202 : }
203 :
204 : template<class Worker>
205 : void
206 0 : workers<Worker>::
207 : do_idle(void* pw)
208 : {
209 0 : asio::dispatch(ex_,
210 0 : [this, pw]()
211 : {
212 : // recover the `worker` pointer without using offsetof
213 0 : worker* w = vw_.data() + (
214 0 : reinterpret_cast<std::uintptr_t>(pw) -
215 0 : reinterpret_cast<std::uintptr_t>(vw_.data())) /
216 : sizeof(worker);
217 : // push
218 0 : w->next = idle_;
219 0 : idle_ = w;
220 0 : ++n_idle_;
221 0 : do_accepts();
222 : });
223 0 : }
224 : template<class Worker>
225 : void
226 0 : workers<Worker>::
227 : start()
228 : {
229 0 : asio::dispatch(ex_, call_mf(&workers::do_accepts, this));
230 0 : }
231 :
232 : template<class Worker>
233 : void
234 0 : workers<Worker>::
235 : stop()
236 : {
237 0 : asio::dispatch(ex_, call_mf(&workers::do_stop, this));
238 0 : }
239 :
240 : template<class Worker>
241 : void
242 0 : workers<Worker>::
243 : do_accepts()
244 : {
245 0 : BOOST_ASSERT(ex_.running_in_this_thread());
246 0 : if(stop_)
247 0 : return;
248 0 : if(idle_)
249 : {
250 0 : for(auto& a : va_)
251 : {
252 0 : while(a.need > 0)
253 : {
254 0 : --a.need;
255 : // pop
256 0 : auto pw = idle_;
257 0 : idle_ = idle_->next;
258 0 : --n_idle_;
259 0 : a.sock.async_accept(pw->w.socket(), pw->w.endpoint(),
260 0 : asio::prepend(call_mf(&workers::on_accept, this), &a, pw));
261 0 : if(! idle_)
262 0 : goto busy;
263 : }
264 : }
265 0 : return;
266 : }
267 0 : busy:
268 : // all workers are busy
269 : // VFALCO log to warn the server admin?
270 0 : return;
271 : }
272 :
273 : template<class Worker>
274 : void
275 0 : workers<Worker>::
276 : on_accept(
277 : acceptor* pa,
278 : worker* pw,
279 : system::error_code const& ec)
280 : {
281 0 : BOOST_ASSERT(ex_.running_in_this_thread());
282 0 : ++pa->need;
283 0 : if(ec.failed())
284 : {
285 : // push
286 0 : pw->next = idle_;
287 0 : idle_ = pw;
288 0 : ++n_idle_;
289 0 : LOG_DBG(sect_)("async_accept: {}", ec.message());
290 0 : return do_accepts();
291 : }
292 0 : do_accepts();
293 0 : asio::dispatch(pw->w.socket().get_executor(), asio::prepend(
294 0 : call_mf(&Worker::on_accept, &pw->w), &pa->config));
295 : }
296 :
297 : template<class Worker>
298 : void
299 0 : workers<Worker>::
300 : do_stop()
301 : {
302 0 : stop_ = true;
303 :
304 0 : for(auto& a : va_)
305 : {
306 0 : system::error_code ec;
307 0 : a.sock.cancel(ec); // error ignored
308 : }
309 0 : for(auto& w : vw_)
310 0 : w.w.cancel();
311 0 : }
312 :
313 : } // beast2
314 : } // boost
315 :
316 : #endif
|