From cdecd7e524a5114ccff4f2909b32738bdb33c6ea Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Oct 2008 11:46:46 +0200 Subject: slightly improved lock contention situation by moving out of the critical section what could so with acceptable consequences --- runtime/queue.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 76c2f10f..f5f770b3 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2101,6 +2101,15 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) ISOBJ_TYPE_assert(pThis, queue); + /* first check if we need to discard this message (which will cause CHKiRet() to exit) + * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize + * and bRunsDA parameters may not reflect the correct settings here, but they are + * "good enough" in the sense that they can be used to drive the decision. Valgrind's + * threading tools may point this access to be an error, but this is done + * intentional. I do not see this causes problems to us. + */ + CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); + /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE * during its execution. If that is not done, race conditions occur if the @@ -2112,9 +2121,6 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) d_pthread_mutex_lock(pThis->mut); } - /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ - CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); - /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) CHKiRet(queueChkStrtDA(pThis)); -- cgit v1.2.3 From ace4f2f75202aec39449dac11b9eb1deca7428d7 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 8 Oct 2008 18:55:11 +0200 Subject: reordered imudp processing. Message parsing is now done as part of main message queue worker processing (was part of the input thread) This should also improve performance, as potentially more work is done in parallel. --- runtime/queue.c | 2 -- 1 file changed, 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index f5f770b3..25c0bd5f 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1513,8 +1513,6 @@ queueRateLimiter(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); - dbgoprint((obj_t*) pThis, "entering rate limiter\n"); - iDelay = 0; if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */ /* time calls are expensive, so only do them when needed */ -- cgit v1.2.3 From af3c944563b8651c14b2b9087ea76f7eeacfc065 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 9 Oct 2008 11:21:24 +0200 Subject: added experimental pthread_yield() which so far seems to increase performance. There is also reason for it to do so, see http://kb.monitorware.com/post14216.html#p14216 --- runtime/queue.c | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 25c0bd5f..3ef6d5a0 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2185,6 +2185,11 @@ finalize_it: /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); + /* the following pthread_yield is experimental, but brought us performance + * benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216 + * rgerhards, 2008-10-09 + */ + pthread_yield(); } RETiRet; -- cgit v1.2.3 From 1229143ca3d194bb0daaa5252809d59ee0c3a0bf Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 9 Oct 2008 12:56:28 +0200 Subject: minor: reorder to slightly reduce size of critical section --- runtime/queue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 3ef6d5a0..2d1bf7e3 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2181,10 +2181,10 @@ finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ queueAdviseMaxWorkers(pThis); - dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n"); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); + dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n"); /* the following pthread_yield is experimental, but brought us performance * benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216 * rgerhards, 2008-10-09 -- cgit v1.2.3 From 6c6e9a0f3f7d454ba9553a750b195d7f99c7299a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 9 Oct 2008 13:45:56 +0200 Subject: moved bParseHostname and bIsParsed to msgFlags This enables us to use more efficient calling conventions and also helps us keep the on-disk structure of a msg object more consistent in future releases. --- runtime/queue.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index b0043ef5..42b8137d 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -49,6 +49,7 @@ #include "obj.h" #include "wtp.h" #include "wti.h" +#include "atomic.h" /* static data */ DEFobjStaticHelpers @@ -996,7 +997,7 @@ queueAdd(queue_t *pThis, void *pUsr) CHKiRet(pThis->qAdd(pThis, pUsr)); if(pThis->qType != QUEUETYPE_DIRECT) { - ++pThis->iQueueSize; + ATOMIC_INC(pThis->iQueueSize); dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize); } @@ -1025,7 +1026,7 @@ queueDel(queue_t *pThis, void *pUsr) iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); - --pThis->iQueueSize; + ATOMIC_DEC(pThis->iQueueSize); } dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n", -- cgit v1.2.3 From a27e249e445deecb1d9ef57fbbb203d71bf061dd Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 21 Oct 2008 10:55:33 +0200 Subject: bugfix: (potentially big) memory leak on HUP This occured if queues could not be drained before timeout. Thanks to David Lang for pointing this out. --- runtime/queue.c | 45 ++++++++++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 13 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 42b8137d..7fa2df91 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -88,6 +88,30 @@ ENDfunc return pThis->iQueueSize + pThis->iUngottenObjs; } + +/* This function drains the queue in cases where this needs to be done. The most probable + * reason is a HUP which needs to discard data (because the queue is configured to be lossy). + * During a shutdown, this is typically not needed, as the OS frees up ressources and does + * this much quicker than when we clean up ourselvs. -- rgerhards, 2008-10-21 + * This function returns void, as it makes no sense to communicate an error back, even if + * it happens. + */ +static inline void queueDrain(queue_t *pThis) +{ + void *pUsr; + + ASSERT(pThis != NULL); + + /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ + while(pThis->iQueueSize-- > 0) { + pThis->qDel(pThis, &pUsr); + if(pUsr != NULL) { + objDestruct(pUsr); + } + } +} + + /* --------------- code for disk-assisted (DA) queue modes -------------------- */ @@ -196,14 +220,6 @@ queueTurnOffDAMode(queue_t *pThis) } } - /* TODO: we have a *really biiiiig* memory leak here: if the queue could not be persisted, all of - * its data elements are still in memory. That doesn't really matter if we are terminated, but on - * HUP this memory leaks. We MUST add a loop of destructor calls here. However, this takes time - * (possibly a lot), so it is probably best to have a config variable for that. - * Something for 3.11.1! - * rgerhards, 2008-01-30 - */ - RETiRet; } @@ -461,12 +477,15 @@ static rsRetVal qDestructFixedArray(queue_t *pThis) ASSERT(pThis != NULL); + queueDrain(pThis); /* discard any remaining queue entries */ + if(pThis->tVars.farray.pBuf != NULL) free(pThis->tVars.farray.pBuf); RETiRet; } + static rsRetVal qAddFixedArray(queue_t *pThis, void* in) { DEFiRet; @@ -570,11 +589,11 @@ static rsRetVal qConstructLinkedList(queue_t *pThis) static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis) { DEFiRet; - - /* with the linked list type, there is nothing to do here. The - * reason is that the Destructor is only called after all entries - * have bene taken off the queue. In this case, there is nothing - * dynamic left with the linked list. + + queueDrain(pThis); /* discard any remaining queue entries */ + + /* with the linked list type, there is nothing left to do here. The + * reason is that there are no dynamic elements for the list itself. */ RETiRet; -- cgit v1.2.3 From cf38fc81759b01af5125b1a05e0d6fe8e2e1bc21 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 22 Oct 2008 13:54:40 +0200 Subject: added a setting "$OptimizeForUniprocessor" ...to enable users to turn off pthread_yield calls which are counter-productive on multiprocessor machines (but have been shown to be useful on uniprocessors) --- runtime/queue.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 7fa2df91..a3e29a96 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1285,6 +1285,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, /* we have an object, so let's fill the properties */ objConstructSetObjInfo(pThis); + pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc(); if((pThis->pszSpoolDir = (uchar*) strdup((char*)glbl.GetWorkDir())) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -2208,8 +2209,10 @@ finalize_it: /* the following pthread_yield is experimental, but brought us performance * benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216 * rgerhards, 2008-10-09 + * but this is only true for uniprocessors, so we guard it with an optimize flag -- rgerhards, 2008-10-22 */ - pthread_yield(); + if(pThis->bOptimizeUniProc) + pthread_yield(); } RETiRet; -- cgit v1.2.3