1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
// Copyright (c) 2026 Steve Gerbino
3  
// Copyright (c) 2026 Steve Gerbino
4  
//
4  
//
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7  
//
7  
//
8  
// Official repository: https://github.com/cppalliance/corosio
8  
// Official repository: https://github.com/cppalliance/corosio
9  
//
9  
//
10  

10  

11  
#ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
11  
#ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12  
#define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12  
#define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13  

13  

14  
#include <boost/corosio/timer.hpp>
14  
#include <boost/corosio/timer.hpp>
15  
#include <boost/corosio/io_context.hpp>
15  
#include <boost/corosio/io_context.hpp>
16  
#include <boost/corosio/detail/scheduler_op.hpp>
16  
#include <boost/corosio/detail/scheduler_op.hpp>
17  
#include <boost/corosio/detail/intrusive.hpp>
17  
#include <boost/corosio/detail/intrusive.hpp>
18  
#include <boost/corosio/detail/thread_local_ptr.hpp>
18  
#include <boost/corosio/detail/thread_local_ptr.hpp>
19  
#include <boost/capy/error.hpp>
19  
#include <boost/capy/error.hpp>
20  
#include <boost/capy/ex/execution_context.hpp>
20  
#include <boost/capy/ex/execution_context.hpp>
21  
#include <boost/capy/ex/executor_ref.hpp>
21  
#include <boost/capy/ex/executor_ref.hpp>
22  
#include <system_error>
22  
#include <system_error>
23  

23  

24  
#include <atomic>
24  
#include <atomic>
25  
#include <chrono>
25  
#include <chrono>
26  
#include <coroutine>
26  
#include <coroutine>
27  
#include <cstddef>
27  
#include <cstddef>
28  
#include <limits>
28  
#include <limits>
29  
#include <mutex>
29  
#include <mutex>
30  
#include <optional>
30  
#include <optional>
31  
#include <stop_token>
31  
#include <stop_token>
32  
#include <utility>
32  
#include <utility>
33  
#include <vector>
33  
#include <vector>
34  

34  

35  
namespace boost::corosio::detail {
35  
namespace boost::corosio::detail {
36  

36  

37  
struct scheduler;
37  
struct scheduler;
38  

38  

39  
/*
39  
/*
40  
    Timer Service
40  
    Timer Service
41  
    =============
41  
    =============
42  

42  

43  
    Data Structures
43  
    Data Structures
44  
    ---------------
44  
    ---------------
45  
    waiter_node holds per-waiter state: coroutine handle, executor,
45  
    waiter_node holds per-waiter state: coroutine handle, executor,
46  
    error output, stop_token, embedded completion_op. Each concurrent
46  
    error output, stop_token, embedded completion_op. Each concurrent
47  
    co_await t.wait() allocates one waiter_node.
47  
    co_await t.wait() allocates one waiter_node.
48  

48  

49  
    timer_service::implementation holds per-timer state: expiry,
49  
    timer_service::implementation holds per-timer state: expiry,
50  
    heap index, and an intrusive_list of waiter_nodes. Multiple
50  
    heap index, and an intrusive_list of waiter_nodes. Multiple
51  
    coroutines can wait on the same timer simultaneously.
51  
    coroutines can wait on the same timer simultaneously.
52  

52  

53  
    timer_service owns a min-heap of active timers, a free list
53  
    timer_service owns a min-heap of active timers, a free list
54  
    of recycled impls, and a free list of recycled waiter_nodes. The
54  
    of recycled impls, and a free list of recycled waiter_nodes. The
55  
    heap is ordered by expiry time; the scheduler queries
55  
    heap is ordered by expiry time; the scheduler queries
56  
    nearest_expiry() to set the epoll/timerfd timeout.
56  
    nearest_expiry() to set the epoll/timerfd timeout.
57  

57  

58  
    Optimization Strategy
58  
    Optimization Strategy
59  
    ---------------------
59  
    ---------------------
60  
    1. Deferred heap insertion — expires_after() stores the expiry
60  
    1. Deferred heap insertion — expires_after() stores the expiry
61  
       but does not insert into the heap. Insertion happens in wait().
61  
       but does not insert into the heap. Insertion happens in wait().
62  
    2. Thread-local impl cache — single-slot per-thread cache.
62  
    2. Thread-local impl cache — single-slot per-thread cache.
63  
    3. Embedded completion_op — eliminates heap allocation per fire/cancel.
63  
    3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64  
    4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
64  
    4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65  
    5. might_have_pending_waits_ flag — skips lock when no wait issued.
65  
    5. might_have_pending_waits_ flag — skips lock when no wait issued.
66  
    6. Thread-local waiter cache — single-slot per-thread cache.
66  
    6. Thread-local waiter cache — single-slot per-thread cache.
67  

67  

68  
    Concurrency
68  
    Concurrency
69  
    -----------
69  
    -----------
70  
    stop_token callbacks can fire from any thread. The impl_
70  
    stop_token callbacks can fire from any thread. The impl_
71  
    pointer on waiter_node is used as a "still in list" marker.
71  
    pointer on waiter_node is used as a "still in list" marker.
72  
*/
72  
*/
73  

73  

74  
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
74  
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
75  

75  

76  
inline void timer_service_invalidate_cache() noexcept;
76  
inline void timer_service_invalidate_cache() noexcept;
77  

77  

78  
// timer_service class body — member function definitions are
78  
// timer_service class body — member function definitions are
79  
// out-of-class (after implementation and waiter_node are complete)
79  
// out-of-class (after implementation and waiter_node are complete)
80  
class BOOST_COROSIO_DECL timer_service final
80  
class BOOST_COROSIO_DECL timer_service final
81  
    : public capy::execution_context::service
81  
    : public capy::execution_context::service
82  
    , public io_object::io_service
82  
    , public io_object::io_service
83  
{
83  
{
84  
public:
84  
public:
85  
    using clock_type = std::chrono::steady_clock;
85  
    using clock_type = std::chrono::steady_clock;
86  
    using time_point = clock_type::time_point;
86  
    using time_point = clock_type::time_point;
87  

87  

88  
    /// Type-erased callback for earliest-expiry-changed notifications.
88  
    /// Type-erased callback for earliest-expiry-changed notifications.
89  
    class callback
89  
    class callback
90  
    {
90  
    {
91  
        void* ctx_         = nullptr;
91  
        void* ctx_         = nullptr;
92  
        void (*fn_)(void*) = nullptr;
92  
        void (*fn_)(void*) = nullptr;
93  

93  

94  
    public:
94  
    public:
95  
        /// Construct an empty callback.
95  
        /// Construct an empty callback.
96  
        callback() = default;
96  
        callback() = default;
97  

97  

98  
        /// Construct a callback with the given context and function.
98  
        /// Construct a callback with the given context and function.
99  
        callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
99  
        callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
100  

100  

101  
        /// Return true if the callback is non-empty.
101  
        /// Return true if the callback is non-empty.
102  
        explicit operator bool() const noexcept
102  
        explicit operator bool() const noexcept
103  
        {
103  
        {
104  
            return fn_ != nullptr;
104  
            return fn_ != nullptr;
105  
        }
105  
        }
106  

106  

107  
        /// Invoke the callback.
107  
        /// Invoke the callback.
108  
        void operator()() const
108  
        void operator()() const
109  
        {
109  
        {
110  
            if (fn_)
110  
            if (fn_)
111  
                fn_(ctx_);
111  
                fn_(ctx_);
112  
        }
112  
        }
113  
    };
113  
    };
114  

114  

115  
    struct implementation;
115  
    struct implementation;
116  

116  

117  
private:
117  
private:
118  
    struct heap_entry
118  
    struct heap_entry
119  
    {
119  
    {
120  
        time_point time_;
120  
        time_point time_;
121  
        implementation* timer_;
121  
        implementation* timer_;
122  
    };
122  
    };
123  

123  

124  
    scheduler* sched_ = nullptr;
124  
    scheduler* sched_ = nullptr;
125  
    mutable std::mutex mutex_;
125  
    mutable std::mutex mutex_;
126  
    std::vector<heap_entry> heap_;
126  
    std::vector<heap_entry> heap_;
127  
    implementation* free_list_     = nullptr;
127  
    implementation* free_list_     = nullptr;
128  
    waiter_node* waiter_free_list_ = nullptr;
128  
    waiter_node* waiter_free_list_ = nullptr;
129  
    callback on_earliest_changed_;
129  
    callback on_earliest_changed_;
130  
    bool shutting_down_ = false;
130  
    bool shutting_down_ = false;
131  
    // Avoids mutex in nearest_expiry() and empty()
131  
    // Avoids mutex in nearest_expiry() and empty()
132  
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
132  
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
133  
        (std::numeric_limits<std::int64_t>::max)()};
133  
        (std::numeric_limits<std::int64_t>::max)()};
134  

134  

135  
public:
135  
public:
136  
    /// Construct the timer service bound to a scheduler.
136  
    /// Construct the timer service bound to a scheduler.
137  
    inline timer_service(capy::execution_context&, scheduler& sched)
137  
    inline timer_service(capy::execution_context&, scheduler& sched)
138  
        : sched_(&sched)
138  
        : sched_(&sched)
139  
    {
139  
    {
140  
    }
140  
    }
141  

141  

142  
    /// Return the associated scheduler.
142  
    /// Return the associated scheduler.
143  
    inline scheduler& get_scheduler() noexcept
143  
    inline scheduler& get_scheduler() noexcept
144  
    {
144  
    {
145  
        return *sched_;
145  
        return *sched_;
146  
    }
146  
    }
147  

147  

148  
    /// Destroy the timer service.
148  
    /// Destroy the timer service.
149  
    ~timer_service() override = default;
149  
    ~timer_service() override = default;
150  

150  

151  
    timer_service(timer_service const&)            = delete;
151  
    timer_service(timer_service const&)            = delete;
152  
    timer_service& operator=(timer_service const&) = delete;
152  
    timer_service& operator=(timer_service const&) = delete;
153  

153  

154  
    /// Register a callback invoked when the earliest expiry changes.
154  
    /// Register a callback invoked when the earliest expiry changes.
155  
    inline void set_on_earliest_changed(callback cb)
155  
    inline void set_on_earliest_changed(callback cb)
156  
    {
156  
    {
157  
        on_earliest_changed_ = cb;
157  
        on_earliest_changed_ = cb;
158  
    }
158  
    }
159  

159  

160  
    /// Return true if no timers are in the heap.
160  
    /// Return true if no timers are in the heap.
161  
    inline bool empty() const noexcept
161  
    inline bool empty() const noexcept
162  
    {
162  
    {
163  
        return cached_nearest_ns_.load(std::memory_order_acquire) ==
163  
        return cached_nearest_ns_.load(std::memory_order_acquire) ==
164  
            (std::numeric_limits<std::int64_t>::max)();
164  
            (std::numeric_limits<std::int64_t>::max)();
165  
    }
165  
    }
166  

166  

167  
    /// Return the nearest timer expiry without acquiring the mutex.
167  
    /// Return the nearest timer expiry without acquiring the mutex.
168  
    inline time_point nearest_expiry() const noexcept
168  
    inline time_point nearest_expiry() const noexcept
169  
    {
169  
    {
170  
        auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
170  
        auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
171  
        return time_point(time_point::duration(ns));
171  
        return time_point(time_point::duration(ns));
172  
    }
172  
    }
173  

173  

174  
    /// Cancel all pending timers and free cached resources.
174  
    /// Cancel all pending timers and free cached resources.
175  
    inline void shutdown() override;
175  
    inline void shutdown() override;
176  

176  

177  
    /// Construct a new timer implementation.
177  
    /// Construct a new timer implementation.
178  
    inline io_object::implementation* construct() override;
178  
    inline io_object::implementation* construct() override;
179  

179  

180  
    /// Destroy a timer implementation, cancelling pending waiters.
180  
    /// Destroy a timer implementation, cancelling pending waiters.
181  
    inline void destroy(io_object::implementation* p) override;
181  
    inline void destroy(io_object::implementation* p) override;
182  

182  

183  
    /// Cancel and recycle a timer implementation.
183  
    /// Cancel and recycle a timer implementation.
184  
    inline void destroy_impl(implementation& impl);
184  
    inline void destroy_impl(implementation& impl);
185  

185  

186  
    /// Create or recycle a waiter node.
186  
    /// Create or recycle a waiter node.
187  
    inline waiter_node* create_waiter();
187  
    inline waiter_node* create_waiter();
188  

188  

189  
    /// Return a waiter node to the cache or free list.
189  
    /// Return a waiter node to the cache or free list.
190  
    inline void destroy_waiter(waiter_node* w);
190  
    inline void destroy_waiter(waiter_node* w);
191  

191  

192  
    /// Update the timer expiry, cancelling existing waiters.
192  
    /// Update the timer expiry, cancelling existing waiters.
193  
    inline std::size_t update_timer(implementation& impl, time_point new_time);
193  
    inline std::size_t update_timer(implementation& impl, time_point new_time);
194  

194  

195  
    /// Insert a waiter into the timer's waiter list and the heap.
195  
    /// Insert a waiter into the timer's waiter list and the heap.
196  
    inline void insert_waiter(implementation& impl, waiter_node* w);
196  
    inline void insert_waiter(implementation& impl, waiter_node* w);
197  

197  

198  
    /// Cancel all waiters on a timer.
198  
    /// Cancel all waiters on a timer.
199  
    inline std::size_t cancel_timer(implementation& impl);
199  
    inline std::size_t cancel_timer(implementation& impl);
200  

200  

201  
    /// Cancel a single waiter ( stop_token callback path ).
201  
    /// Cancel a single waiter ( stop_token callback path ).
202  
    inline void cancel_waiter(waiter_node* w);
202  
    inline void cancel_waiter(waiter_node* w);
203  

203  

204  
    /// Cancel one waiter on a timer.
204  
    /// Cancel one waiter on a timer.
205  
    inline std::size_t cancel_one_waiter(implementation& impl);
205  
    inline std::size_t cancel_one_waiter(implementation& impl);
206  

206  

207  
    /// Complete all waiters whose timers have expired.
207  
    /// Complete all waiters whose timers have expired.
208  
    inline std::size_t process_expired();
208  
    inline std::size_t process_expired();
209  

209  

210  
private:
210  
private:
211  
    inline void refresh_cached_nearest() noexcept
211  
    inline void refresh_cached_nearest() noexcept
212  
    {
212  
    {
213  
        auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
213  
        auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
214  
                                : heap_[0].time_.time_since_epoch().count();
214  
                                : heap_[0].time_.time_since_epoch().count();
215  
        cached_nearest_ns_.store(ns, std::memory_order_release);
215  
        cached_nearest_ns_.store(ns, std::memory_order_release);
216  
    }
216  
    }
217  

217  

218  
    inline void remove_timer_impl(implementation& impl);
218  
    inline void remove_timer_impl(implementation& impl);
219  
    inline void up_heap(std::size_t index);
219  
    inline void up_heap(std::size_t index);
220  
    inline void down_heap(std::size_t index);
220  
    inline void down_heap(std::size_t index);
221  
    inline void swap_heap(std::size_t i1, std::size_t i2);
221  
    inline void swap_heap(std::size_t i1, std::size_t i2);
222  
};
222  
};
223  

223  

224  
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
224  
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
225  
    : intrusive_list<waiter_node>::node
225  
    : intrusive_list<waiter_node>::node
226  
{
226  
{
227  
    // Embedded completion op — avoids heap allocation per fire/cancel
227  
    // Embedded completion op — avoids heap allocation per fire/cancel
228  
    struct completion_op final : scheduler_op
228  
    struct completion_op final : scheduler_op
229  
    {
229  
    {
230  
        waiter_node* waiter_ = nullptr;
230  
        waiter_node* waiter_ = nullptr;
231  

231  

232  
        static void do_complete(
232  
        static void do_complete(
233  
            void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
233  
            void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
234  

234  

235  
        completion_op() noexcept : scheduler_op(&do_complete) {}
235  
        completion_op() noexcept : scheduler_op(&do_complete) {}
236  

236  

237  
        void operator()() override;
237  
        void operator()() override;
238  
        void destroy() override;
238  
        void destroy() override;
239  
    };
239  
    };
240  

240  

241  
    // Per-waiter stop_token cancellation
241  
    // Per-waiter stop_token cancellation
242  
    struct canceller
242  
    struct canceller
243  
    {
243  
    {
244  
        waiter_node* waiter_;
244  
        waiter_node* waiter_;
245  
        void operator()() const;
245  
        void operator()() const;
246  
    };
246  
    };
247  

247  

248  
    // nullptr once removed from timer's waiter list (concurrency marker)
248  
    // nullptr once removed from timer's waiter list (concurrency marker)
249  
    timer_service::implementation* impl_ = nullptr;
249  
    timer_service::implementation* impl_ = nullptr;
250  
    timer_service* svc_                  = nullptr;
250  
    timer_service* svc_                  = nullptr;
251  
    std::coroutine_handle<> h_;
251  
    std::coroutine_handle<> h_;
252  
    capy::continuation* cont_            = nullptr;
252  
    capy::continuation* cont_            = nullptr;
253  
    capy::executor_ref d_;
253  
    capy::executor_ref d_;
254  
    std::error_code* ec_out_ = nullptr;
254  
    std::error_code* ec_out_ = nullptr;
255  
    std::stop_token token_;
255  
    std::stop_token token_;
256  
    std::optional<std::stop_callback<canceller>> stop_cb_;
256  
    std::optional<std::stop_callback<canceller>> stop_cb_;
257  
    completion_op op_;
257  
    completion_op op_;
258  
    std::error_code ec_value_;
258  
    std::error_code ec_value_;
259  
    waiter_node* next_free_ = nullptr;
259  
    waiter_node* next_free_ = nullptr;
260  

260  

261  
    waiter_node() noexcept
261  
    waiter_node() noexcept
262  
    {
262  
    {
263  
        op_.waiter_ = this;
263  
        op_.waiter_ = this;
264  
    }
264  
    }
265  
};
265  
};
266  

266  

267  
struct timer_service::implementation final : timer::implementation
267  
struct timer_service::implementation final : timer::implementation
268  
{
268  
{
269  
    using clock_type = std::chrono::steady_clock;
269  
    using clock_type = std::chrono::steady_clock;
270  
    using time_point = clock_type::time_point;
270  
    using time_point = clock_type::time_point;
271  
    using duration   = clock_type::duration;
271  
    using duration   = clock_type::duration;
272  

272  

273  
    timer_service* svc_ = nullptr;
273  
    timer_service* svc_ = nullptr;
274  
    intrusive_list<waiter_node> waiters_;
274  
    intrusive_list<waiter_node> waiters_;
275  

275  

276  
    // Free list linkage (reused when impl is on free_list)
276  
    // Free list linkage (reused when impl is on free_list)
277  
    implementation* next_free_ = nullptr;
277  
    implementation* next_free_ = nullptr;
278  

278  

279  
    inline explicit implementation(timer_service& svc) noexcept;
279  
    inline explicit implementation(timer_service& svc) noexcept;
280  

280  

281  
    inline std::coroutine_handle<> wait(
281  
    inline std::coroutine_handle<> wait(
282  
        std::coroutine_handle<>,
282  
        std::coroutine_handle<>,
283  
        capy::executor_ref,
283  
        capy::executor_ref,
284  
        std::stop_token,
284  
        std::stop_token,
285  
        std::error_code*,
285  
        std::error_code*,
286  
        capy::continuation*) override;
286  
        capy::continuation*) override;
287  
};
287  
};
288  

288  

289  
// Thread-local caches avoid hot-path mutex acquisitions:
289  
// Thread-local caches avoid hot-path mutex acquisitions:
290  
// 1. Impl cache — single-slot, validated by comparing svc_
290  
// 1. Impl cache — single-slot, validated by comparing svc_
291  
// 2. Waiter cache — single-slot, no service affinity
291  
// 2. Waiter cache — single-slot, no service affinity
292  
// All caches are cleared by timer_service_invalidate_cache() during shutdown.
292  
// All caches are cleared by timer_service_invalidate_cache() during shutdown.
293  

293  

294  
inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
294  
inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
295  
inline thread_local_ptr<waiter_node> tl_cached_waiter;
295  
inline thread_local_ptr<waiter_node> tl_cached_waiter;
296  

296  

297  
inline timer_service::implementation*
297  
inline timer_service::implementation*
298  
try_pop_tl_cache(timer_service* svc) noexcept
298  
try_pop_tl_cache(timer_service* svc) noexcept
299  
{
299  
{
300  
    auto* impl = tl_cached_impl.get();
300  
    auto* impl = tl_cached_impl.get();
301  
    if (impl)
301  
    if (impl)
302  
    {
302  
    {
303  
        tl_cached_impl.set(nullptr);
303  
        tl_cached_impl.set(nullptr);
304  
        if (impl->svc_ == svc)
304  
        if (impl->svc_ == svc)
305  
            return impl;
305  
            return impl;
306  
        // Stale impl from a destroyed service
306  
        // Stale impl from a destroyed service
307  
        delete impl;
307  
        delete impl;
308  
    }
308  
    }
309  
    return nullptr;
309  
    return nullptr;
310  
}
310  
}
311  

311  

312  
inline bool
312  
inline bool
313  
try_push_tl_cache(timer_service::implementation* impl) noexcept
313  
try_push_tl_cache(timer_service::implementation* impl) noexcept
314  
{
314  
{
315  
    if (!tl_cached_impl.get())
315  
    if (!tl_cached_impl.get())
316  
    {
316  
    {
317  
        tl_cached_impl.set(impl);
317  
        tl_cached_impl.set(impl);
318  
        return true;
318  
        return true;
319  
    }
319  
    }
320  
    return false;
320  
    return false;
321  
}
321  
}
322  

322  

323  
inline waiter_node*
323  
inline waiter_node*
324  
try_pop_waiter_tl_cache() noexcept
324  
try_pop_waiter_tl_cache() noexcept
325  
{
325  
{
326  
    auto* w = tl_cached_waiter.get();
326  
    auto* w = tl_cached_waiter.get();
327  
    if (w)
327  
    if (w)
328  
    {
328  
    {
329  
        tl_cached_waiter.set(nullptr);
329  
        tl_cached_waiter.set(nullptr);
330  
        return w;
330  
        return w;
331  
    }
331  
    }
332  
    return nullptr;
332  
    return nullptr;
333  
}
333  
}
334  

334  

335  
inline bool
335  
inline bool
336  
try_push_waiter_tl_cache(waiter_node* w) noexcept
336  
try_push_waiter_tl_cache(waiter_node* w) noexcept
337  
{
337  
{
338  
    if (!tl_cached_waiter.get())
338  
    if (!tl_cached_waiter.get())
339  
    {
339  
    {
340  
        tl_cached_waiter.set(w);
340  
        tl_cached_waiter.set(w);
341  
        return true;
341  
        return true;
342  
    }
342  
    }
343  
    return false;
343  
    return false;
344  
}
344  
}
345  

345  

346  
inline void
346  
inline void
347  
timer_service_invalidate_cache() noexcept
347  
timer_service_invalidate_cache() noexcept
348  
{
348  
{
349  
    delete tl_cached_impl.get();
349  
    delete tl_cached_impl.get();
350  
    tl_cached_impl.set(nullptr);
350  
    tl_cached_impl.set(nullptr);
351  

351  

352  
    delete tl_cached_waiter.get();
352  
    delete tl_cached_waiter.get();
353  
    tl_cached_waiter.set(nullptr);
353  
    tl_cached_waiter.set(nullptr);
354  
}
354  
}
355  

355  

356  
// timer_service out-of-class member function definitions
356  
// timer_service out-of-class member function definitions
357  

357  

358  
inline timer_service::implementation::implementation(
358  
inline timer_service::implementation::implementation(
359  
    timer_service& svc) noexcept
359  
    timer_service& svc) noexcept
360  
    : svc_(&svc)
360  
    : svc_(&svc)
361  
{
361  
{
362  
}
362  
}
363  

363  

364  
inline void
364  
inline void
365  
timer_service::shutdown()
365  
timer_service::shutdown()
366  
{
366  
{
367  
    timer_service_invalidate_cache();
367  
    timer_service_invalidate_cache();
368  
    shutting_down_ = true;
368  
    shutting_down_ = true;
369  

369  

370  
    // Snapshot impls and detach them from the heap so that
370  
    // Snapshot impls and detach them from the heap so that
371  
    // coroutine-owned timer destructors (triggered by h.destroy()
371  
    // coroutine-owned timer destructors (triggered by h.destroy()
372  
    // below) cannot re-enter remove_timer_impl() and mutate the
372  
    // below) cannot re-enter remove_timer_impl() and mutate the
373  
    // vector during iteration.
373  
    // vector during iteration.
374  
    std::vector<implementation*> impls;
374  
    std::vector<implementation*> impls;
375  
    impls.reserve(heap_.size());
375  
    impls.reserve(heap_.size());
376  
    for (auto& entry : heap_)
376  
    for (auto& entry : heap_)
377  
    {
377  
    {
378  
        entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
378  
        entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
379  
        impls.push_back(entry.timer_);
379  
        impls.push_back(entry.timer_);
380  
    }
380  
    }
381  
    heap_.clear();
381  
    heap_.clear();
382  
    cached_nearest_ns_.store(
382  
    cached_nearest_ns_.store(
383  
        (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
383  
        (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
384  

384  

385  
    // Cancel waiting timers. Each waiter called work_started()
385  
    // Cancel waiting timers. Each waiter called work_started()
386  
    // in implementation::wait(). On IOCP the scheduler shutdown
386  
    // in implementation::wait(). On IOCP the scheduler shutdown
387  
    // loop exits when outstanding_work_ reaches zero, so we must
387  
    // loop exits when outstanding_work_ reaches zero, so we must
388  
    // call work_finished() here to balance it. On other backends
388  
    // call work_finished() here to balance it. On other backends
389  
    // this is harmless.
389  
    // this is harmless.
390  
    for (auto* impl : impls)
390  
    for (auto* impl : impls)
391  
    {
391  
    {
392  
        while (auto* w = impl->waiters_.pop_front())
392  
        while (auto* w = impl->waiters_.pop_front())
393  
        {
393  
        {
394  
            w->stop_cb_.reset();
394  
            w->stop_cb_.reset();
395  
            auto h = std::exchange(w->h_, {});
395  
            auto h = std::exchange(w->h_, {});
396  
            sched_->work_finished();
396  
            sched_->work_finished();
397  
            if (h)
397  
            if (h)
398  
                h.destroy();
398  
                h.destroy();
399  
            delete w;
399  
            delete w;
400  
        }
400  
        }
401  
        delete impl;
401  
        delete impl;
402  
    }
402  
    }
403  

403  

404  
    // Delete free-listed impls
404  
    // Delete free-listed impls
405  
    while (free_list_)
405  
    while (free_list_)
406  
    {
406  
    {
407  
        auto* next = free_list_->next_free_;
407  
        auto* next = free_list_->next_free_;
408  
        delete free_list_;
408  
        delete free_list_;
409  
        free_list_ = next;
409  
        free_list_ = next;
410  
    }
410  
    }
411  

411  

412  
    // Delete free-listed waiters
412  
    // Delete free-listed waiters
413  
    while (waiter_free_list_)
413  
    while (waiter_free_list_)
414  
    {
414  
    {
415  
        auto* next = waiter_free_list_->next_free_;
415  
        auto* next = waiter_free_list_->next_free_;
416  
        delete waiter_free_list_;
416  
        delete waiter_free_list_;
417  
        waiter_free_list_ = next;
417  
        waiter_free_list_ = next;
418  
    }
418  
    }
419  
}
419  
}
420  

420  

421  
inline io_object::implementation*
421  
inline io_object::implementation*
422  
timer_service::construct()
422  
timer_service::construct()
423  
{
423  
{
424  
    implementation* impl = try_pop_tl_cache(this);
424  
    implementation* impl = try_pop_tl_cache(this);
425  
    if (impl)
425  
    if (impl)
426  
    {
426  
    {
427  
        impl->svc_        = this;
427  
        impl->svc_        = this;
428  
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
428  
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
429  
        impl->might_have_pending_waits_ = false;
429  
        impl->might_have_pending_waits_ = false;
430  
        return impl;
430  
        return impl;
431  
    }
431  
    }
432  

432  

433  
    std::lock_guard lock(mutex_);
433  
    std::lock_guard lock(mutex_);
434  
    if (free_list_)
434  
    if (free_list_)
435  
    {
435  
    {
436  
        impl              = free_list_;
436  
        impl              = free_list_;
437  
        free_list_        = impl->next_free_;
437  
        free_list_        = impl->next_free_;
438  
        impl->next_free_  = nullptr;
438  
        impl->next_free_  = nullptr;
439  
        impl->svc_        = this;
439  
        impl->svc_        = this;
440  
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
440  
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
441  
        impl->might_have_pending_waits_ = false;
441  
        impl->might_have_pending_waits_ = false;
442  
    }
442  
    }
443  
    else
443  
    else
444  
    {
444  
    {
445  
        impl = new implementation(*this);
445  
        impl = new implementation(*this);
446  
    }
446  
    }
447  
    return impl;
447  
    return impl;
448  
}
448  
}
449  

449  

450  
inline void
450  
inline void
451  
timer_service::destroy(io_object::implementation* p)
451  
timer_service::destroy(io_object::implementation* p)
452  
{
452  
{
453  
    destroy_impl(static_cast<implementation&>(*p));
453  
    destroy_impl(static_cast<implementation&>(*p));
454  
}
454  
}
455  

455  

456  
inline void
456  
inline void
457  
timer_service::destroy_impl(implementation& impl)
457  
timer_service::destroy_impl(implementation& impl)
458  
{
458  
{
459  
    // During shutdown the impl is owned by the shutdown loop.
459  
    // During shutdown the impl is owned by the shutdown loop.
460  
    // Re-entering here (from a coroutine-owned timer destructor
460  
    // Re-entering here (from a coroutine-owned timer destructor
461  
    // triggered by h.destroy()) must not modify the heap or
461  
    // triggered by h.destroy()) must not modify the heap or
462  
    // recycle the impl — shutdown deletes it directly.
462  
    // recycle the impl — shutdown deletes it directly.
463  
    if (shutting_down_)
463  
    if (shutting_down_)
464  
        return;
464  
        return;
465  

465  

466  
    cancel_timer(impl);
466  
    cancel_timer(impl);
467  

467  

468  
    if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
468  
    if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
469  
    {
469  
    {
470  
        std::lock_guard lock(mutex_);
470  
        std::lock_guard lock(mutex_);
471  
        remove_timer_impl(impl);
471  
        remove_timer_impl(impl);
472  
        refresh_cached_nearest();
472  
        refresh_cached_nearest();
473  
    }
473  
    }
474  

474  

475  
    if (try_push_tl_cache(&impl))
475  
    if (try_push_tl_cache(&impl))
476  
        return;
476  
        return;
477  

477  

478  
    std::lock_guard lock(mutex_);
478  
    std::lock_guard lock(mutex_);
479  
    impl.next_free_ = free_list_;
479  
    impl.next_free_ = free_list_;
480  
    free_list_      = &impl;
480  
    free_list_      = &impl;
481  
}
481  
}
482  

482  

483  
inline waiter_node*
483  
inline waiter_node*
484  
timer_service::create_waiter()
484  
timer_service::create_waiter()
485  
{
485  
{
486  
    if (auto* w = try_pop_waiter_tl_cache())
486  
    if (auto* w = try_pop_waiter_tl_cache())
487  
        return w;
487  
        return w;
488  

488  

489  
    std::lock_guard lock(mutex_);
489  
    std::lock_guard lock(mutex_);
490  
    if (waiter_free_list_)
490  
    if (waiter_free_list_)
491  
    {
491  
    {
492  
        auto* w           = waiter_free_list_;
492  
        auto* w           = waiter_free_list_;
493  
        waiter_free_list_ = w->next_free_;
493  
        waiter_free_list_ = w->next_free_;
494  
        w->next_free_     = nullptr;
494  
        w->next_free_     = nullptr;
495  
        return w;
495  
        return w;
496  
    }
496  
    }
497  

497  

498  
    return new waiter_node();
498  
    return new waiter_node();
499  
}
499  
}
500  

500  

501  
inline void
501  
inline void
502  
timer_service::destroy_waiter(waiter_node* w)
502  
timer_service::destroy_waiter(waiter_node* w)
503  
{
503  
{
504  
    if (try_push_waiter_tl_cache(w))
504  
    if (try_push_waiter_tl_cache(w))
505  
        return;
505  
        return;
506  

506  

507  
    std::lock_guard lock(mutex_);
507  
    std::lock_guard lock(mutex_);
508  
    w->next_free_     = waiter_free_list_;
508  
    w->next_free_     = waiter_free_list_;
509  
    waiter_free_list_ = w;
509  
    waiter_free_list_ = w;
510  
}
510  
}
511  

511  

512  
inline std::size_t
512  
inline std::size_t
513  
timer_service::update_timer(implementation& impl, time_point new_time)
513  
timer_service::update_timer(implementation& impl, time_point new_time)
514  
{
514  
{
515  
    bool in_heap =
515  
    bool in_heap =
516  
        (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
516  
        (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
517  
    if (!in_heap && impl.waiters_.empty())
517  
    if (!in_heap && impl.waiters_.empty())
518  
        return 0;
518  
        return 0;
519  

519  

520  
    bool notify = false;
520  
    bool notify = false;
521  
    intrusive_list<waiter_node> canceled;
521  
    intrusive_list<waiter_node> canceled;
522  

522  

523  
    {
523  
    {
524  
        std::lock_guard lock(mutex_);
524  
        std::lock_guard lock(mutex_);
525  

525  

526  
        while (auto* w = impl.waiters_.pop_front())
526  
        while (auto* w = impl.waiters_.pop_front())
527  
        {
527  
        {
528  
            w->impl_ = nullptr;
528  
            w->impl_ = nullptr;
529  
            canceled.push_back(w);
529  
            canceled.push_back(w);
530  
        }
530  
        }
531  

531  

532  
        if (impl.heap_index_ < heap_.size())
532  
        if (impl.heap_index_ < heap_.size())
533  
        {
533  
        {
534  
            time_point old_time           = heap_[impl.heap_index_].time_;
534  
            time_point old_time           = heap_[impl.heap_index_].time_;
535  
            heap_[impl.heap_index_].time_ = new_time;
535  
            heap_[impl.heap_index_].time_ = new_time;
536  

536  

537  
            if (new_time < old_time)
537  
            if (new_time < old_time)
538  
                up_heap(impl.heap_index_);
538  
                up_heap(impl.heap_index_);
539  
            else
539  
            else
540  
                down_heap(impl.heap_index_);
540  
                down_heap(impl.heap_index_);
541  

541  

542  
            notify = (impl.heap_index_ == 0);
542  
            notify = (impl.heap_index_ == 0);
543  
        }
543  
        }
544  

544  

545  
        refresh_cached_nearest();
545  
        refresh_cached_nearest();
546  
    }
546  
    }
547  

547  

548  
    std::size_t count = 0;
548  
    std::size_t count = 0;
549  
    while (auto* w = canceled.pop_front())
549  
    while (auto* w = canceled.pop_front())
550  
    {
550  
    {
551  
        w->ec_value_ = make_error_code(capy::error::canceled);
551  
        w->ec_value_ = make_error_code(capy::error::canceled);
552  
        sched_->post(&w->op_);
552  
        sched_->post(&w->op_);
553  
        ++count;
553  
        ++count;
554  
    }
554  
    }
555  

555  

556  
    if (notify)
556  
    if (notify)
557  
        on_earliest_changed_();
557  
        on_earliest_changed_();
558  

558  

559  
    return count;
559  
    return count;
560  
}
560  
}
561  

561  

562  
inline void
562  
inline void
563  
timer_service::insert_waiter(implementation& impl, waiter_node* w)
563  
timer_service::insert_waiter(implementation& impl, waiter_node* w)
564  
{
564  
{
565  
    bool notify = false;
565  
    bool notify = false;
566  
    {
566  
    {
567  
        std::lock_guard lock(mutex_);
567  
        std::lock_guard lock(mutex_);
568  
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
568  
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
569  
        {
569  
        {
570  
            impl.heap_index_ = heap_.size();
570  
            impl.heap_index_ = heap_.size();
571  
            heap_.push_back({impl.expiry_, &impl});
571  
            heap_.push_back({impl.expiry_, &impl});
572  
            up_heap(heap_.size() - 1);
572  
            up_heap(heap_.size() - 1);
573  
            notify = (impl.heap_index_ == 0);
573  
            notify = (impl.heap_index_ == 0);
574  
            refresh_cached_nearest();
574  
            refresh_cached_nearest();
575  
        }
575  
        }
576  
        impl.waiters_.push_back(w);
576  
        impl.waiters_.push_back(w);
577  
    }
577  
    }
578  
    if (notify)
578  
    if (notify)
579  
        on_earliest_changed_();
579  
        on_earliest_changed_();
580  
}
580  
}
581  

581  

582  
inline std::size_t
582  
inline std::size_t
583  
timer_service::cancel_timer(implementation& impl)
583  
timer_service::cancel_timer(implementation& impl)
584  
{
584  
{
585  
    if (!impl.might_have_pending_waits_)
585  
    if (!impl.might_have_pending_waits_)
586  
        return 0;
586  
        return 0;
587  

587  

588  
    // Not in heap and no waiters — just clear the flag
588  
    // Not in heap and no waiters — just clear the flag
589  
    if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
589  
    if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
590  
        impl.waiters_.empty())
590  
        impl.waiters_.empty())
591  
    {
591  
    {
592  
        impl.might_have_pending_waits_ = false;
592  
        impl.might_have_pending_waits_ = false;
593  
        return 0;
593  
        return 0;
594  
    }
594  
    }
595  

595  

596  
    intrusive_list<waiter_node> canceled;
596  
    intrusive_list<waiter_node> canceled;
597  

597  

598  
    {
598  
    {
599  
        std::lock_guard lock(mutex_);
599  
        std::lock_guard lock(mutex_);
600  
        remove_timer_impl(impl);
600  
        remove_timer_impl(impl);
601  
        while (auto* w = impl.waiters_.pop_front())
601  
        while (auto* w = impl.waiters_.pop_front())
602  
        {
602  
        {
603  
            w->impl_ = nullptr;
603  
            w->impl_ = nullptr;
604  
            canceled.push_back(w);
604  
            canceled.push_back(w);
605  
        }
605  
        }
606  
        refresh_cached_nearest();
606  
        refresh_cached_nearest();
607  
    }
607  
    }
608  

608  

609  
    impl.might_have_pending_waits_ = false;
609  
    impl.might_have_pending_waits_ = false;
610  

610  

611  
    std::size_t count = 0;
611  
    std::size_t count = 0;
612  
    while (auto* w = canceled.pop_front())
612  
    while (auto* w = canceled.pop_front())
613  
    {
613  
    {
614  
        w->ec_value_ = make_error_code(capy::error::canceled);
614  
        w->ec_value_ = make_error_code(capy::error::canceled);
615  
        sched_->post(&w->op_);
615  
        sched_->post(&w->op_);
616  
        ++count;
616  
        ++count;
617  
    }
617  
    }
618  

618  

619  
    return count;
619  
    return count;
620  
}
620  
}
621  

621  

622  
inline void
622  
inline void
623  
timer_service::cancel_waiter(waiter_node* w)
623  
timer_service::cancel_waiter(waiter_node* w)
624  
{
624  
{
625  
    {
625  
    {
626  
        std::lock_guard lock(mutex_);
626  
        std::lock_guard lock(mutex_);
627  
        // Already removed by cancel_timer or process_expired
627  
        // Already removed by cancel_timer or process_expired
628  
        if (!w->impl_)
628  
        if (!w->impl_)
629  
            return;
629  
            return;
630  
        auto* impl = w->impl_;
630  
        auto* impl = w->impl_;
631  
        w->impl_   = nullptr;
631  
        w->impl_   = nullptr;
632  
        impl->waiters_.remove(w);
632  
        impl->waiters_.remove(w);
633  
        if (impl->waiters_.empty())
633  
        if (impl->waiters_.empty())
634  
        {
634  
        {
635  
            remove_timer_impl(*impl);
635  
            remove_timer_impl(*impl);
636  
            impl->might_have_pending_waits_ = false;
636  
            impl->might_have_pending_waits_ = false;
637  
        }
637  
        }
638  
        refresh_cached_nearest();
638  
        refresh_cached_nearest();
639  
    }
639  
    }
640  

640  

641  
    w->ec_value_ = make_error_code(capy::error::canceled);
641  
    w->ec_value_ = make_error_code(capy::error::canceled);
642  
    sched_->post(&w->op_);
642  
    sched_->post(&w->op_);
643  
}
643  
}
644  

644  

645  
inline std::size_t
645  
inline std::size_t
646  
timer_service::cancel_one_waiter(implementation& impl)
646  
timer_service::cancel_one_waiter(implementation& impl)
647  
{
647  
{
648  
    if (!impl.might_have_pending_waits_)
648  
    if (!impl.might_have_pending_waits_)
649  
        return 0;
649  
        return 0;
650  

650  

651  
    waiter_node* w = nullptr;
651  
    waiter_node* w = nullptr;
652  

652  

653  
    {
653  
    {
654  
        std::lock_guard lock(mutex_);
654  
        std::lock_guard lock(mutex_);
655  
        w = impl.waiters_.pop_front();
655  
        w = impl.waiters_.pop_front();
656  
        if (!w)
656  
        if (!w)
657  
            return 0;
657  
            return 0;
658  
        w->impl_ = nullptr;
658  
        w->impl_ = nullptr;
659  
        if (impl.waiters_.empty())
659  
        if (impl.waiters_.empty())
660  
        {
660  
        {
661  
            remove_timer_impl(impl);
661  
            remove_timer_impl(impl);
662  
            impl.might_have_pending_waits_ = false;
662  
            impl.might_have_pending_waits_ = false;
663  
        }
663  
        }
664  
        refresh_cached_nearest();
664  
        refresh_cached_nearest();
665  
    }
665  
    }
666  

666  

667  
    w->ec_value_ = make_error_code(capy::error::canceled);
667  
    w->ec_value_ = make_error_code(capy::error::canceled);
668  
    sched_->post(&w->op_);
668  
    sched_->post(&w->op_);
669  
    return 1;
669  
    return 1;
670  
}
670  
}
671  

671  

672  
inline std::size_t
672  
inline std::size_t
673  
timer_service::process_expired()
673  
timer_service::process_expired()
674  
{
674  
{
675  
    intrusive_list<waiter_node> expired;
675  
    intrusive_list<waiter_node> expired;
676  

676  

677  
    {
677  
    {
678  
        std::lock_guard lock(mutex_);
678  
        std::lock_guard lock(mutex_);
679  
        auto now = clock_type::now();
679  
        auto now = clock_type::now();
680  

680  

681  
        while (!heap_.empty() && heap_[0].time_ <= now)
681  
        while (!heap_.empty() && heap_[0].time_ <= now)
682  
        {
682  
        {
683  
            implementation* t = heap_[0].timer_;
683  
            implementation* t = heap_[0].timer_;
684  
            remove_timer_impl(*t);
684  
            remove_timer_impl(*t);
685  
            while (auto* w = t->waiters_.pop_front())
685  
            while (auto* w = t->waiters_.pop_front())
686  
            {
686  
            {
687  
                w->impl_     = nullptr;
687  
                w->impl_     = nullptr;
688  
                w->ec_value_ = {};
688  
                w->ec_value_ = {};
689  
                expired.push_back(w);
689  
                expired.push_back(w);
690  
            }
690  
            }
691  
            t->might_have_pending_waits_ = false;
691  
            t->might_have_pending_waits_ = false;
692  
        }
692  
        }
693  

693  

694  
        refresh_cached_nearest();
694  
        refresh_cached_nearest();
695  
    }
695  
    }
696  

696  

697  
    std::size_t count = 0;
697  
    std::size_t count = 0;
698  
    while (auto* w = expired.pop_front())
698  
    while (auto* w = expired.pop_front())
699  
    {
699  
    {
700  
        sched_->post(&w->op_);
700  
        sched_->post(&w->op_);
701  
        ++count;
701  
        ++count;
702  
    }
702  
    }
703  

703  

704  
    return count;
704  
    return count;
705  
}
705  
}
706  

706  

707  
inline void
707  
inline void
708  
timer_service::remove_timer_impl(implementation& impl)
708  
timer_service::remove_timer_impl(implementation& impl)
709  
{
709  
{
710  
    std::size_t index = impl.heap_index_;
710  
    std::size_t index = impl.heap_index_;
711  
    if (index >= heap_.size())
711  
    if (index >= heap_.size())
712  
        return; // Not in heap
712  
        return; // Not in heap
713  

713  

714  
    if (index == heap_.size() - 1)
714  
    if (index == heap_.size() - 1)
715  
    {
715  
    {
716  
        // Last element, just pop
716  
        // Last element, just pop
717  
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
717  
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
718  
        heap_.pop_back();
718  
        heap_.pop_back();
719  
    }
719  
    }
720  
    else
720  
    else
721  
    {
721  
    {
722  
        // Swap with last and reheapify
722  
        // Swap with last and reheapify
723  
        swap_heap(index, heap_.size() - 1);
723  
        swap_heap(index, heap_.size() - 1);
724  
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
724  
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
725  
        heap_.pop_back();
725  
        heap_.pop_back();
726  

726  

727  
        if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
727  
        if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
728  
            up_heap(index);
728  
            up_heap(index);
729  
        else
729  
        else
730  
            down_heap(index);
730  
            down_heap(index);
731  
    }
731  
    }
732  
}
732  
}
733  

733  

734  
inline void
734  
inline void
735  
timer_service::up_heap(std::size_t index)
735  
timer_service::up_heap(std::size_t index)
736  
{
736  
{
737  
    while (index > 0)
737  
    while (index > 0)
738  
    {
738  
    {
739  
        std::size_t parent = (index - 1) / 2;
739  
        std::size_t parent = (index - 1) / 2;
740  
        if (!(heap_[index].time_ < heap_[parent].time_))
740  
        if (!(heap_[index].time_ < heap_[parent].time_))
741  
            break;
741  
            break;
742  
        swap_heap(index, parent);
742  
        swap_heap(index, parent);
743  
        index = parent;
743  
        index = parent;
744  
    }
744  
    }
745  
}
745  
}
746  

746  

747  
inline void
747  
inline void
748  
timer_service::down_heap(std::size_t index)
748  
timer_service::down_heap(std::size_t index)
749  
{
749  
{
750  
    std::size_t child = index * 2 + 1;
750  
    std::size_t child = index * 2 + 1;
751  
    while (child < heap_.size())
751  
    while (child < heap_.size())
752  
    {
752  
    {
753  
        std::size_t min_child = (child + 1 == heap_.size() ||
753  
        std::size_t min_child = (child + 1 == heap_.size() ||
754  
                                 heap_[child].time_ < heap_[child + 1].time_)
754  
                                 heap_[child].time_ < heap_[child + 1].time_)
755  
            ? child
755  
            ? child
756  
            : child + 1;
756  
            : child + 1;
757  

757  

758  
        if (heap_[index].time_ < heap_[min_child].time_)
758  
        if (heap_[index].time_ < heap_[min_child].time_)
759  
            break;
759  
            break;
760  

760  

761  
        swap_heap(index, min_child);
761  
        swap_heap(index, min_child);
762  
        index = min_child;
762  
        index = min_child;
763  
        child = index * 2 + 1;
763  
        child = index * 2 + 1;
764  
    }
764  
    }
765  
}
765  
}
766  

766  

767  
inline void
767  
inline void
768  
timer_service::swap_heap(std::size_t i1, std::size_t i2)
768  
timer_service::swap_heap(std::size_t i1, std::size_t i2)
769  
{
769  
{
770  
    heap_entry tmp                = heap_[i1];
770  
    heap_entry tmp                = heap_[i1];
771  
    heap_[i1]                     = heap_[i2];
771  
    heap_[i1]                     = heap_[i2];
772  
    heap_[i2]                     = tmp;
772  
    heap_[i2]                     = tmp;
773  
    heap_[i1].timer_->heap_index_ = i1;
773  
    heap_[i1].timer_->heap_index_ = i1;
774  
    heap_[i2].timer_->heap_index_ = i2;
774  
    heap_[i2].timer_->heap_index_ = i2;
775  
}
775  
}
776  

776  

777  
// waiter_node out-of-class member function definitions
777  
// waiter_node out-of-class member function definitions
778  

778  

779  
inline void
779  
inline void
780  
waiter_node::canceller::operator()() const
780  
waiter_node::canceller::operator()() const
781  
{
781  
{
782  
    waiter_->svc_->cancel_waiter(waiter_);
782  
    waiter_->svc_->cancel_waiter(waiter_);
783  
}
783  
}
784  

784  

785  
inline void
785  
inline void
786  
waiter_node::completion_op::do_complete(
786  
waiter_node::completion_op::do_complete(
787  
    [[maybe_unused]] void* owner,
787  
    [[maybe_unused]] void* owner,
788  
    scheduler_op* base,
788  
    scheduler_op* base,
789  
    std::uint32_t,
789  
    std::uint32_t,
790  
    std::uint32_t)
790  
    std::uint32_t)
791  
{
791  
{
792  
    // owner is always non-null here. The destroy path (owner == nullptr)
792  
    // owner is always non-null here. The destroy path (owner == nullptr)
793  
    // is unreachable because completion_op overrides destroy() directly,
793  
    // is unreachable because completion_op overrides destroy() directly,
794  
    // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
794  
    // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
795  
    BOOST_COROSIO_ASSERT(owner);
795  
    BOOST_COROSIO_ASSERT(owner);
796  
    static_cast<completion_op*>(base)->operator()();
796  
    static_cast<completion_op*>(base)->operator()();
797  
}
797  
}
798  

798  

799  
inline void
799  
inline void
800  
waiter_node::completion_op::operator()()
800  
waiter_node::completion_op::operator()()
801  
{
801  
{
802  
    auto* w = waiter_;
802  
    auto* w = waiter_;
803  
    w->stop_cb_.reset();
803  
    w->stop_cb_.reset();
804  
    if (w->ec_out_)
804  
    if (w->ec_out_)
805  
        *w->ec_out_ = w->ec_value_;
805  
        *w->ec_out_ = w->ec_value_;
806  

806  

807  
    auto* cont  = w->cont_;
807  
    auto* cont  = w->cont_;
808  
    auto d      = w->d_;
808  
    auto d      = w->d_;
809  
    auto* svc   = w->svc_;
809  
    auto* svc   = w->svc_;
810  
    auto& sched = svc->get_scheduler();
810  
    auto& sched = svc->get_scheduler();
811  

811  

812  
    svc->destroy_waiter(w);
812  
    svc->destroy_waiter(w);
813  

813  

814  
    d.post(*cont);
814  
    d.post(*cont);
815  
    sched.work_finished();
815  
    sched.work_finished();
816  
}
816  
}
817  

817  

818  
// GCC 14 false-positive: inlining ~optional<stop_callback> through
818  
// GCC 14 false-positive: inlining ~optional<stop_callback> through
819  
// delete loses track that stop_cb_ was already .reset() above.
819  
// delete loses track that stop_cb_ was already .reset() above.
820  
#if defined(__GNUC__) && !defined(__clang__)
820  
#if defined(__GNUC__) && !defined(__clang__)
821  
#pragma GCC diagnostic push
821  
#pragma GCC diagnostic push
822  
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
822  
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
823  
#endif
823  
#endif
824  
inline void
824  
inline void
825  
waiter_node::completion_op::destroy()
825  
waiter_node::completion_op::destroy()
826  
{
826  
{
827  
    // Called during scheduler shutdown drain when this completion_op is
827  
    // Called during scheduler shutdown drain when this completion_op is
828  
    // in the scheduler's ready queue (posted by cancel_timer() or
828  
    // in the scheduler's ready queue (posted by cancel_timer() or
829  
    // process_expired()). Balances the work_started() from
829  
    // process_expired()). Balances the work_started() from
830  
    // implementation::wait(). The scheduler drain loop separately
830  
    // implementation::wait(). The scheduler drain loop separately
831  
    // balances the work_started() from post(). On IOCP both decrements
831  
    // balances the work_started() from post(). On IOCP both decrements
832  
    // are required for outstanding_work_ to reach zero; on other
832  
    // are required for outstanding_work_ to reach zero; on other
833  
    // backends this is harmless.
833  
    // backends this is harmless.
834  
    //
834  
    //
835  
    // This override also prevents scheduler_op::destroy() from calling
835  
    // This override also prevents scheduler_op::destroy() from calling
836  
    // do_complete(nullptr, ...). See also: timer_service::shutdown()
836  
    // do_complete(nullptr, ...). See also: timer_service::shutdown()
837  
    // which drains waiters still in the timer heap (the other path).
837  
    // which drains waiters still in the timer heap (the other path).
838  
    auto* w = waiter_;
838  
    auto* w = waiter_;
839  
    w->stop_cb_.reset();
839  
    w->stop_cb_.reset();
840  
    auto h      = std::exchange(w->h_, {});
840  
    auto h      = std::exchange(w->h_, {});
841  
    auto& sched = w->svc_->get_scheduler();
841  
    auto& sched = w->svc_->get_scheduler();
842  
    delete w;
842  
    delete w;
843  
    sched.work_finished();
843  
    sched.work_finished();
844  
    if (h)
844  
    if (h)
845  
        h.destroy();
845  
        h.destroy();
846  
}
846  
}
847  
#if defined(__GNUC__) && !defined(__clang__)
847  
#if defined(__GNUC__) && !defined(__clang__)
848  
#pragma GCC diagnostic pop
848  
#pragma GCC diagnostic pop
849  
#endif
849  
#endif
850  

850  

851  
inline std::coroutine_handle<>
851  
inline std::coroutine_handle<>
852  
timer_service::implementation::wait(
852  
timer_service::implementation::wait(
853  
    std::coroutine_handle<> h,
853  
    std::coroutine_handle<> h,
854  
    capy::executor_ref d,
854  
    capy::executor_ref d,
855  
    std::stop_token token,
855  
    std::stop_token token,
856  
    std::error_code* ec,
856  
    std::error_code* ec,
857  
    capy::continuation* cont)
857  
    capy::continuation* cont)
858  
{
858  
{
859  
    // Already-expired fast path — no waiter_node, no mutex.
859  
    // Already-expired fast path — no waiter_node, no mutex.
860  
    // Post instead of dispatch so the coroutine yields to the
860  
    // Post instead of dispatch so the coroutine yields to the
861  
    // scheduler, allowing other queued work to run.
861  
    // scheduler, allowing other queued work to run.
862  
    if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
862  
    if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
863  
    {
863  
    {
864  
        if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
864  
        if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
865  
        {
865  
        {
866  
            if (ec)
866  
            if (ec)
867  
                *ec = {};
867  
                *ec = {};
868  
            d.post(*cont);
868  
            d.post(*cont);
869  
            return std::noop_coroutine();
869  
            return std::noop_coroutine();
870  
        }
870  
        }
871  
    }
871  
    }
872  

872  

873  
    auto* w    = svc_->create_waiter();
873  
    auto* w    = svc_->create_waiter();
874  
    w->impl_   = this;
874  
    w->impl_   = this;
875  
    w->svc_    = svc_;
875  
    w->svc_    = svc_;
876  
    w->h_      = h;
876  
    w->h_      = h;
877  
    w->cont_   = cont;
877  
    w->cont_   = cont;
878  
    w->d_      = d;
878  
    w->d_      = d;
879  
    w->token_  = std::move(token);
879  
    w->token_  = std::move(token);
880  
    w->ec_out_ = ec;
880  
    w->ec_out_ = ec;
881  

881  

882  
    svc_->insert_waiter(*this, w);
882  
    svc_->insert_waiter(*this, w);
883  
    might_have_pending_waits_ = true;
883  
    might_have_pending_waits_ = true;
884  
    svc_->get_scheduler().work_started();
884  
    svc_->get_scheduler().work_started();
885  

885  

886  
    if (w->token_.stop_possible())
886  
    if (w->token_.stop_possible())
887  
        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
887  
        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
888  

888  

889  
    return std::noop_coroutine();
889  
    return std::noop_coroutine();
890  
}
890  
}
891  

891  

892  
// Free functions
892  
// Free functions
893  

893  

894  
struct timer_service_access
894  
struct timer_service_access
895  
{
895  
{
896  
    static timer_service& get_timer(io_context& ctx) noexcept
896  
    static timer_service& get_timer(io_context& ctx) noexcept
897  
    {
897  
    {
898  
        return *ctx.timer_svc_;
898  
        return *ctx.timer_svc_;
899  
    }
899  
    }
900  

900  

901  
    static void set_timer(io_context& ctx, timer_service& svc) noexcept
901  
    static void set_timer(io_context& ctx, timer_service& svc) noexcept
902  
    {
902  
    {
903  
        ctx.timer_svc_ = &svc;
903  
        ctx.timer_svc_ = &svc;
904  
    }
904  
    }
905  
};
905  
};
906  

906  

907  
// Bypass find_service() mutex by reading io_context's cached pointer
907  
// Bypass find_service() mutex by reading io_context's cached pointer
908  
inline io_object::io_service&
908  
inline io_object::io_service&
909  
timer_service_direct(capy::execution_context& ctx) noexcept
909  
timer_service_direct(capy::execution_context& ctx) noexcept
910  
{
910  
{
911  
    return timer_service_access::get_timer(static_cast<io_context&>(ctx));
911  
    return timer_service_access::get_timer(static_cast<io_context&>(ctx));
912  
}
912  
}
913  

913  

914  
inline std::size_t
914  
inline std::size_t
915  
timer_service_update_expiry(timer::implementation& base)
915  
timer_service_update_expiry(timer::implementation& base)
916  
{
916  
{
917  
    auto& impl = static_cast<timer_service::implementation&>(base);
917  
    auto& impl = static_cast<timer_service::implementation&>(base);
918  
    return impl.svc_->update_timer(impl, impl.expiry_);
918  
    return impl.svc_->update_timer(impl, impl.expiry_);
919  
}
919  
}
920  

920  

921  
inline std::size_t
921  
inline std::size_t
922  
timer_service_cancel(timer::implementation& base) noexcept
922  
timer_service_cancel(timer::implementation& base) noexcept
923  
{
923  
{
924  
    auto& impl = static_cast<timer_service::implementation&>(base);
924  
    auto& impl = static_cast<timer_service::implementation&>(base);
925  
    return impl.svc_->cancel_timer(impl);
925  
    return impl.svc_->cancel_timer(impl);
926  
}
926  
}
927  

927  

928  
inline std::size_t
928  
inline std::size_t
929  
timer_service_cancel_one(timer::implementation& base) noexcept
929  
timer_service_cancel_one(timer::implementation& base) noexcept
930  
{
930  
{
931  
    auto& impl = static_cast<timer_service::implementation&>(base);
931  
    auto& impl = static_cast<timer_service::implementation&>(base);
932  
    return impl.svc_->cancel_one_waiter(impl);
932  
    return impl.svc_->cancel_one_waiter(impl);
933  
}
933  
}
934  

934  

935  
inline timer_service&
935  
inline timer_service&
936  
get_timer_service(capy::execution_context& ctx, scheduler& sched)
936  
get_timer_service(capy::execution_context& ctx, scheduler& sched)
937  
{
937  
{
938  
    auto& svc = ctx.make_service<timer_service>(sched);
938  
    auto& svc = ctx.make_service<timer_service>(sched);
939  
    timer_service_access::set_timer(static_cast<io_context&>(ctx), svc);
939  
    timer_service_access::set_timer(static_cast<io_context&>(ctx), svc);
940  
    return svc;
940  
    return svc;
941  
}
941  
}
942  

942  

943  
} // namespace boost::corosio::detail
943  
} // namespace boost::corosio::detail
944  

944  

945  
#endif
945  
#endif