diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-05 13:39:40 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-05 13:39:40 +0000 |
commit | 62154cdde95ad579d4b2b98f59fac4817be8a0f4 (patch) | |
tree | 29a65e0ece3080baa9015436ce0704da83d8f06e /queue.c | |
parent | e055d4921b9a53e9dfedc56bbff3a9b12400d34d (diff) | |
download | rsyslog-62154cdde95ad579d4b2b98f59fac4817be8a0f4.tar.gz rsyslog-62154cdde95ad579d4b2b98f59fac4817be8a0f4.tar.bz2 rsyslog-62154cdde95ad579d4b2b98f59fac4817be8a0f4.zip |
added the "direct" queueing mode to queue class (no queing at all)
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 89 |
1 files changed, 71 insertions, 18 deletions
@@ -252,6 +252,45 @@ rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) return iRet; } +/* -------------------- direct (no queueing) -------------------- */ +rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis) +{ + return RS_RET_OK; +} + + +rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis) +{ + return RS_RET_OK; +} + +rsRetVal qAddDirect(queue_t *pThis, void* pUsr) +{ + DEFiRet; + rsRetVal iRetLocal; + + assert(pThis != NULL); + + /* TODO: calling the consumer should go into its own function! -- rgerhards, 2008-01-05*/ + iRetLocal = pThis->pConsumer(pUsr); + if(iRetLocal != RS_RET_OK) + dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n", + (unsigned long) pThis, iRetLocal); + --pThis->iQueueSize; /* this is kind of a hack, but its the smartest thing we can do given + * the somewhat astonishing fact that this queue type does not actually + * queue anything ;) + */ + + return iRet; +} + +rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) +{ + return RS_RET_OK; +} + + + /* --------------- end type-specific handlers -------------------- */ @@ -409,6 +448,12 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, pThis->qAdd = qAddDisk; pThis->qDel = qDelDisk; break; + case QUEUETYPE_DIRECT: + pThis->qConstruct = qConstructDirect; + pThis->qDestruct = qDestructDirect; + pThis->qAdd = qAddDirect; + pThis->qDel = qDelDirect; + break; } /* call type-specific constructor */ @@ -433,11 +478,13 @@ rsRetVal queueStart(queue_t *pThis) { int i; - /* fire up the worker thread */ - pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ - i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis); - dbgprintf("Worker thread for queue 0x%lx, type %d started with state %d.\n", - (unsigned long) pThis, (int) pThis->qType, i); + if(pThis->qType != QUEUETYPE_DIRECT) { + /* fire up the worker thread */ + pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ + i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis); + dbgprintf("Worker thread for queue 0x%lx, type %d started with state %d.\n", + (unsigned long) pThis, (int) pThis->qType, i); + } return RS_RET_OK; } @@ -449,15 +496,17 @@ rsRetVal queueDestruct(queue_t *pThis) assert(pThis != NULL); - /* first stop the worker thread */ - dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis); - pThis->bDoRun = 0; - /* It's actually not "not empty" below but awaking the worker. The worker - * then finds out that it shall terminate and does so. - */ - pthread_cond_signal(pThis->notEmpty); - pthread_join(pThis->thrdWorker, NULL); - dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis); + if(pThis->qType != QUEUETYPE_DIRECT) { + /* first stop the worker thread */ + dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis); + pThis->bDoRun = 0; + /* It's actually not "not empty" below but awaking the worker. The worker + * then finds out that it shall terminate and does so. + */ + pthread_cond_signal(pThis->notEmpty); + pthread_join(pThis->thrdWorker, NULL); + dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis); + } /* ... then free resources */ pthread_mutex_destroy(pThis->mut); @@ -492,7 +541,8 @@ queueEnqObj(queue_t *pThis, void *pUsr) assert(pThis != NULL); - pthread_mutex_lock(pThis->mut); + if(pThis->qType != QUEUETYPE_DIRECT) + pthread_mutex_lock(pThis->mut); while(pThis->iQueueSize >= pThis->iMaxQueueSize) { dbgprintf("enqueueMsg: queue 0x%lx FULL.\n", (unsigned long) pThis); @@ -511,9 +561,12 @@ queueEnqObj(queue_t *pThis, void *pUsr) finalize_it: /* now activate the worker thread */ - pthread_mutex_unlock(pThis->mut); - i = pthread_cond_signal(pThis->notEmpty); - dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); + if(pThis->qType != QUEUETYPE_DIRECT) { + pthread_mutex_unlock(pThis->mut); + i = pthread_cond_signal(pThis->notEmpty); + dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); + } + return iRet; } /* |