| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 | 
							- #pragma once
 
- #include <stddef.h>    // for size_t
 
- #include <stdint.h>    // for uint32_t, uint8_t
 
- #include <atomic>      // for atomic
 
- #include <functional>  // for function
 
- #include <memory>      // for shared_ptr
 
- #include <mutex>       // for mutex
 
- #include <string>      // for string
 
- #include "BellTask.h"          // for Task
 
- #include "ByteStream.h"        // for ByteStream
 
- #include "WrappedSemaphore.h"  // for WrappedSemaphore
 
- /**
 
-  * This class implements a wrapper around an arbitrary bell::ByteStream,
 
-  * providing a circular reading buffer with configurable thresholds.
 
-  *
 
-  * The BufferedStream runs a bell::Task when it's started, so the caller can
 
-  * access the buffer's data asynchronously, whenever needed. The buffer is refilled
 
-  * automatically from source stream.
 
-  *
 
-  * The class implements bell::ByteStream's methods, although for proper functioning,
 
-  * the caller code should be modified to check isReady() and isNotReady() flags.
 
-  *
 
-  * If the actual reading code can't be modified, waitForReady allows to wait for buffer readiness
 
-  * during reading. Keep in mind that using the semaphore is probably more resource effective.
 
-  *
 
-  * The source stream (passed to open() or returned by the reader) should implement the read()
 
-  * method correctly, such as that 0 is returned if, and only if the stream ends.
 
-  */
 
- class BufferedStream : public bell::ByteStream, bell::Task {
 
-  public:
 
-   typedef std::shared_ptr<bell::ByteStream> StreamPtr;
 
-   typedef std::function<StreamPtr(uint32_t rangeStart)> StreamReader;
 
-  public:
 
-   /**
 
- 	 * @param taskName name to use for the reading task
 
- 	 * @param bufferSize total size of the reading buffer
 
- 	 * @param readThreshold how much can be read before refilling the buffer
 
- 	 * @param readSize amount of bytes to read from the source each time
 
- 	 * @param readyThreshold minimum amount of available bytes to report isReady()
 
- 	 * @param notReadyThreshold maximum amount of available bytes to report isNotReady()
 
- 	 * @param waitForReady whether to wait for the buffer to be ready during reading
 
- 	 * @param endWithSource whether to end the streaming as soon as source returns 0 from read()
 
- 	 */
 
-   BufferedStream(const std::string& taskName, uint32_t bufferSize,
 
-                  uint32_t readThreshold, uint32_t readSize,
 
-                  uint32_t readyThreshold, uint32_t notReadyThreshold,
 
-                  bool waitForReady = false);
 
-   ~BufferedStream() override;
 
-   bool open(const StreamPtr& stream);
 
-   bool open(const StreamReader& newReader, uint32_t initialOffset = 0);
 
-   void close() override;
 
-   // inherited methods
 
-  public:
 
-   /**
 
- 	 * Read len bytes from the buffer to dst. If waitForReady is enabled
 
- 	 * and readAvailable is lower than notReadyThreshold, the function
 
- 	 * will block until readyThreshold bytes is available.
 
- 	 *
 
- 	 * @returns number of bytes copied to dst (might be lower than len,
 
- 	 * if the buffer does not contain len bytes available), or 0 if the source
 
- 	 * stream is already closed and there is no reader attached.
 
- 	 */
 
-   size_t read(uint8_t* dst, size_t len) override;
 
-   size_t skip(size_t len) override;
 
-   size_t position() override;
 
-   size_t size() override;
 
-   // stream status
 
-  public:
 
-   /**
 
- 	 * Total amount of bytes served to read().
 
- 	 */
 
-   uint32_t readTotal;
 
-   /**
 
- 	 * Total amount of bytes read from source.
 
- 	 */
 
-   uint32_t bufferTotal;
 
-   /**
 
- 	 * Amount of bytes available to read from the buffer.
 
- 	 */
 
-   std::atomic<uint32_t> readAvailable;
 
-   /**
 
- 	 * Whether the caller should start reading the data. This indicates that a safe
 
- 	 * amount (determined by readyThreshold) of data is available in the buffer.
 
- 	 */
 
-   bool isReady() const;
 
-   /**
 
- 	 * Whether the caller should stop reading the data. This indicates that the amount of data
 
- 	 * available for reading is decreasing to a non-safe value, as data is being read
 
- 	 * faster than it can be buffered.
 
- 	 */
 
-   bool isNotReady() const;
 
-   /**
 
- 	 * Semaphore that is given when the buffer becomes ready (isReady() == true). Caller can
 
- 	 * wait for the semaphore instead of continuously querying isReady().
 
- 	 */
 
-   bell::WrappedSemaphore readySem;
 
-  private:
 
-   std::mutex runningMutex;
 
-   bool running = false;
 
-   bool terminate = false;
 
-   bell::WrappedSemaphore
 
-       readSem;  // signal to start writing to buffer after reading from it
 
-   std::mutex
 
-       readMutex;  // mutex for locking read operations during writing, and vice versa
 
-   uint32_t bufferSize;
 
-   uint32_t readAt;
 
-   uint32_t readSize;
 
-   uint32_t readyThreshold;
 
-   uint32_t notReadyThreshold;
 
-   bool waitForReady;
 
-   uint8_t* buf;
 
-   uint8_t* bufEnd;
 
-   uint8_t* bufReadPtr;
 
-   uint8_t* bufWritePtr;
 
-   StreamPtr source;
 
-   StreamReader reader;
 
-   void runTask() override;
 
-   void reset();
 
-   uint32_t lengthBetween(uint8_t* me, uint8_t* other);
 
- };
 
 
  |