summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c3
-rw-r--r--grammar/rainerscript.c2
-rw-r--r--runtime/queue.c124
3 files changed, 66 insertions, 63 deletions
diff --git a/action.c b/action.c
index 8d3239b6..8a3d8920 100644
--- a/action.c
+++ b/action.c
@@ -1616,7 +1616,8 @@ DEFFUNC_llExecFunc(doActivateActions)
}
actionDisable(pThis);
}
- DBGPRINTF("Action %p: queue %p started\n", pThis, pThis->pQueue);
+ DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod),
+ pThis, pThis->pQueue);
ENDfunc
return RS_RET_OK; /* we ignore errors, we can not do anything either way */
}
diff --git a/grammar/rainerscript.c b/grammar/rainerscript.c
index b8edf991..dd7eda72 100644
--- a/grammar/rainerscript.c
+++ b/grammar/rainerscript.c
@@ -2127,7 +2127,7 @@ cnfstmtOptimizePRIFilt(struct cnfstmt *stmt)
DBGPRINTF("optimizer: removing always-true PRIFILT %p\n", stmt);
if(stmt->d.s_prifilt.t_else != NULL) {
parser_errmsg("error: always-true PRI filter has else part!\n");
- // TODO: enable (requires changes in action.c) cnfstmtDestruct(stmt->d.s_prifilt.t_else);
+ cnfstmtDestruct(stmt->d.s_prifilt.t_else);
}
subroot = stmt->d.s_prifilt.t_then;
if(subroot == NULL) {
diff --git a/runtime/queue.c b/runtime/queue.c
index 90ffc0a0..0cd33701 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -2079,6 +2079,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk,
pThis->iDeqBatchSize);
+ pThis->bQueueStarted = 1;
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -2109,7 +2110,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
* the case when a disk queue has been loaded. If we did not start it here, it would never start.
*/
qqueueAdviseMaxWorkers(pThis);
- pThis->bQueueStarted = 1;
/* support statistics gathering */
qName = obj.GetName((obj_t*)pThis);
@@ -2307,73 +2307,75 @@ DoSaveOnShutdown(qqueue_t *pThis)
/* destructor for the queue object */
BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(qqueue)
- /* shut down all workers
- * We do not need to shutdown workers when we are in enqueue-only mode or we are a
- * direct queue - because in both cases we have none... ;)
- * with a child! -- rgerhards, 2008-01-28
- */
- if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL
- && pThis->pWtpReg != NULL)
- ShutdownWorkers(pThis);
+ if(pThis->bQueueStarted) {
+ /* shut down all workers
+ * We do not need to shutdown workers when we are in enqueue-only mode or we are a
+ * direct queue - because in both cases we have none... ;)
+ * with a child! -- rgerhards, 2008-01-28
+ */
+ if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL
+ && pThis->pWtpReg != NULL)
+ ShutdownWorkers(pThis);
- if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
- CHKiRet(DoSaveOnShutdown(pThis));
- }
+ if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
+ CHKiRet(DoSaveOnShutdown(pThis));
+ }
- /* finally destruct our (regular) worker thread pool
- * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
- * e.g. when they are not created in enqueue-only mode. We already check the condition
- * as this may otherwise be very hard to find once we optimize (and have long forgotten
- * about this condition here ;)
- * rgerhards, 2008-01-25
- */
- if(pThis->qType != QUEUETYPE_DIRECT && pThis->pWtpReg != NULL) {
- wtpDestruct(&pThis->pWtpReg);
- }
+ /* finally destruct our (regular) worker thread pool
+ * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
+ * e.g. when they are not created in enqueue-only mode. We already check the condition
+ * as this may otherwise be very hard to find once we optimize (and have long forgotten
+ * about this condition here ;)
+ * rgerhards, 2008-01-25
+ */
+ if(pThis->qType != QUEUETYPE_DIRECT && pThis->pWtpReg != NULL) {
+ wtpDestruct(&pThis->pWtpReg);
+ }
- /* Now check if we actually have a DA queue and, if so, destruct it.
- * Note that the wtp must be destructed first, it may be in cancel cleanup handler
- * *right now* and actually *need* to access the queue object to persist some final
- * data (re-queueing case). So we need to destruct the wtp first, which will make
- * sure all workers have terminated. Please note that this also generates a situation
- * where it is possible that the DA queue has a parent pointer but the parent has
- * no WtpDA associated with it - which is perfectly legal thanks to this code here.
- */
- if(pThis->pWtpDA != NULL) {
- wtpDestruct(&pThis->pWtpDA);
- }
- if(pThis->pqDA != NULL) {
- qqueueDestruct(&pThis->pqDA);
- }
+ /* Now check if we actually have a DA queue and, if so, destruct it.
+ * Note that the wtp must be destructed first, it may be in cancel cleanup handler
+ * *right now* and actually *need* to access the queue object to persist some final
+ * data (re-queueing case). So we need to destruct the wtp first, which will make
+ * sure all workers have terminated. Please note that this also generates a situation
+ * where it is possible that the DA queue has a parent pointer but the parent has
+ * no WtpDA associated with it - which is perfectly legal thanks to this code here.
+ */
+ if(pThis->pWtpDA != NULL) {
+ wtpDestruct(&pThis->pWtpDA);
+ }
+ if(pThis->pqDA != NULL) {
+ qqueueDestruct(&pThis->pqDA);
+ }
- /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty)
- * This handler is most important for disk queues, it will finally persist the necessary
- * on-disk structures. In theory, other queueing modes may implement their other (non-DA)
- * methods of persisting a queue between runs, but in practice all of this is done via
- * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here
- * if need arises (what I doubt...) -- rgerhards, 2008-01-25
- */
- CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
- DBGOPRINT((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet);
- }
+ /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty)
+ * This handler is most important for disk queues, it will finally persist the necessary
+ * on-disk structures. In theory, other queueing modes may implement their other (non-DA)
+ * methods of persisting a queue between runs, but in practice all of this is done via
+ * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here
+ * if need arises (what I doubt...) -- rgerhards, 2008-01-25
+ */
+ CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
+ DBGOPRINT((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet);
+ }
- /* finally, clean up some simple things... */
- if(pThis->pqParent == NULL) {
- /* if we are not a child, we allocated our own mutex, which we now need to destroy */
- pthread_mutex_destroy(pThis->mut);
- free(pThis->mut);
- }
- pthread_mutex_destroy(&pThis->mutThrdMgmt);
- pthread_cond_destroy(&pThis->notFull);
- pthread_cond_destroy(&pThis->notEmpty);
- pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
- pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
+ /* finally, clean up some simple things... */
+ if(pThis->pqParent == NULL) {
+ /* if we are not a child, we allocated our own mutex, which we now need to destroy */
+ pthread_mutex_destroy(pThis->mut);
+ free(pThis->mut);
+ }
+ pthread_mutex_destroy(&pThis->mutThrdMgmt);
+ pthread_cond_destroy(&pThis->notFull);
+ pthread_cond_destroy(&pThis->notEmpty);
+ pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
+ pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
- DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
- DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
- /* type-specific destructor */
- iRet = pThis->qDestruct(pThis);
+ /* type-specific destructor */
+ iRet = pThis->qDestruct(pThis);
+ }
free(pThis->pszFilePrefix);
free(pThis->pszSpoolDir);