TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_tcp_acceptor.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_tcp_service.hpp>
23 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25 :
26 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27 :
28 : #include <memory>
29 : #include <mutex>
30 : #include <utility>
31 :
32 : #include <errno.h>
33 : #include <netinet/in.h>
34 : #include <sys/epoll.h>
35 : #include <sys/socket.h>
36 : #include <unistd.h>
37 :
38 : namespace boost::corosio::detail {
39 :
40 : /// State for epoll acceptor service.
41 : using epoll_tcp_acceptor_state =
42 : reactor_service_state<epoll_scheduler, epoll_tcp_acceptor>;
43 :
44 : /** epoll acceptor service implementation.
45 :
46 : Inherits from tcp_acceptor_service to enable runtime polymorphism.
47 : Uses key_type = tcp_acceptor_service for service lookup.
48 : */
49 : class BOOST_COROSIO_DECL epoll_tcp_acceptor_service final
50 : : public tcp_acceptor_service
51 : {
52 : public:
53 : explicit epoll_tcp_acceptor_service(
54 : capy::execution_context& ctx, epoll_tcp_service& tcp_svc);
55 : ~epoll_tcp_acceptor_service() override;
56 :
57 : epoll_tcp_acceptor_service(epoll_tcp_acceptor_service const&) = delete;
58 : epoll_tcp_acceptor_service&
59 : operator=(epoll_tcp_acceptor_service const&) = delete;
60 :
61 : void shutdown() override;
62 :
63 : io_object::implementation* construct() override;
64 : void destroy(io_object::implementation*) override;
65 : void close(io_object::handle&) override;
66 : std::error_code open_acceptor_socket(
67 : tcp_acceptor::implementation& impl,
68 : int family,
69 : int type,
70 : int protocol) override;
71 : std::error_code
72 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
73 : std::error_code
74 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
75 :
76 HIT 155 : epoll_scheduler& scheduler() const noexcept
77 : {
78 155 : return state_->sched_;
79 : }
80 : void post(scheduler_op* op);
81 : void work_started() noexcept;
82 : void work_finished() noexcept;
83 :
84 : /** Get the TCP service for creating peer sockets during accept. */
85 : epoll_tcp_service* tcp_service() const noexcept;
86 :
87 : private:
88 : epoll_tcp_service* tcp_svc_;
89 : std::unique_ptr<epoll_tcp_acceptor_state> state_;
90 : };
91 :
92 : inline void
93 6 : epoll_accept_op::cancel() noexcept
94 : {
95 6 : if (acceptor_impl_)
96 6 : acceptor_impl_->cancel_single_op(*this);
97 : else
98 MIS 0 : request_cancel();
99 HIT 6 : }
100 :
101 : inline void
102 2408 : epoll_accept_op::operator()()
103 : {
104 2408 : complete_accept_op<epoll_tcp_socket>(*this);
105 2408 : }
106 :
107 84 : inline epoll_tcp_acceptor::epoll_tcp_acceptor(
108 84 : epoll_tcp_acceptor_service& svc) noexcept
109 84 : : reactor_acceptor(svc)
110 : {
111 84 : }
112 :
113 : inline std::coroutine_handle<>
114 2408 : epoll_tcp_acceptor::accept(
115 : std::coroutine_handle<> h,
116 : capy::executor_ref ex,
117 : std::stop_token token,
118 : std::error_code* ec,
119 : io_object::implementation** impl_out)
120 : {
121 2408 : auto& op = acc_;
122 2408 : op.reset();
123 2408 : op.h = h;
124 2408 : op.ex = ex;
125 2408 : op.ec_out = ec;
126 2408 : op.impl_out = impl_out;
127 2408 : op.fd = fd_;
128 2408 : op.start(token, this);
129 :
130 2408 : sockaddr_storage peer_storage{};
131 2408 : socklen_t addrlen = sizeof(peer_storage);
132 : int accepted;
133 : do
134 : {
135 2408 : accepted = ::accept4(
136 : fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
137 : SOCK_NONBLOCK | SOCK_CLOEXEC);
138 : }
139 2408 : while (accepted < 0 && errno == EINTR);
140 :
141 2408 : if (accepted >= 0)
142 : {
143 : {
144 3 : std::lock_guard lock(desc_state_.mutex);
145 3 : desc_state_.read_ready = false;
146 3 : }
147 :
148 3 : if (svc_.scheduler().try_consume_inline_budget())
149 : {
150 MIS 0 : auto* socket_svc = svc_.tcp_service();
151 0 : if (socket_svc)
152 : {
153 : auto& impl =
154 0 : static_cast<epoll_tcp_socket&>(*socket_svc->construct());
155 0 : impl.set_socket(accepted);
156 :
157 0 : impl.desc_state_.fd = accepted;
158 : {
159 0 : std::lock_guard lock(impl.desc_state_.mutex);
160 0 : impl.desc_state_.read_op = nullptr;
161 0 : impl.desc_state_.write_op = nullptr;
162 0 : impl.desc_state_.connect_op = nullptr;
163 0 : }
164 0 : socket_svc->scheduler().register_descriptor(
165 : accepted, &impl.desc_state_);
166 :
167 0 : impl.set_endpoints(
168 : local_endpoint_, from_sockaddr(peer_storage));
169 :
170 0 : *ec = {};
171 0 : if (impl_out)
172 0 : *impl_out = &impl;
173 : }
174 : else
175 : {
176 0 : ::close(accepted);
177 0 : *ec = make_err(ENOENT);
178 0 : if (impl_out)
179 0 : *impl_out = nullptr;
180 : }
181 0 : op.cont_op.cont.h = h;
182 0 : return dispatch_coro(ex, op.cont_op.cont);
183 : }
184 :
185 HIT 3 : op.accepted_fd = accepted;
186 3 : op.peer_storage = peer_storage;
187 3 : op.complete(0, 0);
188 3 : op.impl_ptr = shared_from_this();
189 3 : svc_.post(&op);
190 3 : return std::noop_coroutine();
191 : }
192 :
193 2405 : if (errno == EAGAIN || errno == EWOULDBLOCK)
194 : {
195 2405 : op.impl_ptr = shared_from_this();
196 2405 : svc_.work_started();
197 :
198 2405 : std::lock_guard lock(desc_state_.mutex);
199 2405 : bool io_done = false;
200 2405 : if (desc_state_.read_ready)
201 : {
202 MIS 0 : desc_state_.read_ready = false;
203 0 : op.perform_io();
204 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
205 0 : if (!io_done)
206 0 : op.errn = 0;
207 : }
208 :
209 HIT 2405 : if (io_done || op.cancelled.load(std::memory_order_acquire))
210 : {
211 MIS 0 : svc_.post(&op);
212 0 : svc_.work_finished();
213 : }
214 : else
215 : {
216 HIT 2405 : desc_state_.read_op = &op;
217 : }
218 2405 : return std::noop_coroutine();
219 2405 : }
220 :
221 MIS 0 : op.complete(errno, 0);
222 0 : op.impl_ptr = shared_from_this();
223 0 : svc_.post(&op);
224 : // completion is always posted to scheduler queue, never inline.
225 0 : return std::noop_coroutine();
226 : }
227 :
228 : inline void
229 HIT 2 : epoll_tcp_acceptor::cancel() noexcept
230 : {
231 2 : do_cancel();
232 2 : }
233 :
234 : inline void
235 332 : epoll_tcp_acceptor::close_socket() noexcept
236 : {
237 332 : do_close_socket();
238 332 : }
239 :
240 333 : inline epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(
241 333 : capy::execution_context& ctx, epoll_tcp_service& tcp_svc)
242 333 : : tcp_svc_(&tcp_svc)
243 333 : , state_(
244 : std::make_unique<epoll_tcp_acceptor_state>(
245 333 : ctx.use_service<epoll_scheduler>()))
246 : {
247 333 : }
248 :
249 666 : inline epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() {}
250 :
251 : inline void
252 333 : epoll_tcp_acceptor_service::shutdown()
253 : {
254 333 : std::lock_guard lock(state_->mutex_);
255 :
256 333 : while (auto* impl = state_->impl_list_.pop_front())
257 MIS 0 : impl->close_socket();
258 :
259 : // Don't clear impl_ptrs_ here — same rationale as
260 : // epoll_tcp_service::shutdown(). Let ~state_ release ptrs
261 : // after scheduler shutdown has drained all queued ops.
262 HIT 333 : }
263 :
264 : inline io_object::implementation*
265 84 : epoll_tcp_acceptor_service::construct()
266 : {
267 84 : auto impl = std::make_shared<epoll_tcp_acceptor>(*this);
268 84 : auto* raw = impl.get();
269 :
270 84 : std::lock_guard lock(state_->mutex_);
271 84 : state_->impl_ptrs_.emplace(raw, std::move(impl));
272 84 : state_->impl_list_.push_back(raw);
273 :
274 84 : return raw;
275 84 : }
276 :
277 : inline void
278 84 : epoll_tcp_acceptor_service::destroy(io_object::implementation* impl)
279 : {
280 84 : auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(impl);
281 84 : epoll_impl->close_socket();
282 84 : std::lock_guard lock(state_->mutex_);
283 84 : state_->impl_list_.remove(epoll_impl);
284 84 : state_->impl_ptrs_.erase(epoll_impl);
285 84 : }
286 :
287 : inline void
288 166 : epoll_tcp_acceptor_service::close(io_object::handle& h)
289 : {
290 166 : static_cast<epoll_tcp_acceptor*>(h.get())->close_socket();
291 166 : }
292 :
293 : inline std::error_code
294 82 : epoll_tcp_acceptor_service::open_acceptor_socket(
295 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
296 : {
297 82 : auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(&impl);
298 82 : epoll_impl->close_socket();
299 :
300 82 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
301 82 : if (fd < 0)
302 MIS 0 : return make_err(errno);
303 :
304 HIT 82 : if (family == AF_INET6)
305 : {
306 8 : int val = 0; // dual-stack default
307 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
308 : }
309 :
310 82 : epoll_impl->fd_ = fd;
311 :
312 : // Set up descriptor state but do NOT register with epoll yet
313 82 : epoll_impl->desc_state_.fd = fd;
314 : {
315 82 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
316 82 : epoll_impl->desc_state_.read_op = nullptr;
317 82 : }
318 :
319 82 : return {};
320 : }
321 :
322 : inline std::error_code
323 81 : epoll_tcp_acceptor_service::bind_acceptor(
324 : tcp_acceptor::implementation& impl, endpoint ep)
325 : {
326 81 : return static_cast<epoll_tcp_acceptor*>(&impl)->do_bind(ep);
327 : }
328 :
329 : inline std::error_code
330 76 : epoll_tcp_acceptor_service::listen_acceptor(
331 : tcp_acceptor::implementation& impl, int backlog)
332 : {
333 76 : return static_cast<epoll_tcp_acceptor*>(&impl)->do_listen(backlog);
334 : }
335 :
336 : inline void
337 12 : epoll_tcp_acceptor_service::post(scheduler_op* op)
338 : {
339 12 : state_->sched_.post(op);
340 12 : }
341 :
342 : inline void
343 2405 : epoll_tcp_acceptor_service::work_started() noexcept
344 : {
345 2405 : state_->sched_.work_started();
346 2405 : }
347 :
348 : inline void
349 9 : epoll_tcp_acceptor_service::work_finished() noexcept
350 : {
351 9 : state_->sched_.work_finished();
352 9 : }
353 :
354 : inline epoll_tcp_service*
355 2399 : epoll_tcp_acceptor_service::tcp_service() const noexcept
356 : {
357 2399 : return tcp_svc_;
358 : }
359 :
360 : } // namespace boost::corosio::detail
361 :
362 : #endif // BOOST_COROSIO_HAS_EPOLL
363 :
364 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
|