diff options
-rw-r--r-- | ChangeLog | 5 | ||||
-rw-r--r-- | obj.c | 8 | ||||
-rw-r--r-- | queue.c | 62 | ||||
-rwxr-xr-x | srUtils.c | 2 | ||||
-rwxr-xr-x | srUtils.h | 2 | ||||
-rw-r--r-- | stream.c | 30 | ||||
-rw-r--r-- | stream.h | 2 |
7 files changed, 83 insertions, 28 deletions
@@ -22,6 +22,11 @@ Version 3.11.6 (rgerhards), 2008-02-?? regular worker was still executing. This could result in a segfault during shutdown. tracker: http://bugzilla.adiscon.com/show_bug.cgi?id=41 +- bugfix: queue properties sizeOnDisk, bytesRead were persisted to + disk with wrong data type (long instead of int64) - could cause + problems on 32 bit machines +- bugfix: queue aborted when it was shut down, DA-enabled, DA mode + was just initiated but not fully initialized (a race condition) - bugfix: imfile could abort under extreme stress conditions (when it was terminated before it could open all of its to be monitored files) @@ -262,6 +262,12 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propType_t propType lenBuf = strlen((char*) szBuf); vType = VARTYPE_NUMBER; break; + case PROPTYPE_INT64: + CHKiRet(srUtilItoA((char*) szBuf, sizeof(szBuf), *((int64*) pUsr))); + pszBuf = szBuf; + lenBuf = strlen((char*) szBuf); + vType = VARTYPE_NUMBER; + break; case PROPTYPE_CSTR: pszBuf = rsCStrGetSzStrNoNULL((cstr_t *) pUsr); lenBuf = rsCStrLen((cstr_t*) pUsr); @@ -287,7 +293,7 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propType_t propType pszBuf = szBuf; break; default: - dbgprintf("invalid VARTYPE %d\n", propType); + dbgprintf("invalid PROPTYPE %d\n", propType); break; } @@ -119,6 +119,26 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) } +/* wait until we have a fully initialized DA queue. Sometimes, we need to + * sync with it, as we expect it for some function. + * rgerhards, 2008-02-27 + */ +static rsRetVal +queueWaitDAModeInitialized(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + ASSERT(pThis->bRunsDA); + + while(pThis->bRunsDA != 2) { + d_pthread_cond_wait(&pThis->condDAReady, pThis->mut); + } + + RETiRet; +} + + /* Destruct DA queue. This is the last part of DA-to-normal-mode * transistion. This is called asynchronously and some time quite a * while after the actual transistion. The key point is that we need to @@ -140,9 +160,8 @@ queueTurnOffDAMode(queue_t *pThis) /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need * to wait for its startup... -- rgerhards, 2008-01-25 */ - while(pThis->bRunsDA != 2) { - d_pthread_cond_wait(&pThis->condDAReady, pThis->mut); - } + queueWaitDAModeInitialized(pThis); + /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to * keep that comment if future need arises. @@ -278,7 +297,7 @@ queueStartDA(queue_t *pThis) pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */ pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */ - pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ + pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n", queueGetID(pThis->pqDA)); @@ -816,28 +835,19 @@ static rsRetVal qDestructDisk(queue_t *pThis) static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) { DEFiRet; - int64 offsIn; - int64 offsOut; + number_t nWriteCount; ASSERT(pThis != NULL); - CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsIn)); + CHKiRet(strmSetWCntr(pThis->tVars.disk.pWrite, &nWriteCount)); CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite)); CHKiRet(strmFlush(pThis->tVars.disk.pWrite)); - CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsOut)); + CHKiRet(strmSetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */ - if(offsIn < offsOut) { - offsIn = offsOut - offsIn; - } else { - /* we had a file switch, so the second offset is the actual number of bytes - * written. So... - */ - offsIn = offsOut; - } + pThis->tVars.disk.sizeOnDisk += nWriteCount; - pThis->tVars.disk.sizeOnDisk += offsIn; - - dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n", offsIn, pThis->tVars.disk.sizeOnDisk); + dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n", + nWriteCount, pThis->tVars.disk.sizeOnDisk); finalize_it: RETiRet; @@ -1109,6 +1119,10 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) */ wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */ + /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ + if(pThis->bRunsDA) + queueWaitDAModeInitialized(pThis); + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { @@ -1116,6 +1130,9 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) if(pThis->bRunsDA == 0) { queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ } else { + /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1) + * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27 + */ queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */ } END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -1302,7 +1319,6 @@ queueConsumerCancelCleanup(void *arg1, void *arg2) ISOBJ_TYPE_assert(pThis, queue); -RUNLOG_VAR("%p", pUsr); if(pUsr != NULL) { /* make sure the data element is not lost */ dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n"); @@ -1746,8 +1762,8 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis)); objSerializeSCALAR(psQIF, iQueueSize, INT); objSerializeSCALAR(psQIF, iUngottenObjs, INT); - objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, LONG); - objSerializeSCALAR(psQIF, tVars.disk.bytesRead, LONG); + objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64); + objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64); CHKiRet(objEndSerialize(psQIF)); /* now we must persist all objects on the ungotten queue - they can not go to @@ -1837,9 +1853,7 @@ CODESTARTobjDestruct(queue) * no WtpDA associated with it - which is perfectly legal thanks to this code here. */ if(pThis->pWtpDA != NULL) { -RUNLOG_STR("wtpDA is being destructed\n"); wtpDestruct(&pThis->pWtpDA); -RUNLOG_STR("wtpDA is being destructed - done\n"); } if(pThis->pqDA != NULL) { queueDestruct(&pThis->pqDA); @@ -107,7 +107,7 @@ syslogName_t syslogFacNames[] = { * public members * * ################################################################# */ -rsRetVal srUtilItoA(char *pBuf, int iLenBuf, long iToConv) +rsRetVal srUtilItoA(char *pBuf, int iLenBuf, number_t iToConv) { int i; int bIsNegative; @@ -59,7 +59,7 @@ extern syslogName_t syslogFacNames[]; * * \param iToConv The integer to be converted. */ -rsRetVal srUtilItoA(char *pBuf, int iLenBuf, long iToConv); +rsRetVal srUtilItoA(char *pBuf, int iLenBuf, number_t iToConv); /** * A method to duplicate a string for which the length is known. @@ -464,7 +464,7 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(strmOpenFile(pThis)); iWritten = write(pThis->fd, pBuf, lenBuf); - dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes, errno: %d\n", pThis->fd, iWritten, errno); + dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, iWritten); /* TODO: handle error case -- rgerhards, 2008-01-07 */ /* Now indicate buffer empty again. We do this in any case, because there @@ -477,6 +477,9 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) */ pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; + /* update user counter, if provided */ + if(pThis->pUsrWCntr != NULL) + *pThis->pUsrWCntr += iWritten; if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) CHKiRet(strmCheckNextOutputFile(pThis)); @@ -797,6 +800,31 @@ finalize_it: } + +/* set a user write-counter. This counter is initialized to zero and + * receives the number of bytes written. It is accurate only after a + * flush(). This hook is provided as a means to control disk size usage. + * The pointer must be valid at all times (so if it is on the stack, be sure + * to remove it when you exit the function). Pointers are removed by + * calling strmSetWCntr() with a NULL param. Only one pointer is settable, + * any new set overwrites the previous one. + * rgerhards, 2008-02-27 + */ +rsRetVal +strmSetWCntr(strm_t *pThis, number_t *pWCnt) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + + if(pWCnt != NULL) + *pWCnt = 0; + pThis->pUsrWCntr = pWCnt; + + RETiRet; +} + + #include "stringbuf.h" /* This function can be used as a generic way to set properties. @@ -76,6 +76,7 @@ typedef struct strm_s { int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */ int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */ int64 iCurrOffs;/* current offset */ + int64 *pUsrWCntr; /* NULL or a user-provided counter that receives the nbr of bytes written since the last CntrSet() */ /* dynamic properties, valid only during file open, not to be persistet */ size_t sIOBufSize;/* size of IO buffer */ uchar *pszDir; /* Directory */ @@ -110,6 +111,7 @@ rsRetVal strmRecordEnd(strm_t *pThis); rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm); rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal); rsRetVal strmGetCurrOffset(strm_t *pThis, int64 *pOffs); +rsRetVal strmSetWCntr(strm_t *pThis, number_t *pWCnt); PROTOTYPEObjClassInit(strm); PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int); PROTOTYPEpropSetMeth(strm, iMaxFileSize, int); |