#include "MercuryManager.h" #include #include "Logger.h" std::map MercuryTypeMap({ {MercuryType::GET, "GET"}, {MercuryType::SEND, "SEND"}, {MercuryType::SUB, "SUB"}, {MercuryType::UNSUB, "UNSUB"}, }); MercuryManager::MercuryManager(std::unique_ptr session): bell::Task("mercuryManager", 6 * 1024, 2, 1) { tempMercuryHeader = Header_init_default; this->timeProvider = std::make_shared(); this->callbacks = std::map(); this->subscriptions = std::map(); this->session = std::move(session); this->sequenceId = 0x00000001; this->audioChunkManager = std::make_unique(); this->audioChunkSequence = 0; this->audioKeySequence = 0; this->queue = std::vector>(); queueSemaphore = std::make_unique(200); this->session->shanConn->conn->timeoutHandler = [this]() { return this->timeoutHandler(); }; } MercuryManager::~MercuryManager() { //pb_release(Header_fields, &tempMercuryHeader); } bool MercuryManager::timeoutHandler() { if (!isRunning) return true; auto currentTimestamp = timeProvider->getSyncedTimestamp(); if (this->lastRequestTimestamp != -1 && currentTimestamp - this->lastRequestTimestamp > AUDIOCHUNK_TIMEOUT_MS) { CSPOT_LOG(debug, "Reconnection required, no mercury response"); return true; } if (currentTimestamp - this->lastPingTimestamp > PING_TIMEOUT_MS) { CSPOT_LOG(debug, "Reconnection required, no ping received"); return true; } return false; } void MercuryManager::unregisterMercuryCallback(uint64_t seqId) { auto element = this->callbacks.find(seqId); if (element != this->callbacks.end()) { this->callbacks.erase(element); } } void MercuryManager::requestAudioKey(std::vector trackId, std::vector fileId, audioKeyCallback& audioCallback) { std::lock_guard guard(reconnectionMutex); auto buffer = fileId; this->keyCallback = audioCallback; // Structure: [FILEID] [TRACKID] [4 BYTES SEQUENCE ID] [0x00, 0x00] buffer.insert(buffer.end(), trackId.begin(), trackId.end()); auto audioKeySequence = pack(htonl(this->audioKeySequence)); buffer.insert(buffer.end(), audioKeySequence.begin(), audioKeySequence.end()); auto suffix = std::vector({ 0x00, 0x00 }); buffer.insert(buffer.end(), suffix.begin(), suffix.end()); // Bump audio key sequence this->audioKeySequence += 1; // Used for broken connection detection this->lastRequestTimestamp = timeProvider->getSyncedTimestamp(); this->session->shanConn->sendPacket(static_cast(MercuryType::AUDIO_KEY_REQUEST_COMMAND), buffer); } void MercuryManager::freeAudioKeyCallback() { this->keyCallback = nullptr; } std::shared_ptr MercuryManager::fetchAudioChunk(std::vector fileId, std::vector& audioKey, uint16_t index) { return this->fetchAudioChunk(fileId, audioKey, index * AUDIO_CHUNK_SIZE / 4, (index + 1) * AUDIO_CHUNK_SIZE / 4); } std::shared_ptr MercuryManager::fetchAudioChunk(std::vector fileId, std::vector& audioKey, uint32_t startPos, uint32_t endPos) { std::lock_guard guard(reconnectionMutex); auto sampleStartBytes = pack(htonl(startPos)); auto sampleEndBytes = pack(htonl(endPos)); auto buffer = pack(htons(this->audioChunkSequence)); auto hardcodedData = std::vector( { 0x00, 0x01, // Channel num, currently just hardcoded to 1 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // bytes magic 0x00, 0x00, 0x9C, 0x40, 0x00, 0x02, 0x00, 0x00 }); buffer.insert(buffer.end(), hardcodedData.begin(), hardcodedData.end()); buffer.insert(buffer.end(), fileId.begin(), fileId.end()); buffer.insert(buffer.end(), sampleStartBytes.begin(), sampleStartBytes.end()); buffer.insert(buffer.end(), sampleEndBytes.begin(), sampleEndBytes.end()); // Bump chunk sequence this->audioChunkSequence += 1; this->session->shanConn->sendPacket(static_cast(MercuryType::AUDIO_CHUNK_REQUEST_COMMAND), buffer); // Used for broken connection detection this->lastRequestTimestamp = this->timeProvider->getSyncedTimestamp(); return this->audioChunkManager->registerNewChunk(this->audioChunkSequence - 1, audioKey, startPos, endPos); } void MercuryManager::reconnect() { std::lock_guard guard(this->reconnectionMutex); this->lastPingTimestamp = -1; this->lastRequestTimestamp = -1; RECONNECT: if (!isRunning) return; CSPOT_LOG(debug, "Trying to reconnect..."); try { if (this->session->shanConn->conn != nullptr) { this->session->shanConn->conn->timeoutHandler = nullptr; } this->audioChunkManager->failAllChunks(); if (this->session->authBlob != nullptr) { this->lastAuthBlob = this->session->authBlob; } this->session = std::make_unique(); this->session->connectWithRandomAp(); this->session->authenticate(this->lastAuthBlob); this->session->shanConn->conn->timeoutHandler = [this]() { return this->timeoutHandler(); }; CSPOT_LOG(debug, "Reconnected successfuly :)"); } catch (...) { CSPOT_LOG(debug, "Reconnection failed, willl retry in %d secs", RECONNECTION_RETRY_MS / 1000); usleep(RECONNECTION_RETRY_MS * 1000); goto RECONNECT; //reconnect(); } } void MercuryManager::runTask() { std::scoped_lock lock(this->runningMutex); // Listen for mercury replies and handle them accordingly isRunning = true; while (isRunning) { std::unique_ptr packet; try { packet = this->session->shanConn->recvPacket(); } catch (const std::runtime_error& e) { if (!isRunning) break; // Reconnection required this->reconnect(); this->reconnectedCallback(); continue; } if (static_cast(packet->command) == MercuryType::PING) // @TODO: Handle time synchronization through ping { this->timeProvider->syncWithPingPacket(packet->data); this->lastPingTimestamp = this->timeProvider->getSyncedTimestamp(); this->session->shanConn->sendPacket(0x49, packet->data); } else if (static_cast(packet->command) == MercuryType::AUDIO_CHUNK_SUCCESS_RESPONSE) { this->lastRequestTimestamp = -1; this->audioChunkManager->handleChunkData(packet->data, false); } else { this->queue.push_back(std::move(packet)); this->queueSemaphore->give(); } } } void MercuryManager::stop() { std::scoped_lock stop(this->stopMutex); CSPOT_LOG(debug, "Stopping mercury manager"); isRunning = false; audioChunkManager->close(); std::scoped_lock lock(this->runningMutex); CSPOT_LOG(debug, "mercury stopped"); } void MercuryManager::updateQueue() { if (queueSemaphore->twait() == 0) { if (this->queue.size() > 0) { auto packet = std::move(this->queue[0]); this->queue.erase(this->queue.begin()); CSPOT_LOG(debug, "Received packet with code %d of length %d", packet->command, packet->data.size()); switch (static_cast(packet->command)) { case MercuryType::COUNTRY_CODE_RESPONSE: { countryCode = std::string(packet->data.begin(), packet->data.end()); CSPOT_LOG(debug, "Received country code: %s", countryCode.c_str()); break; } case MercuryType::AUDIO_KEY_FAILURE_RESPONSE: case MercuryType::AUDIO_KEY_SUCCESS_RESPONSE: { this->lastRequestTimestamp = -1; // First four bytes mark the sequence id auto seqId = ntohl(extract(packet->data, 0)); if (seqId == (this->audioKeySequence - 1) && this->keyCallback != nullptr) { auto success = static_cast(packet->command) == MercuryType::AUDIO_KEY_SUCCESS_RESPONSE; this->keyCallback(success, packet->data); } break; } case MercuryType::AUDIO_CHUNK_FAILURE_RESPONSE: { CSPOT_LOG(error, "Audio Chunk failure!"); this->audioChunkManager->handleChunkData(packet->data, true); this->lastRequestTimestamp = -1; break; } case MercuryType::SEND: case MercuryType::SUB: case MercuryType::UNSUB: { auto response = std::make_unique(packet->data); if (response->parts.size() > 0) { CSPOT_LOG(debug, " MercuryType::UNSUB response->parts[0].size() = %d", response->parts[0].size()); } if (this->callbacks.count(response->sequenceId) > 0) { auto seqId = response->sequenceId; this->callbacks[response->sequenceId](std::move(response)); this->callbacks.erase(this->callbacks.find(seqId)); } break; } case MercuryType::SUBRES: { auto response = std::make_unique(packet->data); auto uri = std::string(response->mercuryHeader.uri); if (this->subscriptions.count(uri) > 0) { this->subscriptions[uri](std::move(response)); //this->subscriptions.erase(std::string(response->mercuryHeader.uri)); } break; } default: break; } } } } void MercuryManager::handleQueue() { while (isRunning) { this->updateQueue(); } std::scoped_lock lock(this->stopMutex); } uint64_t MercuryManager::execute(MercuryType method, std::string uri, mercuryCallback& callback, mercuryCallback& subscription, mercuryParts& payload) { if (!isRunning) return -1; std::lock_guard guard(reconnectionMutex); // Construct mercury header CSPOT_LOG(debug, "executing MercuryType %s", MercuryTypeMap[method].c_str()); tempMercuryHeader.uri = (char *)(uri.c_str()); tempMercuryHeader.method = (char *)(MercuryTypeMap[method].c_str()); // GET and SEND are actually the same. Therefore the override // The difference between them is only in header's method if (method == MercuryType::GET) { method = MercuryType::SEND; } auto headerBytes = pbEncode(Header_fields, &tempMercuryHeader); // Register a subscription when given method is called if (method == MercuryType::SUB) { this->subscriptions.insert({ uri, subscription }); } this->callbacks.insert({ sequenceId, callback }); // Structure: [Sequence size] [SequenceId] [0x1] [Payloads number] // [Header size] [Header] [Payloads (size + data)] // Pack sequenceId auto sequenceIdBytes = pack(hton64(this->sequenceId)); auto sequenceSizeBytes = pack(htons(sequenceIdBytes.size())); sequenceIdBytes.insert(sequenceIdBytes.begin(), sequenceSizeBytes.begin(), sequenceSizeBytes.end()); sequenceIdBytes.push_back(0x01); auto payloadNum = pack(htons(payload.size() + 1)); sequenceIdBytes.insert(sequenceIdBytes.end(), payloadNum.begin(), payloadNum.end()); auto headerSizePayload = pack(htons(headerBytes.size())); sequenceIdBytes.insert(sequenceIdBytes.end(), headerSizePayload.begin(), headerSizePayload.end()); sequenceIdBytes.insert(sequenceIdBytes.end(), headerBytes.begin(), headerBytes.end()); // Encode all the payload parts for (int x = 0; x < payload.size(); x++) { headerSizePayload = pack(htons(payload[x].size())); sequenceIdBytes.insert(sequenceIdBytes.end(), headerSizePayload.begin(), headerSizePayload.end()); sequenceIdBytes.insert(sequenceIdBytes.end(), payload[x].begin(), payload[x].end()); } // Bump sequence id this->sequenceId += 1; this->session->shanConn->sendPacket(static_cast::type>(method), sequenceIdBytes); return this->sequenceId - 1; } uint64_t MercuryManager::execute(MercuryType method, std::string uri, mercuryCallback& callback, mercuryParts& payload) { mercuryCallback subscription = nullptr; return this->execute(method, uri, callback, subscription, payload); } uint64_t MercuryManager::execute(MercuryType method, std::string uri, mercuryCallback& callback, mercuryCallback& subscription) { auto payload = mercuryParts(0); return this->execute(method, uri, callback, subscription, payload); } uint64_t MercuryManager::execute(MercuryType method, std::string uri, mercuryCallback& callback) { auto payload = mercuryParts(0); return this->execute(method, uri, callback, payload); }