From 93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 18 May 2009 17:28:34 +0200 Subject: t-delete list implemented, queue store drivers updated... ... on the way to the ultra-reliable queue modes (redesign doc). This version does not really work, but is a good commit point. Next comes queue size calculation. DA mode does not yet work. --- runtime/stream.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index f1f69cc8..50d419be 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -99,7 +99,10 @@ static rsRetVal strmOpenFile(strm_t *pThis) pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); if(pThis->fd == -1) { int ierrnoSave = errno; - dbgoprint((obj_t*) pThis, "open error %d, file '%s'\n", errno, pThis->pszCurrFName); + char errStr[1024]; + dbgoprint((obj_t*) pThis, "open error[%d]: '%s'; file '%s'/%s\n", errno, + rs_strerror_r(errno, errStr, sizeof(errStr)), pThis->pszCurrFName, + (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE"); if(ierrnoSave == ENOENT) ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); else -- cgit v1.2.3 From aa9426f683fa6af9280bc63050ee0187ba4c57e1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 26 May 2009 12:43:43 +0200 Subject: solved design issue with queue termination ... and also improved the test suite. There is a design issue in the v3 queue engine that manifested to some serious problems with the new processing mode. However, in v3 shutdown may take eternally if a queue runs in DA mode, is configured to preserve data AND the action fails and retries immediately. There is no cure available for v3, it would require doing much of the work we have done on the new engine. The window of exposure, as one might guess from the description, is very small. That is probably the reason why we have not seen it in practice. --- runtime/stream.c | 44 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 50d419be..59e8be3a 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -45,6 +45,7 @@ #include "srUtils.h" #include "obj.h" #include "stream.h" +#include "unicode-helper.h" /* static data */ DEFobjStaticHelpers @@ -80,7 +81,7 @@ static rsRetVal strmOpenFile(strm_t *pThis) pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, pThis->iFileNumDigits)); } else { if(pThis->pszDir == NULL) { - if((pThis->pszCurrFName = (uchar*) strdup((char*) pThis->pszFName)) == NULL) + if((pThis->pszCurrFName = ustrdup(pThis->pszFName)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } else { CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, @@ -814,6 +815,47 @@ finalize_it: } +/* duplicate a stream object excluding dynamic properties. This function is + * primarily meant to provide a duplicate that later on can be used to access + * the data. This is needed, for example, for a restart of the disk queue. + * Note that ConstructFinalize() is NOT called. So our caller may change some + * properties before finalizing things. + * rgerhards, 2009-05-26 + */ +rsRetVal +strmDup(strm_t *pThis, strm_t **ppNew) +{ + strm_t *pNew = NULL; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + assert(ppNew != NULL); + + CHKiRet(strmConstruct(&pNew)); + pNew->sType = pThis->sType; + pNew->iCurrFNum = pThis->iCurrFNum; + CHKmalloc(pNew->pszFName = ustrdup(pThis->pszFName)); + pNew->lenFName = pThis->lenFName; + CHKmalloc(pNew->pszDir = ustrdup(pThis->pszDir)); + pNew->lenDir = pThis->lenDir; + pNew->tOperationsMode = pThis->tOperationsMode; + pNew->tOpenMode = pThis->tOpenMode; + pNew->iAddtlOpenFlags = pThis->iAddtlOpenFlags; + pNew->iMaxFileSize = pThis->iMaxFileSize; + pNew->iMaxFiles = pThis->iMaxFiles; + pNew->iFileNumDigits = pThis->iFileNumDigits; + pNew->bDeleteOnClose = pThis->bDeleteOnClose; + pNew->iCurrOffs = pThis->iCurrOffs; + + *ppNew = pNew; + pNew = NULL; + +finalize_it: + if(pNew != NULL) + strmDestruct(&pNew); + + RETiRet; +} /* set a user write-counter. This counter is initialized to zero and * receives the number of bytes written. It is accurate only after a -- cgit v1.2.3 From 9e434f19a9baa4a6f411808b5cb6bc22d6a32781 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 4 Jun 2009 12:15:59 +0200 Subject: cleaned up stream class ... ... and also made it callable via an rsyslog interface rather then relying on the OS loader (important if we go for using it inside loadbale modules, which we soon possible will) --- runtime/stream.c | 100 ++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 62 insertions(+), 38 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index f1f69cc8..261eb9ca 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -45,10 +45,17 @@ #include "srUtils.h" #include "obj.h" #include "stream.h" +#include "unicode-helper.h" +#include "module-template.h" /* static data */ DEFobjStaticHelpers +/* forward definitions */ +static rsRetVal strmFlush(strm_t *pThis); +static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); + + /* methods */ /* first, we define type-specific handlers. The provide a generic functionality, @@ -234,10 +241,6 @@ strmHandleEOF(strm_t *pThis) case STREAMTYPE_FILE_CIRCULAR: /* we have multiple files and need to switch to the next one */ /* TODO: think about emulating EOF in this case (not yet needed) */ -#if 0 - if(pThis->iMaxFiles == 0) /* TODO: why do we need this? ;) */ - ABORT_FINALIZE(RS_RET_EOF); -#endif dbgoprint((obj_t*) pThis, "file %d EOF\n", pThis->fd); CHKiRet(strmNextFile(pThis)); break; @@ -295,7 +298,7 @@ finalize_it: * NOTE: needs to be enhanced to support sticking with a strm entry (if not * deleted). */ -rsRetVal strmReadChar(strm_t *pThis, uchar *pC) +static rsRetVal strmReadChar(strm_t *pThis, uchar *pC) { DEFiRet; @@ -329,7 +332,7 @@ finalize_it: * character buffering capability. * rgerhards, 2008-01-07 */ -rsRetVal strmUnreadChar(strm_t *pThis, uchar c) +static rsRetVal strmUnreadChar(strm_t *pThis, uchar c) { ASSERT(pThis != NULL); ASSERT(pThis->iUngetC == -1); @@ -351,7 +354,7 @@ rsRetVal strmUnreadChar(strm_t *pThis, uchar c) * are pthread_killed() upon termination. So if we use their native pointer, they * can cleanup (but only then). */ -rsRetVal +static rsRetVal strmReadLine(strm_t *pThis, cstr_t **ppCStr) { DEFiRet; @@ -393,7 +396,7 @@ ENDobjConstruct(strm) /* ConstructionFinalizer * rgerhards, 2008-01-09 */ -rsRetVal strmConstructFinalize(strm_t *pThis) +static rsRetVal strmConstructFinalize(strm_t *pThis) { DEFiRet; @@ -420,14 +423,10 @@ CODESTARTobjDestruct(strm) if(pThis->fd != -1) strmCloseFile(pThis); - if(pThis->pszDir != NULL) - free(pThis->pszDir); - if(pThis->pIOBuf != NULL) - free(pThis->pIOBuf); - if(pThis->pszCurrFName != NULL) - free(pThis->pszCurrFName); - if(pThis->pszFName != NULL) - free(pThis->pszFName); + free(pThis->pszDir); + free(pThis->pIOBuf); + free(pThis->pszCurrFName); + free(pThis->pszFName); ENDobjDestruct(strm) @@ -453,6 +452,7 @@ finalize_it: RETiRet; } + /* write memory buffer to a stream object. * To support direct writes of large objects, this method may be called * with a buffer pointing to some region other than the stream buffer itself. @@ -503,7 +503,8 @@ finalize_it: * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 */ -rsRetVal strmFlush(strm_t *pThis) +static rsRetVal +strmFlush(strm_t *pThis) { DEFiRet; @@ -545,7 +546,7 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs) /* seek to current offset. This is primarily a helper to readjust the OS file * pointer after a strm object has been deserialized. */ -rsRetVal strmSeekCurrOffs(strm_t *pThis) +static rsRetVal strmSeekCurrOffs(strm_t *pThis) { DEFiRet; @@ -558,7 +559,7 @@ rsRetVal strmSeekCurrOffs(strm_t *pThis) /* write a *single* character to a stream object -- rgerhards, 2008-01-10 */ -rsRetVal strmWriteChar(strm_t *pThis, uchar c) +static rsRetVal strmWriteChar(strm_t *pThis, uchar c) { DEFiRet; @@ -578,7 +579,7 @@ finalize_it: /* write an integer value (actually a long) to a stream object */ -rsRetVal strmWriteLong(strm_t *pThis, long i) +static rsRetVal strmWriteLong(strm_t *pThis, long i) { DEFiRet; uchar szBuf[32]; @@ -595,7 +596,8 @@ finalize_it: /* write memory buffer to a stream object */ -rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +static rsRetVal +strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; size_t iPartial; @@ -645,14 +647,14 @@ DEFpropSetMeth(strm, tOperationsMode, int) DEFpropSetMeth(strm, tOpenMode, mode_t) DEFpropSetMeth(strm, sType, strmType_t) -rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) +static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { pThis->iMaxFiles = iNewVal; pThis->iFileNumDigits = getNumberDigits(iNewVal); return RS_RET_OK; } -rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal) +static rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal) { DEFiRet; @@ -671,7 +673,7 @@ finalize_it: * it any longer, it must free it. * rgerhards, 2008-01-09 */ -rsRetVal +static rsRetVal strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName) { DEFiRet; @@ -701,7 +703,7 @@ finalize_it: * it any longer, it must free it. * rgerhards, 2008-01-09 */ -rsRetVal +static rsRetVal strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir) { DEFiRet; @@ -745,7 +747,7 @@ finalize_it: * * rgerhards, 2008-01-10 */ -rsRetVal strmRecordBegin(strm_t *pThis) +static rsRetVal strmRecordBegin(strm_t *pThis) { ASSERT(pThis != NULL); ASSERT(pThis->bInRecord == 0); @@ -753,7 +755,7 @@ rsRetVal strmRecordBegin(strm_t *pThis) return RS_RET_OK; } -rsRetVal strmRecordEnd(strm_t *pThis) +static rsRetVal strmRecordEnd(strm_t *pThis) { DEFiRet; ASSERT(pThis != NULL); @@ -775,7 +777,7 @@ rsRetVal strmRecordEnd(strm_t *pThis) * We do not serialize the dynamic properties. * rgerhards, 2008-01-10 */ -rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) +static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) { DEFiRet; int i; @@ -821,7 +823,7 @@ finalize_it: * any new set overwrites the previous one. * rgerhards, 2008-02-27 */ -rsRetVal +static rsRetVal strmSetWCntr(strm_t *pThis, number_t *pWCnt) { DEFiRet; @@ -841,8 +843,8 @@ strmSetWCntr(strm_t *pThis, number_t *pWCnt) /* This function can be used as a generic way to set properties. * rgerhards, 2008-01-11 */ -#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1) -rsRetVal strmSetProperty(strm_t *pThis, var_t *pProp) +#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, UCHAR_CONSTANT(name), sizeof(name) - 1) +static rsRetVal strmSetProperty(strm_t *pThis, var_t *pProp) { DEFiRet; @@ -881,7 +883,7 @@ finalize_it: * reported on the second call may actually be lower than on the first call. This is due to * file circulation. A caller must deal with that. -- rgerhards, 2008-01-30 */ -rsRetVal +static rsRetVal strmGetCurrOffset(strm_t *pThis, int64 *pOffs) { DEFiRet; @@ -909,8 +911,33 @@ CODESTARTobjQueryInterface(strm) * work here (if we can support an older interface version - that, * of course, also affects the "if" above). */ - /*xxxpIf->oID = OBJvm; SAMPLE */ - + pIf->Construct = strmConstruct; + pIf->ConstructFinalize = strmConstructFinalize; + pIf->Destruct = strmDestruct; + pIf->ReadChar = strmReadChar; + pIf->UnreadChar = strmUnreadChar; + pIf->ReadLine = strmReadLine; + pIf->SeekCurrOffs = strmSeekCurrOffs; + pIf->Write = strmWrite; + pIf->WriteChar = strmWriteChar; + pIf->WriteLong = strmWriteLong; + pIf->SetFName = strmSetFName; + pIf->SetDir = strmSetDir; + pIf->Flush = strmFlush; + pIf->RecordBegin = strmRecordBegin; + pIf->RecordEnd = strmRecordEnd; + pIf->Serialize = strmSerialize; + pIf->SetiAddtlOpenFlags = strmSetiAddtlOpenFlags; + pIf->GetCurrOffset = strmGetCurrOffset; + pIf->SetWCntr = strmSetWCntr; + /* set methods */ + pIf->SetbDeleteOnClose = strmSetbDeleteOnClose; + pIf->SetiMaxFileSize = strmSetiMaxFileSize; + pIf->SetiMaxFiles = strmSetiMaxFiles; + pIf->SetiFileNumDigits = strmSetiFileNumDigits; + pIf->SettOperationsMode = strmSettOperationsMode; + pIf->SettOpenMode = strmSettOpenMode; + pIf->SetsType = strmSetsType; finalize_it: ENDobjQueryInterface(strm) @@ -921,13 +948,10 @@ ENDobjQueryInterface(strm) */ BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ - OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty); OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); ENDObjClassInit(strm) - -/* - * vi:set ai: +/* vi:set ai: */ -- cgit v1.2.3 From f2800ba261d2fb7466cbdebbf80afe92f0bffd3d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 4 Jun 2009 15:10:24 +0200 Subject: modified stream class and omfile to work with it now some basic operations are carried out via the stream class. --- runtime/stream.c | 168 +++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 144 insertions(+), 24 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 261eb9ca..abcfce0c 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -1,4 +1,3 @@ -//TODO: O_TRUC mode! /* The serial stream class. * * A serial stream provides serial data access. In theory, serial streams @@ -50,6 +49,7 @@ /* static data */ DEFobjStaticHelpers +DEFobjCurrIf(zlibw) /* forward definitions */ static rsRetVal strmFlush(strm_t *pThis); @@ -74,7 +74,6 @@ static rsRetVal strmOpenFile(strm_t *pThis) int iFlags; ASSERT(pThis != NULL); - ASSERT(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE); if(pThis->fd != -1) ABORT_FINALIZE(RS_RET_OK); @@ -95,13 +94,27 @@ static rsRetVal strmOpenFile(strm_t *pThis) } } +RUNLOG_VAR("%d", pThis->tOperationsMode); /* compute which flags we need to provide to open */ - if(pThis->tOperationsMode == STREAMMODE_READ) - iFlags = O_RDONLY; - else - iFlags = O_WRONLY | O_CREAT; + switch(pThis->tOperationsMode) { + case STREAMMODE_READ: + iFlags = O_CLOEXEC | O_NOCTTY | O_RDONLY; + break; + case STREAMMODE_WRITE: /* legacy mode used inside queue engine */ + iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT; + break; + case STREAMMODE_WRITE_TRUNC: + iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_TRUNC; + break; + case STREAMMODE_WRITE_APPEND: + iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_APPEND; + break; + default:assert(0); + break; + } - iFlags |= pThis->iAddtlOpenFlags; + iFlags |= pThis->iAddtlOpenFlags; // TODO: remove this! +dbgprintf("XXX: open with flags %d\n", iFlags); pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); if(pThis->fd == -1) { @@ -135,7 +148,7 @@ static rsRetVal strmCloseFile(strm_t *pThis) ASSERT(pThis->fd != -1); dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); - if(pThis->tOperationsMode == STREAMMODE_WRITE) + if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); close(pThis->fd); // TODO: error check @@ -398,14 +411,25 @@ ENDobjConstruct(strm) */ static rsRetVal strmConstructFinalize(strm_t *pThis) { + rsRetVal localRet; DEFiRet; ASSERT(pThis != NULL); - if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */ - if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - pThis->iBufPtrMax = 0; /* results in immediate read request */ + CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + pThis->iBufPtrMax = 0; /* results in immediate read request */ + if(pThis->iZipLevel) { /* do we need a zip buf? */ + localRet = objUse(zlibw, LM_ZLIBW_FILENAME); + if(localRet != RS_RET_OK) { + pThis->iZipLevel = 0; + DBGPRINTF("stream was requested with zip mode, but zlibw module unavailable (%d) - using " + "without zip\n", localRet); + } else { + /* we use the same size as the original buf, as we would like + * to make sure we can write out everyting with a SINGLE api call! + */ + CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + } } finalize_it: @@ -416,15 +440,20 @@ finalize_it: /* destructor for the strm object */ BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(strm) - if(pThis->tOperationsMode == STREAMMODE_WRITE) + if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); /* ... then free resources */ if(pThis->fd != -1) strmCloseFile(pThis); + if(pThis->iZipLevel) { /* do we need a zip buf? */ + objRelease(zlibw, LM_ZLIBW_FILENAME); + } + free(pThis->pszDir); free(pThis->pIOBuf); + free(pThis->pZipBuf); free(pThis->pszCurrFName); free(pThis->pszFName); ENDobjDestruct(strm) @@ -453,20 +482,17 @@ finalize_it: } -/* write memory buffer to a stream object. - * To support direct writes of large objects, this method may be called - * with a buffer pointing to some region other than the stream buffer itself. - * However, in that case the stream buffer must be empty (strmFlush() has to - * be called before), because we would otherwise mess up with the sequence - * inside the stream. -- rgerhards, 2008-01-10 +/* physically write to the output file. the provided data is ready for + * writing (e.g. zipped if we are requested to do that). + * rgerhards, 2009-06-04 */ -static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +static rsRetVal +strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; int iWritten; ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->fd == -1) CHKiRet(strmOpenFile(pThis)); @@ -499,6 +525,95 @@ finalize_it: } +/* write the output buffer in zip mode + * This means we compress it first and then do a physical write. + * Note that we always do a full deflateInit ... deflate ... deflateEnd + * sequence. While this is not optimal, we need to do it because we need + * to ensure that the file is readable even when we are aborted. Doing the + * full sequence brings us as far towards this goal as possible (and not + * doing it would be a total failure). It may be worth considering to + * add a config switch so that the user can decide the risk he is ready + * to take, but so far this is not yet implemented (not even requested ;)). + * rgerhards, 2009-06-04 + */ +static rsRetVal +doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + z_stream zstrm; + int zRet; /* zlib return state */ + DEFiRet; + assert(pThis != NULL); + assert(pBuf != NULL); + + /* allocate deflate state */ + zstrm.zalloc = Z_NULL; + zstrm.zfree = Z_NULL; + zstrm.opaque = Z_NULL; + /* see note in file header for the params we use with deflateInit2() */ + zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY); + if(zRet != Z_OK) { + dbgprintf("error %d returned from zlib/deflateInit2()\n", zRet); + ABORT_FINALIZE(RS_RET_ZLIB_ERR); + } +RUNLOG_STR("deflateInit2() done successfully\n"); + + /* now doing the compression */ + zstrm.avail_in = lenBuf; + zstrm.next_in = (Bytef*) pBuf; + /* run deflate() on input until output buffer not full, finish + compression if all of source has been read in */ + do { + dbgprintf("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); + zstrm.avail_out = pThis->sIOBufSize; + zstrm.next_out = pThis->pZipBuf; + zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */ + dbgprintf("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); + assert(zRet != Z_STREAM_ERROR); /* state not clobbered */ + CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out)); + } while (zstrm.avail_out == 0); + assert(zstrm.avail_in == 0); /* all input will be used */ + +RUNLOG_STR("deflate() should be done successfully\n"); + + zRet = zlibw.DeflateEnd(&zstrm); + if(zRet != Z_OK) { + dbgprintf("error %d returned from zlib/deflateEnd()\n", zRet); + ABORT_FINALIZE(RS_RET_ZLIB_ERR); + } +RUNLOG_STR("deflateEnd() done successfully\n"); + +finalize_it: + RETiRet; +} + + +/* write memory buffer to a stream object. + * To support direct writes of large objects, this method may be called + * with a buffer pointing to some region other than the stream buffer itself. + * However, in that case the stream buffer must be empty (strmFlush() has to + * be called before), because we would otherwise mess up with the sequence + * inside the stream. -- rgerhards, 2008-01-10 + */ +static rsRetVal +strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); + + if(pThis->iZipLevel) { + CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); + } else { + /* write without zipping */ + CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + /* flush stream output buffer to persistent storage. This can be called at any time * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 @@ -511,7 +626,7 @@ strmFlush(strm_t *pThis) ASSERT(pThis != NULL); dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr); - if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) { + if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); } @@ -605,6 +720,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); ASSERT(pBuf != NULL); +dbgprintf("strmWrite(%p, '%s', %ld);\n", pThis, pBuf,lenBuf); /* check if the to-be-written data is larger than our buffer size */ if(lenBuf >= pThis->sIOBufSize) { /* it is - so we do a direct write, that is most efficient. @@ -646,6 +762,7 @@ DEFpropSetMeth(strm, iFileNumDigits, int) DEFpropSetMeth(strm, tOperationsMode, int) DEFpropSetMeth(strm, tOpenMode, mode_t) DEFpropSetMeth(strm, sType, strmType_t) +DEFpropSetMeth(strm, iZipLevel, int) static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { @@ -681,13 +798,14 @@ strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName) ASSERT(pThis != NULL); ASSERT(pszName != NULL); +dbgprintf("XXX: strm setFname: '%s'\n", pszName); if(iLenName < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); if(pThis->pszFName != NULL) free(pThis->pszFName); - if((pThis->pszFName = malloc(sizeof(uchar) * iLenName + 1)) == NULL) + if((pThis->pszFName = malloc(sizeof(uchar) * (iLenName + 1))) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszFName, pszName, iLenName + 1); /* always think about the \0! */ @@ -711,6 +829,7 @@ strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir) ASSERT(pThis != NULL); ASSERT(pszDir != NULL); +dbgprintf("XXX: strm setDir: '%s'\n", pszDir); if(iLenDir < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); @@ -927,7 +1046,7 @@ CODESTARTobjQueryInterface(strm) pIf->RecordBegin = strmRecordBegin; pIf->RecordEnd = strmRecordEnd; pIf->Serialize = strmSerialize; - pIf->SetiAddtlOpenFlags = strmSetiAddtlOpenFlags; + /* TODO: remove this! */ pIf->SetiAddtlOpenFlags = strmSetiAddtlOpenFlags; pIf->GetCurrOffset = strmGetCurrOffset; pIf->SetWCntr = strmSetWCntr; /* set methods */ @@ -938,6 +1057,7 @@ CODESTARTobjQueryInterface(strm) pIf->SettOperationsMode = strmSettOperationsMode; pIf->SettOpenMode = strmSettOpenMode; pIf->SetsType = strmSetsType; + pIf->SetiZipLevel = strmSetiZipLevel; finalize_it: ENDobjQueryInterface(strm) -- cgit v1.2.3 From 76da7f9f4e3dc900046a1956153d10532f2b1ae0 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 4 Jun 2009 15:59:37 +0200 Subject: added $OMFileIOBufferSize config directive and plumbing --- runtime/stream.c | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index abcfce0c..a4844d2b 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -94,7 +94,6 @@ static rsRetVal strmOpenFile(strm_t *pThis) } } -RUNLOG_VAR("%d", pThis->tOperationsMode); /* compute which flags we need to provide to open */ switch(pThis->tOperationsMode) { case STREAMMODE_READ: @@ -113,9 +112,6 @@ RUNLOG_VAR("%d", pThis->tOperationsMode); break; } - iFlags |= pThis->iAddtlOpenFlags; // TODO: remove this! -dbgprintf("XXX: open with flags %d\n", iFlags); - pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); if(pThis->fd == -1) { int ierrnoSave = errno; @@ -763,6 +759,7 @@ DEFpropSetMeth(strm, tOperationsMode, int) DEFpropSetMeth(strm, tOpenMode, mode_t) DEFpropSetMeth(strm, sType, strmType_t) DEFpropSetMeth(strm, iZipLevel, int) +DEFpropSetMeth(strm, sIOBufSize, size_t) static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { @@ -771,19 +768,6 @@ static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) return RS_RET_OK; } -static rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal) -{ - DEFiRet; - - if(iNewVal & O_APPEND) - ABORT_FINALIZE(RS_RET_PARAM_ERROR); - - pThis->iAddtlOpenFlags = iNewVal; - -finalize_it: - RETiRet; -} - /* set the stream's file prefix * The passed-in string is duplicated. So if the caller does not need @@ -829,7 +813,6 @@ strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir) ASSERT(pThis != NULL); ASSERT(pszDir != NULL); -dbgprintf("XXX: strm setDir: '%s'\n", pszDir); if(iLenDir < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); @@ -1046,7 +1029,6 @@ CODESTARTobjQueryInterface(strm) pIf->RecordBegin = strmRecordBegin; pIf->RecordEnd = strmRecordEnd; pIf->Serialize = strmSerialize; - /* TODO: remove this! */ pIf->SetiAddtlOpenFlags = strmSetiAddtlOpenFlags; pIf->GetCurrOffset = strmGetCurrOffset; pIf->SetWCntr = strmSetWCntr; /* set methods */ @@ -1058,6 +1040,7 @@ CODESTARTobjQueryInterface(strm) pIf->SettOpenMode = strmSettOpenMode; pIf->SetsType = strmSetsType; pIf->SetiZipLevel = strmSetiZipLevel; + pIf->SetsIOBufSize = strmSetsIOBufSize; finalize_it: ENDobjQueryInterface(strm) -- cgit v1.2.3 From 8d1e2e496c6a4a4d40d1e8604c746e0d32013536 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 8 Jun 2009 18:39:06 +0200 Subject: improved error handling and added fsync() support ... restoring missing functionality after the restructuring of imfile. As a side-effect, this also lays the foundation for even more reliable queue engine operations (but this is not yet done). --- runtime/stream.c | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 106 insertions(+), 5 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index a4844d2b..8e2f87dc 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -150,6 +150,12 @@ static rsRetVal strmCloseFile(strm_t *pThis) close(pThis->fd); // TODO: error check pThis->fd = -1; + if(pThis->fdDir != -1) { + /* close associated directory handle, if it is open */ + close(pThis->fdDir); + pThis->fdDir = -1; + } + if(pThis->bDeleteOnClose) { unlink((char*) pThis->pszCurrFName); // TODO: check returncode } @@ -395,10 +401,11 @@ finalize_it: BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro! */ pThis->iCurrFNum = 1; pThis->fd = -1; + pThis->fdDir = -1; pThis->iUngetC = -1; pThis->sType = STREAMTYPE_FILE_SINGLE; pThis->sIOBufSize = glblGetIOBufSize(); - pThis->tOpenMode = 0600; /* TODO: make configurable */ + pThis->tOpenMode = 0600; ENDobjConstruct(strm) @@ -428,6 +435,19 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } + /* if we are aset to sync, we must obtain a file handle to the directory for fsync() purposes */ + if(pThis->bSync) { + pThis->fdDir = open((char*)pThis->pszDir, O_RDONLY); + if(pThis->fdDir == -1) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + // TODO: log an error message? think so... + DBGPRINTF("error %d opening directory file for fsync() use - fsync for directory disabled: %s\n", + errno, errStr); + } + } + finalize_it: RETiRet; } @@ -478,6 +498,81 @@ finalize_it: } + +/* issue write() api calls until either the buffer is completely + * written or an error occured (it may happen that multiple writes + * are required, what is perfectly legal. On exit, *pLenBuf contains + * the number of bytes actually written. + * rgerhards, 2009-06-08 + */ +static rsRetVal +doWriteCall(int fd, uchar *pBuf, size_t *pLenBuf) +{ + ssize_t lenBuf; + ssize_t iTotalWritten; + ssize_t iWritten; + char *pWriteBuf; + DEFiRet; + + lenBuf = *pLenBuf; + pWriteBuf = (char*) pBuf; + iTotalWritten = 0; + do { + iWritten = write(fd, pWriteBuf, lenBuf); + if(iWritten < 0) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("log file (%d) write error %d: %s\n", fd, err, errStr); + if(err == EINTR) { + /*NO ERROR, just continue */; + } else { + ABORT_FINALIZE(RS_RET_ERR); + // TODO: cover more error cases! + } + } + /* advance buffer to next write position */ + iTotalWritten += iWritten; + lenBuf -= iWritten; + pWriteBuf += iWritten; + } while(lenBuf > 0); /* Warning: do..while()! */ + +finalize_it: + *pLenBuf -= iTotalWritten; + RETiRet; +} + + +/* sync the file to disk, so that any unwritten data is persisted. This + * also syncs the directory and thus makes sure that the file survives + * fatal failure. -- rgerhards, 2009-06-08 + */ +static rsRetVal +syncFile(strm_t *pThis) +{ + int ret; + DEFiRet; + + DBGPRINTF("syncing file %d\n", pThis->fd); + ret = fdatasync(pThis->fd); + if(ret != 0) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("sync failed for file %d with error (%d): %s - ignoring\n", + pThis->fd, err, errStr); + } + // TODO: check error! + + if(pThis->fdDir != -1) { + ret = fsync(pThis->fdDir); +dbgprintf("sync on dir (fd %d) requested, return code %d\n", pThis->fdDir, ret); + } + + RETiRet; +} + + /* physically write to the output file. the provided data is ready for * writing (e.g. zipped if we are requested to do that). * rgerhards, 2009-06-04 @@ -485,17 +580,17 @@ finalize_it: static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { + size_t iWritten; DEFiRet; - int iWritten; ASSERT(pThis != NULL); if(pThis->fd == -1) CHKiRet(strmOpenFile(pThis)); - iWritten = write(pThis->fd, pBuf, lenBuf); - dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, iWritten); - /* TODO: handle error case -- rgerhards, 2008-01-07 */ + iWritten = lenBuf; + CHKiRet(doWriteCall(pThis->fd, pBuf, &iWritten)); + dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); /* Now indicate buffer empty again. We do this in any case, because there * is no way we could react more intelligently to an error during write. @@ -511,6 +606,10 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) if(pThis->pUsrWCntr != NULL) *pThis->pUsrWCntr += iWritten; + if(pThis->bSync) { + CHKiRet(syncFile(pThis)); + } + if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) CHKiRet(strmCheckNextOutputFile(pThis)); @@ -759,6 +858,7 @@ DEFpropSetMeth(strm, tOperationsMode, int) DEFpropSetMeth(strm, tOpenMode, mode_t) DEFpropSetMeth(strm, sType, strmType_t) DEFpropSetMeth(strm, iZipLevel, int) +DEFpropSetMeth(strm, bSync, int) DEFpropSetMeth(strm, sIOBufSize, size_t) static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) @@ -1040,6 +1140,7 @@ CODESTARTobjQueryInterface(strm) pIf->SettOpenMode = strmSettOpenMode; pIf->SetsType = strmSetsType; pIf->SetiZipLevel = strmSetiZipLevel; + pIf->SetbSync = strmSetbSync; pIf->SetsIOBufSize = strmSetsIOBufSize; finalize_it: ENDobjQueryInterface(strm) -- cgit v1.2.3 From 6f4e3c4e4c85acdcf58969970484a54639ecc8f9 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 10 Jun 2009 16:49:14 +0200 Subject: restructered code in perparation for multiple rule set support ... this was long overdue, and I finlly tackeld it. It turned out to be more complex than I initially thought. The next step now probably is to actually implement multiple rule sets and the beauty that comes with them. --- runtime/stream.c | 3 --- 1 file changed, 3 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 8e2f87dc..49d29e0e 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -650,7 +650,6 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) dbgprintf("error %d returned from zlib/deflateInit2()\n", zRet); ABORT_FINALIZE(RS_RET_ZLIB_ERR); } -RUNLOG_STR("deflateInit2() done successfully\n"); /* now doing the compression */ zstrm.avail_in = lenBuf; @@ -668,14 +667,12 @@ RUNLOG_STR("deflateInit2() done successfully\n"); } while (zstrm.avail_out == 0); assert(zstrm.avail_in == 0); /* all input will be used */ -RUNLOG_STR("deflate() should be done successfully\n"); zRet = zlibw.DeflateEnd(&zstrm); if(zRet != Z_OK) { dbgprintf("error %d returned from zlib/deflateEnd()\n", zRet); ABORT_FINALIZE(RS_RET_ZLIB_ERR); } -RUNLOG_STR("deflateEnd() done successfully\n"); finalize_it: RETiRet; -- cgit v1.2.3 From 0917edf26da9055c6dc160aafca97896daea3e6c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 12 Jun 2009 09:47:44 +0200 Subject: re-enabled outchannel functionality --- runtime/stream.c | 130 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 124 insertions(+), 6 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 49d29e0e..5355a1a5 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -6,8 +6,9 @@ * "driver"). * * File begun on 2008-01-09 by RGerhards + * Large modifications in 2009-06 to support using it with omfile, including zip writer. * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -54,11 +55,112 @@ DEFobjCurrIf(zlibw) /* forward definitions */ static rsRetVal strmFlush(strm_t *pThis); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +static rsRetVal strmCloseFile(strm_t *pThis); /* methods */ -/* first, we define type-specific handlers. The provide a generic functionality, + +/* Try to resolve a size limit situation. This is used to support custom-file size handlers + * for omfile. It first runs the command, and then checks if we are still above the size + * treshold. Note that this works only with single file names, NOT with circular names. + * Note that pszCurrFName can NOT be taken from pThis, because the stream is closed when + * we are called (and that destroys pszCurrFName, as there is NO CURRENT file name!). So + * we need to receive the name as a parameter. + * initially wirtten 2005-06-21, moved to this class & updates 2009-06-01, both rgerhards + */ +static rsRetVal +resolveFileSizeLimit(strm_t *pThis, uchar *pszCurrFName) +{ + uchar *pParams; + uchar *pCmd; + uchar *p; + off_t actualFileSize; + rsRetVal localRet; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + assert(pszCurrFName != NULL); + + if(pThis->pszSizeLimitCmd == NULL) { + ABORT_FINALIZE(RS_RET_NON_SIZELIMITCMD); /* nothing we can do in this case... */ + } + + /* we first check if we have command line parameters. We assume this, + * when we have a space in the program name. If we find it, everything after + * the space is treated as a single argument. + */ + CHKmalloc(pCmd = ustrdup(pThis->pszSizeLimitCmd)); + + for(p = pCmd ; *p && *p != ' ' ; ++p) { + /* JUST SKIP */ + } + + if(*p == ' ') { + *p = '\0'; /* pretend string-end */ + pParams = p+1; + } else + pParams = NULL; + + /* the execProg() below is probably not great, but at least is is + * fairly secure now. Once we change the way file size limits are + * handled, we should also revisit how this command is run (and + * with which parameters). rgerhards, 2007-07-20 + */ + execProg(pCmd, 1, pParams); + + free(pCmd); + + localRet = getFileSize(pszCurrFName, &actualFileSize); + + if(localRet == RS_RET_OK && actualFileSize >= pThis->iSizeLimit) { + ABORT_FINALIZE(RS_RET_SIZELIMITCMD_DIDNT_RESOLVE); /* OK, it didn't work out... */ + } else if(localRet != RS_RET_FILE_NOT_FOUND) { + /* file not found is OK, the command may have moved away the file */ + ABORT_FINALIZE(localRet); + } + +finalize_it: + if(iRet != RS_RET_OK) { + if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) + dbgprintf("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName); + else + dbgprintf("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet); + pThis->bDisabled = 1; + } + + RETiRet; +} + + +/* Check if the file has grown beyond the configured omfile iSizeLimit + * and, if so, initiate processing. + */ +static rsRetVal +doSizeLimitProcessing(strm_t *pThis) +{ + uchar *pszCurrFName = NULL; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + ASSERT(pThis->iSizeLimit != 0); + ASSERT(pThis->fd != -1); + + if(pThis->iCurrOffs >= pThis->iSizeLimit) { + /* strmClosefile() destroys the current file name, so we + * need to preserve it. + */ + CHKmalloc(pszCurrFName = ustrdup(pThis->pszCurrFName)); + CHKiRet(strmCloseFile(pThis)); + CHKiRet(resolveFileSizeLimit(pThis, pszCurrFName)); + } + +finalize_it: + free(pszCurrFName); + RETiRet; +} + + +/* now, we define type-specific handlers. The provide a generic functionality, * but for this specific type of strm. The mapping to these handlers happens during * strm construction. Later on, handlers are called by pointers present in the * strm instance object. @@ -123,6 +225,12 @@ static rsRetVal strmOpenFile(strm_t *pThis) } pThis->iCurrOffs = 0; + if(pThis->tOperationsMode == STREAMMODE_WRITE_APPEND) { + /* we need to obtain the current offset */ + off_t offset; + CHKiRet(getFileSize(pThis->pszCurrFName, &offset)); + pThis->iCurrOffs = offset; + } dbgoprint((obj_t*) pThis, "opened file '%s' for %s (0x%x) as %d\n", pThis->pszCurrFName, (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", iFlags, pThis->fd); @@ -527,7 +635,7 @@ doWriteCall(int fd, uchar *pBuf, size_t *pLenBuf) if(err == EINTR) { /*NO ERROR, just continue */; } else { - ABORT_FINALIZE(RS_RET_ERR); + ABORT_FINALIZE(RS_RET_IO_ERROR); // TODO: cover more error cases! } } @@ -538,7 +646,7 @@ doWriteCall(int fd, uchar *pBuf, size_t *pLenBuf) } while(lenBuf > 0); /* Warning: do..while()! */ finalize_it: - *pLenBuf -= iTotalWritten; + *pLenBuf = iTotalWritten; RETiRet; } @@ -610,8 +718,11 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(syncFile(pThis)); } - if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) + if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) { CHKiRet(strmCheckNextOutputFile(pThis)); + } else if(pThis->iSizeLimit != 0) { + CHKiRet(doSizeLimitProcessing(pThis)); + } finalize_it: pThis->iBufPtr = 0; /* see comment above */ @@ -812,7 +923,10 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); ASSERT(pBuf != NULL); -dbgprintf("strmWrite(%p, '%s', %ld);\n", pThis, pBuf,lenBuf); +dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); + if(pThis->bDisabled) + ABORT_FINALIZE(RS_RET_STREAM_DISABLED); + /* check if the to-be-written data is larger than our buffer size */ if(lenBuf >= pThis->sIOBufSize) { /* it is - so we do a direct write, that is most efficient. @@ -857,6 +971,8 @@ DEFpropSetMeth(strm, sType, strmType_t) DEFpropSetMeth(strm, iZipLevel, int) DEFpropSetMeth(strm, bSync, int) DEFpropSetMeth(strm, sIOBufSize, size_t) +DEFpropSetMeth(strm, iSizeLimit, off_t) +DEFpropSetMeth(strm, pszSizeLimitCmd, uchar*) static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { @@ -1139,6 +1255,8 @@ CODESTARTobjQueryInterface(strm) pIf->SetiZipLevel = strmSetiZipLevel; pIf->SetbSync = strmSetbSync; pIf->SetsIOBufSize = strmSetsIOBufSize; + pIf->SetiSizeLimit = strmSetiSizeLimit; + pIf->SetpszSizeLimitCmd = strmSetpszSizeLimitCmd; finalize_it: ENDobjQueryInterface(strm) -- cgit v1.2.3 From e3d9843c85b1dfddabc937ac6ccb4057d626bf03 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 12 Jun 2009 11:47:00 +0200 Subject: re-enabled pipe, tty and console in omfile ... by moving code to stream.c. Thanks to the new design, new cases are not really needed, resulting in cleaner code. I also did a cleanup of header file usage as a side-activity. --- runtime/stream.c | 162 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 107 insertions(+), 55 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 5355a1a5..c8672aa2 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -166,35 +166,14 @@ finalize_it: * strm instance object. */ -/* open a strm file - * It is OK to call this function when the stream is already open. In that - * case, it returns immediately with RS_RET_OK +/* do the physical open() call on a file. */ -static rsRetVal strmOpenFile(strm_t *pThis) +static rsRetVal +doPhysOpen(strm_t *pThis) { - DEFiRet; int iFlags; - - ASSERT(pThis != NULL); - - if(pThis->fd != -1) - ABORT_FINALIZE(RS_RET_OK); - - if(pThis->pszFName == NULL) - ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); - - if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) { - CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, - pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, pThis->iFileNumDigits)); - } else { - if(pThis->pszDir == NULL) { - if((pThis->pszCurrFName = (uchar*) strdup((char*) pThis->pszFName)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } else { - CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, - pThis->pszFName, pThis->lenFName, -1, 0)); - } - } + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); /* compute which flags we need to provide to open */ switch(pThis->tOperationsMode) { @@ -222,8 +201,51 @@ static rsRetVal strmOpenFile(strm_t *pThis) ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); else ABORT_FINALIZE(RS_RET_IO_ERROR); + } else { + if(!ustrcmp(pThis->pszCurrFName, UCHAR_CONSTANT(_PATH_CONSOLE)) || isatty(pThis->fd)) { + DBGPRINTF("file %d is a tty-type file\n", pThis->fd); + pThis->bIsTTY = 1; + } else { + pThis->bIsTTY = 0; + } } +finalize_it: + RETiRet; +} + + +/* open a strm file + * It is OK to call this function when the stream is already open. In that + * case, it returns immediately with RS_RET_OK + */ +static rsRetVal strmOpenFile(strm_t *pThis) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + if(pThis->fd != -1) + ABORT_FINALIZE(RS_RET_OK); + + if(pThis->pszFName == NULL) + ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); + + if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) { + CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, + pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, pThis->iFileNumDigits)); + } else { + if(pThis->pszDir == NULL) { + if((pThis->pszCurrFName = (uchar*) strdup((char*) pThis->pszFName)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } else { + CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, + pThis->pszFName, pThis->lenFName, -1, 0)); + } + } + + CHKiRet(doPhysOpen(pThis)); + pThis->iCurrOffs = 0; if(pThis->tOperationsMode == STREAMMODE_WRITE_APPEND) { /* we need to obtain the current offset */ @@ -232,8 +254,8 @@ static rsRetVal strmOpenFile(strm_t *pThis) pThis->iCurrOffs = offset; } - dbgoprint((obj_t*) pThis, "opened file '%s' for %s (0x%x) as %d\n", pThis->pszCurrFName, - (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", iFlags, pThis->fd); + dbgoprint((obj_t*) pThis, "opened file '%s' for %s as %d\n", pThis->pszCurrFName, + (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", pThis->fd); finalize_it: RETiRet; @@ -255,7 +277,7 @@ static rsRetVal strmCloseFile(strm_t *pThis) if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); - close(pThis->fd); // TODO: error check + close(pThis->fd); pThis->fd = -1; if(pThis->fdDir != -1) { @@ -265,7 +287,13 @@ static rsRetVal strmCloseFile(strm_t *pThis) } if(pThis->bDeleteOnClose) { - unlink((char*) pThis->pszCurrFName); // TODO: check returncode + if(unlink((char*) pThis->pszCurrFName) == -1) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("error %d unlinking '%s' - ignored: %s\n", + errno, pThis->pszCurrFName, errStr); + } } pThis->iCurrOffs = 0; /* we are back at begin of file */ @@ -544,13 +572,12 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } /* if we are aset to sync, we must obtain a file handle to the directory for fsync() purposes */ - if(pThis->bSync) { - pThis->fdDir = open((char*)pThis->pszDir, O_RDONLY); + if(pThis->bSync && !pThis->bIsTTY) { + pThis->fdDir = open((char*)pThis->pszDir, O_RDONLY | O_CLOEXEC | O_NOCTTY); if(pThis->fdDir == -1) { char errStr[1024]; int err = errno; rs_strerror_r(err, errStr, sizeof(errStr)); - // TODO: log an error message? think so... DBGPRINTF("error %d opening directory file for fsync() use - fsync for directory disabled: %s\n", errno, errStr); } @@ -606,6 +633,29 @@ finalize_it: } +/* try to recover a tty after a write error. This may have happend + * due to vhangup(), and, if so, we can simply re-open it. + */ +#ifdef linux +# define ERR_TTYHUP EIO +#else +# define ERR_TTYHUP EBADF +#endif +static rsRetVal +tryTTYRecover(strm_t *pThis, int err) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + if(err == ERR_TTYHUP) { + close(pThis->fd); + CHKiRet(doPhysOpen(pThis)); + } + +finalize_it: + RETiRet; +} +#undef ER_TTYHUP + /* issue write() api calls until either the buffer is completely * written or an error occured (it may happen that multiple writes @@ -614,29 +664,36 @@ finalize_it: * rgerhards, 2009-06-08 */ static rsRetVal -doWriteCall(int fd, uchar *pBuf, size_t *pLenBuf) +doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) { ssize_t lenBuf; ssize_t iTotalWritten; ssize_t iWritten; char *pWriteBuf; DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); lenBuf = *pLenBuf; pWriteBuf = (char*) pBuf; iTotalWritten = 0; do { - iWritten = write(fd, pWriteBuf, lenBuf); + iWritten = write(pThis->fd, pWriteBuf, lenBuf); if(iWritten < 0) { char errStr[1024]; int err = errno; rs_strerror_r(err, errStr, sizeof(errStr)); - DBGPRINTF("log file (%d) write error %d: %s\n", fd, err, errStr); + DBGPRINTF("log file (%d) write error %d: %s\n", pThis->fd, err, errStr); if(err == EINTR) { /*NO ERROR, just continue */; } else { - ABORT_FINALIZE(RS_RET_IO_ERROR); - // TODO: cover more error cases! + if(pThis->bIsTTY) { + CHKiRet(tryTTYRecover(pThis, err)); + } else { + ABORT_FINALIZE(RS_RET_IO_ERROR); + /* Would it make sense to cover more error cases? So far, I + * do not see good reason to do so. + */ + } } } /* advance buffer to next write position */ @@ -653,7 +710,10 @@ finalize_it: /* sync the file to disk, so that any unwritten data is persisted. This * also syncs the directory and thus makes sure that the file survives - * fatal failure. -- rgerhards, 2009-06-08 + * fatal failure. Note that we do NOT return an error status if the + * sync fails. Doing so would probably cause more trouble than it + * is worth (read: data loss may occur where we otherwise might not + * have it). -- rgerhards, 2009-06-08 */ static rsRetVal syncFile(strm_t *pThis) @@ -661,6 +721,9 @@ syncFile(strm_t *pThis) int ret; DEFiRet; + if(pThis->bIsTTY) + FINALIZE; /* TTYs can not be synced */ + DBGPRINTF("syncing file %d\n", pThis->fd); ret = fdatasync(pThis->fd); if(ret != 0) { @@ -670,19 +733,20 @@ syncFile(strm_t *pThis) DBGPRINTF("sync failed for file %d with error (%d): %s - ignoring\n", pThis->fd, err, errStr); } - // TODO: check error! if(pThis->fdDir != -1) { ret = fsync(pThis->fdDir); -dbgprintf("sync on dir (fd %d) requested, return code %d\n", pThis->fdDir, ret); } +finalize_it: RETiRet; } /* physically write to the output file. the provided data is ready for * writing (e.g. zipped if we are requested to do that). + * Note that if the write() API fails, we do not reset any pointers, but return + * an error code. That means we may redo work in the next iteration. * rgerhards, 2009-06-04 */ static rsRetVal @@ -690,24 +754,15 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { size_t iWritten; DEFiRet; - - ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pThis, strm); if(pThis->fd == -1) CHKiRet(strmOpenFile(pThis)); iWritten = lenBuf; - CHKiRet(doWriteCall(pThis->fd, pBuf, &iWritten)); + CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); - /* Now indicate buffer empty again. We do this in any case, because there - * is no way we could react more intelligently to an error during write. - * This MUST be done BEFORE strCheckNextOutputFile(), otherwise we have an - * endless loop. We reset the buffer pointer also in finalize_it - this is - * necessary if we run into problems. Not resetting it would again cause an - * endless loop. So it is better to loose some data (which also justifies - * duplicating that code, too...) -- rgerhards, 2008-01-10 - */ pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; /* update user counter, if provided */ @@ -725,8 +780,6 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) } finalize_it: - pThis->iBufPtr = 0; /* see comment above */ - RETiRet; } @@ -995,7 +1048,6 @@ strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName) ASSERT(pThis != NULL); ASSERT(pszName != NULL); -dbgprintf("XXX: strm setFname: '%s'\n", pszName); if(iLenName < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); -- cgit v1.2.3 From 16ecb90c3ac88bb2261c31c990d88f97f1a1b32f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Jun 2009 13:44:51 +0200 Subject: omfile buffers are now synchronized after inactivity This is the first shot at this functionality. Currently, we run off a fixed counter in the rsyslogd mainloop, which needs to be restructured. But this code works, so it is a good time for a commit. --- runtime/stream.c | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index c8672aa2..773da319 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -39,6 +39,7 @@ #include #include /* required for HP UX */ #include +#include #include "rsyslog.h" #include "stringbuf.h" @@ -47,10 +48,12 @@ #include "stream.h" #include "unicode-helper.h" #include "module-template.h" +#include "apc.h" /* static data */ DEFobjStaticHelpers DEFobjCurrIf(zlibw) +DEFobjCurrIf(apc) /* forward definitions */ static rsRetVal strmFlush(strm_t *pThis); @@ -60,6 +63,20 @@ static rsRetVal strmCloseFile(strm_t *pThis); /* methods */ +/* async flush apc handler + */ +static void +flushApc(void *param1, void __attribute__((unused)) *param2) +{ + DEFVARS_mutexProtection_uncond; + strm_t *pThis = (strm_t*) param1; + ISOBJ_TYPE_assert(pThis, strm); + + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); + strmFlush(pThis); + END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); +} + /* Try to resolve a size limit situation. This is used to support custom-file size handlers * for omfile. It first runs the command, and then checks if we are still above the size @@ -583,6 +600,11 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } + /* if we should call flush apc's, we need a mutex */ + if(pThis->iFlushInterval != 0) { + pthread_mutex_init(&pThis->mut, 0); + } + finalize_it: RETiRet; } @@ -602,6 +624,11 @@ CODESTARTobjDestruct(strm) objRelease(zlibw, LM_ZLIBW_FILENAME); } + if(pThis->iFlushInterval != 0) { + // TODO: check if there is an apc and remove it! + pthread_mutex_destroy(&pThis->mut); + } + free(pThis->pszDir); free(pThis->pIOBuf); free(pThis->pZipBuf); @@ -965,11 +992,33 @@ finalize_it: } +/* schedule an Apc flush request. + * rgerhards, 2009-06-15 + */ +static inline rsRetVal +scheduleFlushRequest(strm_t *pThis) +{ + apc_t *pApc; + DEFiRet; + + CHKiRet(apc.CancelApc(pThis->apcID)); +dbgprintf("XXX: requesting to add apc!\n"); + CHKiRet(apc.Construct(&pApc)); + CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc)); + CHKiRet(apc.SetParam1(pApc, pThis)); + CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID)); + +finalize_it: + RETiRet; +} + + /* write memory buffer to a stream object */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { + DEFVARS_mutexProtection_uncond; DEFiRet; size_t iPartial; @@ -980,6 +1029,11 @@ dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); +RUNLOG_VAR("%d", pThis->iFlushInterval); + if(pThis->iFlushInterval != 0) { + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); + } + /* check if the to-be-written data is larger than our buffer size */ if(lenBuf >= pThis->sIOBufSize) { /* it is - so we do a direct write, that is most efficient. @@ -1008,7 +1062,17 @@ dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n } } + /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at + * termination. For Zip mode, it could be fatal if we write after each record. + */ + if(pThis->iFlushInterval != 0) + scheduleFlushRequest(pThis); + finalize_it: + if(pThis->iFlushInterval != 0) { + END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); + } + RETiRet; } @@ -1025,6 +1089,7 @@ DEFpropSetMeth(strm, iZipLevel, int) DEFpropSetMeth(strm, bSync, int) DEFpropSetMeth(strm, sIOBufSize, size_t) DEFpropSetMeth(strm, iSizeLimit, off_t) +DEFpropSetMeth(strm, iFlushInterval, int) DEFpropSetMeth(strm, pszSizeLimitCmd, uchar*) static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) @@ -1308,6 +1373,7 @@ CODESTARTobjQueryInterface(strm) pIf->SetbSync = strmSetbSync; pIf->SetsIOBufSize = strmSetsIOBufSize; pIf->SetiSizeLimit = strmSetiSizeLimit; + pIf->SetiFlushInterval = strmSetiFlushInterval; pIf->SetpszSizeLimitCmd = strmSetpszSizeLimitCmd; finalize_it: ENDobjQueryInterface(strm) @@ -1319,6 +1385,8 @@ ENDobjQueryInterface(strm) */ BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ + CHKiRet(objUse(apc, CORE_COMPONENT)); + OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty); OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); -- cgit v1.2.3 From f7579e68a67364c8040966be57c2eae4c9550ee5 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 16 Jun 2009 11:36:05 +0200 Subject: done various optimizations to the stringbuf and its users --- runtime/stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index f1f69cc8..1cff2da6 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -368,7 +368,7 @@ strmReadLine(strm_t *pThis, cstr_t **ppCStr) CHKiRet(rsCStrAppendChar(*ppCStr, c)); CHKiRet(strmReadChar(pThis, &c)); } - CHKiRet(rsCStrFinish(*ppCStr)); + CHKiRet(cstrFinalize(*ppCStr)); finalize_it: if(iRet != RS_RET_OK && *ppCStr != NULL) -- cgit v1.2.3 From 6f0db63e9b962edf6305860b608500e8c650b71b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 16 Jun 2009 15:13:47 +0200 Subject: milestone: input-side multiSubmit capability ... commit before I try to touch the queue side ;) --- runtime/stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index f13258b5..8cbe0297 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -188,7 +188,7 @@ finalize_it: static rsRetVal doPhysOpen(strm_t *pThis) { - int iFlags; + int iFlags = 0; DEFiRet; ISOBJ_TYPE_assert(pThis, strm); -- cgit v1.2.3 From d2d54013aebb756169182ed8716b142d27134a70 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 17 Jun 2009 15:22:13 +0200 Subject: going forward in moving string-handling functions to new interface... --- runtime/stream.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 8cbe0297..f9859617 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -531,19 +531,19 @@ strmReadLine(strm_t *pThis, cstr_t **ppCStr) ASSERT(pThis != NULL); ASSERT(ppCStr != NULL); - CHKiRet(rsCStrConstruct(ppCStr)); + CHKiRet(cstrConstruct(ppCStr)); /* now read the line */ CHKiRet(strmReadChar(pThis, &c)); while(c != '\n') { - CHKiRet(rsCStrAppendChar(*ppCStr, c)); + CHKiRet(cstrAppendChar(*ppCStr, c)); CHKiRet(strmReadChar(pThis, &c)); } CHKiRet(cstrFinalize(*ppCStr)); finalize_it: if(iRet != RS_RET_OK && *ppCStr != NULL) - rsCStrDestruct(ppCStr); + cstrDestruct(ppCStr); RETiRet; } -- cgit v1.2.3 From b1f2e53921b7ab19c363a63de47a0e7a35866052 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 23 Jun 2009 15:17:55 +0200 Subject: prevented unneccessary apc calls --- runtime/stream.c | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index f9859617..00c726d9 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -74,6 +74,7 @@ flushApc(void *param1, void __attribute__((unused)) *param2) BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); strmFlush(pThis); + pThis->apcRequested = 0; END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); } @@ -1001,12 +1002,16 @@ scheduleFlushRequest(strm_t *pThis) apc_t *pApc; DEFiRet; - CHKiRet(apc.CancelApc(pThis->apcID)); + if(!pThis->apcRequested) { + /* we do an request only if none is yet pending */ + pThis->apcRequested = 1; + // TODO: find similar thing later CHKiRet(apc.CancelApc(pThis->apcID)); dbgprintf("XXX: requesting to add apc!\n"); - CHKiRet(apc.Construct(&pApc)); - CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc)); - CHKiRet(apc.SetParam1(pApc, pThis)); - CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID)); + CHKiRet(apc.Construct(&pApc)); + CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc)); + CHKiRet(apc.SetParam1(pApc, pThis)); + CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID)); + } finalize_it: RETiRet; -- cgit v1.2.3 From e3040285dbf0854443bc2443e0de5ac59f6f839e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 6 Jul 2009 16:38:09 +0200 Subject: first shot at asynchronous stream writer with timeout capability ... seems to work on quick testing, but needs a far more testing and improvement. Good milestone commit. --- runtime/stream.c | 216 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 146 insertions(+), 70 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 00c726d9..a9f4803f 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -48,37 +48,21 @@ #include "stream.h" #include "unicode-helper.h" #include "module-template.h" -#include "apc.h" +#include /* static data */ DEFobjStaticHelpers DEFobjCurrIf(zlibw) -DEFobjCurrIf(apc) /* forward definitions */ static rsRetVal strmFlush(strm_t *pThis); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); +static void *asyncWriterThread(void *pPtr); /* methods */ -/* async flush apc handler - */ -static void -flushApc(void *param1, void __attribute__((unused)) *param2) -{ - DEFVARS_mutexProtection_uncond; - strm_t *pThis = (strm_t*) param1; - ISOBJ_TYPE_assert(pThis, strm); - - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); - strmFlush(pThis); - pThis->apcRequested = 0; - END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); -} - - /* Try to resolve a size limit situation. This is used to support custom-file size handlers * for omfile. It first runs the command, and then checks if we are still above the size * treshold. Note that this works only with single file names, NOT with circular names. @@ -569,11 +553,11 @@ ENDobjConstruct(strm) static rsRetVal strmConstructFinalize(strm_t *pThis) { rsRetVal localRet; + int i; DEFiRet; ASSERT(pThis != NULL); - CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); pThis->iBufPtrMax = 0; /* results in immediate read request */ if(pThis->iZipLevel) { /* do we need a zip buf? */ localRet = objUse(zlibw, LM_ZLIBW_FILENAME); @@ -601,9 +585,33 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } - /* if we should call flush apc's, we need a mutex */ +dbgprintf("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); + /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { + pThis->bAsyncWrite = 1; + } +dbgprintf("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); + + /* if we work asynchronously, we need a couple of synchronization objects */ + if(pThis->bAsyncWrite) { pthread_mutex_init(&pThis->mut, 0); + pthread_cond_init(&pThis->notFull, 0); + pthread_cond_init(&pThis->notEmpty, 0); + pthread_cond_init(&pThis->isEmpty, 0); + pThis->iCnt = pThis->iEnq = pThis->iDeq = 0; + for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { + CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + } + //pThis->pIOBuf = pThis->ioBuf[0]; + pThis->bStopWriter = 0; + // TODO: detached thread? + if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) + dbgprintf("ERROR: stream %p cold not create writer thread\n", pThis); + // TODO: remove that below later! + CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + } else { + /* we work synchronously, so we need to alloc a fixed pIOBuf */ + CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } finalize_it: @@ -611,8 +619,26 @@ finalize_it: } +/* stop the writer thread (we MUST be runnnig asynchronously when this method + * is called!) -- rgerhards, 2009-07-06 + */ +static inline void +stopWriter(strm_t *pThis) +{ + BEGINfunc + d_pthread_mutex_lock(&pThis->mut); + pThis->bStopWriter = 1; + pthread_cond_signal(&pThis->notEmpty); + d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + d_pthread_mutex_unlock(&pThis->mut); + pthread_join(pThis->writerThreadID, NULL); + ENDfunc +} + + /* destructor for the strm object */ BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ + int i; CODESTARTobjDestruct(strm) if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); @@ -625,16 +651,23 @@ CODESTARTobjDestruct(strm) objRelease(zlibw, LM_ZLIBW_FILENAME); } - if(pThis->iFlushInterval != 0) { - // TODO: check if there is an apc and remove it! - pthread_mutex_destroy(&pThis->mut); - } - free(pThis->pszDir); - free(pThis->pIOBuf); free(pThis->pZipBuf); free(pThis->pszCurrFName); free(pThis->pszFName); + + if(pThis->bAsyncWrite) { + stopWriter(pThis); + pthread_mutex_destroy(&pThis->mut); + pthread_cond_destroy(&pThis->notFull); + pthread_cond_destroy(&pThis->notEmpty); + pthread_cond_destroy(&pThis->isEmpty); + for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { + free(pThis->asyncBuf[i].pBuf); + } + } else { + free(pThis->pIOBuf); + } ENDobjDestruct(strm) @@ -730,11 +763,93 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) pWriteBuf += iWritten; } while(lenBuf > 0); /* Warning: do..while()! */ + dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); + finalize_it: *pLenBuf = iTotalWritten; RETiRet; } +#include + +/* This is the writer thread for asynchronous mode. + * -- rgerhards, 2009-07-06 + */ +static void* +asyncWriterThread(void *pPtr) +{ + int iDeq; + size_t iWritten; + strm_t *pThis = (strm_t*) pPtr; + ISOBJ_TYPE_assert(pThis, strm); + + BEGINfunc + if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) { + DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); + } + +fprintf(stderr, "async stream writer thread started\n");fflush(stderr); +dbgprintf("TTT: writer thread startup\n"); + while(1) { /* loop broken inside */ + d_pthread_mutex_lock(&pThis->mut); + while(pThis->iCnt == 0) { +dbgprintf("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); + if(pThis->bStopWriter) { + pthread_cond_signal(&pThis->isEmpty); + d_pthread_mutex_unlock(&pThis->mut); + goto finalize_it; /* break main loop */ + } + d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + } + + iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; + iWritten = pThis->asyncBuf[iDeq].lenBuf; + doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); + // TODO: error check!!!!! 2009-07-06 + + --pThis->iCnt; + if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) { + pthread_cond_signal(&pThis->notFull); + } + d_pthread_mutex_unlock(&pThis->mut); + } + +finalize_it: +dbgprintf("TTT: writer thread shutdown\n"); + ENDfunc + return NULL; /* to keep pthreads happy */ +} + + +/* This function is called to "do" an async write call, what primarily means that + * the data is handed over to the writer thread (which will then do the actual write + * in parallel. -- rgerhards, 2009-07-06 + */ +static inline rsRetVal +doAsyncWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) +{ + int iEnq; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + d_pthread_mutex_lock(&pThis->mut); + while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) + d_pthread_cond_wait(&pThis->notFull, &pThis->mut); + + iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; + // TODO: optimize, memcopy only for getting it initially going! + //pThis->asyncBuf[iEnq].pBuf = pBuf; + memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, *pLenBuf); + pThis->asyncBuf[iEnq].lenBuf = *pLenBuf; + + if(++pThis->iCnt == 1) + pthread_cond_signal(&pThis->notEmpty); + d_pthread_mutex_unlock(&pThis->mut); + +finalize_it: + RETiRet; +} + /* sync the file to disk, so that any unwritten data is persisted. This * also syncs the directory and thus makes sure that the file survives @@ -788,8 +903,11 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(strmOpenFile(pThis)); iWritten = lenBuf; - CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); - dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); + if(pThis->bAsyncWrite) { + CHKiRet(doAsyncWriteCall(pThis, pBuf, &iWritten)); + } else { + CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); + } pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; @@ -993,37 +1111,11 @@ finalize_it: } -/* schedule an Apc flush request. - * rgerhards, 2009-06-15 - */ -static inline rsRetVal -scheduleFlushRequest(strm_t *pThis) -{ - apc_t *pApc; - DEFiRet; - - if(!pThis->apcRequested) { - /* we do an request only if none is yet pending */ - pThis->apcRequested = 1; - // TODO: find similar thing later CHKiRet(apc.CancelApc(pThis->apcID)); -dbgprintf("XXX: requesting to add apc!\n"); - CHKiRet(apc.Construct(&pApc)); - CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc)); - CHKiRet(apc.SetParam1(pApc, pThis)); - CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID)); - } - -finalize_it: - RETiRet; -} - - /* write memory buffer to a stream object */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { - DEFVARS_mutexProtection_uncond; DEFiRet; size_t iPartial; @@ -1034,11 +1126,6 @@ dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); -RUNLOG_VAR("%d", pThis->iFlushInterval); - if(pThis->iFlushInterval != 0) { - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); - } - /* check if the to-be-written data is larger than our buffer size */ if(lenBuf >= pThis->sIOBufSize) { /* it is - so we do a direct write, that is most efficient. @@ -1067,17 +1154,7 @@ RUNLOG_VAR("%d", pThis->iFlushInterval); } } - /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at - * termination. For Zip mode, it could be fatal if we write after each record. - */ - if(pThis->iFlushInterval != 0) - scheduleFlushRequest(pThis); - finalize_it: - if(pThis->iFlushInterval != 0) { - END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); - } - RETiRet; } @@ -1390,7 +1467,6 @@ ENDobjQueryInterface(strm) */ BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ - CHKiRet(objUse(apc, CORE_COMPONENT)); OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty); -- cgit v1.2.3 From 8e76a0521bee36e02e8bce2e97fa3d2aa67130da Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 6 Jul 2009 19:28:22 +0200 Subject: some minor cleanup --- runtime/stream.c | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index a9f4803f..2bb19caf 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -123,10 +123,11 @@ resolveFileSizeLimit(strm_t *pThis, uchar *pszCurrFName) finalize_it: if(iRet != RS_RET_OK) { - if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) - dbgprintf("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName); - else - dbgprintf("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet); + if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) { + DBGPRINTF("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName); + } else { + DBGPRINTF("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet); + } pThis->bDisabled = 1; } @@ -585,12 +586,12 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } -dbgprintf("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); +DBGPRINTF("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { pThis->bAsyncWrite = 1; } -dbgprintf("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); +DBGPRINTF("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we work asynchronously, we need a couple of synchronization objects */ if(pThis->bAsyncWrite) { @@ -604,9 +605,8 @@ dbgprintf("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlus } //pThis->pIOBuf = pThis->ioBuf[0]; pThis->bStopWriter = 0; - // TODO: detached thread? if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) - dbgprintf("ERROR: stream %p cold not create writer thread\n", pThis); + DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); // TODO: remove that below later! CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } else { @@ -789,11 +789,11 @@ asyncWriterThread(void *pPtr) } fprintf(stderr, "async stream writer thread started\n");fflush(stderr); -dbgprintf("TTT: writer thread startup\n"); +DBGPRINTF("TTT: writer thread startup\n"); while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { -dbgprintf("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); +DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); if(pThis->bStopWriter) { pthread_cond_signal(&pThis->isEmpty); d_pthread_mutex_unlock(&pThis->mut); @@ -815,7 +815,7 @@ dbgprintf("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) } finalize_it: -dbgprintf("TTT: writer thread shutdown\n"); +DBGPRINTF("TTT: writer thread shutdown\n"); ENDfunc return NULL; /* to keep pthreads happy */ } @@ -957,7 +957,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) /* see note in file header for the params we use with deflateInit2() */ zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY); if(zRet != Z_OK) { - dbgprintf("error %d returned from zlib/deflateInit2()\n", zRet); + DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet); ABORT_FINALIZE(RS_RET_ZLIB_ERR); } @@ -967,11 +967,11 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) /* run deflate() on input until output buffer not full, finish compression if all of source has been read in */ do { - dbgprintf("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); + DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); zstrm.avail_out = pThis->sIOBufSize; zstrm.next_out = pThis->pZipBuf; zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */ - dbgprintf("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); + DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); assert(zRet != Z_STREAM_ERROR); /* state not clobbered */ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out)); } while (zstrm.avail_out == 0); @@ -980,7 +980,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) zRet = zlibw.DeflateEnd(&zstrm); if(zRet != Z_OK) { - dbgprintf("error %d returned from zlib/deflateEnd()\n", zRet); + DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet); ABORT_FINALIZE(RS_RET_ZLIB_ERR); } @@ -1122,7 +1122,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); ASSERT(pBuf != NULL); -dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); +DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); -- cgit v1.2.3 From 53b055b6aabd87fa096edf70a6e58eea6c87f38b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 6 Jul 2009 19:44:53 +0200 Subject: moved zip part to writer thread ... this is necessary in preparation for the final solution (we need to have a "unified" writer). If it causes worse performance to have the zip writher togehter with the synchronous write, we may do an async write... --- runtime/stream.c | 158 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 89 insertions(+), 69 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 2bb19caf..dfb43891 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -59,6 +59,8 @@ static rsRetVal strmFlush(strm_t *pThis); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); static void *asyncWriterThread(void *pPtr); +static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); /* methods */ @@ -586,12 +588,10 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } -DBGPRINTF("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { pThis->bAsyncWrite = 1; } -DBGPRINTF("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we work asynchronously, we need a couple of synchronization objects */ if(pThis->bAsyncWrite) { @@ -770,7 +770,88 @@ finalize_it: RETiRet; } -#include + + +/* write memory buffer to a stream object. + * To support direct writes of large objects, this method may be called + * with a buffer pointing to some region other than the stream buffer itself. + * However, in that case the stream buffer must be empty (strmFlush() has to + * be called before), because we would otherwise mess up with the sequence + * inside the stream. -- rgerhards, 2008-01-10 + */ +static inline rsRetVal +doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); + + if(pThis->iZipLevel) { + CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); + } else { + /* write without zipping */ + CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + +/* This function is called to "do" an async write call, what primarily means that + * the data is handed over to the writer thread (which will then do the actual write + * in parallel. -- rgerhards, 2009-07-06 + */ +static inline rsRetVal +doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + int iEnq; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + d_pthread_mutex_lock(&pThis->mut); + while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) + d_pthread_cond_wait(&pThis->notFull, &pThis->mut); + + iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; + // TODO: optimize, memcopy only for getting it initially going! + //pThis->asyncBuf[iEnq].pBuf = pBuf; + memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); + pThis->asyncBuf[iEnq].lenBuf = lenBuf; + + if(++pThis->iCnt == 1) + pthread_cond_signal(&pThis->notEmpty); + d_pthread_mutex_unlock(&pThis->mut); + +finalize_it: + RETiRet; +} + + +/* schedule writing to the stream. Depending on our concurrency settings, + * this either directly writes to the stream or schedules writing via + * the background thread. -- rgerhards, 2009-07-07 + */ +static rsRetVal +strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); + + if(pThis->bAsyncWrite) { + CHKiRet(doAsyncWriteInternal(pThis, pBuf, lenBuf)); + } else { + CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + /* This is the writer thread for asynchronous mode. * -- rgerhards, 2009-07-06 @@ -788,7 +869,6 @@ asyncWriterThread(void *pPtr) DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } -fprintf(stderr, "async stream writer thread started\n");fflush(stderr); DBGPRINTF("TTT: writer thread startup\n"); while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); @@ -804,7 +884,8 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; iWritten = pThis->asyncBuf[iDeq].lenBuf; - doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten); + // doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); // TODO: error check!!!!! 2009-07-06 --pThis->iCnt; @@ -821,36 +902,6 @@ DBGPRINTF("TTT: writer thread shutdown\n"); } -/* This function is called to "do" an async write call, what primarily means that - * the data is handed over to the writer thread (which will then do the actual write - * in parallel. -- rgerhards, 2009-07-06 - */ -static inline rsRetVal -doAsyncWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) -{ - int iEnq; - DEFiRet; - ISOBJ_TYPE_assert(pThis, strm); - - d_pthread_mutex_lock(&pThis->mut); - while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) - d_pthread_cond_wait(&pThis->notFull, &pThis->mut); - - iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; - // TODO: optimize, memcopy only for getting it initially going! - //pThis->asyncBuf[iEnq].pBuf = pBuf; - memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, *pLenBuf); - pThis->asyncBuf[iEnq].lenBuf = *pLenBuf; - - if(++pThis->iCnt == 1) - pthread_cond_signal(&pThis->notEmpty); - d_pthread_mutex_unlock(&pThis->mut); - -finalize_it: - RETiRet; -} - - /* sync the file to disk, so that any unwritten data is persisted. This * also syncs the directory and thus makes sure that the file survives * fatal failure. Note that we do NOT return an error status if the @@ -903,11 +954,7 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(strmOpenFile(pThis)); iWritten = lenBuf; - if(pThis->bAsyncWrite) { - CHKiRet(doAsyncWriteCall(pThis, pBuf, &iWritten)); - } else { - CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); - } + CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; @@ -989,33 +1036,6 @@ finalize_it: } -/* write memory buffer to a stream object. - * To support direct writes of large objects, this method may be called - * with a buffer pointing to some region other than the stream buffer itself. - * However, in that case the stream buffer must be empty (strmFlush() has to - * be called before), because we would otherwise mess up with the sequence - * inside the stream. -- rgerhards, 2008-01-10 - */ -static rsRetVal -strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) -{ - DEFiRet; - - ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); - - if(pThis->iZipLevel) { - CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); - } else { - /* write without zipping */ - CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); - } - -finalize_it: - RETiRet; -} - - /* flush stream output buffer to persistent storage. This can be called at any time * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 @@ -1029,7 +1049,7 @@ strmFlush(strm_t *pThis) dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr); if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { - iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); + iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr); } RETiRet; @@ -1132,7 +1152,7 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n * TODO: is it really? think about disk block sizes! */ CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ - CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf)); + CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf)); } else { /* data fits into a buffer - we just need to see if it * fits into the current buffer... -- cgit v1.2.3 From f27b5cc2535f0b1763e1963304546b611cd3c26a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 08:08:22 +0200 Subject: moved locking primitives --- runtime/stream.c | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index dfb43891..d25f8b02 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -801,7 +801,8 @@ finalize_it: /* This function is called to "do" an async write call, what primarily means that * the data is handed over to the writer thread (which will then do the actual write - * in parallel. -- rgerhards, 2009-07-06 + * in parallel). Note that the stream mutex has already been locked by the + * strmWrite...() calls. -- rgerhards, 2009-07-06 */ static inline rsRetVal doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) @@ -810,7 +811,6 @@ doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; ISOBJ_TYPE_assert(pThis, strm); - d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); @@ -1102,6 +1102,9 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) ASSERT(pThis != NULL); + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); @@ -1111,11 +1114,18 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) pThis->iBufPtr++; finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + RETiRet; } -/* write an integer value (actually a long) to a stream object */ +/* write an integer value (actually a long) to a stream object + * Note that we do not need to lock the mutex here, because we call + * strmWrite(), which does the lock (aka: we must not lock it, else we + * would run into a recursive lock, resulting in a deadlock!) + */ static rsRetVal strmWriteLong(strm_t *pThis, long i) { DEFiRet; @@ -1143,6 +1153,9 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pBuf != NULL); DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); @@ -1175,6 +1188,9 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n } finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + RETiRet; } -- cgit v1.2.3 From 4e9deb5b88129a397d23b55e3954c6f1c259e466 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 08:33:22 +0200 Subject: stream now uses a singular buffer strucuture for writing --- runtime/stream.c | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index d25f8b02..6924d527 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -603,12 +603,12 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } - //pThis->pIOBuf = pThis->ioBuf[0]; + pThis->pIOBuf = pThis->asyncBuf[0].pBuf; pThis->bStopWriter = 0; if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); // TODO: remove that below later! - CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + //CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } else { /* we work synchronously, so we need to alloc a fixed pIOBuf */ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); @@ -802,7 +802,10 @@ finalize_it: /* This function is called to "do" an async write call, what primarily means that * the data is handed over to the writer thread (which will then do the actual write * in parallel). Note that the stream mutex has already been locked by the - * strmWrite...() calls. -- rgerhards, 2009-07-06 + * strmWrite...() calls. Also note that we always have only a single producer, + * so we can simply serially assign the next free buffer to it and be sure that + * the very some producer comes back in sequence to submit the then-filled buffers. + * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 */ static inline rsRetVal doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) @@ -814,11 +817,12 @@ doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); - iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; + //iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; // TODO: optimize, memcopy only for getting it initially going! //pThis->asyncBuf[iEnq].pBuf = pBuf; - memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); - pThis->asyncBuf[iEnq].lenBuf = lenBuf; + //memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); + pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; + pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf; if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); @@ -1164,6 +1168,8 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n /* it is - so we do a direct write, that is most efficient. * TODO: is it really? think about disk block sizes! */ +printf("error: we should not have reached this code!\n"); + abort(); CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf)); } else { -- cgit v1.2.3 From 53aa68fc7d78279ab1af88de253a6134fc7ef00d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 09:16:14 +0200 Subject: clean solution for "writing" arbrietary-size user buffers to a stream --- runtime/stream.c | 70 +++++++++++++++++++++++--------------------------------- 1 file changed, 28 insertions(+), 42 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 6924d527..92122215 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -607,8 +607,6 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) pThis->bStopWriter = 0; if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); - // TODO: remove that below later! - //CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } else { /* we work synchronously, so we need to alloc a fixed pIOBuf */ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); @@ -808,19 +806,14 @@ finalize_it: * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 */ static inline rsRetVal -doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) { - int iEnq; DEFiRet; ISOBJ_TYPE_assert(pThis, strm); while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); - //iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; - // TODO: optimize, memcopy only for getting it initially going! - //pThis->asyncBuf[iEnq].pBuf = pBuf; - //memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf; @@ -846,7 +839,7 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->bAsyncWrite) { - CHKiRet(doAsyncWriteInternal(pThis, pBuf, lenBuf)); + CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); } else { CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); } @@ -873,11 +866,9 @@ asyncWriterThread(void *pPtr) DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } -DBGPRINTF("TTT: writer thread startup\n"); while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { -DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); if(pThis->bStopWriter) { pthread_cond_signal(&pThis->isEmpty); d_pthread_mutex_unlock(&pThis->mut); @@ -889,7 +880,6 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; iWritten = pThis->asyncBuf[iDeq].lenBuf; doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten); - // doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); // TODO: error check!!!!! 2009-07-06 --pThis->iCnt; @@ -900,7 +890,6 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) } finalize_it: -DBGPRINTF("TTT: writer thread shutdown\n"); ENDfunc return NULL; /* to keep pthreads happy */ } @@ -1109,6 +1098,9 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) if(pThis->bAsyncWrite) d_pthread_mutex_lock(&pThis->mut); + if(pThis->bDisabled) + ABORT_FINALIZE(RS_RET_STREAM_DISABLED); + /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); @@ -1145,13 +1137,23 @@ finalize_it: } -/* write memory buffer to a stream object +/* write memory buffer to a stream object. + * process the data in chunks and copy it over to our buffer. The caller-provided data + * may theoritically be larger than our buffer. In that case, we do multiple copies. One + * may argue if it were more efficient to write out the caller-provided buffer in that case + * and earlier versions of rsyslog did this. However, this introduces a lot of complexity + * inside the buffered writer and potential performance bottlenecks when trying to solve + * it. Now keep in mind that we actually do (almost?) never have a case where the + * caller-provided buffer is larger than our one. So instead of optimizing a case + * which normally does not exist, we expect some degradation in its case but make us + * perform better in the regular cases. -- rgerhards, 2009-07-07 */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; - size_t iPartial; + size_t iWrite; + size_t iOffset; ASSERT(pThis != NULL); ASSERT(pBuf != NULL); @@ -1163,35 +1165,19 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); - /* check if the to-be-written data is larger than our buffer size */ - if(lenBuf >= pThis->sIOBufSize) { - /* it is - so we do a direct write, that is most efficient. - * TODO: is it really? think about disk block sizes! - */ -printf("error: we should not have reached this code!\n"); - abort(); - CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ - CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf)); - } else { - /* data fits into a buffer - we just need to see if it - * fits into the current buffer... - */ - if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) { - /* nope, so we must split it */ - iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ - if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial); - pThis->iBufPtr += iPartial; - } + iOffset = 0; + do { + if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ - memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial); - pThis->iBufPtr = lenBuf - iPartial; - } else { - /* we have space, so we simply copy over the string */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); - pThis->iBufPtr += lenBuf; } - } + iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ + if(iWrite > lenBuf) + iWrite = lenBuf; + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf + iOffset, iWrite); + pThis->iBufPtr += iWrite; + iOffset += iWrite; + lenBuf -= iWrite; + } while(lenBuf > 0); finalize_it: if(pThis->bAsyncWrite) -- cgit v1.2.3 From f53aa966e1fad03c478de342f5a878e57405de13 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 12:09:41 +0200 Subject: solved a race condition --- runtime/stream.c | 63 +++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 49 insertions(+), 14 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 92122215..eae36796 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -50,6 +50,8 @@ #include "module-template.h" #include +#define inline + /* static data */ DEFobjStaticHelpers DEFobjCurrIf(zlibw) @@ -267,6 +269,21 @@ finalize_it: } +/* wait for the output writer thread to be done. This must be called before actions + * that require data to be persisted. May be called in non-async mode and is a null + * operation than. Must be called with the mutex locked. + */ +static inline void +strmWaitAsyncWriterDone(strm_t *pThis) +{ + if(pThis->bAsyncWrite) { + /* awake writer thread and make it write out everything */ + pthread_cond_signal(&pThis->notEmpty); + d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + } +} + + /* close a strm file * Note that the bDeleteOnClose flag is honored. If it is set, the file will be * deleted after close. This is in support for the qRead thread. @@ -282,6 +299,8 @@ static rsRetVal strmCloseFile(strm_t *pThis) if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); + strmWaitAsyncWriterDone(pThis); + close(pThis->fd); pThis->fd = -1; @@ -618,16 +637,14 @@ finalize_it: /* stop the writer thread (we MUST be runnnig asynchronously when this method - * is called!) -- rgerhards, 2009-07-06 + * is called!). Note that the mutex must be locked! -- rgerhards, 2009-07-06 */ static inline void stopWriter(strm_t *pThis) { BEGINfunc - d_pthread_mutex_lock(&pThis->mut); pThis->bStopWriter = 1; pthread_cond_signal(&pThis->notEmpty); - d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); d_pthread_mutex_unlock(&pThis->mut); pthread_join(pThis->writerThreadID, NULL); ENDfunc @@ -638,9 +655,14 @@ stopWriter(strm_t *pThis) BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ int i; CODESTARTobjDestruct(strm) + if(pThis->bAsyncWrite) + /* Note: mutex will be unlocked in stopWriter! */ + d_pthread_mutex_lock(&pThis->mut); + if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); +dbgprintf("XXX: destruct stream %p\n", pThis); /* ... then free resources */ if(pThis->fd != -1) strmCloseFile(pThis); @@ -681,6 +703,9 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) if(pThis->fd == -1) FINALIZE; + /* wait for output to be empty, so that our counts are correct */ + strmWaitAsyncWriterDone(pThis); + if(pThis->iCurrOffs >= pThis->iMaxFileSize) { dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n", (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); @@ -783,7 +808,6 @@ doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->iZipLevel) { CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); @@ -811,15 +835,15 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) DEFiRet; ISOBJ_TYPE_assert(pThis, strm); +dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf); while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; - pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf; + pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf; if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); - d_pthread_mutex_unlock(&pThis->mut); finalize_it: RETiRet; @@ -836,7 +860,6 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->bAsyncWrite) { CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); @@ -844,6 +867,8 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); } + pThis->iBufPtr = 0; /* we are at the begin of a new buffer */ + finalize_it: RETiRet; } @@ -857,7 +882,6 @@ static void* asyncWriterThread(void *pPtr) { int iDeq; - size_t iWritten; strm_t *pThis = (strm_t*) pPtr; ISOBJ_TYPE_assert(pThis, strm); @@ -870,7 +894,7 @@ asyncWriterThread(void *pPtr) d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { if(pThis->bStopWriter) { - pthread_cond_signal(&pThis->isEmpty); + pthread_cond_broadcast(&pThis->isEmpty); d_pthread_mutex_unlock(&pThis->mut); goto finalize_it; /* break main loop */ } @@ -878,13 +902,14 @@ asyncWriterThread(void *pPtr) } iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; - iWritten = pThis->asyncBuf[iDeq].lenBuf; - doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten); - // TODO: error check!!!!! 2009-07-06 + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); + // TODO: error check????? 2009-07-06 --pThis->iCnt; if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) { pthread_cond_signal(&pThis->notFull); + if(pThis->iCnt == 0) + pthread_cond_broadcast(&pThis->isEmpty); } d_pthread_mutex_unlock(&pThis->mut); } @@ -949,7 +974,6 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iWritten = lenBuf; CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); - pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; /* update user counter, if provided */ if(pThis->pUsrWCntr != NULL) @@ -1158,7 +1182,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); ASSERT(pBuf != NULL); -DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); +//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); if(pThis->bAsyncWrite) d_pthread_mutex_lock(&pThis->mut); @@ -1167,9 +1191,11 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n iOffset = 0; do { +//dbgprintf("XXX: enter write loop\n"); if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } +//dbgprintf("XXX: post strmFlush\n"); iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) iWrite = lenBuf; @@ -1177,8 +1203,17 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n pThis->iBufPtr += iWrite; iOffset += iWrite; lenBuf -= iWrite; +//dbgprintf("XXX: after write , iBufPtr %d, iOffset %d, len %d, unwritten: '%20.20s'\n", pThis->iBufPtr, iOffset, lenBuf, pBuf + iOffset); } while(lenBuf > 0); + /* now check if the buffer right at the end of the write is full and, if so, + * write it. This seems more natural than waiting (hours?) for the next message... + */ + if(pThis->iBufPtr == pThis->sIOBufSize) { + CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + } +//dbgprintf("XXX: done write loop, iBufPtr %d, iOffset %d, len %d\n", pThis->iBufPtr, iOffset, lenBuf); + finalize_it: if(pThis->bAsyncWrite) d_pthread_mutex_unlock(&pThis->mut); -- cgit v1.2.3 From 26227091faac8c3cc9bc282eb4e4fc408635f8d2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 17:18:51 +0200 Subject: fixed a bug introduced today that lead to an abort in queue disk mode --- runtime/stream.c | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index eae36796..9f363257 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -276,11 +276,13 @@ finalize_it: static inline void strmWaitAsyncWriterDone(strm_t *pThis) { + BEGINfunc if(pThis->bAsyncWrite) { /* awake writer thread and make it write out everything */ pthread_cond_signal(&pThis->notEmpty); d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); } + ENDfunc } @@ -296,10 +298,16 @@ static rsRetVal strmCloseFile(strm_t *pThis) ASSERT(pThis->fd != -1); dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); - if(pThis->tOperationsMode != STREAMMODE_READ) - strmFlush(pThis); - - strmWaitAsyncWriterDone(pThis); + dbgCallStackPrintAll(); + if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) { + pThis->bInClose = 1; + if(pThis->bAsyncWrite) { + strmFlush(pThis); + } else { + strmWaitAsyncWriterDone(pThis); + } + pThis->bInClose = 0; + } close(pThis->fd); pThis->fd = -1; @@ -796,11 +804,6 @@ finalize_it: /* write memory buffer to a stream object. - * To support direct writes of large objects, this method may be called - * with a buffer pointing to some region other than the stream buffer itself. - * However, in that case the stream buffer must be empty (strmFlush() has to - * be called before), because we would otherwise mess up with the sequence - * inside the stream. -- rgerhards, 2008-01-10 */ static inline rsRetVal doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) -- cgit v1.2.3 From 5221a1e42e16c8c39b48a4a1a18ee6322c38cd17 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 18:33:00 +0200 Subject: added capability to write incomplete buffers after an inactivity timeout for the stream class and thus finally activating omfile's timeout capability in a useful way without polling and too-high performance overhead. --- runtime/stream.c | 46 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 7 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 9f363257..a0571a61 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -298,7 +298,6 @@ static rsRetVal strmCloseFile(strm_t *pThis) ASSERT(pThis->fd != -1); dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); - dbgCallStackPrintAll(); if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) { pThis->bInClose = 1; if(pThis->bAsyncWrite) { @@ -845,6 +844,7 @@ dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf) pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf; + pThis->bDoTimedWait = 0; /* everything written, no need to timeout partial buffer writes */ if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); @@ -885,6 +885,8 @@ static void* asyncWriterThread(void *pPtr) { int iDeq; + struct timespec t; + bool bTimedOut = 0; strm_t *pThis = (strm_t*) pPtr; ISOBJ_TYPE_assert(pThis, strm); @@ -901,9 +903,35 @@ asyncWriterThread(void *pPtr) d_pthread_mutex_unlock(&pThis->mut); goto finalize_it; /* break main loop */ } - d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + if(bTimedOut && pThis->iBufPtr > 0) { +RUNLOG_STR("XXX: we had a timeout in stream writer"); + /* if we timed out, we need to flush pending data */ + strmFlush(pThis); + bTimedOut = 0; + continue; /* now we should have data */ + } + bTimedOut = 0; + timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ + if(pThis->bDoTimedWait) { + if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) { + int err = errno; + if(err == ETIMEDOUT) { + bTimedOut = 1; + } else { + bTimedOut = 1; + char errStr[1024]; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n", + err, errStr); + } + } + } else { + d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + } } + bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ + iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); // TODO: error check????? 2009-07-06 @@ -1194,11 +1222,9 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iOffset = 0; do { -//dbgprintf("XXX: enter write loop\n"); if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } -//dbgprintf("XXX: post strmFlush\n"); iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) iWrite = lenBuf; @@ -1206,7 +1232,6 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) pThis->iBufPtr += iWrite; iOffset += iWrite; lenBuf -= iWrite; -//dbgprintf("XXX: after write , iBufPtr %d, iOffset %d, len %d, unwritten: '%20.20s'\n", pThis->iBufPtr, iOffset, lenBuf, pBuf + iOffset); } while(lenBuf > 0); /* now check if the buffer right at the end of the write is full and, if so, @@ -1215,11 +1240,18 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } -//dbgprintf("XXX: done write loop, iBufPtr %d, iOffset %d, len %d\n", pThis->iBufPtr, iOffset, lenBuf); finalize_it: - if(pThis->bAsyncWrite) + if(pThis->bAsyncWrite) { + if(pThis->bDoTimedWait == 0) { + /* we potentially have a partial buffer, so re-activate the + * writer thread that it can set and pick up timeouts. + */ + pThis->bDoTimedWait = 1; + pthread_cond_signal(&pThis->notEmpty); + } d_pthread_mutex_unlock(&pThis->mut); + } RETiRet; } -- cgit v1.2.3