// BufferedStream.h
  1. #pragma once
  2. #include <stddef.h> // for size_t
  3. #include <stdint.h> // for uint32_t, uint8_t
  4. #include <atomic> // for atomic
  5. #include <functional> // for function
  6. #include <memory> // for shared_ptr
  7. #include <mutex> // for mutex
  8. #include <string> // for string
  9. #include "BellTask.h" // for Task
  10. #include "ByteStream.h" // for ByteStream
  11. #include "WrappedSemaphore.h" // for WrappedSemaphore
  /**
   * This class implements a wrapper around an arbitrary bell::ByteStream,
   * providing a circular reading buffer with configurable thresholds.
   *
   * The BufferedStream runs a bell::Task when it's started, so the caller can
   * access the buffer's data asynchronously, whenever needed. The buffer is refilled
   * automatically from the source stream.
   *
   * The class implements bell::ByteStream's methods, although for proper functioning,
   * the caller code should be modified to check the isReady() and isNotReady() flags.
   *
   * If the actual reading code can't be modified, waitForReady allows waiting for buffer
   * readiness during reading. Keep in mind that using the semaphore is probably more
   * resource-efficient.
   *
   * The source stream (passed to open() or returned by the reader) should implement the read()
   * method correctly, such that 0 is returned if, and only if, the stream ends.
   */
  29. class BufferedStream : public bell::ByteStream, bell::Task {
  30. public:
  31. typedef std::shared_ptr<bell::ByteStream> StreamPtr;
  32. typedef std::function<StreamPtr(uint32_t rangeStart)> StreamReader;
  33. public:
  34. /**
  35. * @param taskName name to use for the reading task
  36. * @param bufferSize total size of the reading buffer
  37. * @param readThreshold how much can be read before refilling the buffer
  38. * @param readSize amount of bytes to read from the source each time
  39. * @param readyThreshold minimum amount of available bytes to report isReady()
  40. * @param notReadyThreshold maximum amount of available bytes to report isNotReady()
  41. * @param waitForReady whether to wait for the buffer to be ready during reading
  42. * @param endWithSource whether to end the streaming as soon as source returns 0 from read()
  43. */
  44. BufferedStream(const std::string& taskName, uint32_t bufferSize,
  45. uint32_t readThreshold, uint32_t readSize,
  46. uint32_t readyThreshold, uint32_t notReadyThreshold,
  47. bool waitForReady = false);
  48. ~BufferedStream() override;
  49. bool open(const StreamPtr& stream);
  50. bool open(const StreamReader& newReader, uint32_t initialOffset = 0);
  51. void close() override;
  52. // inherited methods
  53. public:
  54. /**
  55. * Read len bytes from the buffer to dst. If waitForReady is enabled
  56. * and readAvailable is lower than notReadyThreshold, the function
  57. * will block until readyThreshold bytes is available.
  58. *
  59. * @returns number of bytes copied to dst (might be lower than len,
  60. * if the buffer does not contain len bytes available), or 0 if the source
  61. * stream is already closed and there is no reader attached.
  62. */
  63. size_t read(uint8_t* dst, size_t len) override;
  64. size_t skip(size_t len) override;
  65. size_t position() override;
  66. size_t size() override;
  67. // stream status
  68. public:
  69. /**
  70. * Total amount of bytes served to read().
  71. */
  72. uint32_t readTotal;
  73. /**
  74. * Total amount of bytes read from source.
  75. */
  76. uint32_t bufferTotal;
  77. /**
  78. * Amount of bytes available to read from the buffer.
  79. */
  80. std::atomic<uint32_t> readAvailable;
  81. /**
  82. * Whether the caller should start reading the data. This indicates that a safe
  83. * amount (determined by readyThreshold) of data is available in the buffer.
  84. */
  85. bool isReady() const;
  86. /**
  87. * Whether the caller should stop reading the data. This indicates that the amount of data
  88. * available for reading is decreasing to a non-safe value, as data is being read
  89. * faster than it can be buffered.
  90. */
  91. bool isNotReady() const;
  92. /**
  93. * Semaphore that is given when the buffer becomes ready (isReady() == true). Caller can
  94. * wait for the semaphore instead of continuously querying isReady().
  95. */
  96. bell::WrappedSemaphore readySem;
  97. private:
  98. std::mutex runningMutex;
  99. bool running = false;
  100. bool terminate = false;
  101. bell::WrappedSemaphore
  102. readSem; // signal to start writing to buffer after reading from it
  103. std::mutex
  104. readMutex; // mutex for locking read operations during writing, and vice versa
  105. uint32_t bufferSize;
  106. uint32_t readAt;
  107. uint32_t readSize;
  108. uint32_t readyThreshold;
  109. uint32_t notReadyThreshold;
  110. bool waitForReady;
  111. uint8_t* buf;
  112. uint8_t* bufEnd;
  113. uint8_t* bufReadPtr;
  114. uint8_t* bufWritePtr;
  115. StreamPtr source;
  116. StreamReader reader;
  117. void runTask() override;
  118. void reset();
  119. uint32_t lengthBetween(uint8_t* me, uint8_t* other);
  120. };