summaryrefslogtreecommitdiffstats
path: root/smoketests
diff options
context:
space:
mode:
Diffstat (limited to 'smoketests')
-rw-r--r--smoketests/admin_smoketests.py92
-rw-r--r--smoketests/base.py (renamed from smoketests/novatestcase.py)114
-rw-r--r--smoketests/flags.py13
-rw-r--r--smoketests/smoketest.py566
-rw-r--r--smoketests/user_smoketests.py326
5 files changed, 490 insertions, 621 deletions
diff --git a/smoketests/admin_smoketests.py b/smoketests/admin_smoketests.py
new file mode 100644
index 000000000..50bb3fa2e
--- /dev/null
+++ b/smoketests/admin_smoketests.py
@@ -0,0 +1,92 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import random
+import sys
+import time
+import unittest
+import zipfile
+
+from nova import adminclient
+from smoketests import flags
+from smoketests import base
+
+
+SUITE_NAMES = '[user]'
+
+FLAGS = flags.FLAGS
+flags.DEFINE_string('suite', None, 'Specific test suite to run ' + SUITE_NAMES)
+
+# TODO(devamcar): Use random tempfile
+ZIP_FILENAME = '/tmp/nova-me-x509.zip'
+
+TEST_PREFIX = 'test%s' % int(random.random()*1000000)
+TEST_USERNAME = '%suser' % TEST_PREFIX
+TEST_PROJECTNAME = '%sproject' % TEST_PREFIX
+
+
+class AdminSmokeTestCase(base.SmokeTestCase):
+ def setUp(self):
+ self.admin = adminclient.NovaAdminClient(
+ access_key=os.getenv('EC2_ACCESS_KEY'),
+ secret_key=os.getenv('EC2_SECRET_KEY'),
+ clc_url=os.getenv('EC2_URL'),
+ region=FLAGS.region)
+
+
+class UserTests(AdminSmokeTestCase):
+ """ Test admin credentials and user creation. """
+
+ def test_001_admin_can_connect(self):
+ conn = self.admin.connection_for('admin', 'admin')
+ self.assert_(conn)
+
+ def test_002_admin_can_create_user(self):
+ user = self.admin.create_user(TEST_USERNAME)
+ self.assertEqual(user.username, TEST_USERNAME)
+
+ def test_003_admin_can_create_project(self):
+ project = self.admin.create_project(TEST_PROJECTNAME,
+ TEST_USERNAME)
+ self.assertEqual(project.projectname, TEST_PROJECTNAME)
+
+ def test_004_user_can_download_credentials(self):
+ buf = self.admin.get_zip(TEST_USERNAME, TEST_PROJECTNAME)
+ output = open(ZIP_FILENAME, 'w')
+ output.write(buf)
+ output.close()
+
+ zip = zipfile.ZipFile(ZIP_FILENAME, 'a', zipfile.ZIP_DEFLATED)
+ bad = zip.testzip()
+ zip.close()
+
+ self.failIf(bad)
+
+ def test_999_tearDown(self):
+ self.admin.delete_project(TEST_PROJECTNAME)
+ self.admin.delete_user(TEST_USERNAME)
+ try:
+ os.remove(ZIP_FILENAME)
+ except:
+ pass
+
+if __name__ == "__main__":
+ suites = {'user': unittest.makeSuite(UserTests)}
+ sys.exit(base.run_tests(suites))
+
diff --git a/smoketests/novatestcase.py b/smoketests/base.py
index 513e0ca91..5a14d3e09 100644
--- a/smoketests/novatestcase.py
+++ b/smoketests/base.py
@@ -16,36 +16,26 @@
# License for the specific language governing permissions and limitations
# under the License.
+import boto
import commands
+import httplib
import os
+import paramiko
import random
import sys
import unittest
+from boto.ec2.regioninfo import RegionInfo
-
-import paramiko
-
-from nova import adminclient
from smoketests import flags
FLAGS = flags.FLAGS
-class NovaTestCase(unittest.TestCase):
- def setUp(self):
- self.nova_admin = adminclient.NovaAdminClient(
- access_key=FLAGS.admin_access_key,
- secret_key=FLAGS.admin_secret_key,
- clc_ip=FLAGS.clc_ip)
-
- def tearDown(self):
- pass
-
+class SmokeTestCase(unittest.TestCase):
def connect_ssh(self, ip, key_name):
# TODO(devcamcar): set a more reasonable connection timeout time
key = paramiko.RSAKey.from_private_key_file('/tmp/%s.pem' % key_name)
client = paramiko.SSHClient()
- client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.WarningPolicy())
client.connect(ip, username='root', pkey=key)
stdin, stdout, stderr = client.exec_command('uptime')
@@ -53,26 +43,50 @@ class NovaTestCase(unittest.TestCase):
return client
def can_ping(self, ip):
- return commands.getstatusoutput('ping -c 1 %s' % ip)[0] == 0
-
- @property
- def admin(self):
- return self.nova_admin.connection_for('admin')
-
- def connection_for(self, username):
- return self.nova_admin.connection_for(username)
-
- def create_user(self, username):
- return self.nova_admin.create_user(username)
-
- def get_user(self, username):
- return self.nova_admin.get_user(username)
-
- def delete_user(self, username):
- return self.nova_admin.delete_user(username)
-
- def get_signed_zip(self, username):
- return self.nova_admin.get_zip(username)
+ """ Attempt to ping the specified IP, and give up after 1 second. """
+
+ # NOTE(devcamcar): ping timeout flag is different in OSX.
+ if sys.platform == 'darwin':
+ timeout_flag = 't'
+ else:
+ timeout_flag = 'w'
+
+ status, output = commands.getstatusoutput('ping -c1 -%s1 %s' %
+ (timeout_flag, ip))
+ return status == 0
+
+ def connection_for_env(self, **kwargs):
+ """
+ Returns a boto ec2 connection for the current environment.
+ """
+ access_key = os.getenv('EC2_ACCESS_KEY')
+ secret_key = os.getenv('EC2_SECRET_KEY')
+ clc_url = os.getenv('EC2_URL')
+
+ if not access_key or not secret_key or not clc_url:
+ raise Exception('Missing EC2 environment variables. Please source '
+ 'the appropriate novarc file before running this '
+ 'test.')
+
+ parts = self.split_clc_url(clc_url)
+ return boto.connect_ec2(aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ is_secure=parts['is_secure'],
+ region=RegionInfo(None,
+ 'nova',
+ parts['ip']),
+ port=parts['port'],
+ path='/services/Cloud',
+ **kwargs)
+
+ def split_clc_url(self, clc_url):
+ """
+ Splits a cloud controller endpoint url.
+ """
+ parts = httplib.urlsplit(clc_url)
+ is_secure = parts.scheme == 'https'
+ ip, port = parts.netloc.split(':')
+ return {'ip': ip, 'port': int(port), 'is_secure': is_secure}
def create_key_pair(self, conn, key_name):
try:
@@ -116,15 +130,25 @@ class NovaTestCase(unittest.TestCase):
raise Exception(output)
return True
- def register_image(self, bucket_name, manifest):
- conn = nova_admin.connection_for('admin')
- return conn.register_image("%s/%s.manifest.xml" % (bucket_name, manifest))
+def run_tests(suites):
+ argv = FLAGS(sys.argv)
+
+ if not os.getenv('EC2_ACCESS_KEY'):
+ print >> sys.stderr, 'Missing EC2 environment variables. Please ' \
+ 'source the appropriate novarc file before ' \
+ 'running this test.'
+ return 1
+
+ if FLAGS.suite:
+ try:
+ suite = suites[FLAGS.suite]
+ except KeyError:
+ print >> sys.stderr, 'Available test suites:', \
+ ', '.join(suites.keys())
+ return 1
- def setUp_test_image(self, image, kernel=False):
- self.bundle_image(image, kernel=kernel)
- bucket = "auto_test_%s" % int(random.random() * 1000000)
- self.upload_image(bucket, image)
- return self.register_image(bucket, image)
+ unittest.TextTestRunner(verbosity=2).run(suite)
+ else:
+ for suite in suites.itervalues():
+ unittest.TextTestRunner(verbosity=2).run(suite)
- def tearDown_test_image(self, conn, image_id):
- conn.deregister_image(image_id)
diff --git a/smoketests/flags.py b/smoketests/flags.py
index 3617fb797..ae4d09508 100644
--- a/smoketests/flags.py
+++ b/smoketests/flags.py
@@ -1,7 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
+# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -33,13 +33,6 @@ DEFINE_bool = DEFINE_bool
# __GLOBAL FLAGS ONLY__
# Define any app-specific flags in their own files, docs at:
# http://code.google.com/p/python-gflags/source/browse/trunk/gflags.py#39
-DEFINE_string('admin_access_key', 'admin', 'Access key for admin user')
-DEFINE_string('admin_secret_key', 'admin', 'Secret key for admin user')
-DEFINE_string('clc_ip', '127.0.0.1', 'IP of cloud controller API')
-DEFINE_string('bundle_kernel', 'openwrt-x86-vmlinuz',
- 'Local kernel file to use for bundling tests')
-DEFINE_string('bundle_image', 'openwrt-x86-ext2.image',
- 'Local image file to use for bundling tests')
-#DEFINE_string('vpn_image_id', 'ami-CLOUDPIPE',
-# 'AMI for cloudpipe vpn server')
+DEFINE_string('region', 'nova', 'Region to use')
+DEFINE_string('test_image', 'ami-tiny', 'Image to use for launch tests')
diff --git a/smoketests/smoketest.py b/smoketests/smoketest.py
deleted file mode 100644
index ad95114d4..000000000
--- a/smoketests/smoketest.py
+++ /dev/null
@@ -1,566 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import commands
-import os
-import random
-import re
-import sys
-import time
-import unittest
-import zipfile
-
-
-import paramiko
-
-from smoketests import flags
-from smoketests import novatestcase
-
-SUITE_NAMES = '[user, image, security, public_network, volume]'
-
-FLAGS = flags.FLAGS
-flags.DEFINE_string('suite', None, 'Specific test suite to run ' + SUITE_NAMES)
-
-# TODO(devamcar): Use random tempfile
-ZIP_FILENAME = '/tmp/nova-me-x509.zip'
-
-data = {}
-
-test_prefix = 'test%s' % int(random.random()*1000000)
-test_username = '%suser' % test_prefix
-test_bucket = '%s_bucket' % test_prefix
-test_key = '%s_key' % test_prefix
-
-# Test admin credentials and user creation
-class UserTests(novatestcase.NovaTestCase):
- def test_001_admin_can_connect(self):
- conn = self.connection_for('admin')
- self.assert_(conn)
-
- def test_002_admin_can_create_user(self):
- userinfo = self.create_user(test_username)
- self.assertEqual(userinfo.username, test_username)
-
- def test_003_user_can_download_credentials(self):
- buf = self.get_signed_zip(test_username)
- output = open(ZIP_FILENAME, 'w')
- output.write(buf)
- output.close()
-
- zip = zipfile.ZipFile(ZIP_FILENAME, 'a', zipfile.ZIP_DEFLATED)
- bad = zip.testzip()
- zip.close()
-
- self.failIf(bad)
-
- def test_999_tearDown(self):
- self.delete_user(test_username)
- user = self.get_user(test_username)
- self.assert_(user is None)
- try:
- os.remove(ZIP_FILENAME)
- except:
- pass
-
-# Test image bundling, registration, and launching
-class ImageTests(novatestcase.NovaTestCase):
- def test_000_setUp(self):
- self.create_user(test_username)
-
- def test_001_admin_can_bundle_image(self):
- self.assertTrue(self.bundle_image(FLAGS.bundle_image))
-
- def test_002_admin_can_upload_image(self):
- self.assertTrue(self.upload_image(test_bucket, FLAGS.bundle_image))
-
- def test_003_admin_can_register_image(self):
- image_id = self.register_image(test_bucket, FLAGS.bundle_image)
- self.assert_(image_id is not None)
- data['image_id'] = image_id
-
- def test_004_admin_can_bundle_kernel(self):
- self.assertTrue(self.bundle_image(FLAGS.bundle_kernel, kernel=True))
-
- def test_005_admin_can_upload_kernel(self):
- self.assertTrue(self.upload_image(test_bucket, FLAGS.bundle_kernel))
-
- def test_006_admin_can_register_kernel(self):
- # FIXME(devcamcar): registration should verify that bucket/manifest
- # exists before returning successfully.
- kernel_id = self.register_image(test_bucket, FLAGS.bundle_kernel)
- self.assert_(kernel_id is not None)
- data['kernel_id'] = kernel_id
-
- def test_007_admin_images_are_available_within_10_seconds(self):
- for i in xrange(10):
- image = self.admin.get_image(data['image_id'])
- if image and image.state == 'available':
- break
- time.sleep(1)
- else:
- print image.state
- self.assert_(False) # wasn't available within 10 seconds
- self.assert_(image.type == 'machine')
-
- for i in xrange(10):
- kernel = self.admin.get_image(data['kernel_id'])
- if kernel and kernel.state == 'available':
- break
- time.sleep(1)
- else:
- self.assert_(False) # wasn't available within 10 seconds
- self.assert_(kernel.type == 'kernel')
-
- def test_008_admin_can_describe_image_attribute(self):
- attrs = self.admin.get_image_attribute(data['image_id'],
- 'launchPermission')
- self.assert_(attrs.name, 'launch_permission')
-
- def test_009_me_cannot_see_non_public_images(self):
- conn = self.connection_for(test_username)
- images = conn.get_all_images(image_ids=[data['image_id']])
- self.assertEqual(len(images), 0)
-
- def test_010_admin_can_modify_image_launch_permission(self):
- conn = self.connection_for(test_username)
-
- self.admin.modify_image_attribute(image_id=data['image_id'],
- operation='add',
- attribute='launchPermission',
- groups='all')
-
- image = conn.get_image(data['image_id'])
- self.assertEqual(image.id, data['image_id'])
-
- def test_011_me_can_list_public_images(self):
- conn = self.connection_for(test_username)
- images = conn.get_all_images(image_ids=[data['image_id']])
- self.assertEqual(len(images), 1)
- pass
-
- def test_012_me_can_see_launch_permission(self):
- attrs = self.admin.get_image_attribute(data['image_id'],
- 'launchPermission')
- self.assert_(attrs.name, 'launch_permission')
- self.assert_(attrs.groups[0], 'all')
-
- # FIXME: add tests that user can launch image
-
-# def test_013_user_can_launch_admin_public_image(self):
-# # TODO: Use openwrt kernel instead of default kernel
-# conn = self.connection_for(test_username)
-# reservation = conn.run_instances(data['image_id'])
-# self.assertEqual(len(reservation.instances), 1)
-# data['my_instance_id'] = reservation.instances[0].id
-
-# def test_014_instances_launch_within_30_seconds(self):
-# pass
-
-# def test_015_user_can_terminate(self):
-# conn = self.connection_for(test_username)
-# terminated = conn.terminate_instances(
-# instance_ids=[data['my_instance_id']])
-# self.assertEqual(len(terminated), 1)
-
- def test_016_admin_can_deregister_kernel(self):
- self.assertTrue(self.admin.deregister_image(data['kernel_id']))
-
- def test_017_admin_can_deregister_image(self):
- self.assertTrue(self.admin.deregister_image(data['image_id']))
-
- def test_018_admin_can_delete_bundle(self):
- self.assertTrue(self.delete_bundle_bucket(test_bucket))
-
- def test_999_tearDown(self):
- data = {}
- self.delete_user(test_username)
-
-
-# Test key pairs and security groups
-class SecurityTests(novatestcase.NovaTestCase):
- def test_000_setUp(self):
- self.create_user(test_username + '_me')
- self.create_user(test_username + '_you')
- data['image_id'] = 'ami-tiny'
-
- def test_001_me_can_create_keypair(self):
- conn = self.connection_for(test_username + '_me')
- key = self.create_key_pair(conn, test_key)
- self.assertEqual(key.name, test_key)
-
- def test_002_you_can_create_keypair(self):
- conn = self.connection_for(test_username + '_you')
- key = self.create_key_pair(conn, test_key+ 'yourkey')
- self.assertEqual(key.name, test_key+'yourkey')
-
- def test_003_me_can_create_instance_with_keypair(self):
- conn = self.connection_for(test_username + '_me')
- reservation = conn.run_instances(data['image_id'], key_name=test_key)
- self.assertEqual(len(reservation.instances), 1)
- data['my_instance_id'] = reservation.instances[0].id
-
- def test_004_me_can_obtain_private_ip_within_60_seconds(self):
- conn = self.connection_for(test_username + '_me')
- reservations = conn.get_all_instances([data['my_instance_id']])
- instance = reservations[0].instances[0]
- # allow 60 seconds to exit pending with IP
- for x in xrange(60):
- instance.update()
- if instance.state != u'pending':
- break
- time.sleep(1)
- else:
- self.assert_(False)
- # self.assertEqual(instance.state, u'running')
- ip = reservations[0].instances[0].private_dns_name
- self.failIf(ip == '0.0.0.0')
- data['my_private_ip'] = ip
- print data['my_private_ip'],
-
- def test_005_can_ping_private_ip(self):
- for x in xrange(120):
- # ping waits for 1 second
- status, output = commands.getstatusoutput(
- 'ping -c1 -w1 %s' % data['my_private_ip'])
- if status == 0:
- break
- else:
- self.assert_('could not ping instance')
- #def test_005_me_cannot_ssh_when_unauthorized(self):
- # self.assertRaises(paramiko.SSHException, self.connect_ssh,
- # data['my_private_ip'], 'mykey')
-
- #def test_006_me_can_authorize_ssh(self):
- # conn = self.connection_for(test_username + '_me')
- # self.assertTrue(
- # conn.authorize_security_group(
- # 'default',
- # ip_protocol='tcp',
- # from_port=22,
- # to_port=22,
- # cidr_ip='0.0.0.0/0'
- # )
- # )
-
- def test_007_me_can_ssh_when_authorized(self):
- conn = self.connect_ssh(data['my_private_ip'], test_key)
- conn.close()
-
- #def test_008_me_can_revoke_ssh_authorization(self):
- # conn = self.connection_for('me')
- # self.assertTrue(
- # conn.revoke_security_group(
- # 'default',
- # ip_protocol='tcp',
- # from_port=22,
- # to_port=22,
- # cidr_ip='0.0.0.0/0'
- # )
- # )
-
- #def test_009_you_cannot_ping_my_instance(self):
- # TODO: should ping my_private_ip from with an instance started by you.
- #self.assertFalse(self.can_ping(data['my_private_ip']))
-
- def test_010_you_cannot_ssh_to_my_instance(self):
- try:
- conn = self.connect_ssh(data['my_private_ip'],
- test_key + 'yourkey')
- conn.close()
- except paramiko.SSHException:
- pass
- else:
- self.fail("expected SSHException")
-
- def test_999_tearDown(self):
- conn = self.connection_for(test_username + '_me')
- self.delete_key_pair(conn, test_key)
- if data.has_key('my_instance_id'):
- conn.terminate_instances([data['my_instance_id']])
-
- conn = self.connection_for(test_username + '_you')
- self.delete_key_pair(conn, test_key + 'yourkey')
-
- conn = self.connection_for('admin')
- self.delete_user(test_username + '_me')
- self.delete_user(test_username + '_you')
- #self.tearDown_test_image(conn, data['image_id'])
-
-# TODO: verify wrt image boots
-# build python into wrt image
-# build boto/m2crypto into wrt image
-# build euca2ools into wrt image
-# build a script to download and unpack credentials
-# - return "ok" to stdout for comparison in self.assertEqual()
-# build a script to bundle the instance
-# build a script to upload the bundle
-
-# status, output = commands.getstatusoutput('cmd')
-# if status == 0:
-# print 'ok'
-# else:
-# print output
-
-# Testing rebundling
-class RebundlingTests(novatestcase.NovaTestCase):
- def test_000_setUp(self):
- self.create_user('me')
- self.create_user('you')
- # TODO: create keypair for me
- # upload smoketest img
- # run instance
-
- def test_001_me_can_download_credentials_within_instance(self):
- conn = self.connect_ssh(data['my_private_ip'], 'mykey')
- stdin, stdout = conn.exec_command(
- 'python ~/smoketests/install-credentials.py')
- conn.close()
- self.assertEqual(stdout, 'ok')
-
- def test_002_me_can_rebundle_within_instance(self):
- conn = self.connect_ssh(data['my_private_ip'], 'mykey')
- stdin, stdout = conn.exec_command(
- 'python ~/smoketests/rebundle-instance.py')
- conn.close()
- self.assertEqual(stdout, 'ok')
-
- def test_003_me_can_upload_image_within_instance(self):
- conn = self.connect_ssh(data['my_private_ip'], 'mykey')
- stdin, stdout = conn.exec_command(
- 'python ~/smoketests/upload-bundle.py')
- conn.close()
- self.assertEqual(stdout, 'ok')
-
- def test_004_me_can_register_image_within_instance(self):
- conn = self.connect_ssh(data['my_private_ip'], 'mykey')
- stdin, stdout = conn.exec_command(
- 'python ~/smoketests/register-image.py')
- conn.close()
- if re.matches('ami-{\w+}', stdout):
- data['my_image_id'] = stdout.strip()
- else:
- self.fail('expected ami-nnnnnn, got:\n ' + stdout)
-
- def test_005_you_cannot_see_my_private_image(self):
- conn = self.connection_for('you')
- image = conn.get_image(data['my_image_id'])
- self.assertEqual(image, None)
-
- def test_006_me_can_make_image_public(self):
- conn = self.connection_for(test_username)
- conn.modify_image_attribute(image_id=data['my_image_id'],
- operation='add',
- attribute='launchPermission',
- groups='all')
-
- def test_007_you_can_see_my_public_image(self):
- conn = self.connection_for('you')
- image = conn.get_image(data['my_image_id'])
- self.assertEqual(image.id, data['my_image_id'])
-
- def test_999_tearDown(self):
- self.delete_user('me')
- self.delete_user('you')
-
- #if data.has_key('image_id'):
- # deregister rebundled image
-
- # TODO: tear down instance
- # delete keypairs
- data = {}
-
-# Test elastic IPs
-class ElasticIPTests(novatestcase.NovaTestCase):
- def test_000_setUp(self):
- data['image_id'] = 'ami-tiny'
-
- self.create_user('me')
- conn = self.connection_for('me')
- self.create_key_pair(conn, 'mykey')
-
- conn = self.connection_for('admin')
- #data['image_id'] = self.setUp_test_image(FLAGS.bundle_image)
-
- def test_001_me_can_launch_image_with_keypair(self):
- conn = self.connection_for('me')
- reservation = conn.run_instances(data['image_id'], key_name='mykey')
- self.assertEqual(len(reservation.instances), 1)
- data['my_instance_id'] = reservation.instances[0].id
-
- def test_002_me_can_allocate_elastic_ip(self):
- conn = self.connection_for('me')
- data['my_public_ip'] = conn.allocate_address()
- self.assert_(data['my_public_ip'].public_ip)
-
- def test_003_me_can_associate_ip_with_instance(self):
- self.assertTrue(data['my_public_ip'].associate(data['my_instance_id']))
-
- def test_004_me_can_ssh_with_public_ip(self):
- conn = self.connect_ssh(data['my_public_ip'].public_ip, 'mykey')
- conn.close()
-
- def test_005_me_can_disassociate_ip_from_instance(self):
- self.assertTrue(data['my_public_ip'].disassociate())
-
- def test_006_me_can_deallocate_elastic_ip(self):
- self.assertTrue(data['my_public_ip'].delete())
-
- def test_999_tearDown(self):
- conn = self.connection_for('me')
- self.delete_key_pair(conn, 'mykey')
-
- conn = self.connection_for('admin')
- #self.tearDown_test_image(conn, data['image_id'])
- data = {}
-
-ZONE = 'nova'
-DEVICE = 'vdb'
-# Test iscsi volumes
-class VolumeTests(novatestcase.NovaTestCase):
- def test_000_setUp(self):
- self.create_user(test_username)
- data['image_id'] = 'ami-tiny' # A7370FE3
-
- conn = self.connection_for(test_username)
- self.create_key_pair(conn, test_key)
- reservation = conn.run_instances(data['image_id'],
- instance_type='m1.tiny',
- key_name=test_key)
- data['instance_id'] = reservation.instances[0].id
- data['private_ip'] = reservation.instances[0].private_dns_name
- # wait for instance to show up
- for x in xrange(120):
- # ping waits for 1 second
- status, output = commands.getstatusoutput(
- 'ping -c1 -w1 %s' % data['private_ip'])
- if status == 0:
- break
- else:
- self.fail('unable to ping instance')
-
- def test_001_me_can_create_volume(self):
- conn = self.connection_for(test_username)
- volume = conn.create_volume(1, ZONE)
- self.assertEqual(volume.size, 1)
- data['volume_id'] = volume.id
- # give network time to find volume
- time.sleep(5)
-
- def test_002_me_can_attach_volume(self):
- conn = self.connection_for(test_username)
- conn.attach_volume(
- volume_id = data['volume_id'],
- instance_id = data['instance_id'],
- device = '/dev/%s' % DEVICE
- )
- # give instance time to recognize volume
- time.sleep(5)
-
- def test_003_me_can_mount_volume(self):
- conn = self.connect_ssh(data['private_ip'], test_key)
- # FIXME(devcamcar): the tiny image doesn't create the node properly
- # this will make /dev/vd* if it doesn't exist
- stdin, stdout, stderr = conn.exec_command(
- 'grep %s /proc/partitions |' + \
- '`awk \'{print "mknod /dev/"$4" b "$1" "$2}\'`' % DEVICE)
- commands = []
- commands.append('mkdir -p /mnt/vol')
- commands.append('mkfs.ext2 /dev/%s' % DEVICE)
- commands.append('mount /dev/%s /mnt/vol' % DEVICE)
- commands.append('echo success')
- stdin, stdout, stderr = conn.exec_command(' && '.join(commands))
- out = stdout.read()
- conn.close()
- if not out.strip().endswith('success'):
- self.fail('Unable to mount: %s %s' % (out, stderr.read()))
-
- def test_004_me_can_write_to_volume(self):
- conn = self.connect_ssh(data['private_ip'], test_key)
- # FIXME(devcamcar): This doesn't fail if the volume hasn't been mounted
- stdin, stdout, stderr = conn.exec_command(
- 'echo hello > /mnt/vol/test.txt')
- err = stderr.read()
- conn.close()
- if len(err) > 0:
- self.fail('Unable to write to mount: %s' % (err))
-
- def test_005_volume_is_correct_size(self):
- conn = self.connect_ssh(data['private_ip'], test_key)
- stdin, stdout, stderr = conn.exec_command(
- "df -h | grep %s | awk {'print $2'}" % DEVICE)
- out = stdout.read()
- conn.close()
- if not out.strip() == '1007.9M':
- self.fail('Volume is not the right size: %s %s' % (out, stderr.read()))
-
- def test_006_me_can_umount_volume(self):
- conn = self.connect_ssh(data['private_ip'], test_key)
- stdin, stdout, stderr = conn.exec_command('umount /mnt/vol')
- err = stderr.read()
- conn.close()
- if len(err) > 0:
- self.fail('Unable to unmount: %s' % (err))
-
- def test_007_me_can_detach_volume(self):
- conn = self.connection_for(test_username)
- self.assertTrue(conn.detach_volume(volume_id = data['volume_id']))
-
- def test_008_me_can_delete_volume(self):
- conn = self.connection_for(test_username)
- self.assertTrue(conn.delete_volume(data['volume_id']))
-
- def test_009_volume_size_must_be_int(self):
- conn = self.connection_for(test_username)
- self.assertRaises(Exception, conn.create_volume, 'foo', ZONE)
-
- def test_999_tearDown(self):
- global data
- conn = self.connection_for(test_username)
- self.delete_key_pair(conn, test_key)
- if data.has_key('instance_id'):
- conn.terminate_instances([data['instance_id']])
- self.delete_user(test_username)
- data = {}
-
-def build_suites():
- return {
- 'user': unittest.makeSuite(UserTests),
- 'image': unittest.makeSuite(ImageTests),
- 'security': unittest.makeSuite(SecurityTests),
- 'public_network': unittest.makeSuite(ElasticIPTests),
- 'volume': unittest.makeSuite(VolumeTests),
- }
-
-def main():
- argv = FLAGS(sys.argv)
- suites = build_suites()
-
- if FLAGS.suite:
- try:
- suite = suites[FLAGS.suite]
- except KeyError:
- print >> sys.stderr, 'Available test suites:', SUITE_NAMES
- return 1
-
- unittest.TextTestRunner(verbosity=2).run(suite)
- else:
- for suite in suites.itervalues():
- unittest.TextTestRunner(verbosity=2).run(suite)
-
-if __name__ == "__main__":
- sys.exit(main())
diff --git a/smoketests/user_smoketests.py b/smoketests/user_smoketests.py
new file mode 100644
index 000000000..d29e3aea3
--- /dev/null
+++ b/smoketests/user_smoketests.py
@@ -0,0 +1,326 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import commands
+import os
+import random
+import socket
+import sys
+import time
+import unittest
+
+from smoketests import flags
+from smoketests import base
+
+
+SUITE_NAMES = '[image, instance, volume]'
+
+FLAGS = flags.FLAGS
+flags.DEFINE_string('suite', None, 'Specific test suite to run ' + SUITE_NAMES)
+flags.DEFINE_string('bundle_kernel', 'openwrt-x86-vmlinuz',
+ 'Local kernel file to use for bundling tests')
+flags.DEFINE_string('bundle_image', 'openwrt-x86-ext2.image',
+ 'Local image file to use for bundling tests')
+
+TEST_PREFIX = 'test%s' % int (random.random()*1000000)
+TEST_BUCKET = '%s_bucket' % TEST_PREFIX
+TEST_KEY = '%s_key' % TEST_PREFIX
+TEST_DATA = {}
+
+
+class UserSmokeTestCase(base.SmokeTestCase):
+ def setUp(self):
+ global TEST_DATA
+ self.conn = self.connection_for_env()
+ self.data = TEST_DATA
+
+
+class ImageTests(UserSmokeTestCase):
+ def test_001_can_bundle_image(self):
+ self.assertTrue(self.bundle_image(FLAGS.bundle_image))
+
+ def test_002_can_upload_image(self):
+ self.assertTrue(self.upload_image(TEST_BUCKET, FLAGS.bundle_image))
+
+ def test_003_can_register_image(self):
+ image_id = self.conn.register_image('%s/%s.manifest.xml' %
+ (TEST_BUCKET, FLAGS.bundle_image))
+ self.assert_(image_id is not None)
+ self.data['image_id'] = image_id
+
+ def test_004_can_bundle_kernel(self):
+ self.assertTrue(self.bundle_image(FLAGS.bundle_kernel, kernel=True))
+
+ def test_005_can_upload_kernel(self):
+ self.assertTrue(self.upload_image(TEST_BUCKET, FLAGS.bundle_kernel))
+
+ def test_006_can_register_kernel(self):
+ kernel_id = self.conn.register_image('%s/%s.manifest.xml' %
+ (TEST_BUCKET, FLAGS.bundle_kernel))
+ self.assert_(kernel_id is not None)
+ self.data['kernel_id'] = kernel_id
+
+ def test_007_images_are_available_within_10_seconds(self):
+ for i in xrange(10):
+ image = self.conn.get_image(self.data['image_id'])
+ if image and image.state == 'available':
+ break
+ time.sleep(1)
+ else:
+ print image.state
+ self.assert_(False) # wasn't available within 10 seconds
+ self.assert_(image.type == 'machine')
+
+ for i in xrange(10):
+ kernel = self.conn.get_image(self.data['kernel_id'])
+ if kernel and kernel.state == 'available':
+ break
+ time.sleep(1)
+ else:
+ self.assert_(False) # wasn't available within 10 seconds
+ self.assert_(kernel.type == 'kernel')
+
+ def test_008_can_describe_image_attribute(self):
+ attrs = self.conn.get_image_attribute(self.data['image_id'],
+ 'launchPermission')
+ self.assert_(attrs.name, 'launch_permission')
+
+ def test_009_can_modify_image_launch_permission(self):
+ self.conn.modify_image_attribute(image_id=self.data['image_id'],
+ operation='add',
+ attribute='launchPermission',
+ groups='all')
+ image = self.conn.get_image(self.data['image_id'])
+ self.assertEqual(image.id, self.data['image_id'])
+
+ def test_010_can_see_launch_permission(self):
+ attrs = self.conn.get_image_attribute(self.data['image_id'],
+ 'launchPermission')
+ self.assert_(attrs.name, 'launch_permission')
+ self.assert_(attrs.attrs['groups'][0], 'all')
+
+ def test_011_user_can_deregister_kernel(self):
+ self.assertTrue(self.conn.deregister_image(self.data['kernel_id']))
+
+ def test_012_can_deregister_image(self):
+ self.assertTrue(self.conn.deregister_image(self.data['image_id']))
+
+ def test_013_can_delete_bundle(self):
+ self.assertTrue(self.delete_bundle_bucket(TEST_BUCKET))
+
+
+class InstanceTests(UserSmokeTestCase):
+ def test_001_can_create_keypair(self):
+ key = self.create_key_pair(self.conn, TEST_KEY)
+ self.assertEqual(key.name, TEST_KEY)
+
+ def test_002_can_create_instance_with_keypair(self):
+ reservation = self.conn.run_instances(FLAGS.test_image,
+ key_name=TEST_KEY,
+ instance_type='m1.tiny')
+ self.assertEqual(len(reservation.instances), 1)
+ self.data['instance_id'] = reservation.instances[0].id
+
+ def test_003_instance_runs_within_60_seconds(self):
+ reservations = self.conn.get_all_instances([data['instance_id']])
+ instance = reservations[0].instances[0]
+ # allow 60 seconds to exit pending with IP
+ for x in xrange(60):
+ instance.update()
+ if instance.state == u'running':
+ break
+ time.sleep(1)
+ else:
+ self.fail('instance failed to start')
+ ip = reservations[0].instances[0].private_dns_name
+ self.failIf(ip == '0.0.0.0')
+ self.data['private_ip'] = ip
+ print self.data['private_ip']
+
+ def test_004_can_ping_private_ip(self):
+ for x in xrange(120):
+ # ping waits for 1 second
+ status, output = commands.getstatusoutput(
+ 'ping -c1 %s' % self.data['private_ip'])
+ if status == 0:
+ break
+ else:
+ self.fail('could not ping instance')
+
+ def test_005_can_ssh_to_private_ip(self):
+ for x in xrange(30):
+ try:
+ conn = self.connect_ssh(self.data['private_ip'], TEST_KEY)
+ conn.close()
+ except Exception:
+ time.sleep(1)
+ else:
+ break
+ else:
+ self.fail('could not ssh to instance')
+
+ def test_006_can_allocate_elastic_ip(self):
+ result = self.conn.allocate_address()
+ self.assertTrue(hasattr(result, 'public_ip'))
+ self.data['public_ip'] = result.public_ip
+
+ def test_007_can_associate_ip_with_instance(self):
+ result = self.conn.associate_address(self.data['instance_id'],
+ self.data['public_ip'])
+ self.assertTrue(result)
+
+ def test_008_can_ssh_with_public_ip(self):
+ for x in xrange(30):
+ try:
+ conn = self.connect_ssh(self.data['public_ip'], TEST_KEY)
+ conn.close()
+ except socket.error:
+ time.sleep(1)
+ else:
+ break
+ else:
+ self.fail('could not ssh to instance')
+
+ def test_009_can_disassociate_ip_from_instance(self):
+ result = self.conn.disassociate_address(self.data['public_ip'])
+ self.assertTrue(result)
+
+ def test_010_can_deallocate_elastic_ip(self):
+ result = self.conn.release_address(self.data['public_ip'])
+ self.assertTrue(result)
+
+ def test_999_tearDown(self):
+ self.delete_key_pair(self.conn, TEST_KEY)
+ if self.data.has_key('instance_id'):
+ self.conn.terminate_instances([data['instance_id']])
+
+
+class VolumeTests(UserSmokeTestCase):
+ def setUp(self):
+ super(VolumeTests, self).setUp()
+ self.device = '/dev/vdb'
+
+ def test_000_setUp(self):
+ self.create_key_pair(self.conn, TEST_KEY)
+ reservation = self.conn.run_instances(FLAGS.test_image,
+ instance_type='m1.tiny',
+ key_name=TEST_KEY)
+ instance = reservation.instances[0]
+ self.data['instance'] = instance
+ for x in xrange(120):
+ if self.can_ping(instance.private_dns_name):
+ break
+ else:
+ self.fail('unable to start instance')
+
+ def test_001_can_create_volume(self):
+ volume = self.conn.create_volume(1, 'nova')
+ self.assertEqual(volume.size, 1)
+ self.data['volume'] = volume
+ # Give network time to find volume.
+ time.sleep(5)
+
+ def test_002_can_attach_volume(self):
+ volume = self.data['volume']
+
+ for x in xrange(10):
+ if volume.status == u'available':
+ break
+ time.sleep(5)
+ volume.update()
+ else:
+ self.fail('cannot attach volume with state %s' % volume.status)
+
+ volume.attach(self.data['instance'].id, self.device)
+
+ # Volumes seems to report "available" too soon.
+ for x in xrange(10):
+ if volume.status == u'in-use':
+ break
+ time.sleep(5)
+ volume.update()
+
+ self.assertEqual(volume.status, u'in-use')
+
+ # Give instance time to recognize volume.
+ time.sleep(5)
+
+ def test_003_can_mount_volume(self):
+ ip = self.data['instance'].private_dns_name
+ conn = self.connect_ssh(ip, TEST_KEY)
+ commands = []
+ commands.append('mkdir -p /mnt/vol')
+ commands.append('mkfs.ext2 %s' % self.device)
+ commands.append('mount %s /mnt/vol' % self.device)
+ commands.append('echo success')
+ stdin, stdout, stderr = conn.exec_command(' && '.join(commands))
+ out = stdout.read()
+ conn.close()
+ if not out.strip().endswith('success'):
+ self.fail('Unable to mount: %s %s' % (out, stderr.read()))
+
+ def test_004_can_write_to_volume(self):
+ ip = self.data['instance'].private_dns_name
+ conn = self.connect_ssh(ip, TEST_KEY)
+ # FIXME(devcamcar): This doesn't fail if the volume hasn't been mounted
+ stdin, stdout, stderr = conn.exec_command(
+ 'echo hello > /mnt/vol/test.txt')
+ err = stderr.read()
+ conn.close()
+ if len(err) > 0:
+ self.fail('Unable to write to mount: %s' % (err))
+
+ def test_005_volume_is_correct_size(self):
+ ip = self.data['instance'].private_dns_name
+ conn = self.connect_ssh(ip, TEST_KEY)
+ stdin, stdout, stderr = conn.exec_command(
+ "df -h | grep %s | awk {'print $2'}" % self.device)
+ out = stdout.read()
+ conn.close()
+ if not out.strip() == '1008M':
+ self.fail('Volume is not the right size: %s %s' %
+ (out, stderr.read()))
+
+ def test_006_me_can_umount_volume(self):
+ ip = self.data['instance'].private_dns_name
+ conn = self.connect_ssh(ip, TEST_KEY)
+ stdin, stdout, stderr = conn.exec_command('umount /mnt/vol')
+ err = stderr.read()
+ conn.close()
+ if len(err) > 0:
+ self.fail('Unable to unmount: %s' % (err))
+
+ def test_007_me_can_detach_volume(self):
+ result = self.conn.detach_volume(volume_id=self.data['volume'].id)
+ self.assertTrue(result)
+ time.sleep(5)
+
+ def test_008_me_can_delete_volume(self):
+ result = self.conn.delete_volume(self.data['volume'].id)
+ self.assertTrue(result)
+
+ def test_999_tearDown(self):
+ self.conn.terminate_instances([self.data['instance'].id])
+ self.conn.delete_key_pair(TEST_KEY)
+
+
+if __name__ == "__main__":
+ suites = {'image': unittest.makeSuite(ImageTests),
+ 'instance': unittest.makeSuite(InstanceTests),
+ 'volume': unittest.makeSuite(VolumeTests)}
+ sys.exit(base.run_tests(suites))