rtp.c 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753
  1. /*
  2. * HairTunes - RAOP packet handler and slave-clocked replay engine
  3. * Copyright (c) James Laird 2011
  4. * All rights reserved.
  5. *
  6. * Modularisation: philippe_44@outlook.com, 2019
  7. *
  8. * Permission is hereby granted, free of charge, to any person
  9. * obtaining a copy of this software and associated documentation
  10. * files (the "Software"), to deal in the Software without
  11. * restriction, including without limitation the rights to use,
  12. * copy, modify, merge, publish, distribute, sublicense, and/or
  13. * sell copies of the Software, and to permit persons to whom the
  14. * Software is furnished to do so, subject to the following conditions:
  15. *
  16. * The above copyright notice and this permission notice shall be
  17. * included in all copies or substantial portions of the Software.
  18. *
  19. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  20. * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
  21. * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  22. * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  23. * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
  24. * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  25. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
  26. * OTHER DEALINGS IN THE SOFTWARE.
  27. */
  28. #include <stdio.h>
  29. #include <stdlib.h>
  30. #include <string.h>
  31. #include <stdarg.h>
  32. #include <sys/types.h>
  33. #include <pthread.h>
  34. #include <math.h>
  35. #include <errno.h>
  36. #include <sys/stat.h>
  37. #include <stdint.h>
  38. #include <fcntl.h>
  39. #include <assert.h>
  40. #include "platform.h"
  41. #include "rtp.h"
  42. #include "raop_sink.h"
  43. #include "log_util.h"
  44. #include "util.h"
  45. #ifdef WIN32
  46. #include <openssl/aes.h>
  47. #include "alac.h"
  48. #else
  49. #include "esp_pthread.h"
  50. #include "esp_system.h"
  51. #include <mbedtls/version.h>
  52. #include <mbedtls/aes.h>
  53. //#include "alac_wrapper.h"
  54. #include "alac.h"
  55. #endif
  56. #define NTP2MS(ntp) ((((ntp) >> 10) * 1000L) >> 22)
  57. #define MS2NTP(ms) (((((u64_t) (ms)) << 22) / 1000) << 10)
  58. #define NTP2TS(ntp, rate) ((((ntp) >> 16) * (rate)) >> 16)
  59. #define TS2NTP(ts, rate) (((((u64_t) (ts)) << 16) / (rate)) << 16)
  60. #define MS2TS(ms, rate) ((((u64_t) (ms)) * (rate)) / 1000)
  61. #define TS2MS(ts, rate) NTP2MS(TS2NTP(ts,rate))
  62. extern log_level raop_loglevel;
  63. static log_level *loglevel = &raop_loglevel;
  64. //#define __RTP_STORE
  65. // default buffer size
  66. #define BUFFER_FRAMES ( (120 * 88200) / (352 * 100) )
  67. #define MAX_PACKET 1408
  68. #define RTP_SYNC (0x01)
  69. #define NTP_SYNC (0x02)
  70. #define RESEND_TO 200
  71. enum { DATA, CONTROL, TIMING };
  72. static const u8_t silence_frame[MAX_PACKET] = { 0 };
  73. typedef u16_t seq_t;
  74. typedef struct audio_buffer_entry { // decoded audio packets
  75. int ready;
  76. u32_t rtptime, last_resend;
  77. s16_t *data;
  78. int len;
  79. } abuf_t;
  80. typedef struct rtp_s {
  81. #ifdef __RTP_STORE
  82. FILE *rtpIN, *rtpOUT;
  83. #endif
  84. bool running;
  85. unsigned char aesiv[16];
  86. #ifdef WIN32
  87. AES_KEY aes;
  88. #else
  89. mbedtls_aes_context aes;
  90. #endif
  91. bool decrypt;
  92. u8_t *decrypt_buf;
  93. int frame_size, frame_duration;
  94. int in_frames, out_frames;
  95. struct in_addr host;
  96. struct sockaddr_in rtp_host;
  97. struct {
  98. unsigned short rport, lport;
  99. int sock;
  100. } rtp_sockets[3]; // data, control, timing
  101. struct timing_s {
  102. u64_t local, remote;
  103. } timing;
  104. struct {
  105. u32_t rtp, time;
  106. u8_t status;
  107. } synchro;
  108. struct {
  109. u32_t time;
  110. seq_t seqno;
  111. u32_t rtptime;
  112. } record;
  113. int latency; // rtp hold depth in samples
  114. u32_t resent_req, resent_rec; // total resent + recovered frames
  115. u32_t silent_frames; // total silence frames
  116. u32_t filled_frames; // silence frames in current silence episode
  117. u32_t discarded;
  118. int skip; // number of frames to skip to keep sync alignement
  119. abuf_t audio_buffer[BUFFER_FRAMES];
  120. seq_t ab_read, ab_write;
  121. pthread_mutex_t ab_mutex;
  122. #ifdef WIN32
  123. pthread_t rtp_thread;
  124. #else
  125. TaskHandle_t rtp_thread, joiner;
  126. #endif
  127. alac_file *alac_codec;
  128. int flush_seqno;
  129. bool playing;
  130. raop_data_cb_t data_cb;
  131. raop_cmd_cb_t cmd_cb;
  132. u16_t syncS, syncN;
  133. } rtp_t;
  134. #define BUFIDX(seqno) ((seq_t)(seqno) % BUFFER_FRAMES)
  135. static void buffer_alloc(abuf_t *audio_buffer, int size);
  136. static void buffer_release(abuf_t *audio_buffer);
  137. static void buffer_reset(abuf_t *audio_buffer);
  138. static void buffer_push_packet(rtp_t *ctx);
  139. static bool rtp_request_resend(rtp_t *ctx, seq_t first, seq_t last);
  140. static bool rtp_request_timing(rtp_t *ctx);
  141. static void* rtp_thread_func(void *arg);
  142. static int seq_order(seq_t a, seq_t b);
  143. /*---------------------------------------------------------------------------*/
  144. static alac_file* alac_init(int fmtp[32]) {
  145. alac_file *alac;
  146. int sample_size = fmtp[3];
  147. if (sample_size != 16) {
  148. LOG_ERROR("sample size must be 16 %d", sample_size);
  149. return false;
  150. }
  151. alac = create_alac(sample_size, 2);
  152. if (!alac) {
  153. LOG_ERROR("cannot create alac codec", NULL);
  154. return NULL;
  155. }
  156. alac->setinfo_max_samples_per_frame = fmtp[1];
  157. alac->setinfo_7a = fmtp[2];
  158. alac->setinfo_sample_size = sample_size;
  159. alac->setinfo_rice_historymult = fmtp[4];
  160. alac->setinfo_rice_initialhistory = fmtp[5];
  161. alac->setinfo_rice_kmodifier = fmtp[6];
  162. alac->setinfo_7f = fmtp[7];
  163. alac->setinfo_80 = fmtp[8];
  164. alac->setinfo_82 = fmtp[9];
  165. alac->setinfo_86 = fmtp[10];
  166. alac->setinfo_8a_rate = fmtp[11];
  167. allocate_buffers(alac);
  168. return alac;
  169. }
  170. /*---------------------------------------------------------------------------*/
  171. rtp_resp_t rtp_init(struct in_addr host, int latency, char *aeskey, char *aesiv, char *fmtpstr,
  172. short unsigned pCtrlPort, short unsigned pTimingPort,
  173. raop_cmd_cb_t cmd_cb, raop_data_cb_t data_cb)
  174. {
  175. int i = 0;
  176. char *arg;
  177. int fmtp[12];
  178. bool rc = true;
  179. rtp_t *ctx = calloc(1, sizeof(rtp_t));
  180. rtp_resp_t resp = { 0, 0, 0, NULL };
  181. if (!ctx) return resp;
  182. ctx->host = host;
  183. ctx->decrypt = false;
  184. ctx->cmd_cb = cmd_cb;
  185. ctx->data_cb = data_cb;
  186. ctx->rtp_host.sin_family = AF_INET;
  187. ctx->rtp_host.sin_addr.s_addr = INADDR_ANY;
  188. pthread_mutex_init(&ctx->ab_mutex, 0);
  189. ctx->flush_seqno = -1;
  190. ctx->latency = latency;
  191. // write pointer = last written, read pointer = next to read so fill = w-r+1
  192. ctx->ab_read = ctx->ab_write + 1;
  193. #ifdef __RTP_STORE
  194. ctx->rtpIN = fopen("airplay.rtpin", "wb");
  195. ctx->rtpOUT = fopen("airplay.rtpout", "wb");
  196. #endif
  197. ctx->rtp_sockets[CONTROL].rport = pCtrlPort;
  198. ctx->rtp_sockets[TIMING].rport = pTimingPort;
  199. if (aesiv && aeskey) {
  200. memcpy(ctx->aesiv, aesiv, 16);
  201. #ifdef WIN32
  202. AES_set_decrypt_key((unsigned char*) aeskey, 128, &ctx->aes);
  203. #else
  204. memset(&ctx->aes, 0, sizeof(mbedtls_aes_context));
  205. mbedtls_aes_setkey_dec(&ctx->aes, (unsigned char*) aeskey, 128);
  206. #endif
  207. ctx->decrypt = true;
  208. ctx->decrypt_buf = malloc(MAX_PACKET);
  209. }
  210. memset(fmtp, 0, sizeof(fmtp));
  211. while ((arg = strsep(&fmtpstr, " \t")) != NULL) fmtp[i++] = atoi(arg);
  212. ctx->frame_size = fmtp[1];
  213. ctx->frame_duration = (ctx->frame_size * 1000) / 44100;
  214. // alac decoder
  215. ctx->alac_codec = alac_init(fmtp);
  216. rc &= ctx->alac_codec != NULL;
  217. buffer_alloc(ctx->audio_buffer, ctx->frame_size*4);
  218. // create rtp ports
  219. for (i = 0; i < 3; i++) {
  220. ctx->rtp_sockets[i].sock = bind_socket(&ctx->rtp_sockets[i].lport, SOCK_DGRAM);
  221. rc &= ctx->rtp_sockets[i].sock > 0;
  222. }
  223. // create http port and start listening
  224. resp.cport = ctx->rtp_sockets[CONTROL].lport;
  225. resp.tport = ctx->rtp_sockets[TIMING].lport;
  226. resp.aport = ctx->rtp_sockets[DATA].lport;
  227. if (rc) {
  228. ctx->running = true;
  229. #ifdef WIN32
  230. pthread_create(&ctx->rtp_thread, NULL, rtp_thread_func, (void *) ctx);
  231. #else
  232. xTaskCreate((TaskFunction_t) rtp_thread_func, "RTP_thread", 4096, ctx, configMAX_PRIORITIES - 3, &ctx->rtp_thread);
  233. #endif
  234. } else {
  235. rtp_end(ctx);
  236. ctx = NULL;
  237. }
  238. resp.ctx = ctx;
  239. return resp;
  240. }
  241. /*---------------------------------------------------------------------------*/
  242. void rtp_end(rtp_t *ctx)
  243. {
  244. int i;
  245. if (!ctx) return;
  246. if (ctx->running) {
  247. ctx->running = false;
  248. #ifdef WIN32
  249. pthread_join(ctx->rtp_thread, NULL);
  250. #else
  251. ctx->joiner = xTaskGetCurrentTaskHandle();
  252. xTaskNotifyWait(0, 0, NULL, portMAX_DELAY);
  253. #endif
  254. }
  255. for (i = 0; i < 3; i++) shutdown_socket(ctx->rtp_sockets[i].sock);
  256. delete_alac(ctx->alac_codec);
  257. if (ctx->decrypt_buf) free(ctx->decrypt_buf);
  258. buffer_release(ctx->audio_buffer);
  259. free(ctx);
  260. #ifdef __RTP_STORE
  261. fclose(ctx->rtpIN);
  262. fclose(ctx->rtpOUT);
  263. #endif
  264. }
  265. /*---------------------------------------------------------------------------*/
  266. bool rtp_flush(rtp_t *ctx, unsigned short seqno, unsigned int rtptime)
  267. {
  268. bool rc = true;
  269. u32_t now = gettime_ms();
  270. if (now < ctx->record.time + 250 || (ctx->record.seqno == seqno && ctx->record.rtptime == rtptime)) {
  271. rc = false;
  272. LOG_ERROR("[%p]: FLUSH ignored as same as RECORD (%hu - %u)", ctx, seqno, rtptime);
  273. } else {
  274. pthread_mutex_lock(&ctx->ab_mutex);
  275. buffer_reset(ctx->audio_buffer);
  276. ctx->playing = false;
  277. ctx->flush_seqno = seqno;
  278. pthread_mutex_unlock(&ctx->ab_mutex);
  279. }
  280. LOG_INFO("[%p]: flush %hu %u", ctx, seqno, rtptime);
  281. return rc;
  282. }
  283. /*---------------------------------------------------------------------------*/
  284. void rtp_record(rtp_t *ctx, unsigned short seqno, unsigned rtptime)
  285. {
  286. ctx->record.seqno = seqno;
  287. ctx->record.rtptime = rtptime;
  288. ctx->record.time = gettime_ms();
  289. LOG_INFO("[%p]: record %hu %u", ctx, seqno, rtptime);
  290. }
  291. /*---------------------------------------------------------------------------*/
  292. static void buffer_alloc(abuf_t *audio_buffer, int size) {
  293. int i;
  294. for (i = 0; i < BUFFER_FRAMES; i++) {
  295. audio_buffer[i].data = malloc(size);
  296. audio_buffer[i].ready = 0;
  297. }
  298. }
  299. /*---------------------------------------------------------------------------*/
  300. static void buffer_release(abuf_t *audio_buffer) {
  301. int i;
  302. for (i = 0; i < BUFFER_FRAMES; i++) {
  303. free(audio_buffer[i].data);
  304. }
  305. }
  306. /*---------------------------------------------------------------------------*/
  307. static void buffer_reset(abuf_t *audio_buffer) {
  308. int i;
  309. for (i = 0; i < BUFFER_FRAMES; i++) audio_buffer[i].ready = 0;
  310. }
  311. /*---------------------------------------------------------------------------*/
  312. // the sequence numbers will wrap pretty often.
  313. // this returns true if the second arg is after the first
  314. static int seq_order(seq_t a, seq_t b) {
  315. s16_t d = b - a;
  316. return d > 0;
  317. }
  318. /*---------------------------------------------------------------------------*/
  319. static void alac_decode(rtp_t *ctx, s16_t *dest, char *buf, int len, int *outsize) {
  320. unsigned char iv[16];
  321. int aeslen;
  322. assert(len<=MAX_PACKET);
  323. if (ctx->decrypt) {
  324. aeslen = len & ~0xf;
  325. memcpy(iv, ctx->aesiv, sizeof(iv));
  326. #ifdef WIN32
  327. AES_cbc_encrypt((unsigned char*)buf, ctx->decrypt_buf, aeslen, &ctx->aes, iv, AES_DECRYPT);
  328. #else
  329. mbedtls_aes_crypt_cbc(&ctx->aes, MBEDTLS_AES_DECRYPT, aeslen, iv, (unsigned char*) buf, ctx->decrypt_buf);
  330. #endif
  331. memcpy(ctx->decrypt_buf+aeslen, buf+aeslen, len-aeslen);
  332. decode_frame(ctx->alac_codec, ctx->decrypt_buf, dest, outsize);
  333. } else decode_frame(ctx->alac_codec, (unsigned char*) buf, dest, outsize);
  334. }
  335. /*---------------------------------------------------------------------------*/
  336. static void buffer_put_packet(rtp_t *ctx, seq_t seqno, unsigned rtptime, bool first, char *data, int len) {
  337. abuf_t *abuf = NULL;
  338. u32_t playtime;
  339. pthread_mutex_lock(&ctx->ab_mutex);
  340. if (!ctx->playing) {
  341. if ((ctx->flush_seqno == -1 || seq_order(ctx->flush_seqno, seqno)) &&
  342. (ctx->synchro.status & RTP_SYNC && ctx->synchro.status & NTP_SYNC)) {
  343. ctx->ab_write = seqno-1;
  344. ctx->ab_read = seqno;
  345. ctx->skip = 0;
  346. ctx->flush_seqno = -1;
  347. ctx->playing = true;
  348. ctx->resent_req = ctx->resent_rec = ctx->silent_frames = ctx->discarded = 0;
  349. playtime = ctx->synchro.time + (((s32_t)(rtptime - ctx->synchro.rtp)) * 10) / 441;
  350. ctx->cmd_cb(RAOP_PLAY, &playtime);
  351. } else {
  352. pthread_mutex_unlock(&ctx->ab_mutex);
  353. return;
  354. }
  355. }
  356. if (seqno == ctx->ab_write+1) {
  357. // expected packet
  358. abuf = ctx->audio_buffer + BUFIDX(seqno);
  359. ctx->ab_write = seqno;
  360. LOG_SDEBUG("packet expected seqno:%hu rtptime:%u (W:%hu R:%hu)", seqno, rtptime, ctx->ab_write, ctx->ab_read);
  361. } else if (seq_order(ctx->ab_write, seqno)) {
  362. // newer than expected
  363. if (seqno - ctx->ab_write - 1 > ctx->latency / ctx->frame_size) {
  364. // only get rtp latency-1 frames back (last one is seqno)
  365. LOG_WARN("[%p] too many missing frames %hu", ctx, seqno - ctx->ab_write - 1);
  366. ctx->ab_write = seqno - ctx->latency / ctx->frame_size;
  367. }
  368. if (seqno - ctx->ab_read + 1 > ctx->latency / ctx->frame_size) {
  369. // if ab_read is lagging more than http latency, advance it
  370. LOG_WARN("[%p] on hold for too long %hu", ctx, seqno - ctx->ab_read + 1);
  371. ctx->ab_read = seqno - ctx->latency / ctx->frame_size + 1;
  372. }
  373. if (rtp_request_resend(ctx, ctx->ab_write + 1, seqno-1)) {
  374. seq_t i;
  375. u32_t now = gettime_ms();
  376. for (i = ctx->ab_write + 1; i <= seqno-1; i++) {
  377. ctx->audio_buffer[BUFIDX(i)].rtptime = rtptime - (seqno-i)*ctx->frame_size;
  378. ctx->audio_buffer[BUFIDX(i)].last_resend = now;
  379. }
  380. }
  381. LOG_DEBUG("[%p]: packet newer seqno:%hu rtptime:%u (W:%hu R:%hu)", ctx, seqno, rtptime, ctx->ab_write, ctx->ab_read);
  382. abuf = ctx->audio_buffer + BUFIDX(seqno);
  383. ctx->ab_write = seqno;
  384. } else if (seq_order(ctx->ab_read, seqno + 1)) {
  385. // recovered packet, not yet sent
  386. abuf = ctx->audio_buffer + BUFIDX(seqno);
  387. ctx->resent_rec++;
  388. LOG_DEBUG("[%p]: packet recovered seqno:%hu rtptime:%u (W:%hu R:%hu)", ctx, seqno, rtptime, ctx->ab_write, ctx->ab_read);
  389. } else {
  390. // too late
  391. LOG_DEBUG("[%p]: packet too late seqno:%hu rtptime:%u (W:%hu R:%hu)", ctx, seqno, rtptime, ctx->ab_write, ctx->ab_read);
  392. }
  393. if (ctx->in_frames++ > 1000) {
  394. LOG_INFO("[%p]: fill [level:%hd rec:%u] [W:%hu R:%hu]", ctx, (seq_t) (ctx->ab_write - ctx->ab_read + 1), ctx->resent_rec, ctx->ab_write, ctx->ab_read);
  395. ctx->in_frames = 0;
  396. }
  397. if (abuf) {
  398. alac_decode(ctx, abuf->data, data, len, &abuf->len);
  399. abuf->ready = 1;
  400. // this is the local rtptime when this frame is expected to play
  401. abuf->rtptime = rtptime;
  402. #ifdef __RTP_STORE
  403. fwrite(data, len, 1, ctx->rtpIN);
  404. fwrite(abuf->data, abuf->len, 1, ctx->rtpOUT);
  405. #endif
  406. }
  407. buffer_push_packet(ctx);
  408. pthread_mutex_unlock(&ctx->ab_mutex);
  409. }
  410. /*---------------------------------------------------------------------------*/
  411. // push as many frames as possible through callback
  412. static void buffer_push_packet(rtp_t *ctx) {
  413. abuf_t *curframe = NULL;
  414. u32_t now, playtime, hold = max((ctx->latency * 1000) / (8 * 44100), 100);
  415. int i;
  416. // not ready to play yet
  417. if (!ctx->playing || ctx->synchro.status != (RTP_SYNC | NTP_SYNC)) return;
  418. // maybe re-evaluate time in loop in case data callback blocks ...
  419. now = gettime_ms();
  420. // there is always at least one frame in the buffer
  421. do {
  422. curframe = ctx->audio_buffer + BUFIDX(ctx->ab_read);
  423. playtime = ctx->synchro.time + (((s32_t)(curframe->rtptime - ctx->synchro.rtp)) * 10) / 441;
  424. if (now > playtime) {
  425. LOG_DEBUG("[%p]: discarded frame now:%u missed by %d (W:%hu R:%hu)", ctx, now, now - playtime, ctx->ab_write, ctx->ab_read);
  426. ctx->discarded++;
  427. } else if (curframe->ready) {
  428. ctx->data_cb((const u8_t*) curframe->data, curframe->len);
  429. curframe->ready = 0;
  430. } else if (playtime - now <= hold) {
  431. LOG_DEBUG("[%p]: created zero frame (W:%hu R:%hu)", ctx, ctx->ab_write, ctx->ab_read);
  432. ctx->data_cb(silence_frame, ctx->frame_size * 4);
  433. ctx->silent_frames++;
  434. } else break;
  435. ctx->ab_read++;
  436. ctx->out_frames++;
  437. } while (ctx->ab_write - ctx->ab_read + 1 > 0);
  438. if (ctx->out_frames > 1000) {
  439. LOG_INFO("[%p]: drain [level:%hd gap:%d] [W:%hu R:%hu] [R:%u S:%u F:%u D:%u] (head in %u ms) ",
  440. ctx, ctx->ab_write - ctx->ab_read, playtime - now, ctx->ab_write, ctx->ab_read,
  441. ctx->resent_req, ctx->silent_frames, ctx->filled_frames, ctx->discarded, playtime - now);
  442. ctx->out_frames = 0;
  443. }
  444. LOG_SDEBUG("playtime %u %d [W:%hu R:%hu] %d", playtime, playtime - now, ctx->ab_write, ctx->ab_read, curframe->ready);
  445. // each missing packet will be requested up to (latency_frames / 16) times
  446. for (i = 1; seq_order(ctx->ab_read + i, ctx->ab_write); i += 16) {
  447. abuf_t *frame = ctx->audio_buffer + BUFIDX(ctx->ab_read + i);
  448. if (!frame->ready && now - frame->last_resend > RESEND_TO) {
  449. rtp_request_resend(ctx, ctx->ab_read + i, ctx->ab_read + i);
  450. frame->last_resend = now;
  451. }
  452. }
  453. }
  454. /*---------------------------------------------------------------------------*/
  455. static void *rtp_thread_func(void *arg) {
  456. fd_set fds;
  457. int i, sock = -1;
  458. int count = 0;
  459. bool ntp_sent;
  460. char *packet = malloc(MAX_PACKET);
  461. rtp_t *ctx = (rtp_t*) arg;
  462. for (i = 0; i < 3; i++) {
  463. if (ctx->rtp_sockets[i].sock > sock) sock = ctx->rtp_sockets[i].sock;
  464. // send synchro requets 3 times
  465. ntp_sent = rtp_request_timing(ctx);
  466. }
  467. while (ctx->running) {
  468. ssize_t plen;
  469. char type;
  470. socklen_t rtp_client_len = sizeof(struct sockaddr_storage);
  471. int idx = 0;
  472. char *pktp = packet;
  473. struct timeval timeout = {0, 50*1000};
  474. FD_ZERO(&fds);
  475. for (i = 0; i < 3; i++) { FD_SET(ctx->rtp_sockets[i].sock, &fds); }
  476. if (select(sock + 1, &fds, NULL, NULL, &timeout) <= 0) continue;
  477. for (i = 0; i < 3; i++)
  478. if (FD_ISSET(ctx->rtp_sockets[i].sock, &fds)) idx = i;
  479. plen = recvfrom(ctx->rtp_sockets[idx].sock, packet, MAX_PACKET, 0, (struct sockaddr*) &ctx->rtp_host, &rtp_client_len);
  480. if (!ntp_sent) {
  481. LOG_WARN("[%p]: NTP request not send yet", ctx);
  482. ntp_sent = rtp_request_timing(ctx);
  483. }
  484. if (plen < 0) continue;
  485. assert(plen <= MAX_PACKET);
  486. type = packet[1] & ~0x80;
  487. pktp = packet;
  488. switch (type) {
  489. seq_t seqno;
  490. unsigned rtptime;
  491. // re-sent packet
  492. case 0x56: {
  493. pktp += 4;
  494. plen -= 4;
  495. }
  496. // data packet
  497. case 0x60: {
  498. seqno = ntohs(*(u16_t*)(pktp+2));
  499. rtptime = ntohl(*(u32_t*)(pktp+4));
  500. // adjust pointer and length
  501. pktp += 12;
  502. plen -= 12;
  503. LOG_SDEBUG("[%p]: seqno:%hu rtp:%u (type: %x, first: %u)", ctx, seqno, rtptime, type, packet[1] & 0x80);
  504. // check if packet contains enough content to be reasonable
  505. if (plen < 16) break;
  506. if ((packet[1] & 0x80) && (type != 0x56)) {
  507. LOG_INFO("[%p]: 1st audio packet received", ctx);
  508. }
  509. buffer_put_packet(ctx, seqno, rtptime, packet[1] & 0x80, pktp, plen);
  510. break;
  511. }
  512. // sync packet
  513. case 0x54: {
  514. u32_t rtp_now_latency = ntohl(*(u32_t*)(pktp+4));
  515. u64_t remote = (((u64_t) ntohl(*(u32_t*)(pktp+8))) << 32) + ntohl(*(u32_t*)(pktp+12));
  516. u32_t rtp_now = ntohl(*(u32_t*)(pktp+16));
  517. pthread_mutex_lock(&ctx->ab_mutex);
  518. // re-align timestamp and expected local playback time (and magic 11025 latency)
  519. if (!ctx->latency) ctx->latency = rtp_now - rtp_now_latency + 11025;
  520. ctx->synchro.rtp = rtp_now - ctx->latency;
  521. ctx->synchro.time = ctx->timing.local + (u32_t) NTP2MS(remote - ctx->timing.remote);
  522. // now we are synced on RTP frames
  523. ctx->synchro.status |= RTP_SYNC;
  524. // 1st sync packet received (signals a restart of playback)
  525. if (packet[0] & 0x10) {
  526. LOG_INFO("[%p]: 1st sync packet received", ctx);
  527. }
  528. pthread_mutex_unlock(&ctx->ab_mutex);
  529. LOG_DEBUG("[%p]: sync packet rtp_latency:%u rtp:%u remote ntp:%Lx, local time %u (now:%u)",
  530. ctx, rtp_now_latency, rtp_now, remote, ctx->synchro.time, gettime_ms());
  531. if (!count--) {
  532. rtp_request_timing(ctx);
  533. count = 3;
  534. }
  535. break;
  536. }
  537. // NTP timing packet
  538. case 0x53: {
  539. u64_t expected;
  540. u32_t reference = ntohl(*(u32_t*)(pktp+12)); // only low 32 bits in our case
  541. u64_t remote =(((u64_t) ntohl(*(u32_t*)(pktp+16))) << 32) + ntohl(*(u32_t*)(pktp+20));
  542. u32_t roundtrip = gettime_ms() - reference;
  543. // better discard sync packets when roundtrip is suspicious
  544. if (roundtrip > 100) {
  545. LOG_WARN("[%p]: discarding NTP roundtrip of %u ms", ctx, roundtrip);
  546. break;
  547. }
  548. /*
  549. The expected elapsed remote time should be exactly the same as
  550. elapsed local time between the two request, corrected by the
  551. drifting
  552. */
  553. expected = ctx->timing.remote + MS2NTP(reference - ctx->timing.local);
  554. ctx->timing.remote = remote;
  555. ctx->timing.local = reference;
  556. if (ctx->synchro.status & NTP_SYNC) {
  557. s32_t delta = NTP2MS((s64_t) expected - (s64_t) ctx->timing.remote);
  558. ctx->cmd_cb(RAOP_TIMING, &delta);
  559. }
  560. // now we are synced on NTP (mutex not needed)
  561. ctx->synchro.status |= NTP_SYNC;
  562. LOG_DEBUG("[%p]: Timing references local:%Lu, remote:%Lx (delta:%Ld, sum:%Ld, adjust:%Ld, gaps:%d)",
  563. ctx, ctx->timing.local, ctx->timing.remote);
  564. break;
  565. }
  566. }
  567. }
  568. free(packet);
  569. LOG_INFO("[%p]: terminating", ctx);
  570. #ifndef WIN32
  571. xTaskNotify(ctx->joiner, 0, eNoAction);
  572. vTaskDelete(NULL);
  573. #endif
  574. return NULL;
  575. }
  576. /*---------------------------------------------------------------------------*/
  577. static bool rtp_request_timing(rtp_t *ctx) {
  578. unsigned char req[32];
  579. u32_t now = gettime_ms();
  580. int i;
  581. struct sockaddr_in host;
  582. LOG_DEBUG("[%p]: timing request now:%u (port: %hu)", ctx, now, ctx->rtp_sockets[TIMING].rport);
  583. req[0] = 0x80;
  584. req[1] = 0x52|0x80;
  585. *(u16_t*)(req+2) = htons(7);
  586. *(u32_t*)(req+4) = htonl(0); // dummy
  587. for (i = 0; i < 16; i++) req[i+8] = 0;
  588. *(u32_t*)(req+24) = 0;
  589. *(u32_t*)(req+28) = htonl(now); // this is not a real NTP, but a 32 ms counter in the low part of the NTP
  590. if (ctx->host.s_addr != INADDR_ANY) {
  591. host.sin_family = AF_INET;
  592. host.sin_addr = ctx->host;
  593. } else host = ctx->rtp_host;
  594. // no address from sender, need to wait for 1st packet to be received
  595. if (host.sin_addr.s_addr == INADDR_ANY) return false;
  596. host.sin_port = htons(ctx->rtp_sockets[TIMING].rport);
  597. if (sizeof(req) != sendto(ctx->rtp_sockets[TIMING].sock, req, sizeof(req), 0, (struct sockaddr*) &host, sizeof(host))) {
  598. LOG_WARN("[%p]: SENDTO failed (%s)", ctx, strerror(errno));
  599. }
  600. return true;
  601. }
  602. /*---------------------------------------------------------------------------*/
  603. static bool rtp_request_resend(rtp_t *ctx, seq_t first, seq_t last) {
  604. unsigned char req[8]; // *not* a standard RTCP NACK
  605. // do not request silly ranges (happens in case of network large blackouts)
  606. if (seq_order(last, first) || last - first > BUFFER_FRAMES / 2) return false;
  607. ctx->resent_req += last - first + 1;
  608. LOG_DEBUG("resend request [W:%hu R:%hu first=%hu last=%hu]", ctx->ab_write, ctx->ab_read, first, last);
  609. req[0] = 0x80;
  610. req[1] = 0x55|0x80; // Apple 'resend'
  611. *(u16_t*)(req+2) = htons(1); // our seqnum
  612. *(u16_t*)(req+4) = htons(first); // missed seqnum
  613. *(u16_t*)(req+6) = htons(last-first+1); // count
  614. ctx->rtp_host.sin_port = htons(ctx->rtp_sockets[CONTROL].rport);
  615. if (sizeof(req) != sendto(ctx->rtp_sockets[CONTROL].sock, req, sizeof(req), 0, (struct sockaddr*) &ctx->rtp_host, sizeof(ctx->rtp_host))) {
  616. LOG_WARN("[%p]: SENDTO failed (%s)", ctx, strerror(errno));
  617. }
  618. return true;
  619. }