include/boost/corosio/native/detail/epoll/epoll_tcp_service.hpp

80.0% Lines (48/60) 86.7% List of functions (13/15)
epoll_tcp_service.hpp
f(x) Functions (15)
Function Calls Lines Blocks
boost::corosio::detail::epoll_tcp_service::epoll_tcp_service(boost::capy::execution_context&) :99 333x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::cancel() :115 0 0.0% 0.0% boost::corosio::detail::epoll_read_op::cancel() :124 98x 80.0% 75.0% boost::corosio::detail::epoll_write_op::cancel() :133 0 0.0% 0.0% boost::corosio::detail::epoll_op::operator()() :142 44510x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::operator()() :148 2402x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::epoll_tcp_socket(boost::corosio::detail::epoll_tcp_service&) :153 7267x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::~epoll_tcp_socket() :158 7267x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token, std::error_code*) :161 2402x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :172 111016x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :184 110864x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::cancel() :196 95x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::close_socket() :202 21777x 100.0% 100.0% boost::corosio::detail::epoll_tcp_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :208 2422x 94.4% 94.0% boost::corosio::detail::epoll_tcp_service::bind_socket(boost::corosio::tcp_socket::implementation&, boost::corosio::endpoint) :240 6x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/tcp_service.hpp>
19
20 #include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
21 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22 #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23
24 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25
26 #include <coroutine>
27
28 #include <errno.h>
29 #include <netinet/in.h>
30 #include <netinet/tcp.h>
31 #include <sys/epoll.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34
35 /*
36 epoll Socket Implementation
37 ===========================
38
39 Each I/O operation follows the same pattern:
40 1. Try the syscall immediately (non-blocking socket)
41 2. If it succeeds or fails with a real error, post to completion queue
42 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
43
44 This "try first" approach avoids unnecessary epoll round-trips for
45 operations that can complete immediately (common for small reads/writes
46 on fast local connections).
47
48 One-Shot Registration
49 ---------------------
50 We use one-shot epoll registration: each operation registers, waits for
51 one event, then unregisters. This simplifies the state machine since we
52 don't need to track whether an fd is currently registered or handle
53 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
54 simplicity is worth it.
55
56 Cancellation
57 ------------
58 See op.hpp for the completion/cancellation race handling via the
59 `registered` atomic. cancel() must complete pending operations (post
60 them with cancelled flag) so coroutines waiting on them can resume.
61 close_socket() calls cancel() first to ensure this.
62
63 Impl Lifetime with shared_ptr
64 -----------------------------
65 Socket impls use enable_shared_from_this. The service owns impls via
66 shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
67 removal. When a user calls close(), we call cancel() which posts pending
68 ops to the scheduler.
69
70 CRITICAL: The posted ops must keep the impl alive until they complete.
71 Otherwise the scheduler would process a freed op (use-after-free). The
72 cancel() method captures shared_from_this() into op.impl_ptr before
73 posting. When the op completes, impl_ptr is cleared, allowing the impl
74 to be destroyed if no other references exist.
75
76 Service Ownership
77 -----------------
78 epoll_tcp_service owns all socket impls. destroy_impl() removes the
79 shared_ptr from the map, but the impl may survive if ops still hold
80 impl_ptr refs. shutdown() closes all sockets and clears the map; any
81 in-flight ops will complete and release their refs.
82 */
83
84 namespace boost::corosio::detail {
85
86 /** epoll TCP service implementation.
87
88 Inherits from tcp_service to enable runtime polymorphism.
89 Uses key_type = tcp_service for service lookup.
90 */
// NOTE(review): the direct base written below is
// reactor_socket_service<epoll_tcp_service, tcp_service, epoll_scheduler,
// epoll_tcp_socket>; tcp_service is presumably reached through that
// template - confirm against reactor_socket_service.hpp.
91 class BOOST_COROSIO_DECL epoll_tcp_service final
92 : public reactor_socket_service<
93 epoll_tcp_service,
94 tcp_service,
95 epoll_scheduler,
96 epoll_tcp_socket>
97 {
98 public:
// Construct the service; the execution context is handed unchanged to the
// reactor_socket_service base. The constructor body itself does nothing.
99 333x explicit epoll_tcp_service(capy::execution_context& ctx)
100 333x : reactor_socket_service(ctx)
101 {
102 333x }
103
// Create a new native socket for `impl`.
// `family`, `type` and `protocol` are passed to ::socket() by the inline
// definition later in this header. Returns an error built from errno when
// ::socket() fails, otherwise a default-constructed std::error_code.
104 std::error_code open_socket(
105 tcp_socket::implementation& impl,
106 int family,
107 int type,
108 int protocol) override;
109
// Bind `impl` to endpoint `ep`. The inline definition later in this header
// forwards to epoll_tcp_socket::do_bind and returns its result.
110 std::error_code
111 bind_socket(tcp_socket::implementation& impl, endpoint ep) override;
112 };
113
/** Cancel this connect operation.

    When the op is attached to a socket impl (socket_impl_ is non-null) the
    request is routed through that impl's cancel_single_op(); otherwise the
    op falls back to its own request_cancel().
*/
114 inline void
115 epoll_connect_op::cancel() noexcept
116 {
117 if (socket_impl_)
118 socket_impl_->cancel_single_op(*this);
119 else
120 request_cancel();
121 }
122
/** Cancel this read operation.

    When the op is attached to a socket impl (socket_impl_ is non-null) the
    request is routed through that impl's cancel_single_op(); otherwise the
    op falls back to its own request_cancel().
*/
123 inline void
124 98x epoll_read_op::cancel() noexcept
125 {
126 98x if (socket_impl_)
127 98x socket_impl_->cancel_single_op(*this);
128 else
129 request_cancel();
130 98x }
131
/** Cancel this write operation.

    When the op is attached to a socket impl (socket_impl_ is non-null) the
    request is routed through that impl's cancel_single_op(); otherwise the
    op falls back to its own request_cancel().
*/
132 inline void
133 epoll_write_op::cancel() noexcept
134 {
135 if (socket_impl_)
136 socket_impl_->cancel_single_op(*this);
137 else
138 request_cancel();
139 }
140
/** Run the completion step for a generic epoll op.

    Forwards to complete_io_op(*this); all completion logic lives there
    (presumably declared in reactor_op_complete.hpp - confirm).
*/
141 inline void
142 44510x epoll_op::operator()()
143 {
144 44510x complete_io_op(*this);
145 44510x }
146
/** Run the completion step for a connect op.

    Forwards to complete_connect_op(*this) rather than the generic
    complete_io_op() used by epoll_op::operator()().
*/
147 inline void
148 2402x epoll_connect_op::operator()()
149 {
150 2402x complete_connect_op(*this);
151 2402x }
152
// Construct a socket impl for service `svc`; the service reference is passed
// to the reactor_stream_socket base and the body does no further work.
153 7267x inline epoll_tcp_socket::epoll_tcp_socket(epoll_tcp_service& svc) noexcept
154 7267x : reactor_stream_socket(svc)
155 {
156 7267x }
157
// Defaulted destructor, defined here (outside the class body) as an inline
// function.
158 7267x inline epoll_tcp_socket::~epoll_tcp_socket() = default;
159
/** Start a connect to endpoint `ep`.

    Thin forwarder: every argument is passed unchanged to do_connect() and
    its returned coroutine handle is returned as-is.

    NOTE(review): `h` is presumably the awaiting coroutine, `ex` the executor
    used to resume it, `token` the cancellation source and `ec` the location
    that receives the result - confirm against do_connect().
*/
160 inline std::coroutine_handle<>
161 2402x epoll_tcp_socket::connect(
162 std::coroutine_handle<> h,
163 capy::executor_ref ex,
164 endpoint ep,
165 std::stop_token token,
166 std::error_code* ec)
167 {
168 2402x return do_connect(h, ex, ep, token, ec);
169 }
170
/** Start a read into the buffers described by `param`.

    Thin forwarder: every argument is passed unchanged to do_read_some() and
    its returned coroutine handle is returned as-is.

    NOTE(review): `ec` and `bytes_out` are presumably output locations for
    the error and the transferred byte count - confirm against
    do_read_some().
*/
171 inline std::coroutine_handle<>
172 111016x epoll_tcp_socket::read_some(
173 std::coroutine_handle<> h,
174 capy::executor_ref ex,
175 buffer_param param,
176 std::stop_token token,
177 std::error_code* ec,
178 std::size_t* bytes_out)
179 {
180 111016x return do_read_some(h, ex, param, token, ec, bytes_out);
181 }
182
/** Start a write from the buffers described by `param`.

    Thin forwarder: every argument is passed unchanged to do_write_some()
    and its returned coroutine handle is returned as-is.

    NOTE(review): `ec` and `bytes_out` are presumably output locations for
    the error and the transferred byte count - confirm against
    do_write_some().
*/
183 inline std::coroutine_handle<>
184 110864x epoll_tcp_socket::write_some(
185 std::coroutine_handle<> h,
186 capy::executor_ref ex,
187 buffer_param param,
188 std::stop_token token,
189 std::error_code* ec,
190 std::size_t* bytes_out)
191 {
192 110864x return do_write_some(h, ex, param, token, ec, bytes_out);
193 }
194
// Cancel operations on this socket by forwarding to do_cancel(); this
// wrapper adds no logic of its own.
195 inline void
196 95x epoll_tcp_socket::cancel() noexcept
197 {
198 95x do_cancel();
199 95x }
200
// Close this socket by forwarding to do_close_socket(); this wrapper adds no
// logic of its own.
201 inline void
202 21777x epoll_tcp_socket::close_socket() noexcept
203 {
204 21777x do_close_socket();
205 21777x }
206
/** Open a fresh native socket for `impl` and register it with the scheduler.

    @param impl     Socket implementation to (re)open; it is downcast to
                    epoll_tcp_socket without a runtime check.
    @param family   Address family passed to ::socket() (AF_INET6 gets extra
                    handling, see below).
    @param type     Socket type passed to ::socket(); SOCK_NONBLOCK and
                    SOCK_CLOEXEC are OR'ed in here.
    @param protocol Protocol passed to ::socket().
    @return An error built from errno via make_err() when ::socket() fails,
            otherwise a default-constructed std::error_code.
*/
207 inline std::error_code
208 2422x epoll_tcp_service::open_socket(
209 tcp_socket::implementation& impl, int family, int type, int protocol)
210 {
// NOTE(review): unchecked static_cast - assumes every implementation handed
// to this service really is an epoll_tcp_socket.
211 2422x auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
// Close whatever the impl currently holds before creating the new fd.
212 2422x epoll_impl->close_socket();
213
// The extra flags make the descriptor non-blocking and close-on-exec at
// creation time, so no follow-up fcntl() call is needed.
214 2422x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
215 2422x if (fd < 0)
216 return make_err(errno);
217
218 2422x if (family == AF_INET6)
219 {
// Request an IPv6-only socket. The setsockopt() result is not inspected,
// so a failure here is silently ignored.
220 6x int one = 1;
221 6x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
222 }
223
224 2422x epoll_impl->fd_ = fd;
225
// NOTE(review): this says edge-triggered, while the comment at the top of
// the file describes one-shot registration - confirm which is current.
226 // Register fd with epoll (edge-triggered mode)
227 2422x epoll_impl->desc_state_.fd = fd;
228 {
// Reset the three per-descriptor op slots while holding the descriptor
// mutex; desc_state_.fd above is written outside the lock.
229 2422x std::lock_guard lock(epoll_impl->desc_state_.mutex);
230 2422x epoll_impl->desc_state_.read_op = nullptr;
231 2422x epoll_impl->desc_state_.write_op = nullptr;
232 2422x epoll_impl->desc_state_.connect_op = nullptr;
233 2422x }
// Hand the fd and a pointer to its descriptor state to the scheduler.
234 2422x scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
235
// Default-constructed error_code signals success.
236 2422x return {};
237 }
238
/** Bind `impl` to endpoint `ep`.

    Downcasts `impl` to epoll_tcp_socket (unchecked static_cast) and returns
    whatever its do_bind(ep) returns.
*/
239 inline std::error_code
240 6x epoll_tcp_service::bind_socket(
241 tcp_socket::implementation& impl, endpoint ep)
242 {
243 6x return static_cast<epoll_tcp_socket*>(&impl)->do_bind(ep);
244 }
245
246 } // namespace boost::corosio::detail
247
248 #endif // BOOST_COROSIO_HAS_EPOLL
249
250 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
251