summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris Behrens <cbehrens@codestud.com>2011-05-25 15:42:24 -0700
committertermie <github@anarkystic.com>2011-05-25 15:42:24 -0700
commit5f3adfc3110ed8095cdac43cc651aa46087c5490 (patch)
tree5f67761406d4c6f7b3a9cf1f9240503549672fb6
parent8f2557dcd3e3d88c0eabb63bcce90ced79347ae4 (diff)
Always create Service consumers no matter if report_interval is 0
Fix tests to handle how Service loads Consumers now
-rw-r--r--nova/service.py46
-rw-r--r--nova/tests/test_service.py57
2 files changed, 75 insertions, 28 deletions
diff --git a/nova/service.py b/nova/service.py
index c7e48544c..3a364b6c6 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -88,29 +88,31 @@ class Service(object):
if 'nova-compute' == self.binary:
self.manager.update_available_resource(ctxt)
- if self.report_interval:
- conn = rpc.Connection.instance(new=True)
-
- # Share this same connection for these Consumers
- consumer_all = rpc.TopicAdapterConsumer(
- connection=conn,
- topic=self.topic,
- proxy=self)
- consumer_node = rpc.TopicAdapterConsumer(
- connection=conn,
- topic='%s.%s' % (self.topic, self.host),
- proxy=self)
- fanout = rpc.FanoutAdapterConsumer(
- connection=conn,
- topic=self.topic,
- proxy=self)
-
- cset = rpc.ConsumerSet(conn, [consumer_all,
- consumer_node,
- fanout])
- # Wait forever, processing these consumers
- self.csetthread = greenthread.spawn(cset.wait)
+ conn = rpc.Connection.instance(new=True)
+ logging.debug("Creating Consumer connection for Service %s" % \
+ self.topic)
+
+ # Share this same connection for these Consumers
+ consumer_all = rpc.TopicAdapterConsumer(
+ connection=conn,
+ topic=self.topic,
+ proxy=self)
+ consumer_node = rpc.TopicAdapterConsumer(
+ connection=conn,
+ topic='%s.%s' % (self.topic, self.host),
+ proxy=self)
+ fanout = rpc.FanoutAdapterConsumer(
+ connection=conn,
+ topic=self.topic,
+ proxy=self)
+
+ cset = rpc.ConsumerSet(conn, [consumer_all,
+ consumer_node,
+ fanout])
+ # Wait forever, processing these consumers
+ self.csetthread = greenthread.spawn(cset.wait)
+ if self.report_interval:
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)
self.timers.append(pulse)
diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py
index d48de2057..0bba01d92 100644
--- a/nova/tests/test_service.py
+++ b/nova/tests/test_service.py
@@ -106,7 +106,10 @@ class ServiceTestCase(test.TestCase):
# NOTE(vish): Create was moved out of mox replay to make sure that
# the looping calls are created in StartService.
- app = service.Service.create(host=host, binary=binary)
+ app = service.Service.create(host=host, binary=binary, topic=topic)
+
+ self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
+ service.rpc.Connection.instance(new=mox.IgnoreArg())
self.mox.StubOutWithMock(rpc,
'TopicAdapterConsumer',
@@ -114,6 +117,11 @@ class ServiceTestCase(test.TestCase):
self.mox.StubOutWithMock(rpc,
'FanoutAdapterConsumer',
use_mock_anything=True)
+
+ self.mox.StubOutWithMock(rpc,
+ 'ConsumerSet',
+ use_mock_anything=True)
+
rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
@@ -129,9 +137,13 @@ class ServiceTestCase(test.TestCase):
proxy=mox.IsA(service.Service)).AndReturn(
rpc.FanoutAdapterConsumer)
- rpc.TopicAdapterConsumer.attach_to_eventlet()
- rpc.TopicAdapterConsumer.attach_to_eventlet()
- rpc.FanoutAdapterConsumer.attach_to_eventlet()
+ def wait_func(self, limit=None):
+ return None
+
+ mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
+ {'wait': wait_func})
+ rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset)
+ wait_func(mox.IgnoreArg())
service_create = {'host': host,
'binary': binary,
@@ -287,8 +299,41 @@ class ServiceTestCase(test.TestCase):
# Creating mocks
self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
service.rpc.Connection.instance(new=mox.IgnoreArg())
- service.rpc.Connection.instance(new=mox.IgnoreArg())
- service.rpc.Connection.instance(new=mox.IgnoreArg())
+
+ self.mox.StubOutWithMock(rpc,
+ 'TopicAdapterConsumer',
+ use_mock_anything=True)
+ self.mox.StubOutWithMock(rpc,
+ 'FanoutAdapterConsumer',
+ use_mock_anything=True)
+
+ self.mox.StubOutWithMock(rpc,
+ 'ConsumerSet',
+ use_mock_anything=True)
+
+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
+ topic=topic,
+ proxy=mox.IsA(service.Service)).AndReturn(
+ rpc.TopicAdapterConsumer)
+
+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
+ topic='%s.%s' % (topic, host),
+ proxy=mox.IsA(service.Service)).AndReturn(
+ rpc.TopicAdapterConsumer)
+
+ rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
+ topic=topic,
+ proxy=mox.IsA(service.Service)).AndReturn(
+ rpc.FanoutAdapterConsumer)
+
+ def wait_func(self, limit=None):
+ return None
+
+ mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
+ {'wait': wait_func})
+ rpc.ConsumerSet(mox.IgnoreArg(), mox.IsA(list)).AndReturn(mock_cset)
+ wait_func(mox.IgnoreArg())
+
self.mox.StubOutWithMock(serv.manager.driver,
'update_available_resource')
serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)