diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-08 13:37:19 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-08 13:37:19 +0000 |
commit | 8d0a174a86d29dbec6412cb1bd38f87b3b3c059b (patch) | |
tree | bcaf1c0cdbdd015eb3366d9af5ec4f9a2ddf9e4a /queue.c | |
parent | 47ccbe9c67c0b3ca518449d80be387ca09904026 (diff) | |
download | rsyslog-8d0a174a86d29dbec6412cb1bd38f87b3b3c059b.tar.gz rsyslog-8d0a174a86d29dbec6412cb1bd38f87b3b3c059b.tar.bz2 rsyslog-8d0a174a86d29dbec6412cb1bd38f87b3b3c059b.zip |
- first implementation of "disk" queue mode finished. It still needs some
work and the deserializer needs also to be expanded, but the queue at
least performs well now.
- fixed a race condition that could occur when input modules were
terminated
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 37 |
1 files changed, 27 insertions, 10 deletions
@@ -1,3 +1,4 @@ +#include <stdio.h> // TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in // call consumer state. Facilitates retaining messages in queue until action could // be called! @@ -127,12 +128,10 @@ static rsRetVal qConstructLinkedList(queue_t *pThis) } -static rsRetVal qDestructLinkedList(queue_t *pThis) +static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis) { DEFiRet; - assert(pThis != NULL); - /* with the linked list type, there is nothing to do here. The * reason is that the Destructor is only called after all entries * have bene taken off the queue. In this case, there is nothing @@ -211,12 +210,14 @@ static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int dbgprintf("Queue 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pFile->pszFileName, flags, pFile->fd); finalize_it: -dbgprintf("qDiskOpen iRet %d\n", iRet); return iRet; } -/* close a queue file */ +/* close a queue file + * Note that the bDeleteOnClose flag is honored. If it is set, the file will be + * deleted after close. This is in support for the qRead thread. + */ static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile) { DEFiRet; @@ -228,6 +229,10 @@ static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile) close(pFile->fd); // TODO: error check pFile->fd = -1; + if(pFile->bDeleteOnClose) { + unlink((char*) pThis->tVars.disk.fRead.pszFileName); // TODO: check returncode + } + if(pFile->pszFileName != NULL) { free(pFile->pszFileName); /* no longer needed in any case (just for open) */ pFile->pszFileName = NULL; @@ -375,17 +380,19 @@ static rsRetVal qConstructDisk(queue_t *pThis) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir); - pThis->tVars.disk.iMaxFileSize = 1024 * 3; // TODO: configurable! + pThis->tVars.disk.iMaxFileSize = 10240000; //1024 * 3; // TODO: configurable! pThis->tVars.disk.fWrite.iCurrFileNum = 1; pThis->tVars.disk.fWrite.iCurrOffs = 0; pThis->tVars.disk.fWrite.fd = -1; pThis->tVars.disk.fWrite.iUngetC = -1; + pThis->tVars.disk.fRead.bDeleteOnClose = 0; /* do *NOT* set this to 1! */ pThis->tVars.disk.fRead.iCurrFileNum = 1; pThis->tVars.disk.fRead.fd = -1; pThis->tVars.disk.fRead.iCurrOffs = 0; pThis->tVars.disk.fRead.iUngetC = -1; + pThis->tVars.disk.fRead.bDeleteOnClose = 1; finalize_it: return iRet; @@ -418,7 +425,7 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) assert(pThis != NULL); if(pThis->tVars.disk.fWrite.fd == -1) - CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fWrite, O_RDWR|O_CREAT, 0600)); // TODO: open modes! + CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes! CHKiRet((objSerialize(pUsr))(pUsr, &pCStr)); iWritten = write(pThis->tVars.disk.fWrite.fd, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr)); @@ -564,7 +571,7 @@ queueDel(queue_t *pThis, void *pUsr) * Please NOTE: * Having more than one worker requires considerable * additional code review in regard to thread-safety. - */ +*/ static void * queueWorker(void *arg) { @@ -774,16 +781,24 @@ rsRetVal queueEnqObj(queue_t *pThis, void *pUsr) { DEFiRet; + int iCancelStateSave; int i; struct timespec t; assert(pThis != NULL); - if(pThis->qType != QUEUETYPE_DIRECT) + /* Please note that this function is not cancel-safe and consequently + * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE + * during its execution. If that is not done, race conditions occur if the + * thread is canceled (most important use case is input module termination). + * rgerhards, 2008-01-08 + */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + if(pThis->pWorkerThreads != NULL) pthread_mutex_lock(pThis->mut); while(pThis->iQueueSize >= pThis->iMaxQueueSize) { - dbgprintf("enqueueMsg: queue 0x%lx FULL.\n", (unsigned long) pThis); + dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", (unsigned long) pThis); clock_gettime (CLOCK_REALTIME, &t); t.tv_sec += 2; /* TODO: configurable! */ @@ -805,6 +820,8 @@ finalize_it: dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); } + pthread_setcancelstate(iCancelStateSave, NULL); + return iRet; } /* |