summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCory Wright <cory.wright@rackspace.com>2011-05-31 09:40:01 -0400
committerCory Wright <cory.wright@rackspace.com>2011-05-31 09:40:01 -0400
commit6120fca7ff665ead049c4a26b63e73d7e2a372fd (patch)
tree7693d8926ac2e32e2a7cfc7383bd1758f7588ae1
parenta027d887b7c18e27fed951c5ed22bf5dd097c560 (diff)
parent46f12e016e862803a3bfd81a4b8c615cae42cb38 (diff)
downloadnova-6120fca7ff665ead049c4a26b63e73d7e2a372fd.tar.gz
nova-6120fca7ff665ead049c4a26b63e73d7e2a372fd.tar.xz
nova-6120fca7ff665ead049c4a26b63e73d7e2a372fd.zip
merge trunk
-rw-r--r--Authors1
-rw-r--r--nova/api/ec2/__init__.py6
-rw-r--r--nova/api/ec2/cloud.py57
-rw-r--r--nova/auth/novarc.template2
-rw-r--r--nova/db/api.py39
-rw-r--r--nova/db/sqlalchemy/api.py76
-rw-r--r--nova/db/sqlalchemy/migrate_repo/versions/019_add_volume_snapshot_support.py70
-rw-r--r--nova/db/sqlalchemy/models.py25
-rw-r--r--nova/exception.py8
-rw-r--r--nova/fakerabbit.py31
-rw-r--r--nova/rpc.py271
-rw-r--r--nova/service.py60
-rw-r--r--nova/test.py9
-rw-r--r--nova/tests/integrated/integrated_helpers.py5
-rw-r--r--nova/tests/test_cloud.py72
-rw-r--r--nova/tests/test_rpc.py116
-rw-r--r--nova/tests/test_service.py59
-rw-r--r--nova/tests/test_volume.py28
-rw-r--r--nova/tests/test_xenapi.py26
-rw-r--r--nova/virt/xenapi/vmops.py24
-rw-r--r--nova/volume/api.py44
-rw-r--r--nova/volume/driver.py121
-rw-r--r--nova/volume/manager.py49
-rw-r--r--tools/pip-requires3
24 files changed, 1007 insertions, 195 deletions
diff --git a/Authors b/Authors
index 50f4680a9..26e1281f4 100644
--- a/Authors
+++ b/Authors
@@ -30,6 +30,7 @@ Gabe Westmaas <gabe.westmaas@rackspace.com>
Hisaharu Ishii <ishii.hisaharu@lab.ntt.co.jp>
Hisaki Ohara <hisaki.ohara@intel.com>
Ilya Alekseyev <ialekseev@griddynamics.com>
+Isaku Yamahata <yamahata@valinux.co.jp>
Jason Koelker <jason@koelker.net>
Jay Pipes <jaypipes@gmail.com>
Jesse Andrews <anotherjesse@gmail.com>
diff --git a/nova/api/ec2/__init__.py b/nova/api/ec2/__init__.py
index c13993dd3..1915d007d 100644
--- a/nova/api/ec2/__init__.py
+++ b/nova/api/ec2/__init__.py
@@ -327,6 +327,12 @@ class Executor(wsgi.Application):
ec2_id = ec2utils.id_to_ec2_id(ex.volume_id, 'vol-%08x')
message = _('Volume %s not found') % ec2_id
return self._error(req, context, type(ex).__name__, message)
+ except exception.SnapshotNotFound as ex:
+ LOG.info(_('SnapshotNotFound raised: %s'), unicode(ex),
+ context=context)
+ ec2_id = ec2utils.id_to_ec2_id(ex.snapshot_id, 'snap-%08x')
+ message = _('Snapshot %s not found') % ec2_id
+ return self._error(req, context, type(ex).__name__, message)
except exception.NotFound as ex:
LOG.info(_('NotFound raised: %s'), unicode(ex), context=context)
return self._error(req, context, type(ex).__name__, unicode(ex))
diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py
index c35b6024e..d92838f38 100644
--- a/nova/api/ec2/cloud.py
+++ b/nova/api/ec2/cloud.py
@@ -283,14 +283,50 @@ class CloudController(object):
owner=None,
restorable_by=None,
**kwargs):
- return {'snapshotSet': [{'snapshotId': 'fixme',
- 'volumeId': 'fixme',
- 'status': 'fixme',
- 'startTime': 'fixme',
- 'progress': 'fixme',
- 'ownerId': 'fixme',
- 'volumeSize': 0,
- 'description': 'fixme'}]}
+ if snapshot_id:
+ snapshots = []
+ for ec2_id in snapshot_id:
+ internal_id = ec2utils.ec2_id_to_id(ec2_id)
+ snapshot = self.volume_api.get_snapshot(
+ context,
+ snapshot_id=internal_id)
+ snapshots.append(snapshot)
+ else:
+ snapshots = self.volume_api.get_all_snapshots(context)
+ snapshots = [self._format_snapshot(context, s) for s in snapshots]
+ return {'snapshotSet': snapshots}
+
+ def _format_snapshot(self, context, snapshot):
+ s = {}
+ s['snapshotId'] = ec2utils.id_to_ec2_id(snapshot['id'], 'snap-%08x')
+ s['volumeId'] = ec2utils.id_to_ec2_id(snapshot['volume_id'],
+ 'vol-%08x')
+ s['status'] = snapshot['status']
+ s['startTime'] = snapshot['created_at']
+ s['progress'] = snapshot['progress']
+ s['ownerId'] = snapshot['project_id']
+ s['volumeSize'] = snapshot['volume_size']
+ s['description'] = snapshot['display_description']
+
+ s['display_name'] = snapshot['display_name']
+ s['display_description'] = snapshot['display_description']
+ return s
+
+ def create_snapshot(self, context, volume_id, **kwargs):
+ LOG.audit(_("Create snapshot of volume %s"), volume_id,
+ context=context)
+ volume_id = ec2utils.ec2_id_to_id(volume_id)
+ snapshot = self.volume_api.create_snapshot(
+ context,
+ volume_id=volume_id,
+ name=kwargs.get('display_name'),
+ description=kwargs.get('display_description'))
+ return self._format_snapshot(context, snapshot)
+
+ def delete_snapshot(self, context, snapshot_id, **kwargs):
+ snapshot_id = ec2utils.ec2_id_to_id(snapshot_id)
+ self.volume_api.delete_snapshot(context, snapshot_id=snapshot_id)
+ return True
def describe_key_pairs(self, context, key_name=None, **kwargs):
key_pairs = db.key_pair_get_all_by_user(context, context.user_id)
@@ -619,6 +655,11 @@ class CloudController(object):
'volumeId': v['volumeId']}]
else:
v['attachmentSet'] = [{}]
+ if volume.get('snapshot_id') != None:
+ v['snapshotId'] = ec2utils.id_to_ec2_id(volume['snapshot_id'],
+ 'snap-%08x')
+ else:
+ v['snapshotId'] = None
v['display_name'] = volume['display_name']
v['display_description'] = volume['display_description']
diff --git a/nova/auth/novarc.template b/nova/auth/novarc.template
index cda2ecc28..8170fcafe 100644
--- a/nova/auth/novarc.template
+++ b/nova/auth/novarc.template
@@ -1,4 +1,4 @@
-NOVA_KEY_DIR=$(pushd $(dirname $BASH_SOURCE)>/dev/null; pwd; popd>/dev/null)
+NOVA_KEY_DIR=$(dirname $(readlink -f ${BASH_SOURCE}))
export EC2_ACCESS_KEY="%(access)s:%(project)s"
export EC2_SECRET_KEY="%(secret)s"
export EC2_URL="%(ec2)s"
diff --git a/nova/db/api.py b/nova/db/api.py
index 310c0bb09..4e0aa60a2 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -47,6 +47,8 @@ flags.DEFINE_string('instance_name_template', 'instance-%08x',
'Template string to be used to generate instance names')
flags.DEFINE_string('volume_name_template', 'volume-%08x',
'Template string to be used to generate instance names')
+flags.DEFINE_string('snapshot_name_template', 'snapshot-%08x',
+ 'Template string to be used to generate snapshot names')
IMPL = utils.LazyPluggable(FLAGS['db_backend'],
@@ -881,6 +883,43 @@ def volume_update(context, volume_id, values):
####################
+def snapshot_create(context, values):
+ """Create a snapshot from the values dictionary."""
+ return IMPL.snapshot_create(context, values)
+
+
+def snapshot_destroy(context, snapshot_id):
+ """Destroy the snapshot or raise if it does not exist."""
+ return IMPL.snapshot_destroy(context, snapshot_id)
+
+
+def snapshot_get(context, snapshot_id):
+ """Get a snapshot or raise if it does not exist."""
+ return IMPL.snapshot_get(context, snapshot_id)
+
+
+def snapshot_get_all(context):
+ """Get all snapshots."""
+ return IMPL.snapshot_get_all(context)
+
+
+def snapshot_get_all_by_project(context, project_id):
+ """Get all snapshots belonging to a project."""
+ return IMPL.snapshot_get_all_by_project(context, project_id)
+
+
+def snapshot_update(context, snapshot_id, values):
+ """Set the given properties on an snapshot and update it.
+
+ Raises NotFound if snapshot does not exist.
+
+ """
+ return IMPL.snapshot_update(context, snapshot_id, values)
+
+
+####################
+
+
def security_group_get_all(context):
"""Get all security groups."""
return IMPL.security_group_get_all(context)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index e4dda5c12..d53b76f4a 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -1790,6 +1790,82 @@ def volume_update(context, volume_id, values):
@require_context
+def snapshot_create(context, values):
+ snapshot_ref = models.Snapshot()
+ snapshot_ref.update(values)
+
+ session = get_session()
+ with session.begin():
+ snapshot_ref.save(session=session)
+ return snapshot_ref
+
+
+@require_admin_context
+def snapshot_destroy(context, snapshot_id):
+ session = get_session()
+ with session.begin():
+ session.query(models.Snapshot).\
+ filter_by(id=snapshot_id).\
+ update({'deleted': 1,
+ 'deleted_at': datetime.datetime.utcnow(),
+ 'updated_at': literal_column('updated_at')})
+
+
+@require_context
+def snapshot_get(context, snapshot_id, session=None):
+ if not session:
+ session = get_session()
+ result = None
+
+ if is_admin_context(context):
+ result = session.query(models.Snapshot).\
+ filter_by(id=snapshot_id).\
+ filter_by(deleted=can_read_deleted(context)).\
+ first()
+ elif is_user_context(context):
+ result = session.query(models.Snapshot).\
+ filter_by(project_id=context.project_id).\
+ filter_by(id=snapshot_id).\
+ filter_by(deleted=False).\
+ first()
+ if not result:
+ raise exception.SnapshotNotFound(snapshot_id=snapshot_id)
+
+ return result
+
+
+@require_admin_context
+def snapshot_get_all(context):
+ session = get_session()
+ return session.query(models.Snapshot).\
+ filter_by(deleted=can_read_deleted(context)).\
+ all()
+
+
+@require_context
+def snapshot_get_all_by_project(context, project_id):
+ authorize_project_context(context, project_id)
+
+ session = get_session()
+ return session.query(models.Snapshot).\
+ filter_by(project_id=project_id).\
+ filter_by(deleted=can_read_deleted(context)).\
+ all()
+
+
+@require_context
+def snapshot_update(context, snapshot_id, values):
+ session = get_session()
+ with session.begin():
+ snapshot_ref = snapshot_get(context, snapshot_id, session=session)
+ snapshot_ref.update(values)
+ snapshot_ref.save(session=session)
+
+
+###################
+
+
+@require_context
def security_group_get_all(context):
session = get_session()
return session.query(models.SecurityGroup).\
diff --git a/nova/db/sqlalchemy/migrate_repo/versions/019_add_volume_snapshot_support.py b/nova/db/sqlalchemy/migrate_repo/versions/019_add_volume_snapshot_support.py
new file mode 100644
index 000000000..f16d6db56
--- /dev/null
+++ b/nova/db/sqlalchemy/migrate_repo/versions/019_add_volume_snapshot_support.py
@@ -0,0 +1,70 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 MORITA Kazutaka.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import Column, Table, MetaData
+from sqlalchemy import Integer, DateTime, Boolean, String
+
+from nova import log as logging
+
+meta = MetaData()
+
+snapshots = Table('snapshots', meta,
+ Column('created_at', DateTime(timezone=False)),
+ Column('updated_at', DateTime(timezone=False)),
+ Column('deleted_at', DateTime(timezone=False)),
+ Column('deleted', Boolean(create_constraint=True, name=None)),
+ Column('id', Integer(), primary_key=True, nullable=False),
+ Column('volume_id', Integer(), nullable=False),
+ Column('user_id',
+ String(length=255, convert_unicode=False, assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('project_id',
+ String(length=255, convert_unicode=False, assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('status',
+ String(length=255, convert_unicode=False, assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('progress',
+ String(length=255, convert_unicode=False, assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('volume_size', Integer()),
+ Column('scheduled_at', DateTime(timezone=False)),
+ Column('display_name',
+ String(length=255, convert_unicode=False, assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)),
+ Column('display_description',
+ String(length=255, convert_unicode=False, assert_unicode=None,
+ unicode_error=None, _warn_on_bytestring=False)))
+
+
+def upgrade(migrate_engine):
+ # Upgrade operations go here. Don't create your own engine;
+ # bind migrate_engine to your metadata
+ meta.bind = migrate_engine
+
+ try:
+ snapshots.create()
+ except Exception:
+ logging.info(repr(snapshots))
+ logging.exception('Exception while creating table')
+ meta.drop_all(tables=[snapshots])
+ raise
+
+
+def downgrade(migrate_engine):
+ # Operations to reverse the above upgrade go here.
+ snapshots.drop()
diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py
index 1215448f8..480f62399 100644
--- a/nova/db/sqlalchemy/models.py
+++ b/nova/db/sqlalchemy/models.py
@@ -329,6 +329,31 @@ class Quota(BASE, NovaBase):
hard_limit = Column(Integer, nullable=True)
+class Snapshot(BASE, NovaBase):
+ """Represents a block storage device that can be attached to a vm."""
+ __tablename__ = 'snapshots'
+ id = Column(Integer, primary_key=True, autoincrement=True)
+
+ @property
+ def name(self):
+ return FLAGS.snapshot_name_template % self.id
+
+ @property
+ def volume_name(self):
+ return FLAGS.volume_name_template % self.volume_id
+
+ user_id = Column(String(255))
+ project_id = Column(String(255))
+
+ volume_id = Column(Integer)
+ status = Column(String(255))
+ progress = Column(String(255))
+ volume_size = Column(Integer)
+
+ display_name = Column(String(255))
+ display_description = Column(String(255))
+
+
class ExportDevice(BASE, NovaBase):
"""Represates a shelf and blade that a volume can be exported on."""
__tablename__ = 'export_devices'
diff --git a/nova/exception.py b/nova/exception.py
index 56c20d111..02c65fd64 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -271,6 +271,14 @@ class VolumeNotFoundForInstance(VolumeNotFound):
message = _("Volume not found for instance %(instance_id)s.")
+class SnapshotNotFound(NotFound):
+ message = _("Snapshot %(snapshot_id)s could not be found.")
+
+
+class VolumeIsBusy(Error):
+ message = _("deleting volume %(volume_name)s that has snapshot")
+
+
class ExportDeviceNotFoundForVolume(NotFound):
message = _("No export device found for volume %(volume_id)s.")
diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py
index a7dee8caf..e7e9dab77 100644
--- a/nova/fakerabbit.py
+++ b/nova/fakerabbit.py
@@ -31,6 +31,7 @@ LOG = logging.getLogger("nova.fakerabbit")
EXCHANGES = {}
QUEUES = {}
+CONSUMERS = {}
class Message(base.BaseMessage):
@@ -96,17 +97,29 @@ class Backend(base.BaseBackend):
' key %(routing_key)s') % locals())
EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
- def declare_consumer(self, queue, callback, *args, **kwargs):
- self.current_queue = queue
- self.current_callback = callback
+ def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
+ global CONSUMERS
+ LOG.debug("Adding consumer %s", consumer_tag)
+ CONSUMERS[consumer_tag] = (queue, callback)
+
+ def cancel(self, consumer_tag):
+ global CONSUMERS
+ LOG.debug("Removing consumer %s", consumer_tag)
+ del CONSUMERS[consumer_tag]
def consume(self, limit=None):
+ global CONSUMERS
+ num = 0
while True:
- item = self.get(self.current_queue)
- if item:
- self.current_callback(item)
- raise StopIteration()
- greenthread.sleep(0)
+ for (queue, callback) in CONSUMERS.itervalues():
+ item = self.get(queue)
+ if item:
+ callback(item)
+ num += 1
+ yield
+ if limit and num == limit:
+ raise StopIteration()
+ greenthread.sleep(0.1)
def get(self, queue, no_ack=False):
global QUEUES
@@ -134,5 +147,7 @@ class Backend(base.BaseBackend):
def reset_all():
global EXCHANGES
global QUEUES
+ global CONSUMERS
EXCHANGES = {}
QUEUES = {}
+ CONSUMERS = {}
diff --git a/nova/rpc.py b/nova/rpc.py
index 2116f22c3..c5277c6a9 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -28,12 +28,15 @@ import json
import sys
import time
import traceback
+import types
import uuid
from carrot import connection as carrot_connection
from carrot import messaging
from eventlet import greenpool
-from eventlet import greenthread
+from eventlet import pools
+from eventlet import queue
+import greenlet
from nova import context
from nova import exception
@@ -47,7 +50,10 @@ LOG = logging.getLogger('nova.rpc')
FLAGS = flags.FLAGS
-flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool')
+flags.DEFINE_integer('rpc_thread_pool_size', 1024,
+ 'Size of RPC thread pool')
+flags.DEFINE_integer('rpc_conn_pool_size', 30,
+ 'Size of RPC connection pool')
class Connection(carrot_connection.BrokerConnection):
@@ -90,6 +96,22 @@ class Connection(carrot_connection.BrokerConnection):
return cls.instance()
+class Pool(pools.Pool):
+ """Class that implements a Pool of Connections."""
+
+ # TODO(comstud): Timeout connections not used in a while
+ def create(self):
+ LOG.debug('Creating new connection')
+ return Connection.instance(new=True)
+
+# Create a ConnectionPool to use for RPC calls. We'll order the
+# pool as a stack (LIFO), so that we can potentially loop through and
+# timeout old unused connections at some point
+ConnectionPool = Pool(
+ max_size=FLAGS.rpc_conn_pool_size,
+ order_as_stack=True)
+
+
class Consumer(messaging.Consumer):
"""Consumer base class.
@@ -131,7 +153,9 @@ class Consumer(messaging.Consumer):
self.connection = Connection.recreate()
self.backend = self.connection.create_backend()
self.declare()
- super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
+ return super(Consumer, self).fetch(no_ack,
+ auto_ack,
+ enable_callbacks)
if self.failed_connection:
LOG.error(_('Reconnected to queue'))
self.failed_connection = False
@@ -159,13 +183,13 @@ class AdapterConsumer(Consumer):
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
super(AdapterConsumer, self).__init__(connection=connection,
topic=topic)
+ self.register_callback(self.process_data)
- def receive(self, *args, **kwargs):
- self.pool.spawn_n(self._receive, *args, **kwargs)
+ def process_data(self, message_data, message):
+ """Consumer callback to call a method on a proxy object.
- @exception.wrap_exception
- def _receive(self, message_data, message):
- """Magically looks for a method on the proxy object and calls it.
+ Parses the message for validity and fires off a thread to call the
+ proxy object method.
Message data should be a dictionary with two keys:
method: string representing the method to call
@@ -175,8 +199,8 @@ class AdapterConsumer(Consumer):
"""
LOG.debug(_('received %s') % message_data)
- msg_id = message_data.pop('_msg_id', None)
-
+ # This will be popped off in _unpack_context
+ msg_id = message_data.get('_msg_id', None)
ctxt = _unpack_context(message_data)
method = message_data.get('method')
@@ -188,8 +212,17 @@ class AdapterConsumer(Consumer):
# we just log the message and send an error string
# back to the caller
LOG.warn(_('no method for message: %s') % message_data)
- msg_reply(msg_id, _('No method for message: %s') % message_data)
+ if msg_id:
+ msg_reply(msg_id,
+ _('No method for message: %s') % message_data)
return
+ self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
+
+ @exception.wrap_exception
+ def _process_data(self, msg_id, ctxt, method, args):
+ """Thread that maigcally looks for a method on the proxy
+ object and calls it.
+ """
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
@@ -197,7 +230,18 @@ class AdapterConsumer(Consumer):
try:
rval = node_func(context=ctxt, **node_args)
if msg_id:
- msg_reply(msg_id, rval, None)
+ # Check if the result was a generator
+ if isinstance(rval, types.GeneratorType):
+ for x in rval:
+ msg_reply(msg_id, x, None)
+ else:
+ msg_reply(msg_id, rval, None)
+
+ # This final None tells multicall that it is done.
+ msg_reply(msg_id, None, None)
+ elif isinstance(rval, types.GeneratorType):
+ # NOTE(vish): this iterates through the generator
+ list(rval)
except Exception as e:
logging.exception('Exception during message handling')
if msg_id:
@@ -205,11 +249,6 @@ class AdapterConsumer(Consumer):
return
-class Publisher(messaging.Publisher):
- """Publisher base class."""
- pass
-
-
class TopicAdapterConsumer(AdapterConsumer):
"""Consumes messages on a specific topic."""
@@ -242,6 +281,58 @@ class FanoutAdapterConsumer(AdapterConsumer):
topic=topic, proxy=proxy)
+class ConsumerSet(object):
+ """Groups consumers to listen on together on a single connection."""
+
+ def __init__(self, connection, consumer_list):
+ self.consumer_list = set(consumer_list)
+ self.consumer_set = None
+ self.enabled = True
+ self.init(connection)
+
+ def init(self, conn):
+ if not conn:
+ conn = Connection.instance(new=True)
+ if self.consumer_set:
+ self.consumer_set.close()
+ self.consumer_set = messaging.ConsumerSet(conn)
+ for consumer in self.consumer_list:
+ consumer.connection = conn
+ # consumer.backend is set for us
+ self.consumer_set.add_consumer(consumer)
+
+ def reconnect(self):
+ self.init(None)
+
+ def wait(self, limit=None):
+ running = True
+ while running:
+ it = self.consumer_set.iterconsume(limit=limit)
+ if not it:
+ break
+ while True:
+ try:
+ it.next()
+ except StopIteration:
+ return
+ except greenlet.GreenletExit:
+ running = False
+ break
+ except Exception as e:
+ LOG.exception(_("Exception while processing consumer"))
+ self.reconnect()
+ # Break to outer loop
+ break
+
+ def close(self):
+ self.consumer_set.close()
+
+
+class Publisher(messaging.Publisher):
+ """Publisher base class."""
+ pass
+
+
class TopicPublisher(Publisher):
"""Publishes messages on a specific topic."""
@@ -306,16 +397,18 @@ def msg_reply(msg_id, reply=None, failure=None):
LOG.error(_("Returning exception %s to caller"), message)
LOG.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
- conn = Connection.instance()
- publisher = DirectPublisher(connection=conn, msg_id=msg_id)
- try:
- publisher.send({'result': reply, 'failure': failure})
- except TypeError:
- publisher.send(
- {'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems()),
- 'failure': failure})
- publisher.close()
+
+ with ConnectionPool.item() as conn:
+ publisher = DirectPublisher(connection=conn, msg_id=msg_id)
+ try:
+ publisher.send({'result': reply, 'failure': failure})
+ except TypeError:
+ publisher.send(
+ {'result': dict((k, repr(v))
+ for k, v in reply.__dict__.iteritems()),
+ 'failure': failure})
+
+ publisher.close()
class RemoteError(exception.Error):
@@ -347,8 +440,9 @@ def _unpack_context(msg):
if key.startswith('_context_'):
value = msg.pop(key)
context_dict[key[9:]] = value
+ context_dict['msg_id'] = msg.pop('_msg_id', None)
LOG.debug(_('unpacked context: %s'), context_dict)
- return context.RequestContext.from_dict(context_dict)
+ return RpcContext.from_dict(context_dict)
def _pack_context(msg, context):
@@ -360,70 +454,112 @@ def _pack_context(msg, context):
for args at some point.
"""
- context = dict([('_context_%s' % key, value)
- for (key, value) in context.to_dict().iteritems()])
- msg.update(context)
+ context_d = dict([('_context_%s' % key, value)
+ for (key, value) in context.to_dict().iteritems()])
+ msg.update(context_d)
-def call(context, topic, msg):
- """Sends a message on a topic and wait for a response."""
+class RpcContext(context.RequestContext):
+ def __init__(self, *args, **kwargs):
+ msg_id = kwargs.pop('msg_id', None)
+ self.msg_id = msg_id
+ super(RpcContext, self).__init__(*args, **kwargs)
+
+ def reply(self, *args, **kwargs):
+ msg_reply(self.msg_id, *args, **kwargs)
+
+
+def multicall(context, topic, msg):
+ """Make a call that returns multiple times."""
LOG.debug(_('Making asynchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
_pack_context(msg, context)
- class WaitMessage(object):
- def __call__(self, data, message):
- """Acks message and sets result."""
- message.ack()
- if data['failure']:
- self.result = RemoteError(*data['failure'])
- else:
- self.result = data['result']
-
- wait_msg = WaitMessage()
- conn = Connection.instance()
- consumer = DirectConsumer(connection=conn, msg_id=msg_id)
+ con_conn = ConnectionPool.get()
+ consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
+ wait_msg = MulticallWaiter(consumer)
consumer.register_callback(wait_msg)
- conn = Connection.instance()
- publisher = TopicPublisher(connection=conn, topic=topic)
+ publisher = TopicPublisher(connection=con_conn, topic=topic)
publisher.send(msg)
publisher.close()
- try:
- consumer.wait(limit=1)
- except StopIteration:
- pass
- consumer.close()
- # NOTE(termie): this is a little bit of a change from the original
- # non-eventlet code where returning a Failure
- # instance from a deferred call is very similar to
- # raising an exception
- if isinstance(wait_msg.result, Exception):
- raise wait_msg.result
- return wait_msg.result
+ return wait_msg
+
+
+class MulticallWaiter(object):
+ def __init__(self, consumer):
+ self._consumer = consumer
+ self._results = queue.Queue()
+ self._closed = False
+
+ def close(self):
+ self._closed = True
+ self._consumer.close()
+ ConnectionPool.put(self._consumer.connection)
+
+ def __call__(self, data, message):
+ """Acks message and sets result."""
+ message.ack()
+ if data['failure']:
+ self._results.put(RemoteError(*data['failure']))
+ else:
+ self._results.put(data['result'])
+
+ def __iter__(self):
+ return self.wait()
+
+ def wait(self):
+ while True:
+ rv = None
+ while rv is None and not self._closed:
+ try:
+ rv = self._consumer.fetch(enable_callbacks=True)
+ except Exception:
+ self.close()
+ raise
+ time.sleep(0.01)
+
+ result = self._results.get()
+ if isinstance(result, Exception):
+ self.close()
+ raise result
+ if result == None:
+ self.close()
+ raise StopIteration
+ yield result
+
+
+def call(context, topic, msg):
+ """Sends a message on a topic and wait for a response."""
+ rv = multicall(context, topic, msg)
+ # NOTE(vish): return the last result from the multicall
+ rv = list(rv)
+ if not rv:
+ return
+ return rv[-1]
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
_pack_context(msg, context)
- conn = Connection.instance()
- publisher = TopicPublisher(connection=conn, topic=topic)
- publisher.send(msg)
- publisher.close()
+ with ConnectionPool.item() as conn:
+ publisher = TopicPublisher(connection=conn, topic=topic)
+ publisher.send(msg)
+ publisher.close()
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
_pack_context(msg, context)
- conn = Connection.instance()
- publisher = FanoutPublisher(topic, connection=conn)
- publisher.send(msg)
- publisher.close()
+ with ConnectionPool.item() as conn:
+ publisher = FanoutPublisher(topic, connection=conn)
+ publisher.send(msg)
+ publisher.close()
def generic_response(message_data, message):
@@ -459,6 +595,7 @@ def send_message(topic, message, wait=True):
if wait:
consumer.wait()
+ consumer.close()
if __name__ == '__main__':
diff --git a/nova/service.py b/nova/service.py
index ab1238c3b..74f9f04d8 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -19,14 +19,11 @@
"""Generic Node baseclass for all workers that run on hosts."""
+import greenlet
import inspect
import os
-import sys
-import time
-from eventlet import event
from eventlet import greenthread
-from eventlet import greenpool
from nova import context
from nova import db
@@ -91,27 +88,37 @@ class Service(object):
if 'nova-compute' == self.binary:
self.manager.update_available_resource(ctxt)
- conn1 = rpc.Connection.instance(new=True)
- conn2 = rpc.Connection.instance(new=True)
- conn3 = rpc.Connection.instance(new=True)
- if self.report_interval:
- consumer_all = rpc.TopicAdapterConsumer(
- connection=conn1,
- topic=self.topic,
- proxy=self)
- consumer_node = rpc.TopicAdapterConsumer(
- connection=conn2,
- topic='%s.%s' % (self.topic, self.host),
- proxy=self)
- fanout = rpc.FanoutAdapterConsumer(
- connection=conn3,
- topic=self.topic,
- proxy=self)
-
- self.timers.append(consumer_all.attach_to_eventlet())
- self.timers.append(consumer_node.attach_to_eventlet())
- self.timers.append(fanout.attach_to_eventlet())
+ self.conn = rpc.Connection.instance(new=True)
+ logging.debug("Creating Consumer connection for Service %s" %
+ self.topic)
+
+ # Share this same connection for these Consumers
+ consumer_all = rpc.TopicAdapterConsumer(
+ connection=self.conn,
+ topic=self.topic,
+ proxy=self)
+ consumer_node = rpc.TopicAdapterConsumer(
+ connection=self.conn,
+ topic='%s.%s' % (self.topic, self.host),
+ proxy=self)
+ fanout = rpc.FanoutAdapterConsumer(
+ connection=self.conn,
+ topic=self.topic,
+ proxy=self)
+ consumer_set = rpc.ConsumerSet(
+ connection=self.conn,
+ consumer_list=[consumer_all, consumer_node, fanout])
+
+ # Wait forever, processing these consumers
+ def _wait():
+ try:
+ consumer_set.wait()
+ finally:
+ consumer_set.close()
+
+ self.consumer_set_thread = greenthread.spawn(_wait)
+ if self.report_interval:
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)
self.timers.append(pulse)
@@ -174,6 +181,11 @@ class Service(object):
logging.warn(_('Service killed that has no database entry'))
def stop(self):
+ self.consumer_set_thread.kill()
+ try:
+ self.consumer_set_thread.wait()
+ except greenlet.GreenletExit:
+ pass
for x in self.timers:
try:
x.stop()
diff --git a/nova/test.py b/nova/test.py
index 4deb2a175..80b2d0a74 100644
--- a/nova/test.py
+++ b/nova/test.py
@@ -31,17 +31,15 @@ import uuid
import unittest
import mox
-import shutil
import stubout
from eventlet import greenthread
-from nova import context
-from nova import db
from nova import fakerabbit
from nova import flags
from nova import rpc
from nova import service
from nova import wsgi
+from nova.virt import fake
FLAGS = flags.FLAGS
@@ -85,6 +83,7 @@ class TestCase(unittest.TestCase):
self._monkey_patch_attach()
self._monkey_patch_wsgi()
self._original_flags = FLAGS.FlagValuesDict()
+ rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size)
def tearDown(self):
"""Runs after each test method to tear down test environment."""
@@ -99,6 +98,10 @@ class TestCase(unittest.TestCase):
if FLAGS.fake_rabbit:
fakerabbit.reset_all()
+ if FLAGS.connection_type == 'fake':
+ if hasattr(fake.FakeConnection, '_instance'):
+ del fake.FakeConnection._instance
+
# Reset any overriden flags
self.reset_flags()
diff --git a/nova/tests/integrated/integrated_helpers.py b/nova/tests/integrated/integrated_helpers.py
index bc98921f0..7f590441e 100644
--- a/nova/tests/integrated/integrated_helpers.py
+++ b/nova/tests/integrated/integrated_helpers.py
@@ -154,10 +154,7 @@ class _IntegratedTestBase(test.TestCase):
# set up services
self.start_service('compute')
self.start_service('volume')
- # NOTE(justinsb): There's a bug here which is eluding me...
- # If we start the network_service, all is good, but then subsequent
- # tests fail: CloudTestCase.test_ajax_console in particular.
- #self.start_service('network')
+ self.start_service('network')
self.start_service('scheduler')
self._start_api_service()
diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py
index 54c0454de..34a73ad1f 100644
--- a/nova/tests/test_cloud.py
+++ b/nova/tests/test_cloud.py
@@ -17,13 +17,9 @@
# under the License.
from base64 import b64decode
-import json
from M2Crypto import BIO
from M2Crypto import RSA
import os
-import shutil
-import tempfile
-import time
from eventlet import greenthread
@@ -33,12 +29,10 @@ from nova import db
from nova import flags
from nova import log as logging
from nova import rpc
-from nova import service
from nova import test
from nova import utils
from nova import exception
from nova.auth import manager
-from nova.compute import power_state
from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.image import local
@@ -79,14 +73,21 @@ class CloudTestCase(test.TestCase):
self.stubs.Set(local.LocalImageService, 'show', fake_show)
self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)
+ # NOTE(vish): set up a manual wait so rpc.cast has a chance to finish
+ rpc_cast = rpc.cast
+
+ def finish_cast(*args, **kwargs):
+ rpc_cast(*args, **kwargs)
+ greenthread.sleep(0.2)
+
+ self.stubs.Set(rpc, 'cast', finish_cast)
+
def tearDown(self):
network_ref = db.project_get_network(self.context,
self.project.id)
db.network_disassociate(self.context, network_ref['id'])
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
- self.compute.kill()
- self.network.kill()
super(CloudTestCase, self).tearDown()
def _create_key(self, name):
@@ -113,7 +114,6 @@ class CloudTestCase(test.TestCase):
self.cloud.describe_addresses(self.context)
self.cloud.release_address(self.context,
public_ip=address)
- greenthread.sleep(0.3)
db.floating_ip_destroy(self.context, address)
def test_associate_disassociate_address(self):
@@ -129,12 +129,10 @@ class CloudTestCase(test.TestCase):
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
- greenthread.sleep(0.3)
self.cloud.disassociate_address(self.context,
public_ip=address)
self.cloud.release_address(self.context,
public_ip=address)
- greenthread.sleep(0.3)
self.network.deallocate_fixed_ip(self.context, fixed)
db.instance_destroy(self.context, inst['id'])
db.floating_ip_destroy(self.context, address)
@@ -188,6 +186,52 @@ class CloudTestCase(test.TestCase):
db.service_destroy(self.context, service1['id'])
db.service_destroy(self.context, service2['id'])
+ def test_describe_snapshots(self):
+ """Makes sure describe_snapshots works and filters results."""
+ vol = db.volume_create(self.context, {})
+ snap1 = db.snapshot_create(self.context, {'volume_id': vol['id']})
+ snap2 = db.snapshot_create(self.context, {'volume_id': vol['id']})
+ result = self.cloud.describe_snapshots(self.context)
+ self.assertEqual(len(result['snapshotSet']), 2)
+ snapshot_id = ec2utils.id_to_ec2_id(snap2['id'], 'snap-%08x')
+ result = self.cloud.describe_snapshots(self.context,
+ snapshot_id=[snapshot_id])
+ self.assertEqual(len(result['snapshotSet']), 1)
+ self.assertEqual(
+ ec2utils.ec2_id_to_id(result['snapshotSet'][0]['snapshotId']),
+ snap2['id'])
+ db.snapshot_destroy(self.context, snap1['id'])
+ db.snapshot_destroy(self.context, snap2['id'])
+ db.volume_destroy(self.context, vol['id'])
+
+ def test_create_snapshot(self):
+ """Makes sure create_snapshot works."""
+ vol = db.volume_create(self.context, {'status': "available"})
+ volume_id = ec2utils.id_to_ec2_id(vol['id'], 'vol-%08x')
+
+ result = self.cloud.create_snapshot(self.context,
+ volume_id=volume_id)
+ snapshot_id = result['snapshotId']
+ result = self.cloud.describe_snapshots(self.context)
+ self.assertEqual(len(result['snapshotSet']), 1)
+ self.assertEqual(result['snapshotSet'][0]['snapshotId'], snapshot_id)
+
+ db.snapshot_destroy(self.context, ec2utils.ec2_id_to_id(snapshot_id))
+ db.volume_destroy(self.context, vol['id'])
+
+ def test_delete_snapshot(self):
+ """Makes sure delete_snapshot works."""
+ vol = db.volume_create(self.context, {'status': "available"})
+ snap = db.snapshot_create(self.context, {'volume_id': vol['id'],
+ 'status': "available"})
+ snapshot_id = ec2utils.id_to_ec2_id(snap['id'], 'snap-%08x')
+
+ result = self.cloud.delete_snapshot(self.context,
+ snapshot_id=snapshot_id)
+ self.assertTrue(result)
+
+ db.volume_destroy(self.context, vol['id'])
+
def test_describe_instances(self):
"""Makes sure describe_instances works and filters results."""
inst1 = db.instance_create(self.context, {'reservation_id': 'a',
@@ -306,31 +350,25 @@ class CloudTestCase(test.TestCase):
'instance_type': instance_type,
'max_count': max_count}
rv = self.cloud.run_instances(self.context, **kwargs)
- greenthread.sleep(0.3)
instance_id = rv['instancesSet'][0]['instanceId']
output = self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT')
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
- greenthread.sleep(0.3)
rv = self.cloud.terminate_instances(self.context, [instance_id])
- greenthread.sleep(0.3)
def test_ajax_console(self):
kwargs = {'image_id': 'ami-1'}
rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
- greenthread.sleep(0.3)
output = self.cloud.get_ajax_console(context=self.context,
instance_id=[instance_id])
self.assertEquals(output['url'],
'%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url)
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
- greenthread.sleep(0.3)
rv = self.cloud.terminate_instances(self.context, [instance_id])
- greenthread.sleep(0.3)
def test_key_generation(self):
result = self._create_key('test')
diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py
index 44d7c91eb..ffd748efe 100644
--- a/nova/tests/test_rpc.py
+++ b/nova/tests/test_rpc.py
@@ -31,7 +31,6 @@ LOG = logging.getLogger('nova.tests.rpc')
class RpcTestCase(test.TestCase):
- """Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
self.conn = rpc.Connection.instance(True)
@@ -43,14 +42,55 @@ class RpcTestCase(test.TestCase):
self.context = context.get_admin_context()
def test_call_succeed(self):
- """Get a value through rpc call"""
value = 42
result = rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
+ def test_call_succeed_despite_multiple_returns(self):
+ value = 42
+ result = rpc.call(self.context, 'test', {"method": "echo_three_times",
+ "args": {"value": value}})
+ self.assertEqual(value + 2, result)
+
+ def test_call_succeed_despite_multiple_returns_yield(self):
+ value = 42
+ result = rpc.call(self.context, 'test',
+ {"method": "echo_three_times_yield",
+ "args": {"value": value}})
+ self.assertEqual(value + 2, result)
+
+ def test_multicall_succeed_once(self):
+ value = 42
+ result = rpc.multicall(self.context,
+ 'test',
+ {"method": "echo",
+ "args": {"value": value}})
+ for i, x in enumerate(result):
+ if i > 0:
+ self.fail('should only receive one response')
+ self.assertEqual(value + i, x)
+
+ def test_multicall_succeed_three_times(self):
+ value = 42
+ result = rpc.multicall(self.context,
+ 'test',
+ {"method": "echo_three_times",
+ "args": {"value": value}})
+ for i, x in enumerate(result):
+ self.assertEqual(value + i, x)
+
+ def test_multicall_succeed_three_times_yield(self):
+ value = 42
+ result = rpc.multicall(self.context,
+ 'test',
+ {"method": "echo_three_times_yield",
+ "args": {"value": value}})
+ for i, x in enumerate(result):
+ self.assertEqual(value + i, x)
+
def test_context_passed(self):
- """Makes sure a context is passed through rpc call"""
+ """Makes sure a context is passed through rpc call."""
value = 42
result = rpc.call(self.context,
'test', {"method": "context",
@@ -58,11 +98,12 @@ class RpcTestCase(test.TestCase):
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
- """Test that exception gets passed back properly
+ """Test that exception gets passed back properly.
rpc.call returns a RemoteError object. The value of the
exception is converted to a string, so we convert it back
to an int in the test.
+
"""
value = 42
self.assertRaises(rpc.RemoteError,
@@ -81,7 +122,7 @@ class RpcTestCase(test.TestCase):
self.assertEqual(int(exc.value), value)
def test_nested_calls(self):
- """Test that we can do an rpc.call inside another call"""
+ """Test that we can do an rpc.call inside another call."""
class Nested(object):
@staticmethod
def echo(context, queue, value):
@@ -108,25 +149,80 @@ class RpcTestCase(test.TestCase):
"value": value}})
self.assertEqual(value, result)
+ def test_connectionpool_single(self):
+ """Test that ConnectionPool recycles a single connection."""
+ conn1 = rpc.ConnectionPool.get()
+ rpc.ConnectionPool.put(conn1)
+ conn2 = rpc.ConnectionPool.get()
+ rpc.ConnectionPool.put(conn2)
+ self.assertEqual(conn1, conn2)
+
+ def test_connectionpool_double(self):
+ """Test that ConnectionPool returns and reuses separate connections.
+
+ When called consecutively we should get separate connections and upon
+ returning them those connections should be reused for future calls
+ before generating a new connection.
+
+ """
+ conn1 = rpc.ConnectionPool.get()
+ conn2 = rpc.ConnectionPool.get()
+
+ self.assertNotEqual(conn1, conn2)
+ rpc.ConnectionPool.put(conn1)
+ rpc.ConnectionPool.put(conn2)
+
+ conn3 = rpc.ConnectionPool.get()
+ conn4 = rpc.ConnectionPool.get()
+ self.assertEqual(conn1, conn3)
+ self.assertEqual(conn2, conn4)
+
+ def test_connectionpool_limit(self):
+ """Test connection pool limit and connection uniqueness."""
+ max_size = FLAGS.rpc_conn_pool_size
+ conns = []
+
+ for i in xrange(max_size):
+ conns.append(rpc.ConnectionPool.get())
+
+ self.assertFalse(rpc.ConnectionPool.free_items)
+ self.assertEqual(rpc.ConnectionPool.current_size,
+ rpc.ConnectionPool.max_size)
+ self.assertEqual(len(set(conns)), max_size)
+
class TestReceiver(object):
- """Simple Proxy class so the consumer has methods to call
+ """Simple Proxy class so the consumer has methods to call.
+
+ Uses static methods because we aren't actually storing any state.
- Uses static methods because we aren't actually storing any state"""
+ """
@staticmethod
def echo(context, value):
- """Simply returns whatever value is sent in"""
+ """Simply returns whatever value is sent in."""
LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
- """Returns dictionary version of context"""
+ """Returns dictionary version of context."""
LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
+ def echo_three_times(context, value):
+ context.reply(value)
+ context.reply(value + 1)
+ context.reply(value + 2)
+
+ @staticmethod
+ def echo_three_times_yield(context, value):
+ yield value
+ yield value + 1
+ yield value + 2
+
+ @staticmethod
def fail(context, value):
- """Raises an exception with the value sent in"""
+ """Raises an exception with the value sent in."""
raise Exception(value)
diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py
index d48de2057..d1cc8bd61 100644
--- a/nova/tests/test_service.py
+++ b/nova/tests/test_service.py
@@ -106,7 +106,10 @@ class ServiceTestCase(test.TestCase):
# NOTE(vish): Create was moved out of mox replay to make sure that
# the looping calls are created in StartService.
- app = service.Service.create(host=host, binary=binary)
+ app = service.Service.create(host=host, binary=binary, topic=topic)
+
+ self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
+ service.rpc.Connection.instance(new=mox.IgnoreArg())
self.mox.StubOutWithMock(rpc,
'TopicAdapterConsumer',
@@ -114,6 +117,11 @@ class ServiceTestCase(test.TestCase):
self.mox.StubOutWithMock(rpc,
'FanoutAdapterConsumer',
use_mock_anything=True)
+
+ self.mox.StubOutWithMock(rpc,
+ 'ConsumerSet',
+ use_mock_anything=True)
+
rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
@@ -129,9 +137,14 @@ class ServiceTestCase(test.TestCase):
proxy=mox.IsA(service.Service)).AndReturn(
rpc.FanoutAdapterConsumer)
- rpc.TopicAdapterConsumer.attach_to_eventlet()
- rpc.TopicAdapterConsumer.attach_to_eventlet()
- rpc.FanoutAdapterConsumer.attach_to_eventlet()
+ def wait_func(self, limit=None):
+ return None
+
+ mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
+ {'wait': wait_func})
+ rpc.ConsumerSet(connection=mox.IgnoreArg(),
+ consumer_list=mox.IsA(list)).AndReturn(mock_cset)
+ wait_func(mox.IgnoreArg())
service_create = {'host': host,
'binary': binary,
@@ -287,8 +300,42 @@ class ServiceTestCase(test.TestCase):
# Creating mocks
self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
service.rpc.Connection.instance(new=mox.IgnoreArg())
- service.rpc.Connection.instance(new=mox.IgnoreArg())
- service.rpc.Connection.instance(new=mox.IgnoreArg())
+
+ self.mox.StubOutWithMock(rpc,
+ 'TopicAdapterConsumer',
+ use_mock_anything=True)
+ self.mox.StubOutWithMock(rpc,
+ 'FanoutAdapterConsumer',
+ use_mock_anything=True)
+
+ self.mox.StubOutWithMock(rpc,
+ 'ConsumerSet',
+ use_mock_anything=True)
+
+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
+ topic=topic,
+ proxy=mox.IsA(service.Service)).AndReturn(
+ rpc.TopicAdapterConsumer)
+
+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
+ topic='%s.%s' % (topic, host),
+ proxy=mox.IsA(service.Service)).AndReturn(
+ rpc.TopicAdapterConsumer)
+
+ rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
+ topic=topic,
+ proxy=mox.IsA(service.Service)).AndReturn(
+ rpc.FanoutAdapterConsumer)
+
+ def wait_func(self, limit=None):
+ return None
+
+ mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
+ {'wait': wait_func})
+ rpc.ConsumerSet(connection=mox.IgnoreArg(),
+ consumer_list=mox.IsA(list)).AndReturn(mock_cset)
+ wait_func(mox.IgnoreArg())
+
self.mox.StubOutWithMock(serv.manager.driver,
'update_available_resource')
serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)
diff --git a/nova/tests/test_volume.py b/nova/tests/test_volume.py
index 236d12434..3472b1f59 100644
--- a/nova/tests/test_volume.py
+++ b/nova/tests/test_volume.py
@@ -176,6 +176,34 @@ class VolumeTestCase(test.TestCase):
# This will allow us to test cross-node interactions
pass
+ @staticmethod
+ def _create_snapshot(volume_id, size='0'):
+ """Create a snapshot object."""
+ snap = {}
+ snap['volume_size'] = size
+ snap['user_id'] = 'fake'
+ snap['project_id'] = 'fake'
+ snap['volume_id'] = volume_id
+ snap['status'] = "creating"
+ return db.snapshot_create(context.get_admin_context(), snap)['id']
+
+ def test_create_delete_snapshot(self):
+ """Test snapshot can be created and deleted."""
+ volume_id = self._create_volume()
+ self.volume.create_volume(self.context, volume_id)
+ snapshot_id = self._create_snapshot(volume_id)
+ self.volume.create_snapshot(self.context, volume_id, snapshot_id)
+ self.assertEqual(snapshot_id,
+ db.snapshot_get(context.get_admin_context(),
+ snapshot_id).id)
+
+ self.volume.delete_snapshot(self.context, snapshot_id)
+ self.assertRaises(exception.NotFound,
+ db.snapshot_get,
+ self.context,
+ snapshot_id)
+ self.volume.delete_volume(self.context, volume_id)
+
class DriverTestCase(test.TestCase):
"""Base Test class for Drivers."""
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index 18a267896..9d56c1644 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -592,11 +592,29 @@ class XenAPIDiffieHellmanTestCase(test.TestCase):
bob_shared = self.bob.compute_shared(alice_pub)
self.assertEquals(alice_shared, bob_shared)
- def test_encryption(self):
- msg = "This is a top-secret message"
- enc = self.alice.encrypt(msg)
+ def _test_encryption(self, message):
+ enc = self.alice.encrypt(message)
+ self.assertFalse(enc.endswith('\n'))
dec = self.bob.decrypt(enc)
- self.assertEquals(dec, msg)
+ self.assertEquals(dec, message)
+
+ def test_encrypt_simple_message(self):
+ self._test_encryption('This is a simple message.')
+
+ def test_encrypt_message_with_newlines_at_end(self):
+ self._test_encryption('This message has a newline at the end.\n')
+
+ def test_encrypt_many_newlines_at_end(self):
+ self._test_encryption('Message with lotsa newlines.\n\n\n')
+
+ def test_encrypt_newlines_inside_message(self):
+ self._test_encryption('Message\nwith\ninterior\nnewlines.')
+
+ def test_encrypt_with_leading_newlines(self):
+ self._test_encryption('\n\nMessage with leading newlines.')
+
+ def test_encrypt_really_long_message(self):
+ self._test_encryption(''.join(['abcd' for i in xrange(1024)]))
def tearDown(self):
super(XenAPIDiffieHellmanTestCase, self).tearDown()
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index 6d516ddbc..2b3fb6a39 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -1190,26 +1190,22 @@ class SimpleDH(object):
mpi = M2Crypto.m2.bn_to_mpi(bn)
return mpi
- def _run_ssl(self, text, which):
- base_cmd = ('openssl enc -aes-128-cbc -a -pass pass:%(shared)s '
- '-nosalt %(dec_flag)s')
- if which.lower()[0] == 'd':
- dec_flag = ' -d'
- else:
- dec_flag = ''
- shared = self._shared
- cmd = base_cmd % locals()
- proc = _runproc(cmd)
- proc.stdin.write(text + '\n')
+ def _run_ssl(self, text, extra_args=None):
+ if not extra_args:
+ extra_args = ''
+ cmd = 'enc -aes-128-cbc -A -a -pass pass:%s -nosalt %s' % (
+ self._shared, extra_args)
+ proc = _runproc('openssl %s' % cmd)
+ proc.stdin.write(text)
proc.stdin.close()
proc.wait()
err = proc.stderr.read()
if err:
raise RuntimeError(_('OpenSSL error: %s') % err)
- return proc.stdout.read().strip('\n')
+ return proc.stdout.read()
def encrypt(self, text):
- return self._run_ssl(text, 'enc')
+ return self._run_ssl(text).strip('\n')
def decrypt(self, text):
- return self._run_ssl(text, 'dec')
+ return self._run_ssl(text, '-d')
diff --git a/nova/volume/api.py b/nova/volume/api.py
index 09befb647..c1af30de0 100644
--- a/nova/volume/api.py
+++ b/nova/volume/api.py
@@ -90,6 +90,15 @@ class API(base.Base):
return self.db.volume_get_all(context)
return self.db.volume_get_all_by_project(context, context.project_id)
+ def get_snapshot(self, context, snapshot_id):
+ rv = self.db.snapshot_get(context, snapshot_id)
+ return dict(rv.iteritems())
+
+ def get_all_snapshots(self, context):
+ if context.is_admin:
+ return self.db.snapshot_get_all(context)
+ return self.db.snapshot_get_all_by_project(context, context.project_id)
+
def check_attach(self, context, volume_id):
volume = self.get(context, volume_id)
# TODO(vish): abstract status checking?
@@ -110,3 +119,38 @@ class API(base.Base):
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "remove_volume",
"args": {'volume_id': volume_id}})
+
+ def create_snapshot(self, context, volume_id, name, description):
+ volume = self.get(context, volume_id)
+ if volume['status'] != "available":
+ raise exception.ApiError(_("Volume status must be available"))
+
+ options = {
+ 'volume_id': volume_id,
+ 'user_id': context.user_id,
+ 'project_id': context.project_id,
+ 'status': "creating",
+ 'progress': '0%',
+ 'volume_size': volume['size'],
+ 'display_name': name,
+ 'display_description': description}
+
+ snapshot = self.db.snapshot_create(context, options)
+ rpc.cast(context,
+ FLAGS.scheduler_topic,
+ {"method": "create_snapshot",
+ "args": {"topic": FLAGS.volume_topic,
+ "volume_id": volume_id,
+ "snapshot_id": snapshot['id']}})
+ return snapshot
+
+ def delete_snapshot(self, context, snapshot_id):
+ snapshot = self.get_snapshot(context, snapshot_id)
+ if snapshot['status'] != "available":
+ raise exception.ApiError(_("Snapshot status must be available"))
+ self.db.snapshot_update(context, snapshot_id, {'status': 'deleting'})
+ rpc.cast(context,
+ FLAGS.scheduler_topic,
+ {"method": "delete_snapshot",
+ "args": {"topic": FLAGS.volume_topic,
+ "snapshot_id": snapshot_id}})
diff --git a/nova/volume/driver.py b/nova/volume/driver.py
index 55307ad9b..21cc228c9 100644
--- a/nova/volume/driver.py
+++ b/nova/volume/driver.py
@@ -90,42 +90,91 @@ class VolumeDriver(object):
raise exception.Error(_("volume group %s doesn't exist")
% FLAGS.volume_group)
- def create_volume(self, volume):
- """Creates a logical volume. Can optionally return a Dictionary of
- changes to the volume object to be persisted."""
- if int(volume['size']) == 0:
- sizestr = '100M'
- else:
- sizestr = '%sG' % volume['size']
+ def _create_volume(self, volume_name, sizestr):
self._try_execute('sudo', 'lvcreate', '-L', sizestr, '-n',
- volume['name'],
- FLAGS.volume_group)
+ volume_name, FLAGS.volume_group)
- def delete_volume(self, volume):
- """Deletes a logical volume."""
+ def _copy_volume(self, srcstr, deststr, size_in_g):
+ self._execute('sudo', 'dd', 'if=%s' % srcstr, 'of=%s' % deststr,
+ 'count=%d' % (size_in_g * 1024), 'bs=1M')
+
+ def _volume_not_present(self, volume_name):
+ path_name = '%s/%s' % (FLAGS.volume_group, volume_name)
try:
- self._try_execute('sudo', 'lvdisplay',
- '%s/%s' %
- (FLAGS.volume_group,
- volume['name']))
+ self._try_execute('sudo', 'lvdisplay', path_name)
except Exception as e:
- # If the volume isn't present, then don't attempt to delete
+ # If the volume isn't present
return True
+ return False
+ def _delete_volume(self, volume, size_in_g):
+ """Deletes a logical volume."""
# zero out old volumes to prevent data leaking between users
# TODO(ja): reclaiming space should be done lazy and low priority
- self._execute('sudo', 'dd', 'if=/dev/zero',
- 'of=%s' % self.local_path(volume),
- 'count=%d' % (volume['size'] * 1024),
- 'bs=1M')
+ self._copy_volume('/dev/zero', self.local_path(volume), size_in_g)
self._try_execute('sudo', 'lvremove', '-f', "%s/%s" %
(FLAGS.volume_group,
- volume['name']))
+ self._escape_snapshot(volume['name'])))
+
+ def _sizestr(self, size_in_g):
+ if int(size_in_g) == 0:
+ return '100M'
+ return '%sG' % size_in_g
+
+ # Linux LVM reserves name that starts with snapshot, so that
+ # such volume name can't be created. Mangle it.
+ def _escape_snapshot(self, snapshot_name):
+ if not snapshot_name.startswith('snapshot'):
+ return snapshot_name
+ return '_' + snapshot_name
+
+ def create_volume(self, volume):
+ """Creates a logical volume. Can optionally return a Dictionary of
+ changes to the volume object to be persisted."""
+ self._create_volume(volume['name'], self._sizestr(volume['size']))
+
+ def delete_volume(self, volume):
+ """Deletes a logical volume."""
+ if self._volume_not_present(volume['name']):
+ # If the volume isn't present, then don't attempt to delete
+ return True
+
+ # TODO(yamahata): lvm can't delete origin volume only without
+ # deleting derived snapshots. Can we do something fancy?
+ out, err = self._execute('sudo', 'lvdisplay', '--noheading',
+ '-C', '-o', 'Attr',
+ '%s/%s' % (FLAGS.volume_group,
+ volume['name']))
+ # fake_execute returns None resulting unit test error
+ if out:
+ out = out.strip()
+ if (out[0] == 'o') or (out[0] == 'O'):
+ raise exception.VolumeIsBusy(volume_name=volume['name'])
+
+ self._delete_volume(volume, volume['size'])
+
+ def create_snapshot(self, snapshot):
+ """Creates a snapshot."""
+ orig_lv_name = "%s/%s" % (FLAGS.volume_group, snapshot['volume_name'])
+ self._try_execute('sudo', 'lvcreate', '-L',
+ self._sizestr(snapshot['volume_size']),
+ '--name', self._escape_snapshot(snapshot['name']),
+ '--snapshot', orig_lv_name)
+
+ def delete_snapshot(self, snapshot):
+ """Deletes a snapshot."""
+ if self._volume_not_present(self._escape_snapshot(snapshot['name'])):
+ # If the snapshot isn't present, then don't attempt to delete
+ return True
+
+ # TODO(yamahata): zeroing out the whole snapshot triggers COW.
+ # it's quite slow.
+ self._delete_volume(snapshot, snapshot['volume_size'])
def local_path(self, volume):
# NOTE(vish): stops deprecation warning
escaped_group = FLAGS.volume_group.replace('-', '--')
- escaped_name = volume['name'].replace('-', '--')
+ escaped_name = self._escape_snapshot(volume['name']).replace('-', '--')
return "/dev/mapper/%s-%s" % (escaped_group, escaped_name)
def ensure_export(self, context, volume):
@@ -559,6 +608,18 @@ class RBDDriver(VolumeDriver):
self._try_execute('rbd', '--pool', FLAGS.rbd_pool,
'rm', volume['name'])
+ def create_snapshot(self, snapshot):
+ """Creates an rbd snapshot"""
+ self._try_execute('rbd', '--pool', FLAGS.rbd_pool,
+ 'snap', 'create', '--snap', snapshot['name'],
+ snapshot['volume_name'])
+
+ def delete_snapshot(self, snapshot):
+ """Deletes an rbd snapshot"""
+ self._try_execute('rbd', '--pool', FLAGS.rbd_pool,
+ 'snap', 'rm', '--snap', snapshot['name'],
+ snapshot['volume_name'])
+
def local_path(self, volume):
"""Returns the path of the rbd volume."""
# This is the same as the remote path
@@ -600,18 +661,24 @@ class SheepdogDriver(VolumeDriver):
def create_volume(self, volume):
"""Creates a sheepdog volume"""
- if int(volume['size']) == 0:
- sizestr = '100M'
- else:
- sizestr = '%sG' % volume['size']
self._try_execute('qemu-img', 'create',
"sheepdog:%s" % volume['name'],
- sizestr)
+ self._sizestr(volume['size']))
def delete_volume(self, volume):
"""Deletes a logical volume"""
self._try_execute('collie', 'vdi', 'delete', volume['name'])
+ def create_snapshot(self, snapshot):
+ """Creates a sheepdog snapshot"""
+ self._try_execute('qemu-img', 'snapshot', '-c', snapshot['name'],
+ "sheepdog:%s" % snapshot['volume_name'])
+
+ def delete_snapshot(self, snapshot):
+ """Deletes a sheepdog snapshot"""
+ self._try_execute('collie', 'vdi', 'delete', snapshot['volume_name'],
+ '-s', snapshot['name'])
+
def local_path(self, volume):
return "sheepdog:%s" % volume['name']
diff --git a/nova/volume/manager.py b/nova/volume/manager.py
index 2178389ce..40a104d35 100644
--- a/nova/volume/manager.py
+++ b/nova/volume/manager.py
@@ -142,6 +142,12 @@ class VolumeManager(manager.SchedulerDependentManager):
self.driver.remove_export(context, volume_ref)
LOG.debug(_("volume %s: deleting"), volume_ref['name'])
self.driver.delete_volume(volume_ref)
+ except exception.VolumeIsBusy, e:
+ LOG.debug(_("volume %s: volume is busy"), volume_ref['name'])
+ self.driver.ensure_export(context, volume_ref)
+ self.db.volume_update(context, volume_ref['id'],
+ {'status': 'available'})
+ return True
except Exception:
self.db.volume_update(context,
volume_ref['id'],
@@ -152,6 +158,49 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.debug(_("volume %s: deleted successfully"), volume_ref['name'])
return True
+ def create_snapshot(self, context, volume_id, snapshot_id):
+ """Creates and exports the snapshot."""
+ context = context.elevated()
+ snapshot_ref = self.db.snapshot_get(context, snapshot_id)
+ LOG.info(_("snapshot %s: creating"), snapshot_ref['name'])
+
+ try:
+ snap_name = snapshot_ref['name']
+ LOG.debug(_("snapshot %(snap_name)s: creating") % locals())
+ model_update = self.driver.create_snapshot(snapshot_ref)
+ if model_update:
+ self.db.snapshot_update(context, snapshot_ref['id'],
+ model_update)
+
+ except Exception:
+ self.db.snapshot_update(context,
+ snapshot_ref['id'], {'status': 'error'})
+ raise
+
+ self.db.snapshot_update(context,
+ snapshot_ref['id'], {'status': 'available',
+ 'progress': '100%'})
+ LOG.debug(_("snapshot %s: created successfully"), snapshot_ref['name'])
+ return snapshot_id
+
+ def delete_snapshot(self, context, snapshot_id):
+ """Deletes and unexports snapshot."""
+ context = context.elevated()
+ snapshot_ref = self.db.snapshot_get(context, snapshot_id)
+
+ try:
+ LOG.debug(_("snapshot %s: deleting"), snapshot_ref['name'])
+ self.driver.delete_snapshot(snapshot_ref)
+ except Exception:
+ self.db.snapshot_update(context,
+ snapshot_ref['id'],
+ {'status': 'error_deleting'})
+ raise
+
+ self.db.snapshot_destroy(context, snapshot_id)
+ LOG.debug(_("snapshot %s: deleted successfully"), snapshot_ref['name'])
+ return True
+
def setup_compute_volume(self, context, volume_id):
"""Setup remote volume on compute host.
diff --git a/tools/pip-requires b/tools/pip-requires
index 8f8018765..f1c5b2003 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -17,8 +17,7 @@ redis==2.0.0
routes==1.12.3
WebOb==0.9.8
wsgiref==0.1.2
-mox==0.5.0
--f http://pymox.googlecode.com/files/mox-0.5.0.tar.gz
+mox==0.5.3
greenlet==0.3.1
nose
bzr