summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imgssapi/imgssapi.c16
-rw-r--r--plugins/imudp/imudp.c20
-rw-r--r--plugins/imuxsock/imuxsock.c34
-rw-r--r--plugins/omoracle/omoracle.c64
4 files changed, 110 insertions, 24 deletions
diff --git a/plugins/imgssapi/imgssapi.c b/plugins/imgssapi/imgssapi.c
index d8791880..1aad6622 100644
--- a/plugins/imgssapi/imgssapi.c
+++ b/plugins/imgssapi/imgssapi.c
@@ -56,6 +56,7 @@
#include "errmsg.h"
#include "netstrm.h"
#include "glbl.h"
+#include "unlimited_select.h"
MODULE_TYPE_INPUT
@@ -414,15 +415,20 @@ OnSessAcceptGSS(tcpsrv_t *pThis, tcps_sess_t *pSess)
CHKiRet(netstrm.GetSock(pSess->pStrm, &fdSess)); // TODO: method access!
if (allowedMethods & ALLOWEDMETHOD_TCP) {
int len;
- fd_set fds;
struct timeval tv;
+#ifdef USE_UNLIMITED_SELECT
+ fd_set *pFds = malloc(glbl.GetFdSetSize());
+#else
+ fd_set fds;
+ fd_set *pFds = &fds;
+#endif
do {
- FD_ZERO(&fds);
- FD_SET(fdSess, &fds);
+ FD_ZERO(pFds);
+ FD_SET(fdSess, pFds);
tv.tv_sec = 1;
tv.tv_usec = 0;
- ret = select(fdSess + 1, &fds, NULL, NULL, &tv);
+ ret = select(fdSess + 1, pFds, NULL, NULL, &tv);
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
errmsg.LogError(0, RS_RET_ERR, "TCP session %p will be closed, error ignored\n", pSess);
@@ -475,6 +481,8 @@ OnSessAcceptGSS(tcpsrv_t *pThis, tcps_sess_t *pSess)
pGSess->allowedMethods = ALLOWEDMETHOD_TCP;
ABORT_FINALIZE(RS_RET_OK); // TODO: define good error codes
}
+
+ freeFdSet(pFds);
}
context = &pGSess->gss_context;
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 0970259d..ecc36c28 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -45,6 +45,7 @@
#include "datetime.h"
#include "prop.h"
#include "unicode-helper.h"
+#include "unlimited_select.h"
MODULE_TYPE_INPUT
@@ -287,12 +288,18 @@ BEGINrunInput
int maxfds;
int nfds;
int i;
- fd_set readfds;
struct sockaddr_storage frominetPrev;
int bIsPermitted;
uchar fromHost[NI_MAXHOST];
uchar fromHostIP[NI_MAXHOST];
uchar fromHostFQDN[NI_MAXHOST];
+#ifdef USE_UNLIMITED_SELECT
+ fd_set *pReadfds = malloc(glbl.GetFdSetSize());
+#else
+ fd_set readfds;
+ fd_set *pReadfds = &readfds;
+#endif
+
CODESTARTrunInput
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
@@ -311,30 +318,30 @@ CODESTARTrunInput
* is given without -a, we do not need to listen at all..
*/
maxfds = 0;
- FD_ZERO (&readfds);
+ FD_ZERO (pReadfds);
/* Add the UDP listen sockets to the list of read descriptors. */
for (i = 0; i < *udpLstnSocks; i++) {
if (udpLstnSocks[i+1] != -1) {
if(Debug)
net.debugListenInfo(udpLstnSocks[i+1], "UDP");
- FD_SET(udpLstnSocks[i+1], &readfds);
+ FD_SET(udpLstnSocks[i+1], pReadfds);
if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1];
}
}
if(Debug) {
dbgprintf("--------imUDP calling select, active file descriptors (max %d): ", maxfds);
for (nfds = 0; nfds <= maxfds; ++nfds)
- if ( FD_ISSET(nfds, &readfds) )
+ if ( FD_ISSET(nfds, pReadfds) )
dbgprintf("%d ", nfds);
dbgprintf("\n");
}
/* wait for io to become ready */
- nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL);
+ nfds = select(maxfds+1, (fd_set *) pReadfds, NULL, NULL, NULL);
for(i = 0; nfds && i < *udpLstnSocks; i++) {
- if(FD_ISSET(udpLstnSocks[i+1], &readfds)) {
+ if(FD_ISSET(udpLstnSocks[i+1], pReadfds)) {
processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted,
fromHost, fromHostFQDN, fromHostIP);
--nfds; /* indicate we have processed one descriptor */
@@ -343,6 +350,7 @@ CODESTARTrunInput
/* end of a run, back to loop for next recv() */
}
+ freeFdSet(pReadfds);
return iRet;
ENDrunInput
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 424d0904..5aa3c25b 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -45,6 +45,7 @@
#include "glbl.h"
#include "msg.h"
#include "prop.h"
+#include "unlimited_select.h"
MODULE_TYPE_INPUT
@@ -77,6 +78,7 @@ static int startIndexUxLocalSockets; /* process funix from that index on (used t
*/
static int funixParseHost[MAXFUNIX] = { 0, }; /* should parser parse host name? read-only after startup */
static int funixFlags[MAXFUNIX] = { IGNDATE, }; /* should parser parse host name? read-only after startup */
+static int funixCreateSockPath[MAXFUNIX] = { 0, }; /* auto-creation of socket directory? */
static uchar *funixn[MAXFUNIX] = { (uchar*) _PATH_LOG }; /* read-only after startup */
static uchar *funixHName[MAXFUNIX] = { NULL, }; /* host-name override - if set, use this instead of actual name */
static int funixFlowCtl[MAXFUNIX] = { eFLOWCTL_NO_DELAY, }; /* flow control settings for this socket */
@@ -89,6 +91,8 @@ static uchar *pLogSockName = NULL;
static uchar *pLogHostName = NULL; /* host name to use with this socket */
static int bUseFlowCtl = 0; /* use flow control or not (if yes, only LIGHT is used! */
static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */
+#define DFLT_bCreateSockPath 0
+static int bCreateSockPath = DFLT_bCreateSockPath; /* auto-create socket path? */
/* set the timestamp ignore / not ignore option for the system
@@ -132,6 +136,7 @@ static rsRetVal addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNe
pLogHostName = NULL; /* re-init for next, not freed because funixHName[] now owns it */
funixFlowCtl[nfunix] = bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
funixFlags[nfunix] = bIgnoreTimestamp ? IGNDATE : NOFLAG;
+ funixCreateSockPath[nfunix] = bCreateSockPath;
funixn[nfunix++] = pNewVal;
}
else {
@@ -165,7 +170,7 @@ static rsRetVal discardFunixn(void)
}
-static int create_unix_socket(const char *path)
+static int create_unix_socket(const char *path, int bCreatePath)
{
struct sockaddr_un sunx;
int fd;
@@ -177,6 +182,9 @@ static int create_unix_socket(const char *path)
memset(&sunx, 0, sizeof(sunx));
sunx.sun_family = AF_UNIX;
+ if(bCreatePath) {
+ makeFileParentDirs((uchar*)path, strlen(path), 0755, -1, -1, 0);
+ }
(void) strncpy(sunx.sun_path, path, sizeof(sunx.sun_path));
fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (fd < 0 || bind(fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 ||
@@ -249,7 +257,13 @@ BEGINrunInput
int nfds;
int i;
int fd;
- fd_set readfds;
+#ifdef USE_UNLIMITED_SELECT
+ fd_set *pReadfds = malloc(glbl.GetFdSetSize());
+#else
+ fd_set readfds;
+ fd_set *pReadfds = &readfds;
+#endif
+
CODESTARTrunInput
/* this is an endless loop - it is terminated when the thread is
* signalled to do so. This, however, is handled by the framework,
@@ -263,11 +277,11 @@ CODESTARTrunInput
* is given without -a, we do not need to listen at all..
*/
maxfds = 0;
- FD_ZERO (&readfds);
+ FD_ZERO (pReadfds);
/* Copy master connections */
for (i = startIndexUxLocalSockets; i < nfunix; i++) {
if (funix[i] != -1) {
- FD_SET(funix[i], &readfds);
+ FD_SET(funix[i], pReadfds);
if (funix[i]>maxfds) maxfds=funix[i];
}
}
@@ -275,22 +289,23 @@ CODESTARTrunInput
if(Debug) {
dbgprintf("--------imuxsock calling select, active file descriptors (max %d): ", maxfds);
for (nfds= 0; nfds <= maxfds; ++nfds)
- if ( FD_ISSET(nfds, &readfds) )
+ if ( FD_ISSET(nfds, pReadfds) )
dbgprintf("%d ", nfds);
dbgprintf("\n");
}
/* wait for io to become ready */
- nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL);
+ nfds = select(maxfds+1, (fd_set *) pReadfds, NULL, NULL, NULL);
for (i = 0; i < nfunix && nfds > 0; i++) {
- if ((fd = funix[i]) != -1 && FD_ISSET(fd, &readfds)) {
+ if ((fd = funix[i]) != -1 && FD_ISSET(fd, pReadfds)) {
readSocket(fd, i);
--nfds; /* indicate we have processed one */
}
}
}
+ freeFdSet(pReadfds);
RETiRet;
ENDrunInput
@@ -306,7 +321,7 @@ CODESTARTwillRun
/* initialize and return if will run or not */
for (i = startIndexUxLocalSockets ; i < nfunix ; i++) {
- if ((funix[i] = create_unix_socket((char*) funixn[i])) != -1)
+ if ((funix[i] = create_unix_socket((char*) funixn[i], funixCreateSockPath[i])) != -1)
dbgprintf("Opened UNIX socket '%s' (fd %d).\n", funixn[i], funix[i]);
}
@@ -376,6 +391,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
nfunix = 1;
bIgnoreTimestamp = 1;
bUseFlowCtl = 0;
+ bCreateSockPath = DFLT_bCreateSockPath;
return RS_RET_OK;
}
@@ -409,6 +425,8 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &pLogHostName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketflowcontrol", 0, eCmdHdlrBinary,
NULL, &bUseFlowCtl, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketcreatepath", 0, eCmdHdlrBinary,
+ NULL, &bCreateSockPath, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord,
addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
diff --git a/plugins/omoracle/omoracle.c b/plugins/omoracle/omoracle.c
index 331b7dd4..48ee1fa4 100644
--- a/plugins/omoracle/omoracle.c
+++ b/plugins/omoracle/omoracle.c
@@ -47,9 +47,9 @@
$OmoracleStatement \
insert into foo(hostname,message)values(:host,:message)
- Also note that identifiers to placeholders are arbitrarry. You
- need to define the properties on the template in the correct order
- you want them passed to the statement!
+ Also note that identifiers to placeholders are arbitrary. You need
+ to define the properties on the template in the correct order you
+ want them passed to the statement!
This file is licensed under the terms of the GPL version 3 or, at
your choice, any later version. Exceptionally (perhaps), you are
@@ -87,7 +87,8 @@ MODULE_TYPE_OUTPUT
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
-/** */
+/** Structure defining a batch of items to be sent to the database in
+ * the same statement execution. */
struct oracle_batch
{
/* Batch size */
@@ -162,8 +163,10 @@ static int oci_errors(void* handle, ub4 htype, sword status)
return OCI_SUCCESS;
break;
case OCI_SUCCESS_WITH_INFO:
- errmsg.LogError(0, NO_ERRCODE, "OCI SUCCESS - With info\n");
- break;
+ OCIErrorGet(handle, 1, NULL, &errcode, buf, sizeof buf, htype);
+ errmsg.LogError(0, NO_ERRCODE, "OCI SUCCESS - With info: %s",
+ buf);
+ return OCI_SUCCESS_WITH_INFO;
case OCI_NEED_DATA:
errmsg.LogError(0, NO_ERRCODE, "OCI NEEDS MORE DATA\n");
break;
@@ -180,6 +183,9 @@ static int oci_errors(void* handle, ub4 htype, sword status)
break;
case OCI_INVALID_HANDLE:
errmsg.LogError(0, NO_ERRCODE, "OCI INVALID HANDLE\n");
+ /* In this case we may have to trigger a call to
+ * tryResume(). */
+ return RS_RET_SUSPENDED;
break;
case OCI_STILL_EXECUTING:
errmsg.LogError(0, NO_ERRCODE, "Still executing...\n");
@@ -332,6 +338,48 @@ CODESTARTcreateInstance
finalize_it:
ENDcreateInstance
+/* Analyses the errors during a batch statement execution, and logs
+ * all the corresponding ORA-MESSAGES, together with some useful
+ * information. */
+static void log_detailed_err(instanceData* pData)
+{
+ DEFiRet;
+ int errs, i, row, code, j;
+ OCIError *er = NULL, *er2 = NULL;
+ unsigned char buf[MAX_BUFSIZE];
+
+ OCIAttrGet(pData->statement, OCI_HTYPE_STMT, &errs, 0,
+ OCI_ATTR_NUM_DML_ERRORS, pData->error);
+ errmsg.LogError(0, NO_ERRCODE, "OCI: %d errors in execution of "
+ "statement: %s", errs, pData->txt_statement);
+
+ CHECKENV(pData->environment,
+ OCIHandleAlloc(pData->environment, &er, OCI_HTYPE_ERROR,
+ 0, NULL));
+ CHECKENV(pData->environment,
+ OCIHandleAlloc(pData->environment, &er2, OCI_HTYPE_ERROR,
+ 0, NULL));
+
+ for (i = 0; i < errs; i++) {
+ OCIParamGet(pData->error, OCI_HTYPE_ERROR,
+ er2, &er, i);
+ OCIAttrGet(er, OCI_HTYPE_ERROR, &row, 0,
+ OCI_ATTR_DML_ROW_OFFSET, er2);
+ errmsg.LogError(0, NO_ERRCODE, "OCI failure in row %d:", row);
+ for (j = 0; j < pData->batch.arguments; j++)
+ errmsg.LogError(0, NO_ERRCODE, "%s",
+ pData->batch.parameters[j][row]);
+ OCIErrorGet(er, 1, NULL, &code, buf, sizeof buf,
+ OCI_HTYPE_ERROR);
+ errmsg.LogError(0, NO_ERRCODE, "FAILURE DETAILS: %s", buf);
+ }
+
+finalize_it:
+ OCIHandleFree(er, OCI_HTYPE_ERROR);
+ OCIHandleFree(er2, OCI_HTYPE_ERROR);
+}
+
+
/* Inserts all stored statements into the database, releasing any
* allocated memory. */
static int insert_to_db(instanceData* pData)
@@ -346,6 +394,10 @@ static int insert_to_db(instanceData* pData)
OCI_BATCH_ERRORS));
finalize_it:
+ if (iRet == OCI_SUCCESS_WITH_INFO) {
+ log_detailed_err(pData);
+ iRet = RS_RET_OK;
+ }
pData->batch.n = 0;
OCITransCommit(pData->service, pData->error, 0);
dbgprintf ("omoracle insertion to DB %s\n", iRet == RS_RET_OK ?