summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog2
-rw-r--r--runtime/stream.c28
-rw-r--r--tools/omfile.c7
3 files changed, 25 insertions, 12 deletions
diff --git a/ChangeLog b/ChangeLog
index adbabeb9..d5b040bc 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,7 @@
---------------------------------------------------------------------------
Version 7.3.6 [devel] 2012-12-??
+- omfile: improved async writing, finally enabled full async write
+ also fixed a couple of smaller issues along that way
- impstats: added ability to write stats records to local file
and avoid going through the syslog log stream. syslog logging can now
also be turned off (see doc for details).
diff --git a/runtime/stream.c b/runtime/stream.c
index 9f4d3556..3e890c71 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -1032,17 +1032,20 @@ asyncWriterThread(void *pPtr)
sbool bTimedOut = 0;
strm_t *pThis = (strm_t*) pPtr;
int err;
+ uchar thrdName[256] = "rs:";
ISOBJ_TYPE_assert(pThis, strm);
BEGINfunc
+ ustrncpy(thrdName+3, pThis->pszFName, sizeof(thrdName)-4);
+ dbgOutputTID((char*)thrdName);
# if HAVE_PRCTL && defined PR_SET_NAME
- if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) {
+ if(prctl(PR_SET_NAME, (char*)thrdName, 0, 0, 0) != 0) {
DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer");
}
# endif
+ d_pthread_mutex_lock(&pThis->mut);
while(1) { /* loop broken inside */
- d_pthread_mutex_lock(&pThis->mut);
while(pThis->iCnt == 0) {
if(pThis->bStopWriter) {
pthread_cond_broadcast(&pThis->isEmpty);
@@ -1053,16 +1056,15 @@ asyncWriterThread(void *pPtr)
/* if we timed out, we need to flush pending data */
strmFlushInternal(pThis, 0);
bTimedOut = 0;
- continue; /* now we should have data */
+ d_pthread_mutex_unlock(&pThis->mut);
+ continue;
}
bTimedOut = 0;
timeoutComp(&t, pThis->iFlushInterval * 1000); /* *1000 millisconds */
if(pThis->bDoTimedWait) {
if((err = pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t)) != 0) {
- if(err == ETIMEDOUT) {
- bTimedOut = 1;
- } else {
- bTimedOut = 1;
+ bTimedOut = 1; /* simulate in any case */
+ if(err != ETIMEDOUT) {
char errStr[1024];
rs_strerror_r(err, errStr, sizeof(errStr));
DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n",
@@ -1077,8 +1079,12 @@ asyncWriterThread(void *pPtr)
bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
+
+ /* now we can do the actual write in parallel */
+ d_pthread_mutex_unlock(&pThis->mut);
doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf, 0); // TODO: flush state
// TODO: error check????? 2009-07-06
+ d_pthread_mutex_lock(&pThis->mut);
--pThis->iCnt;
if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) {
@@ -1086,8 +1092,8 @@ asyncWriterThread(void *pPtr)
if(pThis->iCnt == 0)
pthread_cond_broadcast(&pThis->isEmpty);
}
- d_pthread_mutex_unlock(&pThis->mut);
}
+ d_pthread_mutex_unlock(&pThis->mut);
finalize_it:
ENDfunc
@@ -1487,12 +1493,12 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
ASSERT(pBuf != NULL);
/* DEV DEBUG ONLY DBGPRINTF("strmWrite(%p[%s], '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pThis->pszCurrFName, pBuf,(long) lenBuf, pThis->bDisabled, (long) pThis->iSizeLimit, (long long) pThis->iCurrOffs); */
- if(pThis->bAsyncWrite)
- d_pthread_mutex_lock(&pThis->mut);
-
if(pThis->bDisabled)
ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_lock(&pThis->mut);
+
iOffset = 0;
do {
if(pThis->iBufPtr == pThis->sIOBufSize) {
diff --git a/tools/omfile.c b/tools/omfile.c
index bcc10135..1217d867 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -847,7 +847,12 @@ BEGINendTransaction
CODESTARTendTransaction
/* Note: pStrm may be NULL if there was an error opening the stream */
if(pData->bFlushOnTXEnd && pData->pStrm != NULL) {
- CHKiRet(strm.Flush(pData->pStrm));
+ /* if we have an async writer, it controls the flush via
+ * a timeout. However, without it, we actually need to flush,
+ * else incomplete records are written.
+ */
+ if(!pData->bUseAsyncWriter)
+ CHKiRet(strm.Flush(pData->pStrm));
}
finalize_it:
ENDendTransaction