summaryrefslogtreecommitdiffstats
path: root/runtime/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/stream.c')
-rw-r--r--runtime/stream.c28
1 files changed, 17 insertions, 11 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index 9f4d3556..3e890c71 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -1032,17 +1032,20 @@ asyncWriterThread(void *pPtr)
sbool bTimedOut = 0;
strm_t *pThis = (strm_t*) pPtr;
int err;
+ uchar thrdName[256] = "rs:";
ISOBJ_TYPE_assert(pThis, strm);
BEGINfunc
+ ustrncpy(thrdName+3, pThis->pszFName, sizeof(thrdName)-4);
+ dbgOutputTID((char*)thrdName);
# if HAVE_PRCTL && defined PR_SET_NAME
- if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) {
+ if(prctl(PR_SET_NAME, (char*)thrdName, 0, 0, 0) != 0) {
DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer");
}
# endif
+ d_pthread_mutex_lock(&pThis->mut);
while(1) { /* loop broken inside */
- d_pthread_mutex_lock(&pThis->mut);
while(pThis->iCnt == 0) {
if(pThis->bStopWriter) {
pthread_cond_broadcast(&pThis->isEmpty);
@@ -1053,16 +1056,15 @@ asyncWriterThread(void *pPtr)
/* if we timed out, we need to flush pending data */
strmFlushInternal(pThis, 0);
bTimedOut = 0;
- continue; /* now we should have data */
+ d_pthread_mutex_unlock(&pThis->mut);
+ continue;
}
bTimedOut = 0;
timeoutComp(&t, pThis->iFlushInterval * 1000); /* *1000 millisconds */
if(pThis->bDoTimedWait) {
if((err = pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t)) != 0) {
- if(err == ETIMEDOUT) {
- bTimedOut = 1;
- } else {
- bTimedOut = 1;
+ bTimedOut = 1; /* simulate in any case */
+ if(err != ETIMEDOUT) {
char errStr[1024];
rs_strerror_r(err, errStr, sizeof(errStr));
DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n",
@@ -1077,8 +1079,12 @@ asyncWriterThread(void *pPtr)
bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
+
+ /* now we can do the actual write in parallel */
+ d_pthread_mutex_unlock(&pThis->mut);
doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf, 0); // TODO: flush state
// TODO: error check????? 2009-07-06
+ d_pthread_mutex_lock(&pThis->mut);
--pThis->iCnt;
if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) {
@@ -1086,8 +1092,8 @@ asyncWriterThread(void *pPtr)
if(pThis->iCnt == 0)
pthread_cond_broadcast(&pThis->isEmpty);
}
- d_pthread_mutex_unlock(&pThis->mut);
}
+ d_pthread_mutex_unlock(&pThis->mut);
finalize_it:
ENDfunc
@@ -1487,12 +1493,12 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
ASSERT(pBuf != NULL);
/* DEV DEBUG ONLY DBGPRINTF("strmWrite(%p[%s], '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pThis->pszCurrFName, pBuf,(long) lenBuf, pThis->bDisabled, (long) pThis->iSizeLimit, (long long) pThis->iCurrOffs); */
- if(pThis->bAsyncWrite)
- d_pthread_mutex_lock(&pThis->mut);
-
if(pThis->bDisabled)
ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_lock(&pThis->mut);
+
iOffset = 0;
do {
if(pThis->iBufPtr == pThis->sIOBufSize) {