msgqueue.h 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. ///////////////////////////////////////////////////////////////////////////////
  2. // Name: wx/msqqueue.h
  3. // Purpose: Message queues for inter-thread communication
  4. // Author: Evgeniy Tarassov
  5. // Created: 2007-10-31
  6. // Copyright: (C) 2007 TT-Solutions SARL
  7. // Licence: wxWindows licence
  8. ///////////////////////////////////////////////////////////////////////////////
  9. #ifndef _WX_MSGQUEUE_H_
  10. #define _WX_MSGQUEUE_H_
  11. // ----------------------------------------------------------------------------
  12. // headers
  13. // ----------------------------------------------------------------------------
  14. #include "wx/thread.h"
  15. #if wxUSE_THREADS
  16. #include "wx/stopwatch.h"
  17. #include "wx/beforestd.h"
  18. #include <queue>
  19. #include "wx/afterstd.h"
  20. enum wxMessageQueueError
  21. {
  22. wxMSGQUEUE_NO_ERROR = 0, // operation completed successfully
  23. wxMSGQUEUE_TIMEOUT, // no messages received before timeout expired
  24. wxMSGQUEUE_MISC_ERROR // some unexpected (and fatal) error has occurred
  25. };
  26. // ---------------------------------------------------------------------------
  27. // Message queue allows passing message between threads.
  28. //
  29. // This class is typically used for communicating between the main and worker
  30. // threads. The main thread calls Post() and the worker thread calls Receive().
  31. //
  32. // For this class a message is an object of arbitrary type T. Notice that
  33. // typically there must be some special message indicating that the thread
  34. // should terminate as there is no other way to gracefully shutdown a thread
  35. // waiting on the message queue.
  36. // ---------------------------------------------------------------------------
  37. template <typename T>
  38. class wxMessageQueue
  39. {
  40. public:
  41. // The type of the messages transported by this queue
  42. typedef T Message;
  43. // Default ctor creates an initially empty queue
  44. wxMessageQueue()
  45. : m_conditionNotEmpty(m_mutex)
  46. {
  47. }
  48. // Add a message to this queue and signal the threads waiting for messages.
  49. //
  50. // This method is safe to call from multiple threads in parallel.
  51. wxMessageQueueError Post(const Message& msg)
  52. {
  53. wxMutexLocker locker(m_mutex);
  54. wxCHECK( locker.IsOk(), wxMSGQUEUE_MISC_ERROR );
  55. m_messages.push(msg);
  56. m_conditionNotEmpty.Signal();
  57. return wxMSGQUEUE_NO_ERROR;
  58. }
  59. // Remove all messages from the queue.
  60. //
  61. // This method is meant to be called from the same thread(s) that call
  62. // Post() to discard any still pending requests if they became unnecessary.
  63. wxMessageQueueError Clear()
  64. {
  65. wxCHECK( IsOk(), wxMSGQUEUE_MISC_ERROR );
  66. wxMutexLocker locker(m_mutex);
  67. std::queue<T> empty;
  68. std::swap(m_messages, empty);
  69. return wxMSGQUEUE_NO_ERROR;
  70. }
  71. // Wait no more than timeout milliseconds until a message becomes available.
  72. //
  73. // Setting timeout to 0 is equivalent to an infinite timeout. See Receive().
  74. wxMessageQueueError ReceiveTimeout(long timeout, T& msg)
  75. {
  76. wxCHECK( IsOk(), wxMSGQUEUE_MISC_ERROR );
  77. wxMutexLocker locker(m_mutex);
  78. wxCHECK( locker.IsOk(), wxMSGQUEUE_MISC_ERROR );
  79. const wxMilliClock_t waitUntil = wxGetLocalTimeMillis() + timeout;
  80. while ( m_messages.empty() )
  81. {
  82. wxCondError result = m_conditionNotEmpty.WaitTimeout(timeout);
  83. if ( result == wxCOND_NO_ERROR )
  84. continue;
  85. wxCHECK( result == wxCOND_TIMEOUT, wxMSGQUEUE_MISC_ERROR );
  86. const wxMilliClock_t now = wxGetLocalTimeMillis();
  87. if ( now >= waitUntil )
  88. return wxMSGQUEUE_TIMEOUT;
  89. timeout = (waitUntil - now).ToLong();
  90. wxASSERT(timeout > 0);
  91. }
  92. msg = m_messages.front();
  93. m_messages.pop();
  94. return wxMSGQUEUE_NO_ERROR;
  95. }
  96. // Same as ReceiveTimeout() but waits for as long as it takes for a message
  97. // to become available (so it can't return wxMSGQUEUE_TIMEOUT)
  98. wxMessageQueueError Receive(T& msg)
  99. {
  100. wxCHECK( IsOk(), wxMSGQUEUE_MISC_ERROR );
  101. wxMutexLocker locker(m_mutex);
  102. wxCHECK( locker.IsOk(), wxMSGQUEUE_MISC_ERROR );
  103. while ( m_messages.empty() )
  104. {
  105. wxCondError result = m_conditionNotEmpty.Wait();
  106. wxCHECK( result == wxCOND_NO_ERROR, wxMSGQUEUE_MISC_ERROR );
  107. }
  108. msg = m_messages.front();
  109. m_messages.pop();
  110. return wxMSGQUEUE_NO_ERROR;
  111. }
  112. // Return false only if there was a fatal error in ctor
  113. bool IsOk() const
  114. {
  115. return m_conditionNotEmpty.IsOk();
  116. }
  117. private:
  118. // Disable copy ctor and assignment operator
  119. wxMessageQueue(const wxMessageQueue<T>& rhs);
  120. wxMessageQueue<T>& operator=(const wxMessageQueue<T>& rhs);
  121. mutable wxMutex m_mutex;
  122. wxCondition m_conditionNotEmpty;
  123. std::queue<T> m_messages;
  124. };
  125. #endif // wxUSE_THREADS
  126. #endif // _WX_MSGQUEUE_H_