Qore Programming Language  0.9.3.2
ThreadPool.h
1 /* -*- mode: c++; indent-tabs-mode: nil -*- */
2 /*
3  Qore Programming Language
4 
5  Copyright (C) 2003 - 2015 David Nichols
6 
7  Permission is hereby granted, free of charge, to any person obtaining a
8  copy of this software and associated documentation files (the "Software"),
9  to deal in the Software without restriction, including without limitation
10  the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  and/or sell copies of the Software, and to permit persons to whom the
12  Software is furnished to do so, subject to the following conditions:
13 
14  The above copyright notice and this permission notice shall be included in
15  all copies or substantial portions of the Software.
16 
17  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  DEALINGS IN THE SOFTWARE.
24 
25  Note that the Qore library is released under a choice of three open-source
26  licenses: MIT (as above), LGPL 2+, or GPL 2+; see README-LICENSE for more
27  information.
28 */
29 
30 #ifndef _QORE_THREADPOOL_H
31 #define _QORE_THREADPOOL_H
32 
33 #define QTP_DEFAULT_RELEASE_MS 5000
34 
35 #include <deque>
36 #include <qore/qlist>
37 
38 class ThreadTask;
39 class ThreadPoolThread;
40 
41 typedef std::deque<ThreadTask*> taskq_t;
42 typedef qlist<ThreadPoolThread*> tplist_t;
43 
44 class ThreadTask {
45 public:
46  DLLLOCAL ThreadTask(ResolvedCallReferenceNode* c, ResolvedCallReferenceNode* cc) : code(c), cancelCode(cc) {
47  }
48 
49  DLLLOCAL ~ThreadTask() {
50  assert(!code);
51  assert(!cancelCode);
52  }
53 
54  DLLLOCAL void del(ExceptionSink* xsink) {
55  code->deref(xsink);
56  if (cancelCode)
57  cancelCode->deref(xsink);
58 #ifdef DEBUG
59  code = nullptr;
60  cancelCode = nullptr;
61 #endif
62  delete this;
63  }
64 
65  DLLLOCAL QoreValue run(ExceptionSink* xsink) {
66  return code->execValue(0, xsink);
67  }
68 
69  DLLLOCAL void cancel(ExceptionSink* xsink) {
70  if (cancelCode)
71  cancelCode->execValue(0, xsink).discard(xsink);
72  }
73 
74 protected:
76  ResolvedCallReferenceNode* cancelCode;
77 };
78 
79 class ThreadTaskHolder {
80 protected:
81  ThreadTask* task;
82  ExceptionSink* xsink;
83 
84 public:
85  DLLLOCAL ThreadTaskHolder(ThreadTask* t, ExceptionSink* xs) : task(t), xsink(xs) {
86  }
87 
88  DLLLOCAL ~ThreadTaskHolder() {
89  if (task) {
90  task->del(xsink);
91  }
92  }
93 
94  DLLLOCAL ThreadTask* release() {
95  ThreadTask* rv = task;
96  task = 0;
97  return rv;
98  }
99 };
100 
101 class ThreadPool;
102 
103 class ThreadPoolThread {
104 protected:
105  int id;
106  ThreadPool& tp;
107  ThreadTask* task;
108  QoreCondition c,
109  *stopCond;
110  QoreThreadLock m;
111  tplist_t::iterator pos;
112  bool stopflag,
113  stopped;
114 
115  DLLLOCAL void finalize(ExceptionSink* xsink);
116 
117 public:
118  DLLLOCAL ThreadPoolThread(ThreadPool& n_tp, ExceptionSink* xsink);
119 
120  DLLLOCAL ~ThreadPoolThread() {
121  delete stopCond;
122  assert(!task);
123  }
124 
125  DLLLOCAL void setPos(tplist_t::iterator p) {
126  pos = p;
127  }
128 
129  DLLLOCAL bool valid() const {
130  return id != -1;
131  }
132 
133  DLLLOCAL void worker(ExceptionSink* xsink);
134 
135  DLLLOCAL void stop() {
136  AutoLocker al(m);
137  assert(!stopflag);
138  stopflag = true;
139  c.signal();
140  //printd(5, "ThreadPoolThread::stop() signaling stop for id %d\n", id);
141  }
142 
143  DLLLOCAL void stopWait() {
144  //printd(5, "ThreadPoolThread::stopWait() stopping id %d\n", id);
145  assert(!stopCond);
146  stopCond = new QoreCondition;
147 
148  AutoLocker al(m);
149  assert(!stopflag);
150  stopflag = true;
151  c.signal();
152  }
153 
154  DLLLOCAL void stopConfirm(ExceptionSink* xsink) {
155  {
156  AutoLocker al(m);
157  assert(stopflag);
158  assert(stopCond);
159  while (!stopped)
160  stopCond->wait(m);
161  }
162 
163  //printd(5, "ThreadPoolThread::stopConfirm() stopped id %d\n", id);
164  finalize(xsink);
165  }
166 
167  DLLLOCAL void submit(ThreadTask* t) {
168  AutoLocker al(m);
169  assert(!stopflag);
170  assert(!task);
171  task = t;
172  c.signal();
173  }
174 
175  DLLLOCAL int getId() const {
176  return id;
177  }
178 
179  DLLLOCAL tplist_t::iterator getPos() const {
180  return pos;
181  }
182 };
183 
184 class ThreadPool : public AbstractPrivateData {
185 protected:
186  int max, // maximum number of threads in pool (if <= 0 then unlimited)
187  minidle, // minimum number of idle threads
188  maxidle, // maximum number of idle threads
189  release_ms; // number of milliseconds before idle threads are released when > minidle
190 
191  // mutex for atomicity
192  QoreThreadLock m;
193 
194  // worker thread condition variable
195  QoreCondition cond;
196 
197  // stop condition variable
198  QoreCondition stopCond;
199 
200  tplist_t ah, // allocated thread list
201  fh; // free thread list
202 
203  // quit flag
204  bool quit;
205 
206  // master task queue
207  taskq_t q;
208 
209  // task waiting flag
210  bool waiting;
211 
212  bool stopflag, // stop flag
213  stopped, // stopped flag
214  confirm; // confirm member thread stop
215 
216  DLLLOCAL int checkStopUnlocked(const char* m, ExceptionSink* xsink) {
217  if (stopflag) {
218  xsink->raiseException("THREADPOOL-ERROR", "ThreadPool::%s() cannot be executed because the ThreadPool is being destroyed", m);
219  return -1;
220  }
221  return 0;
222  }
223 
224  DLLLOCAL int addIdleWorker(ExceptionSink* xsink) {
225  assert(xsink);
226  std::unique_ptr<ThreadPoolThread> tpth(new ThreadPoolThread(*this, xsink));
227  if (!tpth->valid()) {
228  assert(*xsink);
229  return -1;
230  }
231 
232  ThreadPoolThread* tpt = tpth.release();
233  fh.push_back(tpt);
234 #ifdef DEBUG
235  // set to an invalid iterator
236  tpt->setPos(fh.end());
237 #endif
238  return 0;
239  }
240 
241  DLLLOCAL ThreadPoolThread* getThreadUnlocked(ExceptionSink* xsink) {
242  assert(xsink);
243  while (!stopflag && fh.empty() && max && (int)ah.size() == max) {
244  waiting = true;
245  cond.wait(m);
246  waiting = false;
247  }
248 
249  if (stopflag)
250  return 0;
251 
252  ThreadPoolThread* tpt;
253 
254  if (!fh.empty()) {
255  tpt = fh.front();
256  fh.pop_front();
257  }
258  else {
259  std::unique_ptr<ThreadPoolThread> tpt_pt(new ThreadPoolThread(*this, xsink));
260  if (!tpt_pt->valid()) {
261  assert(*xsink);
262  return 0;
263  }
264  tpt = tpt_pt.release();
265  }
266 
267  ah.push_back(tpt);
268  tplist_t::iterator i = ah.end();
269  --i;
270  tpt->setPos(i);
271  return tpt;
272  }
273 
274 public:
275  DLLLOCAL ThreadPool(ExceptionSink* xsink, int n_max = 0, int n_minidle = 0, int m_maxidle = 0, int n_release_ms = QTP_DEFAULT_RELEASE_MS);
276 
277  DLLLOCAL ~ThreadPool() {
278  assert(q.empty());
279  assert(ah.empty());
280  assert(fh.empty());
281  assert(stopped);
282  }
283 
284  DLLLOCAL void toString(QoreString& str) {
285  AutoLocker al(m);
286 
287  str.sprintf("ThreadPool %p total: %d max: %d minidle: %d maxidle: %d release_ms: %d running: [", this, ah.size() + fh.size(), max, minidle, maxidle, release_ms);
288  for (tplist_t::iterator i = ah.begin(), e = ah.end(); i != e; ++i) {
289  if (i != ah.begin())
290  str.concat(", ");
291  str.sprintf("%d", (*i)->getId());
292  }
293 
294  str.concat("] idle: [");
295 
296  for (tplist_t::iterator i = fh.begin(), e = fh.end(); i != e; ++i) {
297  if (i != fh.begin())
298  str.concat(", ");
299  str.sprintf("%d", (*i)->getId());
300  }
301 
302  str.concat(']');
303  }
304 
305  // does not return until the thread pool has been stopped
306  DLLLOCAL void stop() {
307  AutoLocker al(m);
308  if (!stopflag) {
309  stopflag = true;
310  cond.signal();
311  }
312 
313  while (!stopped)
314  stopCond.wait(m);
315  }
316 
317  DLLLOCAL int stopWait(ExceptionSink* xsink) {
318  AutoLocker al(m);
319  if (stopflag && !confirm) {
320  xsink->raiseException("THREADPOOL-ERROR", "cannot call ThreadPool::stopWait() after ()ThreadPool::stop() has been called since child threads have been detached and can no longer be traced");
321  return -1;
322  }
323 
324  if (!stopflag) {
325  stopflag = true;
326  confirm = true;
327  cond.signal();
328  }
329 
330  while (!stopped)
331  stopCond.wait(m);
332 
333  return 0;
334  }
335 
336  DLLLOCAL int submit(ResolvedCallReferenceNode* c, ResolvedCallReferenceNode* cc, ExceptionSink* xsink) {
337  // optimistically create the task object outside the lock
338  ThreadTaskHolder task(new ThreadTask(c, cc), xsink);
339 
340  AutoLocker al(m);
341  if (checkStopUnlocked("submit", xsink))
342  return -1;
343 
344  if (q.empty())
345  cond.signal();
346  q.push_back(task.release());
347 
348  return 0;
349  }
350 
351  DLLLOCAL void threadCounts(int& idle, int& running) {
352  AutoLocker al(m);
353  idle = fh.size();
354  running = ah.size();
355  }
356 
357  DLLLOCAL int done(ThreadPoolThread* tpt) {
358  {
359  AutoLocker al(m);
360  // allow the thread to be removed from the active list by ThreadPool::worker() to avoid race conditions
361  if (stopflag)
362  return 0;
363 
364  if (!confirm) {
365  tplist_t::iterator i = tpt->getPos();
366  ah.erase(i);
367 
368  // requeue thread if possible
369  if ((!maxidle && release_ms) || ((int)fh.size() < maxidle) || q.size() > fh.size()) {
370  fh.push_back(tpt);
371  if (waiting || (release_ms && (int)fh.size() > minidle))
372  cond.signal();
373  return 0;
374  }
375  }
376  }
377 
378  return -1;
379  }
380 
381  DLLLOCAL void worker(ExceptionSink* xsink);
382 };
383 
384 #endif
the base class for all data to be used as private data of Qore objects
Definition: AbstractPrivateData.h:44
DLLEXPORT int sprintf(const char *fmt,...)
this will concatentate a formatted string to the existing string according to the format string and t...
DLLEXPORT AbstractQoreNode * raiseException(const char *err, const char *fmt,...)
appends a Qore-language exception to the list
DLLEXPORT int signal()
signals a single waiting thread to wake up
a thread condition class implementing a wrapper for pthread_cond_t
Definition: QoreCondition.h:45
Qore&#39;s string type supported by the QoreEncoding class.
Definition: QoreString.h:81
DLLEXPORT void concat(const QoreString *str, ExceptionSink *xsink)
concatenates a string and converts encodings if necessary
provides a safe and exception-safe way to hold locks in Qore, only to be used on the stack...
Definition: QoreThreadLock.h:128
The main value class in Qore, designed to be passed by value.
Definition: QoreValue.h:262
container for holding Qore-language exception information and also for registering a "thread_exit" ca...
Definition: ExceptionSink.h:46
base class for resolved call references
Definition: CallReferenceNode.h:105
provides a mutually-exclusive thread lock
Definition: QoreThreadLock.h:47
DLLEXPORT int wait(pthread_mutex_t *m)
blocks a thread on a mutex until the condition is signaled