BufferedStream.cpp 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. #include "BufferedStream.h"
  2. #include <cstring>
  3. BufferedStream::BufferedStream(
  4. const std::string &taskName,
  5. uint32_t bufferSize,
  6. uint32_t readThreshold,
  7. uint32_t readSize,
  8. uint32_t readyThreshold,
  9. uint32_t notReadyThreshold,
  10. bool waitForReady)
  11. : bell::Task(taskName, 4096, 5, 0) {
  12. this->bufferSize = bufferSize;
  13. this->readAt = bufferSize - readThreshold;
  14. this->readSize = readSize;
  15. this->readyThreshold = readyThreshold;
  16. this->notReadyThreshold = notReadyThreshold;
  17. this->waitForReady = waitForReady;
  18. this->buf = static_cast<uint8_t *>(malloc(bufferSize));
  19. this->bufEnd = buf + bufferSize;
  20. reset();
  21. }
  22. BufferedStream::~BufferedStream() {
  23. this->close();
  24. free(buf);
  25. }
  26. void BufferedStream::close() {
  27. this->terminate = true;
  28. this->readSem.give(); // force a read operation
  29. const std::lock_guard lock(runningMutex);
  30. if (this->source)
  31. this->source->close();
  32. this->source = nullptr;
  33. }
  34. void BufferedStream::reset() {
  35. this->bufReadPtr = this->buf;
  36. this->bufWritePtr = this->buf;
  37. this->readTotal = 0;
  38. this->bufferTotal = 0;
  39. this->readAvailable = 0;
  40. this->terminate = false;
  41. }
  42. bool BufferedStream::open(const std::shared_ptr<bell::ByteStream> &stream) {
  43. if (this->running)
  44. this->close();
  45. reset();
  46. this->source = stream;
  47. startTask();
  48. return source.get();
  49. }
  50. bool BufferedStream::open(const StreamReader &newReader, uint32_t initialOffset) {
  51. if (this->running)
  52. this->close();
  53. reset();
  54. this->reader = newReader;
  55. this->bufferTotal = initialOffset;
  56. startTask();
  57. return source.get();
  58. }
  59. bool BufferedStream::isReady() const {
  60. return readAvailable >= readyThreshold;
  61. }
  62. bool BufferedStream::isNotReady() const {
  63. return readAvailable < notReadyThreshold;
  64. }
  65. size_t BufferedStream::skip(size_t len) {
  66. return read(nullptr, len);
  67. }
  68. size_t BufferedStream::position() {
  69. return readTotal;
  70. }
  71. size_t BufferedStream::size() {
  72. return source->size();
  73. }
  74. uint32_t BufferedStream::lengthBetween(uint8_t *me, uint8_t *other) {
  75. const std::lock_guard lock(readMutex);
  76. if (other <= me) {
  77. // buf .... other ...... me ........ bufEnd
  78. // buf .... me/other ........ bufEnd
  79. return bufEnd - me;
  80. } else {
  81. // buf ........ me ........ other .... bufEnd
  82. return other - me;
  83. }
  84. }
  85. size_t BufferedStream::read(uint8_t *dst, size_t len) {
  86. if (waitForReady && isNotReady()) {
  87. while ((source || reader) && !isReady()) {} // end waiting after termination
  88. }
  89. if (!running && !readAvailable) {
  90. reset();
  91. return 0;
  92. }
  93. uint32_t read = 0;
  94. uint32_t toReadTotal = std::min(readAvailable.load(), static_cast<uint32_t>(len));
  95. while (toReadTotal > 0) {
  96. uint32_t toRead = std::min(toReadTotal, lengthBetween(bufReadPtr, bufWritePtr));
  97. if (dst) {
  98. memcpy(dst, bufReadPtr, toRead);
  99. dst += toRead;
  100. }
  101. readAvailable -= toRead;
  102. bufReadPtr += toRead;
  103. if (bufReadPtr >= bufEnd)
  104. bufReadPtr = buf;
  105. toReadTotal -= toRead;
  106. read += toRead;
  107. readTotal += toRead;
  108. }
  109. this->readSem.give();
  110. return read;
  111. }
  112. void BufferedStream::runTask() {
  113. const std::lock_guard lock(runningMutex);
  114. running = true;
  115. if (!source && reader) {
  116. // get the initial request on the task's thread
  117. source = reader(this->bufferTotal);
  118. }
  119. while (!terminate) {
  120. if (!source)
  121. break;
  122. if (isReady()) {
  123. // buffer ready, wait for any read operations
  124. this->readSem.wait();
  125. }
  126. if (terminate)
  127. break;
  128. if (readAvailable > readAt)
  129. continue;
  130. // here, the buffer needs re-filling
  131. uint32_t len;
  132. bool wasReady = isReady();
  133. do {
  134. uint32_t toRead = std::min(readSize, lengthBetween(bufWritePtr, bufReadPtr));
  135. if (!source) {
  136. len = 0;
  137. break;
  138. }
  139. len = source->read(bufWritePtr, toRead);
  140. readAvailable += len;
  141. bufferTotal += len;
  142. bufWritePtr += len;
  143. if (bufWritePtr >= bufEnd) // TODO is == enough here?
  144. bufWritePtr = buf;
  145. } while (len && readSize < bufferSize - readAvailable); // loop until there's no more free space in the buffer
  146. if (!len && reader)
  147. source = reader(bufferTotal);
  148. else if (!len)
  149. terminate = true;
  150. // signal that buffer is ready for reading
  151. if (!wasReady && isReady()) {
  152. this->readySem.give();
  153. }
  154. }
  155. source = nullptr;
  156. reader = nullptr;
  157. running = false;
  158. }