1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/corosio/detail/tcp_service.hpp>
18  
#include <boost/corosio/detail/tcp_service.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
20  
#include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22  
#include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
22  
#include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23  

23  

24  
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
24  
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25  

25  

26  
#include <coroutine>
26  
#include <coroutine>
27  

27  

28  
#include <errno.h>
28  
#include <errno.h>
29  
#include <netinet/in.h>
29  
#include <netinet/in.h>
30  
#include <netinet/tcp.h>
30  
#include <netinet/tcp.h>
31  
#include <sys/epoll.h>
31  
#include <sys/epoll.h>
32  
#include <sys/socket.h>
32  
#include <sys/socket.h>
33  
#include <unistd.h>
33  
#include <unistd.h>
34  

34  

35  
/*
35  
/*
36  
    epoll Socket Implementation
36  
    epoll Socket Implementation
37  
    ===========================
37  
    ===========================
38  

38  

39  
    Each I/O operation follows the same pattern:
39  
    Each I/O operation follows the same pattern:
40  
      1. Try the syscall immediately (non-blocking socket)
40  
      1. Try the syscall immediately (non-blocking socket)
41  
      2. If it succeeds or fails with a real error, post to completion queue
41  
      2. If it succeeds or fails with a real error, post to completion queue
42  
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
42  
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
43  

43  

44  
    This "try first" approach avoids unnecessary epoll round-trips for
44  
    This "try first" approach avoids unnecessary epoll round-trips for
45  
    operations that can complete immediately (common for small reads/writes
45  
    operations that can complete immediately (common for small reads/writes
46  
    on fast local connections).
46  
    on fast local connections).
47  

47  

48  
    One-Shot Registration
48  
    One-Shot Registration
49  
    ---------------------
49  
    ---------------------
50  
    We use one-shot epoll registration: each operation registers, waits for
50  
    We use one-shot epoll registration: each operation registers, waits for
51  
    one event, then unregisters. This simplifies the state machine since we
51  
    one event, then unregisters. This simplifies the state machine since we
52  
    don't need to track whether an fd is currently registered or handle
52  
    don't need to track whether an fd is currently registered or handle
53  
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
53  
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
54  
    simplicity is worth it.
54  
    simplicity is worth it.
55  

55  

56  
    Cancellation
56  
    Cancellation
57  
    ------------
57  
    ------------
58  
    See op.hpp for the completion/cancellation race handling via the
58  
    See op.hpp for the completion/cancellation race handling via the
59  
    `registered` atomic. cancel() must complete pending operations (post
59  
    `registered` atomic. cancel() must complete pending operations (post
60  
    them with cancelled flag) so coroutines waiting on them can resume.
60  
    them with cancelled flag) so coroutines waiting on them can resume.
61  
    close_socket() calls cancel() first to ensure this.
61  
    close_socket() calls cancel() first to ensure this.
62  

62  

63  
    Impl Lifetime with shared_ptr
63  
    Impl Lifetime with shared_ptr
64  
    -----------------------------
64  
    -----------------------------
65  
    Socket impls use enable_shared_from_this. The service owns impls via
65  
    Socket impls use enable_shared_from_this. The service owns impls via
66  
    shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
66  
    shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
67  
    removal. When a user calls close(), we call cancel() which posts pending
67  
    removal. When a user calls close(), we call cancel() which posts pending
68  
    ops to the scheduler.
68  
    ops to the scheduler.
69  

69  

70  
    CRITICAL: The posted ops must keep the impl alive until they complete.
70  
    CRITICAL: The posted ops must keep the impl alive until they complete.
71  
    Otherwise the scheduler would process a freed op (use-after-free). The
71  
    Otherwise the scheduler would process a freed op (use-after-free). The
72  
    cancel() method captures shared_from_this() into op.impl_ptr before
72  
    cancel() method captures shared_from_this() into op.impl_ptr before
73  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
73  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
74  
    to be destroyed if no other references exist.
74  
    to be destroyed if no other references exist.
75  

75  

76  
    Service Ownership
76  
    Service Ownership
77  
    -----------------
77  
    -----------------
78  
    epoll_tcp_service owns all socket impls. destroy_impl() removes the
78  
    epoll_tcp_service owns all socket impls. destroy_impl() removes the
79  
    shared_ptr from the map, but the impl may survive if ops still hold
79  
    shared_ptr from the map, but the impl may survive if ops still hold
80  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
80  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
81  
    in-flight ops will complete and release their refs.
81  
    in-flight ops will complete and release their refs.
82  
*/
82  
*/
83  

83  

84  
namespace boost::corosio::detail {
84  
namespace boost::corosio::detail {
85  

85  

86  
/** epoll TCP service implementation.
86  
/** epoll TCP service implementation.
87  

87  

88  
    Inherits from tcp_service to enable runtime polymorphism.
88  
    Inherits from tcp_service to enable runtime polymorphism.
89  
    Uses key_type = tcp_service for service lookup.
89  
    Uses key_type = tcp_service for service lookup.
90  
*/
90  
*/
91  
class BOOST_COROSIO_DECL epoll_tcp_service final
91  
class BOOST_COROSIO_DECL epoll_tcp_service final
92  
    : public reactor_socket_service<
92  
    : public reactor_socket_service<
93  
          epoll_tcp_service,
93  
          epoll_tcp_service,
94  
          tcp_service,
94  
          tcp_service,
95  
          epoll_scheduler,
95  
          epoll_scheduler,
96  
          epoll_tcp_socket>
96  
          epoll_tcp_socket>
97  
{
97  
{
98  
public:
98  
public:
99  
    explicit epoll_tcp_service(capy::execution_context& ctx)
99  
    explicit epoll_tcp_service(capy::execution_context& ctx)
100  
        : reactor_socket_service(ctx)
100  
        : reactor_socket_service(ctx)
101  
    {
101  
    {
102  
    }
102  
    }
103  

103  

104  
    std::error_code open_socket(
104  
    std::error_code open_socket(
105  
        tcp_socket::implementation& impl,
105  
        tcp_socket::implementation& impl,
106  
        int family,
106  
        int family,
107  
        int type,
107  
        int type,
108  
        int protocol) override;
108  
        int protocol) override;
 
109 +

 
110 +
    std::error_code
 
111 +
    bind_socket(tcp_socket::implementation& impl, endpoint ep) override;
109  
};
112  
};
110  

113  

111  
inline void
114  
inline void
112  
epoll_connect_op::cancel() noexcept
115  
epoll_connect_op::cancel() noexcept
113  
{
116  
{
114  
    if (socket_impl_)
117  
    if (socket_impl_)
115  
        socket_impl_->cancel_single_op(*this);
118  
        socket_impl_->cancel_single_op(*this);
116  
    else
119  
    else
117  
        request_cancel();
120  
        request_cancel();
118  
}
121  
}
119  

122  

120  
inline void
123  
inline void
121  
epoll_read_op::cancel() noexcept
124  
epoll_read_op::cancel() noexcept
122  
{
125  
{
123  
    if (socket_impl_)
126  
    if (socket_impl_)
124  
        socket_impl_->cancel_single_op(*this);
127  
        socket_impl_->cancel_single_op(*this);
125  
    else
128  
    else
126  
        request_cancel();
129  
        request_cancel();
127  
}
130  
}
128  

131  

129  
inline void
132  
inline void
130  
epoll_write_op::cancel() noexcept
133  
epoll_write_op::cancel() noexcept
131  
{
134  
{
132  
    if (socket_impl_)
135  
    if (socket_impl_)
133  
        socket_impl_->cancel_single_op(*this);
136  
        socket_impl_->cancel_single_op(*this);
134  
    else
137  
    else
135  
        request_cancel();
138  
        request_cancel();
136  
}
139  
}
137  

140  

138  
inline void
141  
inline void
139  
epoll_op::operator()()
142  
epoll_op::operator()()
140  
{
143  
{
141  
    complete_io_op(*this);
144  
    complete_io_op(*this);
142  
}
145  
}
143  

146  

144  
inline void
147  
inline void
145  
epoll_connect_op::operator()()
148  
epoll_connect_op::operator()()
146  
{
149  
{
147  
    complete_connect_op(*this);
150  
    complete_connect_op(*this);
148  
}
151  
}
149  

152  

150  
// Construct a socket impl owned by `svc`; the service reference is
// passed straight to the reactor_stream_socket base.
inline epoll_tcp_socket::epoll_tcp_socket(epoll_tcp_service& svc) noexcept
    : reactor_stream_socket(svc)
{
}
154  

157  

155  
// Defaulted destructor, defined out of line in this header.
inline epoll_tcp_socket::~epoll_tcp_socket() = default;
156  

159  

157  
// Start a connect to `ep`. All arguments are forwarded unchanged to
// do_connect(), whose returned coroutine handle is passed back to the
// caller.
inline std::coroutine_handle<>
epoll_tcp_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    return do_connect(h, ex, ep, token, ec);
}
167  

170  

168  
// Start a read into the buffers described by `param`. All arguments
// are forwarded unchanged to do_read_some(), whose returned coroutine
// handle is passed back to the caller.
inline std::coroutine_handle<>
epoll_tcp_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    return do_read_some(h, ex, param, token, ec, bytes_out);
}
179  

182  

180  
// Start a write from the buffers described by `param`. All arguments
// are forwarded unchanged to do_write_some(), whose returned coroutine
// handle is passed back to the caller.
inline std::coroutine_handle<>
epoll_tcp_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    return do_write_some(h, ex, param, token, ec, bytes_out);
}
191  

194  

192  
inline void
195  
inline void
193  
epoll_tcp_socket::cancel() noexcept
196  
epoll_tcp_socket::cancel() noexcept
194  
{
197  
{
195  
    do_cancel();
198  
    do_cancel();
196  
}
199  
}
197  

200  

198  
inline void
201  
inline void
199  
epoll_tcp_socket::close_socket() noexcept
202  
epoll_tcp_socket::close_socket() noexcept
200  
{
203  
{
201  
    do_close_socket();
204  
    do_close_socket();
202  
}
205  
}
203  

206  

204  
inline std::error_code
207  
inline std::error_code
205  
epoll_tcp_service::open_socket(
208  
epoll_tcp_service::open_socket(
206  
    tcp_socket::implementation& impl, int family, int type, int protocol)
209  
    tcp_socket::implementation& impl, int family, int type, int protocol)
207  
{
210  
{
208  
    auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
211  
    auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
209  
    epoll_impl->close_socket();
212  
    epoll_impl->close_socket();
210  

213  

211  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
214  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
212  
    if (fd < 0)
215  
    if (fd < 0)
213  
        return make_err(errno);
216  
        return make_err(errno);
214  

217  

215  
    if (family == AF_INET6)
218  
    if (family == AF_INET6)
216  
    {
219  
    {
217  
        int one = 1;
220  
        int one = 1;
218  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
221  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
219  
    }
222  
    }
220  

223  

221  
    epoll_impl->fd_ = fd;
224  
    epoll_impl->fd_ = fd;
222  

225  

223  
    // Register fd with epoll (edge-triggered mode)
226  
    // Register fd with epoll (edge-triggered mode)
224  
    epoll_impl->desc_state_.fd = fd;
227  
    epoll_impl->desc_state_.fd = fd;
225  
    {
228  
    {
226  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
229  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
227  
        epoll_impl->desc_state_.read_op    = nullptr;
230  
        epoll_impl->desc_state_.read_op    = nullptr;
228  
        epoll_impl->desc_state_.write_op   = nullptr;
231  
        epoll_impl->desc_state_.write_op   = nullptr;
229  
        epoll_impl->desc_state_.connect_op = nullptr;
232  
        epoll_impl->desc_state_.connect_op = nullptr;
230  
    }
233  
    }
231  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
234  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
232  

235  

233  
    return {};
236  
    return {};
 
237 +
}
 
238 +

 
239 +
inline std::error_code
 
240 +
epoll_tcp_service::bind_socket(
 
241 +
    tcp_socket::implementation& impl, endpoint ep)
 
242 +
{
 
243 +
    return static_cast<epoll_tcp_socket*>(&impl)->do_bind(ep);
234  
}
244  
}
235  

245  

236  
} // namespace boost::corosio::detail
246  
} // namespace boost::corosio::detail
237  

247  

238  
#endif // BOOST_COROSIO_HAS_EPOLL
248  
#endif // BOOST_COROSIO_HAS_EPOLL
239  

249  

240  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
250  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP