Browse Source

Fix for mqtt msg

Per Mårtensson 2 weeks ago
parent
commit
a297d3b8ce
2 changed files with 96 additions and 31 deletions
  1. 95 31
      src/rtl_airband.cpp
  2. 1 0
      src/rtl_airband.h

+ 95 - 31
src/rtl_airband.cpp

@@ -73,8 +73,9 @@
 #include "mqtt/async_client.h"
 #include <ncurses.h>
 #include <curl/curl.h>
+#include <queue>
 #include <jsoncpp/json/json.h>
-
+#include <nlohmann/json.hpp>
 using namespace std;
 using namespace libconfig;
 	struct gpiod_chip *chip;
@@ -85,7 +86,7 @@ using namespace libconfig;
     struct gpiod_line *relay2;
 
 #define MAX_BUFFERS 10
-
+using json = nlohmann::json;
 sem_t mutex_lock_main_thread, mutex_unlock_main_thread;
 sem_t mutex_lock_main_thread_mqtt, mutex_unlock_main_thread_mqtt;
 pthread_mutex_t lock_data; 
@@ -106,11 +107,13 @@ char* bel_mqtt_server;
 char* bel_mqtt_client_id;
 char* bel_mqtt_user_id;
 char* bel_mqtt_password;
-mqtt_msg_t mqtt_msg;
+
 int squelch_status = 0;
 int audio_status = 0;
 int status_data = 0;
 int status_data_old = 0;
+
+
 int device_count, mixer_count;
 static int devices_running = 0;
 int tui = 0;  // do not display textual user interface
@@ -138,7 +141,55 @@ void sighandler(int sig) {
     log(LOG_NOTICE, "Got signal %d, exiting\n", sig);
     do_exit = 1;
 }
+template <typename T>
+class TSQueue {
+private:
+    // Underlying queue
+    std::queue<T> m_queue;
+
+    // mutex for thread synchronization
+    std::mutex m_mutex;
+
+    // Condition variable for signaling
+    std::condition_variable m_cond;
+
+public:
+    // Pushes an element to the queue
+    void push(T item)
+    {
+
+        // Acquire lock
+        std::unique_lock<std::mutex> lock(m_mutex);
+
+        // Add item
+        m_queue.push(item);
+
+        // Notify one thread that
+        // is waiting
+        m_cond.notify_one();
+    }
+
+    // Pops an element off the queue
+    T pop()
+    {
+
+        // acquire lock
+        std::unique_lock<std::mutex> lock(m_mutex);
+
+        // wait until queue is not empty
+        m_cond.wait(lock,
+                    [this]() { return !m_queue.empty(); });
+
+        // retrieve item
+        T item = m_queue.front();
+        m_queue.pop();
 
+        // return item
+        return item;
+    }
+     int size() { return m_queue.size(); }
+};
+TSQueue<mqtt_msg_t> mqtt_queue;
 void* controller_thread(void* params) {
     device_t* dev = (device_t*)params;
     int i = 0;
@@ -223,7 +274,6 @@ void* mqtt_control_thread(void*){
     connOpts.set_ssl(ssl_options);
     
     connOpts.set_automatic_reconnect(10, 40);
-    struct timespec ts;
     try
     {
         PublisherCallback callback;
@@ -235,29 +285,27 @@ void* mqtt_control_thread(void*){
         subToken->wait();
         while (true)
         {
-            int sem_try=0;
-            if (clock_gettime(CLOCK_REALTIME, &ts) == -1 && tui)
-            printf("clock_gettime");
-            ts.tv_sec +=1;
             mqtt_msg_t temp_mess;
-            sem_try = sem_timedwait(&mutex_lock_main_thread_mqtt, &ts);
-            if (sem_try == 0){
-                temp_mess=mqtt_msg;
-                sem_post(&mutex_unlock_main_thread_mqtt);
-                Json::Value event;
+
+                //printf("Size %i\n",mqtt_queue.size());
+            temp_mess=mqtt_queue.pop();
+            Json::Value event;
+            if (temp_mess.bel_status==-1){
                 event["sq"]=temp_mess.sq;
                 event["freq"]=temp_mess.freq;
                 event["name"]=temp_mess.name;
                 event["mountpoint"]=temp_mess.mountpoint;
-                Json::StreamWriterBuilder builder;
-                if (client.is_connected()){
-                    mqtt::message_ptr pubMessage = mqtt::make_message(TOPIC1,Json::writeString(builder, event), QOS, false);
-                    client.publish(pubMessage)->wait();
-                }
-                
             }
-        }
+            if (temp_mess.bel_status!=-1){
+                event["bel_status"]=temp_mess.bel_status;
+            }
+            Json::StreamWriterBuilder builder;
+            if (client.is_connected()){
+                mqtt::message_ptr pubMessage = mqtt::make_message(TOPIC1,Json::writeString(builder, event), QOS, false);
+                client.publish(pubMessage)->wait();
+            }
 
+        }
         mqtt::token_ptr disconnectionToken = client.disconnect();
         disconnectionToken->wait();
     }
@@ -266,7 +314,7 @@ void* mqtt_control_thread(void*){
         std::cerr << "MQTT Exception: " << ex.what() << std::endl;
         return 0;
     }
-
+    printf("HEJ3");
     return 0;
 }
 void* belysning_control_thread(void*){
@@ -278,7 +326,15 @@ void* belysning_control_thread(void*){
     bool sq_status = false;
     long int audio_start_time=0;
     bool active=false;
-
+    char *str ;
+    memset(&str, 0, sizeof(str));
+    mqtt_msg_t mqtt_msg;
+    mqtt_msg.sq = 0;
+    mqtt_msg.freq = 0;
+    mqtt_msg.name = str;
+    mqtt_msg.audio = 0;
+    mqtt_msg.mountpoint = str;
+    mqtt_msg.bel_status = -1;
     int status_data_temp=0;
     gettimeofday(&tv, 0);
     start_time=tv.tv_sec;
@@ -329,6 +385,8 @@ void* belysning_control_thread(void*){
             stop_time=tv.tv_sec+bel_start_on_time;
             mqtt_activate=false;
             active=true;
+            mqtt_msg.bel_status = 1;
+            mqtt_queue.push(mqtt_msg);
             if (tui)
                 printf("Start\n");
 
@@ -340,12 +398,16 @@ void* belysning_control_thread(void*){
                 stop_time=tv.tv_sec+bel_prolong_time;
                 if (tui)
                     printf("Prolong %u\n",bel_prolong_time);
+                mqtt_msg.bel_status = 2;
+                mqtt_queue.push(mqtt_msg);
                 prolong = false;
             }
         }
         if (stop_time<=tv.tv_sec && active){
             active=false;
             start_time=start_time;
+            mqtt_msg.bel_status = 0;
+                mqtt_queue.push(mqtt_msg);
             if (tui)
                 printf("Stop\n");
         }
@@ -926,19 +988,14 @@ void* demodulate(void* params) {
                             }
                            
                             if ((temp_status & 1) != (channel->freqlist[channel->freq_idx].oldstatus & 1)) {
-                                pthread_mutex_lock(&lock_mqtt_data);
-                                if (sem_post (&mutex_lock_main_thread_mqtt) == -1) {
-                                    perror ("sem_post: mutex_lock_main_thread"); exit (1);
-                                }
+                                mqtt_msg_t mqtt_msg;
                                 mqtt_msg.audio =(audio_status << 1);
                                 mqtt_msg.sq = squelch_status;
                                 mqtt_msg.freq = channel->freqlist->frequency;
                                 mqtt_msg.name = fparms->label;
                                 mqtt_msg.mountpoint = fparms->mountpoint;
-                                if (sem_wait(&mutex_unlock_main_thread_mqtt) == -1){
-                                    perror ("sem_post: mutex_unlock_main_thread"); exit (1);
-                                }
-                                pthread_mutex_unlock(&lock_mqtt_data);
+                                mqtt_msg.bel_status = -1;
+                                mqtt_queue.push(mqtt_msg);
                             }
                             channel->freqlist[channel->freq_idx].oldstatus = temp_status;
                             //Here for mqtt
@@ -1492,6 +1549,7 @@ int main(int argc, char* argv[]) {
     }
 
     log(LOG_INFO, "Cleaning up\n");
+
     for (int i = 0; i < device_count; i++) {
         if (devices[i].mode == R_SCAN)
             pthread_join(devices[i].controller_thread, NULL);
@@ -1515,6 +1573,7 @@ int main(int argc, char* argv[]) {
         pthread_join(mixer, NULL);
     }
 
+
     log(LOG_INFO, "Closing output thread(s)\n");
     for (int i = 0; i < output_thread_count; i++) {
         output_params[i].mp3_signal->send();
@@ -1530,7 +1589,12 @@ int main(int argc, char* argv[]) {
             }
         }
     }
-
+    log(LOG_INFO, "Closing bel thread\n");
+        pthread_join(belysning_control_check, NULL);
+    log(LOG_INFO, "Closing mqtt thread\n");
+    pthread_kill(mqtt_control_check,SIGALRM);
+       // pthread_join(mqtt_control_check, NULL);
+    log(LOG_INFO, "Closing xxx thread\n");
     close_debug();
 #ifdef WITH_PROFILING
     ProfilerStop();

+ 1 - 0
src/rtl_airband.h

@@ -294,6 +294,7 @@ struct mqtt_msg_t {
     int freq;
     char* name;
     char* mountpoint;
+    int bel_status;
 };
 struct mixinput_t {
     float* wavein;