author     Dan Wendlandt <dan@nicira.com>    2011-09-07 14:27:06 -0700
committer  Dan Wendlandt <dan@nicira.com>    2011-09-07 14:27:06 -0700
commit     0f5eb3f888de5f6eb23f968fa5a2270d2a350bcc (patch)
tree       0ee3f8a60147da042b0b14db350a320228876fa6
parent     e5e3b306985a3b1fdd8a971f48b76eaf8f923f21 (diff)
parent     3e0698e3b57c9f73a359340f51c2797d8adc669a (diff)
merge trunk
74 files changed, 3463 insertions, 1042 deletions
@@ -31,6 +31,7 @@ Devendra Modium <dmodium@isi.edu>
 Devin Carlen <devin.carlen@gmail.com>
 Donal Lafferty <donal.lafferty@citrix.com>
 Ed Leafe <ed@leafe.com>
+Edouard Thuleau <thuleau@gmail.com>
 Eldar Nugaev <reldan@oscloud.ru>
 Eric Day <eday@oddments.org>
 Eric Windisch <eric@cloudscaling.com>
diff --git a/bin/instance-usage-audit b/bin/instance-usage-audit
index a06c6b1b3..7ce5732e7 100755
--- a/bin/instance-usage-audit
+++ b/bin/instance-usage-audit
@@ -102,9 +102,8 @@ if __name__ == '__main__':
     logging.setup()
     begin, end = time_period(FLAGS.instance_usage_audit_period)
     print "Creating usages for %s until %s" % (str(begin), str(end))
-    instances = db.instance_get_active_by_window(context.get_admin_context(),
-                                                 begin,
-                                                 end)
+    ctxt = context.get_admin_context()
+    instances = db.instance_get_active_by_window_joined(ctxt, begin, end)
     print "%s instances" % len(instances)
     for instance_ref in instances:
         usage_info = utils.usage_from_instance(instance_ref,
diff --git a/bin/nova-ajax-console-proxy b/bin/nova-ajax-console-proxy
index 0a789b4b9..23fb42fb5 100755
--- a/bin/nova-ajax-console-proxy
+++ b/bin/nova-ajax-console-proxy
@@ -113,11 +113,10 @@ class AjaxConsoleProxy(object):
                 AjaxConsoleProxy.tokens[kwargs['token']] = \
                     {'args': kwargs, 'last_activity': time.time()}

-        conn = rpc.create_connection(new=True)
-        consumer = rpc.create_consumer(
-                conn,
-                FLAGS.ajax_console_proxy_topic,
-                TopicProxy)
+        self.conn = rpc.create_connection(new=True)
+        self.conn.create_consumer(
+                FLAGS.ajax_console_proxy_topic,
+                TopicProxy)

         def delete_expired_tokens():
             now = time.time()
@@ -129,7 +128,7 @@ class AjaxConsoleProxy(object):
             for k in to_delete:
                 del AjaxConsoleProxy.tokens[k]

-        utils.LoopingCall(consumer.fetch, enable_callbacks=True).start(0.1)
+        self.conn.consume_in_thread()
         utils.LoopingCall(delete_expired_tokens).start(1)

 if __name__ == '__main__':
@@ -142,3 +141,4 @@ if __name__ == '__main__':
     server = wsgi.Server("AJAX Console Proxy", acp, port=acp_port)
     service.serve(server)
     service.wait()
+    self.conn.close()
diff --git a/bin/nova-manage b/bin/nova-manage
index 6dd2920d1..bc191b2f0 100755
--- a/bin/nova-manage
+++ b/bin/nova-manage
@@ -166,7 +166,7 @@ class VpnCommands(object):
                 print address,
                 print vpn['host'],
                 print ec2utils.id_to_ec2_id(vpn['id']),
-                print vpn['state_description'],
+                print vpn['vm_state'],
                 print state
             else:
                 print None
@@ -892,7 +892,7 @@ class VmCommands(object):
                 instance['hostname'],
                 instance['host'],
                 instance['instance_type'].name,
-                instance['state_description'],
+                instance['vm_state'],
                 instance['launched_at'],
                 instance['image_ref'],
                 instance['kernel_id'],
@@ -1246,7 +1246,7 @@ class VsaCommands(object):
                 type=vc['instance_type']['name'],
                 fl_ip=floating_addr,
                 fx_ip=fixed_addr,
-                stat=vc['state_description'],
+                stat=vc['vm_state'],
                 host=vc['host'],
                 time=str(vc['created_at']))
diff --git a/contrib/nova.sh b/contrib/nova.sh
index 7994e5133..16cddebd5 100755
--- a/contrib/nova.sh
+++ b/contrib/nova.sh
@@ -81,7 +81,7 @@ if [ "$CMD" == "install" ]; then
     sudo apt-get install -y python-netaddr python-pastedeploy python-eventlet
     sudo apt-get install -y python-novaclient python-glance python-cheetah
    sudo apt-get install -y python-carrot python-tempita python-sqlalchemy
-    sudo apt-get install -y python-suds
+    sudo apt-get install -y python-suds python-kombu

     if [ "$USE_IPV6" == 1 ]; then
diff --git a/nova/api/ec2/__init__.py b/nova/api/ec2/__init__.py
index 5430f443d..3b217e62e 100644
--- a/nova/api/ec2/__init__.py
+++ b/nova/api/ec2/__init__.py
@@ -20,7 +20,10 @@ Starting point for routing EC2 requests.

 """

-import httplib2
+from urlparse import urlparse
+
+import eventlet
+from eventlet.green import httplib
 import webob
 import webob.dec
 import webob.exc
@@ -35,7 +38,6 @@ from nova.api.ec2 import apirequest
 from nova.api.ec2 import ec2utils
 from nova.auth import manager

-
 FLAGS = flags.FLAGS
 LOG = logging.getLogger("nova.api")
 flags.DEFINE_integer('lockout_attempts', 5,
@@ -158,7 +160,6 @@ class ToToken(wsgi.Middleware):
         auth_params.pop('Signature')

         # Authenticate the request.
-        client = httplib2.Http()
         creds = {'ec2Credentials': {'access': access,
                                     'signature': signature,
                                     'host': req.host,
@@ -166,18 +167,24 @@ class ToToken(wsgi.Middleware):
                                     'path': req.path,
                                     'params': auth_params,
                                     }}
-        headers = {'Content-Type': 'application/json'},
-        resp, content = client.request(FLAGS.keystone_ec2_url,
-                                       'POST',
-                                       headers=headers,
-                                       body=utils.dumps(creds))
+        creds_json = utils.dumps(creds)
+        headers = {'Content-Type': 'application/json'}
+        o = urlparse(FLAGS.keystone_ec2_url)
+        if o.scheme == "http":
+            conn = httplib.HTTPConnection(o.netloc)
+        else:
+            conn = httplib.HTTPSConnection(o.netloc)
+        conn.request('POST', o.path, body=creds_json, headers=headers)
+        response = conn.getresponse().read()
+        conn.close()
+
         # NOTE(vish): We could save a call to keystone by
         #             having keystone return token, tenant,
         #             user, and roles from this call.
-        result = utils.loads(content)
+        result = utils.loads(response)
         # TODO(vish): check for errors
-        token_id = result['auth']['token']['id']
+        token_id = result['auth']['token']['id']
         # Authenticated!
         req.headers['X-Auth-Token'] = token_id
         return self.application
@@ -392,18 +399,20 @@ class Executor(wsgi.Application):
         except exception.InstanceNotFound as ex:
             LOG.info(_('InstanceNotFound raised: %s'), unicode(ex),
                      context=context)
-            return self._error(req, context, type(ex).__name__, ex.message)
+            ec2_id = ec2utils.id_to_ec2_id(ex.kwargs['instance_id'])
+            message = ex.message % {'instance_id': ec2_id}
+            return self._error(req, context, type(ex).__name__, message)
         except exception.VolumeNotFound as ex:
             LOG.info(_('VolumeNotFound raised: %s'), unicode(ex),
                      context=context)
-            ec2_id = ec2utils.id_to_ec2_vol_id(ex.volume_id)
-            message = _('Volume %s not found') % ec2_id
+            ec2_id = ec2utils.id_to_ec2_vol_id(ex.kwargs['volume_id'])
+            message = ex.message % {'volume_id': ec2_id}
             return self._error(req, context, type(ex).__name__, message)
         except exception.SnapshotNotFound as ex:
             LOG.info(_('SnapshotNotFound raised: %s'), unicode(ex),
                      context=context)
-            ec2_id = ec2utils.id_to_ec2_snap_id(ex.snapshot_id)
-            message = _('Snapshot %s not found') % ec2_id
+            ec2_id = ec2utils.id_to_ec2_snap_id(ex.kwargs['snapshot_id'])
+            message = ex.message % {'snapshot_id': ec2_id}
             return self._error(req, context, type(ex).__name__, message)
         except exception.NotFound as ex:
             LOG.info(_('NotFound raised: %s'), unicode(ex), context=context)
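The httplib2-to-eventlet switch above is the functional core of the ToToken change: the EC2 middleware runs inside an eventlet hub, and a blocking HTTP client would stall every green thread while Keystone answers. A minimal standalone sketch of the same request pattern (the endpoint URL below is a placeholder, not part of this commit; nova reads it from FLAGS.keystone_ec2_url):

```python
# Sketch of the green-httplib POST pattern used by ToToken above.
import json
from urlparse import urlparse

from eventlet.green import httplib  # cooperative sockets; httplib2 would block


def post_json(url, payload):
    o = urlparse(url)
    if o.scheme == 'http':
        conn = httplib.HTTPConnection(o.netloc)
    else:
        conn = httplib.HTTPSConnection(o.netloc)
    try:
        conn.request('POST', o.path, body=json.dumps(payload),
                     headers={'Content-Type': 'application/json'})
        return json.loads(conn.getresponse().read())
    finally:
        conn.close()

# result = post_json('http://127.0.0.1:5000/v2.0/ec2tokens', creds)
```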
""" import base64 -import datetime import netaddr import urllib @@ -33,6 +32,7 @@ from nova import log as logging from nova import utils from nova.api.ec2 import ec2utils from nova.auth import manager +from nova.compute import vm_states FLAGS = flags.FLAGS @@ -273,8 +273,7 @@ class AdminController(object): """Get the VPN instance for a project ID.""" for instance in db.instance_get_all_by_project(context, project_id): if (instance['image_id'] == str(FLAGS.vpn_image_id) - and not instance['state_description'] in - ['shutting_down', 'shutdown']): + and not instance['vm_state'] in [vm_states.DELETED]): return instance def start_vpn(self, context, project): diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py index 9aebf92e3..049ca6f93 100644 --- a/nova/api/ec2/cloud.py +++ b/nova/api/ec2/cloud.py @@ -47,6 +47,7 @@ from nova import utils from nova import volume from nova.api.ec2 import ec2utils from nova.compute import instance_types +from nova.compute import vm_states from nova.image import s3 @@ -78,6 +79,30 @@ def _gen_key(context, user_id, key_name): return {'private_key': private_key, 'fingerprint': fingerprint} +# EC2 API can return the following values as documented in the EC2 API +# http://docs.amazonwebservices.com/AWSEC2/latest/APIReference/ +# ApiReference-ItemType-InstanceStateType.html +# pending | running | shutting-down | terminated | stopping | stopped +_STATE_DESCRIPTION_MAP = { + None: 'pending', + vm_states.ACTIVE: 'running', + vm_states.BUILDING: 'pending', + vm_states.REBUILDING: 'pending', + vm_states.DELETED: 'terminated', + vm_states.STOPPED: 'stopped', + vm_states.MIGRATING: 'migrate', + vm_states.RESIZING: 'resize', + vm_states.PAUSED: 'pause', + vm_states.SUSPENDED: 'suspend', + vm_states.RESCUED: 'rescue', +} + + +def state_description_from_vm_state(vm_state): + """Map the vm state to the server status string""" + return _STATE_DESCRIPTION_MAP.get(vm_state, vm_state) + + # TODO(yamahata): hypervisor dependent default device name _DEFAULT_ROOT_DEVICE_NAME = '/dev/sda1' _DEFAULT_MAPPINGS = {'ami': 'sda1', @@ -995,14 +1020,6 @@ class CloudController(object): 'status': volume['attach_status'], 'volumeId': ec2utils.id_to_ec2_vol_id(volume_id)} - @staticmethod - def _convert_to_set(lst, label): - if lst is None or lst == []: - return None - if not isinstance(lst, list): - lst = [lst] - return [{label: x} for x in lst] - def _format_kernel_id(self, instance_ref, result, key): kernel_id = instance_ref['kernel_id'] if kernel_id is None: @@ -1039,11 +1056,12 @@ class CloudController(object): def _format_attr_instance_initiated_shutdown_behavior(instance, result): - state_description = instance['state_description'] - state_to_value = {'stopping': 'stop', - 'stopped': 'stop', - 'terminating': 'terminate'} - value = state_to_value.get(state_description) + vm_state = instance['vm_state'] + state_to_value = { + vm_states.STOPPED: 'stopped', + vm_states.DELETED: 'terminated', + } + value = state_to_value.get(vm_state) if value: result['instanceInitiatedShutdownBehavior'] = value @@ -1160,7 +1178,7 @@ class CloudController(object): if instance.get('security_groups'): for security_group in instance['security_groups']: security_group_names.append(security_group['name']) - result['groupSet'] = CloudController._convert_to_set( + result['groupSet'] = utils.convert_to_list_dict( security_group_names, 'groupId') def _format_instances(self, context, instance_id=None, use_v6=False, @@ -1198,8 +1216,8 @@ class CloudController(object): self._format_kernel_id(instance, i, 'kernelId') 
self._format_ramdisk_id(instance, i, 'ramdiskId') i['instanceState'] = { - 'code': instance['state'], - 'name': instance['state_description']} + 'code': instance['power_state'], + 'name': state_description_from_vm_state(instance['vm_state'])} fixed_addr = None floating_addr = None if instance['fixed_ips']: @@ -1224,7 +1242,8 @@ class CloudController(object): i['keyName'] = '%s (%s, %s)' % (i['keyName'], instance['project_id'], instance['host']) - i['productCodesSet'] = self._convert_to_set([], 'product_codes') + i['productCodesSet'] = utils.convert_to_list_dict([], + 'product_codes') self._format_instance_type(instance, i) i['launchTime'] = instance['created_at'] i['amiLaunchIndex'] = instance['launch_index'] @@ -1618,22 +1637,22 @@ class CloudController(object): # stop the instance if necessary restart_instance = False if not no_reboot: - state_description = instance['state_description'] + vm_state = instance['vm_state'] # if the instance is in subtle state, refuse to proceed. - if state_description not in ('running', 'stopping', 'stopped'): + if vm_state not in (vm_states.ACTIVE, vm_states.STOPPED): raise exception.InstanceNotRunning(instance_id=ec2_instance_id) - if state_description == 'running': + if vm_state == vm_states.ACTIVE: restart_instance = True self.compute_api.stop(context, instance_id=instance_id) # wait instance for really stopped start_time = time.time() - while state_description != 'stopped': + while vm_state != vm_states.STOPPED: time.sleep(1) instance = self.compute_api.get(context, instance_id) - state_description = instance['state_description'] + vm_state = instance['vm_state'] # NOTE(yamahata): timeout and error. 1 hour for now for safety. # Is it too short/long? # Or is there any better way? diff --git a/nova/api/openstack/common.py b/nova/api/openstack/common.py index d9eb832f2..d743a66ef 100644 --- a/nova/api/openstack/common.py +++ b/nova/api/openstack/common.py @@ -27,7 +27,8 @@ from nova import flags from nova import log as logging from nova import quota from nova.api.openstack import wsgi -from nova.compute import power_state as compute_power_state +from nova.compute import vm_states +from nova.compute import task_states LOG = logging.getLogger('nova.api.openstack.common') @@ -38,36 +39,61 @@ XML_NS_V10 = 'http://docs.rackspacecloud.com/servers/api/v1.0' XML_NS_V11 = 'http://docs.openstack.org/compute/api/v1.1' -_STATUS_MAP = { - None: 'BUILD', - compute_power_state.NOSTATE: 'BUILD', - compute_power_state.RUNNING: 'ACTIVE', - compute_power_state.BLOCKED: 'ACTIVE', - compute_power_state.SUSPENDED: 'SUSPENDED', - compute_power_state.PAUSED: 'PAUSED', - compute_power_state.SHUTDOWN: 'SHUTDOWN', - compute_power_state.SHUTOFF: 'SHUTOFF', - compute_power_state.CRASHED: 'ERROR', - compute_power_state.FAILED: 'ERROR', - compute_power_state.BUILDING: 'BUILD', +_STATE_MAP = { + vm_states.ACTIVE: { + 'default': 'ACTIVE', + task_states.REBOOTING: 'REBOOT', + task_states.UPDATING_PASSWORD: 'PASSWORD', + task_states.RESIZE_VERIFY: 'VERIFY_RESIZE', + }, + vm_states.BUILDING: { + 'default': 'BUILD', + }, + vm_states.REBUILDING: { + 'default': 'REBUILD', + }, + vm_states.STOPPED: { + 'default': 'STOPPED', + }, + vm_states.MIGRATING: { + 'default': 'MIGRATING', + }, + vm_states.RESIZING: { + 'default': 'RESIZE', + }, + vm_states.PAUSED: { + 'default': 'PAUSED', + }, + vm_states.SUSPENDED: { + 'default': 'SUSPENDED', + }, + vm_states.RESCUED: { + 'default': 'RESCUE', + }, + vm_states.ERROR: { + 'default': 'ERROR', + }, + vm_states.DELETED: { + 'default': 'DELETED', + }, } -def 
status_from_power_state(power_state): - """Map the power state to the server status string""" - return _STATUS_MAP[power_state] +def status_from_state(vm_state, task_state='default'): + """Given vm_state and task_state, return a status string.""" + task_map = _STATE_MAP.get(vm_state, dict(default='UNKNOWN_STATE')) + status = task_map.get(task_state, task_map['default']) + LOG.debug("Generated %(status)s from vm_state=%(vm_state)s " + "task_state=%(task_state)s." % locals()) + return status -def power_states_from_status(status): - """Map the server status string to a list of power states""" - power_states = [] - for power_state, status_map in _STATUS_MAP.iteritems(): - # Skip the 'None' state - if power_state is None: - continue - if status.lower() == status_map.lower(): - power_states.append(power_state) - return power_states +def vm_state_from_status(status): + """Map the server status string to a vm state.""" + for state, task_map in _STATE_MAP.iteritems(): + status_string = task_map.get("default") + if status.lower() == status_string.lower(): + return state def get_pagination_params(request): diff --git a/nova/api/openstack/contrib/createserverext.py b/nova/api/openstack/contrib/createserverext.py index ba72fdb0b..af7f37f13 100644 --- a/nova/api/openstack/contrib/createserverext.py +++ b/nova/api/openstack/contrib/createserverext.py @@ -14,18 +14,34 @@ # License for the specific language governing permissions and limitations # under the License +from nova import utils from nova.api.openstack import create_instance_helper as helper from nova.api.openstack import extensions from nova.api.openstack import servers from nova.api.openstack import wsgi -class Createserverext(extensions.ExtensionDescriptor): - """The servers create ext +class CreateServerController(servers.ControllerV11): + def _build_view(self, req, instance, is_detail=False): + server = super(CreateServerController, self)._build_view(req, + instance, + is_detail) + if is_detail: + self._build_security_groups(server['server'], instance) + return server + + def _build_security_groups(self, response, inst): + sg_names = [] + sec_groups = inst.get('security_groups') + if sec_groups: + sg_names = [sec_group['name'] for sec_group in sec_groups] - Exposes addFixedIp and removeFixedIp actions on servers. 
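The `_STATE_MAP` rewrite above replaces a flat power-state lookup with a two-level table keyed first on vm_state, then on task_state, falling back to the per-vm_state 'default' entry. A self-contained sketch of that lookup (plain strings stand in for the vm_states/task_states constants):

```python
# Two-level status lookup, as in status_from_state() above; plain strings
# stand in for the vm_states/task_states constants.
_STATE_MAP = {
    'active': {'default': 'ACTIVE', 'rebooting': 'REBOOT'},
    'building': {'default': 'BUILD'},
}


def status_from_state(vm_state, task_state='default'):
    task_map = _STATE_MAP.get(vm_state, {'default': 'UNKNOWN_STATE'})
    return task_map.get(task_state, task_map['default'])


assert status_from_state('active') == 'ACTIVE'
assert status_from_state('active', 'rebooting') == 'REBOOT'
assert status_from_state('active', 'migrating') == 'ACTIVE'  # default wins
assert status_from_state('bogus') == 'UNKNOWN_STATE'
```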
diff --git a/nova/api/openstack/contrib/createserverext.py b/nova/api/openstack/contrib/createserverext.py
index ba72fdb0b..af7f37f13 100644
--- a/nova/api/openstack/contrib/createserverext.py
+++ b/nova/api/openstack/contrib/createserverext.py
@@ -14,18 +14,34 @@
 #    License for the specific language governing permissions and limitations
 #    under the License

+from nova import utils
 from nova.api.openstack import create_instance_helper as helper
 from nova.api.openstack import extensions
 from nova.api.openstack import servers
 from nova.api.openstack import wsgi


-class Createserverext(extensions.ExtensionDescriptor):
-    """The servers create ext
+class CreateServerController(servers.ControllerV11):
+    def _build_view(self, req, instance, is_detail=False):
+        server = super(CreateServerController, self)._build_view(req,
+                                                                 instance,
+                                                                 is_detail)
+        if is_detail:
+            self._build_security_groups(server['server'], instance)
+        return server
+
+    def _build_security_groups(self, response, inst):
+        sg_names = []
+        sec_groups = inst.get('security_groups')
+        if sec_groups:
+            sg_names = [sec_group['name'] for sec_group in sec_groups]

-    Exposes addFixedIp and removeFixedIp actions on servers.
+        response['security_groups'] = utils.convert_to_list_dict(sg_names,
+                                                                 'name')

-    """
+
+class Createserverext(extensions.ExtensionDescriptor):
+    """The servers create ext"""

     def get_name(self):
         return "Createserverext"
@@ -58,7 +74,7 @@ class Createserverext(extensions.ExtensionDescriptor):
         deserializer = wsgi.RequestDeserializer(body_deserializers)

         res = extensions.ResourceExtension('os-create-server-ext',
-                                           controller=servers.ControllerV11(),
+                                           controller=CreateServerController(),
                                            deserializer=deserializer,
                                            serializer=serializer)
         resources.append(res)
diff --git a/nova/api/openstack/contrib/floating_ips.py b/nova/api/openstack/contrib/floating_ips.py
index 40086f778..d1add8f83 100644
--- a/nova/api/openstack/contrib/floating_ips.py
+++ b/nova/api/openstack/contrib/floating_ips.py
@@ -36,9 +36,9 @@ def _translate_floating_ip_view(floating_ip):
         result['fixed_ip'] = floating_ip['fixed_ip']['address']
     except (TypeError, KeyError):
         result['fixed_ip'] = None
-    if 'instance' in floating_ip:
-        result['instance_id'] = floating_ip['instance']['id']
-    else:
+    try:
+        result['instance_id'] = floating_ip['fixed_ip']['instance_id']
+    except (TypeError, KeyError):
         result['instance_id'] = None
     return {'floating_ip': result}
@@ -96,7 +96,8 @@ class FloatingIPController(object):
         except rpc.RemoteError as ex:
             # NOTE(tr3buchet) - why does this block exist?
             if ex.exc_type == 'NoMoreFloatingIps':
-                raise exception.NoMoreFloatingIps()
+                msg = _("No more floating ips available.")
+                raise webob.exc.HTTPBadRequest(explanation=msg)
             else:
                 raise
@@ -138,7 +139,11 @@ class Floating_ips(extensions.ExtensionDescriptor):
             msg = _("Address not specified")
             raise webob.exc.HTTPBadRequest(explanation=msg)

-        self.compute_api.associate_floating_ip(context, instance_id, address)
+        try:
+            self.compute_api.associate_floating_ip(context, instance_id,
+                                                   address)
+        except exception.ApiError, e:
+            raise webob.exc.HTTPBadRequest(explanation=e.message)

         return webob.Response(status_int=202)
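Both the EC2 layer and this extension now lean on `utils.convert_to_list_dict` instead of the removed `CloudController._convert_to_set`. The helper's body isn't part of this diff; judging from the removed staticmethod, it plausibly behaves like this sketch:

```python
# Hypothetical reimplementation of utils.convert_to_list_dict, inferred from
# the removed _convert_to_set staticmethod; not taken from this commit.
def convert_to_list_dict(items, label):
    if not items:
        return None
    if not isinstance(items, list):
        items = [items]
    return [{label: x} for x in items]


assert convert_to_list_dict(['default', 'web'], 'name') == \
    [{'name': 'default'}, {'name': 'web'}]
assert convert_to_list_dict([], 'name') is None
```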
diff --git a/nova/api/openstack/contrib/simple_tenant_usage.py b/nova/api/openstack/contrib/simple_tenant_usage.py
new file mode 100644
index 000000000..42691a9fa
--- /dev/null
+++ b/nova/api/openstack/contrib/simple_tenant_usage.py
@@ -0,0 +1,236 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import urlparse
+import webob
+
+from datetime import datetime
+from nova import exception
+from nova import flags
+from nova.compute import api
+from nova.api.openstack import extensions
+from nova.api.openstack import views
+from nova.db.sqlalchemy.session import get_session
+from webob import exc
+
+
+FLAGS = flags.FLAGS
+
+
+class SimpleTenantUsageController(object):
+    def _hours_for(self, instance, period_start, period_stop):
+        launched_at = instance['launched_at']
+        terminated_at = instance['terminated_at']
+        if terminated_at is not None:
+            if not isinstance(terminated_at, datetime):
+                terminated_at = datetime.strptime(terminated_at,
+                                                  "%Y-%m-%d %H:%M:%S.%f")
+
+        if launched_at is not None:
+            if not isinstance(launched_at, datetime):
+                launched_at = datetime.strptime(launched_at,
+                                                "%Y-%m-%d %H:%M:%S.%f")
+
+        if terminated_at and terminated_at < period_start:
+            return 0
+        # nothing if it started after the usage report ended
+        if launched_at and launched_at > period_stop:
+            return 0
+        if launched_at:
+            # if instance launched after period_started, don't charge for first
+            start = max(launched_at, period_start)
+            if terminated_at:
+                # if instance stopped before period_stop, don't charge after
+                stop = min(period_stop, terminated_at)
+            else:
+                # instance is still running, so charge them up to current time
+                stop = period_stop
+            dt = stop - start
+            seconds = dt.days * 3600 * 24 + dt.seconds\
+                      + dt.microseconds / 100000.0
+
+            return seconds / 3600.0
+        else:
+            # instance hasn't launched, so no charge
+            return 0
+
+    def _tenant_usages_for_period(self, context, period_start,
+                                  period_stop, tenant_id=None, detailed=True):
+
+        compute_api = api.API()
+        instances = compute_api.get_active_by_window(context,
+                                                     period_start,
+                                                     period_stop,
+                                                     tenant_id)
+        from nova import log as logging
+        logging.info(instances)
+        rval = {}
+        flavors = {}
+
+        for instance in instances:
+            info = {}
+            info['hours'] = self._hours_for(instance,
+                                            period_start,
+                                            period_stop)
+            flavor_type = instance['instance_type_id']
+
+            if not flavors.get(flavor_type):
+                try:
+                    it_ref = compute_api.get_instance_type(context,
+                                                           flavor_type)
+                    flavors[flavor_type] = it_ref
+                except exception.InstanceTypeNotFound:
+                    # can't bill if there is no instance type
+                    continue
+
+            flavor = flavors[flavor_type]
+
+            info['name'] = instance['display_name']
+
+            info['memory_mb'] = flavor['memory_mb']
+            info['local_gb'] = flavor['local_gb']
+            info['vcpus'] = flavor['vcpus']
+
+            info['tenant_id'] = instance['project_id']
+
+            info['flavor'] = flavor['name']
+
+            info['started_at'] = instance['launched_at']
+
+            info['ended_at'] = instance['terminated_at']
+
+            if info['ended_at']:
+                info['state'] = 'terminated'
+            else:
+                info['state'] = instance['vm_state']
+
+            now = datetime.utcnow()
+
+            if info['state'] == 'terminated':
+                delta = info['ended_at'] - info['started_at']
+            else:
+                delta = now - info['started_at']
+
+            info['uptime'] = delta.days * 24 * 60 + delta.seconds
+
+            if not info['tenant_id'] in rval:
+                summary = {}
+                summary['tenant_id'] = info['tenant_id']
+                if detailed:
+                    summary['server_usages'] = []
+                summary['total_local_gb_usage'] = 0
+                summary['total_vcpus_usage'] = 0
+                summary['total_memory_mb_usage'] = 0
+                summary['total_hours'] = 0
+                summary['start'] = period_start
+                summary['stop'] = period_stop
+                rval[info['tenant_id']] = summary
+
+            summary = rval[info['tenant_id']]
+            summary['total_local_gb_usage'] += info['local_gb'] * info['hours']
+            summary['total_vcpus_usage'] += info['vcpus'] * info['hours']
+            summary['total_memory_mb_usage'] += info['memory_mb']\
+                                                * info['hours']
+
+            summary['total_hours'] += info['hours']
+            if detailed:
+                summary['server_usages'].append(info)
+
+        return rval.values()
+
+    def _parse_datetime(self, dtstr):
+        if isinstance(dtstr, datetime):
+            return dtstr
+        try:
+            return datetime.strptime(dtstr, "%Y-%m-%dT%H:%M:%S")
+        except:
+            try:
+                return datetime.strptime(dtstr, "%Y-%m-%dT%H:%M:%S.%f")
+            except:
+                return datetime.strptime(dtstr, "%Y-%m-%d %H:%M:%S.%f")
+
+    def _get_datetime_range(self, req):
+        qs = req.environ.get('QUERY_STRING', '')
+        env = urlparse.parse_qs(qs)
+        period_start = self._parse_datetime(env.get('start',
+                                    [datetime.utcnow().isoformat()])[0])
+        period_stop = self._parse_datetime(env.get('end',
+                                    [datetime.utcnow().isoformat()])[0])
+
+        detailed = bool(env.get('detailed', False))
+        return (period_start, period_stop, detailed)
+
+    def index(self, req):
+        """Retrive tenant_usage for all tenants"""
+        context = req.environ['nova.context']
+
+        if not context.is_admin and FLAGS.allow_admin_api:
+            return webob.Response(status_int=403)
+
+        (period_start, period_stop, detailed) = self._get_datetime_range(req)
+        usages = self._tenant_usages_for_period(context,
+                                                period_start,
+                                                period_stop,
+                                                detailed=detailed)
+        return {'tenant_usages': usages}
+
+    def show(self, req, id):
+        """Retrive tenant_usage for a specified tenant"""
+        tenant_id = id
+        context = req.environ['nova.context']
+
+        if not context.is_admin and FLAGS.allow_admin_api:
+            if tenant_id != context.project_id:
+                return webob.Response(status_int=403)
+
+        (period_start, period_stop, ignore) = self._get_datetime_range(req)
+        usage = self._tenant_usages_for_period(context,
+                                               period_start,
+                                               period_stop,
+                                               tenant_id=tenant_id,
+                                               detailed=True)
+        if len(usage):
+            usage = usage[0]
+        else:
+            usage = {}
+        return {'tenant_usage': usage}
+
+
+class Simple_tenant_usage(extensions.ExtensionDescriptor):
+    def get_name(self):
+        return "SimpleTenantUsage"
+
+    def get_alias(self):
+        return "os-simple-tenant-usage"
+
+    def get_description(self):
+        return "Simple tenant usage extension"
+
+    def get_namespace(self):
+        return "http://docs.openstack.org/ext/os-simple-tenant-usage/api/v1.1"
+
+    def get_updated(self):
+        return "2011-08-19T00:00:00+00:00"
+
+    def get_resources(self):
+        resources = []
+
+        res = extensions.ResourceExtension('os-simple-tenant-usage',
+                                           SimpleTenantUsageController())
+        resources.append(res)
+
+        return resources
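The heart of the new extension is `_hours_for`, which clamps an instance's lifetime to the reporting window before billing. A compact, runnable restatement of that clamp (86400 seconds per day; naive datetimes, as in the controller):

```python
# Window-clamp logic from _hours_for, restated standalone: bill only the
# overlap of [launched_at, terminated_at) with [period_start, period_stop).
from datetime import datetime


def hours_in_window(launched_at, terminated_at, period_start, period_stop):
    if terminated_at and terminated_at < period_start:
        return 0.0                      # died before the window
    if launched_at is None or launched_at > period_stop:
        return 0.0                      # never ran, or started after it
    start = max(launched_at, period_start)
    stop = min(period_stop, terminated_at) if terminated_at else period_stop
    dt = stop - start
    return (dt.days * 86400 + dt.seconds) / 3600.0


assert hours_in_window(datetime(2011, 8, 31), None,
                       datetime(2011, 9, 1), datetime(2011, 9, 2)) == 24.0
```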
diff --git a/nova/api/openstack/create_instance_helper.py b/nova/api/openstack/create_instance_helper.py
index 483ff4985..67e669c17 100644
--- a/nova/api/openstack/create_instance_helper.py
+++ b/nova/api/openstack/create_instance_helper.py
@@ -19,7 +19,6 @@ import base64
 from webob import exc
 from xml.dom import minidom

-from nova import db
 from nova import exception
 from nova import flags
 from nova import log as logging
@@ -74,20 +73,17 @@ class CreateInstanceHelper(object):
         if not 'server' in body:
             raise exc.HTTPUnprocessableEntity()

-        server_dict = body['server']
         context = req.environ['nova.context']
+        server_dict = body['server']
         password = self.controller._get_server_admin_password(server_dict)

-        key_name = None
-        key_data = None
-        # TODO(vish): Key pair access should move into a common library
-        #             instead of being accessed directly from the db.
-        key_pairs = db.key_pair_get_all_by_user(context.elevated(),
-                                                context.user_id)
-        if key_pairs:
-            key_pair = key_pairs[0]
-            key_name = key_pair['name']
-            key_data = key_pair['public_key']
+        if not 'name' in server_dict:
+            msg = _("Server name is not defined")
+            raise exc.HTTPBadRequest(explanation=msg)
+
+        name = server_dict['name']
+        self._validate_server_name(name)
+        name = name.strip()

         image_href = self.controller._image_ref_from_req_data(body)
         # If the image href was generated by nova api, strip image_href
@@ -98,7 +94,7 @@ class CreateInstanceHelper(object):
         try:
             image_service, image_id = nova.image.get_image_service(image_href)
             kernel_id, ramdisk_id = self._get_kernel_ramdisk_from_image(
-                req, image_id)
+                req, image_service, image_id)
             images = set([str(x['id']) for x in image_service.index(context)])
             assert str(image_id) in images
         except Exception, e:
@@ -133,12 +129,13 @@ class CreateInstanceHelper(object):
             msg = _("Invalid flavorRef provided.")
             raise exc.HTTPBadRequest(explanation=msg)

-        if not 'name' in server_dict:
-            msg = _("Server name is not defined")
-            raise exc.HTTPBadRequest(explanation=msg)
-
         zone_blob = server_dict.get('blob')
+
+        # optional openstack extensions:
+        key_name = server_dict.get('key_name')
         user_data = server_dict.get('user_data')
+        self._validate_user_data(user_data)
+
         availability_zone = server_dict.get('availability_zone')
         name = server_dict['name']
         self._validate_server_name(name)
@@ -173,7 +170,6 @@ class CreateInstanceHelper(object):
                 display_name=name,
                 display_description=name,
                 key_name=key_name,
-                key_data=key_data,
                 metadata=server_dict.get('metadata', {}),
                 access_ip_v4=server_dict.get('accessIPv4'),
                 access_ip_v6=server_dict.get('accessIPv6'),
@@ -196,6 +192,9 @@ class CreateInstanceHelper(object):
         except exception.FlavorNotFound as error:
             msg = _("Invalid flavorRef provided.")
             raise exc.HTTPBadRequest(explanation=msg)
+        except exception.KeypairNotFound as error:
+            msg = _("Invalid key_name provided.")
+            raise exc.HTTPBadRequest(explanation=msg)
         except exception.SecurityGroupNotFound as error:
             raise exc.HTTPBadRequest(explanation=unicode(error))
         except RemoteError as err:
@@ -248,12 +247,12 @@ class CreateInstanceHelper(object):
             msg = _("Server name is an empty string")
             raise exc.HTTPBadRequest(explanation=msg)

-    def _get_kernel_ramdisk_from_image(self, req, image_id):
+    def _get_kernel_ramdisk_from_image(self, req, image_service, image_id):
         """Fetch an image from the ImageService, then if present, return the
         associated kernel and ramdisk image IDs.
         """
         context = req.environ['nova.context']
-        image_meta = self._image_service.show(context, image_id)
+        image_meta = image_service.show(context, image_id)
         # NOTE(sirp): extracted to a separate method to aid unit-testing, the
         # new method doesn't need a request obj or an ImageService stub
         kernel_id, ramdisk_id = self._do_get_kernel_ramdisk_from_image(
@@ -283,7 +282,7 @@ class CreateInstanceHelper(object):
         try:
             ramdisk_id = image_meta['properties']['ramdisk_id']
         except KeyError:
-            raise exception.RamdiskNotFoundForImage(image_id=image_id)
+            ramdisk_id = None

         return kernel_id, ramdisk_id
@@ -370,6 +369,16 @@ class CreateInstanceHelper(object):

         return networks

+    def _validate_user_data(self, user_data):
+        """Check if the user_data is encoded properly"""
+        if not user_data:
+            return
+        try:
+            user_data = base64.b64decode(user_data)
+        except TypeError:
+            expl = _('Userdata content cannot be decoded')
+            raise exc.HTTPBadRequest(explanation=expl)
+

 class ServerXMLDeserializer(wsgi.XMLDeserializer):
     """
diff --git a/nova/api/openstack/schemas/v1.1/server.rng b/nova/api/openstack/schemas/v1.1/server.rng
index dbd169a83..ef835e408 100644
--- a/nova/api/openstack/schemas/v1.1/server.rng
+++ b/nova/api/openstack/schemas/v1.1/server.rng
@@ -1,6 +1,8 @@
 <element name="server" ns="http://docs.openstack.org/compute/api/v1.1"
  xmlns="http://relaxng.org/ns/structure/1.0">
   <attribute name="name"> <text/> </attribute>
+  <attribute name="userId"> <text/> </attribute>
+  <attribute name="tenantId"> <text/> </attribute>
   <attribute name="id"> <text/> </attribute>
   <attribute name="uuid"> <text/> </attribute>
   <attribute name="updated"> <text/> </attribute>
diff --git a/nova/api/openstack/servers.py b/nova/api/openstack/servers.py
index 27c67e79e..d084ac360 100644
--- a/nova/api/openstack/servers.py
+++ b/nova/api/openstack/servers.py
@@ -22,6 +22,7 @@ from xml.dom import minidom
 import webob

 from nova import compute
+from nova import db
 from nova import exception
 from nova import flags
 from nova import log as logging
@@ -95,17 +96,23 @@ class Controller(object):
         search_opts['recurse_zones'] = utils.bool_from_str(
                 search_opts.get('recurse_zones', False))

-        # If search by 'status', we need to convert it to 'state'
-        # If the status is unknown, bail.
-        # Leave 'state' in search_opts so compute can pass it on to
-        # child zones..
+        # If search by 'status', we need to convert it to 'vm_state'
+        # to pass on to child zones.
         if 'status' in search_opts:
             status = search_opts['status']
-            search_opts['state'] = common.power_states_from_status(status)
-            if len(search_opts['state']) == 0:
+            state = common.vm_state_from_status(status)
+            if state is None:
                 reason = _('Invalid server status: %(status)s') % locals()
-                LOG.error(reason)
                 raise exception.InvalidInput(reason=reason)
+            search_opts['vm_state'] = state
+
+        if 'changes-since' in search_opts:
+            try:
+                parsed = utils.parse_isotime(search_opts['changes-since'])
+            except ValueError:
+                msg = _('Invalid changes-since value')
+                raise exc.HTTPBadRequest(explanation=msg)
+            search_opts['changes-since'] = parsed

         # By default, compute's get_all() will return deleted instances.
         # If an admin hasn't specified a 'deleted' search option, we need
@@ -114,23 +121,17 @@ class Controller(object):
         # should return recently deleted images according to the API spec.
         if 'deleted' not in search_opts:
-            # Admin hasn't specified deleted filter
             if 'changes-since' not in search_opts:
-                # No 'changes-since', so we need to find non-deleted servers
+                # No 'changes-since', so we only want non-deleted servers
                 search_opts['deleted'] = False
-            else:
-                # This is the default, but just in case..
-                search_opts['deleted'] = True
-
-        instance_list = self.compute_api.get_all(
-            context, search_opts=search_opts)

-        # FIXME(comstud): 'changes-since' is not fully implemented. Where
-        # should this be filtered?
+        instance_list = self.compute_api.get_all(context,
+                                                 search_opts=search_opts)

         limited_list = self._limit_items(instance_list, req)
         servers = [self._build_view(req, inst, is_detail)['server']
-            for inst in limited_list]
+                   for inst in limited_list]
+
         return dict(servers=servers)

     @scheduler_api.redirect_handler
@@ -143,10 +144,16 @@ class Controller(object):
         except exception.NotFound:
             raise exc.HTTPNotFound()

+    def _get_key_name(self, req, body):
+        """ Get default keypair if not set """
+        raise NotImplementedError()
+
     def create(self, req, body):
         """ Creates a new server for a given user """
+        if 'server' in body:
+            body['server']['key_name'] = self._get_key_name(req, body)
+
         extra_values = None
-        result = None
         extra_values, instances = self.helper.create_instance(
                 req, body, self.compute_api.create)
@@ -564,6 +571,13 @@ class ControllerV10(Controller):
             raise exc.HTTPNotFound()
         return webob.Response(status_int=202)

+    def _get_key_name(self, req, body):
+        context = req.environ["nova.context"]
+        keypairs = db.key_pair_get_all_by_user(context,
+                                               context.user_id)
+        if keypairs:
+            return keypairs[0]['name']
+
     def _image_ref_from_req_data(self, data):
         return data['server']['imageId']
@@ -608,9 +622,8 @@ class ControllerV10(Controller):
         try:
             self.compute_api.rebuild(context, instance_id, image_id, password)
-        except exception.BuildInProgress:
-            msg = _("Instance %s is currently being rebuilt.") % instance_id
-            LOG.debug(msg)
+        except exception.RebuildRequiresActiveInstance:
+            msg = _("Instance %s must be active to rebuild.") % instance_id
             raise exc.HTTPConflict(explanation=msg)

         return webob.Response(status_int=202)
@@ -635,6 +648,10 @@ class ControllerV11(Controller):
         except exception.NotFound:
             raise exc.HTTPNotFound()

+    def _get_key_name(self, req, body):
+        if 'server' in body:
+            return body['server'].get('key_name')
+
     def _image_ref_from_req_data(self, data):
         try:
             return data['server']['imageRef']
@@ -750,9 +767,8 @@ class ControllerV11(Controller):
             self.compute_api.rebuild(context, instance_id, image_href,
                                      password, name=name, metadata=metadata,
                                      files_to_inject=personalities)
-        except exception.BuildInProgress:
-            msg = _("Instance %s is currently being rebuilt.") % instance_id
-            LOG.debug(msg)
+        except exception.RebuildRequiresActiveInstance:
+            msg = _("Instance %s must be active to rebuild.") % instance_id
             raise exc.HTTPConflict(explanation=msg)
         except exception.InstanceNotFound:
             msg = _("Instance %s could not be found") % instance_id
@@ -857,6 +873,8 @@ class ServerXMLSerializer(wsgi.XMLDictSerializer):

     def _add_server_attributes(self, node, server):
         node.setAttribute('id', str(server['id']))
+        node.setAttribute('userId', str(server['user_id']))
+        node.setAttribute('tenantId', str(server['tenant_id']))
         node.setAttribute('uuid', str(server['uuid']))
         node.setAttribute('hostId', str(server['hostId']))
         node.setAttribute('name', server['name'])
@@ -912,6 +930,11 @@ class ServerXMLSerializer(wsgi.XMLDictSerializer):
                                                      server['addresses'])
             server_node.appendChild(addresses_node)

+            if 'security_groups' in server:
+                security_groups_node = self._create_security_groups_node(xml_doc,
+                                            server['security_groups'])
+                server_node.appendChild(security_groups_node)
+
         return server_node

     def _server_list_to_xml(self, xml_doc, servers, detailed):
@@ -964,6 +987,19 @@ class ServerXMLSerializer(wsgi.XMLDictSerializer):
                                             server_dict['server'])
         return self.to_xml_string(node, True)

+    def _security_group_to_xml(self, doc, security_group):
+        node = doc.createElement('security_group')
+        node.setAttribute('name', str(security_group.get('name')))
+        return node
+
+    def _create_security_groups_node(self, xml_doc, security_groups):
+        security_groups_node = xml_doc.createElement('security_groups')
+        if security_groups:
+            for security_group in security_groups:
+                node = self._security_group_to_xml(xml_doc, security_group)
+                security_groups_node.appendChild(node)
+        return security_groups_node
+

 def create_resource(version='1.0'):
     controller = {
@@ -975,7 +1011,7 @@ def create_resource(version='1.0'):
         "attributes": {
             "server": ["id", "imageId", "name", "flavorId", "hostId",
                        "status", "progress", "adminPass", "flavorRef",
-                       "imageRef"],
+                       "imageRef", "userId", "tenantId"],
             "link": ["rel", "type", "href"],
         },
         "dict_collections": {
diff --git a/nova/api/openstack/views/servers.py b/nova/api/openstack/views/servers.py
index 0ec98591e..ac09b5864 100644
--- a/nova/api/openstack/views/servers.py
+++ b/nova/api/openstack/views/servers.py
@@ -21,13 +21,12 @@ import hashlib
 import os

 from nova import exception
-import nova.compute
-import nova.context
 from nova.api.openstack import common
 from nova.api.openstack.views import addresses as addresses_view
 from nova.api.openstack.views import flavors as flavors_view
 from nova.api.openstack.views import images as images_view
 from nova import utils
+from nova.compute import vm_states


 class ViewBuilder(object):
@@ -61,17 +60,15 @@ class ViewBuilder(object):

     def _build_detail(self, inst):
         """Returns a detailed model of a server."""
+        vm_state = inst.get('vm_state', vm_states.BUILDING)
+        task_state = inst.get('task_state')
+
         inst_dict = {
             'id': inst['id'],
             'name': inst['display_name'],
-            'status': common.status_from_power_state(inst.get('state'))}
-
-        ctxt = nova.context.get_admin_context()
-        compute_api = nova.compute.API()
-
-        if compute_api.has_finished_migration(ctxt, inst['uuid']):
-            inst_dict['status'] = 'RESIZE-CONFIRM'
+            'user_id': inst.get('user_id', ''),
+            'tenant_id': inst.get('project_id', ''),
+            'status': common.status_from_state(vm_state, task_state)}

         # Return the metadata as a dictionary
         metadata = {}
@@ -188,6 +185,7 @@ class ViewBuilderV11(ViewBuilder):
     def _build_extra(self, response, inst):
         self._build_links(response, inst)
         response['uuid'] = inst['uuid']
+        response['key_name'] = inst.get('key_name', '')
         self._build_config_drive(response, inst)

     def _build_links(self, response, inst):
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 3b4bde8ea..4e2944bb7 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -19,13 +19,11 @@

 """Handles all requests relating to instances (guest vms)."""

-import eventlet
 import novaclient
 import re
 import time

 from nova import block_device
-from nova import db
 from nova import exception
 from nova import flags
 import nova.image
@@ -37,6 +35,8 @@ from nova import utils
 from nova import volume
 from nova.compute import instance_types
 from nova.compute import power_state
+from nova.compute import task_states
+from nova.compute import vm_states
 from nova.compute.utils import terminate_volumes
 from nova.scheduler import api as scheduler_api
 from nova.db import base
@@ -75,12 +75,18 @@ def generate_default_hostname(instance):


 def _is_able_to_shutdown(instance, instance_id):
-    states = {'terminating': "Instance %s is already being terminated",
-              'migrating': "Instance %s is being migrated",
-              'stopping': "Instance %s is being stopped"}
-    msg = states.get(instance['state_description'])
-    if msg:
-        LOG.warning(_(msg), instance_id)
+    vm_state = instance["vm_state"]
+    task_state = instance["task_state"]
+
+    valid_shutdown_states = [
+        vm_states.ACTIVE,
+        vm_states.REBUILDING,
+        vm_states.BUILDING,
+    ]
+
+    if vm_state not in valid_shutdown_states:
+        LOG.warn(_("Instance %(instance_id)s is not in an 'active' state. It "
+                   "is currently %(vm_state)s. Shutdown aborted.") % locals())
         return False

     return True
@@ -237,7 +243,7 @@ class API(base.Base):
         self.ensure_default_security_group(context)

         if key_data is None and key_name:
-            key_pair = db.key_pair_get(context, context.user_id, key_name)
+            key_pair = self.db.key_pair_get(context, context.user_id, key_name)
             key_data = key_pair['public_key']

         if reservation_id is None:
@@ -251,10 +257,10 @@ class API(base.Base):
             'image_ref': image_href,
             'kernel_id': kernel_id or '',
             'ramdisk_id': ramdisk_id or '',
+            'power_state': power_state.NOSTATE,
+            'vm_state': vm_states.BUILDING,
             'config_drive_id': config_drive_id or '',
             'config_drive': config_drive or '',
-            'state': 0,
-            'state_description': 'scheduling',
             'user_id': context.user_id,
             'project_id': context.project_id,
             'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
@@ -377,10 +383,6 @@ class API(base.Base):
         If you are changing this method, be sure to update both
         call paths.
         """
-        instance = dict(launch_index=num, **base_options)
-        instance = self.db.instance_create(context, instance)
-        instance_id = instance['id']
-
         elevated = context.elevated()
         if security_group is None:
             security_group = ['default']
@@ -389,11 +391,15 @@ class API(base.Base):

         security_groups = []
         for security_group_name in security_group:
-            group = db.security_group_get_by_name(context,
-                                                  context.project_id,
-                                                  security_group_name)
+            group = self.db.security_group_get_by_name(context,
+                                                       context.project_id,
+                                                       security_group_name)
             security_groups.append(group['id'])

+        instance = dict(launch_index=num, **base_options)
+        instance = self.db.instance_create(context, instance)
+        instance_id = instance['id']
+
         for security_group_id in security_groups:
             self.db.instance_add_security_group(elevated,
                                                 instance_id,
@@ -415,6 +421,8 @@ class API(base.Base):
             updates['display_name'] = "Server %s" % instance_id
             instance['display_name'] = updates['display_name']
         updates['hostname'] = self.hostname_factory(instance)
+        updates['vm_state'] = vm_states.BUILDING
+        updates['task_state'] = task_states.SCHEDULING

         instance = self.update(context, instance_id, **updates)
         return instance
@@ -551,8 +559,9 @@ def has_finished_migration(self, context, instance_uuid):
         """Returns true if an instance has a finished migration."""
         try:
-            db.migration_get_by_instance_and_status(context, instance_uuid,
-                                                    'finished')
+            self.db.migration_get_by_instance_and_status(context,
+                                                         instance_uuid,
+                                                         'finished')
             return True
         except exception.NotFound:
             return False
@@ -566,14 +575,15 @@ class API(base.Base):
         :param context: the security context
         """
         try:
-            db.security_group_get_by_name(context, context.project_id,
-                                          'default')
+            self.db.security_group_get_by_name(context,
+                                               context.project_id,
+                                               'default')
         except exception.NotFound:
             values = {'name': 'default',
                       'description': 'default',
                       'user_id': context.user_id,
                       'project_id': context.project_id}
-            db.security_group_create(context, values)
+            self.db.security_group_create(context, values)

     def trigger_security_group_rules_refresh(self, context,
                                              security_group_id):
         """Called when a rule is added to or removed from a security_group."""
@@ -638,7 +648,7 @@ class API(base.Base):
         """Called when a rule is added to or removed from a security_group"""

         hosts = [x['host'] for (x, idx)
-                           in db.service_get_all_compute_sorted(context)]
+                           in self.db.service_get_all_compute_sorted(context)]
         for host in hosts:
             rpc.cast(context,
                      self.db.queue_get_for(context, FLAGS.compute_topic, host),
                      {"method": "refresh_security_group_rules",
                       "args": {"security_group_id": security_group_id}})
@@ -666,11 +676,11 @@ class API(base.Base):

     def add_security_group(self, context, instance_id, security_group_name):
         """Add security group to the instance"""
-        security_group = db.security_group_get_by_name(context,
-                                                       context.project_id,
-                                                       security_group_name)
+        security_group = self.db.security_group_get_by_name(context,
+                                                      context.project_id,
+                                                      security_group_name)
         # check if the server exists
-        inst = db.instance_get(context, instance_id)
+        inst = self.db.instance_get(context, instance_id)
         #check if the security group is associated with the server
         if self._is_security_group_associated_with_server(security_group,
                                                           instance_id):
@@ -682,21 +692,21 @@ class API(base.Base):
         if inst['state'] != power_state.RUNNING:
             raise exception.InstanceNotRunning(instance_id=instance_id)

-        db.instance_add_security_group(context.elevated(),
-                                       instance_id,
-                                       security_group['id'])
+        self.db.instance_add_security_group(context.elevated(),
+                                            instance_id,
+                                            security_group['id'])
         rpc.cast(context,
-             db.queue_get_for(context, FLAGS.compute_topic, inst['host']),
+             self.db.queue_get_for(context, FLAGS.compute_topic, inst['host']),
             {"method": "refresh_security_group_rules",
              "args": {"security_group_id": security_group['id']}})

     def remove_security_group(self, context, instance_id,
                               security_group_name):
         """Remove the security group associated with the instance"""
-        security_group = db.security_group_get_by_name(context,
-                                                       context.project_id,
-                                                       security_group_name)
+        security_group = self.db.security_group_get_by_name(context,
+                                                      context.project_id,
+                                                      security_group_name)
         # check if the server exists
-        inst = db.instance_get(context, instance_id)
+        inst = self.db.instance_get(context, instance_id)
         #check if the security group is associated with the server
         if not self._is_security_group_associated_with_server(security_group,
                                                               instance_id):
@@ -708,11 +718,11 @@ class API(base.Base):
         if inst['state'] != power_state.RUNNING:
             raise exception.InstanceNotRunning(instance_id=instance_id)

-        db.instance_remove_security_group(context.elevated(),
-                                          instance_id,
-                                          security_group['id'])
+        self.db.instance_remove_security_group(context.elevated(),
+                                               instance_id,
+                                               security_group['id'])
         rpc.cast(context,
-             db.queue_get_for(context, FLAGS.compute_topic, inst['host']),
+             self.db.queue_get_for(context, FLAGS.compute_topic, inst['host']),
            {"method": "refresh_security_group_rules",
             "args": {"security_group_id": security_group['id']}})

@@ -750,10 +760,8 @@ class API(base.Base):
             return

         self.update(context,
-                    instance['id'],
-                    state_description='terminating',
-                    state=0,
-                    terminated_at=utils.utcnow())
+                    instance_id,
+                    task_state=task_states.DELETING)

         host = instance['host']
         if host:
@@ -773,9 +781,9 @@ class API(base.Base):
             return

         self.update(context,
-                    instance['id'],
-                    state_description='stopping',
-                    state=power_state.NOSTATE,
+                    instance_id,
+                    vm_state=vm_states.ACTIVE,
+                    task_state=task_states.STOPPING,
                     terminated_at=utils.utcnow())

         host = instance['host']
@@ -787,12 +795,18 @@ class API(base.Base):
         """Start an instance."""
         LOG.debug(_("Going to try to start %s"), instance_id)
         instance = self._get_instance(context, instance_id, 'starting')
-        if instance['state_description'] != 'stopped':
-            _state_description = instance['state_description']
+        vm_state = instance["vm_state"]
+
+        if vm_state != vm_states.STOPPED:
             LOG.warning(_("Instance %(instance_id)s is not "
-                          "stopped(%(_state_description)s)") % locals())
+                          "stopped. (%(vm_state)s)") % locals())
             return

+        self.update(context,
+                    instance_id,
+                    vm_state=vm_states.STOPPED,
+                    task_state=task_states.STARTING)
+
         # TODO(yamahata): injected_files isn't supported right now.
         #                 It is used only for osapi. not for ec2 api.
         #                 availability_zone isn't used by run_instance.
@@ -802,6 +816,15 @@ class API(base.Base):
                  "args": {"topic": FLAGS.compute_topic,
                           "instance_id": instance_id}})

+    def get_active_by_window(self, context, begin, end=None, project_id=None):
+        """Get instances that were continuously active over a window."""
+        return self.db.instance_get_active_by_window(context, begin, end,
+                                                     project_id)
+
+    def get_instance_type(self, context, instance_type_id):
+        """Get an instance type by instance type id."""
+        return self.db.instance_type_get(context, instance_type_id)
+
     def get(self, context, instance_id):
         """Get a single instance with the given instance_id."""
         # NOTE(sirp): id used to be exclusively integer IDs; now we're
@@ -854,6 +877,7 @@ class API(base.Base):
                    'image': 'image_ref',
                    'name': 'display_name',
                    'instance_name': 'name',
+                   'tenant_id': 'project_id',
                    'recurse_zones': None,
                    'flavor': _remap_flavor_filter,
                    'fixed_ip': _remap_fixed_ip_filter}
@@ -1001,7 +1025,7 @@ class API(base.Base):
         :param extra_properties: dict of extra image properties to include

         """
-        instance = db.api.instance_get(context, instance_id)
+        instance = self.db.instance_get(context, instance_id)
         properties = {'instance_uuid': instance['uuid'],
                       'user_id': str(context.user_id),
                       'image_state': 'creating',
@@ -1020,28 +1044,36 @@ class API(base.Base):
     @scheduler_api.reroute_compute("reboot")
     def reboot(self, context, instance_id):
         """Reboot the given instance."""
+        self.update(context,
+                    instance_id,
+                    vm_state=vm_states.ACTIVE,
+                    task_state=task_states.REBOOTING)
         self._cast_compute_message('reboot_instance', context, instance_id)

     @scheduler_api.reroute_compute("rebuild")
     def rebuild(self, context, instance_id, image_href, admin_password,
                 name=None, metadata=None, files_to_inject=None):
         """Rebuild the given instance with the provided metadata."""
-        instance = db.api.instance_get(context, instance_id)
+        instance = self.db.instance_get(context, instance_id)
+        name = name or instance["display_name"]

-        if instance["state"] == power_state.BUILDING:
-            msg = _("Instance already building")
-            raise exception.BuildInProgress(msg)
+        if instance["vm_state"] != vm_states.ACTIVE:
+            msg = _("Instance must be active to rebuild.")
+            raise exception.RebuildRequiresActiveInstance(msg)

         files_to_inject = files_to_inject or []
+        metadata = metadata or {}
+
         self._check_injected_file_quota(context, files_to_inject)
+        self._check_metadata_properties_quota(context, metadata)

-        values = {"image_ref": image_href}
-        if metadata is not None:
-            self._check_metadata_properties_quota(context, metadata)
-            values['metadata'] = metadata
-        if name is not None:
-            values['display_name'] = name
-        self.db.instance_update(context, instance_id, values)
+        self.update(context,
+                    instance_id,
+                    metadata=metadata,
+                    display_name=name,
+                    image_ref=image_href,
+                    vm_state=vm_states.ACTIVE,
+                    task_state=task_states.REBUILDING)

         rebuild_params = {
             "new_pass": admin_password,
@@ -1065,6 +1097,11 @@ class API(base.Base):
             raise exception.MigrationNotFoundByStatus(instance_id=instance_id,
                                                       status='finished')

+        self.update(context,
+                    instance_id,
+                    vm_state=vm_states.ACTIVE,
+                    task_state=None)
+
         params = {'migration_id': migration_ref['id']}
         self._cast_compute_message('revert_resize', context,
                                    instance_ref['uuid'],
@@ -1085,6 +1122,12 @@ class API(base.Base):
         if not migration_ref:
             raise exception.MigrationNotFoundByStatus(instance_id=instance_id,
                                                       status='finished')
+
+        self.update(context,
+                    instance_id,
+                    vm_state=vm_states.ACTIVE,
+                    task_state=None)
+
         params = {'migration_id': migration_ref['id']}
         self._cast_compute_message('confirm_resize', context,
                                    instance_ref['uuid'],
@@ -1130,6 +1173,11 @@ class API(base.Base):
         if (current_memory_mb == new_memory_mb) and flavor_id:
             raise exception.CannotResizeToSameSize()

+        self.update(context,
+                    instance_id,
+                    vm_state=vm_states.RESIZING,
+                    task_state=task_states.RESIZE_PREP)
+
         instance_ref = self._get_instance(context, instance_id, 'resize')
         self._cast_scheduler_message(context,
                     {"method": "prep_resize",
@@ -1163,11 +1211,19 @@ class API(base.Base):
     @scheduler_api.reroute_compute("pause")
     def pause(self, context, instance_id):
         """Pause the given instance."""
+        self.update(context,
+                    instance_id,
+                    vm_state=vm_states.ACTIVE,
+                    task_state=task_states.PAUSING)
         self._cast_compute_message('pause_instance', context, instance_id)

     @scheduler_api.reroute_compute("unpause")
     def unpause(self, context, instance_id):
         """Unpause the given instance."""
+        self.update(context,
+                    instance_id,
+                    vm_state=vm_states.PAUSED,
+                    task_state=task_states.UNPAUSING)
         self._cast_compute_message('unpause_instance', context, instance_id)

     def _call_compute_message_for_host(self, action, context, host, params):
@@ -1200,21 +1256,37 @@ class API(base.Base):
     @scheduler_api.reroute_compute("suspend")
     def suspend(self, context, instance_id):
         """Suspend the given instance."""
+        self.update(context,
+                    instance_id,
+                    vm_state=vm_states.ACTIVE,
+                    task_state=task_states.SUSPENDING)
         self._cast_compute_message('suspend_instance', context, instance_id)

     @scheduler_api.reroute_compute("resume")
     def resume(self, context, instance_id):
         """Resume the given instance."""
+        self.update(context,
+                    instance_id,
+                    vm_state=vm_states.SUSPENDED,
+                    task_state=task_states.RESUMING)
         self._cast_compute_message('resume_instance', context, instance_id)

     @scheduler_api.reroute_compute("rescue")
     def rescue(self, context, instance_id):
         """Rescue the given instance."""
+        self.update(context,
+                    instance_id,
+                    vm_state=vm_states.ACTIVE,
+                    task_state=task_states.RESCUING)
         self._cast_compute_message('rescue_instance', context, instance_id)

     @scheduler_api.reroute_compute("unrescue")
     def unrescue(self, context, instance_id):
         """Unrescue the given instance."""
+        self.update(context,
+                    instance_id,
+                    vm_state=vm_states.RESCUED,
+                    task_state=task_states.UNRESCUING)
         self._cast_compute_message('unrescue_instance', context, instance_id)

     @scheduler_api.reroute_compute("set_admin_password")
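A pattern worth noting in the compute/api.py hunks above: every action now records an optimistic (vm_state, task_state) pair before casting to the compute manager, and the manager later writes the settled vm_state with task_state cleared. A toy illustration of that handshake (dicts stand in for the instance table, plain strings for the state constants):

```python
# The API/manager state handshake, reduced to a toy in-memory table.
def api_pause(table, instance_id):
    # API side: mark intent, then (in nova) rpc.cast to the manager
    table[instance_id].update(vm_state='active', task_state='pausing')


def manager_pause(table, instance_id):
    # manager side: do the work, then settle the state
    table[instance_id].update(vm_state='paused', task_state=None)


table = {1: {'vm_state': 'active', 'task_state': None}}
api_pause(table, 1)
assert table[1]['task_state'] == 'pausing'   # visible while in flight
manager_pause(table, 1)
assert table[1] == {'vm_state': 'paused', 'task_state': None}
```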
@@ -251,11 +241,6 @@ class ComputeManager(manager.SchedulerDependentManager): def _setup_block_device_mapping(self, context, instance_id): """setup volumes for block device mapping""" - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'block_device_mapping') - volume_api = volume.API() block_device_mapping = [] swap = None @@ -389,17 +374,12 @@ class ComputeManager(manager.SchedulerDependentManager): updates = {} updates['host'] = self.host updates['launched_on'] = self.host - instance = self.db.instance_update(context, - instance_id, - updates) + updates['vm_state'] = vm_states.BUILDING + updates['task_state'] = task_states.NETWORKING + instance = self.db.instance_update(context, instance_id, updates) instance['injected_files'] = kwargs.get('injected_files', []) instance['admin_pass'] = kwargs.get('admin_password', None) - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'networking') - is_vpn = instance['image_ref'] == str(FLAGS.vpn_image_id) try: # NOTE(vish): This could be a cast because we don't do anything @@ -418,6 +398,11 @@ class ComputeManager(manager.SchedulerDependentManager): # all vif creation and network injection, maybe this is correct network_info = [] + self._instance_update(context, + instance_id, + vm_state=vm_states.BUILDING, + task_state=task_states.BLOCK_DEVICE_MAPPING) + (swap, ephemerals, block_device_mapping) = self._setup_block_device_mapping( context, instance_id) @@ -427,9 +412,12 @@ class ComputeManager(manager.SchedulerDependentManager): 'ephemerals': ephemerals, 'block_device_mapping': block_device_mapping} - # TODO(vish) check to make sure the availability zone matches - self._update_state(context, instance_id, power_state.BUILDING) + self._instance_update(context, + instance_id, + vm_state=vm_states.BUILDING, + task_state=task_states.SPAWNING) + # TODO(vish) check to make sure the availability zone matches try: self.driver.spawn(context, instance, network_info, block_device_info) @@ -438,13 +426,21 @@ class ComputeManager(manager.SchedulerDependentManager): "virtualization enabled in the BIOS? 
Details: " "%(ex)s") % locals() LOG.exception(msg) + return + + current_power_state = self._get_power_state(context, instance) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=None, + launched_at=utils.utcnow()) - self._update_launched_at(context, instance_id) - self._update_state(context, instance_id) usage_info = utils.usage_from_instance(instance) notifier.notify('compute.%s' % self.host, 'compute.instance.create', notifier.INFO, usage_info) + except exception.InstanceNotFound: # FIXME(wwolf): We are just ignoring InstanceNotFound # exceptions here in case the instance was immediately @@ -480,8 +476,7 @@ class ComputeManager(manager.SchedulerDependentManager): for volume in volumes: self._detach_volume(context, instance_id, volume['id'], False) - if (instance['state'] == power_state.SHUTOFF and - instance['state_description'] != 'stopped'): + if instance['power_state'] == power_state.SHUTOFF: self.db.instance_destroy(context, instance_id) raise exception.Error(_('trying to destroy already destroyed' ' instance: %s') % instance_id) @@ -496,9 +491,14 @@ class ComputeManager(manager.SchedulerDependentManager): """Terminate an instance on this host.""" self._shutdown_instance(context, instance_id, 'Terminating') instance = self.db.instance_get(context.elevated(), instance_id) + self._instance_update(context, + instance_id, + vm_state=vm_states.DELETED, + task_state=None, + terminated_at=utils.utcnow()) - # TODO(ja): should we keep it in a terminated state for a bit? self.db.instance_destroy(context, instance_id) + usage_info = utils.usage_from_instance(instance) notifier.notify('compute.%s' % self.host, 'compute.instance.delete', @@ -509,7 +509,10 @@ class ComputeManager(manager.SchedulerDependentManager): def stop_instance(self, context, instance_id): """Stopping an instance on this host.""" self._shutdown_instance(context, instance_id, 'Stopping') - # instance state will be updated to stopped by _poll_instance_states() + self._instance_update(context, + instance_id, + vm_state=vm_states.STOPPED, + task_state=None) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @checks_instance_lock @@ -529,26 +532,46 @@ class ComputeManager(manager.SchedulerDependentManager): instance_ref = self.db.instance_get(context, instance_id) LOG.audit(_("Rebuilding instance %s"), instance_id, context=context) - self._update_state(context, instance_id, power_state.BUILDING) + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.REBUILDING, + task_state=None) network_info = self._get_instance_nw_info(context, instance_ref) - self.driver.destroy(instance_ref, network_info) + + self._instance_update(context, + instance_id, + vm_state=vm_states.REBUILDING, + task_state=task_states.BLOCK_DEVICE_MAPPING) + instance_ref.injected_files = kwargs.get('injected_files', []) network_info = self.network_api.get_instance_nw_info(context, instance_ref) bd_mapping = self._setup_block_device_mapping(context, instance_id) + self._instance_update(context, + instance_id, + vm_state=vm_states.REBUILDING, + task_state=task_states.SPAWNING) + # pull in new password here since the original password isn't in the db instance_ref.admin_pass = kwargs.get('new_pass', utils.generate_password(FLAGS.password_length)) self.driver.spawn(context, instance_ref, network_info, bd_mapping) - self._update_launched_at(context, instance_id) 
- self._update_state(context, instance_id) - usage_info = utils.usage_from_instance(instance_ref) + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=None, + launched_at=utils.utcnow()) + usage_info = utils.usage_from_instance(instance_ref) notifier.notify('compute.%s' % self.host, 'compute.instance.rebuild', notifier.INFO, @@ -558,26 +581,34 @@ class ComputeManager(manager.SchedulerDependentManager): @checks_instance_lock def reboot_instance(self, context, instance_id): """Reboot an instance on this host.""" + LOG.audit(_("Rebooting instance %s"), instance_id, context=context) context = context.elevated() - self._update_state(context, instance_id) instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_("Rebooting instance %s"), instance_id, context=context) - if instance_ref['state'] != power_state.RUNNING: - state = instance_ref['state'] + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=task_states.REBOOTING) + + if instance_ref['power_state'] != power_state.RUNNING: + state = instance_ref['power_state'] running = power_state.RUNNING LOG.warn(_('trying to reboot a non-running ' 'instance: %(instance_id)s (state: %(state)s ' 'expected: %(running)s)') % locals(), context=context) - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'rebooting') network_info = self._get_instance_nw_info(context, instance_ref) self.driver.reboot(instance_ref, network_info) - self._update_state(context, instance_id) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=None) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) def snapshot_instance(self, context, instance_id, image_id, @@ -593,37 +624,45 @@ class ComputeManager(manager.SchedulerDependentManager): :param rotation: int representing how many backups to keep around; None if rotation shouldn't be used (as in the case of snapshots) """ + if image_type == "snapshot": + task_state = task_states.IMAGE_SNAPSHOT + elif image_type == "backup": + task_state = task_states.IMAGE_BACKUP + else: + raise Exception(_('Image type not recognized %s') % image_type) + context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - #NOTE(sirp): update_state currently only refreshes the state field - # if we add is_snapshotting, we will need this refreshed too, - # potentially? 
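snapshot_instance here, like reboot_instance above, follows one recurring shape: set a transient task_state, perform the driver call, then clear the task_state while vm_state keeps its steady value; the rest of the hunk below clears the task once driver.snapshot returns. Distilled as a generic context manager; this is an editorial sketch under those assumptions, not code from the commit:

from contextlib import contextmanager


@contextmanager
def transient_task(update, task_state, steady_vm_state):
    # Mark the transient task before the driver call...
    update(vm_state=steady_vm_state, task_state=task_state)
    yield
    # ...and clear it once the operation has finished.
    update(vm_state=steady_vm_state, task_state=None)


log = []
with transient_task(lambda **kw: log.append(kw), 'image_snapshot', 'active'):
    pass  # driver.snapshot(...) would run here
assert log == [{'vm_state': 'active', 'task_state': 'image_snapshot'},
               {'vm_state': 'active', 'task_state': None}]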
- self._update_state(context, instance_id) + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=task_state) LOG.audit(_('instance %s: snapshotting'), instance_id, context=context) - if instance_ref['state'] != power_state.RUNNING: - state = instance_ref['state'] + + if instance_ref['power_state'] != power_state.RUNNING: + state = instance_ref['power_state'] running = power_state.RUNNING LOG.warn(_('trying to snapshot a non-running ' 'instance: %(instance_id)s (state: %(state)s ' 'expected: %(running)s)') % locals()) self.driver.snapshot(context, instance_ref, image_id) + self._instance_update(context, instance_id, task_state=None) + + if image_type == 'snapshot' and rotation: + raise exception.ImageRotationNotAllowed() + + elif image_type == 'backup' and rotation: + instance_uuid = instance_ref['uuid'] + self.rotate_backups(context, instance_uuid, backup_type, rotation) - if image_type == 'snapshot': - if rotation: - raise exception.ImageRotationNotAllowed() elif image_type == 'backup': - if rotation: - instance_uuid = instance_ref['uuid'] - self.rotate_backups(context, instance_uuid, backup_type, - rotation) - else: - raise exception.RotationRequiredForBackup() - else: - raise Exception(_('Image type not recognized %s') % image_type) + raise exception.RotationRequiredForBackup() def rotate_backups(self, context, instance_uuid, backup_type, rotation): """Delete excess backups associated to an instance. @@ -691,7 +730,7 @@ class ComputeManager(manager.SchedulerDependentManager): for i in xrange(max_tries): instance_ref = self.db.instance_get(context, instance_id) instance_id = instance_ref["id"] - instance_state = instance_ref["state"] + instance_state = instance_ref["power_state"] expected_state = power_state.RUNNING if instance_state != expected_state: @@ -726,7 +765,7 @@ class ComputeManager(manager.SchedulerDependentManager): context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) instance_id = instance_ref['id'] - instance_state = instance_ref['state'] + instance_state = instance_ref['power_state'] expected_state = power_state.RUNNING if instance_state != expected_state: LOG.warn(_('trying to inject a file into a non-running ' @@ -744,7 +783,7 @@ class ComputeManager(manager.SchedulerDependentManager): context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) instance_id = instance_ref['id'] - instance_state = instance_ref['state'] + instance_state = instance_ref['power_state'] expected_state = power_state.RUNNING if instance_state != expected_state: LOG.warn(_('trying to update agent on a non-running ' @@ -759,40 +798,41 @@ class ComputeManager(manager.SchedulerDependentManager): @checks_instance_lock def rescue_instance(self, context, instance_id): """Rescue an instance on this host.""" + LOG.audit(_('instance %s: rescuing'), instance_id, context=context) context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_('instance %s: rescuing'), instance_id, context=context) - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'rescuing') - _update_state = lambda result: self._update_state_callback( - self, context, instance_id, result) network_info = self._get_instance_nw_info(context, instance_ref) - self.driver.rescue(context, instance_ref, _update_state, network_info) - self._update_state(context, instance_id) + + # 
NOTE(blamar): None of the virt drivers use the 'callback' param + self.driver.rescue(context, instance_ref, None, network_info) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + vm_state=vm_states.RESCUED, + task_state=None, + power_state=current_power_state) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @checks_instance_lock def unrescue_instance(self, context, instance_id): """Unrescue an instance on this host.""" + LOG.audit(_('instance %s: unrescuing'), instance_id, context=context) context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_('instance %s: unrescuing'), instance_id, context=context) - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'unrescuing') - _update_state = lambda result: self._update_state_callback( - self, context, instance_id, result) network_info = self._get_instance_nw_info(context, instance_ref) - self.driver.unrescue(instance_ref, _update_state, network_info) - self._update_state(context, instance_id) - @staticmethod - def _update_state_callback(self, context, instance_id, result): - """Update instance state when async task completes.""" - self._update_state(context, instance_id) + # NOTE(blamar): None of the virt drivers use the 'callback' param + self.driver.unrescue(instance_ref, None, network_info) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + vm_state=vm_states.ACTIVE, + task_state=None, + power_state=current_power_state) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @checks_instance_lock @@ -851,11 +891,12 @@ class ComputeManager(manager.SchedulerDependentManager): # Just roll back the record.
There's no need to resize down since # the 'old' VM already has the preferred attributes - self.db.instance_update(context, instance_ref['uuid'], - dict(memory_mb=instance_type['memory_mb'], - vcpus=instance_type['vcpus'], - local_gb=instance_type['local_gb'], - instance_type_id=instance_type['id'])) + self._instance_update(context, + instance_ref["uuid"], + memory_mb=instance_type['memory_mb'], + vcpus=instance_type['vcpus'], + local_gb=instance_type['local_gb'], + instance_type_id=instance_type['id']) self.driver.revert_migration(instance_ref) self.db.migration_update(context, migration_id, @@ -882,8 +923,11 @@ class ComputeManager(manager.SchedulerDependentManager): instance_ref = self.db.instance_get_by_uuid(context, instance_id) if instance_ref['host'] == FLAGS.host: - raise exception.Error(_( - 'Migration error: destination same as source!')) + self._instance_update(context, + instance_id, + vm_state=vm_states.ERROR) + msg = _('Migration error: destination same as source!') + raise exception.Error(msg) old_instance_type = self.db.instance_type_get(context, instance_ref['instance_type_id']) @@ -977,6 +1021,11 @@ class ComputeManager(manager.SchedulerDependentManager): self.driver.finish_migration(context, instance_ref, disk_info, network_info, resize_instance) + self._instance_update(context, + instance_id, + vm_state=vm_states.ACTIVE, + task_state=task_states.RESIZE_VERIFY) + self.db.migration_update(context, migration_id, {'status': 'finished', }) @@ -1008,35 +1057,35 @@ class ComputeManager(manager.SchedulerDependentManager): @checks_instance_lock def pause_instance(self, context, instance_id): """Pause an instance on this host.""" + LOG.audit(_('instance %s: pausing'), instance_id, context=context) context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_('instance %s: pausing'), instance_id, context=context) - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'pausing') - self.driver.pause(instance_ref, - lambda result: self._update_state_callback(self, - context, - instance_id, - result)) + self.driver.pause(instance_ref, lambda result: None) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.PAUSED, + task_state=None) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @checks_instance_lock def unpause_instance(self, context, instance_id): """Unpause a paused instance on this host.""" + LOG.audit(_('instance %s: unpausing'), instance_id, context=context) context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_('instance %s: unpausing'), instance_id, context=context) - self.db.instance_set_state(context, - instance_id, - power_state.NOSTATE, - 'unpausing') - self.driver.unpause(instance_ref, - lambda result: self._update_state_callback(self, - context, - instance_id, - result)) + self.driver.unpause(instance_ref, lambda result: None) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=None) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) def host_power_action(self, context, host=None, action=None): @@ -1052,7 +1101,7 @@ class ComputeManager(manager.SchedulerDependentManager): def get_diagnostics(self, context, instance_id): """Retrieve diagnostics for an 
instance on this host.""" instance_ref = self.db.instance_get(context, instance_id) - if instance_ref["state"] == power_state.RUNNING: + if instance_ref["power_state"] == power_state.RUNNING: LOG.audit(_("instance %s: retrieving diagnostics"), instance_id, context=context) return self.driver.get_diagnostics(instance_ref) @@ -1061,33 +1110,35 @@ class ComputeManager(manager.SchedulerDependentManager): @checks_instance_lock def suspend_instance(self, context, instance_id): """Suspend the given instance.""" + LOG.audit(_('instance %s: suspending'), instance_id, context=context) context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_('instance %s: suspending'), instance_id, context=context) - self.db.instance_set_state(context, instance_id, - power_state.NOSTATE, - 'suspending') - self.driver.suspend(instance_ref, - lambda result: self._update_state_callback(self, - context, - instance_id, - result)) + self.driver.suspend(instance_ref, lambda result: None) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.SUSPENDED, + task_state=None) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @checks_instance_lock def resume_instance(self, context, instance_id): """Resume the given suspended instance.""" + LOG.audit(_('instance %s: resuming'), instance_id, context=context) context = context.elevated() + instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_('instance %s: resuming'), instance_id, context=context) - self.db.instance_set_state(context, instance_id, - power_state.NOSTATE, - 'resuming') - self.driver.resume(instance_ref, - lambda result: self._update_state_callback(self, - context, - instance_id, - result)) + self.driver.resume(instance_ref, lambda result: None) + + current_power_state = self._get_power_state(context, instance_ref) + self._instance_update(context, + instance_id, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=None) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) def lock_instance(self, context, instance_id): @@ -1498,11 +1549,14 @@ class ComputeManager(manager.SchedulerDependentManager): 'block_migration': block_migration}}) # Restore instance state - self.db.instance_update(ctxt, - instance_ref['id'], - {'state_description': 'running', - 'state': power_state.RUNNING, - 'host': dest}) + current_power_state = self._get_power_state(ctxt, instance_ref) + self._instance_update(ctxt, + instance_ref["id"], + host=dest, + power_state=current_power_state, + vm_state=vm_states.ACTIVE, + task_state=None) + # Restore volume state for volume_ref in instance_ref['volumes']: volume_id = volume_ref['id'] @@ -1548,11 +1602,11 @@ class ComputeManager(manager.SchedulerDependentManager): This param specifies destination host. 
""" host = instance_ref['host'] - self.db.instance_update(context, - instance_ref['id'], - {'state_description': 'running', - 'state': power_state.RUNNING, - 'host': host}) + self._instance_update(context, + instance_ref['id'], + host=host, + vm_state=vm_states.ACTIVE, + task_state=None) for volume_ref in instance_ref['volumes']: volume_id = volume_ref['id'] @@ -1600,10 +1654,9 @@ class ComputeManager(manager.SchedulerDependentManager): error_list.append(ex) try: - self._poll_instance_states(context) + self._sync_power_states(context) except Exception as ex: - LOG.warning(_("Error during instance poll: %s"), - unicode(ex)) + LOG.warning(_("Error during power_state sync: %s"), unicode(ex)) error_list.append(ex) return error_list @@ -1618,68 +1671,40 @@ class ComputeManager(manager.SchedulerDependentManager): self.update_service_capabilities( self.driver.get_host_stats(refresh=True)) - def _poll_instance_states(self, context): + def _sync_power_states(self, context): + """Align power states between the database and the hypervisor. + + The hypervisor is authoritative for the power_state data, so we + simply loop over all known instances for this host and update the + power_state according to the hypervisor. If the instance is not found + then it will be set to power_state.NOSTATE, because it doesn't exist + on the hypervisor. + + """ vm_instances = self.driver.list_instances_detail() vm_instances = dict((vm.name, vm) for vm in vm_instances) + db_instances = self.db.instance_get_all_by_host(context, self.host) - # Keep a list of VMs not in the DB, cross them off as we find them - vms_not_found_in_db = list(vm_instances.keys()) + num_vm_instances = len(vm_instances) + num_db_instances = len(db_instances) - db_instances = self.db.instance_get_all_by_host(context, self.host) + if num_vm_instances != num_db_instances: + LOG.info(_("Found %(num_db_instances)s in the database and " + "%(num_vm_instances)s on the hypervisor.") % locals()) for db_instance in db_instances: - name = db_instance['name'] - db_state = db_instance['state'] + name = db_instance["name"] + db_power_state = db_instance['power_state'] vm_instance = vm_instances.get(name) if vm_instance is None: - # NOTE(justinsb): We have to be very careful here, because a - # concurrent operation could be in progress (e.g. a spawn) - if db_state == power_state.BUILDING: - # TODO(justinsb): This does mean that if we crash during a - # spawn, the machine will never leave the spawning state, - # but this is just the way nova is; this function isn't - # trying to correct that problem. - # We could have a separate task to correct this error. - # TODO(justinsb): What happens during a live migration? - LOG.info(_("Found instance '%(name)s' in DB but no VM. " - "State=%(db_state)s, so assuming spawn is in " - "progress.") % locals()) - vm_state = db_state - else: - LOG.info(_("Found instance '%(name)s' in DB but no VM. " - "State=%(db_state)s, so setting state to " - "shutoff.") % locals()) - vm_state = power_state.SHUTOFF - if db_instance['state_description'] == 'stopping': - self.db.instance_stop(context, db_instance['id']) - continue + vm_power_state = power_state.NOSTATE else: - vm_state = vm_instance.state - vms_not_found_in_db.remove(name) - - if (db_instance['state_description'] in ['migrating', 'stopping']): - # A situation which db record exists, but no instance" - # sometimes occurs while live-migration at src compute, - # this case should be ignored. 
- LOG.debug(_("Ignoring %(name)s, as it's currently being " - "migrated.") % locals()) - continue - - if vm_state != db_state: - LOG.info(_("DB/VM state mismatch. Changing state from " - "'%(db_state)s' to '%(vm_state)s'") % locals()) - self._update_state(context, db_instance['id'], vm_state) + vm_power_state = vm_instance.state - # NOTE(justinsb): We no longer auto-remove SHUTOFF instances - # It's quite hard to get them back when we do. - - # Are there VMs not in the DB? - for vm_not_found_in_db in vms_not_found_in_db: - name = vm_not_found_in_db + if vm_power_state == db_power_state: + continue - # We only care about instances that compute *should* know about - if name.startswith("instance-"): - # TODO(justinsb): What to do here? Adopt it? Shut it down? - LOG.warning(_("Found VM not in DB: '%(name)s'. Ignoring") - % locals()) + self._instance_update(context, + db_instance["id"], + power_state=vm_power_state) diff --git a/nova/compute/task_states.py b/nova/compute/task_states.py new file mode 100644 index 000000000..e3315a542 --- /dev/null +++ b/nova/compute/task_states.py @@ -0,0 +1,59 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Possible task states for instances. + +Compute instance task states represent what is happening to the instance at the +current moment. These tasks can be generic, such as 'spawning', or specific, +such as 'block_device_mapping'. These task states allow for a better view into +what an instance is doing and should be displayed to users/administrators as +necessary. + +""" + +SCHEDULING = 'scheduling' +BLOCK_DEVICE_MAPPING = 'block_device_mapping' +NETWORKING = 'networking' +SPAWNING = 'spawning' + +IMAGE_SNAPSHOT = 'image_snapshot' +IMAGE_BACKUP = 'image_backup' + +UPDATING_PASSWORD = 'updating_password' + +RESIZE_PREP = 'resize_prep' +RESIZE_MIGRATING = 'resize_migrating' +RESIZE_MIGRATED = 'resize_migrated' +RESIZE_FINISH = 'resize_finish' +RESIZE_REVERTING = 'resize_reverting' +RESIZE_CONFIRMING = 'resize_confirming' +RESIZE_VERIFY = 'resize_verify' + +REBUILDING = 'rebuilding' + +REBOOTING = 'rebooting' +PAUSING = 'pausing' +UNPAUSING = 'unpausing' +SUSPENDING = 'suspending' +RESUMING = 'resuming' + +RESCUING = 'rescuing' +UNRESCUING = 'unrescuing' + +DELETING = 'deleting' +STOPPING = 'stopping' +STARTING = 'starting' diff --git a/nova/compute/vm_states.py b/nova/compute/vm_states.py new file mode 100644 index 000000000..6f16c1f09 --- /dev/null +++ b/nova/compute/vm_states.py @@ -0,0 +1,39 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Possible vm states for instances. + +Compute instance vm states represent the state of an instance as it pertains to +a user or administrator. When combined with task states (task_states.py), a +better picture can be formed regarding the instance's health. + +""" + +ACTIVE = 'active' +BUILDING = 'building' +REBUILDING = 'rebuilding' + +PAUSED = 'paused' +SUSPENDED = 'suspended' +RESCUED = 'rescued' +DELETED = 'deleted' +STOPPED = 'stopped' + +MIGRATING = 'migrating' +RESIZING = 'resizing' + +ERROR = 'error' diff --git a/nova/context.py b/nova/context.py index b917a1d81..5c22641a0 100644 --- a/nova/context.py +++ b/nova/context.py @@ -38,7 +38,7 @@ class RequestContext(object): self.roles = roles or [] self.is_admin = is_admin if self.is_admin is None: - self.admin = 'admin' in self.roles + self.is_admin = 'admin' in [x.lower() for x in self.roles] self.read_deleted = read_deleted self.remote_address = remote_address if not timestamp: diff --git a/nova/db/api.py b/nova/db/api.py index 07d6e1095..c03a86671 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -501,9 +501,20 @@ def instance_get_all_by_filters(context, filters): return IMPL.instance_get_all_by_filters(context, filters) -def instance_get_active_by_window(context, begin, end=None): - """Get instances active during a certain time window.""" - return IMPL.instance_get_active_by_window(context, begin, end) +def instance_get_active_by_window(context, begin, end=None, project_id=None): + """Get instances active during a certain time window. + + Specifying a project_id will filter for a certain project.""" + return IMPL.instance_get_active_by_window(context, begin, end, project_id) + + +def instance_get_active_by_window_joined(context, begin, end=None, + project_id=None): + """Get instances and joins active during a certain time window. 
+ + Specifying a project_id will filter for a certain project.""" + return IMPL.instance_get_active_by_window_joined(context, begin, end, + project_id) def instance_get_all_by_user(context, user_id): diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index e0da2269d..523258841 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -28,6 +28,7 @@ from nova import flags from nova import ipv6 from nova import utils from nova import log as logging +from nova.compute import vm_states from nova.db.sqlalchemy import models from nova.db.sqlalchemy.session import get_session from sqlalchemy import or_ @@ -35,6 +36,7 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import joinedload from sqlalchemy.orm import joinedload_all from sqlalchemy.sql import func +from sqlalchemy.sql.expression import desc from sqlalchemy.sql.expression import literal_column FLAGS = flags.FLAGS @@ -1118,12 +1120,11 @@ def instance_destroy(context, instance_id): def instance_stop(context, instance_id): session = get_session() with session.begin(): - from nova.compute import power_state session.query(models.Instance).\ filter_by(id=instance_id).\ update({'host': None, - 'state': power_state.SHUTOFF, - 'state_description': 'stopped', + 'vm_state': vm_states.STOPPED, + 'task_state': None, 'updated_at': literal_column('updated_at')}) session.query(models.SecurityGroupInstanceAssociation).\ filter_by(instance_id=instance_id).\ @@ -1266,12 +1267,17 @@ def instance_get_all_by_filters(context, filters): options(joinedload_all('fixed_ips.network')).\ options(joinedload('metadata')).\ options(joinedload('instance_type')).\ - filter_by(deleted=can_read_deleted(context)) + order_by(desc(models.Instance.created_at)) # Make a copy of the filters dictionary to use going forward, as we'll # be modifying it and we shouldn't affect the caller's use of it. filters = filters.copy() + if 'changes-since' in filters: + changes_since = filters['changes-since'] + query_prefix = query_prefix.\ + filter(models.Instance.updated_at > changes_since) + if not context.is_admin: # If we're not admin context, add appropriate filter.. if context.project_id: @@ -1282,7 +1288,7 @@ def instance_get_all_by_filters(context, filters): # Filters for exact matches that we can do along with the SQL query... 
# For other filters that don't match this, we will do regexp matching exact_match_filter_names = ['project_id', 'user_id', 'image_ref', - 'state', 'instance_type_id', 'deleted'] + 'vm_state', 'instance_type_id', 'deleted'] query_filters = [key for key in filters.iterkeys() if key in exact_match_filter_names] @@ -1293,9 +1299,7 @@ def instance_get_all_by_filters(context, filters): query_prefix = _exact_match_filter(query_prefix, filter_name, filters.pop(filter_name)) - instances = query_prefix.\ - filter_by(deleted=can_read_deleted(context)).\ - all() + instances = query_prefix.all() if not instances: return [] @@ -1322,21 +1326,40 @@ def instance_get_all_by_filters(context, filters): return instances +@require_context +def instance_get_active_by_window(context, begin, end=None, project_id=None): + """Return instances that were continuously active over window.""" + session = get_session() + query = session.query(models.Instance).\ + filter(models.Instance.launched_at < begin) + if end: + query = query.filter(or_(models.Instance.terminated_at == None, + models.Instance.terminated_at > end)) + else: + query = query.filter(models.Instance.terminated_at == None) + if project_id: + query = query.filter_by(project_id=project_id) + return query.all() + + @require_admin_context -def instance_get_active_by_window(context, begin, end=None): - """Return instances that were continuously active over the given window""" +def instance_get_active_by_window_joined(context, begin, end=None, + project_id=None): + """Return instances and joins that were continuously active over window.""" session = get_session() query = session.query(models.Instance).\ - options(joinedload_all('fixed_ips.floating_ips')).\ - options(joinedload('security_groups')).\ - options(joinedload_all('fixed_ips.network')).\ - options(joinedload('instance_type')).\ - filter(models.Instance.launched_at < begin) + options(joinedload_all('fixed_ips.floating_ips')).\ + options(joinedload('security_groups')).\ + options(joinedload_all('fixed_ips.network')).\ + options(joinedload('instance_type')).\ + filter(models.Instance.launched_at < begin) if end: query = query.filter(or_(models.Instance.terminated_at == None, models.Instance.terminated_at > end)) else: query = query.filter(models.Instance.terminated_at == None) + if project_id: + query = query.filter_by(project_id=project_id) return query.all() @@ -1500,18 +1523,6 @@ def instance_get_floating_address(context, instance_id): return fixed_ip_refs[0].floating_ips[0]['address'] -@require_admin_context -def instance_set_state(context, instance_id, state, description=None): - # TODO(devcamcar): Move this out of models and into driver - from nova.compute import power_state - if not description: - description = power_state.name(state) - db.instance_update(context, - instance_id, - {'state': state, - 'state_description': description}) - - @require_context def instance_update(context, instance_id, values): session = get_session() diff --git a/nova/db/sqlalchemy/migrate_repo/versions/044_update_instance_states.py b/nova/db/sqlalchemy/migrate_repo/versions/044_update_instance_states.py new file mode 100644 index 000000000..e58ae5362 --- /dev/null +++ b/nova/db/sqlalchemy/migrate_repo/versions/044_update_instance_states.py @@ -0,0 +1,138 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sqlalchemy +from sqlalchemy import MetaData, Table, Column, String + +from nova.compute import task_states +from nova.compute import vm_states + + +meta = MetaData() + + +c_task_state = Column('task_state', + String(length=255, convert_unicode=False, + assert_unicode=None, unicode_error=None, + _warn_on_bytestring=False), + nullable=True) + + +_upgrade_translations = { + "stopping": { + "state_description": vm_states.ACTIVE, + "task_state": task_states.STOPPING, + }, + "stopped": { + "state_description": vm_states.STOPPED, + "task_state": None, + }, + "terminated": { + "state_description": vm_states.DELETED, + "task_state": None, + }, + "terminating": { + "state_description": vm_states.ACTIVE, + "task_state": task_states.DELETING, + }, + "running": { + "state_description": vm_states.ACTIVE, + "task_state": None, + }, + "scheduling": { + "state_description": vm_states.BUILDING, + "task_state": task_states.SCHEDULING, + }, + "migrating": { + "state_description": vm_states.MIGRATING, + "task_state": None, + }, + "pending": { + "state_description": vm_states.BUILDING, + "task_state": task_states.SCHEDULING, + }, +} + + +_downgrade_translations = { + vm_states.ACTIVE: { + None: "running", + task_states.DELETING: "terminating", + task_states.STOPPING: "stopping", + }, + vm_states.BUILDING: { + None: "pending", + task_states.SCHEDULING: "scheduling", + }, + vm_states.STOPPED: { + None: "stopped", + }, + vm_states.REBUILDING: { + None: "pending", + }, + vm_states.DELETED: { + None: "terminated", + }, + vm_states.MIGRATING: { + None: "migrating", + }, +} + + +def upgrade(migrate_engine): + meta.bind = migrate_engine + + instance_table = Table('instances', meta, autoload=True, + autoload_with=migrate_engine) + + c_state = instance_table.c.state + c_state.alter(name='power_state') + + c_vm_state = instance_table.c.state_description + c_vm_state.alter(name='vm_state') + + instance_table.create_column(c_task_state) + + for old_state, values in _upgrade_translations.iteritems(): + instance_table.update().\ + values(**values).\ + where(c_vm_state == old_state).\ + execute() + + +def downgrade(migrate_engine): + meta.bind = migrate_engine + + instance_table = Table('instances', meta, autoload=True, + autoload_with=migrate_engine) + + c_task_state = instance_table.c.task_state + + c_state = instance_table.c.power_state + c_state.alter(name='state') + + c_vm_state = instance_table.c.vm_state + c_vm_state.alter(name='state_description') + + for old_vm_state, old_task_states in _downgrade_translations.iteritems(): + for old_task_state, new_state_desc in old_task_states.iteritems(): + instance_table.update().\ + where(c_task_state == old_task_state).\ + where(c_vm_state == old_vm_state).\ + values(vm_state=new_state_desc).\ + execute() + + instance_table.drop_column('task_state') diff --git a/nova/db/sqlalchemy/migrate_repo/versions/044_add_network_priority.py b/nova/db/sqlalchemy/migrate_repo/versions/045_add_network_priority.py index b9b0ea37c..b9b0ea37c 100644 --- a/nova/db/sqlalchemy/migrate_repo/versions/044_add_network_priority.py +++ 
b/nova/db/sqlalchemy/migrate_repo/versions/045_add_network_priority.py diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index dc6f85aad..211049112 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -193,8 +193,9 @@ class Instance(BASE, NovaBase): key_name = Column(String(255)) key_data = Column(Text) - state = Column(Integer) - state_description = Column(String(255)) + power_state = Column(Integer) + vm_state = Column(String(255)) + task_state = Column(String(255)) memory_mb = Column(Integer) vcpus = Column(Integer) @@ -238,17 +239,6 @@ class Instance(BASE, NovaBase): access_ip_v4 = Column(String(255)) access_ip_v6 = Column(String(255)) - # TODO(vish): see Ewan's email about state improvements, probably - # should be in a driver base class or some such - # vmstate_state = running, halted, suspended, paused - # power_state = what we have - # task_state = transitory and may trigger power state transition - - #@validates('state') - #def validate_state(self, key, state): - # assert(state in ['nostate', 'running', 'blocked', 'paused', - # 'shutdown', 'shutoff', 'crashed']) - class VirtualStorageArray(BASE, NovaBase): """ diff --git a/nova/exception.py b/nova/exception.py index 5a365897d..8e73fdfc8 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -61,7 +61,7 @@ class ApiError(Error): super(ApiError, self).__init__(outstr) -class BuildInProgress(Error): +class RebuildRequiresActiveInstance(Error): pass @@ -146,6 +146,7 @@ class NovaException(Exception): message = _("An unknown exception occurred.") def __init__(self, **kwargs): + self.kwargs = kwargs try: self._error_string = self.message % kwargs @@ -402,10 +403,6 @@ class KernelNotFoundForImage(ImageNotFound): message = _("Kernel not found for image %(image_id)s.") -class RamdiskNotFoundForImage(ImageNotFound): - message = _("Ramdisk not found for image %(image_id)s.") - - class UserNotFound(NotFound): message = _("User %(user_id)s could not be found.") @@ -537,6 +534,10 @@ class NoMoreFloatingIps(FloatingIpNotFound): message = _("Zero floating ips available.") +class FloatingIpAlreadyInUse(NovaException): + message = _("Floating ip %(address)s already in use by %(fixed_ip)s.") + + class NoFloatingIpsDefined(NotFound): message = _("Zero floating ips exist.") diff --git a/nova/flags.py b/nova/flags.py index a5951ebc8..aa76defe5 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -303,8 +303,12 @@ DEFINE_bool('rabbit_use_ssl', False, 'connect over SSL') DEFINE_string('rabbit_userid', 'guest', 'rabbit userid') DEFINE_string('rabbit_password', 'guest', 'rabbit password') DEFINE_string('rabbit_virtual_host', '/', 'rabbit virtual host') -DEFINE_integer('rabbit_retry_interval', 10, 'rabbit connection retry interval') -DEFINE_integer('rabbit_max_retries', 12, 'rabbit connection attempts') +DEFINE_integer('rabbit_retry_interval', 1, + 'rabbit connection retry interval to start') +DEFINE_integer('rabbit_retry_backoff', 2, + 'rabbit connection retry backoff in seconds') +DEFINE_integer('rabbit_max_retries', 0, + 'maximum rabbit connection attempts (0=try forever)') DEFINE_string('control_exchange', 'nova', 'the main exchange to connect to') DEFINE_boolean('rabbit_durable_queues', False, 'use durable queues') DEFINE_list('enabled_apis', ['ec2', 'osapi'], diff --git a/nova/image/glance.py b/nova/image/glance.py index 9060f6a91..80abc7384 100644 --- a/nova/image/glance.py +++ b/nova/image/glance.py @@ -141,19 +141,30 @@ class GlanceImageService(service.BaseImageService): """Paginate through results 
from glance server""" images = fetch_func(**kwargs) - for image in images: - yield image - else: + if not images: # break out of recursive loop to end pagination return + for image in images: + yield image + try: # attempt to advance the marker in order to fetch next page kwargs['marker'] = images[-1]['id'] except KeyError: raise exception.ImagePaginationFailed() - self._fetch_images(fetch_func, **kwargs) + try: + kwargs['limit'] = kwargs['limit'] - len(images) + # break if we have reached a provided limit + if kwargs['limit'] <= 0: + return + except KeyError: + # ignore missing limit, just proceed without it + pass + + for image in self._fetch_images(fetch_func, **kwargs): + yield image def show(self, context, image_id): """Returns a dict with image data for the given opaque image id.""" @@ -269,6 +280,20 @@ class GlanceImageService(service.BaseImageService): image_meta = _convert_from_string(image_meta) return image_meta + @staticmethod + def _is_image_available(context, image_meta): + """Check image availability. + + Under Glance, images are always available if the context has + an auth_token. Otherwise, we fall back to the superclass + method. + + """ + if hasattr(context, 'auth_token') and context.auth_token: + return True + return service.BaseImageService._is_image_available(context, + image_meta) + # utility functions def _convert_timestamps_to_datetimes(image_meta): diff --git a/nova/network/api.py b/nova/network/api.py index d04474df3..78580d360 100644 --- a/nova/network/api.py +++ b/nova/network/api.py @@ -111,6 +111,12 @@ class API(base.Base): '(%(project)s)') % {'address': floating_ip['address'], 'project': context.project_id}) + + # If this address has been previously associated to a + # different instance, disassociate the floating_ip + if floating_ip['fixed_ip'] and floating_ip['fixed_ip'] is not fixed_ip: + self.disassociate_floating_ip(context, floating_ip['address']) + # NOTE(vish): if we are multi_host, send to the instances host if fixed_ip['network']['multi_host']: host = fixed_ip['instance']['host'] diff --git a/nova/network/manager.py b/nova/network/manager.py index 6730e808f..fa50b9ff2 100644 --- a/nova/network/manager.py +++ b/nova/network/manager.py @@ -74,7 +74,7 @@ flags.DEFINE_string('flat_network_bridge', None, 'Bridge for simple network instances') flags.DEFINE_string('flat_network_dns', '8.8.4.4', 'Dns for simple network') -flags.DEFINE_bool('flat_injected', True, +flags.DEFINE_bool('flat_injected', False, 'Whether to attempt to inject network setup into guest') flags.DEFINE_string('flat_interface', None, 'FlatDhcp will bridge into this interface if set') @@ -280,6 +280,13 @@ class FloatingIP(object): def associate_floating_ip(self, context, floating_address, fixed_address): """Associates an floating ip to a fixed ip.""" + floating_ip = self.db.floating_ip_get_by_address(context, + floating_address) + if floating_ip['fixed_ip']: + raise exception.FloatingIpAlreadyInUse( + address=floating_ip['address'], + fixed_ip=floating_ip['fixed_ip']['address']) + self.db.floating_ip_fixed_ip_associate(context, floating_address, fixed_address) diff --git a/nova/notifier/api.py b/nova/notifier/api.py index 6ef4a050e..043838536 100644 --- a/nova/notifier/api.py +++ b/nova/notifier/api.py @@ -122,4 +122,5 @@ def notify(publisher_id, event_type, priority, payload): driver.notify(msg) except Exception, e: LOG.exception(_("Problem '%(e)s' attempting to " - "send to notification system." % locals())) + "send to notification system. 
Payload=%(payload)s" % + locals())) diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index bdf7f705b..c0cfdd5ce 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -23,44 +23,35 @@ from nova import flags FLAGS = flags.FLAGS flags.DEFINE_string('rpc_backend', - 'nova.rpc.amqp', - "The messaging module to use, defaults to AMQP.") + 'nova.rpc.impl_kombu', + "The messaging module to use, defaults to kombu.") -RPCIMPL = import_object(FLAGS.rpc_backend) +_RPCIMPL = None -def create_connection(new=True): - return RPCIMPL.Connection.instance(new=True) - +def get_impl(): + """Delay import of rpc_backend until FLAGS are loaded.""" + global _RPCIMPL + if _RPCIMPL is None: + _RPCIMPL = import_object(FLAGS.rpc_backend) + return _RPCIMPL -def create_consumer(conn, topic, proxy, fanout=False): - if fanout: - return RPCIMPL.FanoutAdapterConsumer( - connection=conn, - topic=topic, - proxy=proxy) - else: - return RPCIMPL.TopicAdapterConsumer( - connection=conn, - topic=topic, - proxy=proxy) - -def create_consumer_set(conn, consumers): - return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers) +def create_connection(new=True): + return get_impl().create_connection(new=new) def call(context, topic, msg): - return RPCIMPL.call(context, topic, msg) + return get_impl().call(context, topic, msg) def cast(context, topic, msg): - return RPCIMPL.cast(context, topic, msg) + return get_impl().cast(context, topic, msg) def fanout_cast(context, topic, msg): - return RPCIMPL.fanout_cast(context, topic, msg) + return get_impl().fanout_cast(context, topic, msg) def multicall(context, topic, msg): - return RPCIMPL.multicall(context, topic, msg) + return get_impl().multicall(context, topic, msg) diff --git a/nova/rpc/common.py b/nova/rpc/common.py index 1d3065a83..b8c280630 100644 --- a/nova/rpc/common.py +++ b/nova/rpc/common.py @@ -1,8 +1,14 @@ from nova import exception +from nova import flags from nova import log as logging LOG = logging.getLogger('nova.rpc') +flags.DEFINE_integer('rpc_thread_pool_size', 1024, + 'Size of RPC thread pool') +flags.DEFINE_integer('rpc_conn_pool_size', 30, + 'Size of RPC connection pool') + class RemoteError(exception.Error): """Signifies that a remote class has raised an exception. 
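The nova/rpc/__init__.py hunk above defers the backend import until first use, so FLAGS.rpc_backend is only read after the flags have been parsed. The same memoized-import pattern in isolation; importlib.import_module stands in for nova's import_object, and the module names below are arbitrary examples:

import importlib

_IMPL = None


def get_impl(backend_name):
    """Import the configured backend on first use, then cache it."""
    global _IMPL
    if _IMPL is None:
        _IMPL = importlib.import_module(backend_name)
    return _IMPL


# Nothing is imported at module load, so configuration read at startup
# can still decide which backend wins; later arguments are ignored.
backend = get_impl('json')
assert backend is get_impl('this-name-is-never-used')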
diff --git a/nova/rpc/amqp.py b/nova/rpc/impl_carrot.py index fe429b266..303a4ff88 100644 --- a/nova/rpc/amqp.py +++ b/nova/rpc/impl_carrot.py @@ -33,6 +33,7 @@ import uuid from carrot import connection as carrot_connection from carrot import messaging +import eventlet from eventlet import greenpool from eventlet import pools from eventlet import queue @@ -42,21 +43,22 @@ from nova import context from nova import exception from nova import fakerabbit from nova import flags -from nova import log as logging -from nova import utils from nova.rpc.common import RemoteError, LOG +# Needed for tests +eventlet.monkey_patch() FLAGS = flags.FLAGS -flags.DEFINE_integer('rpc_thread_pool_size', 1024, - 'Size of RPC thread pool') -flags.DEFINE_integer('rpc_conn_pool_size', 30, - 'Size of RPC connection pool') class Connection(carrot_connection.BrokerConnection): """Connection instance object.""" + def __init__(self, *args, **kwargs): + super(Connection, self).__init__(*args, **kwargs) + self._rpc_consumers = [] + self._rpc_consumer_thread = None + @classmethod def instance(cls, new=True): """Returns the instance.""" @@ -94,13 +96,63 @@ class Connection(carrot_connection.BrokerConnection): pass return cls.instance() + def close(self): + self.cancel_consumer_thread() + for consumer in self._rpc_consumers: + try: + consumer.close() + except Exception: + # ignore all errors + pass + self._rpc_consumers = [] + super(Connection, self).close() + + def consume_in_thread(self): + """Consumer from all queues/consumers in a greenthread""" + + consumer_set = ConsumerSet(connection=self, + consumer_list=self._rpc_consumers) + + def _consumer_thread(): + try: + consumer_set.wait() + except greenlet.GreenletExit: + return + if self._rpc_consumer_thread is None: + self._rpc_consumer_thread = eventlet.spawn(_consumer_thread) + return self._rpc_consumer_thread + + def cancel_consumer_thread(self): + """Cancel a consumer thread""" + if self._rpc_consumer_thread is not None: + self._rpc_consumer_thread.kill() + try: + self._rpc_consumer_thread.wait() + except greenlet.GreenletExit: + pass + self._rpc_consumer_thread = None + + def create_consumer(self, topic, proxy, fanout=False): + """Create a consumer that calls methods in the proxy""" + if fanout: + consumer = FanoutAdapterConsumer( + connection=self, + topic=topic, + proxy=proxy) + else: + consumer = TopicAdapterConsumer( + connection=self, + topic=topic, + proxy=proxy) + self._rpc_consumers.append(consumer) + class Pool(pools.Pool): """Class that implements a Pool of Connections.""" # TODO(comstud): Timeout connections not used in a while def create(self): - LOG.debug('Creating new connection') + LOG.debug('Pool creating new connection') return Connection.instance(new=True) # Create a ConnectionPool to use for RPC calls. We'll order the @@ -119,25 +171,34 @@ class Consumer(messaging.Consumer): """ def __init__(self, *args, **kwargs): - for i in xrange(FLAGS.rabbit_max_retries): - if i > 0: - time.sleep(FLAGS.rabbit_retry_interval) + max_retries = FLAGS.rabbit_max_retries + sleep_time = FLAGS.rabbit_retry_interval + tries = 0 + while True: + tries += 1 + if tries > 1: + time.sleep(sleep_time) + # backoff for next retry attempt.. 
if there is one + sleep_time += FLAGS.rabbit_retry_backoff + if sleep_time > 30: + sleep_time = 30 try: super(Consumer, self).__init__(*args, **kwargs) self.failed_connection = False break except Exception as e: # Catching all because carrot sucks + self.failed_connection = True + if max_retries > 0 and tries == max_retries: + break fl_host = FLAGS.rabbit_host fl_port = FLAGS.rabbit_port - fl_intv = FLAGS.rabbit_retry_interval + fl_intv = sleep_time LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is' ' unreachable: %(e)s. Trying again in %(fl_intv)d' ' seconds.') % locals()) - self.failed_connection = True if self.failed_connection: LOG.error(_('Unable to connect to AMQP server ' - 'after %d tries. Shutting down.'), - FLAGS.rabbit_max_retries) + 'after %(tries)d tries. Shutting down.') % locals()) sys.exit(1) def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): @@ -166,12 +227,6 @@ class Consumer(messaging.Consumer): LOG.exception(_('Failed to fetch message from queue: %s' % e)) self.failed_connection = True - def attach_to_eventlet(self): - """Only needed for unit tests!""" - timer = utils.LoopingCall(self.fetch, enable_callbacks=True) - timer.start(0.1) - return timer - class AdapterConsumer(Consumer): """Calls methods on a proxy object based on method and args.""" @@ -242,7 +297,7 @@ class AdapterConsumer(Consumer): # NOTE(vish): this iterates through the generator list(rval) except Exception as e: - logging.exception('Exception during message handling') + LOG.exception('Exception during message handling') if msg_id: msg_reply(msg_id, None, sys.exc_info()) return @@ -520,6 +575,11 @@ class MulticallWaiter(object): yield result +def create_connection(new=True): + """Create a connection""" + return Connection.instance(new=new) + + def call(context, topic, msg): """Sends a message on a topic and wait for a response.""" rv = multicall(context, topic, msg) diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py new file mode 100644 index 000000000..b994a6a10 --- /dev/null +++ b/nova/rpc/impl_kombu.py @@ -0,0 +1,781 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import kombu +import kombu.entity +import kombu.messaging +import kombu.connection +import itertools +import sys +import time +import traceback +import types +import uuid + +import eventlet +from eventlet import greenpool +from eventlet import pools +import greenlet + +from nova import context +from nova import exception +from nova import flags +from nova.rpc.common import RemoteError, LOG + +# Needed for tests +eventlet.monkey_patch() + +FLAGS = flags.FLAGS + + +class ConsumerBase(object): + """Consumer base class.""" + + def __init__(self, channel, callback, tag, **kwargs): + """Declare a queue on an amqp channel. 
+ + 'channel' is the amqp channel to use + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + queue name, exchange name, and other kombu options are + passed in here as a dictionary. + """ + self.callback = callback + self.tag = str(tag) + self.kwargs = kwargs + self.queue = None + self.reconnect(channel) + + def reconnect(self, channel): + """Re-declare the queue after a rabbit reconnect""" + self.channel = channel + self.kwargs['channel'] = channel + self.queue = kombu.entity.Queue(**self.kwargs) + self.queue.declare() + + def consume(self, *args, **kwargs): + """Actually declare the consumer on the amqp channel. This will + start the flow of messages from the queue. Using the + Connection.iterconsume() iterator will process the messages, + calling the appropriate callback. + + If a callback is specified in kwargs, use that. Otherwise, + use the callback passed during __init__() + + If kwargs['nowait'] is True, then this call will block until + a message is read. + + Messages will automatically be acked if the callback doesn't + raise an exception + """ + + options = {'consumer_tag': self.tag} + options['nowait'] = kwargs.get('nowait', False) + callback = kwargs.get('callback', self.callback) + if not callback: + raise ValueError("No callback defined") + + def _callback(raw_message): + message = self.channel.message_to_python(raw_message) + callback(message.payload) + message.ack() + + self.queue.consume(*args, callback=_callback, **options) + + def cancel(self): + """Cancel the consuming from the queue, if it has started""" + try: + self.queue.cancel(self.tag) + except KeyError, e: + # NOTE(comstud): Kludge to get around a amqplib bug + if str(e) != "u'%s'" % self.tag: + raise + self.queue = None + + +class DirectConsumer(ConsumerBase): + """Queue/consumer class for 'direct'""" + + def __init__(self, channel, msg_id, callback, tag, **kwargs): + """Init a 'direct' queue. + + 'channel' is the amqp channel to use + 'msg_id' is the msg_id to listen on + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + Other kombu options may be passed + """ + # Default options + options = {'durable': False, + 'auto_delete': True, + 'exclusive': True} + options.update(kwargs) + exchange = kombu.entity.Exchange( + name=msg_id, + type='direct', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(DirectConsumer, self).__init__( + channel, + callback, + tag, + name=msg_id, + exchange=exchange, + routing_key=msg_id, + **options) + + +class TopicConsumer(ConsumerBase): + """Consumer class for 'topic'""" + + def __init__(self, channel, topic, callback, tag, **kwargs): + """Init a 'topic' queue. 
+ + 'channel' is the amqp channel to use + 'topic' is the topic to listen on + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + Other kombu options may be passed + """ + # Default options + options = {'durable': FLAGS.rabbit_durable_queues, + 'auto_delete': False, + 'exclusive': False} + options.update(kwargs) + exchange = kombu.entity.Exchange( + name=FLAGS.control_exchange, + type='topic', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(TopicConsumer, self).__init__( + channel, + callback, + tag, + name=topic, + exchange=exchange, + routing_key=topic, + **options) + + +class FanoutConsumer(ConsumerBase): + """Consumer class for 'fanout'""" + + def __init__(self, channel, topic, callback, tag, **kwargs): + """Init a 'fanout' queue. + + 'channel' is the amqp channel to use + 'topic' is the topic to listen on + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + Other kombu options may be passed + """ + unique = uuid.uuid4().hex + exchange_name = '%s_fanout' % topic + queue_name = '%s_fanout_%s' % (topic, unique) + + # Default options + options = {'durable': False, + 'auto_delete': True, + 'exclusive': True} + options.update(kwargs) + exchange = kombu.entity.Exchange( + name=exchange_name, + type='fanout', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(FanoutConsumer, self).__init__( + channel, + callback, + tag, + name=queue_name, + exchange=exchange, + routing_key=topic, + **options) + + +class Publisher(object): + """Base Publisher class""" + + def __init__(self, channel, exchange_name, routing_key, **kwargs): + """Init the Publisher class with the exchange_name, routing_key, + and other options + """ + self.exchange_name = exchange_name + self.routing_key = routing_key + self.kwargs = kwargs + self.reconnect(channel) + + def reconnect(self, channel): + """Re-establish the Producer after a rabbit reconnection""" + self.exchange = kombu.entity.Exchange(name=self.exchange_name, + **self.kwargs) + self.producer = kombu.messaging.Producer(exchange=self.exchange, + channel=channel, routing_key=self.routing_key) + + def send(self, msg): + """Send a message""" + self.producer.publish(msg) + + +class DirectPublisher(Publisher): + """Publisher class for 'direct'""" + def __init__(self, channel, msg_id, **kwargs): + """init a 'direct' publisher. + + Kombu options may be passed as keyword args to override defaults + """ + + options = {'durable': False, + 'auto_delete': True, + 'exclusive': True} + options.update(kwargs) + super(DirectPublisher, self).__init__(channel, + msg_id, + msg_id, + type='direct', + **options) + + +class TopicPublisher(Publisher): + """Publisher class for 'topic'""" + def __init__(self, channel, topic, **kwargs): + """init a 'topic' publisher. + + Kombu options may be passed as keyword args to override defaults + """ + options = {'durable': FLAGS.rabbit_durable_queues, + 'auto_delete': False, + 'exclusive': False} + options.update(kwargs) + super(TopicPublisher, self).__init__(channel, + FLAGS.control_exchange, + topic, + type='topic', + **options) + + +class FanoutPublisher(Publisher): + """Publisher class for 'fanout'""" + def __init__(self, channel, topic, **kwargs): + """init a 'fanout' publisher. 
+
+        Kombu options may be passed as keyword args to override defaults
+        """
+        options = {'durable': False,
+                   'auto_delete': True,
+                   'exclusive': True}
+        options.update(kwargs)
+        super(FanoutPublisher, self).__init__(channel,
+                                              '%s_fanout' % topic,
+                                              None,
+                                              type='fanout',
+                                              **options)
+
+
+class Connection(object):
+    """Connection object."""
+
+    def __init__(self):
+        self.consumers = []
+        self.consumer_thread = None
+        self.max_retries = FLAGS.rabbit_max_retries
+        # Try forever?
+        if self.max_retries <= 0:
+            self.max_retries = None
+        self.interval_start = FLAGS.rabbit_retry_interval
+        self.interval_stepping = FLAGS.rabbit_retry_backoff
+        # max retry-interval = 30 seconds
+        self.interval_max = 30
+        self.memory_transport = False
+
+        self.params = dict(hostname=FLAGS.rabbit_host,
+                           port=FLAGS.rabbit_port,
+                           userid=FLAGS.rabbit_userid,
+                           password=FLAGS.rabbit_password,
+                           virtual_host=FLAGS.rabbit_virtual_host)
+        if FLAGS.fake_rabbit:
+            self.params['transport'] = 'memory'
+            self.memory_transport = True
+        else:
+            self.memory_transport = False
+        self.connection = None
+        self.reconnect()
+
+    def reconnect(self):
+        """Handles reconnecting and re-establishing queues"""
+        if self.connection:
+            try:
+                self.connection.close()
+            except self.connection.connection_errors:
+                pass
+            time.sleep(1)
+        self.connection = kombu.connection.BrokerConnection(**self.params)
+        if self.memory_transport:
+            # Kludge to speed up tests.
+            self.connection.transport.polling_interval = 0.0
+        self.consumer_num = itertools.count(1)
+
+        try:
+            self.connection.ensure_connection(errback=self.connect_error,
+                    max_retries=self.max_retries,
+                    interval_start=self.interval_start,
+                    interval_step=self.interval_stepping,
+                    interval_max=self.interval_max)
+        except self.connection.connection_errors, e:
+            # We should only get here if max_retries is set.  We'll go
+            # ahead and exit in this case.
+            err_str = str(e)
+            max_retries = self.max_retries
+            LOG.error(_('Unable to connect to AMQP server '
+                        'after %(max_retries)d tries: %(err_str)s') % locals())
+            sys.exit(1)
+        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+                 self.params)
+        self.channel = self.connection.channel()
+        # work around 'memory' transport bug in 1.1.3
+        if self.memory_transport:
+            self.channel._new_queue('ae.undeliver')
+        for consumer in self.consumers:
+            consumer.reconnect(self.channel)
+        if self.consumers:
+            LOG.debug(_("Re-established AMQP queues"))
+
+    def get_channel(self):
+        """Convenience call for bin/clear_rabbit_queues"""
+        return self.channel
+
+    def connect_error(self, exc, interval):
+        """Callback when there are connection re-tries by kombu"""
+        info = self.params.copy()
+        info['intv'] = interval
+        info['e'] = exc
+        LOG.error(_('AMQP server on %(hostname)s:%(port)d is'
+                    ' unreachable: %(e)s. Trying again in %(intv)d'
+                    ' seconds.') % info)
+
+    def close(self):
+        """Close/release this connection"""
+        self.cancel_consumer_thread()
+        self.connection.release()
+        self.connection = None
+
+    def reset(self):
+        """Reset a connection so it can be used again"""
+        self.cancel_consumer_thread()
+        self.channel.close()
+        self.channel = self.connection.channel()
+        # work around 'memory' transport bug in 1.1.3
+        if self.memory_transport:
+            self.channel._new_queue('ae.undeliver')
+        self.consumers = []
+
+    def declare_consumer(self, consumer_cls, topic, callback):
+        """Create a Consumer using the class that was passed in and
+        add it to our list of consumers
+        """
+        consumer = consumer_cls(self.channel, topic, callback,
+                                self.consumer_num.next())
+        self.consumers.append(consumer)
+        return consumer
+
+    def iterconsume(self, limit=None):
+        """Return an iterator that will consume from all queues/consumers"""
+        while True:
+            try:
+                queues_head = self.consumers[:-1]
+                queues_tail = self.consumers[-1]
+                for queue in queues_head:
+                    queue.consume(nowait=True)
+                queues_tail.consume(nowait=False)
+
+                for iteration in itertools.count(0):
+                    if limit and iteration >= limit:
+                        raise StopIteration
+                    yield self.connection.drain_events()
+            except self.connection.connection_errors, e:
+                LOG.exception(_('Failed to consume message from queue: '
+                                '%s') % str(e))
+                self.reconnect()
+
+    def cancel_consumer_thread(self):
+        """Cancel a consumer thread"""
+        if self.consumer_thread is not None:
+            self.consumer_thread.kill()
+            try:
+                self.consumer_thread.wait()
+            except greenlet.GreenletExit:
+                pass
+            self.consumer_thread = None
+
+    def publisher_send(self, cls, topic, msg):
+        """Send to a publisher based on the publisher class"""
+        while True:
+            publisher = None
+            try:
+                publisher = cls(self.channel, topic)
+                publisher.send(msg)
+                return
+            except self.connection.connection_errors, e:
+                LOG.exception(_('Failed to publish message %s') % str(e))
+                try:
+                    self.reconnect()
+                    if publisher:
+                        publisher.reconnect(self.channel)
+                except self.connection.connection_errors, e:
+                    pass
+
+    def declare_direct_consumer(self, topic, callback):
+        """Create a 'direct' queue.
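        A sketch of the Connection-level API built from the pieces
        above ('handle_msg' is a hypothetical callback and 'compute'
        just an example topic):

            conn = Connection()
            conn.declare_topic_consumer('compute', handle_msg)
            conn.declare_fanout_consumer('compute', handle_msg)
            conn.consume(limit=10)  # drives iterconsume(); callbacks fire
            conn.close()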
+        In nova's use, this is generally a msg_id queue used for
+        responses for call/multicall
+        """
+        self.declare_consumer(DirectConsumer, topic, callback)
+
+    def declare_topic_consumer(self, topic, callback=None):
+        """Create a 'topic' consumer."""
+        self.declare_consumer(TopicConsumer, topic, callback)
+
+    def declare_fanout_consumer(self, topic, callback):
+        """Create a 'fanout' consumer"""
+        self.declare_consumer(FanoutConsumer, topic, callback)
+
+    def direct_send(self, msg_id, msg):
+        """Send a 'direct' message"""
+        self.publisher_send(DirectPublisher, msg_id, msg)
+
+    def topic_send(self, topic, msg):
+        """Send a 'topic' message"""
+        self.publisher_send(TopicPublisher, topic, msg)
+
+    def fanout_send(self, topic, msg):
+        """Send a 'fanout' message"""
+        self.publisher_send(FanoutPublisher, topic, msg)
+
+    def consume(self, limit=None):
+        """Consume from all queues/consumers"""
+        it = self.iterconsume(limit=limit)
+        while True:
+            try:
+                it.next()
+            except StopIteration:
+                return
+
+    def consume_in_thread(self):
+        """Consume from all queues/consumers in a greenthread"""
+        def _consumer_thread():
+            try:
+                self.consume()
+            except greenlet.GreenletExit:
+                return
+        if self.consumer_thread is None:
+            self.consumer_thread = eventlet.spawn(_consumer_thread)
+        return self.consumer_thread
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        """Create a consumer that calls a method in a proxy object"""
+        if fanout:
+            self.declare_fanout_consumer(topic, ProxyCallback(proxy))
+        else:
+            self.declare_topic_consumer(topic, ProxyCallback(proxy))
+
+
+class Pool(pools.Pool):
+    """Class that implements a Pool of Connections."""
+
+    # TODO(comstud): Timeout connections not used in a while
+    def create(self):
+        LOG.debug('Pool creating new connection')
+        return Connection()
+
+# Create a ConnectionPool to use for RPC calls.  We'll order the
+# pool as a stack (LIFO), so that we can potentially loop through and
+# timeout old unused connections at some point
+ConnectionPool = Pool(
+        max_size=FLAGS.rpc_conn_pool_size,
+        order_as_stack=True)
+
+
+class ConnectionContext(object):
+    """The class that is actually returned to the caller of
+    create_connection().  This is essentially a wrapper around
+    Connection that supports 'with' and can return a new Connection or
+    one from a pool.  It will also catch when an instance of this class
+    is to be deleted so that we can return Connections to the pool on
+    exceptions and so forth without making the caller be responsible for
+    catching all exceptions and making sure to return a connection to
+    the pool.
+    """
+
+    def __init__(self, pooled=True):
+        """Create a new connection, or get one from the pool"""
+        self.connection = None
+        if pooled:
+            self.connection = ConnectionPool.get()
+        else:
+            self.connection = Connection()
+        self.pooled = pooled
+
+    def __enter__(self):
+        """with ConnectionContext() should return self"""
+        return self
+
+    def _done(self):
+        """If the connection came from a pool, clean it up and put it back.
+        If it did not come from a pool, close it.
+        """
+        if self.connection:
+            if self.pooled:
+                # Reset the connection so it's ready for the next caller
+                # to grab from the pool
+                self.connection.reset()
+                ConnectionPool.put(self.connection)
+            else:
+                try:
+                    self.connection.close()
+                except Exception:
+                    # There's apparently a bug in kombu 'memory' transport
+                    # which causes an assert failure.
+                    # But, we probably want to ignore all exceptions when
+                    # trying to close a connection, anyway...
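                    # As an illustration of the pooling contract this
                    # class implements (hypothetical topic and payload):
                    #
                    #     with ConnectionContext() as conn:
                    #         conn.topic_send('compute',
                    #                         {'method': 'ping', 'args': {}})
                    #
                    # __exit__() invokes _done(), which reset()s a pooled
                    # Connection and returns it via ConnectionPool.put()
                    # rather than closing it.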
+                    pass
+            self.connection = None
+
+    def __exit__(self, t, v, tb):
+        """end of 'with' statement.  We're done here."""
+        self._done()
+
+    def __del__(self):
+        """Caller is done with this connection.  Make sure we cleaned up."""
+        self._done()
+
+    def close(self):
+        """Caller is done with this connection."""
+        self._done()
+
+    def __getattr__(self, key):
+        """Proxy all other calls to the Connection instance"""
+        if self.connection:
+            return getattr(self.connection, key)
+        else:
+            raise exception.InvalidRPCConnectionReuse()
+
+
+class ProxyCallback(object):
+    """Calls methods on a proxy object based on method and args."""
+
+    def __init__(self, proxy):
+        self.proxy = proxy
+        self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
+
+    def __call__(self, message_data):
+        """Consumer callback to call a method on a proxy object.
+
+        Parses the message for validity and fires off a thread to call the
+        proxy object method.
+
+        Message data should be a dictionary with two keys:
+            method: string representing the method to call
+            args: dictionary of arg: value
+
+        Example: {'method': 'echo', 'args': {'value': 42}}
+
+        """
+        LOG.debug(_('received %s') % message_data)
+        ctxt = _unpack_context(message_data)
+        method = message_data.get('method')
+        args = message_data.get('args', {})
+        if not method:
+            LOG.warn(_('no method for message: %s') % message_data)
+            ctxt.reply(_('No method for message: %s') % message_data)
+            return
+        self.pool.spawn_n(self._process_data, ctxt, method, args)
+
+    @exception.wrap_exception()
+    def _process_data(self, ctxt, method, args):
+        """Thread that magically looks for a method on the proxy
+        object and calls it.
+        """
+
+        node_func = getattr(self.proxy, str(method))
+        node_args = dict((str(k), v) for k, v in args.iteritems())
+        # NOTE(vish): magic is fun!
+        try:
+            rval = node_func(context=ctxt, **node_args)
+            # Check if the result was a generator
+            if isinstance(rval, types.GeneratorType):
+                for x in rval:
+                    ctxt.reply(x, None)
+            else:
+                ctxt.reply(rval, None)
+            # This final None tells multicall that it is done.
+            ctxt.reply(None, None)
+        except Exception as e:
+            LOG.exception('Exception during message handling')
+            ctxt.reply(None, sys.exc_info())
+        return
+
+
+def _unpack_context(msg):
+    """Unpack context from msg."""
+    context_dict = {}
+    for key in list(msg.keys()):
+        # NOTE(vish): Some versions of python don't like unicode keys
+        #             in kwargs.
+        key = str(key)
+        if key.startswith('_context_'):
+            value = msg.pop(key)
+            context_dict[key[9:]] = value
+    context_dict['msg_id'] = msg.pop('_msg_id', None)
+    LOG.debug(_('unpacked context: %s'), context_dict)
+    return RpcContext.from_dict(context_dict)
+
+
+def _pack_context(msg, context):
+    """Pack context into msg.
+
+    Values for message keys need to be less than 255 chars, so we pull
+    context out into a bunch of separate keys.  If we want to support
+    more arguments in rabbit messages, we may want to do the same
+    for args at some point.
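    For example, with illustrative context keys (the real keys are
    whatever context.to_dict() returns):

        msg = {'method': 'ping', 'args': {}}
        _pack_context(msg, ctxt)
        # msg is now e.g. {'method': 'ping', 'args': {},
        #                  '_context_is_admin': False,
        #                  '_context_project_id': 'someproject', ...}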
+
+    """
+    context_d = dict([('_context_%s' % key, value)
+                      for (key, value) in context.to_dict().iteritems()])
+    msg.update(context_d)
+
+
+class RpcContext(context.RequestContext):
+    """Context that supports replying to a rpc.call"""
+    def __init__(self, *args, **kwargs):
+        msg_id = kwargs.pop('msg_id', None)
+        self.msg_id = msg_id
+        super(RpcContext, self).__init__(*args, **kwargs)
+
+    def reply(self, *args, **kwargs):
+        if self.msg_id:
+            msg_reply(self.msg_id, *args, **kwargs)
+
+
+class MulticallWaiter(object):
+    def __init__(self, connection):
+        self._connection = connection
+        self._iterator = connection.iterconsume()
+        self._result = None
+        self._done = False
+
+    def done(self):
+        self._done = True
+        self._connection.close()
+
+    def __call__(self, data):
+        """The consume() callback will call this.  Store the result."""
+        if data['failure']:
+            self._result = RemoteError(*data['failure'])
+        else:
+            self._result = data['result']
+
+    def __iter__(self):
+        """Yield results until we get a 'None' response from the consumer"""
+        if self._done:
+            raise StopIteration
+        while True:
+            self._iterator.next()
+            result = self._result
+            if isinstance(result, Exception):
+                self.done()
+                raise result
+            if result is None:
+                self.done()
+                raise StopIteration
+            yield result
+
+
+def create_connection(new=True):
+    """Create a connection"""
+    return ConnectionContext(pooled=not new)
+
+
+def multicall(context, topic, msg):
+    """Make a call that returns multiple times."""
+    # Can't use 'with' for multicall, as it returns an iterator
+    # that will continue to use the connection.  When it's done,
+    # connection.close() will get called which will put it back into
+    # the pool
+    LOG.debug(_('Making asynchronous call on %s ...'), topic)
+    msg_id = uuid.uuid4().hex
+    msg.update({'_msg_id': msg_id})
+    LOG.debug(_('MSG_ID is %s') % (msg_id))
+    _pack_context(msg, context)
+
+    conn = ConnectionContext()
+    wait_msg = MulticallWaiter(conn)
+    conn.declare_direct_consumer(msg_id, wait_msg)
+    conn.topic_send(topic, msg)
+
+    return wait_msg
+
+
+def call(context, topic, msg):
+    """Sends a message on a topic and waits for a response."""
+    rv = multicall(context, topic, msg)
+    # NOTE(vish): return the last result from the multicall
+    rv = list(rv)
+    if not rv:
+        return
+    return rv[-1]
+
+
+def cast(context, topic, msg):
+    """Sends a message on a topic without waiting for a response."""
+    LOG.debug(_('Making asynchronous cast on %s...'), topic)
+    _pack_context(msg, context)
+    with ConnectionContext() as conn:
+        conn.topic_send(topic, msg)
+
+
+def fanout_cast(context, topic, msg):
+    """Sends a message on a fanout exchange without waiting for a response."""
+    LOG.debug(_('Making asynchronous fanout cast...'))
+    _pack_context(msg, context)
+    with ConnectionContext() as conn:
+        conn.fanout_send(topic, msg)
+
+
+def msg_reply(msg_id, reply=None, failure=None):
+    """Sends a reply or an error on the channel signified by msg_id.
+
+    Failure should be a sys.exc_info() tuple.
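    A typical server-side use, mirroring RpcContext.reply() above
    (illustrative):

        try:
            result = do_work()
            msg_reply(msg_id, reply=result)
        except Exception:
            msg_reply(msg_id, failure=sys.exc_info())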
+ + """ + with ConnectionContext() as conn: + if failure: + message = str(failure[1]) + tb = traceback.format_exception(*failure) + LOG.error(_("Returning exception %s to caller"), message) + LOG.error(tb) + failure = (failure[0].__name__, str(failure[1]), tb) + + try: + msg = {'result': reply, 'failure': failure} + except TypeError: + msg = {'result': dict((k, repr(v)) + for k, v in reply.__dict__.iteritems()), + 'failure': failure} + conn.direct_send(msg_id, msg) diff --git a/nova/scheduler/abstract_scheduler.py b/nova/scheduler/abstract_scheduler.py index e8c343a4b..7f17b642f 100644 --- a/nova/scheduler/abstract_scheduler.py +++ b/nova/scheduler/abstract_scheduler.py @@ -180,7 +180,6 @@ class AbstractScheduler(driver.Scheduler): for zone_id, result in child_results: if not result: continue - assert isinstance(zone_id, int) for zone_rec in zones: if zone_rec['id'] != zone_id: diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py index f28353f05..22f4e14f9 100644 --- a/nova/scheduler/driver.py +++ b/nova/scheduler/driver.py @@ -30,6 +30,7 @@ from nova import log as logging from nova import rpc from nova import utils from nova.compute import power_state +from nova.compute import vm_states from nova.api.ec2 import ec2utils @@ -104,10 +105,8 @@ class Scheduler(object): dest, block_migration) # Changing instance_state. - db.instance_set_state(context, - instance_id, - power_state.PAUSED, - 'migrating') + values = {"vm_state": vm_states.MIGRATING} + db.instance_update(context, instance_id, values) # Changing volume state for volume_ref in instance_ref['volumes']: @@ -129,8 +128,7 @@ class Scheduler(object): """ # Checking instance is running. - if (power_state.RUNNING != instance_ref['state'] or \ - 'running' != instance_ref['state_description']): + if instance_ref['power_state'] != power_state.RUNNING: instance_id = ec2utils.id_to_ec2_id(instance_ref['id']) raise exception.InstanceNotRunning(instance_id=instance_id) diff --git a/nova/scheduler/host_filter.py b/nova/scheduler/host_filter.py index 826a99b0a..9f7d34ea7 100644 --- a/nova/scheduler/host_filter.py +++ b/nova/scheduler/host_filter.py @@ -32,6 +32,12 @@ from nova import exception from nova import flags import nova.scheduler +# NOTE(Vek): Even though we don't use filters in here anywhere, we +# depend on default_host_filter being available in FLAGS, +# and that happens only when filters/abstract_filter.py is +# imported. +from nova.scheduler import filters + FLAGS = flags.FLAGS diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py index 0e395ee79..bf18abc6c 100644 --- a/nova/scheduler/manager.py +++ b/nova/scheduler/manager.py @@ -93,12 +93,14 @@ class SchedulerManager(manager.Manager): driver_method = 'schedule_%s' % method elevated = context.elevated() try: - host = getattr(self.driver, driver_method)(elevated, *args, - **kwargs) + real_meth = getattr(self.driver, driver_method) + args = (elevated,) + args except AttributeError, e: LOG.warning(_("Driver Method %(driver_method)s missing: %(e)s." 
- "Reverting to schedule()") % locals()) - host = self.driver.schedule(elevated, topic, *args, **kwargs) + "Reverting to schedule()") % locals()) + real_meth = self.driver.schedule + args = (elevated, topic) + args + host = real_meth(*args, **kwargs) if not host: LOG.debug(_("%(topic)s %(method)s handled in Scheduler") diff --git a/nova/service.py b/nova/service.py index 959e79052..247eb4fb1 100644 --- a/nova/service.py +++ b/nova/service.py @@ -153,26 +153,15 @@ class Service(object): self.topic) # Share this same connection for these Consumers - consumer_all = rpc.create_consumer(self.conn, self.topic, self, - fanout=False) + self.conn.create_consumer(self.topic, self, fanout=False) node_topic = '%s.%s' % (self.topic, self.host) - consumer_node = rpc.create_consumer(self.conn, node_topic, self, - fanout=False) + self.conn.create_consumer(node_topic, self, fanout=False) - fanout = rpc.create_consumer(self.conn, self.topic, self, fanout=True) + self.conn.create_consumer(self.topic, self, fanout=True) - consumers = [consumer_all, consumer_node, fanout] - consumer_set = rpc.create_consumer_set(self.conn, consumers) - - # Wait forever, processing these consumers - def _wait(): - try: - consumer_set.wait() - finally: - consumer_set.close() - - self.consumer_set_thread = eventlet.spawn(_wait) + # Consume from all consumers in a thread + self.conn.consume_in_thread() if self.report_interval: pulse = utils.LoopingCall(self.report_state) @@ -237,10 +226,11 @@ class Service(object): logging.warn(_('Service killed that has no database entry')) def stop(self): - self.consumer_set_thread.kill() + # Try to shut the connection down, but if we get any sort of + # errors, go ahead and ignore them.. as we're shutting down anyway try: - self.consumer_set_thread.wait() - except greenlet.GreenletExit: + self.conn.close() + except Exception: pass for x in self.timers: try: diff --git a/nova/test.py b/nova/test.py index 88f1489e8..d759aef60 100644 --- a/nova/test.py +++ b/nova/test.py @@ -277,3 +277,21 @@ class TestCase(unittest.TestCase): continue else: self.assertEqual(sub_value, super_value) + + def assertIn(self, a, b, *args, **kwargs): + """Python < v2.7 compatibility. Assert 'a' in 'b'""" + try: + f = super(TestCase, self).assertIn + except AttributeError: + self.assertTrue(a in b, *args, **kwargs) + else: + f(a, b, *args, **kwargs) + + def assertNotIn(self, a, b, *args, **kwargs): + """Python < v2.7 compatibility. Assert 'a' NOT in 'b'""" + try: + f = super(TestCase, self).assertNotIn + except AttributeError: + self.assertFalse(a in b, *args, **kwargs) + else: + f(a, b, *args, **kwargs) diff --git a/nova/tests/api/ec2/__init__.py b/nova/tests/api/ec2/__init__.py new file mode 100644 index 000000000..6dab802f2 --- /dev/null +++ b/nova/tests/api/ec2/__init__.py @@ -0,0 +1,19 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 Openstack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work +from nova.tests import * diff --git a/nova/tests/test_middleware.py b/nova/tests/api/ec2/test_middleware.py index 40d117c45..295f6c4ea 100644 --- a/nova/tests/test_middleware.py +++ b/nova/tests/api/ec2/test_middleware.py @@ -21,10 +21,13 @@ import webob.dec import webob.exc from nova.api import ec2 +from nova import context +from nova import exception from nova import flags from nova import test from nova import utils +from xml.etree.ElementTree import fromstring as xml_to_tree FLAGS = flags.FLAGS @@ -83,3 +86,45 @@ class LockoutTestCase(test.TestCase): utils.advance_time_seconds(FLAGS.lockout_window * 60) self._send_bad_attempts('test', FLAGS.lockout_attempts - 1) self.assertFalse(self._is_locked_out('test')) + + +class ExecutorTestCase(test.TestCase): + def setUp(self): + super(ExecutorTestCase, self).setUp() + self.executor = ec2.Executor() + + def _execute(self, invoke): + class Fake(object): + pass + fake_ec2_request = Fake() + fake_ec2_request.invoke = invoke + + fake_wsgi_request = Fake() + + fake_wsgi_request.environ = { + 'nova.context': context.get_admin_context(), + 'ec2.request': fake_ec2_request, + } + return self.executor(fake_wsgi_request) + + def _extract_message(self, result): + tree = xml_to_tree(result.body) + return tree.findall('./Errors')[0].find('Error/Message').text + + def test_instance_not_found(self): + def not_found(context): + raise exception.InstanceNotFound(instance_id=5) + result = self._execute(not_found) + self.assertIn('i-00000005', self._extract_message(result)) + + def test_snapshot_not_found(self): + def not_found(context): + raise exception.SnapshotNotFound(snapshot_id=5) + result = self._execute(not_found) + self.assertIn('snap-00000005', self._extract_message(result)) + + def test_volume_not_found(self): + def not_found(context): + raise exception.VolumeNotFound(volume_id=5) + result = self._execute(not_found) + self.assertIn('vol-00000005', self._extract_message(result)) diff --git a/nova/tests/api/openstack/contrib/test_createserverext.py b/nova/tests/api/openstack/contrib/test_createserverext.py index e5eed14fe..078b72d67 100644 --- a/nova/tests/api/openstack/contrib/test_createserverext.py +++ b/nova/tests/api/openstack/contrib/test_createserverext.py @@ -16,6 +16,7 @@ # under the License. 
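The ExecutorTestCase assertions above depend on the standard EC2 id
templates, which zero-pad the integer id as 8 hex digits (shown here for
id 5):

    'i-%08x' % 5     # 'i-00000005'
    'snap-%08x' % 5  # 'snap-00000005'
    'vol-%08x' % 5   # 'vol-00000005'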
import base64 +import datetime import json import unittest from xml.dom import minidom @@ -23,18 +24,11 @@ from xml.dom import minidom import stubout import webob +from nova import db from nova import exception from nova import flags from nova import test -from nova import utils import nova.api.openstack -from nova.api.openstack import servers -from nova.api.openstack.contrib import createserverext -import nova.compute.api - -import nova.scheduler.api -import nova.image.fake -import nova.rpc from nova.tests.api.openstack import fakes @@ -51,22 +45,45 @@ DUPLICATE_NETWORKS = [('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', '10.0.1.12'), INVALID_NETWORKS = [('invalid', 'invalid-ip-address')] +INSTANCE = { + "id": 1, + "display_name": "test_server", + "uuid": FAKE_UUID, + "created_at": datetime.datetime(2010, 10, 10, 12, 0, 0), + "updated_at": datetime.datetime(2010, 11, 11, 11, 0, 0), + "security_groups": [{"id": 1, "name": "test"}] + } + + +def return_server_by_id(context, id, session=None): + INSTANCE['id'] = id + return INSTANCE + + +def return_security_group_non_existing(context, project_id, group_name): + raise exception.SecurityGroupNotFoundForProject(project_id=project_id, + security_group_id=group_name) + + +def return_security_group_get_by_name(context, project_id, group_name): + return {'id': 1, 'name': group_name} + + +def return_security_group_get(context, security_group_id, session): + return {'id': security_group_id} + + +def return_instance_add_security_group(context, instance_id, + security_group_id): + pass + class CreateserverextTest(test.TestCase): def setUp(self): super(CreateserverextTest, self).setUp() - self.stubs = stubout.StubOutForTesting() - fakes.FakeAuthManager.auth_data = {} - fakes.FakeAuthDatabase.data = {} - fakes.stub_out_auth(self.stubs) - fakes.stub_out_image_service(self.stubs) - fakes.stub_out_key_pair_funcs(self.stubs) - self.allow_admin = FLAGS.allow_admin_api def tearDown(self): - self.stubs.UnsetAll() - FLAGS.allow_admin_api = self.allow_admin super(CreateserverextTest, self).tearDown() def _setup_mock_compute_api(self): @@ -76,6 +93,8 @@ class CreateserverextTest(test.TestCase): def __init__(self): self.injected_files = None self.networks = None + self.user_data = None + self.db = db def create(self, *args, **kwargs): if 'injected_files' in kwargs: @@ -87,8 +106,14 @@ class CreateserverextTest(test.TestCase): self.networks = kwargs['requested_networks'] else: self.networks = None + + if 'user_data' in kwargs: + self.user_data = kwargs['user_data'] + return [{'id': '1234', 'display_name': 'fakeinstance', 'uuid': FAKE_UUID, + 'user_id': 'fake', + 'project_id': 'fake', 'created_at': "", 'updated_at': ""}] @@ -107,6 +132,18 @@ class CreateserverextTest(test.TestCase): '_get_kernel_ramdisk_from_image', make_stub_method((1, 1))) return compute_api + def _create_security_group_request_dict(self, security_groups): + server = {} + server['name'] = 'new-server-test' + server['imageRef'] = 1 + server['flavorRef'] = 1 + if security_groups is not None: + sg_list = [] + for name in security_groups: + sg_list.append({'name': name}) + server['security_groups'] = sg_list + return {'server': server} + def _create_networks_request_dict(self, networks): server = {} server['name'] = 'new-server-test' @@ -119,6 +156,14 @@ class CreateserverextTest(test.TestCase): server['networks'] = network_list return {'server': server} + def _create_user_data_request_dict(self, user_data): + server = {} + server['name'] = 'new-server-test' + server['imageRef'] = 1 + server['flavorRef'] = 1 + 
server['user_data'] = user_data + return {'server': server} + def _get_create_request_json(self, body_dict): req = webob.Request.blank('/v1.1/123/os-create-server-ext') req.headers['Content-Type'] = 'application/json' @@ -178,6 +223,13 @@ class CreateserverextTest(test.TestCase): self._run_create_instance_with_mock_compute_api(request) return request, response, compute_api.networks + def _create_instance_with_user_data_json(self, networks): + body_dict = self._create_user_data_request_dict(networks) + request = self._get_create_request_json(body_dict) + compute_api, response = \ + self._run_create_instance_with_mock_compute_api(request) + return request, response, compute_api.user_data + def _create_instance_with_networks_xml(self, networks): body_dict = self._create_networks_request_dict(networks) request = self._get_create_request_xml(body_dict) @@ -304,3 +356,60 @@ class CreateserverextTest(test.TestCase): self.assertEquals(response.status_int, 202) self.assertEquals(compute_api.networks, [('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', None)]) + + def test_create_instance_with_userdata(self): + user_data_contents = '#!/bin/bash\necho "Oh no!"\n' + user_data_contents = base64.b64encode(user_data_contents) + request, response, user_data = \ + self._create_instance_with_user_data_json(user_data_contents) + self.assertEquals(response.status_int, 202) + self.assertEquals(user_data, user_data_contents) + + def test_create_instance_with_userdata_none(self): + user_data_contents = None + request, response, user_data = \ + self._create_instance_with_user_data_json(user_data_contents) + self.assertEquals(response.status_int, 202) + self.assertEquals(user_data, user_data_contents) + + def test_create_instance_with_userdata_with_non_b64_content(self): + user_data_contents = '#!/bin/bash\necho "Oh no!"\n' + request, response, user_data = \ + self._create_instance_with_user_data_json(user_data_contents) + self.assertEquals(response.status_int, 400) + self.assertEquals(user_data, None) + + def test_create_instance_with_security_group_json(self): + security_groups = ['test', 'test1'] + self.stubs.Set(nova.db.api, 'security_group_get_by_name', + return_security_group_get_by_name) + self.stubs.Set(nova.db.api, 'instance_add_security_group', + return_instance_add_security_group) + body_dict = self._create_security_group_request_dict(security_groups) + request = self._get_create_request_json(body_dict) + response = request.get_response(fakes.wsgi_app()) + self.assertEquals(response.status_int, 202) + + def test_get_server_by_id_verify_security_groups_json(self): + self.stubs.Set(nova.db.api, 'instance_get', return_server_by_id) + req = webob.Request.blank('/v1.1/123/os-create-server-ext/1') + req.headers['Content-Type'] = 'application/json' + response = req.get_response(fakes.wsgi_app()) + self.assertEquals(response.status_int, 200) + res_dict = json.loads(response.body) + expected_security_group = [{"name": "test"}] + self.assertEquals(res_dict['server']['security_groups'], + expected_security_group) + + def test_get_server_by_id_verify_security_groups_xml(self): + self.stubs.Set(nova.db.api, 'instance_get', return_server_by_id) + req = webob.Request.blank('/v1.1/123/os-create-server-ext/1') + req.headers['Accept'] = 'application/xml' + response = req.get_response(fakes.wsgi_app()) + self.assertEquals(response.status_int, 200) + dom = minidom.parseString(response.body) + server = dom.childNodes[0] + sec_groups = server.getElementsByTagName('security_groups')[0] + sec_group = 
sec_groups.getElementsByTagName('security_group')[0] + self.assertEqual(INSTANCE['security_groups'][0]['name'], + sec_group.getAttribute("name")) diff --git a/nova/tests/api/openstack/contrib/test_floating_ips.py b/nova/tests/api/openstack/contrib/test_floating_ips.py index 568faf867..642f2b841 100644 --- a/nova/tests/api/openstack/contrib/test_floating_ips.py +++ b/nova/tests/api/openstack/contrib/test_floating_ips.py @@ -20,9 +20,11 @@ import webob from nova import compute from nova import context from nova import db -from nova import test from nova import network +from nova import rpc +from nova import test from nova.tests.api.openstack import fakes +from nova.tests.api.openstack import test_servers from nova.api.openstack.contrib.floating_ips import FloatingIPController @@ -36,14 +38,13 @@ def network_api_get_floating_ip(self, context, id): def network_api_get_floating_ip_by_ip(self, context, address): return {'id': 1, 'address': '10.10.10.10', - 'fixed_ip': {'address': '11.0.0.1'}} + 'fixed_ip': {'address': '10.0.0.1', 'instance_id': 1}}, def network_api_list_floating_ips(self, context): return [{'id': 1, 'address': '10.10.10.10', - 'instance': {'id': 11}, - 'fixed_ip': {'address': '10.0.0.1'}}, + 'fixed_ip': {'address': '10.0.0.1', 'instance_id': 1}}, {'id': 2, 'address': '10.10.10.11'}] @@ -60,10 +61,38 @@ def compute_api_associate(self, context, instance_id, floating_ip): pass +def network_api_associate(self, context, floating_ip, fixed_ip): + pass + + def network_api_disassociate(self, context, floating_address): pass +def network_get_instance_nw_info(self, context, instance): + info = { + 'label': 'fake', + 'gateway': 'fake', + 'dhcp_server': 'fake', + 'broadcast': 'fake', + 'mac': 'fake', + 'vif_uuid': 'fake', + 'rxtx_cap': 'fake', + 'dns': [], + 'ips': [{'ip': '10.0.0.1'}], + 'should_create_bridge': False, + 'should_create_vlan': False} + + return [['ignore', info]] + + +def fake_instance_get(context, instance_id): + return { + "id": 1, + "user_id": 'fakeuser', + "project_id": '123'} + + class FloatingIpTest(test.TestCase): address = "10.10.10.10" @@ -79,23 +108,21 @@ class FloatingIpTest(test.TestCase): def setUp(self): super(FloatingIpTest, self).setUp() - self.controller = FloatingIPController() - fakes.stub_out_networking(self.stubs) - fakes.stub_out_rate_limiting(self.stubs) self.stubs.Set(network.api.API, "get_floating_ip", network_api_get_floating_ip) self.stubs.Set(network.api.API, "get_floating_ip_by_ip", network_api_get_floating_ip) self.stubs.Set(network.api.API, "list_floating_ips", network_api_list_floating_ips) - self.stubs.Set(network.api.API, "allocate_floating_ip", - network_api_allocate) self.stubs.Set(network.api.API, "release_floating_ip", network_api_release) - self.stubs.Set(compute.api.API, "associate_floating_ip", - compute_api_associate) self.stubs.Set(network.api.API, "disassociate_floating_ip", network_api_disassociate) + self.stubs.Set(network.api.API, "get_instance_nw_info", + network_get_instance_nw_info) + self.stubs.Set(db.api, 'instance_get', + fake_instance_get) + self.context = context.get_admin_context() self._create_floating_ip() @@ -124,7 +151,7 @@ class FloatingIpTest(test.TestCase): res = req.get_response(fakes.wsgi_app()) self.assertEqual(res.status_int, 200) res_dict = json.loads(res.body) - response = {'floating_ips': [{'instance_id': 11, + response = {'floating_ips': [{'instance_id': 1, 'ip': '10.10.10.10', 'fixed_ip': '10.0.0.1', 'id': 1}, @@ -143,7 +170,34 @@ class FloatingIpTest(test.TestCase): 
self.assertEqual(res_dict['floating_ip']['ip'], '10.10.10.10') self.assertEqual(res_dict['floating_ip']['instance_id'], None) + def test_show_associated_floating_ip(self): + def get_floating_ip(self, context, id): + return {'id': 1, 'address': '10.10.10.10', + 'fixed_ip': {'address': '10.0.0.1', 'instance_id': 1}} + self.stubs.Set(network.api.API, "get_floating_ip", get_floating_ip) + + req = webob.Request.blank('/v1.1/123/os-floating-ips/1') + res = req.get_response(fakes.wsgi_app()) + self.assertEqual(res.status_int, 200) + res_dict = json.loads(res.body) + self.assertEqual(res_dict['floating_ip']['id'], 1) + self.assertEqual(res_dict['floating_ip']['ip'], '10.10.10.10') + self.assertEqual(res_dict['floating_ip']['instance_id'], 1) + + def test_floating_ip_allocate_no_free_ips(self): + def fake_call(*args, **kwargs): + raise(rpc.RemoteError('NoMoreFloatingIps', '', '')) + + self.stubs.Set(rpc, "call", fake_call) + req = webob.Request.blank('/v1.1/123/os-floating-ips') + req.method = 'POST' + req.headers['Content-Type'] = 'application/json' + res = req.get_response(fakes.wsgi_app()) + self.assertEqual(res.status_int, 400) + def test_floating_ip_allocate(self): + self.stubs.Set(network.api.API, "allocate_floating_ip", + network_api_allocate) req = webob.Request.blank('/v1.1/123/os-floating-ips') req.method = 'POST' req.headers['Content-Type'] = 'application/json' @@ -165,6 +219,8 @@ class FloatingIpTest(test.TestCase): self.assertEqual(res.status_int, 202) def test_add_floating_ip_to_instance(self): + self.stubs.Set(network.api.API, "associate_floating_ip", + network_api_associate) body = dict(addFloatingIp=dict(address='11.0.0.1')) req = webob.Request.blank('/v1.1/123/servers/test_inst/action') req.method = "POST" @@ -174,6 +230,65 @@ class FloatingIpTest(test.TestCase): resp = req.get_response(fakes.wsgi_app()) self.assertEqual(resp.status_int, 202) + def test_associate_floating_ip_to_instance_wrong_project_id(self): + def fake_fixed_ip_get_by_address(ctx, address, session=None): + return {'address': address, 'network': {'multi_host': None, + 'host': 'fake'}} + self.stubs.Set(db.api, "fixed_ip_get_by_address", + fake_fixed_ip_get_by_address) + db.floating_ip_update(self.context, self.address, {'project_id': 'bad', + 'fixed_ip_id': 1}) + body = dict(addFloatingIp=dict(address=self.address)) + req = webob.Request.blank('/v1.1/123/servers/test_inst/action') + req.method = "POST" + req.body = json.dumps(body) + req.headers["content-type"] = "application/json" + resp = req.get_response(fakes.wsgi_app()) + self.assertEqual(resp.status_int, 400) + + def test_associate_floating_ip_to_instance_no_project_id(self): + def fake_fixed_ip_get_by_address(ctx, address, session=None): + return {'address': address, 'network': {'multi_host': None, + 'host': 'fake'}} + self.stubs.Set(db.api, "fixed_ip_get_by_address", + fake_fixed_ip_get_by_address) + db.floating_ip_update(self.context, self.address, {'project_id': None, + 'fixed_ip_id': 1}) + body = dict(addFloatingIp=dict(address=self.address)) + req = webob.Request.blank('/v1.1/123/servers/test_inst/action') + req.method = "POST" + req.body = json.dumps(body) + req.headers["content-type"] = "application/json" + resp = req.get_response(fakes.wsgi_app()) + self.assertEqual(resp.status_int, 400) + + def test_add_associated_floating_ip_to_instance(self): + def fake_fixed_ip_get_by_address(ctx, address, session=None): + return {'address': address, 'network': {'multi_host': None, + 'host': 'fake'}} + + self.disassociated = False + + def 
fake_network_api_disassociate(local_self, ctx, floating_address): + self.disassociated = True + + db.floating_ip_update(self.context, self.address, {'project_id': '123', + 'fixed_ip_id': 1}) + self.stubs.Set(network.api.API, "disassociate_floating_ip", + fake_network_api_disassociate) + self.stubs.Set(db.api, "fixed_ip_get_by_address", + fake_fixed_ip_get_by_address) + + body = dict(addFloatingIp=dict(address=self.address)) + req = webob.Request.blank('/v1.1/123/servers/test_inst/action') + req.method = "POST" + req.body = json.dumps(body) + req.headers["content-type"] = "application/json" + + resp = req.get_response(fakes.wsgi_app()) + self.assertEqual(resp.status_int, 202) + self.assertTrue(self.disassociated) + def test_remove_floating_ip_from_instance(self): body = dict(removeFloatingIp=dict(address='11.0.0.1')) req = webob.Request.blank('/v1.1/123/servers/test_inst/action') diff --git a/nova/tests/api/openstack/contrib/test_security_groups.py b/nova/tests/api/openstack/contrib/test_security_groups.py index bc1536911..0816a6312 100644 --- a/nova/tests/api/openstack/contrib/test_security_groups.py +++ b/nova/tests/api/openstack/contrib/test_security_groups.py @@ -360,7 +360,7 @@ class TestSecurityGroups(test.TestCase): def test_associate_by_invalid_server_id(self): body = dict(addSecurityGroup=dict(name='test')) - self.stubs.Set(nova.db, 'security_group_get_by_name', + self.stubs.Set(nova.db.api, 'security_group_get_by_name', return_security_group) req = webob.Request.blank('/v1.1/123/servers/invalid/action') req.headers['Content-Type'] = 'application/json' @@ -372,7 +372,7 @@ class TestSecurityGroups(test.TestCase): def test_associate_without_body(self): req = webob.Request.blank('/v1.1/123/servers/1/action') body = dict(addSecurityGroup=None) - self.stubs.Set(nova.db, 'instance_get', return_server) + self.stubs.Set(nova.db.api, 'instance_get', return_server) req.headers['Content-Type'] = 'application/json' req.method = 'POST' req.body = json.dumps(body) @@ -382,7 +382,7 @@ class TestSecurityGroups(test.TestCase): def test_associate_no_security_group_name(self): req = webob.Request.blank('/v1.1/123/servers/1/action') body = dict(addSecurityGroup=dict()) - self.stubs.Set(nova.db, 'instance_get', return_server) + self.stubs.Set(nova.db.api, 'instance_get', return_server) req.headers['Content-Type'] = 'application/json' req.method = 'POST' req.body = json.dumps(body) @@ -392,7 +392,7 @@ class TestSecurityGroups(test.TestCase): def test_associate_security_group_name_with_whitespaces(self): req = webob.Request.blank('/v1.1/123/servers/1/action') body = dict(addSecurityGroup=dict(name=" ")) - self.stubs.Set(nova.db, 'instance_get', return_server) + self.stubs.Set(nova.db.api, 'instance_get', return_server) req.headers['Content-Type'] = 'application/json' req.method = 'POST' req.body = json.dumps(body) @@ -400,9 +400,9 @@ class TestSecurityGroups(test.TestCase): self.assertEquals(response.status_int, 400) def test_associate_non_existing_instance(self): - self.stubs.Set(nova.db, 'instance_get', return_server_nonexistant) + self.stubs.Set(nova.db.api, 'instance_get', return_server_nonexistant) body = dict(addSecurityGroup=dict(name="test")) - self.stubs.Set(nova.db, 'security_group_get_by_name', + self.stubs.Set(nova.db.api, 'security_group_get_by_name', return_security_group) req = webob.Request.blank('/v1.1/123/servers/10000/action') req.headers['Content-Type'] = 'application/json' @@ -412,8 +412,8 @@ class TestSecurityGroups(test.TestCase): self.assertEquals(response.status_int, 404) def 
test_associate_non_running_instance(self): - self.stubs.Set(nova.db, 'instance_get', return_non_running_server) - self.stubs.Set(nova.db, 'security_group_get_by_name', + self.stubs.Set(nova.db.api, 'instance_get', return_non_running_server) + self.stubs.Set(nova.db.api, 'security_group_get_by_name', return_security_group_without_instances) body = dict(addSecurityGroup=dict(name="test")) req = webob.Request.blank('/v1.1/123/servers/1/action') @@ -424,8 +424,8 @@ class TestSecurityGroups(test.TestCase): self.assertEquals(response.status_int, 400) def test_associate_already_associated_security_group_to_instance(self): - self.stubs.Set(nova.db, 'instance_get', return_server) - self.stubs.Set(nova.db, 'security_group_get_by_name', + self.stubs.Set(nova.db.api, 'instance_get', return_server) + self.stubs.Set(nova.db.api, 'security_group_get_by_name', return_security_group) body = dict(addSecurityGroup=dict(name="test")) req = webob.Request.blank('/v1.1/123/servers/1/action') @@ -436,12 +436,12 @@ class TestSecurityGroups(test.TestCase): self.assertEquals(response.status_int, 400) def test_associate(self): - self.stubs.Set(nova.db, 'instance_get', return_server) - self.mox.StubOutWithMock(nova.db, 'instance_add_security_group') - nova.db.instance_add_security_group(mox.IgnoreArg(), + self.stubs.Set(nova.db.api, 'instance_get', return_server) + self.mox.StubOutWithMock(nova.db.api, 'instance_add_security_group') + nova.db.api.instance_add_security_group(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg()) - self.stubs.Set(nova.db, 'security_group_get_by_name', + self.stubs.Set(nova.db.api, 'security_group_get_by_name', return_security_group_without_instances) self.mox.ReplayAll() @@ -454,12 +454,12 @@ class TestSecurityGroups(test.TestCase): self.assertEquals(response.status_int, 202) def test_associate_xml(self): - self.stubs.Set(nova.db, 'instance_get', return_server) - self.mox.StubOutWithMock(nova.db, 'instance_add_security_group') - nova.db.instance_add_security_group(mox.IgnoreArg(), + self.stubs.Set(nova.db.api, 'instance_get', return_server) + self.mox.StubOutWithMock(nova.db.api, 'instance_add_security_group') + nova.db.api.instance_add_security_group(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg()) - self.stubs.Set(nova.db, 'security_group_get_by_name', + self.stubs.Set(nova.db.api, 'security_group_get_by_name', return_security_group_without_instances) self.mox.ReplayAll() @@ -483,7 +483,7 @@ class TestSecurityGroups(test.TestCase): def test_disassociate_by_invalid_server_id(self): body = dict(removeSecurityGroup=dict(name='test')) - self.stubs.Set(nova.db, 'security_group_get_by_name', + self.stubs.Set(nova.db.api, 'security_group_get_by_name', return_security_group) req = webob.Request.blank('/v1.1/123/servers/invalid/action') req.headers['Content-Type'] = 'application/json' @@ -495,7 +495,7 @@ class TestSecurityGroups(test.TestCase): def test_disassociate_without_body(self): req = webob.Request.blank('/v1.1/123/servers/1/action') body = dict(removeSecurityGroup=None) - self.stubs.Set(nova.db, 'instance_get', return_server) + self.stubs.Set(nova.db.api, 'instance_get', return_server) req.headers['Content-Type'] = 'application/json' req.method = 'POST' req.body = json.dumps(body) @@ -505,7 +505,7 @@ class TestSecurityGroups(test.TestCase): def test_disassociate_no_security_group_name(self): req = webob.Request.blank('/v1.1/123/servers/1/action') body = dict(removeSecurityGroup=dict()) - self.stubs.Set(nova.db, 'instance_get', return_server) + self.stubs.Set(nova.db.api, 'instance_get', 
return_server) req.headers['Content-Type'] = 'application/json' req.method = 'POST' req.body = json.dumps(body) @@ -515,7 +515,7 @@ class TestSecurityGroups(test.TestCase): def test_disassociate_security_group_name_with_whitespaces(self): req = webob.Request.blank('/v1.1/123/servers/1/action') body = dict(removeSecurityGroup=dict(name=" ")) - self.stubs.Set(nova.db, 'instance_get', return_server) + self.stubs.Set(nova.db.api, 'instance_get', return_server) req.headers['Content-Type'] = 'application/json' req.method = 'POST' req.body = json.dumps(body) @@ -523,9 +523,9 @@ class TestSecurityGroups(test.TestCase): self.assertEquals(response.status_int, 400) def test_disassociate_non_existing_instance(self): - self.stubs.Set(nova.db, 'instance_get', return_server_nonexistant) + self.stubs.Set(nova.db.api, 'instance_get', return_server_nonexistant) body = dict(removeSecurityGroup=dict(name="test")) - self.stubs.Set(nova.db, 'security_group_get_by_name', + self.stubs.Set(nova.db.api, 'security_group_get_by_name', return_security_group) req = webob.Request.blank('/v1.1/123/servers/10000/action') req.headers['Content-Type'] = 'application/json' @@ -535,8 +535,8 @@ class TestSecurityGroups(test.TestCase): self.assertEquals(response.status_int, 404) def test_disassociate_non_running_instance(self): - self.stubs.Set(nova.db, 'instance_get', return_non_running_server) - self.stubs.Set(nova.db, 'security_group_get_by_name', + self.stubs.Set(nova.db.api, 'instance_get', return_non_running_server) + self.stubs.Set(nova.db.api, 'security_group_get_by_name', return_security_group) body = dict(removeSecurityGroup=dict(name="test")) req = webob.Request.blank('/v1.1/123/servers/1/action') @@ -547,8 +547,8 @@ class TestSecurityGroups(test.TestCase): self.assertEquals(response.status_int, 400) def test_disassociate_already_associated_security_group_to_instance(self): - self.stubs.Set(nova.db, 'instance_get', return_server) - self.stubs.Set(nova.db, 'security_group_get_by_name', + self.stubs.Set(nova.db.api, 'instance_get', return_server) + self.stubs.Set(nova.db.api, 'security_group_get_by_name', return_security_group_without_instances) body = dict(removeSecurityGroup=dict(name="test")) req = webob.Request.blank('/v1.1/123/servers/1/action') @@ -559,12 +559,12 @@ class TestSecurityGroups(test.TestCase): self.assertEquals(response.status_int, 400) def test_disassociate(self): - self.stubs.Set(nova.db, 'instance_get', return_server) - self.mox.StubOutWithMock(nova.db, 'instance_remove_security_group') - nova.db.instance_remove_security_group(mox.IgnoreArg(), + self.stubs.Set(nova.db.api, 'instance_get', return_server) + self.mox.StubOutWithMock(nova.db.api, 'instance_remove_security_group') + nova.db.api.instance_remove_security_group(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg()) - self.stubs.Set(nova.db, 'security_group_get_by_name', + self.stubs.Set(nova.db.api, 'security_group_get_by_name', return_security_group) self.mox.ReplayAll() @@ -577,12 +577,12 @@ class TestSecurityGroups(test.TestCase): self.assertEquals(response.status_int, 202) def test_disassociate_xml(self): - self.stubs.Set(nova.db, 'instance_get', return_server) - self.mox.StubOutWithMock(nova.db, 'instance_remove_security_group') - nova.db.instance_remove_security_group(mox.IgnoreArg(), + self.stubs.Set(nova.db.api, 'instance_get', return_server) + self.mox.StubOutWithMock(nova.db.api, 'instance_remove_security_group') + nova.db.api.instance_remove_security_group(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg()) - self.stubs.Set(nova.db, 
'security_group_get_by_name', + self.stubs.Set(nova.db.api, 'security_group_get_by_name', return_security_group) self.mox.ReplayAll() diff --git a/nova/tests/api/openstack/contrib/test_simple_tenant_usage.py b/nova/tests/api/openstack/contrib/test_simple_tenant_usage.py new file mode 100644 index 000000000..2430b9d51 --- /dev/null +++ b/nova/tests/api/openstack/contrib/test_simple_tenant_usage.py @@ -0,0 +1,172 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import json +import webob + +from nova import context +from nova import flags +from nova import test +from nova.compute import api +from nova.tests.api.openstack import fakes + + +FLAGS = flags.FLAGS + +SERVERS = 5 +TENANTS = 2 +HOURS = 24 +LOCAL_GB = 10 +MEMORY_MB = 1024 +VCPUS = 2 +STOP = datetime.datetime.utcnow() +START = STOP - datetime.timedelta(hours=HOURS) + + +def fake_instance_type_get(self, context, instance_type_id): + return {'id': 1, + 'vcpus': VCPUS, + 'local_gb': LOCAL_GB, + 'memory_mb': MEMORY_MB, + 'name': + 'fakeflavor'} + + +def get_fake_db_instance(start, end, instance_id, tenant_id): + return {'id': instance_id, + 'image_ref': '1', + 'project_id': tenant_id, + 'user_id': 'fakeuser', + 'display_name': 'name', + 'state_description': 'state', + 'instance_type_id': 1, + 'launched_at': start, + 'terminated_at': end} + + +def fake_instance_get_active_by_window(self, context, begin, end, project_id): + return [get_fake_db_instance(START, + STOP, + x, + "faketenant_%s" % (x / SERVERS)) + for x in xrange(TENANTS * SERVERS)] + + +class SimpleTenantUsageTest(test.TestCase): + def setUp(self): + super(SimpleTenantUsageTest, self).setUp() + self.stubs.Set(api.API, "get_instance_type", + fake_instance_type_get) + self.stubs.Set(api.API, "get_active_by_window", + fake_instance_get_active_by_window) + self.admin_context = context.RequestContext('fakeadmin_0', + 'faketenant_0', + is_admin=True) + self.user_context = context.RequestContext('fakeadmin_0', + 'faketenant_0', + is_admin=False) + self.alt_user_context = context.RequestContext('fakeadmin_0', + 'faketenant_1', + is_admin=False) + FLAGS.allow_admin_api = True + + def test_verify_index(self): + req = webob.Request.blank( + '/v1.1/123/os-simple-tenant-usage?start=%s&end=%s' % + (START.isoformat(), STOP.isoformat())) + req.method = "GET" + req.headers["content-type"] = "application/json" + + res = req.get_response(fakes.wsgi_app( + fake_auth_context=self.admin_context)) + + self.assertEqual(res.status_int, 200) + res_dict = json.loads(res.body) + usages = res_dict['tenant_usages'] + from nova import log as logging + logging.warn(usages) + for i in xrange(TENANTS): + self.assertEqual(int(usages[i]['total_hours']), + SERVERS * HOURS) + self.assertEqual(int(usages[i]['total_local_gb_usage']), + SERVERS * LOCAL_GB * HOURS) + self.assertEqual(int(usages[i]['total_memory_mb_usage']), + SERVERS * MEMORY_MB * HOURS) + 
self.assertEqual(int(usages[i]['total_vcpus_usage']), + SERVERS * VCPUS * HOURS) + self.assertFalse(usages[i].get('server_usages')) + + def test_verify_detailed_index(self): + req = webob.Request.blank( + '/v1.1/123/os-simple-tenant-usage?' + 'detailed=1&start=%s&end=%s' % + (START.isoformat(), STOP.isoformat())) + req.method = "GET" + req.headers["content-type"] = "application/json" + + res = req.get_response(fakes.wsgi_app( + fake_auth_context=self.admin_context)) + self.assertEqual(res.status_int, 200) + res_dict = json.loads(res.body) + usages = res_dict['tenant_usages'] + for i in xrange(TENANTS): + servers = usages[i]['server_usages'] + for j in xrange(SERVERS): + self.assertEqual(int(servers[j]['hours']), HOURS) + + def test_verify_index_fails_for_nonadmin(self): + req = webob.Request.blank( + '/v1.1/123/os-simple-tenant-usage?' + 'detailed=1&start=%s&end=%s' % + (START.isoformat(), STOP.isoformat())) + req.method = "GET" + req.headers["content-type"] = "application/json" + + res = req.get_response(fakes.wsgi_app()) + self.assertEqual(res.status_int, 403) + + def test_verify_show(self): + req = webob.Request.blank( + '/v1.1/faketenant_0/os-simple-tenant-usage/' + 'faketenant_0?start=%s&end=%s' % + (START.isoformat(), STOP.isoformat())) + req.method = "GET" + req.headers["content-type"] = "application/json" + + res = req.get_response(fakes.wsgi_app( + fake_auth_context=self.user_context)) + self.assertEqual(res.status_int, 200) + res_dict = json.loads(res.body) + + usage = res_dict['tenant_usage'] + servers = usage['server_usages'] + self.assertEqual(len(usage['server_usages']), SERVERS) + for j in xrange(SERVERS): + self.assertEqual(int(servers[j]['hours']), HOURS) + + def test_verify_show_cant_view_other_tenant(self): + req = webob.Request.blank( + '/v1.1/faketenant_1/os-simple-tenant-usage/' + 'faketenant_0?start=%s&end=%s' % + (START.isoformat(), STOP.isoformat())) + req.method = "GET" + req.headers["content-type"] = "application/json" + + res = req.get_response(fakes.wsgi_app( + fake_auth_context=self.alt_user_context)) + self.assertEqual(res.status_int, 403) diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py index a095dd90a..44681d395 100644 --- a/nova/tests/api/openstack/fakes.py +++ b/nova/tests/api/openstack/fakes.py @@ -107,13 +107,20 @@ def stub_out_key_pair_funcs(stubs, have_key_pair=True): def key_pair(context, user_id): return [dict(name='key', public_key='public_key')] + def one_key_pair(context, user_id, name): + if name == 'key': + return dict(name='key', public_key='public_key') + else: + raise exc.KeypairNotFound(user_id=user_id, name=name) + def no_key_pair(context, user_id): return [] if have_key_pair: - stubs.Set(nova.db, 'key_pair_get_all_by_user', key_pair) + stubs.Set(nova.db.api, 'key_pair_get_all_by_user', key_pair) + stubs.Set(nova.db.api, 'key_pair_get', one_key_pair) else: - stubs.Set(nova.db, 'key_pair_get_all_by_user', no_key_pair) + stubs.Set(nova.db.api, 'key_pair_get_all_by_user', no_key_pair) def stub_out_image_service(stubs): diff --git a/nova/tests/api/openstack/test_extensions.py b/nova/tests/api/openstack/test_extensions.py index 05267d8fb..31443242b 100644 --- a/nova/tests/api/openstack/test_extensions.py +++ b/nova/tests/api/openstack/test_extensions.py @@ -95,6 +95,7 @@ class ExtensionControllerTest(test.TestCase): "Quotas", "Rescue", "SecurityGroups", + "SimpleTenantUsage", "VSAs", "VirtualInterfaces", "Volumes", diff --git a/nova/tests/api/openstack/test_server_actions.py 
b/nova/tests/api/openstack/test_server_actions.py index 3dfdeb79c..b9ef41465 100644 --- a/nova/tests/api/openstack/test_server_actions.py +++ b/nova/tests/api/openstack/test_server_actions.py @@ -10,8 +10,8 @@ from nova import utils from nova import exception from nova import flags from nova.api.openstack import create_instance_helper +from nova.compute import vm_states from nova.compute import instance_types -from nova.compute import power_state import nova.db.api from nova import test from nova.tests.api.openstack import common @@ -35,17 +35,19 @@ def return_server_with_attributes(**kwargs): return _return_server -def return_server_with_power_state(power_state): - return return_server_with_attributes(power_state=power_state) +def return_server_with_state(vm_state, task_state=None): + return return_server_with_attributes(vm_state=vm_state, + task_state=task_state) -def return_server_with_uuid_and_power_state(power_state): - return return_server_with_power_state(power_state) - +def return_server_with_uuid_and_state(vm_state, task_state=None): + def _return_server(context, id): + return return_server_with_state(vm_state, task_state) + return _return_server -def stub_instance(id, power_state=0, metadata=None, - image_ref="10", flavor_id="1", name=None): +def stub_instance(id, metadata=None, image_ref="10", flavor_id="1", + name=None, vm_state=None, task_state=None): if metadata is not None: metadata_items = [{'key':k, 'value':v} for k, v in metadata.items()] else: @@ -66,8 +68,8 @@ def stub_instance(id, power_state=0, metadata=None, "launch_index": 0, "key_name": "", "key_data": "", - "state": power_state, - "state_description": "", + "vm_state": vm_state or vm_states.ACTIVE, + "task_state": task_state, "memory_mb": 0, "vcpus": 0, "local_gb": 0, @@ -175,11 +177,11 @@ class ServerActionsTest(test.TestCase): }, } - state = power_state.BUILDING - new_return_server = return_server_with_power_state(state) + state = vm_states.BUILDING + new_return_server = return_server_with_state(state) self.stubs.Set(nova.db.api, 'instance_get', new_return_server) self.stubs.Set(nova.db, 'instance_get_by_uuid', - return_server_with_uuid_and_power_state(state)) + return_server_with_uuid_and_state(state)) req = webob.Request.blank('/v1.0/servers/1/action') req.method = 'POST' @@ -242,19 +244,6 @@ class ServerActionsTest(test.TestCase): res = req.get_response(fakes.wsgi_app()) self.assertEqual(res.status_int, 500) - def test_resized_server_has_correct_status(self): - req = self.webreq('/1', 'GET') - - def fake_migration_get(*args): - return {} - - self.stubs.Set(nova.db, 'migration_get_by_instance_and_status', - fake_migration_get) - res = req.get_response(fakes.wsgi_app()) - self.assertEqual(res.status_int, 200) - body = json.loads(res.body) - self.assertEqual(body['server']['status'], 'RESIZE-CONFIRM') - def test_confirm_resize_server(self): req = self.webreq('/1/action', 'POST', dict(confirmResize=None)) @@ -642,11 +631,11 @@ class ServerActionsTestV11(test.TestCase): }, } - state = power_state.BUILDING - new_return_server = return_server_with_power_state(state) + state = vm_states.BUILDING + new_return_server = return_server_with_state(state) self.stubs.Set(nova.db.api, 'instance_get', new_return_server) self.stubs.Set(nova.db, 'instance_get_by_uuid', - return_server_with_uuid_and_power_state(state)) + return_server_with_uuid_and_state(state)) req = webob.Request.blank('/v1.1/fake/servers/1/action') req.method = 'POST' diff --git a/nova/tests/api/openstack/test_servers.py 
b/nova/tests/api/openstack/test_servers.py index 3559e6de5..2ef687709 100644 --- a/nova/tests/api/openstack/test_servers.py +++ b/nova/tests/api/openstack/test_servers.py @@ -37,7 +37,8 @@ from nova.api.openstack import wsgi from nova.api.openstack import xmlutil import nova.compute.api from nova.compute import instance_types -from nova.compute import power_state +from nova.compute import task_states +from nova.compute import vm_states import nova.db.api import nova.scheduler.api from nova.db.sqlalchemy.models import Instance @@ -91,15 +92,18 @@ def return_server_with_addresses(private, public): return _return_server -def return_server_with_power_state(power_state): +def return_server_with_state(vm_state, task_state=None): def _return_server(context, id): - return stub_instance(id, power_state=power_state) + return stub_instance(id, vm_state=vm_state, task_state=task_state) return _return_server -def return_server_with_uuid_and_power_state(power_state): +def return_server_with_uuid_and_state(vm_state, task_state): def _return_server(context, id): - return stub_instance(id, uuid=FAKE_UUID, power_state=power_state) + return stub_instance(id, + uuid=FAKE_UUID, + vm_state=vm_state, + task_state=task_state) return _return_server @@ -148,9 +152,10 @@ def instance_addresses(context, instance_id): def stub_instance(id, user_id='fake', project_id='fake', private_address=None, - public_addresses=None, host=None, power_state=0, + public_addresses=None, host=None, + vm_state=None, task_state=None, reservation_id="", uuid=FAKE_UUID, image_ref="10", - flavor_id="1", interfaces=None, name=None, + flavor_id="1", interfaces=None, name=None, key_name='', access_ipv4=None, access_ipv6=None): metadata = [] metadata.append(InstanceMetadata(key='seq', value=id)) @@ -166,6 +171,11 @@ def stub_instance(id, user_id='fake', project_id='fake', private_address=None, if host is not None: host = str(host) + if key_name: + key_data = 'FAKE' + else: + key_data = '' + # ReservationID isn't sent back, hack it in there. 
server_name = name or "server%s" % id if reservation_id != "": @@ -182,10 +192,10 @@ def stub_instance(id, user_id='fake', project_id='fake', private_address=None, "kernel_id": "", "ramdisk_id": "", "launch_index": 0, - "key_name": "", - "key_data": "", - "state": power_state, - "state_description": "", + "key_name": key_name, + "key_data": key_data, + "vm_state": vm_state or vm_states.BUILDING, + "task_state": task_state, "memory_mb": 0, "vcpus": 0, "local_gb": 0, @@ -337,6 +347,8 @@ class ServersTest(test.TestCase): "server": { "id": 1, "uuid": FAKE_UUID, + "user_id": "fake", + "tenant_id": "fake", "updated": "2010-11-11T11:00:00Z", "created": "2010-10-10T12:00:00Z", "progress": 0, @@ -345,6 +357,7 @@ class ServersTest(test.TestCase): "accessIPv4": "", "accessIPv6": "", "hostId": '', + "key_name": '', "image": { "id": "10", "links": [ @@ -435,6 +448,8 @@ class ServersTest(test.TestCase): expected = minidom.parseString(""" <server id="1" uuid="%(expected_uuid)s" + userId="fake" + tenantId="fake" xmlns="http://docs.openstack.org/compute/api/v1.1" xmlns:atom="http://www.w3.org/2005/Atom" name="server1" @@ -494,7 +509,7 @@ class ServersTest(test.TestCase): }, ] new_return_server = return_server_with_attributes( - interfaces=interfaces, power_state=1) + interfaces=interfaces, vm_state=vm_states.ACTIVE) self.stubs.Set(nova.db.api, 'instance_get', new_return_server) req = webob.Request.blank('/v1.1/fake/servers/1') @@ -504,6 +519,8 @@ class ServersTest(test.TestCase): "server": { "id": 1, "uuid": FAKE_UUID, + "user_id": "fake", + "tenant_id": "fake", "updated": "2010-11-11T11:00:00Z", "created": "2010-10-10T12:00:00Z", "progress": 100, @@ -512,6 +529,7 @@ class ServersTest(test.TestCase): "accessIPv4": "", "accessIPv6": "", "hostId": '', + "key_name": '', "image": { "id": "10", "links": [ @@ -587,8 +605,8 @@ class ServersTest(test.TestCase): }, ] new_return_server = return_server_with_attributes( - interfaces=interfaces, power_state=1, image_ref=image_ref, - flavor_id=flavor_id) + interfaces=interfaces, vm_state=vm_states.ACTIVE, + image_ref=image_ref, flavor_id=flavor_id) self.stubs.Set(nova.db.api, 'instance_get', new_return_server) req = webob.Request.blank('/v1.1/fake/servers/1') @@ -598,6 +616,8 @@ class ServersTest(test.TestCase): "server": { "id": 1, "uuid": FAKE_UUID, + "user_id": "fake", + "tenant_id": "fake", "updated": "2010-11-11T11:00:00Z", "created": "2010-10-10T12:00:00Z", "progress": 100, @@ -606,6 +626,7 @@ class ServersTest(test.TestCase): "accessIPv4": "", "accessIPv6": "", "hostId": '', + "key_name": '', "image": { "id": "10", "links": [ @@ -1186,6 +1207,26 @@ class ServersTest(test.TestCase): self.assertEqual(len(servers), 1) self.assertEqual(servers[0]['id'], 100) + def test_tenant_id_filter_converts_to_project_id_for_admin(self): + def fake_get_all(context, filters=None): + self.assertNotEqual(filters, None) + self.assertEqual(filters['project_id'], 'faketenant') + self.assertFalse(filters.get('tenant_id')) + return [stub_instance(100)] + + self.stubs.Set(nova.db.api, 'instance_get_all_by_filters', + fake_get_all) + self.flags(allow_admin_api=True) + + req = webob.Request.blank('/v1.1/fake/servers?tenant_id=faketenant') + # Use admin context + context = nova.context.RequestContext('testuser', 'testproject', + is_admin=True) + res = req.get_response(fakes.wsgi_app(fake_auth_context=context)) + res_dict = json.loads(res.body) + # Failure in fake_get_all returns non 200 status code + self.assertEqual(res.status_int, 200) + def test_get_servers_allows_flavor_v1_1(self): def 
fake_get_all(compute_self, context, search_opts=None): self.assertNotEqual(search_opts, None) @@ -1209,9 +1250,8 @@ class ServersTest(test.TestCase): def test_get_servers_allows_status_v1_1(self): def fake_get_all(compute_self, context, search_opts=None): self.assertNotEqual(search_opts, None) - self.assertTrue('state' in search_opts) - self.assertEqual(set(search_opts['state']), - set([power_state.RUNNING, power_state.BLOCKED])) + self.assertTrue('vm_state' in search_opts) + self.assertEqual(search_opts['vm_state'], vm_states.ACTIVE) return [stub_instance(100)] self.stubs.Set(nova.compute.API, 'get_all', fake_get_all) @@ -1228,13 +1268,9 @@ class ServersTest(test.TestCase): def test_get_servers_invalid_status_v1_1(self): """Test getting servers by invalid status""" - self.flags(allow_admin_api=False) - req = webob.Request.blank('/v1.1/fake/servers?status=running') res = req.get_response(fakes.wsgi_app()) - # The following assert will fail if either of the asserts in - # fake_get_all() fail self.assertEqual(res.status_int, 400) self.assertTrue(res.body.find('Invalid server status') > -1) @@ -1257,6 +1293,31 @@ class ServersTest(test.TestCase): self.assertEqual(len(servers), 1) self.assertEqual(servers[0]['id'], 100) + def test_get_servers_allows_changes_since_v1_1(self): + def fake_get_all(compute_self, context, search_opts=None): + self.assertNotEqual(search_opts, None) + self.assertTrue('changes-since' in search_opts) + changes_since = datetime.datetime(2011, 1, 24, 17, 8, 1) + self.assertEqual(search_opts['changes-since'], changes_since) + self.assertTrue('deleted' not in search_opts) + return [stub_instance(100)] + + self.stubs.Set(nova.compute.API, 'get_all', fake_get_all) + + params = 'changes-since=2011-01-24T17:08:01Z' + req = webob.Request.blank('/v1.1/fake/servers?%s' % params) + res = req.get_response(fakes.wsgi_app()) + self.assertEqual(res.status_int, 200) + servers = json.loads(res.body)['servers'] + self.assertEqual(len(servers), 1) + self.assertEqual(servers[0]['id'], 100) + + def test_get_servers_allows_changes_since_bad_value_v1_1(self): + params = 'changes-since=asdf' + req = webob.Request.blank('/v1.1/fake/servers?%s' % params) + res = req.get_response(fakes.wsgi_app()) + self.assertEqual(res.status_int, 400) + def test_get_servers_unknown_or_admin_options1(self): """Test getting servers by admin-only or unknown options. This tests when admin_api is off. 
Make sure the admin and @@ -1422,6 +1483,8 @@ class ServersTest(test.TestCase): 'access_ip_v4': '1.2.3.4', 'access_ip_v6': 'fead::1234', 'image_ref': image_ref, + 'user_id': 'fake', + 'project_id': 'fake', "created_at": datetime.datetime(2010, 10, 10, 12, 0, 0), "updated_at": datetime.datetime(2010, 11, 11, 11, 0, 0), "config_drive": self.config_drive, @@ -1738,6 +1801,7 @@ class ServersTest(test.TestCase): server = json.loads(res.body)['server'] self.assertEqual(16, len(server['adminPass'])) self.assertEqual(1, server['id']) + self.assertEqual("BUILD", server["status"]) self.assertEqual(0, server['progress']) self.assertEqual('server_test', server['name']) self.assertEqual(expected_flavor, server['flavor']) @@ -1745,6 +1809,36 @@ class ServersTest(test.TestCase): self.assertEqual('1.2.3.4', server['accessIPv4']) self.assertEqual('fead::1234', server['accessIPv6']) + def test_create_instance_v1_1_invalid_key_name(self): + self._setup_for_create_instance() + + image_href = 'http://localhost/v1.1/images/2' + flavor_ref = 'http://localhost/flavors/3' + body = dict(server=dict( + name='server_test', imageRef=image_href, flavorRef=flavor_ref, + key_name='nonexistentkey')) + req = webob.Request.blank('/v1.1/fake/servers') + req.method = 'POST' + req.body = json.dumps(body) + req.headers["content-type"] = "application/json" + res = req.get_response(fakes.wsgi_app()) + self.assertEqual(res.status_int, 400) + + def test_create_instance_v1_1_valid_key_name(self): + self._setup_for_create_instance() + + image_href = 'http://localhost/v1.1/images/2' + flavor_ref = 'http://localhost/flavors/3' + body = dict(server=dict( + name='server_test', imageRef=image_href, flavorRef=flavor_ref, + key_name='key')) + req = webob.Request.blank('/v1.1/fake/servers') + req.method = 'POST' + req.body = json.dumps(body) + req.headers["content-type"] = "application/json" + res = req.get_response(fakes.wsgi_app()) + self.assertEqual(res.status_int, 202) + def test_create_instance_v1_1_invalid_flavor_href(self): self._setup_for_create_instance() @@ -2467,23 +2561,51 @@ class ServersTest(test.TestCase): self.assertEqual(res.status_int, 204) self.assertEqual(self.server_delete_called, True) - def test_shutdown_status(self): - new_server = return_server_with_power_state(power_state.SHUTDOWN) - self.stubs.Set(nova.db.api, 'instance_get', new_server) - req = webob.Request.blank('/v1.0/servers/1') - res = req.get_response(fakes.wsgi_app()) - self.assertEqual(res.status_int, 200) - res_dict = json.loads(res.body) - self.assertEqual(res_dict['server']['status'], 'SHUTDOWN') - def test_shutoff_status(self): - new_server = return_server_with_power_state(power_state.SHUTOFF) +class TestServerStatus(test.TestCase): + + def _get_with_state(self, vm_state, task_state=None): + new_server = return_server_with_state(vm_state, task_state) self.stubs.Set(nova.db.api, 'instance_get', new_server) - req = webob.Request.blank('/v1.0/servers/1') - res = req.get_response(fakes.wsgi_app()) - self.assertEqual(res.status_int, 200) - res_dict = json.loads(res.body) - self.assertEqual(res_dict['server']['status'], 'SHUTOFF') + request = webob.Request.blank('/v1.0/servers/1') + response = request.get_response(fakes.wsgi_app()) + self.assertEqual(response.status_int, 200) + return json.loads(response.body) + + def test_active(self): + response = self._get_with_state(vm_states.ACTIVE) + self.assertEqual(response['server']['status'], 'ACTIVE') + + def test_reboot(self): + response = self._get_with_state(vm_states.ACTIVE, + task_states.REBOOTING) + 
self.assertEqual(response['server']['status'], 'REBOOT') + + def test_rebuild(self): + response = self._get_with_state(vm_states.REBUILDING) + self.assertEqual(response['server']['status'], 'REBUILD') + + def test_rebuild_error(self): + response = self._get_with_state(vm_states.ERROR) + self.assertEqual(response['server']['status'], 'ERROR') + + def test_resize(self): + response = self._get_with_state(vm_states.RESIZING) + self.assertEqual(response['server']['status'], 'RESIZE') + + def test_verify_resize(self): + response = self._get_with_state(vm_states.ACTIVE, + task_states.RESIZE_VERIFY) + self.assertEqual(response['server']['status'], 'VERIFY_RESIZE') + + def test_password_update(self): + response = self._get_with_state(vm_states.ACTIVE, + task_states.UPDATING_PASSWORD) + self.assertEqual(response['server']['status'], 'PASSWORD') + + def test_stopped(self): + response = self._get_with_state(vm_states.STOPPED) + self.assertEqual(response['server']['status'], 'STOPPED') class TestServerCreateRequestXMLDeserializerV10(unittest.TestCase): @@ -3011,7 +3133,7 @@ class TestServerCreateRequestXMLDeserializerV11(test.TestCase): "name": "new-server-test", "imageRef": "1", "flavorRef": "1", - "networks": [] + "networks": [], }} self.assertEquals(request['body'], expected) @@ -3229,6 +3351,7 @@ class TestServerInstanceCreation(test.TestCase): def __init__(self): self.injected_files = None self.networks = None + self.db = db def create(self, *args, **kwargs): if 'injected_files' in kwargs: @@ -3237,6 +3360,8 @@ class TestServerInstanceCreation(test.TestCase): self.injected_files = None return [{'id': '1234', 'display_name': 'fakeinstance', + 'user_id': 'fake', + 'project_id': 'fake', 'uuid': FAKE_UUID}] def set_admin_password(self, *args, **kwargs): @@ -3490,10 +3615,14 @@ class TestGetKernelRamdiskFromImage(test.TestCase): self.assertRaises(exception.NotFound, self._get_k_r, image_meta) def test_ami_no_ramdisk(self): - """If an ami is missing a ramdisk it should raise NotFound""" + """If an ami is missing a ramdisk, return kernel ID and None for + ramdisk ID + """ image_meta = {'id': 1, 'status': 'active', 'container_format': 'ami', 'properties': {'kernel_id': 1}} - self.assertRaises(exception.NotFound, self._get_k_r, image_meta) + kernel_id, ramdisk_id = self._get_k_r(image_meta) + self.assertEqual(kernel_id, 1) + self.assertEqual(ramdisk_id, None) def test_ami_kernel_ramdisk_present(self): """Return IDs if both kernel and ramdisk are present""" @@ -3528,16 +3657,16 @@ class ServersViewBuilderV11Test(test.TestCase): "created_at": created_at, "updated_at": updated_at, "admin_pass": "", - "user_id": "", - "project_id": "", + "user_id": "fake", + "project_id": "fake", "image_ref": "5", "kernel_id": "", "ramdisk_id": "", "launch_index": 0, "key_name": "", "key_data": "", - "state": 0, - "state_description": "", + "vm_state": vm_states.BUILDING, + "task_state": None, "memory_mb": 0, "vcpus": 0, "local_gb": 0, @@ -3554,7 +3683,6 @@ class ServersViewBuilderV11Test(test.TestCase): "terminated_at": utils.utcnow(), "availability_zone": "", "display_name": "test_server", - "display_description": "", "locked": False, "metadata": [], "accessIPv4": "1.2.3.4", @@ -3587,6 +3715,7 @@ class ServersViewBuilderV11Test(test.TestCase): "id": 1, "uuid": self.instance['uuid'], "name": "test_server", + "key_name": '', "links": [ { "rel": "self", @@ -3610,6 +3739,7 @@ class ServersViewBuilderV11Test(test.TestCase): "id": 1, "uuid": self.instance['uuid'], "name": "test_server", + "key_name": '', "config_drive": None, 
"links": [ { @@ -3635,6 +3765,8 @@ class ServersViewBuilderV11Test(test.TestCase): "server": { "id": 1, "uuid": self.instance['uuid'], + "user_id": "fake", + "tenant_id": "fake", "updated": "2010-11-11T11:00:00Z", "created": "2010-10-10T12:00:00Z", "progress": 0, @@ -3643,6 +3775,7 @@ class ServersViewBuilderV11Test(test.TestCase): "accessIPv4": "", "accessIPv6": "", "hostId": '', + "key_name": '', "image": { "id": "5", "links": [ @@ -3682,13 +3815,15 @@ class ServersViewBuilderV11Test(test.TestCase): def test_build_server_detail_active_status(self): #set the power state of the instance to running - self.instance['state'] = 1 + self.instance['vm_state'] = vm_states.ACTIVE image_bookmark = "http://localhost/images/5" flavor_bookmark = "http://localhost/flavors/1" expected_server = { "server": { "id": 1, "uuid": self.instance['uuid'], + "user_id": "fake", + "tenant_id": "fake", "updated": "2010-11-11T11:00:00Z", "created": "2010-10-10T12:00:00Z", "progress": 100, @@ -3697,6 +3832,7 @@ class ServersViewBuilderV11Test(test.TestCase): "accessIPv4": "", "accessIPv6": "", "hostId": '', + "key_name": '', "image": { "id": "5", "links": [ @@ -3744,10 +3880,13 @@ class ServersViewBuilderV11Test(test.TestCase): "server": { "id": 1, "uuid": self.instance['uuid'], + "user_id": "fake", + "tenant_id": "fake", "updated": "2010-11-11T11:00:00Z", "created": "2010-10-10T12:00:00Z", "progress": 0, "name": "test_server", + "key_name": "", "status": "BUILD", "hostId": '', "image": { @@ -3799,10 +3938,13 @@ class ServersViewBuilderV11Test(test.TestCase): "server": { "id": 1, "uuid": self.instance['uuid'], + "user_id": "fake", + "tenant_id": "fake", "updated": "2010-11-11T11:00:00Z", "created": "2010-10-10T12:00:00Z", "progress": 0, "name": "test_server", + "key_name": "", "status": "BUILD", "hostId": '', "image": { @@ -3857,6 +3999,8 @@ class ServersViewBuilderV11Test(test.TestCase): "server": { "id": 1, "uuid": self.instance['uuid'], + "user_id": "fake", + "tenant_id": "fake", "updated": "2010-11-11T11:00:00Z", "created": "2010-10-10T12:00:00Z", "progress": 0, @@ -3865,6 +4009,7 @@ class ServersViewBuilderV11Test(test.TestCase): "accessIPv4": "", "accessIPv6": "", "hostId": '', + "key_name": '', "image": { "id": "5", "links": [ @@ -3924,6 +4069,8 @@ class ServerXMLSerializationTest(test.TestCase): fixture = { "server": { "id": 1, + "user_id": "fake", + "tenant_id": "fake", "uuid": FAKE_UUID, 'created': self.TIMESTAMP, 'updated': self.TIMESTAMP, @@ -3931,6 +4078,7 @@ class ServerXMLSerializationTest(test.TestCase): "name": "test_server", "status": "BUILD", "hostId": 'e4d909c290d0fb1ca068ffaddf22cbd0', + "key_name": '', "accessIPv4": "1.2.3.4", "accessIPv6": "fead::1234", "image": { @@ -4060,6 +4208,8 @@ class ServerXMLSerializationTest(test.TestCase): "server": { "id": 1, "uuid": FAKE_UUID, + "user_id": "fake", + "tenant_id": "fake", 'created': self.TIMESTAMP, 'updated': self.TIMESTAMP, "progress": 0, @@ -4260,6 +4410,8 @@ class ServerXMLSerializationTest(test.TestCase): { "id": 1, "uuid": FAKE_UUID, + "user_id": "fake", + "tenant_id": "fake", 'created': self.TIMESTAMP, 'updated': self.TIMESTAMP, "progress": 0, @@ -4315,6 +4467,8 @@ class ServerXMLSerializationTest(test.TestCase): { "id": 2, "uuid": FAKE_UUID, + "user_id": 'fake', + "tenant_id": 'fake', 'created': self.TIMESTAMP, 'updated': self.TIMESTAMP, "progress": 100, @@ -4434,6 +4588,8 @@ class ServerXMLSerializationTest(test.TestCase): fixture = { "server": { "id": 1, + "user_id": "fake", + "tenant_id": "fake", "uuid": FAKE_UUID, 'created': self.TIMESTAMP, 
'updated': self.TIMESTAMP, @@ -4570,6 +4726,8 @@ class ServerXMLSerializationTest(test.TestCase): "server": { "id": 1, "uuid": FAKE_UUID, + "user_id": "fake", + "tenant_id": "fake", 'created': self.TIMESTAMP, 'updated': self.TIMESTAMP, "progress": 0, diff --git a/nova/tests/image/test_glance.py b/nova/tests/image/test_glance.py index 0ff508ffa..b1ebd8436 100644 --- a/nova/tests/image/test_glance.py +++ b/nova/tests/image/test_glance.py @@ -20,6 +20,7 @@ import datetime import unittest from nova import context +from nova import exception from nova import test from nova.image import glance @@ -38,7 +39,16 @@ class StubGlanceClient(object): return self.images[image_id] def get_images_detailed(self, filters=None, marker=None, limit=None): - return self.images.itervalues() + images = self.images.values() + if marker is None: + index = 0 + else: + for index, image in enumerate(images): + if image['id'] == marker: + index += 1 + break + # default to a page size of 3 to ensure we flex the pagination code + return images[index:index + 3] def get_image(self, image_id): return self.images[image_id], [] @@ -86,23 +96,48 @@ class TestGlanceImageServiceProperties(BaseGlanceTest): """Ensure attributes which aren't BASE_IMAGE_ATTRS are stored in the properties dict """ - fixtures = {'image1': {'name': 'image1', 'is_public': True, + fixtures = {'image1': {'id': '1', 'name': 'image1', 'is_public': True, 'foo': 'bar', 'properties': {'prop1': 'propvalue1'}}} self.client.images = fixtures image_meta = self.service.show(self.context, 'image1') + expected = {'id': '1', 'name': 'image1', 'is_public': True, + 'properties': {'prop1': 'propvalue1', 'foo': 'bar'}} + self.assertEqual(image_meta, expected) + + def test_show_raises_when_no_authtoken_in_the_context(self): + fixtures = {'image1': {'name': 'image1', 'is_public': False, + 'foo': 'bar', + 'properties': {'prop1': 'propvalue1'}}} + self.client.images = fixtures + self.context.auth_token = False + expected = {'name': 'image1', 'is_public': True, 'properties': {'prop1': 'propvalue1', 'foo': 'bar'}} + self.assertRaises(exception.ImageNotFound, + self.service.show, self.context, 'image1') + + def test_show_passes_through_to_client_with_authtoken_in_context(self): + fixtures = {'image1': {'name': 'image1', 'is_public': False, + 'foo': 'bar', + 'properties': {'prop1': 'propvalue1'}}} + self.client.images = fixtures + self.context.auth_token = True + + expected = {'name': 'image1', 'is_public': False, + 'properties': {'prop1': 'propvalue1', 'foo': 'bar'}} + + image_meta = self.service.show(self.context, 'image1') self.assertEqual(image_meta, expected) def test_detail_passes_through_to_client(self): - fixtures = {'image1': {'name': 'image1', 'is_public': True, + fixtures = {'image1': {'id': '1', 'name': 'image1', 'is_public': True, 'foo': 'bar', 'properties': {'prop1': 'propvalue1'}}} self.client.images = fixtures image_meta = self.service.detail(self.context) - expected = [{'name': 'image1', 'is_public': True, + expected = [{'id': '1', 'name': 'image1', 'is_public': True, 'properties': {'prop1': 'propvalue1', 'foo': 'bar'}}] self.assertEqual(image_meta, expected) @@ -166,6 +201,7 @@ class TestGetterDateTimeNoneTests(BaseGlanceTest): def _make_datetime_fixtures(self): fixtures = { 'image1': { + 'id': '1', 'name': 'image1', 'is_public': True, 'created_at': self.NOW_GLANCE_FORMAT, @@ -173,6 +209,7 @@ class TestGetterDateTimeNoneTests(BaseGlanceTest): 'deleted_at': self.NOW_GLANCE_FORMAT, }, 'image2': { + 'id': '2', 'name': 'image2', 'is_public': True, 'created_at': 
self.NOW_GLANCE_OLD_FORMAT, @@ -183,13 +220,17 @@ class TestGetterDateTimeNoneTests(BaseGlanceTest): return fixtures def _make_none_datetime_fixtures(self): - fixtures = {'image1': {'name': 'image1', 'is_public': True, + fixtures = {'image1': {'id': '1', + 'name': 'image1', + 'is_public': True, 'updated_at': None, 'deleted_at': None}} return fixtures def _make_blank_datetime_fixtures(self): - fixtures = {'image1': {'name': 'image1', 'is_public': True, + fixtures = {'image1': {'id': '1', + 'name': 'image1', + 'is_public': True, 'updated_at': '', 'deleted_at': ''}} return fixtures diff --git a/nova/tests/integrated/test_servers.py b/nova/tests/integrated/test_servers.py index b9382038a..2cf604d06 100644 --- a/nova/tests/integrated/test_servers.py +++ b/nova/tests/integrated/test_servers.py @@ -28,6 +28,17 @@ LOG = logging.getLogger('nova.tests.integrated') class ServersTest(integrated_helpers._IntegratedTestBase): + def _wait_for_creation(self, server): + retries = 0 + while server['status'] == 'BUILD': + time.sleep(1) + server = self.api.get_server(server['id']) + print server + retries = retries + 1 + if retries > 5: + break + return server + def test_get_servers(self): """Simple check that listing servers works.""" servers = self.api.get_servers() @@ -36,9 +47,9 @@ class ServersTest(integrated_helpers._IntegratedTestBase): def test_create_and_delete_server(self): """Creates and deletes a server.""" + self.flags(stub_network=True) # Create server - # Build the server data gradually, checking errors along the way server = {} good_server = self._build_minimal_create_server_request() @@ -91,19 +102,11 @@ class ServersTest(integrated_helpers._IntegratedTestBase): server_ids = [server['id'] for server in servers] self.assertTrue(created_server_id in server_ids) - # Wait (briefly) for creation - retries = 0 - while found_server['status'] == 'build': - LOG.debug("found server: %s" % found_server) - time.sleep(1) - found_server = self.api.get_server(created_server_id) - retries = retries + 1 - if retries > 5: - break + found_server = self._wait_for_creation(found_server) # It should be available... # TODO(justinsb): Mock doesn't yet do this... 
- #self.assertEqual('available', found_server['status']) + self.assertEqual('ACTIVE', found_server['status']) servers = self.api.get_servers(detail=True) for server in servers: self.assertTrue("image" in server) @@ -181,6 +184,7 @@ class ServersTest(integrated_helpers._IntegratedTestBase): def test_create_and_rebuild_server(self): """Rebuild a server.""" + self.flags(stub_network=True) # create a server with initially has no metadata server = self._build_minimal_create_server_request() @@ -190,6 +194,8 @@ class ServersTest(integrated_helpers._IntegratedTestBase): self.assertTrue(created_server['id']) created_server_id = created_server['id'] + created_server = self._wait_for_creation(created_server) + # rebuild the server with metadata post = {} post['rebuild'] = { @@ -212,6 +218,7 @@ class ServersTest(integrated_helpers._IntegratedTestBase): def test_create_and_rebuild_server_with_metadata(self): """Rebuild a server with metadata.""" + self.flags(stub_network=True) # create a server with initially has no metadata server = self._build_minimal_create_server_request() @@ -221,6 +228,8 @@ class ServersTest(integrated_helpers._IntegratedTestBase): self.assertTrue(created_server['id']) created_server_id = created_server['id'] + created_server = self._wait_for_creation(created_server) + # rebuild the server with metadata post = {} post['rebuild'] = { @@ -248,6 +257,7 @@ class ServersTest(integrated_helpers._IntegratedTestBase): def test_create_and_rebuild_server_with_metadata_removal(self): """Rebuild a server with metadata.""" + self.flags(stub_network=True) # create a server with initially has no metadata server = self._build_minimal_create_server_request() @@ -264,6 +274,8 @@ class ServersTest(integrated_helpers._IntegratedTestBase): self.assertTrue(created_server['id']) created_server_id = created_server['id'] + created_server = self._wait_for_creation(created_server) + # rebuild the server with metadata post = {} post['rebuild'] = { diff --git a/nova/tests/scheduler/test_scheduler.py b/nova/tests/scheduler/test_scheduler.py index 158df2a27..a52dd041a 100644 --- a/nova/tests/scheduler/test_scheduler.py +++ b/nova/tests/scheduler/test_scheduler.py @@ -40,6 +40,7 @@ from nova.scheduler import driver from nova.scheduler import manager from nova.scheduler import multi from nova.compute import power_state +from nova.compute import vm_states FLAGS = flags.FLAGS @@ -94,6 +95,9 @@ class SchedulerTestCase(test.TestCase): inst['vcpus'] = kwargs.get('vcpus', 1) inst['memory_mb'] = kwargs.get('memory_mb', 10) inst['local_gb'] = kwargs.get('local_gb', 20) + inst['vm_state'] = kwargs.get('vm_state', vm_states.ACTIVE) + inst['power_state'] = kwargs.get('power_state', power_state.RUNNING) + inst['task_state'] = kwargs.get('task_state', None) return db.instance_create(ctxt, inst) def test_fallback(self): @@ -271,8 +275,9 @@ class SimpleDriverTestCase(test.TestCase): inst['memory_mb'] = kwargs.get('memory_mb', 20) inst['local_gb'] = kwargs.get('local_gb', 30) inst['launched_on'] = kwargs.get('launghed_on', 'dummy') - inst['state_description'] = kwargs.get('state_description', 'running') - inst['state'] = kwargs.get('state', power_state.RUNNING) + inst['vm_state'] = kwargs.get('vm_state', vm_states.ACTIVE) + inst['task_state'] = kwargs.get('task_state', None) + inst['power_state'] = kwargs.get('power_state', power_state.RUNNING) return db.instance_create(self.context, inst)['id'] def _create_volume(self): @@ -664,14 +669,14 @@ class SimpleDriverTestCase(test.TestCase): block_migration=False) i_ref = 
db.instance_get(self.context, instance_id) - self.assertTrue(i_ref['state_description'] == 'migrating') + self.assertTrue(i_ref['vm_state'] == vm_states.MIGRATING) db.instance_destroy(self.context, instance_id) db.volume_destroy(self.context, v_ref['id']) def test_live_migration_src_check_instance_not_running(self): """The instance given by instance_id is not running.""" - instance_id = self._create_instance(state_description='migrating') + instance_id = self._create_instance(power_state=power_state.NOSTATE) i_ref = db.instance_get(self.context, instance_id) try: diff --git a/nova/tests/test_adminapi.py b/nova/tests/test_adminapi.py index 06cc498ac..aaa633adc 100644 --- a/nova/tests/test_adminapi.py +++ b/nova/tests/test_adminapi.py @@ -38,8 +38,6 @@ class AdminApiTestCase(test.TestCase): super(AdminApiTestCase, self).setUp() self.flags(connection_type='fake') - self.conn = rpc.create_connection() - # set up our cloud self.api = admin.AdminController() diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py index 0793784f8..3fe6a9b42 100644 --- a/nova/tests/test_cloud.py +++ b/nova/tests/test_cloud.py @@ -38,6 +38,7 @@ from nova import test from nova import utils from nova.api.ec2 import cloud from nova.api.ec2 import ec2utils +from nova.compute import vm_states from nova.image import fake @@ -51,8 +52,6 @@ class CloudTestCase(test.TestCase): self.flags(connection_type='fake', stub_network=True) - self.conn = rpc.create_connection() - # set up our cloud self.cloud = cloud.CloudController() @@ -86,13 +85,6 @@ class CloudTestCase(test.TestCase): self.stubs.Set(rpc, 'cast', finish_cast) - def tearDown(self): - networks = db.project_get_networks(self.context, self.project_id, - associate=False) - for network in networks: - db.network_disassociate(self.context, network['id']) - super(CloudTestCase, self).tearDown() - def _create_key(self, name): # NOTE(vish): create depends on pool, so just call helper directly return cloud._gen_key(self.context, self.context.user_id, name) @@ -494,8 +486,11 @@ class CloudTestCase(test.TestCase): inst2 = db.instance_create(self.context, args2) db.instance_destroy(self.context, inst1.id) result = self.cloud.describe_instances(self.context) - result = result['reservationSet'][0]['instancesSet'] - self.assertEqual(result[0]['instanceId'], + result1 = result['reservationSet'][0]['instancesSet'] + self.assertEqual(result1[0]['instanceId'], + ec2utils.id_to_ec2_id(inst1.id)) + result2 = result['reservationSet'][1]['instancesSet'] + self.assertEqual(result2[0]['instanceId'], ec2utils.id_to_ec2_id(inst2.id)) def _block_device_mapping_create(self, instance_id, mappings): @@ -1163,7 +1158,7 @@ class CloudTestCase(test.TestCase): self.compute = self.start_service('compute') def _wait_for_state(self, ctxt, instance_id, predicate): - """Wait for an stopping instance to be a given state""" + """Wait for a stopped instance to be a given state""" id = ec2utils.ec2_id_to_id(instance_id) while True: info = self.cloud.compute_api.get(context=ctxt, instance_id=id) @@ -1174,12 +1169,16 @@ class CloudTestCase(test.TestCase): def _wait_for_running(self, instance_id): def is_running(info): - return info['state_description'] == 'running' + vm_state = info["vm_state"] + task_state = info["task_state"] + return vm_state == vm_states.ACTIVE and task_state == None self._wait_for_state(self.context, instance_id, is_running) def _wait_for_stopped(self, instance_id): def is_stopped(info): - return info['state_description'] == 'stopped' + vm_state = info["vm_state"] + task_state = 
info["task_state"] + return vm_state == vm_states.STOPPED and task_state == None self._wait_for_state(self.context, instance_id, is_stopped) def _wait_for_terminate(self, instance_id): @@ -1562,7 +1561,7 @@ class CloudTestCase(test.TestCase): 'id': 0, 'root_device_name': '/dev/sdh', 'security_groups': [{'name': 'fake0'}, {'name': 'fake1'}], - 'state_description': 'stopping', + 'vm_state': vm_states.STOPPED, 'instance_type': {'name': 'fake_type'}, 'kernel_id': 1, 'ramdisk_id': 2, @@ -1606,7 +1605,7 @@ class CloudTestCase(test.TestCase): self.assertEqual(groupSet, expected_groupSet) self.assertEqual(get_attribute('instanceInitiatedShutdownBehavior'), {'instance_id': 'i-12345678', - 'instanceInitiatedShutdownBehavior': 'stop'}) + 'instanceInitiatedShutdownBehavior': 'stopped'}) self.assertEqual(get_attribute('instanceType'), {'instance_id': 'i-12345678', 'instanceType': 'fake_type'}) diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py index 6659b81eb..65fdffbd6 100644 --- a/nova/tests/test_compute.py +++ b/nova/tests/test_compute.py @@ -24,6 +24,7 @@ from nova import compute from nova.compute import instance_types from nova.compute import manager as compute_manager from nova.compute import power_state +from nova.compute import vm_states from nova import context from nova import db from nova.db.sqlalchemy import models @@ -160,6 +161,19 @@ class ComputeTestCase(test.TestCase): db.security_group_destroy(self.context, group['id']) db.instance_destroy(self.context, ref[0]['id']) + def test_create_instance_with_invalid_security_group_raises(self): + instance_type = instance_types.get_default_instance_type() + + pre_build_len = len(db.instance_get_all(context.get_admin_context())) + self.assertRaises(exception.SecurityGroupNotFoundForProject, + self.compute_api.create, + self.context, + instance_type=instance_type, + image_href=None, + security_group=['this_is_a_fake_sec_group']) + self.assertEqual(pre_build_len, + len(db.instance_get_all(context.get_admin_context()))) + def test_create_instance_associates_config_drive(self): """Make sure create associates a config drive.""" @@ -763,8 +777,8 @@ class ComputeTestCase(test.TestCase): 'block_migration': False, 'disk': None}}).\ AndRaise(rpc.RemoteError('', '', '')) - dbmock.instance_update(c, i_ref['id'], {'state_description': 'running', - 'state': power_state.RUNNING, + dbmock.instance_update(c, i_ref['id'], {'vm_state': vm_states.ACTIVE, + 'task_state': None, 'host': i_ref['host']}) for v in i_ref['volumes']: dbmock.volume_update(c, v['id'], {'status': 'in-use'}) @@ -795,8 +809,8 @@ class ComputeTestCase(test.TestCase): 'block_migration': False, 'disk': None}}).\ AndRaise(rpc.RemoteError('', '', '')) - dbmock.instance_update(c, i_ref['id'], {'state_description': 'running', - 'state': power_state.RUNNING, + dbmock.instance_update(c, i_ref['id'], {'vm_state': vm_states.ACTIVE, + 'task_state': None, 'host': i_ref['host']}) self.compute.db = dbmock @@ -841,8 +855,8 @@ class ComputeTestCase(test.TestCase): c = context.get_admin_context() instance_id = self._create_instance() i_ref = db.instance_get(c, instance_id) - db.instance_update(c, i_ref['id'], {'state_description': 'migrating', - 'state': power_state.PAUSED}) + db.instance_update(c, i_ref['id'], {'vm_state': vm_states.MIGRATING, + 'power_state': power_state.PAUSED}) v_ref = db.volume_create(c, {'size': 1, 'instance_id': instance_id}) fix_addr = db.fixed_ip_create(c, {'address': '1.1.1.1', 'instance_id': instance_id}) @@ -903,7 +917,7 @@ class ComputeTestCase(test.TestCase): 
instances = db.instance_get_all(context.get_admin_context()) LOG.info(_("After force-killing instances: %s"), instances) self.assertEqual(len(instances), 1) - self.assertEqual(power_state.SHUTOFF, instances[0]['state']) + self.assertEqual(power_state.NOSTATE, instances[0]['power_state']) def test_get_all_by_name_regexp(self): """Test searching instances by name (display_name)""" @@ -1323,25 +1337,28 @@ class ComputeTestCase(test.TestCase): """Test searching instances by state""" c = context.get_admin_context() - instance_id1 = self._create_instance({'state': power_state.SHUTDOWN}) + instance_id1 = self._create_instance({ + 'power_state': power_state.SHUTDOWN, + }) instance_id2 = self._create_instance({ - 'id': 2, - 'state': power_state.RUNNING}) + 'id': 2, + 'power_state': power_state.RUNNING, + }) instance_id3 = self._create_instance({ - 'id': 10, - 'state': power_state.RUNNING}) - + 'id': 10, + 'power_state': power_state.RUNNING, + }) instances = self.compute_api.get_all(c, - search_opts={'state': power_state.SUSPENDED}) + search_opts={'power_state': power_state.SUSPENDED}) self.assertEqual(len(instances), 0) instances = self.compute_api.get_all(c, - search_opts={'state': power_state.SHUTDOWN}) + search_opts={'power_state': power_state.SHUTDOWN}) self.assertEqual(len(instances), 1) self.assertEqual(instances[0].id, instance_id1) instances = self.compute_api.get_all(c, - search_opts={'state': power_state.RUNNING}) + search_opts={'power_state': power_state.RUNNING}) self.assertEqual(len(instances), 2) instance_ids = [instance.id for instance in instances] self.assertTrue(instance_id2 in instance_ids) @@ -1349,7 +1366,7 @@ class ComputeTestCase(test.TestCase): # Test passing a list as search arg instances = self.compute_api.get_all(c, - search_opts={'state': [power_state.SHUTDOWN, + search_opts={'power_state': [power_state.SHUTDOWN, power_state.RUNNING]}) self.assertEqual(len(instances), 3) diff --git a/nova/tests/test_context.py b/nova/tests/test_context.py new file mode 100644 index 000000000..b2507fa59 --- /dev/null +++ b/nova/tests/test_context.py @@ -0,0 +1,33 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from nova import context +from nova import test + + +class ContextTestCase(test.TestCase): + + def test_request_context_sets_is_admin(self): + ctxt = context.RequestContext('111', + '222', + roles=['admin', 'weasel']) + self.assertEquals(ctxt.is_admin, True) + + def test_request_context_sets_is_admin_upcase(self): + ctxt = context.RequestContext('111', + '222', + roles=['Admin', 'weasel']) + self.assertEquals(ctxt.is_admin, True) diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py index 038c07f40..60d7abd8c 100644 --- a/nova/tests/test_db_api.py +++ b/nova/tests/test_db_api.py @@ -91,5 +91,7 @@ class DbApiTestCase(test.TestCase): inst2 = db.instance_create(self.context, args2) db.instance_destroy(self.context, inst1.id) result = db.instance_get_all_by_filters(self.context.elevated(), {}) - self.assertEqual(1, len(result)) + self.assertEqual(2, len(result)) self.assertEqual(result[0].id, inst2.id) + self.assertEqual(result[1].id, inst1.id) + self.assertTrue(result[1].deleted) diff --git a/nova/tests/test_libvirt.py b/nova/tests/test_libvirt.py index 6a213b4f0..8c6775b29 100644 --- a/nova/tests/test_libvirt.py +++ b/nova/tests/test_libvirt.py @@ -34,6 +34,7 @@ from nova import test from nova import utils from nova.api.ec2 import cloud from nova.compute import power_state +from nova.compute import vm_states from nova.virt.libvirt import connection from nova.virt.libvirt import firewall @@ -674,8 +675,9 @@ class LibvirtConnTestCase(test.TestCase): # Preparing data self.compute = utils.import_object(FLAGS.compute_manager) - instance_dict = {'host': 'fake', 'state': power_state.RUNNING, - 'state_description': 'running'} + instance_dict = {'host': 'fake', + 'power_state': power_state.RUNNING, + 'vm_state': vm_states.ACTIVE} instance_ref = db.instance_create(self.context, self.test_instance) instance_ref = db.instance_update(self.context, instance_ref['id'], instance_dict) @@ -713,8 +715,8 @@ class LibvirtConnTestCase(test.TestCase): self.compute.rollback_live_migration) instance_ref = db.instance_get(self.context, instance_ref['id']) - self.assertTrue(instance_ref['state_description'] == 'running') - self.assertTrue(instance_ref['state'] == power_state.RUNNING) + self.assertTrue(instance_ref['vm_state'] == vm_states.ACTIVE) + self.assertTrue(instance_ref['power_state'] == power_state.RUNNING) volume_ref = db.volume_get(self.context, volume_ref['id']) self.assertTrue(volume_ref['status'] == 'in-use') diff --git a/nova/tests/test_network.py b/nova/tests/test_network.py index 0b8539442..25ff940f0 100644 --- a/nova/tests/test_network.py +++ b/nova/tests/test_network.py @@ -371,6 +371,22 @@ class VlanNetworkTestCase(test.TestCase): self.mox.ReplayAll() self.network.validate_networks(self.context, requested_networks) + def test_cant_associate_associated_floating_ip(self): + ctxt = context.RequestContext('testuser', 'testproject', + is_admin=False) + + def fake_floating_ip_get_by_address(context, address): + return {'address': '10.10.10.10', + 'fixed_ip': {'address': '10.0.0.1'}} + self.stubs.Set(self.network.db, 'floating_ip_get_by_address', + fake_floating_ip_get_by_address) + + self.assertRaises(exception.FloatingIpAlreadyInUse, + self.network.associate_floating_ip, + ctxt, + mox.IgnoreArg(), + mox.IgnoreArg()) + class CommonNetworkTestCase(test.TestCase): diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index ba9c0a859..6b4454747 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -22,168 +22,16 @@ Unit Tests for remote procedure calls using queue from 
nova import context from nova import log as logging from nova import rpc -from nova import test +from nova.tests import test_rpc_common LOG = logging.getLogger('nova.tests.rpc') -class RpcTestCase(test.TestCase): +class RpcTestCase(test_rpc_common._BaseRpcTestCase): def setUp(self): + self.rpc = rpc super(RpcTestCase, self).setUp() - self.conn = rpc.create_connection(True) - self.receiver = TestReceiver() - self.consumer = rpc.create_consumer(self.conn, - 'test', - self.receiver, - False) - self.consumer.attach_to_eventlet() - self.context = context.get_admin_context() - def test_call_succeed(self): - value = 42 - result = rpc.call(self.context, 'test', {"method": "echo", - "args": {"value": value}}) - self.assertEqual(value, result) - - def test_call_succeed_despite_multiple_returns(self): - value = 42 - result = rpc.call(self.context, 'test', {"method": "echo_three_times", - "args": {"value": value}}) - self.assertEqual(value + 2, result) - - def test_call_succeed_despite_multiple_returns_yield(self): - value = 42 - result = rpc.call(self.context, 'test', - {"method": "echo_three_times_yield", - "args": {"value": value}}) - self.assertEqual(value + 2, result) - - def test_multicall_succeed_once(self): - value = 42 - result = rpc.multicall(self.context, - 'test', - {"method": "echo", - "args": {"value": value}}) - for i, x in enumerate(result): - if i > 0: - self.fail('should only receive one response') - self.assertEqual(value + i, x) - - def test_multicall_succeed_three_times(self): - value = 42 - result = rpc.multicall(self.context, - 'test', - {"method": "echo_three_times", - "args": {"value": value}}) - for i, x in enumerate(result): - self.assertEqual(value + i, x) - - def test_multicall_succeed_three_times_yield(self): - value = 42 - result = rpc.multicall(self.context, - 'test', - {"method": "echo_three_times_yield", - "args": {"value": value}}) - for i, x in enumerate(result): - self.assertEqual(value + i, x) - - def test_context_passed(self): - """Makes sure a context is passed through rpc call.""" - value = 42 - result = rpc.call(self.context, - 'test', {"method": "context", - "args": {"value": value}}) - self.assertEqual(self.context.to_dict(), result) - - def test_call_exception(self): - """Test that exception gets passed back properly. - - rpc.call returns a RemoteError object. The value of the - exception is converted to a string, so we convert it back - to an int in the test. - - """ - value = 42 - self.assertRaises(rpc.RemoteError, - rpc.call, - self.context, - 'test', - {"method": "fail", - "args": {"value": value}}) - try: - rpc.call(self.context, - 'test', - {"method": "fail", - "args": {"value": value}}) - self.fail("should have thrown rpc.RemoteError") - except rpc.RemoteError as exc: - self.assertEqual(int(exc.value), value) - - def test_nested_calls(self): - """Test that we can do an rpc.call inside another call.""" - class Nested(object): - @staticmethod - def echo(context, queue, value): - """Calls echo in the passed queue""" - LOG.debug(_("Nested received %(queue)s, %(value)s") - % locals()) - # TODO: so, it will replay the context and use the same REQID? - # that's bizarre. 
- ret = rpc.call(context, - queue, - {"method": "echo", - "args": {"value": value}}) - LOG.debug(_("Nested return %s"), ret) - return value - - nested = Nested() - conn = rpc.create_connection(True) - consumer = rpc.create_consumer(conn, - 'nested', - nested, - False) - consumer.attach_to_eventlet() - value = 42 - result = rpc.call(self.context, - 'nested', {"method": "echo", - "args": {"queue": "test", - "value": value}}) - self.assertEqual(value, result) - - -class TestReceiver(object): - """Simple Proxy class so the consumer has methods to call. - - Uses static methods because we aren't actually storing any state. - - """ - - @staticmethod - def echo(context, value): - """Simply returns whatever value is sent in.""" - LOG.debug(_("Received %s"), value) - return value - - @staticmethod - def context(context, value): - """Returns dictionary version of context.""" - LOG.debug(_("Received %s"), context) - return context.to_dict() - - @staticmethod - def echo_three_times(context, value): - context.reply(value) - context.reply(value + 1) - context.reply(value + 2) - - @staticmethod - def echo_three_times_yield(context, value): - yield value - yield value + 1 - yield value + 2 - - @staticmethod - def fail(context, value): - """Raises an exception with the value sent in.""" - raise Exception(value) + def tearDown(self): + super(RpcTestCase, self).tearDown() diff --git a/nova/tests/test_rpc_amqp.py b/nova/tests/test_rpc_amqp.py deleted file mode 100644 index 2215a908b..000000000 --- a/nova/tests/test_rpc_amqp.py +++ /dev/null @@ -1,88 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (c) 2010 Openstack, LLC. -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Tests For RPC AMQP. -""" - -from nova import context -from nova import log as logging -from nova import rpc -from nova.rpc import amqp -from nova import test - - -LOG = logging.getLogger('nova.tests.rpc') - - -class RpcAMQPTestCase(test.TestCase): - def setUp(self): - super(RpcAMQPTestCase, self).setUp() - self.conn = rpc.create_connection(True) - self.receiver = TestReceiver() - self.consumer = rpc.create_consumer(self.conn, - 'test', - self.receiver, - False) - self.consumer.attach_to_eventlet() - self.context = context.get_admin_context() - - def test_connectionpool_single(self): - """Test that ConnectionPool recycles a single connection.""" - conn1 = amqp.ConnectionPool.get() - amqp.ConnectionPool.put(conn1) - conn2 = amqp.ConnectionPool.get() - amqp.ConnectionPool.put(conn2) - self.assertEqual(conn1, conn2) - - -class TestReceiver(object): - """Simple Proxy class so the consumer has methods to call. - - Uses static methods because we aren't actually storing any state. 
- - """ - - @staticmethod - def echo(context, value): - """Simply returns whatever value is sent in.""" - LOG.debug(_("Received %s"), value) - return value - - @staticmethod - def context(context, value): - """Returns dictionary version of context.""" - LOG.debug(_("Received %s"), context) - return context.to_dict() - - @staticmethod - def echo_three_times(context, value): - context.reply(value) - context.reply(value + 1) - context.reply(value + 2) - - @staticmethod - def echo_three_times_yield(context, value): - yield value - yield value + 1 - yield value + 2 - - @staticmethod - def fail(context, value): - """Raises an exception with the value sent in.""" - raise Exception(value) diff --git a/nova/tests/test_rpc_carrot.py b/nova/tests/test_rpc_carrot.py new file mode 100644 index 000000000..57cdebf4f --- /dev/null +++ b/nova/tests/test_rpc_carrot.py @@ -0,0 +1,45 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Unit Tests for remote procedure calls using carrot +""" + +from nova import context +from nova import log as logging +from nova.rpc import impl_carrot +from nova.tests import test_rpc_common + + +LOG = logging.getLogger('nova.tests.rpc') + + +class RpcCarrotTestCase(test_rpc_common._BaseRpcTestCase): + def setUp(self): + self.rpc = impl_carrot + super(RpcCarrotTestCase, self).setUp() + + def tearDown(self): + super(RpcCarrotTestCase, self).tearDown() + + def test_connectionpool_single(self): + """Test that ConnectionPool recycles a single connection.""" + conn1 = self.rpc.ConnectionPool.get() + self.rpc.ConnectionPool.put(conn1) + conn2 = self.rpc.ConnectionPool.get() + self.rpc.ConnectionPool.put(conn2) + self.assertEqual(conn1, conn2) diff --git a/nova/tests/test_rpc_common.py b/nova/tests/test_rpc_common.py new file mode 100644 index 000000000..4ab4e8a0e --- /dev/null +++ b/nova/tests/test_rpc_common.py @@ -0,0 +1,189 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+""" +Unit Tests for remote procedure calls shared between all implementations +""" + +from nova import context +from nova import log as logging +from nova.rpc.common import RemoteError +from nova import test + + +LOG = logging.getLogger('nova.tests.rpc') + + +class _BaseRpcTestCase(test.TestCase): + def setUp(self): + super(_BaseRpcTestCase, self).setUp() + self.conn = self.rpc.create_connection(True) + self.receiver = TestReceiver() + self.conn.create_consumer('test', self.receiver, False) + self.conn.consume_in_thread() + self.context = context.get_admin_context() + + def tearDown(self): + self.conn.close() + super(_BaseRpcTestCase, self).tearDown() + + def test_call_succeed(self): + value = 42 + result = self.rpc.call(self.context, 'test', {"method": "echo", + "args": {"value": value}}) + self.assertEqual(value, result) + + def test_call_succeed_despite_multiple_returns(self): + value = 42 + result = self.rpc.call(self.context, 'test', + {"method": "echo_three_times", + "args": {"value": value}}) + self.assertEqual(value + 2, result) + + def test_call_succeed_despite_multiple_returns_yield(self): + value = 42 + result = self.rpc.call(self.context, 'test', + {"method": "echo_three_times_yield", + "args": {"value": value}}) + self.assertEqual(value + 2, result) + + def test_multicall_succeed_once(self): + value = 42 + result = self.rpc.multicall(self.context, + 'test', + {"method": "echo", + "args": {"value": value}}) + for i, x in enumerate(result): + if i > 0: + self.fail('should only receive one response') + self.assertEqual(value + i, x) + + def test_multicall_succeed_three_times(self): + value = 42 + result = self.rpc.multicall(self.context, + 'test', + {"method": "echo_three_times", + "args": {"value": value}}) + for i, x in enumerate(result): + self.assertEqual(value + i, x) + + def test_multicall_succeed_three_times_yield(self): + value = 42 + result = self.rpc.multicall(self.context, + 'test', + {"method": "echo_three_times_yield", + "args": {"value": value}}) + for i, x in enumerate(result): + self.assertEqual(value + i, x) + + def test_context_passed(self): + """Makes sure a context is passed through rpc call.""" + value = 42 + result = self.rpc.call(self.context, + 'test', {"method": "context", + "args": {"value": value}}) + self.assertEqual(self.context.to_dict(), result) + + def test_call_exception(self): + """Test that exception gets passed back properly. + + rpc.call returns a RemoteError object. The value of the + exception is converted to a string, so we convert it back + to an int in the test. + + """ + value = 42 + self.assertRaises(RemoteError, + self.rpc.call, + self.context, + 'test', + {"method": "fail", + "args": {"value": value}}) + try: + self.rpc.call(self.context, + 'test', + {"method": "fail", + "args": {"value": value}}) + self.fail("should have thrown RemoteError") + except RemoteError as exc: + self.assertEqual(int(exc.value), value) + + def test_nested_calls(self): + """Test that we can do an rpc.call inside another call.""" + class Nested(object): + @staticmethod + def echo(context, queue, value): + """Calls echo in the passed queue""" + LOG.debug(_("Nested received %(queue)s, %(value)s") + % locals()) + # TODO: so, it will replay the context and use the same REQID? + # that's bizarre. 
+ ret = self.rpc.call(context, + queue, + {"method": "echo", + "args": {"value": value}}) + LOG.debug(_("Nested return %s"), ret) + return value + + nested = Nested() + conn = self.rpc.create_connection(True) + conn.create_consumer('nested', nested, False) + conn.consume_in_thread() + value = 42 + result = self.rpc.call(self.context, + 'nested', {"method": "echo", + "args": {"queue": "test", + "value": value}}) + conn.close() + self.assertEqual(value, result) + + +class TestReceiver(object): + """Simple Proxy class so the consumer has methods to call. + + Uses static methods because we aren't actually storing any state. + + """ + + @staticmethod + def echo(context, value): + """Simply returns whatever value is sent in.""" + LOG.debug(_("Received %s"), value) + return value + + @staticmethod + def context(context, value): + """Returns dictionary version of context.""" + LOG.debug(_("Received %s"), context) + return context.to_dict() + + @staticmethod + def echo_three_times(context, value): + context.reply(value) + context.reply(value + 1) + context.reply(value + 2) + + @staticmethod + def echo_three_times_yield(context, value): + yield value + yield value + 1 + yield value + 2 + + @staticmethod + def fail(context, value): + """Raises an exception with the value sent in.""" + raise Exception(value) diff --git a/nova/tests/test_rpc_kombu.py b/nova/tests/test_rpc_kombu.py new file mode 100644 index 000000000..101ed14af --- /dev/null +++ b/nova/tests/test_rpc_kombu.py @@ -0,0 +1,110 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+""" +Unit Tests for remote procedure calls using kombu +""" + +from nova import context +from nova import log as logging +from nova import test +from nova.rpc import impl_kombu +from nova.tests import test_rpc_common + + +LOG = logging.getLogger('nova.tests.rpc') + + +class RpcKombuTestCase(test_rpc_common._BaseRpcTestCase): + def setUp(self): + self.rpc = impl_kombu + super(RpcKombuTestCase, self).setUp() + + def tearDown(self): + super(RpcKombuTestCase, self).tearDown() + + def test_reusing_connection(self): + """Test that reusing a connection returns same one.""" + conn_context = self.rpc.create_connection(new=False) + conn1 = conn_context.connection + conn_context.close() + conn_context = self.rpc.create_connection(new=False) + conn2 = conn_context.connection + conn_context.close() + self.assertEqual(conn1, conn2) + + def test_topic_send_receive(self): + """Test sending to a topic exchange/queue""" + + conn = self.rpc.create_connection() + message = 'topic test message' + + self.received_message = None + + def _callback(message): + self.received_message = message + + conn.declare_topic_consumer('a_topic', _callback) + conn.topic_send('a_topic', message) + conn.consume(limit=1) + conn.close() + + self.assertEqual(self.received_message, message) + + def test_direct_send_receive(self): + """Test sending to a direct exchange/queue""" + conn = self.rpc.create_connection() + message = 'direct test message' + + self.received_message = None + + def _callback(message): + self.received_message = message + + conn.declare_direct_consumer('a_direct', _callback) + conn.direct_send('a_direct', message) + conn.consume(limit=1) + conn.close() + + self.assertEqual(self.received_message, message) + + @test.skip_test("kombu memory transport seems buggy with fanout queues " + "as this test passes when you use rabbit (fake_rabbit=False)") + def test_fanout_send_receive(self): + """Test sending to a fanout exchange and consuming from 2 queues""" + + conn = self.rpc.create_connection() + conn2 = self.rpc.create_connection() + message = 'fanout test message' + + self.received_message = None + + def _callback(message): + self.received_message = message + + conn.declare_fanout_consumer('a_fanout', _callback) + conn2.declare_fanout_consumer('a_fanout', _callback) + conn.fanout_send('a_fanout', message) + + conn.consume(limit=1) + conn.close() + self.assertEqual(self.received_message, message) + + self.received_message = None + conn2.consume(limit=1) + conn2.close() + self.assertEqual(self.received_message, message) diff --git a/nova/tests/test_test.py b/nova/tests/test_test.py index 64f11fa45..3482ff6a0 100644 --- a/nova/tests/test_test.py +++ b/nova/tests/test_test.py @@ -40,6 +40,5 @@ class IsolationTestCase(test.TestCase): connection = rpc.create_connection(new=True) proxy = NeverCalled() - consumer = rpc.create_consumer(connection, 'compute', - proxy, fanout=False) - consumer.attach_to_eventlet() + connection.create_consumer('compute', proxy, fanout=False) + connection.consume_in_thread() diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index 2f0559366..91b4161b0 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -16,7 +16,6 @@ """Test suite for XenAPI.""" -import eventlet import functools import json import os @@ -203,42 +202,6 @@ class XenAPIVMTestCase(test.TestCase): self.context = context.RequestContext(self.user_id, self.project_id) self.conn = xenapi_conn.get_connection(False) - def test_parallel_builds(self): - stubs.stubout_loopingcall_delay(self.stubs) - - 
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index 2f0559366..91b4161b0 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -16,7 +16,6 @@
 
 """Test suite for XenAPI."""
 
-import eventlet
 import functools
 import json
 import os
@@ -203,42 +202,6 @@ class XenAPIVMTestCase(test.TestCase):
         self.context = context.RequestContext(self.user_id, self.project_id)
         self.conn = xenapi_conn.get_connection(False)
 
-    def test_parallel_builds(self):
-        stubs.stubout_loopingcall_delay(self.stubs)
-
-        def _do_build(id, proj, user, *args):
-            values = {
-                'id': id,
-                'project_id': proj,
-                'user_id': user,
-                'image_ref': 1,
-                'kernel_id': 2,
-                'ramdisk_id': 3,
-                'instance_type_id': '3',  # m1.large
-                'os_type': 'linux',
-                'architecture': 'x86-64'}
-            network_info = [({'bridge': 'fa0', 'id': 0, 'injected': False},
-                             {'broadcast': '192.168.0.255',
-                              'dns': ['192.168.0.1'],
-                              'gateway': '192.168.0.1',
-                              'gateway6': 'dead:beef::1',
-                              'ip6s': [{'enabled': '1',
-                                        'ip': 'dead:beef::dcad:beff:feef:0',
-                                        'netmask': '64'}],
-                              'ips': [{'enabled': '1',
-                                       'ip': '192.168.0.100',
-                                       'netmask': '255.255.255.0'}],
-                              'label': 'fake',
-                              'mac': 'DE:AD:BE:EF:00:00',
-                              'rxtx_cap': 3})]
-            instance = db.instance_create(self.context, values)
-            self.conn.spawn(self.context, instance, network_info)
-
-        gt1 = eventlet.spawn(_do_build, 1, self.project_id, self.user_id)
-        gt2 = eventlet.spawn(_do_build, 2, self.project_id, self.user_id)
-        gt1.wait()
-        gt2.wait()
-
     def test_list_instances_0(self):
         instances = self.conn.list_instances()
         self.assertEquals(instances, [])
@@ -531,6 +494,7 @@ class XenAPIVMTestCase(test.TestCase):
         self.check_vm_params_for_linux_with_external_kernel()
 
     def test_spawn_netinject_file(self):
+        self.flags(flat_injected=True)
         db_fakes.stub_out_db_instance_api(self.stubs, injected=True)
         self._tee_executed = False
 
@@ -648,7 +612,6 @@ class XenAPIVMTestCase(test.TestCase):
                          str(3 * 1024))
 
     def test_rescue(self):
-        self.flags(flat_injected=False)
         instance = self._create_instance()
         conn = xenapi_conn.get_connection(False)
         conn.rescue(self.context, instance, None, [])
diff --git a/nova/tests/vmwareapi/db_fakes.py b/nova/tests/vmwareapi/db_fakes.py
index afd672c7a..0d896239a 100644
--- a/nova/tests/vmwareapi/db_fakes.py
+++ b/nova/tests/vmwareapi/db_fakes.py
@@ -23,6 +23,8 @@ import time
 
 from nova import db
 from nova import utils
+from nova.compute import task_states
+from nova.compute import vm_states
 
 
 def stub_out_db_instance_api(stubs):
@@ -64,7 +66,8 @@ def stub_out_db_instance_api(stubs):
             'image_ref': values['image_ref'],
             'kernel_id': values['kernel_id'],
             'ramdisk_id': values['ramdisk_id'],
-            'state_description': 'scheduling',
+            'vm_state': vm_states.BUILDING,
+            'task_state': task_states.SCHEDULING,
             'user_id': values['user_id'],
             'project_id': values['project_id'],
             'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
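Throughout this merge the single state_description column is retired in favor
of the vm_state / task_state pair stubbed above: vm_state records what the
instance is (building, active, ...), while task_state records the transient
operation being performed on it (scheduling, rebooting, ...). A minimal sketch
of how the two might combine for display (the helper below is illustrative,
not part of the patch):

    from nova.compute import task_states
    from nova.compute import vm_states

    def format_instance_state(instance):
        # The stable state always prints; the transient task, when set,
        # is appended in parentheses.
        vm_state = instance.get('vm_state')
        task_state = instance.get('task_state')
        if task_state:
            return '%s (%s)' % (vm_state, task_state)
        return str(vm_state)

    # For a stub record like the one above, this yields the
    # vm_states.BUILDING value with task_states.SCHEDULING in parentheses.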
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index a6a1febd6..647a4c1df 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -18,6 +18,8 @@
 import eventlet
 import json
 
+import random
+
 from nova.virt import xenapi_conn
 from nova.virt.xenapi import fake
 from nova.virt.xenapi import volume_utils
@@ -191,6 +193,7 @@ class FakeSessionForVMTests(fake.SessionBase):
         vm['power_state'] = 'Running'
         vm['is_a_template'] = False
         vm['is_control_domain'] = False
+        vm['domid'] = random.randrange(1, 1 << 16)
 
     def VM_snapshot(self, session_ref, vm_ref, label):
         status = "Running"
@@ -290,6 +293,7 @@ class FakeSessionForMigrationTests(fake.SessionBase):
         vm['power_state'] = 'Running'
         vm['is_a_template'] = False
         vm['is_control_domain'] = False
+        vm['domid'] = random.randrange(1, 1 << 16)
 
 
 def stub_out_migration_methods(stubs):
diff --git a/nova/utils.py b/nova/utils.py
index 21e6221b2..81157a4cd 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -901,3 +901,12 @@ def monkey_patch():
             func = import_class("%s.%s" % (module, key))
             setattr(sys.modules[module], key,\
                 decorator("%s.%s" % (module, key), func))
+
+
+def convert_to_list_dict(lst, label):
+    """Convert a value or list into a list of dicts"""
+    if not lst:
+        return None
+    if not isinstance(lst, list):
+        lst = [lst]
+    return [{label: x} for x in lst]
diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py
index 7c91aa9b9..97dfd9fa9 100644
--- a/nova/virt/xenapi/fake.py
+++ b/nova/virt/xenapi/fake.py
@@ -51,6 +51,7 @@ A fake XenAPI SDK.
 
 """
 
+import random
 import uuid
 from pprint import pformat
 
@@ -103,8 +104,10 @@ def create_network(name_label, bridge):
 
 def create_vm(name_label, status,
               is_a_template=False, is_control_domain=False):
+    domid = status == 'Running' and random.randrange(1, 1 << 16) or -1
     return _create_object('VM',
                           {'name_label': name_label,
+                           'domid': domid,
                            'power-state': status,
                            'is_a_template': is_a_template,
                            'is_control_domain': is_control_domain})
diff --git a/tools/pip-requires b/tools/pip-requires
index 60b502ffd..66d6a48d9 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -8,6 +8,7 @@ anyjson==0.2.4
 boto==1.9b
 carrot==0.10.5
 eventlet
+kombu
 lockfile==0.8
 lxml==2.3
 python-novaclient==2.6.0
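Two quick usage notes on the code added above. The new
utils.convert_to_list_dict() normalizes a scalar-or-list value into a list of
single-key dicts; a sketch of its behaviour, read straight off the definition
(argument values chosen arbitrarily):

    from nova import utils

    utils.convert_to_list_dict('vda', 'device')   # [{'device': 'vda'}]
    utils.convert_to_list_dict([1, 2], 'n')       # [{'n': 1}, {'n': 2}]
    utils.convert_to_list_dict([], 'n')           # None -- falsy input
    utils.convert_to_list_dict(None, 'n')         # None

And the domid line in fake.py uses the old `and/or` conditional idiom, which
is safe here only because randrange(1, 1 << 16) can never return a falsy
value; the equivalent conditional expression would be:

    domid = random.randrange(1, 1 << 16) if status == 'Running' else -1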