//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_TEST_READ_STREAM_HPP
10  
#ifndef BOOST_CAPY_TEST_READ_STREAM_HPP
11  
#define BOOST_CAPY_TEST_READ_STREAM_HPP
11  
#define BOOST_CAPY_TEST_READ_STREAM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
17  
#include <boost/capy/cond.hpp>
17  
#include <boost/capy/cond.hpp>
18  
#include <boost/capy/coro.hpp>
18  
#include <boost/capy/coro.hpp>
19  
#include <boost/capy/ex/executor_ref.hpp>
19  
#include <boost/capy/ex/executor_ref.hpp>
20  
#include <boost/capy/io_result.hpp>
20  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/test/fuse.hpp>
21  
#include <boost/capy/test/fuse.hpp>
22  

22  

23  
#include <stop_token>
23  
#include <stop_token>
24  
#include <string>
24  
#include <string>
25  
#include <string_view>
25  
#include <string_view>
26  

26  

27  
namespace boost {
27  
namespace boost {
28  
namespace capy {
28  
namespace capy {
29  
namespace test {
29  
namespace test {
30  

30  

31  
/** A mock stream for testing read operations.
31  
/** A mock stream for testing read operations.
32  

32  

33  
    Use this to verify code that performs reads without needing
33  
    Use this to verify code that performs reads without needing
34  
    real I/O. Call @ref provide to supply data, then @ref read_some
34  
    real I/O. Call @ref provide to supply data, then @ref read_some
35  
    to consume it. The associated @ref fuse enables error injection
35  
    to consume it. The associated @ref fuse enables error injection
36  
    at controlled points. An optional `max_read_size` constructor
36  
    at controlled points. An optional `max_read_size` constructor
37  
    parameter limits bytes per read to simulate chunked delivery.
37  
    parameter limits bytes per read to simulate chunked delivery.
38  

38  

39  
    This class satisfies the @ref ReadStream concept.
39  
    This class satisfies the @ref ReadStream concept.
40  

40  

41  
    @par Thread Safety
41  
    @par Thread Safety
42  
    Not thread-safe.
42  
    Not thread-safe.
43  

43  

44  
    @par Example
44  
    @par Example
45  
    @code
45  
    @code
46  
    fuse f;
46  
    fuse f;
47  
    read_stream rs( f );
47  
    read_stream rs( f );
48  
    rs.provide( "Hello, " );
48  
    rs.provide( "Hello, " );
49  
    rs.provide( "World!" );
49  
    rs.provide( "World!" );
50  

50  

51  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
51  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
52  
        char buf[32];
52  
        char buf[32];
53  
        auto [ec, n] = co_await rs.read_some(
53  
        auto [ec, n] = co_await rs.read_some(
54  
            mutable_buffer( buf, sizeof( buf ) ) );
54  
            mutable_buffer( buf, sizeof( buf ) ) );
55  
        if( ec )
55  
        if( ec )
56  
            co_return;
56  
            co_return;
57  
        // buf contains "Hello, World!"
57  
        // buf contains "Hello, World!"
58  
    } );
58  
    } );
59  
    @endcode
59  
    @endcode
60  

60  

61  
    @see fuse, ReadStream
61  
    @see fuse, ReadStream
62  
*/
62  
*/
63  
class read_stream
63  
class read_stream
64  
{
64  
{
65  
    fuse f_;
65  
    fuse f_;
66  
    std::string data_;
66  
    std::string data_;
67  
    std::size_t pos_ = 0;
67  
    std::size_t pos_ = 0;
68  
    std::size_t max_read_size_;
68  
    std::size_t max_read_size_;
69  

69  

70  
public:
70  
public:
71  
    /** Construct a read stream.
71  
    /** Construct a read stream.
72  

72  

73  
        @param f The fuse used to inject errors during reads.
73  
        @param f The fuse used to inject errors during reads.
74  

74  

75  
        @param max_read_size Maximum bytes returned per read.
75  
        @param max_read_size Maximum bytes returned per read.
76  
        Use to simulate chunked network delivery.
76  
        Use to simulate chunked network delivery.
77  
    */
77  
    */
78  
    explicit read_stream(
78  
    explicit read_stream(
79  
        fuse f = {},
79  
        fuse f = {},
80  
        std::size_t max_read_size = std::size_t(-1)) noexcept
80  
        std::size_t max_read_size = std::size_t(-1)) noexcept
81  
        : f_(std::move(f))
81  
        : f_(std::move(f))
82  
        , max_read_size_(max_read_size)
82  
        , max_read_size_(max_read_size)
83  
    {
83  
    {
84  
    }
84  
    }
85  

85  

86  
    /** Append data to be returned by subsequent reads.
86  
    /** Append data to be returned by subsequent reads.
87  

87  

88  
        Multiple calls accumulate data that @ref read_some returns.
88  
        Multiple calls accumulate data that @ref read_some returns.
89  

89  

90  
        @param sv The data to append.
90  
        @param sv The data to append.
91  
    */
91  
    */
92  
    void
92  
    void
93  
    provide(std::string_view sv)
93  
    provide(std::string_view sv)
94  
    {
94  
    {
95  
        data_.append(sv);
95  
        data_.append(sv);
96  
    }
96  
    }
97  

97  

98  
    /// Clear all data and reset the read position.
98  
    /// Clear all data and reset the read position.
99  
    void
99  
    void
100  
    clear() noexcept
100  
    clear() noexcept
101  
    {
101  
    {
102  
        data_.clear();
102  
        data_.clear();
103  
        pos_ = 0;
103  
        pos_ = 0;
104  
    }
104  
    }
105  

105  

106  
    /// Return the number of bytes available for reading.
106  
    /// Return the number of bytes available for reading.
107  
    std::size_t
107  
    std::size_t
108  
    available() const noexcept
108  
    available() const noexcept
109  
    {
109  
    {
110  
        return data_.size() - pos_;
110  
        return data_.size() - pos_;
111  
    }
111  
    }
112  

112  

113  
    /** Asynchronously read data from the stream.
113  
    /** Asynchronously read data from the stream.
114  

114  

115  
        Transfers up to `buffer_size( buffers )` bytes from the internal
115  
        Transfers up to `buffer_size( buffers )` bytes from the internal
116  
        buffer to the provided mutable buffer sequence. If no data remains,
116  
        buffer to the provided mutable buffer sequence. If no data remains,
117  
        returns `error::eof`. Before every read, the attached @ref fuse is
117  
        returns `error::eof`. Before every read, the attached @ref fuse is
118  
        consulted to possibly inject an error for testing fault scenarios.
118  
        consulted to possibly inject an error for testing fault scenarios.
119  
        The returned `std::size_t` is the number of bytes transferred.
119  
        The returned `std::size_t` is the number of bytes transferred.
120  

120  

121  
        @par Effects
121  
        @par Effects
122  
        On success, advances the internal read position by the number of
122  
        On success, advances the internal read position by the number of
123  
        bytes copied. If an error is injected by the fuse, the read position
123  
        bytes copied. If an error is injected by the fuse, the read position
124  
        remains unchanged.
124  
        remains unchanged.
125  

125  

126  
        @par Exception Safety
126  
        @par Exception Safety
127  
        No-throw guarantee.
127  
        No-throw guarantee.
128  

128  

129  
        @param buffers The mutable buffer sequence to receive data.
129  
        @param buffers The mutable buffer sequence to receive data.
130  

130  

131  
        @return An awaitable yielding `(error_code,std::size_t)`.
131  
        @return An awaitable yielding `(error_code,std::size_t)`.
132  

132  

133  
        @see fuse
133  
        @see fuse
134  
    */
134  
    */
135  
    template<MutableBufferSequence MB>
135  
    template<MutableBufferSequence MB>
136  
    auto
136  
    auto
137  
    read_some(MB buffers)
137  
    read_some(MB buffers)
138  
    {
138  
    {
139  
        struct awaitable
139  
        struct awaitable
140  
        {
140  
        {
141  
            read_stream* self_;
141  
            read_stream* self_;
142  
            MB buffers_;
142  
            MB buffers_;
143  

143  

144  
            bool await_ready() const noexcept { return true; }
144  
            bool await_ready() const noexcept { return true; }
145  

145  

146  
            // This method is required to satisfy Capy's IoAwaitable concept,
146  
            // This method is required to satisfy Capy's IoAwaitable concept,
147  
            // but is never called because await_ready() returns true.
147  
            // but is never called because await_ready() returns true.
148  
            //
148  
            //
149  
            // Capy uses a two-layer awaitable system: the promise's
149  
            // Capy uses a two-layer awaitable system: the promise's
150  
            // await_transform wraps awaitables in a transform_awaiter whose
150  
            // await_transform wraps awaitables in a transform_awaiter whose
151  
            // standard await_suspend(coroutine_handle) calls this custom
151  
            // standard await_suspend(coroutine_handle) calls this custom
152  
            // 3-argument overload, passing the executor and stop_token from
152  
            // 3-argument overload, passing the executor and stop_token from
153  
            // the coroutine's context. For synchronous test awaitables like
153  
            // the coroutine's context. For synchronous test awaitables like
154  
            // this one, the coroutine never suspends, so this is not invoked.
154  
            // this one, the coroutine never suspends, so this is not invoked.
155  
            // The signature exists to allow the same awaitable type to work
155  
            // The signature exists to allow the same awaitable type to work
156  
            // with both synchronous (test) and asynchronous (real I/O) code.
156  
            // with both synchronous (test) and asynchronous (real I/O) code.
157  
            void await_suspend(
157  
            void await_suspend(
158  
                coro,
158  
                coro,
159  
                executor_ref,
159  
                executor_ref,
160  
                std::stop_token) const noexcept
160  
                std::stop_token) const noexcept
161  
            {
161  
            {
162  
            }
162  
            }
163  

163  

164  
            io_result<std::size_t>
164  
            io_result<std::size_t>
165  
            await_resume()
165  
            await_resume()
166  
            {
166  
            {
167  
                // Empty buffer is a no-op regardless of
167  
                // Empty buffer is a no-op regardless of
168  
                // stream state or fuse.
168  
                // stream state or fuse.
169  
                if(buffer_empty(buffers_))
169  
                if(buffer_empty(buffers_))
170  
                    return {{}, 0};
170  
                    return {{}, 0};
171  

171  

172  
                auto ec = self_->f_.maybe_fail();
172  
                auto ec = self_->f_.maybe_fail();
173  
                if(ec)
173  
                if(ec)
174  
                    return {ec, 0};
174  
                    return {ec, 0};
175  

175  

176  
                if(self_->pos_ >= self_->data_.size())
176  
                if(self_->pos_ >= self_->data_.size())
177  
                    return {error::eof, 0};
177  
                    return {error::eof, 0};
178  

178  

179  
                std::size_t avail = self_->data_.size() - self_->pos_;
179  
                std::size_t avail = self_->data_.size() - self_->pos_;
180  
                if(avail > self_->max_read_size_)
180  
                if(avail > self_->max_read_size_)
181  
                    avail = self_->max_read_size_;
181  
                    avail = self_->max_read_size_;
182  
                auto src = make_buffer(self_->data_.data() + self_->pos_, avail);
182  
                auto src = make_buffer(self_->data_.data() + self_->pos_, avail);
183  
                std::size_t const n = buffer_copy(buffers_, src);
183  
                std::size_t const n = buffer_copy(buffers_, src);
184  
                self_->pos_ += n;
184  
                self_->pos_ += n;
185  
                return {{}, n};
185  
                return {{}, n};
186  
            }
186  
            }
187  
        };
187  
        };
188  
        return awaitable{this, buffers};
188  
        return awaitable{this, buffers};
189  
    }
189  
    }
190  
};
190  
};
191  

191  

192  
} // test
192  
} // test
193  
} // capy
193  
} // capy
194  
} // boost
194  
} // boost
195  

195  

196  
#endif
196  
#endif