diff options
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 124 |
1 files changed, 104 insertions, 20 deletions
@@ -54,7 +54,7 @@ */ /* -------------------- fixed array -------------------- */ -rsRetVal qConstructFixedArray(queue_t *pThis) +static rsRetVal qConstructFixedArray(queue_t *pThis) { DEFiRet; @@ -72,7 +72,7 @@ finalize_it: } -rsRetVal qDestructFixedArray(queue_t *pThis) +static rsRetVal qDestructFixedArray(queue_t *pThis) { DEFiRet; @@ -84,7 +84,7 @@ rsRetVal qDestructFixedArray(queue_t *pThis) return iRet; } -rsRetVal qAddFixedArray(queue_t *pThis, void* in) +static rsRetVal qAddFixedArray(queue_t *pThis, void* in) { DEFiRet; @@ -97,7 +97,7 @@ rsRetVal qAddFixedArray(queue_t *pThis, void* in) return iRet; } -rsRetVal qDelFixedArray(queue_t *pThis, void **out) +static rsRetVal qDelFixedArray(queue_t *pThis, void **out) { DEFiRet; @@ -113,7 +113,7 @@ rsRetVal qDelFixedArray(queue_t *pThis, void **out) /* -------------------- linked list -------------------- */ -rsRetVal qConstructLinkedList(queue_t *pThis) +static rsRetVal qConstructLinkedList(queue_t *pThis) { DEFiRet; @@ -126,7 +126,7 @@ rsRetVal qConstructLinkedList(queue_t *pThis) } -rsRetVal qDestructLinkedList(queue_t *pThis) +static rsRetVal qDestructLinkedList(queue_t *pThis) { DEFiRet; @@ -141,7 +141,7 @@ rsRetVal qDestructLinkedList(queue_t *pThis) return iRet; } -rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) +static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) { DEFiRet; qLinkedList_t *pEntry; @@ -165,7 +165,7 @@ finalize_it: return iRet; } -rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) +static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) { DEFiRet; qLinkedList_t *pEntry; @@ -194,7 +194,7 @@ rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) /* open a queue file */ -rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int flags, mode_t mode) +static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int flags, mode_t mode) { DEFiRet; uchar *pszFile = NULL; @@ -219,7 +219,7 @@ finalize_it: /* close a queue file */ -rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile) +static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile) { DEFiRet; @@ -235,7 +235,7 @@ rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile) /* switch to next queue file */ -rsRetVal qDiskNextFile(queue_t *pThis, queueFileDescription_t *pFile) +static rsRetVal qDiskNextFile(queue_t *pThis, queueFileDescription_t *pFile) { DEFiRet; @@ -254,9 +254,88 @@ finalize_it: } +/*** buffered read functions for queue files ***/ + +/* logically "read" a character from a file. What actually happens is that + * data is taken from the buffer. Only if the buffer is full, data is read + * directly from file. In that case, a read is performed blockwise. + * rgerhards, 2008-01-07 + * NOTE: needs to be enhanced to support sticking with a queue entry (if not + * deleted). + */ +static rsRetVal qDiskReadChar(queueFileDescription_t *pFile, uchar *pC) +{ + DEFiRet; + uchar c; + + assert(pFile != NULL); + assert(pC != NULL); + + if(pFile->pIOBuf == NULL) { /* TODO: maybe we should move that to file open... */ + if((pFile->pIOBuf = (uchar*) malloc(sizeof(uchar) * qFILE_IOBUF_SIZE )) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + pFile->iBufPtrMax = 0; /* results in immediate read request */ + } + + /* do we need to obtain a new buffer */ + if(pFile->iBufPtr >= pFile->iBufPtrMax) { + /* read */ + pFile->iBufPtrMax = read(pFile->fd, pFile->pIOBuf, qFILE_IOBUF_SIZE); + dbgprintf("Read %d bytes from file %d\n", pFile->iBufPtrMax, pFile->fd); + if(pFile->iBufPtrMax == 0) + ABORT_FINALIZE(RS_RET_EOF); + else if(pFile->iBufPtrMax < 0) + ABORT_FINALIZE(RS_RET_IO_ERROR); + /* if we reach this point, we had a good read */ + pFile->iBufPtr = 0; + } + + *pC = pFile->pIOBuf[pFile->iBufPtr++]; + +finalize_it: + return iRet; +} + +/* read a line from a queue file. A line is terminated by LF. The LF is read, but it + * is not returned in the buffer (it is discared). The caller is responsible for + * destruction of the returned CStr object! + * rgerhards, 2008-01-07 + */ +static rsRetVal qDiskReadLine(queueFileDescription_t *pFile, rsCStrObj **ppCStr) +{ + DEFiRet; + uchar c; + rsCStrObj *pCStr = NULL; + + assert(pFile != NULL); + assert(ppCStr != NULL); + + if((pCStr = rsCStrConstruct()) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + + /* now read the line */ + CHKiRet(qDiskReadChar(pFile, &c)); + while(c != '\n') { + CHKiRet(rsCStrAppendChar(pCStr, c)); + CHKiRet(qDiskReadChar(pFile, &c)); + } + CHKiRet(rsCStrFinish(pCStr)); + *ppCStr = pCStr; + +finalize_it: + if(iRet != RS_RET_OK && pCStr != NULL) + rsCStrDestruct(pCStr); + + return iRet; +} + + +/*** end buffered read functions for queue files ***/ + + /* now come the disk mode queue driver functions */ -rsRetVal qConstructDisk(queue_t *pThis) +static rsRetVal qConstructDisk(queue_t *pThis) { DEFiRet; @@ -266,7 +345,7 @@ rsRetVal qConstructDisk(queue_t *pThis) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir); - pThis->tVars.disk.iMaxFileSize = 1024; // TODO: configurable! + pThis->tVars.disk.iMaxFileSize = 1024 * 3; // TODO: configurable! pThis->tVars.disk.fWrite.iCurrFileNum = 1; pThis->tVars.disk.fWrite.iCurrOffs = 0; @@ -281,7 +360,7 @@ finalize_it: } -rsRetVal qDestructDisk(queue_t *pThis) +static rsRetVal qDestructDisk(queue_t *pThis) { DEFiRet; @@ -298,7 +377,7 @@ rsRetVal qDestructDisk(queue_t *pThis) return iRet; } -rsRetVal qAddDisk(queue_t *pThis, void* pUsr) +static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) { DEFiRet; int iWritten; @@ -323,7 +402,7 @@ finalize_it: return iRet; } -rsRetVal qDelDisk(queue_t __attribute__((unused)) *pThis, void __attribute__((unused)) **ppUsr) +static rsRetVal qDelDisk(queue_t __attribute__((unused)) *pThis, void __attribute__((unused)) **ppUsr) { DEFiRet; @@ -333,6 +412,10 @@ rsRetVal qDelDisk(queue_t __attribute__((unused)) *pThis, void __attribute__((un CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fRead, O_RDONLY, 0600)); // TODO: open modes! /* read here */ + rsCStrObj *pCStr = NULL; + CHKiRet(qDiskReadLine(&pThis->tVars.disk.fRead, &pCStr)); + dbgprintf("qDelDisk read line '%s'\n", rsCStrGetSzStr(pCStr)); + rsCStrDestruct(pCStr); /* de-serialize here */ /* switch to next file when EOF is reached. We may also delete the last file in that case. @@ -340,6 +423,7 @@ rsRetVal qDelDisk(queue_t __attribute__((unused)) *pThis, void __attribute__((un if(pThis->tVars.disk.fWrite.iCurrOffs >= pThis->tVars.disk.iMaxFileSize) CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fWrite)); */ + iRet = RS_RET_ERR; finalize_it: @@ -347,18 +431,18 @@ finalize_it: } /* -------------------- direct (no queueing) -------------------- */ -rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis) +static rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis) { return RS_RET_OK; } -rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis) +static rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis) { return RS_RET_OK; } -rsRetVal qAddDirect(queue_t *pThis, void* pUsr) +static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) { DEFiRet; rsRetVal iRetLocal; @@ -378,7 +462,7 @@ rsRetVal qAddDirect(queue_t *pThis, void* pUsr) return iRet; } -rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) +static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) { return RS_RET_OK; } |