summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJohn Tran <jtran@attinteractive.com>2011-06-23 11:31:22 -0700
committerJohn Tran <jtran@attinteractive.com>2011-06-23 11:31:22 -0700
commitd5206c7f41c435fd39c1bb9c0fd7ec53c9685f43 (patch)
treec25c76e6bd2617012a7ee670095043ad29510e10
parent3f2c0521f1c8462380c68d5245b5754867738fa1 (diff)
downloadnova-d5206c7f41c435fd39c1bb9c0fd7ec53c9685f43.tar.gz
nova-d5206c7f41c435fd39c1bb9c0fd7ec53c9685f43.tar.xz
nova-d5206c7f41c435fd39c1bb9c0fd7ec53c9685f43.zip
altho security_group authorize & revoke tests already exist in test_api, adding some direct ec2 api method tests. added group_id param support to the pertinent security group methods
-rw-r--r--nova/api/ec2/cloud.py78
-rw-r--r--nova/tests/test_cloud.py71
2 files changed, 129 insertions, 20 deletions
diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py
index 9364b0bdd..75b1fb2a7 100644
--- a/nova/api/ec2/cloud.py
+++ b/nova/api/ec2/cloud.py
@@ -391,7 +391,8 @@ class CloudController(object):
pass
return True
- def describe_security_groups(self, context, group_name=None, group_id=None, **kwargs):
+ def describe_security_groups(self, context, group_name=None, group_id=None,
+ **kwargs):
self.compute_api.ensure_default_security_group(context)
if group_name or group_id:
groups = []
@@ -403,7 +404,7 @@ class CloudController(object):
groups.append(group)
if group_id:
for gid in group_id:
- group = db.security_group_get(context, context.project_id, name)
+ group = db.security_group_get(context, gid)
groups.append(group)
elif context.is_admin:
groups = db.security_group_get_all(context)
@@ -502,13 +503,26 @@ class CloudController(object):
return True
return False
- def revoke_security_group_ingress(self, context, group_name, **kwargs):
- LOG.audit(_("Revoke security group ingress %s"), group_name,
- context=context)
+ def revoke_security_group_ingress(self, context, group_name=None,
+ group_id=None, **kwargs):
+ if not group_name and not group_id:
+ err = "Not enough parameters, need group_name or group_id"
+ raise exception.ApiError(_(err))
self.compute_api.ensure_default_security_group(context)
- security_group = db.security_group_get_by_name(context,
- context.project_id,
- group_name)
+ notfound = exception.SecurityGroupNotFound
+ if group_name:
+ security_group = db.security_group_get_by_name(context,
+ context.project_id,
+ group_name)
+ if not security_group:
+ raise notfound(security_group_id=group_name)
+ if group_id:
+ security_group = db.security_group_get(context, group_id)
+ if not security_group:
+ raise notfound(security_group_id=group_id)
+
+ msg = "Revoke security group ingress %s"
+ LOG.audit(_(msg), security_group['name'], context=context)
criteria = self._revoke_rule_args_to_dict(context, **kwargs)
if criteria is None:
@@ -531,14 +545,26 @@ class CloudController(object):
# Unfortunately, it seems Boto is using an old API
# for these operations, so support for newer API versions
# is sketchy.
- def authorize_security_group_ingress(self, context, group_name, **kwargs):
- LOG.audit(_("Authorize security group ingress %s"), group_name,
- context=context)
+ def authorize_security_group_ingress(self, context, group_name=None,
+ group_id=None, **kwargs):
+ if not group_name and not group_id:
+ err = "Not enough parameters, need group_name or group_id"
+ raise exception.ApiError(_(err))
self.compute_api.ensure_default_security_group(context)
- security_group = db.security_group_get_by_name(context,
- context.project_id,
- group_name)
-
+ notfound = exception.SecurityGroupNotFound
+ if group_name:
+ security_group = db.security_group_get_by_name(context,
+ context.project_id,
+ group_name)
+ if not security_group:
+ raise notfound(security_group_id=group_name)
+ if group_id:
+ security_group = db.security_group_get(context, group_id)
+ if not security_group:
+ raise notfound(security_group_id=group_id)
+
+ msg = "Authorize security group ingress %s"
+ LOG.audit(_(msg), security_group['name'], context=context)
values = self._revoke_rule_args_to_dict(context, **kwargs)
if values is None:
raise exception.ApiError(_("Not enough parameters to build a "
@@ -573,7 +599,7 @@ class CloudController(object):
return source_project_id
- def create_security_group(self, context, group_name, group_description, group_id=None):
+ def create_security_group(self, context, group_name, group_description):
LOG.audit(_("Create Security Group %s"), group_name, context=context)
self.compute_api.ensure_default_security_group(context)
if db.security_group_exists(context, context.project_id, group_name):
@@ -588,11 +614,23 @@ class CloudController(object):
return {'securityGroupSet': [self._format_security_group(context,
group_ref)]}
- def delete_security_group(self, context, group_name, **kwargs):
+ def delete_security_group(self, context, group_name=None, group_id=None,
+ **kwargs):
+ if not group_name and not group_id:
+ err = "Not enough parameters, need group_name or group_id"
+ raise exception.ApiError(_(err))
+ notfound = exception.SecurityGroupNotFound
+ if group_name:
+ security_group = db.security_group_get_by_name(context,
+ context.project_id,
+ group_name)
+ if not security_group:
+ raise notfound(security_group_id=group_name)
+ elif group_id:
+ security_group = db.security_group_get(context, group_id)
+ if not security_group:
+ raise notfound(security_group_id=group_id)
LOG.audit(_("Delete security group %s"), group_name, context=context)
- security_group = db.security_group_get_by_name(context,
- context.project_id,
- group_name)
db.security_group_destroy(context, security_group.id)
return True
diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py
index 2bd5979e7..8cbab09a9 100644
--- a/nova/tests/test_cloud.py
+++ b/nova/tests/test_cloud.py
@@ -165,6 +165,27 @@ class CloudTestCase(test.TestCase):
sec['name'])
db.security_group_destroy(self.context, sec['id'])
+ def test_describe_security_groups_by_id(self):
+ sec = db.security_group_create(self.context,
+ {'project_id': self.context.project_id,
+ 'name': 'test'})
+ result = self.cloud.describe_security_groups(self.context,
+ group_id=[sec['id']])
+ self.assertEqual(len(result['securityGroupInfo']), 1)
+ self.assertEqual(
+ result['securityGroupInfo'][0]['groupName'],
+ sec['name'])
+ default = db.security_group_get_by_name(self.context,
+ self.context.project_id,
+ 'default')
+ result = self.cloud.describe_security_groups(self.context,
+ group_id=[default['id']])
+ self.assertEqual(len(result['securityGroupInfo']), 1)
+ self.assertEqual(
+ result['securityGroupInfo'][0]['groupName'],
+ 'default')
+ db.security_group_destroy(self.context, sec['id'])
+
def test_create_delete_security_group(self):
descript = 'test description'
create = self.cloud.create_security_group
@@ -174,6 +195,56 @@ class CloudTestCase(test.TestCase):
delete = self.cloud.delete_security_group
self.assertTrue(delete(self.context, 'testgrp'))
+ def test_delete_security_group_by_id(self):
+ sec = db.security_group_create(self.context,
+ {'project_id': self.context.project_id,
+ 'name': 'test'})
+ delete = self.cloud.delete_security_group
+ notfound = exception.SecurityGroupNotFound
+ self.assertRaises(notfound, delete, self.context, 'badname')
+ self.assertRaises(notfound, delete, self.context, group_id=999)
+ self.assertRaises(exception.ApiError, delete, self.context)
+ self.assertTrue(delete(self.context, group_id=sec['id']))
+
+ def test_authorize_revoke_security_group_ingress(self):
+ sec = db.security_group_create(self.context,
+ {'project_id': self.context.project_id,
+ 'name': 'test'})
+ authz = self.cloud.authorize_security_group_ingress
+ self.assertRaises(exception.ApiError, authz, self.context, sec['name'])
+ kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
+ # ApiError: Not enough parameters, need group_name or group_id
+ self.assertRaises(exception.ApiError, authz, self.context, **kwargs)
+ authz(self.context, group_name=sec['name'], **kwargs)
+ # ApiError: This rule already exists in group test
+ self.assertRaises(exception.ApiError, authz, self.context,
+ group_name=sec['name'], **kwargs)
+ revoke = self.cloud.revoke_security_group_ingress
+ # ApiError: Not enough parameters, need group_name or group_id
+ self.assertRaises(exception.ApiError, revoke, self.context, **kwargs)
+ self.assertTrue(revoke(self.context, group_name=sec['name'], **kwargs))
+
+ def test_authorize_revoke_security_group_ingress_by_id(self):
+ sec = db.security_group_create(self.context,
+ {'project_id': self.context.project_id,
+ 'name': 'test'})
+ authz = self.cloud.authorize_security_group_ingress
+ kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
+ self.assertRaises(exception.ApiError, authz, self.context, sec['name'])
+ authz(self.context, group_id=sec['id'], **kwargs)
+ # ApiError: This rule already exists in group test
+ self.assertRaises(exception.ApiError, authz, self.context,
+ group_id=sec['id'], **kwargs)
+ revoke = self.cloud.revoke_security_group_ingress
+ self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
+
+ def test_describe_volumes(self):
+ """Makes sure describe_volumes works and filters results."""
+ vol1 = db.volume_create(self.context, {})
+ vol2 = db.volume_create(self.context, {})
+ result = self.cloud.describe_volumes(self.context)
+ self.assertEqual(len(result['volumeSet']), 2)
+ volume_id = ec2utils.id_to_ec2_id(vol2['id'], 'vol-%08x')
def test_describe_volumes(self):
"""Makes sure describe_volumes works and filters results."""
vol1 = db.volume_create(self.context, {})