diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-11-06 17:48:35 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-11-06 17:48:35 +0100 |
commit | 94f6326237404545877c3d3df0edef44e28bcda9 (patch) | |
tree | b7edefc64a85cc3a41ad91e91013d3ad216bd35f /runtime | |
parent | 28047567a2363f83b24b13108c626423dc1f1821 (diff) | |
download | rsyslog-94f6326237404545877c3d3df0edef44e28bcda9.tar.gz rsyslog-94f6326237404545877c3d3df0edef44e28bcda9.tar.bz2 rsyslog-94f6326237404545877c3d3df0edef44e28bcda9.zip |
queue: reduce CPU load for deserializing message properties
Linear runtime due to message order. Was quadratic before. However, not
a big overall improvement.
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/msg.c | 197 | ||||
-rw-r--r-- | runtime/msg.h | 1 | ||||
-rw-r--r-- | runtime/obj.c | 40 | ||||
-rw-r--r-- | runtime/obj.h | 4 | ||||
-rw-r--r-- | runtime/queue.c | 6 | ||||
-rw-r--r-- | runtime/rsyslog.h | 1 |
6 files changed, 226 insertions, 23 deletions
diff --git a/runtime/msg.c b/runtime/msg.c index 5c6d199f..e52d2c14 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -59,6 +59,7 @@ #include "ruleset.h" #include "prop.h" #include "net.h" +#include "var.h" #include "rsconf.h" /* static data */ @@ -68,6 +69,7 @@ DEFobjCurrIf(glbl) DEFobjCurrIf(regexp) DEFobjCurrIf(prop) DEFobjCurrIf(net) +DEFobjCurrIf(var) static struct { uchar *pszName; @@ -413,6 +415,16 @@ rsRetVal MsgEnableThreadSafety(void) /* end locking functions */ +/* rgerhards 2012-04-18: set associated ruleset (by ruleset name) + * If ruleset cannot be found, no update is done. + */ +static void +MsgSetRulesetByName(msg_t *pMsg, cstr_t *rulesetName) +{ + rulesetGetRuleset(runConf, &(pMsg->pRuleset), rsCStrGetSzStrNoNULL(rulesetName)); +} + + static inline int getProtocolVersion(msg_t *pM) { return(pM->iProtocolVersion); @@ -1133,6 +1145,167 @@ finalize_it: } +/* This is a helper for MsgDeserialize that re-inits the var object. This + * whole construct should be replaced, var is really ready to be retired. + * But as an interim help during refactoring let's introduce this function + * here (and thus NOT as method of var object!). -- rgerhads, 2012-11-06 + */ +static inline void +reinitVar(var_t *pVar) +{ + rsCStrDestruct(&pVar->pcsName); /* no longer needed */ + if(pVar->varType == VARTYPE_STR) { + if(pVar->val.pStr != NULL) + rsCStrDestruct(&pVar->val.pStr); + } +} +/* deserialize the message again + * we deserialize the properties in the same order that we serialized them. Except + * for some checks to cover downlevel version, we do not need to do all these + * CPU intense name checkings. + */ +#define isProp(name) !rsCStrSzStrCmp(pVar->pcsName, (uchar*) name, sizeof(name) - 1) +rsRetVal +MsgDeserialize(msg_t *pMsg, strm_t *pStrm) +{ + prop_t *myProp; + prop_t *propRcvFrom = NULL; + prop_t *propRcvFromIP = NULL; + struct json_tokener *tokener; + struct json_object *json; + var_t *pVar = NULL; + DEFiRet; + + ISOBJ_TYPE_assert(pStrm, strm); + + CHKiRet(var.Construct(&pVar)); + CHKiRet(var.ConstructFinalize(pVar)); + + CHKiRet(objDeserializeProperty(pVar, pStrm)); + if(isProp("iProtocolVersion")) { + setProtocolVersion(pMsg, pVar->val.num); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("iSeverity")) { + pMsg->iSeverity = pVar->val.num; + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("iFacility")) { + pMsg->iFacility = pVar->val.num; + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("msgFlags")) { + pMsg->msgFlags = pVar->val.num; + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("ttGenTime")) { + pMsg->ttGenTime = pVar->val.num; + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("tRcvdAt")) { + memcpy(&pMsg->tRcvdAt, &pVar->val.vSyslogTime, sizeof(struct syslogTime)); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("tTIMESTAMP")) { + memcpy(&pMsg->tRcvdAt, &pVar->val.vSyslogTime, sizeof(struct syslogTime)); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("pszTAG")) { + MsgSetTAG(pMsg, rsCStrGetSzStrNoNULL(pVar->val.pStr), cstrLen(pVar->val.pStr)); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("pszRawMsg")) { + MsgSetRawMsg(pMsg, (char*) rsCStrGetSzStrNoNULL(pVar->val.pStr), cstrLen(pVar->val.pStr)); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("pszHOSTNAME")) { + MsgSetHOSTNAME(pMsg, rsCStrGetSzStrNoNULL(pVar->val.pStr), rsCStrLen(pVar->val.pStr)); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("pszInputName")) { + /* we need to create a property */ + CHKiRet(prop.Construct(&myProp)); + CHKiRet(prop.SetString(myProp, rsCStrGetSzStrNoNULL(pVar->val.pStr), rsCStrLen(pVar->val.pStr))); + CHKiRet(prop.ConstructFinalize(myProp)); + MsgSetInputName(pMsg, myProp); + prop.Destruct(&myProp); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("pszRcvFrom")) { + MsgSetRcvFromStr(pMsg, rsCStrGetSzStrNoNULL(pVar->val.pStr), rsCStrLen(pVar->val.pStr), &propRcvFrom); + prop.Destruct(&propRcvFrom); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("pszRcvFromIP")) { + MsgSetRcvFromIPStr(pMsg, rsCStrGetSzStrNoNULL(pVar->val.pStr), rsCStrLen(pVar->val.pStr), &propRcvFromIP); + prop.Destruct(&propRcvFromIP); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("json")) { + tokener = json_tokener_new(); + json = json_tokener_parse_ex(tokener, (char*)rsCStrGetSzStrNoNULL(pVar->val.pStr), + cstrLen(pVar->val.pStr)); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("pCSStrucData")) { + MsgSetStructuredData(pMsg, (char*) rsCStrGetSzStrNoNULL(pVar->val.pStr)); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("pCSAPPNAME")) { + MsgSetAPPNAME(pMsg, (char*) rsCStrGetSzStrNoNULL(pVar->val.pStr)); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("pCSPROCID")) { + MsgSetPROCID(pMsg, (char*) rsCStrGetSzStrNoNULL(pVar->val.pStr)); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("pCSMSGID")) { + MsgSetMSGID(pMsg, (char*) rsCStrGetSzStrNoNULL(pVar->val.pStr)); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("pszUUID")) { + pMsg->pszUUID = ustrdup(rsCStrGetSzStrNoNULL(pVar->val.pStr)); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + if(isProp("pszRuleset")) { + MsgSetRulesetByName(pMsg, pVar->val.pStr); + reinitVar(pVar); + CHKiRet(objDeserializeProperty(pVar, pStrm)); + } + /* "offMSG" must always be our last field, so we use this as an + * indicator if the sequence is correct. This is a bit questionable, + * but on the other hand it works decently AND we will probably replace + * the whole persisted format soon in any case. -- rgerhards, 2012-11-06 + */ + if(!isProp("offMSG")) + ABORT_FINALIZE(RS_RET_DS_PROP_SEQ_ERR); + MsgSetMSGoffs(pMsg, pVar->val.num); +finalize_it: + if(pVar != NULL) + var.Destruct(&pVar); + RETiRet; +} + + /* Increment reference count - see description of the "msg" * structure for details. As a convenience to developers, * this method returns the msg pointer that is passed to it. @@ -1799,16 +1972,6 @@ void MsgSetRuleset(msg_t *pMsg, ruleset_t *pRuleset) } -/* rgerhards 2012-04-18: set associated ruleset (by ruleset name) - * If ruleset cannot be found, no update is done. - */ -static void -MsgSetRulesetByName(msg_t *pMsg, cstr_t *rulesetName) -{ - rulesetGetRuleset(runConf, &(pMsg->pRuleset), rsCStrGetSzStrNoNULL(rulesetName)); -} - - /* set TAG in msg object * (rewritten 2009-06-18 rgerhards) */ @@ -3594,6 +3757,7 @@ msgGetMsgVarNew(msg_t *pThis, uchar *name) * change over time). * rgerhards, 2008-01-07 */ +#undef isProp #define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1) rsRetVal MsgSetProperty(msg_t *pThis, var_t *pProp) { @@ -3951,25 +4115,25 @@ done: return dst; rsRetVal -msgSetJSONFromVar(msg_t *pMsg, uchar *varname, struct var *var) +msgSetJSONFromVar(msg_t *pMsg, uchar *varname, struct var *v) { struct json_object *json = NULL; char *cstr; DEFiRet; - switch(var->datatype) { + switch(v->datatype) { case 'S':/* string */ - cstr = es_str2cstr(var->d.estr, NULL); + cstr = es_str2cstr(v->d.estr, NULL); json = json_object_new_string(cstr); free(cstr); break; case 'N':/* number (integer) */ - json = json_object_new_int((int) var->d.n); + json = json_object_new_int((int) v->d.n); break; case 'J':/* native JSON */ - json = jsonDeepCopy(var->d.json); + json = jsonDeepCopy(v->d.json); break; default:DBGPRINTF("msgSetJSONFromVar: unsupported datatype %c\n", - var->datatype); + v->datatype); ABORT_FINALIZE(RS_RET_ERR); } msgAddJSON(pMsg, varname+1, json); @@ -3989,6 +4153,7 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE) CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(var, CORE_COMPONENT)); /* set our own handlers */ OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize); diff --git a/runtime/msg.h b/runtime/msg.h index 950de559..97bcc645 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -191,6 +191,7 @@ es_str_t* msgGetCEEVarNew(msg_t *pMsg, char *name); rsRetVal msgAddJSON(msg_t *pM, uchar *name, struct json_object *json); rsRetVal getCEEPropVal(msg_t *pM, es_str_t *propName, uchar **pRes, rs_size_t *buflen, unsigned short *pbMustBeFreed); rsRetVal MsgGetSeverity(msg_t *pThis, int *piSeverity); +rsRetVal MsgDeserialize(msg_t *pMsg, strm_t *pStrm); /* TODO: remove these five (so far used in action.c) */ uchar *getMSG(msg_t *pM); diff --git a/runtime/obj.c b/runtime/obj.c index b100a522..7f4800fd 100644 --- a/runtime/obj.c +++ b/runtime/obj.c @@ -604,7 +604,7 @@ finalize_it: /* Deserialize a single property. Pointer must be positioned at begin of line. Whole line * up until the \n is read. */ -static rsRetVal objDeserializeProperty(var_t *pProp, strm_t *pStrm) +rsRetVal objDeserializeProperty(var_t *pProp, strm_t *pStrm) { DEFiRet; number_t i; @@ -877,7 +877,7 @@ finalize_it: * rgerhards, 2012-11-03 */ rsRetVal -objDeserializeWithMethods(void *ppObj, uchar *pszTypeExpected, int lenTypeExpected, strm_t *pStrm, rsRetVal (*fFixup)(obj_t*,void*), void *pUsr, rsRetVal (*objConstruct)(), rsRetVal (*objConstructFinalize)(), rsRetVal (*objSetProperty)()) +objDeserializeWithMethods(void *ppObj, uchar *pszTypeExpected, int lenTypeExpected, strm_t *pStrm, rsRetVal (*fFixup)(obj_t*,void*), void *pUsr, rsRetVal (*objConstruct)(), rsRetVal (*objConstructFinalize)(), rsRetVal (*objDeserialize)()) { DEFiRet; rsRetVal iRetLocal; @@ -910,7 +910,8 @@ objDeserializeWithMethods(void *ppObj, uchar *pszTypeExpected, int lenTypeExpect CHKiRet(objConstruct(&pObj)); /* we got the object, now we need to fill the properties */ - CHKiRet(objDeserializeProperties(pObj, objSetProperty, pStrm)); + CHKiRet(objDeserialize(pObj, pStrm)); + CHKiRet(objDeserializeTrailer(pStrm)); /* do trailer checks */ /* check if we need to call a fixup function that modifies the object * before it is finalized. -- rgerhards, 2008-01-13 @@ -933,6 +934,39 @@ finalize_it: RETiRet; } +/* This is a dummy deserializer, to be used for the delete queue reader + * specifically. This is kind of a hack, but also to be replace (hopefully) soon + * by totally different code. So let's make it as simple as possible... + * rgerhards, 2012-11-06 + */ +rsRetVal +objDeserializeDummy(obj_t __attribute__((unused)) *pObj, strm_t *pStrm) +{ + DEFiRet; + var_t *pVar = NULL; + + CHKiRet(var.Construct(&pVar)); + CHKiRet(var.ConstructFinalize(pVar)); + + iRet = objDeserializeProperty(pVar, pStrm); + while(iRet == RS_RET_OK) { + /* this loop does actually NOGHTING but read the file... */ + /* re-init var object - TODO: method of var! */ + rsCStrDestruct(&pVar->pcsName); /* no longer needed */ + if(pVar->varType == VARTYPE_STR) { + if(pVar->val.pStr != NULL) + rsCStrDestruct(&pVar->val.pStr); + } + iRet = objDeserializeProperty(pVar, pStrm); + } +finalize_it: + if(iRet == RS_RET_NO_PROPLINE) + iRet = RS_RET_OK; /* NO_PROPLINE is OK and a kind of EOF! */ + if(pVar != NULL) + var.Destruct(&pVar); + RETiRet; +} + /* De-Serialize an object, but treat it as property bag. * rgerhards, 2008-01-11 diff --git a/runtime/obj.h b/runtime/obj.h index 72f806da..27d32b7a 100644 --- a/runtime/obj.h +++ b/runtime/obj.h @@ -118,7 +118,9 @@ ENDinterface(obj) rsRetVal objGetObjInterface(obj_if_t *pIf); PROTOTYPEObjClassInit(obj); PROTOTYPEObjClassExit(obj); -rsRetVal objDeserializeWithMethods(void *ppObj, uchar *pszTypeExpected, int lenTypeExpected, strm_t *pStrm, rsRetVal (*fFixup)(obj_t*,void*), void *pUsr, rsRetVal (*objConstruct)(), rsRetVal (*objConstructFinalize)(), rsRetVal (*objSetProperty)()); +rsRetVal objDeserializeWithMethods(void *ppObj, uchar *pszTypeExpected, int lenTypeExpected, strm_t *pStrm, rsRetVal (*fFixup)(obj_t*,void*), void *pUsr, rsRetVal (*objConstruct)(), rsRetVal (*objConstructFinalize)(), rsRetVal (*objDeserialize)()); +rsRetVal objDeserializeProperty(var_t *pProp, strm_t *pStrm); +rsRetVal objDeserializeDummy(obj_t *pObj, strm_t *pStrm); /* the following definition is only for "friends" */ diff --git a/runtime/queue.c b/runtime/queue.c index 9113ccb5..802a9340 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -922,7 +922,7 @@ static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", 3, pThis->tVars.disk.pReadDeq, NULL, - NULL, msgConstructForDeserializer, msgConstructFinalizer, MsgSetProperty); + NULL, msgConstructForDeserializer, msgConstructFinalizer, MsgDeserialize); RETiRet; } @@ -940,7 +940,7 @@ qDelDiskCallbackDummy(void) } static rsRetVal qDelDisk(qqueue_t *pThis) { - obj_t *pDummyObj; /* another dummy, nothing is created */ + msg_t *pDummyObj; /* another dummy, nothing is created */ DEFiRet; int64 offsIn; @@ -948,7 +948,7 @@ static rsRetVal qDelDisk(qqueue_t *pThis) CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn)); CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", 3, pThis->tVars.disk.pReadDel, - NULL, NULL, qDelDiskCallbackDummy, qDelDiskCallbackDummy, qDelDiskCallbackDummy)); + NULL, NULL, qDelDiskCallbackDummy, qDelDiskCallbackDummy, objDeserializeDummy)); CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut)); /* This time it is a bit tricky: we free disk space only upon file deletion. So we need diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 0bf0642a..a826decb 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -395,6 +395,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_INVLD_SETOP = -2305, /**< invalid variable set operation, incompatible type */ RS_RET_RULESET_EXISTS = -2306,/**< ruleset already exists */ RS_RET_DEPRECATED = -2307,/**< deprecated functionality is used */ + RS_RET_DS_PROP_SEQ_ERR = -2308,/**< property sequence error deserializing object */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ |