summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-12-19 11:16:07 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2011-12-19 11:16:07 +0100
commit5fe837bf7dbdcc245ee233feb1fbcc6d052a4898 (patch)
tree206148b35803ac791559f08926d236a2e95f47af /action.c
parentdb137ef8c1b3f8b24ccf9b3b4bfed4fdf493916a (diff)
downloadrsyslog-5fe837bf7dbdcc245ee233feb1fbcc6d052a4898.tar.gz
rsyslog-5fe837bf7dbdcc245ee233feb1fbcc6d052a4898.tar.xz
rsyslog-5fe837bf7dbdcc245ee233feb1fbcc6d052a4898.zip
added instrumentation
Diffstat (limited to 'action.c')
-rw-r--r--action.c48
1 files changed, 45 insertions, 3 deletions
diff --git a/action.c b/action.c
index 278625ce..49eb3663 100644
--- a/action.c
+++ b/action.c
@@ -112,6 +112,7 @@
#include "datetime.h"
#include "unicode-helper.h"
#include "atomic.h"
+#include "statsobj.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
@@ -127,6 +128,7 @@ DEFobjCurrIf(obj)
DEFobjCurrIf(datetime)
DEFobjCurrIf(module)
DEFobjCurrIf(errmsg)
+DEFobjCurrIf(statsobj)
static int iActExecOnceInterval = 0; /* execute action once every nn seconds */
static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */
@@ -263,6 +265,12 @@ rsRetVal actionDestruct(action_t *pThis)
qqueueDestruct(&pThis->pQueue);
}
+ /* destroy stats object, if we have one (may not always be
+ * be the case, e.g. if turned off)
+ */
+ if(pThis->statsobj != NULL)
+ statsobj.Destruct(&pThis->statsobj);
+
if(pThis->pMod != NULL)
pThis->pMod->freeInstance(pThis->pModData);
@@ -313,12 +321,31 @@ rsRetVal
actionConstructFinalize(action_t *pThis)
{
DEFiRet;
- uchar pszQName[64]; /* friendly name of our queue */
+ uchar pszAName[64]; /* friendly name of our queue */
ASSERT(pThis != NULL);
+ /* generate a friendly name for us action stats */
+ if(pThis->pszName == NULL) {
+ snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr);
+ } else {
+ ustrncpy(pszAName, pThis->pszName, sizeof(pszAName));
+ pszAName[63] = '\0'; /* to be on the save side */
+ }
+
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&pThis->statsobj));
+ CHKiRet(statsobj.SetName(pThis->statsobj, pszAName));
+
+ STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"),
+ ctrType_IntCtr, &pThis->ctrProcessed));
+
+ CHKiRet(statsobj.ConstructFinalize(pThis->statsobj));
+
+ /* create our queue */
/* find a name for our queue */
- snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
+ snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d queue", iActionNbr);
/* now check if we can run the action in "firehose mode" during stage one of
* its processing (that is before messages are enqueued into the action q).
@@ -362,7 +389,7 @@ actionConstructFinalize(action_t *pThis)
*/
CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
(rsRetVal (*)(void*, batch_t*, int*))processBatchMain));
- obj.SetName((obj_t*) pThis->pQueue, pszQName);
+ obj.SetName((obj_t*) pThis->pQueue, pszAName);
/* ... set some properties ... */
# define setQPROP(func, directive, data) \
@@ -1261,6 +1288,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
else
@@ -1537,6 +1565,17 @@ finalize_it:
RETiRet;
}
+static inline void
+countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ if(pBatch->pElem[i].bFilterOK) {
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
+ }
+ }
+}
+
/* enqueue a batch in direct mode. We have put this into its own function just to avoid
* cluttering the actual submit function.
@@ -1580,6 +1619,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
}
if(bNeedSubmit) {
+ countStatsBatchEnq(pAction, pBatch);
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
} else {
DBGPRINTF("no need to submit batch, all bFilterOK==0\n");
@@ -1594,6 +1634,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
}
}
} else {
+ countStatsBatchEnq(pAction, pBatch);
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
}
@@ -1817,6 +1858,7 @@ rsRetVal actionClassInit(void)
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(module, CORE_COMPONENT));
CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));