diff options
Diffstat (limited to 'plugins/imdiag/imdiag.c')
-rw-r--r-- | plugins/imdiag/imdiag.c | 46 |
1 files changed, 40 insertions, 6 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index addbbb9a..b3468921 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -56,6 +56,7 @@ #include "net.h" /* for permittedPeers, may be removed when this is removed */ MODULE_TYPE_INPUT +MODULE_TYPE_NOKEEP /* static data */ DEF_IMOD_STATIC_DATA @@ -203,7 +204,7 @@ doInjectMsg(int iNum) DEFiRet; snprintf((char*)szMsg, sizeof(szMsg)/sizeof(uchar), - "<167>Mar 1 01:00:00 172.20.245.8 tag msgnum:%8.8d:\n", iNum); + "<167>Mar 1 01:00:00 172.20.245.8 tag msgnum:%8.8d:", iNum); datetime.getCurrTime(&stTime, &ttGenTime); /* we now create our own message object and submit it to the queue */ @@ -212,7 +213,6 @@ doInjectMsg(int iNum) MsgSetInputName(pMsg, pInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; - pMsg->bParseHOSTNAME = 1; MsgSetRcvFrom(pMsg, pRcvDummy); CHKiRet(MsgSetRcvFromIP(pMsg, pRcvIPDummy)); CHKiRet(submitMsg(pMsg)); @@ -245,7 +245,8 @@ injectMsg(uchar *pszCmd, tcps_sess_t *pSess) doInjectMsg(i + iFrom); } - CHKiRet(sendResponse(pSess, "messages injected\n")); + CHKiRet(sendResponse(pSess, "%d messages injected\n", nMsgs)); + DBGPRINTF("imdiag: %d messages injected\n", nMsgs); finalize_it: RETiRet; @@ -253,20 +254,37 @@ finalize_it: /* This function waits until the main queue is drained (size = 0) + * To make sure it really is drained, we check three times. Otherwise we + * may just see races. */ static rsRetVal waitMainQEmpty(tcps_sess_t *pSess) { int iMsgQueueSize; + int iPrint = 0; DEFiRet; CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); - while(iMsgQueueSize > 0) { - srSleep(0,2); /* wait a little bit */ + while(1) { + if(iMsgQueueSize == 0) { + /* verify that queue is still empty (else it could just be a race!) */ + srSleep(0,250000);/* wait a little bit */ + CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); + if(iMsgQueueSize == 0) { + srSleep(0,500000);/* wait a little bit */ + CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); + } + } + if(iMsgQueueSize == 0) + break; + if(iPrint++ % 500 == 0) + dbgprintf("imdiag sleeping, wait mainq drain, curr size %d\n", iMsgQueueSize); + srSleep(0,200000);/* wait a little bit */ CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); } CHKiRet(sendResponse(pSess, "mainqueue empty\n")); + DBGPRINTF("imdiag: mainqueue empty\n"); finalize_it: RETiRet; @@ -280,6 +298,7 @@ OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg) { int iMsgQueueSize; uchar *pszMsg; + uchar *pToFree = NULL; uchar cmdBuf[1024]; DEFiRet; @@ -290,15 +309,18 @@ OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg) * WITHOUT a termination \0 char. So we need to convert it to one * before proceeding. */ - CHKmalloc(pszMsg = malloc(sizeof(uchar) * (iLenMsg + 1))); + CHKmalloc(pszMsg = MALLOC(sizeof(uchar) * (iLenMsg + 1))); + pToFree = pszMsg; memcpy(pszMsg, pRcv, iLenMsg); pszMsg[iLenMsg] = '\0'; getFirstWord(&pszMsg, cmdBuf, sizeof(cmdBuf)/sizeof(uchar), TO_LOWERCASE); + dbgprintf("imdiag received command '%s'\n", cmdBuf); if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("getmainmsgqueuesize"))) { CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); CHKiRet(sendResponse(pSess, "%d\n", iMsgQueueSize)); + DBGPRINTF("imdiag: %d messages in main queue\n", iMsgQueueSize); } else if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("waitmainqueueempty"))) { CHKiRet(waitMainQEmpty(pSess)); } else if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("injectmsg"))) { @@ -309,6 +331,8 @@ OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg) } finalize_it: + if(pToFree != NULL) + free(pToFree); RETiRet; } @@ -416,6 +440,9 @@ CODESTARTmodExit net.DestructPermittedPeers(&pPermPeersRoot); } + /* free some globals to keep valgrind happy */ + free(pszInputName); + /* release objects we used */ objRelease(net, LM_NET_FILENAME); objRelease(netstrm, LM_NETSTRMS_FILENAME); @@ -442,10 +469,17 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus } +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt |