summaryrefslogtreecommitdiffstats
path: root/plugins/omhiredis
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/omhiredis')
-rw-r--r--plugins/omhiredis/omhiredis.c120
1 files changed, 68 insertions, 52 deletions
diff --git a/plugins/omhiredis/omhiredis.c b/plugins/omhiredis/omhiredis.c
index 757d5eb2..28a1b9cd 100644
--- a/plugins/omhiredis/omhiredis.c
+++ b/plugins/omhiredis/omhiredis.c
@@ -52,14 +52,17 @@ DEFobjCurrIf(errmsg)
* this will be accessable
* via pData */
typedef struct _instanceData {
- redisContext *conn; /* redis connection */
uchar *server; /* redis server address */
int port; /* redis port */
uchar *tplName; /* template name */
- redisReply **replies; /* array to hold replies from redis */
- int count; /* count of command sent for current batch */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ redisContext *conn; /* redis connection */
+ redisReply **replies; /* array to hold replies from redis */
+ int count; /* count of command sent for current batch */
+} wrkrInstanceData_t;
static struct cnfparamdescr actpdescr[] = {
{ "server", eCmdHdlrGetWord, 0 },
@@ -76,6 +79,11 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->conn = NULL; /* Connect later */
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -83,11 +91,11 @@ CODESTARTisCompatibleWithFeature
ENDisCompatibleWithFeature
/* called when closing */
-static void closeHiredis(instanceData *pData)
+static void closeHiredis(wrkrInstanceData_t *pWrkrData)
{
- if(pData->conn != NULL) {
- redisFree(pData->conn);
- pData->conn = NULL;
+ if(pWrkrData->conn != NULL) {
+ redisFree(pWrkrData->conn);
+ pWrkrData->conn = NULL;
}
}
@@ -95,10 +103,15 @@ static void closeHiredis(instanceData *pData)
* TODO: free **replies */
BEGINfreeInstance
CODESTARTfreeInstance
- closeHiredis(pData);
- free(pData->server);
+ if (pData->server != NULL) {
+ free(pData->server);
+ }
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ closeHiredis(pWrkrData);
+ENDfreeWrkrInstance
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -106,17 +119,20 @@ CODESTARTdbgPrintInstInfo
ENDdbgPrintInstInfo
/* establish our connection to redis */
-static rsRetVal initHiredis(instanceData *pData, int bSilent)
+static rsRetVal initHiredis(wrkrInstanceData_t *pWrkrData, int bSilent)
{
char *server;
DEFiRet;
- server = (pData->server == NULL) ? "127.0.0.1" : (char*) pData->server;
- DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server, pData->port);
-
+ server = (pWrkrData->pData->server == NULL) ? "127.0.0.1" :
+ (char*) pWrkrData->pData->server;
+ DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server,
+ pWrkrData->pData->port);
+
struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
- pData->conn = redisConnectWithTimeout(server, pData->port, timeout);
- if (pData->conn->err) {
+ pWrkrData->conn = redisConnectWithTimeout(server, pWrkrData->pData->port,
+ timeout);
+ if (pWrkrData->conn->err) {
if(!bSilent)
errmsg.LogError(0, RS_RET_SUSPENDED,
"can not initialize redis handle");
@@ -126,29 +142,29 @@ finalize_it:
RETiRet;
}
-rsRetVal writeHiredis(uchar *message, instanceData *pData)
+rsRetVal writeHiredis(uchar *message, wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- /* if we do not have a redis connection, call
- * initHiredis and try to establish one */
- if(pData->conn == NULL)
- CHKiRet(initHiredis(pData, 0));
+ /* if we do not have a redis connection, call
+ * initHiredis and try to establish one */
+ if(pWrkrData->conn == NULL)
+ CHKiRet(initHiredis(pWrkrData, 0));
- /* try to append the command to the pipeline.
- * REDIS_ERR reply indicates something bad
- * happened, in which case abort. otherwise
- * increase our current pipeline count
- * by 1 and continue. */
+ /* try to append the command to the pipeline.
+ * REDIS_ERR reply indicates something bad
+ * happened, in which case abort. otherwise
+ * increase our current pipeline count
+ * by 1 and continue. */
int rc;
- rc = redisAppendCommand(pData->conn, (char*)message);
+ rc = redisAppendCommand(pWrkrData->conn, (char*)message);
if (rc == REDIS_ERR) {
- errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", pData->conn->errstr);
- dbgprintf("omhiredis: %s\n", pData->conn->errstr);
+ errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr);
+ dbgprintf("omhiredis: %s\n", pWrkrData->conn->errstr);
ABORT_FINALIZE(RS_RET_ERR);
} else {
- pData->count++;
- }
+ pWrkrData->count++;
+ }
finalize_it:
RETiRet;
@@ -158,17 +174,18 @@ finalize_it:
* try to restablish our connection to redis */
BEGINtryResume
CODESTARTtryResume
- if(pData->conn == NULL)
- iRet = initHiredis(pData, 0);
+ if(pWrkrData->conn == NULL)
+ iRet = initHiredis(pWrkrData, 0);
ENDtryResume
-/* begin a transaction. for now does nothing.
+/* begin a transaction.
* if I decide to use MULTI ... EXEC in the
- * fture, this block should send the
+ * future, this block should send the
* MULTI command to redis. */
BEGINbeginTransaction
CODESTARTbeginTransaction
- dbgprintf("omhiredis: beginTransaction called\n");
+ dbgprintf("omhiredis: beginTransaction called\n");
+ pWrkrData->count = 0;
ENDbeginTransaction
/* call writeHiredis for this log line,
@@ -176,8 +193,8 @@ ENDbeginTransaction
* current pipeline */
BEGINdoAction
CODESTARTdoAction
- CHKiRet(writeHiredis(ppString[0], pData));
- iRet = RS_RET_DEFER_COMMIT;
+ CHKiRet(writeHiredis(ppString[0], pWrkrData));
+ iRet = RS_RET_DEFER_COMMIT;
finalize_it:
ENDdoAction
@@ -189,16 +206,15 @@ ENDdoAction
* which should be fixed */
BEGINendTransaction
CODESTARTendTransaction
- dbgprintf("omhiredis: endTransaction called\n");
- int i;
- pData->replies = malloc ( sizeof ( redisReply* ) * pData->count );
- for ( i = 0; i < pData->count; i++ ) {
- redisGetReply ( pData->conn, (void *)&pData->replies[i] );
- /* TODO: add error checking here! */
- freeReplyObject ( pData->replies[i] );
- }
- free ( pData->replies );
- pData->count = 0;
+ dbgprintf("omhiredis: endTransaction called\n");
+ int i;
+ pWrkrData->replies = malloc ( sizeof ( redisReply* ) * pWrkrData->count );
+ for ( i = 0; i < pWrkrData->count; i++ ) {
+ redisGetReply ( pWrkrData->conn, (void *)&pWrkrData->replies[i] );
+ /* TODO: add error checking here! */
+ freeReplyObject ( pWrkrData->replies[i] );
+ }
+ free ( pWrkrData->replies );
ENDendTransaction
/* set defaults. note server is set to NULL
@@ -211,7 +227,6 @@ setInstParamDefaults(instanceData *pData)
pData->server = NULL;
pData->port = 6379;
pData->tplName = NULL;
- pData->count = 0;
}
/* here is where the work to set up a new instance
@@ -281,6 +296,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* supports transaction interface */
ENDqueryEtryPt
@@ -292,9 +308,9 @@ CODESTARTmodInit
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
- if (!bCoreSupportsBatching) {
- errmsg.LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort");
- ABORT_FINALIZE(RS_RET_ERR);
- }
+ if (!bCoreSupportsBatching) {
+ errmsg.LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
DBGPRINTF("omhiredis: module compiled with rsyslog version %s.\n", VERSION);
ENDmodInit