diff options
Diffstat (limited to 'plugins/omhiredis/omhiredis.c')
-rw-r--r-- | plugins/omhiredis/omhiredis.c | 120 |
1 files changed, 68 insertions, 52 deletions
diff --git a/plugins/omhiredis/omhiredis.c b/plugins/omhiredis/omhiredis.c index 757d5eb2..28a1b9cd 100644 --- a/plugins/omhiredis/omhiredis.c +++ b/plugins/omhiredis/omhiredis.c @@ -52,14 +52,17 @@ DEFobjCurrIf(errmsg) * this will be accessable * via pData */ typedef struct _instanceData { - redisContext *conn; /* redis connection */ uchar *server; /* redis server address */ int port; /* redis port */ uchar *tplName; /* template name */ - redisReply **replies; /* array to hold replies from redis */ - int count; /* count of command sent for current batch */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; + redisContext *conn; /* redis connection */ + redisReply **replies; /* array to hold replies from redis */ + int count; /* count of command sent for current batch */ +} wrkrInstanceData_t; static struct cnfparamdescr actpdescr[] = { { "server", eCmdHdlrGetWord, 0 }, @@ -76,6 +79,11 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->conn = NULL; /* Connect later */ +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -83,11 +91,11 @@ CODESTARTisCompatibleWithFeature ENDisCompatibleWithFeature /* called when closing */ -static void closeHiredis(instanceData *pData) +static void closeHiredis(wrkrInstanceData_t *pWrkrData) { - if(pData->conn != NULL) { - redisFree(pData->conn); - pData->conn = NULL; + if(pWrkrData->conn != NULL) { + redisFree(pWrkrData->conn); + pWrkrData->conn = NULL; } } @@ -95,10 +103,15 @@ static void closeHiredis(instanceData *pData) * TODO: free **replies */ BEGINfreeInstance CODESTARTfreeInstance - closeHiredis(pData); - free(pData->server); + if (pData->server != NULL) { + free(pData->server); + } ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + closeHiredis(pWrkrData); +ENDfreeWrkrInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -106,17 +119,20 @@ CODESTARTdbgPrintInstInfo ENDdbgPrintInstInfo /* establish our connection to redis */ -static rsRetVal initHiredis(instanceData *pData, int bSilent) +static rsRetVal initHiredis(wrkrInstanceData_t *pWrkrData, int bSilent) { char *server; DEFiRet; - server = (pData->server == NULL) ? "127.0.0.1" : (char*) pData->server; - DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server, pData->port); - + server = (pWrkrData->pData->server == NULL) ? "127.0.0.1" : + (char*) pWrkrData->pData->server; + DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server, + pWrkrData->pData->port); + struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */ - pData->conn = redisConnectWithTimeout(server, pData->port, timeout); - if (pData->conn->err) { + pWrkrData->conn = redisConnectWithTimeout(server, pWrkrData->pData->port, + timeout); + if (pWrkrData->conn->err) { if(!bSilent) errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize redis handle"); @@ -126,29 +142,29 @@ finalize_it: RETiRet; } -rsRetVal writeHiredis(uchar *message, instanceData *pData) +rsRetVal writeHiredis(uchar *message, wrkrInstanceData_t *pWrkrData) { DEFiRet; - /* if we do not have a redis connection, call - * initHiredis and try to establish one */ - if(pData->conn == NULL) - CHKiRet(initHiredis(pData, 0)); + /* if we do not have a redis connection, call + * initHiredis and try to establish one */ + if(pWrkrData->conn == NULL) + CHKiRet(initHiredis(pWrkrData, 0)); - /* try to append the command to the pipeline. - * REDIS_ERR reply indicates something bad - * happened, in which case abort. otherwise - * increase our current pipeline count - * by 1 and continue. */ + /* try to append the command to the pipeline. + * REDIS_ERR reply indicates something bad + * happened, in which case abort. otherwise + * increase our current pipeline count + * by 1 and continue. */ int rc; - rc = redisAppendCommand(pData->conn, (char*)message); + rc = redisAppendCommand(pWrkrData->conn, (char*)message); if (rc == REDIS_ERR) { - errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", pData->conn->errstr); - dbgprintf("omhiredis: %s\n", pData->conn->errstr); + errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr); + dbgprintf("omhiredis: %s\n", pWrkrData->conn->errstr); ABORT_FINALIZE(RS_RET_ERR); } else { - pData->count++; - } + pWrkrData->count++; + } finalize_it: RETiRet; @@ -158,17 +174,18 @@ finalize_it: * try to restablish our connection to redis */ BEGINtryResume CODESTARTtryResume - if(pData->conn == NULL) - iRet = initHiredis(pData, 0); + if(pWrkrData->conn == NULL) + iRet = initHiredis(pWrkrData, 0); ENDtryResume -/* begin a transaction. for now does nothing. +/* begin a transaction. * if I decide to use MULTI ... EXEC in the - * fture, this block should send the + * future, this block should send the * MULTI command to redis. */ BEGINbeginTransaction CODESTARTbeginTransaction - dbgprintf("omhiredis: beginTransaction called\n"); + dbgprintf("omhiredis: beginTransaction called\n"); + pWrkrData->count = 0; ENDbeginTransaction /* call writeHiredis for this log line, @@ -176,8 +193,8 @@ ENDbeginTransaction * current pipeline */ BEGINdoAction CODESTARTdoAction - CHKiRet(writeHiredis(ppString[0], pData)); - iRet = RS_RET_DEFER_COMMIT; + CHKiRet(writeHiredis(ppString[0], pWrkrData)); + iRet = RS_RET_DEFER_COMMIT; finalize_it: ENDdoAction @@ -189,16 +206,15 @@ ENDdoAction * which should be fixed */ BEGINendTransaction CODESTARTendTransaction - dbgprintf("omhiredis: endTransaction called\n"); - int i; - pData->replies = malloc ( sizeof ( redisReply* ) * pData->count ); - for ( i = 0; i < pData->count; i++ ) { - redisGetReply ( pData->conn, (void *)&pData->replies[i] ); - /* TODO: add error checking here! */ - freeReplyObject ( pData->replies[i] ); - } - free ( pData->replies ); - pData->count = 0; + dbgprintf("omhiredis: endTransaction called\n"); + int i; + pWrkrData->replies = malloc ( sizeof ( redisReply* ) * pWrkrData->count ); + for ( i = 0; i < pWrkrData->count; i++ ) { + redisGetReply ( pWrkrData->conn, (void *)&pWrkrData->replies[i] ); + /* TODO: add error checking here! */ + freeReplyObject ( pWrkrData->replies[i] ); + } + free ( pWrkrData->replies ); ENDendTransaction /* set defaults. note server is set to NULL @@ -211,7 +227,6 @@ setInstParamDefaults(instanceData *pData) pData->server = NULL; pData->port = 6379; pData->tplName = NULL; - pData->count = 0; } /* here is where the work to set up a new instance @@ -281,6 +296,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_TXIF_OMOD_QUERIES /* supports transaction interface */ ENDqueryEtryPt @@ -292,9 +308,9 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); - if (!bCoreSupportsBatching) { - errmsg.LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort"); - ABORT_FINALIZE(RS_RET_ERR); - } + if (!bCoreSupportsBatching) { + errmsg.LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort"); + ABORT_FINALIZE(RS_RET_ERR); + } DBGPRINTF("omhiredis: module compiled with rsyslog version %s.\n", VERSION); ENDmodInit |