diff options
-rw-r--r-- | action.c | 4 | ||||
-rw-r--r-- | doc/rsyslog_secure_tls.html | 2 | ||||
-rw-r--r-- | doc/tls_cert_server.html | 9 | ||||
-rw-r--r-- | runtime/batch.h | 2 | ||||
-rw-r--r-- | runtime/queue.c | 74 | ||||
-rwxr-xr-x | tests/daqueue-persist-drvr.sh | 5 |
6 files changed, 54 insertions, 42 deletions
@@ -936,8 +936,8 @@ processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) CHKiRet(localRet); /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ - //for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { - for(i = 0 ; i < pBatch->nElem ; i++) { + for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + //for(i = 0 ; i < pBatch->nElem ; i++) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; } iRet = finishBatch(pAction); diff --git a/doc/rsyslog_secure_tls.html b/doc/rsyslog_secure_tls.html index be2811f4..b15e5a4e 100644 --- a/doc/rsyslog_secure_tls.html +++ b/doc/rsyslog_secure_tls.html @@ -51,7 +51,7 @@ google_ad_height = 125; src="http://pagead2.googlesyndication.com/pagead/show_ads.js"> </script> </span> -I private keys have become known to third parties, the system does not provide +If private keys have become known to third parties, the system does not provide any security at all. Also, our solution bases on X.509 certificates and a (very limited) chain of trust. We have one instance (the CA) that issues all machine certificates. The machine certificate indentifies a particular machine. hile in diff --git a/doc/tls_cert_server.html b/doc/tls_cert_server.html index 9c68db5d..9c024bc9 100644 --- a/doc/tls_cert_server.html +++ b/doc/tls_cert_server.html @@ -37,6 +37,15 @@ src="http://pagead2.googlesyndication.com/pagead/show_ads.js"> </script> </span> <p><center><img src="tls_cert_100.jpg"></center> +<p><i><font color="red"><b>Important:</b> Keep in mind that the order of configuration directives +is very important in rsyslog. As such, the samples given below do only work if the given +order is preserved.</font> Re-ordering the directives can break configurations and has broken them +in practice. If you intend to re-order them, please be sure that you fully understand how +the configuration language works and, most importantly, which statements form a block together. +Please also note that we understand the the current configuration file format is +ugly. However, there has been more important work in the way of enhancing it. If you would like +to contribute some time to improve the config file language, please let us know. Any help +is appreciated (be it doc or coding work!).</i> <p>Steps to do: <ul> <li>make sure you have a functional CA (<a href="tls_cert_ca.html">Setting up the CA</a>) diff --git a/runtime/batch.h b/runtime/batch.h index 031718a7..2b3aa83e 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -34,7 +34,7 @@ typedef enum { BATCH_STATE_RDY = 0, /* object ready for processing */ BATCH_STATE_BAD = 1, /* unrecoverable failure while processing, do NOT resubmit to same action */ - BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unkonwn */ + BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unknown */ BATCH_STATE_COMM = 3, /* message successfully commited */ BATCH_STATE_DISC = 4, /* discarded - processed OK, but do not submit to any other action */ } batch_state_t; diff --git a/runtime/queue.c b/runtime/queue.c index 67bc40c2..b4f00446 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -70,6 +70,7 @@ DEFobjCurrIf(strm) DEFobjCurrIf(errmsg) /* forward-definitions */ +static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr); static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); @@ -1362,7 +1363,7 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue * picking up things from the to-delete list. */ static inline rsRetVal -DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) +DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted) { toDeleteLst_t *pTdl; qDeqID deqIDDel; @@ -1370,10 +1371,11 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); + assert(nDeleted > 0); pTdl = tdlPeek(pThis); /* get current head element */ if(pTdl == NULL) { /* to-delete list empty */ - DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq); + DoDeleteBatchFromQStore(pThis, nDeleted); } else if(pBatch->deqID == pThis->deqIDDel) { deqIDDel = pThis->deqIDDel; pTdl = tdlPeek(pThis); @@ -1386,7 +1388,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) } else { /* can not delete, insert into to-delete list */ dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); - CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq)); + CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted)); } finalize_it: @@ -1395,7 +1397,8 @@ finalize_it: /* Delete a batch of processed user objects from the queue, which includes - * destructing the objects themself. + * destructing the objects themself. Any entries not marked as finally + * processed are enqueued again. The new enqueue is necessary because we have a * rgerhards, 2009-05-13 */ static inline rsRetVal @@ -1403,18 +1406,37 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) { int i; void *pUsr; + int nEnqueued = 0; + rsRetVal localRet; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); +dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state); for(i = 0 ; i < pBatch->nElem ; ++i) { dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state); pUsr = pBatch->pElem[i].pUsrp; + if( pBatch->pElem[i].state == BATCH_STATE_RDY + || pBatch->pElem[i].state == BATCH_STATE_SUB) { +RUNLOG_STR("we need to requeue the entry"); + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, + (obj_t*)MsgAddRef((msg_t*) pUsr)); + ++nEnqueued; + if(localRet != RS_RET_OK) { + DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); + } + } objDestruct(pUsr); } - iRet = DeleteBatchFromQStore(pThis, pBatch); +dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); + + if(nEnqueued > 0) + qqueueChkPersist(pThis, nEnqueued); + + if(i > 0) + iRet = DeleteBatchFromQStore(pThis, pBatch, i); pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ @@ -1423,7 +1445,11 @@ dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch /* dequeue as many user pointers as are available, until we hit the configured - * upper limit of pointers. + * upper limit of pointers. Note that this function also deletes all processed + * objects from the previous batch. However, it is perfectly valid that the + * previous batch contained NO objects at all. For example, this happens + * immediately after system startup or when a queue was exhausted and the queue + * worker needed to wait for new data. * This must only be called when the queue mutex is LOOKED, otherwise serious * malfunction will happen. */ @@ -1720,14 +1746,16 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) d_pthread_mutex_unlock(pThis->mut); /* iterate over returned results and enqueue them in DA queue */ - //for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { - for(i = 0 ; i < pWti->batch.nElem ; i++) { + for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { + //for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg); - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, + (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ } /* now we are done, but need to re-aquire the mutex */ @@ -1741,12 +1769,6 @@ finalize_it: /* must only be called when the queue mutex is locked, else results * are not stable! - * If we are a child, we have done our duty when the queue is empty. In that case, - * we can terminate. - * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from - * the DA queue. - * If our queue is in destruction, we drain to the DA queue and so we shall not terminate - * until we have done so. */ static rsRetVal qqueueChkStopWrkrDA(qqueue_t *pThis) @@ -1755,28 +1777,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) if(pThis->bEnqOnly) { iRet = RS_RET_TERMINATE_WHEN_IDLE; -#if 0 - } else { - if(pThis->bRunsDA) { - ASSERT(pThis->pqDA != NULL); - if( pThis->pqDA->bEnqOnly - && pThis->pqDA->sizeOnDiskMax > 0 - && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { - /* this queue can never grow, so we can give up... */ - iRet = RS_RET_TERMINATE_NOW; - } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { -dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); - iRet = RS_RET_TERMINATE_NOW; -RUNLOG_STR("XXX: re-start reg worker"); -if(!pThis->bShutdownImmediate) - qqueueAdviseMaxWorkers(pThis); -RUNLOG_STR("XXX: done re-start reg worker"); - } - } else { - // experimental iRet = RS_RET_TERMINATE_NOW; - ; - } -#endif } RETiRet; diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh index 30a7c635..7934eb2b 100755 --- a/tests/daqueue-persist-drvr.sh +++ b/tests/daqueue-persist-drvr.sh @@ -8,6 +8,9 @@ echo \[daqueue-persist-drvr.sh\]: testing memory daqueue persisting to disk, mode $1 source $srcdir/diag.sh init +#export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction" +#export RSYSLOG_DEBUGLOG="log" + # prepare config echo \$MainMsgQueueType $1 > work-queuemode.conf echo "*.* :omtesting:sleep 0 1000" > work-delay.conf @@ -21,7 +24,7 @@ source $srcdir/diag.sh check-mainq-spool echo "Enter phase 2, rsyslogd restart" -#exit +exit # restart engine and have rest processed #remove delay |