1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_TEST_WRITE_SINK_HPP
10  
#ifndef BOOST_CAPY_TEST_WRITE_SINK_HPP
11  
#define BOOST_CAPY_TEST_WRITE_SINK_HPP
11  
#define BOOST_CAPY_TEST_WRITE_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
17  
#include <boost/capy/coro.hpp>
17  
#include <boost/capy/coro.hpp>
18  
#include <boost/capy/ex/executor_ref.hpp>
18  
#include <boost/capy/ex/executor_ref.hpp>
19  
#include <boost/capy/io_result.hpp>
19  
#include <boost/capy/io_result.hpp>
20  
#include <boost/capy/error.hpp>
20  
#include <boost/capy/error.hpp>
21  
#include <boost/capy/test/fuse.hpp>
21  
#include <boost/capy/test/fuse.hpp>
22  

22  

23  
#include <algorithm>
23  
#include <algorithm>
24  
#include <stop_token>
24  
#include <stop_token>
25  
#include <string>
25  
#include <string>
26  
#include <string_view>
26  
#include <string_view>
27  

27  

28  
namespace boost {
28  
namespace boost {
29  
namespace capy {
29  
namespace capy {
30  
namespace test {
30  
namespace test {
31  

31  

32  
/** A mock sink for testing write operations.
32  
/** A mock sink for testing write operations.
33  

33  

34  
    Use this to verify code that performs complete writes without needing
34  
    Use this to verify code that performs complete writes without needing
35  
    real I/O. Call @ref write to write data, then @ref data to retrieve
35  
    real I/O. Call @ref write to write data, then @ref data to retrieve
36  
    what was written. The associated @ref fuse enables error injection
36  
    what was written. The associated @ref fuse enables error injection
37  
    at controlled points.
37  
    at controlled points.
38  

38  

39  
    This class satisfies the @ref WriteSink concept by providing partial
39  
    This class satisfies the @ref WriteSink concept by providing partial
40  
    writes via `write_some` (satisfying @ref WriteStream), complete
40  
    writes via `write_some` (satisfying @ref WriteStream), complete
41  
    writes via `write`, and EOF signaling via `write_eof`.
41  
    writes via `write`, and EOF signaling via `write_eof`.
42  

42  

43  
    @par Thread Safety
43  
    @par Thread Safety
44  
    Not thread-safe.
44  
    Not thread-safe.
45  

45  

46  
    @par Example
46  
    @par Example
47  
    @code
47  
    @code
48  
    fuse f;
48  
    fuse f;
49  
    write_sink ws( f );
49  
    write_sink ws( f );
50  

50  

51  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
51  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
52  
        auto [ec, n] = co_await ws.write(
52  
        auto [ec, n] = co_await ws.write(
53  
            const_buffer( "Hello", 5 ) );
53  
            const_buffer( "Hello", 5 ) );
54  
        if( ec )
54  
        if( ec )
55  
            co_return;
55  
            co_return;
56  
        auto [ec2] = co_await ws.write_eof();
56  
        auto [ec2] = co_await ws.write_eof();
57  
        if( ec2 )
57  
        if( ec2 )
58  
            co_return;
58  
            co_return;
59  
        // ws.data() returns "Hello"
59  
        // ws.data() returns "Hello"
60  
    } );
60  
    } );
61  
    @endcode
61  
    @endcode
62  

62  

63  
    @see fuse, WriteSink
63  
    @see fuse, WriteSink
64  
*/
64  
*/
65  
class write_sink
65  
class write_sink
66  
{
66  
{
67  
    fuse f_;
67  
    fuse f_;
68  
    std::string data_;
68  
    std::string data_;
69  
    std::string expect_;
69  
    std::string expect_;
70  
    std::size_t max_write_size_;
70  
    std::size_t max_write_size_;
71  
    bool eof_called_ = false;
71  
    bool eof_called_ = false;
72  

72  

73  
    std::error_code
73  
    std::error_code
74  
    consume_match_() noexcept
74  
    consume_match_() noexcept
75  
    {
75  
    {
76  
        if(data_.empty() || expect_.empty())
76  
        if(data_.empty() || expect_.empty())
77  
            return {};
77  
            return {};
78  
        std::size_t const n = (std::min)(data_.size(), expect_.size());
78  
        std::size_t const n = (std::min)(data_.size(), expect_.size());
79  
        if(std::string_view(data_.data(), n) !=
79  
        if(std::string_view(data_.data(), n) !=
80  
            std::string_view(expect_.data(), n))
80  
            std::string_view(expect_.data(), n))
81  
            return error::test_failure;
81  
            return error::test_failure;
82  
        data_.erase(0, n);
82  
        data_.erase(0, n);
83  
        expect_.erase(0, n);
83  
        expect_.erase(0, n);
84  
        return {};
84  
        return {};
85  
    }
85  
    }
86  

86  

87  
public:
87  
public:
88  
    /** Construct a write sink.
88  
    /** Construct a write sink.
89  

89  

90  
        @param f The fuse used to inject errors during writes.
90  
        @param f The fuse used to inject errors during writes.
91  

91  

92  
        @param max_write_size Maximum bytes transferred per write.
92  
        @param max_write_size Maximum bytes transferred per write.
93  
        Use to simulate chunked delivery.
93  
        Use to simulate chunked delivery.
94  
    */
94  
    */
95  
    explicit write_sink(
95  
    explicit write_sink(
96  
        fuse f = {},
96  
        fuse f = {},
97  
        std::size_t max_write_size = std::size_t(-1)) noexcept
97  
        std::size_t max_write_size = std::size_t(-1)) noexcept
98  
        : f_(std::move(f))
98  
        : f_(std::move(f))
99  
        , max_write_size_(max_write_size)
99  
        , max_write_size_(max_write_size)
100  
    {
100  
    {
101  
    }
101  
    }
102  

102  

103  
    /// Return the written data as a string view.
103  
    /// Return the written data as a string view.
104  
    std::string_view
104  
    std::string_view
105  
    data() const noexcept
105  
    data() const noexcept
106  
    {
106  
    {
107  
        return data_;
107  
        return data_;
108  
    }
108  
    }
109  

109  

110  
    /** Set the expected data for subsequent writes.
110  
    /** Set the expected data for subsequent writes.
111  

111  

112  
        Stores the expected data and immediately tries to match
112  
        Stores the expected data and immediately tries to match
113  
        against any data already written. Matched data is consumed
113  
        against any data already written. Matched data is consumed
114  
        from both buffers.
114  
        from both buffers.
115  

115  

116  
        @param sv The expected data.
116  
        @param sv The expected data.
117  

117  

118  
        @return An error if existing data does not match.
118  
        @return An error if existing data does not match.
119  
    */
119  
    */
120  
    std::error_code
120  
    std::error_code
121  
    expect(std::string_view sv)
121  
    expect(std::string_view sv)
122  
    {
122  
    {
123  
        expect_.assign(sv);
123  
        expect_.assign(sv);
124  
        return consume_match_();
124  
        return consume_match_();
125  
    }
125  
    }
126  

126  

127  
    /// Return the number of bytes written.
127  
    /// Return the number of bytes written.
128  
    std::size_t
128  
    std::size_t
129  
    size() const noexcept
129  
    size() const noexcept
130  
    {
130  
    {
131  
        return data_.size();
131  
        return data_.size();
132  
    }
132  
    }
133  

133  

134  
    /// Return whether write_eof has been called.
134  
    /// Return whether write_eof has been called.
135  
    bool
135  
    bool
136  
    eof_called() const noexcept
136  
    eof_called() const noexcept
137  
    {
137  
    {
138  
        return eof_called_;
138  
        return eof_called_;
139  
    }
139  
    }
140  

140  

141  
    /// Clear all data and reset state.
141  
    /// Clear all data and reset state.
142  
    void
142  
    void
143  
    clear() noexcept
143  
    clear() noexcept
144  
    {
144  
    {
145  
        data_.clear();
145  
        data_.clear();
146  
        expect_.clear();
146  
        expect_.clear();
147  
        eof_called_ = false;
147  
        eof_called_ = false;
148  
    }
148  
    }
149  

149  

150  
    /** Asynchronously write some data to the sink.
150  
    /** Asynchronously write some data to the sink.
151  

151  

152  
        Transfers up to `buffer_size( buffers )` bytes from the provided
152  
        Transfers up to `buffer_size( buffers )` bytes from the provided
153  
        const buffer sequence to the internal buffer. Before every write,
153  
        const buffer sequence to the internal buffer. Before every write,
154  
        the attached @ref fuse is consulted to possibly inject an error.
154  
        the attached @ref fuse is consulted to possibly inject an error.
155  

155  

156  
        @param buffers The const buffer sequence containing data to write.
156  
        @param buffers The const buffer sequence containing data to write.
157  

157  

158  
        @return An awaitable yielding `(error_code,std::size_t)`.
158  
        @return An awaitable yielding `(error_code,std::size_t)`.
159  

159  

160  
        @see fuse
160  
        @see fuse
161  
    */
161  
    */
162  
    template<ConstBufferSequence CB>
162  
    template<ConstBufferSequence CB>
163  
    auto
163  
    auto
164  
    write_some(CB buffers)
164  
    write_some(CB buffers)
165  
    {
165  
    {
166  
        struct awaitable
166  
        struct awaitable
167  
        {
167  
        {
168  
            write_sink* self_;
168  
            write_sink* self_;
169  
            CB buffers_;
169  
            CB buffers_;
170  

170  

171  
            bool await_ready() const noexcept { return true; }
171  
            bool await_ready() const noexcept { return true; }
172  

172  

173  
            void await_suspend(
173  
            void await_suspend(
174  
                coro,
174  
                coro,
175  
                executor_ref,
175  
                executor_ref,
176  
                std::stop_token) const noexcept
176  
                std::stop_token) const noexcept
177  
            {
177  
            {
178  
            }
178  
            }
179  

179  

180  
            io_result<std::size_t>
180  
            io_result<std::size_t>
181  
            await_resume()
181  
            await_resume()
182  
            {
182  
            {
183  
                if(buffer_empty(buffers_))
183  
                if(buffer_empty(buffers_))
184  
                    return {{}, 0};
184  
                    return {{}, 0};
185  

185  

186  
                auto ec = self_->f_.maybe_fail();
186  
                auto ec = self_->f_.maybe_fail();
187  
                if(ec)
187  
                if(ec)
188  
                    return {ec, 0};
188  
                    return {ec, 0};
189  

189  

190  
                std::size_t n = buffer_size(buffers_);
190  
                std::size_t n = buffer_size(buffers_);
191  
                n = (std::min)(n, self_->max_write_size_);
191  
                n = (std::min)(n, self_->max_write_size_);
192  

192  

193  
                std::size_t const old_size = self_->data_.size();
193  
                std::size_t const old_size = self_->data_.size();
194  
                self_->data_.resize(old_size + n);
194  
                self_->data_.resize(old_size + n);
195  
                buffer_copy(make_buffer(
195  
                buffer_copy(make_buffer(
196  
                    self_->data_.data() + old_size, n), buffers_, n);
196  
                    self_->data_.data() + old_size, n), buffers_, n);
197  

197  

198  
                ec = self_->consume_match_();
198  
                ec = self_->consume_match_();
199  
                if(ec)
199  
                if(ec)
200  
                {
200  
                {
201  
                    self_->data_.resize(old_size);
201  
                    self_->data_.resize(old_size);
202  
                    return {ec, 0};
202  
                    return {ec, 0};
203  
                }
203  
                }
204  

204  

205  
                return {{}, n};
205  
                return {{}, n};
206  
            }
206  
            }
207  
        };
207  
        };
208  
        return awaitable{this, buffers};
208  
        return awaitable{this, buffers};
209  
    }
209  
    }
210  

210  

211  
    /** Asynchronously write data to the sink.
211  
    /** Asynchronously write data to the sink.
212  

212  

213  
        Transfers all bytes from the provided const buffer sequence
213  
        Transfers all bytes from the provided const buffer sequence
214  
        to the internal buffer. Unlike @ref write_some, this ignores
214  
        to the internal buffer. Unlike @ref write_some, this ignores
215  
        `max_write_size` and writes all available data, matching the
215  
        `max_write_size` and writes all available data, matching the
216  
        @ref WriteSink semantic contract.
216  
        @ref WriteSink semantic contract.
217  

217  

218  
        @param buffers The const buffer sequence containing data to write.
218  
        @param buffers The const buffer sequence containing data to write.
219  

219  

220  
        @return An awaitable yielding `(error_code,std::size_t)`.
220  
        @return An awaitable yielding `(error_code,std::size_t)`.
221  

221  

222  
        @see fuse
222  
        @see fuse
223  
    */
223  
    */
224  
    template<ConstBufferSequence CB>
224  
    template<ConstBufferSequence CB>
225  
    auto
225  
    auto
226  
    write(CB buffers)
226  
    write(CB buffers)
227  
    {
227  
    {
228  
        struct awaitable
228  
        struct awaitable
229  
        {
229  
        {
230  
            write_sink* self_;
230  
            write_sink* self_;
231  
            CB buffers_;
231  
            CB buffers_;
232  

232  

233  
            bool await_ready() const noexcept { return true; }
233  
            bool await_ready() const noexcept { return true; }
234  

234  

235  
            void await_suspend(
235  
            void await_suspend(
236  
                coro,
236  
                coro,
237  
                executor_ref,
237  
                executor_ref,
238  
                std::stop_token) const noexcept
238  
                std::stop_token) const noexcept
239  
            {
239  
            {
240  
            }
240  
            }
241  

241  

242  
            io_result<std::size_t>
242  
            io_result<std::size_t>
243  
            await_resume()
243  
            await_resume()
244  
            {
244  
            {
245  
                auto ec = self_->f_.maybe_fail();
245  
                auto ec = self_->f_.maybe_fail();
246  
                if(ec)
246  
                if(ec)
247  
                    return {ec, 0};
247  
                    return {ec, 0};
248  

248  

249  
                std::size_t n = buffer_size(buffers_);
249  
                std::size_t n = buffer_size(buffers_);
250  
                if(n == 0)
250  
                if(n == 0)
251  
                    return {{}, 0};
251  
                    return {{}, 0};
252  

252  

253  
                std::size_t const old_size = self_->data_.size();
253  
                std::size_t const old_size = self_->data_.size();
254  
                self_->data_.resize(old_size + n);
254  
                self_->data_.resize(old_size + n);
255  
                buffer_copy(make_buffer(
255  
                buffer_copy(make_buffer(
256  
                    self_->data_.data() + old_size, n), buffers_);
256  
                    self_->data_.data() + old_size, n), buffers_);
257  

257  

258  
                ec = self_->consume_match_();
258  
                ec = self_->consume_match_();
259  
                if(ec)
259  
                if(ec)
260  
                    return {ec, n};
260  
                    return {ec, n};
261  

261  

262  
                return {{}, n};
262  
                return {{}, n};
263  
            }
263  
            }
264  
        };
264  
        };
265  
        return awaitable{this, buffers};
265  
        return awaitable{this, buffers};
266  
    }
266  
    }
267  

267  

268  
    /** Atomically write data and signal end-of-stream.
268  
    /** Atomically write data and signal end-of-stream.
269  

269  

270  
        Transfers all bytes from the provided const buffer sequence to
270  
        Transfers all bytes from the provided const buffer sequence to
271  
        the internal buffer and signals end-of-stream. Before the write,
271  
        the internal buffer and signals end-of-stream. Before the write,
272  
        the attached @ref fuse is consulted to possibly inject an error
272  
        the attached @ref fuse is consulted to possibly inject an error
273  
        for testing fault scenarios.
273  
        for testing fault scenarios.
274  

274  

275  
        @par Effects
275  
        @par Effects
276  
        On success, appends the written bytes to the internal buffer
276  
        On success, appends the written bytes to the internal buffer
277  
        and marks the sink as finalized.
277  
        and marks the sink as finalized.
278  
        If an error is injected by the fuse, the internal buffer remains
278  
        If an error is injected by the fuse, the internal buffer remains
279  
        unchanged.
279  
        unchanged.
280  

280  

281  
        @par Exception Safety
281  
        @par Exception Safety
282  
        No-throw guarantee.
282  
        No-throw guarantee.
283  

283  

284  
        @param buffers The const buffer sequence containing data to write.
284  
        @param buffers The const buffer sequence containing data to write.
285  

285  

286  
        @return An awaitable yielding `(error_code,std::size_t)`.
286  
        @return An awaitable yielding `(error_code,std::size_t)`.
287  

287  

288  
        @see fuse
288  
        @see fuse
289  
    */
289  
    */
290  
    template<ConstBufferSequence CB>
290  
    template<ConstBufferSequence CB>
291  
    auto
291  
    auto
292  
    write_eof(CB buffers)
292  
    write_eof(CB buffers)
293  
    {
293  
    {
294  
        struct awaitable
294  
        struct awaitable
295  
        {
295  
        {
296  
            write_sink* self_;
296  
            write_sink* self_;
297  
            CB buffers_;
297  
            CB buffers_;
298  

298  

299  
            bool await_ready() const noexcept { return true; }
299  
            bool await_ready() const noexcept { return true; }
300  

300  

301  
            void await_suspend(
301  
            void await_suspend(
302  
                coro,
302  
                coro,
303  
                executor_ref,
303  
                executor_ref,
304  
                std::stop_token) const noexcept
304  
                std::stop_token) const noexcept
305  
            {
305  
            {
306  
            }
306  
            }
307  

307  

308  
            io_result<std::size_t>
308  
            io_result<std::size_t>
309  
            await_resume()
309  
            await_resume()
310  
            {
310  
            {
311  
                auto ec = self_->f_.maybe_fail();
311  
                auto ec = self_->f_.maybe_fail();
312  
                if(ec)
312  
                if(ec)
313  
                    return {ec, 0};
313  
                    return {ec, 0};
314  

314  

315  
                std::size_t n = buffer_size(buffers_);
315  
                std::size_t n = buffer_size(buffers_);
316  
                if(n > 0)
316  
                if(n > 0)
317  
                {
317  
                {
318  
                    std::size_t const old_size = self_->data_.size();
318  
                    std::size_t const old_size = self_->data_.size();
319  
                    self_->data_.resize(old_size + n);
319  
                    self_->data_.resize(old_size + n);
320  
                    buffer_copy(make_buffer(
320  
                    buffer_copy(make_buffer(
321  
                        self_->data_.data() + old_size, n), buffers_);
321  
                        self_->data_.data() + old_size, n), buffers_);
322  

322  

323  
                    ec = self_->consume_match_();
323  
                    ec = self_->consume_match_();
324  
                    if(ec)
324  
                    if(ec)
325  
                        return {ec, n};
325  
                        return {ec, n};
326  
                }
326  
                }
327  

327  

328  
                self_->eof_called_ = true;
328  
                self_->eof_called_ = true;
329  

329  

330  
                return {{}, n};
330  
                return {{}, n};
331  
            }
331  
            }
332  
        };
332  
        };
333  
        return awaitable{this, buffers};
333  
        return awaitable{this, buffers};
334  
    }
334  
    }
335  

335  

336  
    /** Signal end-of-stream.
336  
    /** Signal end-of-stream.
337  

337  

338  
        Marks the sink as finalized, indicating no more data will be
338  
        Marks the sink as finalized, indicating no more data will be
339  
        written. Before signaling, the attached @ref fuse is consulted
339  
        written. Before signaling, the attached @ref fuse is consulted
340  
        to possibly inject an error for testing fault scenarios.
340  
        to possibly inject an error for testing fault scenarios.
341  

341  

342  
        @par Effects
342  
        @par Effects
343  
        On success, marks the sink as finalized.
343  
        On success, marks the sink as finalized.
344  
        If an error is injected by the fuse, the state remains unchanged.
344  
        If an error is injected by the fuse, the state remains unchanged.
345  

345  

346  
        @par Exception Safety
346  
        @par Exception Safety
347  
        No-throw guarantee.
347  
        No-throw guarantee.
348  

348  

349  
        @return An awaitable yielding `(error_code)`.
349  
        @return An awaitable yielding `(error_code)`.
350  

350  

351  
        @see fuse
351  
        @see fuse
352  
    */
352  
    */
353  
    auto
353  
    auto
354  
    write_eof()
354  
    write_eof()
355  
    {
355  
    {
356  
        struct awaitable
356  
        struct awaitable
357  
        {
357  
        {
358  
            write_sink* self_;
358  
            write_sink* self_;
359  

359  

360  
            bool await_ready() const noexcept { return true; }
360  
            bool await_ready() const noexcept { return true; }
361  

361  

362  
            // This method is required to satisfy Capy's IoAwaitable concept,
362  
            // This method is required to satisfy Capy's IoAwaitable concept,
363  
            // but is never called because await_ready() returns true.
363  
            // but is never called because await_ready() returns true.
364  
            // See the comment on write(CB buffers) for a detailed explanation.
364  
            // See the comment on write(CB buffers) for a detailed explanation.
365  
            void await_suspend(
365  
            void await_suspend(
366  
                coro,
366  
                coro,
367  
                executor_ref,
367  
                executor_ref,
368  
                std::stop_token) const noexcept
368  
                std::stop_token) const noexcept
369  
            {
369  
            {
370  
            }
370  
            }
371  

371  

372  
            io_result<>
372  
            io_result<>
373  
            await_resume()
373  
            await_resume()
374  
            {
374  
            {
375  
                auto ec = self_->f_.maybe_fail();
375  
                auto ec = self_->f_.maybe_fail();
376  
                if(ec)
376  
                if(ec)
377  
                    return {ec};
377  
                    return {ec};
378  

378  

379  
                self_->eof_called_ = true;
379  
                self_->eof_called_ = true;
380  
                return {};
380  
                return {};
381  
            }
381  
            }
382  
        };
382  
        };
383  
        return awaitable{this};
383  
        return awaitable{this};
384  
    }
384  
    }
385  
};
385  
};
386  

386  

387  
} // test
387  
} // test
388  
} // capy
388  
} // capy
389  
} // boost
389  
} // boost
390  

390  

391  
#endif
391  
#endif