diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/stream.c | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/runtime/stream.c b/runtime/stream.c index 9f4d3556..3e890c71 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -1032,17 +1032,20 @@ asyncWriterThread(void *pPtr) sbool bTimedOut = 0; strm_t *pThis = (strm_t*) pPtr; int err; + uchar thrdName[256] = "rs:"; ISOBJ_TYPE_assert(pThis, strm); BEGINfunc + ustrncpy(thrdName+3, pThis->pszFName, sizeof(thrdName)-4); + dbgOutputTID((char*)thrdName); # if HAVE_PRCTL && defined PR_SET_NAME - if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) { + if(prctl(PR_SET_NAME, (char*)thrdName, 0, 0, 0) != 0) { DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } # endif + d_pthread_mutex_lock(&pThis->mut); while(1) { /* loop broken inside */ - d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { if(pThis->bStopWriter) { pthread_cond_broadcast(&pThis->isEmpty); @@ -1053,16 +1056,15 @@ asyncWriterThread(void *pPtr) /* if we timed out, we need to flush pending data */ strmFlushInternal(pThis, 0); bTimedOut = 0; - continue; /* now we should have data */ + d_pthread_mutex_unlock(&pThis->mut); + continue; } bTimedOut = 0; timeoutComp(&t, pThis->iFlushInterval * 1000); /* *1000 millisconds */ if(pThis->bDoTimedWait) { if((err = pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t)) != 0) { - if(err == ETIMEDOUT) { - bTimedOut = 1; - } else { - bTimedOut = 1; + bTimedOut = 1; /* simulate in any case */ + if(err != ETIMEDOUT) { char errStr[1024]; rs_strerror_r(err, errStr, sizeof(errStr)); DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n", @@ -1077,8 +1079,12 @@ asyncWriterThread(void *pPtr) bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; + + /* now we can do the actual write in parallel */ + d_pthread_mutex_unlock(&pThis->mut); doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf, 0); // TODO: flush state // TODO: error check????? 2009-07-06 + d_pthread_mutex_lock(&pThis->mut); --pThis->iCnt; if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) { @@ -1086,8 +1092,8 @@ asyncWriterThread(void *pPtr) if(pThis->iCnt == 0) pthread_cond_broadcast(&pThis->isEmpty); } - d_pthread_mutex_unlock(&pThis->mut); } + d_pthread_mutex_unlock(&pThis->mut); finalize_it: ENDfunc @@ -1487,12 +1493,12 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pBuf != NULL); /* DEV DEBUG ONLY DBGPRINTF("strmWrite(%p[%s], '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pThis->pszCurrFName, pBuf,(long) lenBuf, pThis->bDisabled, (long) pThis->iSizeLimit, (long long) pThis->iCurrOffs); */ - if(pThis->bAsyncWrite) - d_pthread_mutex_lock(&pThis->mut); - if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + iOffset = 0; do { if(pThis->iBufPtr == pThis->sIOBufSize) { |