# # logactio - simple framework for doing configured action on certain # log file events # # Copyright 2012 - 2015 David Sommerseth # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation, version 2 # of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # # For the avoidance of doubt the "preferred form" of this code is one which # is in an open unpatent encumbered format. Where cryptographic key signing # forms part of the process of creating an executable the information # including keys needed to generate an equivalently functional executable # are deemed to be part of the source code. # import sys, json from qpid.util import connect from qpid.connection import Connection from qpid.datatypes import Message, RangedSet, uuid4 from qpid.queue import Empty import LogActio.Message, LogActio.ReporterQueue class QpidReporter(LogActio.ReporterQueue.ReporterQueue): """Simple LogActio reporter module, sending alerts to a Qpid broker Example configuration to be used in /etc/logactio.cfg [Reporter:QPID] module: QpidReporter broker: qpid.example.com port: 5672 routing_key: LogActio.Alerts exchange: amq.topic This will send reports to the configured Qpid broker queue """ def __init__(self, config, logger = None): if not config.has_key("broker"): raise Exception("QpidReporter is not configured with a broker host/address") if not config.has_key("routing_key"): raise Exception("QpidReporter is not configured with a Qpid routing key") if not config.has_key("exchange"): raise Exception("QpidReporter is not configured with a Qpid exchange") self.__log = logger and logger or self.__logfnc self.__broker = config["broker"] self.__port = config.has_key("port") and config["port"] or 5672 self.__routkey = config["routing_key"] self.__exchange = config["exchange"] LogActio.ReporterQueue.ReporterQueue.__init__(self, "QpidReporter", "Qpid Reporter", self.__processqueue) def __logfnc(self, lvl, msg): print "%s" % msg sys.stdout.flush() def __processqueue(self): # Connect to the Qpid broker qpidsock = connect(self.__broker, self.__port) qpidconn = Connection(sock=qpidsock, service="qpidd", reconnect=True, heartbeat=30) qpidconn.start() qpidsess = qpidconn.session(str(uuid4())) qmsgprops = qpidsess.delivery_properties(routing_key=self.__routkey) self.__log(1, "[QpidReporter] Connection to %s:%i opened" % ( self.__broker, self.__port)) # Process the internal message queue done = False mcount = 0 uuid = str(uuid4()) while not done: msg = self._QueueGet() if( msg.MessageType() == LogActio.Message.MSG_SHUTDOWN ): # Prepare for shutdown done = True elif( msg.MessageType() == LogActio.Message.MSG_SEND ): m = msg.Message() try: qm = Message(qmsgprops, m) qpidsess.message_transfer(self.__exchange, message=qm) mcount += 1 del qm except Exception, e: self.__log(0, "** ERROR ** QpidReporter failed: %s" % str(e)) qpidsess.close(timeout=30) self.__log(1, "[QpidReporter] Connection to %s:%i closed" % ( self.__broker, self.__port)) def ProcessEvent(self, logfile, prefix, msg, count, threshold): # Format the report message msg = {"prefix": prefix, "count": count, "threshold": threshold, "message": msg, "logfile": logfile} # Queue the message for sending self._QueueMsg(0, json.dumps(msg)) def InitReporter(config, logger = None): return QpidReporter(config, logger)