summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog5
-rw-r--r--obj.c8
-rw-r--r--queue.c62
-rwxr-xr-xsrUtils.c2
-rwxr-xr-xsrUtils.h2
-rw-r--r--stream.c30
-rw-r--r--stream.h2
7 files changed, 83 insertions, 28 deletions
diff --git a/ChangeLog b/ChangeLog
index 2508b4d4..3044d3e9 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -22,6 +22,11 @@ Version 3.11.6 (rgerhards), 2008-02-??
regular worker was still executing. This could result in a segfault
during shutdown.
tracker: http://bugzilla.adiscon.com/show_bug.cgi?id=41
+- bugfix: queue properties sizeOnDisk, bytesRead were persisted to
+ disk with wrong data type (long instead of int64) - could cause
+ problems on 32 bit machines
+- bugfix: queue aborted when it was shut down, DA-enabled, DA mode
+ was just initiated but not fully initialized (a race condition)
- bugfix: imfile could abort under extreme stress conditions
(when it was terminated before it could open all of its
to be monitored files)
diff --git a/obj.c b/obj.c
index 29fc65ec..c18bdd4d 100644
--- a/obj.c
+++ b/obj.c
@@ -262,6 +262,12 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propType_t propType
lenBuf = strlen((char*) szBuf);
vType = VARTYPE_NUMBER;
break;
+ case PROPTYPE_INT64:
+ CHKiRet(srUtilItoA((char*) szBuf, sizeof(szBuf), *((int64*) pUsr)));
+ pszBuf = szBuf;
+ lenBuf = strlen((char*) szBuf);
+ vType = VARTYPE_NUMBER;
+ break;
case PROPTYPE_CSTR:
pszBuf = rsCStrGetSzStrNoNULL((cstr_t *) pUsr);
lenBuf = rsCStrLen((cstr_t*) pUsr);
@@ -287,7 +293,7 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propType_t propType
pszBuf = szBuf;
break;
default:
- dbgprintf("invalid VARTYPE %d\n", propType);
+ dbgprintf("invalid PROPTYPE %d\n", propType);
break;
}
diff --git a/queue.c b/queue.c
index 25261751..cd10a5df 100644
--- a/queue.c
+++ b/queue.c
@@ -119,6 +119,26 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
}
+/* wait until we have a fully initialized DA queue. Sometimes, we need to
+ * sync with it, as we expect it for some function.
+ * rgerhards, 2008-02-27
+ */
+static rsRetVal
+queueWaitDAModeInitialized(queue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ ASSERT(pThis->bRunsDA);
+
+ while(pThis->bRunsDA != 2) {
+ d_pthread_cond_wait(&pThis->condDAReady, pThis->mut);
+ }
+
+ RETiRet;
+}
+
+
/* Destruct DA queue. This is the last part of DA-to-normal-mode
* transistion. This is called asynchronously and some time quite a
* while after the actual transistion. The key point is that we need to
@@ -140,9 +160,8 @@ queueTurnOffDAMode(queue_t *pThis)
/* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
* to wait for its startup... -- rgerhards, 2008-01-25
*/
- while(pThis->bRunsDA != 2) {
- d_pthread_cond_wait(&pThis->condDAReady, pThis->mut);
- }
+ queueWaitDAModeInitialized(pThis);
+
/* if we need to pull any data that we still need from the (child) disk queue,
* now would be the time to do so. At present, we do not need this, but I'd like to
* keep that comment if future need arises.
@@ -278,7 +297,7 @@ queueStartDA(queue_t *pThis)
pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */
pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */
- pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
+ pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
queueGetID(pThis->pqDA));
@@ -816,28 +835,19 @@ static rsRetVal qDestructDisk(queue_t *pThis)
static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
{
DEFiRet;
- int64 offsIn;
- int64 offsOut;
+ number_t nWriteCount;
ASSERT(pThis != NULL);
- CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsIn));
+ CHKiRet(strmSetWCntr(pThis->tVars.disk.pWrite, &nWriteCount));
CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite));
CHKiRet(strmFlush(pThis->tVars.disk.pWrite));
- CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsOut));
+ CHKiRet(strmSetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */
- if(offsIn < offsOut) {
- offsIn = offsOut - offsIn;
- } else {
- /* we had a file switch, so the second offset is the actual number of bytes
- * written. So...
- */
- offsIn = offsOut;
- }
+ pThis->tVars.disk.sizeOnDisk += nWriteCount;
- pThis->tVars.disk.sizeOnDisk += offsIn;
-
- dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n", offsIn, pThis->tVars.disk.sizeOnDisk);
+ dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n",
+ nWriteCount, pThis->tVars.disk.sizeOnDisk);
finalize_it:
RETiRet;
@@ -1109,6 +1119,10 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
*/
wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */
+ /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
+ if(pThis->bRunsDA)
+ queueWaitDAModeInitialized(pThis);
+
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
@@ -1116,6 +1130,9 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
if(pThis->bRunsDA == 0) {
queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
} else {
+ /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1)
+ * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27
+ */
queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
}
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -1302,7 +1319,6 @@ queueConsumerCancelCleanup(void *arg1, void *arg2)
ISOBJ_TYPE_assert(pThis, queue);
-RUNLOG_VAR("%p", pUsr);
if(pUsr != NULL) {
/* make sure the data element is not lost */
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
@@ -1746,8 +1762,8 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
objSerializeSCALAR(psQIF, iUngottenObjs, INT);
- objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, LONG);
- objSerializeSCALAR(psQIF, tVars.disk.bytesRead, LONG);
+ objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
+ objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64);
CHKiRet(objEndSerialize(psQIF));
/* now we must persist all objects on the ungotten queue - they can not go to
@@ -1837,9 +1853,7 @@ CODESTARTobjDestruct(queue)
* no WtpDA associated with it - which is perfectly legal thanks to this code here.
*/
if(pThis->pWtpDA != NULL) {
-RUNLOG_STR("wtpDA is being destructed\n");
wtpDestruct(&pThis->pWtpDA);
-RUNLOG_STR("wtpDA is being destructed - done\n");
}
if(pThis->pqDA != NULL) {
queueDestruct(&pThis->pqDA);
diff --git a/srUtils.c b/srUtils.c
index d20f1174..b871861b 100755
--- a/srUtils.c
+++ b/srUtils.c
@@ -107,7 +107,7 @@ syslogName_t syslogFacNames[] = {
* public members *
* ################################################################# */
-rsRetVal srUtilItoA(char *pBuf, int iLenBuf, long iToConv)
+rsRetVal srUtilItoA(char *pBuf, int iLenBuf, number_t iToConv)
{
int i;
int bIsNegative;
diff --git a/srUtils.h b/srUtils.h
index 6540b15a..1fe56665 100755
--- a/srUtils.h
+++ b/srUtils.h
@@ -59,7 +59,7 @@ extern syslogName_t syslogFacNames[];
*
* \param iToConv The integer to be converted.
*/
-rsRetVal srUtilItoA(char *pBuf, int iLenBuf, long iToConv);
+rsRetVal srUtilItoA(char *pBuf, int iLenBuf, number_t iToConv);
/**
* A method to duplicate a string for which the length is known.
diff --git a/stream.c b/stream.c
index 0f05c97c..9f2f06c6 100644
--- a/stream.c
+++ b/stream.c
@@ -464,7 +464,7 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
CHKiRet(strmOpenFile(pThis));
iWritten = write(pThis->fd, pBuf, lenBuf);
- dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes, errno: %d\n", pThis->fd, iWritten, errno);
+ dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, iWritten);
/* TODO: handle error case -- rgerhards, 2008-01-07 */
/* Now indicate buffer empty again. We do this in any case, because there
@@ -477,6 +477,9 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
*/
pThis->iBufPtr = 0;
pThis->iCurrOffs += iWritten;
+ /* update user counter, if provided */
+ if(pThis->pUsrWCntr != NULL)
+ *pThis->pUsrWCntr += iWritten;
if(pThis->sType == STREAMTYPE_FILE_CIRCULAR)
CHKiRet(strmCheckNextOutputFile(pThis));
@@ -797,6 +800,31 @@ finalize_it:
}
+
+/* set a user write-counter. This counter is initialized to zero and
+ * receives the number of bytes written. It is accurate only after a
+ * flush(). This hook is provided as a means to control disk size usage.
+ * The pointer must be valid at all times (so if it is on the stack, be sure
+ * to remove it when you exit the function). Pointers are removed by
+ * calling strmSetWCntr() with a NULL param. Only one pointer is settable,
+ * any new set overwrites the previous one.
+ * rgerhards, 2008-02-27
+ */
+rsRetVal
+strmSetWCntr(strm_t *pThis, number_t *pWCnt)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ if(pWCnt != NULL)
+ *pWCnt = 0;
+ pThis->pUsrWCntr = pWCnt;
+
+ RETiRet;
+}
+
+
#include "stringbuf.h"
/* This function can be used as a generic way to set properties.
diff --git a/stream.h b/stream.h
index dd8204e2..903bb284 100644
--- a/stream.h
+++ b/stream.h
@@ -76,6 +76,7 @@ typedef struct strm_s {
int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */
int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
int64 iCurrOffs;/* current offset */
+ int64 *pUsrWCntr; /* NULL or a user-provided counter that receives the nbr of bytes written since the last CntrSet() */
/* dynamic properties, valid only during file open, not to be persistet */
size_t sIOBufSize;/* size of IO buffer */
uchar *pszDir; /* Directory */
@@ -110,6 +111,7 @@ rsRetVal strmRecordEnd(strm_t *pThis);
rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm);
rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal);
rsRetVal strmGetCurrOffset(strm_t *pThis, int64 *pOffs);
+rsRetVal strmSetWCntr(strm_t *pThis, number_t *pWCnt);
PROTOTYPEObjClassInit(strm);
PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int);
PROTOTYPEpropSetMeth(strm, iMaxFileSize, int);