summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosh Kearney <josh@jk0.org>2011-08-15 12:18:22 -0500
committerJosh Kearney <josh@jk0.org>2011-08-15 12:18:22 -0500
commit2c6022a90011bba4618ac6dde06969bea180a2f3 (patch)
tree975102773df7f0ff3b5509987273acaefc5d0dd1
parent1a2bc77871af060069ba0de80637198be78f8169 (diff)
parent2ed3b12cc8da82304cef88dde64631b6348ee60e (diff)
Merged trunk.
-rwxr-xr-xbin/clear_rabbit_queues73
-rw-r--r--nova/flags.py1
-rw-r--r--nova/rpc/amqp.py8
3 files changed, 79 insertions, 3 deletions
diff --git a/bin/clear_rabbit_queues b/bin/clear_rabbit_queues
new file mode 100755
index 000000000..7a000e5d8
--- /dev/null
+++ b/bin/clear_rabbit_queues
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 Openstack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Admin/debug script to wipe rabbitMQ (AMQP) queues nova uses.
+ This can be used if you need to change durable options on queues,
+ or to wipe all messages in the queue system if things are in a
+ serious bad way.
+
+"""
+
+import datetime
+import gettext
+import os
+import sys
+import time
+
+# If ../nova/__init__.py exists, add ../ to Python search path, so that
+# it will override what happens to be installed in /usr/(local/)lib/python...
+POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
+ os.pardir,
+ os.pardir))
+if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'nova', '__init__.py')):
+ sys.path.insert(0, POSSIBLE_TOPDIR)
+
+gettext.install('nova', unicode=1)
+
+
+from nova import context
+from nova import exception
+from nova import flags
+from nova import log as logging
+from nova import rpc
+from nova import utils
+
+
+FLAGS = flags.FLAGS
+flags.DEFINE_boolean('delete_exchange', False, 'delete nova exchange too.')
+
+
+def delete_exchange(exch):
+ conn = rpc.create_connection()
+ x = conn.get_channel()
+ x.exchange_delete(exch)
+
+
+def delete_queues(queues):
+ conn = rpc.create_connection()
+ x = conn.get_channel()
+ for q in queues:
+ x.queue_delete(q)
+
+if __name__ == '__main__':
+ utils.default_flagfile()
+ args = flags.FLAGS(sys.argv)
+ logging.setup()
+ delete_queues(args[1:])
+ if FLAGS.delete_exchange:
+ delete_exchange(FLAGS.control_exchange)
diff --git a/nova/flags.py b/nova/flags.py
index e994a1665..48d5e8168 100644
--- a/nova/flags.py
+++ b/nova/flags.py
@@ -305,6 +305,7 @@ DEFINE_string('rabbit_virtual_host', '/', 'rabbit virtual host')
DEFINE_integer('rabbit_retry_interval', 10, 'rabbit connection retry interval')
DEFINE_integer('rabbit_max_retries', 12, 'rabbit connection attempts')
DEFINE_string('control_exchange', 'nova', 'the main exchange to connect to')
+DEFINE_boolean('rabbit_durable_queues', False, 'use durable queues')
DEFINE_list('enabled_apis', ['ec2', 'osapi'],
'list of APIs to enable by default')
DEFINE_string('ec2_host', '$my_ip', 'ip of api server')
diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py
index 61555795a..fe429b266 100644
--- a/nova/rpc/amqp.py
+++ b/nova/rpc/amqp.py
@@ -257,7 +257,7 @@ class TopicAdapterConsumer(AdapterConsumer):
self.queue = topic
self.routing_key = topic
self.exchange = FLAGS.control_exchange
- self.durable = False
+ self.durable = FLAGS.rabbit_durable_queues
super(TopicAdapterConsumer, self).__init__(connection=connection,
topic=topic, proxy=proxy)
@@ -345,7 +345,7 @@ class TopicPublisher(Publisher):
def __init__(self, connection=None, topic='broadcast'):
self.routing_key = topic
self.exchange = FLAGS.control_exchange
- self.durable = False
+ self.durable = FLAGS.rabbit_durable_queues
super(TopicPublisher, self).__init__(connection=connection)
@@ -373,6 +373,7 @@ class DirectConsumer(Consumer):
self.queue = msg_id
self.routing_key = msg_id
self.exchange = msg_id
+ self.durable = False
self.auto_delete = True
self.exclusive = True
super(DirectConsumer, self).__init__(connection=connection)
@@ -386,6 +387,7 @@ class DirectPublisher(Publisher):
def __init__(self, connection=None, msg_id=None):
self.routing_key = msg_id
self.exchange = msg_id
+ self.durable = False
self.auto_delete = True
super(DirectPublisher, self).__init__(connection=connection)
@@ -573,7 +575,7 @@ def send_message(topic, message, wait=True):
publisher = messaging.Publisher(connection=Connection.instance(),
exchange=FLAGS.control_exchange,
- durable=False,
+ durable=FLAGS.rabbit_durable_queues,
exchange_type='topic',
routing_key=topic)
publisher.send(message)