diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imgssapi/imgssapi.c | 16 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 20 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 34 | ||||
-rw-r--r-- | plugins/omoracle/omoracle.c | 64 |
4 files changed, 110 insertions, 24 deletions
diff --git a/plugins/imgssapi/imgssapi.c b/plugins/imgssapi/imgssapi.c index d8791880..1aad6622 100644 --- a/plugins/imgssapi/imgssapi.c +++ b/plugins/imgssapi/imgssapi.c @@ -56,6 +56,7 @@ #include "errmsg.h" #include "netstrm.h" #include "glbl.h" +#include "unlimited_select.h" MODULE_TYPE_INPUT @@ -414,15 +415,20 @@ OnSessAcceptGSS(tcpsrv_t *pThis, tcps_sess_t *pSess) CHKiRet(netstrm.GetSock(pSess->pStrm, &fdSess)); // TODO: method access! if (allowedMethods & ALLOWEDMETHOD_TCP) { int len; - fd_set fds; struct timeval tv; +#ifdef USE_UNLIMITED_SELECT + fd_set *pFds = malloc(glbl.GetFdSetSize()); +#else + fd_set fds; + fd_set *pFds = &fds; +#endif do { - FD_ZERO(&fds); - FD_SET(fdSess, &fds); + FD_ZERO(pFds); + FD_SET(fdSess, pFds); tv.tv_sec = 1; tv.tv_usec = 0; - ret = select(fdSess + 1, &fds, NULL, NULL, &tv); + ret = select(fdSess + 1, pFds, NULL, NULL, &tv); } while (ret < 0 && errno == EINTR); if (ret < 0) { errmsg.LogError(0, RS_RET_ERR, "TCP session %p will be closed, error ignored\n", pSess); @@ -475,6 +481,8 @@ OnSessAcceptGSS(tcpsrv_t *pThis, tcps_sess_t *pSess) pGSess->allowedMethods = ALLOWEDMETHOD_TCP; ABORT_FINALIZE(RS_RET_OK); // TODO: define good error codes } + + freeFdSet(pFds); } context = &pGSess->gss_context; diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 0970259d..ecc36c28 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -45,6 +45,7 @@ #include "datetime.h" #include "prop.h" #include "unicode-helper.h" +#include "unlimited_select.h" MODULE_TYPE_INPUT @@ -287,12 +288,18 @@ BEGINrunInput int maxfds; int nfds; int i; - fd_set readfds; struct sockaddr_storage frominetPrev; int bIsPermitted; uchar fromHost[NI_MAXHOST]; uchar fromHostIP[NI_MAXHOST]; uchar fromHostFQDN[NI_MAXHOST]; +#ifdef USE_UNLIMITED_SELECT + fd_set *pReadfds = malloc(glbl.GetFdSetSize()); +#else + fd_set readfds; + fd_set *pReadfds = &readfds; +#endif + CODESTARTrunInput /* start "name caching" algo by making sure the previous system indicator * is invalidated. @@ -311,30 +318,30 @@ CODESTARTrunInput * is given without -a, we do not need to listen at all.. */ maxfds = 0; - FD_ZERO (&readfds); + FD_ZERO (pReadfds); /* Add the UDP listen sockets to the list of read descriptors. */ for (i = 0; i < *udpLstnSocks; i++) { if (udpLstnSocks[i+1] != -1) { if(Debug) net.debugListenInfo(udpLstnSocks[i+1], "UDP"); - FD_SET(udpLstnSocks[i+1], &readfds); + FD_SET(udpLstnSocks[i+1], pReadfds); if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1]; } } if(Debug) { dbgprintf("--------imUDP calling select, active file descriptors (max %d): ", maxfds); for (nfds = 0; nfds <= maxfds; ++nfds) - if ( FD_ISSET(nfds, &readfds) ) + if ( FD_ISSET(nfds, pReadfds) ) dbgprintf("%d ", nfds); dbgprintf("\n"); } /* wait for io to become ready */ - nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL); + nfds = select(maxfds+1, (fd_set *) pReadfds, NULL, NULL, NULL); for(i = 0; nfds && i < *udpLstnSocks; i++) { - if(FD_ISSET(udpLstnSocks[i+1], &readfds)) { + if(FD_ISSET(udpLstnSocks[i+1], pReadfds)) { processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, fromHost, fromHostFQDN, fromHostIP); --nfds; /* indicate we have processed one descriptor */ @@ -343,6 +350,7 @@ CODESTARTrunInput /* end of a run, back to loop for next recv() */ } + freeFdSet(pReadfds); return iRet; ENDrunInput diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 424d0904..5aa3c25b 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -45,6 +45,7 @@ #include "glbl.h" #include "msg.h" #include "prop.h" +#include "unlimited_select.h" MODULE_TYPE_INPUT @@ -77,6 +78,7 @@ static int startIndexUxLocalSockets; /* process funix from that index on (used t */ static int funixParseHost[MAXFUNIX] = { 0, }; /* should parser parse host name? read-only after startup */ static int funixFlags[MAXFUNIX] = { IGNDATE, }; /* should parser parse host name? read-only after startup */ +static int funixCreateSockPath[MAXFUNIX] = { 0, }; /* auto-creation of socket directory? */ static uchar *funixn[MAXFUNIX] = { (uchar*) _PATH_LOG }; /* read-only after startup */ static uchar *funixHName[MAXFUNIX] = { NULL, }; /* host-name override - if set, use this instead of actual name */ static int funixFlowCtl[MAXFUNIX] = { eFLOWCTL_NO_DELAY, }; /* flow control settings for this socket */ @@ -89,6 +91,8 @@ static uchar *pLogSockName = NULL; static uchar *pLogHostName = NULL; /* host name to use with this socket */ static int bUseFlowCtl = 0; /* use flow control or not (if yes, only LIGHT is used! */ static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */ +#define DFLT_bCreateSockPath 0 +static int bCreateSockPath = DFLT_bCreateSockPath; /* auto-create socket path? */ /* set the timestamp ignore / not ignore option for the system @@ -132,6 +136,7 @@ static rsRetVal addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNe pLogHostName = NULL; /* re-init for next, not freed because funixHName[] now owns it */ funixFlowCtl[nfunix] = bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY; funixFlags[nfunix] = bIgnoreTimestamp ? IGNDATE : NOFLAG; + funixCreateSockPath[nfunix] = bCreateSockPath; funixn[nfunix++] = pNewVal; } else { @@ -165,7 +170,7 @@ static rsRetVal discardFunixn(void) } -static int create_unix_socket(const char *path) +static int create_unix_socket(const char *path, int bCreatePath) { struct sockaddr_un sunx; int fd; @@ -177,6 +182,9 @@ static int create_unix_socket(const char *path) memset(&sunx, 0, sizeof(sunx)); sunx.sun_family = AF_UNIX; + if(bCreatePath) { + makeFileParentDirs((uchar*)path, strlen(path), 0755, -1, -1, 0); + } (void) strncpy(sunx.sun_path, path, sizeof(sunx.sun_path)); fd = socket(AF_UNIX, SOCK_DGRAM, 0); if (fd < 0 || bind(fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 || @@ -249,7 +257,13 @@ BEGINrunInput int nfds; int i; int fd; - fd_set readfds; +#ifdef USE_UNLIMITED_SELECT + fd_set *pReadfds = malloc(glbl.GetFdSetSize()); +#else + fd_set readfds; + fd_set *pReadfds = &readfds; +#endif + CODESTARTrunInput /* this is an endless loop - it is terminated when the thread is * signalled to do so. This, however, is handled by the framework, @@ -263,11 +277,11 @@ CODESTARTrunInput * is given without -a, we do not need to listen at all.. */ maxfds = 0; - FD_ZERO (&readfds); + FD_ZERO (pReadfds); /* Copy master connections */ for (i = startIndexUxLocalSockets; i < nfunix; i++) { if (funix[i] != -1) { - FD_SET(funix[i], &readfds); + FD_SET(funix[i], pReadfds); if (funix[i]>maxfds) maxfds=funix[i]; } } @@ -275,22 +289,23 @@ CODESTARTrunInput if(Debug) { dbgprintf("--------imuxsock calling select, active file descriptors (max %d): ", maxfds); for (nfds= 0; nfds <= maxfds; ++nfds) - if ( FD_ISSET(nfds, &readfds) ) + if ( FD_ISSET(nfds, pReadfds) ) dbgprintf("%d ", nfds); dbgprintf("\n"); } /* wait for io to become ready */ - nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL); + nfds = select(maxfds+1, (fd_set *) pReadfds, NULL, NULL, NULL); for (i = 0; i < nfunix && nfds > 0; i++) { - if ((fd = funix[i]) != -1 && FD_ISSET(fd, &readfds)) { + if ((fd = funix[i]) != -1 && FD_ISSET(fd, pReadfds)) { readSocket(fd, i); --nfds; /* indicate we have processed one */ } } } + freeFdSet(pReadfds); RETiRet; ENDrunInput @@ -306,7 +321,7 @@ CODESTARTwillRun /* initialize and return if will run or not */ for (i = startIndexUxLocalSockets ; i < nfunix ; i++) { - if ((funix[i] = create_unix_socket((char*) funixn[i])) != -1) + if ((funix[i] = create_unix_socket((char*) funixn[i], funixCreateSockPath[i])) != -1) dbgprintf("Opened UNIX socket '%s' (fd %d).\n", funixn[i], funix[i]); } @@ -376,6 +391,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a nfunix = 1; bIgnoreTimestamp = 1; bUseFlowCtl = 0; + bCreateSockPath = DFLT_bCreateSockPath; return RS_RET_OK; } @@ -409,6 +425,8 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &pLogHostName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketflowcontrol", 0, eCmdHdlrBinary, NULL, &bUseFlowCtl, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketcreatepath", 0, eCmdHdlrBinary, + NULL, &bCreateSockPath, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord, addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, diff --git a/plugins/omoracle/omoracle.c b/plugins/omoracle/omoracle.c index 331b7dd4..48ee1fa4 100644 --- a/plugins/omoracle/omoracle.c +++ b/plugins/omoracle/omoracle.c @@ -47,9 +47,9 @@ $OmoracleStatement \ insert into foo(hostname,message)values(:host,:message) - Also note that identifiers to placeholders are arbitrarry. You - need to define the properties on the template in the correct order - you want them passed to the statement! + Also note that identifiers to placeholders are arbitrary. You need + to define the properties on the template in the correct order you + want them passed to the statement! This file is licensed under the terms of the GPL version 3 or, at your choice, any later version. Exceptionally (perhaps), you are @@ -87,7 +87,8 @@ MODULE_TYPE_OUTPUT DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) -/** */ +/** Structure defining a batch of items to be sent to the database in + * the same statement execution. */ struct oracle_batch { /* Batch size */ @@ -162,8 +163,10 @@ static int oci_errors(void* handle, ub4 htype, sword status) return OCI_SUCCESS; break; case OCI_SUCCESS_WITH_INFO: - errmsg.LogError(0, NO_ERRCODE, "OCI SUCCESS - With info\n"); - break; + OCIErrorGet(handle, 1, NULL, &errcode, buf, sizeof buf, htype); + errmsg.LogError(0, NO_ERRCODE, "OCI SUCCESS - With info: %s", + buf); + return OCI_SUCCESS_WITH_INFO; case OCI_NEED_DATA: errmsg.LogError(0, NO_ERRCODE, "OCI NEEDS MORE DATA\n"); break; @@ -180,6 +183,9 @@ static int oci_errors(void* handle, ub4 htype, sword status) break; case OCI_INVALID_HANDLE: errmsg.LogError(0, NO_ERRCODE, "OCI INVALID HANDLE\n"); + /* In this case we may have to trigger a call to + * tryResume(). */ + return RS_RET_SUSPENDED; break; case OCI_STILL_EXECUTING: errmsg.LogError(0, NO_ERRCODE, "Still executing...\n"); @@ -332,6 +338,48 @@ CODESTARTcreateInstance finalize_it: ENDcreateInstance +/* Analyses the errors during a batch statement execution, and logs + * all the corresponding ORA-MESSAGES, together with some useful + * information. */ +static void log_detailed_err(instanceData* pData) +{ + DEFiRet; + int errs, i, row, code, j; + OCIError *er = NULL, *er2 = NULL; + unsigned char buf[MAX_BUFSIZE]; + + OCIAttrGet(pData->statement, OCI_HTYPE_STMT, &errs, 0, + OCI_ATTR_NUM_DML_ERRORS, pData->error); + errmsg.LogError(0, NO_ERRCODE, "OCI: %d errors in execution of " + "statement: %s", errs, pData->txt_statement); + + CHECKENV(pData->environment, + OCIHandleAlloc(pData->environment, &er, OCI_HTYPE_ERROR, + 0, NULL)); + CHECKENV(pData->environment, + OCIHandleAlloc(pData->environment, &er2, OCI_HTYPE_ERROR, + 0, NULL)); + + for (i = 0; i < errs; i++) { + OCIParamGet(pData->error, OCI_HTYPE_ERROR, + er2, &er, i); + OCIAttrGet(er, OCI_HTYPE_ERROR, &row, 0, + OCI_ATTR_DML_ROW_OFFSET, er2); + errmsg.LogError(0, NO_ERRCODE, "OCI failure in row %d:", row); + for (j = 0; j < pData->batch.arguments; j++) + errmsg.LogError(0, NO_ERRCODE, "%s", + pData->batch.parameters[j][row]); + OCIErrorGet(er, 1, NULL, &code, buf, sizeof buf, + OCI_HTYPE_ERROR); + errmsg.LogError(0, NO_ERRCODE, "FAILURE DETAILS: %s", buf); + } + +finalize_it: + OCIHandleFree(er, OCI_HTYPE_ERROR); + OCIHandleFree(er2, OCI_HTYPE_ERROR); +} + + /* Inserts all stored statements into the database, releasing any * allocated memory. */ static int insert_to_db(instanceData* pData) @@ -346,6 +394,10 @@ static int insert_to_db(instanceData* pData) OCI_BATCH_ERRORS)); finalize_it: + if (iRet == OCI_SUCCESS_WITH_INFO) { + log_detailed_err(pData); + iRet = RS_RET_OK; + } pData->batch.n = 0; OCITransCommit(pData->service, pData->error, 0); dbgprintf ("omoracle insertion to DB %s\n", iRet == RS_RET_OK ? |