TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/detail/tcp_service.hpp>
19 :
20 : #include <boost/corosio/native/detail/select/select_tcp_socket.hpp>
21 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
22 : #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23 :
24 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25 :
26 : #include <coroutine>
27 : #include <mutex>
28 :
29 : #include <errno.h>
30 : #include <fcntl.h>
31 : #include <netinet/in.h>
32 : #include <netinet/tcp.h>
33 : #include <sys/select.h>
34 : #include <sys/socket.h>
35 : #include <unistd.h>
36 :
37 : /*
38 : Each I/O op tries the syscall speculatively; only registers with
39 : the reactor on EAGAIN. Fd is registered once at open time and
40 : stays registered until close. The reactor only marks ready_events_;
41 : actual I/O happens in invoke_deferred_io(). cancel() captures
42 : shared_from_this() into op.impl_ptr to keep the impl alive.
43 : */
44 :
45 : namespace boost::corosio::detail {
46 :
47 : /** select TCP service implementation.
48 :
49 : Inherits from tcp_service to enable runtime polymorphism.
50 : Uses key_type = tcp_service for service lookup.
51 : */
52 : class BOOST_COROSIO_DECL select_tcp_service final
53 : : public reactor_socket_service<
54 : select_tcp_service,
55 : tcp_service,
56 : select_scheduler,
57 : select_tcp_socket>
58 : {
59 : public:
60 HIT 206 : explicit select_tcp_service(capy::execution_context& ctx)
61 206 : : reactor_socket_service(ctx)
62 : {
63 206 : }
64 :
65 : std::error_code open_socket(
66 : tcp_socket::implementation& impl,
67 : int family,
68 : int type,
69 : int protocol) override;
70 :
71 : std::error_code
72 : bind_socket(tcp_socket::implementation& impl, endpoint ep) override;
73 : };
74 :
75 : inline void
76 MIS 0 : select_connect_op::cancel() noexcept
77 : {
78 0 : if (socket_impl_)
79 0 : socket_impl_->cancel_single_op(*this);
80 : else
81 0 : request_cancel();
82 0 : }
83 :
84 : inline void
85 HIT 94 : select_read_op::cancel() noexcept
86 : {
87 94 : if (socket_impl_)
88 94 : socket_impl_->cancel_single_op(*this);
89 : else
90 MIS 0 : request_cancel();
91 HIT 94 : }
92 :
93 : inline void
94 MIS 0 : select_write_op::cancel() noexcept
95 : {
96 0 : if (socket_impl_)
97 0 : socket_impl_->cancel_single_op(*this);
98 : else
99 0 : request_cancel();
100 0 : }
101 :
102 : inline void
103 HIT 43753 : select_op::operator()()
104 : {
105 43753 : complete_io_op(*this);
106 43753 : }
107 :
108 : inline void
109 1984 : select_connect_op::operator()()
110 : {
111 1984 : complete_connect_op(*this);
112 1984 : }
113 :
114 5978 : inline select_tcp_socket::select_tcp_socket(select_tcp_service& svc) noexcept
115 5978 : : reactor_stream_socket(svc)
116 : {
117 5978 : }
118 :
119 5978 : inline select_tcp_socket::~select_tcp_socket() = default;
120 :
121 : inline std::coroutine_handle<>
122 1984 : select_tcp_socket::connect(
123 : std::coroutine_handle<> h,
124 : capy::executor_ref ex,
125 : endpoint ep,
126 : std::stop_token token,
127 : std::error_code* ec)
128 : {
129 1984 : auto result = do_connect(h, ex, ep, token, ec);
130 : // Rebuild fd_sets so select() watches for writability
131 1984 : if (result == std::noop_coroutine())
132 1984 : svc_.scheduler().notify_reactor();
133 1984 : return result;
134 : }
135 :
136 : inline std::coroutine_handle<>
137 109136 : select_tcp_socket::read_some(
138 : std::coroutine_handle<> h,
139 : capy::executor_ref ex,
140 : buffer_param param,
141 : std::stop_token token,
142 : std::error_code* ec,
143 : std::size_t* bytes_out)
144 : {
145 109136 : return do_read_some(h, ex, param, token, ec, bytes_out);
146 : }
147 :
148 : inline std::coroutine_handle<>
149 108990 : select_tcp_socket::write_some(
150 : std::coroutine_handle<> h,
151 : capy::executor_ref ex,
152 : buffer_param param,
153 : std::stop_token token,
154 : std::error_code* ec,
155 : std::size_t* bytes_out)
156 : {
157 108990 : auto result = do_write_some(h, ex, param, token, ec, bytes_out);
158 : // Rebuild fd_sets so select() watches for writability
159 108990 : if (result == std::noop_coroutine())
160 21791 : svc_.scheduler().notify_reactor();
161 108990 : return result;
162 : }
163 :
164 : inline void
165 90 : select_tcp_socket::cancel() noexcept
166 : {
167 90 : do_cancel();
168 90 : }
169 :
170 : inline void
171 17946 : select_tcp_socket::close_socket() noexcept
172 : {
173 17946 : do_close_socket();
174 17946 : }
175 :
176 : inline std::error_code
177 2004 : select_tcp_service::open_socket(
178 : tcp_socket::implementation& impl, int family, int type, int protocol)
179 : {
180 2004 : auto* select_impl = static_cast<select_tcp_socket*>(&impl);
181 2004 : select_impl->close_socket();
182 :
183 2004 : int fd = ::socket(family, type, protocol);
184 2004 : if (fd < 0)
185 MIS 0 : return make_err(errno);
186 :
187 HIT 2004 : if (family == AF_INET6)
188 : {
189 6 : int one = 1;
190 6 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
191 : }
192 :
193 2004 : int flags = ::fcntl(fd, F_GETFL, 0);
194 2004 : if (flags == -1)
195 : {
196 MIS 0 : int errn = errno;
197 0 : ::close(fd);
198 0 : return make_err(errn);
199 : }
200 HIT 2004 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
201 : {
202 MIS 0 : int errn = errno;
203 0 : ::close(fd);
204 0 : return make_err(errn);
205 : }
206 HIT 2004 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
207 : {
208 MIS 0 : int errn = errno;
209 0 : ::close(fd);
210 0 : return make_err(errn);
211 : }
212 :
213 HIT 2004 : if (fd >= FD_SETSIZE)
214 : {
215 MIS 0 : ::close(fd);
216 0 : return make_err(EMFILE);
217 : }
218 :
219 : #ifdef SO_NOSIGPIPE
220 : {
221 : int one = 1;
222 : ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
223 : }
224 : #endif
225 :
226 HIT 2004 : select_impl->fd_ = fd;
227 :
228 2004 : select_impl->desc_state_.fd = fd;
229 : {
230 2004 : std::lock_guard lock(select_impl->desc_state_.mutex);
231 2004 : select_impl->desc_state_.read_op = nullptr;
232 2004 : select_impl->desc_state_.write_op = nullptr;
233 2004 : select_impl->desc_state_.connect_op = nullptr;
234 2004 : }
235 2004 : scheduler().register_descriptor(fd, &select_impl->desc_state_);
236 :
237 2004 : return {};
238 : }
239 :
240 : inline std::error_code
241 6 : select_tcp_service::bind_socket(
242 : tcp_socket::implementation& impl, endpoint ep)
243 : {
244 6 : return static_cast<select_tcp_socket*>(&impl)->do_bind(ep);
245 : }
246 :
247 : } // namespace boost::corosio::detail
248 :
249 : #endif // BOOST_COROSIO_HAS_SELECT
250 :
251 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
|