diff options
| author | Michael Gundlach <michael.gundlach@rackspace.com> | 2010-09-21 19:13:05 +0000 |
|---|---|---|
| committer | Tarmac <> | 2010-09-21 19:13:05 +0000 |
| commit | d6104d8302057d45fa150079b5911f941cc311ce (patch) | |
| tree | 61394bf79f7790d6e62af3f9883ac3b2ddbc7f6a /nova/api | |
| parent | ce0a9b7b36ba816c347f10a1804aedf337ad35da (diff) | |
| parent | b82a9e3d3ca46e69a1583dea51a474456b867e6f (diff) | |
| download | nova-d6104d8302057d45fa150079b5911f941cc311ce.tar.gz nova-d6104d8302057d45fa150079b5911f941cc311ce.tar.xz nova-d6104d8302057d45fa150079b5911f941cc311ce.zip | |
Delete nova.endpoint module, which used Tornado to serve up the Amazon EC2 API.
Replace it with nova.api.ec2 module, which serves up the same API via a WSGI app in Eventlet.
Convert relevant unit tests from Twisted to eventlet.
The unit tests now pass using eventlet 0.9.12 -- you'll need to 'pip install -U eventlet' or rebuild your venv.
Note that I tried to do this in discrete commits, so you may find it easier to look at each small diff than to try to grok the whole merge diff.
Diffstat (limited to 'nova/api')
| -rw-r--r-- | nova/api/__init__.py | 2 | ||||
| -rw-r--r-- | nova/api/ec2/__init__.py | 222 | ||||
| -rw-r--r-- | nova/api/ec2/admin.py | 183 | ||||
| -rw-r--r-- | nova/api/ec2/apirequest.py | 133 | ||||
| -rw-r--r-- | nova/api/ec2/cloud.py | 729 | ||||
| -rw-r--r-- | nova/api/ec2/context.py | 33 | ||||
| -rw-r--r-- | nova/api/ec2/images.py | 108 |
7 files changed, 1394 insertions, 16 deletions
diff --git a/nova/api/__init__.py b/nova/api/__init__.py index 9f116dada..821f1deea 100644 --- a/nova/api/__init__.py +++ b/nova/api/__init__.py @@ -35,7 +35,7 @@ class API(wsgi.Router): mapper = routes.Mapper() mapper.connect("/", controller=self.versions) mapper.connect("/v1.0/{path_info:.*}", controller=rackspace.API()) - mapper.connect("/ec2/{path_info:.*}", controller=ec2.API()) + mapper.connect("/services/{path_info:.*}", controller=ec2.API()) super(API, self).__init__(mapper) @webob.dec.wsgify diff --git a/nova/api/ec2/__init__.py b/nova/api/ec2/__init__.py index 6eec0abf7..a7b10e428 100644 --- a/nova/api/ec2/__init__.py +++ b/nova/api/ec2/__init__.py @@ -1,6 +1,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2010 OpenStack LLC. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -15,28 +16,219 @@ # License for the specific language governing permissions and limitations # under the License. -""" -WSGI middleware for EC2 API controllers. -""" +"""Starting point for routing EC2 requests""" +import logging import routes +import webob import webob.dec +import webob.exc +from nova import exception from nova import wsgi +from nova.api.ec2 import apirequest +from nova.api.ec2 import context +from nova.api.ec2 import admin +from nova.api.ec2 import cloud +from nova.auth import manager -class API(wsgi.Router): - """Routes EC2 requests to the appropriate controller.""" +_log = logging.getLogger("api") +_log.setLevel(logging.DEBUG) + + +class API(wsgi.Middleware): + + """Routing for all EC2 API requests.""" def __init__(self): - mapper = routes.Mapper() - mapper.connect(None, "{all:.*}", controller=self.dummy) - super(API, self).__init__(mapper) + self.application = Authenticate(Router(Authorizer(Executor()))) + + +class Authenticate(wsgi.Middleware): + + """Authenticate an EC2 request and add 'ec2.context' to WSGI environ.""" + + @webob.dec.wsgify + def __call__(self, req): + # Read request signature and access id. + try: + signature = req.params['Signature'] + access = req.params['AWSAccessKeyId'] + except: + raise webob.exc.HTTPBadRequest() + + # Make a copy of args for authentication and signature verification. + auth_params = dict(req.params) + auth_params.pop('Signature') # not part of authentication args + + # Authenticate the request. + try: + (user, project) = manager.AuthManager().authenticate( + access, + signature, + auth_params, + req.method, + req.host, + req.path) + except exception.Error, ex: + logging.debug("Authentication Failure: %s" % ex) + raise webob.exc.HTTPForbidden() + + # Authenticated! + req.environ['ec2.context'] = context.APIRequestContext(user, project) + return self.application + + +class Router(wsgi.Middleware): + + """Add ec2.'controller', .'action', and .'action_args' to WSGI environ.""" + + def __init__(self, application): + super(Router, self).__init__(application) + self.map = routes.Mapper() + self.map.connect("/{controller_name}/") + self.controllers = dict(Cloud=cloud.CloudController(), + Admin=admin.AdminController()) + + @webob.dec.wsgify + def __call__(self, req): + # Obtain the appropriate controller and action for this request. + try: + match = self.map.match(req.path_info) + controller_name = match['controller_name'] + controller = self.controllers[controller_name] + except: + raise webob.exc.HTTPNotFound() + non_args = ['Action', 'Signature', 'AWSAccessKeyId', 'SignatureMethod', + 'SignatureVersion', 'Version', 'Timestamp'] + args = dict(req.params) + try: + action = req.params['Action'] # raise KeyError if omitted + for non_arg in non_args: + args.pop(non_arg) # remove, but raise KeyError if omitted + except: + raise webob.exc.HTTPBadRequest() + + _log.debug('action: %s' % action) + for key, value in args.items(): + _log.debug('arg: %s\t\tval: %s' % (key, value)) + + # Success! + req.environ['ec2.controller'] = controller + req.environ['ec2.action'] = action + req.environ['ec2.action_args'] = args + return self.application + + +class Authorizer(wsgi.Middleware): + + """Authorize an EC2 API request. + + Return a 401 if ec2.controller and ec2.action in WSGI environ may not be + executed in ec2.context. + """ + + def __init__(self, application): + super(Authorizer, self).__init__(application) + self.action_roles = { + 'CloudController': { + 'DescribeAvailabilityzones': ['all'], + 'DescribeRegions': ['all'], + 'DescribeSnapshots': ['all'], + 'DescribeKeyPairs': ['all'], + 'CreateKeyPair': ['all'], + 'DeleteKeyPair': ['all'], + 'DescribeSecurityGroups': ['all'], + 'CreateSecurityGroup': ['netadmin'], + 'DeleteSecurityGroup': ['netadmin'], + 'GetConsoleOutput': ['projectmanager', 'sysadmin'], + 'DescribeVolumes': ['projectmanager', 'sysadmin'], + 'CreateVolume': ['projectmanager', 'sysadmin'], + 'AttachVolume': ['projectmanager', 'sysadmin'], + 'DetachVolume': ['projectmanager', 'sysadmin'], + 'DescribeInstances': ['all'], + 'DescribeAddresses': ['all'], + 'AllocateAddress': ['netadmin'], + 'ReleaseAddress': ['netadmin'], + 'AssociateAddress': ['netadmin'], + 'DisassociateAddress': ['netadmin'], + 'RunInstances': ['projectmanager', 'sysadmin'], + 'TerminateInstances': ['projectmanager', 'sysadmin'], + 'RebootInstances': ['projectmanager', 'sysadmin'], + 'DeleteVolume': ['projectmanager', 'sysadmin'], + 'DescribeImages': ['all'], + 'DeregisterImage': ['projectmanager', 'sysadmin'], + 'RegisterImage': ['projectmanager', 'sysadmin'], + 'DescribeImageAttribute': ['all'], + 'ModifyImageAttribute': ['projectmanager', 'sysadmin'], + }, + 'AdminController': { + # All actions have the same permission: [] (the default) + # admins will be allowed to run them + # all others will get HTTPUnauthorized. + }, + } - @staticmethod @webob.dec.wsgify - def dummy(req): - """Temporary dummy controller.""" - msg = "dummy response -- please hook up __init__() to cloud.py instead" - return repr({'dummy': msg, - 'kwargs': repr(req.environ['wsgiorg.routing_args'][1])}) + def __call__(self, req): + context = req.environ['ec2.context'] + controller_name = req.environ['ec2.controller'].__class__.__name__ + action = req.environ['ec2.action'] + allowed_roles = self.action_roles[controller_name].get(action, []) + if self._matches_any_role(context, allowed_roles): + return self.application + else: + raise webob.exc.HTTPUnauthorized() + + def _matches_any_role(self, context, roles): + """Return True if any role in roles is allowed in context.""" + if 'all' in roles: + return True + if 'none' in roles: + return False + return any(context.project.has_role(context.user.id, role) + for role in roles) + + +class Executor(wsgi.Application): + + """Execute an EC2 API request. + + Executes 'ec2.action' upon 'ec2.controller', passing 'ec2.context' and + 'ec2.action_args' (all variables in WSGI environ.) Returns an XML + response, or a 400 upon failure. + """ + + @webob.dec.wsgify + def __call__(self, req): + context = req.environ['ec2.context'] + controller = req.environ['ec2.controller'] + action = req.environ['ec2.action'] + args = req.environ['ec2.action_args'] + + api_request = apirequest.APIRequest(controller, action) + try: + result = api_request.send(context, **args) + req.headers['Content-Type'] = 'text/xml' + return result + except exception.ApiError as ex: + + if ex.code: + return self._error(req, ex.code, ex.message) + else: + return self._error(req, type(ex).__name__, ex.message) + # TODO(vish): do something more useful with unknown exceptions + except Exception as ex: + return self._error(req, type(ex).__name__, str(ex)) + + def _error(self, req, code, message): + resp = webob.Response() + resp.status = 400 + resp.headers['Content-Type'] = 'text/xml' + resp.body = ('<?xml version="1.0"?>\n' + '<Response><Errors><Error><Code>%s</Code>' + '<Message>%s</Message></Error></Errors>' + '<RequestID>?</RequestID></Response>') % (code, message) + return resp + diff --git a/nova/api/ec2/admin.py b/nova/api/ec2/admin.py new file mode 100644 index 000000000..36feae451 --- /dev/null +++ b/nova/api/ec2/admin.py @@ -0,0 +1,183 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Admin API controller, exposed through http via the api worker. +""" + +import base64 + +from nova import db +from nova import exception +from nova.auth import manager + + +def user_dict(user, base64_file=None): + """Convert the user object to a result dict""" + if user: + return { + 'username': user.id, + 'accesskey': user.access, + 'secretkey': user.secret, + 'file': base64_file} + else: + return {} + + +def project_dict(project): + """Convert the project object to a result dict""" + if project: + return { + 'projectname': project.id, + 'project_manager_id': project.project_manager_id, + 'description': project.description} + else: + return {} + + +def host_dict(host): + """Convert a host model object to a result dict""" + if host: + return host.state + else: + return {} + + +class AdminController(object): + """ + API Controller for users, hosts, nodes, and workers. + """ + + def __str__(self): + return 'AdminController' + + def describe_user(self, _context, name, **_kwargs): + """Returns user data, including access and secret keys.""" + return user_dict(manager.AuthManager().get_user(name)) + + def describe_users(self, _context, **_kwargs): + """Returns all users - should be changed to deal with a list.""" + return {'userSet': + [user_dict(u) for u in manager.AuthManager().get_users()] } + + def register_user(self, _context, name, **_kwargs): + """Creates a new user, and returns generated credentials.""" + return user_dict(manager.AuthManager().create_user(name)) + + def deregister_user(self, _context, name, **_kwargs): + """Deletes a single user (NOT undoable.) + Should throw an exception if the user has instances, + volumes, or buckets remaining. + """ + manager.AuthManager().delete_user(name) + + return True + + def describe_roles(self, context, project_roles=True, **kwargs): + """Returns a list of allowed roles.""" + roles = manager.AuthManager().get_roles(project_roles) + return { 'roles': [{'role': r} for r in roles]} + + def describe_user_roles(self, context, user, project=None, **kwargs): + """Returns a list of roles for the given user. + Omitting project will return any global roles that the user has. + Specifying project will return only project specific roles. + """ + roles = manager.AuthManager().get_user_roles(user, project=project) + return { 'roles': [{'role': r} for r in roles]} + + def modify_user_role(self, context, user, role, project=None, + operation='add', **kwargs): + """Add or remove a role for a user and project.""" + if operation == 'add': + manager.AuthManager().add_role(user, role, project) + elif operation == 'remove': + manager.AuthManager().remove_role(user, role, project) + else: + raise exception.ApiError('operation must be add or remove') + + return True + + def generate_x509_for_user(self, _context, name, project=None, **kwargs): + """Generates and returns an x509 certificate for a single user. + Is usually called from a client that will wrap this with + access and secret key info, and return a zip file. + """ + if project is None: + project = name + project = manager.AuthManager().get_project(project) + user = manager.AuthManager().get_user(name) + return user_dict(user, base64.b64encode(project.get_credentials(user))) + + def describe_project(self, context, name, **kwargs): + """Returns project data, including member ids.""" + return project_dict(manager.AuthManager().get_project(name)) + + def describe_projects(self, context, user=None, **kwargs): + """Returns all projects - should be changed to deal with a list.""" + return {'projectSet': + [project_dict(u) for u in + manager.AuthManager().get_projects(user=user)]} + + def register_project(self, context, name, manager_user, description=None, + member_users=None, **kwargs): + """Creates a new project""" + return project_dict( + manager.AuthManager().create_project( + name, + manager_user, + description=None, + member_users=None)) + + def deregister_project(self, context, name): + """Permanently deletes a project.""" + manager.AuthManager().delete_project(name) + return True + + def describe_project_members(self, context, name, **kwargs): + project = manager.AuthManager().get_project(name) + result = { + 'members': [{'member': m} for m in project.member_ids]} + return result + + def modify_project_member(self, context, user, project, operation, **kwargs): + """Add or remove a user from a project.""" + if operation =='add': + manager.AuthManager().add_to_project(user, project) + elif operation == 'remove': + manager.AuthManager().remove_from_project(user, project) + else: + raise exception.ApiError('operation must be add or remove') + return True + + # FIXME(vish): these host commands don't work yet, perhaps some of the + # required data can be retrieved from service objects? + def describe_hosts(self, _context, **_kwargs): + """Returns status info for all nodes. Includes: + * Disk Space + * Instance List + * RAM used + * CPU used + * DHCP servers running + * Iptables / bridges + """ + return {'hostSet': [host_dict(h) for h in db.host_get_all()]} + + def describe_host(self, _context, name, **_kwargs): + """Returns status info for single node.""" + return host_dict(db.host_get(name)) diff --git a/nova/api/ec2/apirequest.py b/nova/api/ec2/apirequest.py new file mode 100644 index 000000000..a3b20118f --- /dev/null +++ b/nova/api/ec2/apirequest.py @@ -0,0 +1,133 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +APIRequest class +""" + +import logging +import re +# TODO(termie): replace minidom with etree +from xml.dom import minidom + +_log = logging.getLogger("api") +_log.setLevel(logging.DEBUG) + + +_c2u = re.compile('(((?<=[a-z])[A-Z])|([A-Z](?![A-Z]|$)))') + + +def _camelcase_to_underscore(str): + return _c2u.sub(r'_\1', str).lower().strip('_') + + +def _underscore_to_camelcase(str): + return ''.join([x[:1].upper() + x[1:] for x in str.split('_')]) + + +def _underscore_to_xmlcase(str): + res = _underscore_to_camelcase(str) + return res[:1].lower() + res[1:] + + +class APIRequest(object): + def __init__(self, controller, action): + self.controller = controller + self.action = action + + def send(self, context, **kwargs): + try: + method = getattr(self.controller, + _camelcase_to_underscore(self.action)) + except AttributeError: + _error = ('Unsupported API request: controller = %s,' + 'action = %s') % (self.controller, self.action) + _log.warning(_error) + # TODO: Raise custom exception, trap in apiserver, + # and reraise as 400 error. + raise Exception(_error) + + args = {} + for key, value in kwargs.items(): + parts = key.split(".") + key = _camelcase_to_underscore(parts[0]) + if len(parts) > 1: + d = args.get(key, {}) + d[parts[1]] = value[0] + value = d + else: + value = value[0] + args[key] = value + + for key in args.keys(): + if isinstance(args[key], dict): + if args[key] != {} and args[key].keys()[0].isdigit(): + s = args[key].items() + s.sort() + args[key] = [v for k, v in s] + + result = method(context, **args) + return self._render_response(result, context.request_id) + + def _render_response(self, response_data, request_id): + xml = minidom.Document() + + response_el = xml.createElement(self.action + 'Response') + response_el.setAttribute('xmlns', + 'http://ec2.amazonaws.com/doc/2009-11-30/') + request_id_el = xml.createElement('requestId') + request_id_el.appendChild(xml.createTextNode(request_id)) + response_el.appendChild(request_id_el) + if(response_data == True): + self._render_dict(xml, response_el, {'return': 'true'}) + else: + self._render_dict(xml, response_el, response_data) + + xml.appendChild(response_el) + + response = xml.toxml() + xml.unlink() + _log.debug(response) + return response + + def _render_dict(self, xml, el, data): + try: + for key in data.keys(): + val = data[key] + el.appendChild(self._render_data(xml, key, val)) + except: + _log.debug(data) + raise + + def _render_data(self, xml, el_name, data): + el_name = _underscore_to_xmlcase(el_name) + data_el = xml.createElement(el_name) + + if isinstance(data, list): + for item in data: + data_el.appendChild(self._render_data(xml, 'item', item)) + elif isinstance(data, dict): + self._render_dict(xml, data_el, data) + elif hasattr(data, '__dict__'): + self._render_dict(xml, data_el, data.__dict__) + elif isinstance(data, bool): + data_el.appendChild(xml.createTextNode(str(data).lower())) + elif data != None: + data_el.appendChild(xml.createTextNode(str(data))) + + return data_el diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py new file mode 100644 index 000000000..367511e3b --- /dev/null +++ b/nova/api/ec2/cloud.py @@ -0,0 +1,729 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Cloud Controller: Implementation of EC2 REST API calls, which are +dispatched to other nodes via AMQP RPC. State is via distributed +datastore. +""" + +import base64 +import datetime +import logging +import os +import time + +from nova import crypto +from nova import db +from nova import exception +from nova import flags +from nova import quota +from nova import rpc +from nova import utils +from nova.compute.instance_types import INSTANCE_TYPES +from nova.api.ec2 import images + + +FLAGS = flags.FLAGS +flags.DECLARE('storage_availability_zone', 'nova.volume.manager') + + +class QuotaError(exception.ApiError): + """Quota Exceeeded""" + pass + + +def _gen_key(context, user_id, key_name): + """Generate a key + + This is a module level method because it is slow and we need to defer + it into a process pool.""" + # NOTE(vish): generating key pair is slow so check for legal + # creation before creating key_pair + try: + db.key_pair_get(context, user_id, key_name) + raise exception.Duplicate("The key_pair %s already exists" + % key_name) + except exception.NotFound: + pass + private_key, public_key, fingerprint = crypto.generate_key_pair() + key = {} + key['user_id'] = user_id + key['name'] = key_name + key['public_key'] = public_key + key['fingerprint'] = fingerprint + db.key_pair_create(context, key) + return {'private_key': private_key, 'fingerprint': fingerprint} + + +class CloudController(object): + """ CloudController provides the critical dispatch between + inbound API calls through the endpoint and messages + sent to the other nodes. +""" + def __init__(self): + self.network_manager = utils.import_object(FLAGS.network_manager) + self.setup() + + def __str__(self): + return 'CloudController' + + def setup(self): + """ Ensure the keychains and folders exist. """ + # FIXME(ja): this should be moved to a nova-manage command, + # if not setup throw exceptions instead of running + # Create keys folder, if it doesn't exist + if not os.path.exists(FLAGS.keys_path): + os.makedirs(FLAGS.keys_path) + # Gen root CA, if we don't have one + root_ca_path = os.path.join(FLAGS.ca_path, FLAGS.ca_file) + if not os.path.exists(root_ca_path): + start = os.getcwd() + os.chdir(FLAGS.ca_path) + # TODO(vish): Do this with M2Crypto instead + utils.runthis("Generating root CA: %s", "sh genrootca.sh") + os.chdir(start) + + def _get_mpi_data(self, project_id): + result = {} + for instance in db.instance_get_by_project(None, project_id): + if instance['fixed_ip']: + line = '%s slots=%d' % (instance['fixed_ip']['str_id'], + INSTANCE_TYPES[instance['instance_type']]['vcpus']) + key = str(instance['key_name']) + if key in result: + result[key].append(line) + else: + result[key] = [line] + return result + + def get_metadata(self, address): + instance_ref = db.fixed_ip_get_instance(None, address) + if instance_ref is None: + return None + mpi = self._get_mpi_data(instance_ref['project_id']) + if instance_ref['key_name']: + keys = { + '0': { + '_name': instance_ref['key_name'], + 'openssh-key': instance_ref['key_data'] + } + } + else: + keys = '' + hostname = instance_ref['hostname'] + floating_ip = db.instance_get_floating_address(None, + instance_ref['id']) + data = { + 'user-data': base64.b64decode(instance_ref['user_data']), + 'meta-data': { + 'ami-id': instance_ref['image_id'], + 'ami-launch-index': instance_ref['launch_index'], + 'ami-manifest-path': 'FIXME', + 'block-device-mapping': { # TODO(vish): replace with real data + 'ami': 'sda1', + 'ephemeral0': 'sda2', + 'root': '/dev/sda1', + 'swap': 'sda3' + }, + 'hostname': hostname, + 'instance-action': 'none', + 'instance-id': instance_ref['str_id'], + 'instance-type': instance_ref['instance_type'], + 'local-hostname': hostname, + 'local-ipv4': address, + 'kernel-id': instance_ref['kernel_id'], + 'placement': { + 'availability-zone': 'nova' # TODO(vish): real zone + }, + 'public-hostname': hostname, + 'public-ipv4': floating_ip or '', + 'public-keys': keys, + 'ramdisk-id': instance_ref['ramdisk_id'], + 'reservation-id': instance_ref['reservation_id'], + 'security-groups': '', + 'mpi': mpi + } + } + if False: # TODO(vish): store ancestor ids + data['ancestor-ami-ids'] = [] + if False: # TODO(vish): store product codes + data['product-codes'] = [] + return data + + def describe_availability_zones(self, context, **kwargs): + return {'availabilityZoneInfo': [{'zoneName': 'nova', + 'zoneState': 'available'}]} + + def describe_regions(self, context, region_name=None, **kwargs): + if FLAGS.region_list: + regions = [] + for region in FLAGS.region_list: + name, _sep, url = region.partition('=') + regions.append({'regionName': name, + 'regionEndpoint': url}) + else: + regions = [{'regionName': 'nova', + 'regionEndpoint': FLAGS.ec2_url}] + if region_name: + regions = [r for r in regions if r['regionName'] in region_name] + return {'regionInfo': regions } + + def describe_snapshots(self, + context, + snapshot_id=None, + owner=None, + restorable_by=None, + **kwargs): + return {'snapshotSet': [{'snapshotId': 'fixme', + 'volumeId': 'fixme', + 'status': 'fixme', + 'startTime': 'fixme', + 'progress': 'fixme', + 'ownerId': 'fixme', + 'volumeSize': 0, + 'description': 'fixme'}]} + + def describe_key_pairs(self, context, key_name=None, **kwargs): + key_pairs = db.key_pair_get_all_by_user(context, context.user.id) + if not key_name is None: + key_pairs = [x for x in key_pairs if x['name'] in key_name] + + result = [] + for key_pair in key_pairs: + # filter out the vpn keys + suffix = FLAGS.vpn_key_suffix + if context.user.is_admin() or not key_pair['name'].endswith(suffix): + result.append({ + 'keyName': key_pair['name'], + 'keyFingerprint': key_pair['fingerprint'], + }) + + return {'keypairsSet': result} + + def create_key_pair(self, context, key_name, **kwargs): + data = _gen_key(None, context.user.id, key_name) + return {'keyName': key_name, + 'keyFingerprint': data['fingerprint'], + 'keyMaterial': data['private_key']} + # TODO(vish): when context is no longer an object, pass it here + + def delete_key_pair(self, context, key_name, **kwargs): + try: + db.key_pair_destroy(context, context.user.id, key_name) + except exception.NotFound: + # aws returns true even if the key doesn't exist + pass + return True + + def describe_security_groups(self, context, group_names, **kwargs): + groups = {'securityGroupSet': []} + + # Stubbed for now to unblock other things. + return groups + + def create_security_group(self, context, group_name, **kwargs): + return True + + def delete_security_group(self, context, group_name, **kwargs): + return True + + def get_console_output(self, context, instance_id, **kwargs): + # instance_id is passed in as a list of instances + instance_ref = db.instance_get_by_str(context, instance_id[0]) + return rpc.call('%s.%s' % (FLAGS.compute_topic, + instance_ref['host']), + {"method": "get_console_output", + "args": {"context": None, + "instance_id": instance_ref['id']}}) + + def describe_volumes(self, context, **kwargs): + if context.user.is_admin(): + volumes = db.volume_get_all(context) + else: + volumes = db.volume_get_by_project(context, context.project.id) + + volumes = [self._format_volume(context, v) for v in volumes] + + return {'volumeSet': volumes} + + def _format_volume(self, context, volume): + v = {} + v['volumeId'] = volume['str_id'] + v['status'] = volume['status'] + v['size'] = volume['size'] + v['availabilityZone'] = volume['availability_zone'] + v['createTime'] = volume['created_at'] + if context.user.is_admin(): + v['status'] = '%s (%s, %s, %s, %s)' % ( + volume['status'], + volume['user_id'], + volume['host'], + volume['instance_id'], + volume['mountpoint']) + if volume['attach_status'] == 'attached': + v['attachmentSet'] = [{'attachTime': volume['attach_time'], + 'deleteOnTermination': False, + 'device': volume['mountpoint'], + 'instanceId': volume['instance_id'], + 'status': 'attached', + 'volume_id': volume['str_id']}] + else: + v['attachmentSet'] = [{}] + return v + + def create_volume(self, context, size, **kwargs): + # check quota + if quota.allowed_volumes(context, 1, size) < 1: + logging.warn("Quota exceeeded for %s, tried to create %sG volume", + context.project.id, size) + raise QuotaError("Volume quota exceeded. You cannot " + "create a volume of size %s" % + size) + vol = {} + vol['size'] = size + vol['user_id'] = context.user.id + vol['project_id'] = context.project.id + vol['availability_zone'] = FLAGS.storage_availability_zone + vol['status'] = "creating" + vol['attach_status'] = "detached" + volume_ref = db.volume_create(context, vol) + + rpc.cast(FLAGS.scheduler_topic, + {"method": "create_volume", + "args": {"context": None, + "topic": FLAGS.volume_topic, + "volume_id": volume_ref['id']}}) + + return {'volumeSet': [self._format_volume(context, volume_ref)]} + + + def attach_volume(self, context, volume_id, instance_id, device, **kwargs): + volume_ref = db.volume_get_by_str(context, volume_id) + # TODO(vish): abstract status checking? + if volume_ref['status'] != "available": + raise exception.ApiError("Volume status must be available") + if volume_ref['attach_status'] == "attached": + raise exception.ApiError("Volume is already attached") + instance_ref = db.instance_get_by_str(context, instance_id) + host = instance_ref['host'] + rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "attach_volume", + "args": {"context": None, + "volume_id": volume_ref['id'], + "instance_id": instance_ref['id'], + "mountpoint": device}}) + return {'attachTime': volume_ref['attach_time'], + 'device': volume_ref['mountpoint'], + 'instanceId': instance_ref['id'], + 'requestId': context.request_id, + 'status': volume_ref['attach_status'], + 'volumeId': volume_ref['id']} + + def detach_volume(self, context, volume_id, **kwargs): + volume_ref = db.volume_get_by_str(context, volume_id) + instance_ref = db.volume_get_instance(context, volume_ref['id']) + if not instance_ref: + raise exception.ApiError("Volume isn't attached to anything!") + # TODO(vish): abstract status checking? + if volume_ref['status'] == "available": + raise exception.ApiError("Volume is already detached") + try: + host = instance_ref['host'] + rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "detach_volume", + "args": {"context": None, + "instance_id": instance_ref['id'], + "volume_id": volume_ref['id']}}) + except exception.NotFound: + # If the instance doesn't exist anymore, + # then we need to call detach blind + db.volume_detached(context) + return {'attachTime': volume_ref['attach_time'], + 'device': volume_ref['mountpoint'], + 'instanceId': instance_ref['str_id'], + 'requestId': context.request_id, + 'status': volume_ref['attach_status'], + 'volumeId': volume_ref['id']} + + def _convert_to_set(self, lst, label): + if lst == None or lst == []: + return None + if not isinstance(lst, list): + lst = [lst] + return [{label: x} for x in lst] + + def describe_instances(self, context, **kwargs): + return self._format_describe_instances(context) + + def _format_describe_instances(self, context): + return { 'reservationSet': self._format_instances(context) } + + def _format_run_instances(self, context, reservation_id): + i = self._format_instances(context, reservation_id) + assert len(i) == 1 + return i[0] + + def _format_instances(self, context, reservation_id=None): + reservations = {} + if reservation_id: + instances = db.instance_get_by_reservation(context, + reservation_id) + else: + if context.user.is_admin(): + instances = db.instance_get_all(context) + else: + instances = db.instance_get_by_project(context, + context.project.id) + for instance in instances: + if not context.user.is_admin(): + if instance['image_id'] == FLAGS.vpn_image_id: + continue + i = {} + i['instanceId'] = instance['str_id'] + i['imageId'] = instance['image_id'] + i['instanceState'] = { + 'code': instance['state'], + 'name': instance['state_description'] + } + fixed_addr = None + floating_addr = None + if instance['fixed_ip']: + fixed_addr = instance['fixed_ip']['str_id'] + if instance['fixed_ip']['floating_ips']: + fixed = instance['fixed_ip'] + floating_addr = fixed['floating_ips'][0]['str_id'] + i['privateDnsName'] = fixed_addr + i['publicDnsName'] = floating_addr + i['dnsName'] = i['publicDnsName'] or i['privateDnsName'] + i['keyName'] = instance['key_name'] + if context.user.is_admin(): + i['keyName'] = '%s (%s, %s)' % (i['keyName'], + instance['project_id'], + instance['host']) + i['productCodesSet'] = self._convert_to_set([], 'product_codes') + i['instanceType'] = instance['instance_type'] + i['launchTime'] = instance['created_at'] + i['amiLaunchIndex'] = instance['launch_index'] + if not reservations.has_key(instance['reservation_id']): + r = {} + r['reservationId'] = instance['reservation_id'] + r['ownerId'] = instance['project_id'] + r['groupSet'] = self._convert_to_set([], 'groups') + r['instancesSet'] = [] + reservations[instance['reservation_id']] = r + reservations[instance['reservation_id']]['instancesSet'].append(i) + + return list(reservations.values()) + + def describe_addresses(self, context, **kwargs): + return self.format_addresses(context) + + def format_addresses(self, context): + addresses = [] + if context.user.is_admin(): + iterator = db.floating_ip_get_all(context) + else: + iterator = db.floating_ip_get_by_project(context, + context.project.id) + for floating_ip_ref in iterator: + address = floating_ip_ref['str_id'] + instance_id = None + if (floating_ip_ref['fixed_ip'] + and floating_ip_ref['fixed_ip']['instance']): + instance_id = floating_ip_ref['fixed_ip']['instance']['str_id'] + address_rv = {'public_ip': address, + 'instance_id': instance_id} + if context.user.is_admin(): + details = "%s (%s)" % (address_rv['instance_id'], + floating_ip_ref['project_id']) + address_rv['instance_id'] = details + addresses.append(address_rv) + return {'addressesSet': addresses} + + def allocate_address(self, context, **kwargs): + # check quota + if quota.allowed_floating_ips(context, 1) < 1: + logging.warn("Quota exceeeded for %s, tried to allocate address", + context.project.id) + raise QuotaError("Address quota exceeded. You cannot " + "allocate any more addresses") + network_topic = self._get_network_topic(context) + public_ip = rpc.call(network_topic, + {"method": "allocate_floating_ip", + "args": {"context": None, + "project_id": context.project.id}}) + return {'addressSet': [{'publicIp': public_ip}]} + + def release_address(self, context, public_ip, **kwargs): + # NOTE(vish): Should we make sure this works? + floating_ip_ref = db.floating_ip_get_by_address(context, public_ip) + network_topic = self._get_network_topic(context) + rpc.cast(network_topic, + {"method": "deallocate_floating_ip", + "args": {"context": None, + "floating_address": floating_ip_ref['str_id']}}) + return {'releaseResponse': ["Address released."]} + + def associate_address(self, context, instance_id, public_ip, **kwargs): + instance_ref = db.instance_get_by_str(context, instance_id) + fixed_ip_ref = db.fixed_ip_get_by_instance(context, instance_ref['id']) + floating_ip_ref = db.floating_ip_get_by_address(context, public_ip) + network_topic = self._get_network_topic(context) + rpc.cast(network_topic, + {"method": "associate_floating_ip", + "args": {"context": None, + "floating_address": floating_ip_ref['str_id'], + "fixed_address": fixed_ip_ref['str_id']}}) + return {'associateResponse': ["Address associated."]} + + def disassociate_address(self, context, public_ip, **kwargs): + floating_ip_ref = db.floating_ip_get_by_address(context, public_ip) + network_topic = self._get_network_topic(context) + rpc.cast(network_topic, + {"method": "disassociate_floating_ip", + "args": {"context": None, + "floating_address": floating_ip_ref['str_id']}}) + return {'disassociateResponse': ["Address disassociated."]} + + def _get_network_topic(self, context): + """Retrieves the network host for a project""" + network_ref = db.project_get_network(context, context.project.id) + host = network_ref['host'] + if not host: + host = rpc.call(FLAGS.network_topic, + {"method": "set_network_host", + "args": {"context": None, + "project_id": context.project.id}}) + return db.queue_get_for(context, FLAGS.network_topic, host) + + def run_instances(self, context, **kwargs): + instance_type = kwargs.get('instance_type', 'm1.small') + if instance_type not in INSTANCE_TYPES: + raise exception.ApiError("Unknown instance type: %s", + instance_type) + # check quota + max_instances = int(kwargs.get('max_count', 1)) + min_instances = int(kwargs.get('min_count', max_instances)) + num_instances = quota.allowed_instances(context, + max_instances, + instance_type) + if num_instances < min_instances: + logging.warn("Quota exceeeded for %s, tried to run %s instances", + context.project.id, min_instances) + raise QuotaError("Instance quota exceeded. You can only " + "run %s more instances of this type." % + num_instances, "InstanceLimitExceeded") + # make sure user can access the image + # vpn image is private so it doesn't show up on lists + vpn = kwargs['image_id'] == FLAGS.vpn_image_id + + if not vpn: + image = images.get(context, kwargs['image_id']) + + # FIXME(ja): if image is vpn, this breaks + # get defaults from imagestore + image_id = image['imageId'] + kernel_id = image.get('kernelId', FLAGS.default_kernel) + ramdisk_id = image.get('ramdiskId', FLAGS.default_ramdisk) + + # API parameters overrides of defaults + kernel_id = kwargs.get('kernel_id', kernel_id) + ramdisk_id = kwargs.get('ramdisk_id', ramdisk_id) + + # make sure we have access to kernel and ramdisk + images.get(context, kernel_id) + images.get(context, ramdisk_id) + + logging.debug("Going to run %s instances...", num_instances) + launch_time = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + key_data = None + if kwargs.has_key('key_name'): + key_pair_ref = db.key_pair_get(context, + context.user.id, + kwargs['key_name']) + key_data = key_pair_ref['public_key'] + + # TODO: Get the real security group of launch in here + security_group = "default" + + reservation_id = utils.generate_uid('r') + base_options = {} + base_options['state_description'] = 'scheduling' + base_options['image_id'] = image_id + base_options['kernel_id'] = kernel_id + base_options['ramdisk_id'] = ramdisk_id + base_options['reservation_id'] = reservation_id + base_options['key_data'] = key_data + base_options['key_name'] = kwargs.get('key_name', None) + base_options['user_id'] = context.user.id + base_options['project_id'] = context.project.id + base_options['user_data'] = kwargs.get('user_data', '') + base_options['security_group'] = security_group + base_options['instance_type'] = instance_type + + type_data = INSTANCE_TYPES[instance_type] + base_options['memory_mb'] = type_data['memory_mb'] + base_options['vcpus'] = type_data['vcpus'] + base_options['local_gb'] = type_data['local_gb'] + + for num in range(num_instances): + instance_ref = db.instance_create(context, base_options) + inst_id = instance_ref['id'] + + inst = {} + inst['mac_address'] = utils.generate_mac() + inst['launch_index'] = num + inst['hostname'] = instance_ref['str_id'] + db.instance_update(context, inst_id, inst) + address = self.network_manager.allocate_fixed_ip(context, + inst_id, + vpn) + + # TODO(vish): This probably should be done in the scheduler + # network is setup when host is assigned + network_topic = self._get_network_topic(context) + rpc.call(network_topic, + {"method": "setup_fixed_ip", + "args": {"context": None, + "address": address}}) + + rpc.cast(FLAGS.scheduler_topic, + {"method": "run_instance", + "args": {"context": None, + "topic": FLAGS.compute_topic, + "instance_id": inst_id}}) + logging.debug("Casting to scheduler for %s/%s's instance %s" % + (context.project.name, context.user.name, inst_id)) + return self._format_run_instances(context, reservation_id) + + + def terminate_instances(self, context, instance_id, **kwargs): + logging.debug("Going to start terminating instances") + for id_str in instance_id: + logging.debug("Going to try and terminate %s" % id_str) + try: + instance_ref = db.instance_get_by_str(context, id_str) + except exception.NotFound: + logging.warning("Instance %s was not found during terminate" + % id_str) + continue + + now = datetime.datetime.utcnow() + db.instance_update(context, + instance_ref['id'], + {'terminated_at': now}) + # FIXME(ja): where should network deallocate occur? + address = db.instance_get_floating_address(context, + instance_ref['id']) + if address: + logging.debug("Disassociating address %s" % address) + # NOTE(vish): Right now we don't really care if the ip is + # disassociated. We may need to worry about + # checking this later. Perhaps in the scheduler? + network_topic = self._get_network_topic(context) + rpc.cast(network_topic, + {"method": "disassociate_floating_ip", + "args": {"context": None, + "address": address}}) + + address = db.instance_get_fixed_address(context, + instance_ref['id']) + if address: + logging.debug("Deallocating address %s" % address) + # NOTE(vish): Currently, nothing needs to be done on the + # network node until release. If this changes, + # we will need to cast here. + self.network_manager.deallocate_fixed_ip(context, address) + + host = instance_ref['host'] + if host: + rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "terminate_instance", + "args": {"context": None, + "instance_id": instance_ref['id']}}) + else: + db.instance_destroy(context, instance_ref['id']) + return True + + def reboot_instances(self, context, instance_id, **kwargs): + """instance_id is a list of instance ids""" + for id_str in instance_id: + instance_ref = db.instance_get_by_str(context, id_str) + host = instance_ref['host'] + rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host), + {"method": "reboot_instance", + "args": {"context": None, + "instance_id": instance_ref['id']}}) + return True + + def delete_volume(self, context, volume_id, **kwargs): + # TODO: return error if not authorized + volume_ref = db.volume_get_by_str(context, volume_id) + if volume_ref['status'] != "available": + raise exception.ApiError("Volume status must be available") + now = datetime.datetime.utcnow() + db.volume_update(context, volume_ref['id'], {'terminated_at': now}) + host = volume_ref['host'] + rpc.cast(db.queue_get_for(context, FLAGS.volume_topic, host), + {"method": "delete_volume", + "args": {"context": None, + "volume_id": volume_ref['id']}}) + return True + + def describe_images(self, context, image_id=None, **kwargs): + # The objectstore does its own authorization for describe + imageSet = images.list(context, image_id) + return {'imagesSet': imageSet} + + def deregister_image(self, context, image_id, **kwargs): + # FIXME: should the objectstore be doing these authorization checks? + images.deregister(context, image_id) + return {'imageId': image_id} + + def register_image(self, context, image_location=None, **kwargs): + # FIXME: should the objectstore be doing these authorization checks? + if image_location is None and kwargs.has_key('name'): + image_location = kwargs['name'] + image_id = images.register(context, image_location) + logging.debug("Registered %s as %s" % (image_location, image_id)) + return {'imageId': image_id} + + def describe_image_attribute(self, context, image_id, attribute, **kwargs): + if attribute != 'launchPermission': + raise exception.ApiError('attribute not supported: %s' % attribute) + try: + image = images.list(context, image_id)[0] + except IndexError: + raise exception.ApiError('invalid id: %s' % image_id) + result = {'image_id': image_id, 'launchPermission': []} + if image['isPublic']: + result['launchPermission'].append({'group': 'all'}) + return result + + def modify_image_attribute(self, context, image_id, attribute, operation_type, **kwargs): + # TODO(devcamcar): Support users and groups other than 'all'. + if attribute != 'launchPermission': + raise exception.ApiError('attribute not supported: %s' % attribute) + if not 'user_group' in kwargs: + raise exception.ApiError('user or group not specified') + if len(kwargs['user_group']) != 1 and kwargs['user_group'][0] != 'all': + raise exception.ApiError('only group "all" is supported') + if not operation_type in ['add', 'remove']: + raise exception.ApiError('operation_type must be add or remove') + return images.modify(context, image_id, operation_type) diff --git a/nova/api/ec2/context.py b/nova/api/ec2/context.py new file mode 100644 index 000000000..c53ba98d9 --- /dev/null +++ b/nova/api/ec2/context.py @@ -0,0 +1,33 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +APIRequestContext +""" + +import random + + +class APIRequestContext(object): + def __init__(self, user, project): + self.user = user + self.project = project + self.request_id = ''.join( + [random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-') + for x in xrange(20)] + ) diff --git a/nova/api/ec2/images.py b/nova/api/ec2/images.py new file mode 100644 index 000000000..4579cd81a --- /dev/null +++ b/nova/api/ec2/images.py @@ -0,0 +1,108 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Proxy AMI-related calls from the cloud controller, to the running +objectstore service. +""" + +import json +import urllib + +import boto.s3.connection + +from nova import exception +from nova import flags +from nova import utils +from nova.auth import manager + + +FLAGS = flags.FLAGS + + +def modify(context, image_id, operation): + conn(context).make_request( + method='POST', + bucket='_images', + query_args=qs({'image_id': image_id, 'operation': operation})) + + return True + + +def register(context, image_location): + """ rpc call to register a new image based from a manifest """ + + image_id = utils.generate_uid('ami') + conn(context).make_request( + method='PUT', + bucket='_images', + query_args=qs({'image_location': image_location, + 'image_id': image_id})) + + return image_id + +def list(context, filter_list=[]): + """ return a list of all images that a user can see + + optionally filtered by a list of image_id """ + + # FIXME: send along the list of only_images to check for + response = conn(context).make_request( + method='GET', + bucket='_images') + + result = json.loads(response.read()) + if not filter_list is None: + return [i for i in result if i['imageId'] in filter_list] + return result + +def get(context, image_id): + """return a image object if the context has permissions""" + result = list(context, [image_id]) + if not result: + raise exception.NotFound('Image %s could not be found' % image_id) + image = result[0] + return image + + +def deregister(context, image_id): + """ unregister an image """ + conn(context).make_request( + method='DELETE', + bucket='_images', + query_args=qs({'image_id': image_id})) + + +def conn(context): + access = manager.AuthManager().get_access_key(context.user, + context.project) + secret = str(context.user.secret) + calling = boto.s3.connection.OrdinaryCallingFormat() + return boto.s3.connection.S3Connection(aws_access_key_id=access, + aws_secret_access_key=secret, + is_secure=False, + calling_format=calling, + port=FLAGS.s3_port, + host=FLAGS.s3_host) + + +def qs(params): + pairs = [] + for key in params.keys(): + pairs.append(key + '=' + urllib.quote(params[key])) + return '&'.join(pairs) |
