From 9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 22 Jul 2011 18:03:43 +0200 Subject: milestone: queue object now has a param handler for new conf interface ... and action queue defs use this new interface (but not yet the main queues) --- action.c | 88 +++++++++++++++++++++++++++------------------------------------- 1 file changed, 37 insertions(+), 51 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index c459a738..74f6f7f7 100644 --- a/action.c +++ b/action.c @@ -8,7 +8,7 @@ * the right code in question): For performance reasons, this module * uses different methods of message submission based on the user-selected * configuration. This code is similar, but can not be abstracted because - * of the performanse-affecting differences in it. As such, it is often + * of the performance-affecting differences in it. As such, it is often * necessary to triple-check that everything works well in *all* modes. * The different modes (and calling sequence) are: * @@ -197,32 +197,6 @@ static struct cnfparamblk paramblk = cnfparamdescr }; -/* Still TODO: */ -#if 0 - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &cs.pszActionQFName, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueSize, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqBatchSize, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxDiskSpace, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQHighWtrMark, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQLowWtrMark, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardMark, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardSeverity, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &cs.iActionQPersistUpdCnt, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &cs.bActionQSyncQeueFiles, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &cs.iActionQueueNumWorkers, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoQShutdown, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &cs.iActionQtoActShutdown, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &cs.iActionQtoEnq, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoWrkShutdown, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &cs.iActionQWrkMinMsgs, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxFileSize, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &cs.bActionQSaveOnShutdown, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqSlowdown, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinFromHr, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinToHr, NULL, eConfObjAction)); -#endif - /* ------------------------------ methods ------------------------------ */ /* This function returns the "current" time for this action. Current time @@ -364,7 +338,7 @@ finalize_it: /* action construction finalizer */ rsRetVal -actionConstructFinalize(action_t *pThis) +actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) { DEFiRet; uchar pszQName[64]; /* friendly name of our queue */ @@ -434,25 +408,31 @@ actionConstructFinalize(action_t *pThis) } qqueueSetpUsr(pThis->pQueue, pThis); - setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace); - setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize); - setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize); - setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName); - setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt); - setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles); - setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown ); - setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown); - setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown); - setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq); - setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark); - setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark); - setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark); - setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity); - setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs); - setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown); - setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown); - setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr); - setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr); + if(queueParams == NULL) { + /* use legacy params */ + setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace); + setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize); + setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize); + setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName); + setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt); + setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles); + setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs); + setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr); + } else { + /* we have v6-style config params */ + qqueueApplyCnfParam(pThis->pQueue, queueParams); + } # undef setQPROP # undef setQPROPstr @@ -1762,7 +1742,8 @@ doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) * rgerhards, 2007-07-27 */ rsRetVal -addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, int bSuspended) +addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, + omodStringRequest_t *pOMSR, struct cnfparamvals *queueParams, int bSuspended) { DEFiRet; int i; @@ -1774,7 +1755,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques assert(ppAction != NULL); assert(pMod != NULL); assert(pOMSR != NULL); - DBGPRINTF("Module %s processed this config line.\n", module.GetName(pMod)); + DBGPRINTF("Module %s processes this action.\n", module.GetName(pMod)); CHKiRet(actionConstruct(&pAction)); /* create action object first */ pAction->pMod = pMod; @@ -1851,7 +1832,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques if(bSuspended) actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */ - CHKiRet(actionConstructFinalize(pAction)); + CHKiRet(actionConstructFinalize(pAction, queueParams)); /* TODO: if we exit here, we have a memory leak... */ @@ -1938,6 +1919,7 @@ rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction) { struct cnfparamvals *paramvals; + struct cnfparamvals *queueParams; modInfo_t *pMod; uchar *cnfModName = NULL; omodStringRequest_t *pOMSR; @@ -1966,7 +1948,10 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) FINALIZE; /* iRet is already set to error state */ } - if((iRet = addAction(&pAction, pMod, pModData, pOMSR, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { + qqueueDoCnfParams(lst, &queueParams); + + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, queueParams, + (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* now check if the module is compatible with select features */ if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs; @@ -1994,6 +1979,7 @@ actionProcessCnf(struct cnfobj *o) { DEFiRet; #if 0 /* we need to check if we actually need this functionality -- later! */ +// This is for STAND-ALONE actions at the conf file TOP level struct cnfparamvals *paramvals; paramvals = nvlstGetParams(o->nvlst, ¶mblk, NULL); -- cgit