rtp.c 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864
  1. /*
  2. * HairTunes - RAOP packet handler and slave-clocked replay engine
  3. * Copyright (c) James Laird 2011
  4. * All rights reserved.
  5. *
  6. * Modularisation: philippe_44@outlook.com, 2019
  7. *
  8. * Permission is hereby granted, free of charge, to any person
  9. * obtaining a copy of this software and associated documentation
  10. * files (the "Software"), to deal in the Software without
  11. * restriction, including without limitation the rights to use,
  12. * copy, modify, merge, publish, distribute, sublicense, and/or
  13. * sell copies of the Software, and to permit persons to whom the
  14. * Software is furnished to do so, subject to the following conditions:
  15. *
  16. * The above copyright notice and this permission notice shall be
  17. * included in all copies or substantial portions of the Software.
  18. *
  19. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  20. * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
  21. * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  22. * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  23. * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
  24. * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  25. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
  26. * OTHER DEALINGS IN THE SOFTWARE.
  27. */
  28. #include <stdio.h>
  29. #include <stdlib.h>
  30. #include <string.h>
  31. #include <stdarg.h>
  32. #include <sys/types.h>
  33. #include <pthread.h>
  34. #include <math.h>
  35. #include <errno.h>
  36. #include <sys/stat.h>
  37. #include <stdint.h>
  38. #include <fcntl.h>
  39. #include <assert.h>
  40. #include "platform.h"
  41. #include "rtp.h"
  42. #include "raop_sink.h"
  43. #include "log_util.h"
  44. #include "util.h"
  45. #ifdef WIN32
  46. #include <openssl/aes.h>
  47. #include "alac_wrapper.h"
  48. #define MSG_DONTWAIT 0
  49. #else
  50. #include "esp_pthread.h"
  51. #include "esp_system.h"
  52. #include <mbedtls/version.h>
  53. #include <mbedtls/aes.h>
  54. #include "alac_wrapper.h"
  55. #endif
  56. #define NTP2MS(ntp) ((((ntp) >> 10) * 1000L) >> 22)
  57. #define MS2NTP(ms) (((((u64_t) (ms)) << 22) / 1000) << 10)
  58. #define NTP2TS(ntp, rate) ((((ntp) >> 16) * (rate)) >> 16)
  59. #define TS2NTP(ts, rate) (((((u64_t) (ts)) << 16) / (rate)) << 16)
  60. #define MS2TS(ms, rate) ((((u64_t) (ms)) * (rate)) / 1000)
  61. #define TS2MS(ts, rate) NTP2MS(TS2NTP(ts,rate))
  62. extern log_level raop_loglevel;
  63. static log_level *loglevel = &raop_loglevel;
  64. //#define __RTP_STORE
  65. // default buffer size
  66. #define BUFFER_FRAMES_MAX ((RAOP_SAMPLE_RATE * 10) / 352 )
  67. #define BUFFER_FRAMES_MIN ( (150 * RAOP_SAMPLE_RATE * 2) / (352 * 100) )
  68. #define MAX_PACKET 1408
  69. #define MIN_LATENCY 11025
  70. #define MAX_LATENCY ( (120 * RAOP_SAMPLE_RATE * 2) / 100 )
  71. #define RTP_STACK_SIZE (4*1024)
  72. #define RTP_SYNC (0x01)
  73. #define NTP_SYNC (0x02)
  74. #define RESEND_TO 250
  75. enum { DATA = 0, CONTROL, TIMING };
  76. static const u8_t silence_frame[MAX_PACKET] = { 0 };
  77. uint32_t buffer_frames = ((150 * RAOP_SAMPLE_RATE * 2) / (352 * 100));
  78. typedef u16_t seq_t;
  79. typedef struct __attribute__((__packed__)) audio_buffer_entry { // decoded audio packets
  80. u32_t rtptime, last_resend;
  81. s16_t *data;
  82. u16_t len;
  83. u8_t ready;
  84. u8_t allocated;
  85. u8_t missed;
  86. } abuf_t;
  87. typedef struct rtp_s {
  88. #ifdef __RTP_STORE
  89. FILE *rtpIN, *rtpOUT;
  90. #endif
  91. bool running;
  92. unsigned char aesiv[16];
  93. #ifdef WIN32
  94. AES_KEY aes;
  95. #else
  96. mbedtls_aes_context aes;
  97. #endif
  98. bool decrypt;
  99. u8_t *decrypt_buf;
  100. u32_t frame_size, frame_duration;
  101. u32_t in_frames, out_frames;
  102. struct in_addr host;
  103. struct sockaddr_in rtp_host;
  104. struct {
  105. unsigned short rport, lport;
  106. int sock;
  107. } rtp_sockets[3]; // data, control, timing
  108. struct timing_s {
  109. u64_t local, remote;
  110. } timing;
  111. struct {
  112. u32_t rtp, time;
  113. u8_t status;
  114. } synchro;
  115. int latency; // rtp hold depth in samples
  116. u32_t resent_req, resent_rec; // total resent + recovered frames
  117. u32_t silent_frames; // total silence frames
  118. u32_t discarded;
  119. abuf_t audio_buffer[BUFFER_FRAMES_MAX];
  120. seq_t ab_read, ab_write;
  121. pthread_mutex_t ab_mutex;
  122. #ifdef WIN32
  123. pthread_t thread;
  124. #else
  125. TaskHandle_t thread, joiner;
  126. StaticTask_t *xTaskBuffer;
  127. StackType_t xStack[RTP_STACK_SIZE] __attribute__ ((aligned (4)));
  128. #endif
  129. struct alac_codec_s *alac_codec;
  130. int first_seqno;
  131. enum { RTP_WAIT, RTP_STREAM, RTP_PLAY } state;
  132. int stalled;
  133. raop_data_cb_t data_cb;
  134. raop_cmd_cb_t cmd_cb;
  135. } rtp_t;
  136. #define BUFIDX(seqno) ((seq_t)(seqno) % buffer_frames)
  137. static void buffer_alloc(abuf_t *audio_buffer, int size, uint8_t *buf, size_t buf_size);
  138. static void buffer_release(abuf_t *audio_buffer);
  139. static void buffer_reset(abuf_t *audio_buffer);
  140. static void buffer_push_packet(rtp_t *ctx);
  141. static bool rtp_request_resend(rtp_t *ctx, seq_t first, seq_t last);
  142. static bool rtp_request_timing(rtp_t *ctx);
  143. static int seq_order(seq_t a, seq_t b);
  144. #ifdef WIN32
  145. static void *rtp_thread_func(void *arg);
  146. #else
  147. static void rtp_thread_func(void *arg);
  148. #endif
  149. /*---------------------------------------------------------------------------*/
  /*
   * Create an ALAC decoder from the numeric fields of the fmtp line.
   *
   * fmtp[1]..fmtp[11] are copied into a configuration record, multi-byte fields
   * being converted with htonl/htons, and the record is handed to
   * alac_create_decoder().
   *
   * Returns the decoder, or NULL (after logging) when it cannot be created.
   */
  static struct alac_codec_s* alac_init(int fmtp[32]) {
      // outputs required by alac_create_decoder() that are not used here
      unsigned sample_rate, block_size;
      unsigned char sample_size, channels;

      struct {
          uint32_t	frameLength;
          uint8_t		compatibleVersion;
          uint8_t		bitDepth;
          uint8_t		pb;
          uint8_t		mb;
          uint8_t		kb;
          uint8_t		numChannels;
          uint16_t	maxRun;
          uint32_t	maxFrameBytes;
          uint32_t	avgBitRate;
          uint32_t	sampleRate;
      } config = {
          .frameLength = htonl(fmtp[1]),
          .compatibleVersion = fmtp[2],
          .bitDepth = fmtp[3],
          .pb = fmtp[4],
          .mb = fmtp[5],
          .kb = fmtp[6],
          .numChannels = fmtp[7],
          .maxRun = htons(fmtp[8]),
          .maxFrameBytes = htonl(fmtp[9]),
          .avgBitRate = htonl(fmtp[10]),
          .sampleRate = htonl(fmtp[11]),
      };

      struct alac_codec_s *decoder = alac_create_decoder(sizeof(config), (unsigned char*) &config,
                                                         &sample_size, &sample_rate, &channels, &block_size);

      if (decoder == NULL) {
          LOG_ERROR("cannot create alac codec", NULL);
      }

      return decoder;
  }
  185. /*---------------------------------------------------------------------------*/
  186. rtp_resp_t rtp_init(struct in_addr host, int latency, char *aeskey, char *aesiv, char *fmtpstr,
  187. short unsigned pCtrlPort, short unsigned pTimingPort,
  188. uint8_t *buffer, size_t size,
  189. raop_cmd_cb_t cmd_cb, raop_data_cb_t data_cb)
  190. {
  191. int i = 0;
  192. char *arg;
  193. int fmtp[12];
  194. bool rc = true;
  195. rtp_t *ctx = calloc(1, sizeof(rtp_t));
  196. rtp_resp_t resp = { 0, 0, 0, NULL };
  197. if (!ctx) return resp;
  198. ctx->host = host;
  199. ctx->decrypt = false;
  200. ctx->cmd_cb = cmd_cb;
  201. ctx->data_cb = data_cb;
  202. ctx->rtp_host.sin_family = AF_INET;
  203. ctx->rtp_host.sin_addr.s_addr = INADDR_ANY;
  204. pthread_mutex_init(&ctx->ab_mutex, 0);
  205. ctx->first_seqno = -1;
  206. ctx->latency = latency;
  207. ctx->ab_read = ctx->ab_write;
  208. #ifdef __RTP_STORE
  209. ctx->rtpIN = fopen("airplay.rtpin", "wb");
  210. ctx->rtpOUT = fopen("airplay.rtpout", "wb");
  211. #endif
  212. ctx->rtp_sockets[CONTROL].rport = pCtrlPort;
  213. ctx->rtp_sockets[TIMING].rport = pTimingPort;
  214. if (aesiv && aeskey) {
  215. memcpy(ctx->aesiv, aesiv, 16);
  216. #ifdef WIN32
  217. AES_set_decrypt_key((unsigned char*) aeskey, 128, &ctx->aes);
  218. #else
  219. memset(&ctx->aes, 0, sizeof(mbedtls_aes_context));
  220. mbedtls_aes_setkey_dec(&ctx->aes, (unsigned char*) aeskey, 128);
  221. #endif
  222. ctx->decrypt = true;
  223. ctx->decrypt_buf = malloc(MAX_PACKET);
  224. }
  225. memset(fmtp, 0, sizeof(fmtp));
  226. while ((arg = strsep(&fmtpstr, " \t")) != NULL) fmtp[i++] = atoi(arg);
  227. ctx->frame_size = fmtp[1];
  228. ctx->frame_duration = (ctx->frame_size * 1000) / RAOP_SAMPLE_RATE;
  229. // alac decoder
  230. ctx->alac_codec = alac_init(fmtp);
  231. rc &= ctx->alac_codec != NULL;
  232. buffer_alloc(ctx->audio_buffer, ctx->frame_size*4, buffer, size);
  233. // create rtp ports
  234. for (i = 0; i < 3; i++) {
  235. ctx->rtp_sockets[i].sock = bind_socket(&ctx->rtp_sockets[i].lport, SOCK_DGRAM);
  236. rc &= ctx->rtp_sockets[i].sock > 0;
  237. }
  238. // create http port and start listening
  239. resp.cport = ctx->rtp_sockets[CONTROL].lport;
  240. resp.tport = ctx->rtp_sockets[TIMING].lport;
  241. resp.aport = ctx->rtp_sockets[DATA].lport;
  242. ctx->running = true;
  243. #ifdef WIN32
  244. pthread_create(&ctx->thread, NULL, rtp_thread_func, (void *) ctx);
  245. #else
  246. ctx->xTaskBuffer = (StaticTask_t*) heap_caps_malloc(sizeof(StaticTask_t), MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT);
  247. ctx->thread = xTaskCreateStaticPinnedToCore( (TaskFunction_t) rtp_thread_func, "RTP_thread", RTP_STACK_SIZE, ctx,
  248. CONFIG_ESP32_PTHREAD_TASK_PRIO_DEFAULT + 1, ctx->xStack, ctx->xTaskBuffer,
  249. CONFIG_PTHREAD_TASK_CORE_DEFAULT );
  250. #endif
  251. // cleanup everything if we failed
  252. if (!rc) {
  253. LOG_ERROR("[%p]: cannot start RTP", ctx);
  254. rtp_end(ctx);
  255. ctx = NULL;
  256. }
  257. resp.ctx = ctx;
  258. return resp;
  259. }
  260. /*---------------------------------------------------------------------------*/
  261. void rtp_end(rtp_t *ctx)
  262. {
  263. int i;
  264. if (!ctx) return;
  265. if (ctx->running) {
  266. #if !defined WIN32
  267. ctx->joiner = xTaskGetCurrentTaskHandle();
  268. #endif
  269. ctx->running = false;
  270. #ifdef WIN32
  271. pthread_join(ctx->thread, NULL);
  272. #else
  273. ulTaskNotifyTake(pdFALSE, portMAX_DELAY);
  274. vTaskDelete(ctx->thread);
  275. SAFE_PTR_FREE(ctx->xTaskBuffer);
  276. #endif
  277. }
  278. for (i = 0; i < 3; i++) closesocket(ctx->rtp_sockets[i].sock);
  279. if (ctx->alac_codec) alac_delete_decoder(ctx->alac_codec);
  280. if (ctx->decrypt_buf) free(ctx->decrypt_buf);
  281. pthread_mutex_destroy(&ctx->ab_mutex);
  282. buffer_release(ctx->audio_buffer);
  283. free(ctx);
  284. #ifdef __RTP_STORE
  285. fclose(ctx->rtpIN);
  286. fclose(ctx->rtpOUT);
  287. #endif
  288. }
  289. /*---------------------------------------------------------------------------*/
  290. bool rtp_flush(rtp_t *ctx, unsigned short seqno, unsigned int rtptime, bool exit_locked)
  291. {
  292. pthread_mutex_lock(&ctx->ab_mutex);
  293. // always store flush seqno as we only want stricly above it, even when equal to RECORD
  294. ctx->first_seqno = seqno;
  295. bool flushed = false;
  296. // no need to stop playing if recent or equal to record - but first_seqno is needed
  297. if (ctx->state == RTP_PLAY) {
  298. buffer_reset(ctx->audio_buffer);
  299. ctx->state = RTP_WAIT;
  300. flushed = true;
  301. LOG_INFO("[%p]: FLUSH packets below %hu - %u", ctx, seqno, rtptime);
  302. }
  303. if (!exit_locked || !flushed) pthread_mutex_unlock(&ctx->ab_mutex);
  304. return flushed;
  305. }
  306. /*---------------------------------------------------------------------------*/
  307. void rtp_flush_release(rtp_t *ctx) {
  308. pthread_mutex_unlock(&ctx->ab_mutex);
  309. }
  310. /*---------------------------------------------------------------------------*/
  311. void rtp_record(rtp_t *ctx, unsigned short seqno, unsigned rtptime) {
  312. ctx->first_seqno = (seqno || rtptime) ? seqno : -1;
  313. ctx->state = RTP_WAIT;
  314. LOG_INFO("[%p]: record %hu - %u", ctx, seqno, rtptime);
  315. }
  316. /*---------------------------------------------------------------------------*/
  317. static void buffer_alloc(abuf_t *audio_buffer, int size, uint8_t *buf, size_t buf_size) {
  318. for (buffer_frames = 0; buf && buf_size >= size && buffer_frames < BUFFER_FRAMES_MAX; buffer_frames++) {
  319. audio_buffer[buffer_frames].data = (s16_t*) buf;
  320. audio_buffer[buffer_frames].allocated = 0;
  321. audio_buffer[buffer_frames].ready = 0;
  322. buf += size;
  323. buf_size -= size;
  324. }
  325. LOG_INFO("allocated %d buffers (min=%d) from buffer of %zu bytes", buffer_frames, BUFFER_FRAMES_MIN, buf_size + buffer_frames * size);
  326. for(; buffer_frames < BUFFER_FRAMES_MIN; buffer_frames++) {
  327. audio_buffer[buffer_frames].data = malloc(size);
  328. audio_buffer[buffer_frames].allocated = 1;
  329. audio_buffer[buffer_frames].ready = 0;
  330. }
  331. }
  332. /*---------------------------------------------------------------------------*/
  333. static void buffer_release(abuf_t *audio_buffer) {
  334. int i;
  335. for (i = 0; i < buffer_frames; i++) {
  336. if (audio_buffer[i].allocated) free(audio_buffer[i].data);
  337. }
  338. }
  339. /*---------------------------------------------------------------------------*/
  340. static void buffer_reset(abuf_t *audio_buffer) {
  341. int i;
  342. for (i = 0; i < buffer_frames; i++) audio_buffer[i].ready = 0;
  343. }
  344. /*---------------------------------------------------------------------------*/
  345. // the sequence numbers will wrap pretty often.
  346. // this returns true if the second arg is after the first
  347. static int seq_order(seq_t a, seq_t b) {
  348. s16_t d = b - a;
  349. return d > 0;
  350. }
  351. /*---------------------------------------------------------------------------*/
  352. static void alac_decode(rtp_t *ctx, s16_t *dest, char *buf, int len, u16_t *outsize) {
  353. unsigned char iv[16];
  354. int aeslen;
  355. assert(len<=MAX_PACKET);
  356. if (ctx->decrypt) {
  357. aeslen = len & ~0xf;
  358. memcpy(iv, ctx->aesiv, sizeof(iv));
  359. #ifdef WIN32
  360. AES_cbc_encrypt((unsigned char*)buf, ctx->decrypt_buf, aeslen, &ctx->aes, iv, AES_DECRYPT);
  361. #else
  362. mbedtls_aes_crypt_cbc(&ctx->aes, MBEDTLS_AES_DECRYPT, aeslen, iv, (unsigned char*) buf, ctx->decrypt_buf);
  363. #endif
  364. memcpy(ctx->decrypt_buf+aeslen, buf+aeslen, len-aeslen);
  365. alac_to_pcm(ctx->alac_codec, (unsigned char*) ctx->decrypt_buf, (unsigned char*) dest, 2, (unsigned int*) outsize);
  366. } else {
  367. alac_to_pcm(ctx->alac_codec, (unsigned char*) buf, (unsigned char*) dest, 2, (unsigned int*) outsize);
  368. }
  369. *outsize *= 4;
  370. }
  371. /*---------------------------------------------------------------------------*/
  372. static void buffer_put_packet(rtp_t *ctx, seq_t seqno, unsigned rtptime, bool first, char *data, int len) {
  373. abuf_t *abuf = NULL;
  374. pthread_mutex_lock(&ctx->ab_mutex);
  375. /* if we have received a RECORD with a seqno, then this is the first allowed rtp sequence number
  376. * and we are in RTP_WAIT state. If seqno was 0, then we are waiting for a flush that will tell
  377. * us what should be our first allowed packet but we must accept everything, wait and clean when
  378. * we the it arrives. This means that first packet moves us to RTP_STREAM state where we accept
  379. * frames but wait for the FLUSH. If this was a FLUSH while playing, then we are also in RTP_WAIT
  380. * state but we do have an allowed seqno and we should not accept any frame before we have it */
  381. // if we have a pending first seqno and we are below, always ignore it
  382. if (ctx->first_seqno != -1 && seq_order(seqno, ctx->first_seqno)) {
  383. pthread_mutex_unlock(&ctx->ab_mutex);
  384. return;
  385. }
  386. if (ctx->state == RTP_WAIT) {
  387. ctx->ab_write = seqno - 1;
  388. ctx->ab_read = ctx->ab_write + 1;
  389. ctx->resent_req = ctx->resent_rec = ctx->silent_frames = ctx->discarded = 0;
  390. if (ctx->first_seqno != -1) {
  391. LOG_INFO("[%p]: 1st accepted packet:%d, now playing", ctx, seqno);
  392. ctx->state = RTP_PLAY;
  393. ctx->first_seqno = -1;
  394. u32_t playtime = ctx->synchro.time + ((rtptime - ctx->synchro.rtp) * 10) / (RAOP_SAMPLE_RATE / 100);
  395. ctx->cmd_cb(RAOP_PLAY, playtime);
  396. } else {
  397. ctx->state = RTP_STREAM;
  398. LOG_INFO("[%p]: 1st accepted packet:%hu, waiting for FLUSH", ctx, seqno);
  399. }
  400. } else if (ctx->state == RTP_STREAM && ctx->first_seqno != -1 && seq_order(ctx->first_seqno, seqno + 1)) {
  401. // now we're talking, but first discard all packets with a seqno below first_seqno AND not ready
  402. while (seq_order(ctx->ab_read, ctx->first_seqno) ||
  403. !ctx->audio_buffer[BUFIDX(ctx->ab_read)].ready) {
  404. ctx->audio_buffer[BUFIDX(ctx->ab_read)].ready = false;
  405. ctx->ab_read++;
  406. }
  407. LOG_INFO("[%p]: done waiting for FLUSH with packet:%d, now playing starting:%hu", ctx, seqno, ctx->ab_read);
  408. ctx->state = RTP_PLAY;
  409. ctx->first_seqno = -1;
  410. u32_t playtime = ctx->synchro.time + ((rtptime - ctx->synchro.rtp) * 10) / (RAOP_SAMPLE_RATE / 100);
  411. ctx->cmd_cb(RAOP_PLAY, playtime);
  412. }
  413. abuf = ctx->audio_buffer + BUFIDX(seqno);
  414. if (seqno == (u16_t) (ctx->ab_write+1)) {
  415. // expected packet
  416. ctx->ab_write = seqno;
  417. LOG_SDEBUG("packet expected seqno:%hu rtptime:%u (W:%hu R:%hu)", seqno, rtptime, ctx->ab_write, ctx->ab_read);
  418. } else if (seq_order(ctx->ab_write, seqno)) {
  419. // newer than expected
  420. if (ctx->latency && seq_order(ctx->latency / ctx->frame_size, seqno - ctx->ab_write - 1)) {
  421. // this is a shitstorm, reset buffer
  422. LOG_WARN("[%p] too many missing frames %hu seq: %hu, (W:%hu R:%hu)", ctx, seqno - ctx->ab_write - 1, seqno, ctx->ab_write, ctx->ab_read);
  423. ctx->ab_read = seqno;
  424. } else {
  425. // request re-send missed frames and evaluate resent date as a whole *after*
  426. if (ctx->state == RTP_PLAY) rtp_request_resend(ctx, ctx->ab_write + 1, seqno-1);
  427. // resend date is after all requests have been sent
  428. u32_t now = gettime_ms();
  429. // set expected timing of missed frames for buffer_push_packet and set last_resend date
  430. for (seq_t i = ctx->ab_write + 1; seq_order(i, seqno); i++) {
  431. ctx->audio_buffer[BUFIDX(i)].rtptime = rtptime - (seqno-i)*ctx->frame_size;
  432. ctx->audio_buffer[BUFIDX(i)].last_resend = now;
  433. }
  434. LOG_DEBUG("[%p]: packet newer seqno:%hu rtptime:%u (W:%hu R:%hu)", ctx, seqno, rtptime, ctx->ab_write, ctx->ab_read);
  435. }
  436. ctx->ab_write = seqno;
  437. } else if (seq_order(ctx->ab_read, seqno + 1)) {
  438. // recovered packet, not yet sent
  439. ctx->resent_rec++;
  440. LOG_DEBUG("[%p]: packet recovered seqno:%hu rtptime:%u (W:%hu R:%hu)", ctx, seqno, rtptime, ctx->ab_write, ctx->ab_read);
  441. } else {
  442. // too late
  443. if (abuf->missed) LOG_INFO("[%p]: packet too late seqno:%hu rtptime:%u (W:%hu R:%hu)", ctx, seqno, rtptime, ctx->ab_write, ctx->ab_read);
  444. abuf = NULL;
  445. }
  446. if (ctx->in_frames++ > 1000) {
  447. LOG_INFO("[%p]: fill [level:%hu rec:%u] [W:%hu R:%hu]", ctx, ctx->ab_write - ctx->ab_read, ctx->resent_rec, ctx->ab_write, ctx->ab_read);
  448. ctx->in_frames = 0;
  449. }
  450. if (abuf) {
  451. alac_decode(ctx, abuf->data, data, len, &abuf->len);
  452. abuf->ready = 1;
  453. abuf->missed = 0;
  454. // this is the local rtptime when this frame is expected to play
  455. abuf->rtptime = rtptime;
  456. buffer_push_packet(ctx);
  457. #ifdef __RTP_STORE
  458. fwrite(data, len, 1, ctx->rtpIN);
  459. fwrite(abuf->data, abuf->len, 1, ctx->rtpOUT);
  460. #endif
  461. }
  462. pthread_mutex_unlock(&ctx->ab_mutex);
  463. }
  464. /*---------------------------------------------------------------------------*/
  465. // push as many frames as possible through callback
  466. static void buffer_push_packet(rtp_t *ctx) {
  467. abuf_t *curframe = NULL;
  468. u32_t now, playtime, hold = max((ctx->latency * 1000) / (8 * RAOP_SAMPLE_RATE), 100);
  469. // not ready to play yet
  470. if (ctx->state != RTP_PLAY || ctx->synchro.status != (RTP_SYNC | NTP_SYNC)) return;
  471. // there is always at least one frame in the buffer
  472. do {
  473. // re-evaluate time in loop in case data callback blocks ...
  474. now = gettime_ms();
  475. // try to manage playtime so that we overflow as late as possible if we miss NTP (2^31 / 10 / 44100)
  476. curframe = ctx->audio_buffer + BUFIDX(ctx->ab_read);
  477. playtime = ctx->synchro.time + ((curframe->rtptime - ctx->synchro.rtp) * 10) / (RAOP_SAMPLE_RATE / 100);
  478. if (now > playtime) {
  479. LOG_DEBUG("[%p]: discarded frame now:%u missed by:%d (W:%hu R:%hu)", ctx, now, now - playtime, ctx->ab_write, ctx->ab_read);
  480. ctx->discarded++;
  481. curframe->ready = 0;
  482. } else if (playtime - now <= hold) {
  483. if (curframe->ready) {
  484. ctx->data_cb((const u8_t*) curframe->data, curframe->len, playtime);
  485. curframe->ready = 0;
  486. } else {
  487. LOG_DEBUG("[%p]: created zero frame (W:%hu R:%hu)", ctx, ctx->ab_write, ctx->ab_read);
  488. ctx->data_cb(silence_frame, ctx->frame_size * 4, playtime);
  489. ctx->silent_frames++;
  490. curframe->missed = 1;
  491. }
  492. } else if (curframe->ready) {
  493. ctx->data_cb((const u8_t*) curframe->data, curframe->len, playtime);
  494. curframe->ready = 0;
  495. } else {
  496. break;
  497. }
  498. ctx->ab_read++;
  499. ctx->out_frames++;
  500. } while (seq_order(ctx->ab_read, ctx->ab_write));
  501. if (ctx->out_frames > 1000) {
  502. LOG_INFO("[%p]: drain [level:%hd head:%d ms] [W:%hu R:%hu] [req:%u sil:%u dis:%u]",
  503. ctx, ctx->ab_write - ctx->ab_read, playtime - now, ctx->ab_write, ctx->ab_read,
  504. ctx->resent_req, ctx->silent_frames, ctx->discarded);
  505. ctx->out_frames = 0;
  506. }
  507. LOG_SDEBUG("playtime %u %d [W:%hu R:%hu] %d", playtime, playtime - now, ctx->ab_write, ctx->ab_read, curframe->ready);
  508. // try to request resend missing packet in order, explore up to 32 frames
  509. for (int step = max((ctx->ab_write - ctx->ab_read + 1) / 32, 1),
  510. i = 0, first = 0;
  511. seq_order(ctx->ab_read + i, ctx->ab_write); i += step) {
  512. abuf_t* frame = ctx->audio_buffer + BUFIDX(ctx->ab_read + i);
  513. // stop when we reach a ready frame or a recent pending resend
  514. if (first && (frame->ready || now - frame->last_resend <= RESEND_TO)) {
  515. if (!rtp_request_resend(ctx, first, ctx->ab_read + i - 1)) break;
  516. first = 0;
  517. i += step - 1;
  518. } else if (!frame->ready && now - frame->last_resend > RESEND_TO) {
  519. if (!first) first = ctx->ab_read + i;
  520. frame->last_resend = now;
  521. }
  522. }
  523. }
  524. /*---------------------------------------------------------------------------*/
  525. #ifdef WIN32
  526. static void *rtp_thread_func(void *arg) {
  527. #else
  528. static void rtp_thread_func(void *arg) {
  529. #endif
  530. fd_set fds;
  531. int i, sock = -1;
  532. int count = 0;
  533. bool ntp_sent;
  534. char *packet = malloc(MAX_PACKET);
  535. rtp_t *ctx = (rtp_t*) arg;
  536. for (i = 0; i < 3; i++) {
  537. if (ctx->rtp_sockets[i].sock > sock) sock = ctx->rtp_sockets[i].sock;
  538. // send synchro request 3 times
  539. ntp_sent = rtp_request_timing(ctx);
  540. }
  541. while (ctx->running) {
  542. ssize_t plen;
  543. char type;
  544. socklen_t rtp_client_len = sizeof(struct sockaddr_in);
  545. int idx = 0;
  546. char *pktp = packet;
  547. struct timeval timeout = {0, 100*1000};
  548. FD_ZERO(&fds);
  549. for (i = 0; i < 3; i++) { FD_SET(ctx->rtp_sockets[i].sock, &fds); }
  550. if (select(sock + 1, &fds, NULL, NULL, &timeout) <= 0) {
  551. if (ctx->stalled++ == 30*10) ctx->cmd_cb(RAOP_STALLED);
  552. continue;
  553. }
  554. for (i = 0; i < 3; i++)
  555. if (FD_ISSET(ctx->rtp_sockets[i].sock, &fds)) idx = i;
  556. plen = recvfrom(ctx->rtp_sockets[idx].sock, packet, MAX_PACKET, MSG_DONTWAIT, (struct sockaddr*) &ctx->rtp_host, &rtp_client_len);
  557. if (!ntp_sent) {
  558. LOG_WARN("[%p]: NTP request not send yet", ctx);
  559. ntp_sent = rtp_request_timing(ctx);
  560. }
  561. if (plen <= 0) {
  562. LOG_WARN("Nothing received on a readable socket %d", plen);
  563. continue;
  564. }
  565. assert(plen <= MAX_PACKET);
  566. ctx->stalled = 0;
  567. type = packet[1] & ~0x80;
  568. pktp = packet;
  569. switch (type) {
  570. seq_t seqno;
  571. unsigned rtptime;
  572. // re-sent packet
  573. case 0x56: {
  574. pktp += 4;
  575. plen -= 4;
  576. }
  577. // fall through
  578. // data packet
  579. case 0x60: {
  580. seqno = ntohs(*(u16_t*)(pktp+2));
  581. rtptime = ntohl(*(u32_t*)(pktp+4));
  582. // adjust pointer and length
  583. pktp += 12;
  584. plen -= 12;
  585. LOG_SDEBUG("[%p]: seqno:%hu rtp:%u (type: %x, first: %u)", ctx, seqno, rtptime, type, packet[1] & 0x80);
  586. // check if packet contains enough content to be reasonable
  587. if (plen < 16) break;
  588. if ((packet[1] & 0x80) && (type != 0x56)) {
  589. LOG_INFO("[%p]: 1st audio packet received", ctx);
  590. }
  591. buffer_put_packet(ctx, seqno, rtptime, packet[1] & 0x80, pktp, plen);
  592. break;
  593. }
  594. // sync packet
  595. case 0x54: {
  596. u32_t rtp_now_latency = ntohl(*(u32_t*)(pktp+4));
  597. u64_t remote = (((u64_t) ntohl(*(u32_t*)(pktp+8))) << 32) + ntohl(*(u32_t*)(pktp+12));
  598. u32_t rtp_now = ntohl(*(u32_t*)(pktp+16));
  599. u16_t flags = ntohs(*(u16_t*)(pktp+2));
  600. u32_t remote_gap = NTP2MS(remote - ctx->timing.remote);
  601. // try to get NTP every 3 sec or every time if we are not synced
  602. if (!count-- || !(ctx->synchro.status & NTP_SYNC)) {
  603. rtp_request_timing(ctx);
  604. count = 3;
  605. }
  606. // something is wrong, we should not have such gap
  607. if (remote_gap > 10000) {
  608. LOG_WARN("discarding remote timing information %u", remote_gap);
  609. break;
  610. }
  611. pthread_mutex_lock(&ctx->ab_mutex);
  612. // re-align timestamp and expected local playback time (and magic 11025 latency)
  613. ctx->latency = rtp_now - rtp_now_latency;
  614. if (flags == 7 || flags == 4) ctx->latency += 11025;
  615. if (ctx->latency < MIN_LATENCY) ctx->latency = MIN_LATENCY;
  616. else if (ctx->latency > MAX_LATENCY) ctx->latency = MAX_LATENCY;
  617. ctx->synchro.rtp = rtp_now - ctx->latency;
  618. ctx->synchro.time = ctx->timing.local + remote_gap;
  619. // now we are synced on RTP frames
  620. ctx->synchro.status |= RTP_SYNC;
  621. // 1st sync packet received (signals a restart of playback)
  622. if (packet[0] & 0x10) {
  623. LOG_INFO("[%p]: 1st sync packet received", ctx);
  624. }
  625. pthread_mutex_unlock(&ctx->ab_mutex);
  626. LOG_DEBUG("[%p]: sync packet latency:%d rtp_latency:%u rtp:%u remote ntp:%llx, local time:%u local rtp:%u (now:%u)",
  627. ctx, ctx->latency, rtp_now_latency, rtp_now, remote, ctx->synchro.time, ctx->synchro.rtp, gettime_ms());
  628. if ((ctx->synchro.status & RTP_SYNC) && (ctx->synchro.status & NTP_SYNC)) ctx->cmd_cb(RAOP_TIMING);
  629. break;
  630. }
  631. // NTP timing packet
  632. case 0x53: {
  633. u32_t reference = ntohl(*(u32_t*)(pktp+12)); // only low 32 bits in our case
  634. u64_t remote =(((u64_t) ntohl(*(u32_t*)(pktp+16))) << 32) + ntohl(*(u32_t*)(pktp+20));
  635. u32_t roundtrip = gettime_ms() - reference;
  636. // better discard sync packets when roundtrip is suspicious
  637. if (roundtrip > 100) {
  638. // ask for another one only if we are not synced already
  639. if (!(ctx->synchro.status & NTP_SYNC)) rtp_request_timing(ctx);
  640. LOG_WARN("[%p]: discarding NTP roundtrip of %u ms", ctx, roundtrip);
  641. break;
  642. }
  643. /*
  644. The expected elapsed remote time should be exactly the same as
  645. elapsed local time between the two request, corrected by the
  646. drifting
  647. u64_t expected = ctx->timing.remote + MS2NTP(reference - ctx->timing.local);
  648. */
  649. ctx->timing.remote = remote;
  650. ctx->timing.local = reference;
  651. // now we are synced on NTP (mutex not needed)
  652. ctx->synchro.status |= NTP_SYNC;
  653. LOG_DEBUG("[%p]: Timing references local:%llu, remote:%llx (delta:%lld, sum:%lld, adjust:%lld, gaps:%d)",
  654. ctx, ctx->timing.local, ctx->timing.remote);
  655. break;
  656. }
  657. default: {
  658. LOG_WARN("Unknown packet received %x", (int) type);
  659. break;
  660. }
  661. }
  662. }
  663. free(packet);
  664. LOG_INFO("[%p]: terminating", ctx);
  665. #ifndef WIN32
  666. xTaskNotifyGive(ctx->joiner);
  667. vTaskSuspend(NULL);
  668. #else
  669. return NULL;
  670. #endif
  671. }
  672. /*---------------------------------------------------------------------------*/
  673. static bool rtp_request_timing(rtp_t *ctx) {
  674. unsigned char req[32];
  675. u32_t now = gettime_ms();
  676. int i;
  677. struct sockaddr_in host;
  678. LOG_DEBUG("[%p]: timing request now:%u (port: %hu)", ctx, now, ctx->rtp_sockets[TIMING].rport);
  679. req[0] = 0x80;
  680. req[1] = 0x52|0x80;
  681. *(u16_t*)(req+2) = htons(7);
  682. *(u32_t*)(req+4) = htonl(0); // dummy
  683. for (i = 0; i < 16; i++) req[i+8] = 0;
  684. *(u32_t*)(req+24) = 0;
  685. *(u32_t*)(req+28) = htonl(now); // this is not a real NTP, but a 32 ms counter in the low part of the NTP
  686. if (ctx->host.s_addr != INADDR_ANY) {
  687. host.sin_family = AF_INET;
  688. host.sin_addr = ctx->host;
  689. } else host = ctx->rtp_host;
  690. // no address from sender, need to wait for 1st packet to be received
  691. if (host.sin_addr.s_addr == INADDR_ANY) return false;
  692. host.sin_port = htons(ctx->rtp_sockets[TIMING].rport);
  693. if (sizeof(req) != sendto(ctx->rtp_sockets[TIMING].sock, req, sizeof(req), MSG_DONTWAIT, (struct sockaddr*) &host, sizeof(host))) {
  694. LOG_WARN("[%p]: SENDTO failed (%s)", ctx, strerror(errno));
  695. }
  696. return true;
  697. }
  698. /*---------------------------------------------------------------------------*/
  699. static bool rtp_request_resend(rtp_t *ctx, seq_t first, seq_t last) {
  700. unsigned char req[8]; // *not* a standard RTCP NACK
  701. // do not request silly ranges (happens in case of network large blackouts)
  702. if (seq_order(last, first) || last - first > buffer_frames / 2) return false;
  703. ctx->resent_req += (seq_t) (last - first) + 1;
  704. LOG_DEBUG("resend request [W:%hu R:%hu first=%hu last=%hu]", ctx->ab_write, ctx->ab_read, first, last);
  705. req[0] = 0x80;
  706. req[1] = 0x55|0x80; // Apple 'resend'
  707. *(u16_t*)(req+2) = htons(1); // our seqnum
  708. *(u16_t*)(req+4) = htons(first); // missed seqnum
  709. *(u16_t*)(req+6) = htons(last-first+1); // count
  710. ctx->rtp_host.sin_port = htons(ctx->rtp_sockets[CONTROL].rport);
  711. if (sizeof(req) != sendto(ctx->rtp_sockets[CONTROL].sock, req, sizeof(req), MSG_DONTWAIT, (struct sockaddr*) &ctx->rtp_host, sizeof(ctx->rtp_host))) {
  712. LOG_WARN("[%p]: SENDTO failed (%s)", ctx, strerror(errno));
  713. return false;
  714. }
  715. return true;
  716. }