summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-06-16 15:13:47 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-06-16 15:13:47 +0200
commit6f0db63e9b962edf6305860b608500e8c650b71b (patch)
tree142a4dc01419a08f46481d5ca5557da07f0474c7
parentaef1a38fe8c7472362904b2f90c67113b21034ab (diff)
downloadrsyslog-6f0db63e9b962edf6305860b608500e8c650b71b.zip
rsyslog-6f0db63e9b962edf6305860b608500e8c650b71b.tar.gz
rsyslog-6f0db63e9b962edf6305860b608500e8c650b71b.tar.xz
milestone: input-side multiSubmit capability
... commit before I try to touch the queue side ;)
-rw-r--r--runtime/rsyslog.h13
-rw-r--r--runtime/stream.c2
-rw-r--r--tcps_sess.c34
-rw-r--r--tools/syslogd.c26
4 files changed, 64 insertions, 11 deletions
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index e81c9ee..ea8a522 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -127,6 +127,19 @@ typedef enum {
} fiop_t;
+/* multi-submit support.
+ * This is done via a simple data structure, which holds the number of elements
+ * as well as an array of to-be-submitted messages.
+ * rgerhards, 2009-06-16
+ */
+typedef struct multi_submit_s multi_submit_t;
+struct multi_submit_s {
+ short maxElem; /* maximum number of Elements */
+ short nElem; /* current number of Elements, points to the next one FREE */
+ msg_t **ppMsgs;
+};
+
+
#ifndef _PATH_CONSOLE
#define _PATH_CONSOLE "/dev/console"
#endif
diff --git a/runtime/stream.c b/runtime/stream.c
index f13258b..8cbe029 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -188,7 +188,7 @@ finalize_it:
static rsRetVal
doPhysOpen(strm_t *pThis)
{
- int iFlags;
+ int iFlags = 0;
DEFiRet;
ISOBJ_TYPE_assert(pThis, strm);
diff --git a/tcps_sess.c b/tcps_sess.c
index f887e70..379faea 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -220,7 +220,7 @@ SetOnMsgReceive(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar
* rgerhards, 2009-04-23
*/
static rsRetVal
-defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttGenTime)
+defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub)
{
msg_t *pMsg;
DEFiRet;
@@ -245,7 +245,15 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG
MsgSetRcvFrom(pMsg, pThis->fromHost);
MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset);
CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP));
- CHKiRet(submitMsg(pMsg));
+
+ if(pMultiSub == NULL) {
+ CHKiRet(submitMsg(pMsg));
+ } else {
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg(pMultiSub));
+ }
+
finalize_it:
/* reset status variables */
@@ -299,7 +307,7 @@ PrepareClose(tcps_sess_t *pThis)
*/
dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n");
datetime.getCurrTime(&stTime, &ttGenTime);
- defaultDoSubmitMessage(pThis, &stTime, ttGenTime);
+ defaultDoSubmitMessage(pThis, &stTime, ttGenTime, NULL);
}
finalize_it:
@@ -334,7 +342,7 @@ Close(tcps_sess_t *pThis)
* rgerhards, 2008-03-14
*/
static rsRetVal
-processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t ttGenTime)
+processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcps_sess);
@@ -380,7 +388,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt
if(pThis->iMsg >= iMaxLine) {
/* emergency, we now need to flush, no matter if we are at end of message or not... */
dbgprintf("error: message received is larger than max msg size, we split it\n");
- defaultDoSubmitMessage(pThis, stTime, ttGenTime);
+ defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub);
/* we might think if it is better to ignore the rest of the
* message than to treat it as a new one. Maybe this is a good
* candidate for a configuration parameter...
@@ -391,7 +399,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt
if(( (c == '\n')
|| ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim))
) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */
- defaultDoSubmitMessage(pThis, stTime, ttGenTime);
+ defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub);
pThis->inputState = eAtStrtFram;
} else {
/* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes!
@@ -408,7 +416,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt
pThis->iOctetsRemain--;
if(pThis->iOctetsRemain < 1) {
/* we have end of frame! */
- defaultDoSubmitMessage(pThis, stTime, ttGenTime);
+ defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub);
pThis->inputState = eAtStrtFram;
}
}
@@ -433,9 +441,12 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt
* this *is* the *correct* reception step for all the data we received, because
* we have just received a bunch of data! -- rgerhards, 2009-06-16
*/
+#define NUM_MULTISUB 128
static rsRetVal
DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen)
{
+ multi_submit_t multiSub;
+ msg_t *pMsgs[NUM_MULTISUB];
struct syslogTime stTime;
time_t ttGenTime;
char *pEnd;
@@ -446,18 +457,25 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen)
assert(iLen > 0);
datetime.getCurrTime(&stTime, &ttGenTime);
+ multiSub.ppMsgs = pMsgs;
+ multiSub.maxElem = NUM_MULTISUB;
+ multiSub.nElem = 0;
/* We now copy the message to the session buffer. */
pEnd = pData + iLen; /* this is one off, which is intensional */
iNbrTimeUsed = 0; /* full time query */
while(pData < pEnd) {
- CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime));
+ CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub));
}
+ /* submit anything that was not yet submitted */
+ CHKiRet(multiSubmitMsg(&multiSub));
+
finalize_it:
RETiRet;
}
+#undef NUM_MULTISUB
/* queryInterface function
diff --git a/tools/syslogd.c b/tools/syslogd.c
index ea8eff7..4d2839a 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -1345,8 +1345,7 @@ int parseLegacySyslogMsg(msg_t *pMsg, int flags)
}
-/* submit a fully created message to the main message queue. The message is
- * fully processed and parsed, so no parsing at all happens. This is primarily
+/* submit a message to the main message queue. This is primarily
* a hook to prevent the need for callers to know about the main message queue
* (which may change in the future as we will probably have multiple rule
* sets and thus queues...).
@@ -1366,6 +1365,29 @@ submitMsg(msg_t *pMsg)
}
+/* submit multiple messages at once, very similar to submitMsg, just
+ * for multi_submit_t.
+ * rgerhards, 2009-06-16
+ */
+rsRetVal
+multiSubmitMsg(multi_submit_t *pMultiSub)
+{
+ int i;
+ DEFiRet;
+ assert(pMultiSub != NULL);
+
+ for(i = 0 ; i < pMultiSub->nElem ; ++i) {
+dbgprintf("multiSubmitMsg, index %d\n", i);
+ MsgPrepareEnqueue(pMultiSub->ppMsgs[i]);
+ qqueueEnqObj(pMsgQueue, pMultiSub->ppMsgs[i]->flowCtlType, (void*) pMultiSub->ppMsgs[i]);
+ }
+
+ pMultiSub->nElem = 0;
+
+ RETiRet;
+}
+
+
/* Log a message to the appropriate log files, users, etc. based on
* the priority.
* rgerhards 2004-11-08: actually, this also decodes all but the PRI part.