summaryrefslogtreecommitdiffstats
path: root/plugins/imdiag/imdiag.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imdiag/imdiag.c')
-rw-r--r--plugins/imdiag/imdiag.c46
1 files changed, 40 insertions, 6 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index addbbb9a..b3468921 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -56,6 +56,7 @@
#include "net.h" /* for permittedPeers, may be removed when this is removed */
MODULE_TYPE_INPUT
+MODULE_TYPE_NOKEEP
/* static data */
DEF_IMOD_STATIC_DATA
@@ -203,7 +204,7 @@ doInjectMsg(int iNum)
DEFiRet;
snprintf((char*)szMsg, sizeof(szMsg)/sizeof(uchar),
- "<167>Mar 1 01:00:00 172.20.245.8 tag msgnum:%8.8d:\n", iNum);
+ "<167>Mar 1 01:00:00 172.20.245.8 tag msgnum:%8.8d:", iNum);
datetime.getCurrTime(&stTime, &ttGenTime);
/* we now create our own message object and submit it to the queue */
@@ -212,7 +213,6 @@ doInjectMsg(int iNum)
MsgSetInputName(pMsg, pInputName);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
- pMsg->bParseHOSTNAME = 1;
MsgSetRcvFrom(pMsg, pRcvDummy);
CHKiRet(MsgSetRcvFromIP(pMsg, pRcvIPDummy));
CHKiRet(submitMsg(pMsg));
@@ -245,7 +245,8 @@ injectMsg(uchar *pszCmd, tcps_sess_t *pSess)
doInjectMsg(i + iFrom);
}
- CHKiRet(sendResponse(pSess, "messages injected\n"));
+ CHKiRet(sendResponse(pSess, "%d messages injected\n", nMsgs));
+ DBGPRINTF("imdiag: %d messages injected\n", nMsgs);
finalize_it:
RETiRet;
@@ -253,20 +254,37 @@ finalize_it:
/* This function waits until the main queue is drained (size = 0)
+ * To make sure it really is drained, we check three times. Otherwise we
+ * may just see races.
*/
static rsRetVal
waitMainQEmpty(tcps_sess_t *pSess)
{
int iMsgQueueSize;
+ int iPrint = 0;
DEFiRet;
CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
- while(iMsgQueueSize > 0) {
- srSleep(0,2); /* wait a little bit */
+ while(1) {
+ if(iMsgQueueSize == 0) {
+ /* verify that queue is still empty (else it could just be a race!) */
+ srSleep(0,250000);/* wait a little bit */
+ CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
+ if(iMsgQueueSize == 0) {
+ srSleep(0,500000);/* wait a little bit */
+ CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
+ }
+ }
+ if(iMsgQueueSize == 0)
+ break;
+ if(iPrint++ % 500 == 0)
+ dbgprintf("imdiag sleeping, wait mainq drain, curr size %d\n", iMsgQueueSize);
+ srSleep(0,200000);/* wait a little bit */
CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
}
CHKiRet(sendResponse(pSess, "mainqueue empty\n"));
+ DBGPRINTF("imdiag: mainqueue empty\n");
finalize_it:
RETiRet;
@@ -280,6 +298,7 @@ OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg)
{
int iMsgQueueSize;
uchar *pszMsg;
+ uchar *pToFree = NULL;
uchar cmdBuf[1024];
DEFiRet;
@@ -290,15 +309,18 @@ OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg)
* WITHOUT a termination \0 char. So we need to convert it to one
* before proceeding.
*/
- CHKmalloc(pszMsg = malloc(sizeof(uchar) * (iLenMsg + 1)));
+ CHKmalloc(pszMsg = MALLOC(sizeof(uchar) * (iLenMsg + 1)));
+ pToFree = pszMsg;
memcpy(pszMsg, pRcv, iLenMsg);
pszMsg[iLenMsg] = '\0';
getFirstWord(&pszMsg, cmdBuf, sizeof(cmdBuf)/sizeof(uchar), TO_LOWERCASE);
+ dbgprintf("imdiag received command '%s'\n", cmdBuf);
if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("getmainmsgqueuesize"))) {
CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
CHKiRet(sendResponse(pSess, "%d\n", iMsgQueueSize));
+ DBGPRINTF("imdiag: %d messages in main queue\n", iMsgQueueSize);
} else if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("waitmainqueueempty"))) {
CHKiRet(waitMainQEmpty(pSess));
} else if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("injectmsg"))) {
@@ -309,6 +331,8 @@ OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg)
}
finalize_it:
+ if(pToFree != NULL)
+ free(pToFree);
RETiRet;
}
@@ -416,6 +440,9 @@ CODESTARTmodExit
net.DestructPermittedPeers(&pPermPeersRoot);
}
+ /* free some globals to keep valgrind happy */
+ free(pszInputName);
+
/* release objects we used */
objRelease(net, LM_NET_FILENAME);
objRelease(netstrm, LM_NETSTRMS_FILENAME);
@@ -442,10 +469,17 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus
}
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt