diff options
author | Sandy Walsh <sandy.walsh@rackspace.com> | 2011-03-03 16:28:04 -0400 |
---|---|---|
committer | Sandy Walsh <sandy.walsh@rackspace.com> | 2011-03-03 16:28:04 -0400 |
commit | 2a6ce075e19af5700960e3fb22c213e43a2e24b4 (patch) | |
tree | 98798b930a626493d7841e763100eb8505b49097 | |
parent | c297880fb7c007aa4f6b6a9f7b985ecc981d8fe2 (diff) | |
download | nova-2a6ce075e19af5700960e3fb22c213e43a2e24b4.tar.gz nova-2a6ce075e19af5700960e3fb22c213e43a2e24b4.tar.xz nova-2a6ce075e19af5700960e3fb22c213e43a2e24b4.zip |
start of fanout
-rw-r--r-- | nova/rpc.py | 20 | ||||
-rw-r--r-- | nova/scheduler/manager.py | 4 | ||||
-rw-r--r-- | nova/scheduler/zone_manager.py | 4 | ||||
-rw-r--r-- | nova/service.py | 6 |
4 files changed, 34 insertions, 0 deletions
diff --git a/nova/rpc.py b/nova/rpc.py index 8fe4565dd..a02cdc90c 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -218,6 +218,16 @@ class TopicPublisher(Publisher): super(TopicPublisher, self).__init__(connection=connection) +class FanoutPublisher(Publisher): + """Publishes messages to a fanout exchange.""" + exchange_type = "fanout" + + def __init__(self, topic, connection=None): + self.exchange = "%s_fanout" % topic + self.durable = False + super(FanoutPublisher, self).__init__(connection=connection) + + class DirectConsumer(Consumer): """Consumes messages directly on a channel specified by msg_id""" exchange_type = "direct" @@ -360,6 +370,16 @@ def cast(context, topic, msg): publisher.close() +def fanout_cast(context, topic, msg): + """Sends a message on a fanout exchange without waiting for a response""" + LOG.debug(_("Making asynchronous fanout cast...")) + _pack_context(msg, context) + conn = Connection.instance() + publisher = FanoutPublisher(topic, connection=conn) + publisher.send(msg) + publisher.close() + + def generic_response(message_data, message): """Logs a result and exits""" LOG.debug(_('response %s'), message_data) diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py index c94397210..7541523b0 100644 --- a/nova/scheduler/manager.py +++ b/nova/scheduler/manager.py @@ -59,6 +59,10 @@ class SchedulerManager(manager.Manager): """Get a list of zones from the ZoneManager.""" return self.zone_manager.get_zone_list() + def update_compute_capabilities(self, context=None): + """Process a compute node update.""" + return self.zone_manager.update_compute_capabilities() + def _schedule(self, method, context, topic, *args, **kwargs): """Tries to call schedule_* method on the driver to retrieve host. diff --git a/nova/scheduler/zone_manager.py b/nova/scheduler/zone_manager.py index edf9000cc..eedc5c235 100644 --- a/nova/scheduler/zone_manager.py +++ b/nova/scheduler/zone_manager.py @@ -105,6 +105,7 @@ class ZoneManager(object): def __init__(self): self.last_zone_db_check = datetime.min self.zone_states = {} + self.compute_states = {} self.green_pool = greenpool.GreenPool() def get_zone_list(self): @@ -141,3 +142,6 @@ class ZoneManager(object): self.last_zone_db_check = datetime.now() self._refresh_from_db(context) self._poll_zones(context) + + def update_compute_capabilities(self): + logging.debug(_("****** UPDATE COMPUTE CAPABILITIES *******")) diff --git a/nova/service.py b/nova/service.py index f47358089..3ecf46525 100644 --- a/nova/service.py +++ b/nova/service.py @@ -84,6 +84,7 @@ class Service(object): conn1 = rpc.Connection.instance(new=True) conn2 = rpc.Connection.instance(new=True) + conn3 = rpc.Connection.instance(new=True) if self.report_interval: consumer_all = rpc.AdapterConsumer( connection=conn1, @@ -93,9 +94,14 @@ class Service(object): connection=conn2, topic='%s.%s' % (self.topic, self.host), proxy=self) + fanout = rpc.AdapterConsumer( + connection=conn2, + topic='%s_fanout' % self.topic, + proxy=self) self.timers.append(consumer_all.attach_to_eventlet()) self.timers.append(consumer_node.attach_to_eventlet()) + self.timers.append(fanout.attach_to_eventlet()) pulse = utils.LoopingCall(self.report_state) pulse.start(interval=self.report_interval, now=False) |