diff options
| author | Jesse Andrews <anotherjesse@gmail.com> | 2010-05-27 23:05:26 -0700 |
|---|---|---|
| committer | Jesse Andrews <anotherjesse@gmail.com> | 2010-05-27 23:05:26 -0700 |
| commit | bf6e6e718cdc7488e2da87b21e258ccc065fe499 (patch) | |
| tree | 51cf4f72047eb6b16079c7fe21e9822895541801 /nova/endpoint | |
| download | nova-bf6e6e718cdc7488e2da87b21e258ccc065fe499.tar.gz nova-bf6e6e718cdc7488e2da87b21e258ccc065fe499.tar.xz nova-bf6e6e718cdc7488e2da87b21e258ccc065fe499.zip | |
initial commit
Diffstat (limited to 'nova/endpoint')
| -rw-r--r-- | nova/endpoint/__init__.py | 28 | ||||
| -rw-r--r-- | nova/endpoint/admin.py | 131 | ||||
| -rwxr-xr-x | nova/endpoint/api.py | 337 | ||||
| -rw-r--r-- | nova/endpoint/cloud.py | 572 | ||||
| -rw-r--r-- | nova/endpoint/images.py | 92 |
5 files changed, 1160 insertions, 0 deletions
diff --git a/nova/endpoint/__init__.py b/nova/endpoint/__init__.py new file mode 100644 index 000000000..dbf15d259 --- /dev/null +++ b/nova/endpoint/__init__.py @@ -0,0 +1,28 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +:mod:`nova.endpoint` -- Main NOVA Api endpoints +===================================================== + +.. automodule:: nova.endpoint + :platform: Unix + :synopsis: REST APIs for all nova functions +.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com> +.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com> +.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com> +.. moduleauthor:: Joshua McKenty <joshua@cognition.ca> +.. moduleauthor:: Manish Singh <yosh@gimp.org> +.. moduleauthor:: Andy Smith <andy@anarkystic.com> +"""
\ No newline at end of file diff --git a/nova/endpoint/admin.py b/nova/endpoint/admin.py new file mode 100644 index 000000000..e9880acc5 --- /dev/null +++ b/nova/endpoint/admin.py @@ -0,0 +1,131 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Admin API controller, exposed through http via the api worker. +""" + +import base64 + +def user_dict(user, base64_file=None): + """Convert the user object to a result dict""" + if user: + return { + 'username': user.id, + 'accesskey': user.access, + 'secretkey': user.secret, + 'file': base64_file, + } + else: + return {} + +def node_dict(node): + """Convert a node object to a result dict""" + if node: + return { + 'node_id': node.id, + 'workers': ", ".join(node.workers), + 'disks': ", ".join(node.disks), + 'ram': node.memory, + 'load_average' : node.load_average, + } + else: + return {} + +def admin_only(target): + """Decorator for admin-only API calls""" + def wrapper(*args, **kwargs): + """Internal wrapper method for admin-only API calls""" + context = args[1] + if context.user.is_admin(): + return target(*args, **kwargs) + else: + return {} + + return wrapper + +class AdminController(object): + """ + API Controller for users, node status, and worker mgmt. + Trivial admin_only wrapper will be replaced with RBAC, + allowing project managers to administer project users. + + """ + def __init__(self, user_manager, node_manager=None): + self.user_manager = user_manager + self.node_manager = node_manager + + def __str__(self): + return 'AdminController' + + @admin_only + def describe_user(self, _context, name, **_kwargs): + """Returns user data, including access and secret keys. + """ + return user_dict(self.user_manager.get_user(name)) + + @admin_only + def describe_users(self, _context, **_kwargs): + """Returns all users - should be changed to deal with a list. + """ + return {'userSet': + [user_dict(u) for u in self.user_manager.get_users()] } + + @admin_only + def register_user(self, _context, name, **_kwargs): + """ Creates a new user, and returns generated credentials. + """ + self.user_manager.create_user(name) + + return user_dict(self.user_manager.get_user(name)) + + @admin_only + def deregister_user(self, _context, name, **_kwargs): + """Deletes a single user (NOT undoable.) + Should throw an exception if the user has instances, + volumes, or buckets remaining. + """ + self.user_manager.delete_user(name) + + return True + + @admin_only + def generate_x509_for_user(self, _context, name, **_kwargs): + """Generates and returns an x509 certificate for a single user. + Is usually called from a client that will wrap this with + access and secret key info, and return a zip file. + """ + user = self.user_manager.get_user(name) + return user_dict(user, base64.b64encode(user.get_credentials())) + + @admin_only + def describe_nodes(self, _context, **_kwargs): + """Returns status info for all nodes. Includes: + * Disk Space + * Instance List + * RAM used + * CPU used + * DHCP servers running + * Iptables / bridges + """ + return {'nodeSet': + [node_dict(n) for n in self.node_manager.get_nodes()] } + + @admin_only + def describe_node(self, _context, name, **_kwargs): + """Returns status info for single node. + """ + return node_dict(self.node_manager.get_node(name)) + diff --git a/nova/endpoint/api.py b/nova/endpoint/api.py new file mode 100755 index 000000000..5bbda3f56 --- /dev/null +++ b/nova/endpoint/api.py @@ -0,0 +1,337 @@ +#!/usr/bin/python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tornado REST API Request Handlers for Nova functions +Most calls are proxied into the responsible controller. +""" + +import logging +import multiprocessing +import random +import re +import urllib +# TODO(termie): replace minidom with etree +from xml.dom import minidom + +from nova import vendor +import tornado.web +from twisted.internet import defer + +from nova import crypto +from nova import exception +from nova import flags +from nova import utils +from nova.endpoint import cloud +from nova.auth import users + +FLAGS = flags.FLAGS +flags.DEFINE_integer('cc_port', 8773, 'cloud controller port') + + +_log = logging.getLogger("api") +_log.setLevel(logging.DEBUG) + + +_c2u = re.compile('(((?<=[a-z])[A-Z])|([A-Z](?![A-Z]|$)))') + + +def _camelcase_to_underscore(str): + return _c2u.sub(r'_\1', str).lower().strip('_') + + +def _underscore_to_camelcase(str): + return ''.join([x[:1].upper() + x[1:] for x in str.split('_')]) + + +def _underscore_to_xmlcase(str): + res = _underscore_to_camelcase(str) + return res[:1].lower() + res[1:] + + +class APIRequestContext(object): + def __init__(self, handler, user): + self.handler = handler + self.user = user + self.request_id = ''.join( + [random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-') + for x in xrange(20)] + ) + + +class APIRequest(object): + def __init__(self, handler, controller, action): + self.handler = handler + self.controller = controller + self.action = action + + def send(self, user, **kwargs): + context = APIRequestContext(self.handler, user) + + try: + method = getattr(self.controller, + _camelcase_to_underscore(self.action)) + except AttributeError: + _error = ('Unsupported API request: controller = %s,' + 'action = %s') % (self.controller, self.action) + _log.warning(_error) + # TODO: Raise custom exception, trap in apiserver, + # and reraise as 400 error. + raise Exception(_error) + + args = {} + for key, value in kwargs.items(): + parts = key.split(".") + key = _camelcase_to_underscore(parts[0]) + if len(parts) > 1: + d = args.get(key, {}) + d[parts[1]] = value[0] + value = d + else: + value = value[0] + args[key] = value + + for key in args.keys(): + if isinstance(args[key], dict): + if args[key] != {} and args[key].keys()[0].isdigit(): + s = args[key].items() + s.sort() + args[key] = [v for k, v in s] + + d = defer.maybeDeferred(method, context, **args) + d.addCallback(self._render_response, context.request_id) + return d + + def _render_response(self, response_data, request_id): + xml = minidom.Document() + + response_el = xml.createElement(self.action + 'Response') + response_el.setAttribute('xmlns', + 'http://ec2.amazonaws.com/doc/2009-11-30/') + request_id_el = xml.createElement('requestId') + request_id_el.appendChild(xml.createTextNode(request_id)) + response_el.appendChild(request_id_el) + if(response_data == True): + self._render_dict(xml, response_el, {'return': 'true'}) + else: + self._render_dict(xml, response_el, response_data) + + xml.appendChild(response_el) + + response = xml.toxml() + xml.unlink() + _log.debug(response) + return response + + def _render_dict(self, xml, el, data): + try: + for key in data.keys(): + val = data[key] + el.appendChild(self._render_data(xml, key, val)) + except: + _log.debug(data) + raise + + def _render_data(self, xml, el_name, data): + el_name = _underscore_to_xmlcase(el_name) + data_el = xml.createElement(el_name) + + if isinstance(data, list): + for item in data: + data_el.appendChild(self._render_data(xml, 'item', item)) + elif isinstance(data, dict): + self._render_dict(xml, data_el, data) + elif hasattr(data, '__dict__'): + self._render_dict(xml, data_el, data.__dict__) + elif isinstance(data, bool): + data_el.appendChild(xml.createTextNode(str(data).lower())) + elif data != None: + data_el.appendChild(xml.createTextNode(str(data))) + + return data_el + + +class RootRequestHandler(tornado.web.RequestHandler): + def get(self): + # available api versions + versions = [ + '1.0', + '2007-01-19', + '2007-03-01', + '2007-08-29', + '2007-10-10', + '2007-12-15', + '2008-02-01', + '2008-09-01', + '2009-04-04', + ] + for version in versions: + self.write('%s\n' % version) + self.finish() + + +class MetadataRequestHandler(tornado.web.RequestHandler): + def print_data(self, data): + if isinstance(data, dict): + output = '' + for key in data: + if key == '_name': + continue + output += key + if isinstance(data[key], dict): + if '_name' in data[key]: + output += '=' + str(data[key]['_name']) + else: + output += '/' + output += '\n' + self.write(output[:-1]) # cut off last \n + elif isinstance(data, list): + self.write('\n'.join(data)) + else: + self.write(str(data)) + + def lookup(self, path, data): + items = path.split('/') + for item in items: + if item: + if not isinstance(data, dict): + return data + if not item in data: + return None + data = data[item] + return data + + def get(self, path): + cc = self.application.controllers['Cloud'] + meta_data = cc.get_metadata(self.request.remote_ip) + if meta_data is None: + _log.error('Failed to get metadata for ip: %s' % + self.request.remote_ip) + raise tornado.web.HTTPError(404) + data = self.lookup(path, meta_data) + if data is None: + raise tornado.web.HTTPError(404) + self.print_data(data) + self.finish() + + +class APIRequestHandler(tornado.web.RequestHandler): + def get(self, controller_name): + self.execute(controller_name) + + @tornado.web.asynchronous + def execute(self, controller_name): + # Obtain the appropriate controller for this request. + try: + controller = self.application.controllers[controller_name] + except KeyError: + self._error('unhandled', 'no controller named %s' % controller_name) + return + + args = self.request.arguments + + # Read request signature. + try: + signature = args.pop('Signature')[0] + except: + raise tornado.web.HTTPError(400) + + # Make a copy of args for authentication and signature verification. + auth_params = {} + for key, value in args.items(): + auth_params[key] = value[0] + + # Get requested action and remove authentication args for final request. + try: + action = args.pop('Action')[0] + args.pop('AWSAccessKeyId') + args.pop('SignatureMethod') + args.pop('SignatureVersion') + args.pop('Version') + args.pop('Timestamp') + except: + raise tornado.web.HTTPError(400) + + # Authenticate the request. + user = self.application.user_manager.authenticate( + auth_params, + signature, + self.request.method, + self.request.host, + self.request.path + ) + + if not user: + raise tornado.web.HTTPError(403) + + _log.debug('action: %s' % action) + + for key, value in args.items(): + _log.debug('arg: %s\t\tval: %s' % (key, value)) + + request = APIRequest(self, controller, action) + d = request.send(user, **args) + # d.addCallback(utils.debug) + + # TODO: Wrap response in AWS XML format + d.addCallbacks(self._write_callback, self._error_callback) + + def _write_callback(self, data): + self.set_header('Content-Type', 'text/xml') + self.write(data) + self.finish() + + def _error_callback(self, failure): + try: + failure.raiseException() + except exception.ApiError as ex: + self._error(type(ex).__name__ + "." + ex.code, ex.message) + # TODO(vish): do something more useful with unknown exceptions + except Exception as ex: + self._error(type(ex).__name__, str(ex)) + raise + + def post(self, controller_name): + self.execute(controller_name) + + def _error(self, code, message): + self._status_code = 400 + self.set_header('Content-Type', 'text/xml') + self.write('<?xml version="1.0"?>\n') + self.write('<Response><Errors><Error><Code>%s</Code>' + '<Message>%s</Message></Error></Errors>' + '<RequestID>?</RequestID></Response>' % (code, message)) + self.finish() + + +class APIServerApplication(tornado.web.Application): + def __init__(self, user_manager, controllers): + tornado.web.Application.__init__(self, [ + (r'/', RootRequestHandler), + (r'/services/([A-Za-z0-9]+)/', APIRequestHandler), + (r'/latest/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2009-04-04/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2008-09-01/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2008-02-01/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2007-12-15/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2007-10-10/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2007-08-29/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2007-03-01/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2007-01-19/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/1.0/([-A-Za-z0-9/]*)', MetadataRequestHandler), + ], pool=multiprocessing.Pool(4)) + self.user_manager = user_manager + self.controllers = controllers diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py new file mode 100644 index 000000000..27dd81aa2 --- /dev/null +++ b/nova/endpoint/cloud.py @@ -0,0 +1,572 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Cloud Controller: Implementation of EC2 REST API calls, which are +dispatched to other nodes via AMQP RPC. State is via distributed +datastore. +""" + +import json +import logging +import os +import time + +from nova import vendor +from twisted.internet import defer + +from nova import datastore +from nova import flags +from nova import rpc +from nova import utils +from nova import exception +from nova.auth import users +from nova.compute import model +from nova.compute import network +from nova.endpoint import images +from nova.volume import storage + +FLAGS = flags.FLAGS + +flags.DEFINE_string('cloud_topic', 'cloud', 'the topic clouds listen on') + +def _gen_key(user_id, key_name): + """ Tuck this into UserManager """ + try: + manager = users.UserManager.instance() + private_key, fingerprint = manager.generate_key_pair(user_id, key_name) + except Exception as ex: + return {'exception': ex} + return {'private_key': private_key, 'fingerprint': fingerprint} + + +class CloudController(object): + """ CloudController provides the critical dispatch between + inbound API calls through the endpoint and messages + sent to the other nodes. +""" + def __init__(self): + self._instances = datastore.Keeper(FLAGS.instances_prefix) + self.instdir = model.InstanceDirectory() + self.network = network.NetworkController() + self.setup() + + @property + def instances(self): + """ All instances in the system, as dicts """ + for instance in self.instdir.all: + yield {instance['instance_id']: instance} + + @property + def volumes(self): + """ returns a list of all volumes """ + for volume_id in datastore.Redis.instance().smembers("volumes"): + volume = storage.Volume(volume_id=volume_id) + yield volume + + def __str__(self): + return 'CloudController' + + def setup(self): + """ Ensure the keychains and folders exist. """ + # Create keys folder, if it doesn't exist + if not os.path.exists(FLAGS.keys_path): + os.makedirs(os.path.abspath(FLAGS.keys_path)) + # Gen root CA, if we don't have one + root_ca_path = os.path.join(FLAGS.ca_path, FLAGS.ca_file) + if not os.path.exists(root_ca_path): + start = os.getcwd() + os.chdir(FLAGS.ca_path) + utils.runthis("Generating root CA: %s", "sh genrootca.sh") + os.chdir(start) + # TODO: Do this with M2Crypto instead + + def get_instance_by_ip(self, ip): + return self.instdir.by_ip(ip) + + def get_metadata(self, ip): + i = self.instdir.by_ip(ip) + if i is None: + return None + if i['key_name']: + keys = { + '0': { + '_name': i['key_name'], + 'openssh-key': i['key_data'] + } + } + else: + keys = '' + data = { + 'user-data': base64.b64decode(i['user_data']), + 'meta-data': { + 'ami-id': i['image_id'], + 'ami-launch-index': i['ami_launch_index'], + 'ami-manifest-path': 'FIXME', # image property + 'block-device-mapping': { # TODO: replace with real data + 'ami': 'sda1', + 'ephemeral0': 'sda2', + 'root': '/dev/sda1', + 'swap': 'sda3' + }, + 'hostname': i['private_dns_name'], # is this public sometimes? + 'instance-action': 'none', + 'instance-id': i['instance_id'], + 'instance-type': i.get('instance_type', ''), + 'local-hostname': i['private_dns_name'], + 'local-ipv4': i['private_dns_name'], # TODO: switch to IP + 'kernel-id': i.get('kernel_id', ''), + 'placement': { + 'availaibility-zone': i.get('availability_zone', 'nova'), + }, + 'public-hostname': i.get('dns_name', ''), + 'public-ipv4': i.get('dns_name', ''), # TODO: switch to IP + 'public-keys' : keys, + 'ramdisk-id': i.get('ramdisk_id', ''), + 'reservation-id': i['reservation_id'], + 'security-groups': i.get('groups', '') + } + } + if False: # TODO: store ancestor ids + data['ancestor-ami-ids'] = [] + if i.get('product_codes', None): + data['product-codes'] = i['product_codes'] + return data + + + def describe_availability_zones(self, context, **kwargs): + return {'availabilityZoneInfo': [{'zoneName': 'nova', + 'zoneState': 'available'}]} + + def describe_key_pairs(self, context, key_name=None, **kwargs): + key_pairs = [] + key_names = key_name and key_name or [] + if len(key_names) > 0: + for key_name in key_names: + key_pair = context.user.get_key_pair(key_name) + if key_pair != None: + key_pairs.append({ + 'keyName': key_pair.name, + 'keyFingerprint': key_pair.fingerprint, + }) + else: + for key_pair in context.user.get_key_pairs(): + key_pairs.append({ + 'keyName': key_pair.name, + 'keyFingerprint': key_pair.fingerprint, + }) + + return { 'keypairsSet': key_pairs } + + def create_key_pair(self, context, key_name, **kwargs): + try: + d = defer.Deferred() + p = context.handler.application.settings.get('pool') + def _complete(kwargs): + if 'exception' in kwargs: + d.errback(kwargs['exception']) + return + d.callback({'keyName': key_name, + 'keyFingerprint': kwargs['fingerprint'], + 'keyMaterial': kwargs['private_key']}) + p.apply_async(_gen_key, [context.user.id, key_name], + callback=_complete) + return d + + except users.UserError, e: + raise + + def delete_key_pair(self, context, key_name, **kwargs): + context.user.delete_key_pair(key_name) + # aws returns true even if the key doens't exist + return True + + def describe_security_groups(self, context, group_names, **kwargs): + groups = { 'securityGroupSet': [] } + + # Stubbed for now to unblock other things. + return groups + + def create_security_group(self, context, group_name, **kwargs): + return True + + def delete_security_group(self, context, group_name, **kwargs): + return True + + def get_console_output(self, context, instance_id, **kwargs): + # instance_id is passed in as a list of instances + instance = self.instdir.get(instance_id[0]) + if instance['state'] == 'pending': + raise exception.ApiError('Cannot get output for pending instance') + if not context.user.is_authorized(instance.get('owner_id', None)): + raise exception.ApiError('Not authorized to view output') + return rpc.call('%s.%s' % (FLAGS.compute_topic, instance['node_name']), + {"method": "get_console_output", + "args" : {"instance_id": instance_id[0]}}) + + def _get_user_id(self, context): + if context and context.user: + return context.user.id + else: + return None + + def describe_volumes(self, context, **kwargs): + volumes = [] + for volume in self.volumes: + if context.user.is_authorized(volume.get('user_id', None)): + v = self.format_volume(context, volume) + volumes.append(v) + return defer.succeed({'volumeSet': volumes}) + + def format_volume(self, context, volume): + v = {} + v['volumeId'] = volume['volume_id'] + v['status'] = volume['status'] + v['size'] = volume['size'] + v['availabilityZone'] = volume['availability_zone'] + v['createTime'] = volume['create_time'] + if context.user.is_admin(): + v['status'] = '%s (%s, %s, %s, %s)' % ( + volume.get('status', None), + volume.get('user_id', None), + volume.get('node_name', None), + volume.get('instance_id', ''), + volume.get('mountpoint', '')) + return v + + def create_volume(self, context, size, **kwargs): + # TODO(vish): refactor this to create the volume object here and tell storage to create it + res = rpc.call(FLAGS.storage_topic, {"method": "create_volume", + "args" : {"size": size, + "user_id": context.user.id}}) + def _format_result(result): + volume = self._get_volume(result['result']) + return {'volumeSet': [self.format_volume(context, volume)]} + res.addCallback(_format_result) + return res + + def _get_by_id(self, nodes, id): + if nodes == {}: + raise exception.NotFound("%s not found" % id) + for node_name, node in nodes.iteritems(): + if node.has_key(id): + return node_name, node[id] + raise exception.NotFound("%s not found" % id) + + def _get_volume(self, volume_id): + for volume in self.volumes: + if volume['volume_id'] == volume_id: + return volume + + def attach_volume(self, context, volume_id, instance_id, device, **kwargs): + volume = self._get_volume(volume_id) + storage_node = volume['node_name'] + # TODO: (joshua) Fix volumes to store creator id + if not context.user.is_authorized(volume.get('user_id', None)): + raise exception.ApiError("%s not authorized for %s" % + (context.user.id, volume_id)) + instance = self.instdir.get(instance_id) + compute_node = instance['node_name'] + if not context.user.is_authorized(instance.get('owner_id', None)): + raise exception.ApiError(message="%s not authorized for %s" % + (context.user.id, instance_id)) + aoe_device = volume['aoe_device'] + # Needs to get right node controller for attaching to + # TODO: Maybe have another exchange that goes to everyone? + rpc.cast('%s.%s' % (FLAGS.compute_topic, compute_node), + {"method": "attach_volume", + "args" : {"aoe_device": aoe_device, + "instance_id" : instance_id, + "mountpoint" : device}}) + rpc.cast('%s.%s' % (FLAGS.storage_topic, storage_node), + {"method": "attach_volume", + "args" : {"volume_id": volume_id, + "instance_id" : instance_id, + "mountpoint" : device}}) + return defer.succeed(True) + + def detach_volume(self, context, volume_id, **kwargs): + # TODO(joshua): Make sure the updated state has been received first + volume = self._get_volume(volume_id) + storage_node = volume['node_name'] + if not context.user.is_authorized(volume.get('user_id', None)): + raise exception.ApiError("%s not authorized for %s" % + (context.user.id, volume_id)) + if 'instance_id' in volume.keys(): + instance_id = volume['instance_id'] + try: + instance = self.instdir.get(instance_id) + compute_node = instance['node_name'] + mountpoint = volume['mountpoint'] + if not context.user.is_authorized( + instance.get('owner_id', None)): + raise exception.ApiError( + "%s not authorized for %s" % + (context.user.id, instance_id)) + rpc.cast('%s.%s' % (FLAGS.compute_topic, compute_node), + {"method": "detach_volume", + "args" : {"instance_id": instance_id, + "mountpoint": mountpoint}}) + except exception.NotFound: + pass + rpc.cast('%s.%s' % (FLAGS.storage_topic, storage_node), + {"method": "detach_volume", + "args" : {"volume_id": volume_id}}) + return defer.succeed(True) + + def _convert_to_set(self, lst, str): + if lst == None or lst == []: + return None + return [{str: x} for x in lst] + + def describe_instances(self, context, **kwargs): + return defer.succeed(self.format_instances(context.user)) + + def format_instances(self, user, reservation_id = None): + if self.instances == {}: + return {'reservationSet': []} + reservations = {} + for inst in self.instances: + instance = inst.values()[0] + res_id = instance.get('reservation_id', 'Unknown') + if (user.is_authorized(instance.get('owner_id', None)) + and (reservation_id == None or reservation_id == res_id)): + i = {} + i['instance_id'] = instance.get('instance_id', None) + i['image_id'] = instance.get('image_id', None) + i['instance_state'] = { + 'code': 42, + 'name': instance.get('state', 'pending') + } + i['public_dns_name'] = self.network.get_public_ip_for_instance( + i['instance_id']) + i['private_dns_name'] = instance.get('private_dns_name', None) + if not i['public_dns_name']: + i['public_dns_name'] = i['private_dns_name'] + i['dns_name'] = instance.get('dns_name', None) + i['key_name'] = instance.get('key_name', None) + if user.is_admin(): + i['key_name'] = '%s (%s, %s)' % (i['key_name'], + instance.get('owner_id', None), instance.get('node_name','')) + i['product_codes_set'] = self._convert_to_set( + instance.get('product_codes', None), 'product_code') + i['instance_type'] = instance.get('instance_type', None) + i['launch_time'] = instance.get('launch_time', None) + i['ami_launch_index'] = instance.get('ami_launch_index', + None) + if not reservations.has_key(res_id): + r = {} + r['reservation_id'] = res_id + r['owner_id'] = instance.get('owner_id', None) + r['group_set'] = self._convert_to_set( + instance.get('groups', None), 'group_id') + r['instances_set'] = [] + reservations[res_id] = r + reservations[res_id]['instances_set'].append(i) + + instance_response = {'reservationSet' : list(reservations.values()) } + return instance_response + + def describe_addresses(self, context, **kwargs): + return self.format_addresses(context.user) + + def format_addresses(self, user): + addresses = [] + # TODO(vish): move authorization checking into network.py + for address_record in self.network.describe_addresses( + type=network.PublicNetwork): + #logging.debug(address_record) + if user.is_authorized(address_record[u'user_id']): + address = { + 'public_ip': address_record[u'address'], + 'instance_id' : address_record.get(u'instance_id', 'free') + } + # FIXME: add another field for user id + if user.is_admin(): + address['instance_id'] = "%s (%s)" % ( + address['instance_id'], + address_record[u'user_id'], + ) + addresses.append(address) + # logging.debug(addresses) + return {'addressesSet': addresses} + + def allocate_address(self, context, **kwargs): + # TODO: Verify user is valid? + kwargs['owner_id'] = context.user.id + (address,network_name) = self.network.allocate_address( + context.user.id, type=network.PublicNetwork) + return defer.succeed({'addressSet': [{'publicIp' : address}]}) + + def release_address(self, context, **kwargs): + self.network.deallocate_address(kwargs.get('public_ip', None)) + return defer.succeed({'releaseResponse': ["Address released."]}) + + def associate_address(self, context, instance_id, **kwargs): + instance = self.instdir.get(instance_id) + rv = self.network.associate_address( + kwargs['public_ip'], + instance['private_dns_name'], + instance_id) + return defer.succeed({'associateResponse': ["Address associated."]}) + + def disassociate_address(self, context, **kwargs): + rv = self.network.disassociate_address(kwargs['public_ip']) + # TODO - Strip the IP from the instance + return rv + + def run_instances(self, context, **kwargs): + logging.debug("Going to run instances...") + reservation_id = utils.generate_uid('r') + launch_time = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + key_data = None + if kwargs.has_key('key_name'): + key_pair = context.user.get_key_pair(kwargs['key_name']) + if not key_pair: + raise exception.ApiError('Key Pair %s not found' % + kwargs['key_name']) + key_data = key_pair.public_key + + for num in range(int(kwargs['max_count'])): + inst = self.instdir.new() + # TODO(ja): add ari, aki + inst['image_id'] = kwargs['image_id'] + inst['user_data'] = kwargs.get('user_data', '') + inst['instance_type'] = kwargs.get('instance_type', '') + inst['reservation_id'] = reservation_id + inst['launch_time'] = launch_time + inst['key_data'] = key_data or '' + inst['key_name'] = kwargs.get('key_name', '') + inst['owner_id'] = context.user.id + inst['mac_address'] = utils.generate_mac() + inst['ami_launch_index'] = num + address, _netname = self.network.allocate_address( + inst['owner_id'], mac=inst['mac_address']) + network = self.network.get_users_network(str(context.user.id)) + inst['network_str'] = json.dumps(network.to_dict()) + inst['bridge_name'] = network.bridge_name + inst['private_dns_name'] = str(address) + # TODO: allocate expresses on the router node + inst.save() + rpc.cast(FLAGS.compute_topic, + {"method": "run_instance", + "args": {"instance_id" : inst.instance_id}}) + logging.debug("Casting to node for %s's instance with IP of %s" % + (context.user.name, inst['private_dns_name'])) + # TODO: Make the NetworkComputeNode figure out the network name from ip. + return defer.succeed(self.format_instances( + context.user, reservation_id)) + + def terminate_instances(self, context, instance_id, **kwargs): + logging.debug("Going to start terminating instances") + # TODO: return error if not authorized + for i in instance_id: + logging.debug("Going to try and terminate %s" % i) + instance = self.instdir.get(i) + #if instance['state'] == 'pending': + # raise exception.ApiError('Cannot terminate pending instance') + if context.user.is_authorized(instance.get('owner_id', None)): + try: + self.network.disassociate_address( + instance.get('public_dns_name', 'bork')) + except: + pass + if instance.get('private_dns_name', None): + logging.debug("Deallocating address %s" % instance.get('private_dns_name', None)) + try: + self.network.deallocate_address(instance.get('private_dns_name', None)) + except Exception, _err: + pass + if instance.get('node_name', 'unassigned') != 'unassigned': #It's also internal default + rpc.cast('%s.%s' % (FLAGS.compute_topic, instance['node_name']), + {"method": "terminate_instance", + "args" : {"instance_id": i}}) + else: + instance.destroy() + return defer.succeed(True) + + def reboot_instances(self, context, instance_id, **kwargs): + # TODO: return error if not authorized + for i in instance_id: + instance = self.instdir.get(i) + if instance['state'] == 'pending': + raise exception.ApiError('Cannot reboot pending instance') + if context.user.is_authorized(instance.get('owner_id', None)): + rpc.cast('%s.%s' % (FLAGS.node_topic, instance['node_name']), + {"method": "reboot_instance", + "args" : {"instance_id": i}}) + return defer.succeed(True) + + def delete_volume(self, context, volume_id, **kwargs): + # TODO: return error if not authorized + volume = self._get_volume(volume_id) + storage_node = volume['node_name'] + if context.user.is_authorized(volume.get('user_id', None)): + rpc.cast('%s.%s' % (FLAGS.storage_topic, storage_node), + {"method": "delete_volume", + "args" : {"volume_id": volume_id}}) + return defer.succeed(True) + + def describe_images(self, context, image_id=None, **kwargs): + imageSet = images.list(context.user) + if not image_id is None: + imageSet = [i for i in imageSet if i['imageId'] in image_id] + + return defer.succeed({'imagesSet': imageSet}) + + def deregister_image(self, context, image_id, **kwargs): + images.deregister(context.user, image_id) + + return defer.succeed({'imageId': image_id}) + + def register_image(self, context, image_location=None, **kwargs): + if image_location is None and kwargs.has_key('name'): + image_location = kwargs['name'] + + image_id = images.register(context.user, image_location) + logging.debug("Registered %s as %s" % (image_location, image_id)) + + return defer.succeed({'imageId': image_id}) + + def modify_image_attribute(self, context, image_id, + attribute, operation_type, **kwargs): + if attribute != 'launchPermission': + raise exception.ApiError('only launchPermission is supported') + if len(kwargs['user_group']) != 1 and kwargs['user_group'][0] != 'all': + raise exception.ApiError('only group "all" is supported') + if not operation_type in ['add', 'delete']: + raise exception.ApiError('operation_type must be add or delete') + result = images.modify(context.user, image_id, operation_type) + return defer.succeed(result) + + def update_state(self, topic, value): + """ accepts status reports from the queue and consolidates them """ + # TODO(jmc): if an instance has disappeared from + # the node, call instance_death + if topic == "instances": + return defer.succeed(True) + aggregate_state = getattr(self, topic) + node_name = value.keys()[0] + items = value[node_name] + + logging.debug("Updating %s state for %s" % (topic, node_name)) + + for item_id in items.keys(): + if (aggregate_state.has_key('pending') and + aggregate_state['pending'].has_key(item_id)): + del aggregate_state['pending'][item_id] + aggregate_state[node_name] = items + + return defer.succeed(True) diff --git a/nova/endpoint/images.py b/nova/endpoint/images.py new file mode 100644 index 000000000..f494ce892 --- /dev/null +++ b/nova/endpoint/images.py @@ -0,0 +1,92 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Proxy AMI-related calls from the cloud controller, to the running +objectstore daemon. +""" + +import json +import random +import urllib + +from nova import vendor +import boto +import boto.s3 + +from nova import flags +from nova import utils + +FLAGS = flags.FLAGS + + +def modify(user, image_id, operation): + conn(user).make_request( + method='POST', + bucket='_images', + query_args=qs({'image_id': image_id, 'operation': operation})) + + return True + + +def register(user, image_location): + """ rpc call to register a new image based from a manifest """ + + image_id = utils.generate_uid('ami') + conn(user).make_request( + method='PUT', + bucket='_images', + query_args=qs({'image_location': image_location, + 'image_id': image_id})) + + return image_id + + +def list(user, filter_list=[]): + """ return a list of all images that a user can see + + optionally filtered by a list of image_id """ + + # FIXME: send along the list of only_images to check for + response = conn(user).make_request( + method='GET', + bucket='_images') + + return json.loads(response.read()) + + +def deregister(user, image_id): + """ unregister an image """ + conn(user).make_request( + method='DELETE', + bucket='_images', + query_args=qs({'image_id': image_id})) + + +def conn(user): + return boto.s3.connection.S3Connection ( + aws_access_key_id=user.access, + aws_secret_access_key=user.secret, + is_secure=False, + calling_format=boto.s3.connection.OrdinaryCallingFormat(), + port=FLAGS.s3_port, + host=FLAGS.s3_host) + + +def qs(params): + pairs = [] + for key in params.keys(): + pairs.append(key + '=' + urllib.quote(params[key])) + return '&'.join(pairs) |
