summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-12-16 18:51:49 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2011-12-16 18:51:49 +0100
commit955312ebf5d59e2ed0d35ba624070439b9fc5273 (patch)
tree479d801177ae61a79d3280c3876595628305717a /action.c
parenta1b752b32ffb90bfdce4fd8dfdffac3ebbadc79e (diff)
downloadrsyslog-955312ebf5d59e2ed0d35ba624070439b9fc5273.tar.gz
rsyslog-955312ebf5d59e2ed0d35ba624070439b9fc5273.tar.xz
rsyslog-955312ebf5d59e2ed0d35ba624070439b9fc5273.zip
experimental: added first stats counter to action object
Diffstat (limited to 'action.c')
-rw-r--r--action.c49
1 files changed, 43 insertions, 6 deletions
diff --git a/action.c b/action.c
index af7723e7..45fa6cf7 100644
--- a/action.c
+++ b/action.c
@@ -112,6 +112,7 @@
#include "datetime.h"
#include "unicode-helper.h"
#include "atomic.h"
+#include "statsobj.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
@@ -127,6 +128,7 @@ DEFobjCurrIf(obj)
DEFobjCurrIf(datetime)
DEFobjCurrIf(module)
DEFobjCurrIf(errmsg)
+DEFobjCurrIf(statsobj)
static int iActExecOnceInterval = 0; /* execute action once every nn seconds */
static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */
@@ -313,18 +315,38 @@ rsRetVal
actionConstructFinalize(action_t *pThis)
{
DEFiRet;
- uchar pszQName[64]; /* friendly name of our queue */
+ uchar pszAName[64]; /* friendly name of our action */
ASSERT(pThis != NULL);
- /* find a name for our queue */
+ /* generate a friendly name for us action stats */
if(pThis->pszName == NULL) {
- snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
+ snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr);
} else {
- ustrncpy(pszQName, pThis->pszName, sizeof(pszQName));
- pszQName[63] = '\0'; /* to be on the save side */
+ ustrncpy(pszAName, pThis->pszName, sizeof(pszAName));
+ pszAName[63] = '\0'; /* to be on the save side */
}
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&pThis->statsobj));
+ CHKiRet(statsobj.SetName(pThis->statsobj, pszAName));
+
+ STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"),
+ ctrType_IntCtr, &pThis->ctrProcessed));
+
+ CHKiRet(statsobj.ConstructFinalize(pThis->statsobj));
+
+ /* create our queue */
+
+ /* generate a friendly name for the queue */
+ if(pThis->pszName == NULL) {
+ snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d queue",
+ iActionNbr);
+ } else {
+ ustrncpy(pszAName, pThis->pszName, sizeof(pszAName));
+ pszAName[63] = '\0'; /* to be on the save side */
+ }
/* now check if we can run the action in "firehose mode" during stage one of
* its processing (that is before messages are enqueued into the action q).
* This is only possible if some features, which require strict sequence, are
@@ -367,7 +389,7 @@ actionConstructFinalize(action_t *pThis)
*/
CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
(rsRetVal (*)(void*, batch_t*, int*))processBatchMain));
- obj.SetName((obj_t*) pThis->pQueue, pszQName);
+ obj.SetName((obj_t*) pThis->pQueue, pszAName);
/* ... set some properties ... */
# define setQPROP(func, directive, data) \
@@ -1266,6 +1288,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
else
@@ -1542,6 +1565,17 @@ finalize_it:
RETiRet;
}
+static inline void
+countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ if(pBatch->pElem[i].bFilterOK) {
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
+ }
+ }
+}
+
/* enqueue a batch in direct mode. We have put this into its own function just to avoid
* cluttering the actual submit function.
@@ -1585,6 +1619,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
}
if(bNeedSubmit) {
+ countStatsBatchEnq(pAction, pBatch);
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
} else {
DBGPRINTF("no need to submit batch, all bFilterOK==0\n");
@@ -1599,6 +1634,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
}
}
} else {
+ countStatsBatchEnq(pAction, pBatch);
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
}
@@ -1822,6 +1858,7 @@ rsRetVal actionClassInit(void)
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(module, CORE_COMPONENT));
CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));