messaging.c 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. /**
  2. *
  3. */
  4. #include <stdlib.h> // Required for libtelnet.h
  5. #include <esp_log.h>
  6. #include "stdbool.h"
  7. #include <lwip/def.h>
  8. #include <lwip/sockets.h>
  9. #include <errno.h>
  10. #include <string.h>
  11. #include "esp_app_trace.h"
  12. #include "esp_attr.h"
  13. #include "config.h"
  14. #include "nvs_utilities.h"
  15. #include "platform_esp32.h"
  16. #include "messaging.h"
  17. #include "trace.h"
  18. #include "globdefs.h"
  19. /************************************
  20. * Globals
  21. */
  22. const static char tag[] = "messaging";
  23. typedef struct {
  24. struct messaging_list_t * next;
  25. char * subscriber_name;
  26. size_t max_count;
  27. RingbufHandle_t buf_handle;
  28. } messaging_list_t;
  29. static messaging_list_t top;
  30. #define MSG_LENGTH_AVG 1024
  31. messaging_list_t * get_struct_ptr(messaging_handle_t handle){
  32. return (messaging_list_t *)handle;
  33. }
  34. messaging_handle_t get_handle_ptr(messaging_list_t * handle){
  35. return (messaging_handle_t )handle;
  36. }
  37. RingbufHandle_t messaging_create_ring_buffer(uint8_t max_count){
  38. RingbufHandle_t buf_handle = NULL;
  39. StaticRingbuffer_t *buffer_struct = malloc_init_external(sizeof(StaticRingbuffer_t));
  40. if (buffer_struct != NULL) {
  41. size_t buf_size = (size_t )(sizeof(single_message_t)+8+MSG_LENGTH_AVG)*(size_t )(max_count>0?max_count:5); // no-split buffer requires an additional 8 bytes
  42. buf_size = buf_size - (buf_size % 4);
  43. uint8_t *buffer_storage = (uint8_t *)heap_caps_malloc(buf_size, MALLOC_CAP_SPIRAM | MALLOC_CAP_32BIT);
  44. if (buffer_storage== NULL) {
  45. ESP_LOGE(tag,"buff alloc failed");
  46. }
  47. else {
  48. buf_handle = xRingbufferCreateStatic(buf_size, RINGBUF_TYPE_NOSPLIT, buffer_storage, buffer_struct);
  49. }
  50. }
  51. else {
  52. ESP_LOGE(tag,"ringbuf alloc failed");
  53. }
  54. return buf_handle;
  55. }
  56. void messaging_fill_messages(messaging_list_t * target_subscriber){
  57. single_message_t * message=NULL;
  58. UBaseType_t uxItemsWaiting;
  59. vRingbufferGetInfo(top.buf_handle, NULL, NULL, NULL, NULL, &uxItemsWaiting);
  60. for(size_t i=0;i<uxItemsWaiting;i++){
  61. message= messaging_retrieve_message(top.buf_handle);
  62. if(message){
  63. //re-post to original queue so it is available to future subscribers
  64. messaging_post_to_queue(get_handle_ptr(&top), message, message->msg_size);
  65. // post to new subscriber
  66. messaging_post_to_queue(get_handle_ptr(target_subscriber) , message, message->msg_size);
  67. FREE_AND_NULL(message);
  68. }
  69. }
  70. }
  71. messaging_handle_t messaging_register_subscriber(uint8_t max_count, char * name){
  72. messaging_list_t * cur=&top;
  73. while(cur->next){
  74. cur = get_struct_ptr(cur->next);
  75. }
  76. cur->next=malloc_init_external(sizeof(messaging_list_t));
  77. if(!cur->next){
  78. ESP_LOGE(tag,"subscriber alloc failed");
  79. return NULL;
  80. }
  81. memset(cur->next,0x00,sizeof(messaging_list_t));
  82. cur = get_struct_ptr(cur->next);
  83. cur->max_count=max_count;
  84. cur->subscriber_name=strdup_psram(name);
  85. cur->buf_handle = messaging_create_ring_buffer(max_count);
  86. if(cur->buf_handle){
  87. messaging_fill_messages(cur);
  88. }
  89. return cur->buf_handle;
  90. }
  91. void messaging_service_init(){
  92. size_t max_count=15;
  93. top.buf_handle = messaging_create_ring_buffer(max_count);
  94. if(!top.buf_handle){
  95. ESP_LOGE(tag, "messaging service init failed.");
  96. }
  97. else {
  98. top.max_count = max_count;
  99. top.subscriber_name = strdup_psram("messaging");
  100. }
  101. return;
  102. }
  103. const char * messaging_get_type_desc(messaging_types msg_type){
  104. switch (msg_type) {
  105. CASE_TO_STR(MESSAGING_INFO);
  106. CASE_TO_STR(MESSAGING_WARNING);
  107. CASE_TO_STR(MESSAGING_ERROR);
  108. default:
  109. return "Unknown";
  110. break;
  111. }
  112. }
  113. const char * messaging_get_class_desc(messaging_classes msg_class){
  114. switch (msg_class) {
  115. CASE_TO_STR(MESSAGING_CLASS_OTA);
  116. CASE_TO_STR(MESSAGING_CLASS_SYSTEM);
  117. CASE_TO_STR(MESSAGING_CLASS_STATS);
  118. CASE_TO_STR(MESSAGING_CLASS_CFGCMD);
  119. CASE_TO_STR(MESSAGING_CLASS_BT);
  120. default:
  121. return "Unknown";
  122. break;
  123. }
  124. }
  125. cJSON * messaging_retrieve_messages(RingbufHandle_t buf_handle){
  126. single_message_t * message=NULL;
  127. cJSON * json_messages=cJSON_CreateArray();
  128. cJSON * json_message=NULL;
  129. size_t item_size;
  130. UBaseType_t uxItemsWaiting;
  131. vRingbufferGetInfo(buf_handle, NULL, NULL, NULL, NULL, &uxItemsWaiting);
  132. for(int i = 0;i<uxItemsWaiting;i++){
  133. message = (single_message_t *)xRingbufferReceive(buf_handle, &item_size, pdMS_TO_TICKS(50));
  134. //Check received data
  135. if (message== NULL) {
  136. ESP_LOGE(tag,"received null ptr");
  137. }
  138. else {
  139. json_message = cJSON_CreateObject();
  140. cJSON_AddStringToObject(json_message, "message", message->message);
  141. cJSON_AddStringToObject(json_message, "type", messaging_get_type_desc(message->type));
  142. cJSON_AddStringToObject(json_message, "class", messaging_get_class_desc(message->msg_class));
  143. cJSON_AddNumberToObject(json_message,"sent_time",message->sent_time);
  144. cJSON_AddNumberToObject(json_message,"current_time",esp_timer_get_time() / 1000);
  145. cJSON_AddItemToArray(json_messages,json_message);
  146. vRingbufferReturnItem(buf_handle, (void *)message);
  147. }
  148. }
  149. return json_messages;
  150. }
  151. single_message_t * messaging_retrieve_message(RingbufHandle_t buf_handle){
  152. single_message_t * message=NULL;
  153. single_message_t * message_copy=NULL;
  154. size_t item_size;
  155. UBaseType_t uxItemsWaiting;
  156. vRingbufferGetInfo(buf_handle, NULL, NULL, NULL, NULL, &uxItemsWaiting);
  157. if(uxItemsWaiting>0){
  158. message = (single_message_t *)xRingbufferReceive(buf_handle, &item_size, pdMS_TO_TICKS(50));
  159. message_copy = clone_obj_psram(message,item_size);
  160. vRingbufferReturnItem(buf_handle, (void *)message);
  161. }
  162. return message_copy;
  163. }
  164. esp_err_t messaging_post_to_queue(messaging_handle_t subscriber_handle, single_message_t * message, size_t message_size){
  165. size_t item_size=0;
  166. messaging_list_t * subscriber=get_struct_ptr(subscriber_handle);
  167. if(!subscriber->buf_handle){
  168. ESP_LOGE(tag,"post failed: null buffer for %s", str_or_unknown(subscriber->subscriber_name));
  169. return ESP_FAIL;
  170. }
  171. void * pItem=NULL;
  172. UBaseType_t res=pdFALSE;
  173. while(1){
  174. ESP_LOGD(tag,"Attempting to reserve %d bytes for %s",message_size, str_or_unknown(subscriber->subscriber_name));
  175. res = xRingbufferSendAcquire(subscriber->buf_handle, &pItem, message_size, pdMS_TO_TICKS(50));
  176. if(res == pdTRUE && pItem){
  177. ESP_LOGD(tag,"Reserving complete for %s", str_or_unknown(subscriber->subscriber_name));
  178. memcpy(pItem,message,message_size);
  179. xRingbufferSendComplete(subscriber->buf_handle, pItem);
  180. break;
  181. }
  182. ESP_LOGD(tag,"Dropping for %s",str_or_unknown(subscriber->subscriber_name));
  183. single_message_t * dummy = (single_message_t *)xRingbufferReceive(subscriber->buf_handle, &item_size, pdMS_TO_TICKS(50));
  184. if (dummy== NULL) {
  185. ESP_LOGE(tag,"Dropping message failed");
  186. break;
  187. }
  188. else {
  189. ESP_LOGD(tag,"Dropping message of %d bytes for %s",item_size, str_or_unknown(subscriber->subscriber_name));
  190. vRingbufferReturnItem(subscriber->buf_handle, (void *)dummy);
  191. }
  192. }
  193. if (res != pdTRUE) {
  194. ESP_LOGE(tag,"post to %s failed",str_or_unknown(subscriber->subscriber_name));
  195. return ESP_FAIL;
  196. }
  197. return ESP_OK;
  198. }
  199. esp_err_t messaging_type_to_err_type(messaging_types type){
  200. switch (type) {
  201. case MESSAGING_INFO:
  202. return ESP_LOG_INFO;
  203. break;
  204. case MESSAGING_ERROR:
  205. return ESP_LOG_ERROR;
  206. break;
  207. case MESSAGING_WARNING:
  208. return ESP_LOG_WARN;
  209. break;
  210. default:
  211. return ESP_LOG_DEBUG;
  212. break;
  213. }
  214. return ESP_LOG_DEBUG;
  215. }
  216. void messaging_post_message(messaging_types type,messaging_classes msg_class, const char *fmt, ...){
  217. single_message_t * message=NULL;
  218. size_t msg_size=0;
  219. size_t ln =0;
  220. messaging_list_t * cur=&top;
  221. va_list va;
  222. va_start(va, fmt);
  223. ln = vsnprintf(NULL, 0, fmt, va)+1;
  224. msg_size = sizeof(single_message_t)+ln;
  225. message = (single_message_t *)malloc_init_external(msg_size);
  226. vsprintf(message->message, fmt, va);
  227. va_end(va);
  228. message->msg_size = msg_size;
  229. message->type = type;
  230. message->msg_class = msg_class;
  231. message->sent_time = esp_timer_get_time() / 1000;
  232. if(type==MESSAGING_WARNING) {
  233. ESP_LOGW(tag,"%s",message->message);
  234. }
  235. else if(type==MESSAGING_ERROR) {
  236. ESP_LOGE(tag,"%s",message->message);
  237. }
  238. else {
  239. ESP_LOGD(tag,"Post: %s",message->message);
  240. }
  241. while(cur){
  242. messaging_post_to_queue(get_handle_ptr(cur), message, msg_size);
  243. cur = get_struct_ptr(cur->next);
  244. }
  245. FREE_AND_NULL(message);
  246. return;
  247. }
  248. char * messaging_alloc_format_string(const char *fmt, ...) {
  249. va_list va;
  250. va_start(va, fmt);
  251. size_t ln = vsnprintf(NULL, 0, fmt, va)+1;
  252. char * message_txt = malloc_init_external(ln);
  253. if(message_txt){
  254. vsprintf(message_txt, fmt, va);
  255. va_end(va);
  256. }
  257. else{
  258. ESP_LOGE(tag, "Memory allocation failed while sending message");
  259. }
  260. return message_txt;
  261. }
  262. void log_send_messaging(messaging_types msgtype,const char *fmt, ...) {
  263. va_list va;
  264. va_start(va, fmt);
  265. size_t ln = vsnprintf(NULL, 0, fmt, va)+1;
  266. char * message_txt = malloc_init_external(ln);
  267. if(message_txt){
  268. vsprintf(message_txt, fmt, va);
  269. va_end(va);
  270. ESP_LOG_LEVEL_LOCAL(messaging_type_to_err_type(msgtype),tag, "%s",message_txt);
  271. messaging_post_message(msgtype, MESSAGING_CLASS_SYSTEM, message_txt );
  272. free(message_txt);
  273. }
  274. else{
  275. ESP_LOGE(tag, "Memory allocation failed while sending message");
  276. }
  277. }
  278. void cmd_send_messaging(const char * cmdname,messaging_types msgtype, const char *fmt, ...){
  279. va_list va;
  280. va_start(va, fmt);
  281. size_t cmd_len = strlen(cmdname)+1;
  282. size_t ln = vsnprintf(NULL, 0, fmt, va)+1;
  283. char * message_txt = malloc_init_external(ln+cmd_len);
  284. if(message_txt){
  285. strcpy(message_txt,cmdname);
  286. strcat(message_txt,"\n");
  287. vsprintf((message_txt+cmd_len), fmt, va);
  288. va_end(va);
  289. ESP_LOG_LEVEL_LOCAL(messaging_type_to_err_type(msgtype),tag, "%s",message_txt);
  290. messaging_post_message(msgtype, MESSAGING_CLASS_CFGCMD, message_txt );
  291. free(message_txt);
  292. }
  293. else{
  294. ESP_LOGE(tag, "Memory allocation failed while sending message");
  295. }
  296. }