1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/buffers/slice.hpp>
18  
#include <boost/capy/buffers/slice.hpp>
19  
#include <boost/capy/concept/buffer_source.hpp>
19  
#include <boost/capy/concept/buffer_source.hpp>
20  
#include <boost/capy/concept/io_awaitable.hpp>
20  
#include <boost/capy/concept/io_awaitable.hpp>
21  
#include <boost/capy/concept/read_source.hpp>
21  
#include <boost/capy/concept/read_source.hpp>
22  
#include <boost/capy/coro.hpp>
22  
#include <boost/capy/coro.hpp>
23  
#include <boost/capy/error.hpp>
23  
#include <boost/capy/error.hpp>
24  
#include <boost/capy/ex/executor_ref.hpp>
24  
#include <boost/capy/ex/executor_ref.hpp>
25  
#include <boost/capy/io_result.hpp>
25  
#include <boost/capy/io_result.hpp>
26  
#include <boost/capy/io_task.hpp>
26  
#include <boost/capy/io_task.hpp>
27  

27  

28  
#include <concepts>
28  
#include <concepts>
29  
#include <coroutine>
29  
#include <coroutine>
30  
#include <cstddef>
30  
#include <cstddef>
31  
#include <exception>
31  
#include <exception>
32  
#include <new>
32  
#include <new>
33  
#include <span>
33  
#include <span>
34  
#include <stop_token>
34  
#include <stop_token>
35  
#include <system_error>
35  
#include <system_error>
36  
#include <utility>
36  
#include <utility>
37  

37  

38  
namespace boost {
38  
namespace boost {
39  
namespace capy {
39  
namespace capy {
40  

40  

41  
/** Type-erased wrapper for any BufferSource.
41  
/** Type-erased wrapper for any BufferSource.
42  

42  

43  
    This class provides type erasure for any type satisfying the
43  
    This class provides type erasure for any type satisfying the
44  
    @ref BufferSource concept, enabling runtime polymorphism for
44  
    @ref BufferSource concept, enabling runtime polymorphism for
45  
    buffer pull operations. It uses cached awaitable storage to achieve
45  
    buffer pull operations. It uses cached awaitable storage to achieve
46  
    zero steady-state allocation after construction.
46  
    zero steady-state allocation after construction.
47  

47  

48  
    The wrapper also satisfies @ref ReadSource. When the wrapped type
48  
    The wrapper also satisfies @ref ReadSource. When the wrapped type
49  
    satisfies only @ref BufferSource, the read operations are
49  
    satisfies only @ref BufferSource, the read operations are
50  
    synthesized using @ref pull and @ref consume with an extra
50  
    synthesized using @ref pull and @ref consume with an extra
51  
    buffer copy. When the wrapped type satisfies both @ref BufferSource
51  
    buffer copy. When the wrapped type satisfies both @ref BufferSource
52  
    and @ref ReadSource, the native read operations are forwarded
52  
    and @ref ReadSource, the native read operations are forwarded
53  
    directly across the virtual boundary, avoiding the copy.
53  
    directly across the virtual boundary, avoiding the copy.
54  

54  

55  
    The wrapper supports two construction modes:
55  
    The wrapper supports two construction modes:
56  
    - **Owning**: Pass by value to transfer ownership. The wrapper
56  
    - **Owning**: Pass by value to transfer ownership. The wrapper
57  
      allocates storage and owns the source.
57  
      allocates storage and owns the source.
58  
    - **Reference**: Pass a pointer to wrap without ownership. The
58  
    - **Reference**: Pass a pointer to wrap without ownership. The
59  
      pointed-to source must outlive this wrapper.
59  
      pointed-to source must outlive this wrapper.
60  

60  

61  
    Within each mode, the vtable is populated at compile time based
61  
    Within each mode, the vtable is populated at compile time based
62  
    on whether the wrapped type also satisfies @ref ReadSource:
62  
    on whether the wrapped type also satisfies @ref ReadSource:
63  
    - **BufferSource only**: @ref read_some and @ref read are
63  
    - **BufferSource only**: @ref read_some and @ref read are
64  
      synthesized from @ref pull and @ref consume, incurring one
64  
      synthesized from @ref pull and @ref consume, incurring one
65  
      buffer copy per operation.
65  
      buffer copy per operation.
66  
    - **BufferSource + ReadSource**: All read operations are
66  
    - **BufferSource + ReadSource**: All read operations are
67  
      forwarded natively through the type-erased boundary with
67  
      forwarded natively through the type-erased boundary with
68  
      no extra copy.
68  
      no extra copy.
69  

69  

70  
    @par Awaitable Preallocation
70  
    @par Awaitable Preallocation
71  
    The constructor preallocates storage for the type-erased awaitable.
71  
    The constructor preallocates storage for the type-erased awaitable.
72  
    This reserves all virtual address space at server startup
72  
    This reserves all virtual address space at server startup
73  
    so memory usage can be measured up front, rather than
73  
    so memory usage can be measured up front, rather than
74  
    allocating piecemeal as traffic arrives.
74  
    allocating piecemeal as traffic arrives.
75  

75  

76  
    @par Thread Safety
76  
    @par Thread Safety
77  
    Not thread-safe. Concurrent operations on the same wrapper
77  
    Not thread-safe. Concurrent operations on the same wrapper
78  
    are undefined behavior.
78  
    are undefined behavior.
79  

79  

80  
    @par Example
80  
    @par Example
81  
    @code
81  
    @code
82  
    // Owning - takes ownership of the source
82  
    // Owning - takes ownership of the source
83  
    any_buffer_source abs(some_buffer_source{args...});
83  
    any_buffer_source abs(some_buffer_source{args...});
84  

84  

85  
    // Reference - wraps without ownership
85  
    // Reference - wraps without ownership
86  
    some_buffer_source src;
86  
    some_buffer_source src;
87  
    any_buffer_source abs(&src);
87  
    any_buffer_source abs(&src);
88  

88  

89  
    const_buffer arr[16];
89  
    const_buffer arr[16];
90  
    auto [ec, bufs] = co_await abs.pull(arr);
90  
    auto [ec, bufs] = co_await abs.pull(arr);
91  

91  

92  
    // ReadSource interface also available
92  
    // ReadSource interface also available
93  
    char buf[64];
93  
    char buf[64];
94  
    auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
94  
    auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
95  
    @endcode
95  
    @endcode
96  

96  

97  
    @see any_buffer_sink, BufferSource, ReadSource
97  
    @see any_buffer_sink, BufferSource, ReadSource
98  
*/
98  
*/
99  
class any_buffer_source
99  
class any_buffer_source
100  
{
100  
{
101  
    struct vtable;
101  
    struct vtable;
102  
    struct awaitable_ops;
102  
    struct awaitable_ops;
103  
    struct read_awaitable_ops;
103  
    struct read_awaitable_ops;
104  

104  

105  
    template<BufferSource S>
105  
    template<BufferSource S>
106  
    struct vtable_for_impl;
106  
    struct vtable_for_impl;
107  

107  

108  
    // hot-path members first for cache locality
108  
    // hot-path members first for cache locality
109  
    void* source_ = nullptr;
109  
    void* source_ = nullptr;
110  
    vtable const* vt_ = nullptr;
110  
    vtable const* vt_ = nullptr;
111  
    void* cached_awaitable_ = nullptr;
111  
    void* cached_awaitable_ = nullptr;
112  
    awaitable_ops const* active_ops_ = nullptr;
112  
    awaitable_ops const* active_ops_ = nullptr;
113  
    read_awaitable_ops const* active_read_ops_ = nullptr;
113  
    read_awaitable_ops const* active_read_ops_ = nullptr;
114  
    void* storage_ = nullptr;
114  
    void* storage_ = nullptr;
115  

115  

116  
public:
116  
public:
117  
    /** Destructor.
117  
    /** Destructor.
118  

118  

119  
        Destroys the owned source (if any) and releases the cached
119  
        Destroys the owned source (if any) and releases the cached
120  
        awaitable storage.
120  
        awaitable storage.
121  
    */
121  
    */
122  
    ~any_buffer_source();
122  
    ~any_buffer_source();
123  

123  

124  
    /** Default constructor.
124  
    /** Default constructor.
125  

125  

126  
        Constructs an empty wrapper. Operations on a default-constructed
126  
        Constructs an empty wrapper. Operations on a default-constructed
127  
        wrapper result in undefined behavior.
127  
        wrapper result in undefined behavior.
128  
    */
128  
    */
129  
    any_buffer_source() = default;
129  
    any_buffer_source() = default;
130  

130  

131  
    /** Non-copyable.
131  
    /** Non-copyable.
132  

132  

133  
        The awaitable cache is per-instance and cannot be shared.
133  
        The awaitable cache is per-instance and cannot be shared.
134  
    */
134  
    */
135  
    any_buffer_source(any_buffer_source const&) = delete;
135  
    any_buffer_source(any_buffer_source const&) = delete;
136  
    any_buffer_source& operator=(any_buffer_source const&) = delete;
136  
    any_buffer_source& operator=(any_buffer_source const&) = delete;
137  

137  

138  
    /** Move constructor.
138  
    /** Move constructor.
139  

139  

140  
        Transfers ownership of the wrapped source (if owned) and
140  
        Transfers ownership of the wrapped source (if owned) and
141  
        cached awaitable storage from `other`. After the move, `other` is
141  
        cached awaitable storage from `other`. After the move, `other` is
142  
        in a default-constructed state.
142  
        in a default-constructed state.
143  

143  

144  
        @param other The wrapper to move from.
144  
        @param other The wrapper to move from.
145  
    */
145  
    */
146  
    any_buffer_source(any_buffer_source&& other) noexcept
146  
    any_buffer_source(any_buffer_source&& other) noexcept
147  
        : source_(std::exchange(other.source_, nullptr))
147  
        : source_(std::exchange(other.source_, nullptr))
148  
        , vt_(std::exchange(other.vt_, nullptr))
148  
        , vt_(std::exchange(other.vt_, nullptr))
149  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
149  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
150  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
150  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
151  
        , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
151  
        , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
152  
        , storage_(std::exchange(other.storage_, nullptr))
152  
        , storage_(std::exchange(other.storage_, nullptr))
153  
    {
153  
    {
154  
    }
154  
    }
155  

155  

156  
    /** Move assignment operator.
156  
    /** Move assignment operator.
157  

157  

158  
        Destroys any owned source and releases existing resources,
158  
        Destroys any owned source and releases existing resources,
159  
        then transfers ownership from `other`.
159  
        then transfers ownership from `other`.
160  

160  

161  
        @param other The wrapper to move from.
161  
        @param other The wrapper to move from.
162  
        @return Reference to this wrapper.
162  
        @return Reference to this wrapper.
163  
    */
163  
    */
164  
    any_buffer_source&
164  
    any_buffer_source&
165  
    operator=(any_buffer_source&& other) noexcept;
165  
    operator=(any_buffer_source&& other) noexcept;
166  

166  

167  
    /** Construct by taking ownership of a BufferSource.
167  
    /** Construct by taking ownership of a BufferSource.
168  

168  

169  
        Allocates storage and moves the source into this wrapper.
169  
        Allocates storage and moves the source into this wrapper.
170  
        The wrapper owns the source and will destroy it. If `S` also
170  
        The wrapper owns the source and will destroy it. If `S` also
171  
        satisfies @ref ReadSource, native read operations are
171  
        satisfies @ref ReadSource, native read operations are
172  
        forwarded through the virtual boundary.
172  
        forwarded through the virtual boundary.
173  

173  

174  
        @param s The source to take ownership of.
174  
        @param s The source to take ownership of.
175  
    */
175  
    */
176  
    template<BufferSource S>
176  
    template<BufferSource S>
177  
        requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
177  
        requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
178  
    any_buffer_source(S s);
178  
    any_buffer_source(S s);
179  

179  

180  
    /** Construct by wrapping a BufferSource without ownership.
180  
    /** Construct by wrapping a BufferSource without ownership.
181  

181  

182  
        Wraps the given source by pointer. The source must remain
182  
        Wraps the given source by pointer. The source must remain
183  
        valid for the lifetime of this wrapper. If `S` also
183  
        valid for the lifetime of this wrapper. If `S` also
184  
        satisfies @ref ReadSource, native read operations are
184  
        satisfies @ref ReadSource, native read operations are
185  
        forwarded through the virtual boundary.
185  
        forwarded through the virtual boundary.
186  

186  

187  
        @param s Pointer to the source to wrap.
187  
        @param s Pointer to the source to wrap.
188  
    */
188  
    */
189  
    template<BufferSource S>
189  
    template<BufferSource S>
190  
    any_buffer_source(S* s);
190  
    any_buffer_source(S* s);
191  

191  

192  
    /** Check if the wrapper contains a valid source.
192  
    /** Check if the wrapper contains a valid source.
193  

193  

194  
        @return `true` if wrapping a source, `false` if default-constructed
194  
        @return `true` if wrapping a source, `false` if default-constructed
195  
            or moved-from.
195  
            or moved-from.
196  
    */
196  
    */
197  
    bool
197  
    bool
198  
    has_value() const noexcept
198  
    has_value() const noexcept
199  
    {
199  
    {
200  
        return source_ != nullptr;
200  
        return source_ != nullptr;
201  
    }
201  
    }
202  

202  

203  
    /** Check if the wrapper contains a valid source.
203  
    /** Check if the wrapper contains a valid source.
204  

204  

205  
        @return `true` if wrapping a source, `false` if default-constructed
205  
        @return `true` if wrapping a source, `false` if default-constructed
206  
            or moved-from.
206  
            or moved-from.
207  
    */
207  
    */
208  
    explicit
208  
    explicit
209  
    operator bool() const noexcept
209  
    operator bool() const noexcept
210  
    {
210  
    {
211  
        return has_value();
211  
        return has_value();
212  
    }
212  
    }
213  

213  

214  
    /** Consume bytes from the source.
214  
    /** Consume bytes from the source.
215  

215  

216  
        Advances the internal read position of the underlying source
216  
        Advances the internal read position of the underlying source
217  
        by the specified number of bytes. The next call to @ref pull
217  
        by the specified number of bytes. The next call to @ref pull
218  
        returns data starting after the consumed bytes.
218  
        returns data starting after the consumed bytes.
219  

219  

220  
        @param n The number of bytes to consume. Must not exceed the
220  
        @param n The number of bytes to consume. Must not exceed the
221  
        total size of buffers returned by the previous @ref pull.
221  
        total size of buffers returned by the previous @ref pull.
222  

222  

223  
        @par Preconditions
223  
        @par Preconditions
224  
        The wrapper must contain a valid source (`has_value() == true`).
224  
        The wrapper must contain a valid source (`has_value() == true`).
225  
    */
225  
    */
226  
    void
226  
    void
227  
    consume(std::size_t n) noexcept;
227  
    consume(std::size_t n) noexcept;
228  

228  

229  
    /** Pull buffer data from the source.
229  
    /** Pull buffer data from the source.
230  

230  

231  
        Fills the provided span with buffer descriptors from the
231  
        Fills the provided span with buffer descriptors from the
232  
        underlying source. The operation completes when data is
232  
        underlying source. The operation completes when data is
233  
        available, the source is exhausted, or an error occurs.
233  
        available, the source is exhausted, or an error occurs.
234  

234  

235  
        @param dest Span of const_buffer to fill.
235  
        @param dest Span of const_buffer to fill.
236  

236  

237  
        @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
237  
        @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
238  
            On success with data, a non-empty span of filled buffers.
238  
            On success with data, a non-empty span of filled buffers.
239  
            On EOF, `ec == cond::eof` and span is empty.
239  
            On EOF, `ec == cond::eof` and span is empty.
240  

240  

241  
        @par Preconditions
241  
        @par Preconditions
242  
        The wrapper must contain a valid source (`has_value() == true`).
242  
        The wrapper must contain a valid source (`has_value() == true`).
243  
        The caller must not call this function again after a prior
243  
        The caller must not call this function again after a prior
244  
        call returned an error.
244  
        call returned an error.
245  
    */
245  
    */
246  
    auto
246  
    auto
247  
    pull(std::span<const_buffer> dest);
247  
    pull(std::span<const_buffer> dest);
248  

248  

249  
    /** Read some data into a mutable buffer sequence.
249  
    /** Read some data into a mutable buffer sequence.
250  

250  

251  
        Reads one or more bytes into the caller's buffers. May fill
251  
        Reads one or more bytes into the caller's buffers. May fill
252  
        less than the full sequence.
252  
        less than the full sequence.
253  

253  

254  
        When the wrapped type provides native @ref ReadSource support,
254  
        When the wrapped type provides native @ref ReadSource support,
255  
        the operation forwards directly. Otherwise it is synthesized
255  
        the operation forwards directly. Otherwise it is synthesized
256  
        from @ref pull, @ref buffer_copy, and @ref consume.
256  
        from @ref pull, @ref buffer_copy, and @ref consume.
257  

257  

258  
        @param buffers The buffer sequence to fill.
258  
        @param buffers The buffer sequence to fill.
259  

259  

260  
        @return An awaitable yielding `(error_code,std::size_t)`.
260  
        @return An awaitable yielding `(error_code,std::size_t)`.
261  

261  

262  
        @par Preconditions
262  
        @par Preconditions
263  
        The wrapper must contain a valid source (`has_value() == true`).
263  
        The wrapper must contain a valid source (`has_value() == true`).
264  
        The caller must not call this function again after a prior
264  
        The caller must not call this function again after a prior
265  
        call returned an error (including EOF).
265  
        call returned an error (including EOF).
266  

266  

267  
        @see pull, consume
267  
        @see pull, consume
268  
    */
268  
    */
269  
    template<MutableBufferSequence MB>
269  
    template<MutableBufferSequence MB>
270  
    io_task<std::size_t>
270  
    io_task<std::size_t>
271  
    read_some(MB buffers);
271  
    read_some(MB buffers);
272  

272  

273  
    /** Read data into a mutable buffer sequence.
273  
    /** Read data into a mutable buffer sequence.
274  

274  

275  
        Fills the provided buffer sequence completely. When the
275  
        Fills the provided buffer sequence completely. When the
276  
        wrapped type provides native @ref ReadSource support, each
276  
        wrapped type provides native @ref ReadSource support, each
277  
        window is forwarded directly. Otherwise the data is
277  
        window is forwarded directly. Otherwise the data is
278  
        synthesized from @ref pull, @ref buffer_copy, and @ref consume.
278  
        synthesized from @ref pull, @ref buffer_copy, and @ref consume.
279  

279  

280  
        @param buffers The buffer sequence to fill.
280  
        @param buffers The buffer sequence to fill.
281  

281  

282  
        @return An awaitable yielding `(error_code,std::size_t)`.
282  
        @return An awaitable yielding `(error_code,std::size_t)`.
283  
            On success, `n == buffer_size(buffers)`.
283  
            On success, `n == buffer_size(buffers)`.
284  
            On EOF, `ec == error::eof` and `n` is bytes transferred.
284  
            On EOF, `ec == error::eof` and `n` is bytes transferred.
285  

285  

286  
        @par Preconditions
286  
        @par Preconditions
287  
        The wrapper must contain a valid source (`has_value() == true`).
287  
        The wrapper must contain a valid source (`has_value() == true`).
288  
        The caller must not call this function again after a prior
288  
        The caller must not call this function again after a prior
289  
        call returned an error (including EOF).
289  
        call returned an error (including EOF).
290  

290  

291  
        @see pull, consume
291  
        @see pull, consume
292  
    */
292  
    */
293  
    template<MutableBufferSequence MB>
293  
    template<MutableBufferSequence MB>
294  
    io_task<std::size_t>
294  
    io_task<std::size_t>
295  
    read(MB buffers);
295  
    read(MB buffers);
296  

296  

297  
protected:
297  
protected:
298  
    /** Rebind to a new source after move.
298  
    /** Rebind to a new source after move.
299  

299  

300  
        Updates the internal pointer to reference a new source object.
300  
        Updates the internal pointer to reference a new source object.
301  
        Used by owning wrappers after move assignment when the owned
301  
        Used by owning wrappers after move assignment when the owned
302  
        object has moved to a new location.
302  
        object has moved to a new location.
303  

303  

304  
        @param new_source The new source to bind to. Must be the same
304  
        @param new_source The new source to bind to. Must be the same
305  
            type as the original source.
305  
            type as the original source.
306  

306  

307  
        @note Terminates if called with a source of different type
307  
        @note Terminates if called with a source of different type
308  
            than the original.
308  
            than the original.
309  
    */
309  
    */
310  
    template<BufferSource S>
310  
    template<BufferSource S>
311  
    void
311  
    void
312  
    rebind(S& new_source) noexcept
312  
    rebind(S& new_source) noexcept
313  
    {
313  
    {
314  
        if(vt_ != &vtable_for_impl<S>::value)
314  
        if(vt_ != &vtable_for_impl<S>::value)
315  
            std::terminate();
315  
            std::terminate();
316  
        source_ = &new_source;
316  
        source_ = &new_source;
317  
    }
317  
    }
318  

318  

319  
private:
319  
private:
320  
    /** Forward a partial read through the vtable.
320  
    /** Forward a partial read through the vtable.
321  

321  

322  
        Constructs the underlying `read_some` awaitable in
322  
        Constructs the underlying `read_some` awaitable in
323  
        cached storage and returns a type-erased awaitable.
323  
        cached storage and returns a type-erased awaitable.
324  
    */
324  
    */
325  
    auto
325  
    auto
326  
    read_some_(std::span<mutable_buffer const> buffers);
326  
    read_some_(std::span<mutable_buffer const> buffers);
327  

327  

328  
    /** Forward a complete read through the vtable.
328  
    /** Forward a complete read through the vtable.
329  

329  

330  
        Constructs the underlying `read` awaitable in
330  
        Constructs the underlying `read` awaitable in
331  
        cached storage and returns a type-erased awaitable.
331  
        cached storage and returns a type-erased awaitable.
332  
    */
332  
    */
333  
    auto
333  
    auto
334  
    read_(std::span<mutable_buffer const> buffers);
334  
    read_(std::span<mutable_buffer const> buffers);
335  
};
335  
};
336  

336  

337  
//----------------------------------------------------------
337  
//----------------------------------------------------------
338  

338  

339  
/** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. */
339  
/** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. */
340  
struct any_buffer_source::awaitable_ops
340  
struct any_buffer_source::awaitable_ops
341  
{
341  
{
342  
    bool (*await_ready)(void*);
342  
    bool (*await_ready)(void*);
343 -
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
343 +
    coro (*await_suspend)(void*, coro, executor_ref const&, std::stop_token const&);
344  
    io_result<std::span<const_buffer>> (*await_resume)(void*);
344  
    io_result<std::span<const_buffer>> (*await_resume)(void*);
345  
    void (*destroy)(void*) noexcept;
345  
    void (*destroy)(void*) noexcept;
346  
};
346  
};
347  

347  

348  
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
348  
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
349  
struct any_buffer_source::read_awaitable_ops
349  
struct any_buffer_source::read_awaitable_ops
350  
{
350  
{
351  
    bool (*await_ready)(void*);
351  
    bool (*await_ready)(void*);
352 -
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
352 +
    coro (*await_suspend)(void*, coro, executor_ref const&, std::stop_token const&);
353  
    io_result<std::size_t> (*await_resume)(void*);
353  
    io_result<std::size_t> (*await_resume)(void*);
354  
    void (*destroy)(void*) noexcept;
354  
    void (*destroy)(void*) noexcept;
355  
};
355  
};
356  

356  

357  
struct any_buffer_source::vtable
357  
struct any_buffer_source::vtable
358  
{
358  
{
359  
    // BufferSource ops (always populated)
359  
    // BufferSource ops (always populated)
360  
    void (*destroy)(void*) noexcept;
360  
    void (*destroy)(void*) noexcept;
361  
    void (*do_consume)(void* source, std::size_t n) noexcept;
361  
    void (*do_consume)(void* source, std::size_t n) noexcept;
362  
    std::size_t awaitable_size;
362  
    std::size_t awaitable_size;
363  
    std::size_t awaitable_align;
363  
    std::size_t awaitable_align;
364  
    awaitable_ops const* (*construct_awaitable)(
364  
    awaitable_ops const* (*construct_awaitable)(
365  
        void* source,
365  
        void* source,
366  
        void* storage,
366  
        void* storage,
367  
        std::span<const_buffer> dest);
367  
        std::span<const_buffer> dest);
368  

368  

369  
    // ReadSource forwarding (null when wrapped type is BufferSource-only)
369  
    // ReadSource forwarding (null when wrapped type is BufferSource-only)
370  
    read_awaitable_ops const* (*construct_read_some_awaitable)(
370  
    read_awaitable_ops const* (*construct_read_some_awaitable)(
371  
        void* source,
371  
        void* source,
372  
        void* storage,
372  
        void* storage,
373  
        std::span<mutable_buffer const> buffers);
373  
        std::span<mutable_buffer const> buffers);
374  
    read_awaitable_ops const* (*construct_read_awaitable)(
374  
    read_awaitable_ops const* (*construct_read_awaitable)(
375  
        void* source,
375  
        void* source,
376  
        void* storage,
376  
        void* storage,
377  
        std::span<mutable_buffer const> buffers);
377  
        std::span<mutable_buffer const> buffers);
378  
};
378  
};
379  

379  

380  
template<BufferSource S>
380  
template<BufferSource S>
381  
struct any_buffer_source::vtable_for_impl
381  
struct any_buffer_source::vtable_for_impl
382  
{
382  
{
383  
    using PullAwaitable = decltype(std::declval<S&>().pull(
383  
    using PullAwaitable = decltype(std::declval<S&>().pull(
384  
        std::declval<std::span<const_buffer>>()));
384  
        std::declval<std::span<const_buffer>>()));
385  

385  

386  
    static void
386  
    static void
387  
    do_destroy_impl(void* source) noexcept
387  
    do_destroy_impl(void* source) noexcept
388  
    {
388  
    {
389  
        static_cast<S*>(source)->~S();
389  
        static_cast<S*>(source)->~S();
390  
    }
390  
    }
391  

391  

392  
    static void
392  
    static void
393  
    do_consume_impl(void* source, std::size_t n) noexcept
393  
    do_consume_impl(void* source, std::size_t n) noexcept
394  
    {
394  
    {
395  
        static_cast<S*>(source)->consume(n);
395  
        static_cast<S*>(source)->consume(n);
396  
    }
396  
    }
397  

397  

398  
    static awaitable_ops const*
398  
    static awaitable_ops const*
399  
    construct_awaitable_impl(
399  
    construct_awaitable_impl(
400  
        void* source,
400  
        void* source,
401  
        void* storage,
401  
        void* storage,
402  
        std::span<const_buffer> dest)
402  
        std::span<const_buffer> dest)
403  
    {
403  
    {
404  
        auto& s = *static_cast<S*>(source);
404  
        auto& s = *static_cast<S*>(source);
405  
        ::new(storage) PullAwaitable(s.pull(dest));
405  
        ::new(storage) PullAwaitable(s.pull(dest));
406  

406  

407  
        static constexpr awaitable_ops ops = {
407  
        static constexpr awaitable_ops ops = {
408  
            +[](void* p) {
408  
            +[](void* p) {
409  
                return static_cast<PullAwaitable*>(p)->await_ready();
409  
                return static_cast<PullAwaitable*>(p)->await_ready();
410  
            },
410  
            },
411 -
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
411 +
            +[](void* p, coro h, executor_ref const& ex, std::stop_token const& token) {
412  
                return detail::call_await_suspend(
412  
                return detail::call_await_suspend(
413  
                    static_cast<PullAwaitable*>(p), h, ex, token);
413  
                    static_cast<PullAwaitable*>(p), h, ex, token);
414  
            },
414  
            },
415  
            +[](void* p) {
415  
            +[](void* p) {
416  
                return static_cast<PullAwaitable*>(p)->await_resume();
416  
                return static_cast<PullAwaitable*>(p)->await_resume();
417  
            },
417  
            },
418  
            +[](void* p) noexcept {
418  
            +[](void* p) noexcept {
419  
                static_cast<PullAwaitable*>(p)->~PullAwaitable();
419  
                static_cast<PullAwaitable*>(p)->~PullAwaitable();
420  
            }
420  
            }
421  
        };
421  
        };
422  
        return &ops;
422  
        return &ops;
423  
    }
423  
    }
424  

424  

425  
    //------------------------------------------------------
425  
    //------------------------------------------------------
426  
    // ReadSource forwarding (only instantiated when ReadSource<S>)
426  
    // ReadSource forwarding (only instantiated when ReadSource<S>)
427  

427  

428  
    static read_awaitable_ops const*
428  
    static read_awaitable_ops const*
429  
    construct_read_some_awaitable_impl(
429  
    construct_read_some_awaitable_impl(
430  
        void* source,
430  
        void* source,
431  
        void* storage,
431  
        void* storage,
432  
        std::span<mutable_buffer const> buffers)
432  
        std::span<mutable_buffer const> buffers)
433  
        requires ReadSource<S>
433  
        requires ReadSource<S>
434  
    {
434  
    {
435  
        using Aw = decltype(std::declval<S&>().read_some(
435  
        using Aw = decltype(std::declval<S&>().read_some(
436  
            std::span<mutable_buffer const>{}));
436  
            std::span<mutable_buffer const>{}));
437  
        auto& s = *static_cast<S*>(source);
437  
        auto& s = *static_cast<S*>(source);
438  
        ::new(storage) Aw(s.read_some(buffers));
438  
        ::new(storage) Aw(s.read_some(buffers));
439  

439  

440  
        static constexpr read_awaitable_ops ops = {
440  
        static constexpr read_awaitable_ops ops = {
441  
            +[](void* p) {
441  
            +[](void* p) {
442  
                return static_cast<Aw*>(p)->await_ready();
442  
                return static_cast<Aw*>(p)->await_ready();
443  
            },
443  
            },
444 -
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
444 +
            +[](void* p, coro h, executor_ref const& ex, std::stop_token const& token) {
445  
                return detail::call_await_suspend(
445  
                return detail::call_await_suspend(
446  
                    static_cast<Aw*>(p), h, ex, token);
446  
                    static_cast<Aw*>(p), h, ex, token);
447  
            },
447  
            },
448  
            +[](void* p) {
448  
            +[](void* p) {
449  
                return static_cast<Aw*>(p)->await_resume();
449  
                return static_cast<Aw*>(p)->await_resume();
450  
            },
450  
            },
451  
            +[](void* p) noexcept {
451  
            +[](void* p) noexcept {
452  
                static_cast<Aw*>(p)->~Aw();
452  
                static_cast<Aw*>(p)->~Aw();
453  
            }
453  
            }
454  
        };
454  
        };
455  
        return &ops;
455  
        return &ops;
456  
    }
456  
    }
457  

457  

458  
    static read_awaitable_ops const*
458  
    static read_awaitable_ops const*
459  
    construct_read_awaitable_impl(
459  
    construct_read_awaitable_impl(
460  
        void* source,
460  
        void* source,
461  
        void* storage,
461  
        void* storage,
462  
        std::span<mutable_buffer const> buffers)
462  
        std::span<mutable_buffer const> buffers)
463  
        requires ReadSource<S>
463  
        requires ReadSource<S>
464  
    {
464  
    {
465  
        using Aw = decltype(std::declval<S&>().read(
465  
        using Aw = decltype(std::declval<S&>().read(
466  
            std::span<mutable_buffer const>{}));
466  
            std::span<mutable_buffer const>{}));
467  
        auto& s = *static_cast<S*>(source);
467  
        auto& s = *static_cast<S*>(source);
468  
        ::new(storage) Aw(s.read(buffers));
468  
        ::new(storage) Aw(s.read(buffers));
469  

469  

470  
        static constexpr read_awaitable_ops ops = {
470  
        static constexpr read_awaitable_ops ops = {
471  
            +[](void* p) {
471  
            +[](void* p) {
472  
                return static_cast<Aw*>(p)->await_ready();
472  
                return static_cast<Aw*>(p)->await_ready();
473  
            },
473  
            },
474 -
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
474 +
            +[](void* p, coro h, executor_ref const& ex, std::stop_token const& token) {
475  
                return detail::call_await_suspend(
475  
                return detail::call_await_suspend(
476  
                    static_cast<Aw*>(p), h, ex, token);
476  
                    static_cast<Aw*>(p), h, ex, token);
477  
            },
477  
            },
478  
            +[](void* p) {
478  
            +[](void* p) {
479  
                return static_cast<Aw*>(p)->await_resume();
479  
                return static_cast<Aw*>(p)->await_resume();
480  
            },
480  
            },
481  
            +[](void* p) noexcept {
481  
            +[](void* p) noexcept {
482  
                static_cast<Aw*>(p)->~Aw();
482  
                static_cast<Aw*>(p)->~Aw();
483  
            }
483  
            }
484  
        };
484  
        };
485  
        return &ops;
485  
        return &ops;
486  
    }
486  
    }
487  

487  

488  
    //------------------------------------------------------
488  
    //------------------------------------------------------
489  

489  

490  
    static consteval std::size_t
490  
    static consteval std::size_t
491  
    compute_max_size() noexcept
491  
    compute_max_size() noexcept
492  
    {
492  
    {
493  
        std::size_t s = sizeof(PullAwaitable);
493  
        std::size_t s = sizeof(PullAwaitable);
494  
        if constexpr (ReadSource<S>)
494  
        if constexpr (ReadSource<S>)
495  
        {
495  
        {
496  
            using RS = decltype(std::declval<S&>().read_some(
496  
            using RS = decltype(std::declval<S&>().read_some(
497  
                std::span<mutable_buffer const>{}));
497  
                std::span<mutable_buffer const>{}));
498  
            using R = decltype(std::declval<S&>().read(
498  
            using R = decltype(std::declval<S&>().read(
499  
                std::span<mutable_buffer const>{}));
499  
                std::span<mutable_buffer const>{}));
500  

500  

501  
            if(sizeof(RS) > s) s = sizeof(RS);
501  
            if(sizeof(RS) > s) s = sizeof(RS);
502  
            if(sizeof(R) > s) s = sizeof(R);
502  
            if(sizeof(R) > s) s = sizeof(R);
503  
        }
503  
        }
504  
        return s;
504  
        return s;
505  
    }
505  
    }
506  

506  

507  
    static consteval std::size_t
507  
    static consteval std::size_t
508  
    compute_max_align() noexcept
508  
    compute_max_align() noexcept
509  
    {
509  
    {
510  
        std::size_t a = alignof(PullAwaitable);
510  
        std::size_t a = alignof(PullAwaitable);
511  
        if constexpr (ReadSource<S>)
511  
        if constexpr (ReadSource<S>)
512  
        {
512  
        {
513  
            using RS = decltype(std::declval<S&>().read_some(
513  
            using RS = decltype(std::declval<S&>().read_some(
514  
                std::span<mutable_buffer const>{}));
514  
                std::span<mutable_buffer const>{}));
515  
            using R = decltype(std::declval<S&>().read(
515  
            using R = decltype(std::declval<S&>().read(
516  
                std::span<mutable_buffer const>{}));
516  
                std::span<mutable_buffer const>{}));
517  

517  

518  
            if(alignof(RS) > a) a = alignof(RS);
518  
            if(alignof(RS) > a) a = alignof(RS);
519  
            if(alignof(R) > a) a = alignof(R);
519  
            if(alignof(R) > a) a = alignof(R);
520  
        }
520  
        }
521  
        return a;
521  
        return a;
522  
    }
522  
    }
523  

523  

524  
    static consteval vtable
524  
    static consteval vtable
525  
    make_vtable() noexcept
525  
    make_vtable() noexcept
526  
    {
526  
    {
527  
        vtable v{};
527  
        vtable v{};
528  
        v.destroy = &do_destroy_impl;
528  
        v.destroy = &do_destroy_impl;
529  
        v.do_consume = &do_consume_impl;
529  
        v.do_consume = &do_consume_impl;
530  
        v.awaitable_size = compute_max_size();
530  
        v.awaitable_size = compute_max_size();
531  
        v.awaitable_align = compute_max_align();
531  
        v.awaitable_align = compute_max_align();
532  
        v.construct_awaitable = &construct_awaitable_impl;
532  
        v.construct_awaitable = &construct_awaitable_impl;
533  
        v.construct_read_some_awaitable = nullptr;
533  
        v.construct_read_some_awaitable = nullptr;
534  
        v.construct_read_awaitable = nullptr;
534  
        v.construct_read_awaitable = nullptr;
535  

535  

536  
        if constexpr (ReadSource<S>)
536  
        if constexpr (ReadSource<S>)
537  
        {
537  
        {
538  
            v.construct_read_some_awaitable =
538  
            v.construct_read_some_awaitable =
539  
                &construct_read_some_awaitable_impl;
539  
                &construct_read_some_awaitable_impl;
540  
            v.construct_read_awaitable =
540  
            v.construct_read_awaitable =
541  
                &construct_read_awaitable_impl;
541  
                &construct_read_awaitable_impl;
542  
        }
542  
        }
543  
        return v;
543  
        return v;
544  
    }
544  
    }
545  

545  

546  
    static constexpr vtable value = make_vtable();
546  
    static constexpr vtable value = make_vtable();
547  
};
547  
};
548  

548  

549  
//----------------------------------------------------------
549  
//----------------------------------------------------------
550  

550  

551  
inline
551  
inline
552  
any_buffer_source::~any_buffer_source()
552  
any_buffer_source::~any_buffer_source()
553  
{
553  
{
554  
    if(storage_)
554  
    if(storage_)
555  
    {
555  
    {
556  
        vt_->destroy(source_);
556  
        vt_->destroy(source_);
557  
        ::operator delete(storage_);
557  
        ::operator delete(storage_);
558  
    }
558  
    }
559  
    if(cached_awaitable_)
559  
    if(cached_awaitable_)
560  
        ::operator delete(cached_awaitable_);
560  
        ::operator delete(cached_awaitable_);
561  
}
561  
}
562  

562  

563  
inline any_buffer_source&
563  
inline any_buffer_source&
564  
any_buffer_source::operator=(any_buffer_source&& other) noexcept
564  
any_buffer_source::operator=(any_buffer_source&& other) noexcept
565  
{
565  
{
566  
    if(this != &other)
566  
    if(this != &other)
567  
    {
567  
    {
568  
        if(storage_)
568  
        if(storage_)
569  
        {
569  
        {
570  
            vt_->destroy(source_);
570  
            vt_->destroy(source_);
571  
            ::operator delete(storage_);
571  
            ::operator delete(storage_);
572  
        }
572  
        }
573  
        if(cached_awaitable_)
573  
        if(cached_awaitable_)
574  
            ::operator delete(cached_awaitable_);
574  
            ::operator delete(cached_awaitable_);
575  
        source_ = std::exchange(other.source_, nullptr);
575  
        source_ = std::exchange(other.source_, nullptr);
576  
        vt_ = std::exchange(other.vt_, nullptr);
576  
        vt_ = std::exchange(other.vt_, nullptr);
577  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
577  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
578  
        storage_ = std::exchange(other.storage_, nullptr);
578  
        storage_ = std::exchange(other.storage_, nullptr);
579  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
579  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
580  
        active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
580  
        active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
581  
    }
581  
    }
582  
    return *this;
582  
    return *this;
583  
}
583  
}
584  

584  

585  
template<BufferSource S>
585  
template<BufferSource S>
586  
    requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
586  
    requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
587  
any_buffer_source::any_buffer_source(S s)
587  
any_buffer_source::any_buffer_source(S s)
588  
    : vt_(&vtable_for_impl<S>::value)
588  
    : vt_(&vtable_for_impl<S>::value)
589  
{
589  
{
590  
    struct guard {
590  
    struct guard {
591  
        any_buffer_source* self;
591  
        any_buffer_source* self;
592  
        bool committed = false;
592  
        bool committed = false;
593  
        ~guard() {
593  
        ~guard() {
594  
            if(!committed && self->storage_) {
594  
            if(!committed && self->storage_) {
595  
                self->vt_->destroy(self->source_);
595  
                self->vt_->destroy(self->source_);
596  
                ::operator delete(self->storage_);
596  
                ::operator delete(self->storage_);
597  
                self->storage_ = nullptr;
597  
                self->storage_ = nullptr;
598  
                self->source_ = nullptr;
598  
                self->source_ = nullptr;
599  
            }
599  
            }
600  
        }
600  
        }
601  
    } g{this};
601  
    } g{this};
602  

602  

603  
    storage_ = ::operator new(sizeof(S));
603  
    storage_ = ::operator new(sizeof(S));
604  
    source_ = ::new(storage_) S(std::move(s));
604  
    source_ = ::new(storage_) S(std::move(s));
605  

605  

606  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
606  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
607  

607  

608  
    g.committed = true;
608  
    g.committed = true;
609  
}
609  
}
610  

610  

611  
template<BufferSource S>
611  
template<BufferSource S>
612  
any_buffer_source::any_buffer_source(S* s)
612  
any_buffer_source::any_buffer_source(S* s)
613  
    : source_(s)
613  
    : source_(s)
614  
    , vt_(&vtable_for_impl<S>::value)
614  
    , vt_(&vtable_for_impl<S>::value)
615  
{
615  
{
616  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
616  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
617  
}
617  
}
618  

618  

619  
//----------------------------------------------------------
619  
//----------------------------------------------------------
620  

620  

621  
inline void
621  
inline void
622  
any_buffer_source::consume(std::size_t n) noexcept
622  
any_buffer_source::consume(std::size_t n) noexcept
623  
{
623  
{
624  
    vt_->do_consume(source_, n);
624  
    vt_->do_consume(source_, n);
625  
}
625  
}
626  

626  

627  
inline auto
627  
inline auto
628  
any_buffer_source::pull(std::span<const_buffer> dest)
628  
any_buffer_source::pull(std::span<const_buffer> dest)
629  
{
629  
{
630  
    struct awaitable
630  
    struct awaitable
631  
    {
631  
    {
632  
        any_buffer_source* self_;
632  
        any_buffer_source* self_;
633  
        std::span<const_buffer> dest_;
633  
        std::span<const_buffer> dest_;
634  

634  

635  
        bool
635  
        bool
636  
        await_ready()
636  
        await_ready()
637  
        {
637  
        {
638  
            self_->active_ops_ = self_->vt_->construct_awaitable(
638  
            self_->active_ops_ = self_->vt_->construct_awaitable(
639  
                self_->source_,
639  
                self_->source_,
640  
                self_->cached_awaitable_,
640  
                self_->cached_awaitable_,
641  
                dest_);
641  
                dest_);
642  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
642  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
643  
        }
643  
        }
644  

644  

645  
        coro
645  
        coro
646 -
        await_suspend(coro h, executor_ref ex, std::stop_token token)
646 +
        await_suspend(coro h, executor_ref const& ex, std::stop_token const& token)
647  
        {
647  
        {
648  
            return self_->active_ops_->await_suspend(
648  
            return self_->active_ops_->await_suspend(
649  
                self_->cached_awaitable_, h, ex, token);
649  
                self_->cached_awaitable_, h, ex, token);
650  
        }
650  
        }
651  

651  

652  
        io_result<std::span<const_buffer>>
652  
        io_result<std::span<const_buffer>>
653  
        await_resume()
653  
        await_resume()
654  
        {
654  
        {
655  
            struct guard {
655  
            struct guard {
656  
                any_buffer_source* self;
656  
                any_buffer_source* self;
657  
                ~guard() {
657  
                ~guard() {
658  
                    self->active_ops_->destroy(self->cached_awaitable_);
658  
                    self->active_ops_->destroy(self->cached_awaitable_);
659  
                    self->active_ops_ = nullptr;
659  
                    self->active_ops_ = nullptr;
660  
                }
660  
                }
661  
            } g{self_};
661  
            } g{self_};
662  
            return self_->active_ops_->await_resume(
662  
            return self_->active_ops_->await_resume(
663  
                self_->cached_awaitable_);
663  
                self_->cached_awaitable_);
664  
        }
664  
        }
665  
    };
665  
    };
666  
    return awaitable{this, dest};
666  
    return awaitable{this, dest};
667  
}
667  
}
668  

668  

669  
//----------------------------------------------------------
669  
//----------------------------------------------------------
670  
// Private helpers for native ReadSource forwarding
670  
// Private helpers for native ReadSource forwarding
671  

671  

672  
inline auto
672  
inline auto
673  
any_buffer_source::read_some_(
673  
any_buffer_source::read_some_(
674  
    std::span<mutable_buffer const> buffers)
674  
    std::span<mutable_buffer const> buffers)
675  
{
675  
{
676  
    struct awaitable
676  
    struct awaitable
677  
    {
677  
    {
678  
        any_buffer_source* self_;
678  
        any_buffer_source* self_;
679  
        std::span<mutable_buffer const> buffers_;
679  
        std::span<mutable_buffer const> buffers_;
680  

680  

681  
        bool
681  
        bool
682  
        await_ready() const noexcept
682  
        await_ready() const noexcept
683  
        {
683  
        {
684  
            return false;
684  
            return false;
685  
        }
685  
        }
686  

686  

687  
        coro
687  
        coro
688 -
        await_suspend(coro h, executor_ref ex, std::stop_token token)
688 +
        await_suspend(coro h, executor_ref const& ex, std::stop_token const& token)
689  
        {
689  
        {
690  
            self_->active_read_ops_ =
690  
            self_->active_read_ops_ =
691  
                self_->vt_->construct_read_some_awaitable(
691  
                self_->vt_->construct_read_some_awaitable(
692  
                    self_->source_,
692  
                    self_->source_,
693  
                    self_->cached_awaitable_,
693  
                    self_->cached_awaitable_,
694  
                    buffers_);
694  
                    buffers_);
695  

695  

696  
            if(self_->active_read_ops_->await_ready(
696  
            if(self_->active_read_ops_->await_ready(
697  
                self_->cached_awaitable_))
697  
                self_->cached_awaitable_))
698  
                return h;
698  
                return h;
699  

699  

700  
            return self_->active_read_ops_->await_suspend(
700  
            return self_->active_read_ops_->await_suspend(
701  
                self_->cached_awaitable_, h, ex, token);
701  
                self_->cached_awaitable_, h, ex, token);
702  
        }
702  
        }
703  

703  

704  
        io_result<std::size_t>
704  
        io_result<std::size_t>
705  
        await_resume()
705  
        await_resume()
706  
        {
706  
        {
707  
            struct guard {
707  
            struct guard {
708  
                any_buffer_source* self;
708  
                any_buffer_source* self;
709  
                ~guard() {
709  
                ~guard() {
710  
                    self->active_read_ops_->destroy(
710  
                    self->active_read_ops_->destroy(
711  
                        self->cached_awaitable_);
711  
                        self->cached_awaitable_);
712  
                    self->active_read_ops_ = nullptr;
712  
                    self->active_read_ops_ = nullptr;
713  
                }
713  
                }
714  
            } g{self_};
714  
            } g{self_};
715  
            return self_->active_read_ops_->await_resume(
715  
            return self_->active_read_ops_->await_resume(
716  
                self_->cached_awaitable_);
716  
                self_->cached_awaitable_);
717  
        }
717  
        }
718  
    };
718  
    };
719  
    return awaitable{this, buffers};
719  
    return awaitable{this, buffers};
720  
}
720  
}
721  

721  

722  
inline auto
722  
inline auto
723  
any_buffer_source::read_(
723  
any_buffer_source::read_(
724  
    std::span<mutable_buffer const> buffers)
724  
    std::span<mutable_buffer const> buffers)
725  
{
725  
{
726  
    struct awaitable
726  
    struct awaitable
727  
    {
727  
    {
728  
        any_buffer_source* self_;
728  
        any_buffer_source* self_;
729  
        std::span<mutable_buffer const> buffers_;
729  
        std::span<mutable_buffer const> buffers_;
730  

730  

731  
        bool
731  
        bool
732  
        await_ready() const noexcept
732  
        await_ready() const noexcept
733  
        {
733  
        {
734  
            return false;
734  
            return false;
735  
        }
735  
        }
736  

736  

737  
        coro
737  
        coro
738 -
        await_suspend(coro h, executor_ref ex, std::stop_token token)
738 +
        await_suspend(coro h, executor_ref const& ex, std::stop_token const& token)
739  
        {
739  
        {
740  
            self_->active_read_ops_ =
740  
            self_->active_read_ops_ =
741  
                self_->vt_->construct_read_awaitable(
741  
                self_->vt_->construct_read_awaitable(
742  
                    self_->source_,
742  
                    self_->source_,
743  
                    self_->cached_awaitable_,
743  
                    self_->cached_awaitable_,
744  
                    buffers_);
744  
                    buffers_);
745  

745  

746  
            if(self_->active_read_ops_->await_ready(
746  
            if(self_->active_read_ops_->await_ready(
747  
                self_->cached_awaitable_))
747  
                self_->cached_awaitable_))
748  
                return h;
748  
                return h;
749  

749  

750  
            return self_->active_read_ops_->await_suspend(
750  
            return self_->active_read_ops_->await_suspend(
751  
                self_->cached_awaitable_, h, ex, token);
751  
                self_->cached_awaitable_, h, ex, token);
752  
        }
752  
        }
753  

753  

754  
        io_result<std::size_t>
754  
        io_result<std::size_t>
755  
        await_resume()
755  
        await_resume()
756  
        {
756  
        {
757  
            struct guard {
757  
            struct guard {
758  
                any_buffer_source* self;
758  
                any_buffer_source* self;
759  
                ~guard() {
759  
                ~guard() {
760  
                    self->active_read_ops_->destroy(
760  
                    self->active_read_ops_->destroy(
761  
                        self->cached_awaitable_);
761  
                        self->cached_awaitable_);
762  
                    self->active_read_ops_ = nullptr;
762  
                    self->active_read_ops_ = nullptr;
763  
                }
763  
                }
764  
            } g{self_};
764  
            } g{self_};
765  
            return self_->active_read_ops_->await_resume(
765  
            return self_->active_read_ops_->await_resume(
766  
                self_->cached_awaitable_);
766  
                self_->cached_awaitable_);
767  
        }
767  
        }
768  
    };
768  
    };
769  
    return awaitable{this, buffers};
769  
    return awaitable{this, buffers};
770  
}
770  
}
771  

771  

772  
//----------------------------------------------------------
772  
//----------------------------------------------------------
773  
// Public ReadSource methods
773  
// Public ReadSource methods
774  

774  

775  
template<MutableBufferSequence MB>
775  
template<MutableBufferSequence MB>
776  
io_task<std::size_t>
776  
io_task<std::size_t>
777  
any_buffer_source::read_some(MB buffers)
777  
any_buffer_source::read_some(MB buffers)
778  
{
778  
{
779  
    buffer_param<MB> bp(buffers);
779  
    buffer_param<MB> bp(buffers);
780  
    auto dest = bp.data();
780  
    auto dest = bp.data();
781  
    if(dest.empty())
781  
    if(dest.empty())
782  
        co_return {{}, 0};
782  
        co_return {{}, 0};
783  

783  

784  
    // Native ReadSource path
784  
    // Native ReadSource path
785  
    if(vt_->construct_read_some_awaitable)
785  
    if(vt_->construct_read_some_awaitable)
786  
        co_return co_await read_some_(dest);
786  
        co_return co_await read_some_(dest);
787  

787  

788  
    // Synthesized path: pull + buffer_copy + consume
788  
    // Synthesized path: pull + buffer_copy + consume
789  
    const_buffer arr[detail::max_iovec_];
789  
    const_buffer arr[detail::max_iovec_];
790  
    auto [ec, bufs] = co_await pull(arr);
790  
    auto [ec, bufs] = co_await pull(arr);
791  
    if(ec)
791  
    if(ec)
792  
        co_return {ec, 0};
792  
        co_return {ec, 0};
793  

793  

794  
    auto n = buffer_copy(dest, bufs);
794  
    auto n = buffer_copy(dest, bufs);
795  
    consume(n);
795  
    consume(n);
796  
    co_return {{}, n};
796  
    co_return {{}, n};
797  
}
797  
}
798  

798  

799  
template<MutableBufferSequence MB>
799  
template<MutableBufferSequence MB>
800  
io_task<std::size_t>
800  
io_task<std::size_t>
801  
any_buffer_source::read(MB buffers)
801  
any_buffer_source::read(MB buffers)
802  
{
802  
{
803  
    buffer_param<MB> bp(buffers);
803  
    buffer_param<MB> bp(buffers);
804  
    std::size_t total = 0;
804  
    std::size_t total = 0;
805  

805  

806  
    // Native ReadSource path
806  
    // Native ReadSource path
807  
    if(vt_->construct_read_awaitable)
807  
    if(vt_->construct_read_awaitable)
808  
    {
808  
    {
809  
        for(;;)
809  
        for(;;)
810  
        {
810  
        {
811  
            auto dest = bp.data();
811  
            auto dest = bp.data();
812  
            if(dest.empty())
812  
            if(dest.empty())
813  
                break;
813  
                break;
814  

814  

815  
            auto [ec, n] = co_await read_(dest);
815  
            auto [ec, n] = co_await read_(dest);
816  
            total += n;
816  
            total += n;
817  
            if(ec)
817  
            if(ec)
818  
                co_return {ec, total};
818  
                co_return {ec, total};
819  
            bp.consume(n);
819  
            bp.consume(n);
820  
        }
820  
        }
821  
        co_return {{}, total};
821  
        co_return {{}, total};
822  
    }
822  
    }
823  

823  

824  
    // Synthesized path: pull + buffer_copy + consume
824  
    // Synthesized path: pull + buffer_copy + consume
825  
    for(;;)
825  
    for(;;)
826  
    {
826  
    {
827  
        auto dest = bp.data();
827  
        auto dest = bp.data();
828  
        if(dest.empty())
828  
        if(dest.empty())
829  
            break;
829  
            break;
830  

830  

831  
        const_buffer arr[detail::max_iovec_];
831  
        const_buffer arr[detail::max_iovec_];
832  
        auto [ec, bufs] = co_await pull(arr);
832  
        auto [ec, bufs] = co_await pull(arr);
833  

833  

834  
        if(ec)
834  
        if(ec)
835  
            co_return {ec, total};
835  
            co_return {ec, total};
836  

836  

837  
        auto n = buffer_copy(dest, bufs);
837  
        auto n = buffer_copy(dest, bufs);
838  
        consume(n);
838  
        consume(n);
839  
        total += n;
839  
        total += n;
840  
        bp.consume(n);
840  
        bp.consume(n);
841  
    }
841  
    }
842  

842  

843  
    co_return {{}, total};
843  
    co_return {{}, total};
844  
}
844  
}
845  

845  

846  
//----------------------------------------------------------
846  
//----------------------------------------------------------
847  

847  

848  
static_assert(BufferSource<any_buffer_source>);
848  
static_assert(BufferSource<any_buffer_source>);
849  
static_assert(ReadSource<any_buffer_source>);
849  
static_assert(ReadSource<any_buffer_source>);
850  

850  

851  
} // namespace capy
851  
} // namespace capy
852  
} // namespace boost
852  
} // namespace boost
853  

853  

854  
#endif
854  
#endif