123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- #include "BufferedStream.h"
- #include <stdlib.h> // for free, malloc
- #include <algorithm> // for min
- #include <cstdint> // for uint32_t
- #include <cstring> // for memcpy
- #include <type_traits> // for remove_extent_t
- BufferedStream::BufferedStream(const std::string& taskName, uint32_t bufferSize,
- uint32_t readThreshold, uint32_t readSize,
- uint32_t readyThreshold,
- uint32_t notReadyThreshold, bool waitForReady)
- : bell::Task(taskName, 4096, 5, 0) {
- this->bufferSize = bufferSize;
- this->readAt = bufferSize - readThreshold;
- this->readSize = readSize;
- this->readyThreshold = readyThreshold;
- this->notReadyThreshold = notReadyThreshold;
- this->waitForReady = waitForReady;
- this->buf = static_cast<uint8_t*>(malloc(bufferSize));
- this->bufEnd = buf + bufferSize;
- reset();
- }
- BufferedStream::~BufferedStream() {
- this->close();
- free(buf);
- }
- void BufferedStream::close() {
- this->terminate = true;
- this->readSem.give(); // force a read operation
- const std::lock_guard lock(runningMutex);
- if (this->source)
- this->source->close();
- this->source = nullptr;
- }
- void BufferedStream::reset() {
- this->bufReadPtr = this->buf;
- this->bufWritePtr = this->buf;
- this->readTotal = 0;
- this->bufferTotal = 0;
- this->readAvailable = 0;
- this->terminate = false;
- }
- bool BufferedStream::open(const std::shared_ptr<bell::ByteStream>& stream) {
- if (this->running)
- this->close();
- reset();
- this->source = stream;
- startTask();
- return source.get();
- }
- bool BufferedStream::open(const StreamReader& newReader,
- uint32_t initialOffset) {
- if (this->running)
- this->close();
- reset();
- this->reader = newReader;
- this->bufferTotal = initialOffset;
- startTask();
- return source.get();
- }
- bool BufferedStream::isReady() const {
- return readAvailable >= readyThreshold;
- }
- bool BufferedStream::isNotReady() const {
- return readAvailable < notReadyThreshold;
- }
- size_t BufferedStream::skip(size_t len) {
- return read(nullptr, len);
- }
- size_t BufferedStream::position() {
- return readTotal;
- }
- size_t BufferedStream::size() {
- return source->size();
- }
- uint32_t BufferedStream::lengthBetween(uint8_t* me, uint8_t* other) {
- const std::lock_guard lock(readMutex);
- if (other <= me) {
- // buf .... other ...... me ........ bufEnd
- // buf .... me/other ........ bufEnd
- return bufEnd - me;
- } else {
- // buf ........ me ........ other .... bufEnd
- return other - me;
- }
- }
- size_t BufferedStream::read(uint8_t* dst, size_t len) {
- if (waitForReady && isNotReady()) {
- while ((source || reader) && !isReady()) {
- } // end waiting after termination
- }
- if (!running && !readAvailable) {
- reset();
- return 0;
- }
- uint32_t read = 0;
- uint32_t toReadTotal =
- std::min(readAvailable.load(), static_cast<uint32_t>(len));
- while (toReadTotal > 0) {
- uint32_t toRead =
- std::min(toReadTotal, lengthBetween(bufReadPtr, bufWritePtr));
- if (dst) {
- memcpy(dst, bufReadPtr, toRead);
- dst += toRead;
- }
- readAvailable -= toRead;
- bufReadPtr += toRead;
- if (bufReadPtr >= bufEnd)
- bufReadPtr = buf;
- toReadTotal -= toRead;
- read += toRead;
- readTotal += toRead;
- }
- this->readSem.give();
- return read;
- }
- void BufferedStream::runTask() {
- const std::lock_guard lock(runningMutex);
- running = true;
- if (!source && reader) {
- // get the initial request on the task's thread
- source = reader(this->bufferTotal);
- }
- while (!terminate) {
- if (!source)
- break;
- if (isReady()) {
- // buffer ready, wait for any read operations
- this->readSem.wait();
- }
- if (terminate)
- break;
- if (readAvailable > readAt)
- continue;
- // here, the buffer needs re-filling
- uint32_t len;
- bool wasReady = isReady();
- do {
- uint32_t toRead =
- std::min(readSize, lengthBetween(bufWritePtr, bufReadPtr));
- if (!source) {
- len = 0;
- break;
- }
- len = source->read(bufWritePtr, toRead);
- readAvailable += len;
- bufferTotal += len;
- bufWritePtr += len;
- if (bufWritePtr >= bufEnd) // TODO is == enough here?
- bufWritePtr = buf;
- } while (
- len &&
- readSize <
- bufferSize -
- readAvailable); // loop until there's no more free space in the buffer
- if (!len && reader)
- source = reader(bufferTotal);
- else if (!len)
- terminate = true;
- // signal that buffer is ready for reading
- if (!wasReady && isReady()) {
- this->readySem.give();
- }
- }
- source = nullptr;
- reader = nullptr;
- running = false;
- }
|