summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c37
1 files changed, 27 insertions, 10 deletions
diff --git a/queue.c b/queue.c
index 1c371cd9..7bd64d41 100644
--- a/queue.c
+++ b/queue.c
@@ -1,3 +1,4 @@
+#include <stdio.h>
// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in
// call consumer state. Facilitates retaining messages in queue until action could
// be called!
@@ -127,12 +128,10 @@ static rsRetVal qConstructLinkedList(queue_t *pThis)
}
-static rsRetVal qDestructLinkedList(queue_t *pThis)
+static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis)
{
DEFiRet;
- assert(pThis != NULL);
-
/* with the linked list type, there is nothing to do here. The
* reason is that the Destructor is only called after all entries
* have bene taken off the queue. In this case, there is nothing
@@ -211,12 +210,14 @@ static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int
dbgprintf("Queue 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pFile->pszFileName, flags, pFile->fd);
finalize_it:
-dbgprintf("qDiskOpen iRet %d\n", iRet);
return iRet;
}
-/* close a queue file */
+/* close a queue file
+ * Note that the bDeleteOnClose flag is honored. If it is set, the file will be
+ * deleted after close. This is in support for the qRead thread.
+ */
static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile)
{
DEFiRet;
@@ -228,6 +229,10 @@ static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile)
close(pFile->fd); // TODO: error check
pFile->fd = -1;
+ if(pFile->bDeleteOnClose) {
+ unlink((char*) pThis->tVars.disk.fRead.pszFileName); // TODO: check returncode
+ }
+
if(pFile->pszFileName != NULL) {
free(pFile->pszFileName); /* no longer needed in any case (just for open) */
pFile->pszFileName = NULL;
@@ -375,17 +380,19 @@ static rsRetVal qConstructDisk(queue_t *pThis)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir);
- pThis->tVars.disk.iMaxFileSize = 1024 * 3; // TODO: configurable!
+ pThis->tVars.disk.iMaxFileSize = 10240000; //1024 * 3; // TODO: configurable!
pThis->tVars.disk.fWrite.iCurrFileNum = 1;
pThis->tVars.disk.fWrite.iCurrOffs = 0;
pThis->tVars.disk.fWrite.fd = -1;
pThis->tVars.disk.fWrite.iUngetC = -1;
+ pThis->tVars.disk.fRead.bDeleteOnClose = 0; /* do *NOT* set this to 1! */
pThis->tVars.disk.fRead.iCurrFileNum = 1;
pThis->tVars.disk.fRead.fd = -1;
pThis->tVars.disk.fRead.iCurrOffs = 0;
pThis->tVars.disk.fRead.iUngetC = -1;
+ pThis->tVars.disk.fRead.bDeleteOnClose = 1;
finalize_it:
return iRet;
@@ -418,7 +425,7 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
assert(pThis != NULL);
if(pThis->tVars.disk.fWrite.fd == -1)
- CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fWrite, O_RDWR|O_CREAT, 0600)); // TODO: open modes!
+ CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes!
CHKiRet((objSerialize(pUsr))(pUsr, &pCStr));
iWritten = write(pThis->tVars.disk.fWrite.fd, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr));
@@ -564,7 +571,7 @@ queueDel(queue_t *pThis, void *pUsr)
* Please NOTE:
* Having more than one worker requires considerable
* additional code review in regard to thread-safety.
- */
+*/
static void *
queueWorker(void *arg)
{
@@ -774,16 +781,24 @@ rsRetVal
queueEnqObj(queue_t *pThis, void *pUsr)
{
DEFiRet;
+ int iCancelStateSave;
int i;
struct timespec t;
assert(pThis != NULL);
- if(pThis->qType != QUEUETYPE_DIRECT)
+ /* Please note that this function is not cancel-safe and consequently
+ * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
+ * during its execution. If that is not done, race conditions occur if the
+ * thread is canceled (most important use case is input module termination).
+ * rgerhards, 2008-01-08
+ */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ if(pThis->pWorkerThreads != NULL)
pthread_mutex_lock(pThis->mut);
while(pThis->iQueueSize >= pThis->iMaxQueueSize) {
- dbgprintf("enqueueMsg: queue 0x%lx FULL.\n", (unsigned long) pThis);
+ dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", (unsigned long) pThis);
clock_gettime (CLOCK_REALTIME, &t);
t.tv_sec += 2; /* TODO: configurable! */
@@ -805,6 +820,8 @@ finalize_it:
dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
}
+ pthread_setcancelstate(iCancelStateSave, NULL);
+
return iRet;
}
/*