C++ Instrument Catalog
ThreadedMailbox.h
Go to the documentation of this file.
1 
6 #pragma once
7 
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
15 
16 namespace MTL {
17  namespace ThreadedMailbox {
18 
/// Counting semaphore built on a mutex and a condition variable.
///
/// The counter starts at zero, so the first waiter blocks until another
/// thread calls notify(). Each successful wait consumes exactly one count.
class CSemaphore
{
private:
    std::mutex              m_mutex;
    std::condition_variable m_condition;
    unsigned long           m_count = 0; // Initialized as locked.

public:
    /// Reset semaphore object: discard every pending notification.
    void reset() {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_count = 0;
    }

    /// Raise a semaphore: add one count and wake one waiter, if any.
    void notify() {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_count;
        m_condition.notify_one();
    }

    /// Wait for a semaphore: block until a count is available, then consume it.
    void wait() {
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate overload re-checks the counter after every wake-up,
        // which handles spurious wake-ups.
        m_condition.wait(lock, [this] { return m_count != 0; });
        --m_count;
    }

    /// Wait for a semaphore for a limited time.
    ///
    /// @param timeout_ms Maximum total time to block.
    /// @return true if a count was consumed, false if the timeout elapsed
    ///         with no count available.
    bool wait_for(std::chrono::milliseconds timeout_ms) {
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate overload waits against a single deadline, so a
        // spurious wake-up does not restart the timeout. Its result is the
        // final state of the predicate, so a count that arrives exactly as
        // the deadline expires is still consumed rather than reported as a
        // timeout.
        if (!m_condition.wait_for(lock, timeout_ms, [this] { return m_count != 0; }))
            return false;
        --m_count;
        return true;
    }

    /// Check whether a semaphore has been raised, without blocking.
    ///
    /// @return true if a count was available (and has been consumed).
    bool try_wait() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_count == 0)
            return false;
        --m_count;
        return true;
    }
};
69 
70  template <typename MsgT>
71  class CMailBox
72  {
73  private:
74  std::queue<MsgT> m_MsgQ;
75  std::mutex m_Lock;
76  CSemaphore m_Sem;
77 
78  public:
79  bool empty()
80  {
81  m_Lock.lock();
82  bool l_em = m_MsgQ.empty();
83  m_Lock.unlock();
84  return l_em;
85  }
86  void clear()
87  {
88  m_Lock.lock();
89  std::queue<MsgT> l_Empty;
90  std::swap(m_MsgQ, l_Empty);
91  m_Sem.reset();
92  m_Lock.unlock();
93  }
94  void push(MsgT pMsg)
95  {
96  m_Lock.lock();
97  m_MsgQ.push(pMsg);
98  m_Sem.notify();
99  m_Lock.unlock();
100  }
101  void pop(MsgT & rV)
102  {
103  m_Sem.wait();
104  m_Lock.lock();
105  if (m_MsgQ.empty())
106  return;
107  rV = m_MsgQ.front();
108  m_MsgQ.pop();
109  m_Lock.unlock();
110  }
111  bool try_pop(MsgT & rV)
112  {
113  bool l_Success = false;
114  m_Lock.lock();
115  if (m_Sem.try_wait())
116  {
117  l_Success = true;
118  rV = m_MsgQ.front();
119  m_MsgQ.pop();
120  }
121  m_Lock.unlock();
122  return l_Success;
123  }
124  };
125 
126  class CTimer
127  {
128  private:
129  std::thread m_Thread; // Thread object
130  std::mutex m_Lock; // Lock onto public interface
131  std::chrono::milliseconds m_Period_ms;
132  bool m_SingleShot;
133  CSemaphore m_AbortSem;
134  std::function<void(void)> m_NotifyEvent;
135 
136  //-----------------------------------------//
137  // Thread Context
138  //-----------------------------------------//
139  public:
140  // Constructor as: CTimer timer(Function)
141  CTimer(const std::function<void(void)> & rTimerEventNotifierFunction)
142  : m_NotifyEvent(rTimerEventNotifierFunction)
143  {}
144  // Constructor as: CTimer timer(&Class::method, &ClassObj)
145  template<class Fn, class... Args>
146  CTimer(Fn&& rTimerEventNotifierFunction, Args&&... Ax)
147  : m_NotifyEvent(std::bind(rTimerEventNotifierFunction, Ax...))
148  {}
149  virtual ~CTimer()
150  {
151  Stop();
152  }
153  private:
154  void ThreadMain()
155  {
156  while (1)
157  {
158  // Wait for timer period to elapse
159  bool l_Aborted = m_AbortSem.wait_for(m_Period_ms);
160 
161  // If requested to abort, quit the thread without notification
162  if (l_Aborted)
163  return;
164 
165  // Notify event
166  m_NotifyEvent();
167 
168  // Check if we continue
169  if (m_SingleShot)
170  return; // If single shot, stop thread
171  }
172  }
173  //-----------------------------------------//
174  // Caller Context
175  //-----------------------------------------//
176  public:
177  bool StartSingleShot(std::chrono::milliseconds Delay_ms)
178  {
179  std::lock_guard<decltype(m_Lock)> l_lg(m_Lock);
180 
181  if (m_Thread.joinable()) // If thread is already running
182  return false;
183 
184  m_SingleShot = true;
185  m_Period_ms = Delay_ms;
186  m_AbortSem.reset();
187  m_Thread = std::thread(&CTimer::ThreadMain, this);
188  return true;
189  }
190  bool Start(std::chrono::milliseconds Period_ms)
191  {
192  std::lock_guard<decltype(m_Lock)> l_lg(m_Lock);
193 
194  if (m_Thread.joinable()) // If thread is already running
195  return false;
196 
197  m_SingleShot = false;
198  m_Period_ms = Period_ms;
199  m_AbortSem.reset();
200  m_Thread = std::thread(&CTimer::ThreadMain, this);
201  return true;
202  }
203  void Stop()
204  {
205  std::lock_guard<decltype(m_Lock)> l_lg(m_Lock);
206 
207  if (m_Thread.joinable()) // If worker thread is running
208  {
209  m_AbortSem.notify(); // Notify thread to stop
210  m_Thread.join(); // Wait for worker thread (and its timer thread) to be stopped
211  }
212  }
213  };
214 
215 
216  //-----------------------------------------//
217  //-----------------------------------------//
218  // Class
219  //-----------------------------------------//
220  //-----------------------------------------//
221  template <typename UserMsgT>
223  {
224  //-----------------------------------------//
225  // Types
226  //-----------------------------------------//
227  protected:
229  private:
230  struct sMailBoxMsg {
231  eEventType Evt;
232  UserMsgT UserMsg;
233  sMailBoxMsg()
234  {}
235  sMailBoxMsg(eEventType EvtType, const UserMsgT & rMsg)
236  : Evt(EvtType), UserMsg(rMsg)
237  {}
238  sMailBoxMsg(eEventType EvtType)
239  : Evt(EvtType)
240  {}
241  };
242  protected:
243  //-----------------------------------------//
244  // Attributes
245  //-----------------------------------------//
246  private:
247  CMailBox<sMailBoxMsg> m_MailBox;
248  std::thread m_Thread;
249  std::atomic<bool> m_RequestedToStop;
250 
251  //-----------------------------------------//
252  // Caller context
253  //-----------------------------------------//
254  protected:
255  void l_SendMsg(const UserMsgT & rMsg);
256  public:
258  : m_RequestedToStop(false)
259  {}
260  void ThreadStart();
261  void ThreadStop();
262  protected:
263  bool l_IsRequestedToStop();
264  //-----------------------------------------//
265  // Worker Thread context
266  //-----------------------------------------//
267  private:
268  void l_ThreadMain();
269  protected:
270  virtual void l_ThreadTask(eEventType EvtType, UserMsgT * pMsg) = 0;
271  };
272 
273 
274  //-----------------------------------------//
275  //-----------------------------------------//
276  // Implementation
277  //-----------------------------------------//
278  //-----------------------------------------//
279  template <typename UserMsgT>
281  {
282  if (!m_Thread.joinable()) // If thread is not already running
283  {
284  m_MailBox.clear();
285  m_RequestedToStop = false;
286  m_Thread = std::thread(&CThreadedMailBox::l_ThreadMain, this);
287  }
288  }
289 
290  template <typename UserMsgT>
292  {
293  m_RequestedToStop = true;
294  m_MailBox.push(sMailBoxMsg(eEventType::kExit)); // Request worker thread to stop
295  if (m_Thread.joinable() && std::this_thread::get_id() != m_Thread.get_id()) // If worker thread is running
296  m_Thread.join(); // Wait for worker thread to be stopped
297  }
298 
299  template <typename UserMsgT>
300  void CThreadedMailBox<UserMsgT>::l_SendMsg(const UserMsgT & rMsg)
301  {
302  m_MailBox.push(sMailBoxMsg(eEventType::kUserMsg, rMsg)); // Push our copy to our message queue
303  }
304 
305  template <typename UserMsgT>
307  {
308  return m_RequestedToStop;
309  }
310 
311  template <typename UserMsgT>
313  {
314  sMailBoxMsg Msg;
315  bool l_RunWorkerThread = true;
316  while (l_RunWorkerThread)
317  {
318  m_MailBox.pop(Msg);
319  switch (Msg.Evt)
320  {
321  case eEventType::kExit:
322  l_RunWorkerThread = false; // Request this thread to stop
323  l_ThreadTask(Msg.Evt, nullptr); // Call thread task for the last time
324  break;
325  case eEventType::kUserMsg:
326  l_ThreadTask(Msg.Evt, &Msg.UserMsg); // Forward user message
327  break;
328  }
329  }
330  }
331 
332  } // namespace ThreadedMailbox
333 } // namespace MTL
MTL::ThreadedMailbox::CThreadedMailBox::kExit
@ kExit
Definition: ThreadedMailbox.h:228
MTL::ThreadedMailbox::CSemaphore::try_wait
bool try_wait()
Check whether a semaphore has been raised.
Definition: ThreadedMailbox.h:60
MTL::ThreadedMailbox::CThreadedMailBox::kUserMsg
@ kUserMsg
Definition: ThreadedMailbox.h:228
MTL::ThreadedMailbox::CSemaphore::wait
void wait()
Wait for a semaphore.
Definition: ThreadedMailbox.h:41
MTL::ThreadedMailbox::CTimer::CTimer
CTimer(Fn &&rTimerEventNotifierFunction, Args &&... Ax)
Definition: ThreadedMailbox.h:146
MTL::ThreadedMailbox::CMailBox
Definition: ThreadedMailbox.h:71
MTL::ThreadedMailbox::CThreadedMailBox::CThreadedMailBox
CThreadedMailBox()
Definition: ThreadedMailbox.h:257
MTL::ThreadedMailbox::CThreadedMailBox::l_SendMsg
void l_SendMsg(const UserMsgT &rMsg)
Definition: ThreadedMailbox.h:300
MTL::ThreadedMailbox::CThreadedMailBox
Definition: ThreadedMailbox.h:222
MTL::ThreadedMailbox::CThreadedMailBox< sUSBRemoteBoxControllerMailBoxMsg >::eEventType
eEventType
Definition: ThreadedMailbox.h:228
MTL::ThreadedMailbox::CMailBox::empty
bool empty()
Definition: ThreadedMailbox.h:79
MTL::ThreadedMailbox::CThreadedMailBox::l_IsRequestedToStop
bool l_IsRequestedToStop()
Definition: ThreadedMailbox.h:306
MTL::ThreadedMailbox::CSemaphore
Definition: ThreadedMailbox.h:19
MTL::ThreadedMailbox::CTimer::CTimer
CTimer(const std::function< void(void)> &rTimerEventNotifierFunction)
Definition: ThreadedMailbox.h:141
MTL::ThreadedMailbox::CMailBox::clear
void clear()
Definition: ThreadedMailbox.h:86
MTL::ThreadedMailbox::CTimer::StartSingleShot
bool StartSingleShot(std::chrono::milliseconds Delay_ms)
Definition: ThreadedMailbox.h:177
MTL::ThreadedMailbox::CTimer::~CTimer
virtual ~CTimer()
Definition: ThreadedMailbox.h:149
MTL
Definition: CPT2026PeripheralROM.h:19
MTL::ThreadedMailbox::CThreadedMailBox::ThreadStop
void ThreadStop()
Definition: ThreadedMailbox.h:291
MTL::ThreadedMailbox::CThreadedMailBox::l_ThreadTask
virtual void l_ThreadTask(eEventType EvtType, UserMsgT *pMsg)=0
MTL::ThreadedMailbox::CSemaphore::reset
void reset()
Reset semaphore object.
Definition: ThreadedMailbox.h:28
MTL::ThreadedMailbox::CTimer::Stop
void Stop()
Definition: ThreadedMailbox.h:203
MTL::ThreadedMailbox::CMailBox::pop
void pop(MsgT &rV)
Definition: ThreadedMailbox.h:101
MTL::ThreadedMailbox::CMailBox::push
void push(MsgT pMsg)
Definition: ThreadedMailbox.h:94
MTL::ThreadedMailbox::CSemaphore::notify
void notify()
Raise a semaphore.
Definition: ThreadedMailbox.h:34
MTL::ThreadedMailbox::CTimer
Definition: ThreadedMailbox.h:126
MTL::ThreadedMailbox::CSemaphore::wait_for
bool wait_for(std::chrono::milliseconds timeout_ms)
Wait for a semaphore for a limited time.
Definition: ThreadedMailbox.h:49
MTL::ThreadedMailbox::CThreadedMailBox::ThreadStart
void ThreadStart()
Definition: ThreadedMailbox.h:280
MTL::ThreadedMailbox::CTimer::Start
bool Start(std::chrono::milliseconds Period_ms)
Definition: ThreadedMailbox.h:190
MTL::ThreadedMailbox::CMailBox::try_pop
bool try_pop(MsgT &rV)
Definition: ThreadedMailbox.h:111