BufferedStream.cpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. #include "BufferedStream.h"
  2. #include <stdlib.h> // for free, malloc
  3. #include <algorithm> // for min
  4. #include <cstdint> // for uint32_t
  5. #include <cstring> // for memcpy
  6. #include <type_traits> // for remove_extent_t
  7. BufferedStream::BufferedStream(const std::string& taskName, uint32_t bufferSize,
  8. uint32_t readThreshold, uint32_t readSize,
  9. uint32_t readyThreshold,
  10. uint32_t notReadyThreshold, bool waitForReady)
  11. : bell::Task(taskName, 4096, 5, 0) {
  12. this->bufferSize = bufferSize;
  13. this->readAt = bufferSize - readThreshold;
  14. this->readSize = readSize;
  15. this->readyThreshold = readyThreshold;
  16. this->notReadyThreshold = notReadyThreshold;
  17. this->waitForReady = waitForReady;
  18. this->buf = static_cast<uint8_t*>(malloc(bufferSize));
  19. this->bufEnd = buf + bufferSize;
  20. reset();
  21. }
  22. BufferedStream::~BufferedStream() {
  23. this->close();
  24. free(buf);
  25. }
  26. void BufferedStream::close() {
  27. this->terminate = true;
  28. this->readSem.give(); // force a read operation
  29. const std::lock_guard lock(runningMutex);
  30. if (this->source)
  31. this->source->close();
  32. this->source = nullptr;
  33. }
  34. void BufferedStream::reset() {
  35. this->bufReadPtr = this->buf;
  36. this->bufWritePtr = this->buf;
  37. this->readTotal = 0;
  38. this->bufferTotal = 0;
  39. this->readAvailable = 0;
  40. this->terminate = false;
  41. }
  42. bool BufferedStream::open(const std::shared_ptr<bell::ByteStream>& stream) {
  43. if (this->running)
  44. this->close();
  45. reset();
  46. this->source = stream;
  47. startTask();
  48. return source.get();
  49. }
  50. bool BufferedStream::open(const StreamReader& newReader,
  51. uint32_t initialOffset) {
  52. if (this->running)
  53. this->close();
  54. reset();
  55. this->reader = newReader;
  56. this->bufferTotal = initialOffset;
  57. startTask();
  58. return source.get();
  59. }
  60. bool BufferedStream::isReady() const {
  61. return readAvailable >= readyThreshold;
  62. }
  63. bool BufferedStream::isNotReady() const {
  64. return readAvailable < notReadyThreshold;
  65. }
  66. size_t BufferedStream::skip(size_t len) {
  67. return read(nullptr, len);
  68. }
  69. size_t BufferedStream::position() {
  70. return readTotal;
  71. }
  72. size_t BufferedStream::size() {
  73. return source->size();
  74. }
  75. uint32_t BufferedStream::lengthBetween(uint8_t* me, uint8_t* other) {
  76. const std::lock_guard lock(readMutex);
  77. if (other <= me) {
  78. // buf .... other ...... me ........ bufEnd
  79. // buf .... me/other ........ bufEnd
  80. return bufEnd - me;
  81. } else {
  82. // buf ........ me ........ other .... bufEnd
  83. return other - me;
  84. }
  85. }
  86. size_t BufferedStream::read(uint8_t* dst, size_t len) {
  87. if (waitForReady && isNotReady()) {
  88. while ((source || reader) && !isReady()) {
  89. } // end waiting after termination
  90. }
  91. if (!running && !readAvailable) {
  92. reset();
  93. return 0;
  94. }
  95. uint32_t read = 0;
  96. uint32_t toReadTotal =
  97. std::min(readAvailable.load(), static_cast<uint32_t>(len));
  98. while (toReadTotal > 0) {
  99. uint32_t toRead =
  100. std::min(toReadTotal, lengthBetween(bufReadPtr, bufWritePtr));
  101. if (dst) {
  102. memcpy(dst, bufReadPtr, toRead);
  103. dst += toRead;
  104. }
  105. readAvailable -= toRead;
  106. bufReadPtr += toRead;
  107. if (bufReadPtr >= bufEnd)
  108. bufReadPtr = buf;
  109. toReadTotal -= toRead;
  110. read += toRead;
  111. readTotal += toRead;
  112. }
  113. this->readSem.give();
  114. return read;
  115. }
  116. void BufferedStream::runTask() {
  117. const std::lock_guard lock(runningMutex);
  118. running = true;
  119. if (!source && reader) {
  120. // get the initial request on the task's thread
  121. source = reader(this->bufferTotal);
  122. }
  123. while (!terminate) {
  124. if (!source)
  125. break;
  126. if (isReady()) {
  127. // buffer ready, wait for any read operations
  128. this->readSem.wait();
  129. }
  130. if (terminate)
  131. break;
  132. if (readAvailable > readAt)
  133. continue;
  134. // here, the buffer needs re-filling
  135. uint32_t len;
  136. bool wasReady = isReady();
  137. do {
  138. uint32_t toRead =
  139. std::min(readSize, lengthBetween(bufWritePtr, bufReadPtr));
  140. if (!source) {
  141. len = 0;
  142. break;
  143. }
  144. len = source->read(bufWritePtr, toRead);
  145. readAvailable += len;
  146. bufferTotal += len;
  147. bufWritePtr += len;
  148. if (bufWritePtr >= bufEnd) // TODO is == enough here?
  149. bufWritePtr = buf;
  150. } while (
  151. len &&
  152. readSize <
  153. bufferSize -
  154. readAvailable); // loop until there's no more free space in the buffer
  155. if (!len && reader)
  156. source = reader(bufferTotal);
  157. else if (!len)
  158. terminate = true;
  159. // signal that buffer is ready for reading
  160. if (!wasReady && isReady()) {
  161. this->readySem.give();
  162. }
  163. }
  164. source = nullptr;
  165. reader = nullptr;
  166. running = false;
  167. }