summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortermie <github@anarkystic.com>2011-05-25 15:42:24 -0700
committertermie <github@anarkystic.com>2011-05-25 15:42:24 -0700
commit51e8eeb9b3a23f811bcbf52d9700d94c5c8b15e4 (patch)
tree57264d10f16be215ce309cbecce04d478e9c7b49
parentb193b97054f11664a72cd53547f355d1c9044f88 (diff)
downloadnova-51e8eeb9b3a23f811bcbf52d9700d94c5c8b15e4.tar.gz
nova-51e8eeb9b3a23f811bcbf52d9700d94c5c8b15e4.tar.xz
nova-51e8eeb9b3a23f811bcbf52d9700d94c5c8b15e4.zip
bring back commits lost in merge
-rw-r--r--nova/rpc.py107
-rw-r--r--nova/tests/test_rpc.py19
2 files changed, 82 insertions, 44 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index 3cc0dadd4..d7d7bb014 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -35,6 +35,7 @@ from carrot import connection as carrot_connection
from carrot import messaging
import eventlet
from eventlet import greenpool
+from eventlet import greenthread
from eventlet import pools
from eventlet import queue
@@ -140,30 +141,30 @@ class Consumer(messaging.Consumer):
FLAGS.rabbit_max_retries)
sys.exit(1)
- def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
- """Wraps the parent fetch with some logic for failed connection."""
- # TODO(vish): the logic for failed connections and logging should be
- # refactored into some sort of connection manager object
- try:
- if self.failed_connection:
- # NOTE(vish): connection is defined in the parent class, we can
- # recreate it as long as we create the backend too
- # pylint: disable=W0201
- self.connection = Connection.recreate()
- self.backend = self.connection.create_backend()
- self.declare()
- return super(Consumer, self).fetch(
- no_ack, auto_ack, enable_callbacks)
- if self.failed_connection:
- LOG.error(_('Reconnected to queue'))
- self.failed_connection = False
- # NOTE(vish): This is catching all errors because we really don't
- # want exceptions to be logged 10 times a second if some
- # persistent failure occurs.
- except Exception, e: # pylint: disable=W0703
- if not self.failed_connection:
- LOG.exception(_('Failed to fetch message from queue: %s' % e))
- self.failed_connection = True
+ #def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
+ # """Wraps the parent fetch with some logic for failed connection."""
+ # # TODO(vish): the logic for failed connections and logging should be
+ # # refactored into some sort of connection manager object
+ # try:
+ # if self.failed_connection:
+ # # NOTE(vish): connection is defined in the parent class, we can
+ # # recreate it as long as we create the backend too
+ # # pylint: disable=W0201
+ # self.connection = Connection.recreate()
+ # self.backend = self.connection.create_backend()
+ # self.declare()
+ # return super(Consumer, self).fetch(
+ # no_ack, auto_ack, enable_callbacks)
+ # if self.failed_connection:
+ # LOG.error(_('Reconnected to queue'))
+ # self.failed_connection = False
+ # # NOTE(vish): This is catching all errors because we really don't
+ # # want exceptions to be logged 10 times a second if some
+ # # persistent failure occurs.
+ # except Exception, e: # pylint: disable=W0703
+ # if not self.failed_connection:
+ # LOG.exception(_('Failed to fetch message from queue: %s' % e))
+ # self.failed_connection = True
def attach_to_eventlet(self):
"""Only needed for unit tests!"""
@@ -195,7 +196,7 @@ class AdapterConsumer(Consumer):
"""
LOG.debug(_('received %s') % message_data)
- msg_id = message_data.pop('_msg_id', None)
+ msg_id = message_data.get('_msg_id', None)
ctxt = _unpack_context(message_data)
@@ -225,11 +226,14 @@ class AdapterConsumer(Consumer):
rval = node_func(context=ctxt, **node_args)
if msg_id:
# TODO(termie): re-enable when fix the yielding issue
- #if hasattr(rval, 'send'):
- # logging.error('rval! %s', rval)
- # for x in rval:
- # msg_reply(msg_id, x, None)
- msg_reply(msg_id, rval, None)
+ if hasattr(rval, 'send'):
+ logging.error('rval! %s', rval)
+ for x in rval:
+ msg_reply(msg_id, x, None)
+ msg_reply(msg_id, None, None)
+ else:
+ msg_reply(msg_id, rval, None)
+ #msg_reply(msg_id, rval, None)
except Exception as e:
logging.exception('Exception during message handling')
if msg_id:
@@ -355,7 +359,7 @@ class DirectConsumer(Consumer):
self.routing_key = msg_id
self.exchange = msg_id
self.auto_delete = True
- self.exclusive = True
+ self.exclusive = False
super(DirectConsumer, self).__init__(connection=connection)
@@ -387,7 +391,9 @@ def msg_reply(msg_id, reply=None, failure=None):
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
+ LOG.error('MSG REPLY SUCCESS')
except TypeError:
+ LOG.error('MSG REPLY FAILURE')
publisher.send(
{'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
@@ -440,9 +446,9 @@ def _pack_context(msg, context):
for args at some point.
"""
- context = dict([('_context_%s' % key, value)
- for (key, value) in context.to_dict().iteritems()])
- msg.update(context)
+ context_d = dict([('_context_%s' % key, value)
+ for (key, value) in context.to_dict().iteritems()])
+ msg.update(context_d)
class RpcContext(context.RequestContext):
@@ -463,12 +469,13 @@ def multicall(context, topic, msg):
LOG.debug(_('MSG_ID is %s') % (msg_id))
_pack_context(msg, context)
- conn = ConnectionPool.get()
- consumer = DirectConsumer(connection=conn, msg_id=msg_id)
+ con_conn = ConnectionPool.get()
+ consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
wait_msg = MulticallWaiter(consumer)
consumer.register_callback(wait_msg)
- publisher = TopicPublisher(connection=conn, topic=topic)
+ pub_conn = ConnectionPool.get()
+ publisher = TopicPublisher(connection=pub_conn, topic=topic)
publisher.send(msg)
publisher.close()
@@ -484,6 +491,7 @@ class MulticallWaiter(object):
def close(self):
self._closed = True
self._consumer.close()
+ ConnectionPool.put(self._consumer.connection)
def __call__(self, data, message):
"""Acks message and sets result."""
@@ -501,15 +509,26 @@ class MulticallWaiter(object):
# trying to solve the problem quickly. This works but
# I'd prefer to dig in and do it the best way later on.
- def _waiter():
- while not self._closed:
- try:
- self._consumer.wait(limit=1)
- except StopIteration:
- pass
- eventlet.spawn(_waiter)
+ #def _waiter():
+ # i = 0
+ # while not self._closed:
+ # LOG.error('Iteration #%s (%s)', i, self._consumer.consumer_tag)
+ # i += 1
+ # try:
+ # self._consumer.wait(limit=1)
+ # except StopIteration:
+ # pass
+ # self._consumer.close()
+ # ConnectionPool.put(self._consumer.connection)
+ #eventlet.spawn(_waiter)
while True:
+ rv = None
+ while rv is None and not self._closed:
+ rv = self._consumer.fetch(enable_callbacks=True)
+ time.sleep(0.01)
+
+ LOG.error('RV %s', rv)
result = self._results.get()
if isinstance(result, Exception):
raise result
diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py
index f64209596..e5d99474d 100644
--- a/nova/tests/test_rpc.py
+++ b/nova/tests/test_rpc.py
@@ -61,6 +61,18 @@ class RpcTestCase(test.TestCase):
self.assertEqual(value + i, x)
i += 1
+ def test_multicall_succeed_three_times_yield(self):
+ """Get a value through rpc call"""
+ value = 42
+ result = rpc.multicall(self.context,
+ 'test',
+ {"method": "echo_three_times_yield",
+ "args": {"value": value}})
+ i = 0
+ for x in result:
+ self.assertEqual(value + i, x)
+ i += 1
+
def test_context_passed(self):
"""Makes sure a context is passed through rpc call"""
value = 42
@@ -83,6 +95,7 @@ class RpcTestCase(test.TestCase):
'test',
{"method": "fail",
"args": {"value": value}})
+ LOG.error('INNNNNNN BETTTWWWWWWWWWWEEEEEEEEEEN')
try:
rpc.call(self.context,
'test',
@@ -187,6 +200,12 @@ class TestReceiver(object):
context.reply(value + 2)
@staticmethod
+ def echo_three_times_yield(context, value):
+ yield value
+ yield value + 1
+ yield value + 2
+
+ @staticmethod
def fail(context, value):
"""Raises an exception with the value sent in"""
raise Exception(value)