30 #ifndef _QORE_THREADPOOL_H 31 #define _QORE_THREADPOOL_H 33 #define QTP_DEFAULT_RELEASE_MS 5000 39 class ThreadPoolThread;
41 typedef std::deque<ThreadTask*> taskq_t;
42 typedef qlist<ThreadPoolThread*> tplist_t;
49 DLLLOCAL ~ThreadTask() {
57 cancelCode->deref(xsink);
66 return code->execValue(0, xsink);
71 cancelCode->execValue(0, xsink).discard(xsink);
79 class ThreadTaskHolder {
85 DLLLOCAL ThreadTaskHolder(ThreadTask* t,
ExceptionSink* xs) : task(t), xsink(xs) {
88 DLLLOCAL ~ThreadTaskHolder() {
94 DLLLOCAL ThreadTask* release() {
95 ThreadTask* rv = task;
103 class ThreadPoolThread {
111 tplist_t::iterator pos;
118 DLLLOCAL ThreadPoolThread(ThreadPool& n_tp,
ExceptionSink* xsink);
120 DLLLOCAL ~ThreadPoolThread() {
125 DLLLOCAL
void setPos(tplist_t::iterator p) {
129 DLLLOCAL
bool valid()
const {
135 DLLLOCAL
void stop() {
143 DLLLOCAL
void stopWait() {
167 DLLLOCAL
void submit(ThreadTask* t) {
175 DLLLOCAL
int getId()
const {
179 DLLLOCAL tplist_t::iterator getPos()
const {
216 DLLLOCAL
int checkStopUnlocked(
const char* m,
ExceptionSink* xsink) {
218 xsink->
raiseException(
"THREADPOOL-ERROR",
"ThreadPool::%s() cannot be executed because the ThreadPool is being destroyed", m);
226 std::unique_ptr<ThreadPoolThread> tpth(
new ThreadPoolThread(*
this, xsink));
227 if (!tpth->valid()) {
232 ThreadPoolThread* tpt = tpth.release();
236 tpt->setPos(fh.end());
241 DLLLOCAL ThreadPoolThread* getThreadUnlocked(
ExceptionSink* xsink) {
243 while (!stopflag && fh.empty() && max && (int)ah.size() == max) {
252 ThreadPoolThread* tpt;
259 std::unique_ptr<ThreadPoolThread> tpt_pt(
new ThreadPoolThread(*
this, xsink));
260 if (!tpt_pt->valid()) {
264 tpt = tpt_pt.release();
268 tplist_t::iterator i = ah.end();
275 DLLLOCAL ThreadPool(
ExceptionSink* xsink,
int n_max = 0,
int n_minidle = 0,
int m_maxidle = 0,
int n_release_ms = QTP_DEFAULT_RELEASE_MS);
277 DLLLOCAL ~ThreadPool() {
287 str.
sprintf(
"ThreadPool %p total: %d max: %d minidle: %d maxidle: %d release_ms: %d running: [",
this, ah.size() + fh.size(), max, minidle, maxidle, release_ms);
288 for (tplist_t::iterator i = ah.begin(), e = ah.end(); i != e; ++i) {
291 str.
sprintf(
"%d", (*i)->getId());
296 for (tplist_t::iterator i = fh.begin(), e = fh.end(); i != e; ++i) {
299 str.
sprintf(
"%d", (*i)->getId());
306 DLLLOCAL
void stop() {
319 if (stopflag && !confirm) {
320 xsink->
raiseException(
"THREADPOOL-ERROR",
"cannot call ThreadPool::stopWait() after ()ThreadPool::stop() has been called since child threads have been detached and can no longer be traced");
338 ThreadTaskHolder task(
new ThreadTask(c, cc), xsink);
341 if (checkStopUnlocked(
"submit", xsink))
346 q.push_back(task.release());
351 DLLLOCAL
void threadCounts(
int& idle,
int& running) {
357 DLLLOCAL
int done(ThreadPoolThread* tpt) {
365 tplist_t::iterator i = tpt->getPos();
369 if ((!maxidle && release_ms) || ((
int)fh.size() < maxidle) || q.size() > fh.size()) {
371 if (waiting || (release_ms && (
int)fh.size() > minidle))
the base class for all data to be used as private data of Qore objects
Definition: AbstractPrivateData.h:44
DLLEXPORT int sprintf(const char *fmt,...)
this will concatenate a formatted string to the existing string according to the format string and t...
DLLEXPORT AbstractQoreNode * raiseException(const char *err, const char *fmt,...)
appends a Qore-language exception to the list
DLLEXPORT int signal()
signals a single waiting thread to wake up
a thread condition class implementing a wrapper for pthread_cond_t
Definition: QoreCondition.h:45
Qore's string type supported by the QoreEncoding class.
Definition: QoreString.h:81
DLLEXPORT void concat(const QoreString *str, ExceptionSink *xsink)
concatenates a string and converts encodings if necessary
provides a safe and exception-safe way to hold locks in Qore, only to be used on the stack...
Definition: QoreThreadLock.h:128
The main value class in Qore, designed to be passed by value.
Definition: QoreValue.h:262
container for holding Qore-language exception information and also for registering a "thread_exit" ca...
Definition: ExceptionSink.h:46
base class for resolved call references
Definition: CallReferenceNode.h:105
provides a mutually-exclusive thread lock
Definition: QoreThreadLock.h:47
DLLEXPORT int wait(pthread_mutex_t *m)
blocks a thread on a mutex until the condition is signaled