summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorKevin L. Mitchell <kevin.mitchell@rackspace.com>2011-07-29 19:52:11 +0000
committerKevin L. Mitchell <kevin.mitchell@rackspace.com>2011-07-29 19:52:11 +0000
commit73711a9e260fd8b6f747b9c8f09511eba149a1fb (patch)
treeef91156b5b04e85e03bfab44629966e8130d4290 /nova/tests
parent62c7ca622a42aaed9a4f23e8fc2167655b2ff58f (diff)
parent6703e33a68d0653f486d679337b4dfc4239eba34 (diff)
pull-up from trunk
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/api/openstack/test_servers.py59
-rw-r--r--nova/tests/scheduler/test_scheduler.py5
-rw-r--r--nova/tests/scheduler/test_zone_aware_scheduler.py18
-rw-r--r--nova/tests/test_adminapi.py2
-rw-r--r--nova/tests/test_cloud.py32
-rw-r--r--nova/tests/test_rpc.py61
-rw-r--r--nova/tests/test_rpc_amqp.py68
-rw-r--r--nova/tests/test_service.py170
-rw-r--r--nova/tests/test_test.py13
-rw-r--r--nova/tests/test_xenapi.py16
10 files changed, 180 insertions, 264 deletions
diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py
index 4027ef829..221c1f4f7 100644
--- a/nova/tests/api/openstack/test_servers.py
+++ b/nova/tests/api/openstack/test_servers.py
@@ -2167,11 +2167,14 @@ class ServersTest(test.TestCase):
self.assertEqual(self.resize_called, True)
def test_resize_server_v11(self):
-
req = webob.Request.blank('/v1.1/servers/1/action')
req.content_type = 'application/json'
req.method = 'POST'
- body_dict = dict(resize=dict(flavorRef="http://localhost/3"))
+ body_dict = {
+ "resize": {
+ "flavorRef": 3,
+ },
+ }
req.body = json.dumps(body_dict)
self.resize_called = False
@@ -2185,8 +2188,8 @@ class ServersTest(test.TestCase):
self.assertEqual(res.status_int, 202)
self.assertEqual(self.resize_called, True)
- def test_resize_bad_flavor_fails(self):
- req = self.webreq('/1/action', 'POST', dict(resize=dict(derp=3)))
+ def test_resize_bad_flavor_data(self):
+ req = self.webreq('/1/action', 'POST', {"resize": "bad_data"})
self.resize_called = False
@@ -2196,14 +2199,54 @@ class ServersTest(test.TestCase):
self.stubs.Set(nova.compute.api.API, 'resize', resize_mock)
res = req.get_response(fakes.wsgi_app())
- self.assertEqual(res.status_int, 422)
+ self.assertEqual(res.status_int, 400)
self.assertEqual(self.resize_called, False)
+ def test_resize_invalid_flavorid(self):
+ req = self.webreq('/1/action', 'POST', {"resize": {"flavorId": 300}})
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
+ def test_resize_nonint_flavorid(self):
+ req = self.webreq('/1/action', 'POST', {"resize": {"flavorId": "a"}})
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
+ def test_resize_invalid_flavorid_v1_1(self):
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ req.content_type = 'application/json'
+ req.method = 'POST'
+ resize_body = {
+ "resize": {
+ "image": {
+ "id": 300,
+ },
+ },
+ }
+ req.body = json.dumps(resize_body)
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
+ def test_resize_nonint_flavorid_v1_1(self):
+ req = webob.Request.blank('/v1.1/servers/1/action')
+ req.content_type = 'application/json'
+ req.method = 'POST'
+ resize_body = {
+ "resize": {
+ "image": {
+ "id": "a",
+ },
+ },
+ }
+ req.body = json.dumps(resize_body)
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
def test_resize_raises_fails(self):
req = self.webreq('/1/action', 'POST', dict(resize=dict(flavorId=3)))
def resize_mock(*args):
- raise Exception('hurr durr')
+ raise Exception("An error occurred.")
self.stubs.Set(nova.compute.api.API, 'resize', resize_mock)
@@ -2241,7 +2284,7 @@ class ServersTest(test.TestCase):
req = self.webreq('/1/action', 'POST', dict(confirmResize=None))
def confirm_resize_mock(*args):
- raise Exception('hurr durr')
+ raise Exception("An error occurred.")
self.stubs.Set(nova.compute.api.API, 'confirm_resize',
confirm_resize_mock)
@@ -2268,7 +2311,7 @@ class ServersTest(test.TestCase):
req = self.webreq('/1/action', 'POST', dict(revertResize=None))
def revert_resize_mock(*args):
- raise Exception('hurr durr')
+ raise Exception("An error occurred.")
self.stubs.Set(nova.compute.api.API, 'revert_resize',
revert_resize_mock)
diff --git a/nova/tests/scheduler/test_scheduler.py b/nova/tests/scheduler/test_scheduler.py
index daea826fd..18fbef5ca 100644
--- a/nova/tests/scheduler/test_scheduler.py
+++ b/nova/tests/scheduler/test_scheduler.py
@@ -485,11 +485,6 @@ class SimpleDriverTestCase(test.TestCase):
self.assertEqual(host, 'host2')
volume1.delete_volume(self.context, volume_id1)
db.volume_destroy(self.context, volume_id2)
- dic = {'service_id': s_ref['id'],
- 'vcpus': 16, 'memory_mb': 32, 'local_gb': 100,
- 'vcpus_used': 16, 'memory_mb_used': 12, 'local_gb_used': 10,
- 'hypervisor_type': 'qemu', 'hypervisor_version': 12003,
- 'cpu_info': ''}
def test_doesnt_report_disabled_hosts_as_up(self):
"""Ensures driver doesn't find hosts before they are enabled"""
diff --git a/nova/tests/scheduler/test_zone_aware_scheduler.py b/nova/tests/scheduler/test_zone_aware_scheduler.py
index d74b71fb6..7833028c3 100644
--- a/nova/tests/scheduler/test_zone_aware_scheduler.py
+++ b/nova/tests/scheduler/test_zone_aware_scheduler.py
@@ -16,6 +16,8 @@
Tests For Zone Aware Scheduler.
"""
+import json
+
import nova.db
from nova import exception
@@ -327,3 +329,19 @@ class ZoneAwareSchedulerTestCase(test.TestCase):
sched._provision_resource_from_blob(None, request_spec, 1,
request_spec, {})
self.assertTrue(was_called)
+
+ def test_decrypt_blob(self):
+ """Test that the decrypt method works."""
+
+ fixture = FakeZoneAwareScheduler()
+ test_data = {"foo": "bar"}
+
+ class StubDecryptor(object):
+ def decryptor(self, key):
+ return lambda blob: blob
+
+ self.stubs.Set(zone_aware_scheduler, 'crypto',
+ StubDecryptor())
+
+ self.assertEqual(fixture._decrypt_blob(test_data),
+ json.dumps(test_data))
diff --git a/nova/tests/test_adminapi.py b/nova/tests/test_adminapi.py
index 877cf4ea1..6bbe15f53 100644
--- a/nova/tests/test_adminapi.py
+++ b/nova/tests/test_adminapi.py
@@ -39,7 +39,7 @@ class AdminApiTestCase(test.TestCase):
super(AdminApiTestCase, self).setUp()
self.flags(connection_type='fake')
- self.conn = rpc.Connection.instance()
+ self.conn = rpc.create_connection()
# set up our cloud
self.api = admin.AdminController()
diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py
index e419e7a50..50df344d6 100644
--- a/nova/tests/test_cloud.py
+++ b/nova/tests/test_cloud.py
@@ -50,7 +50,7 @@ class CloudTestCase(test.TestCase):
self.flags(connection_type='fake',
stub_network=True)
- self.conn = rpc.Connection.instance()
+ self.conn = rpc.create_connection()
# set up our cloud
self.cloud = cloud.CloudController()
@@ -326,22 +326,15 @@ class CloudTestCase(test.TestCase):
revoke = self.cloud.revoke_security_group_ingress
self.assertTrue(revoke(self.context, group_name=sec['name'], **kwargs))
- def test_revoke_security_group_ingress_by_id(self):
- kwargs = {'project_id': self.context.project_id, 'name': 'test'}
- sec = db.security_group_create(self.context, kwargs)
- authz = self.cloud.authorize_security_group_ingress
- kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
- authz(self.context, group_id=sec['id'], **kwargs)
- revoke = self.cloud.revoke_security_group_ingress
- self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
-
- def test_authorize_security_group_ingress_by_id(self):
+ def test_authorize_revoke_security_group_ingress_by_id(self):
sec = db.security_group_create(self.context,
{'project_id': self.context.project_id,
'name': 'test'})
authz = self.cloud.authorize_security_group_ingress
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
- self.assertTrue(authz(self.context, group_id=sec['id'], **kwargs))
+ authz(self.context, group_id=sec['id'], **kwargs)
+ revoke = self.cloud.revoke_security_group_ingress
+ self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
def test_authorize_security_group_ingress_missing_protocol_params(self):
sec = db.security_group_create(self.context,
@@ -961,21 +954,6 @@ class CloudTestCase(test.TestCase):
self._wait_for_running(ec2_instance_id)
return ec2_instance_id
- def test_rescue_unrescue_instance(self):
- instance_id = self._run_instance(
- image_id='ami-1',
- instance_type=FLAGS.default_instance_type,
- max_count=1)
- self.cloud.rescue_instance(context=self.context,
- instance_id=instance_id)
- # NOTE(vish): This currently does no validation, it simply makes sure
- # that the code path doesn't throw an exception.
- self.cloud.unrescue_instance(context=self.context,
- instance_id=instance_id)
- # TODO(soren): We need this until we can stop polling in the rpc code
- # for unit tests.
- self.cloud.terminate_instances(self.context, [instance_id])
-
def test_console_output(self):
instance_id = self._run_instance(
image_id='ami-1',
diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py
index ffd748efe..2d2436175 100644
--- a/nova/tests/test_rpc.py
+++ b/nova/tests/test_rpc.py
@@ -33,11 +33,12 @@ LOG = logging.getLogger('nova.tests.rpc')
class RpcTestCase(test.TestCase):
def setUp(self):
super(RpcTestCase, self).setUp()
- self.conn = rpc.Connection.instance(True)
+ self.conn = rpc.create_connection(True)
self.receiver = TestReceiver()
- self.consumer = rpc.TopicAdapterConsumer(connection=self.conn,
- topic='test',
- proxy=self.receiver)
+ self.consumer = rpc.create_consumer(self.conn,
+ 'test',
+ self.receiver,
+ False)
self.consumer.attach_to_eventlet()
self.context = context.get_admin_context()
@@ -129,6 +130,8 @@ class RpcTestCase(test.TestCase):
"""Calls echo in the passed queue"""
LOG.debug(_("Nested received %(queue)s, %(value)s")
% locals())
+ # TODO: so, it will replay the context and use the same REQID?
+ # that's bizarre.
ret = rpc.call(context,
queue,
{"method": "echo",
@@ -137,10 +140,11 @@ class RpcTestCase(test.TestCase):
return value
nested = Nested()
- conn = rpc.Connection.instance(True)
- consumer = rpc.TopicAdapterConsumer(connection=conn,
- topic='nested',
- proxy=nested)
+ conn = rpc.create_connection(True)
+ consumer = rpc.create_consumer(conn,
+ 'nested',
+ nested,
+ False)
consumer.attach_to_eventlet()
value = 42
result = rpc.call(self.context,
@@ -149,47 +153,6 @@ class RpcTestCase(test.TestCase):
"value": value}})
self.assertEqual(value, result)
- def test_connectionpool_single(self):
- """Test that ConnectionPool recycles a single connection."""
- conn1 = rpc.ConnectionPool.get()
- rpc.ConnectionPool.put(conn1)
- conn2 = rpc.ConnectionPool.get()
- rpc.ConnectionPool.put(conn2)
- self.assertEqual(conn1, conn2)
-
- def test_connectionpool_double(self):
- """Test that ConnectionPool returns and reuses separate connections.
-
- When called consecutively we should get separate connections and upon
- returning them those connections should be reused for future calls
- before generating a new connection.
-
- """
- conn1 = rpc.ConnectionPool.get()
- conn2 = rpc.ConnectionPool.get()
-
- self.assertNotEqual(conn1, conn2)
- rpc.ConnectionPool.put(conn1)
- rpc.ConnectionPool.put(conn2)
-
- conn3 = rpc.ConnectionPool.get()
- conn4 = rpc.ConnectionPool.get()
- self.assertEqual(conn1, conn3)
- self.assertEqual(conn2, conn4)
-
- def test_connectionpool_limit(self):
- """Test connection pool limit and connection uniqueness."""
- max_size = FLAGS.rpc_conn_pool_size
- conns = []
-
- for i in xrange(max_size):
- conns.append(rpc.ConnectionPool.get())
-
- self.assertFalse(rpc.ConnectionPool.free_items)
- self.assertEqual(rpc.ConnectionPool.current_size,
- rpc.ConnectionPool.max_size)
- self.assertEqual(len(set(conns)), max_size)
-
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
diff --git a/nova/tests/test_rpc_amqp.py b/nova/tests/test_rpc_amqp.py
new file mode 100644
index 000000000..d29f7ae32
--- /dev/null
+++ b/nova/tests/test_rpc_amqp.py
@@ -0,0 +1,68 @@
+from nova import context
+from nova import flags
+from nova import log as logging
+from nova import rpc
+from nova.rpc import amqp
+from nova import test
+
+
+FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.tests.rpc')
+
+
+class RpcAMQPTestCase(test.TestCase):
+ def setUp(self):
+ super(RpcAMQPTestCase, self).setUp()
+ self.conn = rpc.create_connection(True)
+ self.receiver = TestReceiver()
+ self.consumer = rpc.create_consumer(self.conn,
+ 'test',
+ self.receiver,
+ False)
+ self.consumer.attach_to_eventlet()
+ self.context = context.get_admin_context()
+
+ def test_connectionpool_single(self):
+ """Test that ConnectionPool recycles a single connection."""
+ conn1 = amqp.ConnectionPool.get()
+ amqp.ConnectionPool.put(conn1)
+ conn2 = amqp.ConnectionPool.get()
+ amqp.ConnectionPool.put(conn2)
+ self.assertEqual(conn1, conn2)
+
+
+class TestReceiver(object):
+ """Simple Proxy class so the consumer has methods to call.
+
+ Uses static methods because we aren't actually storing any state.
+
+ """
+
+ @staticmethod
+ def echo(context, value):
+ """Simply returns whatever value is sent in."""
+ LOG.debug(_("Received %s"), value)
+ return value
+
+ @staticmethod
+ def context(context, value):
+ """Returns dictionary version of context."""
+ LOG.debug(_("Received %s"), context)
+ return context.to_dict()
+
+ @staticmethod
+ def echo_three_times(context, value):
+ context.reply(value)
+ context.reply(value + 1)
+ context.reply(value + 2)
+
+ @staticmethod
+ def echo_three_times_yield(context, value):
+ yield value
+ yield value + 1
+ yield value + 2
+
+ @staticmethod
+ def fail(context, value):
+ """Raises an exception with the value sent in."""
+ raise Exception(value)
diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py
index f45f76b73..bbf47b50f 100644
--- a/nova/tests/test_service.py
+++ b/nova/tests/test_service.py
@@ -109,103 +109,8 @@ class ServiceTestCase(test.TestCase):
# the looping calls are created in StartService.
app = service.Service.create(host=host, binary=binary, topic=topic)
- self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
- service.rpc.Connection.instance(new=mox.IgnoreArg())
-
- self.mox.StubOutWithMock(rpc,
- 'TopicAdapterConsumer',
- use_mock_anything=True)
- self.mox.StubOutWithMock(rpc,
- 'FanoutAdapterConsumer',
- use_mock_anything=True)
-
- self.mox.StubOutWithMock(rpc,
- 'ConsumerSet',
- use_mock_anything=True)
-
- rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
- topic=topic,
- proxy=mox.IsA(service.Service)).AndReturn(
- rpc.TopicAdapterConsumer)
-
- rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
- topic='%s.%s' % (topic, host),
- proxy=mox.IsA(service.Service)).AndReturn(
- rpc.TopicAdapterConsumer)
-
- rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
- topic=topic,
- proxy=mox.IsA(service.Service)).AndReturn(
- rpc.FanoutAdapterConsumer)
-
- def wait_func(self, limit=None):
- return None
-
- mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
- {'wait': wait_func})
- rpc.ConsumerSet(connection=mox.IgnoreArg(),
- consumer_list=mox.IsA(list)).AndReturn(mock_cset)
- wait_func(mox.IgnoreArg())
-
- service_create = {'host': host,
- 'binary': binary,
- 'topic': topic,
- 'report_count': 0,
- 'availability_zone': 'nova'}
- service_ref = {'host': host,
- 'binary': binary,
- 'report_count': 0,
- 'id': 1}
-
- service.db.service_get_by_args(mox.IgnoreArg(),
- host,
- binary).AndRaise(exception.NotFound())
- service.db.service_create(mox.IgnoreArg(),
- service_create).AndReturn(service_ref)
- self.mox.ReplayAll()
-
- app.start()
- app.stop()
self.assert_(app)
- # We're testing sort of weird behavior in how report_state decides
- # whether it is disconnected, it looks for a variable on itself called
- # 'model_disconnected' and report_state doesn't really do much so this
- # these are mostly just for coverage
- def test_report_state_no_service(self):
- host = 'foo'
- binary = 'bar'
- topic = 'test'
- service_create = {'host': host,
- 'binary': binary,
- 'topic': topic,
- 'report_count': 0,
- 'availability_zone': 'nova'}
- service_ref = {'host': host,
- 'binary': binary,
- 'topic': topic,
- 'report_count': 0,
- 'availability_zone': 'nova',
- 'id': 1}
-
- service.db.service_get_by_args(mox.IgnoreArg(),
- host,
- binary).AndRaise(exception.NotFound())
- service.db.service_create(mox.IgnoreArg(),
- service_create).AndReturn(service_ref)
- service.db.service_get(mox.IgnoreArg(),
- service_ref['id']).AndReturn(service_ref)
- service.db.service_update(mox.IgnoreArg(), service_ref['id'],
- mox.ContainsKeyValue('report_count', 1))
-
- self.mox.ReplayAll()
- serv = service.Service(host,
- binary,
- topic,
- 'nova.tests.test_service.FakeManager')
- serv.start()
- serv.report_state()
-
def test_report_state_newly_disconnected(self):
host = 'foo'
binary = 'bar'
@@ -276,81 +181,6 @@ class ServiceTestCase(test.TestCase):
self.assert_(not serv.model_disconnected)
- def test_compute_can_update_available_resource(self):
- """Confirm compute updates their record of compute-service table."""
- host = 'foo'
- binary = 'nova-compute'
- topic = 'compute'
-
- # Any mocks are not working without UnsetStubs() here.
- self.mox.UnsetStubs()
- ctxt = context.get_admin_context()
- service_ref = db.service_create(ctxt, {'host': host,
- 'binary': binary,
- 'topic': topic})
- serv = service.Service(host,
- binary,
- topic,
- 'nova.compute.manager.ComputeManager')
-
- # This testcase want to test calling update_available_resource.
- # No need to call periodic call, then below variable must be set 0.
- serv.report_interval = 0
- serv.periodic_interval = 0
-
- # Creating mocks
- self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
- service.rpc.Connection.instance(new=mox.IgnoreArg())
-
- self.mox.StubOutWithMock(rpc,
- 'TopicAdapterConsumer',
- use_mock_anything=True)
- self.mox.StubOutWithMock(rpc,
- 'FanoutAdapterConsumer',
- use_mock_anything=True)
-
- self.mox.StubOutWithMock(rpc,
- 'ConsumerSet',
- use_mock_anything=True)
-
- rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
- topic=topic,
- proxy=mox.IsA(service.Service)).AndReturn(
- rpc.TopicAdapterConsumer)
-
- rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
- topic='%s.%s' % (topic, host),
- proxy=mox.IsA(service.Service)).AndReturn(
- rpc.TopicAdapterConsumer)
-
- rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
- topic=topic,
- proxy=mox.IsA(service.Service)).AndReturn(
- rpc.FanoutAdapterConsumer)
-
- def wait_func(self, limit=None):
- return None
-
- mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
- {'wait': wait_func})
- rpc.ConsumerSet(connection=mox.IgnoreArg(),
- consumer_list=mox.IsA(list)).AndReturn(mock_cset)
- wait_func(mox.IgnoreArg())
-
- self.mox.StubOutWithMock(serv.manager.driver,
- 'update_available_resource')
- serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)
-
- # Just doing start()-stop(), not confirm new db record is created,
- # because update_available_resource() works only in
- # libvirt environment. This testcase confirms
- # update_available_resource() is called. Otherwise, mox complains.
- self.mox.ReplayAll()
- serv.start()
- serv.stop()
-
- db.service_destroy(ctxt, service_ref['id'])
-
class TestWSGIService(test.TestCase):
diff --git a/nova/tests/test_test.py b/nova/tests/test_test.py
index 35c838065..64f11fa45 100644
--- a/nova/tests/test_test.py
+++ b/nova/tests/test_test.py
@@ -33,8 +33,13 @@ class IsolationTestCase(test.TestCase):
self.start_service('compute')
def test_rpc_consumer_isolation(self):
- connection = rpc.Connection.instance(new=True)
- consumer = rpc.TopicAdapterConsumer(connection, topic='compute')
- consumer.register_callback(
- lambda x, y: self.fail('I should never be called'))
+ class NeverCalled(object):
+
+ def __getattribute__(*args):
+ assert False, "I should never get called."
+
+ connection = rpc.create_connection(new=True)
+ proxy = NeverCalled()
+ consumer = rpc.create_consumer(connection, 'compute',
+ proxy, fanout=False)
consumer.attach_to_eventlet()
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index 1a8ddadc4..c2b97a5e3 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -881,6 +881,22 @@ class XenAPIMigrateInstance(test.TestCase):
network_info, resize_instance=False)
+class XenAPIImageTypeTestCase(test.TestCase):
+ """Test ImageType class."""
+
+ def test_to_string(self):
+ """Can convert from type id to type string."""
+ self.assertEquals(
+ vm_utils.ImageType.to_string(vm_utils.ImageType.KERNEL),
+ vm_utils.ImageType.KERNEL_STR)
+
+ def test_from_string(self):
+ """Can convert from string to type id."""
+ self.assertEquals(
+ vm_utils.ImageType.from_string(vm_utils.ImageType.KERNEL_STR),
+ vm_utils.ImageType.KERNEL)
+
+
class XenAPIDetermineDiskImageTestCase(test.TestCase):
"""Unit tests for code that detects the ImageType."""
def setUp(self):