summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Sommerseth <davids@redhat.com>2012-09-16 19:38:05 +0200
committerDavid Sommerseth <davids@redhat.com>2012-09-16 19:38:05 +0200
commitd54efa2866b589313d45b92a020974698874161b (patch)
tree9b7e75fe8d10cf76bae3f6c70d230e863ade4bdf
parentdbaf8d9df54127688b282740aac9ee0d5cf5efbc (diff)
downloadlogactio-d54efa2866b589313d45b92a020974698874161b.tar.gz
logactio-d54efa2866b589313d45b92a020974698874161b.tar.xz
logactio-d54efa2866b589313d45b92a020974698874161b.zip
Added a Qpid reporter module
This allows alerts to be sent to a Qpid based AMQP broker. A simple alert consumer has been added as well. Signed-off-by: David Sommerseth <davids@redhat.com>
-rw-r--r--LogActio/Reporters/QpidReporter.py123
-rw-r--r--examples/qpid-alert-watcher98
2 files changed, 221 insertions, 0 deletions
diff --git a/LogActio/Reporters/QpidReporter.py b/LogActio/Reporters/QpidReporter.py
new file mode 100644
index 0000000..82c4b50
--- /dev/null
+++ b/LogActio/Reporters/QpidReporter.py
@@ -0,0 +1,123 @@
+#
+# logactio - simple framework for doing configured action on certain
+# log file events
+#
+# Copyright 2012 David Sommerseth <dazo@users.sourceforge.net>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# For the avoidance of doubt the "preferred form" of this code is one which
+# is in an open unpatent encumbered format. Where cryptographic key signing
+# forms part of the process of creating an executable the information
+# including keys needed to generate an equivalently functional executable
+# are deemed to be part of the source code.
+#
+
+import sys, json
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message, RangedSet, uuid4
+from qpid.queue import Empty
+import LogActio.Message, LogActio.ReporterQueue
+
+
+class QpidReporter(LogActio.ReporterQueue.ReporterQueue):
+ """Simple LogActio reporter module, sending alerts to a Qpid broker
+
+ Example configuration to be used in /etc/logactio.cfg
+
+ [Reporter:QPID]
+ module: QpidReporter
+ broker: qpid.example.com
+ port: 5672
+ routing_key: LogActio.Alerts
+ exchange: amq.topic
+
+ This will send reports to the configured Qpid broker queue
+ """
+
+ def __init__(self, config, logger = None):
+ if not config.has_key("broker"):
+ raise Exception("QpidReporter is not configured with a broker host/address")
+ if not config.has_key("routing_key"):
+ raise Exception("QpidReporter is not configured with a Qpid routing key")
+ if not config.has_key("exchange"):
+ raise Exception("QpidReporter is not configured with a Qpid exchange")
+
+ self.__log = logger and logger or self.__logfnc
+ self.__broker = config["broker"]
+ self.__port = config.has_key("port") and config["port"] or 5672
+ self.__routkey = config["routing_key"]
+ self.__exchange = config["exchange"]
+
+ LogActio.ReporterQueue.ReporterQueue.__init__(self,
+ "QpidReporter",
+ "Qpid Reporter",
+ self.__processqueue)
+
+ def __logfnc(self, lvl, msg):
+ print "%s" % msg
+ sys.stdout.flush()
+
+
+ def __processqueue(self):
+ # Connect to the Qpid broker
+ qpidsock = connect(self.__broker, self.__port)
+ qpidconn = Connection(sock=qpidsock,
+ service="qpidd", reconnect=True, heartbeat=30)
+ qpidconn.start()
+ qpidsess = qpidconn.session(str(uuid4()))
+ qmsgprops = qpidsess.delivery_properties(routing_key=self.__routkey)
+
+ self.__log(1, "[QpidReporter] Connection to %s:%i opened" % (
+ self.__broker, self.__port))
+
+ # Process the internal message queue
+ done = False
+ mcount = 0
+ uuid = str(uuid4())
+ while not done:
+ msg = self._QueueGet()
+
+ if( msg.MessageType() == LogActio.Message.MSG_SHUTDOWN ):
+ # Prepare for shutdown
+ done = True
+
+ elif( msg.MessageType() == LogActio.Message.MSG_SEND ):
+ m = msg.Message()
+
+ try:
+ qm = Message(qmsgprops, m)
+ qpidsess.message_transfer(self.__exchange, message=qm)
+ mcount += 1
+ del qm
+ except Exception, e:
+ self.__log(0, "** ERROR ** QpidReporter failed: %s" % str(e))
+
+ qpidsess.close(timeout=30)
+ self.__log(1, "[QpidReporter] Connection to %s:%i closed" % (
+ self.__broker, self.__port))
+
+
+ def ProcessEvent(self, logfile, prefix, msg, count, threshold):
+ # Format the report message
+ msg = {"prefix": prefix, "count": count, "threshold": threshold,
+ "message": msg, "logfile": logfile}
+
+ # Queue the message for sending
+ self._QueueMsg(0, json.dumps(msg))
+
+
+def InitReporter(config, logger = None):
+ return QpidReporter(config, logger)
diff --git a/examples/qpid-alert-watcher b/examples/qpid-alert-watcher
new file mode 100644
index 0000000..5e40ae7
--- /dev/null
+++ b/examples/qpid-alert-watcher
@@ -0,0 +1,98 @@
+#!/usr/bin/env python2
+#
+# logactio - simple framework for doing configured action on certain
+# log file events
+#
+# Copyright 2012 David Sommerseth <dazo@users.sourceforge.net>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# For the avoidance of doubt the "preferred form" of this code is one which
+# is in an open unpatent encumbered format. Where cryptographic key signing
+# forms part of the process of creating an executable the information
+# including keys needed to generate an equivalently functional executable
+# are deemed to be part of the source code.
+#
+#
+# This is a simple Qpid consumer script, which reports all alerts to stdout
+#
+
+import sys, json, socket
+from optparse import OptionParser
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message, RangedSet, uuid4
+from qpid.queue import Empty
+
+if __name__ == "__main__":
+ parser = OptionParser()
+ parser.add_option("--broker", action="store", dest="broker",
+ default="localhost", metavar="HOSTNAME",
+ help="Qpid broker to connect to")
+ parser.add_option("--port", action="store", dest="port",
+ default=5672, metavar="PORT",
+ help="Qpid broker port")
+ parser.add_option("--exchange", action="store", dest="exchange",
+ metavar="EXCHANGE",
+ help="Qpid exchange to pull messages from")
+ parser.add_option("--bind-key", action="store", dest="bindkey",
+ metavar="BIND-KEY",
+ help="Routing key to bind against")
+ (opts, args) = parser.parse_args()
+
+ if opts.exchange is None or opts.bindkey is None:
+ print "** ERROR ** Missing --exchange or --bind-key"
+ sys.exit(1)
+
+ qpidsock = connect(opts.broker, opts.port)
+ connection = Connection (sock=qpidsock, host=opts.broker, service="qpidd")
+ connection.start()
+ session = connection.session(str(uuid4()))
+
+ session.queue_declare(queue="qpidalerter", exclusive=True)
+ session.exchange_bind(exchange=opts.exchange, queue="qpidalerter",
+ binding_key=opts.bindkey)
+ local_queue = session.incoming("local_qpidalerter")
+ session.message_subscribe(queue="qpidalerter", destination="local_qpidalerter")
+ local_queue.start()
+
+ print "Started qpid-alert-watcher\n"
+ while True:
+ try:
+ message = local_queue.get()
+ alert = json.loads(message.body)
+ session.message_accept(RangedSet(message.id))
+ print "== LogActio Alert ==============================================="
+ print "Alert prefix: %-20s [Count %i, threshold %i]" % (
+ alert["prefix"], alert["count"], alert["threshold"])
+ print "Logfile: %s" % alert["logfile"]
+ if alert["message"] is not None:
+ almsg = "\n ".join(alert["message"].split("|"))
+ else:
+ almsg = "(No message)"
+ print "Message: %s" % almsg
+ print "-----------------------------------------------------------------"
+
+ except Empty:
+ print ">> (Empty message received)"
+
+ except socket.gaierror, e:
+ print "** WARNING ** Socket error - %s" % str(e)
+ print " Trying again in 10 seconds"
+ time.sleep(10)
+
+ except socket.error, e:
+ print "** WARNING ** Socket error - %s" % str(e)
+ print " Trying again in 20 seconds"
+ time.sleep(20)