messaging.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. /**
  2. *
  3. */
  4. #include "config.h"
  5. #include "esp_app_trace.h"
  6. #include "esp_attr.h"
  7. #include "stdbool.h"
  8. #include <errno.h>
  9. #include <esp_log.h>
  10. #include <lwip/def.h>
  11. #include <lwip/sockets.h>
  12. #include <stdlib.h> // Required for libtelnet.h
  13. #include <string.h>
  14. // #include "nvs_utilities.h"
  15. #include "messaging.h"
  16. #include "platform_esp32.h"
  17. #include "tools.h"
  18. /************************************
  19. * Globals
  20. */
  21. const static char tag[] = "messaging";
  22. typedef struct {
  23. struct messaging_list_t* next;
  24. char* subscriber_name;
  25. size_t max_count;
  26. RingbufHandle_t buf_handle;
  27. } messaging_list_t;
  28. static messaging_list_t top;
  29. #define MSG_LENGTH_AVG 1024
  30. messaging_list_t* get_struct_ptr(messaging_handle_t handle) { return (messaging_list_t*)handle; }
  31. messaging_handle_t get_handle_ptr(messaging_list_t* handle) { return (messaging_handle_t)handle; }
  32. RingbufHandle_t messaging_create_ring_buffer(uint8_t max_count) {
  33. RingbufHandle_t buf_handle = NULL;
  34. StaticRingbuffer_t* buffer_struct = malloc_init_external(sizeof(StaticRingbuffer_t));
  35. if (buffer_struct != NULL) {
  36. size_t buf_size =
  37. (size_t)(sizeof(single_message_t) + 8 + MSG_LENGTH_AVG) *
  38. (size_t)(max_count > 0 ? max_count
  39. : 5); // no-split buffer requires an additional 8 bytes
  40. buf_size = buf_size - (buf_size % 4);
  41. uint8_t* buffer_storage =
  42. (uint8_t*)heap_caps_malloc(buf_size, MALLOC_CAP_SPIRAM | MALLOC_CAP_32BIT);
  43. if (buffer_storage == NULL) {
  44. ESP_LOGE(tag, "buff alloc failed");
  45. } else {
  46. buf_handle = xRingbufferCreateStatic(
  47. buf_size, RINGBUF_TYPE_NOSPLIT, buffer_storage, buffer_struct);
  48. }
  49. } else {
  50. ESP_LOGE(tag, "ringbuf alloc failed");
  51. }
  52. return buf_handle;
  53. }
  54. void messaging_fill_messages(messaging_list_t* target_subscriber) {
  55. single_message_t* message = NULL;
  56. UBaseType_t uxItemsWaiting;
  57. if (!top.buf_handle) {
  58. ESP_LOGE(tag, "Queue initialization error!");
  59. return;
  60. }
  61. vRingbufferGetInfo(top.buf_handle, NULL, NULL, NULL, NULL, &uxItemsWaiting);
  62. for (size_t i = 0; i < uxItemsWaiting; i++) {
  63. message = messaging_retrieve_message(top.buf_handle);
  64. if (message) {
  65. // re-post to original queue so it is available to future subscribers
  66. messaging_post_to_queue(get_handle_ptr(&top), message, message->msg_size);
  67. // post to new subscriber
  68. messaging_post_to_queue(get_handle_ptr(target_subscriber), message, message->msg_size);
  69. FREE_AND_NULL(message);
  70. }
  71. }
  72. }
  73. messaging_handle_t messaging_register_subscriber(uint8_t max_count, char* name) {
  74. messaging_list_t* cur = &top;
  75. while (cur->next) {
  76. cur = get_struct_ptr(cur->next);
  77. }
  78. cur->next = malloc_init_external(sizeof(messaging_list_t));
  79. if (!cur->next) {
  80. ESP_LOGE(tag, "subscriber alloc failed");
  81. return NULL;
  82. }
  83. memset(cur->next, 0x00, sizeof(messaging_list_t));
  84. cur = get_struct_ptr(cur->next);
  85. cur->max_count = max_count;
  86. cur->subscriber_name = strdup_psram(name);
  87. cur->buf_handle = messaging_create_ring_buffer(max_count);
  88. if (cur->buf_handle) {
  89. messaging_fill_messages(cur);
  90. }
  91. return cur->buf_handle;
  92. }
  93. void messaging_service_init() {
  94. size_t max_count = 15;
  95. ESP_LOGI(tag, "Setting up messaging");
  96. top.buf_handle = messaging_create_ring_buffer(max_count);
  97. if (!top.buf_handle) {
  98. ESP_LOGE(tag, "messaging service init failed.");
  99. } else {
  100. top.max_count = max_count;
  101. top.subscriber_name = strdup_psram("messaging");
  102. }
  103. return;
  104. }
  105. const char* messaging_get_type_desc(messaging_types msg_type) {
  106. switch (msg_type) {
  107. CASE_TO_STR(MESSAGING_INFO);
  108. CASE_TO_STR(MESSAGING_WARNING);
  109. CASE_TO_STR(MESSAGING_ERROR);
  110. default:
  111. return "Unknown";
  112. break;
  113. }
  114. }
  115. const char* messaging_get_class_desc(messaging_classes msg_class) {
  116. switch (msg_class) {
  117. CASE_TO_STR(MESSAGING_CLASS_OTA);
  118. CASE_TO_STR(MESSAGING_CLASS_SYSTEM);
  119. CASE_TO_STR(MESSAGING_CLASS_STATS);
  120. CASE_TO_STR(MESSAGING_CLASS_CFGCMD);
  121. CASE_TO_STR(MESSAGING_CLASS_BT);
  122. default:
  123. return "Unknown";
  124. break;
  125. }
  126. }
  127. cJSON* messaging_retrieve_messages(RingbufHandle_t buf_handle) {
  128. single_message_t* message = NULL;
  129. cJSON* json_messages = cJSON_CreateArray();
  130. cJSON* json_message = NULL;
  131. size_t item_size;
  132. UBaseType_t uxItemsWaiting;
  133. vRingbufferGetInfo(buf_handle, NULL, NULL, NULL, NULL, &uxItemsWaiting);
  134. for (int i = 0; i < uxItemsWaiting; i++) {
  135. message = (single_message_t*)xRingbufferReceive(buf_handle, &item_size, pdMS_TO_TICKS(50));
  136. // Check received data
  137. if (message == NULL) {
  138. ESP_LOGE(tag, "received null ptr");
  139. } else {
  140. json_message = cJSON_CreateObject();
  141. cJSON_AddStringToObject(json_message, "message", message->message);
  142. cJSON_AddStringToObject(json_message, "type", messaging_get_type_desc(message->type));
  143. cJSON_AddStringToObject(
  144. json_message, "class", messaging_get_class_desc(message->msg_class));
  145. cJSON_AddNumberToObject(json_message, "sent_time", message->sent_time);
  146. cJSON_AddNumberToObject(json_message, "current_time", esp_timer_get_time() / 1000);
  147. cJSON_AddItemToArray(json_messages, json_message);
  148. vRingbufferReturnItem(buf_handle, (void*)message);
  149. }
  150. }
  151. return json_messages;
  152. }
  153. single_message_t* messaging_retrieve_message(RingbufHandle_t buf_handle) {
  154. single_message_t* message = NULL;
  155. single_message_t* message_copy = NULL;
  156. size_t item_size;
  157. UBaseType_t uxItemsWaiting;
  158. vRingbufferGetInfo(buf_handle, NULL, NULL, NULL, NULL, &uxItemsWaiting);
  159. if (uxItemsWaiting > 0) {
  160. message = (single_message_t*)xRingbufferReceive(buf_handle, &item_size, pdMS_TO_TICKS(50));
  161. message_copy = clone_obj_psram(message, item_size);
  162. vRingbufferReturnItem(buf_handle, (void*)message);
  163. }
  164. return message_copy;
  165. }
  166. esp_err_t messaging_post_to_queue(
  167. messaging_handle_t subscriber_handle, single_message_t* message, size_t message_size) {
  168. size_t item_size = 0;
  169. messaging_list_t* subscriber = get_struct_ptr(subscriber_handle);
  170. if (!subscriber->buf_handle) {
  171. ESP_LOGE(
  172. tag, "post failed: null buffer for %s", str_or_unknown(subscriber->subscriber_name));
  173. return ESP_FAIL;
  174. }
  175. void* pItem = NULL;
  176. UBaseType_t res = pdFALSE;
  177. while (1) {
  178. ESP_LOGD(tag, "Attempting to reserve %d bytes for %s", message_size,
  179. str_or_unknown(subscriber->subscriber_name));
  180. res =
  181. xRingbufferSendAcquire(subscriber->buf_handle, &pItem, message_size, pdMS_TO_TICKS(50));
  182. if (res == pdTRUE && pItem) {
  183. ESP_LOGD(tag, "Reserving complete for %s", str_or_unknown(subscriber->subscriber_name));
  184. memcpy(pItem, message, message_size);
  185. xRingbufferSendComplete(subscriber->buf_handle, pItem);
  186. break;
  187. }
  188. ESP_LOGD(tag, "Dropping for %s", str_or_unknown(subscriber->subscriber_name));
  189. single_message_t* dummy = (single_message_t*)xRingbufferReceive(
  190. subscriber->buf_handle, &item_size, pdMS_TO_TICKS(50));
  191. if (dummy == NULL) {
  192. ESP_LOGE(tag, "Dropping message failed");
  193. break;
  194. } else {
  195. ESP_LOGD(tag, "Dropping message of %d bytes for %s", item_size,
  196. str_or_unknown(subscriber->subscriber_name));
  197. vRingbufferReturnItem(subscriber->buf_handle, (void*)dummy);
  198. }
  199. }
  200. if (res != pdTRUE) {
  201. ESP_LOGE(tag, "post to %s failed", str_or_unknown(subscriber->subscriber_name));
  202. return ESP_FAIL;
  203. }
  204. return ESP_OK;
  205. }
  206. esp_err_t messaging_type_to_err_type(messaging_types type) {
  207. switch (type) {
  208. case MESSAGING_INFO:
  209. return ESP_LOG_INFO;
  210. break;
  211. case MESSAGING_ERROR:
  212. return ESP_LOG_ERROR;
  213. break;
  214. case MESSAGING_WARNING:
  215. return ESP_LOG_WARN;
  216. break;
  217. default:
  218. return ESP_LOG_DEBUG;
  219. break;
  220. }
  221. return ESP_LOG_DEBUG;
  222. }
  223. void messaging_post_message(
  224. messaging_types type, messaging_classes msg_class, const char* fmt, ...) {
  225. va_list va;
  226. va_start(va, fmt);
  227. vmessaging_post_message(type, msg_class, fmt, va);
  228. va_end(va);
  229. }
  230. void vmessaging_post_message(
  231. messaging_types type, messaging_classes msg_class, const char* fmt, va_list va) {
  232. single_message_t* message = NULL;
  233. size_t msg_size = 0;
  234. size_t ln = 0;
  235. messaging_list_t* cur = &top;
  236. ln = vsnprintf(NULL, 0, fmt, va) + 1;
  237. msg_size = sizeof(single_message_t) + ln;
  238. message = (single_message_t*)malloc_init_external(msg_size);
  239. vsprintf(message->message, fmt, va);
  240. message->msg_size = msg_size;
  241. message->type = type;
  242. message->msg_class = msg_class;
  243. message->sent_time = esp_timer_get_time() / 1000;
  244. if (type == MESSAGING_WARNING) {
  245. ESP_LOGW(tag, "%s", message->message);
  246. } else if (type == MESSAGING_ERROR) {
  247. ESP_LOGE(tag, "%s", message->message);
  248. } else {
  249. ESP_LOGD(tag, "Post: %s", message->message);
  250. }
  251. while (cur) {
  252. messaging_post_to_queue(get_handle_ptr(cur), message, msg_size);
  253. cur = get_struct_ptr(cur->next);
  254. }
  255. FREE_AND_NULL(message);
  256. return;
  257. }
  258. char* messaging_alloc_format_string(const char* fmt, ...) {
  259. va_list va;
  260. va_start(va, fmt);
  261. size_t ln = vsnprintf(NULL, 0, fmt, va) + 1;
  262. char* message_txt = malloc_init_external(ln);
  263. if (message_txt) {
  264. vsprintf(message_txt, fmt, va);
  265. va_end(va);
  266. } else {
  267. ESP_LOGE(tag, "Memory allocation failed while sending message");
  268. }
  269. return message_txt;
  270. }
  271. void log_send_messaging(messaging_types msgtype, const char* fmt, ...) {
  272. va_list va;
  273. va_start(va, fmt);
  274. size_t ln = vsnprintf(NULL, 0, fmt, va) + 1;
  275. char* message_txt = malloc_init_external(ln);
  276. if (message_txt) {
  277. vsprintf(message_txt, fmt, va);
  278. va_end(va);
  279. ESP_LOG_LEVEL_LOCAL(messaging_type_to_err_type(msgtype), tag, "%s", message_txt);
  280. messaging_post_message(msgtype, MESSAGING_CLASS_SYSTEM, message_txt);
  281. free(message_txt);
  282. } else {
  283. ESP_LOGE(tag, "Memory allocation failed while sending message");
  284. }
  285. }
  286. void cmd_send_messaging(const char* cmdname, messaging_types msgtype, const char* fmt, ...) {
  287. va_list va;
  288. va_start(va, fmt);
  289. size_t cmd_len = strlen(cmdname) + 1;
  290. size_t ln = vsnprintf(NULL, 0, fmt, va) + 1;
  291. char* message_txt = malloc_init_external(ln + cmd_len);
  292. if (message_txt) {
  293. strcpy(message_txt, cmdname);
  294. strcat(message_txt, "\n");
  295. vsprintf((message_txt + cmd_len), fmt, va);
  296. va_end(va);
  297. ESP_LOG_LEVEL_LOCAL(messaging_type_to_err_type(msgtype), tag, "%s", message_txt);
  298. messaging_post_message(msgtype, MESSAGING_CLASS_CFGCMD, message_txt);
  299. free(message_txt);
  300. } else {
  301. ESP_LOGE(tag, "Memory allocation failed while sending message");
  302. }
  303. }