summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSandy Walsh <sandy.walsh@rackspace.com>2011-03-03 16:28:04 -0400
committerSandy Walsh <sandy.walsh@rackspace.com>2011-03-03 16:28:04 -0400
commit2a6ce075e19af5700960e3fb22c213e43a2e24b4 (patch)
tree98798b930a626493d7841e763100eb8505b49097
parentc297880fb7c007aa4f6b6a9f7b985ecc981d8fe2 (diff)
downloadnova-2a6ce075e19af5700960e3fb22c213e43a2e24b4.tar.gz
nova-2a6ce075e19af5700960e3fb22c213e43a2e24b4.tar.xz
nova-2a6ce075e19af5700960e3fb22c213e43a2e24b4.zip
start of fanout
-rw-r--r--nova/rpc.py20
-rw-r--r--nova/scheduler/manager.py4
-rw-r--r--nova/scheduler/zone_manager.py4
-rw-r--r--nova/service.py6
4 files changed, 34 insertions, 0 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index 8fe4565dd..a02cdc90c 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -218,6 +218,16 @@ class TopicPublisher(Publisher):
super(TopicPublisher, self).__init__(connection=connection)
+class FanoutPublisher(Publisher):
+ """Publishes messages to a fanout exchange."""
+ exchange_type = "fanout"
+
+ def __init__(self, topic, connection=None):
+ self.exchange = "%s_fanout" % topic
+ self.durable = False
+ super(FanoutPublisher, self).__init__(connection=connection)
+
+
class DirectConsumer(Consumer):
"""Consumes messages directly on a channel specified by msg_id"""
exchange_type = "direct"
@@ -360,6 +370,16 @@ def cast(context, topic, msg):
publisher.close()
+def fanout_cast(context, topic, msg):
+ """Sends a message on a fanout exchange without waiting for a response"""
+ LOG.debug(_("Making asynchronous fanout cast..."))
+ _pack_context(msg, context)
+ conn = Connection.instance()
+ publisher = FanoutPublisher(topic, connection=conn)
+ publisher.send(msg)
+ publisher.close()
+
+
def generic_response(message_data, message):
"""Logs a result and exits"""
LOG.debug(_('response %s'), message_data)
diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py
index c94397210..7541523b0 100644
--- a/nova/scheduler/manager.py
+++ b/nova/scheduler/manager.py
@@ -59,6 +59,10 @@ class SchedulerManager(manager.Manager):
"""Get a list of zones from the ZoneManager."""
return self.zone_manager.get_zone_list()
+ def update_compute_capabilities(self, context=None):
+ """Process a compute node update."""
+ return self.zone_manager.update_compute_capabilities()
+
def _schedule(self, method, context, topic, *args, **kwargs):
"""Tries to call schedule_* method on the driver to retrieve host.
diff --git a/nova/scheduler/zone_manager.py b/nova/scheduler/zone_manager.py
index edf9000cc..eedc5c235 100644
--- a/nova/scheduler/zone_manager.py
+++ b/nova/scheduler/zone_manager.py
@@ -105,6 +105,7 @@ class ZoneManager(object):
def __init__(self):
self.last_zone_db_check = datetime.min
self.zone_states = {}
+ self.compute_states = {}
self.green_pool = greenpool.GreenPool()
def get_zone_list(self):
@@ -141,3 +142,6 @@ class ZoneManager(object):
self.last_zone_db_check = datetime.now()
self._refresh_from_db(context)
self._poll_zones(context)
+
+ def update_compute_capabilities(self):
+ logging.debug(_("****** UPDATE COMPUTE CAPABILITIES *******"))
diff --git a/nova/service.py b/nova/service.py
index f47358089..3ecf46525 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -84,6 +84,7 @@ class Service(object):
conn1 = rpc.Connection.instance(new=True)
conn2 = rpc.Connection.instance(new=True)
+ conn3 = rpc.Connection.instance(new=True)
if self.report_interval:
consumer_all = rpc.AdapterConsumer(
connection=conn1,
@@ -93,9 +94,14 @@ class Service(object):
connection=conn2,
topic='%s.%s' % (self.topic, self.host),
proxy=self)
+ fanout = rpc.AdapterConsumer(
+ connection=conn2,
+ topic='%s_fanout' % self.topic,
+ proxy=self)
self.timers.append(consumer_all.attach_to_eventlet())
self.timers.append(consumer_node.attach_to_eventlet())
+ self.timers.append(fanout.attach_to_eventlet())
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)