//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP

#include <boost/capy/detail/config.hpp>
#include <boost/capy/detail/await_suspend_helper.hpp>
#include <boost/capy/buffers.hpp>
#include <boost/capy/buffers/buffer_copy.hpp>
#include <boost/capy/buffers/buffer_param.hpp>
#include <boost/capy/concept/buffer_sink.hpp>
#include <boost/capy/concept/io_awaitable.hpp>
#include <boost/capy/concept/write_sink.hpp>
#include <boost/capy/coro.hpp>
#include <boost/capy/ex/executor_ref.hpp>
#include <boost/capy/io_result.hpp>
#include <boost/capy/io_task.hpp>

#include <concepts>
#include <coroutine>
#include <cstddef>
#include <exception>
#include <new>
#include <span>
#include <stop_token>
#include <system_error>
#include <type_traits> // std::decay_t, used in the owning-constructor constraint
#include <utility>

namespace boost {
namespace capy {

39  
/** Type-erased wrapper for any BufferSink.
39  
/** Type-erased wrapper for any BufferSink.
40  

40  

41  
    This class provides type erasure for any type satisfying the
41  
    This class provides type erasure for any type satisfying the
42  
    @ref BufferSink concept, enabling runtime polymorphism for
42  
    @ref BufferSink concept, enabling runtime polymorphism for
43  
    buffer sink operations. It uses cached awaitable storage to achieve
43  
    buffer sink operations. It uses cached awaitable storage to achieve
44  
    zero steady-state allocation after construction.
44  
    zero steady-state allocation after construction.
45  

45  

46  
    The wrapper exposes two interfaces for producing data:
46  
    The wrapper exposes two interfaces for producing data:
47  
    the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47  
    the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
48  
    and the @ref WriteSink interface (`write_some`, `write`,
48  
    and the @ref WriteSink interface (`write_some`, `write`,
49  
    `write_eof`). Choose the interface that matches how your data
49  
    `write_eof`). Choose the interface that matches how your data
50  
    is produced:
50  
    is produced:
51  

51  

52  
    @par Choosing an Interface
52  
    @par Choosing an Interface
53  

53  

54  
    Use the **BufferSink** interface when you are a generator that
54  
    Use the **BufferSink** interface when you are a generator that
55  
    produces data into externally-provided buffers. The sink owns
55  
    produces data into externally-provided buffers. The sink owns
56  
    the memory; you call @ref prepare to obtain writable buffers,
56  
    the memory; you call @ref prepare to obtain writable buffers,
57  
    fill them, then call @ref commit or @ref commit_eof.
57  
    fill them, then call @ref commit or @ref commit_eof.
58  

58  

59  
    Use the **WriteSink** interface when you already have buffers
59  
    Use the **WriteSink** interface when you already have buffers
60  
    containing the data to write:
60  
    containing the data to write:
61  
    - If the entire body is available up front, call
61  
    - If the entire body is available up front, call
62  
      @ref write_eof(buffers) to send everything atomically.
62  
      @ref write_eof(buffers) to send everything atomically.
63  
    - If data arrives incrementally, call @ref write or
63  
    - If data arrives incrementally, call @ref write or
64  
      @ref write_some in a loop, then @ref write_eof() when done.
64  
      @ref write_some in a loop, then @ref write_eof() when done.
65  
      Prefer `write` (complete) unless your streaming pattern
65  
      Prefer `write` (complete) unless your streaming pattern
66  
      benefits from partial writes via `write_some`.
66  
      benefits from partial writes via `write_some`.
67  

67  

68  
    If the wrapped type only satisfies @ref BufferSink, the
68  
    If the wrapped type only satisfies @ref BufferSink, the
69  
    @ref WriteSink operations are provided automatically.
69  
    @ref WriteSink operations are provided automatically.
70  

70  

71  
    @par Construction Modes
71  
    @par Construction Modes
72  

72  

73  
    - **Owning**: Pass by value to transfer ownership. The wrapper
73  
    - **Owning**: Pass by value to transfer ownership. The wrapper
74  
      allocates storage and owns the sink.
74  
      allocates storage and owns the sink.
75  
    - **Reference**: Pass a pointer to wrap without ownership. The
75  
    - **Reference**: Pass a pointer to wrap without ownership. The
76  
      pointed-to sink must outlive this wrapper.
76  
      pointed-to sink must outlive this wrapper.
77  

77  

78  
    @par Awaitable Preallocation
78  
    @par Awaitable Preallocation
79  
    The constructor preallocates storage for the type-erased awaitable.
79  
    The constructor preallocates storage for the type-erased awaitable.
80  
    This reserves all virtual address space at server startup
80  
    This reserves all virtual address space at server startup
81  
    so memory usage can be measured up front, rather than
81  
    so memory usage can be measured up front, rather than
82  
    allocating piecemeal as traffic arrives.
82  
    allocating piecemeal as traffic arrives.
83  

83  

84  
    @par Thread Safety
84  
    @par Thread Safety
85  
    Not thread-safe. Concurrent operations on the same wrapper
85  
    Not thread-safe. Concurrent operations on the same wrapper
86  
    are undefined behavior.
86  
    are undefined behavior.
87  

87  

88  
    @par Example
88  
    @par Example
89  
    @code
89  
    @code
90  
    // Owning - takes ownership of the sink
90  
    // Owning - takes ownership of the sink
91  
    any_buffer_sink abs(some_buffer_sink{args...});
91  
    any_buffer_sink abs(some_buffer_sink{args...});
92  

92  

93  
    // Reference - wraps without ownership
93  
    // Reference - wraps without ownership
94  
    some_buffer_sink sink;
94  
    some_buffer_sink sink;
95  
    any_buffer_sink abs(&sink);
95  
    any_buffer_sink abs(&sink);
96  

96  

97  
    // BufferSink interface: generate into callee-owned buffers
97  
    // BufferSink interface: generate into callee-owned buffers
98  
    mutable_buffer arr[16];
98  
    mutable_buffer arr[16];
99  
    auto bufs = abs.prepare(arr);
99  
    auto bufs = abs.prepare(arr);
100  
    // Write data into bufs[0..bufs.size())
100  
    // Write data into bufs[0..bufs.size())
101  
    auto [ec] = co_await abs.commit(bytes_written);
101  
    auto [ec] = co_await abs.commit(bytes_written);
102  
    auto [ec2] = co_await abs.commit_eof(0);
102  
    auto [ec2] = co_await abs.commit_eof(0);
103  

103  

104  
    // WriteSink interface: send caller-owned buffers
104  
    // WriteSink interface: send caller-owned buffers
105  
    auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
105  
    auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
106  
    auto [ec4] = co_await abs.write_eof();
106  
    auto [ec4] = co_await abs.write_eof();
107  

107  

108  
    // Or send everything at once
108  
    // Or send everything at once
109  
    auto [ec5, n2] = co_await abs.write_eof(
109  
    auto [ec5, n2] = co_await abs.write_eof(
110  
        make_buffer(body_data));
110  
        make_buffer(body_data));
111  
    @endcode
111  
    @endcode
112  

112  

113  
    @see any_buffer_source, BufferSink, WriteSink
113  
    @see any_buffer_source, BufferSink, WriteSink
114  
*/
114  
*/
115  
class any_buffer_sink
115  
class any_buffer_sink
116  
{
116  
{
117  
    struct vtable;
117  
    struct vtable;
118  
    struct awaitable_ops;
118  
    struct awaitable_ops;
119  
    struct write_awaitable_ops;
119  
    struct write_awaitable_ops;
120  

120  

121  
    template<BufferSink S>
121  
    template<BufferSink S>
122  
    struct vtable_for_impl;
122  
    struct vtable_for_impl;
123  

123  

124  
    // hot-path members first for cache locality
124  
    // hot-path members first for cache locality
125  
    void* sink_ = nullptr;
125  
    void* sink_ = nullptr;
126  
    vtable const* vt_ = nullptr;
126  
    vtable const* vt_ = nullptr;
127  
    void* cached_awaitable_ = nullptr;
127  
    void* cached_awaitable_ = nullptr;
128  
    awaitable_ops const* active_ops_ = nullptr;
128  
    awaitable_ops const* active_ops_ = nullptr;
129  
    write_awaitable_ops const* active_write_ops_ = nullptr;
129  
    write_awaitable_ops const* active_write_ops_ = nullptr;
130  
    void* storage_ = nullptr;
130  
    void* storage_ = nullptr;
131  

131  

132  
public:
132  
public:
133  
    /** Destructor.
133  
    /** Destructor.
134  

134  

135  
        Destroys the owned sink (if any) and releases the cached
135  
        Destroys the owned sink (if any) and releases the cached
136  
        awaitable storage.
136  
        awaitable storage.
137  
    */
137  
    */
138  
    ~any_buffer_sink();
138  
    ~any_buffer_sink();
139  

139  

140  
    /** Default constructor.
140  
    /** Default constructor.
141  

141  

142  
        Constructs an empty wrapper. Operations on a default-constructed
142  
        Constructs an empty wrapper. Operations on a default-constructed
143  
        wrapper result in undefined behavior.
143  
        wrapper result in undefined behavior.
144  
    */
144  
    */
145  
    any_buffer_sink() = default;
145  
    any_buffer_sink() = default;
146  

146  

147  
    /** Non-copyable.
147  
    /** Non-copyable.
148  

148  

149  
        The awaitable cache is per-instance and cannot be shared.
149  
        The awaitable cache is per-instance and cannot be shared.
150  
    */
150  
    */
151  
    any_buffer_sink(any_buffer_sink const&) = delete;
151  
    any_buffer_sink(any_buffer_sink const&) = delete;
152  
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;
152  
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;
153  

153  

154  
    /** Move constructor.
154  
    /** Move constructor.
155  

155  

156  
        Transfers ownership of the wrapped sink (if owned) and
156  
        Transfers ownership of the wrapped sink (if owned) and
157  
        cached awaitable storage from `other`. After the move, `other` is
157  
        cached awaitable storage from `other`. After the move, `other` is
158  
        in a default-constructed state.
158  
        in a default-constructed state.
159  

159  

160  
        @param other The wrapper to move from.
160  
        @param other The wrapper to move from.
161  
    */
161  
    */
162  
    any_buffer_sink(any_buffer_sink&& other) noexcept
162  
    any_buffer_sink(any_buffer_sink&& other) noexcept
163  
        : sink_(std::exchange(other.sink_, nullptr))
163  
        : sink_(std::exchange(other.sink_, nullptr))
164  
        , vt_(std::exchange(other.vt_, nullptr))
164  
        , vt_(std::exchange(other.vt_, nullptr))
165  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
165  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
166  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
166  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
167  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
167  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
168  
        , storage_(std::exchange(other.storage_, nullptr))
168  
        , storage_(std::exchange(other.storage_, nullptr))
169  
    {
169  
    {
170  
    }
170  
    }
171  

171  

172  
    /** Move assignment operator.
172  
    /** Move assignment operator.
173  

173  

174  
        Destroys any owned sink and releases existing resources,
174  
        Destroys any owned sink and releases existing resources,
175  
        then transfers ownership from `other`.
175  
        then transfers ownership from `other`.
176  

176  

177  
        @param other The wrapper to move from.
177  
        @param other The wrapper to move from.
178  
        @return Reference to this wrapper.
178  
        @return Reference to this wrapper.
179  
    */
179  
    */
180  
    any_buffer_sink&
180  
    any_buffer_sink&
181  
    operator=(any_buffer_sink&& other) noexcept;
181  
    operator=(any_buffer_sink&& other) noexcept;
182  

182  

183  
    /** Construct by taking ownership of a BufferSink.
183  
    /** Construct by taking ownership of a BufferSink.
184  

184  

185  
        Allocates storage and moves the sink into this wrapper.
185  
        Allocates storage and moves the sink into this wrapper.
186  
        The wrapper owns the sink and will destroy it. If `S` also
186  
        The wrapper owns the sink and will destroy it. If `S` also
187  
        satisfies @ref WriteSink, native write operations are
187  
        satisfies @ref WriteSink, native write operations are
188  
        forwarded through the virtual boundary.
188  
        forwarded through the virtual boundary.
189  

189  

190  
        @param s The sink to take ownership of.
190  
        @param s The sink to take ownership of.
191  
    */
191  
    */
192  
    template<BufferSink S>
192  
    template<BufferSink S>
193  
        requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
193  
        requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
194  
    any_buffer_sink(S s);
194  
    any_buffer_sink(S s);
195  

195  

196  
    /** Construct by wrapping a BufferSink without ownership.
196  
    /** Construct by wrapping a BufferSink without ownership.
197  

197  

198  
        Wraps the given sink by pointer. The sink must remain
198  
        Wraps the given sink by pointer. The sink must remain
199  
        valid for the lifetime of this wrapper. If `S` also
199  
        valid for the lifetime of this wrapper. If `S` also
200  
        satisfies @ref WriteSink, native write operations are
200  
        satisfies @ref WriteSink, native write operations are
201  
        forwarded through the virtual boundary.
201  
        forwarded through the virtual boundary.
202  

202  

203  
        @param s Pointer to the sink to wrap.
203  
        @param s Pointer to the sink to wrap.
204  
    */
204  
    */
205  
    template<BufferSink S>
205  
    template<BufferSink S>
206  
    any_buffer_sink(S* s);
206  
    any_buffer_sink(S* s);
207  

207  

208  
    /** Check if the wrapper contains a valid sink.
208  
    /** Check if the wrapper contains a valid sink.
209  

209  

210  
        @return `true` if wrapping a sink, `false` if default-constructed
210  
        @return `true` if wrapping a sink, `false` if default-constructed
211  
            or moved-from.
211  
            or moved-from.
212  
    */
212  
    */
213  
    bool
213  
    bool
214  
    has_value() const noexcept
214  
    has_value() const noexcept
215  
    {
215  
    {
216  
        return sink_ != nullptr;
216  
        return sink_ != nullptr;
217  
    }
217  
    }
218  

218  

219  
    /** Check if the wrapper contains a valid sink.
219  
    /** Check if the wrapper contains a valid sink.
220  

220  

221  
        @return `true` if wrapping a sink, `false` if default-constructed
221  
        @return `true` if wrapping a sink, `false` if default-constructed
222  
            or moved-from.
222  
            or moved-from.
223  
    */
223  
    */
224  
    explicit
224  
    explicit
225  
    operator bool() const noexcept
225  
    operator bool() const noexcept
226  
    {
226  
    {
227  
        return has_value();
227  
        return has_value();
228  
    }
228  
    }
229  

229  

230  
    /** Prepare writable buffers.
230  
    /** Prepare writable buffers.
231  

231  

232  
        Fills the provided span with mutable buffer descriptors
232  
        Fills the provided span with mutable buffer descriptors
233  
        pointing to the underlying sink's internal storage. This
233  
        pointing to the underlying sink's internal storage. This
234  
        operation is synchronous.
234  
        operation is synchronous.
235  

235  

236  
        @param dest Span of mutable_buffer to fill.
236  
        @param dest Span of mutable_buffer to fill.
237  

237  

238  
        @return A span of filled buffers.
238  
        @return A span of filled buffers.
239  

239  

240  
        @par Preconditions
240  
        @par Preconditions
241  
        The wrapper must contain a valid sink (`has_value() == true`).
241  
        The wrapper must contain a valid sink (`has_value() == true`).
242  
    */
242  
    */
243  
    std::span<mutable_buffer>
243  
    std::span<mutable_buffer>
244  
    prepare(std::span<mutable_buffer> dest);
244  
    prepare(std::span<mutable_buffer> dest);
245  

245  

246  
    /** Commit bytes written to the prepared buffers.
246  
    /** Commit bytes written to the prepared buffers.
247  

247  

248  
        Commits `n` bytes written to the buffers returned by the
248  
        Commits `n` bytes written to the buffers returned by the
249  
        most recent call to @ref prepare. The operation may trigger
249  
        most recent call to @ref prepare. The operation may trigger
250  
        underlying I/O.
250  
        underlying I/O.
251  

251  

252  
        @param n The number of bytes to commit.
252  
        @param n The number of bytes to commit.
253  

253  

254  
        @return An awaitable yielding `(error_code)`.
254  
        @return An awaitable yielding `(error_code)`.
255  

255  

256  
        @par Preconditions
256  
        @par Preconditions
257  
        The wrapper must contain a valid sink (`has_value() == true`).
257  
        The wrapper must contain a valid sink (`has_value() == true`).
258  
    */
258  
    */
259  
    auto
259  
    auto
260  
    commit(std::size_t n);
260  
    commit(std::size_t n);
261  

261  

262  
    /** Commit final bytes and signal end-of-stream.
262  
    /** Commit final bytes and signal end-of-stream.
263  

263  

264  
        Commits `n` bytes written to the buffers returned by the
264  
        Commits `n` bytes written to the buffers returned by the
265  
        most recent call to @ref prepare and finalizes the sink.
265  
        most recent call to @ref prepare and finalizes the sink.
266  
        After success, no further operations are permitted.
266  
        After success, no further operations are permitted.
267  

267  

268  
        @param n The number of bytes to commit.
268  
        @param n The number of bytes to commit.
269  

269  

270  
        @return An awaitable yielding `(error_code)`.
270  
        @return An awaitable yielding `(error_code)`.
271  

271  

272  
        @par Preconditions
272  
        @par Preconditions
273  
        The wrapper must contain a valid sink (`has_value() == true`).
273  
        The wrapper must contain a valid sink (`has_value() == true`).
274  
    */
274  
    */
275  
    auto
275  
    auto
276  
    commit_eof(std::size_t n);
276  
    commit_eof(std::size_t n);
277  

277  

278  
    /** Write some data from a buffer sequence.
278  
    /** Write some data from a buffer sequence.
279  

279  

280  
        Writes one or more bytes from the buffer sequence to the
280  
        Writes one or more bytes from the buffer sequence to the
281  
        underlying sink. May consume less than the full sequence.
281  
        underlying sink. May consume less than the full sequence.
282  

282  

283  
        When the wrapped type provides native @ref WriteSink support,
283  
        When the wrapped type provides native @ref WriteSink support,
284  
        the operation forwards directly. Otherwise it is synthesized
284  
        the operation forwards directly. Otherwise it is synthesized
285  
        from @ref prepare and @ref commit with a buffer copy.
285  
        from @ref prepare and @ref commit with a buffer copy.
286  

286  

287  
        @param buffers The buffer sequence to write.
287  
        @param buffers The buffer sequence to write.
288  

288  

289  
        @return An awaitable yielding `(error_code,std::size_t)`.
289  
        @return An awaitable yielding `(error_code,std::size_t)`.
290  

290  

291  
        @par Preconditions
291  
        @par Preconditions
292  
        The wrapper must contain a valid sink (`has_value() == true`).
292  
        The wrapper must contain a valid sink (`has_value() == true`).
293  
    */
293  
    */
294  
    template<ConstBufferSequence CB>
294  
    template<ConstBufferSequence CB>
295  
    io_task<std::size_t>
295  
    io_task<std::size_t>
296  
    write_some(CB buffers);
296  
    write_some(CB buffers);
297  

297  

298  
    /** Write all data from a buffer sequence.
298  
    /** Write all data from a buffer sequence.
299  

299  

300  
        Writes all data from the buffer sequence to the underlying
300  
        Writes all data from the buffer sequence to the underlying
301  
        sink. This method satisfies the @ref WriteSink concept.
301  
        sink. This method satisfies the @ref WriteSink concept.
302  

302  

303  
        When the wrapped type provides native @ref WriteSink support,
303  
        When the wrapped type provides native @ref WriteSink support,
304  
        each window is forwarded directly. Otherwise the data is
304  
        each window is forwarded directly. Otherwise the data is
305  
        copied into the sink via @ref prepare and @ref commit.
305  
        copied into the sink via @ref prepare and @ref commit.
306  

306  

307  
        @param buffers The buffer sequence to write.
307  
        @param buffers The buffer sequence to write.
308  

308  

309  
        @return An awaitable yielding `(error_code,std::size_t)`.
309  
        @return An awaitable yielding `(error_code,std::size_t)`.
310  

310  

311  
        @par Preconditions
311  
        @par Preconditions
312  
        The wrapper must contain a valid sink (`has_value() == true`).
312  
        The wrapper must contain a valid sink (`has_value() == true`).
313  
    */
313  
    */
314  
    template<ConstBufferSequence CB>
314  
    template<ConstBufferSequence CB>
315  
    io_task<std::size_t>
315  
    io_task<std::size_t>
316  
    write(CB buffers);
316  
    write(CB buffers);
317  

317  

318  
    /** Atomically write data and signal end-of-stream.
318  
    /** Atomically write data and signal end-of-stream.
319  

319  

320  
        Writes all data from the buffer sequence to the underlying
320  
        Writes all data from the buffer sequence to the underlying
321  
        sink and then signals end-of-stream.
321  
        sink and then signals end-of-stream.
322  

322  

323  
        When the wrapped type provides native @ref WriteSink support,
323  
        When the wrapped type provides native @ref WriteSink support,
324  
        the final window is sent atomically via the underlying
324  
        the final window is sent atomically via the underlying
325  
        `write_eof(buffers)`. Otherwise the data is synthesized
325  
        `write_eof(buffers)`. Otherwise the data is synthesized
326  
        through @ref prepare, @ref commit, and @ref commit_eof.
326  
        through @ref prepare, @ref commit, and @ref commit_eof.
327  

327  

328  
        @param buffers The buffer sequence to write.
328  
        @param buffers The buffer sequence to write.
329  

329  

330  
        @return An awaitable yielding `(error_code,std::size_t)`.
330  
        @return An awaitable yielding `(error_code,std::size_t)`.
331  

331  

332  
        @par Preconditions
332  
        @par Preconditions
333  
        The wrapper must contain a valid sink (`has_value() == true`).
333  
        The wrapper must contain a valid sink (`has_value() == true`).
334  
    */
334  
    */
335  
    template<ConstBufferSequence CB>
335  
    template<ConstBufferSequence CB>
336  
    io_task<std::size_t>
336  
    io_task<std::size_t>
337  
    write_eof(CB buffers);
337  
    write_eof(CB buffers);
338  

338  

339  
    /** Signal end-of-stream.
339  
    /** Signal end-of-stream.
340  

340  

341  
        Indicates that no more data will be written to the sink.
341  
        Indicates that no more data will be written to the sink.
342  
        This method satisfies the @ref WriteSink concept.
342  
        This method satisfies the @ref WriteSink concept.
343  

343  

344  
        When the wrapped type provides native @ref WriteSink support,
344  
        When the wrapped type provides native @ref WriteSink support,
345  
        the underlying `write_eof()` is called. Otherwise the
345  
        the underlying `write_eof()` is called. Otherwise the
346  
        operation is implemented as `commit_eof(0)`.
346  
        operation is implemented as `commit_eof(0)`.
347  

347  

348  
        @return An awaitable yielding `(error_code)`.
348  
        @return An awaitable yielding `(error_code)`.
349  

349  

350  
        @par Preconditions
350  
        @par Preconditions
351  
        The wrapper must contain a valid sink (`has_value() == true`).
351  
        The wrapper must contain a valid sink (`has_value() == true`).
352  
    */
352  
    */
353  
    auto
353  
    auto
354  
    write_eof();
354  
    write_eof();
355  

355  

356  
protected:
356  
protected:
357  
    /** Rebind to a new sink after move.
357  
    /** Rebind to a new sink after move.
358  

358  

359  
        Updates the internal pointer to reference a new sink object.
359  
        Updates the internal pointer to reference a new sink object.
360  
        Used by owning wrappers after move assignment when the owned
360  
        Used by owning wrappers after move assignment when the owned
361  
        object has moved to a new location.
361  
        object has moved to a new location.
362  

362  

363  
        @param new_sink The new sink to bind to. Must be the same
363  
        @param new_sink The new sink to bind to. Must be the same
364  
            type as the original sink.
364  
            type as the original sink.
365  

365  

366  
        @note Terminates if called with a sink of different type
366  
        @note Terminates if called with a sink of different type
367  
            than the original.
367  
            than the original.
368  
    */
368  
    */
369  
    template<BufferSink S>
369  
    template<BufferSink S>
370  
    void
370  
    void
371  
    rebind(S& new_sink) noexcept
371  
    rebind(S& new_sink) noexcept
372  
    {
372  
    {
373  
        if(vt_ != &vtable_for_impl<S>::value)
373  
        if(vt_ != &vtable_for_impl<S>::value)
374  
            std::terminate();
374  
            std::terminate();
375  
        sink_ = &new_sink;
375  
        sink_ = &new_sink;
376  
    }
376  
    }
377  

377  

378  
private:
378  
private:
379  
    /** Forward a partial write through the vtable.
379  
    /** Forward a partial write through the vtable.
380  

380  

381  
        Constructs the underlying `write_some` awaitable in
381  
        Constructs the underlying `write_some` awaitable in
382  
        cached storage and returns a type-erased awaitable.
382  
        cached storage and returns a type-erased awaitable.
383  
    */
383  
    */
384  
    auto
384  
    auto
385  
    write_some_(std::span<const_buffer const> buffers);
385  
    write_some_(std::span<const_buffer const> buffers);
386  

386  

387  
    /** Forward a complete write through the vtable.
387  
    /** Forward a complete write through the vtable.
388  

388  

389  
        Constructs the underlying `write` awaitable in
389  
        Constructs the underlying `write` awaitable in
390  
        cached storage and returns a type-erased awaitable.
390  
        cached storage and returns a type-erased awaitable.
391  
    */
391  
    */
392  
    auto
392  
    auto
393  
    write_(std::span<const_buffer const> buffers);
393  
    write_(std::span<const_buffer const> buffers);
394  

394  

395  
    /** Forward an atomic write-with-EOF through the vtable.
395  
    /** Forward an atomic write-with-EOF through the vtable.
396  

396  

397  
        Constructs the underlying `write_eof(buffers)` awaitable
397  
        Constructs the underlying `write_eof(buffers)` awaitable
398  
        in cached storage and returns a type-erased awaitable.
398  
        in cached storage and returns a type-erased awaitable.
399  
    */
399  
    */
400  
    auto
400  
    auto
401  
    write_eof_buffers_(std::span<const_buffer const> buffers);
401  
    write_eof_buffers_(std::span<const_buffer const> buffers);
402  
};
402  
};

//----------------------------------------------------------

406  
/** Type-erased ops for awaitables yielding `io_result<>`. */
406  
/** Type-erased ops for awaitables yielding `io_result<>`. */
407  
struct any_buffer_sink::awaitable_ops
407  
struct any_buffer_sink::awaitable_ops
408  
{
408  
{
409  
    bool (*await_ready)(void*);
409  
    bool (*await_ready)(void*);
410 -
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
410 +
    coro (*await_suspend)(void*, coro, executor_ref const&, std::stop_token const&);
411  
    io_result<> (*await_resume)(void*);
411  
    io_result<> (*await_resume)(void*);
412  
    void (*destroy)(void*) noexcept;
412  
    void (*destroy)(void*) noexcept;
413  
};
413  
};
414  

414  

415  
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
415  
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
416  
struct any_buffer_sink::write_awaitable_ops
416  
struct any_buffer_sink::write_awaitable_ops
417  
{
417  
{
418  
    bool (*await_ready)(void*);
418  
    bool (*await_ready)(void*);
419 -
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
419 +
    coro (*await_suspend)(void*, coro, executor_ref const&, std::stop_token const&);
420  
    io_result<std::size_t> (*await_resume)(void*);
420  
    io_result<std::size_t> (*await_resume)(void*);
421  
    void (*destroy)(void*) noexcept;
421  
    void (*destroy)(void*) noexcept;
422  
};
422  
};
423  

423  

424  
struct any_buffer_sink::vtable
424  
struct any_buffer_sink::vtable
425  
{
425  
{
426  
    void (*destroy)(void*) noexcept;
426  
    void (*destroy)(void*) noexcept;
427  
    std::span<mutable_buffer> (*do_prepare)(
427  
    std::span<mutable_buffer> (*do_prepare)(
428  
        void* sink,
428  
        void* sink,
429  
        std::span<mutable_buffer> dest);
429  
        std::span<mutable_buffer> dest);
430  
    std::size_t awaitable_size;
430  
    std::size_t awaitable_size;
431  
    std::size_t awaitable_align;
431  
    std::size_t awaitable_align;
432  
    awaitable_ops const* (*construct_commit_awaitable)(
432  
    awaitable_ops const* (*construct_commit_awaitable)(
433  
        void* sink,
433  
        void* sink,
434  
        void* storage,
434  
        void* storage,
435  
        std::size_t n);
435  
        std::size_t n);
436  
    awaitable_ops const* (*construct_commit_eof_awaitable)(
436  
    awaitable_ops const* (*construct_commit_eof_awaitable)(
437  
        void* sink,
437  
        void* sink,
438  
        void* storage,
438  
        void* storage,
439  
        std::size_t n);
439  
        std::size_t n);
440  

440  

441  
    // WriteSink forwarding (null when wrapped type is BufferSink-only)
441  
    // WriteSink forwarding (null when wrapped type is BufferSink-only)
442  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
442  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
443  
        void* sink,
443  
        void* sink,
444  
        void* storage,
444  
        void* storage,
445  
        std::span<const_buffer const> buffers);
445  
        std::span<const_buffer const> buffers);
446  
    write_awaitable_ops const* (*construct_write_awaitable)(
446  
    write_awaitable_ops const* (*construct_write_awaitable)(
447  
        void* sink,
447  
        void* sink,
448  
        void* storage,
448  
        void* storage,
449  
        std::span<const_buffer const> buffers);
449  
        std::span<const_buffer const> buffers);
450  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
450  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
451  
        void* sink,
451  
        void* sink,
452  
        void* storage,
452  
        void* storage,
453  
        std::span<const_buffer const> buffers);
453  
        std::span<const_buffer const> buffers);
454  
    awaitable_ops const* (*construct_write_eof_awaitable)(
454  
    awaitable_ops const* (*construct_write_eof_awaitable)(
455  
        void* sink,
455  
        void* sink,
456  
        void* storage);
456  
        void* storage);
457  
};
457  
};
458  

458  

459  
template<BufferSink S>
459  
template<BufferSink S>
460  
struct any_buffer_sink::vtable_for_impl
460  
struct any_buffer_sink::vtable_for_impl
461  
{
461  
{
462  
    using CommitAwaitable = decltype(std::declval<S&>().commit(
462  
    using CommitAwaitable = decltype(std::declval<S&>().commit(
463  
        std::size_t{}));
463  
        std::size_t{}));
464  
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
464  
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
465  
        std::size_t{}));
465  
        std::size_t{}));
466  

466  

467  
    static void
467  
    static void
468  
    do_destroy_impl(void* sink) noexcept
468  
    do_destroy_impl(void* sink) noexcept
469  
    {
469  
    {
470  
        static_cast<S*>(sink)->~S();
470  
        static_cast<S*>(sink)->~S();
471  
    }
471  
    }
472  

472  

473  
    static std::span<mutable_buffer>
473  
    static std::span<mutable_buffer>
474  
    do_prepare_impl(
474  
    do_prepare_impl(
475  
        void* sink,
475  
        void* sink,
476  
        std::span<mutable_buffer> dest)
476  
        std::span<mutable_buffer> dest)
477  
    {
477  
    {
478  
        auto& s = *static_cast<S*>(sink);
478  
        auto& s = *static_cast<S*>(sink);
479  
        return s.prepare(dest);
479  
        return s.prepare(dest);
480  
    }
480  
    }
481  

481  

482  
    static awaitable_ops const*
482  
    static awaitable_ops const*
483  
    construct_commit_awaitable_impl(
483  
    construct_commit_awaitable_impl(
484  
        void* sink,
484  
        void* sink,
485  
        void* storage,
485  
        void* storage,
486  
        std::size_t n)
486  
        std::size_t n)
487  
    {
487  
    {
488  
        auto& s = *static_cast<S*>(sink);
488  
        auto& s = *static_cast<S*>(sink);
489  
        ::new(storage) CommitAwaitable(s.commit(n));
489  
        ::new(storage) CommitAwaitable(s.commit(n));
490  

490  

491  
        static constexpr awaitable_ops ops = {
491  
        static constexpr awaitable_ops ops = {
492  
            +[](void* p) {
492  
            +[](void* p) {
493  
                return static_cast<CommitAwaitable*>(p)->await_ready();
493  
                return static_cast<CommitAwaitable*>(p)->await_ready();
494  
            },
494  
            },
495 -
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
495 +
            +[](void* p, coro h, executor_ref const& ex, std::stop_token const& token) {
496  
                return detail::call_await_suspend(
496  
                return detail::call_await_suspend(
497  
                    static_cast<CommitAwaitable*>(p), h, ex, token);
497  
                    static_cast<CommitAwaitable*>(p), h, ex, token);
498  
            },
498  
            },
499  
            +[](void* p) {
499  
            +[](void* p) {
500  
                return static_cast<CommitAwaitable*>(p)->await_resume();
500  
                return static_cast<CommitAwaitable*>(p)->await_resume();
501  
            },
501  
            },
502  
            +[](void* p) noexcept {
502  
            +[](void* p) noexcept {
503  
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
503  
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
504  
            }
504  
            }
505  
        };
505  
        };
506  
        return &ops;
506  
        return &ops;
507  
    }
507  
    }
508  

508  

509  
    static awaitable_ops const*
509  
    static awaitable_ops const*
510  
    construct_commit_eof_awaitable_impl(
510  
    construct_commit_eof_awaitable_impl(
511  
        void* sink,
511  
        void* sink,
512  
        void* storage,
512  
        void* storage,
513  
        std::size_t n)
513  
        std::size_t n)
514  
    {
514  
    {
515  
        auto& s = *static_cast<S*>(sink);
515  
        auto& s = *static_cast<S*>(sink);
516  
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));
516  
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));
517  

517  

518  
        static constexpr awaitable_ops ops = {
518  
        static constexpr awaitable_ops ops = {
519  
            +[](void* p) {
519  
            +[](void* p) {
520  
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
520  
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
521  
            },
521  
            },
522 -
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
522 +
            +[](void* p, coro h, executor_ref const& ex, std::stop_token const& token) {
523  
                return detail::call_await_suspend(
523  
                return detail::call_await_suspend(
524  
                    static_cast<CommitEofAwaitable*>(p), h, ex, token);
524  
                    static_cast<CommitEofAwaitable*>(p), h, ex, token);
525  
            },
525  
            },
526  
            +[](void* p) {
526  
            +[](void* p) {
527  
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
527  
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
528  
            },
528  
            },
529  
            +[](void* p) noexcept {
529  
            +[](void* p) noexcept {
530  
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
530  
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
531  
            }
531  
            }
532  
        };
532  
        };
533  
        return &ops;
533  
        return &ops;
534  
    }
534  
    }
535  

535  

536  
    //------------------------------------------------------
536  
    //------------------------------------------------------
537  
    // WriteSink forwarding (only instantiated when WriteSink<S>)
537  
    // WriteSink forwarding (only instantiated when WriteSink<S>)
538  

538  

539  
    static write_awaitable_ops const*
539  
    static write_awaitable_ops const*
540  
    construct_write_some_awaitable_impl(
540  
    construct_write_some_awaitable_impl(
541  
        void* sink,
541  
        void* sink,
542  
        void* storage,
542  
        void* storage,
543  
        std::span<const_buffer const> buffers)
543  
        std::span<const_buffer const> buffers)
544  
        requires WriteSink<S>
544  
        requires WriteSink<S>
545  
    {
545  
    {
546  
        using Aw = decltype(std::declval<S&>().write_some(
546  
        using Aw = decltype(std::declval<S&>().write_some(
547  
            std::span<const_buffer const>{}));
547  
            std::span<const_buffer const>{}));
548  
        auto& s = *static_cast<S*>(sink);
548  
        auto& s = *static_cast<S*>(sink);
549  
        ::new(storage) Aw(s.write_some(buffers));
549  
        ::new(storage) Aw(s.write_some(buffers));
550  

550  

551  
        static constexpr write_awaitable_ops ops = {
551  
        static constexpr write_awaitable_ops ops = {
552  
            +[](void* p) {
552  
            +[](void* p) {
553  
                return static_cast<Aw*>(p)->await_ready();
553  
                return static_cast<Aw*>(p)->await_ready();
554  
            },
554  
            },
555 -
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
555 +
            +[](void* p, coro h, executor_ref const& ex, std::stop_token const& token) {
556  
                return detail::call_await_suspend(
556  
                return detail::call_await_suspend(
557  
                    static_cast<Aw*>(p), h, ex, token);
557  
                    static_cast<Aw*>(p), h, ex, token);
558  
            },
558  
            },
559  
            +[](void* p) {
559  
            +[](void* p) {
560  
                return static_cast<Aw*>(p)->await_resume();
560  
                return static_cast<Aw*>(p)->await_resume();
561  
            },
561  
            },
562  
            +[](void* p) noexcept {
562  
            +[](void* p) noexcept {
563  
                static_cast<Aw*>(p)->~Aw();
563  
                static_cast<Aw*>(p)->~Aw();
564  
            }
564  
            }
565  
        };
565  
        };
566  
        return &ops;
566  
        return &ops;
567  
    }
567  
    }
568  

568  

569  
    static write_awaitable_ops const*
569  
    static write_awaitable_ops const*
570  
    construct_write_awaitable_impl(
570  
    construct_write_awaitable_impl(
571  
        void* sink,
571  
        void* sink,
572  
        void* storage,
572  
        void* storage,
573  
        std::span<const_buffer const> buffers)
573  
        std::span<const_buffer const> buffers)
574  
        requires WriteSink<S>
574  
        requires WriteSink<S>
575  
    {
575  
    {
576  
        using Aw = decltype(std::declval<S&>().write(
576  
        using Aw = decltype(std::declval<S&>().write(
577  
            std::span<const_buffer const>{}));
577  
            std::span<const_buffer const>{}));
578  
        auto& s = *static_cast<S*>(sink);
578  
        auto& s = *static_cast<S*>(sink);
579  
        ::new(storage) Aw(s.write(buffers));
579  
        ::new(storage) Aw(s.write(buffers));
580  

580  

581  
        static constexpr write_awaitable_ops ops = {
581  
        static constexpr write_awaitable_ops ops = {
582  
            +[](void* p) {
582  
            +[](void* p) {
583  
                return static_cast<Aw*>(p)->await_ready();
583  
                return static_cast<Aw*>(p)->await_ready();
584  
            },
584  
            },
585 -
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
585 +
            +[](void* p, coro h, executor_ref const& ex, std::stop_token const& token) {
586  
                return detail::call_await_suspend(
586  
                return detail::call_await_suspend(
587  
                    static_cast<Aw*>(p), h, ex, token);
587  
                    static_cast<Aw*>(p), h, ex, token);
588  
            },
588  
            },
589  
            +[](void* p) {
589  
            +[](void* p) {
590  
                return static_cast<Aw*>(p)->await_resume();
590  
                return static_cast<Aw*>(p)->await_resume();
591  
            },
591  
            },
592  
            +[](void* p) noexcept {
592  
            +[](void* p) noexcept {
593  
                static_cast<Aw*>(p)->~Aw();
593  
                static_cast<Aw*>(p)->~Aw();
594  
            }
594  
            }
595  
        };
595  
        };
596  
        return &ops;
596  
        return &ops;
597  
    }
597  
    }
598  

598  

599  
    static write_awaitable_ops const*
599  
    static write_awaitable_ops const*
600  
    construct_write_eof_buffers_awaitable_impl(
600  
    construct_write_eof_buffers_awaitable_impl(
601  
        void* sink,
601  
        void* sink,
602  
        void* storage,
602  
        void* storage,
603  
        std::span<const_buffer const> buffers)
603  
        std::span<const_buffer const> buffers)
604  
        requires WriteSink<S>
604  
        requires WriteSink<S>
605  
    {
605  
    {
606  
        using Aw = decltype(std::declval<S&>().write_eof(
606  
        using Aw = decltype(std::declval<S&>().write_eof(
607  
            std::span<const_buffer const>{}));
607  
            std::span<const_buffer const>{}));
608  
        auto& s = *static_cast<S*>(sink);
608  
        auto& s = *static_cast<S*>(sink);
609  
        ::new(storage) Aw(s.write_eof(buffers));
609  
        ::new(storage) Aw(s.write_eof(buffers));
610  

610  

611  
        static constexpr write_awaitable_ops ops = {
611  
        static constexpr write_awaitable_ops ops = {
612  
            +[](void* p) {
612  
            +[](void* p) {
613  
                return static_cast<Aw*>(p)->await_ready();
613  
                return static_cast<Aw*>(p)->await_ready();
614  
            },
614  
            },
615 -
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
615 +
            +[](void* p, coro h, executor_ref const& ex, std::stop_token const& token) {
616  
                return detail::call_await_suspend(
616  
                return detail::call_await_suspend(
617  
                    static_cast<Aw*>(p), h, ex, token);
617  
                    static_cast<Aw*>(p), h, ex, token);
618  
            },
618  
            },
619  
            +[](void* p) {
619  
            +[](void* p) {
620  
                return static_cast<Aw*>(p)->await_resume();
620  
                return static_cast<Aw*>(p)->await_resume();
621  
            },
621  
            },
622  
            +[](void* p) noexcept {
622  
            +[](void* p) noexcept {
623  
                static_cast<Aw*>(p)->~Aw();
623  
                static_cast<Aw*>(p)->~Aw();
624  
            }
624  
            }
625  
        };
625  
        };
626  
        return &ops;
626  
        return &ops;
627  
    }
627  
    }
628  

628  

629  
    static awaitable_ops const*
629  
    static awaitable_ops const*
630  
    construct_write_eof_awaitable_impl(
630  
    construct_write_eof_awaitable_impl(
631  
        void* sink,
631  
        void* sink,
632  
        void* storage)
632  
        void* storage)
633  
        requires WriteSink<S>
633  
        requires WriteSink<S>
634  
    {
634  
    {
635  
        using Aw = decltype(std::declval<S&>().write_eof());
635  
        using Aw = decltype(std::declval<S&>().write_eof());
636  
        auto& s = *static_cast<S*>(sink);
636  
        auto& s = *static_cast<S*>(sink);
637  
        ::new(storage) Aw(s.write_eof());
637  
        ::new(storage) Aw(s.write_eof());
638  

638  

639  
        static constexpr awaitable_ops ops = {
639  
        static constexpr awaitable_ops ops = {
640  
            +[](void* p) {
640  
            +[](void* p) {
641  
                return static_cast<Aw*>(p)->await_ready();
641  
                return static_cast<Aw*>(p)->await_ready();
642  
            },
642  
            },
643 -
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
643 +
            +[](void* p, coro h, executor_ref const& ex, std::stop_token const& token) {
644  
                return detail::call_await_suspend(
644  
                return detail::call_await_suspend(
645  
                    static_cast<Aw*>(p), h, ex, token);
645  
                    static_cast<Aw*>(p), h, ex, token);
646  
            },
646  
            },
647  
            +[](void* p) {
647  
            +[](void* p) {
648  
                return static_cast<Aw*>(p)->await_resume();
648  
                return static_cast<Aw*>(p)->await_resume();
649  
            },
649  
            },
650  
            +[](void* p) noexcept {
650  
            +[](void* p) noexcept {
651  
                static_cast<Aw*>(p)->~Aw();
651  
                static_cast<Aw*>(p)->~Aw();
652  
            }
652  
            }
653  
        };
653  
        };
654  
        return &ops;
654  
        return &ops;
655  
    }
655  
    }
656  

656  

657  
    //------------------------------------------------------
657  
    //------------------------------------------------------
658  

658  

659  
    static consteval std::size_t
659  
    static consteval std::size_t
660  
    compute_max_size() noexcept
660  
    compute_max_size() noexcept
661  
    {
661  
    {
662  
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
662  
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
663  
            ? sizeof(CommitAwaitable)
663  
            ? sizeof(CommitAwaitable)
664  
            : sizeof(CommitEofAwaitable);
664  
            : sizeof(CommitEofAwaitable);
665  
        if constexpr (WriteSink<S>)
665  
        if constexpr (WriteSink<S>)
666  
        {
666  
        {
667  
            using WS = decltype(std::declval<S&>().write_some(
667  
            using WS = decltype(std::declval<S&>().write_some(
668  
                std::span<const_buffer const>{}));
668  
                std::span<const_buffer const>{}));
669  
            using W = decltype(std::declval<S&>().write(
669  
            using W = decltype(std::declval<S&>().write(
670  
                std::span<const_buffer const>{}));
670  
                std::span<const_buffer const>{}));
671  
            using WEB = decltype(std::declval<S&>().write_eof(
671  
            using WEB = decltype(std::declval<S&>().write_eof(
672  
                std::span<const_buffer const>{}));
672  
                std::span<const_buffer const>{}));
673  
            using WE = decltype(std::declval<S&>().write_eof());
673  
            using WE = decltype(std::declval<S&>().write_eof());
674  

674  

675  
            if(sizeof(WS) > s) s = sizeof(WS);
675  
            if(sizeof(WS) > s) s = sizeof(WS);
676  
            if(sizeof(W) > s) s = sizeof(W);
676  
            if(sizeof(W) > s) s = sizeof(W);
677  
            if(sizeof(WEB) > s) s = sizeof(WEB);
677  
            if(sizeof(WEB) > s) s = sizeof(WEB);
678  
            if(sizeof(WE) > s) s = sizeof(WE);
678  
            if(sizeof(WE) > s) s = sizeof(WE);
679  
        }
679  
        }
680  
        return s;
680  
        return s;
681  
    }
681  
    }
682  

682  

683  
    static consteval std::size_t
683  
    static consteval std::size_t
684  
    compute_max_align() noexcept
684  
    compute_max_align() noexcept
685  
    {
685  
    {
686  
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
686  
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
687  
            ? alignof(CommitAwaitable)
687  
            ? alignof(CommitAwaitable)
688  
            : alignof(CommitEofAwaitable);
688  
            : alignof(CommitEofAwaitable);
689  
        if constexpr (WriteSink<S>)
689  
        if constexpr (WriteSink<S>)
690  
        {
690  
        {
691  
            using WS = decltype(std::declval<S&>().write_some(
691  
            using WS = decltype(std::declval<S&>().write_some(
692  
                std::span<const_buffer const>{}));
692  
                std::span<const_buffer const>{}));
693  
            using W = decltype(std::declval<S&>().write(
693  
            using W = decltype(std::declval<S&>().write(
694  
                std::span<const_buffer const>{}));
694  
                std::span<const_buffer const>{}));
695  
            using WEB = decltype(std::declval<S&>().write_eof(
695  
            using WEB = decltype(std::declval<S&>().write_eof(
696  
                std::span<const_buffer const>{}));
696  
                std::span<const_buffer const>{}));
697  
            using WE = decltype(std::declval<S&>().write_eof());
697  
            using WE = decltype(std::declval<S&>().write_eof());
698  

698  

699  
            if(alignof(WS) > a) a = alignof(WS);
699  
            if(alignof(WS) > a) a = alignof(WS);
700  
            if(alignof(W) > a) a = alignof(W);
700  
            if(alignof(W) > a) a = alignof(W);
701  
            if(alignof(WEB) > a) a = alignof(WEB);
701  
            if(alignof(WEB) > a) a = alignof(WEB);
702  
            if(alignof(WE) > a) a = alignof(WE);
702  
            if(alignof(WE) > a) a = alignof(WE);
703  
        }
703  
        }
704  
        return a;
704  
        return a;
705  
    }
705  
    }
706  

706  

707  
    static consteval vtable
707  
    static consteval vtable
708  
    make_vtable() noexcept
708  
    make_vtable() noexcept
709  
    {
709  
    {
710  
        vtable v{};
710  
        vtable v{};
711  
        v.destroy = &do_destroy_impl;
711  
        v.destroy = &do_destroy_impl;
712  
        v.do_prepare = &do_prepare_impl;
712  
        v.do_prepare = &do_prepare_impl;
713  
        v.awaitable_size = compute_max_size();
713  
        v.awaitable_size = compute_max_size();
714  
        v.awaitable_align = compute_max_align();
714  
        v.awaitable_align = compute_max_align();
715  
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
715  
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
716  
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
716  
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
717  
        v.construct_write_some_awaitable = nullptr;
717  
        v.construct_write_some_awaitable = nullptr;
718  
        v.construct_write_awaitable = nullptr;
718  
        v.construct_write_awaitable = nullptr;
719  
        v.construct_write_eof_buffers_awaitable = nullptr;
719  
        v.construct_write_eof_buffers_awaitable = nullptr;
720  
        v.construct_write_eof_awaitable = nullptr;
720  
        v.construct_write_eof_awaitable = nullptr;
721  

721  

722  
        if constexpr (WriteSink<S>)
722  
        if constexpr (WriteSink<S>)
723  
        {
723  
        {
724  
            v.construct_write_some_awaitable =
724  
            v.construct_write_some_awaitable =
725  
                &construct_write_some_awaitable_impl;
725  
                &construct_write_some_awaitable_impl;
726  
            v.construct_write_awaitable =
726  
            v.construct_write_awaitable =
727  
                &construct_write_awaitable_impl;
727  
                &construct_write_awaitable_impl;
728  
            v.construct_write_eof_buffers_awaitable =
728  
            v.construct_write_eof_buffers_awaitable =
729  
                &construct_write_eof_buffers_awaitable_impl;
729  
                &construct_write_eof_buffers_awaitable_impl;
730  
            v.construct_write_eof_awaitable =
730  
            v.construct_write_eof_awaitable =
731  
                &construct_write_eof_awaitable_impl;
731  
                &construct_write_eof_awaitable_impl;
732  
        }
732  
        }
733  
        return v;
733  
        return v;
734  
    }
734  
    }
735  

735  

736  
    static constexpr vtable value = make_vtable();
736  
    static constexpr vtable value = make_vtable();
737  
};
737  
};
738  

738  

739  
//----------------------------------------------------------
739  
//----------------------------------------------------------
740  

740  

741  
inline
741  
inline
742  
any_buffer_sink::~any_buffer_sink()
742  
any_buffer_sink::~any_buffer_sink()
743  
{
743  
{
744  
    if(storage_)
744  
    if(storage_)
745  
    {
745  
    {
746  
        vt_->destroy(sink_);
746  
        vt_->destroy(sink_);
747  
        ::operator delete(storage_);
747  
        ::operator delete(storage_);
748  
    }
748  
    }
749  
    if(cached_awaitable_)
749  
    if(cached_awaitable_)
750  
        ::operator delete(cached_awaitable_);
750  
        ::operator delete(cached_awaitable_);
751  
}
751  
}
752  

752  

753  
/** Move assignment.

    Releases whatever this wrapper currently holds (sink storage
    and cached awaitable buffer), then takes over every pointer
    from `other`, leaving `other` with all of them null.
    Self-assignment is a no-op.
*/
inline any_buffer_sink&
any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
{
    if(this == &other)
        return *this;

    // Release our current state first.
    if(storage_ != nullptr)
    {
        vt_->destroy(sink_);
        ::operator delete(storage_);
    }
    if(cached_awaitable_ != nullptr)
    {
        ::operator delete(cached_awaitable_);
    }

    // Steal from `other`, nulling each of its members as we go.
    sink_ = std::exchange(other.sink_, nullptr);
    vt_ = std::exchange(other.vt_, nullptr);
    cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
    storage_ = std::exchange(other.storage_, nullptr);
    active_ops_ = std::exchange(other.active_ops_, nullptr);
    active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);

    return *this;
}
774  

774  

775  
template<BufferSink S>
775  
template<BufferSink S>
776  
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
776  
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
777  
any_buffer_sink::any_buffer_sink(S s)
777  
any_buffer_sink::any_buffer_sink(S s)
778  
    : vt_(&vtable_for_impl<S>::value)
778  
    : vt_(&vtable_for_impl<S>::value)
779  
{
779  
{
780  
    struct guard {
780  
    struct guard {
781  
        any_buffer_sink* self;
781  
        any_buffer_sink* self;
782  
        bool committed = false;
782  
        bool committed = false;
783  
        ~guard() {
783  
        ~guard() {
784  
            if(!committed && self->storage_) {
784  
            if(!committed && self->storage_) {
785  
                self->vt_->destroy(self->sink_);
785  
                self->vt_->destroy(self->sink_);
786  
                ::operator delete(self->storage_);
786  
                ::operator delete(self->storage_);
787  
                self->storage_ = nullptr;
787  
                self->storage_ = nullptr;
788  
                self->sink_ = nullptr;
788  
                self->sink_ = nullptr;
789  
            }
789  
            }
790  
        }
790  
        }
791  
    } g{this};
791  
    } g{this};
792  

792  

793  
    storage_ = ::operator new(sizeof(S));
793  
    storage_ = ::operator new(sizeof(S));
794  
    sink_ = ::new(storage_) S(std::move(s));
794  
    sink_ = ::new(storage_) S(std::move(s));
795  

795  

796  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
796  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
797  

797  

798  
    g.committed = true;
798  
    g.committed = true;
799  
}
799  
}
800  

800  

801  
template<BufferSink S>
801  
template<BufferSink S>
802  
any_buffer_sink::any_buffer_sink(S* s)
802  
any_buffer_sink::any_buffer_sink(S* s)
803  
    : sink_(s)
803  
    : sink_(s)
804  
    , vt_(&vtable_for_impl<S>::value)
804  
    , vt_(&vtable_for_impl<S>::value)
805  
{
805  
{
806  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
806  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
807  
}
807  
}
808  

808  

809  
//----------------------------------------------------------
809  
//----------------------------------------------------------
810  

810  

811  
/** Ask the wrapped sink for writable buffers.

    Forwards directly to the vtable's `do_prepare` entry with the
    wrapped sink pointer.

    @param dest Caller-provided array that the sink may fill with
                buffer descriptors.
    @return The span returned by the underlying `do_prepare` call.
*/
inline std::span<mutable_buffer>
any_buffer_sink::prepare(std::span<mutable_buffer> dest)
{
    return vt_->do_prepare(sink_, dest);
}
816  

816  

817  
inline auto
817  
inline auto
818  
any_buffer_sink::commit(std::size_t n)
818  
any_buffer_sink::commit(std::size_t n)
819  
{
819  
{
820  
    struct awaitable
820  
    struct awaitable
821  
    {
821  
    {
822  
        any_buffer_sink* self_;
822  
        any_buffer_sink* self_;
823  
        std::size_t n_;
823  
        std::size_t n_;
824  

824  

825  
        bool
825  
        bool
826  
        await_ready()
826  
        await_ready()
827  
        {
827  
        {
828  
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
828  
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
829  
                self_->sink_,
829  
                self_->sink_,
830  
                self_->cached_awaitable_,
830  
                self_->cached_awaitable_,
831  
                n_);
831  
                n_);
832  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
832  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
833  
        }
833  
        }
834  

834  

835  
        coro
835  
        coro
836 -
        await_suspend(coro h, executor_ref ex, std::stop_token token)
836 +
        await_suspend(coro h, executor_ref const& ex, std::stop_token const& token)
837  
        {
837  
        {
838  
            return self_->active_ops_->await_suspend(
838  
            return self_->active_ops_->await_suspend(
839  
                self_->cached_awaitable_, h, ex, token);
839  
                self_->cached_awaitable_, h, ex, token);
840  
        }
840  
        }
841  

841  

842  
        io_result<>
842  
        io_result<>
843  
        await_resume()
843  
        await_resume()
844  
        {
844  
        {
845  
            struct guard {
845  
            struct guard {
846  
                any_buffer_sink* self;
846  
                any_buffer_sink* self;
847  
                ~guard() {
847  
                ~guard() {
848  
                    self->active_ops_->destroy(self->cached_awaitable_);
848  
                    self->active_ops_->destroy(self->cached_awaitable_);
849  
                    self->active_ops_ = nullptr;
849  
                    self->active_ops_ = nullptr;
850  
                }
850  
                }
851  
            } g{self_};
851  
            } g{self_};
852  
            return self_->active_ops_->await_resume(
852  
            return self_->active_ops_->await_resume(
853  
                self_->cached_awaitable_);
853  
                self_->cached_awaitable_);
854  
        }
854  
        }
855  
    };
855  
    };
856  
    return awaitable{this, n};
856  
    return awaitable{this, n};
857  
}
857  
}
858  

858  

859  
inline auto
859  
inline auto
860  
any_buffer_sink::commit_eof(std::size_t n)
860  
any_buffer_sink::commit_eof(std::size_t n)
861  
{
861  
{
862  
    struct awaitable
862  
    struct awaitable
863  
    {
863  
    {
864  
        any_buffer_sink* self_;
864  
        any_buffer_sink* self_;
865  
        std::size_t n_;
865  
        std::size_t n_;
866  

866  

867  
        bool
867  
        bool
868  
        await_ready()
868  
        await_ready()
869  
        {
869  
        {
870  
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
870  
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
871  
                self_->sink_,
871  
                self_->sink_,
872  
                self_->cached_awaitable_,
872  
                self_->cached_awaitable_,
873  
                n_);
873  
                n_);
874  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
874  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
875  
        }
875  
        }
876  

876  

877  
        coro
877  
        coro
878 -
        await_suspend(coro h, executor_ref ex, std::stop_token token)
878 +
        await_suspend(coro h, executor_ref const& ex, std::stop_token const& token)
879  
        {
879  
        {
880  
            return self_->active_ops_->await_suspend(
880  
            return self_->active_ops_->await_suspend(
881  
                self_->cached_awaitable_, h, ex, token);
881  
                self_->cached_awaitable_, h, ex, token);
882  
        }
882  
        }
883  

883  

884  
        io_result<>
884  
        io_result<>
885  
        await_resume()
885  
        await_resume()
886  
        {
886  
        {
887  
            struct guard {
887  
            struct guard {
888  
                any_buffer_sink* self;
888  
                any_buffer_sink* self;
889  
                ~guard() {
889  
                ~guard() {
890  
                    self->active_ops_->destroy(self->cached_awaitable_);
890  
                    self->active_ops_->destroy(self->cached_awaitable_);
891  
                    self->active_ops_ = nullptr;
891  
                    self->active_ops_ = nullptr;
892  
                }
892  
                }
893  
            } g{self_};
893  
            } g{self_};
894  
            return self_->active_ops_->await_resume(
894  
            return self_->active_ops_->await_resume(
895  
                self_->cached_awaitable_);
895  
                self_->cached_awaitable_);
896  
        }
896  
        }
897  
    };
897  
    };
898  
    return awaitable{this, n};
898  
    return awaitable{this, n};
899  
}
899  
}
900  

900  

901  
//----------------------------------------------------------
// Private helpers for native WriteSink forwarding
903  

903  

904  
inline auto
904  
inline auto
905  
any_buffer_sink::write_some_(
905  
any_buffer_sink::write_some_(
906  
    std::span<const_buffer const> buffers)
906  
    std::span<const_buffer const> buffers)
907  
{
907  
{
908  
    struct awaitable
908  
    struct awaitable
909  
    {
909  
    {
910  
        any_buffer_sink* self_;
910  
        any_buffer_sink* self_;
911  
        std::span<const_buffer const> buffers_;
911  
        std::span<const_buffer const> buffers_;
912  

912  

913  
        bool
913  
        bool
914  
        await_ready() const noexcept
914  
        await_ready() const noexcept
915  
        {
915  
        {
916  
            return false;
916  
            return false;
917  
        }
917  
        }
918  

918  

919  
        coro
919  
        coro
920 -
        await_suspend(coro h, executor_ref ex, std::stop_token token)
920 +
        await_suspend(coro h, executor_ref const& ex, std::stop_token const& token)
921  
        {
921  
        {
922  
            self_->active_write_ops_ =
922  
            self_->active_write_ops_ =
923  
                self_->vt_->construct_write_some_awaitable(
923  
                self_->vt_->construct_write_some_awaitable(
924  
                    self_->sink_,
924  
                    self_->sink_,
925  
                    self_->cached_awaitable_,
925  
                    self_->cached_awaitable_,
926  
                    buffers_);
926  
                    buffers_);
927  

927  

928  
            if(self_->active_write_ops_->await_ready(
928  
            if(self_->active_write_ops_->await_ready(
929  
                self_->cached_awaitable_))
929  
                self_->cached_awaitable_))
930  
                return h;
930  
                return h;
931  

931  

932  
            return self_->active_write_ops_->await_suspend(
932  
            return self_->active_write_ops_->await_suspend(
933  
                self_->cached_awaitable_, h, ex, token);
933  
                self_->cached_awaitable_, h, ex, token);
934  
        }
934  
        }
935  

935  

936  
        io_result<std::size_t>
936  
        io_result<std::size_t>
937  
        await_resume()
937  
        await_resume()
938  
        {
938  
        {
939  
            struct guard {
939  
            struct guard {
940  
                any_buffer_sink* self;
940  
                any_buffer_sink* self;
941  
                ~guard() {
941  
                ~guard() {
942  
                    self->active_write_ops_->destroy(
942  
                    self->active_write_ops_->destroy(
943  
                        self->cached_awaitable_);
943  
                        self->cached_awaitable_);
944  
                    self->active_write_ops_ = nullptr;
944  
                    self->active_write_ops_ = nullptr;
945  
                }
945  
                }
946  
            } g{self_};
946  
            } g{self_};
947  
            return self_->active_write_ops_->await_resume(
947  
            return self_->active_write_ops_->await_resume(
948  
                self_->cached_awaitable_);
948  
                self_->cached_awaitable_);
949  
        }
949  
        }
950  
    };
950  
    };
951  
    return awaitable{this, buffers};
951  
    return awaitable{this, buffers};
952  
}
952  
}
953  

953  

954  
inline auto
954  
inline auto
955  
any_buffer_sink::write_(
955  
any_buffer_sink::write_(
956  
    std::span<const_buffer const> buffers)
956  
    std::span<const_buffer const> buffers)
957  
{
957  
{
958  
    struct awaitable
958  
    struct awaitable
959  
    {
959  
    {
960  
        any_buffer_sink* self_;
960  
        any_buffer_sink* self_;
961  
        std::span<const_buffer const> buffers_;
961  
        std::span<const_buffer const> buffers_;
962  

962  

963  
        bool
963  
        bool
964  
        await_ready() const noexcept
964  
        await_ready() const noexcept
965  
        {
965  
        {
966  
            return false;
966  
            return false;
967  
        }
967  
        }
968  

968  

969  
        coro
969  
        coro
970 -
        await_suspend(coro h, executor_ref ex, std::stop_token token)
970 +
        await_suspend(coro h, executor_ref const& ex, std::stop_token const& token)
971  
        {
971  
        {
972  
            self_->active_write_ops_ =
972  
            self_->active_write_ops_ =
973  
                self_->vt_->construct_write_awaitable(
973  
                self_->vt_->construct_write_awaitable(
974  
                    self_->sink_,
974  
                    self_->sink_,
975  
                    self_->cached_awaitable_,
975  
                    self_->cached_awaitable_,
976  
                    buffers_);
976  
                    buffers_);
977  

977  

978  
            if(self_->active_write_ops_->await_ready(
978  
            if(self_->active_write_ops_->await_ready(
979  
                self_->cached_awaitable_))
979  
                self_->cached_awaitable_))
980  
                return h;
980  
                return h;
981  

981  

982  
            return self_->active_write_ops_->await_suspend(
982  
            return self_->active_write_ops_->await_suspend(
983  
                self_->cached_awaitable_, h, ex, token);
983  
                self_->cached_awaitable_, h, ex, token);
984  
        }
984  
        }
985  

985  

986  
        io_result<std::size_t>
986  
        io_result<std::size_t>
987  
        await_resume()
987  
        await_resume()
988  
        {
988  
        {
989  
            struct guard {
989  
            struct guard {
990  
                any_buffer_sink* self;
990  
                any_buffer_sink* self;
991  
                ~guard() {
991  
                ~guard() {
992  
                    self->active_write_ops_->destroy(
992  
                    self->active_write_ops_->destroy(
993  
                        self->cached_awaitable_);
993  
                        self->cached_awaitable_);
994  
                    self->active_write_ops_ = nullptr;
994  
                    self->active_write_ops_ = nullptr;
995  
                }
995  
                }
996  
            } g{self_};
996  
            } g{self_};
997  
            return self_->active_write_ops_->await_resume(
997  
            return self_->active_write_ops_->await_resume(
998  
                self_->cached_awaitable_);
998  
                self_->cached_awaitable_);
999  
        }
999  
        }
1000  
    };
1000  
    };
1001  
    return awaitable{this, buffers};
1001  
    return awaitable{this, buffers};
1002  
}
1002  
}
1003  

1003  

1004  
inline auto
1004  
inline auto
1005  
any_buffer_sink::write_eof_buffers_(
1005  
any_buffer_sink::write_eof_buffers_(
1006  
    std::span<const_buffer const> buffers)
1006  
    std::span<const_buffer const> buffers)
1007  
{
1007  
{
1008  
    struct awaitable
1008  
    struct awaitable
1009  
    {
1009  
    {
1010  
        any_buffer_sink* self_;
1010  
        any_buffer_sink* self_;
1011  
        std::span<const_buffer const> buffers_;
1011  
        std::span<const_buffer const> buffers_;
1012  

1012  

1013  
        bool
1013  
        bool
1014  
        await_ready() const noexcept
1014  
        await_ready() const noexcept
1015  
        {
1015  
        {
1016  
            return false;
1016  
            return false;
1017  
        }
1017  
        }
1018  

1018  

1019  
        coro
1019  
        coro
1020 -
        await_suspend(coro h, executor_ref ex, std::stop_token token)
1020 +
        await_suspend(coro h, executor_ref const& ex, std::stop_token const& token)
1021  
        {
1021  
        {
1022  
            self_->active_write_ops_ =
1022  
            self_->active_write_ops_ =
1023  
                self_->vt_->construct_write_eof_buffers_awaitable(
1023  
                self_->vt_->construct_write_eof_buffers_awaitable(
1024  
                    self_->sink_,
1024  
                    self_->sink_,
1025  
                    self_->cached_awaitable_,
1025  
                    self_->cached_awaitable_,
1026  
                    buffers_);
1026  
                    buffers_);
1027  

1027  

1028  
            if(self_->active_write_ops_->await_ready(
1028  
            if(self_->active_write_ops_->await_ready(
1029  
                self_->cached_awaitable_))
1029  
                self_->cached_awaitable_))
1030  
                return h;
1030  
                return h;
1031  

1031  

1032  
            return self_->active_write_ops_->await_suspend(
1032  
            return self_->active_write_ops_->await_suspend(
1033  
                self_->cached_awaitable_, h, ex, token);
1033  
                self_->cached_awaitable_, h, ex, token);
1034  
        }
1034  
        }
1035  

1035  

1036  
        io_result<std::size_t>
1036  
        io_result<std::size_t>
1037  
        await_resume()
1037  
        await_resume()
1038  
        {
1038  
        {
1039  
            struct guard {
1039  
            struct guard {
1040  
                any_buffer_sink* self;
1040  
                any_buffer_sink* self;
1041  
                ~guard() {
1041  
                ~guard() {
1042  
                    self->active_write_ops_->destroy(
1042  
                    self->active_write_ops_->destroy(
1043  
                        self->cached_awaitable_);
1043  
                        self->cached_awaitable_);
1044  
                    self->active_write_ops_ = nullptr;
1044  
                    self->active_write_ops_ = nullptr;
1045  
                }
1045  
                }
1046  
            } g{self_};
1046  
            } g{self_};
1047  
            return self_->active_write_ops_->await_resume(
1047  
            return self_->active_write_ops_->await_resume(
1048  
                self_->cached_awaitable_);
1048  
                self_->cached_awaitable_);
1049  
        }
1049  
        }
1050  
    };
1050  
    };
1051  
    return awaitable{this, buffers};
1051  
    return awaitable{this, buffers};
1052  
}
1052  
}
1053  

1053  

1054  
//----------------------------------------------------------
// Public WriteSink methods
1056  

1056  

1057  
template<ConstBufferSequence CB>
1057  
template<ConstBufferSequence CB>
1058  
io_task<std::size_t>
1058  
io_task<std::size_t>
1059  
any_buffer_sink::write_some(CB buffers)
1059  
any_buffer_sink::write_some(CB buffers)
1060  
{
1060  
{
1061  
    buffer_param<CB> bp(buffers);
1061  
    buffer_param<CB> bp(buffers);
1062  
    auto src = bp.data();
1062  
    auto src = bp.data();
1063  
    if(src.empty())
1063  
    if(src.empty())
1064  
        co_return {{}, 0};
1064  
        co_return {{}, 0};
1065  

1065  

1066  
    // Native WriteSink path
1066  
    // Native WriteSink path
1067  
    if(vt_->construct_write_some_awaitable)
1067  
    if(vt_->construct_write_some_awaitable)
1068  
        co_return co_await write_some_(src);
1068  
        co_return co_await write_some_(src);
1069  

1069  

1070  
    // Synthesized path: prepare + buffer_copy + commit
1070  
    // Synthesized path: prepare + buffer_copy + commit
1071  
    mutable_buffer arr[detail::max_iovec_];
1071  
    mutable_buffer arr[detail::max_iovec_];
1072  
    auto dst_bufs = prepare(arr);
1072  
    auto dst_bufs = prepare(arr);
1073  
    if(dst_bufs.empty())
1073  
    if(dst_bufs.empty())
1074  
    {
1074  
    {
1075  
        auto [ec] = co_await commit(0);
1075  
        auto [ec] = co_await commit(0);
1076  
        if(ec)
1076  
        if(ec)
1077  
            co_return {ec, 0};
1077  
            co_return {ec, 0};
1078  
        dst_bufs = prepare(arr);
1078  
        dst_bufs = prepare(arr);
1079  
        if(dst_bufs.empty())
1079  
        if(dst_bufs.empty())
1080  
            co_return {{}, 0};
1080  
            co_return {{}, 0};
1081  
    }
1081  
    }
1082  

1082  

1083  
    auto n = buffer_copy(dst_bufs, src);
1083  
    auto n = buffer_copy(dst_bufs, src);
1084  
    auto [ec] = co_await commit(n);
1084  
    auto [ec] = co_await commit(n);
1085  
    if(ec)
1085  
    if(ec)
1086  
        co_return {ec, 0};
1086  
        co_return {ec, 0};
1087  
    co_return {{}, n};
1087  
    co_return {{}, n};
1088  
}
1088  
}
1089  

1089  

1090  
template<ConstBufferSequence CB>
1090  
template<ConstBufferSequence CB>
1091  
io_task<std::size_t>
1091  
io_task<std::size_t>
1092  
any_buffer_sink::write(CB buffers)
1092  
any_buffer_sink::write(CB buffers)
1093  
{
1093  
{
1094  
    buffer_param<CB> bp(buffers);
1094  
    buffer_param<CB> bp(buffers);
1095  
    std::size_t total = 0;
1095  
    std::size_t total = 0;
1096  

1096  

1097  
    // Native WriteSink path
1097  
    // Native WriteSink path
1098  
    if(vt_->construct_write_awaitable)
1098  
    if(vt_->construct_write_awaitable)
1099  
    {
1099  
    {
1100  
        for(;;)
1100  
        for(;;)
1101  
        {
1101  
        {
1102  
            auto bufs = bp.data();
1102  
            auto bufs = bp.data();
1103  
            if(bufs.empty())
1103  
            if(bufs.empty())
1104  
                break;
1104  
                break;
1105  

1105  

1106  
            auto [ec, n] = co_await write_(bufs);
1106  
            auto [ec, n] = co_await write_(bufs);
1107  
            total += n;
1107  
            total += n;
1108  
            if(ec)
1108  
            if(ec)
1109  
                co_return {ec, total};
1109  
                co_return {ec, total};
1110  
            bp.consume(n);
1110  
            bp.consume(n);
1111  
        }
1111  
        }
1112  
        co_return {{}, total};
1112  
        co_return {{}, total};
1113  
    }
1113  
    }
1114  

1114  

1115  
    // Synthesized path: prepare + buffer_copy + commit
1115  
    // Synthesized path: prepare + buffer_copy + commit
1116  
    for(;;)
1116  
    for(;;)
1117  
    {
1117  
    {
1118  
        auto src = bp.data();
1118  
        auto src = bp.data();
1119  
        if(src.empty())
1119  
        if(src.empty())
1120  
            break;
1120  
            break;
1121  

1121  

1122  
        mutable_buffer arr[detail::max_iovec_];
1122  
        mutable_buffer arr[detail::max_iovec_];
1123  
        auto dst_bufs = prepare(arr);
1123  
        auto dst_bufs = prepare(arr);
1124  
        if(dst_bufs.empty())
1124  
        if(dst_bufs.empty())
1125  
        {
1125  
        {
1126  
            auto [ec] = co_await commit(0);
1126  
            auto [ec] = co_await commit(0);
1127  
            if(ec)
1127  
            if(ec)
1128  
                co_return {ec, total};
1128  
                co_return {ec, total};
1129  
            continue;
1129  
            continue;
1130  
        }
1130  
        }
1131  

1131  

1132  
        auto n = buffer_copy(dst_bufs, src);
1132  
        auto n = buffer_copy(dst_bufs, src);
1133  
        auto [ec] = co_await commit(n);
1133  
        auto [ec] = co_await commit(n);
1134  
        if(ec)
1134  
        if(ec)
1135  
            co_return {ec, total};
1135  
            co_return {ec, total};
1136  
        bp.consume(n);
1136  
        bp.consume(n);
1137  
        total += n;
1137  
        total += n;
1138  
    }
1138  
    }
1139  

1139  

1140  
    co_return {{}, total};
1140  
    co_return {{}, total};
1141  
}
1141  
}
1142  

1142  

1143  
inline auto
1143  
inline auto
1144  
any_buffer_sink::write_eof()
1144  
any_buffer_sink::write_eof()
1145  
{
1145  
{
1146  
    struct awaitable
1146  
    struct awaitable
1147  
    {
1147  
    {
1148  
        any_buffer_sink* self_;
1148  
        any_buffer_sink* self_;
1149  

1149  

1150  
        bool
1150  
        bool
1151  
        await_ready()
1151  
        await_ready()
1152  
        {
1152  
        {
1153  
            if(self_->vt_->construct_write_eof_awaitable)
1153  
            if(self_->vt_->construct_write_eof_awaitable)
1154  
            {
1154  
            {
1155  
                // Native WriteSink: forward to underlying write_eof()
1155  
                // Native WriteSink: forward to underlying write_eof()
1156  
                self_->active_ops_ =
1156  
                self_->active_ops_ =
1157  
                    self_->vt_->construct_write_eof_awaitable(
1157  
                    self_->vt_->construct_write_eof_awaitable(
1158  
                        self_->sink_,
1158  
                        self_->sink_,
1159  
                        self_->cached_awaitable_);
1159  
                        self_->cached_awaitable_);
1160  
            }
1160  
            }
1161  
            else
1161  
            else
1162  
            {
1162  
            {
1163  
                // Synthesized: commit_eof(0)
1163  
                // Synthesized: commit_eof(0)
1164  
                self_->active_ops_ =
1164  
                self_->active_ops_ =
1165  
                    self_->vt_->construct_commit_eof_awaitable(
1165  
                    self_->vt_->construct_commit_eof_awaitable(
1166  
                        self_->sink_,
1166  
                        self_->sink_,
1167  
                        self_->cached_awaitable_,
1167  
                        self_->cached_awaitable_,
1168  
                        0);
1168  
                        0);
1169  
            }
1169  
            }
1170  
            return self_->active_ops_->await_ready(
1170  
            return self_->active_ops_->await_ready(
1171  
                self_->cached_awaitable_);
1171  
                self_->cached_awaitable_);
1172  
        }
1172  
        }
1173  

1173  

1174  
        coro
1174  
        coro
1175 -
        await_suspend(coro h, executor_ref ex, std::stop_token token)
1175 +
        await_suspend(coro h, executor_ref const& ex, std::stop_token const& token)
1176  
        {
1176  
        {
1177  
            return self_->active_ops_->await_suspend(
1177  
            return self_->active_ops_->await_suspend(
1178  
                self_->cached_awaitable_, h, ex, token);
1178  
                self_->cached_awaitable_, h, ex, token);
1179  
        }
1179  
        }
1180  

1180  

1181  
        io_result<>
1181  
        io_result<>
1182  
        await_resume()
1182  
        await_resume()
1183  
        {
1183  
        {
1184  
            struct guard {
1184  
            struct guard {
1185  
                any_buffer_sink* self;
1185  
                any_buffer_sink* self;
1186  
                ~guard() {
1186  
                ~guard() {
1187  
                    self->active_ops_->destroy(self->cached_awaitable_);
1187  
                    self->active_ops_->destroy(self->cached_awaitable_);
1188  
                    self->active_ops_ = nullptr;
1188  
                    self->active_ops_ = nullptr;
1189  
                }
1189  
                }
1190  
            } g{self_};
1190  
            } g{self_};
1191  
            return self_->active_ops_->await_resume(
1191  
            return self_->active_ops_->await_resume(
1192  
                self_->cached_awaitable_);
1192  
                self_->cached_awaitable_);
1193  
        }
1193  
        }
1194  
    };
1194  
    };
1195  
    return awaitable{this};
1195  
    return awaitable{this};
1196  
}
1196  
}
1197  

1197  

1198  
template<ConstBufferSequence CB>
1198  
template<ConstBufferSequence CB>
1199  
io_task<std::size_t>
1199  
io_task<std::size_t>
1200  
any_buffer_sink::write_eof(CB buffers)
1200  
any_buffer_sink::write_eof(CB buffers)
1201  
{
1201  
{
1202  
    // Native WriteSink path
1202  
    // Native WriteSink path
1203  
    if(vt_->construct_write_eof_buffers_awaitable)
1203  
    if(vt_->construct_write_eof_buffers_awaitable)
1204  
    {
1204  
    {
1205  
        const_buffer_param<CB> bp(buffers);
1205  
        const_buffer_param<CB> bp(buffers);
1206  
        std::size_t total = 0;
1206  
        std::size_t total = 0;
1207  

1207  

1208  
        for(;;)
1208  
        for(;;)
1209  
        {
1209  
        {
1210  
            auto bufs = bp.data();
1210  
            auto bufs = bp.data();
1211  
            if(bufs.empty())
1211  
            if(bufs.empty())
1212  
            {
1212  
            {
1213  
                auto [ec] = co_await write_eof();
1213  
                auto [ec] = co_await write_eof();
1214  
                co_return {ec, total};
1214  
                co_return {ec, total};
1215  
            }
1215  
            }
1216  

1216  

1217  
            if(!bp.more())
1217  
            if(!bp.more())
1218  
            {
1218  
            {
1219  
                // Last window: send atomically with EOF
1219  
                // Last window: send atomically with EOF
1220  
                auto [ec, n] = co_await write_eof_buffers_(bufs);
1220  
                auto [ec, n] = co_await write_eof_buffers_(bufs);
1221  
                total += n;
1221  
                total += n;
1222  
                co_return {ec, total};
1222  
                co_return {ec, total};
1223  
            }
1223  
            }
1224  

1224  

1225  
            auto [ec, n] = co_await write_(bufs);
1225  
            auto [ec, n] = co_await write_(bufs);
1226  
            total += n;
1226  
            total += n;
1227  
            if(ec)
1227  
            if(ec)
1228  
                co_return {ec, total};
1228  
                co_return {ec, total};
1229  
            bp.consume(n);
1229  
            bp.consume(n);
1230  
        }
1230  
        }
1231  
    }
1231  
    }
1232  

1232  

1233  
    // Synthesized path: prepare + buffer_copy + commit + commit_eof
1233  
    // Synthesized path: prepare + buffer_copy + commit + commit_eof
1234  
    buffer_param<CB> bp(buffers);
1234  
    buffer_param<CB> bp(buffers);
1235  
    std::size_t total = 0;
1235  
    std::size_t total = 0;
1236  

1236  

1237  
    for(;;)
1237  
    for(;;)
1238  
    {
1238  
    {
1239  
        auto src = bp.data();
1239  
        auto src = bp.data();
1240  
        if(src.empty())
1240  
        if(src.empty())
1241  
            break;
1241  
            break;
1242  

1242  

1243  
        mutable_buffer arr[detail::max_iovec_];
1243  
        mutable_buffer arr[detail::max_iovec_];
1244  
        auto dst_bufs = prepare(arr);
1244  
        auto dst_bufs = prepare(arr);
1245  
        if(dst_bufs.empty())
1245  
        if(dst_bufs.empty())
1246  
        {
1246  
        {
1247  
            auto [ec] = co_await commit(0);
1247  
            auto [ec] = co_await commit(0);
1248  
            if(ec)
1248  
            if(ec)
1249  
                co_return {ec, total};
1249  
                co_return {ec, total};
1250  
            continue;
1250  
            continue;
1251  
        }
1251  
        }
1252  

1252  

1253  
        auto n = buffer_copy(dst_bufs, src);
1253  
        auto n = buffer_copy(dst_bufs, src);
1254  
        auto [ec] = co_await commit(n);
1254  
        auto [ec] = co_await commit(n);
1255  
        if(ec)
1255  
        if(ec)
1256  
            co_return {ec, total};
1256  
            co_return {ec, total};
1257  
        bp.consume(n);
1257  
        bp.consume(n);
1258  
        total += n;
1258  
        total += n;
1259  
    }
1259  
    }
1260  

1260  

1261  
    auto [ec] = co_await commit_eof(0);
1261  
    auto [ec] = co_await commit_eof(0);
1262  
    if(ec)
1262  
    if(ec)
1263  
        co_return {ec, total};
1263  
        co_return {ec, total};
1264  

1264  

1265  
    co_return {{}, total};
1265  
    co_return {{}, total};
1266  
}
1266  
}
1267  

1267  

1268  
//----------------------------------------------------------
1268  
//----------------------------------------------------------
1269  

1269  

1270  
static_assert(BufferSink<any_buffer_sink>);
1270  
static_assert(BufferSink<any_buffer_sink>);
1271  
static_assert(WriteSink<any_buffer_sink>);
1271  
static_assert(WriteSink<any_buffer_sink>);
1272  

1272  

1273  
} // namespace capy
1273  
} // namespace capy
1274  
} // namespace boost
1274  
} // namespace boost
1275  

1275  

1276  
#endif
1276  
#endif