Diffstat (limited to 'keystone/openstack/common/rpc/impl_fake.py')
-rw-r--r--  keystone/openstack/common/rpc/impl_fake.py  195
1 file changed, 195 insertions, 0 deletions
diff --git a/keystone/openstack/common/rpc/impl_fake.py b/keystone/openstack/common/rpc/impl_fake.py
new file mode 100644
index 00000000..9479ab4d
--- /dev/null
+++ b/keystone/openstack/common/rpc/impl_fake.py
@@ -0,0 +1,195 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Fake RPC implementation which calls proxy methods directly with no
+queues. Casts will block, but this is very useful for tests.
+"""
+
+import inspect
+# NOTE(russellb): We specifically want to use json, not our own jsonutils.
+# jsonutils has some extra logic to automatically convert objects to primitive
+# types so that they can be serialized. We want to catch all cases where
+# non-primitive types make it into this code and treat it as an error.
+import json
+import time
+
+import eventlet
+
+from keystone.openstack.common.rpc import common as rpc_common
+
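+# Registry of topic name -> list of Consumer objects, shared by every
+# Connection created by this driver.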
+CONSUMERS = {}
+
+
+class RpcContext(rpc_common.CommonRpcContext):
+ def __init__(self, **kwargs):
+ super(RpcContext, self).__init__(**kwargs)
+ self._response = []
+ self._done = False
+
+ def deepcopy(self):
+ values = self.to_dict()
+ new_inst = self.__class__(**values)
+ new_inst._response = self._response
+ new_inst._done = self._done
+ return new_inst
+
+ def reply(self, reply=None, failure=None, ending=False):
+ if ending:
+ self._done = True
+ if not self._done:
+ self._response.append((reply, failure))
+
+
+class Consumer(object):
+ def __init__(self, topic, proxy):
+ self.topic = topic
+ self.proxy = proxy
+
+ def call(self, context, version, method, namespace, args, timeout):
+ done = eventlet.event.Event()
+
+ def _inner():
+ ctxt = RpcContext.from_dict(context.to_dict())
+ try:
+ rval = self.proxy.dispatch(context, version, method,
+ namespace, **args)
+ res = []
+ # Caller might have called ctxt.reply() manually
+ for (reply, failure) in ctxt._response:
+ if failure:
+ raise failure[0], failure[1], failure[2]
+ res.append(reply)
+                # If 'ending' was never sent, the dispatched method's own
+                # return value may hold more data to return.
+ if not ctxt._done:
+ if inspect.isgenerator(rval):
+ for val in rval:
+ res.append(val)
+ else:
+ res.append(rval)
+ done.send(res)
+ except rpc_common.ClientException as e:
+ done.send_exception(e._exc_info[1])
+ except Exception as e:
+ done.send_exception(e)
+
+ thread = eventlet.greenthread.spawn(_inner)
+
+ if timeout:
+ start_time = time.time()
+ while not done.ready():
+ eventlet.greenthread.sleep(1)
+ cur_time = time.time()
+ if (cur_time - start_time) > timeout:
+ thread.kill()
+ raise rpc_common.Timeout()
+
+ return done.wait()
+
+
+class Connection(object):
+ """Connection object."""
+
+ def __init__(self):
+ self.consumers = []
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ consumer = Consumer(topic, proxy)
+ self.consumers.append(consumer)
+ if topic not in CONSUMERS:
+ CONSUMERS[topic] = []
+ CONSUMERS[topic].append(consumer)
+
+ def close(self):
+ for consumer in self.consumers:
+ CONSUMERS[consumer.topic].remove(consumer)
+ self.consumers = []
+
+ def consume_in_thread(self):
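+        # Nothing to do here: the fake driver dispatches synchronously from
+        # call()/cast(), so no background consumer thread is needed.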
+ pass
+
+
+def create_connection(conf, new=True):
+ """Create a connection."""
+ return Connection()
+
+
+def check_serialize(msg):
+ """Make sure a message intended for rpc can be serialized."""
+ json.dumps(msg)
+
+
+def multicall(conf, context, topic, msg, timeout=None):
+ """Make a call that returns multiple times."""
+
+ check_serialize(msg)
+
+ method = msg.get('method')
+ if not method:
+ return
+ args = msg.get('args', {})
+ version = msg.get('version', None)
+ namespace = msg.get('namespace', None)
+
+ try:
+ consumer = CONSUMERS[topic][0]
+ except (KeyError, IndexError):
+ return iter([None])
+ else:
+ return consumer.call(context, version, method, namespace, args,
+ timeout)
+
+
+def call(conf, context, topic, msg, timeout=None):
+ """Sends a message on a topic and wait for a response."""
+ rv = multicall(conf, context, topic, msg, timeout)
+ # NOTE(vish): return the last result from the multicall
+ rv = list(rv)
+ if not rv:
+ return
+ return rv[-1]
+
+
+def cast(conf, context, topic, msg):
+ check_serialize(msg)
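+    # Casts are fire-and-forget: any error raised while dispatching the
+    # message is deliberately swallowed.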
+ try:
+ call(conf, context, topic, msg)
+ except Exception:
+ pass
+
+
+def notify(conf, context, topic, msg, envelope):
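+    # Notifications are only checked for serializability; the fake driver
+    # does not deliver them anywhere.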
+ check_serialize(msg)
+
+
+def cleanup():
+ pass
+
+
+def fanout_cast(conf, context, topic, msg):
+ """Cast to all consumers of a topic."""
+ check_serialize(msg)
+ method = msg.get('method')
+ if not method:
+ return
+ args = msg.get('args', {})
+ version = msg.get('version', None)
+ namespace = msg.get('namespace', None)
+
+ for consumer in CONSUMERS.get(topic, []):
+ try:
+ consumer.call(context, version, method, namespace, args, None)
+ except Exception:
+ pass
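
The driver above can be exercised end to end without a message broker. Below is a minimal usage sketch, not part of the commit: the EchoProxy class and its dispatch() signature are assumptions inferred from how Consumer.call() invokes the proxy, and the bare RpcContext relies on CommonRpcContext providing to_dict()/from_dict().

# Hypothetical test-style usage of the fake driver (illustrative sketch only).
from keystone.openstack.common.rpc import impl_fake


class EchoProxy(object):
    # Assumed proxy: exposes the dispatch() interface that Consumer.call()
    # invokes and simply echoes the call back to the caller.
    def dispatch(self, ctxt, version, method, namespace, **kwargs):
        return {'method': method, 'args': kwargs}


conn = impl_fake.create_connection(conf=None)
conn.create_consumer('test_topic', EchoProxy())

ctxt = impl_fake.RpcContext()
result = impl_fake.call(conf=None, context=ctxt, topic='test_topic',
                        msg={'method': 'ping', 'args': {'x': 1}})
# result == {'method': 'ping', 'args': {'x': 1}}

conn.close()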