summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-26 12:43:43 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-26 12:43:43 +0200
commitaa9426f683fa6af9280bc63050ee0187ba4c57e1 (patch)
tree5ba68517cc2661ab3de4afb417592ed67bdab183 /runtime/queue.c
parent210f43137d6a077abbd8b77c1f72193dcd81cc99 (diff)
downloadrsyslog-aa9426f683fa6af9280bc63050ee0187ba4c57e1.tar.gz
rsyslog-aa9426f683fa6af9280bc63050ee0187ba4c57e1.tar.xz
rsyslog-aa9426f683fa6af9280bc63050ee0187ba4c57e1.zip
solved design issue with queue termination
... and also improved the test suite. There is a design issue in the v3 queue engine that manifested to some serious problems with the new processing mode. However, in v3 shutdown may take eternally if a queue runs in DA mode, is configured to preserve data AND the action fails and retries immediately. There is no cure available for v3, it would require doing much of the work we have done on the new engine. The window of exposure, as one might guess from the description, is very small. That is probably the reason why we have not seen it in practice.
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c253
1 files changed, 158 insertions, 95 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 0019297b..539cf4ec 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -202,9 +202,10 @@ getLogicalQueueSize(qqueue_t *pThis)
static inline void queueDrain(qqueue_t *pThis)
{
void *pUsr;
-
ASSERT(pThis != NULL);
+ BEGINfunc
+ dbgoprint((obj_t*) pThis, "queue will lose %d messages, destroying...\n", pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(pThis->iQueueSize-- > 0) {
pThis->qDeq(pThis, &pUsr);
@@ -213,6 +214,7 @@ static inline void queueDrain(qqueue_t *pThis)
}
pThis->qDel(pThis);
}
+ ENDfunc
}
@@ -617,6 +619,7 @@ static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out)
RETiRet;
}
+
static rsRetVal qDelFixedArray(qqueue_t *pThis)
{
DEFiRet;
@@ -631,6 +634,26 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis)
}
+/* reset the logical dequeue pointer to the physical dequeue position.
+ * This is only needed after we cancelled workers (during queue shutdown).
+ */
+static rsRetVal
+qUnDeqAllFixedArray(qqueue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ dbgoprint((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n",
+ pThis->tVars.farray.head, pThis->tVars.farray.deqhead, pThis->nLogDeq);
+
+ pThis->tVars.farray.deqhead = pThis->tVars.farray.head;
+ pThis->nLogDeq = 0;
+
+ RETiRet;
+}
+
+
/* -------------------- linked list -------------------- */
@@ -695,7 +718,6 @@ static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr)
qLinkedList_t *pEntry;
DEFiRet;
-RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot);
pEntry = pThis->tVars.linklist.pDeqRoot;
*ppUsr = pEntry->pUsr;
pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
@@ -723,6 +745,26 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis)
}
+/* reset the logical dequeue pointer to the physical dequeue position.
+ * This is only needed after we cancelled workers (during queue shutdown).
+ */
+static rsRetVal
+qUnDeqAllLinkedList(qqueue_t *pThis)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ dbgoprint((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n",
+ pThis->tVars.linklist.pDelRoot, pThis->tVars.linklist.pDeqRoot, pThis->nLogDeq);
+
+ pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pDelRoot;
+ pThis->nLogDeq = 0;
+
+ RETiRet;
+}
+
+
/* -------------------- disk -------------------- */
@@ -822,25 +864,16 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
- CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
- CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel));
-
/* we now need to take care of the Deq handle. It is not persisted, so we can create
* a virgin copy based on pReadDel. // TODO duplicat code, same as blow - single function!
*/
- CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq));
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0));
- CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
- CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
- CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
- CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmDup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */
CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq));
- /* TODO: dirty, need stream methods --> */
- pThis->tVars.disk.pReadDeq->iCurrFNum = pThis->tVars.disk.pReadDel->iCurrFNum;
- pThis->tVars.disk.pReadDeq->iCurrOffs = pThis->tVars.disk.pReadDel->iCurrOffs;
- /* <-- dirty, need stream methods :TODO */
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel));
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDeq));
/* OK, we could successfully read the file, so we now can request that it be
@@ -1012,6 +1045,16 @@ finalize_it:
}
+/* This is a dummy function for disks - we do not need to reset anything
+ * because everything is already persisted...
+ */
+static rsRetVal
+qUnDeqAllDisk(__attribute__((unused)) qqueue_t *pThis)
+{
+ return RS_RET_OK;
+}
+
+
/* -------------------- direct (no queueing) -------------------- */
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
{
@@ -1055,6 +1098,12 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
+static rsRetVal
+qUnDeqAllDirect(__attribute__((unused)) qqueue_t *pThis)
+{
+ return RS_RET_OK;
+}
+
/* --------------- end type-specific handlers -------------------- */
@@ -1109,22 +1158,29 @@ qqueueDeq(qqueue_t *pThis, void *pUsr)
/* This function shuts down all worker threads and waits until they
- * have terminated. If they timeout, they are cancelled. Parameters have been set
- * before this function is called so that DA queues will be fully persisted to
- * disk (if configured to do so).
+ * have terminated. If they timeout, they are cancelled.
* rgerhards, 2008-01-24
* Please note that this function shuts down BOTH the parent AND the child queue
* in DA case. This is necessary because their timeouts are tightly coupled. Most
* importantly, the timeouts would be applied twice (or logic be extremely
* complex) if each would have its own shutdown. The function does not self check
* this condition - the caller must make sure it is not called with a parent.
+ * rgerhards, 2009-05-26: we do NO logner persist the queue here if bSaveOnShutdown
+ * is set. This must be handled by the caller. Not doing that cleans up the queue
+ * shutdown considerably. Also, older engines had a potential hang condition when
+ * the DA queue was already started and the DA worker configured for infinite
+ * retries and the action was during retry processing. This was a design issue,
+ * which is solved as of now. Note that the shutdown now may take a little bit
+ * longer, because we no longer can persist the queue in parallel to waiting
+ * on worker timeouts.
*/
-static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
+static rsRetVal
+ShutdownWorkers(qqueue_t *pThis)
{
- DEFiRet;
DEFVARS_mutexProtection;
struct timespec tTimeout;
rsRetVal iRetLocal;
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
@@ -1193,53 +1249,18 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
}
/* when we reach this point, both queues are either empty or the regular queue shutdown timeout
- * has expired. Now we need to check if we are configured to not loose messages. If so, we need
- * to persist the queue to disk (this is only possible if the queue is DA-enabled). We must also
- * set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer
- * is done. This is especially important as we otherwise may interfere with queue order while the
- * DA consumer is running. -- rgerhards, 2008-01-27
- * Note: there was a note that we should not wait eternally on the DA worker if we run in
- * enqueue-only note. I have reviewed the code and think there is no need for this check. Howerver,
- * I'd like to keep this note in here should we happen to run into some related trouble.
- * rgerhards, 2008-01-28
+ * has expired. We must set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate
+ * as soon as its consumer is done. In particular, it does no longer need try to empty the queue.
*/
wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
- if(pThis->bRunsDA)
+ if(pThis->bRunsDA) {
qqueueWaitDAModeInitialized(pThis);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- /* optimize parameters for shutdown of DA-enabled queues */
- if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
- /* switch to enqueue-only mode so that no more actions happen */
- if(pThis->bRunsDA == 0) {
- qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
- } else {
- /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1)
- * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27
- */
- qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
- }
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- /* make sure we do not timeout before we are done */
- dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n");
- timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
- /* and run the primary queue's DA worker to drain the queue */
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
- if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
- "continuing, but results are unpredictable\n", iRetLocal);
- }
- } else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); /* also stop DA queue */
}
- /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we
- * can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set,
- * the queue is now empty. If regular workers are still running, and try to pull the next message,
- * they will automatically terminate as there no longer is any message left to process.
- */
+ /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
if(getPhysicalQueueSize(pThis) > 0) {
timeoutComp(&tTimeout, pThis->toActShutdown);
@@ -1278,10 +1299,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
/* Now queue workers should have terminated. If not, we need to cancel them as we have applied
* all timeout setting. If any worker in any queue still executes, its consumer is possibly
- * long-running and cancelling is the only way to get rid of it. Note that the
- * cancellation handler will probably re-queue a user pointer, so the queue's enqueue
- * function is still needed (what is no problem as we do not yet destroy the queue - but I
- * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25
+ * long-running and cancelling is the only way to get rid of it.
*/
dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n");
iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */
@@ -1290,12 +1308,6 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
-
- /* TODO: think: do we really need to do this here? Can't it happen on DA queue destruction? If we
- * disable it, we get an assertion... I think this is OK, as we need to have a certain order and
- * canceling the DA workers here ensures that order. But in any instant, we may have a look at this
- * code after we have reaced the milestone. -- rgerhards, 2008-01-27
- */
/* ... and now the DA queue, if it exists (should always be after the primary one) */
if(pThis->pqDA != NULL) {
dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n");
@@ -1310,7 +1322,8 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
*/
- dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", getPhysicalQueueSize(pThis));
+ dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n",
+ getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
@@ -1367,6 +1380,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddFixedArray;
pThis->qDeq = qDeqFixedArray;
pThis->qDel = qDelFixedArray;
+ pThis->qUnDeqAll = qUnDeqAllFixedArray;
break;
case QUEUETYPE_LINKEDLIST:
pThis->qConstruct = qConstructLinkedList;
@@ -1374,6 +1388,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddLinkedList;
pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->qUnDeqAll = qUnDeqAllLinkedList;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
@@ -1381,6 +1396,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
pThis->qDel = qDelDisk;
+ pThis->qUnDeqAll = qUnDeqAllDisk;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
break;
@@ -1389,6 +1405,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qDestruct = qDestructDirect;
pThis->qAdd = qAddDirect;
pThis->qDel = qDelDirect;
+ pThis->qUnDeqAll = qUnDeqAllDirect;
break;
}
@@ -1817,20 +1834,17 @@ finalize_it:
* If we are a child, we have done our duty when the queue is empty. In that case,
* we can terminate.
* Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from
- * the DA queue
+ * the DA queue.
+ * If our queue is in destruction, we drain to the DA queue and so we shall not terminate
+ * until we have done so.
*/
-static int
+static rsRetVal
qqueueChkStopWrkrDA(qqueue_t *pThis)
{
- /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate
- * until we have done so.
- */
- int bStopWrkr;
-
- BEGINfunc
+ DEFiRet;
if(pThis->bEnqOnly) {
- bStopWrkr = 1;
+ iRet = RS_RET_TERMINATE_NOW;
} else {
if(pThis->bRunsDA) {
ASSERT(pThis->pqDA != NULL);
@@ -1838,19 +1852,16 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
&& pThis->pqDA->sizeOnDiskMax > 0
&& pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
/* this queue can never grow, so we can give up... */
- bStopWrkr = 1;
+ iRet = RS_RET_TERMINATE_NOW;
} else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
- bStopWrkr = 1;
- } else {
- bStopWrkr = 0;
+ iRet = RS_RET_TERMINATE_NOW;
}
} else {
- bStopWrkr = 1;
+ iRet = RS_RET_TERMINATE_NOW;
}
}
- ENDfunc
- return bStopWrkr;
+ RETiRet;
}
@@ -1861,10 +1872,20 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
* Version for the regular worker thread. NOTE: the pThis->bRunsDA is different from
* the DA queue
*/
-static int
+static rsRetVal
ChkStopWrkrReg(qqueue_t *pThis)
{
+ DEFiRet;
+ /* original condition
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0);
+ * TODO: remove when verified! -- rgerhards, 2009-05-26
+ */
+ if(pThis->bEnqOnly || pThis->bRunsDA)
+ iRet = RS_RET_TERMINATE_NOW;
+ else if(pThis->pqParent != NULL)
+ iRet = RS_RET_TERMINATE_WHEN_IDLE;
+
+ RETiRet;
}
@@ -1881,7 +1902,6 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal)
}
-/* common function for the idle functions that deletes the last batch if TODO MULTI */
/* must only be called when the queue mutex is locked, else results
* are not stable! DA queue version
*/
@@ -1890,7 +1910,7 @@ qqueueIsIdleDA(qqueue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
/* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
- return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
* are not stable! Regular queue version
@@ -1905,7 +1925,7 @@ IsIdleReg(qqueue_t *pThis)
return ret;
#else
/* regular code! */
- return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk));
#endif
}
@@ -2176,19 +2196,62 @@ finalize_it:
}
+/* persist a queue with all data elements to disk - this is used to handle
+ * bSaveOnShutdown. We utilize the DA worker to do this. This must only
+ * be called after all workers have been shut down and if bSaveOnShutdown
+ * is actually set. Note that this function may potentially run long,
+ * depending on the queue configuration (e.g. store on remote machine).
+ * rgerhards, 2009-05-26
+ */
+static inline rsRetVal
+DoSaveOnShutdown(qqueue_t *pThis)
+{
+ struct timespec tTimeout;
+ rsRetVal iRetLocal;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
+ /* make sure we do not timeout before we are done */
+ dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n");
+ timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
+ /* and run the primary queue's DA worker to drain the queue */
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
+ dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n",
+ iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+ if(iRetLocal != RS_RET_OK) {
+ dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
+ "continuing, but results are unpredictable\n", iRetLocal);
+ }
+
+ RETiRet;
+}
+
+
/* destructor for the queue object */
BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(qqueue)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
- /* shut down all workers (handles *all* of the persistence logic)
- * See function head comment of queueShutdownWorkers () on why we don't call it
- * We also do not need to shutdown workers when we are in enqueue-only mode or we are a
+ /* shut down all workers
+ * We do not need to shutdown workers when we are in enqueue-only mode or we are a
* direct queue - because in both cases we have none... ;)
* with a child! -- rgerhards, 2008-01-28
*/
if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
- qqueueShutdownWorkers(pThis);
+ ShutdownWorkers(pThis);
+
+ /* now all workers are terminated. Messages may exist. Also, some logically dequeued
+ * messages may never have been processed because their worker was terminated. So
+ * we need to reset the logical dequeue pointer, persist the queue if configured to do
+ * so and then destruct everything. -- rgerhards, 2009-05-26
+ */
+ CHKiRet(pThis->qUnDeqAll(pThis));
+
+ if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
+ CHKiRet(DoSaveOnShutdown(pThis));
+ }
/* finally destruct our (regular) worker thread pool
* Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,