include/boost/corosio/native/detail/epoll/epoll_op.hpp

83.2% Lines (84/101) 85.0% List of functions (17/20)
f(x) Functions (20)
Function Calls Lines Branches Blocks
boost::corosio::detail::descriptor_state::add_ready_events(unsigned int) :146 0 100.0% boost::corosio::detail::descriptor_state::destroy() :157 0 100.0% boost::corosio::detail::epoll_op::epoll_op() :192 0 100.0% boost::corosio::detail::epoll_op::reset() :194 0 100.0% boost::corosio::detail::epoll_op::is_read_operation() const :208 0 100.0% boost::corosio::detail::epoll_op::destroy() :214 0 0.0% boost::corosio::detail::epoll_op::request_cancel() :220 0 100.0% boost::corosio::detail::epoll_op::start(std::stop_token const&, boost::corosio::detail::epoll_socket*) :225 0 100.0% boost::corosio::detail::epoll_op::start(std::stop_token const&, boost::corosio::detail::epoll_acceptor*) :236 0 100.0% boost::corosio::detail::epoll_op::complete(int, unsigned long) :247 0 100.0% boost::corosio::detail::epoll_op::perform_io() :253 0 0.0% boost::corosio::detail::epoll_connect_op::reset() :260 0 100.0% boost::corosio::detail::epoll_connect_op::perform_io() :266 0 85.7% boost::corosio::detail::epoll_read_op::is_read_operation() const :288 0 100.0% boost::corosio::detail::epoll_read_op::reset() :293 0 100.0% boost::corosio::detail::epoll_read_op::perform_io() :300 0 100.0% boost::corosio::detail::epoll_write_op::reset() :324 0 100.0% boost::corosio::detail::epoll_write_op::perform_io() :330 0 0.0% boost::corosio::detail::epoll_accept_op::reset() :358 0 100.0% boost::corosio::detail::epoll_accept_op::perform_io() :366 0 90.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <coroutine>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/scheduler_op.hpp>
28 #include <boost/corosio/native/detail/endpoint_convert.hpp>
29
30 #include <unistd.h>
31 #include <errno.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <mutex>
37 #include <optional>
38 #include <stop_token>
39
40 #include <netinet/in.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 epoll Operation State
46 =====================
47
48 Each async I/O operation has a corresponding epoll_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
52
53 Persistent Registration
54 -----------------------
55 File descriptors are registered with epoll once (via descriptor_state) and
56 stay registered until closed. The descriptor_state tracks which operations
57 are pending (read_op, write_op, connect_op). When an event arrives, the
58 reactor dispatches to the appropriate pending operation.
59
60 Impl Lifetime Management
61 ------------------------
62 When cancel() posts an op to the scheduler's ready queue, the socket impl
63 might be destroyed before the scheduler processes the op. The `impl_ptr`
64 member holds a shared_ptr to the impl, keeping it alive until the op
65 completes. This is set by cancel() and cleared in operator() after the
66 coroutine is resumed.
67
68 EOF Detection
69 -------------
70 For reads, 0 bytes with no error means EOF. But an empty user buffer also
71 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
72
73 SIGPIPE Prevention
74 ------------------
75 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
76 SIGPIPE when the peer has closed.
77 */
78
79 namespace boost::corosio::detail {
80
// Forward declarations for types this header only refers to by pointer.
class epoll_socket;
class epoll_acceptor;
class epoll_scheduler;
struct epoll_op;
88
89 /** Per-descriptor state for persistent epoll registration.
90
91 Tracks pending operations for a file descriptor. The fd is registered
92 once with epoll and stays registered until closed.
93
94 This struct extends scheduler_op to support deferred I/O processing.
95 When epoll events arrive, the reactor sets ready_events and queues
96 this descriptor for processing. When popped from the scheduler queue,
97 operator() performs the actual I/O and queues completion handlers.
98
99 @par Deferred I/O Model
100 The reactor no longer performs I/O directly. Instead:
101 1. Reactor sets ready_events and queues descriptor_state
102 2. Scheduler pops descriptor_state and calls operator()
103 3. operator() performs I/O under mutex and queues completions
104
105 This eliminates per-descriptor mutex locking from the reactor hot path.
106
107 @par Thread Safety
108 The mutex protects operation pointers and ready flags during I/O.
109 ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
110 */
111 struct descriptor_state final : scheduler_op
112 {
113 std::mutex mutex;
114
115 // Protected by mutex
116 epoll_op* read_op = nullptr;
117 epoll_op* write_op = nullptr;
118 epoll_op* connect_op = nullptr;
119
120 // Caches edge events that arrived before an op was registered
121 bool read_ready = false;
122 bool write_ready = false;
123
124 // Deferred cancellation: set by cancel() when the target op is not
125 // parked (e.g. completing inline via speculative I/O). Checked when
126 // the next op parks; if set, the op is immediately self-cancelled.
127 // This matches IOCP semantics where CancelIoEx always succeeds.
128 bool read_cancel_pending = false;
129 bool write_cancel_pending = false;
130 bool connect_cancel_pending = false;
131
132 // Set during registration only (no mutex needed)
133 std::uint32_t registered_events = 0;
134 int fd = -1;
135
136 // For deferred I/O - set by reactor, read by scheduler
137 std::atomic<std::uint32_t> ready_events_{0};
138 std::atomic<bool> is_enqueued_{false};
139 epoll_scheduler const* scheduler_ = nullptr;
140
141 // Prevents impl destruction while this descriptor_state is queued.
142 // Set by close_socket() when is_enqueued_ is true, cleared by operator().
143 std::shared_ptr<void> impl_ref_;
144
145 /// Add ready events atomically.
146 43932x void add_ready_events(std::uint32_t ev) noexcept
147 {
148 43932x ready_events_.fetch_or(ev, std::memory_order_relaxed);
149 43932x }
150
151 /// Perform deferred I/O and queue completions.
152 void operator()() override;
153
154 /// Destroy without invoking.
155 /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
156 /// the self-referential cycle set by close_socket().
157 30x void destroy() override
158 {
159 30x impl_ref_.reset();
160 30x }
161 };
162
163 struct epoll_op : scheduler_op
164 {
165 struct canceller
166 {
167 epoll_op* op;
168 void operator()() const noexcept;
169 };
170
171 std::coroutine_handle<> h;
172 capy::executor_ref ex;
173 std::error_code* ec_out = nullptr;
174 std::size_t* bytes_out = nullptr;
175
176 int fd = -1;
177 int errn = 0;
178 std::size_t bytes_transferred = 0;
179
180 std::atomic<bool> cancelled{false};
181 std::optional<std::stop_callback<canceller>> stop_cb;
182
183 // Prevents use-after-free when socket is closed with pending ops.
184 // See "Impl Lifetime Management" in file header.
185 std::shared_ptr<void> impl_ptr;
186
187 // For stop_token cancellation - pointer to owning socket/acceptor impl.
188 // When stop is requested, we call back to the impl to perform actual I/O cancellation.
189 epoll_socket* socket_impl_ = nullptr;
190 epoll_acceptor* acceptor_impl_ = nullptr;
191
192 42479x epoll_op() = default;
193
194 262458x void reset() noexcept
195 {
196 262458x fd = -1;
197 262458x errn = 0;
198 262458x bytes_transferred = 0;
199 262458x cancelled.store(false, std::memory_order_relaxed);
200 262458x impl_ptr.reset();
201 262458x socket_impl_ = nullptr;
202 262458x acceptor_impl_ = nullptr;
203 262458x }
204
205 // Defined in sockets.cpp where epoll_socket is complete
206 void operator()() override;
207
208 25256x virtual bool is_read_operation() const noexcept
209 {
210 25256x return false;
211 }
212 virtual void cancel() noexcept = 0;
213
214 void destroy() override
215 {
216 stop_cb.reset();
217 impl_ptr.reset();
218 }
219
220 128089x void request_cancel() noexcept
221 {
222 128089x cancelled.store(true, std::memory_order_release);
223 128089x }
224
225 55394x void start(std::stop_token const& token, epoll_socket* impl)
226 {
227 55394x cancelled.store(false, std::memory_order_release);
228 55394x stop_cb.reset();
229 55394x socket_impl_ = impl;
230 55394x acceptor_impl_ = nullptr;
231
232 55394x if (token.stop_possible())
233 99x stop_cb.emplace(token, canceller{this});
234 55394x }
235
236 4699x void start(std::stop_token const& token, epoll_acceptor* impl)
237 {
238 4699x cancelled.store(false, std::memory_order_release);
239 4699x stop_cb.reset();
240 4699x socket_impl_ = nullptr;
241 4699x acceptor_impl_ = impl;
242
243 4699x if (token.stop_possible())
244 9x stop_cb.emplace(token, canceller{this});
245 4699x }
246
247 60029x void complete(int err, std::size_t bytes) noexcept
248 {
249 60029x errn = err;
250 60029x bytes_transferred = bytes;
251 60029x }
252
253 virtual void perform_io() noexcept {}
254 };
255
256 struct epoll_connect_op final : epoll_op
257 {
258 endpoint target_endpoint;
259
260 4692x void reset() noexcept
261 {
262 4692x epoll_op::reset();
263 4692x target_endpoint = endpoint{};
264 4692x }
265
266 4691x void perform_io() noexcept override
267 {
268 // connect() completion status is retrieved via SO_ERROR, not return value
269 4691x int err = 0;
270 4691x socklen_t len = sizeof(err);
271 4691x if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
272 err = errno;
273 4691x complete(err, 0);
274 4691x }
275
276 // Defined in sockets.cpp where epoll_socket is complete
277 void operator()() override;
278 void cancel() noexcept override;
279 };
280
281 struct epoll_read_op final : epoll_op
282 {
283 static constexpr std::size_t max_buffers = 16;
284 iovec iovecs[max_buffers];
285 int iovec_count = 0;
286 bool empty_buffer_read = false;
287
288 25242x bool is_read_operation() const noexcept override
289 {
290 25242x return !empty_buffer_read;
291 }
292
293 126633x void reset() noexcept
294 {
295 126633x epoll_op::reset();
296 126633x iovec_count = 0;
297 126633x empty_buffer_read = false;
298 126633x }
299
300 145x void perform_io() noexcept override
301 {
302 ssize_t n;
303 do
304 {
305 145x n = ::readv(fd, iovecs, iovec_count);
306 }
307 145x while (n < 0 && errno == EINTR);
308
309 145x if (n >= 0)
310 4x complete(0, static_cast<std::size_t>(n));
311 else
312 141x complete(errno, 0);
313 145x }
314
315 void cancel() noexcept override;
316 };
317
318 struct epoll_write_op final : epoll_op
319 {
320 static constexpr std::size_t max_buffers = 16;
321 iovec iovecs[max_buffers];
322 int iovec_count = 0;
323
324 126434x void reset() noexcept
325 {
326 126434x epoll_op::reset();
327 126434x iovec_count = 0;
328 126434x }
329
330 void perform_io() noexcept override
331 {
332 msghdr msg{};
333 msg.msg_iov = iovecs;
334 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
335
336 ssize_t n;
337 do
338 {
339 n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
340 }
341 while (n < 0 && errno == EINTR);
342
343 if (n >= 0)
344 complete(0, static_cast<std::size_t>(n));
345 else
346 complete(errno, 0);
347 }
348
349 void cancel() noexcept override;
350 };
351
352 struct epoll_accept_op final : epoll_op
353 {
354 int accepted_fd = -1;
355 io_object::implementation** impl_out = nullptr;
356 sockaddr_storage peer_storage{};
357
358 4699x void reset() noexcept
359 {
360 4699x epoll_op::reset();
361 4699x accepted_fd = -1;
362 4699x impl_out = nullptr;
363 4699x peer_storage = {};
364 4699x }
365
366 4688x void perform_io() noexcept override
367 {
368 4688x socklen_t addrlen = sizeof(peer_storage);
369 int new_fd;
370 do
371 {
372 9376x new_fd = ::accept4(
373 4688x fd, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
374 SOCK_NONBLOCK | SOCK_CLOEXEC);
375 }
376 4688x while (new_fd < 0 && errno == EINTR);
377
378 4688x if (new_fd >= 0)
379 {
380 4688x accepted_fd = new_fd;
381 4688x complete(0, 0);
382 }
383 else
384 {
385 complete(errno, 0);
386 }
387 4688x }
388
389 // Defined in acceptors.cpp where epoll_acceptor is complete
390 void operator()() override;
391 void cancel() noexcept override;
392 };
393
394 } // namespace boost::corosio::detail
395
396 #endif // BOOST_COROSIO_HAS_EPOLL
397
398 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
399