MercuryManager.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. #include "MercuryManager.h"
  2. #include <iostream>
  3. #include "Logger.h"
  4. std::map<MercuryType, std::string> MercuryTypeMap({
  5. {MercuryType::GET, "GET"},
  6. {MercuryType::SEND, "SEND"},
  7. {MercuryType::SUB, "SUB"},
  8. {MercuryType::UNSUB, "UNSUB"},
  9. });
  10. MercuryManager::MercuryManager(std::unique_ptr<Session> session): bell::Task("mercuryManager", 6 * 1024, 0, 1)
  11. {
  12. tempMercuryHeader = {};
  13. this->timeProvider = std::make_shared<TimeProvider>();
  14. this->callbacks = std::map<uint64_t, mercuryCallback>();
  15. this->subscriptions = std::map<std::string, mercuryCallback>();
  16. this->session = std::move(session);
  17. this->sequenceId = 0x00000001;
  18. this->audioChunkManager = std::make_unique<AudioChunkManager>();
  19. this->audioChunkSequence = 0;
  20. this->audioKeySequence = 0;
  21. this->queue = std::vector<std::unique_ptr<Packet>>();
  22. queueSemaphore = std::make_unique<WrappedSemaphore>(200);
  23. this->session->shanConn->conn->timeoutHandler = [this]() {
  24. return this->timeoutHandler();
  25. };
  26. }
  27. MercuryManager::~MercuryManager()
  28. {
  29. //pb_release(Header_fields, &tempMercuryHeader);
  30. }
  31. bool MercuryManager::timeoutHandler()
  32. {
  33. if (!isRunning) return true;
  34. auto currentTimestamp = timeProvider->getSyncedTimestamp();
  35. if (this->lastRequestTimestamp != -1 && currentTimestamp - this->lastRequestTimestamp > AUDIOCHUNK_TIMEOUT_MS)
  36. {
  37. CSPOT_LOG(debug, "Reconnection required, no mercury response");
  38. return true;
  39. }
  40. if (currentTimestamp - this->lastPingTimestamp > PING_TIMEOUT_MS)
  41. {
  42. CSPOT_LOG(debug, "Reconnection required, no ping received");
  43. return true;
  44. }
  45. return false;
  46. }
  47. void MercuryManager::unregisterMercuryCallback(uint64_t seqId)
  48. {
  49. auto element = this->callbacks.find(seqId);
  50. if (element != this->callbacks.end())
  51. {
  52. this->callbacks.erase(element);
  53. }
  54. }
  55. void MercuryManager::requestAudioKey(std::vector<uint8_t> trackId, std::vector<uint8_t> fileId, audioKeyCallback& audioCallback)
  56. {
  57. std::lock_guard<std::mutex> guard(reconnectionMutex);
  58. auto buffer = fileId;
  59. this->keyCallback = audioCallback;
  60. // Structure: [FILEID] [TRACKID] [4 BYTES SEQUENCE ID] [0x00, 0x00]
  61. buffer.insert(buffer.end(), trackId.begin(), trackId.end());
  62. auto audioKeySequence = pack<uint32_t>(htonl(this->audioKeySequence));
  63. buffer.insert(buffer.end(), audioKeySequence.begin(), audioKeySequence.end());
  64. auto suffix = std::vector<uint8_t>({ 0x00, 0x00 });
  65. buffer.insert(buffer.end(), suffix.begin(), suffix.end());
  66. // Bump audio key sequence
  67. this->audioKeySequence += 1;
  68. // Used for broken connection detection
  69. this->lastRequestTimestamp = timeProvider->getSyncedTimestamp();
  70. this->session->shanConn->sendPacket(static_cast<uint8_t>(MercuryType::AUDIO_KEY_REQUEST_COMMAND), buffer);
  71. }
  72. void MercuryManager::freeAudioKeyCallback()
  73. {
  74. this->keyCallback = nullptr;
  75. }
  76. std::shared_ptr<AudioChunk> MercuryManager::fetchAudioChunk(std::vector<uint8_t> fileId, std::vector<uint8_t>& audioKey, uint16_t index)
  77. {
  78. return this->fetchAudioChunk(fileId, audioKey, index * AUDIO_CHUNK_SIZE / 4, (index + 1) * AUDIO_CHUNK_SIZE / 4);
  79. }
  80. std::shared_ptr<AudioChunk> MercuryManager::fetchAudioChunk(std::vector<uint8_t> fileId, std::vector<uint8_t>& audioKey, uint32_t startPos, uint32_t endPos)
  81. {
  82. std::lock_guard<std::mutex> guard(reconnectionMutex);
  83. auto sampleStartBytes = pack<uint32_t>(htonl(startPos));
  84. auto sampleEndBytes = pack<uint32_t>(htonl(endPos));
  85. auto buffer = pack<uint16_t>(htons(this->audioChunkSequence));
  86. auto hardcodedData = std::vector<uint8_t>(
  87. { 0x00, 0x01, // Channel num, currently just hardcoded to 1
  88. 0x00, 0x00,
  89. 0x00, 0x00, 0x00, 0x00, // bytes magic
  90. 0x00, 0x00, 0x9C, 0x40,
  91. 0x00, 0x02, 0x00, 0x00 });
  92. buffer.insert(buffer.end(), hardcodedData.begin(), hardcodedData.end());
  93. buffer.insert(buffer.end(), fileId.begin(), fileId.end());
  94. buffer.insert(buffer.end(), sampleStartBytes.begin(), sampleStartBytes.end());
  95. buffer.insert(buffer.end(), sampleEndBytes.begin(), sampleEndBytes.end());
  96. // Bump chunk sequence
  97. this->audioChunkSequence += 1;
  98. this->session->shanConn->sendPacket(static_cast<uint8_t>(MercuryType::AUDIO_CHUNK_REQUEST_COMMAND), buffer);
  99. // Used for broken connection detection
  100. this->lastRequestTimestamp = this->timeProvider->getSyncedTimestamp();
  101. return this->audioChunkManager->registerNewChunk(this->audioChunkSequence - 1, audioKey, startPos, endPos);
  102. }
  103. void MercuryManager::reconnect()
  104. {
  105. std::lock_guard<std::mutex> guard(this->reconnectionMutex);
  106. this->lastPingTimestamp = -1;
  107. this->lastRequestTimestamp = -1;
  108. RECONNECT:
  109. if (!isRunning) return;
  110. CSPOT_LOG(debug, "Trying to reconnect...");
  111. try
  112. {
  113. if (this->session->shanConn->conn != nullptr)
  114. {
  115. this->session->shanConn->conn->timeoutHandler = nullptr;
  116. }
  117. this->audioChunkManager->failAllChunks();
  118. if (this->session->authBlob != nullptr)
  119. {
  120. this->lastAuthBlob = this->session->authBlob;
  121. }
  122. this->session = std::make_unique<Session>();
  123. this->session->connectWithRandomAp();
  124. this->session->authenticate(this->lastAuthBlob);
  125. this->session->shanConn->conn->timeoutHandler = [this]() {
  126. return this->timeoutHandler();
  127. };
  128. CSPOT_LOG(debug, "Reconnected successfuly :)");
  129. }
  130. catch (...)
  131. {
  132. CSPOT_LOG(debug, "Reconnection failed, willl retry in %d secs", RECONNECTION_RETRY_MS / 1000);
  133. usleep(RECONNECTION_RETRY_MS * 1000);
  134. goto RECONNECT;
  135. //reconnect();
  136. }
  137. }
  138. void MercuryManager::runTask()
  139. {
  140. std::scoped_lock lock(this->runningMutex);
  141. // Listen for mercury replies and handle them accordingly
  142. isRunning = true;
  143. while (isRunning)
  144. {
  145. std::unique_ptr<Packet> packet;
  146. try
  147. {
  148. packet = this->session->shanConn->recvPacket();
  149. }
  150. catch (const std::runtime_error& e)
  151. {
  152. if (!isRunning) break;
  153. // Reconnection required
  154. this->reconnect();
  155. this->reconnectedCallback();
  156. continue;
  157. }
  158. if (static_cast<MercuryType>(packet->command) == MercuryType::PING) // @TODO: Handle time synchronization through ping
  159. {
  160. this->timeProvider->syncWithPingPacket(packet->data);
  161. this->lastPingTimestamp = this->timeProvider->getSyncedTimestamp();
  162. this->session->shanConn->sendPacket(0x49, packet->data);
  163. }
  164. else if (static_cast<MercuryType>(packet->command) == MercuryType::AUDIO_CHUNK_SUCCESS_RESPONSE)
  165. {
  166. this->lastRequestTimestamp = -1;
  167. this->audioChunkManager->handleChunkData(packet->data, false);
  168. }
  169. else
  170. {
  171. this->queue.push_back(std::move(packet));
  172. this->queueSemaphore->give();
  173. }
  174. }
  175. }
  176. void MercuryManager::stop() {
  177. std::scoped_lock stop(this->stopMutex);
  178. CSPOT_LOG(debug, "Stopping mercury manager");
  179. isRunning = false;
  180. audioChunkManager->close();
  181. std::scoped_lock lock(this->runningMutex);
  182. CSPOT_LOG(debug, "mercury stopped");
  183. }
  184. void MercuryManager::updateQueue() {
  185. if (queueSemaphore->twait() == 0) {
  186. if (this->queue.size() > 0)
  187. {
  188. std::unique_ptr<Packet> packet = std::move(this->queue[0]);
  189. this->queue.erase(this->queue.begin());
  190. if(packet == nullptr){
  191. return;
  192. }
  193. CSPOT_LOG(debug, "Received packet with code %d of length %d", packet->command, packet->data.size());
  194. switch (static_cast<MercuryType>(packet->command))
  195. {
  196. case MercuryType::COUNTRY_CODE_RESPONSE:
  197. {
  198. memcpy(countryCode, packet->data.data(), 2);
  199. CSPOT_LOG(debug, "Received country code: %.2s", countryCode);
  200. break;
  201. }
  202. case MercuryType::AUDIO_KEY_FAILURE_RESPONSE:
  203. case MercuryType::AUDIO_KEY_SUCCESS_RESPONSE:
  204. {
  205. this->lastRequestTimestamp = -1;
  206. // First four bytes mark the sequence id
  207. auto seqId = ntohl(extract<uint32_t>(packet->data, 0));
  208. if (seqId == (this->audioKeySequence - 1) && this->keyCallback != nullptr)
  209. {
  210. auto success = static_cast<MercuryType>(packet->command) == MercuryType::AUDIO_KEY_SUCCESS_RESPONSE;
  211. this->keyCallback(success, packet->data);
  212. }
  213. break;
  214. }
  215. case MercuryType::AUDIO_CHUNK_FAILURE_RESPONSE:
  216. {
  217. CSPOT_LOG(error, "Audio Chunk failure!");
  218. this->audioChunkManager->handleChunkData(packet->data, true);
  219. this->lastRequestTimestamp = -1;
  220. break;
  221. }
  222. case MercuryType::SEND:
  223. case MercuryType::SUB:
  224. case MercuryType::UNSUB:
  225. {
  226. auto response = std::make_unique<MercuryResponse>(packet->data);
  227. if (response->parts.size() > 0)
  228. {
  229. CSPOT_LOG(debug, " MercuryType::UNSUB response->parts[0].size() = %d", response->parts[0].size());
  230. }
  231. if (this->callbacks.count(response->sequenceId) > 0)
  232. {
  233. auto seqId = response->sequenceId;
  234. this->callbacks[response->sequenceId](std::move(response));
  235. this->callbacks.erase(this->callbacks.find(seqId));
  236. }
  237. break;
  238. }
  239. case MercuryType::SUBRES:
  240. {
  241. auto response = std::make_unique<MercuryResponse>(packet->data);
  242. auto uri = std::string(response->mercuryHeader.uri);
  243. if (this->subscriptions.count(uri) > 0)
  244. {
  245. this->subscriptions[uri](std::move(response));
  246. //this->subscriptions.erase(std::string(response->mercuryHeader.uri));
  247. }
  248. break;
  249. }
  250. default:
  251. break;
  252. }
  253. }
  254. }
  255. }
  256. void MercuryManager::handleQueue()
  257. {
  258. while (isRunning)
  259. {
  260. this->updateQueue();
  261. }
  262. std::scoped_lock lock(this->stopMutex);
  263. }
  264. uint64_t MercuryManager::execute(MercuryType method, std::string uri, mercuryCallback& callback, mercuryCallback& subscription, mercuryParts& payload)
  265. {
  266. if (!isRunning) return -1;
  267. std::lock_guard<std::mutex> guard(reconnectionMutex);
  268. // Construct mercury header
  269. CSPOT_LOG(debug, "executing MercuryType %s", MercuryTypeMap[method].c_str());
  270. pbPutString(uri, tempMercuryHeader.uri);
  271. pbPutString(MercuryTypeMap[method], tempMercuryHeader.method);
  272. tempMercuryHeader.has_method = true;
  273. tempMercuryHeader.has_uri = true;
  274. // GET and SEND are actually the same. Therefore the override
  275. // The difference between them is only in header's method
  276. if (method == MercuryType::GET)
  277. {
  278. method = MercuryType::SEND;
  279. }
  280. auto headerBytes = pbEncode(Header_fields, &tempMercuryHeader);
  281. // Register a subscription when given method is called
  282. if (method == MercuryType::SUB)
  283. {
  284. this->subscriptions.insert({ uri, subscription });
  285. }
  286. this->callbacks.insert({ sequenceId, callback });
  287. // Structure: [Sequence size] [SequenceId] [0x1] [Payloads number]
  288. // [Header size] [Header] [Payloads (size + data)]
  289. // Pack sequenceId
  290. auto sequenceIdBytes = pack<uint64_t>(hton64(this->sequenceId));
  291. auto sequenceSizeBytes = pack<uint16_t>(htons(sequenceIdBytes.size()));
  292. sequenceIdBytes.insert(sequenceIdBytes.begin(), sequenceSizeBytes.begin(), sequenceSizeBytes.end());
  293. sequenceIdBytes.push_back(0x01);
  294. auto payloadNum = pack<uint16_t>(htons(payload.size() + 1));
  295. sequenceIdBytes.insert(sequenceIdBytes.end(), payloadNum.begin(), payloadNum.end());
  296. auto headerSizePayload = pack<uint16_t>(htons(headerBytes.size()));
  297. sequenceIdBytes.insert(sequenceIdBytes.end(), headerSizePayload.begin(), headerSizePayload.end());
  298. sequenceIdBytes.insert(sequenceIdBytes.end(), headerBytes.begin(), headerBytes.end());
  299. // Encode all the payload parts
  300. for (int x = 0; x < payload.size(); x++)
  301. {
  302. headerSizePayload = pack<uint16_t>(htons(payload[x].size()));
  303. sequenceIdBytes.insert(sequenceIdBytes.end(), headerSizePayload.begin(), headerSizePayload.end());
  304. sequenceIdBytes.insert(sequenceIdBytes.end(), payload[x].begin(), payload[x].end());
  305. }
  306. // Bump sequence id
  307. this->sequenceId += 1;
  308. this->session->shanConn->sendPacket(static_cast<std::underlying_type<MercuryType>::type>(method), sequenceIdBytes);
  309. return this->sequenceId - 1;
  310. }
  311. uint64_t MercuryManager::execute(MercuryType method, std::string uri, mercuryCallback& callback, mercuryParts& payload)
  312. {
  313. mercuryCallback subscription = nullptr;
  314. return this->execute(method, uri, callback, subscription, payload);
  315. }
  316. uint64_t MercuryManager::execute(MercuryType method, std::string uri, mercuryCallback& callback, mercuryCallback& subscription)
  317. {
  318. auto payload = mercuryParts(0);
  319. return this->execute(method, uri, callback, subscription, payload);
  320. }
  321. uint64_t MercuryManager::execute(MercuryType method, std::string uri, mercuryCallback& callback)
  322. {
  323. auto payload = mercuryParts(0);
  324. return this->execute(method, uri, callback, payload);
  325. }