OpenWalnut  1.5.0dev
WThreadedFunction.h
1 //---------------------------------------------------------------------------
2 //
3 // Project: OpenWalnut ( http://www.openwalnut.org )
4 //
5 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
6 // For more information see http://www.openwalnut.org/copying
7 //
8 // This file is part of OpenWalnut.
9 //
10 // OpenWalnut is free software: you can redistribute it and/or modify
11 // it under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 3 of the License, or
13 // (at your option) any later version.
14 //
15 // OpenWalnut is distributed in the hope that it will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
22 //
23 //---------------------------------------------------------------------------
24 
25 #ifndef WTHREADEDFUNCTION_H
26 #define WTHREADEDFUNCTION_H
27 
28 #include <iostream>
29 #include <memory>
30 #include <string>
31 #include <vector>
32 
33 #include <boost/thread.hpp>
34 
35 #include "WAssert.h"
36 #include "WSharedObject.h"
37 #include "WWorkerThread.h"
38 
39 
40 /**
41  * An enum indicating the status of a multithreaded computation
42  */
43 enum WThreadedFunctionStatus
44 {
45  W_THREADS_INITIALIZED, //! the status after constructing the function
46  W_THREADS_RUNNING, //! the threads were started
47  W_THREADS_STOP_REQUESTED, //! a stop was requested and not all threads have stopped yet
48  W_THREADS_ABORTED, //! at least one thread was aborted due to a stop request or an exception
49  W_THREADS_FINISHED //! all threads completed their work successfully
50 };
51 
52 /**
53  * An enum indicating the number of threads used
54  */
55 enum WThreadedFunctionNbThreads
56 {
57  W_AUTOMATIC_NB_THREADS = 0 //!< Use half the available cores as number of threads
58 };
59 
60 /**
61  * \class WThreadedFunctionBase
62  *
63  * A virtual base class for threaded functions (see below).
64  */
65 class WThreadedFunctionBase // NOLINT
66 {
67  //! a type for exception signals
68  typedef boost::signals2::signal< void ( WException const& ) > ExceptionSignal;
69 
70 public:
71  //! a type for exception callbacks
72  typedef boost::function< void ( WException const& ) > ExceptionFunction;
73 
74  /**
75  * Standard constructor.
76  */
78 
79  /**
80  * Destroys the thread pool and stops all threads, if any one of them is still running.
81  *
82  * \note Of course, the client has to make sure the threads do not work endlessly on a single job.
83  */
84  virtual ~WThreadedFunctionBase();
85 
86  /**
87  * Starts the threads.
88  */
89  virtual void run() = 0;
90 
91  /**
92  * Request all threads to stop. Returns immediately, so you might
93  * have to wait() for the threads to actually finish.
94  */
95  virtual void stop() = 0;
96 
97  /**
98  * Wait for all threads to stop.
99  */
100  virtual void wait() = 0;
101 
102  /**
103  * Get the status of the threads.
104  *
105  * \return The current status.
106  */
107  WThreadedFunctionStatus status();
108 
109  /**
110  * Returns a condition that gets fired when all threads have finished.
111  *
112  * \return The condition indicating all threads are done.
113  */
114  std::shared_ptr< WCondition > getThreadsDoneCondition();
115 
116  /**
117  * Subscribe a function to an exception signal.
118  *
119  * \param func The function to subscribe.
120  */
122 
123 protected:
124  /**
125  * WThreadedFunctionBase is non-copyable, so the copy constructor is not implemented.
126  */
128 
129  /**
130  * WThreadedFunctionBase is non-copyable, so the copy operator is not implemented.
131  *
132  * \return this function
133  */
135 
136  //! a condition that gets notified when the work is complete
137  std::shared_ptr< WCondition > m_doneCondition;
138 
139  //! a signal for exceptions
141 
142  //! the current status
144 };
145 
146 /**
147  * \class WThreadedFunction
148  *
149  * Creates threads that computes a function in a multithreaded fashion. The template parameter
150  * is an object that provides a function to execute. The following function needs to be implemented:
151  *
152  * void operator ( std::size_t id, std::size_t mx, WBoolFlag const& s );
153  *
154  * Here, 'id' is the number of the thread currently executing the function, ranging from
155  * 0 to mx - 1, where 'mx' is the number of threads running. 's' is a flag that indicates
156  * if the execution should be stopped. Make sure to check the flag often, so that the threads
157  * can be stopped when needed.
158  *
159  * This class itself is NOT thread-safe, do not access it from different threads simultaneously.
160  * Also, make sure any resources used by your function are accessed in a threadsafe manner,
161  * as all threads share the same function object.
162  *
163  * Any exception thrown by your function will be caught and forwarded via the exception
164  * signal. Beware that the signal function will be called in the executing threads, as opposed
165  * to in your module thread. This means that the exception handler bound to the exception
166  * signal must be threadsafe too.
167  *
168  * The status of the execution can be checked via the status() function. Also, when all threads
169  * finish (due to throwing exceptions or actually successfully finishing computation ), a condition
170  * will be notified.
171  *
172  * \ingroup common
173  */
174 template< class Function_T >
176 {
177  //! a type for exception signals
178  typedef boost::signals2::signal< void ( WException const& ) > ExceptionSignal;
179 
180 public:
181  //! a type for exception callbacks
182  typedef boost::function< void ( WException const& ) > ExceptionFunction;
183 
184  /**
185  * Creates the thread pool with a given number of threads.
186  *
187  * \param numThreads The number of threads to create.
188  * \param function The function object.
189  *
190  * \note If the number of threads equals 0, a good number of threads will be determined by the threadpool.
191  */
192  WThreadedFunction( std::size_t numThreads, std::shared_ptr< Function_T > function );
193 
194  /**
195  * Destroys the thread pool and stops all threads, if any one of them is still running.
196  *
197  * \note Of course, the client has to make sure the threads do not work endlessly on a single job.
198  */
199  virtual ~WThreadedFunction();
200 
201  /**
202  * Starts the threads.
203  */
204  virtual void run();
205 
206  /**
207  * Request all threads to stop. Returns immediately, so you might
208  * have to wait() for the threads to actually finish.
209  */
210  virtual void stop();
211 
212  /**
213  * Wait for all threads to stop.
214  */
215  virtual void wait();
216 
217 private:
218  /**
219  * WThreadedFunction is non-copyable, so the copy constructor is not implemented.
220  */
222 
223  /**
224  * WThreadedFunction is non-copyable, so the copy operator is not implemented.
225  *
226  * \return this function
227  */
229 
230  /**
231  * This function gets subscribed to the threads' stop signals.
232  */
233  void handleThreadDone();
234 
235  /**
236  * This function handles exceptions thrown in the worker threads.
237  *
238  * \param e The exception that was thrown.
239  */
240  void handleThreadException( WException const& e );
241 
242  //! the number of threads to manage
243  std::size_t m_numThreads;
244 
245  //! the threads
246  // use shared_ptr here, because WWorkerThread is non-copyable
247  std::vector< std::shared_ptr< WWorkerThread< Function_T > > > m_threads;
248 
249  //! the function object
250  std::shared_ptr< Function_T > m_func;
251 
252  //! a counter that keeps track of how many threads have finished
254 };
255 
256 template< class Function_T >
257 WThreadedFunction< Function_T >::WThreadedFunction( std::size_t numThreads, std::shared_ptr< Function_T > function )
259  m_numThreads( numThreads ),
260  m_threads(),
261  m_func( function ),
262  m_threadsDone()
263 {
264  if( !m_func )
265  {
266  throw WException( std::string( "No valid thread function pointer." ) );
267  }
268 
269  // find a suitable number of threads
270  if( m_numThreads == W_AUTOMATIC_NB_THREADS )
271  {
272  m_numThreads = 1;
273  while( m_numThreads < boost::thread::hardware_concurrency() / 2 && m_numThreads < 1024 )
274  {
275  m_numThreads *= 2;
276  }
277  }
278 
279  // set number of finished threads to 0
280  m_threadsDone.getWriteTicket()->get() = 0;
281 
282  // create threads
283  for( std::size_t k = 0; k < m_numThreads; ++k )
284  {
285  std::shared_ptr< WWorkerThread< Function_T > > t( new WWorkerThread< Function_T >( m_func, k, m_numThreads ) );
286  t->subscribeStopSignal( boost::bind( &WThreadedFunction::handleThreadDone, this ) );
287  t->subscribeExceptionSignal( boost::bind( &WThreadedFunction::handleThreadException, this, boost::placeholders::_1 ) );
288  m_threads.push_back( t );
289  }
290 }
291 
292 template< class Function_T >
294 {
295  stop();
296 }
297 
298 template< class Function_T >
300 {
301  // set the number of finished threads to 0
302  m_threadsDone.getWriteTicket()->get() = 0;
303  // change status
304  m_status.getWriteTicket()->get() = W_THREADS_RUNNING;
305  // start threads
306  for( std::size_t k = 0; k < m_numThreads; ++k )
307  {
308  m_threads[ k ]->run();
309  }
310 }
311 
312 template< class Function_T >
314 {
315  // change status
316  m_status.getWriteTicket()->get() = W_THREADS_STOP_REQUESTED;
317 
318  typename std::vector< std::shared_ptr< WWorkerThread< Function_T > > >::iterator it;
319  // tell the threads to stop
320  for( it = m_threads.begin(); it != m_threads.end(); ++it )
321  {
322  ( *it )->requestStop();
323  }
324 }
325 
326 template< class Function_T >
328 {
329  typename std::vector< std::shared_ptr< WWorkerThread< Function_T > > >::iterator it;
330  // wait for the threads to stop
331  for( it = m_threads.begin(); it != m_threads.end(); ++it )
332  {
333  ( *it )->wait();
334  }
335 }
336 
337 template< class Function_T >
339 {
340  typedef typename WSharedObject< std::size_t >::WriteTicket WT;
341 
342  WT t = m_threadsDone.getWriteTicket();
343  WAssert( t->get() < m_numThreads, "" );
344  ++t->get();
345  std::size_t k = t->get();
346  t = WT();
347 
348  if( m_numThreads == k )
349  {
351  ST s = m_status.getWriteTicket();
352  if( s->get() == W_THREADS_RUNNING )
353  {
354  s->get() = W_THREADS_FINISHED;
355  }
356  else if( s->get() == W_THREADS_STOP_REQUESTED )
357  {
358  s->get() = W_THREADS_ABORTED;
359  }
360  else
361  {
362  throw WException( std::string( "Invalid status change." ) );
363  }
364  m_doneCondition->notify();
365  }
366 }
367 
368 template< class Function_T >
370 {
371  // change status
373  WT w = m_status.getWriteTicket();
374  WAssert( w->get() != W_THREADS_FINISHED &&
375  w->get() != W_THREADS_ABORTED, "" );
376  if( w->get() == W_THREADS_RUNNING )
377  {
378  w->get() = W_THREADS_STOP_REQUESTED;
379  }
380  // force destruction of the write ticket
381  w = WT();
382  // update the number of finished threads
383  handleThreadDone();
384 
385  m_exceptionSignal( e );
386 }
387 
388 #endif // WTHREADEDFUNCTION_H
Basic exception handler.
Definition: WException.h:39
WriteTicket getWriteTicket(bool suppressNotify=false) const
Returns a ticket to get write access to the contained data.
A virtual base class for threaded functions (see below).
std::shared_ptr< WCondition > m_doneCondition
a condition that gets notified when the work is complete
std::shared_ptr< WCondition > getThreadsDoneCondition()
Returns a condition that gets fired when all threads have finished.
WThreadedFunctionStatus status()
Get the status of the threads.
boost::signals2::signal< void(WException const &) > ExceptionSignal
a type for exception signals
boost::function< void(WException const &) > ExceptionFunction
a type for exception callbacks
WThreadedFunctionBase & operator=(WThreadedFunctionBase const &)
WThreadedFunctionBase is non-copyable, so the copy operator is not implemented.
ExceptionSignal m_exceptionSignal
a signal for exceptions
virtual void run()=0
Starts the threads.
WThreadedFunctionBase()
Standard constructor.
void subscribeExceptionSignal(ExceptionFunction func)
Subscribe a function to an exception signal.
virtual ~WThreadedFunctionBase()
Destroys the thread pool and stops all threads, if any one of them is still running.
virtual void stop()=0
Request all threads to stop.
WSharedObject< WThreadedFunctionStatus > m_status
the current status
WThreadedFunctionBase(WThreadedFunctionBase const &)
WThreadedFunctionBase is non-copyable, so the copy constructor is not implemented.
virtual void wait()=0
Wait for all threads to stop.
Creates threads that computes a function in a multithreaded fashion.
WThreadedFunction(std::size_t numThreads, std::shared_ptr< Function_T > function)
Creates the thread pool with a given number of threads.
WSharedObject< std::size_t > m_threadsDone
a counter that keeps track of how many threads have finished
virtual void wait()
Wait for all threads to stop.
virtual void stop()
Request all threads to stop.
boost::function< void(WException const &) > ExceptionFunction
a type for exception callbacks
std::vector< std::shared_ptr< WWorkerThread< Function_T > > > m_threads
the threads
void handleThreadDone()
This function gets subscribed to the threads' stop signals.
virtual ~WThreadedFunction()
Destroys the thread pool and stops all threads, if any one of them is still running.
boost::signals2::signal< void(WException const &) > ExceptionSignal
a type for exception signals
WThreadedFunction & operator=(WThreadedFunction const &)
WThreadedFunction is non-copyable, so the copy operator is not implemented.
std::size_t m_numThreads
the number of threads to manage
std::shared_ptr< Function_T > m_func
the function object
void handleThreadException(WException const &e)
This function handles exceptions thrown in the worker threads.
WThreadedFunction(WThreadedFunction const &)
WThreadedFunction is non-copyable, so the copy constructor is not implemented.
virtual void run()
Starts the threads.
A worker thread that belongs to a.
Definition: WWorkerThread.h:43