LCOV - code coverage report
Current view: top level - boost/beast2/server/workers.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 0.0 % 85 0
Test Date: 2025-11-13 15:50:43 Functions: 0.0 % 14 0

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/beast2
       8              : //
       9              : 
      10              : #ifndef BOOST_BEAST2_SERVER_WORKERS_HPP
      11              : #define BOOST_BEAST2_SERVER_WORKERS_HPP
      12              : 
      13              : #include <boost/beast2/detail/config.hpp>
      14              : #include <boost/beast2/application.hpp>
      15              : #include <boost/beast2/log_service.hpp>
      16              : #include <boost/beast2/server/fixed_array.hpp>
      17              : #include <boost/asio/basic_stream_socket.hpp>
      18              : #include <boost/asio/basic_socket_acceptor.hpp>
      19              : #include <boost/asio/dispatch.hpp>
      20              : #include <boost/asio/prepend.hpp>
      21              : 
      22              : namespace boost {
      23              : namespace beast2 {
      24              : 
      25              : class BOOST_SYMBOL_VISIBLE
      26              :     workers_base
      27              : {
      28              : public:
      29              :     BOOST_BEAST2_DECL
      30              :     virtual ~workers_base();
      31              : 
      32              :     virtual application& app() noexcept = 0;
      33              :     virtual void do_idle(void* worker) = 0;
      34              : };
      35              : 
      36              : /** A set of accepting sockets and their workers.
      37              : 
      38              :     This implements a set of listening ports as a server service. An array of
      39              :     workers created upon construction is used to accept incoming connections
      40              :     and handle their sessions.
      41              : 
      42              :     @par Worker exemplar
      43              :     @code
      44              :     template< class Executor >
      45              :     struct Worker
      46              :     {
      47              :         using executor_type = Executor;
      48              :         using protocol_type = asio::ip::tcp;
      49              :         using socket_type = asio::basic_stream_socket<protocol_type, Executor>;
      50              :         using acceptor_config = beast2::acceptor_config;
      51              : 
      52              :         asio::basic_stream_socket<protocol_type, Executor>& socket() noexcept;
      53              :         typename protocol_type::endpoint& endpoint() noexcept;
      54              : 
      55              :         void on_accept();
      56              :     };
      57              :     @endcode
      58              : 
      59              :     @tparam Executor The type of executor used by acceptor sockets.
      60              :     @tparam Worker The type of worker to use.
      61              : */
      62              : template<class Worker>
      63              : class workers
      64              :     : public workers_base
      65              : {
      66              : public:
      67              :     using executor_type = typename Worker::executor_type;
      68              :     using protocol_type = typename Worker::protocol_type;
      69              :     using acceptor_type = asio::basic_socket_acceptor<protocol_type, executor_type>;
      70              :     using acceptor_config = typename Worker::acceptor_config;
      71              :     using socket_type = asio::basic_stream_socket<protocol_type, executor_type>;
      72              : 
      73            0 :     ~workers()
      74              :     {
      75            0 :     }
      76              : 
      77              :     /** Constructor
      78              : 
      79              :         @param app The @ref application which holds this part
      80              :         @param ex The executor to use for acceptor sockets
      81              :         @param concurrency The number of threads calling io_context::run
      82              :         @param num_workers The number of workers to construct
      83              :         @param args Arguments forwarded to each worker's constructor
      84              :     */
      85              :     template<class Executor1, class... Args>
      86              :     workers(
      87              :         application& app,
      88              :         Executor1 const& ex,
      89              :         unsigned concurrency,
      90              :         std::size_t num_workers,
      91              :         Args&&... args);
      92              : 
      93              :     /** Add an acceptor
      94              :     */
      95              :     template<class... Args>
      96              :     void
      97            0 :     emplace(
      98              :         acceptor_config&& config,
      99              :         Args&&... args)
     100              :     {
     101            0 :         va_.emplace_back(
     102            0 :             concurrency_,
     103            0 :             std::move(config),
     104            0 :             acceptor_type(ex_,
     105            0 :                 std::forward<Args>(args)...));
     106            0 :     }
     107              : 
     108              :     void start();
     109              :     void stop();
     110              : 
     111              : private:
     112              :     struct acceptor;
     113              :     struct worker;
     114              : 
     115              :     application& app() noexcept override;
     116              :     void do_idle(void*) override;
     117              :     void do_accepts();
     118              :     void on_accept(acceptor*, worker*,
     119              :         system::error_code const&);
     120              :     void do_stop();
     121              : 
     122              :     application& app_;
     123              :     section sect_;
     124              :     executor_type ex_;
     125              :     fixed_array<worker> vw_;
     126              :     std::vector<acceptor> va_;
     127              :     worker* idle_ = nullptr;
     128              :     std::size_t n_idle_ = 0;
     129              :     unsigned concurrency_;
     130              :     bool stop_ = false;
     131              : };
     132              : 
     133              : //------------------------------------------------
     134              : 
     135              : template<class Worker>
     136              : struct workers<Worker>::
     137              :     acceptor
     138              : {
     139              :     template<class... Args>
     140              :     explicit
     141            0 :     acceptor(
     142              :         std::size_t concurrency,
     143              :         acceptor_config&& config_,
     144              :         acceptor_type&& sock_)
     145            0 :         : config(std::move(config_))
     146            0 :         , sock(std::move(sock_))
     147            0 :         , need(concurrency)
     148              :     {
     149            0 :     }
     150              : 
     151              :     acceptor_config config;
     152              :     asio::basic_socket_acceptor<
     153              :         protocol_type, executor_type> sock;
     154              :     std::size_t need;   // number of accepts we need
     155              : };
     156              : 
     157              : template<class Worker>
     158              : struct workers<Worker>::
     159              :     worker
     160              : {
     161              :     worker* next;
     162              :     Worker w;
     163              : 
     164              :     template<class... Args>
     165            0 :     explicit worker(
     166              :         worker* next_, Args&&... args)
     167            0 :         : next(next_)
     168            0 :         , w(std::forward<Args>(args)...)
     169              :     {
     170            0 :     }
     171              : };
     172              : 
     173              : //------------------------------------------------
     174              : 
     175              : template<class Worker>
     176              : template<class Executor1, class... Args>
     177            0 : workers<Worker>::
     178              : workers(
     179              :     application& app,
     180              :     Executor1 const& ex,
     181              :     unsigned concurrency,
     182              :     std::size_t num_workers,
     183              :     Args&&... args)
     184            0 :     : app_(app)
     185            0 :     , sect_(use_log_service(app).get_section("workers"))
     186            0 :     , ex_(executor_type(ex))
     187            0 :     , vw_(num_workers)
     188            0 :     , concurrency_(concurrency)
     189              : {
     190            0 :     while(! vw_.is_full())
     191            0 :         idle_ = &vw_.emplace_back(idle_, *this,
     192              :             std::forward<Args>(args)...);
     193            0 :     n_idle_ = vw_.size();
     194            0 : }
     195              : 
     196              : template<class Worker>
     197              : application&
     198            0 : workers<Worker>::
     199              : app() noexcept
     200              : {
     201            0 :     return app_;
     202              : }
     203              : 
     204              : template<class Worker>
     205              : void
     206            0 : workers<Worker>::
     207              : do_idle(void* pw)
     208              : {
     209            0 :     asio::dispatch(ex_,
     210            0 :         [this, pw]()
     211              :         {
     212              :             // recover the `worker` pointer without using offsetof
     213            0 :             worker* w = vw_.data() + (
     214            0 :                 reinterpret_cast<std::uintptr_t>(pw) -
     215            0 :                 reinterpret_cast<std::uintptr_t>(vw_.data())) /
     216              :                 sizeof(worker);
     217              :             // push
     218            0 :             w->next = idle_;
     219            0 :             idle_ = w;
     220            0 :             ++n_idle_;
     221            0 :             do_accepts();
     222              :         });
     223            0 : }
     224              : template<class Worker>
     225              : void
     226            0 : workers<Worker>::
     227              : start()
     228              : {
     229            0 :     asio::dispatch(ex_, call_mf(&workers::do_accepts, this));
     230            0 : }
     231              : 
     232              : template<class Worker>
     233              : void
     234            0 : workers<Worker>::
     235              : stop()
     236              : {
     237            0 :     asio::dispatch(ex_, call_mf(&workers::do_stop, this));
     238            0 : }
     239              : 
     240              : template<class Worker>
     241              : void
     242            0 : workers<Worker>::
     243              : do_accepts()
     244              : {
     245            0 :     BOOST_ASSERT(ex_.running_in_this_thread());
     246            0 :     if(stop_)
     247            0 :         return;
     248            0 :     if(idle_)
     249              :     {
     250            0 :         for(auto& a : va_)
     251              :         {
     252            0 :             while(a.need > 0)
     253              :             {
     254            0 :                 --a.need;
     255              :                 // pop
     256            0 :                 auto pw = idle_;
     257            0 :                 idle_ = idle_->next;
     258            0 :                 --n_idle_;
     259            0 :                 a.sock.async_accept(pw->w.socket(), pw->w.endpoint(),
     260            0 :                     asio::prepend(call_mf(&workers::on_accept, this), &a, pw));
     261            0 :                 if(! idle_)
     262            0 :                     goto busy;
     263              :             }
     264              :         }
     265            0 :         return;
     266              :     }
     267            0 : busy:
     268              :     // all workers are busy
     269              :     // VFALCO log to warn the server admin?
     270            0 :     return;
     271              : }
     272              : 
     273              : template<class Worker>
     274              : void
     275            0 : workers<Worker>::
     276              : on_accept(
     277              :     acceptor* pa,
     278              :     worker* pw,
     279              :     system::error_code const& ec)
     280              : {
     281            0 :     BOOST_ASSERT(ex_.running_in_this_thread());
     282            0 :     ++pa->need;
     283            0 :     if(ec.failed())
     284              :     {
     285              :         // push
     286            0 :         pw->next = idle_;
     287            0 :         idle_ = pw;
     288            0 :         ++n_idle_;
     289            0 :         LOG_DBG(sect_)("async_accept: {}", ec.message());
     290            0 :         return do_accepts();
     291              :     }
     292            0 :     do_accepts();
     293            0 :     asio::dispatch(pw->w.socket().get_executor(), asio::prepend(
     294            0 :         call_mf(&Worker::on_accept, &pw->w), &pa->config));
     295              : }
     296              : 
     297              : template<class Worker>
     298              : void
     299            0 : workers<Worker>::
     300              : do_stop()
     301              : {
     302            0 :     stop_ = true;
     303              : 
     304            0 :     for(auto& a : va_)
     305              :     {
     306            0 :         system::error_code ec;
     307            0 :         a.sock.cancel(ec); // error ignored
     308              :     }
     309            0 :     for(auto& w : vw_)
     310            0 :         w.w.cancel();
     311            0 : }
     312              : 
     313              : } // beast2
     314              : } // boost
     315              : 
     316              : #endif
        

Generated by: LCOV version 2.1