/* omhiredis.c * Copyright 2012 Talksum, Inc * * This program is free software: you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this program. If not, see * . * * Author: Brian Knox * */ #include "config.h" #include #include #include #include #include #include #include #include #include #include "rsyslog.h" #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" #include "template.h" #include "module-template.h" #include "errmsg.h" #include "cfsysline.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP MODULE_CNFNAME("omhiredis") /* internal structures */ DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) typedef struct _instanceData { redisContext *conn; uchar *server; int port; uchar *tplName; } instanceData; static struct cnfparamdescr actpdescr[] = { { "server", eCmdHdlrGetWord, 0 }, { "serverport", eCmdHdlrInt, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = { CNFPARAMBLK_VERSION, sizeof(actpdescr)/sizeof(struct cnfparamdescr), actpdescr }; BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) iRet = RS_RET_OK; ENDisCompatibleWithFeature static void closeHiredis(instanceData *pData) { if(pData->conn != NULL) { redisFree(pData->conn); pData->conn = NULL; } } BEGINfreeInstance CODESTARTfreeInstance closeHiredis(pData); free(pData->server); free(pData->tplName); ENDfreeInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo /* nothing special here */ ENDdbgPrintInstInfo static rsRetVal initHiredis(instanceData *pData, int bSilent) { char *server; DEFiRet; server = (pData->server == NULL) ? "127.0.0.1" : (char*) pData->server; DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server, pData->port); struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */ pData->conn = redisConnectWithTimeout(server, pData->port, timeout); if (pData->conn->err) { if(!bSilent) errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize redis handle"); ABORT_FINALIZE(RS_RET_SUSPENDED); } finalize_it: RETiRet; } rsRetVal writeHiredis(uchar *message, instanceData *pData) { redisReply *reply; DEFiRet; if(pData->conn == NULL) CHKiRet(initHiredis(pData, 0)); reply = redisCommand(pData->conn, (char*)message); if (reply->type == REDIS_REPLY_ERROR) { errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", reply->str); dbgprintf("omhiredis: %s\n", reply->str); freeReplyObject(reply); ABORT_FINALIZE(RS_RET_ERR); } else { freeReplyObject(reply); } finalize_it: RETiRet; } BEGINtryResume CODESTARTtryResume if(pData->conn == NULL) iRet = initHiredis(pData, 0); ENDtryResume BEGINdoAction CODESTARTdoAction iRet = writeHiredis(ppString[0], pData); ENDdoAction static inline void setInstParamDefaults(instanceData *pData) { pData->server = NULL; pData->port = 6379; pData->tplName = NULL; } BEGINnewActInst struct cnfparamvals *pvals; int i; CODESTARTnewActInst if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); CHKiRet(createInstance(&pData)); setInstParamDefaults(pData); CODE_STD_STRING_REQUESTparseSelectorAct(1) for(i = 0 ; i < actpblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; if(!strcmp(actpblk.descr[i].name, "server")) { pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { dbgprintf("omhiredis: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); } } if(pData->tplName == NULL) { dbgprintf("omhiredis: action requires a template name"); ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } /* template string 0 is just a regular string */ OMSRsetEntry(*ppOMSR, 0,(uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS); CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst BEGINparseSelectorAct CODESTARTparseSelectorAct /* tell the engine we only want one template string */ CODE_STD_STRING_REQUESTparseSelectorAct(1) if(!strncmp((char*) p, ":omhiredis:", sizeof(":omhiredis:") - 1)) errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, "omhiredis supports only v6 config format, use: " "action(type=\"omhiredis\" server=...)"); ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct BEGINmodExit CODESTARTmodExit ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* only supports rsyslog 6 configs */ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); DBGPRINTF("omhiredis: module compiled with rsyslog version %s.\n", VERSION); ENDmodInit