rtp.c 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. /*
  2. * HairTunes - RAOP packet handler and slave-clocked replay engine
  3. * Copyright (c) James Laird 2011
  4. * All rights reserved.
  5. *
  6. * Modularisation: philippe_44@outlook.com, 2019
  7. *
  8. * Permission is hereby granted, free of charge, to any person
  9. * obtaining a copy of this software and associated documentation
  10. * files (the "Software"), to deal in the Software without
  11. * restriction, including without limitation the rights to use,
  12. * copy, modify, merge, publish, distribute, sublicense, and/or
  13. * sell copies of the Software, and to permit persons to whom the
  14. * Software is furnished to do so, subject to the following conditions:
  15. *
  16. * The above copyright notice and this permission notice shall be
  17. * included in all copies or substantial portions of the Software.
  18. *
  19. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  20. * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
  21. * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  22. * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  23. * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
  24. * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  25. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
  26. * OTHER DEALINGS IN THE SOFTWARE.
  27. */
  28. #include <stdio.h>
  29. #include <stdlib.h>
  30. #include <string.h>
  31. #include <stdarg.h>
  32. #include <sys/types.h>
  33. #include <pthread.h>
  34. #include <math.h>
  35. #include <errno.h>
  36. #include <sys/stat.h>
  37. #include <stdint.h>
  38. #include <fcntl.h>
  39. #include <assert.h>
  40. #include "platform.h"
  41. #include "rtp.h"
  42. #include "log_util.h"
  43. #include "util.h"
  44. #ifdef WIN32
  45. #include <openssl/aes.h>
  46. #include "alac.h"
  47. #else
  48. #include "esp_pthread.h"
  49. #include "esp_system.h"
  50. #include <mbedtls/version.h>
  51. #include <mbedtls/aes.h>
  52. //#include "alac_wrapper.h"
  53. #include "alac.h"
  54. #endif
  55. #define NTP2MS(ntp) ((((ntp) >> 10) * 1000L) >> 22)
  56. #define MS2NTP(ms) (((((u64_t) (ms)) << 22) / 1000) << 10)
  57. #define NTP2TS(ntp, rate) ((((ntp) >> 16) * (rate)) >> 16)
  58. #define TS2NTP(ts, rate) (((((u64_t) (ts)) << 16) / (rate)) << 16)
  59. #define MS2TS(ms, rate) ((((u64_t) (ms)) * (rate)) / 1000)
  60. #define TS2MS(ts, rate) NTP2MS(TS2NTP(ts,rate))
  61. #define GAP_THRES 8
  62. #define GAP_COUNT 20
  63. extern log_level raop_loglevel;
  64. static log_level *loglevel = &raop_loglevel;
  65. //#define __RTP_STORE
  66. // default buffer size
  67. #define BUFFER_FRAMES ( (120 * 44100) / (352 * 100) )
  68. #define MAX_PACKET 1408
  69. #define RTP_SYNC (0x01)
  70. #define NTP_SYNC (0x02)
  71. #define RESEND_TO 200
  72. enum { DATA, CONTROL, TIMING };
  73. static const u8_t silence_frame[MAX_PACKET] = { 0 };
  74. typedef u16_t seq_t;
  75. typedef struct audio_buffer_entry { // decoded audio packets
  76. int ready;
  77. u32_t rtptime, last_resend;
  78. s16_t *data;
  79. int len;
  80. } abuf_t;
  81. typedef struct rtp_s {
  82. #ifdef __RTP_STORE
  83. FILE *rtpIN, *rtpOUT;
  84. #endif
  85. bool running;
  86. unsigned char aesiv[16];
  87. #ifdef WIN32
  88. AES_KEY aes;
  89. #else
  90. mbedtls_aes_context aes;
  91. #endif
  92. bool decrypt, range;
  93. int frame_size, frame_duration;
  94. int in_frames, out_frames;
  95. struct in_addr host;
  96. struct sockaddr_in rtp_host;
  97. struct {
  98. unsigned short rport, lport;
  99. int sock;
  100. } rtp_sockets[3]; // data, control, timing
  101. struct timing_s {
  102. bool drift;
  103. u64_t local, remote;
  104. u32_t count, gap_count;
  105. s64_t gap_sum, gap_adjust;
  106. } timing;
  107. struct {
  108. u32_t rtp, time;
  109. u8_t status;
  110. bool first, required;
  111. } synchro;
  112. struct {
  113. u32_t time;
  114. seq_t seqno;
  115. u32_t rtptime;
  116. } record;
  117. int latency; // rtp hold depth in samples
  118. u32_t resent_frames; // total recovered frames
  119. u32_t silent_frames; // total silence frames
  120. u32_t filled_frames; // silence frames in current silence episode
  121. int skip; // number of frames to skip to keep sync alignement
  122. abuf_t audio_buffer[BUFFER_FRAMES];
  123. seq_t ab_read, ab_write;
  124. pthread_mutex_t ab_mutex;
  125. #ifdef WIN32
  126. pthread_t rtp_thread;
  127. #else
  128. TaskHandle_t rtp_thread, joiner;
  129. #endif
  130. alac_file *alac_codec;
  131. int flush_seqno;
  132. bool playing;
  133. rtp_data_cb_t callback;
  134. } rtp_t;
  135. #define BUFIDX(seqno) ((seq_t)(seqno) % BUFFER_FRAMES)
  136. static void buffer_alloc(abuf_t *audio_buffer, int size);
  137. static void buffer_release(abuf_t *audio_buffer);
  138. static void buffer_reset(abuf_t *audio_buffer);
  139. static void buffer_push_packet(rtp_t *ctx);
  140. static bool rtp_request_resend(rtp_t *ctx, seq_t first, seq_t last);
  141. static bool rtp_request_timing(rtp_t *ctx);
  142. static void* rtp_thread_func(void *arg);
  143. static int seq_order(seq_t a, seq_t b);
  144. /*---------------------------------------------------------------------------*/
  145. static alac_file* alac_init(int fmtp[32]) {
  146. alac_file *alac;
  147. int sample_size = fmtp[3];
  148. if (sample_size != 16) {
  149. LOG_ERROR("sample size must be 16 %d", sample_size);
  150. return false;
  151. }
  152. alac = create_alac(sample_size, 2);
  153. if (!alac) {
  154. LOG_ERROR("cannot create alac codec", NULL);
  155. return NULL;
  156. }
  157. alac->setinfo_max_samples_per_frame = fmtp[1];
  158. alac->setinfo_7a = fmtp[2];
  159. alac->setinfo_sample_size = sample_size;
  160. alac->setinfo_rice_historymult = fmtp[4];
  161. alac->setinfo_rice_initialhistory = fmtp[5];
  162. alac->setinfo_rice_kmodifier = fmtp[6];
  163. alac->setinfo_7f = fmtp[7];
  164. alac->setinfo_80 = fmtp[8];
  165. alac->setinfo_82 = fmtp[9];
  166. alac->setinfo_86 = fmtp[10];
  167. alac->setinfo_8a_rate = fmtp[11];
  168. allocate_buffers(alac);
  169. return alac;
  170. }
  171. /*---------------------------------------------------------------------------*/
  172. rtp_resp_t rtp_init(struct in_addr host, bool sync, bool drift, bool range,
  173. int latency, char *aeskey, char *aesiv, char *fmtpstr,
  174. short unsigned pCtrlPort, short unsigned pTimingPort,
  175. rtp_data_cb_t callback)
  176. {
  177. int i = 0;
  178. char *arg;
  179. int fmtp[12];
  180. bool rc = true;
  181. rtp_t *ctx = calloc(1, sizeof(rtp_t));
  182. rtp_resp_t resp = { 0, 0, 0, NULL };
  183. if (!ctx) return resp;
  184. ctx->host = host;
  185. ctx->decrypt = false;
  186. ctx->callback = callback;
  187. ctx->rtp_host.sin_family = AF_INET;
  188. ctx->rtp_host.sin_addr.s_addr = INADDR_ANY;
  189. pthread_mutex_init(&ctx->ab_mutex, 0);
  190. ctx->flush_seqno = -1;
  191. ctx->latency = latency;
  192. ctx->synchro.required = sync;
  193. ctx->timing.drift = drift;
  194. ctx->range = range;
  195. // write pointer = last written, read pointer = next to read so fill = w-r+1
  196. ctx->ab_read = ctx->ab_write + 1;
  197. #ifdef __RTP_STORE
  198. ctx->rtpIN = fopen("airplay.rtpin", "wb");
  199. ctx->rtpOUT = fopen("airplay.rtpout", "wb");
  200. #endif
  201. ctx->rtp_sockets[CONTROL].rport = pCtrlPort;
  202. ctx->rtp_sockets[TIMING].rport = pTimingPort;
  203. if (aesiv && aeskey) {
  204. memcpy(ctx->aesiv, aesiv, 16);
  205. #ifdef WIN32
  206. AES_set_decrypt_key((unsigned char*) aeskey, 128, &ctx->aes);
  207. #else
  208. memset(&ctx->aes, 0, sizeof(mbedtls_aes_context));
  209. mbedtls_aes_setkey_dec(&ctx->aes, (unsigned char*) aeskey, 128);
  210. #endif
  211. ctx->decrypt = true;
  212. }
  213. memset(fmtp, 0, sizeof(fmtp));
  214. while ((arg = strsep(&fmtpstr, " \t")) != NULL) fmtp[i++] = atoi(arg);
  215. ctx->frame_size = fmtp[1];
  216. ctx->frame_duration = (ctx->frame_size * 1000) / 44100;
  217. // alac decoder
  218. ctx->alac_codec = alac_init(fmtp);
  219. rc &= ctx->alac_codec != NULL;
  220. buffer_alloc(ctx->audio_buffer, ctx->frame_size*4);
  221. // create rtp ports
  222. for (i = 0; i < 3; i++) {
  223. ctx->rtp_sockets[i].sock = bind_socket(&ctx->rtp_sockets[i].lport, SOCK_DGRAM);
  224. rc &= ctx->rtp_sockets[i].sock > 0;
  225. }
  226. // create http port and start listening
  227. resp.cport = ctx->rtp_sockets[CONTROL].lport;
  228. resp.tport = ctx->rtp_sockets[TIMING].lport;
  229. resp.aport = ctx->rtp_sockets[DATA].lport;
  230. if (rc) {
  231. ctx->running = true;
  232. #ifdef WIN32
  233. pthread_create(&ctx->rtp_thread, NULL, rtp_thread_func, (void *) ctx);
  234. #else
  235. xTaskCreate((TaskFunction_t) rtp_thread_func, "RTP_thread", 4096, ctx, configMAX_PRIORITIES - 3, &ctx->rtp_thread);
  236. #endif
  237. } else {
  238. rtp_end(ctx);
  239. ctx = NULL;
  240. }
  241. resp.ctx = ctx;
  242. return resp;
  243. }
  244. /*---------------------------------------------------------------------------*/
  245. void rtp_end(rtp_t *ctx)
  246. {
  247. int i;
  248. if (!ctx) return;
  249. if (ctx->running) {
  250. ctx->running = false;
  251. #ifdef WIN32
  252. pthread_join(ctx->rtp_thread, NULL);
  253. #else
  254. ctx->joiner = xTaskGetCurrentTaskHandle();
  255. xTaskNotifyWait(0, 0, NULL, portMAX_DELAY);
  256. #endif
  257. }
  258. for (i = 0; i < 3; i++) shutdown_socket(ctx->rtp_sockets[i].sock);
  259. delete_alac(ctx->alac_codec);
  260. buffer_release(ctx->audio_buffer);
  261. free(ctx);
  262. #ifdef __RTP_STORE
  263. fclose(ctx->rtpIN);
  264. fclose(ctx->rtpOUT);
  265. #endif
  266. }
  267. /*---------------------------------------------------------------------------*/
  268. bool rtp_flush(rtp_t *ctx, unsigned short seqno, unsigned int rtptime)
  269. {
  270. bool rc = true;
  271. u32_t now = gettime_ms();
  272. if (now < ctx->record.time + 250 || (ctx->record.seqno == seqno && ctx->record.rtptime == rtptime)) {
  273. rc = false;
  274. LOG_ERROR("[%p]: FLUSH ignored as same as RECORD (%hu - %u)", ctx, seqno, rtptime);
  275. } else {
  276. pthread_mutex_lock(&ctx->ab_mutex);
  277. buffer_reset(ctx->audio_buffer);
  278. ctx->playing = false;
  279. ctx->flush_seqno = seqno;
  280. ctx->synchro.first = false;
  281. pthread_mutex_unlock(&ctx->ab_mutex);
  282. }
  283. LOG_INFO("[%p]: flush %hu %u", ctx, seqno, rtptime);
  284. return rc;
  285. }
  286. /*---------------------------------------------------------------------------*/
  287. void rtp_record(rtp_t *ctx, unsigned short seqno, unsigned rtptime)
  288. {
  289. ctx->record.seqno = seqno;
  290. ctx->record.rtptime = rtptime;
  291. ctx->record.time = gettime_ms();
  292. LOG_INFO("[%p]: record %hu %u", ctx, seqno, rtptime);
  293. }
  294. /*---------------------------------------------------------------------------*/
  295. static void buffer_alloc(abuf_t *audio_buffer, int size) {
  296. int i;
  297. for (i = 0; i < BUFFER_FRAMES; i++) {
  298. audio_buffer[i].data = malloc(size);
  299. audio_buffer[i].ready = 0;
  300. }
  301. }
  302. /*---------------------------------------------------------------------------*/
  303. static void buffer_release(abuf_t *audio_buffer) {
  304. int i;
  305. for (i = 0; i < BUFFER_FRAMES; i++) {
  306. free(audio_buffer[i].data);
  307. }
  308. }
  309. /*---------------------------------------------------------------------------*/
  310. static void buffer_reset(abuf_t *audio_buffer) {
  311. int i;
  312. for (i = 0; i < BUFFER_FRAMES; i++) audio_buffer[i].ready = 0;
  313. }
  314. /*---------------------------------------------------------------------------*/
  315. // the sequence numbers will wrap pretty often.
  316. // this returns true if the second arg is after the first
  317. static int seq_order(seq_t a, seq_t b) {
  318. s16_t d = b - a;
  319. return d > 0;
  320. }
  321. /*---------------------------------------------------------------------------*/
  322. static void alac_decode(rtp_t *ctx, s16_t *dest, char *buf, int len, int *outsize) {
  323. unsigned char packet[MAX_PACKET];
  324. unsigned char iv[16];
  325. int aeslen;
  326. assert(len<=MAX_PACKET);
  327. if (ctx->decrypt) {
  328. aeslen = len & ~0xf;
  329. memcpy(iv, ctx->aesiv, sizeof(iv));
  330. #ifdef WIN32
  331. AES_cbc_encrypt((unsigned char*)buf, packet, aeslen, &ctx->aes, iv, AES_DECRYPT);
  332. #else
  333. mbedtls_aes_crypt_cbc(&ctx->aes, MBEDTLS_AES_DECRYPT, aeslen, iv, (unsigned char*) buf, packet);
  334. #endif
  335. memcpy(packet+aeslen, buf+aeslen, len-aeslen);
  336. decode_frame(ctx->alac_codec, packet, dest, outsize);
  337. } else decode_frame(ctx->alac_codec, (unsigned char*) buf, dest, outsize);
  338. }
  339. /*---------------------------------------------------------------------------*/
  340. static void buffer_put_packet(rtp_t *ctx, seq_t seqno, unsigned rtptime, bool first, char *data, int len) {
  341. abuf_t *abuf = NULL;
  342. pthread_mutex_lock(&ctx->ab_mutex);
  343. if (!ctx->playing) {
  344. if ((ctx->flush_seqno == -1 || seq_order(ctx->flush_seqno, seqno)) &&
  345. ((ctx->synchro.required && ctx->synchro.first) || !ctx->synchro.required)) {
  346. ctx->ab_write = seqno-1;
  347. ctx->ab_read = seqno;
  348. ctx->skip = 0;
  349. ctx->flush_seqno = -1;
  350. ctx->playing = true;
  351. ctx->synchro.first = false;
  352. ctx->resent_frames = ctx->silent_frames = 0;
  353. } else {
  354. pthread_mutex_unlock(&ctx->ab_mutex);
  355. return;
  356. }
  357. }
  358. if (seqno == ctx->ab_write+1) {
  359. // expected packet
  360. abuf = ctx->audio_buffer + BUFIDX(seqno);
  361. ctx->ab_write = seqno;
  362. LOG_SDEBUG("packet expected seqno:%hu rtptime:%u (W:%hu R:%hu)", seqno, rtptime, ctx->ab_write, ctx->ab_read);
  363. } else if (seq_order(ctx->ab_write, seqno)) {
  364. // newer than expected
  365. if (seqno - ctx->ab_write - 1 > ctx->latency / ctx->frame_size) {
  366. // only get rtp latency-1 frames back (last one is seqno)
  367. LOG_WARN("[%p] too many missing frames %hu", ctx, seqno - ctx->ab_write - 1);
  368. ctx->ab_write = seqno - ctx->latency / ctx->frame_size;
  369. }
  370. if (seqno - ctx->ab_read + 1 > ctx->latency / ctx->frame_size) {
  371. // if ab_read is lagging more than http latency, advance it
  372. LOG_WARN("[%p] on hold for too long %hu", ctx, seqno - ctx->ab_read + 1);
  373. ctx->ab_read = seqno - ctx->latency / ctx->frame_size + 1;
  374. }
  375. if (rtp_request_resend(ctx, ctx->ab_write + 1, seqno-1)) {
  376. seq_t i;
  377. u32_t now = gettime_ms();
  378. for (i = ctx->ab_write + 1; i <= seqno-1; i++) {
  379. ctx->audio_buffer[BUFIDX(i)].rtptime = rtptime - (seqno-i)*ctx->frame_size;
  380. ctx->audio_buffer[BUFIDX(i)].last_resend = now;
  381. }
  382. }
  383. LOG_DEBUG("[%p]: packet newer seqno:%hu rtptime:%u (W:%hu R:%hu)", ctx, seqno, rtptime, ctx->ab_write, ctx->ab_read);
  384. abuf = ctx->audio_buffer + BUFIDX(seqno);
  385. ctx->ab_write = seqno;
  386. } else if (seq_order(ctx->ab_read, seqno + 1)) {
  387. // recovered packet, not yet sent
  388. abuf = ctx->audio_buffer + BUFIDX(seqno);
  389. LOG_DEBUG("[%p]: packet recovered seqno:%hu rtptime:%u (W:%hu R:%hu)", ctx, seqno, rtptime, ctx->ab_write, ctx->ab_read);
  390. } else {
  391. // too late
  392. LOG_DEBUG("[%p]: packet too late seqno:%hu rtptime:%u (W:%hu R:%hu)", ctx, seqno, rtptime, ctx->ab_write, ctx->ab_read);
  393. }
  394. if (ctx->in_frames++ > 1000) {
  395. LOG_INFO("[%p]: fill [level:%hd] [W:%hu R:%hu]", ctx, (seq_t) (ctx->ab_write - ctx->ab_read + 1), ctx->ab_write, ctx->ab_read);
  396. ctx->in_frames = 0;
  397. }
  398. if (abuf) {
  399. alac_decode(ctx, abuf->data, data, len, &abuf->len);
  400. abuf->ready = 1;
  401. // this is the local rtptime when this frame is expected to play
  402. abuf->rtptime = rtptime;
  403. #ifdef __RTP_STORE
  404. fwrite(data, len, 1, ctx->rtpIN);
  405. fwrite(abuf->data, abuf->len, 1, ctx->rtpOUT);
  406. #endif
  407. }
  408. buffer_push_packet(ctx);
  409. pthread_mutex_unlock(&ctx->ab_mutex);
  410. }
  411. /*---------------------------------------------------------------------------*/
  412. // push as many frames as possible through callback
  413. static void buffer_push_packet(rtp_t *ctx) {
  414. abuf_t *curframe = NULL;
  415. u32_t now, playtime;
  416. int i;
  417. // not ready to play yet
  418. if (!ctx->playing || ctx->synchro.status != (RTP_SYNC | NTP_SYNC)) return;
  419. // maybe re-evaluate time in loop in case data callback blocks ...
  420. now = gettime_ms();
  421. // there is always at least one frame in the buffer
  422. do {
  423. curframe = ctx->audio_buffer + BUFIDX(ctx->ab_read);
  424. playtime = ctx->synchro.time + (((s32_t)(curframe->rtptime - ctx->synchro.rtp)) * 1000) / 44100;
  425. /*
  426. if (now > playtime + ctx->frame_duration) {
  427. //LOG_INFO("[%p]: discarded frame (W:%hu R:%hu)", ctx, ctx->ab_write, ctx->ab_read);
  428. } else if (curframe->ready) {
  429. ctx->callback((const u8_t*) curframe->data, curframe->len);
  430. curframe->ready = 0;
  431. } else if (now >= playtime) {
  432. LOG_DEBUG("[%p]: created zero frame (W:%hu R:%hu)", ctx, ctx->ab_write, ctx->ab_read);
  433. ctx->callback(silence_frame, ctx->frame_size * 4);
  434. ctx->silent_frames++;
  435. } else break;
  436. */
  437. if (curframe->ready) {
  438. ctx->callback((const u8_t*) curframe->data, curframe->len);
  439. curframe->ready = 0;
  440. } else if (now >= playtime) {
  441. LOG_DEBUG("[%p]: created zero frame (W:%hu R:%hu)", ctx, ctx->ab_write, ctx->ab_read);
  442. ctx->callback(silence_frame, ctx->frame_size * 4);
  443. ctx->silent_frames++;
  444. } else break;
  445. ctx->ab_read++;
  446. ctx->out_frames++;
  447. } while (ctx->ab_write - ctx->ab_read + 1 > 0);
  448. if (ctx->out_frames > 1000) {
  449. LOG_INFO("[%p]: drain [level:%hd gap:%d] [W:%hu R:%hu] [R:%u S:%u F:%u]",
  450. ctx, ctx->ab_write - ctx->ab_read, playtime - now, ctx->ab_write, ctx->ab_read,
  451. ctx->resent_frames, ctx->silent_frames, ctx->filled_frames);
  452. ctx->out_frames = 0;
  453. }
  454. LOG_SDEBUG("playtime %u %d [W:%hu R:%hu] %d", playtime, playtime - now, ctx->ab_write, ctx->ab_read, curframe->ready);
  455. // each missing packet will be requested up to (latency_frames / 16) times
  456. for (i = 16; seq_order(ctx->ab_read + i, ctx->ab_write); i += 16) {
  457. abuf_t *frame = ctx->audio_buffer + BUFIDX(ctx->ab_read + i);
  458. if (!frame->ready && now - frame->last_resend > RESEND_TO) {
  459. rtp_request_resend(ctx, ctx->ab_read + i, ctx->ab_read + i);
  460. frame->last_resend = now;
  461. }
  462. }
  463. }
  464. /*---------------------------------------------------------------------------*/
  465. static void *rtp_thread_func(void *arg) {
  466. fd_set fds;
  467. int i, sock = -1;
  468. int count = 0;
  469. bool ntp_sent;
  470. char *packet = malloc(MAX_PACKET);
  471. rtp_t *ctx = (rtp_t*) arg;
  472. for (i = 0; i < 3; i++) {
  473. if (ctx->rtp_sockets[i].sock > sock) sock = ctx->rtp_sockets[i].sock;
  474. // send synchro requets 3 times
  475. ntp_sent = rtp_request_timing(ctx);
  476. }
  477. while (ctx->running) {
  478. ssize_t plen;
  479. char type;
  480. socklen_t rtp_client_len = sizeof(struct sockaddr_storage);
  481. int idx = 0;
  482. char *pktp = packet;
  483. struct timeval timeout = {0, 50*1000};
  484. FD_ZERO(&fds);
  485. for (i = 0; i < 3; i++) { FD_SET(ctx->rtp_sockets[i].sock, &fds); }
  486. if (select(sock + 1, &fds, NULL, NULL, &timeout) <= 0) continue;
  487. for (i = 0; i < 3; i++)
  488. if (FD_ISSET(ctx->rtp_sockets[i].sock, &fds)) idx = i;
  489. plen = recvfrom(ctx->rtp_sockets[idx].sock, packet, MAX_PACKET, 0, (struct sockaddr*) &ctx->rtp_host, &rtp_client_len);
  490. if (!ntp_sent) {
  491. LOG_WARN("[%p]: NTP request not send yet", ctx);
  492. ntp_sent = rtp_request_timing(ctx);
  493. }
  494. if (plen < 0) continue;
  495. assert(plen <= MAX_PACKET);
  496. type = packet[1] & ~0x80;
  497. pktp = packet;
  498. switch (type) {
  499. seq_t seqno;
  500. unsigned rtptime;
  501. // re-sent packet
  502. case 0x56: {
  503. pktp += 4;
  504. plen -= 4;
  505. }
  506. // data packet
  507. case 0x60: {
  508. seqno = ntohs(*(u16_t*)(pktp+2));
  509. rtptime = ntohl(*(u32_t*)(pktp+4));
  510. // adjust pointer and length
  511. pktp += 12;
  512. plen -= 12;
  513. LOG_SDEBUG("[%p]: seqno:%hu rtp:%u (type: %x, first: %u)", ctx, seqno, rtptime, type, packet[1] & 0x80);
  514. // check if packet contains enough content to be reasonable
  515. if (plen < 16) break;
  516. if ((packet[1] & 0x80) && (type != 0x56)) {
  517. LOG_INFO("[%p]: 1st audio packet received", ctx);
  518. }
  519. buffer_put_packet(ctx, seqno, rtptime, packet[1] & 0x80, pktp, plen);
  520. break;
  521. }
  522. // sync packet
  523. case 0x54: {
  524. u32_t rtp_now_latency = ntohl(*(u32_t*)(pktp+4));
  525. u64_t remote = (((u64_t) ntohl(*(u32_t*)(pktp+8))) << 32) + ntohl(*(u32_t*)(pktp+12));
  526. u32_t rtp_now = ntohl(*(u32_t*)(pktp+16));
  527. pthread_mutex_lock(&ctx->ab_mutex);
  528. // re-align timestamp and expected local playback time
  529. if (!ctx->latency) ctx->latency = rtp_now - rtp_now_latency;
  530. ctx->synchro.rtp = rtp_now - ctx->latency;
  531. ctx->synchro.time = ctx->timing.local + (u32_t) NTP2MS(remote - ctx->timing.remote);
  532. // now we are synced on RTP frames
  533. ctx->synchro.status |= RTP_SYNC;
  534. // 1st sync packet received (signals a restart of playback)
  535. if (packet[0] & 0x10) {
  536. ctx->synchro.first = true;
  537. LOG_INFO("[%p]: 1st sync packet received", ctx);
  538. }
  539. pthread_mutex_unlock(&ctx->ab_mutex);
  540. LOG_DEBUG("[%p]: sync packet rtp_latency:%u rtp:%u remote ntp:%Lx, local time %u (now:%u)",
  541. ctx, rtp_now_latency, rtp_now, remote, ctx->synchro.time, gettime_ms());
  542. if (!count--) {
  543. rtp_request_timing(ctx);
  544. count = 3;
  545. }
  546. break;
  547. }
  548. // NTP timing packet
  549. case 0x53: {
  550. u64_t expected;
  551. s64_t delta = 0;
  552. u32_t reference = ntohl(*(u32_t*)(pktp+12)); // only low 32 bits in our case
  553. u64_t remote =(((u64_t) ntohl(*(u32_t*)(pktp+16))) << 32) + ntohl(*(u32_t*)(pktp+20));
  554. u32_t roundtrip = gettime_ms() - reference;
  555. // better discard sync packets when roundtrip is suspicious
  556. if (roundtrip > 100) {
  557. LOG_WARN("[%p]: discarding NTP roundtrip of %u ms", ctx, roundtrip);
  558. break;
  559. }
  560. /*
  561. The expected elapsed remote time should be exactly the same as
  562. elapsed local time between the two request, corrected by the
  563. drifting
  564. */
  565. expected = ctx->timing.remote + MS2NTP(reference - ctx->timing.local);
  566. ctx->timing.remote = remote;
  567. ctx->timing.local = reference;
  568. ctx->timing.count++;
  569. if (!ctx->timing.drift && (ctx->synchro.status & NTP_SYNC)) {
  570. delta = NTP2MS((s64_t) expected - (s64_t) ctx->timing.remote);
  571. ctx->timing.gap_sum += delta;
  572. pthread_mutex_lock(&ctx->ab_mutex);
  573. /*
  574. if expected time is more than remote, then our time is
  575. running faster and we are transmitting frames too quickly,
  576. so we'll run out of frames, need to add one
  577. */
  578. if (ctx->timing.gap_sum > GAP_THRES && ctx->timing.gap_count++ > GAP_COUNT) {
  579. LOG_INFO("[%p]: Sending packets too fast %Ld [W:%hu R:%hu]", ctx, ctx->timing.gap_sum, ctx->ab_write, ctx->ab_read);
  580. ctx->ab_read--;
  581. ctx->audio_buffer[BUFIDX(ctx->ab_read)].ready = 1;
  582. ctx->timing.gap_sum -= GAP_THRES;
  583. ctx->timing.gap_adjust -= GAP_THRES;
  584. /*
  585. if expected time is less than remote, then our time is
  586. running slower and we are transmitting frames too slowly,
  587. so we'll overflow frames buffer, need to remove one
  588. */
  589. } else if (ctx->timing.gap_sum < -GAP_THRES && ctx->timing.gap_count++ > GAP_COUNT) {
  590. if (seq_order(ctx->ab_read, ctx->ab_write + 1)) {
  591. ctx->audio_buffer[BUFIDX(ctx->ab_read)].ready = 0;
  592. ctx->ab_read++;
  593. } else ctx->skip++;
  594. ctx->timing.gap_sum += GAP_THRES;
  595. ctx->timing.gap_adjust += GAP_THRES;
  596. LOG_INFO("[%p]: Sending packets too slow %Ld (skip: %d) [W:%hu R:%hu]", ctx, ctx->timing.gap_sum, ctx->skip, ctx->ab_write, ctx->ab_read);
  597. }
  598. if (llabs(ctx->timing.gap_sum) < 8) ctx->timing.gap_count = 0;
  599. pthread_mutex_unlock(&ctx->ab_mutex);
  600. }
  601. // now we are synced on NTP (mutex not needed)
  602. ctx->synchro.status |= NTP_SYNC;
  603. LOG_DEBUG("[%p]: Timing references local:%Lu, remote:%Lx (delta:%Ld, sum:%Ld, adjust:%Ld, gaps:%d)",
  604. ctx, ctx->timing.local, ctx->timing.remote, delta, ctx->timing.gap_sum, ctx->timing.gap_adjust, ctx->timing.gap_count);
  605. break;
  606. }
  607. }
  608. }
  609. free(packet);
  610. LOG_INFO("[%p]: terminating", ctx);
  611. #ifndef WIN32
  612. xTaskNotify(ctx->joiner, 0, eNoAction);
  613. vTaskDelete(NULL);
  614. #endif
  615. return NULL;
  616. }
  617. /*---------------------------------------------------------------------------*/
  618. static bool rtp_request_timing(rtp_t *ctx) {
  619. unsigned char req[32];
  620. u32_t now = gettime_ms();
  621. int i;
  622. struct sockaddr_in host;
  623. LOG_DEBUG("[%p]: timing request now:%u (port: %hu)", ctx, now, ctx->rtp_sockets[TIMING].rport);
  624. req[0] = 0x80;
  625. req[1] = 0x52|0x80;
  626. *(u16_t*)(req+2) = htons(7);
  627. *(u32_t*)(req+4) = htonl(0); // dummy
  628. for (i = 0; i < 16; i++) req[i+8] = 0;
  629. *(u32_t*)(req+24) = 0;
  630. *(u32_t*)(req+28) = htonl(now); // this is not a real NTP, but a 32 ms counter in the low part of the NTP
  631. if (ctx->host.s_addr != INADDR_ANY) {
  632. host.sin_family = AF_INET;
  633. host.sin_addr = ctx->host;
  634. } else host = ctx->rtp_host;
  635. // no address from sender, need to wait for 1st packet to be received
  636. if (host.sin_addr.s_addr == INADDR_ANY) return false;
  637. host.sin_port = htons(ctx->rtp_sockets[TIMING].rport);
  638. if (sizeof(req) != sendto(ctx->rtp_sockets[TIMING].sock, req, sizeof(req), 0, (struct sockaddr*) &host, sizeof(host))) {
  639. LOG_WARN("[%p]: SENDTO failed (%s)", ctx, strerror(errno));
  640. }
  641. return true;
  642. }
  643. /*---------------------------------------------------------------------------*/
  644. static bool rtp_request_resend(rtp_t *ctx, seq_t first, seq_t last) {
  645. unsigned char req[8]; // *not* a standard RTCP NACK
  646. // do not request silly ranges (happens in case of network large blackouts)
  647. if (seq_order(last, first) || last - first > BUFFER_FRAMES / 2) return false;
  648. ctx->resent_frames += last - first + 1;
  649. LOG_DEBUG("resend request [W:%hu R:%hu first=%hu last=%hu]", ctx->ab_write, ctx->ab_read, first, last);
  650. req[0] = 0x80;
  651. req[1] = 0x55|0x80; // Apple 'resend'
  652. *(u16_t*)(req+2) = htons(1); // our seqnum
  653. *(u16_t*)(req+4) = htons(first); // missed seqnum
  654. *(u16_t*)(req+6) = htons(last-first+1); // count
  655. ctx->rtp_host.sin_port = htons(ctx->rtp_sockets[CONTROL].rport);
  656. if (sizeof(req) != sendto(ctx->rtp_sockets[CONTROL].sock, req, sizeof(req), 0, (struct sockaddr*) &ctx->rtp_host, sizeof(ctx->rtp_host))) {
  657. LOG_WARN("[%p]: SENDTO failed (%s)", ctx, strerror(errno));
  658. }
  659. return true;
  660. }
#if 0
/*---------------------------------------------------------------------------*/
/*
 * Disabled (compiled out) pull-style frame getter, kept for reference only:
 * the active code pushes frames through the data callback from
 * buffer_push_packet() instead.
 *
 * Returns the data of the frame at ab_read and its size through 'len', or
 * NULL when not playing, not synchronised, or when the frame is missing but
 * not yet due. A missing frame that is due is zero-filled and returned as
 * silence.
 */
// get the next frame, when available. return 0 if underrun/stream reset.
static short *_buffer_get_frame(rtp_t *ctx, int *len) {
	short buf_fill;
	abuf_t *curframe = 0;
	int i;
	u32_t now, playtime;

	if (!ctx->playing) return NULL;

	// skip frames if we are running late and skip could not be done in SYNC
	while (ctx->skip && seq_order(ctx->ab_read, ctx->ab_write + 1)) {
		ctx->audio_buffer[BUFIDX(ctx->ab_read)].ready = 0;
		ctx->ab_read++;
		ctx->skip--;
		LOG_INFO("[%p]: Sending packets too slow (skip: %d) [W:%hu R:%hu]", ctx, ctx->skip, ctx->ab_write, ctx->ab_read);
	}

	buf_fill = ctx->ab_write - ctx->ab_read + 1;

	// on overrun, move the read pointer forward so only BUFFER_FRAMES - 63 frames remain
	if (buf_fill >= BUFFER_FRAMES) {
		LOG_ERROR("[%p]: Buffer overrun %hu", ctx, buf_fill);
		ctx->ab_read = ctx->ab_write - (BUFFER_FRAMES - 64);
		buf_fill = ctx->ab_write - ctx->ab_read + 1;
	}

	now = gettime_ms();
	curframe = ctx->audio_buffer + BUFIDX(ctx->ab_read);

	// use next frame when buffer is empty or silence continues to be sent
	if (!buf_fill) curframe->rtptime = ctx->audio_buffer[BUFIDX(ctx->ab_read - 1)].rtptime + ctx->frame_size;

	playtime = ctx->synchro.time + (((s32_t)(curframe->rtptime - ctx->synchro.rtp))*1000)/44100;

	LOG_SDEBUG("playtime %u %d [W:%hu R:%hu] %d", playtime, playtime - now, ctx->ab_write, ctx->ab_read, curframe->ready);

	// wait if not ready but have time, otherwise send silence
	if (!buf_fill || ctx->synchro.status != (RTP_SYNC | NTP_SYNC) || (now < playtime && !curframe->ready)) {
		LOG_SDEBUG("[%p]: waiting (fill:%hd, W:%hu R:%hu) now:%u, playtime:%u, wait:%d", ctx, buf_fill, ctx->ab_write, ctx->ab_read, now, playtime, playtime - now);

		// look for "blocking" frames at the top of the queue and try to catch-up
		for (i = 0; i < min(16, buf_fill); i++) {
			abuf_t *frame = ctx->audio_buffer + BUFIDX(ctx->ab_read + i);
			if (!frame->ready && now - frame->last_resend > RESEND_TO) {
				rtp_request_resend(ctx, ctx->ab_read + i, ctx->ab_read + i);
				frame->last_resend = now;
			}
		}

		return NULL;
	}

	// when silence is inserted at the top, need to move write pointer
	if (!buf_fill) {
		if (!ctx->filled_frames) {
			LOG_WARN("[%p]: start silence (late %d ms) [W:%hu R:%hu]", ctx, now - playtime, ctx->ab_write, ctx->ab_read);
		}
		ctx->ab_write++;
		ctx->filled_frames++;
	} else ctx->filled_frames = 0;

	// log the drain statistics once every 512 delivered frames
	if (!(ctx->out_frames++ & 0x1ff)) {
		LOG_INFO("[%p]: drain [level:%hd gap:%d] [W:%hu R:%hu] [R:%u S:%u F:%u]",
				ctx, buf_fill-1, playtime - now, ctx->ab_write, ctx->ab_read,
				ctx->resent_frames, ctx->silent_frames, ctx->filled_frames);
	}

	// each missing packet will be requested up to (latency_frames / 16) times
	for (i = 16; seq_order(ctx->ab_read + i, ctx->ab_write); i += 16) {
		abuf_t *frame = ctx->audio_buffer + BUFIDX(ctx->ab_read + i);
		if (!frame->ready && now - frame->last_resend > RESEND_TO) {
			rtp_request_resend(ctx, ctx->ab_read + i, ctx->ab_read + i);
			frame->last_resend = now;
		}
	}

	if (!curframe->ready) {
		LOG_DEBUG("[%p]: created zero frame (W:%hu R:%hu)", ctx, ctx->ab_write, ctx->ab_read);
		memset(curframe->data, 0, ctx->frame_size*4);
		curframe->len = ctx->frame_size * 4;
		ctx->silent_frames++;
	} else {
		LOG_SDEBUG("[%p]: prepared frame (fill:%hd, W:%hu R:%hu)", ctx, buf_fill-1, ctx->ab_write, ctx->ab_read);
	}

	*len = curframe->len;
	curframe->ready = 0;
	ctx->ab_read++;

	return curframe->data;
}
#endif