diff options
Diffstat (limited to 'plugins/omhdfs/omhdfs.c')
-rw-r--r-- | plugins/omhdfs/omhdfs.c | 137 |
1 files changed, 105 insertions, 32 deletions
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index 8b72747f..cd14d03c 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -51,6 +51,7 @@ MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omhdfs") /* internal structures */ @@ -60,12 +61,18 @@ DEFobjCurrIf(errmsg) /* global data */ static struct hashtable *files; /* holds all file objects that we know */ -/* globals for default values */ -static uchar *fileName = NULL; -static uchar *hdfsHost = NULL; -static uchar *dfltTplName = NULL; /* default template name to use */ -int hdfsPort = 0; -/* end globals for default values */ +typedef struct configSettings_s { + uchar *fileName; + uchar *hdfsHost; + uchar *dfltTplName; /* default template name to use */ + int hdfsPort; +} configSettings_t; + +SCOPING_SUPPORT; /* must be set AFTER configSettings_t is defined */ + +BEGINinitConfVars /* (re)set config variables to default values */ +CODESTARTinitConfVars +ENDinitConfVars typedef struct { uchar *name; @@ -80,6 +87,8 @@ typedef struct { typedef struct _instanceData { file_t *pFile; + uchar ioBuf[64*1024]; + unsigned offsBuf; } instanceData; /* forward definitions (down here, need data types) */ @@ -260,7 +269,8 @@ fileOpen(file_t *pFile) if(errno == ENOENT) { DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n", pFile->name); - pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0); + pFile->fh = hdfsOpenFile(pFile->fs, + (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0); } } if(pFile->fh == NULL) { @@ -275,12 +285,15 @@ finalize_it: } +/* Note: lenWrite is reset to zero on successful write! */ static inline rsRetVal -fileWrite(file_t *pFile, uchar *buf) +fileWrite(file_t *pFile, uchar *buf, size_t *lenWrite) { - size_t lenWrite; DEFiRet; + if(*lenWrite == 0) + FINALIZE; + if(pFile->nUsers > 1) d_pthread_mutex_lock(&pFile->mut); @@ -294,18 +307,18 @@ fileWrite(file_t *pFile, uchar *buf) } } - lenWrite = strlen((char*) buf); - tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite); - if((unsigned) num_written_bytes != lenWrite) { - errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, " - "written %lu\n", pFile->name, (unsigned long) lenWrite, +dbgprintf("XXXXX: omhdfs writing %u bytes\n", *lenWrite); + tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, *lenWrite); + if((unsigned) num_written_bytes != *lenWrite) { + errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, + "omhdfs: failed to write %s, expected %lu bytes, " + "written %lu\n", pFile->name, (unsigned long) *lenWrite, (unsigned long) num_written_bytes); ABORT_FINALIZE(RS_RET_SUSPENDED); } + *lenWrite = 0; finalize_it: - if(pFile->nUsers > 1) - d_pthread_mutex_unlock(&pFile->mut); RETiRet; } @@ -333,6 +346,40 @@ finalize_it: /* ---END FILE OBJECT---------------------------------------------------- */ +/* This adds data to the output buffer and performs an actual write + * if the new data does not fit into the buffer. Note that we never write + * partial data records. Other actions may write into the same file, and if + * we would write partial records, data could become severely mixed up. + * Note that we must check of some new data arrived is large than our + * buffer. In that case, the new data will written with its own + * write operation. + */ +static inline rsRetVal +addData(instanceData *pData, uchar *buf) +{ + unsigned len; + DEFiRet; + + len = strlen((char*)buf); + if(pData->offsBuf + len < sizeof(pData->ioBuf)) { + /* new data fits into remaining buffer */ + memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len); + pData->offsBuf += len; + } else { +dbgprintf("XXXXX: not enough room, need to flush\n"); + CHKiRet(fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf)); + if(len >= sizeof(pData->ioBuf)) { + CHKiRet(fileWrite(pData->pFile, buf, &len)); + } else { + memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len); + pData->offsBuf += len; + } + } + + iRet = RS_RET_DEFER_COMMIT; +finalize_it: + RETiRet; +} BEGINcreateInstance CODESTARTcreateInstance @@ -358,13 +405,31 @@ CODESTARTtryResume } ENDtryResume + +BEGINbeginTransaction +CODESTARTbeginTransaction +dbgprintf("omhdfs: beginTransaction\n"); +ENDbeginTransaction + + BEGINdoAction CODESTARTdoAction - DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name); - iRet = fileWrite(pData->pFile, ppString[0]); + DBGPRINTF("omhdfs: action to to write to %s\n", pData->pFile->name); + iRet = addData(pData, ppString[0]); +dbgprintf("omhdfs: done doAction\n"); ENDdoAction +BEGINendTransaction +CODESTARTendTransaction +dbgprintf("omhdfs: endTransaction\n"); + if(pData->offsBuf != 0) { + DBGPRINTF("omhdfs: data unwritten at end of transaction, persisting...\n"); + iRet = fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf); + } +ENDendTransaction + + BEGINparseSelectorAct file_t *pFile; int r; @@ -381,22 +446,22 @@ CODESTARTparseSelectorAct CHKiRet(createInstance(&pData)); CODE_STD_STRING_REQUESTparseSelectorAct(1) CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0, - (dfltTplName == NULL) ? (uchar*)"RSYSLOG_FileFormat" : dfltTplName)); + (cs.dfltTplName == NULL) ? (uchar*)"RSYSLOG_FileFormat" : cs.dfltTplName)); - if(fileName == NULL) { + if(cs.fileName == NULL) { errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: no file name specified, can not continue"); ABORT_FINALIZE(RS_RET_FILE_NOT_SPECIFIED); } - pFile = hashtable_search(files, fileName); + pFile = hashtable_search(files, cs.fileName); if(pFile == NULL) { /* we need a new file object, this one not seen before */ CHKiRet(fileObjConstruct(&pFile)); - CHKmalloc(pFile->name = fileName); - CHKmalloc(keybuf = ustrdup(fileName)); - fileName = NULL; /* re-set, data passed to file object */ - CHKmalloc(pFile->hdfsHost = strdup((hdfsHost == NULL) ? "default" : (char*) hdfsHost)); - pFile->hdfsPort = hdfsPort; + CHKmalloc(pFile->name = cs.fileName); + CHKmalloc(keybuf = ustrdup(cs.fileName)); + cs.fileName = NULL; /* re-set, data passed to file object */ + CHKmalloc(pFile->hdfsHost = strdup((cs.hdfsHost == NULL) ? "default" : (char*) cs.hdfsHost)); + pFile->hdfsPort = cs.hdfsPort; fileOpen(pFile); if(pFile->fh == NULL){ errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - " @@ -409,6 +474,7 @@ CODESTARTparseSelectorAct } fileObjAddUser(pFile); pData->pFile = pFile; + pData->offsBuf = 0; CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -438,8 +504,12 @@ ENDdoHUP */ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { - hdfsHost = NULL; - hdfsPort = 0; + cs.hdfsHost = NULL; + cs.hdfsPort = 0; + free(cs.fileName); + cs.fileName = NULL; + free(cs.dfltTplName); + cs.dfltTplName = NULL; return RS_RET_OK; } @@ -455,10 +525,12 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ CODEqueryEtryPt_doHUP ENDqueryEtryPt + BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; @@ -467,10 +539,11 @@ CODEmodInit_QueryRegCFSLineHdlr CHKmalloc(files = create_hashtable(20, hash_from_string, key_equals_string, fileObjDestruct4Hashtable)); - CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &cs.fileName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &cs.hdfsHost, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &cs.hdfsPort, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &cs.dfltTplName, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + DBGPRINTF("omhdfs: module compiled with rsyslog version %s.\n", VERSION); CODEmodInit_QueryRegCFSLineHdlr ENDmodInit |