TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_tcp_acceptor.hpp>
22 : #include <boost/corosio/native/detail/select/select_tcp_service.hpp>
23 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25 :
26 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27 :
28 : #include <memory>
29 : #include <mutex>
30 : #include <utility>
31 :
32 : #include <errno.h>
33 : #include <fcntl.h>
34 : #include <netinet/in.h>
35 : #include <sys/select.h>
36 : #include <sys/socket.h>
37 : #include <unistd.h>
38 :
39 : namespace boost::corosio::detail {
40 :
41 : /// State for select acceptor service.
42 : using select_tcp_acceptor_state =
43 : reactor_service_state<select_scheduler, select_tcp_acceptor>;
44 :
45 : /** select acceptor service implementation.
46 :
47 : Inherits from tcp_acceptor_service to enable runtime polymorphism.
48 : Uses key_type = tcp_acceptor_service for service lookup.
49 : */
50 : class BOOST_COROSIO_DECL select_tcp_acceptor_service final
51 : : public tcp_acceptor_service
52 : {
53 : public:
54 : explicit select_tcp_acceptor_service(
55 : capy::execution_context& ctx, select_tcp_service& tcp_svc);
56 : ~select_tcp_acceptor_service() override;
57 :
58 : select_tcp_acceptor_service(select_tcp_acceptor_service const&) = delete;
59 : select_tcp_acceptor_service&
60 : operator=(select_tcp_acceptor_service const&) = delete;
61 :
62 : void shutdown() override;
63 :
64 : io_object::implementation* construct() override;
65 : void destroy(io_object::implementation*) override;
66 : void close(io_object::handle&) override;
67 : std::error_code open_acceptor_socket(
68 : tcp_acceptor::implementation& impl,
69 : int family,
70 : int type,
71 : int protocol) override;
72 : std::error_code
73 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
74 : std::error_code
75 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
76 :
77 HIT 119 : select_scheduler& scheduler() const noexcept
78 : {
79 119 : return state_->sched_;
80 : }
81 : void post(scheduler_op* op);
82 : void work_started() noexcept;
83 : void work_finished() noexcept;
84 :
85 : /** Get the TCP service for creating peer sockets during accept. */
86 : select_tcp_service* tcp_service() const noexcept;
87 :
88 : private:
89 : select_tcp_service* tcp_svc_;
90 : std::unique_ptr<select_tcp_acceptor_state> state_;
91 : };
92 :
93 : inline void
94 MIS 0 : select_accept_op::cancel() noexcept
95 : {
96 0 : if (acceptor_impl_)
97 0 : acceptor_impl_->cancel_single_op(*this);
98 : else
99 0 : request_cancel();
100 0 : }
101 :
102 : inline void
103 HIT 1985 : select_accept_op::operator()()
104 : {
105 1985 : complete_accept_op<select_tcp_socket>(*this);
106 1985 : }
107 :
108 65 : inline select_tcp_acceptor::select_tcp_acceptor(
109 65 : select_tcp_acceptor_service& svc) noexcept
110 65 : : reactor_acceptor(svc)
111 : {
112 65 : }
113 :
114 : inline std::coroutine_handle<>
115 1985 : select_tcp_acceptor::accept(
116 : std::coroutine_handle<> h,
117 : capy::executor_ref ex,
118 : std::stop_token token,
119 : std::error_code* ec,
120 : io_object::implementation** impl_out)
121 : {
122 1985 : auto& op = acc_;
123 1985 : op.reset();
124 1985 : op.h = h;
125 1985 : op.ex = ex;
126 1985 : op.ec_out = ec;
127 1985 : op.impl_out = impl_out;
128 1985 : op.fd = fd_;
129 1985 : op.start(token, this);
130 :
131 1985 : sockaddr_storage peer_storage{};
132 1985 : socklen_t addrlen = sizeof(peer_storage);
133 : int accepted;
134 : do
135 : {
136 : accepted =
137 1985 : ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
138 : }
139 1985 : while (accepted < 0 && errno == EINTR);
140 :
141 1985 : if (accepted >= 0)
142 : {
143 3 : if (accepted >= FD_SETSIZE)
144 : {
145 MIS 0 : ::close(accepted);
146 0 : op.complete(EINVAL, 0);
147 0 : op.impl_ptr = shared_from_this();
148 0 : svc_.post(&op);
149 0 : return std::noop_coroutine();
150 : }
151 :
152 HIT 3 : int flags = ::fcntl(accepted, F_GETFL, 0);
153 3 : if (flags == -1)
154 : {
155 MIS 0 : int err = errno;
156 0 : ::close(accepted);
157 0 : op.complete(err, 0);
158 0 : op.impl_ptr = shared_from_this();
159 0 : svc_.post(&op);
160 0 : return std::noop_coroutine();
161 : }
162 :
163 HIT 3 : if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
164 : {
165 MIS 0 : int err = errno;
166 0 : ::close(accepted);
167 0 : op.complete(err, 0);
168 0 : op.impl_ptr = shared_from_this();
169 0 : svc_.post(&op);
170 0 : return std::noop_coroutine();
171 : }
172 :
173 HIT 3 : if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
174 : {
175 MIS 0 : int err = errno;
176 0 : ::close(accepted);
177 0 : op.complete(err, 0);
178 0 : op.impl_ptr = shared_from_this();
179 0 : svc_.post(&op);
180 0 : return std::noop_coroutine();
181 : }
182 :
183 : {
184 HIT 3 : std::lock_guard lock(desc_state_.mutex);
185 3 : desc_state_.read_ready = false;
186 3 : }
187 :
188 3 : if (svc_.scheduler().try_consume_inline_budget())
189 : {
190 MIS 0 : auto* socket_svc = svc_.tcp_service();
191 0 : if (socket_svc)
192 : {
193 : auto& impl =
194 0 : static_cast<select_tcp_socket&>(*socket_svc->construct());
195 0 : impl.set_socket(accepted);
196 :
197 0 : impl.desc_state_.fd = accepted;
198 : {
199 0 : std::lock_guard lock(impl.desc_state_.mutex);
200 0 : impl.desc_state_.read_op = nullptr;
201 0 : impl.desc_state_.write_op = nullptr;
202 0 : impl.desc_state_.connect_op = nullptr;
203 0 : }
204 0 : socket_svc->scheduler().register_descriptor(
205 : accepted, &impl.desc_state_);
206 :
207 0 : impl.set_endpoints(
208 : local_endpoint_, from_sockaddr(peer_storage));
209 :
210 0 : *ec = {};
211 0 : if (impl_out)
212 0 : *impl_out = &impl;
213 : }
214 : else
215 : {
216 0 : ::close(accepted);
217 0 : *ec = make_err(ENOENT);
218 0 : if (impl_out)
219 0 : *impl_out = nullptr;
220 : }
221 0 : op.cont_op.cont.h = h;
222 0 : return dispatch_coro(ex, op.cont_op.cont);
223 : }
224 :
225 HIT 3 : op.accepted_fd = accepted;
226 3 : op.peer_storage = peer_storage;
227 3 : op.complete(0, 0);
228 3 : op.impl_ptr = shared_from_this();
229 3 : svc_.post(&op);
230 3 : return std::noop_coroutine();
231 : }
232 :
233 1982 : if (errno == EAGAIN || errno == EWOULDBLOCK)
234 : {
235 1982 : op.impl_ptr = shared_from_this();
236 1982 : svc_.work_started();
237 :
238 1982 : std::lock_guard lock(desc_state_.mutex);
239 1982 : bool io_done = false;
240 1982 : if (desc_state_.read_ready)
241 : {
242 MIS 0 : desc_state_.read_ready = false;
243 0 : op.perform_io();
244 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
245 0 : if (!io_done)
246 0 : op.errn = 0;
247 : }
248 :
249 HIT 1982 : if (io_done || op.cancelled.load(std::memory_order_acquire))
250 : {
251 MIS 0 : svc_.post(&op);
252 0 : svc_.work_finished();
253 : }
254 : else
255 : {
256 HIT 1982 : desc_state_.read_op = &op;
257 : }
258 1982 : return std::noop_coroutine();
259 1982 : }
260 :
261 MIS 0 : op.complete(errno, 0);
262 0 : op.impl_ptr = shared_from_this();
263 0 : svc_.post(&op);
264 0 : return std::noop_coroutine();
265 : }
266 :
267 : inline void
268 HIT 2 : select_tcp_acceptor::cancel() noexcept
269 : {
270 2 : do_cancel();
271 2 : }
272 :
273 : inline void
274 254 : select_tcp_acceptor::close_socket() noexcept
275 : {
276 254 : do_close_socket();
277 254 : }
278 :
279 206 : inline select_tcp_acceptor_service::select_tcp_acceptor_service(
280 206 : capy::execution_context& ctx, select_tcp_service& tcp_svc)
281 206 : : tcp_svc_(&tcp_svc)
282 206 : , state_(
283 : std::make_unique<select_tcp_acceptor_state>(
284 206 : ctx.use_service<select_scheduler>()))
285 : {
286 206 : }
287 :
288 412 : inline select_tcp_acceptor_service::~select_tcp_acceptor_service() {}
289 :
290 : inline void
291 206 : select_tcp_acceptor_service::shutdown()
292 : {
293 206 : std::lock_guard lock(state_->mutex_);
294 :
295 206 : while (auto* impl = state_->impl_list_.pop_front())
296 MIS 0 : impl->close_socket();
297 :
298 : // Don't clear impl_ptrs_ here — same rationale as
299 : // select_tcp_service::shutdown(). Let ~state_ release ptrs
300 : // after scheduler shutdown has drained all queued ops.
301 HIT 206 : }
302 :
303 : inline io_object::implementation*
304 65 : select_tcp_acceptor_service::construct()
305 : {
306 65 : auto impl = std::make_shared<select_tcp_acceptor>(*this);
307 65 : auto* raw = impl.get();
308 :
309 65 : std::lock_guard lock(state_->mutex_);
310 65 : state_->impl_ptrs_.emplace(raw, std::move(impl));
311 65 : state_->impl_list_.push_back(raw);
312 :
313 65 : return raw;
314 65 : }
315 :
316 : inline void
317 65 : select_tcp_acceptor_service::destroy(io_object::implementation* impl)
318 : {
319 65 : auto* select_impl = static_cast<select_tcp_acceptor*>(impl);
320 65 : select_impl->close_socket();
321 65 : std::lock_guard lock(state_->mutex_);
322 65 : state_->impl_list_.remove(select_impl);
323 65 : state_->impl_ptrs_.erase(select_impl);
324 65 : }
325 :
326 : inline void
327 127 : select_tcp_acceptor_service::close(io_object::handle& h)
328 : {
329 127 : static_cast<select_tcp_acceptor*>(h.get())->close_socket();
330 127 : }
331 :
332 : inline std::error_code
333 62 : select_tcp_acceptor_service::open_acceptor_socket(
334 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
335 : {
336 62 : auto* select_impl = static_cast<select_tcp_acceptor*>(&impl);
337 62 : select_impl->close_socket();
338 :
339 62 : int fd = ::socket(family, type, protocol);
340 62 : if (fd < 0)
341 MIS 0 : return make_err(errno);
342 :
343 HIT 62 : int flags = ::fcntl(fd, F_GETFL, 0);
344 62 : if (flags == -1)
345 : {
346 MIS 0 : int errn = errno;
347 0 : ::close(fd);
348 0 : return make_err(errn);
349 : }
350 HIT 62 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
351 : {
352 MIS 0 : int errn = errno;
353 0 : ::close(fd);
354 0 : return make_err(errn);
355 : }
356 HIT 62 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
357 : {
358 MIS 0 : int errn = errno;
359 0 : ::close(fd);
360 0 : return make_err(errn);
361 : }
362 :
363 HIT 62 : if (fd >= FD_SETSIZE)
364 : {
365 MIS 0 : ::close(fd);
366 0 : return make_err(EMFILE);
367 : }
368 :
369 HIT 62 : if (family == AF_INET6)
370 : {
371 8 : int val = 0; // dual-stack default
372 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
373 : }
374 :
375 : #ifdef SO_NOSIGPIPE
376 : {
377 : int nosig = 1;
378 : ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
379 : }
380 : #endif
381 :
382 62 : select_impl->fd_ = fd;
383 :
384 : // Set up descriptor state but do NOT register with reactor yet
385 : // (registration happens in do_listen via reactor_acceptor base)
386 62 : select_impl->desc_state_.fd = fd;
387 : {
388 62 : std::lock_guard lock(select_impl->desc_state_.mutex);
389 62 : select_impl->desc_state_.read_op = nullptr;
390 62 : }
391 :
392 62 : return {};
393 : }
394 :
395 : inline std::error_code
396 61 : select_tcp_acceptor_service::bind_acceptor(
397 : tcp_acceptor::implementation& impl, endpoint ep)
398 : {
399 61 : return static_cast<select_tcp_acceptor*>(&impl)->do_bind(ep);
400 : }
401 :
402 : inline std::error_code
403 58 : select_tcp_acceptor_service::listen_acceptor(
404 : tcp_acceptor::implementation& impl, int backlog)
405 : {
406 58 : return static_cast<select_tcp_acceptor*>(&impl)->do_listen(backlog);
407 : }
408 :
409 : inline void
410 6 : select_tcp_acceptor_service::post(scheduler_op* op)
411 : {
412 6 : state_->sched_.post(op);
413 6 : }
414 :
415 : inline void
416 1982 : select_tcp_acceptor_service::work_started() noexcept
417 : {
418 1982 : state_->sched_.work_started();
419 1982 : }
420 :
421 : inline void
422 3 : select_tcp_acceptor_service::work_finished() noexcept
423 : {
424 3 : state_->sched_.work_finished();
425 3 : }
426 :
427 : inline select_tcp_service*
428 1982 : select_tcp_acceptor_service::tcp_service() const noexcept
429 : {
430 1982 : return tcp_svc_;
431 : }
432 :
433 : } // namespace boost::corosio::detail
434 :
435 : #endif // BOOST_COROSIO_HAS_SELECT
436 :
437 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
|