summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-27 12:00:14 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-27 12:00:14 +0200
commit33316de49b7775ac40dd0ec0f4b7ff6256ccde78 (patch)
treeedeec9b609baf2011838f7dbfcb6fc3bd82f29e7 /plugins
parentaa9426f683fa6af9280bc63050ee0187ba4c57e1 (diff)
parent35ea061f62cf0cbae074b2df47dc6b462da4fd20 (diff)
downloadrsyslog-33316de49b7775ac40dd0ec0f4b7ff6256ccde78.tar.gz
rsyslog-33316de49b7775ac40dd0ec0f4b7ff6256ccde78.tar.xz
rsyslog-33316de49b7775ac40dd0ec0f4b7ff6256ccde78.zip
Merge branch 'master' into ultra-reliable
Conflicts: tests/Makefile.am tests/diskqueue.sh tests/imtcp-multiport.sh tests/manytcp.sh tests/memq-persist.sh
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imdiag/imdiag.c145
-rw-r--r--plugins/imtcp/imtcp.c1
2 files changed, 132 insertions, 14 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index 40f94692..c700cab7 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -52,6 +52,8 @@
#include "errmsg.h"
#include "tcpsrv.h"
#include "srUtils.h"
+#include "msg.h"
+#include "datetime.h"
#include "net.h" /* for permittedPeers, may be removed when this is removed */
MODULE_TYPE_INPUT
@@ -63,6 +65,7 @@ DEFobjCurrIf(tcps_sess)
DEFobjCurrIf(net)
DEFobjCurrIf(netstrm)
DEFobjCurrIf(errmsg)
+DEFobjCurrIf(datetime)
/* Module static data */
static tcpsrv_t *pOurTcpsrv = NULL; /* our TCP server(listener) TODO: change for multiple instances */
@@ -134,10 +137,123 @@ onErrClose(tcps_sess_t *pSess)
/* ------------------------------ end callbacks ------------------------------ */
+/* get the first word delimited by space from a given string. The pointer is
+ * advanced to after the word. Any leading spaces are discarded. If the
+ * output buffer is too small, parsing ends on buffer full condition.
+ * An empty buffer is returned if there is no more data inside the string.
+ * rgerhards, 2009-05-27
+ */
+#define TO_LOWERCASE 1
+#define NO_MODIFY 0
+static void
+getFirstWord(uchar **ppszSrc, uchar *pszBuf, size_t lenBuf, int options)
+{
+ uchar c;
+ uchar *pszSrc = *ppszSrc;
+
+ while(*pszSrc && *pszSrc == ' ')
+ ++pszSrc; /* skip to first non-space */
+
+ while(*pszSrc && *pszSrc != ' ' && lenBuf > 1) {
+ c = *pszSrc++;
+ if(options & TO_LOWERCASE)
+ c = tolower(c);
+ *pszBuf++ = c;
+ lenBuf--;
+ }
+
+ *pszBuf = '\0';
+ *ppszSrc = pszSrc;
+}
+
+
+/* send a response back to the originator
+ * rgerhards, 2009-05-27
+ */
+static rsRetVal __attribute__((format(printf, 2, 3)))
+sendResponse(tcps_sess_t *pSess, char *fmt, ...)
+{
+ va_list ap;
+ ssize_t len;
+ uchar buf[1024];
+ DEFiRet;
+
+ va_start(ap, fmt);
+ len = vsnprintf((char*)buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+ CHKiRet(netstrm.Send(pSess->pStrm, buf, &len));
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* actually submit a message to the rsyslog core
+ */
+static rsRetVal
+doInjectMsg(int iNum)
+{
+ uchar szMsg[1024];
+ msg_t *pMsg;
+ struct syslogTime stTime;
+ time_t ttGenTime;
+ DEFiRet;
+
+ snprintf((char*)szMsg, sizeof(szMsg)/sizeof(uchar),
+ "<167>Mar 1 01:00:00 172.20.245.8 tag msgnum:%8.8d:\n", iNum);
+
+ datetime.getCurrTime(&stTime, &ttGenTime);
+ /* we now create our own message object and submit it to the queue */
+ CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
+ CHKmalloc(pMsg->pszRawMsg = ustrdup(szMsg));
+ pMsg->iLenRawMsg = ustrlen(szMsg);
+ MsgSetInputName(pMsg, UCHAR_CONSTANT("imdiag"));
+ MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
+ pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
+ pMsg->bParseHOSTNAME = 1;
+ MsgSetRcvFrom(pMsg, UCHAR_CONSTANT("127.0.0.1")); /* TODO: way may use the real sender here... */
+ CHKiRet(MsgSetRcvFromIP(pMsg, UCHAR_CONSTANT("127.0.0.1")));
+ CHKiRet(submitMsg(pMsg));
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* This function injects messages. Command format:
+ * injectmsg <fromnbr> <number-of-messages>
+ * rgerhards, 2009-05-27
+ */
+static rsRetVal
+injectMsg(uchar *pszCmd, tcps_sess_t *pSess)
+{
+ uchar wordBuf[1024];
+ int iFrom;
+ int nMsgs;
+ int i;
+ DEFiRet;
+
+ /* we do not check errors here! */
+ getFirstWord(&pszCmd, wordBuf, sizeof(wordBuf)/sizeof(uchar), TO_LOWERCASE);
+ iFrom = atoi((char*)wordBuf);
+ getFirstWord(&pszCmd, wordBuf, sizeof(wordBuf)/sizeof(uchar), TO_LOWERCASE);
+ nMsgs = atoi((char*)wordBuf);
+
+ for(i = 0 ; i < nMsgs ; ++i) {
+ doInjectMsg(i + iFrom);
+ }
+
+ CHKiRet(sendResponse(pSess, "messages injected\n"));
+
+finalize_it:
+ RETiRet;
+}
+
+
/* This function waits until the main queue is drained (size = 0)
*/
static rsRetVal
-waitMainQEmpty(void)
+waitMainQEmpty(tcps_sess_t *pSess)
{
int iMsgQueueSize;
DEFiRet;
@@ -148,21 +264,21 @@ waitMainQEmpty(void)
CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
}
+ CHKiRet(sendResponse(pSess, "mainqueue empty\n"));
+
finalize_it:
RETiRet;
}
-
/* Function to handle received messages. This is our core function!
* rgerhards, 2009-05-24
*/
static rsRetVal
OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg)
{
- ssize_t len;
int iMsgQueueSize;
uchar *pszMsg;
- uchar buf[1024];
+ uchar cmdBuf[1024];
DEFiRet;
assert(pSess != NULL);
@@ -176,17 +292,18 @@ OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg)
memcpy(pszMsg, pRcv, iLenMsg);
pszMsg[iLenMsg] = '\0';
- if(!ustrcmp(pszMsg, UCHAR_CONSTANT("GetMainMsgQueueSize"))) {
+ getFirstWord(&pszMsg, cmdBuf, sizeof(cmdBuf)/sizeof(uchar), TO_LOWERCASE);
+
+ if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("getmainmsgqueuesize"))) {
CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
- len = snprintf((char*)buf, sizeof(buf)/sizeof(uchar), "%d\n", iMsgQueueSize);
- CHKiRet(netstrm.Send(pSess->pStrm, buf, &len));
- } else if(!ustrcmp(pszMsg, UCHAR_CONSTANT("WaitMainQueueEmpty"))) {
- CHKiRet(waitMainQEmpty());
- len = snprintf((char*)buf, sizeof(buf)/sizeof(uchar), "mainqueue empty\n");
- CHKiRet(netstrm.Send(pSess->pStrm, buf, &len));
+ CHKiRet(sendResponse(pSess, "%d\n", iMsgQueueSize));
+ } else if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("waitmainqueueempty"))) {
+ CHKiRet(waitMainQEmpty(pSess));
+ } else if(!ustrcmp(cmdBuf, UCHAR_CONSTANT("injectmsg"))) {
+ CHKiRet(injectMsg(pszMsg, pSess));
} else {
- len = snprintf((char*)buf, sizeof(buf)/sizeof(uchar), "unkown command '%s'\n", pszMsg);
- CHKiRet(netstrm.Send(pSess->pStrm, buf, &len));
+ dbgprintf("imdiag unkown command '%s'\n", cmdBuf);
+ CHKiRet(sendResponse(pSess, "unkown command '%s'\n", cmdBuf));
}
finalize_it:
@@ -285,6 +402,7 @@ CODESTARTmodExit
objRelease(tcps_sess, LM_TCPSRV_FILENAME);
objRelease(tcpsrv, LM_TCPSRV_FILENAME);
objRelease(errmsg, CORE_COMPONENT);
+ objRelease(datetime, CORE_COMPONENT);
ENDmodExit
@@ -321,6 +439,7 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(tcps_sess, LM_TCPSRV_FILENAME));
CHKiRet(objUse(tcpsrv, LM_TCPSRV_FILENAME));
CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagserverrun"), 0, eCmdHdlrGetWord,
diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c
index acac38b2..7300a799 100644
--- a/plugins/imtcp/imtcp.c
+++ b/plugins/imtcp/imtcp.c
@@ -180,7 +180,6 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
}
}
-dbgprintf("XXX: try add listen port %s\n", pNewVal);
/* initialized, now add socket */
CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, pszInputName == NULL ?
UCHAR_CONSTANT("imtcp") : pszInputName));