diff options
Diffstat (limited to 'plugins/ommongodb/ommongodb.c')
-rw-r--r-- | plugins/ommongodb/ommongodb.c | 470 |
1 files changed, 309 insertions, 161 deletions
diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c index 8e19105f..39e2e4f9 100644 --- a/plugins/ommongodb/ommongodb.c +++ b/plugins/ommongodb/ommongodb.c @@ -1,3 +1,28 @@ +/* ommongodb.c + * Output module for mongodb. + * Note: this module uses the libmongo-client library. The original 10gen + * mongodb C interface is crap. Obtain the library here: + * https://github.com/algernon/libmongo-client + * + * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" #include <stdio.h> #include <string.h> #include <stdlib.h> @@ -6,54 +31,61 @@ #include <assert.h> #include <signal.h> #include <time.h> -#include "bson.h" -#include "mongo.h" -#include "config.h" +#include <mongo.h> + #include "rsyslog.h" #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" #include "template.h" #include "module-template.h" +#include "datetime.h" #include "errmsg.h" #include "cfsysline.h" -#include "mongo-c-driver/src/mongo.h" - -#define countof(X) ( (size_t) ( sizeof(X)/sizeof*(X) ) ) - -#define DEFAULT_SERVER "127.0.0.1" -#define DEFAULT_DATABASE "syslog" -#define DEFAULT_COLLECTION "log" -#define DEFAULT_DB_COLLECTION "syslog.log" - -//i just defined some constants, i couldt not find the limit -#define MONGO_DB_NAME_SIZE 128 -#define MONGO_COLLECTION_NAME_SIZE 128 MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("ommongodb") /* internal structures */ DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) +DEFobjCurrIf(datetime) typedef struct _instanceData { - mongo_connection conn[1]; /* ptr */ - mongo_connection_options opts[1]; - mongo_conn_return status; - char db[MONGO_DB_NAME_SIZE]; - char collection[MONGO_COLLECTION_NAME_SIZE]; - char dbcollection[MONGO_DB_NAME_SIZE + MONGO_COLLECTION_NAME_SIZE + 1]; - unsigned uLastMongoDBErrno; - //unsigned iSrvPort; /* sample: server port */ + mongo_sync_connection *conn; + uchar *server; + int port; + uchar *db; + uchar *collection; + uchar *uid; + uchar *pwd; + uchar *dbNcoll; + uchar *tplName; } instanceData; -char db[_DB_MAXDBLEN+2]; -static int iSrvPort = 27017; + +/* tables for interfacing with the v6 config system */ +/* action (instance) parameters */ +static struct cnfparamdescr actpdescr[] = { + { "server", eCmdHdlrGetWord, 0 }, + { "serverport", eCmdHdlrInt, 0 }, + { "db", eCmdHdlrGetWord, 0 }, + { "collection", eCmdHdlrGetWord, 0 }, + { "uid", eCmdHdlrGetWord, 0 }, + { "pwd", eCmdHdlrGetWord, 0 }, + { "template", eCmdHdlrGetWord, 1 } +}; +static struct cnfparamblk actpblk = + { CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; + BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance - BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature /* use this to specify if select features are supported by this @@ -66,149 +98,200 @@ ENDisCompatibleWithFeature static void closeMongoDB(instanceData *pData) { - ASSERT(pData != NULL); - if(pData->conn != NULL) { - mongo_destroy( pData->conn ); - memset(pData->conn,0x00,sizeof(mongo_connection)); + mongo_sync_disconnect(pData->conn); + pData->conn = NULL; } } + BEGINfreeInstance CODESTARTfreeInstance closeMongoDB(pData); + free(pData->server); + free(pData->db); + free(pData->collection); + free(pData->uid); + free(pData->pwd); + free(pData->tplName); ENDfreeInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo /* nothing special here */ ENDdbgPrintInstInfo -/* log a database error with descriptive message. - * We check if we have a valid MongoDB handle. If not, we simply - * report an error + +/* report error that occured during *last* operation */ -static void reportDBError(instanceData *pData, int bSilent) +static void +reportMongoError(instanceData *pData) { - char errMsg[512]; - bson ErrObj; - - ASSERT(pData != NULL); - - /* output log message */ - errno = 0; - if(pData->conn == NULL) { - errmsg.LogError(0, NO_ERRCODE, "unknown DB error occured - could not obtain MongoDB handle"); - } else { /* we can ask mysql for the error description... */ - //we should handle the error. if bSilent is set then we should print as debug - mongo_cmd_get_last_error(pData->conn, pData->db, &ErrObj); - bson_destroy(&ErrObj); + char errStr[1024]; + errmsg.LogError(0, RS_RET_ERR, "ommongodb: error: %s", + rs_strerror_r(errno, errStr, sizeof(errStr))); +#if 0 + gchar *err; + if(mongo_sync_cmd_get_last_error(pData->conn, (gchar*)pData->db, &err) == TRUE) { + errmsg.LogError(0, RS_RET_ERR, "ommongodb: error: %s", err); + } else { + errmsg.LogError(0, RS_RET_ERR, "ommongodb: we had an error, but can " + "not obtain specifics"); } - - return; +#endif } + /* The following function is responsible for initializing a * MySQL connection. * Initially added 2004-10-28 mmeckelein */ static rsRetVal initMongoDB(instanceData *pData, int bSilent) { + char *server; DEFiRet; - ASSERT(pData != NULL); - ASSERT(pData->conn == NULL); - - //I'm trying to fallback to a default here - if(pData->opts->port == 0) - pData->opts->port = 27017; - - if(pData->opts->host == 0x00) - strcpy(pData->opts->host,DEFAULT_SERVER); - - if(pData->dbcollection == 0x00) - strcpy(pData->dbcollection,DEFAULT_DB_COLLECTION); + server = (pData->server == NULL) ? "127.0.0.1" : (char*) pData->server; + DBGPRINTF("ommongodb: trying connect to '%s' at port %d\n", server, pData->port); - pData->status = mongo_connect(pData->conn, pData->opts ); - - switch (pData->status) { - case mongo_conn_success: - fprintf(stderr, "connection succeeded\n" ); - iRet = RS_RET_OK; - break; - case mongo_conn_bad_arg: - errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle"); - fprintf(stderr, "bad arguments\n" ); - iRet = RS_RET_SUSPENDED; - break; - case mongo_conn_no_socket: - errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle"); - fprintf(stderr, "no socket\n" ); - iRet = RS_RET_SUSPENDED; - break; - case mongo_conn_fail: - errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle"); - fprintf(stderr, "connection failed\n" ); - iRet = RS_RET_SUSPENDED; - break; - case mongo_conn_not_master: - errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle"); - fprintf(stderr, "not master\n" ); - iRet = RS_RET_SUSPENDED; - break; - } + pData->conn = mongo_sync_connect(server, pData->port, TRUE); + if(pData->conn == NULL) { + if(!bSilent) { + reportMongoError(pData); + dbgprintf("ommongodb: can not initialize MongoDB handle"); + } + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: RETiRet; } -//we must implement it -rsRetVal writeMongoDB(uchar *psz, instanceData *pData) -{ - char mydate[32]; - char **szParams; - bson b[1]; - bson_buffer buf[1]; - bson_buffer_init( buf ); - bson_append_new_oid(buf, "_id" ); - memset(mydate,0x00,32); - - - DEFiRet; - - ASSERT(psz != NULL); - ASSERT(pData != NULL); - - - /* see if we are ready to proceed */ - if(pData->conn == NULL) { - CHKiRet(initMongoDB(pData, 0)); - } -szParams = (char**)(void*) psz; -//We can make it beter -//if you change the fields in your template, we must update it here -//there is any C_metaprogramming_ninja there? :-) -if(countof(szParams) > 0) +/* map syslog severity to lumberjack level + * TODO: consider moving this to msg.c - make some dirty "friend" references... + * rgerhards, 2012-03-19 + */ +static inline char * +getLumberjackLevel(short severity) { - bson_append_string( buf, "msg", szParams[0]); - bson_append_string( buf, "facility",szParams[1]); - bson_append_string( buf, "hostname", szParams[2] ); - bson_append_string(buf, "priority",szParams[3]); - bson_append_int(buf,"count",countof(szParams)); - bson_from_buffer( b, buf ); - mongo_insert(pData->conn, pData->dbcollection, b ); + switch(severity) { + case 0: return "FATAL"; + case 1: + case 2: + case 3: return "ERROR"; + case 4: return "WARN"; + case 5: + case 6: return "INFO"; + case 7: return "DEBUG"; + default:DBGPRINTF("ommongodb: invalid syslog severity %u\n", severity); + return "INVLD"; + } } -if(b) - bson_destroy(b); +/* small helper: get integer power of 10 */ +static inline int +i10pow(int exp) +{ + int r = 1; + while(exp > 0) { + r *= 10; + exp--; + } + return r; +} +/* write to mongodb in MSG passing mode, that is without a template. + * In this mode, we use the standard document format, which is somewhat + * aligned to cee (as described in project lumberjack). Note that this is + * a moving target, so we may run out of sync (and stay so to retain + * backward compatibility, which we consider pretty important). + */ +rsRetVal writeMongoDB_msg(msg_t *pMsg, instanceData *pData) +{ + bson *doc = NULL; + uchar *procid; short unsigned procid_free; size_t procid_len; + uchar *tag; short unsigned tag_free; size_t tag_len; + uchar *pid; short unsigned pid_free; size_t pid_len; + uchar *sys; short unsigned sys_free; size_t sys_len; + uchar *msg; short unsigned msg_free; size_t msg_len; + int severity, facil; + gint64 ts_gen, ts_rcv; /* timestamps: generated, received */ + int secfrac; + DEFiRet; - finalize_it: - if(iRet == RS_RET_OK) { - pData->uLastMongoDBErrno = 0; /* reset error for error supression */ + /* see if we are ready to proceed */ + if(pData->conn == NULL) { + CHKiRet(initMongoDB(pData, 0)); } - - RETiRet; + procid = MsgGetProp(pMsg, NULL, PROP_PROGRAMNAME, NULL, &procid_len, &procid_free); + tag = MsgGetProp(pMsg, NULL, PROP_SYSLOGTAG, NULL, &tag_len, &tag_free); + pid = MsgGetProp(pMsg, NULL, PROP_PROCID, NULL, &pid_len, &pid_free); + sys = MsgGetProp(pMsg, NULL, PROP_HOSTNAME, NULL, &sys_len, &sys_free); + msg = MsgGetProp(pMsg, NULL, PROP_MSG, NULL, &msg_len, &msg_free); + + // TODO: move to datetime? Refactor in any case! rgerhards, 2012-03-30 + ts_gen = (gint64) datetime.syslogTime2time_t(&pMsg->tTIMESTAMP) * 1000; /* ms! */ +dbgprintf("ommongodb: ts_gen is %lld\n", (long long) ts_gen); +dbgprintf("ommongodb: secfrac is %d, precision %d\n", pMsg->tTIMESTAMP.secfrac, pMsg->tTIMESTAMP.secfracPrecision); + if(pMsg->tTIMESTAMP.secfracPrecision > 3) { + secfrac = pMsg->tTIMESTAMP.secfrac / i10pow(pMsg->tTIMESTAMP.secfracPrecision - 3); + } else if(pMsg->tTIMESTAMP.secfracPrecision < 3) { + secfrac = pMsg->tTIMESTAMP.secfrac * i10pow(3 - pMsg->tTIMESTAMP.secfracPrecision); + } else { + secfrac = pMsg->tTIMESTAMP.secfrac; + } + ts_gen += secfrac; + ts_rcv = (gint64) datetime.syslogTime2time_t(&pMsg->tRcvdAt) * 1000; /* ms! */ + if(pMsg->tRcvdAt.secfracPrecision > 3) { + secfrac = pMsg->tRcvdAt.secfrac / i10pow(pMsg->tRcvdAt.secfracPrecision - 3); + } else if(pMsg->tRcvdAt.secfracPrecision < 3) { + secfrac = pMsg->tRcvdAt.secfrac * i10pow(3 - pMsg->tRcvdAt.secfracPrecision); + } else { + secfrac = pMsg->tRcvdAt.secfrac; + } + ts_rcv += secfrac; + + /* the following need to be int, but are short, so we need to xlat */ + severity = pMsg->iSeverity; + facil = pMsg->iFacility; + + doc = bson_build(BSON_TYPE_STRING, "sys", sys, sys_len, + BSON_TYPE_UTC_DATETIME, "time", ts_gen, + BSON_TYPE_UTC_DATETIME, "time_rcvd", ts_rcv, + BSON_TYPE_STRING, "msg", msg, msg_len, + BSON_TYPE_INT32, "syslog_fac", facil, + BSON_TYPE_INT32, "syslog_sever", severity, + BSON_TYPE_STRING, "syslog_tag", tag, tag_len, + BSON_TYPE_STRING, "procid", procid, procid_len, + BSON_TYPE_STRING, "pid", pid, pid_len, + BSON_TYPE_STRING, "level", getLumberjackLevel(pMsg->iSeverity), -1, + BSON_TYPE_NONE); + + if(procid_free) free(procid); + if(tag_free) free(tag); + if(pid_free) free(pid); + if(sys_free) free(sys); + if(msg_free) free(msg); + + if(doc == NULL) { + reportMongoError(pData); + dbgprintf("ommongodb: error creating BSON doc\n"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + bson_finish(doc); + if(!mongo_sync_cmd_insert(pData->conn, (char*)pData->dbNcoll, doc, NULL)) { + reportMongoError(pData); + dbgprintf("ommongodb: insert error\n"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + if(doc != NULL) + bson_free(doc); + RETiRet; } BEGINtryResume @@ -220,53 +303,117 @@ ENDtryResume BEGINdoAction CODESTARTdoAction - iRet = writeMongoDB(ppString[0], pData); + if(pData->tplName == NULL) { + iRet = writeMongoDB_msg((msg_t*)ppString[0], pData); + } ENDdoAction + +static inline void +setInstParamDefaults(instanceData *pData) +{ + pData->server = NULL; + pData->port = 27017; + pData->db = NULL; + pData->collection= NULL; + pData->uid = NULL; + pData->pwd = NULL; + pData->tplName = NULL; +} + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; + unsigned lendb, lencoll; +CODESTARTnewActInst + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + CODE_STD_STRING_REQUESTparseSelectorAct(1) + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "server")) { + pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "serverport")) { + pData->port = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "db")) { + pData->db = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "collection")) { + pData->collection = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "uid")) { + pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "pwd")) { + pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("ommongodb: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + + if(pData->tplName == NULL) { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); + } else { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "ommongodb: templates are not supported in this version"); + ABORT_FINALIZE(RS_RET_ERR); + CHKiRet(OMSRsetEntry(*ppOMSR, 0, + (uchar*) strdup((char*) pData->tplName), + OMSR_TPL_AS_ARRAY)); + } + + if(pData->db == NULL) + pData->db = (uchar*)strdup("syslog"); + if(pData->collection == NULL) + pData->collection = (uchar*)strdup("log"); + + /* we now create a db+collection string as we need to pass this + * into the API and we do not want to generate it each time ;) + * +2 ==> dot as delimiter and \0 + */ + lendb = strlen((char*)pData->db); + lencoll = strlen((char*)pData->collection); + CHKmalloc(pData->dbNcoll = malloc(lendb+lencoll+2)); + memcpy(pData->dbNcoll, pData->db, lendb); + pData->dbNcoll[lendb] = '.'; + /* lencoll+1 => copy \0! */ + memcpy(pData->dbNcoll+lendb+1, pData->collection, lencoll+1); + +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + + BEGINparseSelectorAct - //int iMongoDBPropErr = 0; CODESTARTparseSelectorAct CODE_STD_STRING_REQUESTparseSelectorAct(1) - if(!strncmp((char*) p, ":ommongodb:", sizeof(":ommongodb:") - 1)) { - p += sizeof(":ommongodb:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ - } else { - ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "ommongodb supports only v6 config format, use: " + "action(type=\"ommongodb\" server=...)"); } - - CHKiRet(createInstance(&pData)); - - if(getSubString(&p, pData->opts->host, MAXHOSTNAMELEN+1, ',')) - strcpy(pData->opts->host,DEFAULT_SERVER); - - //we must define the max db name - if(getSubString(&p,pData->db,255,',')) - strcpy(pData->db,DEFAULT_DATABASE); - if(getSubString(&p,pData->collection,255,';')) - strcpy(pData->collection,DEFAULT_COLLECTION); - if(*(p-1) == ';') - --p; - - - CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_TPL_AS_ARRAY, (uchar*) " StdMongoDBFmt")); - - - pData->opts->port = (unsigned) iSrvPort; /* set configured port */ - sprintf(pData->dbcollection,"%s.%s",pData->db,pData->collection); - CHKiRet(initMongoDB(pData, 0)); - + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct BEGINmodExit CODESTARTmodExit + objRelease(errmsg, CORE_COMPONENT); + objRelease(datetime, CORE_COMPONENT); ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt BEGINmodInit() @@ -274,7 +421,8 @@ CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); - DBGPRINTF("ompgsql: module compiled with rsyslog version %s.\n", VERSION); - DBGPRINTF("ompgsql: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not "); -ENDmodInit
\ No newline at end of file + DBGPRINTF("ommongodb: module compiled with rsyslog version %s.\n", VERSION); + //DBGPRINTF("ommongodb: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not "); +ENDmodInit |