summaryrefslogtreecommitdiffstats
path: root/nova
diff options
context:
space:
mode:
authorEric Day <eday@oddments.org>2010-08-05 13:51:44 -0700
committerEric Day <eday@oddments.org>2010-08-05 13:51:44 -0700
commitb77d261b0288f89d2b25a52e7ad7c8073e357cb1 (patch)
treed391f0ee81edc9879b147fb3c504d9f693777e11 /nova
parent85b73194c2f8432a7e9ab5d24574746f209846ee (diff)
First pass at making a file pass pep8 and pylint tests as an example.
Diffstat (limited to 'nova')
-rw-r--r--nova/tests/objectstore_unittest.py237
1 files changed, 136 insertions, 101 deletions
diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py
index dd00377e7..dece4b5d5 100644
--- a/nova/tests/objectstore_unittest.py
+++ b/nova/tests/objectstore_unittest.py
@@ -16,6 +16,10 @@
# License for the specific language governing permissions and limitations
# under the License.
+"""
+Unittets for S3 objectstore clone.
+"""
+
import boto
import glob
import hashlib
@@ -24,76 +28,69 @@ import os
import shutil
import tempfile
+from boto.s3.connection import S3Connection, OrdinaryCallingFormat
+from twisted.internet import reactor, threads, defer
+from twisted.web import http, server
+
from nova import flags
from nova import objectstore
-from nova.objectstore import bucket # for buckets_path flag
-from nova.objectstore import image # for images_path flag
from nova import test
from nova.auth import manager
+from nova.exception import NotEmpty, NotFound
+from nova.objectstore import image
from nova.objectstore.handler import S3
-from nova.exception import NotEmpty, NotFound, NotAuthorized
-from boto.s3.connection import S3Connection, OrdinaryCallingFormat
-from twisted.internet import reactor, threads, defer
-from twisted.web import http, server
FLAGS = flags.FLAGS
-oss_tempdir = tempfile.mkdtemp(prefix='test_oss-')
-
+# Create a unique temporary directory. We don't delete after test to
+# allow checking the contents after running tests. Users and/or tools
+# running the tests need to remove the tests directories.
+OSS_TEMPDIR = tempfile.mkdtemp(prefix='test_oss-')
-# delete tempdirs from previous runs (we don't delete after test to allow
-# checking the contents after running tests)
-# TODO: This fails on the test box with a permission denied error
-# Also, doing these things in a global tempdir means that different runs of
-# the test suite on the same box could clobber each other.
-#for path in glob.glob(os.path.abspath(os.path.join(oss_tempdir, '../test_oss-*'))):
-# if path != oss_tempdir:
-# shutil.rmtree(path)
+# Create bucket/images path
+os.makedirs(os.path.join(OSS_TEMPDIR, 'images'))
+os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets'))
-# create bucket/images path
-os.makedirs(os.path.join(oss_tempdir, 'images'))
-os.makedirs(os.path.join(oss_tempdir, 'buckets'))
-
class ObjectStoreTestCase(test.BaseTestCase):
- def setUp(self):
+ """Test objectstore API directly."""
+
+ def setUp(self): # pylint: disable-msg=C0103
+ """Setup users and projects."""
super(ObjectStoreTestCase, self).setUp()
- self.flags(buckets_path=os.path.join(oss_tempdir, 'buckets'),
- images_path=os.path.join(oss_tempdir, 'images'),
+ self.flags(buckets_path=os.path.join(OSS_TEMPDIR, 'buckets'),
+ images_path=os.path.join(OSS_TEMPDIR, 'images'),
ca_path=os.path.join(os.path.dirname(__file__), 'CA'))
logging.getLogger().setLevel(logging.DEBUG)
- self.um = manager.AuthManager()
- try:
- self.um.create_user('user1')
- except: pass
- try:
- self.um.create_user('user2')
- except: pass
- try:
- self.um.create_user('admin_user', admin=True)
- except: pass
- try:
- self.um.create_project('proj1', 'user1', 'a proj', ['user1'])
- except: pass
- try:
- self.um.create_project('proj2', 'user2', 'a proj', ['user2'])
- except: pass
- class Context(object): pass
+ self.auth_manager = manager.AuthManager()
+ self.auth_manager.create_user('user1')
+ self.auth_manager.create_user('user2')
+ self.auth_manager.create_user('admin_user', admin=True)
+ self.auth_manager.create_project('proj1', 'user1', 'a proj', ['user1'])
+ self.auth_manager.create_project('proj2', 'user2', 'a proj', ['user2'])
+
+ class Context(object):
+ """Dummy context for running tests."""
+ user = None
+ project = None
+
self.context = Context()
- def tearDown(self):
- self.um.delete_project('proj1')
- self.um.delete_project('proj2')
- self.um.delete_user('user1')
- self.um.delete_user('user2')
- self.um.delete_user('admin_user')
+ def tearDown(self): # pylint: disable-msg=C0103
+ """Tear down users and projects."""
+ self.auth_manager.delete_project('proj1')
+ self.auth_manager.delete_project('proj2')
+ self.auth_manager.delete_user('user1')
+ self.auth_manager.delete_user('user2')
+ self.auth_manager.delete_user('admin_user')
super(ObjectStoreTestCase, self).tearDown()
def test_buckets(self):
- self.context.user = self.um.get_user('user1')
- self.context.project = self.um.get_project('proj1')
+ """Test the bucket API."""
+ self.context.user = self.auth_manager.get_user('user1')
+ self.context.project = self.auth_manager.get_project('proj1')
objectstore.bucket.Bucket.create('new_bucket', self.context)
bucket = objectstore.bucket.Bucket('new_bucket')
@@ -101,12 +98,12 @@ class ObjectStoreTestCase(test.BaseTestCase):
self.assert_(bucket.is_authorized(self.context))
# another user is not authorized
- self.context.user = self.um.get_user('user2')
- self.context.project = self.um.get_project('proj2')
+ self.context.user = self.auth_manager.get_user('user2')
+ self.context.project = self.auth_manager.get_project('proj2')
self.assertFalse(bucket.is_authorized(self.context))
# admin is authorized to use bucket
- self.context.user = self.um.get_user('admin_user')
+ self.context.user = self.auth_manager.get_user('admin_user')
self.context.project = None
self.assertTrue(bucket.is_authorized(self.context))
@@ -136,8 +133,9 @@ class ObjectStoreTestCase(test.BaseTestCase):
self.assertRaises(NotFound, objectstore.bucket.Bucket, 'new_bucket')
def test_images(self):
- self.context.user = self.um.get_user('user1')
- self.context.project = self.um.get_project('proj1')
+ "Test the image API."
+ self.context.user = self.auth_manager.get_user('user1')
+ self.context.project = self.auth_manager.get_project('proj1')
# create a bucket for our bundle
objectstore.bucket.Bucket.create('image_bucket', self.context)
@@ -149,10 +147,12 @@ class ObjectStoreTestCase(test.BaseTestCase):
bucket[os.path.basename(path)] = open(path, 'rb').read()
# register an image
- objectstore.image.Image.register_aws_image('i-testing', 'image_bucket/1mb.manifest.xml', self.context)
+ image.Image.register_aws_image('i-testing',
+ 'image_bucket/1mb.manifest.xml',
+ self.context)
# verify image
- my_img = objectstore.image.Image('i-testing')
+ my_img = image.Image('i-testing')
result_image_file = os.path.join(my_img.path, 'image')
self.assertEqual(os.stat(result_image_file).st_size, 1048576)
@@ -160,38 +160,48 @@ class ObjectStoreTestCase(test.BaseTestCase):
self.assertEqual(sha, '3b71f43ff30f4b15b5cd85dd9e95ebc7e84eb5a3')
# verify image permissions
- self.context.user = self.um.get_user('user2')
- self.context.project = self.um.get_project('proj2')
+ self.context.user = self.auth_manager.get_user('user2')
+ self.context.project = self.auth_manager.get_project('proj2')
self.assertFalse(my_img.is_authorized(self.context))
class TestHTTPChannel(http.HTTPChannel):
- # Otherwise we end up with an unclean reactor
- def checkPersistence(self, _, __):
+ """Dummy site required for twisted.web"""
+
+ def checkPersistence(self, _, __): # pylint: disable-msg=C0103
+ """Otherwise we end up with an unclean reactor."""
return False
class TestSite(server.Site):
+ """Dummy site required for twisted.web"""
protocol = TestHTTPChannel
class S3APITestCase(test.TrialTestCase):
- def setUp(self):
+ """Test objectstore through S3 API."""
+
+ def setUp(self): # pylint: disable-msg=C0103
+ """Setup users, projects, and start a test server."""
super(S3APITestCase, self).setUp()
- FLAGS.auth_driver='nova.auth.ldapdriver.FakeLdapDriver',
- FLAGS.buckets_path = os.path.join(oss_tempdir, 'buckets')
+ FLAGS.auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver',
+ FLAGS.buckets_path = os.path.join(OSS_TEMPDIR, 'buckets')
- self.um = manager.AuthManager()
- self.admin_user = self.um.create_user('admin', admin=True)
- self.admin_project = self.um.create_project('admin', self.admin_user)
+ self.auth_manager = manager.AuthManager()
+ self.admin_user = self.auth_manager.create_user('admin', admin=True)
+ self.admin_project = self.auth_manager.create_project('admin',
+ self.admin_user)
shutil.rmtree(FLAGS.buckets_path)
os.mkdir(FLAGS.buckets_path)
root = S3()
self.site = TestSite(root)
- self.listening_port = reactor.listenTCP(0, self.site, interface='127.0.0.1')
+ # pylint: disable-msg=E1101
+ self.listening_port = reactor.listenTCP(0, self.site,
+ interface='127.0.0.1')
+ # pylint: enable-msg=E1101
self.tcp_port = self.listening_port.getHost().port
@@ -205,65 +215,90 @@ class S3APITestCase(test.TrialTestCase):
is_secure=False,
calling_format=OrdinaryCallingFormat())
- # Don't attempt to reuse connections
def get_http_connection(host, is_secure):
+ """Get a new S3 connection, don't attempt to reuse connections."""
return self.conn.new_http_connection(host, is_secure)
+
self.conn.get_http_connection = get_http_connection
- def _ensure_empty_list(self, l):
- self.assertEquals(len(l), 0, "List was not empty")
+ def _ensure_no_buckets(self, buckets): # pylint: disable-msg=C0111
+ self.assertEquals(len(buckets), 0, "Bucket list was not empty")
return True
- def _ensure_only_bucket(self, l, name):
- self.assertEquals(len(l), 1, "List didn't have exactly one element in it")
- self.assertEquals(l[0].name, name, "Wrong name")
+ def _ensure_one_bucket(self, buckets, name): # pylint: disable-msg=C0111
+ self.assertEquals(len(buckets), 1,
+ "Bucket list didn't have exactly one element in it")
+ self.assertEquals(buckets[0].name, name, "Wrong name")
+ return True
def test_000_list_buckets(self):
- d = threads.deferToThread(self.conn.get_all_buckets)
- d.addCallback(self._ensure_empty_list)
- return d
+ """Make sure we are starting with no buckets."""
+ deferred = threads.deferToThread(self.conn.get_all_buckets)
+ deferred.addCallback(self._ensure_no_buckets)
+ return deferred
def test_001_create_and_delete_bucket(self):
+ """Test bucket creation and deletion."""
bucket_name = 'testbucket'
- d = threads.deferToThread(self.conn.create_bucket, bucket_name)
- d.addCallback(lambda _:threads.deferToThread(self.conn.get_all_buckets))
+ deferred = threads.deferToThread(self.conn.create_bucket, bucket_name)
+ deferred.addCallback(lambda _:
+ threads.deferToThread(self.conn.get_all_buckets))
- def ensure_only_bucket(l, name):
- self.assertEquals(len(l), 1, "List didn't have exactly one element in it")
- self.assertEquals(l[0].name, name, "Wrong name")
- d.addCallback(ensure_only_bucket, bucket_name)
+ deferred.addCallback(self._ensure_one_bucket, bucket_name)
- d.addCallback(lambda _:threads.deferToThread(self.conn.delete_bucket, bucket_name))
- d.addCallback(lambda _:threads.deferToThread(self.conn.get_all_buckets))
- d.addCallback(self._ensure_empty_list)
- return d
+ deferred.addCallback(lambda _:
+ threads.deferToThread(self.conn.delete_bucket,
+ bucket_name))
+ deferred.addCallback(lambda _:
+ threads.deferToThread(self.conn.get_all_buckets))
+ deferred.addCallback(self._ensure_no_buckets)
+ return deferred
def test_002_create_bucket_and_key_and_delete_key_again(self):
+ """Test key operations on buckets."""
bucket_name = 'testbucket'
key_name = 'somekey'
key_contents = 'somekey'
- d = threads.deferToThread(self.conn.create_bucket, bucket_name)
- d.addCallback(lambda b:threads.deferToThread(b.new_key, key_name))
- d.addCallback(lambda k:threads.deferToThread(k.set_contents_from_string, key_contents))
+ deferred = threads.deferToThread(self.conn.create_bucket, bucket_name)
+ deferred.addCallback(lambda b:
+ threads.deferToThread(b.new_key, key_name))
+ deferred.addCallback(lambda k:
+ threads.deferToThread(k.set_contents_from_string,
+ key_contents))
+
def ensure_key_contents(bucket_name, key_name, contents):
+ """Verify contents for a key in the given bucket."""
bucket = self.conn.get_bucket(bucket_name)
key = bucket.get_key(key_name)
- self.assertEquals(key.get_contents_as_string(), contents, "Bad contents")
- d.addCallback(lambda _:threads.deferToThread(ensure_key_contents, bucket_name, key_name, key_contents))
+ self.assertEquals(key.get_contents_as_string(), contents,
+ "Bad contents")
+
+ deferred.addCallback(lambda _:
+ threads.deferToThread(ensure_key_contents,
+ bucket_name, key_name,
+ key_contents))
+
def delete_key(bucket_name, key_name):
+ """Delete a key for the given bucket."""
bucket = self.conn.get_bucket(bucket_name)
key = bucket.get_key(key_name)
key.delete()
- d.addCallback(lambda _:threads.deferToThread(delete_key, bucket_name, key_name))
- d.addCallback(lambda _:threads.deferToThread(self.conn.get_bucket, bucket_name))
- d.addCallback(lambda b:threads.deferToThread(b.get_all_keys))
- d.addCallback(self._ensure_empty_list)
- return d
-
- def tearDown(self):
- self.um.delete_user('admin')
- self.um.delete_project('admin')
- return defer.DeferredList([defer.maybeDeferred(self.listening_port.stopListening)])
- super(S3APITestCase, self).tearDown()
+
+ deferred.addCallback(lambda _:
+ threads.deferToThread(delete_key, bucket_name,
+ key_name))
+ deferred.addCallback(lambda _:
+ threads.deferToThread(self.conn.get_bucket,
+ bucket_name))
+ deferred.addCallback(lambda b: threads.deferToThread(b.get_all_keys))
+ deferred.addCallback(self._ensure_no_buckets)
+ return deferred
+
+ def tearDown(self): # pylint: disable-msg=C0103
+ """Tear down auth and test server."""
+ self.auth_manager.delete_user('admin')
+ self.auth_manager.delete_project('admin')
+ stop_listening = defer.maybeDeferred(self.listening_port.stopListening)
+ return defer.DeferredList([stop_listening])