output.cpp 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005
  1. /*
  2. * output.cpp
  3. * Output related routines
  4. *
  5. * Copyright (c) 2015-2021 Tomasz Lemiech <szpajder@gmail.com>
  6. *
  7. * This program is free software: you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation, either version 3 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. */
  20. #include <math.h>
  21. #include <ogg/ogg.h>
  22. #include <shout/shout.h>
  23. #include <stdio.h>
  24. #include <sys/stat.h>
  25. #include <sys/time.h>
  26. #include <sys/types.h>
  27. #include <unistd.h>
  28. #include <vorbis/vorbisenc.h>
  29. // SHOUTERR_RETRY is available since libshout 2.4.0.
  30. // Set it to an impossible value if it's not there.
  31. #ifndef SHOUTERR_RETRY
  32. #define SHOUTERR_RETRY (-255)
  33. #endif /* SHOUTERR_RETRY */
  34. #include <lame/lame.h>
  35. #ifdef WITH_PULSEAUDIO
  36. #include <pulse/pulseaudio.h>
  37. #endif /* WITH_PULSEAUDIO */
  38. #include <syslog.h>
  39. #include <cassert>
  40. #include <cerrno>
  41. #include <cstdlib>
  42. #include <cstring>
  43. #include <ctime>
  44. #include <sstream>
  45. #include <string>
  46. #include "config.h"
  47. #include "helper_functions.h"
  48. #include "input-common.h"
  49. #include "rtl_airband.h"
  50. void shout_setup(icecast_data* icecast, mix_modes mixmode) {
  51. int ret;
  52. shout_t* shouttemp = shout_new();
  53. if (shouttemp == NULL) {
  54. printf("cannot allocate\n");
  55. }
  56. if (shout_set_host(shouttemp, icecast->hostname) != SHOUTERR_SUCCESS) {
  57. shout_free(shouttemp);
  58. return;
  59. }
  60. if (shout_set_protocol(shouttemp, SHOUT_PROTOCOL_HTTP) != SHOUTERR_SUCCESS) {
  61. shout_free(shouttemp);
  62. return;
  63. }
  64. if (shout_set_port(shouttemp, icecast->port) != SHOUTERR_SUCCESS) {
  65. shout_free(shouttemp);
  66. return;
  67. }
  68. #ifdef LIBSHOUT_HAS_TLS
  69. if (shout_set_tls(shouttemp, icecast->tls_mode) != SHOUTERR_SUCCESS) {
  70. shout_free(shouttemp);
  71. return;
  72. }
  73. #endif /* LIBSHOUT_HAS_TLS */
  74. char mp[100];
  75. sprintf(mp, "/%s", icecast->mountpoint);
  76. if (shout_set_mount(shouttemp, mp) != SHOUTERR_SUCCESS) {
  77. shout_free(shouttemp);
  78. return;
  79. }
  80. if (shout_set_user(shouttemp, icecast->username) != SHOUTERR_SUCCESS) {
  81. shout_free(shouttemp);
  82. return;
  83. }
  84. if (shout_set_password(shouttemp, icecast->password) != SHOUTERR_SUCCESS) {
  85. shout_free(shouttemp);
  86. return;
  87. }
  88. #ifdef LIBSHOUT_HAS_CONTENT_FORMAT
  89. if (shout_set_content_format(shouttemp, SHOUT_FORMAT_MP3, SHOUT_USAGE_AUDIO, NULL) != SHOUTERR_SUCCESS) {
  90. #else
  91. if (shout_set_format(shouttemp, SHOUT_FORMAT_MP3) != SHOUTERR_SUCCESS) {
  92. #endif /* LIBSHOUT_HAS_CONTENT_FORMAT */
  93. shout_free(shouttemp);
  94. return;
  95. }
  96. if (icecast->name && shout_set_meta(shouttemp, SHOUT_META_NAME, icecast->name) != SHOUTERR_SUCCESS) {
  97. shout_free(shouttemp);
  98. return;
  99. }
  100. if (icecast->genre && shout_set_meta(shouttemp, SHOUT_META_GENRE, icecast->genre) != SHOUTERR_SUCCESS) {
  101. shout_free(shouttemp);
  102. return;
  103. }
  104. if (icecast->description && shout_set_meta(shouttemp, SHOUT_META_DESCRIPTION, icecast->description) != SHOUTERR_SUCCESS) {
  105. shout_free(shouttemp);
  106. return;
  107. }
  108. char samplerates[20];
  109. sprintf(samplerates, "%d", MP3_RATE);
  110. shout_set_audio_info(shouttemp, SHOUT_AI_SAMPLERATE, samplerates);
  111. shout_set_audio_info(shouttemp, SHOUT_AI_CHANNELS, (mixmode == MM_STEREO ? "2" : "1"));
  112. if (shout_set_nonblocking(shouttemp, 1) != SHOUTERR_SUCCESS) {
  113. log(LOG_ERR, "Error setting non-blocking mode: %s\n", shout_get_error(shouttemp));
  114. return;
  115. }
  116. ret = shout_open(shouttemp);
  117. if (ret == SHOUTERR_SUCCESS)
  118. ret = SHOUTERR_CONNECTED;
  119. if (ret == SHOUTERR_BUSY || ret == SHOUTERR_RETRY)
  120. log(LOG_NOTICE, "Connecting to %s:%d/%s...\n", icecast->hostname, icecast->port, icecast->mountpoint);
  121. int shout_timeout = 30 * 5; // 30 * 5 * 200ms = 30s
  122. while ((ret == SHOUTERR_BUSY || ret == SHOUTERR_RETRY) && shout_timeout-- > 0) {
  123. SLEEP(200);
  124. ret = shout_get_connected(shouttemp);
  125. }
  126. if (ret == SHOUTERR_CONNECTED) {
  127. log(LOG_NOTICE, "Connected to %s:%d/%s\n", icecast->hostname, icecast->port, icecast->mountpoint);
  128. SLEEP(100);
  129. icecast->shout = shouttemp;
  130. } else {
  131. log(LOG_WARNING, "Could not connect to %s:%d/%s: %s\n", icecast->hostname, icecast->port, icecast->mountpoint, shout_get_error(shouttemp));
  132. shout_close(shouttemp);
  133. shout_free(shouttemp);
  134. return;
  135. }
  136. }
  137. lame_t airlame_init(mix_modes mixmode, int highpass, int lowpass) {
  138. lame_t lame = lame_init();
  139. if (!lame) {
  140. log(LOG_WARNING, "lame_init failed\n");
  141. return NULL;
  142. }
  143. lame_set_in_samplerate(lame, WAVE_RATE);
  144. lame_set_VBR(lame, vbr_mtrh);
  145. lame_set_brate(lame, 16);
  146. lame_set_quality(lame, 7);
  147. lame_set_lowpassfreq(lame, lowpass);
  148. lame_set_highpassfreq(lame, highpass);
  149. lame_set_out_samplerate(lame, MP3_RATE);
  150. if (mixmode == MM_STEREO) {
  151. lame_set_num_channels(lame, 2);
  152. lame_set_mode(lame, JOINT_STEREO);
  153. } else {
  154. lame_set_num_channels(lame, 1);
  155. lame_set_mode(lame, MONO);
  156. }
  157. debug_print("lame init with mixmode=%s\n", mixmode == MM_STEREO ? "MM_STEREO" : "MM_MONO");
  158. lame_init_params(lame);
  159. return lame;
  160. }
  161. class LameTone {
  162. unsigned char* _data;
  163. int _bytes;
  164. public:
  165. LameTone(mix_modes mixmode, int msec, unsigned int hz = 0) : _data(NULL), _bytes(0) {
  166. _data = (unsigned char*)XCALLOC(1, LAMEBUF_SIZE);
  167. int samples = (msec * WAVE_RATE) / 1000;
  168. float* buf = (float*)XCALLOC(samples, sizeof(float));
  169. debug_print("LameTone with mixmode=%s msec=%d hz=%u\n", mixmode == MM_STEREO ? "MM_STEREO" : "MM_MONO", msec, hz);
  170. if (hz > 0) {
  171. const float period = 1.0 / (float)hz;
  172. const float sample_time = 1.0 / (float)WAVE_RATE;
  173. float t = 0;
  174. for (int i = 0; i < samples; ++i, t += sample_time) {
  175. buf[i] = 0.9 * sinf(t * 2.0 * M_PI / period);
  176. }
  177. } else
  178. memset(buf, 0, samples * sizeof(float));
  179. lame_t lame = airlame_init(mixmode, 0, 0);
  180. if (lame) {
  181. _bytes = lame_encode_buffer_ieee_float(lame, buf, (mixmode == MM_STEREO ? buf : NULL), samples, _data, LAMEBUF_SIZE);
  182. if (_bytes > 0) {
  183. int flush_ofs = _bytes;
  184. if (flush_ofs & 0x1f)
  185. flush_ofs += 0x20 - (flush_ofs & 0x1f);
  186. if (flush_ofs < LAMEBUF_SIZE) {
  187. int flush_bytes = lame_encode_flush(lame, _data + flush_ofs, LAMEBUF_SIZE - flush_ofs);
  188. if (flush_bytes > 0) {
  189. memmove(_data + _bytes, _data + flush_ofs, flush_bytes);
  190. _bytes += flush_bytes;
  191. }
  192. }
  193. } else
  194. log(LOG_WARNING, "lame_encode_buffer_ieee_float: %d\n", _bytes);
  195. lame_close(lame);
  196. }
  197. free(buf);
  198. }
  199. ~LameTone() {
  200. if (_data)
  201. free(_data);
  202. }
  203. int write(FILE* f) {
  204. if (!_data || _bytes <= 0)
  205. return 1;
  206. if (fwrite(_data, 1, _bytes, f) != (unsigned int)_bytes) {
  207. log(LOG_WARNING, "LameTone: failed to write %d bytes\n", _bytes);
  208. return -1;
  209. }
  210. return 0;
  211. }
  212. };
  213. int rename_if_exists(char const* oldpath, char const* newpath) {
  214. int ret = rename(oldpath, newpath);
  215. if (ret < 0) {
  216. if (errno == ENOENT) {
  217. return 0;
  218. } else {
  219. log(LOG_ERR, "Could not rename %s to %s: %s\n", oldpath, newpath, strerror(errno));
  220. }
  221. }
  222. return ret;
  223. }
  224. /*
  225. * Open output file (mp3 or raw IQ) for append or initial write.
  226. * If appending to an audio file, insert discontinuity indictor tones
  227. * as well as the appropriate amount of silence when in continuous mode.
  228. */
  229. static int open_file(file_data* fdata, mix_modes mixmode, int is_audio) {
  230. int rename_result = rename_if_exists(fdata->file_path.c_str(), fdata->file_path_tmp.c_str());
  231. fdata->f = fopen(fdata->file_path_tmp.c_str(), fdata->append ? "a+" : "w");
  232. if (fdata->f == NULL) {
  233. return -1;
  234. }
  235. struct stat st = {};
  236. if (!fdata->append || fstat(fileno(fdata->f), &st) != 0 || st.st_size == 0) {
  237. if (!fdata->split_on_transmission) {
  238. log(LOG_INFO, "Writing to %s\n", fdata->file_path.c_str());
  239. } else {
  240. debug_print("Writing to %s\n", fdata->file_path_tmp.c_str());
  241. }
  242. return 0;
  243. }
  244. if (rename_result < 0) {
  245. log(LOG_INFO, "Writing to %s\n", fdata->file_path.c_str());
  246. debug_print("Writing to %s\n", fdata->file_path_tmp.c_str());
  247. } else {
  248. log(LOG_INFO, "Appending from pos %llu to %s\n", (unsigned long long)st.st_size, fdata->file_path.c_str());
  249. debug_print("Appending from pos %llu to %s\n", (unsigned long long)st.st_size, fdata->file_path_tmp.c_str());
  250. }
  251. if (is_audio) {
  252. // fill missing space with marker tones
  253. LameTone lt_a(mixmode, 120, 2222);
  254. LameTone lt_b(mixmode, 120, 1111);
  255. LameTone lt_c(mixmode, 120, 555);
  256. int r = lt_a.write(fdata->f);
  257. if (r == 0)
  258. r = lt_b.write(fdata->f);
  259. if (r == 0)
  260. r = lt_c.write(fdata->f);
  261. // fill in time delta with silence if continuous output mode
  262. if (fdata->continuous) {
  263. time_t now = time(NULL);
  264. if (now > st.st_mtime) {
  265. time_t delta = now - st.st_mtime;
  266. if (delta > 3600) {
  267. log(LOG_WARNING, "Too big time difference: %llu sec, limiting to one hour\n", (unsigned long long)delta);
  268. delta = 3600;
  269. }
  270. LameTone lt_silence(mixmode, 1000);
  271. for (; (r == 0 && delta > 1); --delta)
  272. r = lt_silence.write(fdata->f);
  273. }
  274. }
  275. if (r == 0)
  276. r = lt_c.write(fdata->f);
  277. if (r == 0)
  278. r = lt_b.write(fdata->f);
  279. if (r == 0)
  280. r = lt_a.write(fdata->f);
  281. if (r < 0)
  282. fseek(fdata->f, st.st_size, SEEK_SET);
  283. }
  284. return 0;
  285. }
  286. static void close_file(channel_t* channel, file_data* fdata) {
  287. if (!fdata) {
  288. return;
  289. }
  290. if (fdata->type == O_FILE && fdata->f && channel->lame) {
  291. int encoded = lame_encode_flush_nogap(channel->lame, channel->lamebuf, LAMEBUF_SIZE);
  292. debug_print("closing file %s flushed %d\n", fdata->file_path.c_str(), encoded);
  293. if (encoded > 0) {
  294. size_t written = fwrite((void*)channel->lamebuf, 1, (size_t)encoded, fdata->f);
  295. if (written == 0 || written < (size_t)encoded)
  296. log(LOG_WARNING, "Problem writing %s (%s)\n", fdata->file_path.c_str(), strerror(errno));
  297. }
  298. }
  299. if (fdata->f) {
  300. fclose(fdata->f);
  301. fdata->f = NULL;
  302. rename_if_exists(fdata->file_path_tmp.c_str(), fdata->file_path.c_str());
  303. }
  304. fdata->file_path.clear();
  305. fdata->file_path_tmp.clear();
  306. }
  307. /*
  308. * Close current output file based on certain conditions:
  309. * If "split_on_transmission" mode is true check:
  310. * If current duration too long, or we've been idle too long
  311. * else (append or continuous) check:
  312. * if hour is different.
  313. */
  314. static void close_if_necessary(channel_t* channel, file_data* fdata) {
  315. static const double MIN_TRANSMISSION_TIME_SEC = 1.0;
  316. static const double MAX_TRANSMISSION_TIME_SEC = 60.0 * 60.0;
  317. static const double MAX_TRANSMISSION_IDLE_SEC = 0.5;
  318. if (!fdata || !fdata->f) {
  319. return;
  320. }
  321. timeval current_time;
  322. gettimeofday(&current_time, NULL);
  323. if (fdata->split_on_transmission) {
  324. double duration_sec = delta_sec(&fdata->open_time, &current_time);
  325. double idle_sec = delta_sec(&fdata->last_write_time, &current_time);
  326. if (duration_sec > MAX_TRANSMISSION_TIME_SEC || (duration_sec > MIN_TRANSMISSION_TIME_SEC && idle_sec > MAX_TRANSMISSION_IDLE_SEC)) {
  327. debug_print("closing file %s, duration %f sec, idle %f sec\n", fdata->file_path.c_str(), duration_sec, idle_sec);
  328. close_file(channel, fdata);
  329. }
  330. return;
  331. }
  332. // Check if the hour boundary was just crossed. NOTE: Actual hour number doesn't matter but still
  333. // need to use localtime if enabled (some timezones have partial hour offsets)
  334. int start_hour;
  335. int current_hour;
  336. if (use_localtime) {
  337. start_hour = localtime(&(fdata->open_time.tv_sec))->tm_hour;
  338. current_hour = localtime(&current_time.tv_sec)->tm_hour;
  339. } else {
  340. start_hour = gmtime(&(fdata->open_time.tv_sec))->tm_hour;
  341. current_hour = gmtime(&current_time.tv_sec)->tm_hour;
  342. }
  343. if (start_hour != current_hour) {
  344. debug_print("closing file %s after crossing hour boundary\n", fdata->file_path.c_str());
  345. close_file(channel, fdata);
  346. }
  347. }
  348. /*
  349. * For a particular channel file output, check if there is a file currently open.
  350. * If so, that file may need to be flushed and closed.
  351. *
  352. * If the existing open file is good for continued use, return true.
  353. * Otherwise, create a file name based on the current timestamp and
  354. * open that new file. If that file open succeeded, return true.
  355. */
  356. static bool output_file_ready(channel_t* channel, file_data* fdata, mix_modes mixmode, int is_audio) {
  357. if (!fdata) {
  358. return false;
  359. }
  360. close_if_necessary(channel, fdata);
  361. if (fdata->f) { // still open
  362. return true;
  363. }
  364. timeval current_time;
  365. gettimeofday(&current_time, NULL);
  366. struct tm* time;
  367. if (use_localtime) {
  368. time = localtime(&current_time.tv_sec);
  369. } else {
  370. time = gmtime(&current_time.tv_sec);
  371. }
  372. char timestamp[32];
  373. if (strftime(timestamp, sizeof(timestamp), fdata->split_on_transmission ? "_%Y%m%d_%H%M%S" : "_%Y%m%d_%H", time) == 0) {
  374. log(LOG_NOTICE, "strftime returned 0\n");
  375. return false;
  376. }
  377. std::string output_dir;
  378. if (fdata->dated_subdirectories) {
  379. output_dir = make_dated_subdirs(fdata->basedir, time);
  380. if (output_dir.empty()) {
  381. log(LOG_ERR, "Failed to create dated subdirectory\n");
  382. return false;
  383. }
  384. } else {
  385. output_dir = fdata->basedir;
  386. make_dir(output_dir);
  387. }
  388. // use a string stream to build the output filepath
  389. std::stringstream ss;
  390. ss << output_dir << '/' << fdata->basename << timestamp;
  391. if (fdata->include_freq) {
  392. ss << '_' << channel->freqlist[channel->freq_idx].frequency;
  393. }
  394. ss << fdata->suffix;
  395. fdata->file_path = ss.str();
  396. fdata->file_path_tmp = fdata->file_path + ".tmp";
  397. fdata->open_time = fdata->last_write_time = current_time;
  398. if (open_file(fdata, mixmode, is_audio) < 0) {
  399. log(LOG_WARNING, "Cannot open output file %s (%s)\n", fdata->file_path_tmp.c_str(), strerror(errno));
  400. return false;
  401. }
  402. return true;
  403. }
  404. // Create all the output for a particular channel.
  405. void process_outputs(channel_t* channel, int cur_scan_freq) {
  406. int mp3_bytes = 0;
  407. if (channel->need_mp3) {
  408. // debug_bulk_print("channel->mode=%s\n", channel->mode == MM_STEREO ? "MM_STEREO" : "MM_MONO");
  409. mp3_bytes = lame_encode_buffer_ieee_float(channel->lame, channel->waveout, (channel->mode == MM_STEREO ? channel->waveout_r : NULL), WAVE_BATCH, channel->lamebuf, LAMEBUF_SIZE);
  410. if (mp3_bytes < 0)
  411. log(LOG_WARNING, "lame_encode_buffer_ieee_float: %d\n", mp3_bytes);
  412. }
  413. for (int k = 0; k < channel->output_count; k++) {
  414. if (channel->outputs[k].enabled == false)
  415. continue;
  416. if (channel->outputs[k].type == O_ICECAST) {
  417. icecast_data* icecast = (icecast_data*)(channel->outputs[k].data);
  418. if (icecast->shout == NULL || mp3_bytes <= 0)
  419. continue;
  420. int ret = shout_send(icecast->shout, channel->lamebuf, mp3_bytes);
  421. if (ret != SHOUTERR_SUCCESS || shout_queuelen(icecast->shout) > MAX_SHOUT_QUEUELEN) {
  422. if (shout_queuelen(icecast->shout) > MAX_SHOUT_QUEUELEN)
  423. log(LOG_WARNING, "Exceeded max backlog for %s:%d/%s, disconnecting\n", icecast->hostname, icecast->port, icecast->mountpoint);
  424. // reset connection
  425. log(LOG_WARNING, "Lost connection to %s:%d/%s\n", icecast->hostname, icecast->port, icecast->mountpoint);
  426. shout_close(icecast->shout);
  427. shout_free(icecast->shout);
  428. icecast->shout = NULL;
  429. } else if (icecast->send_scan_freq_tags && cur_scan_freq >= 0) {
  430. shout_metadata_t* meta = shout_metadata_new();
  431. char description[32];
  432. if (channel->freqlist[channel->freq_idx].label != NULL) {
  433. if (shout_metadata_add(meta, "song", channel->freqlist[channel->freq_idx].label) != SHOUTERR_SUCCESS) {
  434. log(LOG_WARNING, "Failed to add shout metadata\n");
  435. }
  436. } else {
  437. snprintf(description, sizeof(description), "%.3f MHz", channel->freqlist[channel->freq_idx].frequency / 1000000.0);
  438. if (shout_metadata_add(meta, "song", description) != SHOUTERR_SUCCESS) {
  439. log(LOG_WARNING, "Failed to add shout metadata\n");
  440. }
  441. }
  442. if (SHOUT_SET_METADATA(icecast->shout, meta) != SHOUTERR_SUCCESS) {
  443. log(LOG_WARNING, "Failed to add shout metadata\n");
  444. }
  445. shout_metadata_free(meta);
  446. }
  447. } else if (channel->outputs[k].type == O_FILE || channel->outputs[k].type == O_RAWFILE) {
  448. file_data* fdata = (file_data*)(channel->outputs[k].data);
  449. if (fdata->continuous == false && channel->axcindicate == NO_SIGNAL && channel->outputs[k].active == false) {
  450. close_if_necessary(channel, fdata);
  451. continue;
  452. }
  453. if (channel->outputs[k].type == O_FILE && mp3_bytes <= 0)
  454. continue;
  455. if (!output_file_ready(channel, fdata, channel->mode, (channel->outputs[k].type == O_RAWFILE ? 0 : 1))) {
  456. log(LOG_WARNING, "Output disabled\n");
  457. channel->outputs[k].enabled = false;
  458. continue;
  459. };
  460. size_t buflen = 0, written = 0;
  461. if (channel->outputs[k].type == O_FILE) {
  462. buflen = (size_t)mp3_bytes;
  463. written = fwrite(channel->lamebuf, 1, buflen, fdata->f);
  464. } else if (channel->outputs[k].type == O_RAWFILE) {
  465. buflen = 2 * sizeof(float) * WAVE_BATCH;
  466. written = fwrite(channel->iq_out, 1, buflen, fdata->f);
  467. }
  468. if (written < buflen) {
  469. if (ferror(fdata->f))
  470. log(LOG_WARNING, "Cannot write to %s (%s), output disabled\n", fdata->file_path.c_str(), strerror(errno));
  471. else
  472. log(LOG_WARNING, "Short write on %s, output disabled\n", fdata->file_path.c_str());
  473. close_file(channel, fdata);
  474. channel->outputs[k].enabled = false;
  475. }
  476. channel->outputs[k].active = (channel->axcindicate != NO_SIGNAL);
  477. gettimeofday(&fdata->last_write_time, NULL);
  478. } else if (channel->outputs[k].type == O_MIXER) {
  479. mixer_data* mdata = (mixer_data*)(channel->outputs[k].data);
  480. mixer_put_samples(mdata->mixer, mdata->input, channel->waveout, channel->axcindicate != NO_SIGNAL, WAVE_BATCH);
  481. } else if (channel->outputs[k].type == O_UDP_STREAM) {
  482. udp_stream_data* sdata = (udp_stream_data*)channel->outputs[k].data;
  483. if (sdata->continuous == false && channel->axcindicate == NO_SIGNAL) {
  484. continue;
  485. }
  486. if (channel->mode == MM_MONO) {
  487. udp_stream_write(sdata, channel->waveout, (size_t)WAVE_BATCH * sizeof(float));
  488. } else {
  489. udp_stream_write(sdata, channel->waveout, channel->waveout_r, (size_t)WAVE_BATCH * sizeof(float));
  490. }
  491. #ifdef WITH_PULSEAUDIO
  492. } else if (channel->outputs[k].type == O_PULSE) {
  493. pulse_data* pdata = (pulse_data*)(channel->outputs[k].data);
  494. if (pdata->continuous == false && channel->axcindicate == NO_SIGNAL)
  495. continue;
  496. pulse_write_stream(pdata, channel->mode, channel->waveout, channel->waveout_r, (size_t)WAVE_BATCH * sizeof(float));
  497. #endif /* WITH_PULSEAUDIO */
  498. }
  499. }
  500. }
  501. void disable_channel_outputs(channel_t* channel) {
  502. for (int k = 0; k < channel->output_count; k++) {
  503. output_t* output = channel->outputs + k;
  504. output->enabled = false;
  505. if (output->type == O_ICECAST) {
  506. icecast_data* icecast = (icecast_data*)(channel->outputs[k].data);
  507. if (icecast->shout == NULL)
  508. continue;
  509. log(LOG_WARNING, "Closing connection to %s:%d/%s\n", icecast->hostname, icecast->port, icecast->mountpoint);
  510. shout_close(icecast->shout);
  511. shout_free(icecast->shout);
  512. icecast->shout = NULL;
  513. } else if (output->type == O_FILE || output->type == O_RAWFILE) {
  514. file_data* fdata = (file_data*)(channel->outputs[k].data);
  515. close_file(channel, fdata);
  516. } else if (output->type == O_MIXER) {
  517. mixer_data* mdata = (mixer_data*)(output->data);
  518. mixer_disable_input(mdata->mixer, mdata->input);
  519. } else if (output->type == O_UDP_STREAM) {
  520. udp_stream_data* sdata = (udp_stream_data*)output->data;
  521. udp_stream_shutdown(sdata);
  522. #ifdef WITH_PULSEAUDIO
  523. } else if (output->type == O_PULSE) {
  524. pulse_data* pdata = (pulse_data*)(output->data);
  525. pulse_shutdown(pdata);
  526. #endif /* WITH_PULSEAUDIO */
  527. }
  528. }
  529. }
  530. void disable_device_outputs(device_t* dev) {
  531. log(LOG_INFO, "Disabling device outputs\n");
  532. for (int j = 0; j < dev->channel_count; j++) {
  533. disable_channel_outputs(dev->channels + j);
  534. }
  535. }
  536. static void print_channel_metric(FILE* f, char const* name, float freq, char* label) {
  537. fprintf(f, "%s{freq=\"%.3f\"", name, freq / 1000000.0);
  538. if (label != NULL) {
  539. fprintf(f, ",label=\"%s\"", label);
  540. }
  541. fprintf(f, "}");
  542. }
  543. static void output_channel_noise_levels(FILE* f) {
  544. fprintf(f,
  545. "# HELP channel_noise_level Raw squelch noise_level.\n"
  546. "# TYPE channel_noise_level gauge\n");
  547. for (int i = 0; i < device_count; i++) {
  548. device_t* dev = devices + i;
  549. for (int j = 0; j < dev->channel_count; j++) {
  550. channel_t* channel = devices[i].channels + j;
  551. for (int k = 0; k < channel->freq_count; k++) {
  552. print_channel_metric(f, "channel_noise_level", channel->freqlist[k].frequency, channel->freqlist[k].label);
  553. fprintf(f, "\t%.3f\n", channel->freqlist[k].squelch.noise_level());
  554. }
  555. }
  556. }
  557. fprintf(f, "\n");
  558. }
  559. static void output_channel_dbfs_noise_levels(FILE* f) {
  560. fprintf(f,
  561. "# HELP channel_dbfs_noise_level Squelch noise_level as dBFS.\n"
  562. "# TYPE channel_dbfs_noise_level gauge\n");
  563. for (int i = 0; i < device_count; i++) {
  564. device_t* dev = devices + i;
  565. for (int j = 0; j < dev->channel_count; j++) {
  566. channel_t* channel = devices[i].channels + j;
  567. for (int k = 0; k < channel->freq_count; k++) {
  568. print_channel_metric(f, "channel_dbfs_noise_level", channel->freqlist[k].frequency, channel->freqlist[k].label);
  569. fprintf(f, "\t%.3f\n", level_to_dBFS(channel->freqlist[k].squelch.noise_level()));
  570. }
  571. }
  572. }
  573. fprintf(f, "\n");
  574. }
  575. static void output_channel_signal_levels(FILE* f) {
  576. fprintf(f,
  577. "# HELP channel_signal_level Raw squelch signal_level.\n"
  578. "# TYPE channel_signal_level gauge\n");
  579. for (int i = 0; i < device_count; i++) {
  580. device_t* dev = devices + i;
  581. for (int j = 0; j < dev->channel_count; j++) {
  582. channel_t* channel = devices[i].channels + j;
  583. for (int k = 0; k < channel->freq_count; k++) {
  584. print_channel_metric(f, "channel_signal_level", channel->freqlist[k].frequency, channel->freqlist[k].label);
  585. fprintf(f, "\t%.3f\n", channel->freqlist[k].squelch.signal_level());
  586. }
  587. }
  588. }
  589. fprintf(f, "\n");
  590. }
  591. static void output_channel_dbfs_signal_levels(FILE* f) {
  592. fprintf(f,
  593. "# HELP channel_dbfs_signal_level Squelch signal_level as dBFS.\n"
  594. "# TYPE channel_dbfs_signal_level gauge\n");
  595. for (int i = 0; i < device_count; i++) {
  596. device_t* dev = devices + i;
  597. for (int j = 0; j < dev->channel_count; j++) {
  598. channel_t* channel = devices[i].channels + j;
  599. for (int k = 0; k < channel->freq_count; k++) {
  600. print_channel_metric(f, "channel_dbfs_signal_level", channel->freqlist[k].frequency, channel->freqlist[k].label);
  601. fprintf(f, "\t%.3f\n", level_to_dBFS(channel->freqlist[k].squelch.signal_level()));
  602. }
  603. }
  604. }
  605. fprintf(f, "\n");
  606. }
  607. static void output_channel_squelch_levels(FILE* f) {
  608. fprintf(f,
  609. "# HELP channel_squelch_level Squelch squelch_level.\n"
  610. "# TYPE channel_squelch_level gauge\n");
  611. for (int i = 0; i < device_count; i++) {
  612. device_t* dev = devices + i;
  613. for (int j = 0; j < dev->channel_count; j++) {
  614. channel_t* channel = devices[i].channels + j;
  615. for (int k = 0; k < channel->freq_count; k++) {
  616. print_channel_metric(f, "channel_squelch_level", channel->freqlist[k].frequency, channel->freqlist[k].label);
  617. fprintf(f, "\t%.3f\n", channel->freqlist[k].squelch.squelch_level());
  618. }
  619. }
  620. }
  621. fprintf(f, "\n");
  622. }
  623. static void output_channel_squelch_counter(FILE* f) {
  624. fprintf(f,
  625. "# HELP channel_squelch_counter Squelch open_count.\n"
  626. "# TYPE channel_squelch_counter counter\n");
  627. for (int i = 0; i < device_count; i++) {
  628. device_t* dev = devices + i;
  629. for (int j = 0; j < dev->channel_count; j++) {
  630. channel_t* channel = devices[i].channels + j;
  631. for (int k = 0; k < channel->freq_count; k++) {
  632. print_channel_metric(f, "channel_squelch_counter", channel->freqlist[k].frequency, channel->freqlist[k].label);
  633. fprintf(f, "\t%zu\n", channel->freqlist[k].squelch.open_count());
  634. }
  635. }
  636. }
  637. fprintf(f, "\n");
  638. }
  639. static void output_channel_flappy_counter(FILE* f) {
  640. fprintf(f,
  641. "# HELP channel_flappy_counter Squelch flappy_count.\n"
  642. "# TYPE channel_flappy_counter counter\n");
  643. for (int i = 0; i < device_count; i++) {
  644. device_t* dev = devices + i;
  645. for (int j = 0; j < dev->channel_count; j++) {
  646. channel_t* channel = devices[i].channels + j;
  647. for (int k = 0; k < channel->freq_count; k++) {
  648. print_channel_metric(f, "channel_flappy_counter", channel->freqlist[k].frequency, channel->freqlist[k].label);
  649. fprintf(f, "\t%zu\n", channel->freqlist[k].squelch.flappy_count());
  650. }
  651. }
  652. }
  653. fprintf(f, "\n");
  654. }
  655. static void output_channel_ctcss_counter(FILE* f) {
  656. fprintf(f,
  657. "# HELP channel_ctcss_counter count of windows with CTCSS detected.\n"
  658. "# TYPE channel_ctcss_counter counter\n");
  659. for (int i = 0; i < device_count; i++) {
  660. device_t* dev = devices + i;
  661. for (int j = 0; j < dev->channel_count; j++) {
  662. channel_t* channel = devices[i].channels + j;
  663. for (int k = 0; k < channel->freq_count; k++) {
  664. print_channel_metric(f, "channel_ctcss_counter", channel->freqlist[k].frequency, channel->freqlist[k].label);
  665. fprintf(f, "\t%zu\n", channel->freqlist[k].squelch.ctcss_count());
  666. }
  667. }
  668. }
  669. fprintf(f, "\n");
  670. }
  671. static void output_channel_no_ctcss_counter(FILE* f) {
  672. fprintf(f,
  673. "# HELP channel_no_ctcss_counter count of windows without CTCSS detected.\n"
  674. "# TYPE channel_no_ctcss_counter counter\n");
  675. for (int i = 0; i < device_count; i++) {
  676. device_t* dev = devices + i;
  677. for (int j = 0; j < dev->channel_count; j++) {
  678. channel_t* channel = devices[i].channels + j;
  679. for (int k = 0; k < channel->freq_count; k++) {
  680. print_channel_metric(f, "channel_no_ctcss_counter", channel->freqlist[k].frequency, channel->freqlist[k].label);
  681. fprintf(f, "\t%zu\n", channel->freqlist[k].squelch.no_ctcss_count());
  682. }
  683. }
  684. }
  685. fprintf(f, "\n");
  686. }
  687. static void output_channel_activity_counters(FILE* f) {
  688. fprintf(f,
  689. "# HELP channel_activity_counter Loops of output_thread with frequency active.\n"
  690. "# TYPE channel_activity_counter counter\n");
  691. for (int i = 0; i < device_count; i++) {
  692. device_t* dev = devices + i;
  693. for (int j = 0; j < dev->channel_count; j++) {
  694. channel_t* channel = devices[i].channels + j;
  695. for (int k = 0; k < channel->freq_count; k++) {
  696. print_channel_metric(f, "channel_activity_counter", channel->freqlist[k].frequency, channel->freqlist[k].label);
  697. fprintf(f, "\t%zu\n", channel->freqlist[k].active_counter);
  698. }
  699. }
  700. }
  701. fprintf(f, "\n");
  702. }
  703. static void output_device_buffer_overflows(FILE* f) {
  704. fprintf(f,
  705. "# HELP buffer_overflow_count Number of times a device's buffer has overflowed.\n"
  706. "# TYPE buffer_overflow_count counter\n");
  707. for (int i = 0; i < device_count; i++) {
  708. device_t* dev = devices + i;
  709. fprintf(f, "buffer_overflow_count{device=\"%d\"}\t%zu\n", i, dev->input->overflow_count);
  710. }
  711. fprintf(f, "\n");
  712. }
  713. static void output_output_overruns(FILE* f) {
  714. fprintf(f,
  715. "# HELP output_overrun_count Number of times a device or mixer output has overrun.\n"
  716. "# TYPE output_overrun_count counter\n");
  717. for (int i = 0; i < device_count; i++) {
  718. device_t* dev = devices + i;
  719. fprintf(f, "output_overrun_count{device=\"%d\"}\t%zu\n", i, dev->output_overrun_count);
  720. }
  721. for (int i = 0; i < mixer_count; i++) {
  722. mixer_t* mixer = mixers + i;
  723. fprintf(f, "output_overrun_count{mixer=\"%d\"}\t%zu\n", i, mixer->output_overrun_count);
  724. }
  725. fprintf(f, "\n");
  726. }
  727. static void output_input_overruns(FILE* f) {
  728. if (mixer_count == 0) {
  729. return;
  730. }
  731. fprintf(f,
  732. "# HELP input_overrun_count Number of times mixer input has overrun.\n"
  733. "# TYPE input_overrun_count counter\n");
  734. for (int i = 0; i < mixer_count; i++) {
  735. mixer_t* mixer = mixers + i;
  736. for (int j = 0; j < mixer->input_count; j++) {
  737. mixinput_t* input = mixer->inputs + j;
  738. fprintf(f, "input_overrun_count{mixer=\"%d\",input=\"%d\"}\t%zu\n", i, j, input->input_overrun_count);
  739. }
  740. }
  741. fprintf(f, "\n");
  742. }
  743. void write_stats_file(timeval* last_stats_write) {
  744. if (!stats_filepath) {
  745. return;
  746. }
  747. timeval current_time;
  748. gettimeofday(&current_time, NULL);
  749. static const double STATS_FILE_TIMING = 15.0;
  750. if (!do_exit && delta_sec(last_stats_write, &current_time) < STATS_FILE_TIMING) {
  751. return;
  752. }
  753. *last_stats_write = current_time;
  754. FILE* file = fopen(stats_filepath, "w");
  755. if (!file) {
  756. log(LOG_WARNING, "Cannot open output file %s (%s)\n", stats_filepath, strerror(errno));
  757. return;
  758. }
  759. output_channel_activity_counters(file);
  760. output_channel_noise_levels(file);
  761. output_channel_dbfs_noise_levels(file);
  762. output_channel_signal_levels(file);
  763. output_channel_dbfs_signal_levels(file);
  764. output_channel_squelch_counter(file);
  765. output_channel_squelch_levels(file);
  766. output_channel_flappy_counter(file);
  767. output_channel_ctcss_counter(file);
  768. output_channel_no_ctcss_counter(file);
  769. output_device_buffer_overflows(file);
  770. output_output_overruns(file);
  771. output_input_overruns(file);
  772. fclose(file);
  773. }
  774. void* output_thread(void* param) {
  775. assert(param != NULL);
  776. output_params_t* output_param = (output_params_t*)param;
  777. struct freq_tag tag;
  778. struct timeval tv;
  779. int new_freq = -1;
  780. timeval last_stats_write = {0, 0};
  781. debug_print("Starting output thread, devices %d:%d, mixers %d:%d, signal %p\n", output_param->device_start, output_param->device_end, output_param->mixer_start, output_param->mixer_end,
  782. output_param->mp3_signal);
  783. #ifdef DEBUG
  784. timeval ts, te;
  785. gettimeofday(&ts, NULL);
  786. #endif /* DEBUG */
  787. while (!do_exit) {
  788. output_param->mp3_signal->wait();
  789. for (int i = output_param->mixer_start; i < output_param->mixer_end; i++) {
  790. if (mixers[i].enabled == false)
  791. continue;
  792. channel_t* channel = &mixers[i].channel;
  793. if (channel->state == CH_READY) {
  794. process_outputs(channel, -1);
  795. channel->state = CH_DIRTY;
  796. }
  797. }
  798. #ifdef DEBUG
  799. gettimeofday(&te, NULL);
  800. debug_bulk_print("mixeroutput: %lu.%lu %lu\n", te.tv_sec, (unsigned long)te.tv_usec, (te.tv_sec - ts.tv_sec) * 1000000UL + te.tv_usec - ts.tv_usec);
  801. ts.tv_sec = te.tv_sec;
  802. ts.tv_usec = te.tv_usec;
  803. #endif /* DEBUG */
  804. for (int i = output_param->device_start; i < output_param->device_end; i++) {
  805. device_t* dev = devices + i;
  806. if (dev->input->state == INPUT_RUNNING && dev->waveavail) {
  807. if (dev->mode == R_SCAN) {
  808. tag_queue_get(dev, &tag);
  809. if (tag.freq >= 0) {
  810. tag.tv.tv_sec += shout_metadata_delay;
  811. gettimeofday(&tv, NULL);
  812. if (tag.tv.tv_sec < tv.tv_sec || (tag.tv.tv_sec == tv.tv_sec && tag.tv.tv_usec <= tv.tv_usec)) {
  813. new_freq = tag.freq;
  814. tag_queue_advance(dev);
  815. }
  816. }
  817. }
  818. for (int j = 0; j < dev->channel_count; j++) {
  819. channel_t* channel = devices[i].channels + j;
  820. process_outputs(channel, new_freq);
  821. memcpy(channel->waveout, channel->waveout + WAVE_BATCH, AGC_EXTRA * 4);
  822. }
  823. dev->waveavail = 0;
  824. }
  825. // make sure we don't carry new_freq value to the next receiver which might be working
  826. // in multichannel mode
  827. new_freq = -1;
  828. }
  829. if (output_param->device_start == 0) {
  830. write_stats_file(&last_stats_write);
  831. }
  832. }
  833. return 0;
  834. }
  835. // reconnect as required
  836. void* output_check_thread(void*) {
  837. while (!do_exit) {
  838. SLEEP(10000);
  839. for (int i = 0; i < device_count; i++) {
  840. device_t* dev = devices + i;
  841. for (int j = 0; j < dev->channel_count; j++) {
  842. for (int k = 0; k < dev->channels[j].output_count; k++) {
  843. if (dev->channels[j].outputs[k].type == O_ICECAST) {
  844. icecast_data* icecast = (icecast_data*)(dev->channels[j].outputs[k].data);
  845. if (dev->input->state == INPUT_FAILED) {
  846. if (icecast->shout) {
  847. log(LOG_WARNING, "Device #%d failed, disconnecting stream %s:%d/%s\n", i, icecast->hostname, icecast->port, icecast->mountpoint);
  848. shout_close(icecast->shout);
  849. shout_free(icecast->shout);
  850. icecast->shout = NULL;
  851. }
  852. } else if (dev->input->state == INPUT_RUNNING) {
  853. if (icecast->shout == NULL) {
  854. log(LOG_NOTICE, "Trying to reconnect to %s:%d/%s...\n", icecast->hostname, icecast->port, icecast->mountpoint);
  855. shout_setup(icecast, dev->channels[j].mode);
  856. }
  857. }
  858. } else if (dev->channels[j].outputs[k].type == O_UDP_STREAM) {
  859. udp_stream_data* sdata = (udp_stream_data*)dev->channels[j].outputs[k].data;
  860. if (dev->input->state == INPUT_FAILED) {
  861. udp_stream_shutdown(sdata);
  862. }
  863. #ifdef WITH_PULSEAUDIO
  864. } else if (dev->channels[j].outputs[k].type == O_PULSE) {
  865. pulse_data* pdata = (pulse_data*)(dev->channels[j].outputs[k].data);
  866. if (dev->input->state == INPUT_FAILED) {
  867. if (pdata->context) {
  868. pulse_shutdown(pdata);
  869. }
  870. } else if (dev->input->state == INPUT_RUNNING) {
  871. if (pdata->context == NULL) {
  872. pulse_setup(pdata, dev->channels[j].mode);
  873. }
  874. }
  875. #endif /* WITH_PULSEAUDIO */
  876. }
  877. }
  878. }
  879. }
  880. for (int i = 0; i < mixer_count; i++) {
  881. if (mixers[i].enabled == false)
  882. continue;
  883. for (int k = 0; k < mixers[i].channel.output_count; k++) {
  884. if (mixers[i].channel.outputs[k].enabled == false)
  885. continue;
  886. if (mixers[i].channel.outputs[k].type == O_ICECAST) {
  887. icecast_data* icecast = (icecast_data*)(mixers[i].channel.outputs[k].data);
  888. if (icecast->shout == NULL) {
  889. log(LOG_NOTICE, "Trying to reconnect to %s:%d/%s...\n", icecast->hostname, icecast->port, icecast->mountpoint);
  890. shout_setup(icecast, mixers[i].channel.mode);
  891. }
  892. #ifdef WITH_PULSEAUDIO
  893. } else if (mixers[i].channel.outputs[k].type == O_PULSE) {
  894. pulse_data* pdata = (pulse_data*)(mixers[i].channel.outputs[k].data);
  895. if (pdata->context == NULL) {
  896. pulse_setup(pdata, mixers[i].channel.mode);
  897. }
  898. #endif /* WITH_PULSEAUDIO */
  899. }
  900. }
  901. }
  902. }
  903. return 0;
  904. }