stream.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624
  1. /*
  2. * Squeezelite - lightweight headless squeezebox emulator
  3. *
  4. * (c) Adrian Smith 2012-2015, triode1@btinternet.com
  5. * Ralph Irving 2015-2017, ralph_irving@hotmail.com
  6. *
  7. * This program is free software: you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation, either version 3 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. *
  20. */
  21. // stream thread
  22. #ifndef _GNU_SOURCE
  23. #define _GNU_SOURCE
  24. #endif
  25. #include "squeezelite.h"
  26. #include <fcntl.h>
  27. #if USE_SSL
  28. #include "openssl/ssl.h"
  29. #include "openssl/err.h"
  30. #endif
  31. #if SUN
  32. #include <signal.h>
  33. #endif
  34. static log_level loglevel;
  35. static struct buffer buf;
  36. struct buffer *streambuf = &buf;
  37. #define LOCK mutex_lock(streambuf->mutex)
  38. #define UNLOCK mutex_unlock(streambuf->mutex)
  /*
  After a lot of hesitation, I've added this "poll mutex" to prevent a
  socket from being allocated while we are still in poll(). The issue
  happens if we have a close quickly followed by an open: we might still
  be in the poll(), and simple OSes fail when they re-allocate the same
  socket on which a thread is still waiting.
  Ideally, you would take the lock in the disconnect(), but that would mean
  that very often we'd have to wait for the end of the poll(), i.e. up to
  100ms, for nothing most of the time. Taking it in the open() instead is
  less elegant, as closing a socket on which there is a poll() is not good,
  but it is more efficient, as it is very rare that you'd have an open() less
  than 100ms after a close().
  */
  /*
   * LOCK_L / UNLOCK_L bracket the poll() call in the stream thread and the
   * socket() call in stream_sock(). Only EMBEDDED builds have a real mutex
   * behind them; everywhere else both macros expand to nothing.
   */
  #if !EMBEDDED
  #define LOCK_L
  #define UNLOCK_L
  #else
  static mutex_type poll_mutex;
  #define LOCK_L mutex_lock(poll_mutex)
  #define UNLOCK_L mutex_unlock(poll_mutex)
  #endif
  60. static sockfd fd;
  61. struct streamstate stream;
  62. #if USE_SSL
  63. static SSL_CTX *SSLctx;
  64. SSL *ssl;
  65. #endif
  66. #if !USE_SSL
  67. #define _recv(ssl, fc, buf, n, opt) recv(fd, buf, n, opt)
  68. #define _send(ssl, fd, buf, n, opt) send(fd, buf, n, opt)
  69. #define _poll(ssl, pollinfo, timeout) poll(pollinfo, 1, timeout)
  70. #define _last_error() last_error()
  71. #else
  72. #define _last_error() ERROR_WOULDBLOCK
  73. static int _recv(SSL *ssl, int fd, void *buffer, size_t bytes, int options) {
  74. int n;
  75. if (!ssl) return recv(fd, buffer, bytes, options);
  76. n = SSL_read(ssl, (u8_t*) buffer, bytes);
  77. if (n <= 0 && SSL_get_error(ssl, n) == SSL_ERROR_ZERO_RETURN) return 0;
  78. return n;
  79. }
  80. static int _send(SSL *ssl, int fd, void *buffer, size_t bytes, int options) {
  81. int n;
  82. if (!ssl) return send(fd, buffer, bytes, options);
  83. while (1) {
  84. int err;
  85. ERR_clear_error();
  86. if ((n = SSL_write(ssl, (u8_t*) buffer, bytes)) >= 0) return n;
  87. err = SSL_get_error(ssl, n);
  88. if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) continue;
  89. LOG_INFO("SSL write error %d", err );
  90. return n;
  91. }
  92. }
  93. /*
  94. can't mimic exactly poll as SSL is a real pain. Even if SSL_pending returns
  95. 0, there might be bytes to read but when select (poll) return > 0, there might
  96. be no frame available. As well select (poll) < 0 does not mean that there is
  97. no data pending
  98. */
  99. static int _poll(SSL *ssl, struct pollfd *pollinfo, int timeout) {
  100. if (!ssl) return poll(pollinfo, 1, timeout);
  101. if (pollinfo->events & POLLIN && SSL_pending(ssl)) {
  102. if (pollinfo->events & POLLOUT) poll(pollinfo, 1, 0);
  103. pollinfo->revents = POLLIN;
  104. return 1;
  105. }
  106. return poll(pollinfo, 1, timeout);
  107. }
  108. #endif
  109. static bool send_header(void) {
  110. char *ptr = stream.header;
  111. int len = stream.header_len;
  112. unsigned try = 0;
  113. ssize_t n;
  114. while (len) {
  115. n = _send(ssl, fd, ptr, len, MSG_NOSIGNAL);
  116. if (n <= 0) {
  117. if (n < 0 && _last_error() == ERROR_WOULDBLOCK && try < 10) {
  118. LOG_SDEBUG("retrying (%d) writing to socket", ++try);
  119. usleep(1000);
  120. continue;
  121. }
  122. LOG_INFO("failed writing to socket: %s", strerror(last_error()));
  123. stream.disconnect = LOCAL_DISCONNECT;
  124. stream.state = DISCONNECT;
  125. wake_controller();
  126. return false;
  127. }
  128. LOG_SDEBUG("wrote %d bytes to socket", n);
  129. ptr += n;
  130. len -= n;
  131. }
  132. LOG_SDEBUG("wrote header");
  133. return true;
  134. }
  135. static bool running = true;
  136. static void _disconnect(stream_state state, disconnect_code disconnect) {
  137. stream.state = state;
  138. stream.disconnect = disconnect;
  139. #if USE_SSL
  140. if (ssl) {
  141. SSL_shutdown(ssl);
  142. SSL_free(ssl);
  143. ssl = NULL;
  144. }
  145. #endif
  146. closesocket(fd);
  147. fd = -1;
  148. wake_controller();
  149. }
  150. static void *stream_thread() {
  151. while (running) {
  152. struct pollfd pollinfo;
  153. size_t space;
  154. LOCK;
  155. space = min(_buf_space(streambuf), _buf_cont_write(streambuf));
  156. if (fd < 0 || !space || stream.state <= STREAMING_WAIT) {
  157. UNLOCK;
  158. usleep(space ? 100000 : 25000);
  159. continue;
  160. }
  161. if (stream.state == STREAMING_FILE) {
  162. int n = read(fd, streambuf->writep, space);
  163. if (n == 0) {
  164. LOG_INFO("end of stream");
  165. _disconnect(DISCONNECT, DISCONNECT_OK);
  166. }
  167. if (n > 0) {
  168. _buf_inc_writep(streambuf, n);
  169. stream.bytes += n;
  170. LOG_SDEBUG("streambuf read %d bytes", n);
  171. }
  172. if (n < 0) {
  173. LOG_WARN("error reading: %s", strerror(last_error()));
  174. _disconnect(DISCONNECT, REMOTE_DISCONNECT);
  175. }
  176. UNLOCK;
  177. continue;
  178. } else {
  179. LOCK_L;
  180. pollinfo.fd = fd;
  181. pollinfo.events = POLLIN;
  182. if (stream.state == SEND_HEADERS) {
  183. pollinfo.events |= POLLOUT;
  184. }
  185. }
  186. UNLOCK;
  187. if (_poll(ssl, &pollinfo, 100)) {
  188. UNLOCK_L;
  189. LOCK;
  190. // check socket has not been closed while in poll
  191. if (fd < 0) {
  192. UNLOCK;
  193. continue;
  194. }
  195. if ((pollinfo.revents & POLLOUT) && stream.state == SEND_HEADERS) {
  196. if (send_header()) stream.state = RECV_HEADERS;
  197. stream.header_len = 0;
  198. UNLOCK;
  199. continue;
  200. }
  201. if (pollinfo.revents & (POLLIN | POLLHUP)) {
  202. // get response headers
  203. if (stream.state == RECV_HEADERS) {
  204. // read one byte at a time to catch end of header
  205. char c;
  206. static int endtok;
  207. int n = _recv(ssl, fd, &c, 1, 0);
  208. if (n <= 0) {
  209. if (n < 0 && _last_error() == ERROR_WOULDBLOCK) {
  210. UNLOCK;
  211. continue;
  212. }
  213. LOG_INFO("error reading headers: %s", n ? strerror(last_error()) : "closed");
  214. _disconnect(STOPPED, LOCAL_DISCONNECT);
  215. UNLOCK;
  216. continue;
  217. }
  218. *(stream.header + stream.header_len) = c;
  219. stream.header_len++;
  220. if (stream.header_len > MAX_HEADER - 1) {
  221. LOG_ERROR("received headers too long: %u", stream.header_len);
  222. _disconnect(DISCONNECT, LOCAL_DISCONNECT);
  223. }
  224. if (stream.header_len > 1 && (c == '\r' || c == '\n')) {
  225. endtok++;
  226. if (endtok == 4) {
  227. *(stream.header + stream.header_len) = '\0';
  228. LOG_INFO("headers: len: %d\n%s", stream.header_len, stream.header);
  229. stream.state = stream.cont_wait ? STREAMING_WAIT : STREAMING_BUFFERING;
  230. wake_controller();
  231. }
  232. } else {
  233. endtok = 0;
  234. }
  235. UNLOCK;
  236. continue;
  237. }
  238. // receive icy meta data
  239. if (stream.meta_interval && stream.meta_next == 0) {
  240. if (stream.meta_left == 0) {
  241. // read meta length
  242. u8_t c;
  243. int n = _recv(ssl, fd, &c, 1, 0);
  244. if (n <= 0) {
  245. if (n < 0 && _last_error() == ERROR_WOULDBLOCK) {
  246. UNLOCK;
  247. continue;
  248. }
  249. LOG_INFO("error reading icy meta: %s", n ? strerror(last_error()) : "closed");
  250. _disconnect(STOPPED, LOCAL_DISCONNECT);
  251. UNLOCK;
  252. continue;
  253. }
  254. stream.meta_left = 16 * c;
  255. stream.header_len = 0; // amount of received meta data
  256. // MAX_HEADER must be more than meta max of 16 * 255
  257. }
  258. if (stream.meta_left) {
  259. int n = _recv(ssl, fd, stream.header + stream.header_len, stream.meta_left, 0);
  260. if (n <= 0) {
  261. if (n < 0 && _last_error() == ERROR_WOULDBLOCK) {
  262. UNLOCK;
  263. continue;
  264. }
  265. LOG_INFO("error reading icy meta: %s", n ? strerror(last_error()) : "closed");
  266. _disconnect(STOPPED, LOCAL_DISCONNECT);
  267. UNLOCK;
  268. continue;
  269. }
  270. stream.meta_left -= n;
  271. stream.header_len += n;
  272. }
  273. if (stream.meta_left == 0) {
  274. if (stream.header_len) {
  275. *(stream.header + stream.header_len) = '\0';
  276. LOG_INFO("icy meta: len: %u\n%s", stream.header_len, stream.header);
  277. stream.meta_send = true;
  278. wake_controller();
  279. }
  280. stream.meta_next = stream.meta_interval;
  281. UNLOCK;
  282. continue;
  283. }
  284. // stream body into streambuf
  285. } else {
  286. int n;
  287. space = min(_buf_space(streambuf), _buf_cont_write(streambuf));
  288. if (stream.meta_interval) {
  289. space = min(space, stream.meta_next);
  290. }
  291. n = _recv(ssl, fd, streambuf->writep, space, 0);
  292. if (n == 0) {
  293. LOG_INFO("end of stream");
  294. _disconnect(DISCONNECT, DISCONNECT_OK);
  295. }
  296. if (n < 0 && _last_error() != ERROR_WOULDBLOCK) {
  297. LOG_INFO("error reading: %s", strerror(last_error()));
  298. _disconnect(DISCONNECT, REMOTE_DISCONNECT);
  299. }
  300. if (n > 0) {
  301. _buf_inc_writep(streambuf, n);
  302. stream.bytes += n;
  303. if (stream.meta_interval) {
  304. stream.meta_next -= n;
  305. }
  306. } else {
  307. UNLOCK;
  308. continue;
  309. }
  310. if (stream.state == STREAMING_BUFFERING && stream.bytes > stream.threshold) {
  311. stream.state = STREAMING_HTTP;
  312. wake_controller();
  313. }
  314. LOG_SDEBUG("streambuf read %d bytes", n);
  315. }
  316. }
  317. UNLOCK;
  318. } else {
  319. UNLOCK_L;
  320. LOG_SDEBUG("poll timeout");
  321. }
  322. }
  323. #if USE_SSL
  324. if (SSLctx) {
  325. SSL_CTX_free(SSLctx);
  326. }
  327. #endif
  328. return 0;
  329. }
  330. static thread_type thread;
  331. void stream_init(log_level level, unsigned stream_buf_size) {
  332. loglevel = level;
  333. LOG_INFO("init stream");
  334. LOG_DEBUG("streambuf size: %u", stream_buf_size);
  335. buf_init(streambuf, stream_buf_size);
  336. if (streambuf->buf == NULL) {
  337. LOG_ERROR("unable to malloc buffer");
  338. exit(0);
  339. }
  340. #if USE_SSL
  341. #if !LINKALL && !NO_SSLSYM
  342. if (ssl_loaded) {
  343. #endif
  344. SSL_library_init();
  345. SSLctx = SSL_CTX_new(SSLv23_client_method());
  346. if (SSLctx == NULL) {
  347. LOG_ERROR("unable to allocate SSL context");
  348. exit(0);
  349. }
  350. SSL_CTX_set_options(SSLctx, SSL_OP_NO_SSLv2);
  351. #if !LINKALL && !NO_SSLSYM
  352. }
  353. #endif
  354. ssl = NULL;
  355. #endif
  356. #if SUN
  357. signal(SIGPIPE, SIG_IGN); /* Force sockets to return -1 with EPIPE on pipe signal */
  358. #endif
  359. stream.state = STOPPED;
  360. stream.header = malloc(MAX_HEADER);
  361. *stream.header = '\0';
  362. fd = -1;
  363. #if EMBEDDED
  364. mutex_create_p(poll_mutex);
  365. #endif
  366. #if LINUX || FREEBSD
  367. touch_memory(streambuf->buf, streambuf->size);
  368. #endif
  369. #if LINUX || OSX || FREEBSD || EMBEDDED
  370. pthread_attr_t attr;
  371. pthread_attr_init(&attr);
  372. #ifdef PTHREAD_STACK_MIN
  373. pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN + STREAM_THREAD_STACK_SIZE);
  374. #endif
  375. pthread_create_name(&thread, &attr, stream_thread, NULL, "stream");
  376. pthread_attr_destroy(&attr);
  377. #endif
  378. #if WIN
  379. thread = CreateThread(NULL, STREAM_THREAD_STACK_SIZE, (LPTHREAD_START_ROUTINE)&stream_thread, NULL, 0, NULL);
  380. #endif
  381. }
  382. void stream_close(void) {
  383. LOG_INFO("close stream");
  384. LOCK;
  385. running = false;
  386. UNLOCK;
  387. #if LINUX || OSX || FREEBSD || EMBEDDED
  388. pthread_join(thread, NULL);
  389. #endif
  390. free(stream.header);
  391. buf_destroy(streambuf);
  392. #if EMBEDDED
  393. mutex_destroy(poll_mutex);
  394. #endif
  395. }
  396. void stream_file(const char *header, size_t header_len, unsigned threshold) {
  397. buf_flush(streambuf);
  398. LOCK;
  399. stream.header_len = header_len;
  400. memcpy(stream.header, header, header_len);
  401. *(stream.header+header_len) = '\0';
  402. LOG_INFO("opening local file: %s", stream.header);
  403. #if WIN
  404. fd = open(stream.header, O_RDONLY | O_BINARY);
  405. #else
  406. fd = open(stream.header, O_RDONLY);
  407. #endif
  408. stream.state = STREAMING_FILE;
  409. if (fd < 0) {
  410. LOG_INFO("can't open file: %s", stream.header);
  411. stream.state = DISCONNECT;
  412. }
  413. wake_controller();
  414. stream.cont_wait = false;
  415. stream.meta_interval = 0;
  416. stream.meta_next = 0;
  417. stream.meta_left = 0;
  418. stream.meta_send = false;
  419. stream.sent_headers = false;
  420. stream.bytes = 0;
  421. stream.threshold = threshold;
  422. UNLOCK;
  423. }
  424. void stream_sock(u32_t ip, u16_t port, const char *header, size_t header_len, unsigned threshold, bool cont_wait) {
  425. struct sockaddr_in addr;
  426. LOCK_L;
  427. int sock = socket(AF_INET, SOCK_STREAM, 0);
  428. UNLOCK_L;
  429. if (sock < 0) {
  430. LOG_ERROR("failed to create socket");
  431. return;
  432. }
  433. memset(&addr, 0, sizeof(addr));
  434. addr.sin_family = AF_INET;
  435. addr.sin_addr.s_addr = ip;
  436. addr.sin_port = port;
  437. LOG_INFO("connecting to %s:%d", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
  438. set_nonblock(sock);
  439. set_nosigpipe(sock);
  440. if (connect_timeout(sock, (struct sockaddr *) &addr, sizeof(addr), 10) < 0) {
  441. LOG_INFO("unable to connect to server");
  442. LOCK;
  443. stream.state = DISCONNECT;
  444. stream.disconnect = UNREACHABLE;
  445. UNLOCK;
  446. return;
  447. }
  448. #if USE_SSL
  449. if (ntohs(port) == 443) {
  450. char *server = strcasestr(header, "Host:");
  451. ssl = SSL_new(SSLctx);
  452. SSL_set_fd(ssl, sock);
  453. // add SNI
  454. if (server) {
  455. char *p, *servername = malloc(1024);
  456. sscanf(server, "Host:%255[^:]s", servername);
  457. for (p = servername; *p == ' '; p++);
  458. SSL_set_tlsext_host_name(ssl, p);
  459. free(servername);
  460. }
  461. while (1) {
  462. int status, err = 0;
  463. ERR_clear_error();
  464. status = SSL_connect(ssl);
  465. // successful negotiation
  466. if (status == 1) break;
  467. // error or non-blocking requires more time
  468. if (status < 0) {
  469. err = SSL_get_error(ssl, status);
  470. if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) continue;
  471. }
  472. LOG_WARN("unable to open SSL socket %d (%d)", status, err);
  473. closesocket(sock);
  474. SSL_free(ssl);
  475. ssl = NULL;
  476. LOCK;
  477. stream.state = DISCONNECT;
  478. stream.disconnect = UNREACHABLE;
  479. UNLOCK;
  480. return;
  481. }
  482. } else {
  483. ssl = NULL;
  484. }
  485. #endif
  486. buf_flush(streambuf);
  487. LOCK;
  488. fd = sock;
  489. stream.state = SEND_HEADERS;
  490. stream.cont_wait = cont_wait;
  491. stream.meta_interval = 0;
  492. stream.meta_next = 0;
  493. stream.meta_left = 0;
  494. stream.meta_send = false;
  495. stream.header_len = header_len;
  496. memcpy(stream.header, header, header_len);
  497. *(stream.header+header_len) = '\0';
  498. LOG_INFO("header: %s", stream.header);
  499. stream.sent_headers = false;
  500. stream.bytes = 0;
  501. stream.threshold = threshold;
  502. UNLOCK;
  503. }
  504. bool stream_disconnect(void) {
  505. bool disc = false;
  506. LOCK;
  507. #if USE_SSL
  508. if (ssl) {
  509. SSL_shutdown(ssl);
  510. SSL_free(ssl);
  511. ssl = NULL;
  512. }
  513. #endif
  514. if (fd != -1) {
  515. closesocket(fd);
  516. fd = -1;
  517. disc = true;
  518. }
  519. stream.state = STOPPED;
  520. UNLOCK;
  521. return disc;
  522. }