diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-09 17:25:07 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-09 17:25:07 +0000 |
commit | aa7e00d8e1a1d67fa2860623ffab75bd387faffc (patch) | |
tree | bf8d5fbf99050a010c1918d8a3a1168cdb43fe84 /queue.c | |
parent | fd8c6452c8a4d51d39eb511046fca09391138a22 (diff) | |
download | rsyslog-aa7e00d8e1a1d67fa2860623ffab75bd387faffc.tar.gz rsyslog-aa7e00d8e1a1d67fa2860623ffab75bd387faffc.tar.xz rsyslog-aa7e00d8e1a1d67fa2860623ffab75bd387faffc.zip |
changed queue class to use stream class
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 247 |
1 files changed, 18 insertions, 229 deletions
@@ -189,208 +189,20 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) /* -------------------- disk -------------------- */ -/* first, disk-queue internal utility functions */ - - -/* open a queue file */ -static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int flags, mode_t mode) -{ - DEFiRet; - - assert(pThis != NULL); - assert(pFile != NULL); - - if(pThis->pszFilePrefix == NULL) - ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); - - /* now open the write file */ - CHKiRet(genFileName(&pFile->pszFileName, pThis->pszSpoolDir, pThis->lenSpoolDir, - pThis->pszFilePrefix, pThis->lenFilePrefix, pFile->iCurrFileNum, (uchar*) "qf", 2)); - - pFile->fd = open((char*)pFile->pszFileName, flags, mode); // TODO: open modes! - pFile->iCurrOffs = 0; - - dbgprintf("Queue 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pFile->pszFileName, flags, pFile->fd); - -finalize_it: - return iRet; -} - - -/* close a queue file - * Note that the bDeleteOnClose flag is honored. If it is set, the file will be - * deleted after close. This is in support for the qRead thread. - */ -static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile) -{ - DEFiRet; - - assert(pThis != NULL); - assert(pFile != NULL); - dbgprintf("Queue 0x%lx: closing file %d\n", (unsigned long) pThis, pFile->fd); - - close(pFile->fd); // TODO: error check - pFile->fd = -1; - - if(pFile->bDeleteOnClose) { - unlink((char*) pThis->tVars.disk.fRead.pszFileName); // TODO: check returncode - } - - if(pFile->pszFileName != NULL) { - free(pFile->pszFileName); /* no longer needed in any case (just for open) */ - pFile->pszFileName = NULL; - } - - return iRet; -} - - -/* switch to next queue file */ -static rsRetVal qDiskNextFile(queue_t *pThis, queueFileDescription_t *pFile) -{ - DEFiRet; - - assert(pThis != NULL); - assert(pFile != NULL); - CHKiRet(qDiskCloseFile(pThis, pFile)); - - /* we do modulo 1,000,000 so that the file number is always at most 6 digits. If we have a million - * or more queue files, something is awfully wrong and it is OK if we run into problems in that - * situation ;) -- rgerhards, 2008-01-09 - */ - pFile->iCurrFileNum = (pFile->iCurrFileNum + 1) % 1000000; - -finalize_it: - return iRet; -} - - -/*** buffered read functions for queue files ***/ - -/* logically "read" a character from a file. What actually happens is that - * data is taken from the buffer. Only if the buffer is full, data is read - * directly from file. In that case, a read is performed blockwise. - * rgerhards, 2008-01-07 - * NOTE: needs to be enhanced to support sticking with a queue entry (if not - * deleted). - */ -static rsRetVal qDiskReadChar(queueFileDescription_t *pFile, uchar *pC) -{ - DEFiRet; - - assert(pFile != NULL); - assert(pC != NULL); - -//dbgprintf("qDiskRead index %d, max %d\n", pFile->iBufPtr, pFile->iBufPtrMax); - if(pFile->pIOBuf == NULL) { /* TODO: maybe we should move that to file open... */ - if((pFile->pIOBuf = (uchar*) malloc(sizeof(uchar) * qFILE_IOBUF_SIZE )) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - pFile->iBufPtrMax = 0; /* results in immediate read request */ - } - - if(pFile->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */ - *pC = pFile->iUngetC; - pFile->iUngetC = -1; - ABORT_FINALIZE(RS_RET_OK); - } - - /* do we need to obtain a new buffer */ - if(pFile->iBufPtr >= pFile->iBufPtrMax) { - /* read */ - pFile->iBufPtrMax = read(pFile->fd, pFile->pIOBuf, qFILE_IOBUF_SIZE); - dbgprintf("qDiskReadChar read %d bytes from file %d\n", pFile->iBufPtrMax, pFile->fd); - if(pFile->iBufPtrMax == 0) - ABORT_FINALIZE(RS_RET_EOF); - else if(pFile->iBufPtrMax < 0) - ABORT_FINALIZE(RS_RET_IO_ERROR); - /* if we reach this point, we had a good read */ - pFile->iBufPtr = 0; - } - - *pC = pFile->pIOBuf[pFile->iBufPtr++]; - -finalize_it: - return iRet; -} - - -/* unget a single character just like ungetc(). As with that call, there is only a single - * character buffering capability. - * rgerhards, 2008-01-07 - */ -static rsRetVal qDiskUnreadChar(queueFileDescription_t *pFile, uchar c) -{ - assert(pFile != NULL); - assert(pFile->iUngetC == -1); - pFile->iUngetC = c; - - return RS_RET_OK; -} - -#if 0 -/* we have commented out the code below because we would like to preserve it. It - * is currently not needed, but may be useful if we implemented a bufferred file - * class. - * rgerhards, 2008-01-07 - */ -/* read a line from a queue file. A line is terminated by LF. The LF is read, but it - * is not returned in the buffer (it is discared). The caller is responsible for - * destruction of the returned CStr object! - * rgerhards, 2008-01-07 - */ -static rsRetVal qDiskReadLine(queueFileDescription_t *pFile, rsCStrObj **ppCStr) -{ - DEFiRet; - uchar c; - rsCStrObj *pCStr = NULL; - - assert(pFile != NULL); - assert(ppCStr != NULL); - - if((pCStr = rsCStrConstruct()) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - - /* now read the line */ - CHKiRet(qDiskReadChar(pFile, &c)); - while(c != '\n') { - CHKiRet(rsCStrAppendChar(pCStr, c)); - CHKiRet(qDiskReadChar(pFile, &c)); - } - CHKiRet(rsCStrFinish(pCStr)); - *ppCStr = pCStr; - -finalize_it: - if(iRet != RS_RET_OK && pCStr != NULL) - rsCStrDestruct(pCStr); - - return iRet; -} - -#endif /* #if 0 - saved code */ - -/*** end buffered read functions for queue files ***/ - - -/* now come the disk mode queue driver functions */ - static rsRetVal qConstructDisk(queue_t *pThis) { DEFiRet; assert(pThis != NULL); - pThis->tVars.disk.fWrite.iCurrFileNum = 1; - pThis->tVars.disk.fWrite.iCurrOffs = 0; - pThis->tVars.disk.fWrite.fd = -1; - pThis->tVars.disk.fWrite.iUngetC = -1; - pThis->tVars.disk.fRead.bDeleteOnClose = 0; /* do *NOT* set this to 1! */ + CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite)); - pThis->tVars.disk.fRead.iCurrFileNum = 1; - pThis->tVars.disk.fRead.fd = -1; - pThis->tVars.disk.fRead.iCurrOffs = 0; - pThis->tVars.disk.fRead.iUngetC = -1; - pThis->tVars.disk.fRead.bDeleteOnClose = 1; + CHKiRet(strmConstruct(&pThis->tVars.disk.pRead)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead)); +finalize_it: return iRet; } @@ -401,10 +213,8 @@ static rsRetVal qDestructDisk(queue_t *pThis) assert(pThis != NULL); - if(pThis->tVars.disk.fWrite.fd != -1) - qDiskCloseFile(pThis, &pThis->tVars.disk.fWrite); - if(pThis->tVars.disk.fRead.fd != -1) - qDiskCloseFile(pThis, &pThis->tVars.disk.fRead); + strmDestruct(pThis->tVars.disk.pWrite); + strmDestruct(pThis->tVars.disk.pRead); if(pThis->pszSpoolDir != NULL) free(pThis->pszSpoolDir); @@ -415,23 +225,14 @@ static rsRetVal qDestructDisk(queue_t *pThis) static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) { DEFiRet; - int iWritten; rsCStrObj *pCStr; assert(pThis != NULL); - if(pThis->tVars.disk.fWrite.fd == -1) - CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes! + CHKiRet(strmOpenFile(pThis->tVars.disk.pWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes! CHKiRet((objSerialize(pUsr))(pUsr, &pCStr)); - iWritten = write(pThis->tVars.disk.fWrite.fd, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr)); - dbgprintf("Queue 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis, - iWritten, errno, strerror(errno)); - /* TODO: handle error case -- rgerhards, 2008-01-07 */ - - pThis->tVars.disk.fWrite.iCurrOffs += iWritten; - if(pThis->tVars.disk.fWrite.iCurrOffs >= pThis->iMaxFileSize) - CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fWrite)); + CHKiRet(strmWrite(pThis->tVars.disk.pWrite, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr))); finalize_it: return iRet; @@ -450,21 +251,20 @@ static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) * We need to try at least twice because we may run into EOF and need * to switch files. */ - serialStore.pUsr = &pThis->tVars.disk.fRead; - serialStore.funcGetChar = (rsRetVal (*)(void*, uchar*)) qDiskReadChar; - serialStore.funcUngetChar = (rsRetVal (*)(void*, uchar)) qDiskUnreadChar; + serialStore.pUsr = pThis->tVars.disk.pRead; + serialStore.funcGetChar = (rsRetVal (*)(void*, uchar*)) strmReadChar; + serialStore.funcUngetChar = (rsRetVal (*)(void*, uchar)) strmUnreadChar; bRun = 1; while(bRun) { /* first check if we need to (re)open the file (we may have switched to a new one!) */ - if(pThis->tVars.disk.fRead.fd == -1) - CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fRead, O_RDONLY, 0600)); // TODO: open modes! + CHKiRet(strmOpenFile(pThis->tVars.disk.pRead, O_RDONLY, 0600)); // TODO: open modes! iRet = objDeserialize((void*) &pMsg, OBJMsg, &serialStore); if(iRet == RS_RET_OK) bRun = 0; /* we are done */ else if(iRet == RS_RET_EOF) { - dbgprintf("Queue 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->tVars.disk.fRead.fd); - CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fRead)); + dbgprintf("Queue 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->tVars.disk.pRead->fd); + CHKiRet(strmNextFile(pThis->tVars.disk.pRead)); } else FINALIZE; } @@ -781,19 +581,8 @@ rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) { DEFiRet; - - assert(pThis != NULL); - assert(pszPrefix != NULL); - - if(iLenPrefix < 1) - ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); - - if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - - memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); /* always think about the \0! */ - pThis->lenFilePrefix = iLenPrefix; - + CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix)); + CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix)); finalize_it: return iRet; } |