diff options
| author | David Sommerseth <davids@redhat.com> | 2012-09-16 19:38:05 +0200 |
|---|---|---|
| committer | David Sommerseth <davids@redhat.com> | 2012-09-16 19:38:05 +0200 |
| commit | d54efa2866b589313d45b92a020974698874161b (patch) | |
| tree | 9b7e75fe8d10cf76bae3f6c70d230e863ade4bdf | |
| parent | dbaf8d9df54127688b282740aac9ee0d5cf5efbc (diff) | |
| download | logactio-d54efa2866b589313d45b92a020974698874161b.tar.gz logactio-d54efa2866b589313d45b92a020974698874161b.tar.xz logactio-d54efa2866b589313d45b92a020974698874161b.zip | |
Added a Qpid reporter module
This allows alerts to be sent to a Qpid based AMQP broker. A
simple alert consumer has been added as well.
Signed-off-by: David Sommerseth <davids@redhat.com>
| -rw-r--r-- | LogActio/Reporters/QpidReporter.py | 123 | ||||
| -rw-r--r-- | examples/qpid-alert-watcher | 98 |
2 files changed, 221 insertions, 0 deletions
diff --git a/LogActio/Reporters/QpidReporter.py b/LogActio/Reporters/QpidReporter.py new file mode 100644 index 0000000..82c4b50 --- /dev/null +++ b/LogActio/Reporters/QpidReporter.py @@ -0,0 +1,123 @@ +# +# logactio - simple framework for doing configured action on certain +# log file events +# +# Copyright 2012 David Sommerseth <dazo@users.sourceforge.net> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# For the avoidance of doubt the "preferred form" of this code is one which +# is in an open unpatent encumbered format. Where cryptographic key signing +# forms part of the process of creating an executable the information +# including keys needed to generate an equivalently functional executable +# are deemed to be part of the source code. +# + +import sys, json +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty +import LogActio.Message, LogActio.ReporterQueue + + +class QpidReporter(LogActio.ReporterQueue.ReporterQueue): + """Simple LogActio reporter module, sending alerts to a Qpid broker + + Example configuration to be used in /etc/logactio.cfg + + [Reporter:QPID] + module: QpidReporter + broker: qpid.example.com + port: 5672 + routing_key: LogActio.Alerts + exchange: amq.topic + + This will send reports to the configured Qpid broker queue + """ + + def __init__(self, config, logger = None): + if not config.has_key("broker"): + raise Exception("QpidReporter is not configured with a broker host/address") + if not config.has_key("routing_key"): + raise Exception("QpidReporter is not configured with a Qpid routing key") + if not config.has_key("exchange"): + raise Exception("QpidReporter is not configured with a Qpid exchange") + + self.__log = logger and logger or self.__logfnc + self.__broker = config["broker"] + self.__port = config.has_key("port") and config["port"] or 5672 + self.__routkey = config["routing_key"] + self.__exchange = config["exchange"] + + LogActio.ReporterQueue.ReporterQueue.__init__(self, + "QpidReporter", + "Qpid Reporter", + self.__processqueue) + + def __logfnc(self, lvl, msg): + print "%s" % msg + sys.stdout.flush() + + + def __processqueue(self): + # Connect to the Qpid broker + qpidsock = connect(self.__broker, self.__port) + qpidconn = Connection(sock=qpidsock, + service="qpidd", reconnect=True, heartbeat=30) + qpidconn.start() + qpidsess = qpidconn.session(str(uuid4())) + qmsgprops = qpidsess.delivery_properties(routing_key=self.__routkey) + + self.__log(1, "[QpidReporter] Connection to %s:%i opened" % ( + self.__broker, self.__port)) + + # Process the internal message queue + done = False + mcount = 0 + uuid = str(uuid4()) + while not done: + msg = self._QueueGet() + + if( msg.MessageType() == LogActio.Message.MSG_SHUTDOWN ): + # Prepare for shutdown + done = True + + elif( msg.MessageType() == LogActio.Message.MSG_SEND ): + m = msg.Message() + + try: + qm = Message(qmsgprops, m) + qpidsess.message_transfer(self.__exchange, message=qm) + mcount += 1 + del qm + except Exception, e: + self.__log(0, "** ERROR ** QpidReporter failed: %s" % str(e)) + + qpidsess.close(timeout=30) + self.__log(1, "[QpidReporter] Connection to %s:%i closed" % ( + self.__broker, self.__port)) + + + def ProcessEvent(self, logfile, prefix, msg, count, threshold): + # Format the report message + msg = {"prefix": prefix, "count": count, "threshold": threshold, + "message": msg, "logfile": logfile} + + # Queue the message for sending + self._QueueMsg(0, json.dumps(msg)) + + +def InitReporter(config, logger = None): + return QpidReporter(config, logger) diff --git a/examples/qpid-alert-watcher b/examples/qpid-alert-watcher new file mode 100644 index 0000000..5e40ae7 --- /dev/null +++ b/examples/qpid-alert-watcher @@ -0,0 +1,98 @@ +#!/usr/bin/env python2 +# +# logactio - simple framework for doing configured action on certain +# log file events +# +# Copyright 2012 David Sommerseth <dazo@users.sourceforge.net> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# For the avoidance of doubt the "preferred form" of this code is one which +# is in an open unpatent encumbered format. Where cryptographic key signing +# forms part of the process of creating an executable the information +# including keys needed to generate an equivalently functional executable +# are deemed to be part of the source code. +# +# +# This is a simple Qpid consumer script, which reports all alerts to stdout +# + +import sys, json, socket +from optparse import OptionParser +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import Message, RangedSet, uuid4 +from qpid.queue import Empty + +if __name__ == "__main__": + parser = OptionParser() + parser.add_option("--broker", action="store", dest="broker", + default="localhost", metavar="HOSTNAME", + help="Qpid broker to connect to") + parser.add_option("--port", action="store", dest="port", + default=5672, metavar="PORT", + help="Qpid broker port") + parser.add_option("--exchange", action="store", dest="exchange", + metavar="EXCHANGE", + help="Qpid exchange to pull messages from") + parser.add_option("--bind-key", action="store", dest="bindkey", + metavar="BIND-KEY", + help="Routing key to bind against") + (opts, args) = parser.parse_args() + + if opts.exchange is None or opts.bindkey is None: + print "** ERROR ** Missing --exchange or --bind-key" + sys.exit(1) + + qpidsock = connect(opts.broker, opts.port) + connection = Connection (sock=qpidsock, host=opts.broker, service="qpidd") + connection.start() + session = connection.session(str(uuid4())) + + session.queue_declare(queue="qpidalerter", exclusive=True) + session.exchange_bind(exchange=opts.exchange, queue="qpidalerter", + binding_key=opts.bindkey) + local_queue = session.incoming("local_qpidalerter") + session.message_subscribe(queue="qpidalerter", destination="local_qpidalerter") + local_queue.start() + + print "Started qpid-alert-watcher\n" + while True: + try: + message = local_queue.get() + alert = json.loads(message.body) + session.message_accept(RangedSet(message.id)) + print "== LogActio Alert ===============================================" + print "Alert prefix: %-20s [Count %i, threshold %i]" % ( + alert["prefix"], alert["count"], alert["threshold"]) + print "Logfile: %s" % alert["logfile"] + if alert["message"] is not None: + almsg = "\n ".join(alert["message"].split("|")) + else: + almsg = "(No message)" + print "Message: %s" % almsg + print "-----------------------------------------------------------------" + + except Empty: + print ">> (Empty message received)" + + except socket.gaierror, e: + print "** WARNING ** Socket error - %s" % str(e) + print " Trying again in 10 seconds" + time.sleep(10) + + except socket.error, e: + print "** WARNING ** Socket error - %s" % str(e) + print " Trying again in 20 seconds" + time.sleep(20) |
