LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_tcp_acceptor_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 76.0 % 146 111 35
Test Date: 2026-04-06 17:42:26 Functions: 100.0 % 21 21

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/tcp_acceptor_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/epoll/epoll_tcp_acceptor.hpp>
      22                 : #include <boost/corosio/native/detail/epoll/epoll_tcp_service.hpp>
      23                 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
      24                 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
      25                 : 
      26                 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
      27                 : 
      28                 : #include <memory>
      29                 : #include <mutex>
      30                 : #include <utility>
      31                 : 
      32                 : #include <errno.h>
      33                 : #include <netinet/in.h>
      34                 : #include <sys/epoll.h>
      35                 : #include <sys/socket.h>
      36                 : #include <unistd.h>
      37                 : 
      38                 : namespace boost::corosio::detail {
      39                 : 
      40                 : /// State for epoll acceptor service.
      41                 : using epoll_tcp_acceptor_state =
      42                 :     reactor_service_state<epoll_scheduler, epoll_tcp_acceptor>;
      43                 : 
      44                 : /** epoll acceptor service implementation.
      45                 : 
      46                 :     Inherits from tcp_acceptor_service to enable runtime polymorphism.
      47                 :     Uses key_type = tcp_acceptor_service for service lookup.
      48                 : */
      49                 : class BOOST_COROSIO_DECL epoll_tcp_acceptor_service final
      50                 :     : public tcp_acceptor_service
      51                 : {
      52                 : public:
      53                 :     explicit epoll_tcp_acceptor_service(
      54                 :         capy::execution_context& ctx, epoll_tcp_service& tcp_svc);
      55                 :     ~epoll_tcp_acceptor_service() override;
      56                 : 
      57                 :     epoll_tcp_acceptor_service(epoll_tcp_acceptor_service const&) = delete;
      58                 :     epoll_tcp_acceptor_service&
      59                 :     operator=(epoll_tcp_acceptor_service const&) = delete;
      60                 : 
      61                 :     void shutdown() override;
      62                 : 
      63                 :     io_object::implementation* construct() override;
      64                 :     void destroy(io_object::implementation*) override;
      65                 :     void close(io_object::handle&) override;
      66                 :     std::error_code open_acceptor_socket(
      67                 :         tcp_acceptor::implementation& impl,
      68                 :         int family,
      69                 :         int type,
      70                 :         int protocol) override;
      71                 :     std::error_code
      72                 :     bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
      73                 :     std::error_code
      74                 :     listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
      75                 : 
      76 HIT         155 :     epoll_scheduler& scheduler() const noexcept
      77                 :     {
      78             155 :         return state_->sched_;
      79                 :     }
      80                 :     void post(scheduler_op* op);
      81                 :     void work_started() noexcept;
      82                 :     void work_finished() noexcept;
      83                 : 
      84                 :     /** Get the TCP service for creating peer sockets during accept. */
      85                 :     epoll_tcp_service* tcp_service() const noexcept;
      86                 : 
      87                 : private:
      88                 :     epoll_tcp_service* tcp_svc_;
      89                 :     std::unique_ptr<epoll_tcp_acceptor_state> state_;
      90                 : };
      91                 : 
      92                 : inline void
      93               6 : epoll_accept_op::cancel() noexcept
      94                 : {
      95               6 :     if (acceptor_impl_)
      96               6 :         acceptor_impl_->cancel_single_op(*this);
      97                 :     else
      98 MIS           0 :         request_cancel();
      99 HIT           6 : }
     100                 : 
     101                 : inline void
     102            2408 : epoll_accept_op::operator()()
     103                 : {
     104            2408 :     complete_accept_op<epoll_tcp_socket>(*this);
     105            2408 : }
     106                 : 
     107              84 : inline epoll_tcp_acceptor::epoll_tcp_acceptor(
     108              84 :     epoll_tcp_acceptor_service& svc) noexcept
     109              84 :     : reactor_acceptor(svc)
     110                 : {
     111              84 : }
     112                 : 
     113                 : inline std::coroutine_handle<>
     114            2408 : epoll_tcp_acceptor::accept(
     115                 :     std::coroutine_handle<> h,
     116                 :     capy::executor_ref ex,
     117                 :     std::stop_token token,
     118                 :     std::error_code* ec,
     119                 :     io_object::implementation** impl_out)
     120                 : {
     121            2408 :     auto& op = acc_;
     122            2408 :     op.reset();
     123            2408 :     op.h        = h;
     124            2408 :     op.ex       = ex;
     125            2408 :     op.ec_out   = ec;
     126            2408 :     op.impl_out = impl_out;
     127            2408 :     op.fd       = fd_;
     128            2408 :     op.start(token, this);
     129                 : 
     130            2408 :     sockaddr_storage peer_storage{};
     131            2408 :     socklen_t addrlen = sizeof(peer_storage);
     132                 :     int accepted;
     133                 :     do
     134                 :     {
     135            2408 :         accepted = ::accept4(
     136                 :             fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
     137                 :             SOCK_NONBLOCK | SOCK_CLOEXEC);
     138                 :     }
     139            2408 :     while (accepted < 0 && errno == EINTR);
     140                 : 
     141            2408 :     if (accepted >= 0)
     142                 :     {
     143                 :         {
     144               3 :             std::lock_guard lock(desc_state_.mutex);
     145               3 :             desc_state_.read_ready = false;
     146               3 :         }
     147                 : 
     148               3 :         if (svc_.scheduler().try_consume_inline_budget())
     149                 :         {
     150 MIS           0 :             auto* socket_svc = svc_.tcp_service();
     151               0 :             if (socket_svc)
     152                 :             {
     153                 :                 auto& impl =
     154               0 :                     static_cast<epoll_tcp_socket&>(*socket_svc->construct());
     155               0 :                 impl.set_socket(accepted);
     156                 : 
     157               0 :                 impl.desc_state_.fd = accepted;
     158                 :                 {
     159               0 :                     std::lock_guard lock(impl.desc_state_.mutex);
     160               0 :                     impl.desc_state_.read_op    = nullptr;
     161               0 :                     impl.desc_state_.write_op   = nullptr;
     162               0 :                     impl.desc_state_.connect_op = nullptr;
     163               0 :                 }
     164               0 :                 socket_svc->scheduler().register_descriptor(
     165                 :                     accepted, &impl.desc_state_);
     166                 : 
     167               0 :                 impl.set_endpoints(
     168                 :                     local_endpoint_, from_sockaddr(peer_storage));
     169                 : 
     170               0 :                 *ec = {};
     171               0 :                 if (impl_out)
     172               0 :                     *impl_out = &impl;
     173                 :             }
     174                 :             else
     175                 :             {
     176               0 :                 ::close(accepted);
     177               0 :                 *ec = make_err(ENOENT);
     178               0 :                 if (impl_out)
     179               0 :                     *impl_out = nullptr;
     180                 :             }
     181               0 :             op.cont_op.cont.h = h;
     182               0 :             return dispatch_coro(ex, op.cont_op.cont);
     183                 :         }
     184                 : 
     185 HIT           3 :         op.accepted_fd  = accepted;
     186               3 :         op.peer_storage = peer_storage;
     187               3 :         op.complete(0, 0);
     188               3 :         op.impl_ptr = shared_from_this();
     189               3 :         svc_.post(&op);
     190               3 :         return std::noop_coroutine();
     191                 :     }
     192                 : 
     193            2405 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     194                 :     {
     195            2405 :         op.impl_ptr = shared_from_this();
     196            2405 :         svc_.work_started();
     197                 : 
     198            2405 :         std::lock_guard lock(desc_state_.mutex);
     199            2405 :         bool io_done = false;
     200            2405 :         if (desc_state_.read_ready)
     201                 :         {
     202 MIS           0 :             desc_state_.read_ready = false;
     203               0 :             op.perform_io();
     204               0 :             io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     205               0 :             if (!io_done)
     206               0 :                 op.errn = 0;
     207                 :         }
     208                 : 
     209 HIT        2405 :         if (io_done || op.cancelled.load(std::memory_order_acquire))
     210                 :         {
     211 MIS           0 :             svc_.post(&op);
     212               0 :             svc_.work_finished();
     213                 :         }
     214                 :         else
     215                 :         {
     216 HIT        2405 :             desc_state_.read_op = &op;
     217                 :         }
     218            2405 :         return std::noop_coroutine();
     219            2405 :     }
     220                 : 
     221 MIS           0 :     op.complete(errno, 0);
     222               0 :     op.impl_ptr = shared_from_this();
     223               0 :     svc_.post(&op);
     224                 :     // completion is always posted to scheduler queue, never inline.
     225               0 :     return std::noop_coroutine();
     226                 : }
     227                 : 
     228                 : inline void
     229 HIT           2 : epoll_tcp_acceptor::cancel() noexcept
     230                 : {
     231               2 :     do_cancel();
     232               2 : }
     233                 : 
     234                 : inline void
     235             332 : epoll_tcp_acceptor::close_socket() noexcept
     236                 : {
     237             332 :     do_close_socket();
     238             332 : }
     239                 : 
     240             333 : inline epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(
     241             333 :     capy::execution_context& ctx, epoll_tcp_service& tcp_svc)
     242             333 :     : tcp_svc_(&tcp_svc)
     243             333 :     , state_(
     244                 :           std::make_unique<epoll_tcp_acceptor_state>(
     245             333 :               ctx.use_service<epoll_scheduler>()))
     246                 : {
     247             333 : }
     248                 : 
     249             666 : inline epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() {}
     250                 : 
     251                 : inline void
     252             333 : epoll_tcp_acceptor_service::shutdown()
     253                 : {
     254             333 :     std::lock_guard lock(state_->mutex_);
     255                 : 
     256             333 :     while (auto* impl = state_->impl_list_.pop_front())
     257 MIS           0 :         impl->close_socket();
     258                 : 
     259                 :     // Don't clear impl_ptrs_ here — same rationale as
     260                 :     // epoll_tcp_service::shutdown(). Let ~state_ release ptrs
     261                 :     // after scheduler shutdown has drained all queued ops.
     262 HIT         333 : }
     263                 : 
     264                 : inline io_object::implementation*
     265              84 : epoll_tcp_acceptor_service::construct()
     266                 : {
     267              84 :     auto impl = std::make_shared<epoll_tcp_acceptor>(*this);
     268              84 :     auto* raw = impl.get();
     269                 : 
     270              84 :     std::lock_guard lock(state_->mutex_);
     271              84 :     state_->impl_ptrs_.emplace(raw, std::move(impl));
     272              84 :     state_->impl_list_.push_back(raw);
     273                 : 
     274              84 :     return raw;
     275              84 : }
     276                 : 
     277                 : inline void
     278              84 : epoll_tcp_acceptor_service::destroy(io_object::implementation* impl)
     279                 : {
     280              84 :     auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(impl);
     281              84 :     epoll_impl->close_socket();
     282              84 :     std::lock_guard lock(state_->mutex_);
     283              84 :     state_->impl_list_.remove(epoll_impl);
     284              84 :     state_->impl_ptrs_.erase(epoll_impl);
     285              84 : }
     286                 : 
     287                 : inline void
     288             166 : epoll_tcp_acceptor_service::close(io_object::handle& h)
     289                 : {
     290             166 :     static_cast<epoll_tcp_acceptor*>(h.get())->close_socket();
     291             166 : }
     292                 : 
     293                 : inline std::error_code
     294              82 : epoll_tcp_acceptor_service::open_acceptor_socket(
     295                 :     tcp_acceptor::implementation& impl, int family, int type, int protocol)
     296                 : {
     297              82 :     auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(&impl);
     298              82 :     epoll_impl->close_socket();
     299                 : 
     300              82 :     int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
     301              82 :     if (fd < 0)
     302 MIS           0 :         return make_err(errno);
     303                 : 
     304 HIT          82 :     if (family == AF_INET6)
     305                 :     {
     306               8 :         int val = 0; // dual-stack default
     307               8 :         ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
     308                 :     }
     309                 : 
     310              82 :     epoll_impl->fd_ = fd;
     311                 : 
     312                 :     // Set up descriptor state but do NOT register with epoll yet
     313              82 :     epoll_impl->desc_state_.fd = fd;
     314                 :     {
     315              82 :         std::lock_guard lock(epoll_impl->desc_state_.mutex);
     316              82 :         epoll_impl->desc_state_.read_op = nullptr;
     317              82 :     }
     318                 : 
     319              82 :     return {};
     320                 : }
     321                 : 
     322                 : inline std::error_code
     323              81 : epoll_tcp_acceptor_service::bind_acceptor(
     324                 :     tcp_acceptor::implementation& impl, endpoint ep)
     325                 : {
     326              81 :     return static_cast<epoll_tcp_acceptor*>(&impl)->do_bind(ep);
     327                 : }
     328                 : 
     329                 : inline std::error_code
     330              76 : epoll_tcp_acceptor_service::listen_acceptor(
     331                 :     tcp_acceptor::implementation& impl, int backlog)
     332                 : {
     333              76 :     return static_cast<epoll_tcp_acceptor*>(&impl)->do_listen(backlog);
     334                 : }
     335                 : 
     336                 : inline void
     337              12 : epoll_tcp_acceptor_service::post(scheduler_op* op)
     338                 : {
     339              12 :     state_->sched_.post(op);
     340              12 : }
     341                 : 
     342                 : inline void
     343            2405 : epoll_tcp_acceptor_service::work_started() noexcept
     344                 : {
     345            2405 :     state_->sched_.work_started();
     346            2405 : }
     347                 : 
     348                 : inline void
     349               9 : epoll_tcp_acceptor_service::work_finished() noexcept
     350                 : {
     351               9 :     state_->sched_.work_finished();
     352               9 : }
     353                 : 
     354                 : inline epoll_tcp_service*
     355            2399 : epoll_tcp_acceptor_service::tcp_service() const noexcept
     356                 : {
     357            2399 :     return tcp_svc_;
     358                 : }
     359                 : 
     360                 : } // namespace boost::corosio::detail
     361                 : 
     362                 : #endif // BOOST_COROSIO_HAS_EPOLL
     363                 : 
     364                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
        

Generated by: LCOV version 2.3