diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2013-10-29 17:19:15 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2013-10-29 17:19:15 +0100 |
commit | 5878ee24c7232d0f4f8655e7af6a1d456c07492f (patch) | |
tree | bc4176525e9deaa480bb9af132008dda53f75bba | |
parent | 8f27fab8340f39b070d2b6734e2f9005f12d61b9 (diff) | |
download | rsyslog-5878ee24c7232d0f4f8655e7af6a1d456c07492f.tar.gz rsyslog-5878ee24c7232d0f4f8655e7af6a1d456c07492f.tar.bz2 rsyslog-5878ee24c7232d0f4f8655e7af6a1d456c07492f.zip |
milestone: action mutex removed
This means actions may now be called concurrently and must make
provisions themselves to handle multi-threading.
-rw-r--r-- | action.c | 33 | ||||
-rw-r--r-- | action.h | 1 | ||||
-rw-r--r-- | tools/omfile.c | 9 |
3 files changed, 12 insertions, 31 deletions
@@ -307,7 +307,6 @@ rsRetVal actionDestruct(action_t *pThis) pThis->pMod->freeInstance(pThis->pModData); pthread_mutex_destroy(&pThis->mutAction); - pthread_mutex_destroy(&pThis->mutActExec); d_free(pThis->pszName); d_free(pThis->ppTpl); @@ -341,7 +340,6 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->bRepMsgHasMsg = 0; pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ pThis->iActionNbr = iActionNbr; - pthread_mutex_init(&pThis->mutActExec, NULL); pthread_mutex_init(&pThis->mutAction, NULL); INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); @@ -1267,7 +1265,6 @@ finalize_it: } -#pragma GCC diagnostic ignored "-Wempty-body" /* receive an array of to-process user pointers and submit them * for processing. * rgerhards, 2009-04-22 @@ -1290,18 +1287,8 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmed } CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); - /* We now must guard the output module against execution by multiple threads. The - * plugin interface specifies that output modules must not be thread-safe (except - * if they notify us they are - functionality not yet implemented...). - * rgerhards, 2008-01-30 - */ - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - iRet = processAction(pAction, pBatch, pWti); - pthread_cleanup_pop(1); /* unlock mutex */ - /* even if processAction failed, we need to release the batch (else we * have a memory leak). So we do this first, and then check if we need to * return an error code. If so, the code from processAction has priority. @@ -1322,15 +1309,12 @@ finalize_it: pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" -/* call the HUP handler for a given action, if such a handler is defined. The - * action mutex is locked, because the HUP handler most probably needs to modify - * some internal state information. - * rgerhards, 2008-10-22 +/* call the HUP handler for a given action, if such a handler is defined. + * Note that the action must be able to service HUP requests concurrently + * to any current doAction() processing. */ -#pragma GCC diagnostic ignored "-Wempty-body" rsRetVal actionCallHUPHdlr(action_t *pAction) { @@ -1339,19 +1323,13 @@ actionCallHUPHdlr(action_t *pAction) ASSERT(pAction != NULL); DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); - if(pAction->pMod->doHUP == NULL) { - FINALIZE; /* no HUP handler, so we are done ;) */ + if(pAction->pMod->doHUP != NULL) { + CHKiRet(pAction->pMod->doHUP(pAction->pModData)); } - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - CHKiRet(pAction->pMod->doHUP(pAction->pModData)); - pthread_cleanup_pop(1); /* unlock mutex */ - finalize_it: RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" /* set the action message queue mode @@ -1739,6 +1717,7 @@ doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) DEFiRet; d_pthread_mutex_lock(&pAction->mutAction); +dbgprintf("DDDD: locked mutAction\n"); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti); d_pthread_mutex_unlock(&pAction->mutAction); @@ -79,7 +79,6 @@ struct action_s { * in this order. */ qqueue_t *pQueue; /* action queue */ pthread_mutex_t mutAction; /* primary action mutex */ - pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */ uchar *pszName; /* action name (for documentation) */ DEF_ATOMIC_HELPER_MUT(mutCAS); /* for statistics subsystem */ diff --git a/tools/omfile.c b/tools/omfile.c index cecca07b..f2731aa5 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -807,8 +807,6 @@ writeFile(instanceData *pData, linebuf_t *linebuf) ASSERT(pData != NULL); - pthread_mutex_lock(&pData->mutWrite); - /* first check if we have a dynamic file name and, if so, * check if it still is ok or a new file needs to be created */ @@ -826,7 +824,6 @@ writeFile(instanceData *pData, linebuf_t *linebuf) CHKiRet(doWrite(pData, linebuf->ln, ustrlen(linebuf->ln))); finalize_it: - pthread_mutex_unlock(&pData->mutWrite); RETiRet; } @@ -981,6 +978,10 @@ submitCachedLines(wrkrInstanceData_t *pWrkrData, instanceData *pData) { linebuf_t *curr, *todel; +dbgprintf("DDDD: omfile: waiting on write lock (pWrkrData %p)\n", pWrkrData); + pthread_mutex_lock(&pData->mutWrite); +dbgprintf("DDDD: omfile: aquired write lock (pWrkrData %p)\n", pWrkrData); + for(curr = pWrkrData->pRoot ; curr != NULL ; ) { DBGPRINTF("omfile: file to log to: %s\n", curr->filename); DBGPRINTF("omfile: start of data: '%.128s'\n", curr->ln); @@ -993,6 +994,8 @@ submitCachedLines(wrkrInstanceData_t *pWrkrData, instanceData *pData) free(todel->ln); free(todel); } + pthread_mutex_unlock(&pData->mutWrite); +dbgprintf("DDDD: omfile: free write lock (pWrkrData %p)\n", pWrkrData); pWrkrData->pRoot = NULL; } |