From f0d5df523b982ef1737dc0ee2e698b13041af64c Mon Sep 17 00:00:00 2001 From: Johannes Erdfelt Date: Tue, 28 Feb 2012 05:54:48 +0000 Subject: Add utils.tempdir() context manager for easy temp dirs Fixes bug 883323 (and others) Users of tempfile.mkdtemp() need to make sure the directory is cleaned up when it's done being used. Unfortunately, not all of the code does so at all, or safely (by using a try/finally block). Change-Id: I270109d83efec4f8b3dd954021493f4d96c6ab79 --- nova/tests/test_crypto.py | 19 +--- nova/tests/test_imagecache.py | 232 +++++++++++++++++------------------------- nova/tests/test_libvirt.py | 36 +++---- 3 files changed, 113 insertions(+), 174 deletions(-) (limited to 'nova/tests') diff --git a/nova/tests/test_crypto.py b/nova/tests/test_crypto.py index 2bfb345b9..ee4837582 100644 --- a/nova/tests/test_crypto.py +++ b/nova/tests/test_crypto.py @@ -17,8 +17,6 @@ Tests for Crypto module. """ import os -import shutil -import tempfile import mox @@ -50,9 +48,8 @@ class SymmetricKeyTestCase(test.TestCase): class X509Test(test.TestCase): def test_can_generate_x509(self): - tmpdir = tempfile.mkdtemp() - self.flags(ca_path=tmpdir) - try: + with utils.tempdir() as tmpdir: + self.flags(ca_path=tmpdir) crypto.ensure_ca_filesystem() _key, cert_str = crypto.generate_x509_cert('fake', 'fake') @@ -70,14 +67,10 @@ class X509Test(test.TestCase): project_cert_file, '-verbose', signed_cert_file) self.assertFalse(err) - finally: - shutil.rmtree(tmpdir) - def test_encrypt_decrypt_x509(self): - tmpdir = tempfile.mkdtemp() - self.flags(ca_path=tmpdir) - project_id = "fake" - try: + with utils.tempdir() as tmpdir: + self.flags(ca_path=tmpdir) + project_id = "fake" crypto.ensure_ca_filesystem() cert = crypto.fetch_ca(project_id) public_key = os.path.join(tmpdir, "public.pem") @@ -92,8 +85,6 @@ class X509Test(test.TestCase): process_input=text) dec = crypto.decrypt_text(project_id, enc) self.assertEqual(text, dec) - finally: - shutil.rmtree(tmpdir) class RevokeCertsTest(test.TestCase): diff --git a/nova/tests/test_imagecache.py b/nova/tests/test_imagecache.py index c9f300de1..242f9c010 100644 --- a/nova/tests/test_imagecache.py +++ b/nova/tests/test_imagecache.py @@ -17,12 +17,11 @@ # under the License. +import contextlib import cStringIO import hashlib import logging import os -import shutil -import tempfile import time from nova import test @@ -58,9 +57,8 @@ class ImageCacheManagerTestCase(test.TestCase): self.assertEquals(csum, None) def test_read_stored_checksum(self): - try: - dirname = tempfile.mkdtemp() - fname = os.path.join(dirname, 'aaa') + with utils.tempdir() as tmpdir: + fname = os.path.join(tmpdir, 'aaa') csum_input = 'fdghkfhkgjjksfdgjksjkghsdf' f = open('%s.sha1' % fname, 'w') @@ -71,9 +69,6 @@ class ImageCacheManagerTestCase(test.TestCase): self.assertEquals(csum_input, csum_output) - finally: - shutil.rmtree(dirname) - def test_list_base_images(self): listing = ['00000001', 'ephemeral_0_20_None', @@ -281,13 +276,17 @@ class ImageCacheManagerTestCase(test.TestCase): (base_file2, True, False), (base_file3, False, True)]) + @contextlib.contextmanager def _intercept_log_messages(self): - mylog = log.getLogger() - stream = cStringIO.StringIO() - handler = logging.StreamHandler(stream) - handler.setFormatter(log.LegacyNovaFormatter()) - mylog.logger.addHandler(handler) - return mylog, handler, stream + try: + mylog = log.getLogger() + stream = cStringIO.StringIO() + handler = logging.StreamHandler(stream) + handler.setFormatter(log.LegacyNovaFormatter()) + mylog.logger.addHandler(handler) + yield stream + finally: + mylog.logger.removeHandler(handler) def test_verify_checksum(self): testdata = ('OpenStack Software delivers a massively scalable cloud ' @@ -295,74 +294,69 @@ class ImageCacheManagerTestCase(test.TestCase): img = {'container_format': 'ami', 'id': '42'} self.flags(checksum_base_images=True) - mylog, handler, stream = self._intercept_log_messages() - - try: - dirname = tempfile.mkdtemp() - fname = os.path.join(dirname, 'aaa') - - f = open(fname, 'w') - f.write(testdata) - f.close() - - # Checksum is valid - f = open('%s.sha1' % fname, 'w') - csum = hashlib.sha1() - csum.update(testdata) - f.write(csum.hexdigest()) - f.close() - - image_cache_manager = imagecache.ImageCacheManager() - res = image_cache_manager._verify_checksum(img, fname) - self.assertTrue(res) - - # Checksum is invalid - f = open('%s.sha1' % fname, 'w') - f.write('banana') - f.close() - - image_cache_manager = imagecache.ImageCacheManager() - res = image_cache_manager._verify_checksum(img, fname) - self.assertFalse(res) - self.assertNotEqual(stream.getvalue().find('image verification ' - 'failed'), -1) - - # Checksum file missing - os.remove('%s.sha1' % fname) - image_cache_manager = imagecache.ImageCacheManager() - res = image_cache_manager._verify_checksum(img, fname) - self.assertEquals(res, None) - - # Checksum requests for a file with no checksum now have the - # side effect of creating the checksum - self.assertTrue(os.path.exists('%s.sha1' % fname)) - - finally: - shutil.rmtree(dirname) - mylog.logger.removeHandler(handler) - def _make_base_file(checksum=True): + with self._intercept_log_messages() as stream: + with utils.tempdir() as tmpdir: + fname = os.path.join(tmpdir, 'aaa') + + f = open(fname, 'w') + f.write(testdata) + f.close() + + # Checksum is valid + f = open('%s.sha1' % fname, 'w') + csum = hashlib.sha1() + csum.update(testdata) + f.write(csum.hexdigest()) + f.close() + + image_cache_manager = imagecache.ImageCacheManager() + res = image_cache_manager._verify_checksum(img, fname) + self.assertTrue(res) + + # Checksum is invalid + f = open('%s.sha1' % fname, 'w') + f.write('banana') + f.close() + + image_cache_manager = imagecache.ImageCacheManager() + res = image_cache_manager._verify_checksum(img, fname) + self.assertFalse(res) + log = stream.getvalue() + self.assertNotEqual(log.find('image verification failed'), -1) + + # Checksum file missing + os.remove('%s.sha1' % fname) + image_cache_manager = imagecache.ImageCacheManager() + res = image_cache_manager._verify_checksum(img, fname) + self.assertEquals(res, None) + + # Checksum requests for a file with no checksum now have the + # side effect of creating the checksum + self.assertTrue(os.path.exists('%s.sha1' % fname)) + + @contextlib.contextmanager + def _make_base_file(self, checksum=True): """Make a base file for testing.""" - dirname = tempfile.mkdtemp() - fname = os.path.join(dirname, 'aaa') + with utils.tempdir() as tmpdir: + fname = os.path.join(tmpdir, 'aaa') - base_file = open(fname, 'w') - base_file.write('data') - base_file.close() - base_file = open(fname, 'r') + base_file = open(fname, 'w') + base_file.write('data') + base_file.close() + base_file = open(fname, 'r') - if checksum: - checksum_file = open('%s.sha1' % fname, 'w') - checksum_file.write(utils.hash_file(base_file)) - checksum_file.close() + if checksum: + checksum_file = open('%s.sha1' % fname, 'w') + checksum_file.write(utils.hash_file(base_file)) + checksum_file.close() - base_file.close() - return dirname, fname + base_file.close() + yield fname def test_remove_base_file(self): - dirname, fname = self._make_base_file() - try: + with self._make_base_file() as fname: image_cache_manager = imagecache.ImageCacheManager() image_cache_manager._remove_base_file(fname) @@ -377,12 +371,8 @@ class ImageCacheManagerTestCase(test.TestCase): self.assertFalse(os.path.exists(fname)) self.assertFalse(os.path.exists('%s.sha1' % fname)) - finally: - shutil.rmtree(dirname) - def test_remove_base_file_original(self): - dirname, fname = self._make_base_file() - try: + with self._make_base_file() as fname: image_cache_manager = imagecache.ImageCacheManager() image_cache_manager.originals = [fname] image_cache_manager._remove_base_file(fname) @@ -405,51 +395,38 @@ class ImageCacheManagerTestCase(test.TestCase): self.assertFalse(os.path.exists(fname)) self.assertFalse(os.path.exists('%s.sha1' % fname)) - finally: - shutil.rmtree(dirname) - def test_remove_base_file_dne(self): # This test is solely to execute the "does not exist" code path. We # don't expect the method being tested to do anything in this case. - dirname = tempfile.mkdtemp() - try: - fname = os.path.join(dirname, 'aaa') + with utils.tempdir() as tmpdir: + fname = os.path.join(tmpdir, 'aaa') image_cache_manager = imagecache.ImageCacheManager() image_cache_manager._remove_base_file(fname) - finally: - shutil.rmtree(dirname) - def test_remove_base_file_oserror(self): - dirname = tempfile.mkdtemp() - fname = os.path.join(dirname, 'aaa') - mylog, handler, stream = self._intercept_log_messages() + with self._intercept_log_messages() as stream: + with utils.tempdir() as tmpdir: + fname = os.path.join(tmpdir, 'aaa') - try: - os.mkdir(fname) - os.utime(fname, (-1, time.time() - 3601)) + os.mkdir(fname) + os.utime(fname, (-1, time.time() - 3601)) - # This will raise an OSError because of file permissions - image_cache_manager = imagecache.ImageCacheManager() - image_cache_manager._remove_base_file(fname) - - self.assertTrue(os.path.exists(fname)) - self.assertNotEqual(stream.getvalue().find('Failed to remove'), - -1) + # This will raise an OSError because of file permissions + image_cache_manager = imagecache.ImageCacheManager() + image_cache_manager._remove_base_file(fname) - finally: - shutil.rmtree(dirname) - mylog.logger.removeHandler(handler) + self.assertTrue(os.path.exists(fname)) + self.assertNotEqual(stream.getvalue().find('Failed to remove'), + -1) def test_handle_base_image_unused(self): img = {'container_format': 'ami', 'id': '123', 'uuid': '1234-4567-2378'} - dirname, fname = self._make_base_file() - os.utime(fname, (-1, time.time() - 3601)) + with self._make_base_file() as fname: + os.utime(fname, (-1, time.time() - 3601)) - try: image_cache_manager = imagecache.ImageCacheManager() image_cache_manager.unexplained_images = [fname] image_cache_manager._handle_base_image(img, fname) @@ -459,18 +436,14 @@ class ImageCacheManagerTestCase(test.TestCase): [fname]) self.assertEquals(image_cache_manager.corrupt_base_files, []) - finally: - shutil.rmtree(dirname) - def test_handle_base_image_used(self): img = {'container_format': 'ami', 'id': '123', 'uuid': '1234-4567-2378'} - dirname, fname = self._make_base_file() - os.utime(fname, (-1, time.time() - 3601)) + with self._make_base_file() as fname: + os.utime(fname, (-1, time.time() - 3601)) - try: image_cache_manager = imagecache.ImageCacheManager() image_cache_manager.unexplained_images = [fname] image_cache_manager.used_images = {'123': (1, 0, ['banana-42'])} @@ -480,18 +453,14 @@ class ImageCacheManagerTestCase(test.TestCase): self.assertEquals(image_cache_manager.removable_base_files, []) self.assertEquals(image_cache_manager.corrupt_base_files, []) - finally: - shutil.rmtree(dirname) - def test_handle_base_image_used_remotely(self): img = {'container_format': 'ami', 'id': '123', 'uuid': '1234-4567-2378'} - dirname, fname = self._make_base_file() - os.utime(fname, (-1, time.time() - 3601)) + with self._make_base_file() as fname: + os.utime(fname, (-1, time.time() - 3601)) - try: image_cache_manager = imagecache.ImageCacheManager() image_cache_manager.used_images = {'123': (0, 1, ['banana-42'])} image_cache_manager._handle_base_image(img, None) @@ -500,9 +469,6 @@ class ImageCacheManagerTestCase(test.TestCase): self.assertEquals(image_cache_manager.removable_base_files, []) self.assertEquals(image_cache_manager.corrupt_base_files, []) - finally: - shutil.rmtree(dirname) - def test_handle_base_image_absent(self): """Ensure we warn for use of a missing base image.""" @@ -510,9 +476,7 @@ class ImageCacheManagerTestCase(test.TestCase): 'id': '123', 'uuid': '1234-4567-2378'} - mylog, handler, stream = self._intercept_log_messages() - - try: + with self._intercept_log_messages() as stream: image_cache_manager = imagecache.ImageCacheManager() image_cache_manager.used_images = {'123': (1, 0, ['banana-42'])} image_cache_manager._handle_base_image(img, None) @@ -523,18 +487,14 @@ class ImageCacheManagerTestCase(test.TestCase): self.assertNotEqual(stream.getvalue().find('an absent base file'), -1) - finally: - mylog.logger.removeHandler(handler) - def test_handle_base_image_used_missing(self): img = {'container_format': 'ami', 'id': '123', 'uuid': '1234-4567-2378'} - dirname = tempfile.mkdtemp() - fname = os.path.join(dirname, 'aaa') + with utils.tempdir() as tmpdir: + fname = os.path.join(tmpdir, 'aaa') - try: image_cache_manager = imagecache.ImageCacheManager() image_cache_manager.unexplained_images = [fname] image_cache_manager.used_images = {'123': (1, 0, ['banana-42'])} @@ -544,17 +504,12 @@ class ImageCacheManagerTestCase(test.TestCase): self.assertEquals(image_cache_manager.removable_base_files, []) self.assertEquals(image_cache_manager.corrupt_base_files, []) - finally: - shutil.rmtree(dirname) - def test_handle_base_image_checksum_fails(self): img = {'container_format': 'ami', 'id': '123', 'uuid': '1234-4567-2378'} - dirname, fname = self._make_base_file() - - try: + with self._make_base_file() as fname: f = open(fname, 'w') f.write('banana') f.close() @@ -569,9 +524,6 @@ class ImageCacheManagerTestCase(test.TestCase): self.assertEquals(image_cache_manager.corrupt_base_files, [fname]) - finally: - shutil.rmtree(dirname) - def test_verify_base_images(self): self.flags(instances_path='/instance_path') self.flags(remove_unused_base_images=True) diff --git a/nova/tests/test_libvirt.py b/nova/tests/test_libvirt.py index 3044cd136..3b866ed01 100644 --- a/nova/tests/test_libvirt.py +++ b/nova/tests/test_libvirt.py @@ -1050,26 +1050,25 @@ class LibvirtConnTestCase(test.TestCase): def test_pre_block_migration_works_correctly(self): """Confirms pre_block_migration works correctly.""" # Replace instances_path since this testcase creates tmpfile - tmpdir = tempfile.mkdtemp() - self.flags(instances_path=tmpdir) + with utils.tempdir() as tmpdir: + self.flags(instances_path=tmpdir) - # Test data - instance_ref = db.instance_create(self.context, self.test_instance) - dummyjson = ('[{"path": "%s/disk", "disk_size": "10737418240",' - ' "type": "raw", "backing_file": ""}]') + # Test data + instance_ref = db.instance_create(self.context, self.test_instance) + dummyjson = ('[{"path": "%s/disk", "disk_size": "10737418240",' + ' "type": "raw", "backing_file": ""}]') - # Preparing mocks - # qemu-img should be mockd since test environment might not have - # large disk space. - self.mox.ReplayAll() - conn = connection.LibvirtConnection(False) - conn.pre_block_migration(self.context, instance_ref, - dummyjson % tmpdir) + # Preparing mocks + # qemu-img should be mockd since test environment might not have + # large disk space. + self.mox.ReplayAll() + conn = connection.LibvirtConnection(False) + conn.pre_block_migration(self.context, instance_ref, + dummyjson % tmpdir) - self.assertTrue(os.path.exists('%s/%s/' % - (tmpdir, instance_ref.name))) + self.assertTrue(os.path.exists('%s/%s/' % + (tmpdir, instance_ref.name))) - shutil.rmtree(tmpdir) db.instance_destroy(self.context, instance_ref['id']) @test.skip_if(missing_libvirt(), "Test requires libvirt") @@ -1926,13 +1925,10 @@ disk size: 4.4M''', '')) libvirt_utils.mkfs('swap', '/my/swap/block/dev') def test_ensure_tree(self): - tmpdir = tempfile.mkdtemp() - try: + with utils.tempdir() as tmpdir: testdir = '%s/foo/bar/baz' % (tmpdir,) libvirt_utils.ensure_tree(testdir) self.assertTrue(os.path.isdir(testdir)) - finally: - shutil.rmtree(tmpdir) def test_write_to_file(self): dst_fd, dst_path = tempfile.mkstemp() -- cgit