summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnne Gentle <anne@openstack.org>2011-01-07 10:57:53 -0600
committerAnne Gentle <anne@openstack.org>2011-01-07 10:57:53 -0600
commit5d02ee9e5f1502de2d379ce1acbc7e6bae1732f1 (patch)
treee75bc3a452904d8e28db0d2a69b44d726835b0f0
parent80d10d960d0d472bf9a1d35bce081fb350c98a35 (diff)
parente33102d23ec8f357c08e2583f8d9e3c1753bab4d (diff)
downloadnova-5d02ee9e5f1502de2d379ce1acbc7e6bae1732f1.tar.gz
nova-5d02ee9e5f1502de2d379ce1acbc7e6bae1732f1.tar.xz
nova-5d02ee9e5f1502de2d379ce1acbc7e6bae1732f1.zip
Merged with trunk
-rw-r--r--.mailmap1
-rw-r--r--Authors1
-rw-r--r--doc/source/community.rst3
-rw-r--r--doc/source/devref/development.environment.rst7
-rw-r--r--nova/adminclient.py7
-rw-r--r--nova/api/ec2/cloud.py278
-rw-r--r--nova/api/openstack/images.py7
-rw-r--r--nova/api/openstack/servers.py82
-rw-r--r--nova/compute/__init__.py15
-rw-r--r--nova/compute/api.py214
-rw-r--r--nova/compute/manager.py112
-rw-r--r--nova/db/api.py8
-rw-r--r--nova/db/sqlalchemy/__init__.py21
-rw-r--r--nova/db/sqlalchemy/api.py76
-rw-r--r--nova/db/sqlalchemy/models.py28
-rw-r--r--nova/flags.py2
-rw-r--r--nova/image/glance.py1
-rw-r--r--nova/network/__init__.py15
-rw-r--r--nova/network/api.py87
-rw-r--r--nova/quota.py6
-rw-r--r--nova/service.py12
-rw-r--r--nova/tests/api/openstack/fakes.py2
-rw-r--r--nova/tests/api/openstack/test_servers.py29
-rw-r--r--nova/tests/hyperv_unittest.py71
-rw-r--r--nova/tests/test_cloud.py19
-rw-r--r--nova/tests/test_compute.py29
-rw-r--r--nova/tests/test_quota.py10
-rw-r--r--nova/tests/test_xenapi.py30
-rw-r--r--nova/virt/connection.py3
-rw-r--r--nova/virt/hyperv.py459
-rw-r--r--nova/virt/images.py45
-rw-r--r--nova/virt/xenapi/vmops.py214
-rw-r--r--nova/virt/xenapi/volume_utils.py12
-rw-r--r--nova/virt/xenapi_conn.py28
-rw-r--r--nova/volume/__init__.py14
-rw-r--r--nova/volume/api.py101
-rwxr-xr-xplugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py28
-rwxr-xr-xplugins/xenserver/xenapi/etc/xapi.d/plugins/xenstore.py180
38 files changed, 1707 insertions, 550 deletions
diff --git a/.mailmap b/.mailmap
index 010678569..2af2d7cd9 100644
--- a/.mailmap
+++ b/.mailmap
@@ -30,3 +30,4 @@
<rconradharris@gmail.com> <rick.harris@rackspace.com>
<corywright@gmail.com> <cory.wright@rackspace.com>
<ant@openstack.org> <amesserl@rackspace.com>
+<chiradeep@cloud.com> <chiradeep@chiradeep-lt2>
diff --git a/Authors b/Authors
index 639e68a59..8dfaf9557 100644
--- a/Authors
+++ b/Authors
@@ -3,6 +3,7 @@ Anne Gentle <anne@openstack.org>
Anthony Young <sleepsonthefloor@gmail.com>
Antony Messerli <ant@openstack.org>
Armando Migliaccio <Armando.Migliaccio@eu.citrix.com>
+Chiradeep Vittal <chiradeep@cloud.com>
Chris Behrens <cbehrens@codestud.com>
Chmouel Boudjnah <chmouel@chmouel.com>
Cory Wright <corywright@gmail.com>
diff --git a/doc/source/community.rst b/doc/source/community.rst
index 402f9dfe4..4ae32f1eb 100644
--- a/doc/source/community.rst
+++ b/doc/source/community.rst
@@ -35,7 +35,8 @@ Contributing Code
To contribute code, sign up for a Launchpad account and sign a contributor license agreement,
available on the `OpenStack Wiki <http://wiki.openstack.org/CLA>`_. Once the CLA is signed you
-can contribute code through the Bazaar version control system which is related to your Launchpad account.
+can contribute code through the Bazaar version control system which is related to your Launchpad
+account. See the :doc:`devref/development.environment` page to get started.
#openstack on Freenode IRC Network
----------------------------------
diff --git a/doc/source/devref/development.environment.rst b/doc/source/devref/development.environment.rst
index d5f10c42a..f3c454d64 100644
--- a/doc/source/devref/development.environment.rst
+++ b/doc/source/devref/development.environment.rst
@@ -88,7 +88,12 @@ Here's how to get the latest code::
source .nova_venv/bin/activate
./run_tests.sh
-And then you can do cleaning work or hack hack hack with a branched named cleaning::
+Then you can do cleaning work or hack hack hack with a branched named cleaning.
+
+Contributing Your Work
+----------------------
+
+Once your work is complete you may wish to contribute it to the project. Add your name and email address to the `Authors` file, and also to the `.mailmap` file if you use multiple email addresses. Your contributions can not be merged into trunk unless you are listed in the Authors file. Now, push the branch to Launchpad::
bzr push lp:~launchpaduserid/nova/cleaning
diff --git a/nova/adminclient.py b/nova/adminclient.py
index 6ae9f0c0f..b2609c8c4 100644
--- a/nova/adminclient.py
+++ b/nova/adminclient.py
@@ -23,12 +23,9 @@ import base64
import boto
import httplib
-from nova import flags
from boto.ec2.regioninfo import RegionInfo
-FLAGS = flags.FLAGS
-
DEFAULT_CLC_URL = 'http://127.0.0.1:8773'
DEFAULT_REGION = 'nova'
@@ -199,8 +196,8 @@ class NovaAdminClient(object):
self,
clc_url=DEFAULT_CLC_URL,
region=DEFAULT_REGION,
- access_key=FLAGS.aws_access_key_id,
- secret_key=FLAGS.aws_secret_access_key,
+ access_key=None,
+ secret_key=None,
**kwargs):
parts = self.split_clc_url(clc_url)
diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py
index 9fb6307a8..6619b5452 100644
--- a/nova/api/ec2/cloud.py
+++ b/nova/api/ec2/cloud.py
@@ -31,19 +31,19 @@ import os
from nova import context
import IPy
+from nova import compute
from nova import crypto
from nova import db
from nova import exception
from nova import flags
-from nova import quota
+from nova import network
from nova import rpc
from nova import utils
-from nova.compute import api as compute_api
+from nova import volume
from nova.compute import instance_types
FLAGS = flags.FLAGS
-flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
InvalidInputException = exception.InvalidInputException
@@ -71,16 +71,16 @@ def _gen_key(context, user_id, key_name):
return {'private_key': private_key, 'fingerprint': fingerprint}
-def ec2_id_to_internal_id(ec2_id):
- """Convert an ec2 ID (i-[base 36 number]) to an internal id (int)"""
+def ec2_id_to_id(ec2_id):
+ """Convert an ec2 ID (i-[base 36 number]) to an instance id (int)"""
return int(ec2_id[2:], 36)
-def internal_id_to_ec2_id(internal_id):
- """Convert an internal ID (int) to an ec2 ID (i-[base 36 number])"""
+def id_to_ec2_id(instance_id):
+ """Convert an instance ID (int) to an ec2 ID (i-[base 36 number])"""
digits = []
- while internal_id != 0:
- internal_id, remainder = divmod(internal_id, 36)
+ while instance_id != 0:
+ instance_id, remainder = divmod(instance_id, 36)
digits.append('0123456789abcdefghijklmnopqrstuvwxyz'[remainder])
return "i-%s" % ''.join(reversed(digits))
@@ -91,10 +91,11 @@ class CloudController(object):
sent to the other nodes.
"""
def __init__(self):
- self.network_manager = utils.import_object(FLAGS.network_manager)
self.image_service = utils.import_object(FLAGS.image_service)
- self.compute_api = compute_api.ComputeAPI(self.network_manager,
- self.image_service)
+ self.network_api = network.API()
+ self.volume_api = volume.API()
+ self.compute_api = compute.API(self.image_service, self.network_api,
+ self.volume_api)
self.setup()
def __str__(self):
@@ -118,7 +119,8 @@ class CloudController(object):
def _get_mpi_data(self, context, project_id):
result = {}
- for instance in self.compute_api.get_instances(context, project_id):
+ for instance in self.compute_api.get_all(context,
+ project_id=project_id):
if instance['fixed_ip']:
line = '%s slots=%d' % (instance['fixed_ip']['address'],
instance['vcpus'])
@@ -140,7 +142,7 @@ class CloudController(object):
def get_metadata(self, address):
ctxt = context.get_admin_context()
- instance_ref = db.fixed_ip_get_instance(ctxt, address)
+ instance_ref = self.compute_api.get_all(ctxt, fixed_ip=address)
if instance_ref is None:
return None
mpi = self._get_mpi_data(ctxt, instance_ref['project_id'])
@@ -152,7 +154,7 @@ class CloudController(object):
hostname = instance_ref['hostname']
floating_ip = db.instance_get_floating_address(ctxt,
instance_ref['id'])
- ec2_id = internal_id_to_ec2_id(instance_ref['internal_id'])
+ ec2_id = id_to_ec2_id(instance_ref['id'])
data = {
'user-data': base64.b64decode(instance_ref['user_data']),
'meta-data': {
@@ -478,8 +480,8 @@ class CloudController(object):
def get_console_output(self, context, instance_id, **kwargs):
# instance_id is passed in as a list of instances
ec2_id = instance_id[0]
- internal_id = ec2_id_to_internal_id(ec2_id)
- instance_ref = self.compute_api.get_instance(context, internal_id)
+ instance_id = ec2_id_to_id(ec2_id)
+ instance_ref = self.compute_api.get(context, instance_id)
output = rpc.call(context,
'%s.%s' % (FLAGS.compute_topic,
instance_ref['host']),
@@ -492,27 +494,22 @@ class CloudController(object):
"output": base64.b64encode(output)}
def describe_volumes(self, context, volume_id=None, **kwargs):
- if context.user.is_admin():
- volumes = db.volume_get_all(context)
- else:
- volumes = db.volume_get_all_by_project(context, context.project_id)
-
+ volumes = self.volume_api.get_all(context)
# NOTE(vish): volume_id is an optional list of volume ids to filter by.
volumes = [self._format_volume(context, v) for v in volumes
- if volume_id is None or v['ec2_id'] in volume_id]
-
+ if volume_id is None or v['id'] in volume_id]
return {'volumeSet': volumes}
def _format_volume(self, context, volume):
instance_ec2_id = None
instance_data = None
if volume.get('instance', None):
- internal_id = volume['instance']['internal_id']
- instance_ec2_id = internal_id_to_ec2_id(internal_id)
+ instance_id = volume['instance']['id']
+ instance_ec2_id = id_to_ec2_id(instance_id)
instance_data = '%s[%s]' % (instance_ec2_id,
volume['instance']['host'])
v = {}
- v['volumeId'] = volume['ec2_id']
+ v['volumeId'] = volume['id']
v['status'] = volume['status']
v['size'] = volume['size']
v['availabilityZone'] = volume['availability_zone']
@@ -539,88 +536,47 @@ class CloudController(object):
return v
def create_volume(self, context, size, **kwargs):
- # check quota
- if quota.allowed_volumes(context, 1, size) < 1:
- logging.warn("Quota exceeeded for %s, tried to create %sG volume",
- context.project_id, size)
- raise quota.QuotaError("Volume quota exceeded. You cannot "
- "create a volume of size %s" % size)
- vol = {}
- vol['size'] = size
- vol['user_id'] = context.user.id
- vol['project_id'] = context.project_id
- vol['availability_zone'] = FLAGS.storage_availability_zone
- vol['status'] = "creating"
- vol['attach_status'] = "detached"
- vol['display_name'] = kwargs.get('display_name')
- vol['display_description'] = kwargs.get('display_description')
- volume_ref = db.volume_create(context, vol)
-
- rpc.cast(context,
- FLAGS.scheduler_topic,
- {"method": "create_volume",
- "args": {"topic": FLAGS.volume_topic,
- "volume_id": volume_ref['id']}})
-
+ volume = self.volume_api.create(context, size,
+ kwargs.get('display_name'),
+ kwargs.get('display_description'))
# TODO(vish): Instance should be None at db layer instead of
# trying to lazy load, but for now we turn it into
# a dict to avoid an error.
return {'volumeSet': [self._format_volume(context, dict(volume_ref))]}
+ def delete_volume(self, context, volume_id, **kwargs):
+ self.volume_api.delete(context, volume_id)
+ return True
+
+ def update_volume(self, context, volume_id, **kwargs):
+ updatable_fields = ['display_name', 'display_description']
+ changes = {}
+ for field in updatable_fields:
+ if field in kwargs:
+ changes[field] = kwargs[field]
+ if changes:
+ self.volume_api.update(context, volume_id, kwargs)
+ return True
+
def attach_volume(self, context, volume_id, instance_id, device, **kwargs):
- volume_ref = db.volume_get_by_ec2_id(context, volume_id)
- if not re.match("^/dev/[a-z]d[a-z]+$", device):
- raise exception.ApiError(_("Invalid device specified: %s. "
- "Example device: /dev/vdb") % device)
- # TODO(vish): abstract status checking?
- if volume_ref['status'] != "available":
- raise exception.ApiError(_("Volume status must be available"))
- if volume_ref['attach_status'] == "attached":
- raise exception.ApiError(_("Volume is already attached"))
- internal_id = ec2_id_to_internal_id(instance_id)
- instance_ref = self.compute_api.get_instance(context, internal_id)
- host = instance_ref['host']
- rpc.cast(context,
- db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "attach_volume",
- "args": {"volume_id": volume_ref['id'],
- "instance_id": instance_ref['id'],
- "mountpoint": device}})
- return {'attachTime': volume_ref['attach_time'],
- 'device': volume_ref['mountpoint'],
- 'instanceId': instance_ref['id'],
+ self.compute_api.attach_volume(context, instance_id, volume_id, device)
+ volume = self.volume_api.get(context, volume_id)
+ return {'attachTime': volume['attach_time'],
+ 'device': volume['mountpoint'],
+ 'instanceId': instance_id,
'requestId': context.request_id,
- 'status': volume_ref['attach_status'],
- 'volumeId': volume_ref['id']}
+ 'status': volume['attach_status'],
+ 'volumeId': volume_id}
def detach_volume(self, context, volume_id, **kwargs):
- volume_ref = db.volume_get_by_ec2_id(context, volume_id)
- instance_ref = db.volume_get_instance(context.elevated(),
- volume_ref['id'])
- if not instance_ref:
- raise exception.ApiError(_("Volume isn't attached to anything!"))
- # TODO(vish): abstract status checking?
- if volume_ref['status'] == "available":
- raise exception.ApiError(_("Volume is already detached"))
- try:
- host = instance_ref['host']
- rpc.cast(context,
- db.queue_get_for(context, FLAGS.compute_topic, host),
- {"method": "detach_volume",
- "args": {"instance_id": instance_ref['id'],
- "volume_id": volume_ref['id']}})
- except exception.NotFound:
- # If the instance doesn't exist anymore,
- # then we need to call detach blind
- db.volume_detached(context)
- internal_id = instance_ref['internal_id']
- ec2_id = internal_id_to_ec2_id(internal_id)
- return {'attachTime': volume_ref['attach_time'],
- 'device': volume_ref['mountpoint'],
- 'instanceId': internal_id,
+ volume = self.volume_api.get(context, volume_id)
+ instance = self.compute_api.detach_volume(context, volume_id)
+ return {'attachTime': volume['attach_time'],
+ 'device': volume['mountpoint'],
+ 'instanceId': id_to_ec2_id(instance['id']),
'requestId': context.request_id,
- 'status': volume_ref['attach_status'],
- 'volumeId': volume_ref['id']}
+ 'status': volume['attach_status'],
+ 'volumeId': volume_id}
def _convert_to_set(self, lst, label):
if lst == None or lst == []:
@@ -629,16 +585,6 @@ class CloudController(object):
lst = [lst]
return [{label: x} for x in lst]
- def update_volume(self, context, volume_id, **kwargs):
- updatable_fields = ['display_name', 'display_description']
- changes = {}
- for field in updatable_fields:
- if field in kwargs:
- changes[field] = kwargs[field]
- if changes:
- db.volume_update(context, volume_id, kwargs)
- return True
-
def describe_instances(self, context, **kwargs):
return self._format_describe_instances(context)
@@ -646,24 +592,20 @@ class CloudController(object):
return {'reservationSet': self._format_instances(context)}
def _format_run_instances(self, context, reservation_id):
- i = self._format_instances(context, reservation_id)
+ i = self._format_instances(context, reservation_id=reservation_id)
assert len(i) == 1
return i[0]
- def _format_instances(self, context, reservation_id=None):
+ def _format_instances(self, context, **kwargs):
reservations = {}
- if reservation_id:
- instances = db.instance_get_all_by_reservation(context,
- reservation_id)
- else:
- instances = self.compute_api.get_instances(context)
+ instances = self.compute_api.get_all(context, **kwargs)
for instance in instances:
if not context.user.is_admin():
if instance['image_id'] == FLAGS.vpn_image_id:
continue
i = {}
- internal_id = instance['internal_id']
- ec2_id = internal_id_to_ec2_id(internal_id)
+ instance_id = instance['id']
+ ec2_id = id_to_ec2_id(instance_id)
i['instanceId'] = ec2_id
i['imageId'] = instance['image_id']
i['instanceState'] = {
@@ -716,8 +658,8 @@ class CloudController(object):
ec2_id = None
if (floating_ip_ref['fixed_ip']
and floating_ip_ref['fixed_ip']['instance']):
- internal_id = floating_ip_ref['fixed_ip']['instance']['ec2_id']
- ec2_id = internal_id_to_ec2_id(internal_id)
+ instance_id = floating_ip_ref['fixed_ip']['instance']['ec2_id']
+ ec2_id = id_to_ec2_id(instance_id)
address_rv = {'public_ip': address,
'instance_id': ec2_id}
if context.user.is_admin():
@@ -728,69 +670,25 @@ class CloudController(object):
return {'addressesSet': addresses}
def allocate_address(self, context, **kwargs):
- # check quota
- if quota.allowed_floating_ips(context, 1) < 1:
- logging.warn(_("Quota exceeeded for %s, tried to allocate "
- "address"),
- context.project_id)
- raise quota.QuotaError(_("Address quota exceeded. You cannot "
- "allocate any more addresses"))
- # NOTE(vish): We don't know which network host should get the ip
- # when we allocate, so just send it to any one. This
- # will probably need to move into a network supervisor
- # at some point.
- public_ip = rpc.call(context,
- FLAGS.network_topic,
- {"method": "allocate_floating_ip",
- "args": {"project_id": context.project_id}})
+ public_ip = self.network_api.allocate_floating_ip(context)
return {'addressSet': [{'publicIp': public_ip}]}
def release_address(self, context, public_ip, **kwargs):
- floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
- # NOTE(vish): We don't know which network host should get the ip
- # when we deallocate, so just send it to any one. This
- # will probably need to move into a network supervisor
- # at some point.
- rpc.cast(context,
- FLAGS.network_topic,
- {"method": "deallocate_floating_ip",
- "args": {"floating_address": floating_ip_ref['address']}})
+ self.network_api.release_floating_ip(context, public_ip)
return {'releaseResponse': ["Address released."]}
def associate_address(self, context, instance_id, public_ip, **kwargs):
- internal_id = ec2_id_to_internal_id(instance_id)
- instance_ref = self.compute_api.get_instance(context, internal_id)
- fixed_address = db.instance_get_fixed_address(context,
- instance_ref['id'])
- floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
- # NOTE(vish): Perhaps we should just pass this on to compute and
- # let compute communicate with network.
- network_topic = self.compute_api.get_network_topic(context,
- internal_id)
- rpc.cast(context,
- network_topic,
- {"method": "associate_floating_ip",
- "args": {"floating_address": floating_ip_ref['address'],
- "fixed_address": fixed_address}})
+ instance_id = ec2_id_to_id(instance_id)
+ self.compute_api.associate_floating_ip(context, instance_id, public_ip)
return {'associateResponse': ["Address associated."]}
def disassociate_address(self, context, public_ip, **kwargs):
- floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
- # NOTE(vish): Get the topic from the host name of the network of
- # the associated fixed ip.
- if not floating_ip_ref.get('fixed_ip'):
- raise exception.ApiError('Address is not associated.')
- host = floating_ip_ref['fixed_ip']['network']['host']
- topic = db.queue_get_for(context, FLAGS.network_topic, host)
- rpc.cast(context,
- topic,
- {"method": "disassociate_floating_ip",
- "args": {"floating_address": floating_ip_ref['address']}})
+ self.network_api.disassociate_floating_ip(context, public_ip)
return {'disassociateResponse': ["Address disassociated."]}
def run_instances(self, context, **kwargs):
max_count = int(kwargs.get('max_count', 1))
- instances = self.compute_api.create_instances(context,
+ instances = self.compute_api.create(context,
instance_types.get_by_type(kwargs.get('instance_type', None)),
kwargs['image_id'],
min_count=int(kwargs.get('min_count', max_count)),
@@ -798,13 +696,13 @@ class CloudController(object):
kernel_id=kwargs.get('kernel_id', None),
ramdisk_id=kwargs.get('ramdisk_id'),
display_name=kwargs.get('display_name'),
- description=kwargs.get('display_description'),
+ display_description=kwargs.get('display_description'),
key_name=kwargs.get('key_name'),
user_data=kwargs.get('user_data'),
security_group=kwargs.get('security_group'),
availability_zone=kwargs.get('placement', {}).get(
'AvailabilityZone'),
- generate_hostname=internal_id_to_ec2_id)
+ generate_hostname=id_to_ec2_id)
return self._format_run_instances(context,
instances[0]['reservation_id'])
@@ -813,27 +711,27 @@ class CloudController(object):
instance_id is a kwarg so its name cannot be modified."""
logging.debug("Going to start terminating instances")
for ec2_id in instance_id:
- internal_id = ec2_id_to_internal_id(ec2_id)
- self.compute_api.delete_instance(context, internal_id)
+ instance_id = ec2_id_to_id(ec2_id)
+ self.compute_api.delete(context, instance_id)
return True
def reboot_instances(self, context, instance_id, **kwargs):
"""instance_id is a list of instance ids"""
for ec2_id in instance_id:
- internal_id = ec2_id_to_internal_id(ec2_id)
- self.compute_api.reboot(context, internal_id)
+ instance_id = ec2_id_to_id(ec2_id)
+ self.compute_api.reboot(context, instance_id)
return True
def rescue_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
- internal_id = ec2_id_to_internal_id(instance_id)
- self.compute_api.rescue(context, internal_id)
+ instance_id = ec2_id_to_id(instance_id)
+ self.compute_api.rescue(context, instance_id)
return True
def unrescue_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
- internal_id = ec2_id_to_internal_id(instance_id)
- self.compute_api.unrescue(context, internal_id)
+ instance_id = ec2_id_to_id(instance_id)
+ self.compute_api.unrescue(context, instance_id)
return True
def update_instance(self, context, ec2_id, **kwargs):
@@ -843,24 +741,8 @@ class CloudController(object):
if field in kwargs:
changes[field] = kwargs[field]
if changes:
- internal_id = ec2_id_to_internal_id(ec2_id)
- inst = self.compute_api.get_instance(context, internal_id)
- db.instance_update(context, inst['id'], kwargs)
- return True
-
- def delete_volume(self, context, volume_id, **kwargs):
- # TODO: return error if not authorized
- volume_ref = db.volume_get_by_ec2_id(context, volume_id)
- if volume_ref['status'] != "available":
- raise exception.ApiError(_("Volume status must be available"))
- now = datetime.datetime.utcnow()
- db.volume_update(context, volume_ref['id'], {'status': 'deleting',
- 'terminated_at': now})
- host = volume_ref['host']
- rpc.cast(context,
- db.queue_get_for(context, FLAGS.volume_topic, host),
- {"method": "delete_volume",
- "args": {"volume_id": volume_ref['id']}})
+ instance_id = ec2_id_to_id(ec2_id)
+ self.compute_api.update(context, instance_id, **kwargs)
return True
def describe_images(self, context, image_id=None, **kwargs):
diff --git a/nova/api/openstack/images.py b/nova/api/openstack/images.py
index 867ee5a7e..0b239aab8 100644
--- a/nova/api/openstack/images.py
+++ b/nova/api/openstack/images.py
@@ -17,15 +17,14 @@
from webob import exc
+from nova import compute
from nova import flags
from nova import utils
from nova import wsgi
import nova.api.openstack
-import nova.image.service
-
from nova.api.openstack import common
from nova.api.openstack import faults
-from nova.compute import api as compute_api
+import nova.image.service
FLAGS = flags.FLAGS
@@ -131,7 +130,7 @@ class Controller(wsgi.Controller):
env = self._deserialize(req.body, req)
instance_id = env["image"]["serverId"]
name = env["image"]["name"]
- return compute_api.ComputeAPI().snapshot(context, instance_id, name)
+ return compute.API().snapshot(context, instance_id, name)
def update(self, req, id):
# Users may not modify public images, and that's all that
diff --git a/nova/api/openstack/servers.py b/nova/api/openstack/servers.py
index c5cbe21ef..10679ccb6 100644
--- a/nova/api/openstack/servers.py
+++ b/nova/api/openstack/servers.py
@@ -20,12 +20,12 @@ import traceback
from webob import exc
+from nova import compute
from nova import exception
from nova import wsgi
from nova.api.openstack import common
from nova.api.openstack import faults
from nova.auth import manager as auth_manager
-from nova.compute import api as compute_api
from nova.compute import instance_types
from nova.compute import power_state
import nova.api.openstack
@@ -51,7 +51,7 @@ def _translate_detail_keys(inst):
inst_dict = {}
mapped_keys = dict(status='state', imageId='image_id',
- flavorId='instance_type', name='display_name', id='internal_id')
+ flavorId='instance_type', name='display_name', id='id')
for k, v in mapped_keys.iteritems():
inst_dict[k] = inst[v]
@@ -67,7 +67,7 @@ def _translate_detail_keys(inst):
def _translate_keys(inst):
""" Coerces into dictionary format, excluding all model attributes
save for id and name """
- return dict(server=dict(id=inst['internal_id'], name=inst['display_name']))
+ return dict(server=dict(id=inst['id'], name=inst['display_name']))
class Controller(wsgi.Controller):
@@ -80,7 +80,7 @@ class Controller(wsgi.Controller):
"status", "progress"]}}}
def __init__(self):
- self.compute_api = compute_api.ComputeAPI()
+ self.compute_api = compute.API()
super(Controller, self).__init__()
def index(self, req):
@@ -96,8 +96,7 @@ class Controller(wsgi.Controller):
entity_maker - either _translate_detail_keys or _translate_keys
"""
- instance_list = self.compute_api.get_instances(
- req.environ['nova.context'])
+ instance_list = self.compute_api.get_all(req.environ['nova.context'])
limited_list = common.limited(instance_list, req)
res = [entity_maker(inst)['server'] for inst in limited_list]
return dict(servers=res)
@@ -105,8 +104,7 @@ class Controller(wsgi.Controller):
def show(self, req, id):
""" Returns server details by server id """
try:
- instance = self.compute_api.get_instance(
- req.environ['nova.context'], int(id))
+ instance = self.compute_api.get(req.environ['nova.context'], id)
return _translate_detail_keys(instance)
except exception.NotFound:
return faults.Fault(exc.HTTPNotFound())
@@ -114,8 +112,7 @@ class Controller(wsgi.Controller):
def delete(self, req, id):
""" Destroys a server """
try:
- self.compute_api.delete_instance(req.environ['nova.context'],
- int(id))
+ self.compute_api.delete(req.environ['nova.context'], id)
except exception.NotFound:
return faults.Fault(exc.HTTPNotFound())
return exc.HTTPAccepted()
@@ -128,12 +125,12 @@ class Controller(wsgi.Controller):
key_pair = auth_manager.AuthManager.get_key_pairs(
req.environ['nova.context'])[0]
- instances = self.compute_api.create_instances(
+ instances = self.compute_api.create(
req.environ['nova.context'],
instance_types.get_by_flavor_id(env['server']['flavorId']),
env['server']['imageId'],
display_name=env['server']['name'],
- description=env['server']['name'],
+ display_description=env['server']['name'],
key_name=key_pair['name'],
key_data=key_pair['public_key'])
return _translate_keys(instances[0])
@@ -151,10 +148,8 @@ class Controller(wsgi.Controller):
update_dict['display_name'] = inst_dict['server']['name']
try:
- ctxt = req.environ['nova.context']
- self.compute_api.update_instance(ctxt,
- id,
- **update_dict)
+ self.compute_api.update(req.environ['nova.context'], id,
+ **update_dict)
except exception.NotFound:
return faults.Fault(exc.HTTPNotFound())
return exc.HTTPNoContent()
@@ -175,6 +170,50 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
+ def lock(self, req, id):
+ """
+ lock the instance with id
+ admin only operation
+
+ """
+ context = req.environ['nova.context']
+ try:
+ self.compute_api.lock(context, id)
+ except:
+ readable = traceback.format_exc()
+ logging.error(_("Compute.api::lock %s"), readable)
+ return faults.Fault(exc.HTTPUnprocessableEntity())
+ return exc.HTTPAccepted()
+
+ def unlock(self, req, id):
+ """
+ unlock the instance with id
+ admin only operation
+
+ """
+ context = req.environ['nova.context']
+ try:
+ self.compute_api.unlock(context, id)
+ except:
+ readable = traceback.format_exc()
+ logging.error(_("Compute.api::unlock %s"), readable)
+ return faults.Fault(exc.HTTPUnprocessableEntity())
+ return exc.HTTPAccepted()
+
+ def get_lock(self, req, id):
+ """
+ return the boolean state of (instance with id)'s lock
+
+ """
+ context = req.environ['nova.context']
+ try:
+ self.compute_api.get_lock(context, id)
+ except:
+ readable = traceback.format_exc()
+ logging.error(_("Compute.api::get_lock %s"), readable)
+ return faults.Fault(exc.HTTPUnprocessableEntity())
+ return exc.HTTPAccepted()
+
def pause(self, req, id):
""" Permit Admins to Pause the server. """
ctxt = req.environ['nova.context']
@@ -227,4 +266,13 @@ class Controller(wsgi.Controller):
def actions(self, req, id):
"""Permit Admins to retrieve server actions."""
ctxt = req.environ["nova.context"]
- return self.compute_api.get_actions(ctxt, id)
+ items = self.compute_api.get_actions(ctxt, id)
+ actions = []
+ # TODO(jk0): Do not do pre-serialization here once the default
+ # serializer is updated
+ for item in items:
+ actions.append(dict(
+ created_at=str(item.created_at),
+ action=item.action,
+ error=item.error))
+ return dict(actions=actions)
diff --git a/nova/compute/__init__.py b/nova/compute/__init__.py
index a5df2ec1a..b94f971d1 100644
--- a/nova/compute/__init__.py
+++ b/nova/compute/__init__.py
@@ -16,17 +16,4 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
-:mod:`nova.compute` -- Compute Nodes using LibVirt
-=====================================================
-
-.. automodule:: nova.compute
- :platform: Unix
- :synopsis: Thin wrapper around libvirt for VM mgmt.
-.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com>
-.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com>
-.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com>
-.. moduleauthor:: Joshua McKenty <joshua@cognition.ca>
-.. moduleauthor:: Manish Singh <yosh@gimp.org>
-.. moduleauthor:: Andy Smith <andy@anarkystic.com>
-"""
+from nova.compute.api import API
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 07c69bd31..015934ee7 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -17,7 +17,7 @@
# under the License.
"""
-Handles all API requests relating to instances (guest vms).
+Handles all requests relating to instances (guest vms).
"""
import datetime
@@ -27,36 +27,41 @@ import time
from nova import db
from nova import exception
from nova import flags
+from nova import network
from nova import quota
from nova import rpc
from nova import utils
+from nova import volume
from nova.compute import instance_types
from nova.db import base
FLAGS = flags.FLAGS
-def generate_default_hostname(internal_id):
+def generate_default_hostname(instance_id):
"""Default function to generate a hostname given an instance reference."""
- return str(internal_id)
+ return str(instance_id)
-class ComputeAPI(base.Base):
+class API(base.Base):
"""API for interacting with the compute manager."""
- def __init__(self, network_manager=None, image_service=None, **kwargs):
- if not network_manager:
- network_manager = utils.import_object(FLAGS.network_manager)
- self.network_manager = network_manager
+ def __init__(self, image_service=None, network_api=None, volume_api=None,
+ **kwargs):
if not image_service:
image_service = utils.import_object(FLAGS.image_service)
self.image_service = image_service
- super(ComputeAPI, self).__init__(**kwargs)
+ if not network_api:
+ network_api = network.API()
+ self.network_api = network_api
+ if not volume_api:
+ volume_api = volume.API()
+ self.volume_api = volume_api
+ super(API, self).__init__(**kwargs)
def get_network_topic(self, context, instance_id):
try:
- instance = self.db.instance_get_by_internal_id(context,
- instance_id)
+ instance = self.get(context, instance_id)
except exception.NotFound as e:
logging.warning("Instance %d was not found in get_network_topic",
instance_id)
@@ -70,18 +75,18 @@ class ComputeAPI(base.Base):
topic,
{"method": "get_network_topic", "args": {'fake': 1}})
- def create_instances(self, context, instance_type, image_id, min_count=1,
- max_count=1, kernel_id=None, ramdisk_id=None,
- display_name='', description='', key_name=None,
- key_data=None, security_group='default',
- availability_zone=None,
- user_data=None,
- generate_hostname=generate_default_hostname):
- """Create the number of instances requested if quote and
+ def create(self, context, instance_type,
+ image_id, kernel_id=None, ramdisk_id=None,
+ min_count=1, max_count=1,
+ display_name='', display_description='',
+ key_name=None, key_data=None, security_group='default',
+ availability_zone=None, user_data=None,
+ generate_hostname=generate_default_hostname):
+ """Create the number of instances requested if quota and
other arguments check out ok."""
- num_instances = quota.allowed_instances(context, max_count,
- instance_type)
+ type_data = instance_types.INSTANCE_TYPES[instance_type]
+ num_instances = quota.allowed_instances(context, max_count, type_data)
if num_instances < min_count:
logging.warn("Quota exceeeded for %s, tried to run %s instances",
context.project_id, min_count)
@@ -96,7 +101,7 @@ class ComputeAPI(base.Base):
kernel_id = image.get('kernelId', None)
if ramdisk_id is None:
ramdisk_id = image.get('ramdiskId', None)
- #No kernel and ramdisk for raw images
+ # No kernel and ramdisk for raw images
if kernel_id == str(FLAGS.null_kernel):
kernel_id = None
ramdisk_id = None
@@ -124,7 +129,6 @@ class ComputeAPI(base.Base):
key_pair = db.key_pair_get(context, context.user_id, key_name)
key_data = key_pair['public_key']
- type_data = instance_types.INSTANCE_TYPES[instance_type]
base_options = {
'reservation_id': utils.generate_uid('r'),
'image_id': image_id,
@@ -139,10 +143,11 @@ class ComputeAPI(base.Base):
'vcpus': type_data['vcpus'],
'local_gb': type_data['local_gb'],
'display_name': display_name,
- 'display_description': description,
+ 'display_description': display_description,
'user_data': user_data or '',
'key_name': key_name,
'key_data': key_data,
+ 'locked': False,
'availability_zone': availability_zone}
elevated = context.elevated()
@@ -154,7 +159,6 @@ class ComputeAPI(base.Base):
**base_options)
instance = self.db.instance_create(context, instance)
instance_id = instance['id']
- internal_id = instance['internal_id']
elevated = context.elevated()
if not security_groups:
@@ -165,11 +169,11 @@ class ComputeAPI(base.Base):
security_group_id)
# Set sane defaults if not specified
- updates = dict(hostname=generate_hostname(internal_id))
+ updates = dict(hostname=generate_hostname(instance_id))
if 'display_name' not in instance:
- updates['display_name'] = "Server %s" % internal_id
+ updates['display_name'] = "Server %s" % instance_id
- instance = self.update_instance(context, instance_id, **updates)
+ instance = self.update(context, instance_id, **updates)
instances.append(instance)
logging.debug(_("Casting to scheduler for %s/%s's instance %s"),
@@ -199,7 +203,7 @@ class ComputeAPI(base.Base):
'project_id': context.project_id}
db.security_group_create(context, values)
- def update_instance(self, context, instance_id, **kwargs):
+ def update(self, context, instance_id, **kwargs):
"""Updates the instance in the datastore.
:param context: The security context
@@ -213,134 +217,204 @@ class ComputeAPI(base.Base):
"""
return self.db.instance_update(context, instance_id, kwargs)
- def delete_instance(self, context, instance_id):
- logging.debug("Going to try and terminate %d" % instance_id)
+ def delete(self, context, instance_id):
+ logging.debug("Going to try and terminate %s" % instance_id)
try:
- instance = self.db.instance_get_by_internal_id(context,
- instance_id)
+ instance = self.get(context, instance_id)
except exception.NotFound as e:
- logging.warning(_("Instance %d was not found during terminate"),
+ logging.warning(_("Instance %s was not found during terminate"),
instance_id)
raise e
if (instance['state_description'] == 'terminating'):
- logging.warning(_("Instance %d is already being terminated"),
+ logging.warning(_("Instance %s is already being terminated"),
instance_id)
return
- self.update_instance(context,
- instance['id'],
- state_description='terminating',
- state=0,
- terminated_at=datetime.datetime.utcnow())
+ self.update(context,
+ instance['id'],
+ state_description='terminating',
+ state=0,
+ terminated_at=datetime.datetime.utcnow())
host = instance['host']
if host:
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "terminate_instance",
- "args": {"instance_id": instance['id']}})
+ "args": {"instance_id": instance_id}})
else:
- self.db.instance_destroy(context, instance['id'])
-
- def get_instances(self, context, project_id=None):
- """Get all instances, possibly filtered by project ID or
- user ID. If there is no filter and the context is an admin,
- it will retreive all instances in the system."""
+ self.db.instance_destroy(context, instance_id)
+
+ def get(self, context, instance_id):
+ """Get a single instance with the given ID."""
+ return self.db.instance_get_by_id(context, instance_id)
+
+ def get_all(self, context, project_id=None, reservation_id=None,
+ fixed_ip=None):
+ """Get all instances, possibly filtered by one of the
+ given parameters. If there is no filter and the context is
+ an admin, it will retreive all instances in the system."""
+ if reservation_id is not None:
+ return self.db.instance_get_all_by_reservation(context,
+ reservation_id)
+ if fixed_ip is not None:
+ return self.db.fixed_ip_get_instance(context, fixed_ip)
if project_id or not context.is_admin:
if not context.project:
return self.db.instance_get_all_by_user(context,
context.user_id)
if project_id is None:
project_id = context.project_id
- return self.db.instance_get_all_by_project(context, project_id)
+ return self.db.instance_get_all_by_project(context,
+ project_id)
return self.db.instance_get_all(context)
- def get_instance(self, context, instance_id):
- return self.db.instance_get_by_internal_id(context, instance_id)
-
def snapshot(self, context, instance_id, name):
"""Snapshot the given instance."""
- instance = self.db.instance_get_by_internal_id(context, instance_id)
+ instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "snapshot_instance",
- "args": {"instance_id": instance['id'], "name": name}})
+ "args": {"instance_id": instance_id, "name": name}})
def reboot(self, context, instance_id):
"""Reboot the given instance."""
- instance = self.db.instance_get_by_internal_id(context, instance_id)
+ instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "reboot_instance",
- "args": {"instance_id": instance['id']}})
+ "args": {"instance_id": instance_id}})
def pause(self, context, instance_id):
"""Pause the given instance."""
- instance = self.db.instance_get_by_internal_id(context, instance_id)
+ instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "pause_instance",
- "args": {"instance_id": instance['id']}})
+ "args": {"instance_id": instance_id}})
def unpause(self, context, instance_id):
"""Unpause the given instance."""
- instance = self.db.instance_get_by_internal_id(context, instance_id)
+ instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "unpause_instance",
- "args": {"instance_id": instance['id']}})
+ "args": {"instance_id": instance_id}})
def get_diagnostics(self, context, instance_id):
"""Retrieve diagnostics for the given instance."""
- instance = self.db.instance_get_by_internal_id(context, instance_id)
+ instance = self.get(context, instance_id)
host = instance["host"]
return rpc.call(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "get_diagnostics",
- "args": {"instance_id": instance["id"]}})
+ "args": {"instance_id": instance_id}})
def get_actions(self, context, instance_id):
"""Retrieve actions for the given instance."""
- instance = self.db.instance_get_by_internal_id(context, instance_id)
- return self.db.instance_get_actions(context, instance["id"])
+ return self.db.instance_get_actions(context, instance_id)
def suspend(self, context, instance_id):
"""suspend the instance with instance_id"""
- instance = self.db.instance_get_by_internal_id(context, instance_id)
+ instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "suspend_instance",
- "args": {"instance_id": instance['id']}})
+ "args": {"instance_id": instance_id}})
def resume(self, context, instance_id):
"""resume the instance with instance_id"""
- instance = self.db.instance_get_by_internal_id(context, instance_id)
+ instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "resume_instance",
- "args": {"instance_id": instance['id']}})
+ "args": {"instance_id": instance_id}})
def rescue(self, context, instance_id):
"""Rescue the given instance."""
- instance = self.db.instance_get_by_internal_id(context, instance_id)
+ instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "rescue_instance",
- "args": {"instance_id": instance['id']}})
+ "args": {"instance_id": instance_id}})
def unrescue(self, context, instance_id):
"""Unrescue the given instance."""
- instance = self.db.instance_get_by_internal_id(context, instance_id)
+ instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "unrescue_instance",
+ "args": {"instance_id": instance_id}})
+
+ def lock(self, context, instance_id):
+ """
+ lock the instance with instance_id
+
+ """
+ instance = self.get_instance(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "lock_instance",
"args": {"instance_id": instance['id']}})
+
+ def unlock(self, context, instance_id):
+ """
+ unlock the instance with instance_id
+
+ """
+ instance = self.get_instance(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "unlock_instance",
+ "args": {"instance_id": instance['id']}})
+
+ def get_lock(self, context, instance_id):
+ """
+ return the boolean state of (instance with instance_id)'s lock
+
+ """
+ instance = self.get_instance(context, instance_id)
+ return instance['locked']
+
+ def attach_volume(self, context, instance_id, volume_id, device):
+ if not re.match("^/dev/[a-z]d[a-z]+$", device):
+ raise exception.ApiError(_("Invalid device specified: %s. "
+ "Example device: /dev/vdb") % device)
+ self.volume_api.check_attach(context, volume_id)
+ instance = self.get(context, instance_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "attach_volume",
+ "args": {"volume_id": volume_id,
+ "instance_id": instance_id,
+ "mountpoint": device}})
+
+ def detach_volume(self, context, volume_id):
+ instance = self.db.volume_get_instance(context.elevated(), volume_id)
+ if not instance:
+ raise exception.ApiError(_("Volume isn't attached to anything!"))
+ self.volume_api.check_detach(context, volume_id)
+ host = instance['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ {"method": "detach_volume",
+ "args": {"instance_id": instance['id'],
+ "volume_id": volume_id}})
+ return instance
+
+ def associate_floating_ip(self, context, instance_id, address):
+ instance = self.get(context, instance_id)
+ self.network_api.associate_floating_ip(context, address,
+ instance['fixed_ip'])
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index c9aff75ac..d5136eb26 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -36,6 +36,7 @@ terminating it.
import datetime
import logging
+import functools
from nova import exception
from nova import flags
@@ -53,6 +54,38 @@ flags.DEFINE_string('stub_network', False,
'Stub network related code')
+def checks_instance_lock(function):
+ """
+ decorator used for preventing action against locked instances
+ unless, of course, you happen to be admin
+
+ """
+
+ @functools.wraps(function)
+ def decorated_function(self, context, instance_id, *args, **kwargs):
+
+ logging.info(_("check_instance_lock: decorating: |%s|"), function)
+ logging.info(_("check_instance_lock: arguments: |%s| |%s| |%s|"),
+ self,
+ context,
+ instance_id)
+ locked = self.get_lock(context, instance_id)
+ admin = context.is_admin
+ logging.info(_("check_instance_lock: locked: |%s|"), locked)
+ logging.info(_("check_instance_lock: admin: |%s|"), admin)
+
+ # if admin or unlocked call function otherwise log error
+ if admin or not locked:
+ logging.info(_("check_instance_lock: executing: |%s|"), function)
+ function(self, context, instance_id, *args, **kwargs)
+ else:
+ logging.error(_("check_instance_lock: not executing |%s|"),
+ function)
+ return False
+
+ return decorated_function
+
+
class ComputeManager(manager.Manager):
"""Manages the running instances from creation to destruction."""
@@ -158,6 +191,7 @@ class ComputeManager(manager.Manager):
self._update_state(context, instance_id)
@exception.wrap_exception
+ @checks_instance_lock
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this machine."""
context = context.elevated()
@@ -202,6 +236,7 @@ class ComputeManager(manager.Manager):
self.db.instance_destroy(context, instance_id)
@exception.wrap_exception
+ @checks_instance_lock
def reboot_instance(self, context, instance_id):
"""Reboot an instance on this server."""
context = context.elevated()
@@ -211,7 +246,7 @@ class ComputeManager(manager.Manager):
if instance_ref['state'] != power_state.RUNNING:
logging.warn(_('trying to reboot a non-running '
'instance: %s (state: %s excepted: %s)'),
- instance_ref['internal_id'],
+ instance_id,
instance_ref['state'],
power_state.RUNNING)
@@ -239,20 +274,20 @@ class ComputeManager(manager.Manager):
if instance_ref['state'] != power_state.RUNNING:
logging.warn(_('trying to snapshot a non-running '
'instance: %s (state: %s excepted: %s)'),
- instance_ref['internal_id'],
+ instance_id,
instance_ref['state'],
power_state.RUNNING)
self.driver.snapshot(instance_ref, name)
@exception.wrap_exception
+ @checks_instance_lock
def rescue_instance(self, context, instance_id):
"""Rescue an instance on this server."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
- logging.debug(_('instance %s: rescuing'),
- instance_ref['internal_id'])
+ logging.debug(_('instance %s: rescuing'), instance_id)
self.db.instance_set_state(context,
instance_id,
power_state.NOSTATE,
@@ -262,13 +297,13 @@ class ComputeManager(manager.Manager):
self._update_state(context, instance_id)
@exception.wrap_exception
+ @checks_instance_lock
def unrescue_instance(self, context, instance_id):
"""Rescue an instance on this server."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
- logging.debug(_('instance %s: unrescuing'),
- instance_ref['internal_id'])
+ logging.debug(_('instance %s: unrescuing'), instance_id)
self.db.instance_set_state(context,
instance_id,
power_state.NOSTATE,
@@ -282,13 +317,13 @@ class ComputeManager(manager.Manager):
self._update_state(context, instance_id)
@exception.wrap_exception
+ @checks_instance_lock
def pause_instance(self, context, instance_id):
"""Pause an instance on this server."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
- logging.debug('instance %s: pausing',
- instance_ref['internal_id'])
+ logging.debug('instance %s: pausing', instance_id)
self.db.instance_set_state(context,
instance_id,
power_state.NOSTATE,
@@ -300,13 +335,13 @@ class ComputeManager(manager.Manager):
result))
@exception.wrap_exception
+ @checks_instance_lock
def unpause_instance(self, context, instance_id):
"""Unpause a paused instance on this server."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
- logging.debug('instance %s: unpausing',
- instance_ref['internal_id'])
+ logging.debug('instance %s: unpausing', instance_id)
self.db.instance_set_state(context,
instance_id,
power_state.NOSTATE,
@@ -324,16 +359,20 @@ class ComputeManager(manager.Manager):
if instance_ref["state"] == power_state.RUNNING:
logging.debug(_("instance %s: retrieving diagnostics"),
- instance_ref["internal_id"])
+ instance_id)
return self.driver.get_diagnostics(instance_ref)
+ @exception.wrap_exception
+ @checks_instance_lock
def suspend_instance(self, context, instance_id):
- """suspend the instance with instance_id"""
+ """
+ suspend the instance with instance_id
+
+ """
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
- logging.debug(_('instance %s: suspending'),
- instance_ref['internal_id'])
+ logging.debug(_('instance %s: suspending'), instance_id)
self.db.instance_set_state(context, instance_id,
power_state.NOSTATE,
'suspending')
@@ -344,12 +383,16 @@ class ComputeManager(manager.Manager):
result))
@exception.wrap_exception
+ @checks_instance_lock
def resume_instance(self, context, instance_id):
- """resume the suspended instance with instance_id"""
+ """
+ resume the suspended instance with instance_id
+
+ """
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
- logging.debug(_('instance %s: resuming'), instance_ref['internal_id'])
+ logging.debug(_('instance %s: resuming'), instance_id)
self.db.instance_set_state(context, instance_id,
power_state.NOSTATE,
'resuming')
@@ -360,6 +403,41 @@ class ComputeManager(manager.Manager):
result))
@exception.wrap_exception
+ def lock_instance(self, context, instance_id):
+ """
+ lock the instance with instance_id
+
+ """
+ context = context.elevated()
+ instance_ref = self.db.instance_get(context, instance_id)
+
+ logging.debug(_('instance %s: locking'), instance_id)
+ self.db.instance_update(context, instance_id, {'locked': True})
+
+ @exception.wrap_exception
+ def unlock_instance(self, context, instance_id):
+ """
+ unlock the instance with instance_id
+
+ """
+ context = context.elevated()
+ instance_ref = self.db.instance_get(context, instance_id)
+
+ logging.debug(_('instance %s: unlocking'), instance_id)
+ self.db.instance_update(context, instance_id, {'locked': False})
+
+ @exception.wrap_exception
+ def get_lock(self, context, instance_id):
+ """
+ return the boolean state of (instance with instance_id)'s lock
+
+ """
+ context = context.elevated()
+ logging.debug(_('instance %s: getting locked state'), instance_id)
+ instance_ref = self.db.instance_get(context, instance_id)
+ return instance_ref['locked']
+
+ @exception.wrap_exception
def get_console_output(self, context, instance_id):
"""Send the console output for an instance."""
context = context.elevated()
@@ -369,6 +447,7 @@ class ComputeManager(manager.Manager):
return self.driver.get_console_output(instance_ref)
@exception.wrap_exception
+ @checks_instance_lock
def attach_volume(self, context, instance_id, volume_id, mountpoint):
"""Attach a volume to an instance."""
context = context.elevated()
@@ -398,6 +477,7 @@ class ComputeManager(manager.Manager):
return True
@exception.wrap_exception
+ @checks_instance_lock
def detach_volume(self, context, instance_id, volume_id):
"""Detach a volume from an instance."""
context = context.elevated()
diff --git a/nova/db/api.py b/nova/db/api.py
index bab698d9d..0fa5eb1e8 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -353,9 +353,9 @@ def instance_get_project_vpn(context, project_id):
return IMPL.instance_get_project_vpn(context, project_id)
-def instance_get_by_internal_id(context, internal_id):
- """Get an instance by internal id."""
- return IMPL.instance_get_by_internal_id(context, internal_id)
+def instance_get_by_id(context, instance_id):
+ """Get an instance by id."""
+ return IMPL.instance_get_by_id(context, instance_id)
def instance_is_vpn(context, instance_id):
@@ -719,7 +719,7 @@ def security_group_get_all(context):
def security_group_get(context, security_group_id):
- """Get security group by its internal id."""
+ """Get security group by its id."""
return IMPL.security_group_get(context, security_group_id)
diff --git a/nova/db/sqlalchemy/__init__.py b/nova/db/sqlalchemy/__init__.py
index 3288ebd20..22aa1cfe6 100644
--- a/nova/db/sqlalchemy/__init__.py
+++ b/nova/db/sqlalchemy/__init__.py
@@ -19,6 +19,25 @@
"""
SQLAlchemy database backend
"""
+import logging
+import time
+
+from sqlalchemy.exc import OperationalError
+
+from nova import flags
from nova.db.sqlalchemy import models
-models.register_models()
+
+FLAGS = flags.FLAGS
+
+
+for i in xrange(FLAGS.sql_max_retries):
+ if i > 0:
+ time.sleep(FLAGS.sql_retry_interval)
+
+ try:
+ models.register_models()
+ break
+ except OperationalError:
+ logging.exception(_("Data store is unreachable."
+ " Trying again in %d seconds.") % FLAGS.sql_retry_interval)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 72369d187..45427597a 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -19,7 +19,6 @@
Implementation of SQLAlchemy backend.
"""
-import random
import warnings
from nova import db
@@ -606,30 +605,18 @@ def fixed_ip_update(context, address, values):
###################
-#TODO(gundlach): instance_create and volume_create are nearly identical
-#and should be refactored. I expect there are other copy-and-paste
-#functions between the two of them as well.
-
-
@require_context
def instance_create(context, values):
"""Create a new Instance record in the database.
context - request context object
values - dict containing column values.
- 'internal_id' is auto-generated and should not be specified.
"""
instance_ref = models.Instance()
instance_ref.update(values)
session = get_session()
with session.begin():
- while instance_ref.internal_id == None:
- # Instances have integer internal ids.
- internal_id = random.randint(0, 2 ** 31 - 1)
- if not instance_internal_id_exists(context, internal_id,
- session=session):
- instance_ref.internal_id = internal_id
instance_ref.save(session=session)
return instance_ref
@@ -751,38 +738,29 @@ def instance_get_project_vpn(context, project_id):
@require_context
-def instance_get_by_internal_id(context, internal_id):
+def instance_get_by_id(context, instance_id):
session = get_session()
if is_admin_context(context):
result = session.query(models.Instance).\
options(joinedload('security_groups')).\
- filter_by(internal_id=internal_id).\
+ filter_by(id=instance_id).\
filter_by(deleted=can_read_deleted(context)).\
first()
elif is_user_context(context):
result = session.query(models.Instance).\
options(joinedload('security_groups')).\
filter_by(project_id=context.project_id).\
- filter_by(internal_id=internal_id).\
+ filter_by(id=instance_id).\
filter_by(deleted=False).\
first()
if not result:
- raise exception.NotFound(_('Instance %s not found') % (internal_id))
+ raise exception.NotFound(_('Instance %s not found') % (instance_id))
return result
@require_context
-def instance_internal_id_exists(context, internal_id, session=None):
- if not session:
- session = get_session()
- return session.query(exists().\
- where(models.Instance.internal_id == internal_id)).\
- one()[0]
-
-
-@require_context
def instance_get_fixed_address(context, instance_id):
session = get_session()
with session.begin():
@@ -862,12 +840,9 @@ def instance_action_create(context, values):
def instance_get_actions(context, instance_id):
"""Return the actions associated to the given instance id"""
session = get_session()
- actions = {}
- for action in session.query(models.InstanceActions).\
+ return session.query(models.InstanceActions).\
filter_by(instance_id=instance_id).\
- all():
- actions[action.action] = action.error
- return actions
+ all()
###################
@@ -1317,10 +1292,6 @@ def volume_create(context, values):
session = get_session()
with session.begin():
- while volume_ref.ec2_id == None:
- ec2_id = utils.generate_uid('vol')
- if not volume_ec2_id_exists(context, ec2_id, session=session):
- volume_ref.ec2_id = ec2_id
volume_ref.save(session=session)
return volume_ref
@@ -1418,41 +1389,6 @@ def volume_get_all_by_project(context, project_id):
all()
-@require_context
-def volume_get_by_ec2_id(context, ec2_id):
- session = get_session()
- result = None
-
- if is_admin_context(context):
- result = session.query(models.Volume).\
- filter_by(ec2_id=ec2_id).\
- filter_by(deleted=can_read_deleted(context)).\
- first()
- elif is_user_context(context):
- result = session.query(models.Volume).\
- filter_by(project_id=context.project_id).\
- filter_by(ec2_id=ec2_id).\
- filter_by(deleted=False).\
- first()
- else:
- raise exception.NotAuthorized()
-
- if not result:
- raise exception.NotFound(_('Volume %s not found') % ec2_id)
-
- return result
-
-
-@require_context
-def volume_ec2_id_exists(context, ec2_id, session=None):
- if not session:
- session = get_session()
-
- return session.query(exists().\
- where(models.Volume.id == ec2_id)).\
- one()[0]
-
-
@require_admin_context
def volume_get_instance(context, volume_id):
session = get_session()
diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py
index 0c9c387fc..1ed366127 100644
--- a/nova/db/sqlalchemy/models.py
+++ b/nova/db/sqlalchemy/models.py
@@ -164,11 +164,13 @@ class Certificate(BASE, NovaBase):
class Instance(BASE, NovaBase):
"""Represents a guest vm."""
__tablename__ = 'instances'
- id = Column(Integer, primary_key=True)
- internal_id = Column(Integer, unique=True)
+ id = Column(Integer, primary_key=True, autoincrement=True)
- admin_pass = Column(String(255))
+ @property
+ def name(self):
+ return "instance-%08x" % self.id
+ admin_pass = Column(String(255))
user_id = Column(String(255))
project_id = Column(String(255))
@@ -180,10 +182,6 @@ class Instance(BASE, NovaBase):
def project(self):
return auth.manager.AuthManager().get_project(self.project_id)
- @property
- def name(self):
- return "instance-%d" % self.internal_id
-
image_id = Column(String(255))
kernel_id = Column(String(255))
ramdisk_id = Column(String(255))
@@ -226,6 +224,8 @@ class Instance(BASE, NovaBase):
display_name = Column(String(255))
display_description = Column(String(255))
+ locked = Column(Boolean)
+
# TODO(vish): see Ewan's email about state improvements, probably
# should be in a driver base class or some such
# vmstate_state = running, halted, suspended, paused
@@ -251,8 +251,11 @@ class InstanceActions(BASE, NovaBase):
class Volume(BASE, NovaBase):
"""Represents a block storage device that can be attached to a vm."""
__tablename__ = 'volumes'
- id = Column(Integer, primary_key=True)
- ec2_id = Column(String(12), unique=True)
+ id = Column(Integer, primary_key=True, autoincrement=True)
+
+ @property
+ def name(self):
+ return "volume-%08x" % self.id
user_id = Column(String(255))
project_id = Column(String(255))
@@ -278,10 +281,6 @@ class Volume(BASE, NovaBase):
display_name = Column(String(255))
display_description = Column(String(255))
- @property
- def name(self):
- return self.ec2_id
-
class Quota(BASE, NovaBase):
"""Represents quota overrides for a project."""
@@ -545,7 +544,8 @@ def register_models():
"""Register Models and create metadata.
Called from nova.db.sqlalchemy.__init__ as part of loading the driver,
- it will never need to be called explicitly elsewhere.
+ it will never need to be called explicitly elsewhere unless the
+ connection is lost and needs to be reestablished.
"""
from sqlalchemy import create_engine
models = (Service, Instance, InstanceActions,
diff --git a/nova/flags.py b/nova/flags.py
index e872ba217..4b7334927 100644
--- a/nova/flags.py
+++ b/nova/flags.py
@@ -266,6 +266,8 @@ DEFINE_string('sql_connection',
DEFINE_string('sql_idle_timeout',
'3600',
'timeout for idle sql database connections')
+DEFINE_integer('sql_max_retries', 12, 'sql connection attempts')
+DEFINE_integer('sql_retry_interval', 10, 'sql connection retry interval')
DEFINE_string('compute_manager', 'nova.compute.manager.ComputeManager',
'Manager for compute')
diff --git a/nova/image/glance.py b/nova/image/glance.py
index cc3192e7c..cb3936df1 100644
--- a/nova/image/glance.py
+++ b/nova/image/glance.py
@@ -24,7 +24,6 @@ import urlparse
import webob.exc
-from nova.compute import api as compute_api
from nova import utils
from nova import flags
from nova import exception
diff --git a/nova/network/__init__.py b/nova/network/__init__.py
index dcc54db09..6eb3e3ef6 100644
--- a/nova/network/__init__.py
+++ b/nova/network/__init__.py
@@ -16,17 +16,4 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
-:mod:`nova.network` -- Network Nodes
-=====================================================
-
-.. automodule:: nova.network
- :platform: Unix
- :synopsis: Network is responsible for managing networking
-.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com>
-.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com>
-.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com>
-.. moduleauthor:: Joshua McKenty <joshua@cognition.ca>
-.. moduleauthor:: Manish Singh <yosh@gimp.org>
-.. moduleauthor:: Andy Smith <andy@anarkystic.com>
-"""
+from nova.network.api import API
diff --git a/nova/network/api.py b/nova/network/api.py
new file mode 100644
index 000000000..cbd912047
--- /dev/null
+++ b/nova/network/api.py
@@ -0,0 +1,87 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Handles all requests relating to instances (guest vms).
+"""
+
+import logging
+
+from nova import db
+from nova import flags
+from nova import quota
+from nova import rpc
+from nova.db import base
+
+FLAGS = flags.FLAGS
+
+
+class API(base.Base):
+ """API for interacting with the network manager."""
+
+ def allocate_floating_ip(self, context):
+ if quota.allowed_floating_ips(context, 1) < 1:
+ logging.warn(_("Quota exceeeded for %s, tried to allocate "
+ "address"),
+ context.project_id)
+ raise quota.QuotaError(_("Address quota exceeded. You cannot "
+ "allocate any more addresses"))
+ # NOTE(vish): We don't know which network host should get the ip
+ # when we allocate, so just send it to any one. This
+ # will probably need to move into a network supervisor
+ # at some point.
+ return rpc.call(context,
+ FLAGS.network_topic,
+ {"method": "allocate_floating_ip",
+ "args": {"project_id": context.project_id}})
+
+ def release_floating_ip(self, context, address):
+ floating_ip = self.db.floating_ip_get_by_address(context, address)
+ # NOTE(vish): We don't know which network host should get the ip
+ # when we deallocate, so just send it to any one. This
+ # will probably need to move into a network supervisor
+ # at some point.
+ rpc.cast(context,
+ FLAGS.network_topic,
+ {"method": "deallocate_floating_ip",
+ "args": {"floating_address": floating_ip['address']}})
+
+ def associate_floating_ip(self, context, floating_ip, fixed_ip):
+ if isinstance(fixed_ip, str) or isinstance(fixed_ip, unicode):
+ fixed_ip = self.db.fixed_ip_get_by_address(context, fixed_ip)
+ floating_ip = self.db.floating_ip_get_by_address(context, floating_ip)
+ # NOTE(vish): Perhaps we should just pass this on to compute and
+ # let compute communicate with network.
+ host = fixed_ip['network']['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.network_topic, host),
+ {"method": "associate_floating_ip",
+ "args": {"floating_address": floating_ip['address'],
+ "fixed_address": fixed_ip['address']}})
+
+ def disassociate_floating_ip(self, context, address):
+ floating_ip = self.db.floating_ip_get_by_address(context, address)
+ if not floating_ip.get('fixed_ip'):
+ raise exception.ApiError('Address is not associated.')
+ # NOTE(vish): Get the topic from the host name of the network of
+ # the associated fixed ip.
+ host = floating_ip['fixed_ip']['network']['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.network_topic, host),
+ {"method": "disassociate_floating_ip",
+ "args": {"floating_address": floating_ip['address']}})
diff --git a/nova/quota.py b/nova/quota.py
index f6ca9f77c..3884eb308 100644
--- a/nova/quota.py
+++ b/nova/quota.py
@@ -22,7 +22,6 @@ Quotas for instances, volumes, and floating ips
from nova import db
from nova import exception
from nova import flags
-from nova.compute import instance_types
FLAGS = flags.FLAGS
@@ -63,10 +62,9 @@ def allowed_instances(context, num_instances, instance_type):
quota = get_quota(context, project_id)
allowed_instances = quota['instances'] - used_instances
allowed_cores = quota['cores'] - used_cores
- type_cores = instance_types.INSTANCE_TYPES[instance_type]['vcpus']
- num_cores = num_instances * type_cores
+ num_cores = num_instances * instance_type['vcpus']
allowed_instances = min(allowed_instances,
- int(allowed_cores // type_cores))
+ int(allowed_cores // instance_type['vcpus']))
return min(num_instances, allowed_instances)
diff --git a/nova/service.py b/nova/service.py
index f1f90742f..7203430c6 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -24,17 +24,21 @@ import inspect
import logging
import os
import sys
+import time
from eventlet import event
from eventlet import greenthread
from eventlet import greenpool
+from sqlalchemy.exc import OperationalError
+
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import rpc
from nova import utils
+from nova.db.sqlalchemy import models
FLAGS = flags.FLAGS
@@ -204,6 +208,14 @@ class Service(object):
self.model_disconnected = True
logging.exception(_("model server went away"))
+ try:
+ models.register_models()
+ except OperationalError:
+ logging.exception(_("Data store is unreachable."
+ " Trying again in %d seconds.") %
+ FLAGS.sql_retry_interval)
+ time.sleep(FLAGS.sql_retry_interval)
+
def serve(*services):
argv = FLAGS(sys.argv)
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index 961431154..291a0e468 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -113,7 +113,7 @@ def stub_out_networking(stubs):
def stub_out_compute_api_snapshot(stubs):
def snapshot(self, context, instance_id, name):
return 123
- stubs.Set(nova.compute.api.ComputeAPI, 'snapshot', snapshot)
+ stubs.Set(nova.compute.API, 'snapshot', snapshot)
def stub_out_glance(stubs, initial_fixtures=[]):
diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py
index 70ff714e6..6e611a55d 100644
--- a/nova/tests/api/openstack/test_servers.py
+++ b/nova/tests/api/openstack/test_servers.py
@@ -56,8 +56,8 @@ def instance_address(context, instance_id):
def stub_instance(id, user_id=1):
- return Instance(id=int(id) + 123456, state=0, image_id=10, user_id=user_id,
- display_name='server%s' % id, internal_id=id)
+ return Instance(id=id, state=0, image_id=10, user_id=user_id,
+ display_name='server%s' % id)
def fake_compute_api(cls, req, id):
@@ -76,8 +76,7 @@ class ServersTest(unittest.TestCase):
fakes.stub_out_key_pair_funcs(self.stubs)
fakes.stub_out_image_service(self.stubs)
self.stubs.Set(nova.db.api, 'instance_get_all', return_servers)
- self.stubs.Set(nova.db.api, 'instance_get_by_internal_id',
- return_server)
+ self.stubs.Set(nova.db.api, 'instance_get_by_id', return_server)
self.stubs.Set(nova.db.api, 'instance_get_all_by_user',
return_servers)
self.stubs.Set(nova.db.api, 'instance_add_security_group',
@@ -87,18 +86,12 @@ class ServersTest(unittest.TestCase):
instance_address)
self.stubs.Set(nova.db.api, 'instance_get_floating_address',
instance_address)
- self.stubs.Set(nova.compute.api.ComputeAPI, 'pause',
- fake_compute_api)
- self.stubs.Set(nova.compute.api.ComputeAPI, 'unpause',
- fake_compute_api)
- self.stubs.Set(nova.compute.api.ComputeAPI, 'suspend',
- fake_compute_api)
- self.stubs.Set(nova.compute.api.ComputeAPI, 'resume',
- fake_compute_api)
- self.stubs.Set(nova.compute.api.ComputeAPI, "get_diagnostics",
- fake_compute_api)
- self.stubs.Set(nova.compute.api.ComputeAPI, "get_actions",
- fake_compute_api)
+ self.stubs.Set(nova.compute.API, 'pause', fake_compute_api)
+ self.stubs.Set(nova.compute.API, 'unpause', fake_compute_api)
+ self.stubs.Set(nova.compute.API, 'suspend', fake_compute_api)
+ self.stubs.Set(nova.compute.API, 'resume', fake_compute_api)
+ self.stubs.Set(nova.compute.API, "get_diagnostics", fake_compute_api)
+ self.stubs.Set(nova.compute.API, "get_actions", fake_compute_api)
self.allow_admin = FLAGS.allow_admin_api
def tearDown(self):
@@ -109,7 +102,7 @@ class ServersTest(unittest.TestCase):
req = webob.Request.blank('/v1.0/servers/1')
res = req.get_response(nova.api.API('os'))
res_dict = json.loads(res.body)
- self.assertEqual(res_dict['server']['id'], 1)
+ self.assertEqual(res_dict['server']['id'], '1')
self.assertEqual(res_dict['server']['name'], 'server1')
def test_get_server_list(self):
@@ -126,7 +119,7 @@ class ServersTest(unittest.TestCase):
def test_create_instance(self):
def instance_create(context, inst):
- return {'id': 1, 'internal_id': 1, 'display_name': ''}
+ return {'id': '1', 'display_name': ''}
def server_update(context, id, params):
return instance_create(context, id)
diff --git a/nova/tests/hyperv_unittest.py b/nova/tests/hyperv_unittest.py
new file mode 100644
index 000000000..3980ae3cb
--- /dev/null
+++ b/nova/tests/hyperv_unittest.py
@@ -0,0 +1,71 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2010 Cloud.com, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For Hyper-V driver
+"""
+
+import random
+
+from nova import context
+from nova import db
+from nova import flags
+from nova import test
+from nova.auth import manager
+from nova.virt import hyperv
+
+FLAGS = flags.FLAGS
+FLAGS.connection_type = 'hyperv'
+
+
+class HyperVTestCase(test.TestCase):
+ """Test cases for the Hyper-V driver"""
+ def setUp(self):
+ super(HyperVTestCase, self).setUp()
+ self.manager = manager.AuthManager()
+ self.user = self.manager.create_user('fake', 'fake', 'fake',
+ admin=True)
+ self.project = self.manager.create_project('fake', 'fake', 'fake')
+ self.context = context.RequestContext(self.user, self.project)
+
+ def test_create_destroy(self):
+ """Create a VM and destroy it"""
+ instance = {'internal_id': random.randint(1, 1000000),
+ 'memory_mb': '1024',
+ 'mac_address': '02:12:34:46:56:67',
+ 'vcpus': 2,
+ 'project_id': 'fake',
+ 'instance_type': 'm1.small'}
+ instance_ref = db.instance_create(self.context, instance)
+
+ conn = hyperv.get_connection(False)
+ conn._create_vm(instance_ref) # pylint: disable-msg=W0212
+ found = [n for n in conn.list_instances()
+ if n == instance_ref['name']]
+ self.assertTrue(len(found) == 1)
+ info = conn.get_info(instance_ref['name'])
+ #Unfortunately since the vm is not running at this point,
+ #we cannot obtain memory information from get_info
+ self.assertEquals(info['num_cpu'], instance_ref['vcpus'])
+
+ conn.destroy(instance_ref)
+ found = [n for n in conn.list_instances()
+ if n == instance_ref['name']]
+ self.assertTrue(len(found) == 0)
+
+ def tearDown(self):
+ super(HyperVTestCase, self).tearDown()
+ self.manager.delete_project(self.project)
+ self.manager.delete_user(self.user)
diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py
index 70d2c44da..ba58fab59 100644
--- a/nova/tests/test_cloud.py
+++ b/nova/tests/test_cloud.py
@@ -106,7 +106,7 @@ class CloudTestCase(test.TestCase):
self.cloud.allocate_address(self.context)
inst = db.instance_create(self.context, {'host': FLAGS.host})
fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
- ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
+ ec2_id = cloud.id_to_ec2_id(inst['id'])
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
@@ -127,9 +127,9 @@ class CloudTestCase(test.TestCase):
result = self.cloud.describe_volumes(self.context)
self.assertEqual(len(result['volumeSet']), 2)
result = self.cloud.describe_volumes(self.context,
- volume_id=[vol2['ec2_id']])
+ volume_id=[vol2['id']])
self.assertEqual(len(result['volumeSet']), 1)
- self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['ec2_id'])
+ self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['id'])
db.volume_destroy(self.context, vol1['id'])
db.volume_destroy(self.context, vol2['id'])
@@ -140,15 +140,16 @@ class CloudTestCase(test.TestCase):
kwargs = {'image_id': image_id,
'instance_type': instance_type,
'max_count': max_count}
- rv = yield self.cloud.run_instances(self.context, **kwargs)
+ rv = self.cloud.run_instances(self.context, **kwargs)
+ print rv
instance_id = rv['instancesSet'][0]['instanceId']
- output = yield self.cloud.get_console_output(context=self.context,
+ output = self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE OUTPUT')
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
greenthread.sleep(0.3)
- rv = yield self.cloud.terminate_instances(self.context, [instance_id])
+ rv = self.cloud.terminate_instances(self.context, [instance_id])
def test_key_generation(self):
result = self._create_key('test')
@@ -186,7 +187,7 @@ class CloudTestCase(test.TestCase):
kwargs = {'image_id': image_id,
'instance_type': instance_type,
'max_count': max_count}
- rv = yield self.cloud.run_instances(self.context, **kwargs)
+ rv = self.cloud.run_instances(self.context, **kwargs)
# TODO: check for proper response
instance_id = rv['reservationSet'][0].keys()[0]
instance = rv['reservationSet'][0][instance_id][0]
@@ -209,7 +210,7 @@ class CloudTestCase(test.TestCase):
for instance in reservations[reservations.keys()[0]]:
instance_id = instance['instance_id']
logging.debug("Terminating instance %s" % instance_id)
- rv = yield self.compute.terminate_instance(instance_id)
+ rv = self.compute.terminate_instance(instance_id)
def test_instance_update_state(self):
def instance(num):
@@ -296,7 +297,7 @@ class CloudTestCase(test.TestCase):
def test_update_of_instance_display_fields(self):
inst = db.instance_create(self.context, {})
- ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
+ ec2_id = cloud.id_to_ec2_id(inst['id'])
self.cloud.update_instance(self.context, ec2_id,
display_name='c00l 1m4g3')
inst = db.instance_get(self.context, inst['id'])
diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py
index 1fb9143f1..534493dfe 100644
--- a/nova/tests/test_compute.py
+++ b/nova/tests/test_compute.py
@@ -22,6 +22,7 @@ Tests For Compute
import datetime
import logging
+from nova import compute
from nova import context
from nova import db
from nova import exception
@@ -29,7 +30,6 @@ from nova import flags
from nova import test
from nova import utils
from nova.auth import manager
-from nova.compute import api as compute_api
FLAGS = flags.FLAGS
@@ -44,7 +44,7 @@ class ComputeTestCase(test.TestCase):
stub_network=True,
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
- self.compute_api = compute_api.ComputeAPI()
+ self.compute_api = compute.API()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake')
self.project = self.manager.create_project('fake', 'fake', 'fake')
@@ -72,7 +72,7 @@ class ComputeTestCase(test.TestCase):
"""Verify that an instance cannot be created without a display_name."""
cases = [dict(), dict(display_name=None)]
for instance in cases:
- ref = self.compute_api.create_instances(self.context,
+ ref = self.compute_api.create(self.context,
FLAGS.default_instance_type, None, **instance)
try:
self.assertNotEqual(ref[0].display_name, None)
@@ -80,13 +80,13 @@ class ComputeTestCase(test.TestCase):
db.instance_destroy(self.context, ref[0]['id'])
def test_create_instance_associates_security_groups(self):
- """Make sure create_instances associates security groups"""
+ """Make sure create associates security groups"""
values = {'name': 'default',
'description': 'default',
'user_id': self.user.id,
'project_id': self.project.id}
group = db.security_group_create(self.context, values)
- ref = self.compute_api.create_instances(self.context,
+ ref = self.compute_api.create(self.context,
FLAGS.default_instance_type, None, security_group=['default'])
try:
self.assertEqual(len(ref[0]['security_groups']), 1)
@@ -178,3 +178,22 @@ class ComputeTestCase(test.TestCase):
self.context,
instance_id)
self.compute.terminate_instance(self.context, instance_id)
+
+ def test_lock(self):
+ """ensure locked instance cannot be changed"""
+ instance_id = self._create_instance()
+ self.compute.run_instance(self.context, instance_id)
+
+ non_admin_context = context.RequestContext(None, None, False, False)
+
+ # decorator should return False (fail) with locked nonadmin context
+ self.compute.lock_instance(self.context, instance_id)
+ ret_val = self.compute.reboot_instance(non_admin_context, instance_id)
+ self.assertEqual(ret_val, False)
+
+ # decorator should return None (success) with unlocked nonadmin context
+ self.compute.unlock_instance(self.context, instance_id)
+ ret_val = self.compute.reboot_instance(non_admin_context, instance_id)
+ self.assertEqual(ret_val, None)
+
+ self.compute.terminate_instance(self.context, instance_id)
diff --git a/nova/tests/test_quota.py b/nova/tests/test_quota.py
index 8cf2a5e54..b5f9f30ef 100644
--- a/nova/tests/test_quota.py
+++ b/nova/tests/test_quota.py
@@ -27,6 +27,7 @@ from nova import test
from nova import utils
from nova.auth import manager
from nova.api.ec2 import cloud
+from nova.compute import instance_types
FLAGS = flags.FLAGS
@@ -78,14 +79,17 @@ class QuotaTestCase(test.TestCase):
def test_quota_overrides(self):
"""Make sure overriding a projects quotas works"""
- num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
+ num_instances = quota.allowed_instances(self.context, 100,
+ instance_types.INSTANCE_TYPES['m1.small'])
self.assertEqual(num_instances, 2)
db.quota_create(self.context, {'project_id': self.project.id,
'instances': 10})
- num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
+ num_instances = quota.allowed_instances(self.context, 100,
+ instance_types.INSTANCE_TYPES['m1.small'])
self.assertEqual(num_instances, 4)
db.quota_update(self.context, self.project.id, {'cores': 100})
- num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
+ num_instances = quota.allowed_instances(self.context, 100,
+ instance_types.INSTANCE_TYPES['m1.small'])
self.assertEqual(num_instances, 10)
db.quota_destroy(self.context, self.project.id)
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index c95a53af3..ec9462ada 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -79,8 +79,8 @@ class XenAPIVolumeTestCase(test.TestCase):
helper = volume_utils.VolumeHelper
helper.XenAPI = session.get_imported_xenapi()
vol = self._create_volume()
- info = helper.parse_volume_info(vol['ec2_id'], '/dev/sdc')
- label = 'SR-%s' % vol['ec2_id']
+ info = helper.parse_volume_info(vol['id'], '/dev/sdc')
+ label = 'SR-%s' % vol['id']
description = 'Test-SR'
sr_ref = helper.create_iscsi_storage(session, info, label, description)
srs = xenapi_fake.get_all('SR')
@@ -97,7 +97,7 @@ class XenAPIVolumeTestCase(test.TestCase):
# oops, wrong mount point!
self.assertRaises(volume_utils.StorageError,
helper.parse_volume_info,
- vol['ec2_id'],
+ vol['id'],
'/dev/sd')
db.volume_destroy(context.get_admin_context(), vol['id'])
@@ -108,8 +108,7 @@ class XenAPIVolumeTestCase(test.TestCase):
volume = self._create_volume()
instance = db.instance_create(self.values)
xenapi_fake.create_vm(instance.name, 'Running')
- result = conn.attach_volume(instance.name, volume['ec2_id'],
- '/dev/sdc')
+ result = conn.attach_volume(instance.name, volume['id'], '/dev/sdc')
def check():
# check that the VM has a VBD attached to it
@@ -134,7 +133,7 @@ class XenAPIVolumeTestCase(test.TestCase):
self.assertRaises(Exception,
conn.attach_volume,
instance.name,
- volume['ec2_id'],
+ volume['id'],
'/dev/sdc')
def tearDown(self):
@@ -250,15 +249,16 @@ class XenAPIVMTestCase(test.TestCase):
def _create_instance(self):
"""Creates and spawns a test instance"""
- values = {'name': 1, 'id': 1,
- 'project_id': self.project.id,
- 'user_id': self.user.id,
- 'image_id': 1,
- 'kernel_id': 2,
- 'ramdisk_id': 3,
- 'instance_type': 'm1.large',
- 'mac_address': 'aa:bb:cc:dd:ee:ff'
- }
+ values = {
+ 'name': 1,
+ 'id': 1,
+ 'project_id': self.project.id,
+ 'user_id': self.user.id,
+ 'image_id': 1,
+ 'kernel_id': 2,
+ 'ramdisk_id': 3,
+ 'instance_type': 'm1.large',
+ 'mac_address': 'aa:bb:cc:dd:ee:ff'}
instance = db.instance_create(values)
self.conn.spawn(instance)
return instance
diff --git a/nova/virt/connection.py b/nova/virt/connection.py
index 61e99944e..846423afe 100644
--- a/nova/virt/connection.py
+++ b/nova/virt/connection.py
@@ -26,6 +26,7 @@ from nova import flags
from nova.virt import fake
from nova.virt import libvirt_conn
from nova.virt import xenapi_conn
+from nova.virt import hyperv
FLAGS = flags.FLAGS
@@ -62,6 +63,8 @@ def get_connection(read_only=False):
conn = libvirt_conn.get_connection(read_only)
elif t == 'xenapi':
conn = xenapi_conn.get_connection(read_only)
+ elif t == 'hyperv':
+ conn = hyperv.get_connection(read_only)
else:
raise Exception('Unknown connection type "%s"' % t)
diff --git a/nova/virt/hyperv.py b/nova/virt/hyperv.py
new file mode 100644
index 000000000..4b9f6f946
--- /dev/null
+++ b/nova/virt/hyperv.py
@@ -0,0 +1,459 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Cloud.com, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A connection to Hyper-V .
+Uses Windows Management Instrumentation (WMI) calls to interact with Hyper-V
+Hyper-V WMI usage:
+ http://msdn.microsoft.com/en-us/library/cc723875%28v=VS.85%29.aspx
+The Hyper-V object model briefly:
+ The physical computer and its hosted virtual machines are each represented
+ by the Msvm_ComputerSystem class.
+
+ Each virtual machine is associated with a
+ Msvm_VirtualSystemGlobalSettingData (vs_gs_data) instance and one or more
+ Msvm_VirtualSystemSettingData (vmsetting) instances. For each vmsetting
+ there is a series of Msvm_ResourceAllocationSettingData (rasd) objects.
+ The rasd objects describe the settings for each device in a VM.
+ Together, the vs_gs_data, vmsettings and rasds describe the configuration
+ of the virtual machine.
+
+ Creating new resources such as disks and nics involves cloning a default
+ rasd object and appropriately modifying the clone and calling the
+ AddVirtualSystemResources WMI method
+ Changing resources such as memory uses the ModifyVirtualSystemResources
+ WMI method
+
+Using the Python WMI library:
+ Tutorial:
+ http://timgolden.me.uk/python/wmi/tutorial.html
+ Hyper-V WMI objects can be retrieved simply by using the class name
+ of the WMI object and optionally specifying a column to filter the
+ result set. More complex filters can be formed using WQL (sql-like)
+ queries.
+ The parameters and return tuples of WMI method calls can gleaned by
+ examining the doc string. For example:
+ >>> vs_man_svc.ModifyVirtualSystemResources.__doc__
+ ModifyVirtualSystemResources (ComputerSystem, ResourceSettingData[])
+ => (Job, ReturnValue)'
+ When passing setting data (ResourceSettingData) to the WMI method,
+ an XML representation of the data is passed in using GetText_(1).
+ Available methods on a service can be determined using method.keys():
+ >>> vs_man_svc.methods.keys()
+ vmsettings and rasds for a vm can be retrieved using the 'associators'
+ method with the appropriate return class.
+ Long running WMI commands generally return a Job (an instance of
+ Msvm_ConcreteJob) whose state can be polled to determine when it finishes
+
+"""
+
+import os
+import logging
+import time
+
+from nova import exception
+from nova import flags
+from nova.auth import manager
+from nova.compute import power_state
+from nova.virt import images
+
+wmi = None
+
+
+FLAGS = flags.FLAGS
+
+
+HYPERV_POWER_STATE = {
+ 3: power_state.SHUTDOWN,
+ 2: power_state.RUNNING,
+ 32768: power_state.PAUSED,
+}
+
+
+REQ_POWER_STATE = {
+ 'Enabled': 2,
+ 'Disabled': 3,
+ 'Reboot': 10,
+ 'Reset': 11,
+ 'Paused': 32768,
+ 'Suspended': 32769
+}
+
+
+WMI_JOB_STATUS_STARTED = 4096
+WMI_JOB_STATE_RUNNING = 4
+WMI_JOB_STATE_COMPLETED = 7
+
+
+def get_connection(_):
+ global wmi
+ if wmi is None:
+ wmi = __import__('wmi')
+ return HyperVConnection()
+
+
+class HyperVConnection(object):
+ def __init__(self):
+ self._conn = wmi.WMI(moniker='//./root/virtualization')
+ self._cim_conn = wmi.WMI(moniker='//./root/cimv2')
+
+ def init_host(self):
+ #FIXME(chiradeep): implement this
+ logging.debug(_('In init host'))
+ pass
+
+ def list_instances(self):
+ """ Return the names of all the instances known to Hyper-V. """
+ vms = [v.ElementName \
+ for v in self._conn.Msvm_ComputerSystem(['ElementName'])]
+ return vms
+
+ def spawn(self, instance):
+ """ Create a new VM and start it."""
+ vm = self._lookup(instance.name)
+ if vm is not None:
+ raise exception.Duplicate(_('Attempt to create duplicate vm %s') %
+ instance.name)
+
+ user = manager.AuthManager().get_user(instance['user_id'])
+ project = manager.AuthManager().get_project(instance['project_id'])
+ #Fetch the file, assume it is a VHD file.
+ base_vhd_filename = os.path.join(FLAGS.instances_path,
+ instance.name)
+ vhdfile = "%s.vhd" % (base_vhd_filename)
+ images.fetch(instance['image_id'], vhdfile, user, project)
+
+ try:
+ self._create_vm(instance)
+
+ self._create_disk(instance['name'], vhdfile)
+ self._create_nic(instance['name'], instance['mac_address'])
+
+ logging.debug(_('Starting VM %s '), instance.name)
+ self._set_vm_state(instance['name'], 'Enabled')
+ logging.info(_('Started VM %s '), instance.name)
+ except Exception as exn:
+ logging.error(_('spawn vm failed: %s'), exn)
+ self.destroy(instance)
+
+ def _create_vm(self, instance):
+ """Create a VM but don't start it. """
+ vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
+
+ vs_gs_data = self._conn.Msvm_VirtualSystemGlobalSettingData.new()
+ vs_gs_data.ElementName = instance['name']
+ (job, ret_val) = vs_man_svc.DefineVirtualSystem(
+ [], None, vs_gs_data.GetText_(1))[1:]
+ if ret_val == WMI_JOB_STATUS_STARTED:
+ success = self._check_job_status(job)
+ else:
+ success = (ret_val == 0)
+
+ if not success:
+ raise Exception(_('Failed to create VM %s'), instance.name)
+
+ logging.debug(_('Created VM %s...'), instance.name)
+ vm = self._conn.Msvm_ComputerSystem(ElementName=instance.name)[0]
+
+ vmsettings = vm.associators(
+ wmi_result_class='Msvm_VirtualSystemSettingData')
+ vmsetting = [s for s in vmsettings
+ if s.SettingType == 3][0] # avoid snapshots
+ memsetting = vmsetting.associators(
+ wmi_result_class='Msvm_MemorySettingData')[0]
+ #No Dynamic Memory, so reservation, limit and quantity are identical.
+ mem = long(str(instance['memory_mb']))
+ memsetting.VirtualQuantity = mem
+ memsetting.Reservation = mem
+ memsetting.Limit = mem
+
+ (job, ret_val) = vs_man_svc.ModifyVirtualSystemResources(
+ vm.path_(), [memsetting.GetText_(1)])
+ logging.debug(_('Set memory for vm %s...'), instance.name)
+ procsetting = vmsetting.associators(
+ wmi_result_class='Msvm_ProcessorSettingData')[0]
+ vcpus = long(instance['vcpus'])
+ procsetting.VirtualQuantity = vcpus
+ procsetting.Reservation = vcpus
+ procsetting.Limit = vcpus
+
+ (job, ret_val) = vs_man_svc.ModifyVirtualSystemResources(
+ vm.path_(), [procsetting.GetText_(1)])
+ logging.debug(_('Set vcpus for vm %s...'), instance.name)
+
+ def _create_disk(self, vm_name, vhdfile):
+ """Create a disk and attach it to the vm"""
+ logging.debug(_('Creating disk for %s by attaching disk file %s'),
+ vm_name, vhdfile)
+ #Find the IDE controller for the vm.
+ vms = self._conn.MSVM_ComputerSystem(ElementName=vm_name)
+ vm = vms[0]
+ vmsettings = vm.associators(
+ wmi_result_class='Msvm_VirtualSystemSettingData')
+ rasds = vmsettings[0].associators(
+ wmi_result_class='MSVM_ResourceAllocationSettingData')
+ ctrller = [r for r in rasds
+ if r.ResourceSubType == 'Microsoft Emulated IDE Controller'\
+ and r.Address == "0"]
+ #Find the default disk drive object for the vm and clone it.
+ diskdflt = self._conn.query(
+ "SELECT * FROM Msvm_ResourceAllocationSettingData \
+ WHERE ResourceSubType LIKE 'Microsoft Synthetic Disk Drive'\
+ AND InstanceID LIKE '%Default%'")[0]
+ diskdrive = self._clone_wmi_obj(
+ 'Msvm_ResourceAllocationSettingData', diskdflt)
+ #Set the IDE ctrller as parent.
+ diskdrive.Parent = ctrller[0].path_()
+ diskdrive.Address = 0
+ #Add the cloned disk drive object to the vm.
+ new_resources = self._add_virt_resource(diskdrive, vm)
+ if new_resources is None:
+ raise Exception(_('Failed to add diskdrive to VM %s'),
+ vm_name)
+ diskdrive_path = new_resources[0]
+ logging.debug(_('New disk drive path is %s'), diskdrive_path)
+ #Find the default VHD disk object.
+ vhddefault = self._conn.query(
+ "SELECT * FROM Msvm_ResourceAllocationSettingData \
+ WHERE ResourceSubType LIKE 'Microsoft Virtual Hard Disk' AND \
+ InstanceID LIKE '%Default%' ")[0]
+
+ #Clone the default and point it to the image file.
+ vhddisk = self._clone_wmi_obj(
+ 'Msvm_ResourceAllocationSettingData', vhddefault)
+ #Set the new drive as the parent.
+ vhddisk.Parent = diskdrive_path
+ vhddisk.Connection = [vhdfile]
+
+ #Add the new vhd object as a virtual hard disk to the vm.
+ new_resources = self._add_virt_resource(vhddisk, vm)
+ if new_resources is None:
+ raise Exception(_('Failed to add vhd file to VM %s'),
+ vm_name)
+ logging.info(_('Created disk for %s'), vm_name)
+
+ def _create_nic(self, vm_name, mac):
+ """Create a (emulated) nic and attach it to the vm"""
+ logging.debug(_('Creating nic for %s '), vm_name)
+ #Find the vswitch that is connected to the physical nic.
+ vms = self._conn.Msvm_ComputerSystem(ElementName=vm_name)
+ extswitch = self._find_external_network()
+ vm = vms[0]
+ switch_svc = self._conn.Msvm_VirtualSwitchManagementService()[0]
+ #Find the default nic and clone it to create a new nic for the vm.
+ #Use Msvm_SyntheticEthernetPortSettingData for Windows or Linux with
+ #Linux Integration Components installed.
+ emulatednics_data = self._conn.Msvm_EmulatedEthernetPortSettingData()
+ default_nic_data = [n for n in emulatednics_data
+ if n.InstanceID.rfind('Default') > 0]
+ new_nic_data = self._clone_wmi_obj(
+ 'Msvm_EmulatedEthernetPortSettingData',
+ default_nic_data[0])
+ #Create a port on the vswitch.
+ (new_port, ret_val) = switch_svc.CreateSwitchPort(vm_name, vm_name,
+ "", extswitch.path_())
+ if ret_val != 0:
+ logging.error(_('Failed creating a port on the external vswitch'))
+ raise Exception(_('Failed creating port for %s'),
+ vm_name)
+ logging.debug(_("Created switch port %s on switch %s"),
+ vm_name, extswitch.path_())
+ #Connect the new nic to the new port.
+ new_nic_data.Connection = [new_port]
+ new_nic_data.ElementName = vm_name + ' nic'
+ new_nic_data.Address = ''.join(mac.split(':'))
+ new_nic_data.StaticMacAddress = 'TRUE'
+ #Add the new nic to the vm.
+ new_resources = self._add_virt_resource(new_nic_data, vm)
+ if new_resources is None:
+ raise Exception(_('Failed to add nic to VM %s'),
+ vm_name)
+ logging.info(_("Created nic for %s "), vm_name)
+
+ def _add_virt_resource(self, res_setting_data, target_vm):
+ """Add a new resource (disk/nic) to the VM"""
+ vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
+ (job, new_resources, ret_val) = vs_man_svc.\
+ AddVirtualSystemResources([res_setting_data.GetText_(1)],
+ target_vm.path_())
+ success = True
+ if ret_val == WMI_JOB_STATUS_STARTED:
+ success = self._check_job_status(job)
+ else:
+ success = (ret_val == 0)
+ if success:
+ return new_resources
+ else:
+ return None
+
+ #TODO: use the reactor to poll instead of sleep
+ def _check_job_status(self, jobpath):
+ """Poll WMI job state for completion"""
+ #Jobs have a path of the form:
+ #\\WIN-P5IG7367DAG\root\virtualization:Msvm_ConcreteJob.InstanceID=
+ #"8A496B9C-AF4D-4E98-BD3C-1128CD85320D"
+ inst_id = jobpath.split('=')[1].strip('"')
+ jobs = self._conn.Msvm_ConcreteJob(InstanceID=inst_id)
+ if len(jobs) == 0:
+ return False
+ job = jobs[0]
+ while job.JobState == WMI_JOB_STATE_RUNNING:
+ time.sleep(0.1)
+ job = self._conn.Msvm_ConcreteJob(InstanceID=inst_id)[0]
+ if job.JobState != WMI_JOB_STATE_COMPLETED:
+ logging.debug(_("WMI job failed: %s"), job.ErrorSummaryDescription)
+ return False
+ logging.debug(_("WMI job succeeded: %s, Elapsed=%s "), job.Description,
+ job.ElapsedTime)
+ return True
+
+ def _find_external_network(self):
+ """Find the vswitch that is connected to the physical nic.
+ Assumes only one physical nic on the host
+ """
+ #If there are no physical nics connected to networks, return.
+ bound = self._conn.Msvm_ExternalEthernetPort(IsBound='TRUE')
+ if len(bound) == 0:
+ return None
+ return self._conn.Msvm_ExternalEthernetPort(IsBound='TRUE')[0]\
+ .associators(wmi_result_class='Msvm_SwitchLANEndpoint')[0]\
+ .associators(wmi_result_class='Msvm_SwitchPort')[0]\
+ .associators(wmi_result_class='Msvm_VirtualSwitch')[0]
+
+ def _clone_wmi_obj(self, wmi_class, wmi_obj):
+ """Clone a WMI object"""
+ cl = self._conn.__getattr__(wmi_class) # get the class
+ newinst = cl.new()
+ #Copy the properties from the original.
+ for prop in wmi_obj._properties:
+ newinst.Properties_.Item(prop).Value =\
+ wmi_obj.Properties_.Item(prop).Value
+ return newinst
+
+ def reboot(self, instance):
+ """Reboot the specified instance."""
+ vm = self._lookup(instance.name)
+ if vm is None:
+ raise exception.NotFound('instance not present %s' % instance.name)
+ self._set_vm_state(instance.name, 'Reboot')
+
+ def destroy(self, instance):
+ """Destroy the VM. Also destroy the associated VHD disk files"""
+ logging.debug(_("Got request to destroy vm %s"), instance.name)
+ vm = self._lookup(instance.name)
+ if vm is None:
+ return
+ vm = self._conn.Msvm_ComputerSystem(ElementName=instance.name)[0]
+ vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
+ #Stop the VM first.
+ self._set_vm_state(instance.name, 'Disabled')
+ vmsettings = vm.associators(
+ wmi_result_class='Msvm_VirtualSystemSettingData')
+ rasds = vmsettings[0].associators(
+ wmi_result_class='MSVM_ResourceAllocationSettingData')
+ disks = [r for r in rasds \
+ if r.ResourceSubType == 'Microsoft Virtual Hard Disk']
+ diskfiles = []
+ #Collect disk file information before destroying the VM.
+ for disk in disks:
+ diskfiles.extend([c for c in disk.Connection])
+ #Nuke the VM. Does not destroy disks.
+ (job, ret_val) = vs_man_svc.DestroyVirtualSystem(vm.path_())
+ if ret_val == WMI_JOB_STATUS_STARTED:
+ success = self._check_job_status(job)
+ elif ret_val == 0:
+ success = True
+ if not success:
+ raise Exception(_('Failed to destroy vm %s') % instance.name)
+ #Delete associated vhd disk files.
+ for disk in diskfiles:
+ vhdfile = self._cim_conn.CIM_DataFile(Name=disk)
+ for vf in vhdfile:
+ vf.Delete()
+ logging.debug(_("Del: disk %s vm %s"), vhdfile, instance.name)
+
+ def get_info(self, instance_id):
+ """Get information about the VM"""
+ vm = self._lookup(instance_id)
+ if vm is None:
+ raise exception.NotFound('instance not present %s' % instance_id)
+ vm = self._conn.Msvm_ComputerSystem(ElementName=instance_id)[0]
+ vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
+ vmsettings = vm.associators(
+ wmi_result_class='Msvm_VirtualSystemSettingData')
+ settings_paths = [v.path_() for v in vmsettings]
+ #See http://msdn.microsoft.com/en-us/library/cc160706%28VS.85%29.aspx
+ summary_info = vs_man_svc.GetSummaryInformation(
+ [4, 100, 103, 105], settings_paths)[1]
+ info = summary_info[0]
+ logging.debug(_("Got Info for vm %s: state=%s, mem=%s, num_cpu=%s, \
+ cpu_time=%s"), instance_id,
+ str(HYPERV_POWER_STATE[info.EnabledState]),
+ str(info.MemoryUsage),
+ str(info.NumberOfProcessors),
+ str(info.UpTime))
+
+ return {'state': HYPERV_POWER_STATE[info.EnabledState],
+ 'max_mem': info.MemoryUsage,
+ 'mem': info.MemoryUsage,
+ 'num_cpu': info.NumberOfProcessors,
+ 'cpu_time': info.UpTime}
+
+ def _lookup(self, i):
+ vms = self._conn.Msvm_ComputerSystem(ElementName=i)
+ n = len(vms)
+ if n == 0:
+ return None
+ elif n > 1:
+ raise Exception(_('duplicate name found: %s') % i)
+ else:
+ return vms[0].ElementName
+
+ def _set_vm_state(self, vm_name, req_state):
+ """Set the desired state of the VM"""
+ vms = self._conn.Msvm_ComputerSystem(ElementName=vm_name)
+ if len(vms) == 0:
+ return False
+ (job, ret_val) = vms[0].RequestStateChange(REQ_POWER_STATE[req_state])
+ success = False
+ if ret_val == WMI_JOB_STATUS_STARTED:
+ success = self._check_job_status(job)
+ elif ret_val == 0:
+ success = True
+ elif ret_val == 32775:
+ #Invalid state for current operation. Typically means it is
+ #already in the state requested
+ success = True
+ if success:
+ logging.info(_("Successfully changed vm state of %s to %s"),
+ vm_name, req_state)
+ else:
+ logging.error(_("Failed to change vm state of %s to %s"),
+ vm_name, req_state)
+ raise Exception(_("Failed to change vm state of %s to %s"),
+ vm_name, req_state)
+
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ vm = self._lookup(instance_name)
+ if vm is None:
+ raise exception.NotFound('Cannot attach volume to missing %s vm' %
+ instance_name)
+
+ def detach_volume(self, instance_name, mountpoint):
+ vm = self._lookup(instance_name)
+ if vm is None:
+ raise exception.NotFound('Cannot detach volume from missing %s ' %
+ instance_name)
diff --git a/nova/virt/images.py b/nova/virt/images.py
index 1c9b2e093..2d03da4b4 100644
--- a/nova/virt/images.py
+++ b/nova/virt/images.py
@@ -21,8 +21,12 @@
Handling of VM disk images.
"""
+import logging
import os.path
+import shutil
+import sys
import time
+import urllib2
import urlparse
from nova import flags
@@ -45,6 +49,25 @@ def fetch(image, path, user, project):
return f(image, path, user, project)
+def _fetch_image_no_curl(url, path, headers):
+ request = urllib2.Request(url)
+ for (k, v) in headers.iteritems():
+ request.add_header(k, v)
+
+ def urlretrieve(urlfile, fpath):
+ chunk = 1 * 1024 * 1024
+ f = open(fpath, "wb")
+ while 1:
+ data = urlfile.read(chunk)
+ if not data:
+ break
+ f.write(data)
+
+ urlopened = urllib2.urlopen(request)
+ urlretrieve(urlopened, path)
+ logging.debug(_("Finished retreving %s -- placed in %s"), url, path)
+
+
def _fetch_s3_image(image, path, user, project):
url = image_url(image)
@@ -61,18 +84,24 @@ def _fetch_s3_image(image, path, user, project):
url_path)
headers['Authorization'] = 'AWS %s:%s' % (access, signature)
- cmd = ['/usr/bin/curl', '--fail', '--silent', url]
- for (k, v) in headers.iteritems():
- cmd += ['-H', '"%s: %s"' % (k, v)]
+ if sys.platform.startswith('win'):
+ return _fetch_image_no_curl(url, path, headers)
+ else:
+ cmd = ['/usr/bin/curl', '--fail', '--silent', url]
+ for (k, v) in headers.iteritems():
+ cmd += ['-H', '%s: %s' % (k, v)]
- cmd += ['-o', path]
- cmd_out = ' '.join(cmd)
- return utils.execute(cmd_out)
+ cmd += ['-o', path]
+ cmd_out = ' '.join(cmd)
+ return utils.execute(cmd_out)
def _fetch_local_image(image, path, user, project):
- source = _image_path('%s/image' % image)
- return utils.execute('cp %s %s' % (source, path))
+ source = _image_path(os.path.join(image, 'image'))
+ if sys.platform.startswith('win'):
+ return shutil.copy(source, path)
+ else:
+ return utils.execute('cp %s %s' % (source, path))
def _image_path(path):
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index 76f31635a..b6d620782 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
+# Copyright 2010 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -18,6 +19,7 @@
Management class for VM-related functions (spawn, reboot, etc).
"""
+import json
import logging
from nova import db
@@ -36,7 +38,6 @@ class VMOps(object):
"""
Management class for VM-related tasks
"""
-
def __init__(self, session):
self.XenAPI = session.get_imported_xenapi()
self._session = session
@@ -120,6 +121,20 @@ class VMOps(object):
timer.f = _wait_for_boot
return timer.start(interval=0.5, now=True)
+ def _get_vm_opaque_ref(self, instance_or_vm):
+ """Refactored out the common code of many methods that receive either
+ a vm name or a vm instance, and want a vm instance in return.
+ """
+ try:
+ instance_name = instance_or_vm.name
+ vm = VMHelper.lookup(self._session, instance_name)
+ except AttributeError:
+ # A vm opaque ref was passed
+ vm = instance_or_vm
+ if vm is None:
+ raise Exception(_('Instance not present %s') % instance_name)
+ return vm
+
def snapshot(self, instance, name):
""" Create snapshot from a running VM instance
@@ -168,11 +183,7 @@ class VMOps(object):
def reboot(self, instance):
"""Reboot VM instance"""
- instance_name = instance.name
- vm = VMHelper.lookup(self._session, instance_name)
- if vm is None:
- raise exception.NotFound(_('instance not'
- ' found %s') % instance_name)
+ vm = self._get_vm_opaque_ref(instance)
task = self._session.call_xenapi('Async.VM.clean_reboot', vm)
self._session.wait_for_task(instance.id, task)
@@ -215,27 +226,19 @@ class VMOps(object):
ret = None
try:
ret = self._session.wait_for_task(instance_id, task)
- except XenAPI.Failure, exc:
+ except self.XenAPI.Failure, exc:
logging.warn(exc)
callback(ret)
def pause(self, instance, callback):
"""Pause VM instance"""
- instance_name = instance.name
- vm = VMHelper.lookup(self._session, instance_name)
- if vm is None:
- raise exception.NotFound(_('Instance not'
- ' found %s') % instance_name)
+ vm = self._get_vm_opaque_ref(instance)
task = self._session.call_xenapi('Async.VM.pause', vm)
self._wait_with_callback(instance.id, task, callback)
def unpause(self, instance, callback):
"""Unpause VM instance"""
- instance_name = instance.name
- vm = VMHelper.lookup(self._session, instance_name)
- if vm is None:
- raise exception.NotFound(_('Instance not'
- ' found %s') % instance_name)
+ vm = self._get_vm_opaque_ref(instance)
task = self._session.call_xenapi('Async.VM.unpause', vm)
self._wait_with_callback(instance.id, task, callback)
@@ -270,10 +273,7 @@ class VMOps(object):
def get_diagnostics(self, instance):
"""Return data about VM diagnostics"""
- vm = VMHelper.lookup(self._session, instance.name)
- if vm is None:
- raise exception.NotFound(_("Instance not found %s") %
- instance.name)
+ vm = self._get_vm_opaque_ref(instance)
rec = self._session.get_xenapi().VM.get_record(vm)
return VMHelper.compile_diagnostics(self._session, rec)
@@ -281,3 +281,175 @@ class VMOps(object):
"""Return snapshot of console"""
# TODO: implement this to fix pylint!
return 'FAKE CONSOLE OUTPUT of instance'
+
+ def list_from_xenstore(self, vm, path):
+ """Runs the xenstore-ls command to get a listing of all records
+ from 'path' downward. Returns a dict with the sub-paths as keys,
+ and the value stored in those paths as values. If nothing is
+ found at that path, returns None.
+ """
+ ret = self._make_xenstore_call('list_records', vm, path)
+ return json.loads(ret)
+
+ def read_from_xenstore(self, vm, path):
+ """Returns the value stored in the xenstore record for the given VM
+ at the specified location. A XenAPIPlugin.PluginError will be raised
+ if any error is encountered in the read process.
+ """
+ try:
+ ret = self._make_xenstore_call('read_record', vm, path,
+ {'ignore_missing_path': 'True'})
+ except self.XenAPI.Failure, e:
+ return None
+ ret = json.loads(ret)
+ if ret == "None":
+ # Can't marshall None over RPC calls.
+ return None
+ return ret
+
+ def write_to_xenstore(self, vm, path, value):
+ """Writes the passed value to the xenstore record for the given VM
+ at the specified location. A XenAPIPlugin.PluginError will be raised
+ if any error is encountered in the write process.
+ """
+ return self._make_xenstore_call('write_record', vm, path,
+ {'value': json.dumps(value)})
+
+ def clear_xenstore(self, vm, path):
+ """Deletes the VM's xenstore record for the specified path.
+ If there is no such record, the request is ignored.
+ """
+ self._make_xenstore_call('delete_record', vm, path)
+
+ def _make_xenstore_call(self, method, vm, path, addl_args={}):
+ """Handles calls to the xenstore xenapi plugin."""
+ return self._make_plugin_call('xenstore.py', method=method, vm=vm,
+ path=path, addl_args=addl_args)
+
+ def _make_plugin_call(self, plugin, method, vm, path, addl_args={}):
+ """Abstracts out the process of calling a method of a xenapi plugin.
+ Any errors raised by the plugin will in turn raise a RuntimeError here.
+ """
+ vm = self._get_vm_opaque_ref(vm)
+ rec = self._session.get_xenapi().VM.get_record(vm)
+ args = {'dom_id': rec['domid'], 'path': path}
+ args.update(addl_args)
+ # If the 'testing_mode' attribute is set, add that to the args.
+ if getattr(self, 'testing_mode', False):
+ args['testing_mode'] = 'true'
+ try:
+ task = self._session.async_call_plugin(plugin, method, args)
+ ret = self._session.wait_for_task(0, task)
+ except self.XenAPI.Failure, e:
+ raise RuntimeError("%s" % e.details[-1])
+ return ret
+
+ def add_to_xenstore(self, vm, path, key, value):
+ """Adds the passed key/value pair to the xenstore record for
+ the given VM at the specified location. A XenAPIPlugin.PluginError
+ will be raised if any error is encountered in the write process.
+ """
+ current = self.read_from_xenstore(vm, path)
+ if not current:
+ # Nothing at that location
+ current = {key: value}
+ else:
+ current[key] = value
+ self.write_to_xenstore(vm, path, current)
+
+ def remove_from_xenstore(self, vm, path, key_or_keys):
+ """Takes either a single key or a list of keys and removes
+ them from the xenstoreirecord data for the given VM.
+ If the key doesn't exist, the request is ignored.
+ """
+ current = self.list_from_xenstore(vm, path)
+ if not current:
+ return
+ if isinstance(key_or_keys, basestring):
+ keys = [key_or_keys]
+ else:
+ keys = key_or_keys
+ keys.sort(lambda x, y: cmp(y.count('/'), x.count('/')))
+ for key in keys:
+ if path:
+ keypath = "%s/%s" % (path, key)
+ else:
+ keypath = key
+ self._make_xenstore_call('delete_record', vm, keypath)
+
+ ########################################################################
+ ###### The following methods interact with the xenstore parameter
+ ###### record, not the live xenstore. They were created before I
+ ###### knew the difference, and are left in here in case they prove
+ ###### to be useful. They all have '_param' added to their method
+ ###### names to distinguish them. (dabo)
+ ########################################################################
+ def read_partial_from_param_xenstore(self, instance_or_vm, key_prefix):
+ """Returns a dict of all the keys in the xenstore parameter record
+ for the given instance that begin with the key_prefix.
+ """
+ data = self.read_from_param_xenstore(instance_or_vm)
+ badkeys = [k for k in data.keys()
+ if not k.startswith(key_prefix)]
+ for badkey in badkeys:
+ del data[badkey]
+ return data
+
+ def read_from_param_xenstore(self, instance_or_vm, keys=None):
+ """Returns the xenstore parameter record data for the specified VM
+ instance as a dict. Accepts an optional key or list of keys; if a
+ value for 'keys' is passed, the returned dict is filtered to only
+ return the values for those keys.
+ """
+ vm = self._get_vm_opaque_ref(instance_or_vm)
+ data = self._session.call_xenapi_request('VM.get_xenstore_data',
+ (vm, ))
+ ret = {}
+ if keys is None:
+ keys = data.keys()
+ elif isinstance(keys, basestring):
+ keys = [keys]
+ for key in keys:
+ raw = data.get(key)
+ if raw:
+ ret[key] = json.loads(raw)
+ else:
+ ret[key] = raw
+ return ret
+
+ def add_to_param_xenstore(self, instance_or_vm, key, val):
+ """Takes a key/value pair and adds it to the xenstore parameter
+ record for the given vm instance. If the key exists in xenstore,
+ it is overwritten"""
+ vm = self._get_vm_opaque_ref(instance_or_vm)
+ self.remove_from_param_xenstore(instance_or_vm, key)
+ jsonval = json.dumps(val)
+ self._session.call_xenapi_request('VM.add_to_xenstore_data',
+ (vm, key, jsonval))
+
+ def write_to_param_xenstore(self, instance_or_vm, mapping):
+ """Takes a dict and writes each key/value pair to the xenstore
+ parameter record for the given vm instance. Any existing data for
+ those keys is overwritten.
+ """
+ for k, v in mapping.iteritems():
+ self.add_to_param_xenstore(instance_or_vm, k, v)
+
+ def remove_from_param_xenstore(self, instance_or_vm, key_or_keys):
+ """Takes either a single key or a list of keys and removes
+ them from the xenstore parameter record data for the given VM.
+ If the key doesn't exist, the request is ignored.
+ """
+ vm = self._get_vm_opaque_ref(instance_or_vm)
+ if isinstance(key_or_keys, basestring):
+ keys = [key_or_keys]
+ else:
+ keys = key_or_keys
+ for key in keys:
+ self._session.call_xenapi_request('VM.remove_from_xenstore_data',
+ (vm, key))
+
+ def clear_param_xenstore(self, instance_or_vm):
+ """Removes all data from the xenstore parameter record for this VM."""
+ self.write_to_param_xenstore(instance_or_vm, {})
+ ########################################################################
diff --git a/nova/virt/xenapi/volume_utils.py b/nova/virt/xenapi/volume_utils.py
index 1ca813bcf..4bbc41b03 100644
--- a/nova/virt/xenapi/volume_utils.py
+++ b/nova/virt/xenapi/volume_utils.py
@@ -200,15 +200,19 @@ class VolumeHelper(HelperBase):
return -1
-def _get_volume_id(path):
+def _get_volume_id(path_or_id):
"""Retrieve the volume id from device_path"""
+ # If we have the ID and not a path, just return it.
+ if isinstance(path_or_id, int):
+ return path_or_id
# n must contain at least the volume_id
# /vol- is for remote volumes
# -vol- is for local volumes
# see compute/manager->setup_compute_volume
- volume_id = path[path.find('/vol-') + 1:]
- if volume_id == path:
- volume_id = path[path.find('-vol-') + 1:].replace('--', '-')
+ volume_id = path_or_id[path_or_id.find('/vol-') + 1:]
+ if volume_id == path_or_id:
+ volume_id = path_or_id[path_or_id.find('-vol-') + 1:]
+ volume_id = volume_id.replace('--', '-')
return volume_id
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index f17c8f39d..c48f5b7cb 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
+# Copyright 2010 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -19,15 +20,15 @@ A connection to XenServer or Xen Cloud Platform.
The concurrency model for this class is as follows:
-All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
-deferredToThread). They are remote calls, and so may hang for the usual
-reasons. They should not be allowed to block the reactor thread.
+All XenAPI calls are on a green thread (using eventlet's "tpool"
+thread pool). They are remote calls, and so may hang for the usual
+reasons.
All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
-(using XenAPI.VM.async_start etc). These return a task, which can then be
-polled for completion. Polling is handled using reactor.callLater.
+(using XenAPI.VM.async_start etc). These return a task, which can then be
+polled for completion.
-This combination of techniques means that we don't block the reactor thread at
+This combination of techniques means that we don't block the main thread at
all, and at the same time we don't hold lots of threads waiting for
long-running operations.
@@ -81,7 +82,7 @@ flags.DEFINE_string('xenapi_connection_password',
flags.DEFINE_float('xenapi_task_poll_interval',
0.5,
'The interval used for polling of remote tasks '
- '(Async.VM.start, etc). Used only if '
+ '(Async.VM.start, etc). Used only if '
'connection_type=xenapi.')
flags.DEFINE_float('xenapi_vhd_coalesce_poll_interval',
5.0,
@@ -213,6 +214,14 @@ class XenAPISession(object):
f = f.__getattr__(m)
return tpool.execute(f, *args)
+ def call_xenapi_request(self, method, *args):
+ """Some interactions with dom0, such as interacting with xenstore's
+ param record, require using the xenapi_request method of the session
+ object. This wraps that call on a background thread.
+ """
+ f = self._session.xenapi_request
+ return tpool.execute(f, method, *args)
+
def async_call_plugin(self, plugin, fn, args):
"""Call Async.host.call_plugin on a background thread."""
return tpool.execute(self._unwrap_plugin_exceptions,
@@ -222,7 +231,6 @@ class XenAPISession(object):
def wait_for_task(self, id, task):
"""Return the result of the given task. The task is polled
until it completes."""
-
done = event.Event()
loop = utils.LoopingCall(self._poll_task, id, task, done)
loop.start(FLAGS.xenapi_task_poll_interval, now=True)
@@ -235,7 +243,7 @@ class XenAPISession(object):
return self.XenAPI.Session(url)
def _poll_task(self, id, task, done):
- """Poll the given XenAPI task, and fire the given Deferred if we
+ """Poll the given XenAPI task, and fire the given action if we
get a result."""
try:
name = self._session.xenapi.task.get_name_label(task)
@@ -290,7 +298,7 @@ class XenAPISession(object):
def _parse_xmlrpc_value(val):
- """Parse the given value as if it were an XML-RPC value. This is
+ """Parse the given value as if it were an XML-RPC value. This is
sometimes used as the format for the task.result field."""
if not val:
return val
diff --git a/nova/volume/__init__.py b/nova/volume/__init__.py
index d6e944fc0..56ef9332e 100644
--- a/nova/volume/__init__.py
+++ b/nova/volume/__init__.py
@@ -16,16 +16,4 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
-:mod:`nova.volume` -- Nova Block Storage
-=====================================================
-
-.. automodule:: nova.volume
- :platform: Unix
-.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com>
-.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com>
-.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com>
-.. moduleauthor:: Joshua McKenty <joshua@cognition.ca>
-.. moduleauthor:: Manish Singh <yosh@gimp.org>
-.. moduleauthor:: Andy Smith <andy@anarkystic.com>
-"""
+from nova.volume.api import API
diff --git a/nova/volume/api.py b/nova/volume/api.py
new file mode 100644
index 000000000..2d7fe3762
--- /dev/null
+++ b/nova/volume/api.py
@@ -0,0 +1,101 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Handles all requests relating to volumes.
+"""
+
+import datetime
+import logging
+
+from nova import db
+from nova import exception
+from nova import flags
+from nova import quota
+from nova import rpc
+from nova.db import base
+
+FLAGS = flags.FLAGS
+flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
+
+
+class API(base.Base):
+ """API for interacting with the volume manager."""
+
+ def create(self, context, size, name, description):
+ if quota.allowed_volumes(context, 1, size) < 1:
+ logging.warn("Quota exceeeded for %s, tried to create %sG volume",
+ context.project_id, size)
+ raise quota.QuotaError("Volume quota exceeded. You cannot "
+ "create a volume of size %s" % size)
+
+ options = {
+ 'size': size,
+ 'user_id': context.user.id,
+ 'project_id': context.project_id,
+ 'availability_zone': FLAGS.storage_availability_zone,
+ 'status': "creating",
+ 'attach_status': "detached",
+ 'display_name': name,
+ 'display_description': description}
+
+ volume = self.db.volume_create(context, options)
+ rpc.cast(context,
+ FLAGS.scheduler_topic,
+ {"method": "create_volume",
+ "args": {"topic": FLAGS.volume_topic,
+ "volume_id": volume['id']}})
+ return volume
+
+ def delete(self, context, volume_id):
+ volume = self.get(context, volume_id)
+ if volume['status'] != "available":
+ raise exception.ApiError(_("Volume status must be available"))
+ now = datetime.datetime.utcnow()
+ self.db.volume_update(context, volume_id, {'status': 'deleting',
+ 'terminated_at': now})
+ host = volume['host']
+ rpc.cast(context,
+ self.db.queue_get_for(context, FLAGS.volume_topic, host),
+ {"method": "delete_volume",
+ "args": {"volume_id": volume_id}})
+
+ def update(self, context, volume_id, fields):
+ self.db.volume_update(context, volume_id, fields)
+
+ def get(self, context, volume_id):
+ return self.db.volume_get(context, volume_id)
+
+ def get_all(self, context):
+ if context.user.is_admin():
+ return self.db.volume_get_all(context)
+ return self.db.volume_get_all_by_project(context, context.project_id)
+
+ def check_attach(self, context, volume_id):
+ volume = self.get(context, volume_id)
+ # TODO(vish): abstract status checking?
+ if volume['status'] != "available":
+ raise exception.ApiError(_("Volume status must be available"))
+ if volume['attach_status'] == "attached":
+ raise exception.ApiError(_("Volume is already attached"))
+
+ def check_detach(self, context, volume_id):
+ volume = self.get(context, volume_id)
+ # TODO(vish): abstract status checking?
+ if volume['status'] == "available":
+ raise exception.ApiError(_("Volume is already detached"))
diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py b/plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py
index 2d323a016..8e7a829d5 100755
--- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py
+++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py
@@ -45,6 +45,7 @@ class PluginError(Exception):
def __init__(self, *args):
Exception.__init__(self, *args)
+
class ArgumentError(PluginError):
"""Raised when required arguments are missing, argument values are invalid,
or incompatible arguments are given.
@@ -67,6 +68,7 @@ def ignore_failure(func, *args, **kwargs):
ARGUMENT_PATTERN = re.compile(r'^[a-zA-Z0-9_:\.\-,]+$')
+
def validate_exists(args, key, default=None):
"""Validates that a string argument to a RPC method call is given, and
matches the shell-safe regex, with an optional default value in case it
@@ -76,20 +78,24 @@ def validate_exists(args, key, default=None):
"""
if key in args:
if len(args[key]) == 0:
- raise ArgumentError('Argument %r value %r is too short.' % (key, args[key]))
+ raise ArgumentError('Argument %r value %r is too short.' %
+ (key, args[key]))
if not ARGUMENT_PATTERN.match(args[key]):
- raise ArgumentError('Argument %r value %r contains invalid characters.' % (key, args[key]))
+ raise ArgumentError('Argument %r value %r contains invalid '
+ 'characters.' % (key, args[key]))
if args[key][0] == '-':
- raise ArgumentError('Argument %r value %r starts with a hyphen.' % (key, args[key]))
+ raise ArgumentError('Argument %r value %r starts with a hyphen.'
+ % (key, args[key]))
return args[key]
elif default is not None:
return default
else:
raise ArgumentError('Argument %s is required.' % key)
+
def validate_bool(args, key, default=None):
- """Validates that a string argument to a RPC method call is a boolean string,
- with an optional default value in case it does not exist.
+ """Validates that a string argument to a RPC method call is a boolean
+ string, with an optional default value in case it does not exist.
Returns the python boolean value.
"""
@@ -99,7 +105,9 @@ def validate_bool(args, key, default=None):
elif value.lower() == 'false':
return False
else:
- raise ArgumentError("Argument %s may not take value %r. Valid values are ['true', 'false']." % (key, value))
+ raise ArgumentError("Argument %s may not take value %r. "
+ "Valid values are ['true', 'false']." % (key, value))
+
def exists(args, key):
"""Validates that a freeform string argument to a RPC method call is given.
@@ -110,6 +118,7 @@ def exists(args, key):
else:
raise ArgumentError('Argument %s is required.' % key)
+
def optional(args, key):
"""If the given key is in args, return the corresponding value, otherwise
return None"""
@@ -122,13 +131,14 @@ def get_this_host(session):
def get_domain_0(session):
this_host_ref = get_this_host(session)
- expr = 'field "is_control_domain" = "true" and field "resident_on" = "%s"' % this_host_ref
+ expr = 'field "is_control_domain" = "true" and field "resident_on" = "%s"'
+ expr = expr % this_host_ref
return session.xenapi.VM.get_all_records_where(expr).keys()[0]
def create_vdi(session, sr_ref, name_label, virtual_size, read_only):
vdi_ref = session.xenapi.VDI.create(
- { 'name_label': name_label,
+ {'name_label': name_label,
'name_description': '',
'SR': sr_ref,
'virtual_size': str(virtual_size),
@@ -138,7 +148,7 @@ def create_vdi(session, sr_ref, name_label, virtual_size, read_only):
'xenstore_data': {},
'other_config': {},
'sm_config': {},
- 'tags': [] })
+ 'tags': []})
logging.debug('Created VDI %s (%s, %s, %s) on %s.', vdi_ref, name_label,
virtual_size, read_only, sr_ref)
return vdi_ref
diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenstore.py b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenstore.py
new file mode 100755
index 000000000..695bf3448
--- /dev/null
+++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenstore.py
@@ -0,0 +1,180 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+# Copyright 2010 OpenStack LLC.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+#
+# XenAPI plugin for reading/writing information to xenstore
+#
+
+try:
+ import json
+except ImportError:
+ import simplejson as json
+import subprocess
+
+import XenAPIPlugin
+
+import pluginlib_nova as pluginlib
+pluginlib.configure_logging("xenstore")
+
+
+def jsonify(fnc):
+ def wrapper(*args, **kwargs):
+ return json.dumps(fnc(*args, **kwargs))
+ return wrapper
+
+
+@jsonify
+def read_record(self, arg_dict):
+ """Returns the value stored at the given path for the given dom_id.
+ These must be encoded as key/value pairs in arg_dict. You can
+ optinally include a key 'ignore_missing_path'; if this is present
+ and boolean True, attempting to read a non-existent path will return
+ the string 'None' instead of raising an exception.
+ """
+ cmd = "xenstore-read /local/domain/%(dom_id)s/%(path)s" % arg_dict
+ try:
+ return _run_command(cmd).rstrip("\n")
+ except pluginlib.PluginError, e:
+ if arg_dict.get("ignore_missing_path", False):
+ cmd = "xenstore-exists /local/domain/%(dom_id)s/%(path)s; echo $?"
+ cmd = cmd % arg_dict
+ ret = _run_command(cmd).strip()
+ # If the path exists, the cmd should return "0"
+ if ret != "0":
+ # No such path, so ignore the error and return the
+ # string 'None', since None can't be marshalled
+ # over RPC.
+ return "None"
+ # Either we shouldn't ignore path errors, or another
+ # error was hit. Re-raise.
+ raise
+
+
+@jsonify
+def write_record(self, arg_dict):
+ """Writes to xenstore at the specified path. If there is information
+ already stored in that location, it is overwritten. As in read_record,
+ the dom_id and path must be specified in the arg_dict; additionally,
+ you must specify a 'value' key, whose value must be a string. Typically,
+ you can json-ify more complex values and store the json output.
+ """
+ cmd = "xenstore-write /local/domain/%(dom_id)s/%(path)s '%(value)s'"
+ cmd = cmd % arg_dict
+ _run_command(cmd)
+ return arg_dict["value"]
+
+
+@jsonify
+def list_records(self, arg_dict):
+ """Returns all the stored data at or below the given path for the
+ given dom_id. The data is returned as a json-ified dict, with the
+ path as the key and the stored value as the value. If the path
+ doesn't exist, an empty dict is returned.
+ """
+ cmd = "xenstore-ls /local/domain/%(dom_id)s/%(path)s" % arg_dict
+ cmd = cmd.rstrip("/")
+ try:
+ recs = _run_command(cmd)
+ except pluginlib.PluginError, e:
+ if "No such file or directory" in "%s" % e:
+ # Path doesn't exist.
+ return {}
+ return str(e)
+ raise
+ base_path = arg_dict["path"]
+ paths = _paths_from_ls(recs)
+ ret = {}
+ for path in paths:
+ if base_path:
+ arg_dict["path"] = "%s/%s" % (base_path, path)
+ else:
+ arg_dict["path"] = path
+ rec = read_record(self, arg_dict)
+ try:
+ val = json.loads(rec)
+ except ValueError:
+ val = rec
+ ret[path] = val
+ return ret
+
+
+@jsonify
+def delete_record(self, arg_dict):
+ """Just like it sounds: it removes the record for the specified
+ VM and the specified path from xenstore.
+ """
+ cmd = "xenstore-rm /local/domain/%(dom_id)s/%(path)s" % arg_dict
+ return _run_command(cmd)
+
+
+def _paths_from_ls(recs):
+ """The xenstore-ls command returns a listing that isn't terribly
+ useful. This method cleans that up into a dict with each path
+ as the key, and the associated string as the value.
+ """
+ ret = {}
+ last_nm = ""
+ level = 0
+ path = []
+ ret = []
+ for ln in recs.splitlines():
+ nm, val = ln.rstrip().split(" = ")
+ barename = nm.lstrip()
+ this_level = len(nm) - len(barename)
+ if this_level == 0:
+ ret.append(barename)
+ level = 0
+ path = []
+ elif this_level == level:
+ # child of same parent
+ ret.append("%s/%s" % ("/".join(path), barename))
+ elif this_level > level:
+ path.append(last_nm)
+ ret.append("%s/%s" % ("/".join(path), barename))
+ level = this_level
+ elif this_level < level:
+ path = path[:this_level]
+ ret.append("%s/%s" % ("/".join(path), barename))
+ level = this_level
+ last_nm = barename
+ return ret
+
+
+def _run_command(cmd):
+ """Abstracts out the basics of issuing system commands. If the command
+ returns anything in stderr, a PluginError is raised with that information.
+ Otherwise, the output from stdout is returned.
+ """
+ pipe = subprocess.PIPE
+ proc = subprocess.Popen([cmd], shell=True, stdin=pipe, stdout=pipe,
+ stderr=pipe, close_fds=True)
+ proc.wait()
+ err = proc.stderr.read()
+ if err:
+ raise pluginlib.PluginError(err)
+ return proc.stdout.read()
+
+
+if __name__ == "__main__":
+ XenAPIPlugin.dispatch(
+ {"read_record": read_record,
+ "write_record": write_record,
+ "list_records": list_records,
+ "delete_record": delete_record})