include/boost/corosio/native/detail/select/select_tcp_acceptor_service.hpp

61.6% Lines (117/190) 95.0% List of functions (19/20)
select_tcp_acceptor_service.hpp
f(x) Functions (20)
Function Calls Lines Blocks
boost::corosio::detail::select_tcp_acceptor_service::scheduler() const :77 119x 100.0% 100.0% boost::corosio::detail::select_accept_op::cancel() :94 0 0.0% 0.0% boost::corosio::detail::select_accept_op::operator()() :103 1985x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor::select_tcp_acceptor(boost::corosio::detail::select_tcp_acceptor_service&) :108 65x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :115 1985x 41.5% 35.0% boost::corosio::detail::select_tcp_acceptor::cancel() :268 2x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor::close_socket() :274 254x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::select_tcp_acceptor_service(boost::capy::execution_context&, boost::corosio::detail::select_tcp_service&) :279 206x 100.0% 83.0% boost::corosio::detail::select_tcp_acceptor_service::~select_tcp_acceptor_service() :288 412x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::shutdown() :291 206x 80.0% 90.0% boost::corosio::detail::select_tcp_acceptor_service::construct() :304 65x 100.0% 78.0% boost::corosio::detail::select_tcp_acceptor_service::destroy(boost::corosio::io_object::implementation*) :317 65x 100.0% 83.0% boost::corosio::detail::select_tcp_acceptor_service::close(boost::corosio::io_object::handle&) :327 127x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::open_acceptor_socket(boost::corosio::tcp_acceptor::implementation&, int, int, int) :333 62x 61.3% 69.0% boost::corosio::detail::select_tcp_acceptor_service::bind_acceptor(boost::corosio::tcp_acceptor::implementation&, boost::corosio::endpoint) :396 61x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::listen_acceptor(boost::corosio::tcp_acceptor::implementation&, int) :403 58x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::post(boost::corosio::detail::scheduler_op*) :410 6x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::work_started() :416 1982x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::work_finished() :422 3x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::tcp_service() const :428 1982x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_tcp_acceptor.hpp>
22 #include <boost/corosio/native/detail/select/select_tcp_service.hpp>
23 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25
26 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27
28 #include <memory>
29 #include <mutex>
30 #include <utility>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <sys/select.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 namespace boost::corosio::detail {
40
41 /// State for select acceptor service.
42 using select_tcp_acceptor_state =
43 reactor_service_state<select_scheduler, select_tcp_acceptor>;
44
45 /** select acceptor service implementation.
46
47 Inherits from tcp_acceptor_service to enable runtime polymorphism.
48 Uses key_type = tcp_acceptor_service for service lookup.
49 */
50 class BOOST_COROSIO_DECL select_tcp_acceptor_service final
51 : public tcp_acceptor_service
52 {
53 public:
54 explicit select_tcp_acceptor_service(
55 capy::execution_context& ctx, select_tcp_service& tcp_svc);
56 ~select_tcp_acceptor_service() override;
57
58 select_tcp_acceptor_service(select_tcp_acceptor_service const&) = delete;
59 select_tcp_acceptor_service&
60 operator=(select_tcp_acceptor_service const&) = delete;
61
62 void shutdown() override;
63
64 io_object::implementation* construct() override;
65 void destroy(io_object::implementation*) override;
66 void close(io_object::handle&) override;
67 std::error_code open_acceptor_socket(
68 tcp_acceptor::implementation& impl,
69 int family,
70 int type,
71 int protocol) override;
72 std::error_code
73 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
74 std::error_code
75 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
76
77 119x select_scheduler& scheduler() const noexcept
78 {
79 119x return state_->sched_;
80 }
81 void post(scheduler_op* op);
82 void work_started() noexcept;
83 void work_finished() noexcept;
84
85 /** Get the TCP service for creating peer sockets during accept. */
86 select_tcp_service* tcp_service() const noexcept;
87
88 private:
89 select_tcp_service* tcp_svc_;
90 std::unique_ptr<select_tcp_acceptor_state> state_;
91 };
92
93 inline void
94 select_accept_op::cancel() noexcept
95 {
96 if (acceptor_impl_)
97 acceptor_impl_->cancel_single_op(*this);
98 else
99 request_cancel();
100 }
101
102 inline void
103 1985x select_accept_op::operator()()
104 {
105 1985x complete_accept_op<select_tcp_socket>(*this);
106 1985x }
107
108 65x inline select_tcp_acceptor::select_tcp_acceptor(
109 65x select_tcp_acceptor_service& svc) noexcept
110 65x : reactor_acceptor(svc)
111 {
112 65x }
113
114 inline std::coroutine_handle<>
115 1985x select_tcp_acceptor::accept(
116 std::coroutine_handle<> h,
117 capy::executor_ref ex,
118 std::stop_token token,
119 std::error_code* ec,
120 io_object::implementation** impl_out)
121 {
122 1985x auto& op = acc_;
123 1985x op.reset();
124 1985x op.h = h;
125 1985x op.ex = ex;
126 1985x op.ec_out = ec;
127 1985x op.impl_out = impl_out;
128 1985x op.fd = fd_;
129 1985x op.start(token, this);
130
131 1985x sockaddr_storage peer_storage{};
132 1985x socklen_t addrlen = sizeof(peer_storage);
133 int accepted;
134 do
135 {
136 accepted =
137 1985x ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
138 }
139 1985x while (accepted < 0 && errno == EINTR);
140
141 1985x if (accepted >= 0)
142 {
143 3x if (accepted >= FD_SETSIZE)
144 {
145 ::close(accepted);
146 op.complete(EINVAL, 0);
147 op.impl_ptr = shared_from_this();
148 svc_.post(&op);
149 return std::noop_coroutine();
150 }
151
152 3x int flags = ::fcntl(accepted, F_GETFL, 0);
153 3x if (flags == -1)
154 {
155 int err = errno;
156 ::close(accepted);
157 op.complete(err, 0);
158 op.impl_ptr = shared_from_this();
159 svc_.post(&op);
160 return std::noop_coroutine();
161 }
162
163 3x if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
164 {
165 int err = errno;
166 ::close(accepted);
167 op.complete(err, 0);
168 op.impl_ptr = shared_from_this();
169 svc_.post(&op);
170 return std::noop_coroutine();
171 }
172
173 3x if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
174 {
175 int err = errno;
176 ::close(accepted);
177 op.complete(err, 0);
178 op.impl_ptr = shared_from_this();
179 svc_.post(&op);
180 return std::noop_coroutine();
181 }
182
183 {
184 3x std::lock_guard lock(desc_state_.mutex);
185 3x desc_state_.read_ready = false;
186 3x }
187
188 3x if (svc_.scheduler().try_consume_inline_budget())
189 {
190 auto* socket_svc = svc_.tcp_service();
191 if (socket_svc)
192 {
193 auto& impl =
194 static_cast<select_tcp_socket&>(*socket_svc->construct());
195 impl.set_socket(accepted);
196
197 impl.desc_state_.fd = accepted;
198 {
199 std::lock_guard lock(impl.desc_state_.mutex);
200 impl.desc_state_.read_op = nullptr;
201 impl.desc_state_.write_op = nullptr;
202 impl.desc_state_.connect_op = nullptr;
203 }
204 socket_svc->scheduler().register_descriptor(
205 accepted, &impl.desc_state_);
206
207 impl.set_endpoints(
208 local_endpoint_, from_sockaddr(peer_storage));
209
210 *ec = {};
211 if (impl_out)
212 *impl_out = &impl;
213 }
214 else
215 {
216 ::close(accepted);
217 *ec = make_err(ENOENT);
218 if (impl_out)
219 *impl_out = nullptr;
220 }
221 op.cont_op.cont.h = h;
222 return dispatch_coro(ex, op.cont_op.cont);
223 }
224
225 3x op.accepted_fd = accepted;
226 3x op.peer_storage = peer_storage;
227 3x op.complete(0, 0);
228 3x op.impl_ptr = shared_from_this();
229 3x svc_.post(&op);
230 3x return std::noop_coroutine();
231 }
232
233 1982x if (errno == EAGAIN || errno == EWOULDBLOCK)
234 {
235 1982x op.impl_ptr = shared_from_this();
236 1982x svc_.work_started();
237
238 1982x std::lock_guard lock(desc_state_.mutex);
239 1982x bool io_done = false;
240 1982x if (desc_state_.read_ready)
241 {
242 desc_state_.read_ready = false;
243 op.perform_io();
244 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
245 if (!io_done)
246 op.errn = 0;
247 }
248
249 1982x if (io_done || op.cancelled.load(std::memory_order_acquire))
250 {
251 svc_.post(&op);
252 svc_.work_finished();
253 }
254 else
255 {
256 1982x desc_state_.read_op = &op;
257 }
258 1982x return std::noop_coroutine();
259 1982x }
260
261 op.complete(errno, 0);
262 op.impl_ptr = shared_from_this();
263 svc_.post(&op);
264 return std::noop_coroutine();
265 }
266
267 inline void
268 2x select_tcp_acceptor::cancel() noexcept
269 {
270 2x do_cancel();
271 2x }
272
273 inline void
274 254x select_tcp_acceptor::close_socket() noexcept
275 {
276 254x do_close_socket();
277 254x }
278
279 206x inline select_tcp_acceptor_service::select_tcp_acceptor_service(
280 206x capy::execution_context& ctx, select_tcp_service& tcp_svc)
281 206x : tcp_svc_(&tcp_svc)
282 206x , state_(
283 std::make_unique<select_tcp_acceptor_state>(
284 206x ctx.use_service<select_scheduler>()))
285 {
286 206x }
287
288 412x inline select_tcp_acceptor_service::~select_tcp_acceptor_service() {}
289
290 inline void
291 206x select_tcp_acceptor_service::shutdown()
292 {
293 206x std::lock_guard lock(state_->mutex_);
294
295 206x while (auto* impl = state_->impl_list_.pop_front())
296 impl->close_socket();
297
298 // Don't clear impl_ptrs_ here — same rationale as
299 // select_tcp_service::shutdown(). Let ~state_ release ptrs
300 // after scheduler shutdown has drained all queued ops.
301 206x }
302
303 inline io_object::implementation*
304 65x select_tcp_acceptor_service::construct()
305 {
306 65x auto impl = std::make_shared<select_tcp_acceptor>(*this);
307 65x auto* raw = impl.get();
308
309 65x std::lock_guard lock(state_->mutex_);
310 65x state_->impl_ptrs_.emplace(raw, std::move(impl));
311 65x state_->impl_list_.push_back(raw);
312
313 65x return raw;
314 65x }
315
316 inline void
317 65x select_tcp_acceptor_service::destroy(io_object::implementation* impl)
318 {
319 65x auto* select_impl = static_cast<select_tcp_acceptor*>(impl);
320 65x select_impl->close_socket();
321 65x std::lock_guard lock(state_->mutex_);
322 65x state_->impl_list_.remove(select_impl);
323 65x state_->impl_ptrs_.erase(select_impl);
324 65x }
325
326 inline void
327 127x select_tcp_acceptor_service::close(io_object::handle& h)
328 {
329 127x static_cast<select_tcp_acceptor*>(h.get())->close_socket();
330 127x }
331
332 inline std::error_code
333 62x select_tcp_acceptor_service::open_acceptor_socket(
334 tcp_acceptor::implementation& impl, int family, int type, int protocol)
335 {
336 62x auto* select_impl = static_cast<select_tcp_acceptor*>(&impl);
337 62x select_impl->close_socket();
338
339 62x int fd = ::socket(family, type, protocol);
340 62x if (fd < 0)
341 return make_err(errno);
342
343 62x int flags = ::fcntl(fd, F_GETFL, 0);
344 62x if (flags == -1)
345 {
346 int errn = errno;
347 ::close(fd);
348 return make_err(errn);
349 }
350 62x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
351 {
352 int errn = errno;
353 ::close(fd);
354 return make_err(errn);
355 }
356 62x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
357 {
358 int errn = errno;
359 ::close(fd);
360 return make_err(errn);
361 }
362
363 62x if (fd >= FD_SETSIZE)
364 {
365 ::close(fd);
366 return make_err(EMFILE);
367 }
368
369 62x if (family == AF_INET6)
370 {
371 8x int val = 0; // dual-stack default
372 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
373 }
374
375 #ifdef SO_NOSIGPIPE
376 {
377 int nosig = 1;
378 ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
379 }
380 #endif
381
382 62x select_impl->fd_ = fd;
383
384 // Set up descriptor state but do NOT register with reactor yet
385 // (registration happens in do_listen via reactor_acceptor base)
386 62x select_impl->desc_state_.fd = fd;
387 {
388 62x std::lock_guard lock(select_impl->desc_state_.mutex);
389 62x select_impl->desc_state_.read_op = nullptr;
390 62x }
391
392 62x return {};
393 }
394
395 inline std::error_code
396 61x select_tcp_acceptor_service::bind_acceptor(
397 tcp_acceptor::implementation& impl, endpoint ep)
398 {
399 61x return static_cast<select_tcp_acceptor*>(&impl)->do_bind(ep);
400 }
401
402 inline std::error_code
403 58x select_tcp_acceptor_service::listen_acceptor(
404 tcp_acceptor::implementation& impl, int backlog)
405 {
406 58x return static_cast<select_tcp_acceptor*>(&impl)->do_listen(backlog);
407 }
408
409 inline void
410 6x select_tcp_acceptor_service::post(scheduler_op* op)
411 {
412 6x state_->sched_.post(op);
413 6x }
414
415 inline void
416 1982x select_tcp_acceptor_service::work_started() noexcept
417 {
418 1982x state_->sched_.work_started();
419 1982x }
420
421 inline void
422 3x select_tcp_acceptor_service::work_finished() noexcept
423 {
424 3x state_->sched_.work_finished();
425 3x }
426
427 inline select_tcp_service*
428 1982x select_tcp_acceptor_service::tcp_service() const noexcept
429 {
430 1982x return tcp_svc_;
431 }
432
433 } // namespace boost::corosio::detail
434
435 #endif // BOOST_COROSIO_HAS_SELECT
436
437 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
438