diff options
| -rw-r--r-- | openstack/common/rpc/__init__.py | 11 | ||||
| -rw-r--r-- | openstack/common/rpc/amqp.py | 8 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 13 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_qpid.py | 17 | ||||
| -rw-r--r-- | tests/unit/rpc/test_qpid.py | 8 |
5 files changed, 34 insertions, 23 deletions
diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py index 5b1f8d1..da45d5a 100644 --- a/openstack/common/rpc/__init__.py +++ b/openstack/common/rpc/__init__.py @@ -53,12 +53,17 @@ rpc_opts = [ ], help='Modules of exceptions that are permitted to be recreated' 'upon receiving exception data from an rpc call.'), - cfg.StrOpt('control_exchange', - default='nova', - help='AMQP exchange to connect to if using RabbitMQ or Qpid'), cfg.BoolOpt('fake_rabbit', default=False, help='If passed, use a fake RabbitMQ provider'), + # + # The following options are not registered here, but are expected to be + # present. The project using this library must register these options with + # the configuration so that project-specific defaults may be defined. + # + #cfg.StrOpt('control_exchange', + # default='nova', + # help='AMQP exchange to connect to if using RabbitMQ or Qpid'), ] cfg.CONF.register_opts(rpc_opts) diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 10b3869..79eda39 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -34,6 +34,7 @@ from eventlet import greenpool from eventlet import pools from eventlet import semaphore +from openstack.common import cfg from openstack.common import excutils from openstack.common.gettextutils import _ from openstack.common import local @@ -416,3 +417,10 @@ def notify(conf, context, topic, msg, connection_pool): def cleanup(connection_pool): if connection_pool: connection_pool.empty() + + +def get_control_exchange(conf): + try: + return conf.control_exchange + except cfg.NoSuchOptError: + return 'openstack' diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 66cf0a9..294fc0a 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -210,10 +210,10 @@ class TopicConsumer(ConsumerBase): 'auto_delete': False, 'exclusive': False} options.update(kwargs) - exchange = kombu.entity.Exchange(name=conf.control_exchange, - type='topic', - durable=options['durable'], - auto_delete=options['auto_delete']) + exchange = kombu.entity.Exchange( + name=rpc_amqp.get_control_exchange(conf), + type='topic', durable=options['durable'], + auto_delete=options['auto_delete']) super(TopicConsumer, self).__init__(channel, callback, tag, @@ -307,8 +307,9 @@ class TopicPublisher(Publisher): 'auto_delete': False, 'exclusive': False} options.update(kwargs) - super(TopicPublisher, self).__init__(channel, conf.control_exchange, - topic, type='topic', **options) + super(TopicPublisher, self).__init__(channel, + rpc_amqp.get_control_exchange(conf), topic, + type='topic', **options) class FanoutPublisher(Publisher): diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 992790a..93d771c 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -181,9 +181,8 @@ class TopicConsumer(ConsumerBase): """ super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (conf.control_exchange, - topic), - {}, name or topic, {}) + "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic), + {}, name or topic, {}) class FanoutConsumer(ConsumerBase): @@ -256,9 +255,8 @@ class TopicPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(TopicPublisher, self).__init__( - session, - "%s/%s" % (conf.control_exchange, topic)) + super(TopicPublisher, self).__init__(session, + "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic)) class FanoutPublisher(Publisher): @@ -276,10 +274,9 @@ class NotifyPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(NotifyPublisher, self).__init__( - session, - "%s/%s" % (conf.control_exchange, topic), - {"durable": True}) + super(NotifyPublisher, self).__init__(session, + "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic), + {"durable": True}) class Connection(object): diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py index 7cff645..4e0cc29 100644 --- a/tests/unit/rpc/test_qpid.py +++ b/tests/unit/rpc/test_qpid.py @@ -133,7 +133,7 @@ class RpcQpidTestCase(unittest.TestCase): '"name": "impl_qpid_test_fanout_.*"}}$') else: expected_address = ( - 'nova/impl_qpid_test ; {"node": {"x-declare": ' + 'openstack/impl_qpid_test ; {"node": {"x-declare": ' '{"auto-delete": true, "durable": true}, "type": "topic"}, ' '"create": "always", "link": {"x-declare": {"auto-delete": ' 'true, "exclusive": false, "durable": false}, "durable": ' @@ -169,7 +169,7 @@ class RpcQpidTestCase(unittest.TestCase): self.mock_connection.open() self.mock_connection.session().AndReturn(self.mock_session) expected_address = ( - 'nova/impl_qpid_test ; {"node": {"x-declare": ' + 'openstack/impl_qpid_test ; {"node": {"x-declare": ' '{"auto-delete": true, "durable": true}, "type": "topic"}, ' '"create": "always", "link": {"x-declare": {"auto-delete": ' 'true, "exclusive": false, "durable": false}, "durable": ' @@ -205,7 +205,7 @@ class RpcQpidTestCase(unittest.TestCase): '"type": "topic"}, "create": "always"}') else: expected_address = ( - 'nova/impl_qpid_test ; {"node": {"x-declare": ' + 'openstack/impl_qpid_test ; {"node": {"x-declare": ' '{"auto-delete": true, "durable": false}, "type": "topic"}, ' '"create": "always"}') self.mock_session.sender(expected_address).AndReturn(self.mock_sender) @@ -303,7 +303,7 @@ class RpcQpidTestCase(unittest.TestCase): self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver) self.mock_receiver.capacity = 1 send_addr = ( - 'nova/impl_qpid_test ; {"node": {"x-declare": ' + 'openstack/impl_qpid_test ; {"node": {"x-declare": ' '{"auto-delete": true, "durable": false}, "type": "topic"}, ' '"create": "always"}') self.mock_session.sender(send_addr).AndReturn(self.mock_sender) |
