messaging.c 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. /**
  2. *
  3. */
  4. #include <stdlib.h> // Required for libtelnet.h
  5. #include <esp_log.h>
  6. #include "stdbool.h"
  7. #include <lwip/def.h>
  8. #include <lwip/sockets.h>
  9. #include <errno.h>
  10. #include <string.h>
  11. #include "esp_app_trace.h"
  12. #include "esp_attr.h"
  13. #include "config.h"
  14. #include "nvs_utilities.h"
  15. #include "platform_esp32.h"
  16. #include "messaging.h"
  17. /************************************
  18. * Globals
  19. */
  20. const static char tag[] = "messaging";
  21. typedef struct {
  22. struct messaging_list_t * next;
  23. char * subscriber_name;
  24. size_t max_count;
  25. RingbufHandle_t buf_handle;
  26. } messaging_list_t;
  27. static messaging_list_t top;
  28. messaging_list_t * get_struct_ptr(messaging_handle_t handle){
  29. return (messaging_list_t *)handle;
  30. }
  31. messaging_handle_t get_handle_ptr(messaging_list_t * handle){
  32. return (messaging_handle_t )handle;
  33. }
  34. RingbufHandle_t messaging_create_ring_buffer(uint8_t max_count){
  35. RingbufHandle_t buf_handle = NULL;
  36. StaticRingbuffer_t *buffer_struct = malloc(sizeof(StaticRingbuffer_t));
  37. if (buffer_struct != NULL) {
  38. size_t buf_size = (size_t )(sizeof(single_message_t)+8)*(size_t )(max_count>0?max_count:5); // no-split buffer requires an additional 8 bytes
  39. uint8_t *buffer_storage = (uint8_t *)heap_caps_malloc(buf_size, MALLOC_CAP_SPIRAM | MALLOC_CAP_8BIT);
  40. if (buffer_storage== NULL) {
  41. ESP_LOGE(tag,"buff alloc failed");
  42. }
  43. else {
  44. buf_handle = xRingbufferCreateStatic(buf_size, RINGBUF_TYPE_NOSPLIT, buffer_storage, buffer_struct);
  45. }
  46. }
  47. else {
  48. ESP_LOGE(tag,"ringbuf alloc failed");
  49. }
  50. return buf_handle;
  51. }
  52. void messaging_fill_messages(messaging_list_t * target_subscriber){
  53. single_message_t * message=NULL;
  54. UBaseType_t uxItemsWaiting;
  55. vRingbufferGetInfo(top.buf_handle, NULL, NULL, NULL, NULL, &uxItemsWaiting);
  56. for(size_t i=0;i<uxItemsWaiting;i++){
  57. message= messaging_retrieve_message(top.buf_handle);
  58. if(message){
  59. //re-post to original queue so it is available to future subscribers
  60. messaging_post_to_queue(get_handle_ptr(&top), message, sizeof(single_message_t));
  61. // post to new subscriber
  62. messaging_post_to_queue(get_handle_ptr(target_subscriber) , message, sizeof(single_message_t));
  63. FREE_AND_NULL(message);
  64. }
  65. }
  66. }
  67. messaging_handle_t messaging_register_subscriber(uint8_t max_count, char * name){
  68. messaging_list_t * cur=&top;
  69. while(cur->next){
  70. cur = get_struct_ptr(cur->next);
  71. }
  72. cur->next=heap_caps_malloc(sizeof(messaging_list_t), MALLOC_CAP_SPIRAM | MALLOC_CAP_8BIT);
  73. if(!cur->next){
  74. ESP_LOGE(tag,"subscriber alloc failed");
  75. return NULL;
  76. }
  77. memset(cur->next,0x00,sizeof(messaging_list_t));
  78. cur = get_struct_ptr(cur->next);
  79. cur->max_count=max_count;
  80. cur->subscriber_name=strdup(name);
  81. cur->buf_handle = messaging_create_ring_buffer(max_count);
  82. if(cur->buf_handle){
  83. messaging_fill_messages(cur);
  84. }
  85. return cur->buf_handle;
  86. }
  87. void messaging_service_init(){
  88. size_t max_count=15;
  89. top.buf_handle = messaging_create_ring_buffer(max_count);
  90. if(!top.buf_handle){
  91. ESP_LOGE(tag, "messaging service init failed.");
  92. }
  93. else {
  94. top.max_count = max_count;
  95. top.subscriber_name = strdup("messaging");
  96. }
  97. return;
  98. }
  99. const char * messaging_get_type_desc(messaging_types msg_type){
  100. switch (msg_type) {
  101. CASE_TO_STR(MESSAGING_INFO);
  102. CASE_TO_STR(MESSAGING_WARNING);
  103. CASE_TO_STR(MESSAGING_ERROR);
  104. default:
  105. return "Unknown";
  106. break;
  107. }
  108. }
  109. const char * messaging_get_class_desc(messaging_classes msg_class){
  110. switch (msg_class) {
  111. CASE_TO_STR(MESSAGING_CLASS_OTA);
  112. CASE_TO_STR(MESSAGING_CLASS_SYSTEM);
  113. default:
  114. return "Unknown";
  115. break;
  116. }
  117. }
  118. cJSON * messaging_retrieve_messages(RingbufHandle_t buf_handle){
  119. single_message_t * message=NULL;
  120. cJSON * json_messages=cJSON_CreateArray();
  121. cJSON * json_message=NULL;
  122. size_t item_size;
  123. UBaseType_t uxItemsWaiting;
  124. vRingbufferGetInfo(buf_handle, NULL, NULL, NULL, NULL, &uxItemsWaiting);
  125. for(int i = 0;i<uxItemsWaiting;i++){
  126. message = (single_message_t *)xRingbufferReceive(buf_handle, &item_size, pdMS_TO_TICKS(50));
  127. //Check received data
  128. if (message== NULL) {
  129. ESP_LOGE(tag,"received null ptr");
  130. }
  131. else {
  132. json_message = cJSON_CreateObject();
  133. cJSON_AddStringToObject(json_message, "message", message->message);
  134. cJSON_AddStringToObject(json_message, "type", messaging_get_type_desc(message->type));
  135. cJSON_AddStringToObject(json_message, "class", messaging_get_class_desc(message->msg_class));
  136. cJSON_AddNumberToObject(json_message,"sent_time",message->sent_time);
  137. cJSON_AddNumberToObject(json_message,"current_time",esp_timer_get_time() / 1000);
  138. cJSON_AddItemToArray(json_messages,json_message);
  139. vRingbufferReturnItem(buf_handle, (void *)message);
  140. }
  141. }
  142. return json_messages;
  143. }
  144. single_message_t * messaging_retrieve_message(RingbufHandle_t buf_handle){
  145. single_message_t * message=NULL;
  146. single_message_t * message_copy=NULL;
  147. size_t item_size;
  148. UBaseType_t uxItemsWaiting;
  149. vRingbufferGetInfo(buf_handle, NULL, NULL, NULL, NULL, &uxItemsWaiting);
  150. if(uxItemsWaiting>0){
  151. message = (single_message_t *)xRingbufferReceive(buf_handle, &item_size, pdMS_TO_TICKS(50));
  152. if(item_size!=sizeof(single_message_t)){
  153. ESP_LOGE(tag,"Invalid message length!");
  154. }
  155. else {
  156. message_copy = heap_caps_malloc(item_size, MALLOC_CAP_SPIRAM | MALLOC_CAP_8BIT);
  157. if(message_copy){
  158. memcpy(message_copy,message,item_size);
  159. }
  160. }
  161. vRingbufferReturnItem(buf_handle, (void *)message);
  162. }
  163. return message_copy;
  164. }
  165. esp_err_t messaging_post_to_queue(messaging_handle_t subscriber_handle, single_message_t * message, size_t message_size){
  166. UBaseType_t uxItemsWaiting=0;
  167. size_t item_size=0;
  168. messaging_list_t * subscriber=get_struct_ptr(subscriber_handle);
  169. if(!subscriber->buf_handle){
  170. ESP_LOGE(tag,"post failed: null buffer for %s", str_or_unknown(subscriber->subscriber_name));
  171. return ESP_FAIL;
  172. }
  173. vRingbufferGetInfo(subscriber->buf_handle,NULL,NULL,NULL,NULL,&uxItemsWaiting);
  174. if(uxItemsWaiting>=subscriber->max_count){
  175. ESP_LOGD(tag,"messaged dropped for %s",str_or_unknown(subscriber->subscriber_name));
  176. single_message_t * dummy = (single_message_t *)xRingbufferReceive(subscriber->buf_handle, &item_size, pdMS_TO_TICKS(50));
  177. if (dummy== NULL) {
  178. ESP_LOGE(tag,"receive from buffer failed");
  179. }
  180. else {
  181. vRingbufferReturnItem(subscriber->buf_handle, (void *)dummy);
  182. }
  183. }
  184. UBaseType_t res = xRingbufferSend(subscriber->buf_handle, message, message_size, pdMS_TO_TICKS(1000));
  185. if (res != pdTRUE) {
  186. ESP_LOGE(tag,"post to %s failed",str_or_unknown(subscriber->subscriber_name));
  187. return ESP_FAIL;
  188. }
  189. return ESP_OK;
  190. }
  191. void messaging_post_message(messaging_types type,messaging_classes msg_class, char *fmt, ...){
  192. single_message_t message={};
  193. messaging_list_t * cur=&top;
  194. va_list va;
  195. va_start(va, fmt);
  196. vsnprintf(message.message, sizeof(message.message), fmt, va);
  197. va_end(va);
  198. message.type = type;
  199. message.msg_class = msg_class;
  200. message.sent_time = esp_timer_get_time() / 1000;
  201. while(cur){
  202. messaging_post_to_queue(get_handle_ptr(cur), &message, sizeof(single_message_t));
  203. cur = get_struct_ptr(cur->next);
  204. }
  205. return;
  206. }