| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 | 
							- #include "BufferedStream.h"
 
- #include <stdlib.h>     // for free, malloc
 
- #include <algorithm>    // for min
 
- #include <cstdint>      // for uint32_t
 
- #include <cstring>      // for memcpy
 
- #include <type_traits>  // for remove_extent_t
 
- BufferedStream::BufferedStream(const std::string& taskName, uint32_t bufferSize,
 
-                                uint32_t readThreshold, uint32_t readSize,
 
-                                uint32_t readyThreshold,
 
-                                uint32_t notReadyThreshold, bool waitForReady)
 
-     : bell::Task(taskName, 4096, 5, 0) {
 
-   this->bufferSize = bufferSize;
 
-   this->readAt = bufferSize - readThreshold;
 
-   this->readSize = readSize;
 
-   this->readyThreshold = readyThreshold;
 
-   this->notReadyThreshold = notReadyThreshold;
 
-   this->waitForReady = waitForReady;
 
-   this->buf = static_cast<uint8_t*>(malloc(bufferSize));
 
-   this->bufEnd = buf + bufferSize;
 
-   reset();
 
- }
 
- BufferedStream::~BufferedStream() {
 
-   this->close();
 
-   free(buf);
 
- }
 
- void BufferedStream::close() {
 
-   this->terminate = true;
 
-   this->readSem.give();  // force a read operation
 
-   const std::lock_guard lock(runningMutex);
 
-   if (this->source)
 
-     this->source->close();
 
-   this->source = nullptr;
 
- }
 
- void BufferedStream::reset() {
 
-   this->bufReadPtr = this->buf;
 
-   this->bufWritePtr = this->buf;
 
-   this->readTotal = 0;
 
-   this->bufferTotal = 0;
 
-   this->readAvailable = 0;
 
-   this->terminate = false;
 
- }
 
- bool BufferedStream::open(const std::shared_ptr<bell::ByteStream>& stream) {
 
-   if (this->running)
 
-     this->close();
 
-   reset();
 
-   this->source = stream;
 
-   startTask();
 
-   return source.get();
 
- }
 
- bool BufferedStream::open(const StreamReader& newReader,
 
-                           uint32_t initialOffset) {
 
-   if (this->running)
 
-     this->close();
 
-   reset();
 
-   this->reader = newReader;
 
-   this->bufferTotal = initialOffset;
 
-   startTask();
 
-   return source.get();
 
- }
 
- bool BufferedStream::isReady() const {
 
-   return readAvailable >= readyThreshold;
 
- }
 
- bool BufferedStream::isNotReady() const {
 
-   return readAvailable < notReadyThreshold;
 
- }
 
- size_t BufferedStream::skip(size_t len) {
 
-   return read(nullptr, len);
 
- }
 
- size_t BufferedStream::position() {
 
-   return readTotal;
 
- }
 
- size_t BufferedStream::size() {
 
-   return source->size();
 
- }
 
- uint32_t BufferedStream::lengthBetween(uint8_t* me, uint8_t* other) {
 
-   const std::lock_guard lock(readMutex);
 
-   if (other <= me) {
 
-     // buf .... other ...... me ........ bufEnd
 
-     // buf .... me/other ........ bufEnd
 
-     return bufEnd - me;
 
-   } else {
 
-     // buf ........ me ........ other .... bufEnd
 
-     return other - me;
 
-   }
 
- }
 
- size_t BufferedStream::read(uint8_t* dst, size_t len) {
 
-   if (waitForReady && isNotReady()) {
 
-     while ((source || reader) && !isReady()) {
 
-     }  // end waiting after termination
 
-   }
 
-   if (!running && !readAvailable) {
 
-     reset();
 
-     return 0;
 
-   }
 
-   uint32_t read = 0;
 
-   uint32_t toReadTotal =
 
-       std::min(readAvailable.load(), static_cast<uint32_t>(len));
 
-   while (toReadTotal > 0) {
 
-     uint32_t toRead =
 
-         std::min(toReadTotal, lengthBetween(bufReadPtr, bufWritePtr));
 
-     if (dst) {
 
-       memcpy(dst, bufReadPtr, toRead);
 
-       dst += toRead;
 
-     }
 
-     readAvailable -= toRead;
 
-     bufReadPtr += toRead;
 
-     if (bufReadPtr >= bufEnd)
 
-       bufReadPtr = buf;
 
-     toReadTotal -= toRead;
 
-     read += toRead;
 
-     readTotal += toRead;
 
-   }
 
-   this->readSem.give();
 
-   return read;
 
- }
 
- void BufferedStream::runTask() {
 
-   const std::lock_guard lock(runningMutex);
 
-   running = true;
 
-   if (!source && reader) {
 
-     // get the initial request on the task's thread
 
-     source = reader(this->bufferTotal);
 
-   }
 
-   while (!terminate) {
 
-     if (!source)
 
-       break;
 
-     if (isReady()) {
 
-       // buffer ready, wait for any read operations
 
-       this->readSem.wait();
 
-     }
 
-     if (terminate)
 
-       break;
 
-     if (readAvailable > readAt)
 
-       continue;
 
-     // here, the buffer needs re-filling
 
-     uint32_t len;
 
-     bool wasReady = isReady();
 
-     do {
 
-       uint32_t toRead =
 
-           std::min(readSize, lengthBetween(bufWritePtr, bufReadPtr));
 
-       if (!source) {
 
-         len = 0;
 
-         break;
 
-       }
 
-       len = source->read(bufWritePtr, toRead);
 
-       readAvailable += len;
 
-       bufferTotal += len;
 
-       bufWritePtr += len;
 
-       if (bufWritePtr >= bufEnd)  // TODO is == enough here?
 
-         bufWritePtr = buf;
 
-     } while (
 
-         len &&
 
-         readSize <
 
-             bufferSize -
 
-                 readAvailable);  // loop until there's no more free space in the buffer
 
-     if (!len && reader)
 
-       source = reader(bufferTotal);
 
-     else if (!len)
 
-       terminate = true;
 
-     // signal that buffer is ready for reading
 
-     if (!wasReady && isReady()) {
 
-       this->readySem.give();
 
-     }
 
-   }
 
-   source = nullptr;
 
-   reader = nullptr;
 
-   running = false;
 
- }
 
 
  |