BufferedStream.cpp 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. // Copyright (c) Kuba Szczodrzyński 2022-1-7.
  2. #include "BufferedStream.h"
  3. #include <cstring>
  4. BufferedStream::BufferedStream(
  5. const std::string &taskName,
  6. uint32_t bufferSize,
  7. uint32_t readThreshold,
  8. uint32_t readSize,
  9. uint32_t readyThreshold,
  10. uint32_t notReadyThreshold,
  11. bool waitForReady)
  12. : bell::Task(taskName, 4096, 5, 0) {
  13. this->bufferSize = bufferSize;
  14. this->readAt = bufferSize - readThreshold;
  15. this->readSize = readSize;
  16. this->readyThreshold = readyThreshold;
  17. this->notReadyThreshold = notReadyThreshold;
  18. this->waitForReady = waitForReady;
  19. this->buf = static_cast<uint8_t *>(malloc(bufferSize));
  20. this->bufEnd = buf + bufferSize;
  21. reset();
  22. }
  23. BufferedStream::~BufferedStream() {
  24. this->close();
  25. free(buf);
  26. }
  27. void BufferedStream::close() {
  28. this->terminate = true;
  29. this->readSem.give(); // force a read operation
  30. const std::lock_guard lock(runningMutex);
  31. if (this->source)
  32. this->source->close();
  33. this->source = nullptr;
  34. }
  35. void BufferedStream::reset() {
  36. this->bufReadPtr = this->buf;
  37. this->bufWritePtr = this->buf;
  38. this->readTotal = 0;
  39. this->bufferTotal = 0;
  40. this->readAvailable = 0;
  41. this->terminate = false;
  42. }
  43. bool BufferedStream::open(const std::shared_ptr<bell::ByteStream> &stream) {
  44. if (this->running)
  45. this->close();
  46. reset();
  47. this->source = stream;
  48. startTask();
  49. return source.get();
  50. }
  51. bool BufferedStream::open(const StreamReader &newReader, uint32_t initialOffset) {
  52. if (this->running)
  53. this->close();
  54. reset();
  55. this->reader = newReader;
  56. this->bufferTotal = initialOffset;
  57. startTask();
  58. return source.get();
  59. }
  60. bool BufferedStream::isReady() const {
  61. return readAvailable >= readyThreshold;
  62. }
  63. bool BufferedStream::isNotReady() const {
  64. return readAvailable < notReadyThreshold;
  65. }
  66. size_t BufferedStream::skip(size_t len) {
  67. return read(nullptr, len);
  68. }
  69. size_t BufferedStream::position() {
  70. return readTotal;
  71. }
  72. size_t BufferedStream::size() {
  73. return source->size();
  74. }
  75. uint32_t BufferedStream::lengthBetween(uint8_t *me, uint8_t *other) {
  76. const std::lock_guard lock(readMutex);
  77. if (other <= me) {
  78. // buf .... other ...... me ........ bufEnd
  79. // buf .... me/other ........ bufEnd
  80. return bufEnd - me;
  81. } else {
  82. // buf ........ me ........ other .... bufEnd
  83. return other - me;
  84. }
  85. }
  86. size_t BufferedStream::read(uint8_t *dst, size_t len) {
  87. if (waitForReady && isNotReady()) {
  88. while ((source || reader) && !isReady()) {} // end waiting after termination
  89. }
  90. if (!running && !readAvailable) {
  91. reset();
  92. return 0;
  93. }
  94. uint32_t read = 0;
  95. uint32_t toReadTotal = std::min(readAvailable.load(), static_cast<uint32_t>(len));
  96. while (toReadTotal > 0) {
  97. uint32_t toRead = std::min(toReadTotal, lengthBetween(bufReadPtr, bufWritePtr));
  98. if (dst) {
  99. memcpy(dst, bufReadPtr, toRead);
  100. dst += toRead;
  101. }
  102. readAvailable -= toRead;
  103. bufReadPtr += toRead;
  104. if (bufReadPtr >= bufEnd)
  105. bufReadPtr = buf;
  106. toReadTotal -= toRead;
  107. read += toRead;
  108. readTotal += toRead;
  109. }
  110. this->readSem.give();
  111. return read;
  112. }
/**
 * Task body: keeps the ring buffer filled from `source` until termination
 * is requested or no source is available.
 *
 * runningMutex is held for the whole duration of this function, so close(),
 * which locks the same mutex, only proceeds once the loop below has exited.
 */
void BufferedStream::runTask() {
    const std::lock_guard lock(runningMutex);
    running = true;
    if (!source && reader) {
        // get the initial request on the task's thread
        source = reader(this->bufferTotal);
    }
    while (!terminate) {
        // nothing to read from (none was given, or the reader returned none)
        if (!source)
            break;
        if (isReady()) {
            // buffer ready, wait for any read operations
            // (read() and close() both give this semaphore)
            this->readSem.wait();
        }
        // close() may have been called while waiting
        if (terminate)
            break;
        // more than readAt bytes are still buffered: not enough free space to refill yet
        if (readAvailable > readAt)
            continue;
        // here, the buffer needs re-filling
        uint32_t len;
        bool wasReady = isReady();
        do {
            // request at most readSize bytes, limited to the contiguous run starting at the write pointer
            uint32_t toRead = std::min(readSize, lengthBetween(bufWritePtr, bufReadPtr));
            if (!source) {
                len = 0;
                break;
            }
            len = source->read(bufWritePtr, toRead);
            readAvailable += len;
            bufferTotal += len;
            bufWritePtr += len;
            // wrap the write pointer back to the start of the ring
            if (bufWritePtr >= bufEnd) // TODO is == enough here?
                bufWritePtr = buf;
        } while (len && readSize < bufferSize - readAvailable); // loop until there's no more free space in the buffer
        // len == 0: the source returned no data
        if (!len && reader)
            // ask the reader for a new source, starting at the current byte offset
            source = reader(bufferTotal);
        else if (!len)
            // no reader to fall back on: stop the task
            terminate = true;
        // signal that buffer is ready for reading
        // (only on the not-ready -> ready transition)
        if (!wasReady && isReady()) {
            this->readySem.give();
        }
    }
    // drop the source and reader and mark the task as finished
    source = nullptr;
    reader = nullptr;
    running = false;
}