LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_tcp_acceptor_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 61.6 % 190 117 73
Test Date: 2026-04-06 17:42:26 Functions: 95.2 % 21 20 1

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/tcp_acceptor_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/select/select_tcp_acceptor.hpp>
      22                 : #include <boost/corosio/native/detail/select/select_tcp_service.hpp>
      23                 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
      24                 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
      25                 : 
      26                 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
      27                 : 
      28                 : #include <memory>
      29                 : #include <mutex>
      30                 : #include <utility>
      31                 : 
      32                 : #include <errno.h>
      33                 : #include <fcntl.h>
      34                 : #include <netinet/in.h>
      35                 : #include <sys/select.h>
      36                 : #include <sys/socket.h>
      37                 : #include <unistd.h>
      38                 : 
      39                 : namespace boost::corosio::detail {
      40                 : 
      41                 : /// State for select acceptor service.
      42                 : using select_tcp_acceptor_state =
      43                 :     reactor_service_state<select_scheduler, select_tcp_acceptor>;
      44                 : 
      45                 : /** select acceptor service implementation.
      46                 : 
      47                 :     Inherits from tcp_acceptor_service to enable runtime polymorphism.
      48                 :     Uses key_type = tcp_acceptor_service for service lookup.
      49                 : */
      50                 : class BOOST_COROSIO_DECL select_tcp_acceptor_service final
      51                 :     : public tcp_acceptor_service
      52                 : {
      53                 : public:
      54                 :     explicit select_tcp_acceptor_service(
      55                 :         capy::execution_context& ctx, select_tcp_service& tcp_svc);
      56                 :     ~select_tcp_acceptor_service() override;
      57                 : 
      58                 :     select_tcp_acceptor_service(select_tcp_acceptor_service const&) = delete;
      59                 :     select_tcp_acceptor_service&
      60                 :     operator=(select_tcp_acceptor_service const&) = delete;
      61                 : 
      62                 :     void shutdown() override;
      63                 : 
      64                 :     io_object::implementation* construct() override;
      65                 :     void destroy(io_object::implementation*) override;
      66                 :     void close(io_object::handle&) override;
      67                 :     std::error_code open_acceptor_socket(
      68                 :         tcp_acceptor::implementation& impl,
      69                 :         int family,
      70                 :         int type,
      71                 :         int protocol) override;
      72                 :     std::error_code
      73                 :     bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
      74                 :     std::error_code
      75                 :     listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
      76                 : 
      77 HIT         119 :     select_scheduler& scheduler() const noexcept
      78                 :     {
      79             119 :         return state_->sched_;
      80                 :     }
      81                 :     void post(scheduler_op* op);
      82                 :     void work_started() noexcept;
      83                 :     void work_finished() noexcept;
      84                 : 
      85                 :     /** Get the TCP service for creating peer sockets during accept. */
      86                 :     select_tcp_service* tcp_service() const noexcept;
      87                 : 
      88                 : private:
      89                 :     select_tcp_service* tcp_svc_;
      90                 :     std::unique_ptr<select_tcp_acceptor_state> state_;
      91                 : };
      92                 : 
      93                 : inline void
      94 MIS           0 : select_accept_op::cancel() noexcept
      95                 : {
      96               0 :     if (acceptor_impl_)
      97               0 :         acceptor_impl_->cancel_single_op(*this);
      98                 :     else
      99               0 :         request_cancel();
     100               0 : }
     101                 : 
     102                 : inline void
     103 HIT        1985 : select_accept_op::operator()()
     104                 : {
     105            1985 :     complete_accept_op<select_tcp_socket>(*this);
     106            1985 : }
     107                 : 
     108              65 : inline select_tcp_acceptor::select_tcp_acceptor(
     109              65 :     select_tcp_acceptor_service& svc) noexcept
     110              65 :     : reactor_acceptor(svc)
     111                 : {
     112              65 : }
     113                 : 
     114                 : inline std::coroutine_handle<>
     115            1985 : select_tcp_acceptor::accept(
     116                 :     std::coroutine_handle<> h,
     117                 :     capy::executor_ref ex,
     118                 :     std::stop_token token,
     119                 :     std::error_code* ec,
     120                 :     io_object::implementation** impl_out)
     121                 : {
     122            1985 :     auto& op = acc_;
     123            1985 :     op.reset();
     124            1985 :     op.h        = h;
     125            1985 :     op.ex       = ex;
     126            1985 :     op.ec_out   = ec;
     127            1985 :     op.impl_out = impl_out;
     128            1985 :     op.fd       = fd_;
     129            1985 :     op.start(token, this);
     130                 : 
     131            1985 :     sockaddr_storage peer_storage{};
     132            1985 :     socklen_t addrlen = sizeof(peer_storage);
     133                 :     int accepted;
     134                 :     do
     135                 :     {
     136                 :         accepted =
     137            1985 :             ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
     138                 :     }
     139            1985 :     while (accepted < 0 && errno == EINTR);
     140                 : 
     141            1985 :     if (accepted >= 0)
     142                 :     {
     143               3 :         if (accepted >= FD_SETSIZE)
     144                 :         {
     145 MIS           0 :             ::close(accepted);
     146               0 :             op.complete(EINVAL, 0);
     147               0 :             op.impl_ptr = shared_from_this();
     148               0 :             svc_.post(&op);
     149               0 :             return std::noop_coroutine();
     150                 :         }
     151                 : 
     152 HIT           3 :         int flags = ::fcntl(accepted, F_GETFL, 0);
     153               3 :         if (flags == -1)
     154                 :         {
     155 MIS           0 :             int err = errno;
     156               0 :             ::close(accepted);
     157               0 :             op.complete(err, 0);
     158               0 :             op.impl_ptr = shared_from_this();
     159               0 :             svc_.post(&op);
     160               0 :             return std::noop_coroutine();
     161                 :         }
     162                 : 
     163 HIT           3 :         if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
     164                 :         {
     165 MIS           0 :             int err = errno;
     166               0 :             ::close(accepted);
     167               0 :             op.complete(err, 0);
     168               0 :             op.impl_ptr = shared_from_this();
     169               0 :             svc_.post(&op);
     170               0 :             return std::noop_coroutine();
     171                 :         }
     172                 : 
     173 HIT           3 :         if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
     174                 :         {
     175 MIS           0 :             int err = errno;
     176               0 :             ::close(accepted);
     177               0 :             op.complete(err, 0);
     178               0 :             op.impl_ptr = shared_from_this();
     179               0 :             svc_.post(&op);
     180               0 :             return std::noop_coroutine();
     181                 :         }
     182                 : 
     183                 :         {
     184 HIT           3 :             std::lock_guard lock(desc_state_.mutex);
     185               3 :             desc_state_.read_ready = false;
     186               3 :         }
     187                 : 
     188               3 :         if (svc_.scheduler().try_consume_inline_budget())
     189                 :         {
     190 MIS           0 :             auto* socket_svc = svc_.tcp_service();
     191               0 :             if (socket_svc)
     192                 :             {
     193                 :                 auto& impl =
     194               0 :                     static_cast<select_tcp_socket&>(*socket_svc->construct());
     195               0 :                 impl.set_socket(accepted);
     196                 : 
     197               0 :                 impl.desc_state_.fd = accepted;
     198                 :                 {
     199               0 :                     std::lock_guard lock(impl.desc_state_.mutex);
     200               0 :                     impl.desc_state_.read_op    = nullptr;
     201               0 :                     impl.desc_state_.write_op   = nullptr;
     202               0 :                     impl.desc_state_.connect_op = nullptr;
     203               0 :                 }
     204               0 :                 socket_svc->scheduler().register_descriptor(
     205                 :                     accepted, &impl.desc_state_);
     206                 : 
     207               0 :                 impl.set_endpoints(
     208                 :                     local_endpoint_, from_sockaddr(peer_storage));
     209                 : 
     210               0 :                 *ec = {};
     211               0 :                 if (impl_out)
     212               0 :                     *impl_out = &impl;
     213                 :             }
     214                 :             else
     215                 :             {
     216               0 :                 ::close(accepted);
     217               0 :                 *ec = make_err(ENOENT);
     218               0 :                 if (impl_out)
     219               0 :                     *impl_out = nullptr;
     220                 :             }
     221               0 :             op.cont_op.cont.h = h;
     222               0 :             return dispatch_coro(ex, op.cont_op.cont);
     223                 :         }
     224                 : 
     225 HIT           3 :         op.accepted_fd  = accepted;
     226               3 :         op.peer_storage = peer_storage;
     227               3 :         op.complete(0, 0);
     228               3 :         op.impl_ptr = shared_from_this();
     229               3 :         svc_.post(&op);
     230               3 :         return std::noop_coroutine();
     231                 :     }
     232                 : 
     233            1982 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     234                 :     {
     235            1982 :         op.impl_ptr = shared_from_this();
     236            1982 :         svc_.work_started();
     237                 : 
     238            1982 :         std::lock_guard lock(desc_state_.mutex);
     239            1982 :         bool io_done = false;
     240            1982 :         if (desc_state_.read_ready)
     241                 :         {
     242 MIS           0 :             desc_state_.read_ready = false;
     243               0 :             op.perform_io();
     244               0 :             io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     245               0 :             if (!io_done)
     246               0 :                 op.errn = 0;
     247                 :         }
     248                 : 
     249 HIT        1982 :         if (io_done || op.cancelled.load(std::memory_order_acquire))
     250                 :         {
     251 MIS           0 :             svc_.post(&op);
     252               0 :             svc_.work_finished();
     253                 :         }
     254                 :         else
     255                 :         {
     256 HIT        1982 :             desc_state_.read_op = &op;
     257                 :         }
     258            1982 :         return std::noop_coroutine();
     259            1982 :     }
     260                 : 
     261 MIS           0 :     op.complete(errno, 0);
     262               0 :     op.impl_ptr = shared_from_this();
     263               0 :     svc_.post(&op);
     264               0 :     return std::noop_coroutine();
     265                 : }
     266                 : 
     267                 : inline void
     268 HIT           2 : select_tcp_acceptor::cancel() noexcept
     269                 : {
     270               2 :     do_cancel();
     271               2 : }
     272                 : 
     273                 : inline void
     274             254 : select_tcp_acceptor::close_socket() noexcept
     275                 : {
     276             254 :     do_close_socket();
     277             254 : }
     278                 : 
     279             206 : inline select_tcp_acceptor_service::select_tcp_acceptor_service(
     280             206 :     capy::execution_context& ctx, select_tcp_service& tcp_svc)
     281             206 :     : tcp_svc_(&tcp_svc)
     282             206 :     , state_(
     283                 :           std::make_unique<select_tcp_acceptor_state>(
     284             206 :               ctx.use_service<select_scheduler>()))
     285                 : {
     286             206 : }
     287                 : 
     288             412 : inline select_tcp_acceptor_service::~select_tcp_acceptor_service() {}
     289                 : 
     290                 : inline void
     291             206 : select_tcp_acceptor_service::shutdown()
     292                 : {
     293             206 :     std::lock_guard lock(state_->mutex_);
     294                 : 
     295             206 :     while (auto* impl = state_->impl_list_.pop_front())
     296 MIS           0 :         impl->close_socket();
     297                 : 
     298                 :     // Don't clear impl_ptrs_ here — same rationale as
     299                 :     // select_tcp_service::shutdown(). Let ~state_ release ptrs
     300                 :     // after scheduler shutdown has drained all queued ops.
     301 HIT         206 : }
     302                 : 
     303                 : inline io_object::implementation*
     304              65 : select_tcp_acceptor_service::construct()
     305                 : {
     306              65 :     auto impl = std::make_shared<select_tcp_acceptor>(*this);
     307              65 :     auto* raw = impl.get();
     308                 : 
     309              65 :     std::lock_guard lock(state_->mutex_);
     310              65 :     state_->impl_ptrs_.emplace(raw, std::move(impl));
     311              65 :     state_->impl_list_.push_back(raw);
     312                 : 
     313              65 :     return raw;
     314              65 : }
     315                 : 
     316                 : inline void
     317              65 : select_tcp_acceptor_service::destroy(io_object::implementation* impl)
     318                 : {
     319              65 :     auto* select_impl = static_cast<select_tcp_acceptor*>(impl);
     320              65 :     select_impl->close_socket();
     321              65 :     std::lock_guard lock(state_->mutex_);
     322              65 :     state_->impl_list_.remove(select_impl);
     323              65 :     state_->impl_ptrs_.erase(select_impl);
     324              65 : }
     325                 : 
     326                 : inline void
     327             127 : select_tcp_acceptor_service::close(io_object::handle& h)
     328                 : {
     329             127 :     static_cast<select_tcp_acceptor*>(h.get())->close_socket();
     330             127 : }
     331                 : 
     332                 : inline std::error_code
     333              62 : select_tcp_acceptor_service::open_acceptor_socket(
     334                 :     tcp_acceptor::implementation& impl, int family, int type, int protocol)
     335                 : {
     336              62 :     auto* select_impl = static_cast<select_tcp_acceptor*>(&impl);
     337              62 :     select_impl->close_socket();
     338                 : 
     339              62 :     int fd = ::socket(family, type, protocol);
     340              62 :     if (fd < 0)
     341 MIS           0 :         return make_err(errno);
     342                 : 
     343 HIT          62 :     int flags = ::fcntl(fd, F_GETFL, 0);
     344              62 :     if (flags == -1)
     345                 :     {
     346 MIS           0 :         int errn = errno;
     347               0 :         ::close(fd);
     348               0 :         return make_err(errn);
     349                 :     }
     350 HIT          62 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     351                 :     {
     352 MIS           0 :         int errn = errno;
     353               0 :         ::close(fd);
     354               0 :         return make_err(errn);
     355                 :     }
     356 HIT          62 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     357                 :     {
     358 MIS           0 :         int errn = errno;
     359               0 :         ::close(fd);
     360               0 :         return make_err(errn);
     361                 :     }
     362                 : 
     363 HIT          62 :     if (fd >= FD_SETSIZE)
     364                 :     {
     365 MIS           0 :         ::close(fd);
     366               0 :         return make_err(EMFILE);
     367                 :     }
     368                 : 
     369 HIT          62 :     if (family == AF_INET6)
     370                 :     {
     371               8 :         int val = 0; // dual-stack default
     372               8 :         ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
     373                 :     }
     374                 : 
     375                 : #ifdef SO_NOSIGPIPE
     376                 :     {
     377                 :         int nosig = 1;
     378                 :         ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
     379                 :     }
     380                 : #endif
     381                 : 
     382              62 :     select_impl->fd_ = fd;
     383                 : 
     384                 :     // Set up descriptor state but do NOT register with reactor yet
     385                 :     // (registration happens in do_listen via reactor_acceptor base)
     386              62 :     select_impl->desc_state_.fd = fd;
     387                 :     {
     388              62 :         std::lock_guard lock(select_impl->desc_state_.mutex);
     389              62 :         select_impl->desc_state_.read_op = nullptr;
     390              62 :     }
     391                 : 
     392              62 :     return {};
     393                 : }
     394                 : 
     395                 : inline std::error_code
     396              61 : select_tcp_acceptor_service::bind_acceptor(
     397                 :     tcp_acceptor::implementation& impl, endpoint ep)
     398                 : {
     399              61 :     return static_cast<select_tcp_acceptor*>(&impl)->do_bind(ep);
     400                 : }
     401                 : 
     402                 : inline std::error_code
     403              58 : select_tcp_acceptor_service::listen_acceptor(
     404                 :     tcp_acceptor::implementation& impl, int backlog)
     405                 : {
     406              58 :     return static_cast<select_tcp_acceptor*>(&impl)->do_listen(backlog);
     407                 : }
     408                 : 
     409                 : inline void
     410               6 : select_tcp_acceptor_service::post(scheduler_op* op)
     411                 : {
     412               6 :     state_->sched_.post(op);
     413               6 : }
     414                 : 
     415                 : inline void
     416            1982 : select_tcp_acceptor_service::work_started() noexcept
     417                 : {
     418            1982 :     state_->sched_.work_started();
     419            1982 : }
     420                 : 
     421                 : inline void
     422               3 : select_tcp_acceptor_service::work_finished() noexcept
     423                 : {
     424               3 :     state_->sched_.work_finished();
     425               3 : }
     426                 : 
     427                 : inline select_tcp_service*
     428            1982 : select_tcp_acceptor_service::tcp_service() const noexcept
     429                 : {
     430            1982 :     return tcp_svc_;
     431                 : }
     432                 : 
     433                 : } // namespace boost::corosio::detail
     434                 : 
     435                 : #endif // BOOST_COROSIO_HAS_SELECT
     436                 : 
     437                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
        

Generated by: LCOV version 2.3