diff options
| author | Jesse Andrews <anotherjesse@gmail.com> | 2010-05-27 23:05:26 -0700 |
|---|---|---|
| committer | Jesse Andrews <anotherjesse@gmail.com> | 2010-05-27 23:05:26 -0700 |
| commit | bf6e6e718cdc7488e2da87b21e258ccc065fe499 (patch) | |
| tree | 51cf4f72047eb6b16079c7fe21e9822895541801 /nova | |
| download | nova-bf6e6e718cdc7488e2da87b21e258ccc065fe499.tar.gz nova-bf6e6e718cdc7488e2da87b21e258ccc065fe499.tar.xz nova-bf6e6e718cdc7488e2da87b21e258ccc065fe499.zip | |
initial commit
Diffstat (limited to 'nova')
64 files changed, 8566 insertions, 0 deletions
diff --git a/nova/__init__.py b/nova/__init__.py new file mode 100644 index 000000000..2b25d1628 --- /dev/null +++ b/nova/__init__.py @@ -0,0 +1,30 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +:mod:`nova` -- Cloud IaaS Platform +=================================== + +.. automodule:: nova + :platform: Unix + :synopsis: Infrastructure-as-a-Service Cloud platform. +.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com> +.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com> +.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com> +.. moduleauthor:: Joshua McKenty <joshua@cognition.ca> +.. moduleauthor:: Manish Singh <yosh@gimp.org> +.. moduleauthor:: Andy Smith <andy@anarkystic.com> +""" + +from exception import *
\ No newline at end of file diff --git a/nova/adminclient.py b/nova/adminclient.py new file mode 100644 index 000000000..2cc592b9f --- /dev/null +++ b/nova/adminclient.py @@ -0,0 +1,113 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Nova User API client library. +""" + +import boto +from boto.ec2.regioninfo import RegionInfo +import base64 + +class UserInfo(object): + """ Information about a Nova user + fields include: + username + accesskey + secretkey + + and an optional field containing a zip with X509 cert & rc + file + """ + + def __init__(self, connection=None, username=None, endpoint=None): + self.connection = connection + self.username = username + self.endpoint = endpoint + + def __repr__(self): + return 'UserInfo:%s' % self.username + + def startElement(self, name, attrs, connection): + return None + + def endElement(self, name, value, connection): + if name == 'username': + self.username = str(value) + elif name == 'file': + self.file = base64.b64decode(str(value)) + elif name == 'accesskey': + self.accesskey = str(value) + elif name == 'secretkey': + self.secretkey = str(value) + + +class NovaAdminClient(object): + def __init__(self, clc_ip='127.0.0.1', region='nova', access_key='admin', + secret_key='admin', **kwargs): + self.clc_ip = clc_ip + self.region = region + self.access = access_key + self.secret = secret_key + self.apiconn = boto.connect_ec2(aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + is_secure=False, + region=RegionInfo(None, region, clc_ip), + port=8773, + path='/services/Admin', + **kwargs) + self.apiconn.APIVersion = 'nova' + + def connection_for(self, username, **kwargs): + """ + Returns a boto ec2 connection for the given username. + """ + user = self.get_user(username) + return boto.connect_ec2( + aws_access_key_id=user.accesskey, + aws_secret_access_key=user.secretkey, + is_secure=False, + region=RegionInfo(None, self.region, self.clc_ip), + port=8773, + path='/services/Cloud', + **kwargs + ) + + def get_users(self): + """ grabs the list of all users """ + return self.apiconn.get_list('DescribeUsers', {}, (['item', UserInfo])) + + def get_user(self, name): + """ grab a single user by name """ + user = self.apiconn.get_object('DescribeUser', {'Name': name}, UserInfo) + + if user.username != None: + return user + + def has_user(self, username): + """ determine if user exists """ + return self.get_user(username) != None + + def create_user(self, username): + """ creates a new user, returning the userinfo object with access/secret """ + return self.apiconn.get_object('RegisterUser', {'Name': username}, UserInfo) + + def delete_user(self, username): + """ deletes a user """ + return self.apiconn.get_object('DeregisterUser', {'Name': username}, UserInfo) + + def get_zip(self, username): + """ returns the content of a zip file containing novarc and access credentials. """ + return self.apiconn.get_object('GenerateX509ForUser', {'Name': username}, UserInfo).file + diff --git a/nova/auth/__init__.py b/nova/auth/__init__.py new file mode 100644 index 000000000..7cd6c618d --- /dev/null +++ b/nova/auth/__init__.py @@ -0,0 +1,25 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +:mod:`nova.auth` -- Authentication and Access Control +===================================================== + +.. automodule:: nova.auth + :platform: Unix + :synopsis: User-and-Project based RBAC using LDAP, SAML. +.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com> +.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com> +.. moduleauthor:: Joshua McKenty <joshua@cognition.ca> +"""
\ No newline at end of file diff --git a/nova/auth/access.py b/nova/auth/access.py new file mode 100644 index 000000000..2c780626d --- /dev/null +++ b/nova/auth/access.py @@ -0,0 +1,69 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Simple base set of RBAC rules which map API endpoints to LDAP groups. +For testing accounts, users will always have PM privileges. +""" + + +# This is logically a RuleSet or some such. + +def allow_describe_images(user, project, target_object): + return True + +def allow_describe_instances(user, project, target_object): + return True + +def allow_describe_addresses(user, project, target_object): + return True + +def allow_run_instances(user, project, target_object): + # target_object is a reservation, not an instance + # it needs to include count, type, image, etc. + + # First, is the project allowed to use this image + + # Second, is this user allowed to launch within this project + + # Third, is the count or type within project quota + + return True + +def allow_terminate_instances(user, project, target_object): + # In a project, the PMs and Sysadmins can terminate + return True + +def allow_get_console_output(user, project, target_object): + # If the user launched the instance, + # Or is a sysadmin in the project, + return True + +def allow_allocate_address(user, project, target_object): + # There's no security concern in allocation, + # but it can get expensive. Limit to PM and NE. + return True + +def allow_associate_address(user, project, target_object): + # project NE only + # In future, will perform a CloudAudit scan first + # (Pass / Fail gate) + return True + +def allow_register(user, project, target_object): + return False + +def is_allowed(action, user, project, target_object): + return globals()['allow_%s' % action](user, project, target_object) + diff --git a/nova/auth/fakeldap.py b/nova/auth/fakeldap.py new file mode 100644 index 000000000..c223b250c --- /dev/null +++ b/nova/auth/fakeldap.py @@ -0,0 +1,81 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" + Fake LDAP server for test harnesses. +""" + +import logging + +from nova import datastore + +SCOPE_SUBTREE = 1 + + +class NO_SUCH_OBJECT(Exception): + pass + + +def initialize(uri): + return FakeLDAP(uri) + + +class FakeLDAP(object): + def __init__(self, _uri): + self.keeper = datastore.Keeper('fakeldap') + if self.keeper['objects'] is None: + self.keeper['objects'] = {} + + def simple_bind_s(self, dn, password): + pass + + def unbind_s(self): + pass + + def search_s(self, dn, scope, query=None, fields=None): + logging.debug("searching for %s" % dn) + filtered = {} + d = self.keeper['objects'] or {} + for cn, attrs in d.iteritems(): + if cn[-len(dn):] == dn: + filtered[cn] = attrs + if query: + k,v = query[1:-1].split('=') + objects = {} + for cn, attrs in filtered.iteritems(): + if attrs.has_key(k) and (v in attrs[k] or + v == attrs[k]): + objects[cn] = attrs + if objects == {}: + raise NO_SUCH_OBJECT() + return objects.items() + + def add_s(self, cn, attr): + logging.debug("adding %s" % cn) + stored = {} + for k, v in attr: + if type(v) is list: + stored[k] = v + else: + stored[k] = [v] + d = self.keeper['objects'] + d[cn] = stored + self.keeper['objects'] = d + + def delete_s(self, cn): + logging.debug("creating for %s" % cn) + d = self.keeper['objects'] or {} + del d[cn] + self.keeper['objects'] = d diff --git a/nova/auth/novarc.template b/nova/auth/novarc.template new file mode 100644 index 000000000..a993d1882 --- /dev/null +++ b/nova/auth/novarc.template @@ -0,0 +1,26 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +NOVA_KEY_DIR=$(pushd $(dirname $BASH_SOURCE)>/dev/null; pwd; popd>/dev/null) +export EC2_ACCESS_KEY="%(access)s" +export EC2_SECRET_KEY="%(secret)s" +export EC2_URL="%(ec2)s" +export S3_URL="%(s3)s" +export EC2_USER_ID=42 # nova does not use user id, but bundling requires it +export EC2_PRIVATE_KEY=${NOVA_KEY_DIR}/%(key)s +export EC2_CERT=${NOVA_KEY_DIR}/%(cert)s +export NOVA_CERT=${NOVA_KEY_DIR}/%(nova)s +export EUCALYPTUS_CERT=${NOVA_CERT} # euca-bundle-image seems to require this set +alias ec2-bundle-image="ec2-bundle-image --cert ${EC2_CERT} --privatekey ${EC2_PRIVATE_KEY} --user 42 --ec2cert ${NOVA_CERT}" +alias ec2-upload-bundle="ec2-upload-bundle -a ${EC2_ACCESS_KEY} -s ${EC2_SECRET_KEY} --url ${S3_URL} --ec2cert ${NOVA_CERT}" diff --git a/nova/auth/rbac.ldif b/nova/auth/rbac.ldif new file mode 100644 index 000000000..3878d2c1b --- /dev/null +++ b/nova/auth/rbac.ldif @@ -0,0 +1,60 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# LDIF fragment to create group branch under root + +#dn: ou=Groups,dc=example,dc=com +#objectclass:organizationalunit +#ou: groups +#description: generic groups branch + +# create the itpeople entry + +dn: cn=sysadmins,ou=Groups,dc=example,dc=com +objectclass: groupofnames +cn: itpeople +description: IT admin group +# add the group members all of which are +# assumed to exist under Users +#member: cn=micky mouse,ou=people,dc=example,dc=com +member: cn=admin,ou=Users,dc=example,dc=com + +dn: cn=netadmins,ou=Groups,dc=example,dc=com +objectclass: groupofnames +cn: netadmins +description: Network admin group +member: cn=admin,ou=Users,dc=example,dc=com + +dn: cn=cloudadmins,ou=Groups,dc=example,dc=com +objectclass: groupofnames +cn: cloudadmins +description: Cloud admin group +member: cn=admin,ou=Users,dc=example,dc=com + +dn: cn=itsec,ou=Groups,dc=example,dc=com +objectclass: groupofnames +cn: itsec +description: IT security users group +member: cn=admin,ou=Users,dc=example,dc=com + +# Example Project Group to demonstrate members +# and project members + +dn: cn=myproject,ou=Groups,dc=example,dc=com +objectclass: groupofnames +objectclass: novaProject +cn: myproject +description: My Project Group +member: cn=admin,ou=Users,dc=example,dc=com +projectManager: cn=admin,ou=Users,dc=example,dc=com diff --git a/nova/auth/signer.py b/nova/auth/signer.py new file mode 100644 index 000000000..00aa066fb --- /dev/null +++ b/nova/auth/signer.py @@ -0,0 +1,127 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# PORTIONS OF THIS FILE ARE FROM: +# http://code.google.com/p/boto +# Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/ +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, dis- +# tribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the fol- +# lowing conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +""" +Utility class for parsing signed AMI manifests. +""" + +import logging +import hashlib +import hmac +import urllib +import base64 +from nova.exception import Error + +_log = logging.getLogger('signer') +logging.getLogger('signer').setLevel(logging.WARN) + +class Signer(object): + """ hacked up code from boto/connection.py """ + + def __init__(self, secret_key): + self.hmac = hmac.new(secret_key, digestmod=hashlib.sha1) + if hashlib.sha256: + self.hmac_256 = hmac.new(secret_key, digestmod=hashlib.sha256) + + def generate(self, params, verb, server_string, path): + if params['SignatureVersion'] == '0': + t = self._calc_signature_0(params) + elif params['SignatureVersion'] == '1': + t = self._calc_signature_1(params) + elif params['SignatureVersion'] == '2': + t = self._calc_signature_2(params, verb, server_string, path) + else: + raise Error('Unknown Signature Version: %s' % self.SignatureVersion) + return t + + def _get_utf8_value(self, value): + if not isinstance(value, str) and not isinstance(value, unicode): + value = str(value) + if isinstance(value, unicode): + return value.encode('utf-8') + else: + return value + + def _calc_signature_0(self, params): + s = params['Action'] + params['Timestamp'] + self.hmac.update(s) + keys = params.keys() + keys.sort(cmp = lambda x, y: cmp(x.lower(), y.lower())) + pairs = [] + for key in keys: + val = self._get_utf8_value(params[key]) + pairs.append(key + '=' + urllib.quote(val)) + return base64.b64encode(self.hmac.digest()) + + def _calc_signature_1(self, params): + keys = params.keys() + keys.sort(cmp = lambda x, y: cmp(x.lower(), y.lower())) + pairs = [] + for key in keys: + self.hmac.update(key) + val = self._get_utf8_value(params[key]) + self.hmac.update(val) + pairs.append(key + '=' + urllib.quote(val)) + return base64.b64encode(self.hmac.digest()) + + def _calc_signature_2(self, params, verb, server_string, path): + _log.debug('using _calc_signature_2') + string_to_sign = '%s\n%s\n%s\n' % (verb, server_string, path) + if self.hmac_256: + hmac = self.hmac_256 + params['SignatureMethod'] = 'HmacSHA256' + else: + hmac = self.hmac + params['SignatureMethod'] = 'HmacSHA1' + keys = params.keys() + keys.sort() + pairs = [] + for key in keys: + val = self._get_utf8_value(params[key]) + pairs.append(urllib.quote(key, safe='') + '=' + urllib.quote(val, safe='-_~')) + qs = '&'.join(pairs) + _log.debug('query string: %s' % qs) + string_to_sign += qs + _log.debug('string_to_sign: %s' % string_to_sign) + hmac.update(string_to_sign) + b64 = base64.b64encode(hmac.digest()) + _log.debug('len(b64)=%d' % len(b64)) + _log.debug('base64 encoded digest: %s' % b64) + return b64 + +if __name__ == '__main__': + print Signer('foo').generate({"SignatureMethod": 'HmacSHA256', 'SignatureVersion': '2'}, "get", "server", "/foo") diff --git a/nova/auth/slap.sh b/nova/auth/slap.sh new file mode 100755 index 000000000..a0df4e0ae --- /dev/null +++ b/nova/auth/slap.sh @@ -0,0 +1,226 @@ +#!/usr/bin/env bash +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# LDAP INSTALL SCRIPT - SHOULD BE IDEMPOTENT, but it SCRUBS all USERS + +apt-get install -y slapd ldap-utils python-ldap + +cat >/etc/ldap/schema/openssh-lpk_openldap.schema <<LPK_SCHEMA_EOF +# +# LDAP Public Key Patch schema for use with openssh-ldappubkey +# Author: Eric AUGE <eau@phear.org> +# +# Based on the proposal of : Mark Ruijter +# + + +# octetString SYNTAX +attributetype ( 1.3.6.1.4.1.24552.500.1.1.1.13 NAME 'sshPublicKey' + DESC 'MANDATORY: OpenSSH Public key' + EQUALITY octetStringMatch + SYNTAX 1.3.6.1.4.1.1466.115.121.1.40 ) + +# printableString SYNTAX yes|no +objectclass ( 1.3.6.1.4.1.24552.500.1.1.2.0 NAME 'ldapPublicKey' SUP top AUXILIARY + DESC 'MANDATORY: OpenSSH LPK objectclass' + MAY ( sshPublicKey $ uid ) + ) +LPK_SCHEMA_EOF + +cat >/etc/ldap/schema/nova.schema <<NOVA_SCHEMA_EOF +# +# Person object for Nova +# inetorgperson with extra attributes +# Author: Vishvananda Ishaya <vishvananda@yahoo.com> +# +# + +# using internet experimental oid arc as per BP64 3.1 +objectidentifier novaSchema 1.3.6.1.3.1.666.666 +objectidentifier novaAttrs novaSchema:3 +objectidentifier novaOCs novaSchema:4 + +attributetype ( + novaAttrs:1 + NAME 'accessKey' + DESC 'Key for accessing data' + EQUALITY caseIgnoreMatch + SUBSTR caseIgnoreSubstringsMatch + SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 + SINGLE-VALUE + ) + +attributetype ( + novaAttrs:2 + NAME 'secretKey' + DESC 'Secret key' + EQUALITY caseIgnoreMatch + SUBSTR caseIgnoreSubstringsMatch + SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 + SINGLE-VALUE + ) + +attributetype ( + novaAttrs:3 + NAME 'keyFingerprint' + DESC 'Fingerprint of private key' + EQUALITY caseIgnoreMatch + SUBSTR caseIgnoreSubstringsMatch + SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 + SINGLE-VALUE + ) + +attributetype ( + novaAttrs:4 + NAME 'isAdmin' + DESC 'Is user an administrator?' + EQUALITY booleanMatch + SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 + SINGLE-VALUE + ) + +attributetype ( + novaAttrs:5 + NAME 'projectManager' + DESC 'Project Managers of a project' + SYNTAX 1.3.6.1.4.1.1466.115.121.1.12 + ) + +objectClass ( + novaOCs:1 + NAME 'novaUser' + DESC 'access and secret keys' + AUXILIARY + MUST ( uid ) + MAY ( accessKey $ secretKey $ isAdmin ) + ) + +objectClass ( + novaOCs:2 + NAME 'novaKeyPair' + DESC 'Key pair for User' + SUP top + STRUCTURAL + MUST ( cn $ sshPublicKey $ keyFingerprint ) + ) + +objectClass ( + novaOCs:3 + NAME 'novaProject' + DESC 'Container for project' + SUP groupofnames + STRUCTURAL + MUST ( cn $ projectManager ) + ) + +NOVA_SCHEMA_EOF + +mv /etc/ldap/slapd.conf /etc/ldap/slapd.conf.orig +cat >/etc/ldap/slapd.conf <<SLAPD_CONF_EOF +# slapd.conf - Configuration file for LDAP SLAPD +########## +# Basics # +########## +include /etc/ldap/schema/core.schema +include /etc/ldap/schema/cosine.schema +include /etc/ldap/schema/inetorgperson.schema +include /etc/ldap/schema/openssh-lpk_openldap.schema +include /etc/ldap/schema/nova.schema +pidfile /var/run/slapd/slapd.pid +argsfile /var/run/slapd/slapd.args +loglevel none +modulepath /usr/lib/ldap +# modulepath /usr/local/libexec/openldap +moduleload back_hdb +########################## +# Database Configuration # +########################## +database hdb +suffix "dc=example,dc=com" +rootdn "cn=Manager,dc=example,dc=com" +rootpw changeme +directory /var/lib/ldap +# directory /usr/local/var/openldap-data +index objectClass,cn eq +######## +# ACLs # +######## +access to attrs=userPassword + by anonymous auth + by self write + by * none +access to * + by self write + by * none +SLAPD_CONF_EOF + +mv /etc/ldap/ldap.conf /etc/ldap/ldap.conf.orig + +cat >/etc/ldap/ldap.conf <<LDAP_CONF_EOF +# LDAP Client Settings +URI ldap://localhost +BASE dc=example,dc=com +BINDDN cn=Manager,dc=example,dc=com +SIZELIMIT 0 +TIMELIMIT 0 +LDAP_CONF_EOF + +cat >/etc/ldap/base.ldif <<BASE_LDIF_EOF +# This is the root of the directory tree +dn: dc=example,dc=com +description: Example.Com, your trusted non-existent corporation. +dc: example +o: Example.Com +objectClass: top +objectClass: dcObject +objectClass: organization + +# Subtree for users +dn: ou=Users,dc=example,dc=com +ou: Users +description: Users +objectClass: organizationalUnit + +# Subtree for groups +dn: ou=Groups,dc=example,dc=com +ou: Groups +description: Groups +objectClass: organizationalUnit + +# Subtree for system accounts +dn: ou=System,dc=example,dc=com +ou: System +description: Special accounts used by software applications. +objectClass: organizationalUnit + +# Special Account for Authentication: +dn: uid=authenticate,ou=System,dc=example,dc=com +uid: authenticate +ou: System +description: Special account for authenticating users +userPassword: {MD5}TLnIqASP0CKUR3/LGkEZGg== +objectClass: account +objectClass: simpleSecurityObject +BASE_LDIF_EOF + +/etc/init.d/slapd stop +rm -rf /var/lib/ldap/* +rm -rf /etc/ldap/slapd.d/* +slaptest -f /etc/ldap/slapd.conf -F /etc/ldap/slapd.d +cp /usr/share/slapd/DB_CONFIG /var/lib/ldap/DB_CONFIG +slapadd -v -l /etc/ldap/base.ldif +chown -R openldap:openldap /etc/ldap/slapd.d +chown -R openldap:openldap /var/lib/ldap +/etc/init.d/slapd start diff --git a/nova/auth/users.py b/nova/auth/users.py new file mode 100755 index 000000000..d8ea8ac68 --- /dev/null +++ b/nova/auth/users.py @@ -0,0 +1,454 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Nova users and user management, including RBAC hooks. +""" + +import datetime +import logging +import os +import shutil +import tempfile +import uuid +import zipfile + +try: + import ldap +except Exception, e: + import fakeldap as ldap + +import fakeldap +from nova import datastore + +# TODO(termie): clean up these imports +import signer +from nova import exception +from nova import flags +from nova import crypto +from nova import utils +import access as simplerbac + +from nova import objectstore # for flags + +FLAGS = flags.FLAGS + +flags.DEFINE_string('ldap_url', 'ldap://localhost', 'Point this at your ldap server') +flags.DEFINE_string('ldap_password', 'changeme', 'LDAP password') +flags.DEFINE_string('user_dn', 'cn=Manager,dc=example,dc=com', 'DN of admin user') +flags.DEFINE_string('user_unit', 'Users', 'OID for Users') +flags.DEFINE_string('ldap_subtree', 'ou=Users,dc=example,dc=com', 'OU for Users') + +flags.DEFINE_string('ldap_sysadmin', + 'cn=sysadmins,ou=Groups,dc=example,dc=com', 'OU for Sysadmins') +flags.DEFINE_string('ldap_netadmin', + 'cn=netadmins,ou=Groups,dc=example,dc=com', 'OU for NetAdmins') +flags.DEFINE_string('ldap_cloudadmin', + 'cn=cloudadmins,ou=Groups,dc=example,dc=com', 'OU for Cloud Admins') +flags.DEFINE_string('ldap_itsec', + 'cn=itsec,ou=Groups,dc=example,dc=com', 'OU for ItSec') + +flags.DEFINE_string('credentials_template', + utils.abspath('auth/novarc.template'), + 'Template for creating users rc file') +flags.DEFINE_string('credential_key_file', 'pk.pem', + 'Filename of private key in credentials zip') +flags.DEFINE_string('credential_cert_file', 'cert.pem', + 'Filename of certificate in credentials zip') +flags.DEFINE_string('credential_rc_file', 'novarc', + 'Filename of rc in credentials zip') + +_log = logging.getLogger('auth') +_log.setLevel(logging.WARN) + + + +class UserError(exception.ApiError): + pass + +class InvalidKeyPair(exception.ApiError): + pass + +class User(object): + def __init__(self, id, name, access, secret, admin): + self.manager = UserManager.instance() + self.id = id + self.name = name + self.access = access + self.secret = secret + self.admin = admin + self.keeper = datastore.Keeper(prefix="user") + + + def is_admin(self): + return self.admin + + def has_role(self, role_type): + return self.manager.has_role(self.id, role_type) + + def is_authorized(self, owner_id, action=None): + if self.is_admin() or owner_id == self.id: + return True + if action == None: + return False + project = None #(Fixme) + target_object = None # (Fixme, should be passed in) + return simplerbac.is_allowed(action, self, project, target_object) + + def get_credentials(self): + rc = self.generate_rc() + private_key, signed_cert = self.generate_x509_cert() + + tmpdir = tempfile.mkdtemp() + zf = os.path.join(tmpdir, "temp.zip") + zippy = zipfile.ZipFile(zf, 'w') + zippy.writestr(FLAGS.credential_rc_file, rc) + zippy.writestr(FLAGS.credential_key_file, private_key) + zippy.writestr(FLAGS.credential_cert_file, signed_cert) + zippy.writestr(FLAGS.ca_file, crypto.fetch_ca(self.id)) + zippy.close() + with open(zf, 'rb') as f: + buffer = f.read() + + shutil.rmtree(tmpdir) + return buffer + + + def generate_rc(self): + rc = open(FLAGS.credentials_template).read() + rc = rc % { 'access': self.access, + 'secret': self.secret, + 'ec2': FLAGS.ec2_url, + 's3': 'http://%s:%s' % (FLAGS.s3_host, FLAGS.s3_port), + 'nova': FLAGS.ca_file, + 'cert': FLAGS.credential_cert_file, + 'key': FLAGS.credential_key_file, + } + return rc + + def generate_key_pair(self, name): + return self.manager.generate_key_pair(self.id, name) + + def generate_x509_cert(self): + return self.manager.generate_x509_cert(self.id) + + def create_key_pair(self, name, public_key, fingerprint): + return self.manager.create_key_pair(self.id, + name, + public_key, + fingerprint) + + def get_key_pair(self, name): + return self.manager.get_key_pair(self.id, name) + + def delete_key_pair(self, name): + return self.manager.delete_key_pair(self.id, name) + + def get_key_pairs(self): + return self.manager.get_key_pairs(self.id) + +class KeyPair(object): + def __init__(self, name, owner, public_key, fingerprint): + self.manager = UserManager.instance() + self.owner = owner + self.name = name + self.public_key = public_key + self.fingerprint = fingerprint + + def delete(self): + return self.manager.delete_key_pair(self.owner, self.name) + +class UserManager(object): + def __init__(self): + if hasattr(self.__class__, '_instance'): + raise Exception('Attempted to instantiate singleton') + + @classmethod + def instance(cls): + if not hasattr(cls, '_instance'): + inst = UserManager() + cls._instance = inst + if FLAGS.fake_users: + try: + inst.create_user('fake', 'fake', 'fake') + except: pass + try: + inst.create_user('user', 'user', 'user') + except: pass + try: + inst.create_user('admin', 'admin', 'admin', True) + except: pass + return cls._instance + + def authenticate(self, params, signature, verb='GET', server_string='127.0.0.1:8773', path='/'): + # TODO: Check for valid timestamp + access_key = params['AWSAccessKeyId'] + user = self.get_user_from_access_key(access_key) + if user == None: + return None + # hmac can't handle unicode, so encode ensures that secret isn't unicode + expected_signature = signer.Signer(user.secret.encode()).generate(params, verb, server_string, path) + _log.debug('user.secret: %s', user.secret) + _log.debug('expected_signature: %s', expected_signature) + _log.debug('signature: %s', signature) + if signature == expected_signature: + return user + + def has_role(self, user, role, project=None): + # Map role to ldap group + group = FLAGS.__getitem__("ldap_%s" % role) + with LDAPWrapper() as conn: + return conn.is_member_of(user, group) + + def add_role(self, user, role, project=None): + # TODO: Project-specific roles + group = FLAGS.__getitem__("ldap_%s" % role) + with LDAPWrapper() as conn: + return conn.add_to_group(user, group) + + def get_user(self, uid): + with LDAPWrapper() as conn: + return conn.find_user(uid) + + def get_user_from_access_key(self, access_key): + with LDAPWrapper() as conn: + return conn.find_user_by_access_key(access_key) + + def get_users(self): + with LDAPWrapper() as conn: + return conn.find_users() + + def create_user(self, uid, access=None, secret=None, admin=False): + if access == None: access = str(uuid.uuid4()) + if secret == None: secret = str(uuid.uuid4()) + with LDAPWrapper() as conn: + u = conn.create_user(uid, access, secret, admin) + return u + + def delete_user(self, uid): + with LDAPWrapper() as conn: + conn.delete_user(uid) + + def generate_key_pair(self, uid, key_name): + # generating key pair is slow so delay generation + # until after check + with LDAPWrapper() as conn: + if not conn.user_exists(uid): + raise UserError("User " + uid + " doesn't exist") + if conn.key_pair_exists(uid, key_name): + raise InvalidKeyPair("The keypair '" + + key_name + + "' already exists.", + "Duplicate") + private_key, public_key, fingerprint = crypto.generate_key_pair() + self.create_key_pair(uid, key_name, public_key, fingerprint) + return private_key, fingerprint + + def create_key_pair(self, uid, key_name, public_key, fingerprint): + with LDAPWrapper() as conn: + return conn.create_key_pair(uid, key_name, public_key, fingerprint) + + def get_key_pair(self, uid, key_name): + with LDAPWrapper() as conn: + return conn.find_key_pair(uid, key_name) + + def get_key_pairs(self, uid): + with LDAPWrapper() as conn: + return conn.find_key_pairs(uid) + + def delete_key_pair(self, uid, key_name): + with LDAPWrapper() as conn: + conn.delete_key_pair(uid, key_name) + + def get_signed_zip(self, uid): + user = self.get_user(uid) + return user.get_credentials() + + def generate_x509_cert(self, uid): + (private_key, csr) = crypto.generate_x509_cert(self.__cert_subject(uid)) + # TODO - This should be async call back to the cloud controller + signed_cert = crypto.sign_csr(csr, uid) + return (private_key, signed_cert) + + def sign_cert(self, csr, uid): + return crypto.sign_csr(csr, uid) + + def __cert_subject(self, uid): + return "/C=US/ST=California/L=The_Mission/O=AnsoLabs/OU=Nova/CN=%s-%s" % (uid, str(datetime.datetime.utcnow().isoformat())) + + +class LDAPWrapper(object): + def __init__(self): + self.user = FLAGS.user_dn + self.passwd = FLAGS.ldap_password + + def __enter__(self): + self.connect() + return self + + def __exit__(self, type, value, traceback): + #logging.info('type, value, traceback: %s, %s, %s', type, value, traceback) + self.conn.unbind_s() + return False + + def connect(self): + """ connect to ldap as admin user """ + if FLAGS.fake_users: + self.conn = fakeldap.initialize(FLAGS.ldap_url) + else: + assert(ldap.__name__ != 'fakeldap') + self.conn = ldap.initialize(FLAGS.ldap_url) + self.conn.simple_bind_s(self.user, self.passwd) + + def find_object(self, dn, query = None): + objects = self.find_objects(dn, query) + if len(objects) == 0: + return None + return objects[0] + + def find_objects(self, dn, query = None): + try: + res = self.conn.search_s(dn, ldap.SCOPE_SUBTREE, query) + except Exception: + return [] + # just return the attributes + return [x[1] for x in res] + + def find_users(self): + attrs = self.find_objects(FLAGS.ldap_subtree, '(objectclass=novaUser)') + return [self.__to_user(attr) for attr in attrs] + + def find_key_pairs(self, uid): + dn = 'uid=%s,%s' % (uid, FLAGS.ldap_subtree) + attrs = self.find_objects(dn, '(objectclass=novaKeyPair)') + return [self.__to_key_pair(uid, attr) for attr in attrs] + + def find_user(self, name): + dn = 'uid=%s,%s' % (name, FLAGS.ldap_subtree) + attr = self.find_object(dn, '(objectclass=novaUser)') + return self.__to_user(attr) + + def user_exists(self, name): + return self.find_user(name) != None + + def find_key_pair(self, uid, key_name): + dn = 'cn=%s,uid=%s,%s' % (key_name, + uid, + FLAGS.ldap_subtree) + attr = self.find_object(dn, '(objectclass=novaKeyPair)') + return self.__to_key_pair(uid, attr) + + def delete_key_pairs(self, uid): + keys = self.find_key_pairs(uid) + if keys != None: + for key in keys: + self.delete_key_pair(uid, key.name) + + def key_pair_exists(self, uid, key_name): + return self.find_key_pair(uid, key_name) != None + + def create_user(self, name, access_key, secret_key, is_admin): + if self.user_exists(name): + raise UserError("LDAP user " + name + " already exists") + attr = [ + ('objectclass', ['person', + 'organizationalPerson', + 'inetOrgPerson', + 'novaUser']), + ('ou', [FLAGS.user_unit]), + ('uid', [name]), + ('sn', [name]), + ('cn', [name]), + ('secretKey', [secret_key]), + ('accessKey', [access_key]), + ('isAdmin', [str(is_admin).upper()]), + ] + self.conn.add_s('uid=%s,%s' % (name, FLAGS.ldap_subtree), + attr) + return self.__to_user(dict(attr)) + + def create_project(self, name, project_manager): + # PM can be user object or string containing DN + pass + + def is_member_of(self, name, group): + return True + + def add_to_group(self, name, group): + pass + + def remove_from_group(self, name, group): + pass + + def create_key_pair(self, uid, key_name, public_key, fingerprint): + """create's a public key in the directory underneath the user""" + # TODO(vish): possibly refactor this to store keys in their own ou + # and put dn reference in the user object + attr = [ + ('objectclass', ['novaKeyPair']), + ('cn', [key_name]), + ('sshPublicKey', [public_key]), + ('keyFingerprint', [fingerprint]), + ] + self.conn.add_s('cn=%s,uid=%s,%s' % (key_name, + uid, + FLAGS.ldap_subtree), + attr) + return self.__to_key_pair(uid, dict(attr)) + + def find_user_by_access_key(self, access): + query = '(' + 'accessKey' + '=' + access + ')' + dn = FLAGS.ldap_subtree + return self.__to_user(self.find_object(dn, query)) + + def delete_key_pair(self, uid, key_name): + if not self.key_pair_exists(uid, key_name): + raise UserError("Key Pair " + + key_name + + " doesn't exist for user " + + uid) + self.conn.delete_s('cn=%s,uid=%s,%s' % (key_name, uid, + FLAGS.ldap_subtree)) + + def delete_user(self, name): + if not self.user_exists(name): + raise UserError("User " + + name + + " doesn't exist") + self.delete_key_pairs(name) + self.conn.delete_s('uid=%s,%s' % (name, + FLAGS.ldap_subtree)) + + def __to_user(self, attr): + if attr == None: + return None + return User( + id = attr['uid'][0], + name = attr['uid'][0], + access = attr['accessKey'][0], + secret = attr['secretKey'][0], + admin = (attr['isAdmin'][0] == 'TRUE') + ) + + def __to_key_pair(self, owner, attr): + if attr == None: + return None + return KeyPair( + owner = owner, + name = attr['cn'][0], + public_key = attr['sshPublicKey'][0], + fingerprint = attr['keyFingerprint'][0], + ) diff --git a/nova/compute/__init__.py b/nova/compute/__init__.py new file mode 100644 index 000000000..e8a6921e7 --- /dev/null +++ b/nova/compute/__init__.py @@ -0,0 +1,28 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +:mod:`nova.compute` -- Compute Nodes using LibVirt +===================================================== + +.. automodule:: nova.compute + :platform: Unix + :synopsis: Thin wrapper around libvirt for VM mgmt. +.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com> +.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com> +.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com> +.. moduleauthor:: Joshua McKenty <joshua@cognition.ca> +.. moduleauthor:: Manish Singh <yosh@gimp.org> +.. moduleauthor:: Andy Smith <andy@anarkystic.com> +"""
\ No newline at end of file diff --git a/nova/compute/disk.py b/nova/compute/disk.py new file mode 100644 index 000000000..d3eeb951f --- /dev/null +++ b/nova/compute/disk.py @@ -0,0 +1,122 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Utility methods to resize, repartition, and modify disk images. +Includes injection of SSH PGP keys into authorized_keys file. +""" + +import logging +import os +import tempfile + +from nova.exception import Error +from nova.utils import execute + +def partition(infile, outfile, local_bytes=0, local_type='ext2'): + """Takes a single partition represented by infile and writes a bootable drive image into outfile. + The first 63 sectors (0-62) of the resulting image is a master boot record. + Infile becomes the first primary partition. + If local bytes is specified, a second primary partition is created and formatted as ext2. + In the diagram below, dashes represent drive sectors. + 0 a b c d e + +-----+------. . .-------+------. . .------+ + | mbr | primary partiton | local partition | + +-----+------. . .-------+------. . .------+ + """ + sector_size = 512 + file_size = os.path.getsize(infile) + if file_size % sector_size != 0: + logging.warn("Input partition size not evenly divisible by sector size: %d / %d" (file_size, sector_size)) + primary_sectors = file_size / sector_size + if local_bytes % sector_size != 0: + logging.warn("Bytes for local storage not evenly divisible by sector size: %d / %d" (local_bytes, sector_size)) + local_sectors = local_bytes / sector_size + + mbr_last = 62 # a + primary_first = mbr_last + 1 # b + primary_last = primary_first + primary_sectors # c + local_first = primary_last + 1 # d + local_last = local_first + local_sectors # e + last_sector = local_last # e + + # create an empty file + execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' % (outfile, last_sector, sector_size)) + + # make mbr partition + execute('parted --script %s mklabel msdos' % outfile) + + # make primary partition + execute('parted --script %s mkpart primary %ds %ds' % (outfile, primary_first, primary_last)) + + # make local partition + if local_bytes > 0: + execute('parted --script %s mkpartfs primary %s %ds %ds' % (outfile, local_type, local_first, local_last)) + + # copy file into partition + execute('dd if=%s of=%s bs=%d seek=%d conv=notrunc,fsync' % (infile, outfile, sector_size, primary_first)) + + +def inject_key(key, image, partition=None): + """Injects a ssh key into a disk image. + It adds the specified key to /root/.ssh/authorized_keys + it will mount the image as a fully partitioned disk and attempt to inject into the specified partition number. + If partition is not specified it mounts the image as a single partition. + """ + out, err = execute('sudo losetup -f --show %s' % image) + if err: + raise Error('Could not attach image to loopback: %s' % err) + device = out.strip() + try: + if not partition is None: + # create partition + out, err = execute('sudo kpartx -a %s' % device) + if err: + raise Error('Failed to load partition: %s' % err) + mapped_device = '/dev/mapper/%sp%s' % ( device.split('/')[-1] , partition ) + else: + mapped_device = device + out, err = execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device) + + tmpdir = tempfile.mkdtemp() + try: + # mount loopback to dir + out, err = execute('sudo mount %s %s' % (mapped_device, tmpdir)) + if err: + raise Error('Failed to mount filesystem: %s' % err) + + try: + # inject key file + _inject_into_fs(key, tmpdir) + finally: + # unmount device + execute('sudo umount %s' % mapped_device) + finally: + # remove temporary directory + os.rmdir(tmpdir) + if not partition is None: + # remove partitions + execute('sudo kpartx -d %s' % device) + finally: + # remove loopback + execute('sudo losetup -d %s' % device) + +def _inject_into_fs(key, fs): + sshdir = os.path.join(os.path.join(fs, 'root'), '.ssh') + execute('sudo mkdir %s' % sshdir) #error on existing dir doesn't matter + execute('sudo chown root %s' % sshdir) + execute('sudo chmod 700 %s' % sshdir) + keyfile = os.path.join(sshdir, 'authorized_keys') + execute('sudo bash -c "cat >> %s"' % keyfile, '\n' + key + '\n') + diff --git a/nova/compute/exception.py b/nova/compute/exception.py new file mode 100644 index 000000000..6fe8e381f --- /dev/null +++ b/nova/compute/exception.py @@ -0,0 +1,35 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Exceptions for Compute Node errors, mostly network addressing. +""" + +from nova.exception import Error + +class NoMoreAddresses(Error): + pass + +class AddressNotAllocated(Error): + pass + +class AddressAlreadyAssociated(Error): + pass + +class AddressNotAssociated(Error): + pass + +class NotValidNetworkSize(Error): + pass + diff --git a/nova/compute/fakevirtinstance.xml b/nova/compute/fakevirtinstance.xml new file mode 100644 index 000000000..6036516bb --- /dev/null +++ b/nova/compute/fakevirtinstance.xml @@ -0,0 +1,43 @@ +<!-- +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + --> +<domain type='kvm' id='100'> + <name>i-A9B8C7D6</name> + <uuid>12a345bc-67c8-901d-2e34-56f7g89012h3</uuid> + <memory>524288</memory> + <currentMemory>524288</currentMemory> + <vcpu>1</vcpu> + <os/> + <features> + <acpi/> + </features> + <clock offset='utc'/> + <on_poweroff>destroy</on_poweroff> + <on_reboot>restart</on_reboot> + <on_crash>destroy</on_crash> + <devices> + <emulator>/usr/bin/kvm</emulator> + <disk type='file' device='disk'> + <source file='/var/lib/fakevirt/instances/i-A9B8C7D6/disk'/> + <target dev='sda' bus='scsi'/> + </disk> + <interface type='bridge'> + <mac address='a0:1b:c2:3d:4e:f5'/> + <source bridge='fakebr2000'/> + <target dev='vnet1'/> + <model type='e1000'/> + </interface> + </devices> +</domain>
\ No newline at end of file diff --git a/nova/compute/libvirt.xml.template b/nova/compute/libvirt.xml.template new file mode 100644 index 000000000..4cf6e8b10 --- /dev/null +++ b/nova/compute/libvirt.xml.template @@ -0,0 +1,46 @@ +<!-- +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + --> +<domain type='kvm'> + <name>%(name)s</name> + <os> + <type>hvm</type> + <kernel>%(basepath)s/kernel</kernel> + <initrd>%(basepath)s/ramdisk</initrd> + <cmdline>root=/dev/vda1 console=ttyS0</cmdline> + </os> + <features> + <acpi/> + </features> + <memory>%(memory_kb)s</memory> + <vcpu>%(vcpus)s</vcpu> + <devices> + <emulator>/usr/bin/kvm</emulator> + <disk type='file'> + <source file='%(basepath)s/disk'/> + <target dev='vda' bus='virtio'/> + </disk> + <interface type='bridge'> + <source bridge='%(bridge_name)s'/> + <mac address='%(mac_address)s'/> + <!-- <model type='virtio'/> CANT RUN virtio network right now --> + </interface> + <serial type="file"> + <source path='%(basepath)s/console.log'/> + <target port='1'/> + </serial> + </devices> + <nova>%(nova)s</nova> +</domain> diff --git a/nova/compute/linux_net.py b/nova/compute/linux_net.py new file mode 100644 index 000000000..0983241f9 --- /dev/null +++ b/nova/compute/linux_net.py @@ -0,0 +1,146 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +import signal +import os +import nova.utils +import subprocess + +# todo(ja): does the definition of network_path belong here? + +from nova import flags +FLAGS=flags.FLAGS + +def execute(cmd): + if FLAGS.fake_network: + print "FAKE NET: %s" % cmd + return "fake", 0 + else: + nova.utils.execute(cmd) + +def runthis(desc, cmd): + if FLAGS.fake_network: + execute(cmd) + else: + nova.utils.runthis(desc,cmd) + +def Popen(cmd): + if FLAGS.fake_network: + execute(' '.join(cmd)) + else: + subprocess.Popen(cmd) + + +def device_exists(device): + (out, err) = execute("ifconfig %s" % device) + return not err + +def confirm_rule(cmd): + execute("sudo iptables --delete %s" % (cmd)) + execute("sudo iptables -I %s" % (cmd)) + +def remove_rule(cmd): + execute("sudo iptables --delete %s" % (cmd)) + +def bind_public_ip(ip, interface): + runthis("Binding IP to interface: %s", "sudo ip addr add %s dev %s" % (ip, interface)) + +def vlan_create(net): + """ create a vlan on on a bridge device unless vlan already exists """ + if not device_exists("vlan%s" % net.vlan): + execute("sudo vconfig set_name_type VLAN_PLUS_VID_NO_PAD") + execute("sudo vconfig add %s %s" % (net.bridge_dev, net.vlan)) + execute("sudo ifconfig vlan%s up" % (net.vlan)) + +def bridge_create(net): + """ create a bridge on a vlan unless it already exists """ + if not device_exists(net.bridge_name): + execute("sudo brctl addbr %s" % (net.bridge_name)) + # execute("sudo brctl setfd %s 0" % (net.bridge_name)) + # execute("sudo brctl setageing %s 10" % (net.bridge_name)) + execute("sudo brctl stp %s off" % (net.bridge_name)) + execute("sudo brctl addif %s vlan%s" % (net.bridge_name, net.vlan)) + if net.bridge_gets_ip: + execute("sudo ifconfig %s %s broadcast %s netmask %s up" % \ + (net.bridge_name, net.gateway, net.broadcast, net.netmask)) + confirm_rule("FORWARD --in-interface %s -j ACCEPT" % (net.bridge_name)) + else: + execute("sudo ifconfig %s up" % net.bridge_name) + +def dnsmasq_cmd(net): + cmd = ['sudo dnsmasq', + ' --strict-order', + ' --bind-interfaces', + ' --conf-file=', + ' --pid-file=%s' % dhcp_file(net.vlan, 'pid'), + ' --listen-address=%s' % net.dhcp_listen_address, + ' --except-interface=lo', + ' --dhcp-range=%s,%s,120s' % (net.dhcp_range_start, net.dhcp_range_end), + ' --dhcp-lease-max=61', + ' --dhcp-hostsfile=%s' % dhcp_file(net.vlan, 'conf'), + ' --dhcp-leasefile=%s' % dhcp_file(net.vlan, 'leases')] + return ''.join(cmd) + +def hostDHCP(network, host): + idx = host['address'].split(".")[-1] # Logically, the idx of instances they've launched in this net + return "%s,%s-%s-%s.novalocal,%s" % \ + (host['mac'], host['user_id'], network.vlan, idx, host['address']) + +# todo(ja): if the system has restarted or pid numbers have wrapped +# then you cannot be certain that the pid refers to the +# dnsmasq. As well, sending a HUP only reloads the hostfile, +# so any configuration options (like dchp-range, vlan, ...) +# aren't reloaded +def start_dnsmasq(network): + """ (re)starts a dnsmasq server for a given network + + if a dnsmasq instance is already running then send a HUP + signal causing it to reload, otherwise spawn a new instance + """ + with open(dhcp_file(network.vlan, 'conf'), 'w') as f: + for host_name in network.hosts: + f.write("%s\n" % hostDHCP(network, network.hosts[host_name])) + + pid = dnsmasq_pid_for(network) + + # if dnsmasq is already running, then tell it to reload + if pid: + # todo(ja): use "/proc/%d/cmdline" % (pid) to determine if pid refers + # correct dnsmasq process + try: + os.kill(pid, signal.SIGHUP) + return + except Exception, e: + logging.debug("Killing dnsmasq threw %s", e) + + # otherwise delete the existing leases file and start dnsmasq + lease_file = dhcp_file(network.vlan, 'leases') + if os.path.exists(lease_file): + os.unlink(lease_file) + + Popen(dnsmasq_cmd(network).split(" ")) + +def stop_dnsmasq(network): + """ stops the dnsmasq instance for a given network """ + pid = dnsmasq_pid_for(network) + + if pid: + os.kill(pid, signal.SIGTERM) + +def dhcp_file(vlan, kind): + """ return path to a pid, leases or conf file for a vlan """ + + return os.path.abspath("%s/nova-%s.%s" % (FLAGS.networks_path, vlan, kind)) + +def dnsmasq_pid_for(network): + """ the pid for prior dnsmasq instance for a vlan, + returns None if no pid file exists + + if machine has rebooted pid might be incorrect (caller should check) + """ + + pid_file = dhcp_file(network.vlan, 'pid') + + if os.path.exists(pid_file): + with open(pid_file, 'r') as f: + return int(f.read()) + diff --git a/nova/compute/model.py b/nova/compute/model.py new file mode 100644 index 000000000..78ed3a101 --- /dev/null +++ b/nova/compute/model.py @@ -0,0 +1,203 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 expandtab +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Datastore Model objects for Compute Instances, with +InstanceDirectory manager. + +# Create a new instance? +>>> InstDir = InstanceDirectory() +>>> inst = InstDir.new() +>>> inst.destroy() +True +>>> inst = InstDir['i-123'] +>>> inst['ip'] = "192.168.0.3" +>>> inst['owner_id'] = "projectA" +>>> inst.save() +True + +>>> InstDir['i-123'] +<Instance:i-123> +>>> InstDir.all.next() +<Instance:i-123> + +>>> inst.destroy() +True +""" + +from nova import vendor + +from nova import datastore +from nova import flags +from nova import utils + + +FLAGS = flags.FLAGS + + +# TODO(ja): singleton instance of the directory +class InstanceDirectory(object): + """an api for interacting with the global state of instances """ + def __init__(self): + self.keeper = datastore.Keeper(FLAGS.instances_prefix) + + def get(self, instance_id): + """ returns an instance object for a given id """ + return Instance(instance_id) + + def __getitem__(self, item): + return self.get(item) + + def by_project(self, project): + """ returns a list of instance objects for a project """ + for instance_id in self.keeper['project:%s:instances' % project]: + yield Instance(instance_id) + + def by_node(self, node_id): + """ returns a list of instances for a node """ + for instance in self.all: + if instance['node_name'] == node_id: + yield instance + + def by_ip(self, ip_address): + """ returns an instance object that is using the IP """ + for instance in self.all: + if instance['private_dns_name'] == ip_address: + return instance + return None + + def by_volume(self, volume_id): + """ returns the instance a volume is attached to """ + pass + + def exists(self, instance_id): + if instance_id in self.keeper['instances']: + return True + return False + + @property + def all(self): + """ returns a list of all instances """ + instances = self.keeper['instances'] + if instances != None: + for instance_id in self.keeper['instances']: + yield Instance(instance_id) + + def new(self): + """ returns an empty Instance object, with ID """ + instance_id = utils.generate_uid('i') + return self.get(instance_id) + + + +class Instance(object): + """ Wrapper around stored properties of an instance """ + + def __init__(self, instance_id): + """ loads an instance from the datastore if exists """ + self.keeper = datastore.Keeper(FLAGS.instances_prefix) + self.instance_id = instance_id + self.initial_state = {} + self.state = self.keeper[self.__redis_key] + if self.state: + self.initial_state = self.state + else: + self.state = {'state' : 'pending', + 'instance_id' : instance_id, + 'node_name' : 'unassigned', + 'owner_id' : 'unassigned' } + + @property + def __redis_key(self): + """ Magic string for instance keys """ + return 'instance:%s' % self.instance_id + + def __repr__(self): + return "<Instance:%s>" % self.instance_id + + def get(self, item, default): + return self.state.get(item, default) + + def __getitem__(self, item): + return self.state[item] + + def __setitem__(self, item, val): + self.state[item] = val + return self.state[item] + + def __delitem__(self, item): + """ We don't support this """ + raise Exception("Silly monkey, Instances NEED all their properties.") + + def save(self): + """ update the directory with the state from this instance + make sure you've set the owner_id before you call save + for the first time. + """ + # TODO(ja): implement hmset in redis-py and use it + # instead of multiple calls to hset + state = self.keeper[self.__redis_key] + if not state: + state = {} + for key, val in self.state.iteritems(): + # if (not self.initial_state.has_key(key) + # or self.initial_state[key] != val): + state[key] = val + self.keeper[self.__redis_key] = state + if self.initial_state == {}: + self.keeper.set_add('project:%s:instances' % self.state['owner_id'], + self.instance_id) + self.keeper.set_add('instances', self.instance_id) + self.initial_state = self.state + return True + + def destroy(self): + """ deletes all related records from datastore. + does NOT do anything to running libvirt state. + """ + self.keeper.set_remove('project:%s:instances' % self.state['owner_id'], + self.instance_id) + del self.keeper[self.__redis_key] + self.keeper.set_remove('instances', self.instance_id) + return True + + @property + def volumes(self): + """ returns a list of attached volumes """ + pass + + @property + def reservation(self): + """ Returns a reservation object """ + pass + +# class Reservation(object): +# """ ORM wrapper for a batch of launched instances """ +# def __init__(self): +# pass +# +# def userdata(self): +# """ """ +# pass +# +# +# class NodeDirectory(object): +# def __init__(self): +# pass +# + +if __name__ == "__main__": + import doctest + doctest.testmod() diff --git a/nova/compute/network.py b/nova/compute/network.py new file mode 100644 index 000000000..612295f27 --- /dev/null +++ b/nova/compute/network.py @@ -0,0 +1,520 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Classes for network control, including VLANs, DHCP, and IP allocation. +""" + +import json +import logging +import os + +# TODO(termie): clean up these imports +from nova import vendor +import IPy + +from nova import datastore +import nova.exception +from nova.compute import exception +from nova import flags +from nova import utils +from nova.auth import users + +import linux_net + +FLAGS = flags.FLAGS +flags.DEFINE_string('net_libvirt_xml_template', + utils.abspath('compute/net.libvirt.xml.template'), + 'Template file for libvirt networks') +flags.DEFINE_string('networks_path', utils.abspath('../networks'), + 'Location to keep network config files') +flags.DEFINE_integer('public_vlan', 1, 'VLAN for public IP addresses') +flags.DEFINE_string('public_interface', 'vlan1', 'Interface for public IP addresses') +flags.DEFINE_string('bridge_dev', 'eth1', + 'network device for bridges') +flags.DEFINE_integer('vlan_start', 100, 'First VLAN for private networks') +flags.DEFINE_integer('vlan_end', 4093, 'Last VLAN for private networks') +flags.DEFINE_integer('network_size', 256, 'Number of addresses in each private subnet') +flags.DEFINE_string('public_range', '4.4.4.0/24', 'Public IP address block') +flags.DEFINE_string('private_range', '10.0.0.0/8', 'Private IP address block') + + +# HACK(vish): to delay _get_keeper() loading +def _get_keeper(): + if _get_keeper.keeper == None: + _get_keeper.keeper = datastore.Keeper(prefix="net") + return _get_keeper.keeper +_get_keeper.keeper = None + +logging.getLogger().setLevel(logging.DEBUG) + +# CLEANUP: +# TODO(ja): use singleton for usermanager instead of self.manager in vlanpool et al +# TODO(ja): does vlanpool "keeper" need to know the min/max - shouldn't FLAGS always win? + +class Network(object): + def __init__(self, *args, **kwargs): + self.bridge_gets_ip = False + try: + os.makedirs(FLAGS.networks_path) + except Exception, err: + pass + self.load(**kwargs) + + def to_dict(self): + return {'vlan': self.vlan, + 'network': self.network_str, + 'hosts': self.hosts} + + def load(self, **kwargs): + self.network_str = kwargs.get('network', "192.168.100.0/24") + self.hosts = kwargs.get('hosts', {}) + self.vlan = kwargs.get('vlan', 100) + self.name = "nova-%s" % (self.vlan) + self.network = IPy.IP(self.network_str) + self.gateway = self.network[1] + self.netmask = self.network.netmask() + self.broadcast = self.network.broadcast() + self.bridge_name = "br%s" % (self.vlan) + + def __str__(self): + return json.dumps(self.to_dict()) + + def __unicode__(self): + return json.dumps(self.to_dict()) + + @classmethod + def from_dict(cls, args): + for arg in args.keys(): + value = args[arg] + del args[arg] + args[str(arg)] = value + self = cls(**args) + return self + + @classmethod + def from_json(cls, json_string): + parsed = json.loads(json_string) + return cls.from_dict(parsed) + + def range(self): + for idx in range(3, len(self.network)-2): + yield self.network[idx] + + def allocate_ip(self, user_id, mac): + for ip in self.range(): + address = str(ip) + if not address in self.hosts.keys(): + logging.debug("Allocating IP %s to %s" % (address, user_id)) + self.hosts[address] = { + "address" : address, "user_id" : user_id, 'mac' : mac + } + self.express(address=address) + return address + raise exception.NoMoreAddresses() + + def deallocate_ip(self, ip_str): + if not ip_str in self.hosts.keys(): + raise exception.AddressNotAllocated() + del self.hosts[ip_str] + # TODO(joshua) SCRUB from the leases file somehow + self.deexpress(address=ip_str) + + def list_addresses(self): + for address in self.hosts.values(): + yield address + + def express(self, address=None): + pass + + def deexpress(self, address=None): + pass + + +class Vlan(Network): + """ + VLAN configuration, that when expressed creates the vlan + + properties: + + vlan - integer (example: 42) + bridge_dev - string (example: eth0) + """ + + def __init__(self, *args, **kwargs): + super(Vlan, self).__init__(*args, **kwargs) + self.bridge_dev = FLAGS.bridge_dev + + def express(self, address=None): + super(Vlan, self).express(address=address) + try: + logging.debug("Starting VLAN inteface for %s network" % (self.vlan)) + linux_net.vlan_create(self) + except: + pass + + +class VirtNetwork(Vlan): + """ + Virtual Network that can export libvirt configuration or express itself to + create a bridge (with or without an IP address/netmask/gateway) + + properties: + bridge_name - string (example value: br42) + vlan - integer (example value: 42) + bridge_gets_ip - boolean used during bridge creation + + if bridge_gets_ip then network address for bridge uses the properties: + gateway + broadcast + netmask + """ + + def __init__(self, *args, **kwargs): + super(VirtNetwork, self).__init__(*args, **kwargs) + + def virtXML(self): + """ generate XML for libvirt network """ + + libvirt_xml = open(FLAGS.net_libvirt_xml_template).read() + xml_info = {'name' : self.name, + 'bridge_name' : self.bridge_name, + 'device' : "vlan%s" % (self.vlan), + 'gateway' : self.gateway, + 'netmask' : self.netmask, + } + libvirt_xml = libvirt_xml % xml_info + return libvirt_xml + + def express(self, address=None): + """ creates a bridge device on top of the Vlan """ + super(VirtNetwork, self).express(address=address) + try: + logging.debug("Starting Bridge inteface for %s network" % (self.vlan)) + linux_net.bridge_create(self) + except: + pass + +class DHCPNetwork(VirtNetwork): + """ + properties: + dhcp_listen_address: the ip of the gateway / dhcp host + dhcp_range_start: the first ip to give out + dhcp_range_end: the last ip to give out + """ + def __init__(self, *args, **kwargs): + super(DHCPNetwork, self).__init__(*args, **kwargs) + logging.debug("Initing DHCPNetwork object...") + self.bridge_gets_ip = True + self.dhcp_listen_address = self.network[1] + self.dhcp_range_start = self.network[3] + self.dhcp_range_end = self.network[-2] + + def express(self, address=None): + super(DHCPNetwork, self).express(address=address) + if len(self.hosts.values()) > 0: + logging.debug("Starting dnsmasq server for network with vlan %s" % self.vlan) + linux_net.start_dnsmasq(self) + else: + logging.debug("Not launching dnsmasq cause I don't think we have any hosts.") + + def deexpress(self, address=None): + # if this is the last address, stop dns + super(DHCPNetwork, self).deexpress(address=address) + if len(self.hosts.values()) == 0: + linux_net.stop_dnsmasq(self) + else: + linux_net.start_dnsmasq(self) + + +class PrivateNetwork(DHCPNetwork): + def __init__(self, **kwargs): + super(PrivateNetwork, self).__init__(**kwargs) + # self.express() + + def to_dict(self): + return {'vlan': self.vlan, + 'network': self.network_str, + 'hosts': self.hosts} + + def express(self, *args, **kwargs): + super(PrivateNetwork, self).express(*args, **kwargs) + + + +class PublicNetwork(Network): + def __init__(self, network="192.168.216.0/24", **kwargs): + super(PublicNetwork, self).__init__(network=network, **kwargs) + self.express() + + def allocate_ip(self, user_id, mac): + for ip in self.range(): + address = str(ip) + if not address in self.hosts.keys(): + logging.debug("Allocating IP %s to %s" % (address, user_id)) + self.hosts[address] = { + "address" : address, "user_id" : user_id, 'mac' : mac + } + self.express(address=address) + return address + raise exception.NoMoreAddresses() + + def deallocate_ip(self, ip_str): + if not ip_str in self.hosts: + raise exception.AddressNotAllocated() + del self.hosts[ip_str] + # TODO(joshua) SCRUB from the leases file somehow + self.deexpress(address=ip_str) + + def associate_address(self, public_ip, private_ip, instance_id): + if not public_ip in self.hosts: + raise exception.AddressNotAllocated() + for addr in self.hosts.values(): + if addr.has_key('private_ip') and addr['private_ip'] == private_ip: + raise exception.AddressAlreadyAssociated() + if self.hosts[public_ip].has_key('private_ip'): + raise exception.AddressAlreadyAssociated() + self.hosts[public_ip]['private_ip'] = private_ip + self.hosts[public_ip]['instance_id'] = instance_id + self.express(address=public_ip) + + def disassociate_address(self, public_ip): + if not public_ip in self.hosts: + raise exception.AddressNotAllocated() + if not self.hosts[public_ip].has_key('private_ip'): + raise exception.AddressNotAssociated() + self.deexpress(public_ip) + del self.hosts[public_ip]['private_ip'] + del self.hosts[public_ip]['instance_id'] + # TODO Express the removal + + def deexpress(self, address): + addr = self.hosts[address] + public_ip = addr['address'] + private_ip = addr['private_ip'] + linux_net.remove_rule("PREROUTING -t nat -d %s -j DNAT --to %s" % (public_ip, private_ip)) + linux_net.remove_rule("POSTROUTING -t nat -s %s -j SNAT --to %s" % (private_ip, public_ip)) + linux_net.remove_rule("FORWARD -d %s -p icmp -j ACCEPT" % (private_ip)) + for (protocol, port) in [("tcp",80), ("tcp",22), ("udp",1194), ("tcp",443)]: + linux_net.remove_rule("FORWARD -d %s -p %s --dport %s -j ACCEPT" % (private_ip, protocol, port)) + + def express(self, address=None): + logging.debug("Todo - need to create IPTables natting entries for this net.") + addresses = self.hosts.values() + if address: + addresses = [self.hosts[address]] + for addr in addresses: + if not addr.has_key('private_ip'): + continue + public_ip = addr['address'] + private_ip = addr['private_ip'] + linux_net.bind_public_ip(public_ip, FLAGS.public_interface) + linux_net.confirm_rule("PREROUTING -t nat -d %s -j DNAT --to %s" % (public_ip, private_ip)) + linux_net.confirm_rule("POSTROUTING -t nat -s %s -j SNAT --to %s" % (private_ip, public_ip)) + # TODO: Get these from the secgroup datastore entries + linux_net.confirm_rule("FORWARD -d %s -p icmp -j ACCEPT" % (private_ip)) + for (protocol, port) in [("tcp",80), ("tcp",22), ("udp",1194), ("tcp",443)]: + linux_net.confirm_rule("FORWARD -d %s -p %s --dport %s -j ACCEPT" % (private_ip, protocol, port)) + + +class NetworkPool(object): + # TODO - Allocations need to be system global + + def __init__(self): + self.network = IPy.IP(FLAGS.private_range) + netsize = FLAGS.network_size + if not netsize in [4,8,16,32,64,128,256,512,1024]: + raise exception.NotValidNetworkSize() + self.netsize = netsize + self.startvlan = FLAGS.vlan_start + + def get_from_vlan(self, vlan): + start = (vlan-self.startvlan) * self.netsize + net_str = "%s-%s" % (self.network[start], self.network[start + self.netsize - 1]) + logging.debug("Allocating %s" % net_str) + return net_str + + +class VlanPool(object): + def __init__(self, **kwargs): + self.start = FLAGS.vlan_start + self.end = FLAGS.vlan_end + self.vlans = kwargs.get('vlans', {}) + self.vlanpool = {} + self.manager = users.UserManager.instance() + for user_id, vlan in self.vlans.iteritems(): + self.vlanpool[vlan] = user_id + + def to_dict(self): + return {'vlans': self.vlans} + + def __str__(self): + return json.dumps(self.to_dict()) + + def __unicode__(self): + return json.dumps(self.to_dict()) + + @classmethod + def from_dict(cls, args): + for arg in args.keys(): + value = args[arg] + del args[arg] + args[str(arg)] = value + self = cls(**args) + return self + + @classmethod + def from_json(cls, json_string): + parsed = json.loads(json_string) + return cls.from_dict(parsed) + + def assign_vlan(self, user_id, vlan): + logging.debug("Assigning vlan %s to user %s" % (vlan, user_id)) + self.vlans[user_id] = vlan + self.vlanpool[vlan] = user_id + return self.vlans[user_id] + + def next(self, user_id): + for old_user_id, vlan in self.vlans.iteritems(): + if not self.manager.get_user(old_user_id): + _get_keeper()["%s-default" % old_user_id] = {} + del _get_keeper()["%s-default" % old_user_id] + del self.vlans[old_user_id] + return self.assign_vlan(user_id, vlan) + vlans = self.vlanpool.keys() + vlans.append(self.start) + nextvlan = max(vlans) + 1 + if nextvlan == self.end: + raise exception.AddressNotAllocated("Out of VLANs") + return self.assign_vlan(user_id, nextvlan) + + +class NetworkController(object): + """ The network controller is in charge of network connections """ + + def __init__(self, **kwargs): + logging.debug("Starting up the network controller.") + self.manager = users.UserManager.instance() + self._pubnet = None + if not _get_keeper()['vlans']: + _get_keeper()['vlans'] = {} + if not _get_keeper()['public']: + _get_keeper()['public'] = {'vlan': FLAGS.public_vlan, 'network' : FLAGS.public_range} + self.express() + + def reset(self): + _get_keeper()['public'] = {'vlan': FLAGS.public_vlan, 'network': FLAGS.public_range } + _get_keeper()['vlans'] = {} + # TODO : Get rid of old interfaces, bridges, and IPTables rules. + + @property + def public_net(self): + if not self._pubnet: + self._pubnet = PublicNetwork.from_dict(_get_keeper()['public']) + self._pubnet.load(**_get_keeper()['public']) + return self._pubnet + + @property + def vlan_pool(self): + return VlanPool.from_dict(_get_keeper()['vlans']) + + def get_network_from_name(self, network_name): + net_dict = _get_keeper()[network_name] + if net_dict: + return PrivateNetwork.from_dict(net_dict) + return None + + def get_public_ip_for_instance(self, instance_id): + # FIXME: this should be a lookup - iteration won't scale + for address_record in self.describe_addresses(type=PublicNetwork): + if address_record.get(u'instance_id', 'free') == instance_id: + return address_record[u'address'] + + def get_users_network(self, user_id): + """ get a user's private network, allocating one if needed """ + + user = self.manager.get_user(user_id) + if not user: + raise Exception("User %s doesn't exist, uhoh." % user_id) + usernet = self.get_network_from_name("%s-default" % user_id) + if not usernet: + pool = self.vlan_pool + vlan = pool.next(user_id) + private_pool = NetworkPool() + network_str = private_pool.get_from_vlan(vlan) + logging.debug("Constructing network %s and %s for %s" % (network_str, vlan, user_id)) + usernet = PrivateNetwork( + network=network_str, + vlan=vlan) + _get_keeper()["%s-default" % user_id] = usernet.to_dict() + _get_keeper()['vlans'] = pool.to_dict() + return usernet + + def allocate_address(self, user_id, mac=None, type=PrivateNetwork): + ip = None + net_name = None + if type == PrivateNetwork: + net = self.get_users_network(user_id) + ip = net.allocate_ip(user_id, mac) + net_name = net.name + _get_keeper()["%s-default" % user_id] = net.to_dict() + else: + net = self.public_net + ip = net.allocate_ip(user_id, mac) + net_name = net.name + _get_keeper()['public'] = net.to_dict() + return (ip, net_name) + + def deallocate_address(self, address): + if address in self.public_net.network: + net = self.public_net + rv = net.deallocate_ip(str(address)) + _get_keeper()['public'] = net.to_dict() + return rv + for user in self.manager.get_users(): + if address in self.get_users_network(user.id).network: + net = self.get_users_network(user.id) + rv = net.deallocate_ip(str(address)) + _get_keeper()["%s-default" % user.id] = net.to_dict() + return rv + raise exception.AddressNotAllocated() + + def describe_addresses(self, type=PrivateNetwork): + if type == PrivateNetwork: + addresses = [] + for user in self.manager.get_users(): + addresses.extend(self.get_users_network(user.id).list_addresses()) + return addresses + return self.public_net.list_addresses() + + def associate_address(self, address, private_ip, instance_id): + net = self.public_net + rv = net.associate_address(address, private_ip, instance_id) + _get_keeper()['public'] = net.to_dict() + return rv + + def disassociate_address(self, address): + net = self.public_net + rv = net.disassociate_address(address) + _get_keeper()['public'] = net.to_dict() + return rv + + def express(self,address=None): + for user in self.manager.get_users(): + self.get_users_network(user.id).express() + + def report_state(self): + pass + diff --git a/nova/compute/node.py b/nova/compute/node.py new file mode 100644 index 000000000..a4de0f98a --- /dev/null +++ b/nova/compute/node.py @@ -0,0 +1,549 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Compute Node: + + Runs on each compute node, managing the + hypervisor using libvirt. + +""" + +import base64 +import json +import logging +import os +import random +import shutil +import sys + +from nova import vendor +from twisted.internet import defer +from twisted.internet import task +from twisted.application import service + +try: + import libvirt +except Exception, err: + logging.warning('no libvirt found') + +from nova import exception +from nova import fakevirt +from nova import flags +from nova import process +from nova import utils +from nova.compute import disk +from nova.compute import model +from nova.compute import network +from nova.objectstore import image # for image_path flag + +FLAGS = flags.FLAGS +flags.DEFINE_string('libvirt_xml_template', + utils.abspath('compute/libvirt.xml.template'), + 'Network XML Template') +flags.DEFINE_bool('use_s3', True, + 'whether to get images from s3 or use local copy') +flags.DEFINE_string('instances_path', utils.abspath('../instances'), + 'where instances are stored on disk') +flags.DEFINE_string('instances_prefix', 'compute-', + 'prefix for keepers for instances') + +INSTANCE_TYPES = {} +INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} +INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} +INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} +INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} + +# The number of processes to start in our process pool +# TODO(termie): this should probably be a flag and the pool should probably +# be a singleton +PROCESS_POOL_SIZE = 4 + + +class Node(object, service.Service): + """ + Manages the running instances. + """ + def __init__(self): + """ load configuration options for this node and connect to libvirt """ + super(Node, self).__init__() + self._instances = {} + self._conn = self._get_connection() + self._pool = process.Pool(PROCESS_POOL_SIZE) + self.instdir = model.InstanceDirectory() + # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe + + def _get_connection(self): + """ returns a libvirt connection object """ + # TODO(termie): maybe lazy load after initial check for permissions + # TODO(termie): check whether we can be disconnected + if FLAGS.fake_libvirt: + conn = fakevirt.FakeVirtConnection.instance() + else: + auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], + 'root', + None] + conn = libvirt.openAuth('qemu:///system', auth, 0) + if conn == None: + logging.error('Failed to open connection to the hypervisor') + sys.exit(1) + return conn + + def noop(self): + """ simple test of an AMQP message call """ + return defer.succeed('PONG') + + def get_instance(self, instance_id): + # inst = self.instdir.get(instance_id) + # return inst + if self.instdir.exists(instance_id): + return Instance.fromName(self._conn, self._pool, instance_id) + return None + + @exception.wrap_exception + def adopt_instances(self): + """ if there are instances already running, adopt them """ + return defer.succeed(0) + instance_names = [self._conn.lookupByID(x).name() + for x in self._conn.listDomainsID()] + for name in instance_names: + try: + new_inst = Instance.fromName(self._conn, self._pool, name) + new_inst.update_state() + except: + pass + return defer.succeed(len(self._instances)) + + @exception.wrap_exception + def describe_instances(self): + retval = {} + for inst in self.instdir.by_node(FLAGS.node_name): + retval[inst['instance_id']] = (Instance.fromName(self._conn, self._pool, inst['instance_id'])) + return retval + + @defer.inlineCallbacks + def report_state(self): + logging.debug("Reporting State") + return + + @exception.wrap_exception + def run_instance(self, instance_id, **_kwargs): + """ launch a new instance with specified options """ + logging.debug("Starting instance %s..." % (instance_id)) + inst = self.instdir.get(instance_id) + inst['node_name'] = FLAGS.node_name + inst.save() + # TODO(vish) check to make sure the availability zone matches + new_inst = Instance(self._conn, name=instance_id, + pool=self._pool, data=inst) + if new_inst.is_running(): + raise exception.Error("Instance is already running") + d = new_inst.spawn() + return d + + @exception.wrap_exception + def terminate_instance(self, instance_id): + """ terminate an instance on this machine """ + logging.debug("Got told to terminate instance %s" % instance_id) + instance = self.get_instance(instance_id) + # inst = self.instdir.get(instance_id) + if not instance: + raise exception.Error( + 'trying to terminate unknown instance: %s' % instance_id) + d = instance.destroy() + # d.addCallback(lambda x: inst.destroy()) + return d + + @exception.wrap_exception + def reboot_instance(self, instance_id): + """ reboot an instance on this server + KVM doesn't support reboot, so we terminate and restart """ + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to reboot unknown instance: %s' % instance_id) + return instance.reboot() + + @defer.inlineCallbacks + @exception.wrap_exception + def get_console_output(self, instance_id): + """ send the console output for an instance """ + logging.debug("Getting console output for %s" % (instance_id)) + inst = self.instdir.get(instance_id) + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to get console log for unknown: %s' % instance_id) + rv = yield instance.console_output() + # TODO(termie): this stuff belongs in the API layer, no need to + # munge the data we send to ourselves + output = {"InstanceId" : instance_id, + "Timestamp" : "2", + "output" : base64.b64encode(rv)} + defer.returnValue(output) + + @defer.inlineCallbacks + @exception.wrap_exception + def attach_volume(self, instance_id = None, + aoe_device = None, mountpoint = None): + utils.runthis("Attached Volume: %s", + "sudo virsh attach-disk %s /dev/etherd/%s %s" + % (instance_id, aoe_device, mountpoint.split("/")[-1])) + return defer.succeed(True) + + def _init_aoe(self): + utils.runthis("Doin an AoE discover, returns %s", "sudo aoe-discover") + utils.runthis("Doin an AoE stat, returns %s", "sudo aoe-stat") + + @exception.wrap_exception + def detach_volume(self, instance_id, mountpoint): + """ detach a volume from an instance """ + # despite the documentation, virsh detach-disk just wants the device + # name without the leading /dev/ + target = mountpoint.rpartition('/dev/')[2] + utils.runthis("Detached Volume: %s", "sudo virsh detach-disk %s %s " + % (instance_id, target)) + return defer.succeed(True) + + +class Group(object): + def __init__(self, group_id): + self.group_id = group_id + + +class ProductCode(object): + def __init__(self, product_code): + self.product_code = product_code + + +def _create_image(data, libvirt_xml): + """ create libvirt.xml and copy files into instance path """ + def basepath(path=''): + return os.path.abspath(os.path.join(data['basepath'], path)) + + def imagepath(path=''): + return os.path.join(FLAGS.images_path, path) + + def image_url(path): + return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) + + logging.info(basepath('disk')) + try: + os.makedirs(data['basepath']) + os.chmod(data['basepath'], 0777) + except OSError: + # TODO: there is already an instance with this name, do something + pass + try: + logging.info('Creating image for: %s', data['instance_id']) + f = open(basepath('libvirt.xml'), 'w') + f.write(libvirt_xml) + f.close() + if not FLAGS.fake_libvirt: + if FLAGS.use_s3: + if not os.path.exists(basepath('disk')): + utils.fetchfile(image_url("%s/image" % data['image_id']), + basepath('disk-raw')) + if not os.path.exists(basepath('kernel')): + utils.fetchfile(image_url("%s/image" % data['kernel_id']), + basepath('kernel')) + if not os.path.exists(basepath('ramdisk')): + utils.fetchfile(image_url("%s/image" % data['ramdisk_id']), + basepath('ramdisk')) + else: + if not os.path.exists(basepath('disk')): + shutil.copyfile(imagepath("%s/image" % data['image_id']), + basepath('disk-raw')) + if not os.path.exists(basepath('kernel')): + shutil.copyfile(imagepath("%s/image" % data['kernel_id']), + basepath('kernel')) + if not os.path.exists(basepath('ramdisk')): + shutil.copyfile(imagepath("%s/image" % + data['ramdisk_id']), + basepath('ramdisk')) + if data['key_data']: + logging.info('Injecting key data into image %s' % + data['image_id']) + disk.inject_key(data['key_data'], basepath('disk-raw')) + if os.path.exists(basepath('disk')): + os.remove(basepath('disk')) + bytes = INSTANCE_TYPES[data['instance_type']]['local_gb'] * 1024 * 1024 * 1024 + disk.partition(basepath('disk-raw'), basepath('disk'), bytes) + logging.info('Done create image for: %s', data['instance_id']) + except Exception as ex: + return {'exception': ex} + + +class Instance(object): + + NOSTATE = 0x00 + RUNNING = 0x01 + BLOCKED = 0x02 + PAUSED = 0x03 + SHUTDOWN = 0x04 + SHUTOFF = 0x05 + CRASHED = 0x06 + + def is_pending(self): + return (self.state == Instance.NOSTATE or self.state == 'pending') + + def is_destroyed(self): + return self.state == Instance.SHUTOFF + + def is_running(self): + logging.debug("Instance state is: %s" % self.state) + return (self.state == Instance.RUNNING or self.state == 'running') + + def __init__(self, conn, pool, name, data): + # TODO(termie): pool should probably be a singleton instead of being passed + # here and in the classmethods + """ spawn an instance with a given name """ + # TODO(termie): pool should probably be a singleton instead of being passed + # here and in the classmethods + self._pool = pool + self._conn = conn + self.datamodel = data + print data + + # NOTE(termie): to be passed to multiprocess self._s must be + # pickle-able by cPickle + self._s = {} + + # TODO(termie): is instance_type that actual name for this? + size = data.get('instance_type', FLAGS.default_instance_type) + if size not in INSTANCE_TYPES: + raise exception.Error('invalid instance type: %s' % size) + + self._s.update(INSTANCE_TYPES[size]) + + self._s['name'] = name + self._s['instance_id'] = name + self._s['instance_type'] = size + self._s['mac_address'] = data.get( + 'mac_address', 'df:df:df:df:df:df') + self._s['basepath'] = data.get( + 'basepath', os.path.abspath( + os.path.join(FLAGS.instances_path, self.name))) + self._s['memory_kb'] = int(self._s['memory_mb']) * 1024 + # TODO(joshua) - Get this from network directory controller later + self._s['bridge_name'] = data.get('bridge_name', 'br0') + self._s['image_id'] = data.get('image_id', FLAGS.default_image) + self._s['kernel_id'] = data.get('kernel_id', FLAGS.default_kernel) + self._s['ramdisk_id'] = data.get('ramdisk_id', FLAGS.default_ramdisk) + self._s['owner_id'] = data.get('owner_id', '') + self._s['node_name'] = data.get('node_name', '') + self._s['user_data'] = data.get('user_data', '') + self._s['ami_launch_index'] = data.get('ami_launch_index', None) + self._s['launch_time'] = data.get('launch_time', None) + self._s['reservation_id'] = data.get('reservation_id', None) + # self._s['state'] = Instance.NOSTATE + self._s['state'] = data.get('state', Instance.NOSTATE) + + self._s['key_data'] = data.get('key_data', None) + + # TODO: we may not need to save the next few + self._s['groups'] = data.get('security_group', ['default']) + self._s['product_codes'] = data.get('product_code', []) + self._s['key_name'] = data.get('key_name', None) + self._s['addressing_type'] = data.get('addressing_type', None) + self._s['availability_zone'] = data.get('availability_zone', 'fixme') + + #TODO: put real dns items here + self._s['private_dns_name'] = data.get('private_dns_name', 'fixme') + self._s['dns_name'] = data.get('dns_name', + self._s['private_dns_name']) + logging.debug("Finished init of Instance with id of %s" % name) + + def toXml(self): + # TODO(termie): cache? + logging.debug("Starting the toXML method") + libvirt_xml = open(FLAGS.libvirt_xml_template).read() + xml_info = self._s.copy() + #xml_info.update(self._s) + + # TODO(termie): lazy lazy hack because xml is annoying + xml_info['nova'] = json.dumps(self._s) + libvirt_xml = libvirt_xml % xml_info + logging.debug("Finished the toXML method") + + return libvirt_xml + + @classmethod + def fromName(cls, conn, pool, name): + """ use the saved data for reloading the instance """ + # if FLAGS.fake_libvirt: + # raise Exception('this is a bit useless, eh?') + + instdir = model.InstanceDirectory() + instance = instdir.get(name) + return cls(conn=conn, pool=pool, name=name, data=instance) + + @property + def state(self): + return self._s['state'] + + @property + def name(self): + return self._s['name'] + + def describe(self): + return self._s + + def info(self): + logging.debug("Getting info for dom %s" % self.name) + virt_dom = self._conn.lookupByName(self.name) + (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() + return {'state': state, + 'max_mem': max_mem, + 'mem': mem, + 'num_cpu': num_cpu, + 'cpu_time': cpu_time} + + def update_state(self): + info = self.info() + self._s['state'] = info['state'] + self.datamodel['state'] = info['state'] + self.datamodel['node_name'] = FLAGS.node_name + self.datamodel.save() + + @exception.wrap_exception + def destroy(self): + if self.is_destroyed(): + self.datamodel.destroy() + raise exception.Error('trying to destroy already destroyed' + ' instance: %s' % self.name) + + self._s['state'] = Instance.SHUTDOWN + self.datamodel['state'] = 'shutting_down' + self.datamodel.save() + try: + virt_dom = self._conn.lookupByName(self.name) + virt_dom.destroy() + except Exception, _err: + pass + # If the instance is already terminated, we're still happy + d = defer.Deferred() + d.addCallback(lambda x: self.datamodel.destroy()) + # TODO(termie): short-circuit me for tests + # WE'LL save this for when we do shutdown, + # instead of destroy - but destroy returns immediately + timer = task.LoopingCall(f=None) + def _wait_for_shutdown(): + try: + info = self.info() + if info['state'] == Instance.SHUTDOWN: + self._s['state'] = Instance.SHUTDOWN + #self.datamodel['state'] = 'shutdown' + #self.datamodel.save() + timer.stop() + d.callback(None) + except Exception: + self._s['state'] = Instance.SHUTDOWN + timer.stop() + d.callback(None) + timer.f = _wait_for_shutdown + timer.start(interval=0.5, now=True) + return d + + @defer.inlineCallbacks + @exception.wrap_exception + def reboot(self): + # if not self.is_running(): + # raise exception.Error( + # 'trying to reboot a non-running' + # 'instance: %s (state: %s)' % (self.name, self.state)) + + yield self._conn.lookupByName(self.name).destroy() + self.datamodel['state'] = 'rebooting' + self.datamodel.save() + self._s['state'] = Instance.NOSTATE + self._conn.createXML(self.toXml(), 0) + # TODO(termie): this should actually register a callback to check + # for successful boot + self.datamodel['state'] = 'running' + self.datamodel.save() + self._s['state'] = Instance.RUNNING + logging.debug('rebooted instance %s' % self.name) + defer.returnValue(None) + + @exception.wrap_exception + def spawn(self): + self.datamodel['state'] = "spawning" + self.datamodel.save() + logging.debug("Starting spawn in Instance") + xml = self.toXml() + def _launch(retvals): + self.datamodel['state'] = 'launching' + self.datamodel.save() + try: + logging.debug("Arrived in _launch") + if retvals and 'exception' in retvals: + raise retvals['exception'] + self._conn.createXML(self.toXml(), 0) + # TODO(termie): this should actually register + # a callback to check for successful boot + self._s['state'] = Instance.RUNNING + self.datamodel['state'] = 'running' + self.datamodel.save() + logging.debug("Instance is running") + except Exception as ex: + logging.debug(ex) + self.datamodel['state'] = 'shutdown' + self.datamodel.save() + #return self + + d = self._pool.apply(_create_image, self._s, xml) + d.addCallback(_launch) + return d + + @exception.wrap_exception + def console_output(self): + if not FLAGS.fake_libvirt: + fname = os.path.abspath( + os.path.join(self._s['basepath'], 'console.log')) + with open(fname, 'r') as f: + console = f.read() + else: + console = 'FAKE CONSOLE OUTPUT' + return defer.succeed(console) + + def generate_mac(self): + mac = [0x00, 0x16, 0x3e, random.randint(0x00, 0x7f), + random.randint(0x00, 0xff), random.randint(0x00, 0xff) + ] + return ':'.join(map(lambda x: "%02x" % x, mac)) + + + +class NetworkNode(Node): + def __init__(self, **kwargs): + super(NetworkNode, self).__init__(**kwargs) + self.virtNets = {} + + def add_network(self, net_dict): + net = network.VirtNetwork(**net_dict) + self.virtNets[net.name] = net + self.virtNets[net.name].express() + return defer.succeed({'retval': 'network added'}) + + @exception.wrap_exception + def run_instance(self, instance_id, **kwargs): + inst = self.instdir.get(instance_id) + net_dict = json.loads(inst.get('network_str', "{}")) + self.add_network(net_dict) + return super(NetworkNode, self).run_instance(instance_id, **kwargs) + diff --git a/nova/crypto.py b/nova/crypto.py new file mode 100644 index 000000000..6add55ee5 --- /dev/null +++ b/nova/crypto.py @@ -0,0 +1,224 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Wrappers around standard crypto, including root and intermediate CAs, +SSH keypairs and x509 certificates. +""" + +import hashlib +import logging +import os +import shutil +import tempfile +import time +import utils + +from nova import vendor +import M2Crypto + +from nova import exception +from nova import flags + + +FLAGS = flags.FLAGS +flags.DEFINE_string('ca_file', 'cacert.pem', 'Filename of root CA') +flags.DEFINE_string('keys_path', utils.abspath('../keys'), 'Where we keep our keys') +flags.DEFINE_string('ca_path', utils.abspath('../CA'), 'Where we keep our root CA') +flags.DEFINE_boolean('use_intermediate_ca', False, 'Should we use intermediate CAs for each project?') + + +def ca_path(username): + if username: + return "%s/INTER/%s/cacert.pem" % (FLAGS.ca_path, username) + return "%s/cacert.pem" % (FLAGS.ca_path) + +def fetch_ca(username=None, chain=True): + if not FLAGS.use_intermediate_ca: + username = None + buffer = "" + if username: + with open(ca_path(username),"r") as cafile: + buffer += cafile.read() + if username and not chain: + return buffer + with open(ca_path(None),"r") as cafile: + buffer += cafile.read() + return buffer + +def generate_key_pair(bits=1024): + # what is the magic 65537? + + tmpdir = tempfile.mkdtemp() + keyfile = os.path.join(tmpdir, 'temp') + utils.execute('ssh-keygen -q -b %d -N "" -f %s' % (bits, keyfile)) + (out, err) = utils.execute('ssh-keygen -q -l -f %s.pub' % (keyfile)) + fingerprint = out.split(' ')[1] + private_key = open(keyfile).read() + public_key = open(keyfile + '.pub').read() + + shutil.rmtree(tmpdir) + # code below returns public key in pem format + # key = M2Crypto.RSA.gen_key(bits, 65537, callback=lambda: None) + # private_key = key.as_pem(cipher=None) + # bio = M2Crypto.BIO.MemoryBuffer() + # key.save_pub_key_bio(bio) + # public_key = bio.read() + # public_key, err = execute('ssh-keygen -y -f /dev/stdin', private_key) + + return (private_key, public_key, fingerprint) + + +def ssl_pub_to_ssh_pub(ssl_public_key, name='root', suffix='nova'): + """requires lsh-utils""" + convert="sed -e'1d' -e'$d' | pkcs1-conv --public-key-info --base-64 |" \ + + " sexp-conv | sed -e'1s/(rsa-pkcs1/(rsa-pkcs1-sha1/' | sexp-conv -s" \ + + " transport | lsh-export-key --openssh" + (out, err) = utils.execute(convert, ssl_public_key) + if err: + raise exception.Error("Failed to generate key: %s", err) + return '%s %s@%s\n' %(out.strip(), name, suffix) + + +def generate_x509_cert(subject="/C=US/ST=California/L=The Mission/O=CloudFed/OU=NOVA/CN=foo", bits=1024): + tmpdir = tempfile.mkdtemp() + keyfile = os.path.abspath(os.path.join(tmpdir, 'temp.key')) + csrfile = os.path.join(tmpdir, 'temp.csr') + logging.debug("openssl genrsa -out %s %s" % (keyfile, bits)) + utils.runthis("Generating private key: %s", "openssl genrsa -out %s %s" % (keyfile, bits)) + utils.runthis("Generating CSR: %s", "openssl req -new -key %s -out %s -batch -subj %s" % (keyfile, csrfile, subject)) + private_key = open(keyfile).read() + csr = open(csrfile).read() + shutil.rmtree(tmpdir) + return (private_key, csr) + + +def sign_csr(csr_text, intermediate=None): + if not FLAGS.use_intermediate_ca: + intermediate = None + if not intermediate: + return _sign_csr(csr_text, FLAGS.ca_path) + user_ca = "%s/INTER/%s" % (FLAGS.ca_path, intermediate) + if not os.path.exists(user_ca): + start = os.getcwd() + os.chdir(FLAGS.ca_path) + utils.runthis("Generating intermediate CA: %s", "sh geninter.sh %s" % (intermediate)) + os.chdir(start) + return _sign_csr(csr_text, user_ca) + + +def _sign_csr(csr_text, ca_folder): + tmpfolder = tempfile.mkdtemp() + csrfile = open("%s/inbound.csr" % (tmpfolder), "w") + csrfile.write(csr_text) + csrfile.close() + logging.debug("Flags path: %s" % ca_folder) + start = os.getcwd() + # Change working dir to CA + os.chdir(ca_folder) + utils.runthis("Signing cert: %s", "openssl ca -batch -out %s/outbound.crt -config ./openssl.cnf -infiles %s/inbound.csr" % (tmpfolder, tmpfolder)) + os.chdir(start) + with open("%s/outbound.crt" % (tmpfolder), "r") as crtfile: + return crtfile.read() + + +def mkreq(bits, subject="foo", ca=0): + pk = M2Crypto.EVP.PKey() + req = M2Crypto.X509.Request() + rsa = M2Crypto.RSA.gen_key(bits, 65537, callback=lambda: None) + pk.assign_rsa(rsa) + rsa = None # should not be freed here + req.set_pubkey(pk) + req.set_subject(subject) + req.sign(pk,'sha512') + assert req.verify(pk) + pk2 = req.get_pubkey() + assert req.verify(pk2) + return req, pk + + +def mkcacert(subject='nova', years=1): + req, pk = mkreq(2048, subject, ca=1) + pkey = req.get_pubkey() + sub = req.get_subject() + cert = M2Crypto.X509.X509() + cert.set_serial_number(1) + cert.set_version(2) + cert.set_subject(sub) # FIXME subject is not set in mkreq yet + t = long(time.time()) + time.timezone + now = M2Crypto.ASN1.ASN1_UTCTIME() + now.set_time(t) + nowPlusYear = M2Crypto.ASN1.ASN1_UTCTIME() + nowPlusYear.set_time(t + (years * 60 * 60 * 24 * 365)) + cert.set_not_before(now) + cert.set_not_after(nowPlusYear) + issuer = M2Crypto.X509.X509_Name() + issuer.C = "US" + issuer.CN = subject + cert.set_issuer(issuer) + cert.set_pubkey(pkey) + ext = M2Crypto.X509.new_extension('basicConstraints', 'CA:TRUE') + cert.add_ext(ext) + cert.sign(pk, 'sha512') + + # print 'cert', dir(cert) + print cert.as_pem() + print pk.get_rsa().as_pem() + + return cert, pk, pkey + + + +# Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/ +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, dis- +# tribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the fol- +# lowing conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# http://code.google.com/p/boto + +def compute_md5(fp): + """ + @type fp: file + @param fp: File pointer to the file to MD5 hash. The file pointer will be + reset to the beginning of the file before the method returns. + + @rtype: tuple + @return: the hex digest version of the MD5 hash + """ + m = hashlib.md5() + fp.seek(0) + s = fp.read(8192) + while s: + m.update(s) + s = fp.read(8192) + hex_md5 = m.hexdigest() + # size = fp.tell() + fp.seek(0) + return hex_md5 diff --git a/nova/datastore.py b/nova/datastore.py new file mode 100644 index 000000000..57940d98b --- /dev/null +++ b/nova/datastore.py @@ -0,0 +1,367 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Datastore: + +Providers the Keeper class, a simple pseudo-dictionary that +persists on disk. + +MAKE Sure that ReDIS is running, and your flags are set properly, +before trying to run this. +""" + +import json +import logging +import os +import sqlite3 + +from nova import vendor +import redis + +from nova import flags +from nova import utils + + +FLAGS = flags.FLAGS +flags.DEFINE_string('datastore_path', utils.abspath('../keeper'), + 'where keys are stored on disk') +flags.DEFINE_string('redis_host', '127.0.0.1', + 'Host that redis is running on.') +flags.DEFINE_integer('redis_port', 6379, + 'Port that redis is running on.') +flags.DEFINE_integer('redis_db', 0, 'Multiple DB keeps tests away') +flags.DEFINE_string('keeper_backend', 'redis', + 'which backend to use for keeper') + + +class Redis(object): + def __init__(self): + if hasattr(self.__class__, '_instance'): + raise Exception('Attempted to instantiate singleton') + + @classmethod + def instance(cls): + if not hasattr(cls, '_instance'): + inst = redis.Redis(host=FLAGS.redis_host, port=FLAGS.redis_port, db=FLAGS.redis_db) + cls._instance = inst + return cls._instance + + +class RedisModel(object): + """ Wrapper around redis-backed properties """ + object_type = 'generic' + def __init__(self, object_id): + """ loads an object from the datastore if exists """ + self.object_id = object_id + self.initial_state = {} + self.state = Redis.instance().hgetall(self.__redis_key) + if self.state: + self.initial_state = self.state + else: + self.set_default_state() + + def set_default_state(self): + self.state = {'state' : 'pending'} + self.state[self.object_type+"_id"] = self.object_id + + @property + def __redis_key(self): + """ Magic string for instance keys """ + return '%s:%s' % (self.object_type, self.object_id) + + def __repr__(self): + return "<%s:%s>" % (self.object_type, self.object_id) + + def __str__(self): + return str(self.state) + + def keys(self): + return self.state.keys() + + def copy(self): + copyDict = {} + for item in self.keys(): + copyDict[item] = self[item] + return copyDict + + def get(self, item, default): + return self.state.get(item, default) + + def __getitem__(self, item): + return self.state[item] + + def __setitem__(self, item, val): + self.state[item] = val + return self.state[item] + + def __delitem__(self, item): + """ We don't support this """ + raise Exception("Silly monkey, we NEED all our properties.") + + def save(self): + """ update the directory with the state from this instance """ + # TODO(ja): implement hmset in redis-py and use it + # instead of multiple calls to hset + for key, val in self.state.iteritems(): + # if (not self.initial_state.has_key(key) + # or self.initial_state[key] != val): + Redis.instance().hset(self.__redis_key, key, val) + if self.initial_state == {}: + self.first_save() + self.initial_state = self.state + return True + + def first_save(self): + pass + + def destroy(self): + """ deletes all related records from datastore. + does NOT do anything to running state. + """ + Redis.instance().delete(self.__redis_key) + return True + + +def slugify(key, prefix=None): + """ + Key has to be a valid filename. Slugify solves that. + """ + return "%s%s" % (prefix, key) + + +class SqliteKeeper(object): + """ Keeper implementation in SQLite, mostly for in-memory testing """ + _conn = {} # class variable + + def __init__(self, prefix): + self.prefix = prefix + + @property + def conn(self): + if self.prefix not in self.__class__._conn: + logging.debug('no sqlite connection (%s), making new', self.prefix) + if FLAGS.datastore_path != ':memory:': + try: + os.mkdir(FLAGS.datastore_path) + except Exception: + pass + conn = sqlite3.connect(os.path.join( + FLAGS.datastore_path, '%s.sqlite' % self.prefix)) + else: + conn = sqlite3.connect(':memory:') + + c = conn.cursor() + try: + c.execute('''CREATE TABLE data (item text, value text)''') + conn.commit() + except Exception: + logging.exception('create table failed') + finally: + c.close() + + self.__class__._conn[self.prefix] = conn + + return self.__class__._conn[self.prefix] + + def __delitem__(self, item): + #logging.debug('sqlite deleting %s', item) + c = self.conn.cursor() + try: + c.execute('DELETE FROM data WHERE item = ?', (item, )) + self.conn.commit() + except Exception: + logging.exception('delete failed: %s', item) + finally: + c.close() + + def __getitem__(self, item): + #logging.debug('sqlite getting %s', item) + result = None + c = self.conn.cursor() + try: + c.execute('SELECT value FROM data WHERE item = ?', (item, )) + row = c.fetchone() + if row: + result = json.loads(row[0]) + else: + result = None + except Exception: + logging.exception('select failed: %s', item) + finally: + c.close() + #logging.debug('sqlite got %s: %s', item, result) + return result + + def __setitem__(self, item, value): + serialized_value = json.dumps(value) + insert = True + if self[item] is not None: + insert = False + #logging.debug('sqlite insert %s: %s', item, value) + c = self.conn.cursor() + try: + if insert: + c.execute('INSERT INTO data VALUES (?, ?)', + (item, serialized_value)) + else: + c.execute('UPDATE data SET item=?, value=? WHERE item = ?', + (item, serialized_value, item)) + + self.conn.commit() + except Exception: + logging.exception('select failed: %s', item) + finally: + c.close() + + def clear(self): + if self.prefix not in self.__class__._conn: + return + self.conn.close() + if FLAGS.datastore_path != ':memory:': + os.unlink(os.path.join(FLAGS.datastore_path, '%s.sqlite' % self.prefix)) + del self.__class__._conn[self.prefix] + + def clear_all(self): + for k, conn in self.__class__._conn.iteritems(): + conn.close() + if FLAGS.datastore_path != ':memory:': + os.unlink(os.path.join(FLAGS.datastore_path, + '%s.sqlite' % self.prefix)) + self.__class__._conn = {} + + + def set_add(self, item, value): + group = self[item] + if not group: + group = [] + group.append(value) + self[item] = group + + def set_is_member(self, item, value): + group = self[item] + if not group: + return False + return value in group + + def set_remove(self, item, value): + group = self[item] + if not group: + group = [] + group.remove(value) + self[item] = group + + def set_fetch(self, item): + # TODO(termie): I don't really know what set_fetch is supposed to do + group = self[item] + if not group: + group = [] + return iter(group) + +class JsonKeeper(object): + """ + Simple dictionary class that persists using + JSON in files saved to disk. + """ + def __init__(self, prefix): + self.prefix = prefix + + def __delitem__(self, item): + """ + Removing a key means deleting a file from disk. + """ + item = slugify(item, self.prefix) + path = "%s/%s" % (FLAGS.datastore_path, item) + if os.path.isfile(path): + os.remove(path) + + def __getitem__(self, item): + """ + Fetch file contents and dejsonify them. + """ + item = slugify(item, self.prefix) + path = "%s/%s" % (FLAGS.datastore_path, item) + if os.path.isfile(path): + return json.load(open(path, 'r')) + return None + + def __setitem__(self, item, value): + """ + JSON encode value and save to file. + """ + item = slugify(item, self.prefix) + path = "%s/%s" % (FLAGS.datastore_path, item) + with open(path, "w") as blobfile: + blobfile.write(json.dumps(value)) + return value + + +class RedisKeeper(object): + """ + Simple dictionary class that persists using + ReDIS. + """ + def __init__(self, prefix="redis-"): + self.prefix = prefix + Redis.instance().ping() + + def __setitem__(self, item, value): + """ + JSON encode value and save to file. + """ + item = slugify(item, self.prefix) + Redis.instance().set(item, json.dumps(value)) + return value + + def __getitem__(self, item): + item = slugify(item, self.prefix) + value = Redis.instance().get(item) + if value: + return json.loads(value) + + def __delitem__(self, item): + item = slugify(item, self.prefix) + return Redis.instance().delete(item) + + def clear(self): + raise NotImplementedError() + + def clear_all(self): + raise NotImplementedError() + + def set_add(self, item, value): + item = slugify(item, self.prefix) + return Redis.instance().sadd(item, json.dumps(value)) + + def set_is_member(self, item, value): + item = slugify(item, self.prefix) + return Redis.instance().sismember(item, json.dumps(value)) + + def set_remove(self, item, value): + item = slugify(item, self.prefix) + return Redis.instance().srem(item, json.dumps(value)) + + def set_fetch(self, item): + item = slugify(item, self.prefix) + for obj in Redis.instance().sinter([item]): + yield json.loads(obj) + + +def Keeper(prefix=''): + KEEPERS = {'redis': RedisKeeper, + 'sqlite': SqliteKeeper} + return KEEPERS[FLAGS.keeper_backend](prefix) + diff --git a/nova/endpoint/__init__.py b/nova/endpoint/__init__.py new file mode 100644 index 000000000..dbf15d259 --- /dev/null +++ b/nova/endpoint/__init__.py @@ -0,0 +1,28 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +:mod:`nova.endpoint` -- Main NOVA Api endpoints +===================================================== + +.. automodule:: nova.endpoint + :platform: Unix + :synopsis: REST APIs for all nova functions +.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com> +.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com> +.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com> +.. moduleauthor:: Joshua McKenty <joshua@cognition.ca> +.. moduleauthor:: Manish Singh <yosh@gimp.org> +.. moduleauthor:: Andy Smith <andy@anarkystic.com> +"""
\ No newline at end of file diff --git a/nova/endpoint/admin.py b/nova/endpoint/admin.py new file mode 100644 index 000000000..e9880acc5 --- /dev/null +++ b/nova/endpoint/admin.py @@ -0,0 +1,131 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Admin API controller, exposed through http via the api worker. +""" + +import base64 + +def user_dict(user, base64_file=None): + """Convert the user object to a result dict""" + if user: + return { + 'username': user.id, + 'accesskey': user.access, + 'secretkey': user.secret, + 'file': base64_file, + } + else: + return {} + +def node_dict(node): + """Convert a node object to a result dict""" + if node: + return { + 'node_id': node.id, + 'workers': ", ".join(node.workers), + 'disks': ", ".join(node.disks), + 'ram': node.memory, + 'load_average' : node.load_average, + } + else: + return {} + +def admin_only(target): + """Decorator for admin-only API calls""" + def wrapper(*args, **kwargs): + """Internal wrapper method for admin-only API calls""" + context = args[1] + if context.user.is_admin(): + return target(*args, **kwargs) + else: + return {} + + return wrapper + +class AdminController(object): + """ + API Controller for users, node status, and worker mgmt. + Trivial admin_only wrapper will be replaced with RBAC, + allowing project managers to administer project users. + + """ + def __init__(self, user_manager, node_manager=None): + self.user_manager = user_manager + self.node_manager = node_manager + + def __str__(self): + return 'AdminController' + + @admin_only + def describe_user(self, _context, name, **_kwargs): + """Returns user data, including access and secret keys. + """ + return user_dict(self.user_manager.get_user(name)) + + @admin_only + def describe_users(self, _context, **_kwargs): + """Returns all users - should be changed to deal with a list. + """ + return {'userSet': + [user_dict(u) for u in self.user_manager.get_users()] } + + @admin_only + def register_user(self, _context, name, **_kwargs): + """ Creates a new user, and returns generated credentials. + """ + self.user_manager.create_user(name) + + return user_dict(self.user_manager.get_user(name)) + + @admin_only + def deregister_user(self, _context, name, **_kwargs): + """Deletes a single user (NOT undoable.) + Should throw an exception if the user has instances, + volumes, or buckets remaining. + """ + self.user_manager.delete_user(name) + + return True + + @admin_only + def generate_x509_for_user(self, _context, name, **_kwargs): + """Generates and returns an x509 certificate for a single user. + Is usually called from a client that will wrap this with + access and secret key info, and return a zip file. + """ + user = self.user_manager.get_user(name) + return user_dict(user, base64.b64encode(user.get_credentials())) + + @admin_only + def describe_nodes(self, _context, **_kwargs): + """Returns status info for all nodes. Includes: + * Disk Space + * Instance List + * RAM used + * CPU used + * DHCP servers running + * Iptables / bridges + """ + return {'nodeSet': + [node_dict(n) for n in self.node_manager.get_nodes()] } + + @admin_only + def describe_node(self, _context, name, **_kwargs): + """Returns status info for single node. + """ + return node_dict(self.node_manager.get_node(name)) + diff --git a/nova/endpoint/api.py b/nova/endpoint/api.py new file mode 100755 index 000000000..5bbda3f56 --- /dev/null +++ b/nova/endpoint/api.py @@ -0,0 +1,337 @@ +#!/usr/bin/python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tornado REST API Request Handlers for Nova functions +Most calls are proxied into the responsible controller. +""" + +import logging +import multiprocessing +import random +import re +import urllib +# TODO(termie): replace minidom with etree +from xml.dom import minidom + +from nova import vendor +import tornado.web +from twisted.internet import defer + +from nova import crypto +from nova import exception +from nova import flags +from nova import utils +from nova.endpoint import cloud +from nova.auth import users + +FLAGS = flags.FLAGS +flags.DEFINE_integer('cc_port', 8773, 'cloud controller port') + + +_log = logging.getLogger("api") +_log.setLevel(logging.DEBUG) + + +_c2u = re.compile('(((?<=[a-z])[A-Z])|([A-Z](?![A-Z]|$)))') + + +def _camelcase_to_underscore(str): + return _c2u.sub(r'_\1', str).lower().strip('_') + + +def _underscore_to_camelcase(str): + return ''.join([x[:1].upper() + x[1:] for x in str.split('_')]) + + +def _underscore_to_xmlcase(str): + res = _underscore_to_camelcase(str) + return res[:1].lower() + res[1:] + + +class APIRequestContext(object): + def __init__(self, handler, user): + self.handler = handler + self.user = user + self.request_id = ''.join( + [random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-') + for x in xrange(20)] + ) + + +class APIRequest(object): + def __init__(self, handler, controller, action): + self.handler = handler + self.controller = controller + self.action = action + + def send(self, user, **kwargs): + context = APIRequestContext(self.handler, user) + + try: + method = getattr(self.controller, + _camelcase_to_underscore(self.action)) + except AttributeError: + _error = ('Unsupported API request: controller = %s,' + 'action = %s') % (self.controller, self.action) + _log.warning(_error) + # TODO: Raise custom exception, trap in apiserver, + # and reraise as 400 error. + raise Exception(_error) + + args = {} + for key, value in kwargs.items(): + parts = key.split(".") + key = _camelcase_to_underscore(parts[0]) + if len(parts) > 1: + d = args.get(key, {}) + d[parts[1]] = value[0] + value = d + else: + value = value[0] + args[key] = value + + for key in args.keys(): + if isinstance(args[key], dict): + if args[key] != {} and args[key].keys()[0].isdigit(): + s = args[key].items() + s.sort() + args[key] = [v for k, v in s] + + d = defer.maybeDeferred(method, context, **args) + d.addCallback(self._render_response, context.request_id) + return d + + def _render_response(self, response_data, request_id): + xml = minidom.Document() + + response_el = xml.createElement(self.action + 'Response') + response_el.setAttribute('xmlns', + 'http://ec2.amazonaws.com/doc/2009-11-30/') + request_id_el = xml.createElement('requestId') + request_id_el.appendChild(xml.createTextNode(request_id)) + response_el.appendChild(request_id_el) + if(response_data == True): + self._render_dict(xml, response_el, {'return': 'true'}) + else: + self._render_dict(xml, response_el, response_data) + + xml.appendChild(response_el) + + response = xml.toxml() + xml.unlink() + _log.debug(response) + return response + + def _render_dict(self, xml, el, data): + try: + for key in data.keys(): + val = data[key] + el.appendChild(self._render_data(xml, key, val)) + except: + _log.debug(data) + raise + + def _render_data(self, xml, el_name, data): + el_name = _underscore_to_xmlcase(el_name) + data_el = xml.createElement(el_name) + + if isinstance(data, list): + for item in data: + data_el.appendChild(self._render_data(xml, 'item', item)) + elif isinstance(data, dict): + self._render_dict(xml, data_el, data) + elif hasattr(data, '__dict__'): + self._render_dict(xml, data_el, data.__dict__) + elif isinstance(data, bool): + data_el.appendChild(xml.createTextNode(str(data).lower())) + elif data != None: + data_el.appendChild(xml.createTextNode(str(data))) + + return data_el + + +class RootRequestHandler(tornado.web.RequestHandler): + def get(self): + # available api versions + versions = [ + '1.0', + '2007-01-19', + '2007-03-01', + '2007-08-29', + '2007-10-10', + '2007-12-15', + '2008-02-01', + '2008-09-01', + '2009-04-04', + ] + for version in versions: + self.write('%s\n' % version) + self.finish() + + +class MetadataRequestHandler(tornado.web.RequestHandler): + def print_data(self, data): + if isinstance(data, dict): + output = '' + for key in data: + if key == '_name': + continue + output += key + if isinstance(data[key], dict): + if '_name' in data[key]: + output += '=' + str(data[key]['_name']) + else: + output += '/' + output += '\n' + self.write(output[:-1]) # cut off last \n + elif isinstance(data, list): + self.write('\n'.join(data)) + else: + self.write(str(data)) + + def lookup(self, path, data): + items = path.split('/') + for item in items: + if item: + if not isinstance(data, dict): + return data + if not item in data: + return None + data = data[item] + return data + + def get(self, path): + cc = self.application.controllers['Cloud'] + meta_data = cc.get_metadata(self.request.remote_ip) + if meta_data is None: + _log.error('Failed to get metadata for ip: %s' % + self.request.remote_ip) + raise tornado.web.HTTPError(404) + data = self.lookup(path, meta_data) + if data is None: + raise tornado.web.HTTPError(404) + self.print_data(data) + self.finish() + + +class APIRequestHandler(tornado.web.RequestHandler): + def get(self, controller_name): + self.execute(controller_name) + + @tornado.web.asynchronous + def execute(self, controller_name): + # Obtain the appropriate controller for this request. + try: + controller = self.application.controllers[controller_name] + except KeyError: + self._error('unhandled', 'no controller named %s' % controller_name) + return + + args = self.request.arguments + + # Read request signature. + try: + signature = args.pop('Signature')[0] + except: + raise tornado.web.HTTPError(400) + + # Make a copy of args for authentication and signature verification. + auth_params = {} + for key, value in args.items(): + auth_params[key] = value[0] + + # Get requested action and remove authentication args for final request. + try: + action = args.pop('Action')[0] + args.pop('AWSAccessKeyId') + args.pop('SignatureMethod') + args.pop('SignatureVersion') + args.pop('Version') + args.pop('Timestamp') + except: + raise tornado.web.HTTPError(400) + + # Authenticate the request. + user = self.application.user_manager.authenticate( + auth_params, + signature, + self.request.method, + self.request.host, + self.request.path + ) + + if not user: + raise tornado.web.HTTPError(403) + + _log.debug('action: %s' % action) + + for key, value in args.items(): + _log.debug('arg: %s\t\tval: %s' % (key, value)) + + request = APIRequest(self, controller, action) + d = request.send(user, **args) + # d.addCallback(utils.debug) + + # TODO: Wrap response in AWS XML format + d.addCallbacks(self._write_callback, self._error_callback) + + def _write_callback(self, data): + self.set_header('Content-Type', 'text/xml') + self.write(data) + self.finish() + + def _error_callback(self, failure): + try: + failure.raiseException() + except exception.ApiError as ex: + self._error(type(ex).__name__ + "." + ex.code, ex.message) + # TODO(vish): do something more useful with unknown exceptions + except Exception as ex: + self._error(type(ex).__name__, str(ex)) + raise + + def post(self, controller_name): + self.execute(controller_name) + + def _error(self, code, message): + self._status_code = 400 + self.set_header('Content-Type', 'text/xml') + self.write('<?xml version="1.0"?>\n') + self.write('<Response><Errors><Error><Code>%s</Code>' + '<Message>%s</Message></Error></Errors>' + '<RequestID>?</RequestID></Response>' % (code, message)) + self.finish() + + +class APIServerApplication(tornado.web.Application): + def __init__(self, user_manager, controllers): + tornado.web.Application.__init__(self, [ + (r'/', RootRequestHandler), + (r'/services/([A-Za-z0-9]+)/', APIRequestHandler), + (r'/latest/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2009-04-04/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2008-09-01/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2008-02-01/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2007-12-15/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2007-10-10/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2007-08-29/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2007-03-01/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/2007-01-19/([-A-Za-z0-9/]*)', MetadataRequestHandler), + (r'/1.0/([-A-Za-z0-9/]*)', MetadataRequestHandler), + ], pool=multiprocessing.Pool(4)) + self.user_manager = user_manager + self.controllers = controllers diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py new file mode 100644 index 000000000..27dd81aa2 --- /dev/null +++ b/nova/endpoint/cloud.py @@ -0,0 +1,572 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Cloud Controller: Implementation of EC2 REST API calls, which are +dispatched to other nodes via AMQP RPC. State is via distributed +datastore. +""" + +import json +import logging +import os +import time + +from nova import vendor +from twisted.internet import defer + +from nova import datastore +from nova import flags +from nova import rpc +from nova import utils +from nova import exception +from nova.auth import users +from nova.compute import model +from nova.compute import network +from nova.endpoint import images +from nova.volume import storage + +FLAGS = flags.FLAGS + +flags.DEFINE_string('cloud_topic', 'cloud', 'the topic clouds listen on') + +def _gen_key(user_id, key_name): + """ Tuck this into UserManager """ + try: + manager = users.UserManager.instance() + private_key, fingerprint = manager.generate_key_pair(user_id, key_name) + except Exception as ex: + return {'exception': ex} + return {'private_key': private_key, 'fingerprint': fingerprint} + + +class CloudController(object): + """ CloudController provides the critical dispatch between + inbound API calls through the endpoint and messages + sent to the other nodes. +""" + def __init__(self): + self._instances = datastore.Keeper(FLAGS.instances_prefix) + self.instdir = model.InstanceDirectory() + self.network = network.NetworkController() + self.setup() + + @property + def instances(self): + """ All instances in the system, as dicts """ + for instance in self.instdir.all: + yield {instance['instance_id']: instance} + + @property + def volumes(self): + """ returns a list of all volumes """ + for volume_id in datastore.Redis.instance().smembers("volumes"): + volume = storage.Volume(volume_id=volume_id) + yield volume + + def __str__(self): + return 'CloudController' + + def setup(self): + """ Ensure the keychains and folders exist. """ + # Create keys folder, if it doesn't exist + if not os.path.exists(FLAGS.keys_path): + os.makedirs(os.path.abspath(FLAGS.keys_path)) + # Gen root CA, if we don't have one + root_ca_path = os.path.join(FLAGS.ca_path, FLAGS.ca_file) + if not os.path.exists(root_ca_path): + start = os.getcwd() + os.chdir(FLAGS.ca_path) + utils.runthis("Generating root CA: %s", "sh genrootca.sh") + os.chdir(start) + # TODO: Do this with M2Crypto instead + + def get_instance_by_ip(self, ip): + return self.instdir.by_ip(ip) + + def get_metadata(self, ip): + i = self.instdir.by_ip(ip) + if i is None: + return None + if i['key_name']: + keys = { + '0': { + '_name': i['key_name'], + 'openssh-key': i['key_data'] + } + } + else: + keys = '' + data = { + 'user-data': base64.b64decode(i['user_data']), + 'meta-data': { + 'ami-id': i['image_id'], + 'ami-launch-index': i['ami_launch_index'], + 'ami-manifest-path': 'FIXME', # image property + 'block-device-mapping': { # TODO: replace with real data + 'ami': 'sda1', + 'ephemeral0': 'sda2', + 'root': '/dev/sda1', + 'swap': 'sda3' + }, + 'hostname': i['private_dns_name'], # is this public sometimes? + 'instance-action': 'none', + 'instance-id': i['instance_id'], + 'instance-type': i.get('instance_type', ''), + 'local-hostname': i['private_dns_name'], + 'local-ipv4': i['private_dns_name'], # TODO: switch to IP + 'kernel-id': i.get('kernel_id', ''), + 'placement': { + 'availaibility-zone': i.get('availability_zone', 'nova'), + }, + 'public-hostname': i.get('dns_name', ''), + 'public-ipv4': i.get('dns_name', ''), # TODO: switch to IP + 'public-keys' : keys, + 'ramdisk-id': i.get('ramdisk_id', ''), + 'reservation-id': i['reservation_id'], + 'security-groups': i.get('groups', '') + } + } + if False: # TODO: store ancestor ids + data['ancestor-ami-ids'] = [] + if i.get('product_codes', None): + data['product-codes'] = i['product_codes'] + return data + + + def describe_availability_zones(self, context, **kwargs): + return {'availabilityZoneInfo': [{'zoneName': 'nova', + 'zoneState': 'available'}]} + + def describe_key_pairs(self, context, key_name=None, **kwargs): + key_pairs = [] + key_names = key_name and key_name or [] + if len(key_names) > 0: + for key_name in key_names: + key_pair = context.user.get_key_pair(key_name) + if key_pair != None: + key_pairs.append({ + 'keyName': key_pair.name, + 'keyFingerprint': key_pair.fingerprint, + }) + else: + for key_pair in context.user.get_key_pairs(): + key_pairs.append({ + 'keyName': key_pair.name, + 'keyFingerprint': key_pair.fingerprint, + }) + + return { 'keypairsSet': key_pairs } + + def create_key_pair(self, context, key_name, **kwargs): + try: + d = defer.Deferred() + p = context.handler.application.settings.get('pool') + def _complete(kwargs): + if 'exception' in kwargs: + d.errback(kwargs['exception']) + return + d.callback({'keyName': key_name, + 'keyFingerprint': kwargs['fingerprint'], + 'keyMaterial': kwargs['private_key']}) + p.apply_async(_gen_key, [context.user.id, key_name], + callback=_complete) + return d + + except users.UserError, e: + raise + + def delete_key_pair(self, context, key_name, **kwargs): + context.user.delete_key_pair(key_name) + # aws returns true even if the key doens't exist + return True + + def describe_security_groups(self, context, group_names, **kwargs): + groups = { 'securityGroupSet': [] } + + # Stubbed for now to unblock other things. + return groups + + def create_security_group(self, context, group_name, **kwargs): + return True + + def delete_security_group(self, context, group_name, **kwargs): + return True + + def get_console_output(self, context, instance_id, **kwargs): + # instance_id is passed in as a list of instances + instance = self.instdir.get(instance_id[0]) + if instance['state'] == 'pending': + raise exception.ApiError('Cannot get output for pending instance') + if not context.user.is_authorized(instance.get('owner_id', None)): + raise exception.ApiError('Not authorized to view output') + return rpc.call('%s.%s' % (FLAGS.compute_topic, instance['node_name']), + {"method": "get_console_output", + "args" : {"instance_id": instance_id[0]}}) + + def _get_user_id(self, context): + if context and context.user: + return context.user.id + else: + return None + + def describe_volumes(self, context, **kwargs): + volumes = [] + for volume in self.volumes: + if context.user.is_authorized(volume.get('user_id', None)): + v = self.format_volume(context, volume) + volumes.append(v) + return defer.succeed({'volumeSet': volumes}) + + def format_volume(self, context, volume): + v = {} + v['volumeId'] = volume['volume_id'] + v['status'] = volume['status'] + v['size'] = volume['size'] + v['availabilityZone'] = volume['availability_zone'] + v['createTime'] = volume['create_time'] + if context.user.is_admin(): + v['status'] = '%s (%s, %s, %s, %s)' % ( + volume.get('status', None), + volume.get('user_id', None), + volume.get('node_name', None), + volume.get('instance_id', ''), + volume.get('mountpoint', '')) + return v + + def create_volume(self, context, size, **kwargs): + # TODO(vish): refactor this to create the volume object here and tell storage to create it + res = rpc.call(FLAGS.storage_topic, {"method": "create_volume", + "args" : {"size": size, + "user_id": context.user.id}}) + def _format_result(result): + volume = self._get_volume(result['result']) + return {'volumeSet': [self.format_volume(context, volume)]} + res.addCallback(_format_result) + return res + + def _get_by_id(self, nodes, id): + if nodes == {}: + raise exception.NotFound("%s not found" % id) + for node_name, node in nodes.iteritems(): + if node.has_key(id): + return node_name, node[id] + raise exception.NotFound("%s not found" % id) + + def _get_volume(self, volume_id): + for volume in self.volumes: + if volume['volume_id'] == volume_id: + return volume + + def attach_volume(self, context, volume_id, instance_id, device, **kwargs): + volume = self._get_volume(volume_id) + storage_node = volume['node_name'] + # TODO: (joshua) Fix volumes to store creator id + if not context.user.is_authorized(volume.get('user_id', None)): + raise exception.ApiError("%s not authorized for %s" % + (context.user.id, volume_id)) + instance = self.instdir.get(instance_id) + compute_node = instance['node_name'] + if not context.user.is_authorized(instance.get('owner_id', None)): + raise exception.ApiError(message="%s not authorized for %s" % + (context.user.id, instance_id)) + aoe_device = volume['aoe_device'] + # Needs to get right node controller for attaching to + # TODO: Maybe have another exchange that goes to everyone? + rpc.cast('%s.%s' % (FLAGS.compute_topic, compute_node), + {"method": "attach_volume", + "args" : {"aoe_device": aoe_device, + "instance_id" : instance_id, + "mountpoint" : device}}) + rpc.cast('%s.%s' % (FLAGS.storage_topic, storage_node), + {"method": "attach_volume", + "args" : {"volume_id": volume_id, + "instance_id" : instance_id, + "mountpoint" : device}}) + return defer.succeed(True) + + def detach_volume(self, context, volume_id, **kwargs): + # TODO(joshua): Make sure the updated state has been received first + volume = self._get_volume(volume_id) + storage_node = volume['node_name'] + if not context.user.is_authorized(volume.get('user_id', None)): + raise exception.ApiError("%s not authorized for %s" % + (context.user.id, volume_id)) + if 'instance_id' in volume.keys(): + instance_id = volume['instance_id'] + try: + instance = self.instdir.get(instance_id) + compute_node = instance['node_name'] + mountpoint = volume['mountpoint'] + if not context.user.is_authorized( + instance.get('owner_id', None)): + raise exception.ApiError( + "%s not authorized for %s" % + (context.user.id, instance_id)) + rpc.cast('%s.%s' % (FLAGS.compute_topic, compute_node), + {"method": "detach_volume", + "args" : {"instance_id": instance_id, + "mountpoint": mountpoint}}) + except exception.NotFound: + pass + rpc.cast('%s.%s' % (FLAGS.storage_topic, storage_node), + {"method": "detach_volume", + "args" : {"volume_id": volume_id}}) + return defer.succeed(True) + + def _convert_to_set(self, lst, str): + if lst == None or lst == []: + return None + return [{str: x} for x in lst] + + def describe_instances(self, context, **kwargs): + return defer.succeed(self.format_instances(context.user)) + + def format_instances(self, user, reservation_id = None): + if self.instances == {}: + return {'reservationSet': []} + reservations = {} + for inst in self.instances: + instance = inst.values()[0] + res_id = instance.get('reservation_id', 'Unknown') + if (user.is_authorized(instance.get('owner_id', None)) + and (reservation_id == None or reservation_id == res_id)): + i = {} + i['instance_id'] = instance.get('instance_id', None) + i['image_id'] = instance.get('image_id', None) + i['instance_state'] = { + 'code': 42, + 'name': instance.get('state', 'pending') + } + i['public_dns_name'] = self.network.get_public_ip_for_instance( + i['instance_id']) + i['private_dns_name'] = instance.get('private_dns_name', None) + if not i['public_dns_name']: + i['public_dns_name'] = i['private_dns_name'] + i['dns_name'] = instance.get('dns_name', None) + i['key_name'] = instance.get('key_name', None) + if user.is_admin(): + i['key_name'] = '%s (%s, %s)' % (i['key_name'], + instance.get('owner_id', None), instance.get('node_name','')) + i['product_codes_set'] = self._convert_to_set( + instance.get('product_codes', None), 'product_code') + i['instance_type'] = instance.get('instance_type', None) + i['launch_time'] = instance.get('launch_time', None) + i['ami_launch_index'] = instance.get('ami_launch_index', + None) + if not reservations.has_key(res_id): + r = {} + r['reservation_id'] = res_id + r['owner_id'] = instance.get('owner_id', None) + r['group_set'] = self._convert_to_set( + instance.get('groups', None), 'group_id') + r['instances_set'] = [] + reservations[res_id] = r + reservations[res_id]['instances_set'].append(i) + + instance_response = {'reservationSet' : list(reservations.values()) } + return instance_response + + def describe_addresses(self, context, **kwargs): + return self.format_addresses(context.user) + + def format_addresses(self, user): + addresses = [] + # TODO(vish): move authorization checking into network.py + for address_record in self.network.describe_addresses( + type=network.PublicNetwork): + #logging.debug(address_record) + if user.is_authorized(address_record[u'user_id']): + address = { + 'public_ip': address_record[u'address'], + 'instance_id' : address_record.get(u'instance_id', 'free') + } + # FIXME: add another field for user id + if user.is_admin(): + address['instance_id'] = "%s (%s)" % ( + address['instance_id'], + address_record[u'user_id'], + ) + addresses.append(address) + # logging.debug(addresses) + return {'addressesSet': addresses} + + def allocate_address(self, context, **kwargs): + # TODO: Verify user is valid? + kwargs['owner_id'] = context.user.id + (address,network_name) = self.network.allocate_address( + context.user.id, type=network.PublicNetwork) + return defer.succeed({'addressSet': [{'publicIp' : address}]}) + + def release_address(self, context, **kwargs): + self.network.deallocate_address(kwargs.get('public_ip', None)) + return defer.succeed({'releaseResponse': ["Address released."]}) + + def associate_address(self, context, instance_id, **kwargs): + instance = self.instdir.get(instance_id) + rv = self.network.associate_address( + kwargs['public_ip'], + instance['private_dns_name'], + instance_id) + return defer.succeed({'associateResponse': ["Address associated."]}) + + def disassociate_address(self, context, **kwargs): + rv = self.network.disassociate_address(kwargs['public_ip']) + # TODO - Strip the IP from the instance + return rv + + def run_instances(self, context, **kwargs): + logging.debug("Going to run instances...") + reservation_id = utils.generate_uid('r') + launch_time = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + key_data = None + if kwargs.has_key('key_name'): + key_pair = context.user.get_key_pair(kwargs['key_name']) + if not key_pair: + raise exception.ApiError('Key Pair %s not found' % + kwargs['key_name']) + key_data = key_pair.public_key + + for num in range(int(kwargs['max_count'])): + inst = self.instdir.new() + # TODO(ja): add ari, aki + inst['image_id'] = kwargs['image_id'] + inst['user_data'] = kwargs.get('user_data', '') + inst['instance_type'] = kwargs.get('instance_type', '') + inst['reservation_id'] = reservation_id + inst['launch_time'] = launch_time + inst['key_data'] = key_data or '' + inst['key_name'] = kwargs.get('key_name', '') + inst['owner_id'] = context.user.id + inst['mac_address'] = utils.generate_mac() + inst['ami_launch_index'] = num + address, _netname = self.network.allocate_address( + inst['owner_id'], mac=inst['mac_address']) + network = self.network.get_users_network(str(context.user.id)) + inst['network_str'] = json.dumps(network.to_dict()) + inst['bridge_name'] = network.bridge_name + inst['private_dns_name'] = str(address) + # TODO: allocate expresses on the router node + inst.save() + rpc.cast(FLAGS.compute_topic, + {"method": "run_instance", + "args": {"instance_id" : inst.instance_id}}) + logging.debug("Casting to node for %s's instance with IP of %s" % + (context.user.name, inst['private_dns_name'])) + # TODO: Make the NetworkComputeNode figure out the network name from ip. + return defer.succeed(self.format_instances( + context.user, reservation_id)) + + def terminate_instances(self, context, instance_id, **kwargs): + logging.debug("Going to start terminating instances") + # TODO: return error if not authorized + for i in instance_id: + logging.debug("Going to try and terminate %s" % i) + instance = self.instdir.get(i) + #if instance['state'] == 'pending': + # raise exception.ApiError('Cannot terminate pending instance') + if context.user.is_authorized(instance.get('owner_id', None)): + try: + self.network.disassociate_address( + instance.get('public_dns_name', 'bork')) + except: + pass + if instance.get('private_dns_name', None): + logging.debug("Deallocating address %s" % instance.get('private_dns_name', None)) + try: + self.network.deallocate_address(instance.get('private_dns_name', None)) + except Exception, _err: + pass + if instance.get('node_name', 'unassigned') != 'unassigned': #It's also internal default + rpc.cast('%s.%s' % (FLAGS.compute_topic, instance['node_name']), + {"method": "terminate_instance", + "args" : {"instance_id": i}}) + else: + instance.destroy() + return defer.succeed(True) + + def reboot_instances(self, context, instance_id, **kwargs): + # TODO: return error if not authorized + for i in instance_id: + instance = self.instdir.get(i) + if instance['state'] == 'pending': + raise exception.ApiError('Cannot reboot pending instance') + if context.user.is_authorized(instance.get('owner_id', None)): + rpc.cast('%s.%s' % (FLAGS.node_topic, instance['node_name']), + {"method": "reboot_instance", + "args" : {"instance_id": i}}) + return defer.succeed(True) + + def delete_volume(self, context, volume_id, **kwargs): + # TODO: return error if not authorized + volume = self._get_volume(volume_id) + storage_node = volume['node_name'] + if context.user.is_authorized(volume.get('user_id', None)): + rpc.cast('%s.%s' % (FLAGS.storage_topic, storage_node), + {"method": "delete_volume", + "args" : {"volume_id": volume_id}}) + return defer.succeed(True) + + def describe_images(self, context, image_id=None, **kwargs): + imageSet = images.list(context.user) + if not image_id is None: + imageSet = [i for i in imageSet if i['imageId'] in image_id] + + return defer.succeed({'imagesSet': imageSet}) + + def deregister_image(self, context, image_id, **kwargs): + images.deregister(context.user, image_id) + + return defer.succeed({'imageId': image_id}) + + def register_image(self, context, image_location=None, **kwargs): + if image_location is None and kwargs.has_key('name'): + image_location = kwargs['name'] + + image_id = images.register(context.user, image_location) + logging.debug("Registered %s as %s" % (image_location, image_id)) + + return defer.succeed({'imageId': image_id}) + + def modify_image_attribute(self, context, image_id, + attribute, operation_type, **kwargs): + if attribute != 'launchPermission': + raise exception.ApiError('only launchPermission is supported') + if len(kwargs['user_group']) != 1 and kwargs['user_group'][0] != 'all': + raise exception.ApiError('only group "all" is supported') + if not operation_type in ['add', 'delete']: + raise exception.ApiError('operation_type must be add or delete') + result = images.modify(context.user, image_id, operation_type) + return defer.succeed(result) + + def update_state(self, topic, value): + """ accepts status reports from the queue and consolidates them """ + # TODO(jmc): if an instance has disappeared from + # the node, call instance_death + if topic == "instances": + return defer.succeed(True) + aggregate_state = getattr(self, topic) + node_name = value.keys()[0] + items = value[node_name] + + logging.debug("Updating %s state for %s" % (topic, node_name)) + + for item_id in items.keys(): + if (aggregate_state.has_key('pending') and + aggregate_state['pending'].has_key(item_id)): + del aggregate_state['pending'][item_id] + aggregate_state[node_name] = items + + return defer.succeed(True) diff --git a/nova/endpoint/images.py b/nova/endpoint/images.py new file mode 100644 index 000000000..f494ce892 --- /dev/null +++ b/nova/endpoint/images.py @@ -0,0 +1,92 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Proxy AMI-related calls from the cloud controller, to the running +objectstore daemon. +""" + +import json +import random +import urllib + +from nova import vendor +import boto +import boto.s3 + +from nova import flags +from nova import utils + +FLAGS = flags.FLAGS + + +def modify(user, image_id, operation): + conn(user).make_request( + method='POST', + bucket='_images', + query_args=qs({'image_id': image_id, 'operation': operation})) + + return True + + +def register(user, image_location): + """ rpc call to register a new image based from a manifest """ + + image_id = utils.generate_uid('ami') + conn(user).make_request( + method='PUT', + bucket='_images', + query_args=qs({'image_location': image_location, + 'image_id': image_id})) + + return image_id + + +def list(user, filter_list=[]): + """ return a list of all images that a user can see + + optionally filtered by a list of image_id """ + + # FIXME: send along the list of only_images to check for + response = conn(user).make_request( + method='GET', + bucket='_images') + + return json.loads(response.read()) + + +def deregister(user, image_id): + """ unregister an image """ + conn(user).make_request( + method='DELETE', + bucket='_images', + query_args=qs({'image_id': image_id})) + + +def conn(user): + return boto.s3.connection.S3Connection ( + aws_access_key_id=user.access, + aws_secret_access_key=user.secret, + is_secure=False, + calling_format=boto.s3.connection.OrdinaryCallingFormat(), + port=FLAGS.s3_port, + host=FLAGS.s3_host) + + +def qs(params): + pairs = [] + for key in params.keys(): + pairs.append(key + '=' + urllib.quote(params[key])) + return '&'.join(pairs) diff --git a/nova/exception.py b/nova/exception.py new file mode 100644 index 000000000..dc7b16cdb --- /dev/null +++ b/nova/exception.py @@ -0,0 +1,53 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Nova base exception handling, including decorator for re-raising +Nova-type exceptions. SHOULD include dedicated exception logging. +""" + +import logging +import traceback +import sys + +class Error(Exception): + pass + +class ApiError(Error): + def __init__(self, message='Unknown', code='Unknown'): + self.message = message + self.code = code + +class NotFound(Error): + pass + +class NotAuthorized(Error): + pass + +def wrap_exception(f): + def _wrap(*args, **kw): + try: + return f(*args, **kw) + except Exception, e: + if not isinstance(e, Error): + # exc_type, exc_value, exc_traceback = sys.exc_info() + logging.exception('Uncaught exception') + # logging.debug(traceback.extract_stack(exc_traceback)) + raise Error(str(e)) + raise + _wrap.func_name = f.func_name + return _wrap + + diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py new file mode 100644 index 000000000..ec2e50791 --- /dev/null +++ b/nova/fakerabbit.py @@ -0,0 +1,131 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Based a bit on the carrot.backeds.queue backend... but a lot better """ + +import logging +import Queue as queue + +from carrot.backends import base + + +class Message(base.BaseMessage): + pass + + +class Exchange(object): + def __init__(self, name, exchange_type): + self.name = name + self.exchange_type = exchange_type + self._queue = queue.Queue() + self._routes = {} + + def publish(self, message, routing_key=None): + logging.debug('(%s) publish (key: %s) %s', + self.name, routing_key, message) + if routing_key in self._routes: + for f in self._routes[routing_key]: + logging.debug('Publishing to route %s', f) + f(message, routing_key=routing_key) + + def bind(self, callback, routing_key): + self._routes.setdefault(routing_key, []) + self._routes[routing_key].append(callback) + + +class Queue(object): + def __init__(self, name): + self.name = name + self._queue = queue.Queue() + + def __repr__(self): + return '<Queue: %s>' % self.name + + def push(self, message, routing_key=None): + self._queue.put(message) + + def size(self): + return self._queue.qsize() + + def pop(self): + return self._queue.get() + + +class Backend(object): + """ Singleton backend for testing """ + class __impl(base.BaseBackend): + def __init__(self, *args, **kwargs): + #super(__impl, self).__init__(*args, **kwargs) + self._exchanges = {} + self._queues = {} + + def _reset_all(self): + self._exchanges = {} + self._queues = {} + + def queue_declare(self, queue, **kwargs): + if queue not in self._queues: + logging.debug('Declaring queue %s', queue) + self._queues[queue] = Queue(queue) + + def exchange_declare(self, exchange, type, *args, **kwargs): + if exchange not in self._exchanges: + logging.debug('Declaring exchange %s', exchange) + self._exchanges[exchange] = Exchange(exchange, type) + + def queue_bind(self, queue, exchange, routing_key, **kwargs): + logging.debug('Binding %s to %s with key %s', + queue, exchange, routing_key) + self._exchanges[exchange].bind(self._queues[queue].push, + routing_key) + + def get(self, queue, no_ack=False): + if not self._queues[queue].size(): + return None + (message_data, content_type, content_encoding) = \ + self._queues[queue].pop() + message = Message(backend=self, body=message_data, + content_type=content_type, + content_encoding=content_encoding) + logging.debug('Getting from %s: %s', queue, message) + return message + + def prepare_message(self, message_data, delivery_mode, + content_type, content_encoding, **kwargs): + """Prepare message for sending.""" + return (message_data, content_type, content_encoding) + + def publish(self, message, exchange, routing_key, **kwargs): + if exchange in self._exchanges: + self._exchanges[exchange].publish( + message, routing_key=routing_key) + + + __instance = None + + def __init__(self, *args, **kwargs): + if Backend.__instance is None: + Backend.__instance = Backend.__impl(*args, **kwargs) + self.__dict__['_Backend__instance'] = Backend.__instance + + def __getattr__(self, attr): + return getattr(self.__instance, attr) + + def __setattr__(self, attr, value): + return setattr(self.__instance, attr, value) + + +def reset_all(): + Backend()._reset_all() diff --git a/nova/fakevirt.py b/nova/fakevirt.py new file mode 100644 index 000000000..2b918d388 --- /dev/null +++ b/nova/fakevirt.py @@ -0,0 +1,109 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +A fake (in-memory) hypervisor+api. Allows nova testing w/o KVM and libvirt. +""" + +import StringIO +from xml.etree import ElementTree + + +class FakeVirtConnection(object): + # FIXME: networkCreateXML, listNetworks don't do anything since + # they aren't exercised in tests yet + + def __init__(self): + self.next_index = 0 + self.instances = {} + + @classmethod + def instance(cls): + if not hasattr(cls, '_instance'): + cls._instance = cls() + return cls._instance + + def lookupByID(self, i): + return self.instances[str(i)] + + def listDomainsID(self): + return self.instances.keys() + + def listNetworks(self): + return [] + + def lookupByName(self, instance_id): + for x in self.instances.values(): + if x.name() == instance_id: + return x + raise Exception('no instance found for instance_id: %s' % instance_id) + + def networkCreateXML(self, xml): + pass + + def createXML(self, xml, flags): + # parse the xml :( + xml_stringio = StringIO.StringIO(xml) + + my_xml = ElementTree.parse(xml_stringio) + name = my_xml.find('name').text + + fake_instance = FakeVirtInstance(conn=self, + index=str(self.next_index), + name=name, + xml=my_xml) + self.instances[str(self.next_index)] = fake_instance + self.next_index += 1 + + def _removeInstance(self, i): + self.instances.pop(str(i)) + + +class FakeVirtInstance(object): + NOSTATE = 0x00 + RUNNING = 0x01 + BLOCKED = 0x02 + PAUSED = 0x03 + SHUTDOWN = 0x04 + SHUTOFF = 0x05 + CRASHED = 0x06 + + def __init__(self, conn, index, name, xml): + self._conn = conn + self._destroyed = False + self._name = name + self._index = index + self._state = self.RUNNING + + def name(self): + return self._name + + def destroy(self): + if self._state == self.SHUTOFF: + raise Exception('instance already destroyed: %s' % self.name()) + self._state = self.SHUTDOWN + self._conn._removeInstance(self._index) + + def info(self): + return [self._state, 0, 2, 0, 0] + + def XMLDesc(self, flags): + return open('fakevirtinstance.xml', 'r').read() + + def blockStats(self, disk): + return [0L, 0L, 0L, 0L, null] + + def interfaceStats(self, iface): + return [0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L] diff --git a/nova/flags.py b/nova/flags.py new file mode 100644 index 000000000..7818e1b14 --- /dev/null +++ b/nova/flags.py @@ -0,0 +1,78 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Package-level global flags are defined here, the rest are defined +where they're used. +""" + +import socket + +from nova import vendor +from gflags import * + +# This keeps pylint from barfing on the imports +FLAGS = FLAGS +DEFINE_string = DEFINE_string +DEFINE_integer = DEFINE_integer +DEFINE_bool = DEFINE_bool + +# __GLOBAL FLAGS ONLY__ +# Define any app-specific flags in their own files, docs at: +# http://code.google.com/p/python-gflags/source/browse/trunk/gflags.py#39 + +DEFINE_integer('s3_port', 3333, 's3 port') +DEFINE_integer('s3_internal_port', 3334, 's3 port') +DEFINE_string('s3_host', '127.0.0.1', 's3 host') +#DEFINE_string('cloud_topic', 'cloud', 'the topic clouds listen on') +DEFINE_string('compute_topic', 'compute', 'the topic compute nodes listen on') +DEFINE_string('storage_topic', 'storage', 'the topic storage nodes listen on') +DEFINE_bool('fake_libvirt', False, + 'whether to use a fake libvirt or not') +DEFINE_bool('verbose', False, 'show debug output') +DEFINE_boolean('fake_rabbit', False, 'use a fake rabbit') +DEFINE_bool('fake_network', False, 'should we use fake network devices and addresses') +DEFINE_bool('fake_users', False, 'use fake users') +DEFINE_string('rabbit_host', 'localhost', 'rabbit host') +DEFINE_integer('rabbit_port', 5672, 'rabbit port') +DEFINE_string('rabbit_userid', 'guest', 'rabbit userid') +DEFINE_string('rabbit_password', 'guest', 'rabbit password') +DEFINE_string('rabbit_virtual_host', '/', 'rabbit virtual host') +DEFINE_string('control_exchange', 'nova', 'the main exchange to connect to') +DEFINE_string('ec2_url', + 'http://127.0.0.1:8773/services/Cloud', + 'Url to ec2 api server') + +DEFINE_string('default_image', + 'ami-11111', + 'default image to use, testing only') +DEFINE_string('default_kernel', + 'aki-11111', + 'default kernel to use, testing only') +DEFINE_string('default_ramdisk', + 'ari-11111', + 'default ramdisk to use, testing only') +DEFINE_string('default_instance_type', + 'm1.small', + 'default instance type to use, testing only') + +# UNUSED +DEFINE_string('node_availability_zone', + 'nova', + 'availability zone of this node') +DEFINE_string('node_name', + socket.gethostname(), + 'name of this node') + diff --git a/nova/objectstore/__init__.py b/nova/objectstore/__init__.py new file mode 100644 index 000000000..c6c09e53e --- /dev/null +++ b/nova/objectstore/__init__.py @@ -0,0 +1,28 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +:mod:`nova.objectstore` -- S3-type object store +===================================================== + +.. automodule:: nova.objectstore + :platform: Unix + :synopsis: Currently a trivial file-based system, getting extended w/ mongo. +.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com> +.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com> +.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com> +.. moduleauthor:: Joshua McKenty <joshua@cognition.ca> +.. moduleauthor:: Manish Singh <yosh@gimp.org> +.. moduleauthor:: Andy Smith <andy@anarkystic.com> +"""
\ No newline at end of file diff --git a/nova/objectstore/bucket.py b/nova/objectstore/bucket.py new file mode 100644 index 000000000..0777c2f11 --- /dev/null +++ b/nova/objectstore/bucket.py @@ -0,0 +1,174 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Simple object store using Blobs and JSON files on disk. +""" + +import datetime +import glob +import json +import os +import bisect + +from nova import exception +from nova import flags +from nova import utils +from nova.objectstore import stored + + +FLAGS = flags.FLAGS +flags.DEFINE_string('buckets_path', utils.abspath('../buckets'), + 'path to s3 buckets') + + +class Bucket(object): + def __init__(self, name): + self.name = name + self.path = os.path.abspath(os.path.join(FLAGS.buckets_path, name)) + if not self.path.startswith(os.path.abspath(FLAGS.buckets_path)) or \ + not os.path.isdir(self.path): + raise exception.NotFound() + + self.ctime = os.path.getctime(self.path) + + def __repr__(self): + return "<Bucket: %s>" % self.name + + @staticmethod + def all(): + """ list of all buckets """ + buckets = [] + for fn in glob.glob("%s/*.json" % FLAGS.buckets_path): + try: + json.load(open(fn)) + name = os.path.split(fn)[-1][:-5] + buckets.append(Bucket(name)) + except: + pass + + return buckets + + @staticmethod + def create(bucket_name, user): + """Create a new bucket owned by a user. + + @bucket_name: a string representing the name of the bucket to create + @user: a nova.auth.user who should own the bucket. + + Raises: + NotAuthorized: if the bucket is already exists or has invalid name + """ + path = os.path.abspath(os.path.join( + FLAGS.buckets_path, bucket_name)) + if not path.startswith(os.path.abspath(FLAGS.buckets_path)) or \ + os.path.exists(path): + raise exception.NotAuthorized() + + os.makedirs(path) + + with open(path+'.json', 'w') as f: + json.dump({'ownerId': user.id}, f) + + @property + def metadata(self): + """ dictionary of metadata around bucket, + keys are 'Name' and 'CreationDate' + """ + + return { + "Name": self.name, + "CreationDate": datetime.datetime.utcfromtimestamp(self.ctime), + } + + @property + def owner_id(self): + try: + with open(self.path+'.json') as f: + return json.load(f)['ownerId'] + except: + return None + + def is_authorized(self, user): + try: + return user.is_admin() or self.owner_id == user.id + except Exception, e: + pass + + def list_keys(self, prefix='', marker=None, max_keys=1000, terse=False): + object_names = [] + for root, dirs, files in os.walk(self.path): + for file_name in files: + object_names.append(os.path.join(root, file_name)[len(self.path)+1:]) + object_names.sort() + contents = [] + + start_pos = 0 + if marker: + start_pos = bisect.bisect_right(object_names, marker, start_pos) + if prefix: + start_pos = bisect.bisect_left(object_names, prefix, start_pos) + + truncated = False + for object_name in object_names[start_pos:]: + if not object_name.startswith(prefix): + break + if len(contents) >= max_keys: + truncated = True + break + object_path = self._object_path(object_name) + c = {"Key": object_name} + if not terse: + info = os.stat(object_path) + c.update({ + "LastModified": datetime.datetime.utcfromtimestamp( + info.st_mtime), + "Size": info.st_size, + }) + contents.append(c) + marker = object_name + + return { + "Name": self.name, + "Prefix": prefix, + "Marker": marker, + "MaxKeys": max_keys, + "IsTruncated": truncated, + "Contents": contents, + } + + def _object_path(self, object_name): + fn = os.path.join(self.path, object_name) + + if not fn.startswith(self.path): + raise exception.NotAuthorized() + + return fn + + def delete(self): + if len(os.listdir(self.path)) > 0: + raise exception.NotAuthorized() + os.rmdir(self.path) + os.remove(self.path+'.json') + + def __getitem__(self, key): + return stored.Object(self, key) + + def __setitem__(self, key, value): + with open(self._object_path(key), 'wb') as f: + f.write(value) + + def __delitem__(self, key): + stored.Object(self, key).delete() diff --git a/nova/objectstore/handler.py b/nova/objectstore/handler.py new file mode 100644 index 000000000..c3e036a40 --- /dev/null +++ b/nova/objectstore/handler.py @@ -0,0 +1,285 @@ +#!/usr/bin/env python +# +# Copyright 2009 Facebook +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Implementation of an S3-like storage server based on local files. + +Useful to test features that will eventually run on S3, or if you want to +run something locally that was once running on S3. + +We don't support all the features of S3, but it does work with the +standard S3 client for the most basic semantics. To use the standard +S3 client with this module:: + + c = S3.AWSAuthConnection("", "", server="localhost", port=8888, + is_secure=False) + c.create_bucket("mybucket") + c.put("mybucket", "mykey", "a value") + print c.get("mybucket", "mykey").body + +""" +import datetime +import os +import urllib +import json +import logging +import multiprocessing + + +from nova import vendor +from tornado import escape, web + +from nova import exception +from nova import flags +from nova.objectstore import bucket +from nova.objectstore import image + + +FLAGS = flags.FLAGS + + +def catch_nova_exceptions(target): + # FIXME: find a way to wrap all handlers in the web.Application.__init__ ? + def wrapper(*args, **kwargs): + try: + return target(*args, **kwargs) + except exception.NotFound: + raise web.HTTPError(404) + except exception.NotAuthorized: + raise web.HTTPError(403) + + return wrapper + + +class Application(web.Application): + """Implementation of an S3-like storage server based on local files.""" + def __init__(self, user_manager): + web.Application.__init__(self, [ + (r"/", RootHandler), + (r"/_images/", ImageHandler), + (r"/([^/]+)/(.+)", ObjectHandler), + (r"/([^/]+)/", BucketHandler), + ]) + self.buckets_path = os.path.abspath(FLAGS.buckets_path) + self.images_path = os.path.abspath(FLAGS.images_path) + + if not os.path.exists(self.buckets_path): + raise Exception("buckets_path does not exist") + if not os.path.exists(self.images_path): + raise Exception("images_path does not exist") + self.user_manager = user_manager + + +class BaseRequestHandler(web.RequestHandler): + SUPPORTED_METHODS = ("PUT", "GET", "DELETE", "HEAD") + + @property + def user(self): + if not hasattr(self, '_user'): + try: + access = self.request.headers['Authorization'].split(' ')[1].split(':')[0] + user = self.application.user_manager.get_user_from_access_key(access) + user.secret # FIXME: check signature here! + self._user = user + except: + raise web.HTTPError(403) + return self._user + + def render_xml(self, value): + assert isinstance(value, dict) and len(value) == 1 + self.set_header("Content-Type", "application/xml; charset=UTF-8") + name = value.keys()[0] + parts = [] + parts.append('<' + escape.utf8(name) + + ' xmlns="http://doc.s3.amazonaws.com/2006-03-01">') + self._render_parts(value.values()[0], parts) + parts.append('</' + escape.utf8(name) + '>') + self.finish('<?xml version="1.0" encoding="UTF-8"?>\n' + + ''.join(parts)) + + def _render_parts(self, value, parts=[]): + if isinstance(value, basestring): + parts.append(escape.xhtml_escape(value)) + elif isinstance(value, int) or isinstance(value, long): + parts.append(str(value)) + elif isinstance(value, datetime.datetime): + parts.append(value.strftime("%Y-%m-%dT%H:%M:%S.000Z")) + elif isinstance(value, dict): + for name, subvalue in value.iteritems(): + if not isinstance(subvalue, list): + subvalue = [subvalue] + for subsubvalue in subvalue: + parts.append('<' + escape.utf8(name) + '>') + self._render_parts(subsubvalue, parts) + parts.append('</' + escape.utf8(name) + '>') + else: + raise Exception("Unknown S3 value type %r", value) + + def head(self, *args, **kwargs): + return self.get(*args, **kwargs) + + +class RootHandler(BaseRequestHandler): + def get(self): + buckets = [b for b in bucket.Bucket.all() if b.is_authorized(self.user)] + + self.render_xml({"ListAllMyBucketsResult": { + "Buckets": {"Bucket": [b.metadata for b in buckets]}, + }}) + + +class BucketHandler(BaseRequestHandler): + @catch_nova_exceptions + def get(self, bucket_name): + logging.debug("List keys for bucket %s" % (bucket_name)) + + bucket_object = bucket.Bucket(bucket_name) + + if not bucket_object.is_authorized(self.user): + raise web.HTTPError(403) + + prefix = self.get_argument("prefix", u"") + marker = self.get_argument("marker", u"") + max_keys = int(self.get_argument("max-keys", 1000)) + terse = int(self.get_argument("terse", 0)) + + results = bucket_object.list_keys(prefix=prefix, marker=marker, max_keys=max_keys, terse=terse) + self.render_xml({"ListBucketResult": results}) + + @catch_nova_exceptions + def put(self, bucket_name): + logging.debug("Creating bucket %s" % (bucket_name)) + bucket.Bucket.create(bucket_name, self.user) + self.finish() + + @catch_nova_exceptions + def delete(self, bucket_name): + logging.debug("Deleting bucket %s" % (bucket_name)) + bucket_object = bucket.Bucket(bucket_name) + + if not bucket_object.is_authorized(self.user): + raise web.HTTPError(403) + + bucket_object.delete() + self.set_status(204) + self.finish() + + +class ObjectHandler(BaseRequestHandler): + @catch_nova_exceptions + def get(self, bucket_name, object_name): + logging.debug("Getting object: %s / %s" % (bucket_name, object_name)) + + bucket_object = bucket.Bucket(bucket_name) + + if not bucket_object.is_authorized(self.user): + raise web.HTTPError(403) + + obj = bucket_object[urllib.unquote(object_name)] + self.set_header("Content-Type", "application/unknown") + self.set_header("Last-Modified", datetime.datetime.utcfromtimestamp(obj.mtime)) + self.set_header("Etag", '"' + obj.md5 + '"') + self.finish(obj.read()) + + @catch_nova_exceptions + def put(self, bucket_name, object_name): + logging.debug("Putting object: %s / %s" % (bucket_name, object_name)) + bucket_object = bucket.Bucket(bucket_name) + + if not bucket_object.is_authorized(self.user): + raise web.HTTPError(403) + + key = urllib.unquote(object_name) + bucket_object[key] = self.request.body + self.set_header("Etag", '"' + bucket_object[key].md5 + '"') + self.finish() + + @catch_nova_exceptions + def delete(self, bucket_name, object_name): + logging.debug("Deleting object: %s / %s" % (bucket_name, object_name)) + bucket_object = bucket.Bucket(bucket_name) + + if not bucket_object.is_authorized(self.user): + raise web.HTTPError(403) + + del bucket_object[urllib.unquote(object_name)] + self.set_status(204) + self.finish() + + +class ImageHandler(BaseRequestHandler): + SUPPORTED_METHODS = ("POST", "PUT", "GET", "DELETE") + + @catch_nova_exceptions + def get(self): + """ returns a json listing of all images + that a user has permissions to see """ + + images = [i for i in image.Image.all() if i.is_authorized(self.user)] + + self.finish(json.dumps([i.metadata for i in images])) + + @catch_nova_exceptions + def put(self): + """ create a new registered image """ + + image_id = self.get_argument('image_id', u'') + image_location = self.get_argument('image_location', u'') + + image_path = os.path.join(FLAGS.images_path, image_id) + if not image_path.startswith(FLAGS.images_path) or \ + os.path.exists(image_path): + raise web.HTTPError(403) + + bucket_object = bucket.Bucket(image_location.split("/")[0]) + manifest = image_location[len(image_location.split('/')[0])+1:] + + if not bucket_object.is_authorized(self.user): + raise web.HTTPError(403) + + p = multiprocessing.Process(target=image.Image.create,args= + (image_id, image_location, self.user)) + p.start() + self.finish() + + @catch_nova_exceptions + def post(self): + """ update image attributes: public/private """ + + image_id = self.get_argument('image_id', u'') + operation = self.get_argument('operation', u'') + + image_object = image.Image(image_id) + + if image_object.owner_id != self.user.id: + raise web.HTTPError(403) + + image_object.set_public(operation=='add') + + self.finish() + + @catch_nova_exceptions + def delete(self): + """ delete a registered image """ + image_id = self.get_argument("image_id", u"") + image_object = image.Image(image_id) + + if image_object.owner_id != self.user.id: + raise web.HTTPError(403) + + image_object.delete() + + self.set_status(204) diff --git a/nova/objectstore/image.py b/nova/objectstore/image.py new file mode 100644 index 000000000..1878487f7 --- /dev/null +++ b/nova/objectstore/image.py @@ -0,0 +1,177 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Take uploaded bucket contents and register them as disk images (AMIs). +Requires decryption using keys in the manifest. +""" + +# TODO(jesse): Got these from Euca2ools, will need to revisit them + +import binascii +import glob +import json +import os +import shutil +import tarfile +import tempfile +from xml.etree import ElementTree + +from nova import exception +from nova import flags +from nova import utils +from nova.objectstore import bucket + + +FLAGS = flags.FLAGS +flags.DEFINE_string('images_path', utils.abspath('../images'), + 'path to decrypted images') + +class Image(object): + def __init__(self, image_id): + self.image_id = image_id + self.path = os.path.abspath(os.path.join(FLAGS.images_path, image_id)) + if not self.path.startswith(os.path.abspath(FLAGS.images_path)) or \ + not os.path.isdir(self.path): + raise exception.NotFound + + def delete(self): + for fn in ['info.json', 'image']: + try: + os.unlink(os.path.join(self.path, fn)) + except: + pass + try: + os.rmdir(self.path) + except: + pass + + def is_authorized(self, user): + try: + return self.metadata['isPublic'] or self.metadata['imageOwnerId'] == user.id + except: + return False + + def set_public(self, state): + md = self.metadata + md['isPublic'] = state + with open(os.path.join(self.path, 'info.json'), 'w') as f: + json.dump(md, f) + + @staticmethod + def all(): + images = [] + for fn in glob.glob("%s/*/info.json" % FLAGS.images_path): + try: + image_id = fn.split('/')[-2] + images.append(Image(image_id)) + except: + pass + return images + + @property + def owner_id(self): + return self.metadata['imageOwnerId'] + + @property + def metadata(self): + with open(os.path.join(self.path, 'info.json')) as f: + return json.load(f) + + @staticmethod + def create(image_id, image_location, user): + image_path = os.path.join(FLAGS.images_path, image_id) + os.makedirs(image_path) + + bucket_name = image_location.split("/")[0] + manifest_path = image_location[len(bucket_name)+1:] + bucket_object = bucket.Bucket(bucket_name) + + manifest = ElementTree.fromstring(bucket_object[manifest_path].read()) + image_type = 'machine' + + try: + kernel_id = manifest.find("machine_configuration/kernel_id").text + if kernel_id == 'true': + image_type = 'kernel' + except: + pass + + try: + ramdisk_id = manifest.find("machine_configuration/ramdisk_id").text + if ramdisk_id == 'true': + image_type = 'ramdisk' + except: + pass + + info = { + 'imageId': image_id, + 'imageLocation': image_location, + 'imageOwnerId': user.id, + 'isPublic': False, # FIXME: grab public from manifest + 'architecture': 'x86_64', # FIXME: grab architecture from manifest + 'type' : image_type + } + + def write_state(state): + info['imageState'] = state + with open(os.path.join(image_path, 'info.json'), "w") as f: + json.dump(info, f) + + write_state('pending') + + encrypted_filename = os.path.join(image_path, 'image.encrypted') + with open(encrypted_filename, 'w') as f: + for filename in manifest.find("image").getiterator("filename"): + shutil.copyfileobj(bucket_object[filename.text].file, f) + + write_state('decrypting') + + # FIXME: grab kernelId and ramdiskId from bundle manifest + encrypted_key = binascii.a2b_hex(manifest.find("image/ec2_encrypted_key").text) + encrypted_iv = binascii.a2b_hex(manifest.find("image/ec2_encrypted_iv").text) + cloud_private_key = os.path.join(FLAGS.ca_path, "private/cakey.pem") + + decrypted_filename = os.path.join(image_path, 'image.tar.gz') + Image.decrypt_image(encrypted_filename, encrypted_key, encrypted_iv, cloud_private_key, decrypted_filename) + + write_state('untarring') + + image_file = Image.untarzip_image(image_path, decrypted_filename) + shutil.move(os.path.join(image_path, image_file), os.path.join(image_path, 'image')) + + write_state('available') + os.unlink(decrypted_filename) + os.unlink(encrypted_filename) + + @staticmethod + def decrypt_image(encrypted_filename, encrypted_key, encrypted_iv, cloud_private_key, decrypted_filename): + key, err = utils.execute('openssl rsautl -decrypt -inkey %s' % cloud_private_key, encrypted_key) + if err: + raise exception.Error("Failed to decrypt private key: %s" % err) + iv, err = utils.execute('openssl rsautl -decrypt -inkey %s' % cloud_private_key, encrypted_iv) + if err: + raise exception.Error("Failed to decrypt initialization vector: %s" % err) + out, err = utils.execute('openssl enc -d -aes-128-cbc -in %s -K %s -iv %s -out %s' % (encrypted_filename, key, iv, decrypted_filename)) + if err: + raise exception.Error("Failed to decrypt image file %s : %s" % (encrypted_filename, err)) + + @staticmethod + def untarzip_image(path, filename): + tar_file = tarfile.open(filename, "r|gz") + tar_file.extractall(path) + image_file = tar_file.getnames()[0] + tar_file.close() + return image_file diff --git a/nova/objectstore/stored.py b/nova/objectstore/stored.py new file mode 100644 index 000000000..05a7a1102 --- /dev/null +++ b/nova/objectstore/stored.py @@ -0,0 +1,58 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Properties of an object stored within a bucket. +""" + +from nova.exception import NotFound, NotAuthorized + +import os +import nova.crypto + +class Object(object): + def __init__(self, bucket, key): + """ wrapper class of an existing key """ + self.bucket = bucket + self.key = key + self.path = bucket._object_path(key) + if not os.path.isfile(self.path): + raise NotFound + + def __repr__(self): + return "<Object %s/%s>" % (self.bucket, self.key) + + @property + def md5(self): + """ computes the MD5 of the contents of file """ + with open(self.path, "r") as f: + return nova.crypto.compute_md5(f) + + @property + def mtime(self): + """ mtime of file """ + return os.path.getmtime(self.path) + + def read(self): + """ read all contents of key into memory and return """ + return self.file.read() + + @property + def file(self): + """ return a file object for the key """ + return open(self.path, 'rb') + + def delete(self): + """ deletes the file """ + os.unlink(self.path) diff --git a/nova/process.py b/nova/process.py new file mode 100644 index 000000000..754728fdf --- /dev/null +++ b/nova/process.py @@ -0,0 +1,131 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Process pool, still buggy right now. +""" + +import logging +import multiprocessing + +from nova import vendor +from twisted.internet import defer +from twisted.internet import reactor +from twisted.internet import protocol +from twisted.internet import threads + +# NOTE(termie): this is copied from twisted.internet.utils but since +# they don't export it I've copied. +class _BackRelay(protocol.ProcessProtocol): + """ + Trivial protocol for communicating with a process and turning its output + into the result of a L{Deferred}. + + @ivar deferred: A L{Deferred} which will be called back with all of stdout + and, if C{errortoo} is true, all of stderr as well (mixed together in + one string). If C{errortoo} is false and any bytes are received over + stderr, this will fire with an L{_UnexpectedErrorOutput} instance and + the attribute will be set to C{None}. + + @ivar onProcessEnded: If C{errortoo} is false and bytes are received over + stderr, this attribute will refer to a L{Deferred} which will be called + back when the process ends. This C{Deferred} is also associated with + the L{_UnexpectedErrorOutput} which C{deferred} fires with earlier in + this case so that users can determine when the process has actually + ended, in addition to knowing when bytes have been received via stderr. + """ + + def __init__(self, deferred, errortoo=0): + self.deferred = deferred + self.s = StringIO.StringIO() + if errortoo: + self.errReceived = self.errReceivedIsGood + else: + self.errReceived = self.errReceivedIsBad + + def errReceivedIsBad(self, text): + if self.deferred is not None: + self.onProcessEnded = defer.Deferred() + err = _UnexpectedErrorOutput(text, self.onProcessEnded) + self.deferred.errback(failure.Failure(err)) + self.deferred = None + self.transport.loseConnection() + + def errReceivedIsGood(self, text): + self.s.write(text) + + def outReceived(self, text): + self.s.write(text) + + def processEnded(self, reason): + if self.deferred is not None: + self.deferred.callback(self.s.getvalue()) + elif self.onProcessEnded is not None: + self.onProcessEnded.errback(reason) + + +class BackRelayWithInput(_BackRelay): + def __init__(self, deferred, errortoo=0, input=None): + super(BackRelayWithInput, self).__init__(deferred, errortoo) + self.input = input + + def connectionMade(self): + if self.input: + self.transport.write(self.input) + self.transport.closeStdin() + + +def getProcessOutput(executable, args=None, env=None, path=None, reactor=None, + errortoo=0, input=None): + if reactor is None: + from twisted.internet import reactor + args = args and args or () + env = env and env and {} + d = defer.Deferred() + p = BackRelayWithInput(d, errortoo=errortoo, input=input) + reactor.spawnProcess(p, executable, (executable,)+tuple(args), env, path) + return d + + +class Pool(object): + """ A simple process pool implementation around mutliprocessing. + + Allows up to `size` processes at a time and queues the rest. + + Using workarounds for multiprocessing behavior described in: + http://pypi.python.org/pypi/twisted.internet.processes/1.0b1 + """ + + def __init__(self, size=None): + self._size = size + self._pool = multiprocessing.Pool(size) + self._registerShutdown() + + def _registerShutdown(self): + reactor.addSystemEventTrigger( + 'during', 'shutdown', self.shutdown, reactor) + + def shutdown(self, reactor=None): + if not self._pool: + return + self._pool.close() + # wait for workers to finish + self._pool.terminate() + self._pool = None + + def apply(self, f, *args, **kw): + """ Add a task to the pool and return a deferred. """ + result = self._pool.apply_async(f, args, kw) + return threads.deferToThread(result.get) diff --git a/nova/rpc.py b/nova/rpc.py new file mode 100644 index 000000000..62c6afff3 --- /dev/null +++ b/nova/rpc.py @@ -0,0 +1,222 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +AMQP-based RPC. Queues have consumers and publishers. +No fan-out support yet. +""" + +import logging +import sys +import uuid + +from nova import vendor +import anyjson +from carrot import connection +from carrot import messaging +from twisted.internet import defer +from twisted.internet import reactor +from twisted.internet import task + +from nova import fakerabbit +from nova import flags + + +FLAGS = flags.FLAGS + + +_log = logging.getLogger('amqplib') +_log.setLevel(logging.WARN) + + +class Connection(connection.BrokerConnection): + @classmethod + def instance(cls): + if not hasattr(cls, '_instance'): + params = dict(hostname=FLAGS.rabbit_host, + port=FLAGS.rabbit_port, + userid=FLAGS.rabbit_userid, + password=FLAGS.rabbit_password, + virtual_host=FLAGS.rabbit_virtual_host) + + if FLAGS.fake_rabbit: + params['backend_cls'] = fakerabbit.Backend + + cls._instance = cls(**params) + return cls._instance + + +class Consumer(messaging.Consumer): + # TODO(termie): it would be nice to give these some way of automatically + # cleaning up after themselves + def attach_to_tornado(self, io_inst=None): + from tornado import ioloop + if io_inst is None: + io_inst = ioloop.IOLoop.instance() + + injected = ioloop.PeriodicCallback( + lambda: self.fetch(enable_callbacks=True), 1, io_loop=io_inst) + injected.start() + return injected + + attachToTornado = attach_to_tornado + + def attach_to_twisted(self): + loop = task.LoopingCall(self.fetch, enable_callbacks=True) + loop.start(interval=0.001) + +class Publisher(messaging.Publisher): + pass + + +class TopicConsumer(Consumer): + exchange_type = "topic" + def __init__(self, connection=None, topic="broadcast"): + self.queue = topic + self.routing_key = topic + self.exchange = FLAGS.control_exchange + super(TopicConsumer, self).__init__(connection=connection) + + +class AdapterConsumer(TopicConsumer): + def __init__(self, connection=None, topic="broadcast", proxy=None): + _log.debug('Initing the Adapter Consumer for %s' % (topic)) + self.proxy = proxy + super(AdapterConsumer, self).__init__(connection=connection, topic=topic) + + def receive(self, message_data, message): + _log.debug('received %s' % (message_data)) + msg_id = message_data.pop('_msg_id', None) + + method = message_data.get('method') + args = message_data.get('args', {}) + if not method: + return + + node_func = getattr(self.proxy, str(method)) + node_args = dict((str(k), v) for k, v in args.iteritems()) + d = defer.maybeDeferred(node_func, **node_args) + if msg_id: + d.addCallback(lambda rval: msg_reply(msg_id, rval)) + d.addErrback(lambda e: msg_reply(msg_id, str(e))) + message.ack() + return + + +class TopicPublisher(Publisher): + exchange_type = "topic" + def __init__(self, connection=None, topic="broadcast"): + self.routing_key = topic + self.exchange = FLAGS.control_exchange + super(TopicPublisher, self).__init__(connection=connection) + + +class DirectConsumer(Consumer): + exchange_type = "direct" + def __init__(self, connection=None, msg_id=None): + self.queue = msg_id + self.routing_key = msg_id + self.exchange = msg_id + self.auto_delete = True + super(DirectConsumer, self).__init__(connection=connection) + + +class DirectPublisher(Publisher): + exchange_type = "direct" + def __init__(self, connection=None, msg_id=None): + self.routing_key = msg_id + self.exchange = msg_id + self.auto_delete = True + super(DirectPublisher, self).__init__(connection=connection) + + +def msg_reply(msg_id, reply): + conn = Connection.instance() + publisher = DirectPublisher(connection=conn, msg_id=msg_id) + + try: + publisher.send({'result': reply}) + except TypeError: + publisher.send( + {'result': dict((k, repr(v)) + for k, v in reply.__dict__.iteritems()) + }) + publisher.close() + + +def call(topic, msg): + _log.debug("Making asynchronous call...") + msg_id = uuid.uuid4().hex + msg.update({'_msg_id': msg_id}) + _log.debug("MSG_ID is %s" % (msg_id)) + + conn = Connection.instance() + d = defer.Deferred() + consumer = DirectConsumer(connection=conn, msg_id=msg_id) + consumer.register_callback(lambda data, message: d.callback(data)) + injected = consumer.attach_to_tornado() + + # clean up after the injected listened and return x + d.addCallback(lambda x: injected.stop() and x or x) + + publisher = TopicPublisher(connection=conn, topic=topic) + publisher.send(msg) + publisher.close() + return d + + +def cast(topic, msg): + _log.debug("Making asynchronous cast...") + conn = Connection.instance() + publisher = TopicPublisher(connection=conn, topic=topic) + publisher.send(msg) + publisher.close() + + +def generic_response(message_data, message): + _log.debug('response %s', message_data) + message.ack() + sys.exit(0) + + +def send_message(topic, message, wait=True): + msg_id = uuid.uuid4().hex + message.update({'_msg_id': msg_id}) + _log.debug('topic is %s', topic) + _log.debug('message %s', message) + + if wait: + consumer = messaging.Consumer(connection=rpc.Connection.instance(), + queue=msg_id, + exchange=msg_id, + auto_delete=True, + exchange_type="direct", + routing_key=msg_id) + consumer.register_callback(generic_response) + + publisher = messaging.Publisher(connection=rpc.Connection.instance(), + exchange="nova", + exchange_type="topic", + routing_key=topic) + publisher.send(message) + publisher.close() + + if wait: + consumer.wait() + + +# TODO: Replace with a docstring test +if __name__ == "__main__": + send_message(sys.argv[1], anyjson.deserialize(sys.argv[2])) diff --git a/nova/server.py b/nova/server.py new file mode 100644 index 000000000..227f7fddc --- /dev/null +++ b/nova/server.py @@ -0,0 +1,139 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Base functionality for nova daemons - gradually being replaced with twistd.py. +""" + +import logging +import logging.handlers +import os +import signal +import sys +import time + +from nova import vendor +import daemon +from daemon import pidlockfile + +from nova import flags + + +FLAGS = flags.FLAGS +flags.DEFINE_bool('daemonize', False, 'daemonize this process') +# NOTE(termie): right now I am defaulting to using syslog when we daemonize +# it may be better to do something else -shrug- +# NOTE(Devin): I think we should let each process have its own log file +# and put it in /var/logs/nova/(appname).log +# This makes debugging much easier and cuts down on sys log +# clutter. +flags.DEFINE_bool('use_syslog', True, 'output to syslog when daemonizing') +flags.DEFINE_string('logfile', None, 'log file to output to') +flags.DEFINE_string('pidfile', None, 'pid file to output to') +flags.DEFINE_string('working_directory', './', 'working directory...') + + +def stop(pidfile): + """ + Stop the daemon + """ + # Get the pid from the pidfile + try: + pf = file(pidfile,'r') + pid = int(pf.read().strip()) + pf.close() + except IOError: + pid = None + + if not pid: + message = "pidfile %s does not exist. Daemon not running?\n" + sys.stderr.write(message % pidfile) + return # not an error in a restart + + # Try killing the daemon process + try: + while 1: + os.kill(pid, signal.SIGTERM) + time.sleep(0.1) + except OSError, err: + err = str(err) + if err.find("No such process") > 0: + if os.path.exists(pidfile): + os.remove(pidfile) + else: + print str(err) + sys.exit(1) + + +def serve(name, main): + argv = FLAGS(sys.argv) + + if not FLAGS.pidfile: + FLAGS.pidfile = '%s.pid' % name + + logging.debug("Full set of FLAGS: \n\n\n" ) + for flag in FLAGS: + logging.debug("%s : %s" % (flag, FLAGS.get(flag, None) )) + + action = 'start' + if len(argv) > 1: + action = argv.pop() + + if action == 'stop': + stop(FLAGS.pidfile) + sys.exit() + elif action == 'restart': + stop(FLAGS.pidfile) + elif action == 'start': + pass + else: + print 'usage: %s [options] [start|stop|restart]' % argv[0] + sys.exit(1) + + logging.getLogger('amqplib').setLevel(logging.WARN) + if FLAGS.daemonize: + logger = logging.getLogger() + formatter = logging.Formatter( + name + '(%(name)s): %(levelname)s %(message)s') + if FLAGS.use_syslog and not FLAGS.logfile: + syslog = logging.handlers.SysLogHandler(address='/dev/log') + syslog.setFormatter(formatter) + logger.addHandler(syslog) + else: + if not FLAGS.logfile: + FLAGS.logfile = '%s.log' % name + logfile = logging.handlers.FileHandler(FLAGS.logfile) + logfile.setFormatter(formatter) + logger.addHandler(logfile) + stdin, stdout, stderr = None, None, None + else: + stdin, stdout, stderr = sys.stdin, sys.stdout, sys.stderr + + if FLAGS.verbose: + logging.getLogger().setLevel(logging.DEBUG) + else: + logging.getLogger().setLevel(logging.WARNING) + + with daemon.DaemonContext( + detach_process=FLAGS.daemonize, + working_directory=FLAGS.working_directory, + pidfile=pidlockfile.TimeoutPIDLockFile(FLAGS.pidfile, + acquire_timeout=1, + threaded=False), + stdin=stdin, + stdout=stdout, + stderr=stderr + ): + main(argv) diff --git a/nova/test.py b/nova/test.py new file mode 100644 index 000000000..610ad89aa --- /dev/null +++ b/nova/test.py @@ -0,0 +1,246 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Base classes for our unit tests. +Allows overriding of flags for use of fakes, +and some black magic for inline callbacks. +""" + +import logging +import time +import unittest + +from nova import vendor +import mox +from tornado import ioloop +from twisted.internet import defer +from twisted.python import failure +from twisted.trial import unittest as trial_unittest +import stubout + +from nova import datastore +from nova import fakerabbit +from nova import flags + + +FLAGS = flags.FLAGS +flags.DEFINE_bool('fake_tests', True, + 'should we use everything for testing') + + +def skip_if_fake(f): + def _skipper(*args, **kw): + if FLAGS.fake_tests: + raise trial_unittest.SkipTest('Test cannot be run in fake mode') + else: + return f(*args, **kw) + + _skipper.func_name = f.func_name + return _skipper + + +class TrialTestCase(trial_unittest.TestCase): + def setUp(self): + super(TrialTestCase, self).setUp() + + # emulate some of the mox stuff, we can't use the metaclass + # because it screws with our generators + self.mox = mox.Mox() + self.stubs = stubout.StubOutForTesting() + self.flag_overrides = {} + + def tearDown(self): + super(TrialTestCase, self).tearDown() + self.reset_flags() + self.mox.UnsetStubs() + self.stubs.UnsetAll() + self.stubs.SmartUnsetAll() + self.mox.VerifyAll() + + if FLAGS.fake_rabbit: + fakerabbit.reset_all() + + # attempt to wipe all keepers + #keeper = datastore.Keeper() + #keeper.clear_all() + + def flags(self, **kw): + for k, v in kw.iteritems(): + if k in self.flag_overrides: + self.reset_flags() + raise Exception( + 'trying to override already overriden flag: %s' % k) + self.flag_overrides[k] = getattr(FLAGS, k) + setattr(FLAGS, k, v) + + def reset_flags(self): + for k, v in self.flag_overrides.iteritems(): + setattr(FLAGS, k, v) + + + +class BaseTestCase(TrialTestCase): + def setUp(self): + super(BaseTestCase, self).setUp() + # TODO(termie): we could possibly keep a more global registry of + # the injected listeners... this is fine for now though + self.injected = [] + self.ioloop = ioloop.IOLoop.instance() + + self._waiting = None + self._doneWaiting = False + self._timedOut = False + self.set_up() + + def set_up(self): + pass + + def tear_down(self): + pass + + def tearDown(self): + super(BaseTestCase, self).tearDown() + for x in self.injected: + x.stop() + if FLAGS.fake_rabbit: + fakerabbit.reset_all() + self.tear_down() + + def _waitForTest(self, timeout=60): + """ Push the ioloop along to wait for our test to complete. """ + self._waiting = self.ioloop.add_timeout(time.time() + timeout, + self._timeout) + def _wait(): + if self._timedOut: + self.fail('test timed out') + self._done() + if self._doneWaiting: + self.ioloop.stop() + return + # we can use add_callback here but this uses less cpu when testing + self.ioloop.add_timeout(time.time() + 0.01, _wait) + + self.ioloop.add_callback(_wait) + self.ioloop.start() + + def _done(self): + if self._waiting: + try: + self.ioloop.remove_timeout(self._waiting) + except Exception: + pass + self._waiting = None + self._doneWaiting = True + + def _maybeInlineCallbacks(self, f): + """ If we're doing async calls in our tests, wait on them. + + This is probably the most complicated hunk of code we have so far. + + First up, if the function is normal (not async) we just act normal + and return. + + Async tests will use the "Inline Callbacks" pattern, which means + you yield Deferreds at every "waiting" step of your code instead + of making epic callback chains. + + Example (callback chain, ugly): + + d = self.node.terminate_instance(instance_id) # a Deferred instance + def _describe(_): + d_desc = self.node.describe_instances() # another Deferred instance + return d_desc + def _checkDescribe(rv): + self.assertEqual(rv, []) + d.addCallback(_describe) + d.addCallback(_checkDescribe) + d.addCallback(lambda x: self._done()) + self._waitForTest() + + Example (inline callbacks! yay!): + + yield self.node.terminate_instance(instance_id) + rv = yield self.node.describe_instances() + self.assertEqual(rv, []) + + If the test fits the Inline Callbacks pattern we will automatically + handle calling wait and done. + """ + # TODO(termie): this can be a wrapper function instead and + # and we can make a metaclass so that we don't + # have to copy all that "run" code below. + g = f() + if not hasattr(g, 'send'): + self._done() + return defer.succeed(g) + + inlined = defer.inlineCallbacks(f) + d = inlined() + return d + + def _catchExceptions(self, result, failure): + exc = (failure.type, failure.value, failure.getTracebackObject()) + if isinstance(failure.value, self.failureException): + result.addFailure(self, exc) + elif isinstance(failure.value, KeyboardInterrupt): + raise + else: + result.addError(self, exc) + + self._done() + + def _timeout(self): + self._waiting = False + self._timedOut = True + + def run(self, result=None): + if result is None: result = self.defaultTestResult() + + result.startTest(self) + testMethod = getattr(self, self._testMethodName) + try: + try: + self.setUp() + except KeyboardInterrupt: + raise + except: + result.addError(self, self._exc_info()) + return + + ok = False + try: + d = self._maybeInlineCallbacks(testMethod) + d.addErrback(lambda x: self._catchExceptions(result, x)) + d.addBoth(lambda x: self._done() and x) + self._waitForTest() + ok = True + except self.failureException: + result.addFailure(self, self._exc_info()) + except KeyboardInterrupt: + raise + except: + result.addError(self, self._exc_info()) + + try: + self.tearDown() + except KeyboardInterrupt: + raise + except: + result.addError(self, self._exc_info()) + ok = False + if ok: result.addSuccess(self) + finally: + result.stopTest(self) diff --git a/nova/tests/CA/cacert.pem b/nova/tests/CA/cacert.pem new file mode 100644 index 000000000..9ffb5bb80 --- /dev/null +++ b/nova/tests/CA/cacert.pem @@ -0,0 +1,17 @@ +-----BEGIN CERTIFICATE----- +MIICyzCCAjSgAwIBAgIJANiqHZUcbScCMA0GCSqGSIb3DQEBBAUAME4xEjAQBgNV +BAoTCU5PVkEgUk9PVDEWMBQGA1UEBxMNTW91bnRhaW4gVmlldzETMBEGA1UECBMK +Q2FsaWZvcm5pYTELMAkGA1UEBhMCVVMwHhcNMTAwNTI4MDExOTI1WhcNMTEwNTI4 +MDExOTI1WjBOMRIwEAYDVQQKEwlOT1ZBIFJPT1QxFjAUBgNVBAcTDU1vdW50YWlu +IFZpZXcxEzARBgNVBAgTCkNhbGlmb3JuaWExCzAJBgNVBAYTAlVTMIGfMA0GCSqG +SIb3DQEBAQUAA4GNADCBiQKBgQDobUnq8rpXA/HQZ2Uu9Me3SlqCayz3ws2wtvFQ +koWPUzpriIYPkpprz2EaVu07Zb9uJHvjcoY07nYntl4jR8S7PH4XZhlVFn8AQWzs +iThU4KJF71UfVM00dDrarSgVpyOIcFXO3iUvLoJj7+RUPjrWdLuJoMqnhicgLeHZ +LAZ8ewIDAQABo4GwMIGtMAwGA1UdEwQFMAMBAf8wHQYDVR0OBBYEFMh1RMlTVtt8 +EdESYpsTU08r0FnpMH4GA1UdIwR3MHWAFMh1RMlTVtt8EdESYpsTU08r0FnpoVKk +UDBOMRIwEAYDVQQKEwlOT1ZBIFJPT1QxFjAUBgNVBAcTDU1vdW50YWluIFZpZXcx +EzARBgNVBAgTCkNhbGlmb3JuaWExCzAJBgNVBAYTAlVTggkA2KodlRxtJwIwDQYJ +KoZIhvcNAQEEBQADgYEAq+YCgflK36HCdodNu2ya3O6UDRUE2dW8n96tAOmvHqmR +v38k8GIW0pjWDo+lZYnFmeJYd+QGcJl9fLzXxffV5k+rNCfr/gEYtznWLNUX7AZB +b/VC7L+yK9qz08C8n51TslXaf3fUGkfkQxsvEP7+hi0qavdd/8eTbdheWahYwWg= +-----END CERTIFICATE----- diff --git a/nova/tests/CA/private/cakey.pem b/nova/tests/CA/private/cakey.pem new file mode 100644 index 000000000..eee54cc38 --- /dev/null +++ b/nova/tests/CA/private/cakey.pem @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQDobUnq8rpXA/HQZ2Uu9Me3SlqCayz3ws2wtvFQkoWPUzpriIYP +kpprz2EaVu07Zb9uJHvjcoY07nYntl4jR8S7PH4XZhlVFn8AQWzsiThU4KJF71Uf +VM00dDrarSgVpyOIcFXO3iUvLoJj7+RUPjrWdLuJoMqnhicgLeHZLAZ8ewIDAQAB +AoGBANQonmZ2Nh2jniFrn/LiwULP/ho6Fov6J6N8+n1focaYZCUwM58XZRmv7KUM +X/PuBnVVnDibm2HJodTSJM/zfODnGO15kdmJ9X23FkkdTyuvphO5tYF0ONARXdfX +9LbPcLYA14VSCZCKCye6mbv/xi0C/s7q6ZBoMl7XaeD9hgUxAkEA9lxQY/ZxcLV0 +Ae5I2spBbtuXEGns11YnKnppc59RrAono1gaDeYY2WZRwztIcD6VtUv7qkzH6ubo +shAG4fvnPQJBAPGFaDODs2ckPvxnILEbjpnZXGQqDCpQ3sVJ6nfu+qdAWS92ESNo +Y6DC8zFjFaQFbKy6Jxr1VsvYDXhF8cmy7hcCQHkLElSLGWGPRdhNA268QTn+mlJu +OPf0VHoCex1cAfzNYHxZJTP/AeaO501NK2I63cOd+aDK6M75dQtH5JnT8uECQQCg +jVydkhk6oV+1jiCvW3BKWbIPa9w2bRgJ8n8JRzYc5Kvk3wm5jfVcsvvTgtip9mkt +0XmZdCpEy9T4dRasTGP1AkBMhShiVP7+P+SIQlZtSn8ckTt9G6cefEjxsv0kVFZe +SjkUO0ZifahF8r3Q1eEUSzdXEvicEwONvcpc7MLwfSD7 +-----END RSA PRIVATE KEY----- diff --git a/nova/tests/__init__.py b/nova/tests/__init__.py new file mode 100644 index 000000000..a4ccbbaeb --- /dev/null +++ b/nova/tests/__init__.py @@ -0,0 +1,27 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +:mod:`nova.tests` -- Nova Unittests +===================================================== + +.. automodule:: nova.tests + :platform: Unix +.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com> +.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com> +.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com> +.. moduleauthor:: Joshua McKenty <joshua@cognition.ca> +.. moduleauthor:: Manish Singh <yosh@gimp.org> +.. moduleauthor:: Andy Smith <andy@anarkystic.com> +"""
\ No newline at end of file diff --git a/nova/tests/access_unittest.py b/nova/tests/access_unittest.py new file mode 100644 index 000000000..ab0759c2d --- /dev/null +++ b/nova/tests/access_unittest.py @@ -0,0 +1,60 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import unittest + +from nova import flags +from nova import test +from nova.auth import users +from nova.endpoint import cloud + +FLAGS = flags.FLAGS + +class AccessTestCase(test.BaseTestCase): + def setUp(self): + FLAGS.fake_libvirt = True + FLAGS.fake_storage = True + self.users = users.UserManager.instance() + super(AccessTestCase, self).setUp() + # Make a test project + # Make a test user + self.users.create_user('test1', 'access', 'secret') + + # Make the test user a member of the project + + def tearDown(self): + # Delete the test user + # Delete the test project + self.users.delete_user('test1') + pass + + def test_001_basic_user_access(self): + user = self.users.get_user('test1') + # instance-foo, should be using object and not owner_id + instance_id = "i-12345678" + self.assertTrue(user.is_authorized(instance_id, action="describe_instances")) + + def test_002_sysadmin_access(self): + user = self.users.get_user('test1') + bucket = "foo/bar/image" + self.assertFalse(user.is_authorized(bucket, action="register")) + self.users.add_role(user, "sysadmin") + + +if __name__ == "__main__": + # TODO: Implement use_fake as an option + unittest.main() diff --git a/nova/tests/api_integration.py b/nova/tests/api_integration.py new file mode 100644 index 000000000..d2e1026b8 --- /dev/null +++ b/nova/tests/api_integration.py @@ -0,0 +1,50 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import boto +from boto.ec2.regioninfo import RegionInfo + +ACCESS_KEY = 'fake' +SECRET_KEY = 'fake' +CLC_IP = '127.0.0.1' +CLC_PORT = 8773 +REGION = 'test' + +def get_connection(): + return boto.connect_ec2 ( + aws_access_key_id=ACCESS_KEY, + aws_secret_access_key=SECRET_KEY, + is_secure=False, + region=RegionInfo(None, REGION, CLC_IP), + port=CLC_PORT, + path='/services/Cloud', + debug=99 + ) + +class APIIntegrationTests(unittest.TestCase): + def test_001_get_all_images(self): + conn = get_connection() + res = conn.get_all_images() + print res + + +if __name__ == '__main__': + unittest.main() + +#print conn.get_all_key_pairs() +#print conn.create_key_pair +#print conn.create_security_group('name', 'description') + diff --git a/nova/tests/api_unittest.py b/nova/tests/api_unittest.py new file mode 100644 index 000000000..fdbf088f9 --- /dev/null +++ b/nova/tests/api_unittest.py @@ -0,0 +1,189 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import httplib +import random +import StringIO + +from nova import vendor +import boto +from boto.ec2 import regioninfo +from tornado import httpserver +from twisted.internet import defer + +from nova import flags +from nova import test +from nova.auth import users +from nova.endpoint import api +from nova.endpoint import cloud + + +FLAGS = flags.FLAGS + + +# NOTE(termie): These are a bunch of helper methods and classes to short +# circuit boto calls and feed them into our tornado handlers, +# it's pretty damn circuitous so apologies if you have to fix +# a bug in it +def boto_to_tornado(method, path, headers, data, host, connection=None): + """ translate boto requests into tornado requests + + connection should be a FakeTornadoHttpConnection instance + """ + headers = httpserver.HTTPHeaders() + for k, v in headers.iteritems(): + headers[k] = v + + req = httpserver.HTTPRequest(method=method, + uri=path, + headers=headers, + body=data, + host=host, + remote_ip='127.0.0.1', + connection=connection) + return req + + +def raw_to_httpresponse(s): + """ translate a raw tornado http response into an httplib.HTTPResponse """ + sock = FakeHttplibSocket(s) + resp = httplib.HTTPResponse(sock) + resp.begin() + return resp + + +class FakeHttplibSocket(object): + """ a fake socket implementation for httplib.HTTPResponse, trivial """ + def __init__(self, s): + self.fp = StringIO.StringIO(s) + + def makefile(self, mode, other): + return self.fp + + +class FakeTornadoStream(object): + """ a fake stream to satisfy tornado's assumptions, trivial """ + def set_close_callback(self, f): + pass + + +class FakeTornadoConnection(object): + """ a fake connection object for tornado to pass to its handlers + + web requests are expected to write to this as they get data and call + finish when they are done with the request, we buffer the writes and + kick off a callback when it is done so that we can feed the result back + into boto. + """ + def __init__(self, d): + self.d = d + self._buffer = StringIO.StringIO() + + def write(self, chunk): + self._buffer.write(chunk) + + def finish(self): + s = self._buffer.getvalue() + self.d.callback(s) + + xheaders = None + + @property + def stream(self): + return FakeTornadoStream() + + +class FakeHttplibConnection(object): + """ a fake httplib.HTTPConnection for boto to use + + requests made via this connection actually get translated and routed into + our tornado app, we then wait for the response and turn it back into + the httplib.HTTPResponse that boto expects. + """ + def __init__(self, app, host, is_secure=False): + self.app = app + self.host = host + self.deferred = defer.Deferred() + + def request(self, method, path, data, headers): + req = boto_to_tornado + conn = FakeTornadoConnection(self.deferred) + request = boto_to_tornado(connection=conn, + method=method, + path=path, + headers=headers, + data=data, + host=self.host) + handler = self.app(request) + self.deferred.addCallback(raw_to_httpresponse) + + def getresponse(self): + @defer.inlineCallbacks + def _waiter(): + result = yield self.deferred + defer.returnValue(result) + d = _waiter() + # NOTE(termie): defer.returnValue above should ensure that + # this deferred has already been called by the time + # we get here, we are going to cheat and return + # the result of the callback + return d.result + + def close(self): + pass + + +class ApiEc2TestCase(test.BaseTestCase): + def setUp(self): + super(ApiEc2TestCase, self).setUp() + + self.users = users.UserManager.instance() + self.cloud = cloud.CloudController() + + self.host = '127.0.0.1' + + self.app = api.APIServerApplication(self.users, {'Cloud': self.cloud}) + self.ec2 = boto.connect_ec2( + aws_access_key_id='fake', + aws_secret_access_key='fake', + is_secure=False, + region=regioninfo.RegionInfo(None, 'test', self.host), + port=FLAGS.cc_port, + path='/services/Cloud') + + self.mox.StubOutWithMock(self.ec2, 'new_http_connection') + + def expect_http(self, host=None, is_secure=False): + http = FakeHttplibConnection( + self.app, '%s:%d' % (self.host, FLAGS.cc_port), False) + self.ec2.new_http_connection(host, is_secure).AndReturn(http) + return http + + def test_describe_instances(self): + self.expect_http() + self.mox.ReplayAll() + + self.assertEqual(self.ec2.get_all_instances(), []) + + + def test_get_all_key_pairs(self): + self.expect_http() + self.mox.ReplayAll() + keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") for x in range(random.randint(4, 8))) + self.users.generate_key_pair('fake', keyname) + + rv = self.ec2.get_all_key_pairs() + self.assertTrue(filter(lambda k: k.name == keyname, rv)) + diff --git a/nova/tests/bundle/1mb.manifest.xml b/nova/tests/bundle/1mb.manifest.xml new file mode 100644 index 000000000..dc3315957 --- /dev/null +++ b/nova/tests/bundle/1mb.manifest.xml @@ -0,0 +1 @@ +<?xml version="1.0" ?><manifest><version>2007-10-10</version><bundler><name>euca-tools</name><version>1.2</version><release>31337</release></bundler><machine_configuration><architecture>x86_64</architecture></machine_configuration><image><name>1mb</name><user>42</user><type>machine</type><digest algorithm="SHA1">da39a3ee5e6b4b0d3255bfef95601890afd80709</digest><size>1048576</size><bundled_size>1136</bundled_size><ec2_encrypted_key algorithm="AES-128-CBC">33a2ea00dc64083dd9a10eb5e233635b42a7beb1670ab75452087d9de74c60aba1cd27c136fda56f62beb581de128fb1f10d072b9e556fd25e903107a57827c21f6ee8a93a4ff55b11311fcef217e3eefb07e81f71e88216f43b4b54029c1f2549f2925a839a73947d2d5aeecec4a62ece4af9156d557ae907978298296d9915</ec2_encrypted_key><user_encrypted_key algorithm="AES-128-CBC">4c11147fd8caf92447e90ce339928933d7579244c2f8ffb07cc0ea35f8738da8b90eff6c7a49671a84500e993e9462e4c36d5c19c0b3a2b397d035b4c0cce742b58e12552175d81d129b0425e9f71ebacb9aeb539fa9dd2ac36749fb82876f6902e5fb24b6ec19f35ec4c20acd50437fd30966e99c4d9a0647577970a8fa3023</user_encrypted_key><ec2_encrypted_iv>14bd082c9715f071160c69bbfb070f51d2ba1076775f1d988ccde150e515088156b248e4b5a64e46c4fe064feeeedfe14511f7fde478a51acb89f9b2f6c84b60593e5c3f792ba6b01fed9bf2158fdac03086374883b39d13a3ca74497eeaaf579fc3f26effc73bfd9446a2a8c4061f0874bfaca058905180e22d3d8881551cb3</ec2_encrypted_iv><user_encrypted_iv>8f7606f19f00e4e19535dd234b66b31b77e9c7bad3885d9c9efa75c863631fd4f82a009e17d789066d9cc6032a436f05384832f6d9a3283d3e63eab04fa0da5c8c87db9b17e854e842c3fb416507d067a266b44538125ce732e486098e8ebd1ca91fa3079f007fce7d14957a9b7e57282407ead3c6eb68fe975df3d83190021b</user_encrypted_iv><parts count="2"><part index="0"><filename>1mb.part.0</filename><digest algorithm="SHA1">c4413423cf7a57e71187e19bfd5cd4b514a64283</digest></part><part index="1"><filename>1mb.part.1</filename><digest algorithm="SHA1">9d4262e6589393d09a11a0332af169887bc2e57d</digest></part></parts></image><signature>4e00b5ba28114dda4a9df7eeae94be847ec46117a09a1cbe41e578660642f0660dda1776b39fb3bf826b6cfec019e2a5e9c566728d186b7400ebc989a30670eb1db26ce01e68bd9d3f31290370077a85b81c66b63c1e0d5499bac115c06c17a21a81b6d3a67ebbce6c17019095af7ab07f3796c708cc843e58efc12ddc788c5e</signature></manifest>
\ No newline at end of file diff --git a/nova/tests/bundle/1mb.part.0 b/nova/tests/bundle/1mb.part.0 Binary files differnew file mode 100644 index 000000000..15a1657c5 --- /dev/null +++ b/nova/tests/bundle/1mb.part.0 diff --git a/nova/tests/bundle/1mb.part.1 b/nova/tests/bundle/1mb.part.1 new file mode 100644 index 000000000..2f0406e2d --- /dev/null +++ b/nova/tests/bundle/1mb.part.1 @@ -0,0 +1 @@ +´ˆà«€ç‰°Æ³
¡ÀiDHW̽×JÈ8ïrV¼³h§X’·@Yj“~Ø·Gû5û 3Nt«˜•H6Ñ$§Ëgö™é Lá¢+³æ¤X†pm¬@,øŽ>7ÚÊ×užp¼ aü`¥V2X@£#á¶
\ No newline at end of file diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py new file mode 100644 index 000000000..568a8dcd3 --- /dev/null +++ b/nova/tests/cloud_unittest.py @@ -0,0 +1,161 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import StringIO +import time +import unittest +from xml.etree import ElementTree + +from nova import vendor +import mox +from tornado import ioloop +from twisted.internet import defer + +from nova import flags +from nova import rpc +from nova import test +from nova.auth import users +from nova.compute import node +from nova.endpoint import api +from nova.endpoint import cloud + + +FLAGS = flags.FLAGS + + +class CloudTestCase(test.BaseTestCase): + def setUp(self): + super(CloudTestCase, self).setUp() + self.flags(fake_libvirt=True, + fake_storage=True, + fake_users=True, + redis_db=8) + + self.conn = rpc.Connection.instance() + logging.getLogger().setLevel(logging.DEBUG) + + # set up our cloud + self.cloud = cloud.CloudController() + self.cloud_consumer = rpc.AdapterConsumer(connection=self.conn, + topic=FLAGS.cloud_topic, + proxy=self.cloud) + self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop)) + + # set up a node + self.node = node.Node() + self.node_consumer = rpc.AdapterConsumer(connection=self.conn, + topic=FLAGS.compute_topic, + proxy=self.node) + self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop)) + + user_mocker = mox.Mox() + self.admin = user_mocker.CreateMock(users.User) + self.admin.is_authorized(mox.IgnoreArg()).AndReturn(True) + self.context = api.APIRequestContext(handler=None,user=self.admin) + + def test_console_output(self): + if FLAGS.fake_libvirt: + logging.debug("Can't test instances without a real virtual env.") + return + instance_id = 'foo' + inst = yield self.node.run_instance(instance_id) + output = yield self.cloud.get_console_output(self.context, [instance_id]) + logging.debug(output) + self.assert_(output) + rv = yield self.node.terminate_instance(instance_id) + + def test_run_instances(self): + if FLAGS.fake_libvirt: + logging.debug("Can't test instances without a real virtual env.") + return + image_id = FLAGS.default_image + instance_type = FLAGS.default_instance_type + max_count = 1 + kwargs = {'image_id': image_id, + 'instance_type': instance_type, + 'max_count': max_count} + rv = yield self.cloud.run_instances(self.context, **kwargs) + # TODO: check for proper response + instance = rv['reservationSet'][0][rv['reservationSet'][0].keys()[0]][0] + logging.debug("Need to watch instance %s until it's running..." % instance['instance_id']) + while True: + rv = yield defer.succeed(time.sleep(1)) + info = self.cloud._get_instance(instance['instance_id']) + logging.debug(info['state']) + if info['state'] == node.Instance.RUNNING: + break + self.assert_(rv) + + if not FLAGS.fake_libvirt: + time.sleep(45) # Should use boto for polling here + for reservations in rv['reservationSet']: + # for res_id in reservations.keys(): + # logging.debug(reservations[res_id]) + # for instance in reservations[res_id]: + for instance in reservations[reservations.keys()[0]]: + logging.debug("Terminating instance %s" % instance['instance_id']) + rv = yield self.node.terminate_instance(instance['instance_id']) + + def test_instance_update_state(self): + def instance(num): + return { + 'reservation_id': 'r-1', + 'instance_id': 'i-%s' % num, + 'image_id': 'ami-%s' % num, + 'private_dns_name': '10.0.0.%s' % num, + 'dns_name': '10.0.0%s' % num, + 'ami_launch_index': str(num), + 'instance_type': 'fake', + 'availability_zone': 'fake', + 'key_name': None, + 'kernel_id': 'fake', + 'ramdisk_id': 'fake', + 'groups': ['default'], + 'product_codes': None, + 'state': 0x01, + 'user_data': '' + } + + rv = self.cloud.format_instances(self.admin) + print rv + self.assert_(len(rv['reservationSet']) == 0) + + # simulate launch of 5 instances + # self.cloud.instances['pending'] = {} + #for i in xrange(5): + # inst = instance(i) + # self.cloud.instances['pending'][inst['instance_id']] = inst + + #rv = self.cloud.format_instances(self.admin) + #self.assert_(len(rv['reservationSet']) == 1) + #self.assert_(len(rv['reservationSet'][0]['instances_set']) == 5) + + # report 4 nodes each having 1 of the instances + #for i in xrange(4): + # self.cloud.update_state('instances', {('node-%s' % i): {('i-%s' % i): instance(i)}}) + + # one instance should be pending still + #self.assert_(len(self.cloud.instances['pending'].keys()) == 1) + + # check that the reservations collapse + #rv = self.cloud.format_instances(self.admin) + #self.assert_(len(rv['reservationSet']) == 1) + #self.assert_(len(rv['reservationSet'][0]['instances_set']) == 5) + + # check that we can get metadata for each instance + #for i in xrange(4): + # data = self.cloud.get_metadata(instance(i)['private_dns_name']) + # self.assert_(data['meta-data']['ami-id'] == 'ami-%s' % i) diff --git a/nova/tests/datastore_unittest.py b/nova/tests/datastore_unittest.py new file mode 100644 index 000000000..4e4d8586a --- /dev/null +++ b/nova/tests/datastore_unittest.py @@ -0,0 +1,60 @@ +from nova import test +from nova import datastore +import random + +class KeeperTestCase(test.BaseTestCase): + """ + Basic persistence tests for Keeper datastore. + Generalize, then use these to support + migration to redis / cassandra / multiple stores. + """ + + def __init__(self, *args, **kwargs): + """ + Create a new keeper instance for test keys. + """ + super(KeeperTestCase, self).__init__(*args, **kwargs) + self.keeper = datastore.Keeper('test-') + + def tear_down(self): + """ + Scrub out test keeper data. + """ + pass + + def test_store_strings(self): + """ + Confirm that simple strings go in and come out safely. + Should also test unicode strings. + """ + randomstring = ''.join( + [random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-') + for _x in xrange(20)] + ) + self.keeper['test_string'] = randomstring + self.assertEqual(randomstring, self.keeper['test_string']) + + def test_store_dicts(self): + """ + Arbitrary dictionaries should be storable. + """ + test_dict = {'key_one': 'value_one'} + self.keeper['test_dict'] = test_dict + self.assertEqual(test_dict['key_one'], + self.keeper['test_dict']['key_one']) + + def test_sets(self): + """ + A keeper dict should be self-serializing. + """ + self.keeper.set_add('test_set', 'foo') + test_dict = {'arbitrary': 'dict of stuff'} + self.keeper.set_add('test_set', test_dict) + self.assertTrue(self.keeper.set_is_member('test_set', 'foo')) + self.assertFalse(self.keeper.set_is_member('test_set', 'bar')) + self.keeper.set_remove('test_set', 'foo') + self.assertFalse(self.keeper.set_is_member('test_set', 'foo')) + rv = self.keeper.set_fetch('test_set') + self.assertEqual(test_dict, rv.next()) + self.keeper.set_remove('test_set', test_dict) + diff --git a/nova/tests/fake_flags.py b/nova/tests/fake_flags.py new file mode 100644 index 000000000..3c7b0be52 --- /dev/null +++ b/nova/tests/fake_flags.py @@ -0,0 +1,26 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nova import flags + +FLAGS = flags.FLAGS + +FLAGS.fake_libvirt = True +FLAGS.fake_storage = True +FLAGS.fake_rabbit = True +FLAGS.fake_network = True +FLAGS.fake_users = True +FLAGS.keeper_backend = 'sqlite' +FLAGS.datastore_path = ':memory:' +FLAGS.verbose = True diff --git a/nova/tests/future_unittest.py b/nova/tests/future_unittest.py new file mode 100644 index 000000000..81d69dfff --- /dev/null +++ b/nova/tests/future_unittest.py @@ -0,0 +1,74 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import StringIO +import time +import unittest +from xml.etree import ElementTree + +from nova import vendor +import mox +from tornado import ioloop +from twisted.internet import defer + +from nova import cloud +from nova import exception +from nova import flags +from nova import node +from nova import rpc +from nova import test + + +FLAGS = flags.FLAGS + + +class AdminTestCase(test.BaseTestCase): + def setUp(self): + super(AdminTestCase, self).setUp() + self.flags(fake_libvirt=True, + fake_rabbit=True) + + self.conn = rpc.Connection.instance() + + logging.getLogger().setLevel(logging.INFO) + + # set up our cloud + self.cloud = cloud.CloudController() + self.cloud_consumer = rpc.AdapterConsumer(connection=self.conn, + topic=FLAGS.cloud_topic, + proxy=self.cloud) + self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop)) + + # set up a node + self.node = node.Node() + self.node_consumer = rpc.AdapterConsumer(connection=self.conn, + topic=FLAGS.compute_topic, + proxy=self.node) + self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop)) + + def test_flush_terminated(self): + # Launch an instance + + # Wait until it's running + + # Terminate it + + # Wait until it's terminated + + # Flush terminated nodes + + # ASSERT that it's gone + pass diff --git a/nova/tests/keeper_unittest.py b/nova/tests/keeper_unittest.py new file mode 100644 index 000000000..3896c9e57 --- /dev/null +++ b/nova/tests/keeper_unittest.py @@ -0,0 +1,57 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +import random + +from nova import datastore +from nova import test + +class KeeperTestCase(test.TrialTestCase): + """ + Basic persistence tests for Keeper datastore. + Generalize, then use these to support + migration to redis / cassandra / multiple stores. + """ + + def setUp(self): + super(KeeperTestCase, self).setUp() + self.keeper = datastore.Keeper('test') + + def tearDown(self): + super(KeeperTestCase, self).tearDown() + self.keeper.clear() + + def test_store_strings(self): + """ + Confirm that simple strings go in and come out safely. + Should also test unicode strings. + """ + randomstring = ''.join( + [random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-') + for _x in xrange(20)] + ) + self.keeper['test_string'] = randomstring + self.assertEqual(randomstring, self.keeper['test_string']) + + def test_store_dicts(self): + """ + Arbitrary dictionaries should be storable. + """ + test_dict = {'key_one': 'value_one'} + self.keeper['test_dict'] = test_dict + self.assertEqual(test_dict['key_one'], + self.keeper['test_dict']['key_one']) + + def test_sets(self): + """ + A keeper dict should be self-serializing. + """ + self.keeper.set_add('test_set', 'foo') + test_dict = {'arbitrary': 'dict of stuff'} + self.keeper.set_add('test_set', test_dict) + self.assertTrue(self.keeper.set_is_member('test_set', 'foo')) + self.assertFalse(self.keeper.set_is_member('test_set', 'bar')) + self.keeper.set_remove('test_set', 'foo') + self.assertFalse(self.keeper.set_is_member('test_set', 'foo')) + rv = self.keeper.set_fetch('test_set') + self.assertEqual(test_dict, rv.next()) + self.keeper.set_remove('test_set', test_dict) + diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py new file mode 100644 index 000000000..43c7831a7 --- /dev/null +++ b/nova/tests/network_unittest.py @@ -0,0 +1,113 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import unittest + +from nova import vendor +import IPy + +from nova import flags +from nova import test +from nova.compute import network +from nova.auth import users + + +class NetworkTestCase(test.TrialTestCase): + def setUp(self): + super(NetworkTestCase, self).setUp() + logging.getLogger().setLevel(logging.DEBUG) + self.manager = users.UserManager.instance() + for i in range(0, 6): + name = 'user%s' % i + if not self.manager.get_user(name): + self.manager.create_user(name, name, name) + self.network = network.NetworkController(netsize=16) + + def tearDown(self): + super(NetworkTestCase, self).tearDown() + for i in range(0, 6): + name = 'user%s' % i + self.manager.delete_user(name) + + def test_network_serialization(self): + net1 = network.Network(vlan=100, network="192.168.100.0/24", conn=None) + address = net1.allocate_ip("user0", "01:24:55:36:f2:a0") + net_json = str(net1) + net2 = network.Network.from_json(net_json) + self.assertEqual(net_json, str(net2)) + self.assertTrue(IPy.IP(address) in net2.network) + + def test_allocate_deallocate_address(self): + for flag in flags.FLAGS: + print "%s=%s" % (flag, flags.FLAGS.get(flag, None)) + (address, net_name) = self.network.allocate_address( + "user0", "01:24:55:36:f2:a0") + logging.debug("Was allocated %s" % (address)) + self.assertEqual(True, address in self._get_user_addresses("user0")) + rv = self.network.deallocate_address(address) + self.assertEqual(False, address in self._get_user_addresses("user0")) + + def test_range_allocation(self): + (address, net_name) = self.network.allocate_address( + "user0", "01:24:55:36:f2:a0") + (secondaddress, net_name) = self.network.allocate_address( + "user1", "01:24:55:36:f2:a0") + self.assertEqual(True, address in self._get_user_addresses("user0")) + self.assertEqual(True, + secondaddress in self._get_user_addresses("user1")) + self.assertEqual(False, address in self._get_user_addresses("user1")) + rv = self.network.deallocate_address(address) + self.assertEqual(False, address in self._get_user_addresses("user0")) + rv = self.network.deallocate_address(secondaddress) + self.assertEqual(False, + secondaddress in self._get_user_addresses("user1")) + + def test_subnet_edge(self): + (secondaddress, net_name) = self.network.allocate_address("user0") + for user in range(1,5): + user_id = "user%s" % (user) + (address, net_name) = self.network.allocate_address( + user_id, "01:24:55:36:f2:a0") + (address2, net_name) = self.network.allocate_address( + user_id, "01:24:55:36:f2:a0") + (address3, net_name) = self.network.allocate_address( + user_id, "01:24:55:36:f2:a0") + self.assertEqual(False, + address in self._get_user_addresses("user0")) + self.assertEqual(False, + address2 in self._get_user_addresses("user0")) + self.assertEqual(False, + address3 in self._get_user_addresses("user0")) + rv = self.network.deallocate_address(address) + rv = self.network.deallocate_address(address2) + rv = self.network.deallocate_address(address3) + rv = self.network.deallocate_address(secondaddress) + + def test_too_many_users(self): + for i in range(0, 30): + name = 'toomany-user%s' % i + self.manager.create_user(name, name, name) + (address, net_name) = self.network.allocate_address( + name, "01:24:55:36:f2:a0") + self.manager.delete_user(name) + + def _get_user_addresses(self, user_id): + rv = self.network.describe_addresses() + user_addresses = [] + for item in rv: + if item['user_id'] == user_id: + user_addresses.append(item['address']) + return user_addresses diff --git a/nova/tests/node_unittest.py b/nova/tests/node_unittest.py new file mode 100644 index 000000000..7a6115fcc --- /dev/null +++ b/nova/tests/node_unittest.py @@ -0,0 +1,128 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import StringIO +import time +import unittest +from xml.etree import ElementTree + +from nova import vendor +import mox +from tornado import ioloop +from twisted.internet import defer + +from nova import exception +from nova import flags +from nova import test +from nova import utils +from nova.compute import model +from nova.compute import node + +FLAGS = flags.FLAGS + + +class InstanceXmlTestCase(test.TrialTestCase): + # @defer.inlineCallbacks + def test_serialization(self): + # TODO: Reimplement this, it doesn't make sense in redis-land + return + + # instance_id = 'foo' + # first_node = node.Node() + # inst = yield first_node.run_instance(instance_id) + # + # # force the state so that we can verify that it changes + # inst._s['state'] = node.Instance.NOSTATE + # xml = inst.toXml() + # self.assert_(ElementTree.parse(StringIO.StringIO(xml))) + # + # second_node = node.Node() + # new_inst = node.Instance.fromXml(second_node._conn, pool=second_node._pool, xml=xml) + # self.assertEqual(new_inst.state, node.Instance.RUNNING) + # rv = yield first_node.terminate_instance(instance_id) + + +class NodeConnectionTestCase(test.TrialTestCase): + def setUp(self): + logging.getLogger().setLevel(logging.DEBUG) + super(NodeConnectionTestCase, self).setUp() + self.flags(fake_libvirt=True, + fake_storage=True, + fake_users=True, + redis_db=8) + self.node = node.Node() + + def create_instance(self): + instdir = model.InstanceDirectory() + inst = instdir.new() + # TODO(ja): add ami, ari, aki, user_data + inst['reservation_id'] = 'r-fakeres' + inst['launch_time'] = '10' + inst['owner_id'] = 'fake' + inst['node_name'] = FLAGS.node_name + inst['mac_address'] = utils.generate_mac() + inst['ami_launch_index'] = 0 + inst.save() + return inst['instance_id'] + + @defer.inlineCallbacks + def test_run_describe_terminate(self): + instance_id = self.create_instance() + + rv = yield self.node.run_instance(instance_id) + + rv = yield self.node.describe_instances() + self.assertEqual(rv[instance_id].name, instance_id) + + rv = yield self.node.terminate_instance(instance_id) + + rv = yield self.node.describe_instances() + self.assertEqual(rv, {}) + + @defer.inlineCallbacks + def test_reboot(self): + instance_id = self.create_instance() + rv = yield self.node.run_instance(instance_id) + + rv = yield self.node.describe_instances() + logging.debug("describe_instances returns %s" % (rv)) + self.assertEqual(rv[instance_id].name, instance_id) + + yield self.node.reboot_instance(instance_id) + + rv = yield self.node.describe_instances() + self.assertEqual(rv[instance_id].name, instance_id) + rv = yield self.node.terminate_instance(instance_id) + + @defer.inlineCallbacks + def test_console_output(self): + instance_id = self.create_instance() + rv = yield self.node.run_instance(instance_id) + + console = yield self.node.get_console_output(instance_id) + self.assert_(console) + rv = yield self.node.terminate_instance(instance_id) + + @defer.inlineCallbacks + def test_run_instance_existing(self): + instance_id = self.create_instance() + rv = yield self.node.run_instance(instance_id) + + rv = yield self.node.describe_instances() + self.assertEqual(rv[instance_id].name, instance_id) + + self.assertRaises(exception.Error, self.node.run_instance, instance_id) + rv = yield self.node.terminate_instance(instance_id) diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py new file mode 100644 index 000000000..5f41d47a0 --- /dev/null +++ b/nova/tests/objectstore_unittest.py @@ -0,0 +1,190 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import glob +import hashlib +import logging +import os +import shutil +import tempfile + +from nova import vendor + +from nova import flags +from nova import rpc +from nova import objectstore +from nova import test +from nova.auth import users + +FLAGS = flags.FLAGS + + +oss_tempdir = tempfile.mkdtemp(prefix='test_oss-') + + +# delete tempdirs from previous runs (we don't delete after test to allow +# checking the contents after running tests) +for path in glob.glob(os.path.abspath(os.path.join(oss_tempdir, '../test_oss-*'))): + if path != oss_tempdir: + shutil.rmtree(path) + + +# create bucket/images path +os.makedirs(os.path.join(oss_tempdir, 'images')) +os.makedirs(os.path.join(oss_tempdir, 'buckets')) + +class ObjectStoreTestCase(test.BaseTestCase): + def setUp(self): + super(ObjectStoreTestCase, self).setUp() + self.flags(fake_users=True, + buckets_path=os.path.join(oss_tempdir, 'buckets'), + images_path=os.path.join(oss_tempdir, 'images'), + ca_path=os.path.join(os.path.dirname(__file__), 'CA')) + self.conn = rpc.Connection.instance() + logging.getLogger().setLevel(logging.DEBUG) + + self.um = users.UserManager.instance() + + def test_buckets(self): + try: + self.um.create_user('user1') + except: pass + try: + self.um.create_user('user2') + except: pass + try: + self.um.create_user('admin_user', admin=True) + except: pass + + objectstore.bucket.Bucket.create('new_bucket', self.um.get_user('user1')) + bucket = objectstore.bucket.Bucket('new_bucket') + + # creator is authorized to use bucket + self.assert_(bucket.is_authorized(self.um.get_user('user1'))) + + # another user is not authorized + self.assert_(bucket.is_authorized(self.um.get_user('user2')) == False) + + # admin is authorized to use bucket + self.assert_(bucket.is_authorized(self.um.get_user('admin_user'))) + + # new buckets are empty + self.assert_(bucket.list_keys()['Contents'] == []) + + # storing keys works + bucket['foo'] = "bar" + + self.assert_(len(bucket.list_keys()['Contents']) == 1) + + self.assert_(bucket['foo'].read() == 'bar') + + # md5 of key works + self.assert_(bucket['foo'].md5 == hashlib.md5('bar').hexdigest()) + + # deleting non-empty bucket throws exception + exception = False + try: + bucket.delete() + except: + exception = True + + self.assert_(exception) + + # deleting key + del bucket['foo'] + + # deleting empty button + bucket.delete() + + # accessing deleted bucket throws exception + exception = False + try: + objectstore.bucket.Bucket('new_bucket') + except: + exception = True + + self.assert_(exception) + self.um.delete_user('user1') + self.um.delete_user('user2') + self.um.delete_user('admin_user') + + def test_images(self): + try: + self.um.create_user('image_creator') + except: pass + image_user = self.um.get_user('image_creator') + + # create a bucket for our bundle + objectstore.bucket.Bucket.create('image_bucket', image_user) + bucket = objectstore.bucket.Bucket('image_bucket') + + # upload an image manifest/parts + bundle_path = os.path.join(os.path.dirname(__file__), 'bundle') + for path in glob.glob(bundle_path + '/*'): + bucket[os.path.basename(path)] = open(path, 'rb').read() + + # register an image + objectstore.image.Image.create('i-testing', 'image_bucket/1mb.manifest.xml', image_user) + + # verify image + my_img = objectstore.image.Image('i-testing') + result_image_file = os.path.join(my_img.path, 'image') + self.assertEqual(os.stat(result_image_file).st_size, 1048576) + + sha = hashlib.sha1(open(result_image_file).read()).hexdigest() + self.assertEqual(sha, '3b71f43ff30f4b15b5cd85dd9e95ebc7e84eb5a3') + + # verify image permissions + try: + self.um.create_user('new_user') + except: pass + new_user = self.um.get_user('new_user') + self.assert_(my_img.is_authorized(new_user) == False) + + self.um.delete_user('new_user') + self.um.delete_user('image_creator') + +# class ApiObjectStoreTestCase(test.BaseTestCase): +# def setUp(self): +# super(ApiObjectStoreTestCase, self).setUp() +# FLAGS.fake_users = True +# FLAGS.buckets_path = os.path.join(tempdir, 'buckets') +# FLAGS.images_path = os.path.join(tempdir, 'images') +# FLAGS.ca_path = os.path.join(os.path.dirname(__file__), 'CA') +# +# self.users = users.UserManager.instance() +# self.app = handler.Application(self.users) +# +# self.host = '127.0.0.1' +# +# self.conn = boto.s3.connection.S3Connection( +# aws_access_key_id=user.access, +# aws_secret_access_key=user.secret, +# is_secure=False, +# calling_format=boto.s3.connection.OrdinaryCallingFormat(), +# port=FLAGS.s3_port, +# host=FLAGS.s3_host) +# +# self.mox.StubOutWithMock(self.ec2, 'new_http_connection') +# +# def tearDown(self): +# FLAGS.Reset() +# super(ApiObjectStoreTestCase, self).tearDown() +# +# def test_describe_instances(self): +# self.expect_http() +# self.mox.ReplayAll() +# +# self.assertEqual(self.ec2.get_all_instances(), []) diff --git a/nova/tests/real_flags.py b/nova/tests/real_flags.py new file mode 100644 index 000000000..68fe8dc5b --- /dev/null +++ b/nova/tests/real_flags.py @@ -0,0 +1,24 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nova import flags + +FLAGS = flags.FLAGS + +FLAGS.fake_libvirt = False +FLAGS.fake_storage = False +FLAGS.fake_rabbit = False +FLAGS.fake_network = False +FLAGS.fake_users = False +FLAGS.verbose = False diff --git a/nova/tests/storage_unittest.py b/nova/tests/storage_unittest.py new file mode 100644 index 000000000..31966d2d5 --- /dev/null +++ b/nova/tests/storage_unittest.py @@ -0,0 +1,86 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import StringIO +import time +import unittest +from xml.etree import ElementTree + +from nova import vendor +import mox +from tornado import ioloop +from twisted.internet import defer + +from nova import exception +from nova import flags +from nova import test +from nova.compute import node +from nova.volume import storage + + +FLAGS = flags.FLAGS + + +class StorageTestCase(test.TrialTestCase): + def setUp(self): + logging.getLogger().setLevel(logging.DEBUG) + super(StorageTestCase, self).setUp() + self.mynode = node.Node() + self.mystorage = None + self.flags(fake_libvirt=True, + fake_storage=True, + redis_db=8) + if FLAGS.fake_storage: + self.mystorage = storage.FakeBlockStore() + else: + self.mystorage = storage.BlockStore() + + @test.skip_if_fake + def test_run_create_volume(self): + vol_size = '0' + user_id = 'fake' + volume_id = self.mystorage.create_volume(vol_size, user_id) + # rv = self.mystorage.describe_volumes() + + # Volumes have to be sorted by timestamp in order to work here... + # TODO(termie): get_volume returns differently than create_volume + self.assertEqual(volume_id, + self.mystorage.get_volume(volume_id)['volume_id']) + + rv = self.mystorage.delete_volume(volume_id) + self.assertRaises(exception.Error, + self.mystorage.get_volume, + volume_id) + + @test.skip_if_fake + def test_run_attach_detach_volume(self): + # Create one volume and one node to test with + instance_id = "storage-test" + # TODO(joshua) - Redo this test, can't make fake instances this way any more + # rv = self.mynode.run_instance(instance_id) + vol_size = "5" + user_id = "fake" + volume_id = self.mystorage.create_volume(vol_size, user_id) + rv = self.mystorage.attach_volume(volume_id, + instance_id, + "/dev/sdf") + volume_obj = self.mystorage.get_volume(volume_id) + self.assertEqual(volume_obj['status'], "attached") + # TODO(???): assert that it's attached to the right instance + + rv = self.mystorage.detach_volume(volume_id) + volume_obj = self.mystorage.get_volume(volume_id) + self.assertEqual(volume_obj['status'], "available") diff --git a/nova/tests/users_unittest.py b/nova/tests/users_unittest.py new file mode 100644 index 000000000..70f508b35 --- /dev/null +++ b/nova/tests/users_unittest.py @@ -0,0 +1,137 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import unittest + +from nova import vendor +from M2Crypto import BIO +from M2Crypto import RSA +from M2Crypto import X509 + +from nova import crypto +from nova import flags +from nova import test +from nova import utils +from nova.auth import users +from nova.endpoint import cloud + + +FLAGS = flags.FLAGS + + +class UserTestCase(test.BaseTestCase): + def setUp(self): + super(UserTestCase, self).setUp() + self.flags(fake_libvirt=True, + fake_storage=True, + redis_db=8) + self.users = users.UserManager.instance() + + def test_001_can_create_user(self): + self.users.create_user('test1', 'access', 'secret') + + def test_002_can_get_user(self): + user = self.users.get_user('test1') + + def test_003_can_retreive_properties(self): + user = self.users.get_user('test1') + self.assertEqual('test1', user.id) + self.assertEqual('access', user.access) + self.assertEqual('secret', user.secret) + + def test_004_signature_is_valid(self): + #self.assertTrue(self.users.authenticate( **boto.generate_url ... ? ? ? )) + pass + #raise NotImplementedError + + def test_005_can_get_credentials(self): + return + credentials = self.users.get_user('test1').get_credentials() + self.assertEqual(credentials, + 'export EC2_ACCESS_KEY="access"\n' + + 'export EC2_SECRET_KEY="secret"\n' + + 'export EC2_URL="http://127.0.0.1:8773/services/Cloud"\n' + + 'export S3_URL="http://127.0.0.1:3333/"\n' + + 'export EC2_USER_ID="test1"\n') + + def test_006_test_key_storage(self): + user = self.users.get_user('test1') + user.create_key_pair('public', 'key', 'fingerprint') + key = user.get_key_pair('public') + self.assertEqual('key', key.public_key) + self.assertEqual('fingerprint', key.fingerprint) + + def test_007_test_key_generation(self): + user = self.users.get_user('test1') + private_key, fingerprint = user.generate_key_pair('public2') + key = RSA.load_key_string(private_key, callback=lambda: None) + bio = BIO.MemoryBuffer() + public_key = user.get_key_pair('public2').public_key + key.save_pub_key_bio(bio) + converted = crypto.ssl_pub_to_ssh_pub(bio.read()) + # assert key fields are equal + print converted + self.assertEqual(public_key.split(" ")[1].strip(), + converted.split(" ")[1].strip()) + + def test_008_can_list_key_pairs(self): + keys = self.users.get_user('test1').get_key_pairs() + self.assertTrue(filter(lambda k: k.name == 'public', keys)) + self.assertTrue(filter(lambda k: k.name == 'public2', keys)) + + def test_009_can_delete_key_pair(self): + self.users.get_user('test1').delete_key_pair('public') + keys = self.users.get_user('test1').get_key_pairs() + self.assertFalse(filter(lambda k: k.name == 'public', keys)) + + def test_010_can_list_users(self): + users = self.users.get_users() + self.assertTrue(filter(lambda u: u.id == 'test1', users)) + + def test_011_can_generate_x509(self): + # MUST HAVE RUN CLOUD SETUP BY NOW + self.cloud = cloud.CloudController() + self.cloud.setup() + private_key, signed_cert_string = self.users.get_user('test1').generate_x509_cert() + logging.debug(signed_cert_string) + + # Need to verify that it's signed by the right intermediate CA + full_chain = crypto.fetch_ca(username='test1', chain=True) + int_cert = crypto.fetch_ca(username='test1', chain=False) + cloud_cert = crypto.fetch_ca() + logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain) + signed_cert = X509.load_cert_string(signed_cert_string) + chain_cert = X509.load_cert_string(full_chain) + int_cert = X509.load_cert_string(int_cert) + cloud_cert = X509.load_cert_string(cloud_cert) + self.assertTrue(signed_cert.verify(chain_cert.get_pubkey())) + self.assertTrue(signed_cert.verify(int_cert.get_pubkey())) + + if not FLAGS.use_intermediate_ca: + self.assertTrue(signed_cert.verify(cloud_cert.get_pubkey())) + else: + self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey())) + + def test_012_can_delete_user(self): + self.users.delete_user('test1') + users = self.users.get_users() + if users != None: + self.assertFalse(filter(lambda u: u.id == 'test1', users)) + + +if __name__ == "__main__": + # TODO: Implement use_fake as an option + unittest.main() diff --git a/nova/twistd.py b/nova/twistd.py new file mode 100644 index 000000000..ea3c9c168 --- /dev/null +++ b/nova/twistd.py @@ -0,0 +1,249 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Twisted daemon helpers, specifically to parse out gFlags from twisted flags, +manage pid files and support syslogging. +""" + +import logging +import os +import signal +import sys +import time +import UserDict +import logging.handlers + +from nova import vendor +from twisted.scripts import twistd +from twisted.python import log +from twisted.python import reflect +from twisted.python import runtime +from twisted.python import usage + +from nova import flags + +if runtime.platformType == "win32": + from twisted.scripts._twistw import ServerOptions +else: + from twisted.scripts._twistd_unix import ServerOptions + + +FLAGS = flags.FLAGS + + +class TwistdServerOptions(ServerOptions): + def parseArgs(self, *args): + return + + +def WrapTwistedOptions(wrapped): + class TwistedOptionsToFlags(wrapped): + subCommands = None + def __init__(self): + # NOTE(termie): _data exists because Twisted stuff expects + # to be able to set arbitrary things that are + # not actual flags + self._data = {} + self._flagHandlers = {} + self._paramHandlers = {} + + # Absorb the twistd flags into our FLAGS + self._absorbFlags() + self._absorbParameters() + self._absorbHandlers() + + super(TwistedOptionsToFlags, self).__init__() + + def _absorbFlags(self): + twistd_flags = [] + reflect.accumulateClassList(self.__class__, 'optFlags', twistd_flags) + for flag in twistd_flags: + key = flag[0].replace('-', '_') + flags.DEFINE_boolean(key, None, str(flag[-1])) + + def _absorbParameters(self): + twistd_params = [] + reflect.accumulateClassList(self.__class__, 'optParameters', twistd_params) + for param in twistd_params: + key = param[0].replace('-', '_') + flags.DEFINE_string(key, param[2], str(param[-1])) + + def _absorbHandlers(self): + twistd_handlers = {} + reflect.addMethodNamesToDict(self.__class__, twistd_handlers, "opt_") + + # NOTE(termie): Much of the following is derived/copied from + # twisted.python.usage with the express purpose of + # providing compatibility + for name in twistd_handlers.keys(): + method = getattr(self, 'opt_'+name) + + takesArg = not usage.flagFunction(method, name) + doc = getattr(method, '__doc__', None) + if not doc: + doc = 'undocumented' + + if not takesArg: + if name not in FLAGS: + flags.DEFINE_boolean(name, None, doc) + self._flagHandlers[name] = method + else: + if name not in FLAGS: + flags.DEFINE_string(name, None, doc) + self._paramHandlers[name] = method + + + def _doHandlers(self): + for flag, handler in self._flagHandlers.iteritems(): + if self[flag]: + handler() + for param, handler in self._paramHandlers.iteritems(): + if self[param] is not None: + handler(self[param]) + + def __str__(self): + return str(FLAGS) + + def parseOptions(self, options=None): + if options is None: + options = sys.argv + else: + options.insert(0, '') + + args = FLAGS(options) + argv = args[1:] + # ignore subcommands + + try: + self.parseArgs(*argv) + except TypeError: + raise usage.UsageError("Wrong number of arguments.") + + self.postOptions() + return args + + def parseArgs(self, *args): + # TODO(termie): figure out a decent way of dealing with args + #return + super(TwistedOptionsToFlags, self).parseArgs(*args) + + def postOptions(self): + self._doHandlers() + + super(TwistedOptionsToFlags, self).postOptions() + + def __getitem__(self, key): + key = key.replace('-', '_') + try: + return getattr(FLAGS, key) + except (AttributeError, KeyError): + return self._data[key] + + def __setitem__(self, key, value): + key = key.replace('-', '_') + try: + return setattr(FLAGS, key, value) + except (AttributeError, KeyError): + self._data[key] = value + + return TwistedOptionsToFlags + + +def stop(pidfile): + """ + Stop the daemon + """ + # Get the pid from the pidfile + try: + pf = file(pidfile,'r') + pid = int(pf.read().strip()) + pf.close() + except IOError: + pid = None + + if not pid: + message = "pidfile %s does not exist. Daemon not running?\n" + sys.stderr.write(message % pidfile) + return # not an error in a restart + + # Try killing the daemon process + try: + while 1: + os.kill(pid, signal.SIGKILL) + time.sleep(0.1) + except OSError, err: + err = str(err) + if err.find("No such process") > 0: + if os.path.exists(pidfile): + os.remove(pidfile) + else: + print str(err) + sys.exit(1) + + +def serve(filename): + logging.debug("Serving %s" % filename) + name = os.path.basename(filename) + OptionsClass = WrapTwistedOptions(TwistdServerOptions) + options = OptionsClass() + argv = options.parseOptions() + logging.getLogger('amqplib').setLevel(logging.WARN) + FLAGS.python = filename + FLAGS.no_save = True + if not FLAGS.pidfile: + FLAGS.pidfile = '%s.pid' % name + elif FLAGS.pidfile.endswith('twistd.pid'): + FLAGS.pidfile = FLAGS.pidfile.replace('twistd.pid', '%s.pid' % name) + + if not FLAGS.logfile: + FLAGS.logfile = '%s.log' % name + + action = 'start' + if len(argv) > 1: + action = argv.pop() + + if action == 'stop': + stop(FLAGS.pidfile) + sys.exit() + elif action == 'restart': + stop(FLAGS.pidfile) + elif action == 'start': + pass + else: + print 'usage: %s [options] [start|stop|restart]' % argv[0] + sys.exit(1) + + formatter = logging.Formatter( + name + '(%(name)s): %(levelname)s %(message)s') + handler = logging.StreamHandler(log.StdioOnnaStick()) + handler.setFormatter(formatter) + logging.getLogger().addHandler(handler) + + if FLAGS.verbose: + logging.getLogger().setLevel(logging.DEBUG) + else: + logging.getLogger().setLevel(logging.WARNING) + + if FLAGS.syslog: + syslog = logging.handlers.SysLogHandler(address='/dev/log') + syslog.setFormatter(formatter) + logging.getLogger().addHandler(syslog) + + logging.debug("Full set of FLAGS:") + for flag in FLAGS: + logging.debug("%s : %s" % (flag, FLAGS.get(flag, None))) + + twistd.runApp(options) diff --git a/nova/utils.py b/nova/utils.py new file mode 100644 index 000000000..0cfa2cf6c --- /dev/null +++ b/nova/utils.py @@ -0,0 +1,96 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +System-level utilities and helper functions. +""" + +import logging +import socket +import sys +import os.path +import inspect +import subprocess +import random + +def fetchfile(url, target): + logging.debug("Fetching %s" % url) +# c = pycurl.Curl() +# fp = open(target, "wb") +# c.setopt(c.URL, url) +# c.setopt(c.WRITEDATA, fp) +# c.perform() +# c.close() +# fp.close() + execute("curl %s -o %s" % (url, target)) + +def execute(cmd, input=None): + #logging.debug("Running %s" % (cmd)) + obj = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + result = None + if input != None: + result = obj.communicate(input) + else: + result = obj.communicate() + obj.stdin.close() + if obj.returncode: + logging.debug("Result was %s" % (obj.returncode)) + return result + +def abspath(s): + return os.path.join(os.path.dirname(__file__), s) + +def default_flagfile(filename='nova.conf'): + for arg in sys.argv: + if arg.find('flagfile') != -1: + break + else: + if not os.path.isabs(filename): + # turn relative filename into an absolute path + script_dir = os.path.dirname(inspect.stack()[-1][1]) + filename = os.path.abspath(os.path.join(script_dir, filename)) + if os.path.exists(filename): + sys.argv = sys.argv[:1] + ['--flagfile=%s' % filename] + sys.argv[1:] + +def debug(arg): + logging.debug('debug in callback: %s', arg) + return arg + +def runthis(prompt, cmd): + logging.debug("Running %s" % (cmd)) + logging.debug(prompt % (subprocess.call(cmd.split(" ")))) + + +def generate_uid(topic, size=8): + return '%s-%s' % (topic, ''.join([random.choice('01234567890abcdefghijklmnopqrstuvwxyz') for x in xrange(size)])) + +def generate_mac(): + mac = [0x00, 0x16, 0x3e, random.randint(0x00, 0x7f), + random.randint(0x00, 0xff), random.randint(0x00, 0xff) + ] + return ':'.join(map(lambda x: "%02x" % x, mac)) + +def last_octet(address): + return int(address.split(".")[-1]) + +def get_my_ip(): + ''' returns the actual ip of the local machine. + ''' + csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + csock.connect(('www.google.com', 80)) + (addr, port) = csock.getsockname() + csock.close() + return addr diff --git a/nova/vendor.py b/nova/vendor.py new file mode 100644 index 000000000..758adeb3c --- /dev/null +++ b/nova/vendor.py @@ -0,0 +1,43 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Get our vendor folders into the system path. +""" + +import os +import sys + +# abspath/__file__/../vendor +VENDOR_PATH = os.path.abspath( + os.path.join(os.path.dirname(os.path.dirname(__file__)), 'vendor')) + +if not os.path.exists(VENDOR_PATH): + print 'warning: no vendor libraries included' +else: + paths = [VENDOR_PATH, + os.path.join(VENDOR_PATH, 'pymox'), + os.path.join(VENDOR_PATH, 'tornado'), + os.path.join(VENDOR_PATH, 'python-gflags'), + os.path.join(VENDOR_PATH, 'python-daemon'), + os.path.join(VENDOR_PATH, 'lockfile'), + os.path.join(VENDOR_PATH, 'boto'), + os.path.join(VENDOR_PATH, 'Twisted-10.0.0'), + os.path.join(VENDOR_PATH, 'redis-py'), + ] + + for p in paths: + if p not in sys.path: + sys.path.insert(0, p) diff --git a/nova/volume/__init__.py b/nova/volume/__init__.py new file mode 100644 index 000000000..1c569f383 --- /dev/null +++ b/nova/volume/__init__.py @@ -0,0 +1,27 @@ +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +:mod:`nova.volume` -- Nova Block Storage +===================================================== + +.. automodule:: nova.volume + :platform: Unix +.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com> +.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com> +.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com> +.. moduleauthor:: Joshua McKenty <joshua@cognition.ca> +.. moduleauthor:: Manish Singh <yosh@gimp.org> +.. moduleauthor:: Andy Smith <andy@anarkystic.com> +"""
\ No newline at end of file diff --git a/nova/volume/storage.py b/nova/volume/storage.py new file mode 100644 index 000000000..823e1390a --- /dev/null +++ b/nova/volume/storage.py @@ -0,0 +1,250 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Nova Storage manages creating, attaching, detaching, and +destroying persistent storage volumes, ala EBS. +Currently uses Ata-over-Ethernet. +""" + +import logging +import random +import socket +import subprocess +import time + +from nova import vendor +from tornado import ioloop +from twisted.internet import defer + +from nova import datastore +from nova import exception +from nova import flags +from nova import rpc +from nova import utils + + +FLAGS = flags.FLAGS +flags.DEFINE_string('storage_dev', '/dev/sdb', + 'Physical device to use for volumes') +flags.DEFINE_string('volume_group', 'nova-volumes', + 'Name for the VG that will contain exported volumes') +flags.DEFINE_string('aoe_eth_dev', 'eth0', + 'Which device to export the volumes on') +flags.DEFINE_string('storage_name', + socket.gethostname(), + 'name of this node') +flags.DEFINE_integer('shelf_id', + utils.last_octet(utils.get_my_ip()), + 'AoE shelf_id for this node') +flags.DEFINE_string('storage_availability_zone', + 'nova', + 'availability zone of this node') +flags.DEFINE_boolean('fake_storage', False, + 'Should we make real storage volumes to attach?') + +class BlockStore(object): + def __init__(self): + super(BlockStore, self).__init__() + self.volume_class = Volume + if FLAGS.fake_storage: + self.volume_class = FakeVolume + self._init_volume_group() + self.keeper = datastore.Keeper('instances') + + def report_state(self): + #TODO: aggregate the state of the system + pass + + def create_volume(self, size, user_id): + """ + Creates an exported volume (fake or real), + restarts exports to make it available. + Volume at this point has size, owner, and zone. + """ + logging.debug("Creating volume of size: %s" % (size)) + vol = self.volume_class.create(size, user_id) + self.keeper.set_add('volumes', vol['volume_id']) + self._restart_exports() + return vol['volume_id'] + + def get_volume(self, volume_id): + """ Returns a redis-backed volume object """ + if volume_id in self.keeper['volumes']: + return self.volume_class(volume_id=volume_id) + raise exception.Error("Volume does not exist") + + def by_project(self, project): + """ returns a list of volume objects for a project """ + # TODO(termie): I don't understand why this is doing a range + #for volume_id in datastore.Redis.instance().lrange("project:%s:volumes" % + #project, 0, -1): + for volume_id in datastore['project:%s:volumes' % project]: + yield self.volume_class(volume_id=volume_id) + + def by_node(self, node_id): + """ returns a list of volumes for a node """ + for volume in self.all: + if volume['node_name'] == node_id: + yield volume + + @property + def all(self): + """ returns a list of all volumes """ + for volume_id in self.keeper['volumes']: + yield self.volume_class(volume_id=volume_id) + + + def delete_volume(self, volume_id): + logging.debug("Deleting volume with id of: %s" % (volume_id)) + vol = self.get_volume(volume_id) + vol.destroy() + self.keeper.set_remove('volumes', vol['volume_id']) + return True + + def attach_volume(self, volume_id, instance_id, mountpoint): + self.volume_class(volume_id).attach(instance_id, mountpoint) + + def detach_volume(self, volume_id): + self.volume_class(volume_id).detach() + + def loop_volumes(self): + volumes = subprocess.Popen(["sudo", "lvs", "--noheadings"], stdout=subprocess.PIPE).communicate()[0].split("\n") + for lv in volumes: + if len(lv.split(" ")) > 1: + yield lv.split(" ")[2] + + def _restart_exports(self): + if FLAGS.fake_storage: + return + utils.runthis("Setting exports to auto: %s", "sudo vblade-persist auto all") + utils.runthis("Starting all exports: %s", "sudo vblade-persist start all") + utils.runthis("Discovering AOE devices: %s", "sudo aoe-discover") + + def _init_volume_group(self): + if FLAGS.fake_storage: + return + utils.runthis("PVCreate returned: %s", "sudo pvcreate %s" % (FLAGS.storage_dev)) + utils.runthis("VGCreate returned: %s", "sudo vgcreate %s %s" % (FLAGS.volume_group, FLAGS.storage_dev)) + + +class FakeBlockStore(BlockStore): + def __init__(self): + super(FakeBlockStore, self).__init__() + + def loop_volumes(self): + return self.volumes + + def _init_volume_group(self): + pass + + def _restart_exports(self): + pass + + +class Volume(datastore.RedisModel): + + object_type = 'volume' + + def __init__(self, volume_id=None): + self.volume_id = volume_id + super(Volume, self).__init__(object_id=volume_id) + + @classmethod + def create(cls, size, user_id): + volume_id = utils.generate_uid('vol') + vol = cls(volume_id=volume_id) + #TODO(vish): do we really need to store the volume id as .object_id .volume_id and ['volume_id']? + vol['volume_id'] = volume_id + vol['node_name'] = FLAGS.storage_name + vol['size'] = size + vol['user_id'] = user_id + vol['availability_zone'] = FLAGS.storage_availability_zone + vol["instance_id"] = 'none' + vol["mountpoint"] = 'none' + vol["create_time"] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + vol["attachment_set"] = '' + vol.create_lv() + vol.setup_export() + vol['status'] = "available" + vol.save() + return vol + + def attach(self, instance_id, mountpoint): + self['instance_id'] = instance_id + self['mountpoint'] = mountpoint + self['status'] = "attached" + self.save() + + def detach(self): + self['instance_id'] = None + self['mountpoint'] = None + self['status'] = "available" + self.save() + + def destroy(self): + try: + self._remove_export() + except: + pass + self._delete_lv() + super(Volume, self).destroy() + + def create_lv(self): + if str(self['size']) == '0': + sizestr = '100M' + else: + sizestr = '%sG' % self['size'] + utils.runthis("Creating LV: %s", "sudo lvcreate -L %s -n %s %s" % (sizestr, self.volume_id, FLAGS.volume_group)) + + def _delete_lv(self): + utils.runthis("Removing LV: %s", "sudo lvremove -f %s/%s" % (FLAGS.volume_group, self.volume_id)) + + def setup_export(self): + (shelf_id, blade_id) = get_next_aoe_numbers() + self['aoe_device'] = "e%s.%s" % (shelf_id, blade_id) + self.save() + utils.runthis("Creating AOE export: %s", + "sudo vblade-persist setup %s %s %s /dev/%s/%s" % + (shelf_id, blade_id, FLAGS.aoe_eth_dev, FLAGS.volume_group, self.volume_id)) + + def _remove_export(self): + utils.runthis("Destroyed AOE export: %s", "sudo vblade-persist stop %s %s" % (self.aoe_device[1], self.aoe_device[3])) + utils.runthis("Destroyed AOE export: %s", "sudo vblade-persist destroy %s %s" % (self.aoe_device[1], self.aoe_device[3])) + + +class FakeVolume(Volume): + def create_lv(self): + pass + + def setup_export(self): + # TODO(???): This may not be good enough? + self['aoe_device'] = 'e%s.%s' % (FLAGS.shelf_id, + ''.join([random.choice('0123456789') for x in xrange(3)])) + self.save() + + def _remove_export(self): + pass + + def _delete_lv(self): + pass + +def get_next_aoe_numbers(): + aoes = glob.glob("/var/lib/vblade-persist/vblades/e*") + aoes.extend(['e0.0']) + blade_id = int(max([int(a.split('.')[1]) for a in aoes])) + 1 + logging.debug("Next blade_id is %s" % (blade_id)) + shelf_id = FLAGS.shelf_id + return (shelf_id, blade_id) |
