diff options
Diffstat (limited to 'plugins/imdiag/imdiag.c')
-rw-r--r-- | plugins/imdiag/imdiag.c | 190 |
1 files changed, 175 insertions, 15 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index 40f94692..81b357ef 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -52,6 +52,8 @@ #include "errmsg.h" #include "tcpsrv.h" #include "srUtils.h" +#include "msg.h" +#include "datetime.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ MODULE_TYPE_INPUT @@ -63,10 +65,15 @@ DEFobjCurrIf(tcps_sess) DEFobjCurrIf(net) DEFobjCurrIf(netstrm) DEFobjCurrIf(errmsg) +DEFobjCurrIf(datetime) +DEFobjCurrIf(prop) /* Module static data */ static tcpsrv_t *pOurTcpsrv = NULL; /* our TCP server(listener) TODO: change for multiple instances */ static permittedPeers_t *pPermPeersRoot = NULL; +static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this input */ +static prop_t *pRcvDummy = NULL; +static prop_t *pRcvIPDummy = NULL; /* config settings */ @@ -134,35 +141,157 @@ onErrClose(tcps_sess_t *pSess) /* ------------------------------ end callbacks ------------------------------ */ +/* get the first word delimited by space from a given string. The pointer is + * advanced to after the word. Any leading spaces are discarded. If the + * output buffer is too small, parsing ends on buffer full condition. + * An empty buffer is returned if there is no more data inside the string. + * rgerhards, 2009-05-27 + */ +#define TO_LOWERCASE 1 +#define NO_MODIFY 0 +static void +getFirstWord(uchar **ppszSrc, uchar *pszBuf, size_t lenBuf, int options) +{ + uchar c; + uchar *pszSrc = *ppszSrc; + + while(*pszSrc && *pszSrc == ' ') + ++pszSrc; /* skip to first non-space */ + + while(*pszSrc && *pszSrc != ' ' && lenBuf > 1) { + c = *pszSrc++; + if(options & TO_LOWERCASE) + c = tolower(c); + *pszBuf++ = c; + lenBuf--; + } + + *pszBuf = '\0'; + *ppszSrc = pszSrc; +} + + +/* send a response back to the originator + * rgerhards, 2009-05-27 + */ +static rsRetVal __attribute__((format(printf, 2, 3))) +sendResponse(tcps_sess_t *pSess, char *fmt, ...) +{ + va_list ap; + ssize_t len; + uchar buf[1024]; + DEFiRet; + + va_start(ap, fmt); + len = vsnprintf((char*)buf, sizeof(buf), fmt, ap); + va_end(ap); + CHKiRet(netstrm.Send(pSess->pStrm, buf, &len)); + +finalize_it: + RETiRet; +} + + +/* actually submit a message to the rsyslog core + */ +static rsRetVal +doInjectMsg(int iNum) +{ + uchar szMsg[1024]; + msg_t *pMsg; + struct syslogTime stTime; + time_t ttGenTime; + DEFiRet; + + snprintf((char*)szMsg, sizeof(szMsg)/sizeof(uchar), + "<167>Mar 1 01:00:00 172.20.245.8 tag msgnum:%8.8d:\n", iNum); + + datetime.getCurrTime(&stTime, &ttGenTime); + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); + MsgSetRawMsg(pMsg, (char*) szMsg, ustrlen(szMsg)); + MsgSetInputName(pMsg, pInputName); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; + MsgSetRcvFrom(pMsg, pRcvDummy); + CHKiRet(MsgSetRcvFromIP(pMsg, pRcvIPDummy)); + CHKiRet(submitMsg(pMsg)); + +finalize_it: + RETiRet; +} + + +/* This function injects messages. Command format: + * injectmsg <fromnbr> <number-of-messages> + * rgerhards, 2009-05-27 + */ +static rsRetVal +injectMsg(uchar *pszCmd, tcps_sess_t *pSess) +{ + uchar wordBuf[1024]; + int iFrom; + int nMsgs; + int i; + DEFiRet; + + /* we do not check errors here! */ + getFirstWord(&pszCmd, wordBuf, sizeof(wordBuf)/sizeof(uchar), TO_LOWERCASE); + iFrom = atoi((char*)wordBuf); + getFirstWord(&pszCmd, wordBuf, sizeof(wordBuf)/sizeof(uchar), TO_LOWERCASE); + nMsgs = atoi((char*)wordBuf); + + for(i = 0 ; i < nMsgs ; ++i) { + doInjectMsg(i + iFrom); + } + + CHKiRet(sendResponse(pSess, "%d messages injected\n", nMsgs)); + +finalize_it: + RETiRet; +} + + /* This function waits until the main queue is drained (size = 0) */ static rsRetVal -waitMainQEmpty(void) +waitMainQEmpty(tcps_sess_t *pSess) { int iMsgQueueSize; + int iPrint = 0; DEFiRet; CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); while(iMsgQueueSize > 0) { + /* DEV DEBUG ONLY if(iPrint++ % 500) + printf("imdiag: main msg queue size: %d\n", iMsgQueueSize); + */ + if(iPrint++ % 500 == 0) + dbgprintf("imdiag sleeping, wait mainq drain, curr size %d\n", iMsgQueueSize); srSleep(0,2); /* wait a little bit */ CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); + if(iMsgQueueSize == 0) { + /* verify that queue is still empty (else it could just be a race!) */ + srSleep(1,5); /* wait a little bit */ + CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); + } } + CHKiRet(sendResponse(pSess, "mainqueue empty\n")); + finalize_it: RETiRet; } - /* Function to handle received messages. This is our core function! * rgerhards, 2009-05-24 */ static rsRetVal OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg) { - ssize_t len; int iMsgQueueSize; uchar *pszMsg; - uchar buf[1024]; + uchar cmdBuf[1024]; DEFiRet; assert(pSess != NULL); @@ -172,21 +301,23 @@ OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg) * WITHOUT a termination \0 char. So we need to convert it to one * before proceeding. */ - CHKmalloc(pszMsg = malloc(sizeof(uchar) * (iLenMsg + 1))); + CHKmalloc(pszMsg = MALLOC(sizeof(uchar) * (iLenMsg + 1))); memcpy(pszMsg, pRcv, iLenMsg); pszMsg[iLenMsg] = '\0'; - if(!ustrcmp(pszMsg, UCHAR_CONSTANT("GetMainMsgQueueSize"))) { + getFirstWord(&pszMsg, cmdBuf, sizeof(cmdBuf)/sizeof(uchar), TO_LOWERCASE); + + dbgprintf("imdiag received command '%s'\n", cmdBuf); + if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("getmainmsgqueuesize"))) { CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); - len = snprintf((char*)buf, sizeof(buf)/sizeof(uchar), "%d\n", iMsgQueueSize); - CHKiRet(netstrm.Send(pSess->pStrm, buf, &len)); - } else if(!ustrcmp(pszMsg, UCHAR_CONSTANT("WaitMainQueueEmpty"))) { - CHKiRet(waitMainQEmpty()); - len = snprintf((char*)buf, sizeof(buf)/sizeof(uchar), "mainqueue empty\n"); - CHKiRet(netstrm.Send(pSess->pStrm, buf, &len)); + CHKiRet(sendResponse(pSess, "%d\n", iMsgQueueSize)); + } else if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("waitmainqueueempty"))) { + CHKiRet(waitMainQEmpty(pSess)); + } else if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("injectmsg"))) { + CHKiRet(injectMsg(pszMsg, pSess)); } else { - len = snprintf((char*)buf, sizeof(buf)/sizeof(uchar), "unkown command '%s'\n", pszMsg); - CHKiRet(netstrm.Send(pSess->pStrm, buf, &len)); + dbgprintf("imdiag unkown command '%s'\n", cmdBuf); + CHKiRet(sendResponse(pSess, "unkown command '%s'\n", cmdBuf)); } finalize_it: @@ -260,13 +391,31 @@ CODESTARTwillRun /* first apply some config settings */ if(pOurTcpsrv == NULL) ABORT_FINALIZE(RS_RET_NO_RUN); + /* we need to create the inputName property (only once during our lifetime) */ + CHKiRet(prop.Construct(&pInputName)); + CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imdiag"), sizeof("imdiag") - 1)); + CHKiRet(prop.ConstructFinalize(pInputName)); + + CHKiRet(prop.Construct(&pRcvDummy)); + CHKiRet(prop.SetString(pRcvDummy, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1)); + CHKiRet(prop.ConstructFinalize(pRcvDummy)); + + CHKiRet(prop.Construct(&pRcvIPDummy)); + CHKiRet(prop.SetString(pRcvIPDummy, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1)); + CHKiRet(prop.ConstructFinalize(pRcvIPDummy)); + finalize_it: ENDwillRun BEGINafterRun CODESTARTafterRun - /* do cleanup here */ + if(pInputName != NULL) + prop.Destruct(&pInputName); + if(pRcvDummy != NULL) + prop.Destruct(&pRcvDummy); + if(pRcvIPDummy != NULL) + prop.Destruct(&pRcvIPDummy); ENDafterRun @@ -285,6 +434,8 @@ CODESTARTmodExit objRelease(tcps_sess, LM_TCPSRV_FILENAME); objRelease(tcpsrv, LM_TCPSRV_FILENAME); objRelease(errmsg, CORE_COMPONENT); + objRelease(datetime, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); ENDmodExit @@ -303,10 +454,17 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus } +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -321,6 +479,8 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(tcps_sess, LM_TCPSRV_FILENAME)); CHKiRet(objUse(tcpsrv, LM_TCPSRV_FILENAME)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagserverrun"), 0, eCmdHdlrGetWord, |