diff options
Diffstat (limited to 'tcps_sess.c')
-rw-r--r-- | tcps_sess.c | 116 |
1 files changed, 73 insertions, 43 deletions
diff --git a/tcps_sess.c b/tcps_sess.c index 62d51f66..69b40ad0 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -36,6 +36,7 @@ #include "rsyslog.h" #include "dirty.h" +#include "unicode-helper.h" #include "module-template.h" #include "net.h" #include "tcpsrv.h" @@ -45,6 +46,8 @@ #include "netstrm.h" #include "msg.h" #include "datetime.h" +#include "prop.h" +#include "debug.h" /* static data */ @@ -52,10 +55,14 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) DEFobjCurrIf(errmsg) DEFobjCurrIf(netstrm) +DEFobjCurrIf(prop) DEFobjCurrIf(datetime) static int iMaxLine; /* maximum size of a single message */ +static int iNbrTimeUsed = 0; /* how often has previous time been used so far? */ + + /* forward definitions */ static rsRetVal Close(tcps_sess_t *pThis); @@ -66,7 +73,7 @@ BEGINobjConstruct(tcps_sess) /* be sure to specify the object type also in END m pThis->bAtStrtOfFram = 1; /* indicate frame header expected */ pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; /* just make sure... */ /* now allocate the message reception buffer */ - CHKmalloc(pThis->pMsg = (uchar*) malloc(sizeof(uchar) * iMaxLine + 1)); + CHKmalloc(pThis->pMsg = (uchar*) MALLOC(sizeof(uchar) * iMaxLine + 1)); finalize_it: ENDobjConstruct(tcps_sess) @@ -98,11 +105,10 @@ CODESTARTobjDestruct(tcps_sess) } /* now destruct our own properties */ if(pThis->fromHost != NULL) - free(pThis->fromHost); + CHKiRet(prop.Destruct(&pThis->fromHost)); if(pThis->fromHostIP != NULL) - free(pThis->fromHostIP); - if(pThis->pMsg != NULL) - free(pThis->pMsg); + CHKiRet(prop.Destruct(&pThis->fromHostIP)); + free(pThis->pMsg); ENDobjDestruct(tcps_sess) @@ -124,12 +130,13 @@ SetHost(tcps_sess_t *pThis, uchar *pszHost) ISOBJ_TYPE_assert(pThis, tcps_sess); - if(pThis->fromHost != NULL) { - free(pThis->fromHost); - } - - pThis->fromHost = pszHost; + if(pThis->fromHost == NULL) + CHKiRet(prop.Construct(&pThis->fromHost)); + + CHKiRet(prop.SetString(pThis->fromHost, pszHost, ustrlen(pszHost))); +finalize_it: + free(pszHost); /* we must free according to our (old) calling conventions */ RETiRet; } @@ -144,12 +151,13 @@ SetHostIP(tcps_sess_t *pThis, uchar *pszHostIP) ISOBJ_TYPE_assert(pThis, tcps_sess); - if(pThis->fromHostIP != NULL) { - free(pThis->fromHostIP); - } - - pThis->fromHostIP = pszHostIP; + if(pThis->fromHostIP == NULL) + CHKiRet(prop.Construct(&pThis->fromHostIP)); + + CHKiRet(prop.SetString(pThis->fromHostIP, pszHostIP, ustrlen(pszHostIP))); +finalize_it: + free(pszHostIP); RETiRet; } @@ -226,11 +234,9 @@ SetOnMsgReceive(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar * rgerhards, 2009-04-23 */ static rsRetVal -defaultDoSubmitMessage(tcps_sess_t *pThis) +defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { msg_t *pMsg; - struct syslogTime stTime; - time_t ttGenTime; DEFiRet; ISOBJ_TYPE_assert(pThis, tcps_sess); @@ -240,22 +246,24 @@ defaultDoSubmitMessage(tcps_sess_t *pThis) FINALIZE; } - //TODO: if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { - datetime.getCurrTime(&stTime, &ttGenTime); - //} /* we now create our own message object and submit it to the queue */ - CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); - /* first trim the buffer to what we have actually received */ - CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar) * pThis->iMsg)); - memcpy(pMsg->pszRawMsg, pThis->pMsg, pThis->iMsg); - pMsg->iLenRawMsg = pThis->iMsg; - MsgSetInputName(pMsg, pThis->pLstnInfo->pszInputName); + CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); + MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg); + MsgSetInputName(pMsg, pThis->pLstnInfo->pInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; - pMsg->bParseHOSTNAME = 1; MsgSetRcvFrom(pMsg, pThis->fromHost); CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP)); - CHKiRet(submitMsg(pMsg)); + MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset); + + if(pMultiSub == NULL) { + CHKiRet(submitMsg(pMsg)); + } else { + pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg(pMultiSub)); + } + finalize_it: /* reset status variables */ @@ -281,6 +289,8 @@ finalize_it: static rsRetVal PrepareClose(tcps_sess_t *pThis) { + struct syslogTime stTime; + time_t ttGenTime; DEFiRet; ISOBJ_TYPE_assert(pThis, tcps_sess); @@ -306,7 +316,8 @@ PrepareClose(tcps_sess_t *pThis) * this case. */ dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n"); - defaultDoSubmitMessage(pThis); + datetime.getCurrTime(&stTime, &ttGenTime); + defaultDoSubmitMessage(pThis, &stTime, ttGenTime, NULL); } finalize_it: @@ -325,10 +336,11 @@ Close(tcps_sess_t *pThis) ISOBJ_TYPE_assert(pThis, tcps_sess); netstrm.Destruct(&pThis->pStrm); - free(pThis->fromHost); - pThis->fromHost = NULL; /* not really needed, but... */ - free(pThis->fromHostIP); - pThis->fromHostIP = NULL; /* not really needed, but... */ + if(pThis->fromHost != NULL) { + prop.Destruct(&pThis->fromHost); + } + if(pThis->fromHostIP != NULL) + prop.Destruct(&pThis->fromHostIP); RETiRet; } @@ -341,7 +353,7 @@ Close(tcps_sess_t *pThis) * rgerhards, 2008-03-14 */ static rsRetVal -processDataRcvd(tcps_sess_t *pThis, char c) +processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { DEFiRet; ISOBJ_TYPE_assert(pThis, tcps_sess); @@ -387,7 +399,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(pThis->iMsg >= iMaxLine) { /* emergency, we now need to flush, no matter if we are at end of message or not... */ dbgprintf("error: message received is larger than max msg size, we split it\n"); - defaultDoSubmitMessage(pThis); + defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good * candidate for a configuration parameter... @@ -395,10 +407,10 @@ processDataRcvd(tcps_sess_t *pThis, char c) */ } - if(( (c == '\n') + if(( ((c == '\n') && !pThis->pSrv->bDisableLFDelim) || ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ - defaultDoSubmitMessage(pThis); + defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); pThis->inputState = eAtStrtFram; } else { /* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes! @@ -415,7 +427,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) pThis->iOctetsRemain--; if(pThis->iOctetsRemain < 1) { /* we have end of frame! */ - defaultDoSubmitMessage(pThis); + defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); pThis->inputState = eAtStrtFram; } } @@ -436,27 +448,45 @@ processDataRcvd(tcps_sess_t *pThis, char c) * RS_RET_OK, which means the session should be kept open * or anything else, which means it must be closed. * rgerhards, 2008-03-01 + * As a performance optimization, we pick up the timestamp here. Acutally, + * this *is* the *correct* reception step for all the data we received, because + * we have just received a bunch of data! -- rgerhards, 2009-06-16 */ +#define NUM_MULTISUB 1024 static rsRetVal DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) { - DEFiRet; + multi_submit_t multiSub; + msg_t *pMsgs[NUM_MULTISUB]; + struct syslogTime stTime; + time_t ttGenTime; char *pEnd; + DEFiRet; ISOBJ_TYPE_assert(pThis, tcps_sess); assert(pData != NULL); assert(iLen > 0); + datetime.getCurrTime(&stTime, &ttGenTime); + multiSub.ppMsgs = pMsgs; + multiSub.maxElem = NUM_MULTISUB; + multiSub.nElem = 0; + /* We now copy the message to the session buffer. */ pEnd = pData + iLen; /* this is one off, which is intensional */ + iNbrTimeUsed = 0; /* full time query */ while(pData < pEnd) { - CHKiRet(processDataRcvd(pThis, *pData++)); + CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub)); } + /* submit anything that was not yet submitted */ + CHKiRet(multiSubmitMsg(&multiSub)); + finalize_it: RETiRet; } +#undef NUM_MULTISUB /* queryInterface function @@ -503,6 +533,7 @@ CODESTARTObjClassExit(tcps_sess) objRelease(errmsg, CORE_COMPONENT); objRelease(netstrm, LM_NETSTRMS_FILENAME); objRelease(datetime, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); ENDObjClassExit(tcps_sess) @@ -515,6 +546,7 @@ BEGINObjClassInit(tcps_sess, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE c CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(netstrm, LM_NETSTRMS_FILENAME)); CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */ @@ -525,7 +557,5 @@ BEGINObjClassInit(tcps_sess, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE c OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, tcps_sessConstructFinalize); ENDObjClassInit(tcps_sess) - - /* vim:set ai: */ |