summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVishvananda Ishaya <vishvananda@gmail.com>2010-07-20 09:15:36 -0500
committerVishvananda Ishaya <vishvananda@gmail.com>2010-07-20 09:15:36 -0500
commit0f9be756f44e831545bf5c31606e0419b61d6ddd (patch)
tree6bbe6ce0295cadd6d677404e88ce4d10537d22d6
parent8ef16679e06c227f782fffdd0640b426af19a5c6 (diff)
Test cleanup, make driver return dictionaries and construct objects in manager
-rw-r--r--nova/auth/ldapdriver.py45
-rw-r--r--nova/auth/manager.py105
-rw-r--r--nova/tests/api_unittest.py23
-rw-r--r--nova/tests/auth_unittest.py95
-rw-r--r--nova/tests/network_unittest.py79
5 files changed, 187 insertions, 160 deletions
diff --git a/nova/auth/ldapdriver.py b/nova/auth/ldapdriver.py
index 89c4defda..d330ae729 100644
--- a/nova/auth/ldapdriver.py
+++ b/nova/auth/ldapdriver.py
@@ -28,7 +28,6 @@ import logging
from nova import exception
from nova import flags
-from nova.auth import manager
try:
import ldap
@@ -322,7 +321,7 @@ class LdapDriver(object):
keys = self.get_key_pairs(uid)
if keys != None:
for key in keys:
- self.delete_key_pair(uid, key.name)
+ self.delete_key_pair(uid, key['name'])
def __role_to_dn(self, role, project_id=None):
"""Convert role to corresponding dn"""
@@ -438,38 +437,38 @@ class LdapDriver(object):
"""Convert ldap attributes to User object"""
if attr == None:
return None
- return manager.User(
- id = attr['uid'][0],
- name = attr['cn'][0],
- access = attr['accessKey'][0],
- secret = attr['secretKey'][0],
- admin = (attr['isAdmin'][0] == 'TRUE')
- )
+ return {
+ 'id': attr['uid'][0],
+ 'name': attr['cn'][0],
+ 'access': attr['accessKey'][0],
+ 'secret': attr['secretKey'][0],
+ 'admin': (attr['isAdmin'][0] == 'TRUE')
+ }
def __to_key_pair(self, owner, attr):
"""Convert ldap attributes to KeyPair object"""
if attr == None:
return None
- return manager.KeyPair(
- id = attr['cn'][0],
- name = attr['cn'][0],
- owner_id = owner,
- public_key = attr['sshPublicKey'][0],
- fingerprint = attr['keyFingerprint'][0],
- )
+ return {
+ 'id': attr['cn'][0],
+ 'name': attr['cn'][0],
+ 'owner_id': owner,
+ 'public_key': attr['sshPublicKey'][0],
+ 'fingerprint': attr['keyFingerprint'][0],
+ }
def __to_project(self, attr):
"""Convert ldap attributes to Project object"""
if attr == None:
return None
member_dns = attr.get('member', [])
- return manager.Project(
- id = attr['cn'][0],
- name = attr['cn'][0],
- project_manager_id = self.__dn_to_uid(attr['projectManager'][0]),
- description = attr.get('description', [None])[0],
- member_ids = [self.__dn_to_uid(x) for x in member_dns]
- )
+ return {
+ 'id': attr['cn'][0],
+ 'name': attr['cn'][0],
+ 'project_manager_id': self.__dn_to_uid(attr['projectManager'][0]),
+ 'description': attr.get('description', [None])[0],
+ 'member_ids': [self.__dn_to_uid(x) for x in member_dns]
+ }
def __dn_to_uid(self, dn):
"""Convert user dn to uid"""
diff --git a/nova/auth/manager.py b/nova/auth/manager.py
index 87cfd9a91..2facffe51 100644
--- a/nova/auth/manager.py
+++ b/nova/auth/manager.py
@@ -33,9 +33,9 @@ from nova import datastore
from nova import exception
from nova import flags
from nova import objectstore # for flags
-from nova import signer
from nova import utils
from nova.auth import ldapdriver
+from nova.auth import signer
FLAGS = flags.FLAGS
# NOTE(vish): a user with one of these roles will be a superuser and
@@ -187,6 +187,14 @@ class Project(AuthBase):
def project_manager(self):
return AuthManager().get_user(self.project_manager_id)
+ @property
+ def vpn_ip(self):
+ return AuthManager().get_project_vpn_ip(self)
+
+ @property
+ def vpn_port(self):
+ return AuthManager().get_project_vpn_port(self)
+
def has_manager(self, user):
return AuthManager().is_project_manager(user, self)
@@ -314,16 +322,6 @@ class AuthManager(object):
def __init__(self, *args, **kwargs):
self.driver_class = kwargs.get('driver_class', ldapdriver.LdapDriver)
- if FLAGS.fake_tests:
- try:
- self.create_user('fake', 'fake', 'fake')
- except: pass
- try:
- self.create_user('user', 'user', 'user')
- except: pass
- try:
- self.create_user('admin', 'admin', 'admin', True)
- except: pass
def authenticate(self, access, signature, params, verb='GET',
server_string='127.0.0.1:8773', path='/',
@@ -508,6 +506,21 @@ class AuthManager(object):
with self.driver_class() as drv:
drv.remove_role(User.safe_id(user), role, Project.safe_id(project))
+ def get_project(self, pid):
+ """Get project object by id"""
+ with self.driver_class() as drv:
+ project_dict = drv.get_project(pid)
+ if project_dict:
+ return Project(**project_dict)
+
+ def get_projects(self):
+ """Retrieves list of all projects"""
+ with self.driver_class() as drv:
+ project_list = drv.get_projects()
+ if not project_list:
+ return []
+ return [Project(**project_dict) for project_dict in project_list]
+
def create_project(self, name, manager_user,
description=None, member_users=None):
"""Create a project
@@ -532,26 +545,14 @@ class AuthManager(object):
"""
if member_users:
member_users = [User.safe_id(u) for u in member_users]
- # NOTE(vish): try to associate a vpn ip and port first because
- # if it throws an exception, we save having to
- # create and destroy a project
- Vpn.create(name)
with self.driver_class() as drv:
- return drv.create_project(name,
- User.safe_id(manager_user),
- description,
- member_users)
-
- def get_projects(self):
- """Retrieves list of all projects"""
- with self.driver_class() as drv:
- return drv.get_projects()
-
-
- def get_project(self, pid):
- """Get project object by id"""
- with self.driver_class() as drv:
- return drv.get_project(pid)
+ project_dict = drv.create_project(name,
+ User.safe_id(manager_user),
+ description,
+ member_users)
+ if project_dict:
+ Vpn.create(project_dict['id'])
+ return Project(**project_dict)
def add_to_project(self, user, project):
"""Add user to project"""
@@ -577,6 +578,12 @@ class AuthManager(object):
return drv.remove_from_project(User.safe_id(user),
Project.safe_id(project))
+ def get_project_vpn_ip(self, project):
+ return Vpn(Project.safe_id(project)).ip
+
+ def get_project_vpn_port(self, project):
+ return Vpn(Project.safe_id(project)).port
+
def delete_project(self, project):
"""Deletes a project"""
with self.driver_class() as drv:
@@ -585,17 +592,24 @@ class AuthManager(object):
def get_user(self, uid):
"""Retrieves a user by id"""
with self.driver_class() as drv:
- return drv.get_user(uid)
+ user_dict = drv.get_user(uid)
+ if user_dict:
+ return User(**user_dict)
def get_user_from_access_key(self, access_key):
"""Retrieves a user by access key"""
with self.driver_class() as drv:
- return drv.get_user_from_access_key(access_key)
+ user_dict = drv.get_user_from_access_key(access_key)
+ if user_dict:
+ return User(**user_dict)
def get_users(self):
"""Retrieves a list of all users"""
with self.driver_class() as drv:
- return drv.get_users()
+ user_list = drv.get_users()
+ if not user_list:
+ return []
+ return [User(**user_dict) for user_dict in user_list]
def create_user(self, name, access=None, secret=None, admin=False):
"""Creates a user
@@ -622,7 +636,9 @@ class AuthManager(object):
if access == None: access = str(uuid.uuid4())
if secret == None: secret = str(uuid.uuid4())
with self.driver_class() as drv:
- return drv.create_user(name, access, secret, admin)
+ user_dict = drv.create_user(name, access, secret, admin)
+ if user_dict:
+ return User(**user_dict)
def delete_user(self, user):
"""Deletes a user"""
@@ -660,18 +676,27 @@ class AuthManager(object):
def create_key_pair(self, user, key_name, public_key, fingerprint):
"""Creates a key pair for user"""
with self.driver_class() as drv:
- return drv.create_key_pair(User.safe_id(user), key_name,
- public_key, fingerprint)
+ kp_dict = drv.create_key_pair(User.safe_id(user),
+ key_name,
+ public_key,
+ fingerprint)
+ if kp_dict:
+ return KeyPair(**kp_dict)
def get_key_pair(self, user, key_name):
"""Retrieves a key pair for user"""
with self.driver_class() as drv:
- return drv.get_key_pair(User.safe_id(user), key_name)
+ kp_dict = drv.get_key_pair(User.safe_id(user), key_name)
+ if kp_dict:
+ return KeyPair(**kp_dict)
def get_key_pairs(self, user):
"""Retrieves all key pairs for user"""
with self.driver_class() as drv:
- return drv.get_key_pairs(User.safe_id(user))
+ kp_list = drv.get_key_pairs(User.safe_id(user))
+ if not kp_list:
+ return []
+ return [KeyPair(**kp_dict) for kp_dict in kp_list]
def delete_key_pair(self, user, key_name):
"""Deletes a key pair for user"""
@@ -686,7 +711,7 @@ class AuthManager(object):
project = user.id
pid = Project.safe_id(project)
rc = self.__generate_rc(user.access, user.secret, pid)
- private_key, signed_cert = self.__generate_x509_cert(user.id, pid)
+ private_key, signed_cert = self._generate_x509_cert(user.id, pid)
vpn = Vpn(pid)
configfile = open(FLAGS.vpn_client_template,"r")
@@ -726,7 +751,7 @@ class AuthManager(object):
}
return rc
- def __generate_x509_cert(self, uid, pid):
+ def _generate_x509_cert(self, uid, pid):
"""Generate x509 cert for user"""
(private_key, csr) = crypto.generate_x509_cert(
self.__cert_subject(uid))
diff --git a/nova/tests/api_unittest.py b/nova/tests/api_unittest.py
index 5c26192bd..4477a1fe6 100644
--- a/nova/tests/api_unittest.py
+++ b/nova/tests/api_unittest.py
@@ -150,7 +150,7 @@ class ApiEc2TestCase(test.BaseTestCase):
def setUp(self):
super(ApiEc2TestCase, self).setUp()
- self.users = manager.AuthManager()
+ self.manager = manager.AuthManager()
self.cloud = cloud.CloudController()
self.host = '127.0.0.1'
@@ -175,25 +175,22 @@ class ApiEc2TestCase(test.BaseTestCase):
def test_describe_instances(self):
self.expect_http()
self.mox.ReplayAll()
- try:
- self.users.create_user('fake', 'fake', 'fake')
- except Exception, _err:
- pass # User may already exist
+ user = self.manager.create_user('fake', 'fake', 'fake')
+ project = self.manager.create_project('fake', 'fake', 'fake')
self.assertEqual(self.ec2.get_all_instances(), [])
- self.users.delete_user('fake')
+ self.manager.delete_project(project)
+ self.manager.delete_user(user)
def test_get_all_key_pairs(self):
self.expect_http()
self.mox.ReplayAll()
keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") for x in range(random.randint(4, 8)))
- try:
- self.users.create_user('fake', 'fake', 'fake')
- except Exception, _err:
- pass # User may already exist
- self.users.generate_key_pair('fake', keyname)
+ user = self.manager.create_user('fake', 'fake', 'fake')
+ project = self.manager.create_project('fake', 'fake', 'fake')
+ self.manager.generate_key_pair(user.id, keyname)
rv = self.ec2.get_all_key_pairs()
self.assertTrue(filter(lambda k: k.name == keyname, rv))
- self.users.delete_user('fake')
-
+ self.manager.delete_project(project)
+ self.manager.delete_user(user)
diff --git a/nova/tests/auth_unittest.py b/nova/tests/auth_unittest.py
index 000f6bf17..0cd377b70 100644
--- a/nova/tests/auth_unittest.py
+++ b/nova/tests/auth_unittest.py
@@ -37,29 +37,29 @@ class AuthTestCase(test.BaseTestCase):
super(AuthTestCase, self).setUp()
self.flags(fake_libvirt=True,
fake_storage=True)
- self.users = manager.AuthManager()
+ self.manager = manager.AuthManager()
def test_001_can_create_users(self):
- self.users.create_user('test1', 'access', 'secret')
- self.users.create_user('test2')
+ self.manager.create_user('test1', 'access', 'secret')
+ self.manager.create_user('test2')
def test_002_can_get_user(self):
- user = self.users.get_user('test1')
+ user = self.manager.get_user('test1')
def test_003_can_retreive_properties(self):
- user = self.users.get_user('test1')
+ user = self.manager.get_user('test1')
self.assertEqual('test1', user.id)
self.assertEqual('access', user.access)
self.assertEqual('secret', user.secret)
def test_004_signature_is_valid(self):
- #self.assertTrue(self.users.authenticate( **boto.generate_url ... ? ? ? ))
+ #self.assertTrue(self.manager.authenticate( **boto.generate_url ... ? ? ? ))
pass
#raise NotImplementedError
def test_005_can_get_credentials(self):
return
- credentials = self.users.get_user('test1').get_credentials()
+ credentials = self.manager.get_user('test1').get_credentials()
self.assertEqual(credentials,
'export EC2_ACCESS_KEY="access"\n' +
'export EC2_SECRET_KEY="secret"\n' +
@@ -68,14 +68,14 @@ class AuthTestCase(test.BaseTestCase):
'export EC2_USER_ID="test1"\n')
def test_006_test_key_storage(self):
- user = self.users.get_user('test1')
+ user = self.manager.get_user('test1')
user.create_key_pair('public', 'key', 'fingerprint')
key = user.get_key_pair('public')
self.assertEqual('key', key.public_key)
self.assertEqual('fingerprint', key.fingerprint)
def test_007_test_key_generation(self):
- user = self.users.get_user('test1')
+ user = self.manager.get_user('test1')
private_key, fingerprint = user.generate_key_pair('public2')
key = RSA.load_key_string(private_key, callback=lambda: None)
bio = BIO.MemoryBuffer()
@@ -87,71 +87,71 @@ class AuthTestCase(test.BaseTestCase):
converted.split(" ")[1].strip())
def test_008_can_list_key_pairs(self):
- keys = self.users.get_user('test1').get_key_pairs()
+ keys = self.manager.get_user('test1').get_key_pairs()
self.assertTrue(filter(lambda k: k.name == 'public', keys))
self.assertTrue(filter(lambda k: k.name == 'public2', keys))
def test_009_can_delete_key_pair(self):
- self.users.get_user('test1').delete_key_pair('public')
- keys = self.users.get_user('test1').get_key_pairs()
+ self.manager.get_user('test1').delete_key_pair('public')
+ keys = self.manager.get_user('test1').get_key_pairs()
self.assertFalse(filter(lambda k: k.name == 'public', keys))
def test_010_can_list_users(self):
- users = self.users.get_users()
+ users = self.manager.get_users()
logging.warn(users)
self.assertTrue(filter(lambda u: u.id == 'test1', users))
def test_101_can_add_user_role(self):
- self.assertFalse(self.users.has_role('test1', 'itsec'))
- self.users.add_role('test1', 'itsec')
- self.assertTrue(self.users.has_role('test1', 'itsec'))
+ self.assertFalse(self.manager.has_role('test1', 'itsec'))
+ self.manager.add_role('test1', 'itsec')
+ self.assertTrue(self.manager.has_role('test1', 'itsec'))
def test_199_can_remove_user_role(self):
- self.assertTrue(self.users.has_role('test1', 'itsec'))
- self.users.remove_role('test1', 'itsec')
- self.assertFalse(self.users.has_role('test1', 'itsec'))
+ self.assertTrue(self.manager.has_role('test1', 'itsec'))
+ self.manager.remove_role('test1', 'itsec')
+ self.assertFalse(self.manager.has_role('test1', 'itsec'))
def test_201_can_create_project(self):
- project = self.users.create_project('testproj', 'test1', 'A test project', ['test1'])
- self.assertTrue(filter(lambda p: p.name == 'testproj', self.users.get_projects()))
+ project = self.manager.create_project('testproj', 'test1', 'A test project', ['test1'])
+ self.assertTrue(filter(lambda p: p.name == 'testproj', self.manager.get_projects()))
self.assertEqual(project.name, 'testproj')
self.assertEqual(project.description, 'A test project')
self.assertEqual(project.project_manager_id, 'test1')
self.assertTrue(project.has_member('test1'))
def test_202_user1_is_project_member(self):
- self.assertTrue(self.users.get_user('test1').is_project_member('testproj'))
+ self.assertTrue(self.manager.get_user('test1').is_project_member('testproj'))
def test_203_user2_is_not_project_member(self):
- self.assertFalse(self.users.get_user('test2').is_project_member('testproj'))
+ self.assertFalse(self.manager.get_user('test2').is_project_member('testproj'))
def test_204_user1_is_project_manager(self):
- self.assertTrue(self.users.get_user('test1').is_project_manager('testproj'))
+ self.assertTrue(self.manager.get_user('test1').is_project_manager('testproj'))
def test_205_user2_is_not_project_manager(self):
- self.assertFalse(self.users.get_user('test2').is_project_manager('testproj'))
+ self.assertFalse(self.manager.get_user('test2').is_project_manager('testproj'))
def test_206_can_add_user_to_project(self):
- self.users.add_to_project('test2', 'testproj')
- self.assertTrue(self.users.get_project('testproj').has_member('test2'))
+ self.manager.add_to_project('test2', 'testproj')
+ self.assertTrue(self.manager.get_project('testproj').has_member('test2'))
def test_208_can_remove_user_from_project(self):
- self.users.remove_from_project('test2', 'testproj')
- self.assertFalse(self.users.get_project('testproj').has_member('test2'))
+ self.manager.remove_from_project('test2', 'testproj')
+ self.assertFalse(self.manager.get_project('testproj').has_member('test2'))
def test_209_can_generate_x509(self):
# MUST HAVE RUN CLOUD SETUP BY NOW
self.cloud = cloud.CloudController()
self.cloud.setup()
- private_key, signed_cert_string = self.users.get_project('testproj').generate_x509_cert('test1')
- logging.debug(signed_cert_string)
+ _key, cert_str = self.manager._generate_x509_cert('test1', 'testproj')
+ logging.debug(cert_str)
# Need to verify that it's signed by the right intermediate CA
full_chain = crypto.fetch_ca(project_id='testproj', chain=True)
int_cert = crypto.fetch_ca(project_id='testproj', chain=False)
cloud_cert = crypto.fetch_ca()
logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain)
- signed_cert = X509.load_cert_string(signed_cert_string)
+ signed_cert = X509.load_cert_string(cert_str)
chain_cert = X509.load_cert_string(full_chain)
int_cert = X509.load_cert_string(int_cert)
cloud_cert = X509.load_cert_string(cloud_cert)
@@ -164,42 +164,45 @@ class AuthTestCase(test.BaseTestCase):
self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey()))
def test_210_can_add_project_role(self):
- project = self.users.get_project('testproj')
+ project = self.manager.get_project('testproj')
self.assertFalse(project.has_role('test1', 'sysadmin'))
- self.users.add_role('test1', 'sysadmin')
+ self.manager.add_role('test1', 'sysadmin')
self.assertFalse(project.has_role('test1', 'sysadmin'))
project.add_role('test1', 'sysadmin')
self.assertTrue(project.has_role('test1', 'sysadmin'))
def test_211_can_remove_project_role(self):
- project = self.users.get_project('testproj')
+ project = self.manager.get_project('testproj')
self.assertTrue(project.has_role('test1', 'sysadmin'))
project.remove_role('test1', 'sysadmin')
self.assertFalse(project.has_role('test1', 'sysadmin'))
- self.users.remove_role('test1', 'sysadmin')
+ self.manager.remove_role('test1', 'sysadmin')
self.assertFalse(project.has_role('test1', 'sysadmin'))
def test_212_vpn_ip_and_port_looks_valid(self):
- project = self.users.get_project('testproj')
+ project = self.manager.get_project('testproj')
self.assert_(project.vpn_ip)
self.assert_(project.vpn_port >= FLAGS.vpn_start_port)
self.assert_(project.vpn_port <= FLAGS.vpn_end_port)
def test_213_too_many_vpns(self):
- for i in xrange(users.Vpn.num_ports_for_ip(FLAGS.vpn_ip)):
- users.Vpn.create("vpnuser%s" % i)
- self.assertRaises(users.NoMorePorts, users.Vpn.create, "boom")
+ vpns = []
+ for i in xrange(manager.Vpn.num_ports_for_ip(FLAGS.vpn_ip)):
+ vpns.append(manager.Vpn.create("vpnuser%s" % i))
+ self.assertRaises(manager.NoMorePorts, manager.Vpn.create, "boom")
+ for vpn in vpns:
+ vpn.destroy()
def test_299_can_delete_project(self):
- self.users.delete_project('testproj')
- self.assertFalse(filter(lambda p: p.name == 'testproj', self.users.get_projects()))
+ self.manager.delete_project('testproj')
+ self.assertFalse(filter(lambda p: p.name == 'testproj', self.manager.get_projects()))
def test_999_can_delete_users(self):
- self.users.delete_user('test1')
- users = self.users.get_users()
+ self.manager.delete_user('test1')
+ users = self.manager.get_users()
self.assertFalse(filter(lambda u: u.id == 'test1', users))
- self.users.delete_user('test2')
- self.assertEqual(self.users.get_user('test2'), None)
+ self.manager.delete_user('test2')
+ self.assertEqual(self.manager.get_user('test2'), None)
if __name__ == "__main__":
diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py
index 68cd488be..0e1b55065 100644
--- a/nova/tests/network_unittest.py
+++ b/nova/tests/network_unittest.py
@@ -39,59 +39,61 @@ class NetworkTestCase(test.TrialTestCase):
logging.getLogger().setLevel(logging.DEBUG)
self.manager = manager.AuthManager()
self.dnsmasq = FakeDNSMasq()
- try:
- self.manager.create_user('netuser', 'netuser', 'netuser')
- except: pass
+ self.user = self.manager.create_user('netuser', 'netuser', 'netuser')
+ self.projects = []
+ self.projects.append(self.manager.create_project('netuser',
+ 'netuser',
+ 'netuser'))
for i in range(0, 6):
name = 'project%s' % i
- if not self.manager.get_project(name):
- self.manager.create_project(name, 'netuser', name)
+ self.projects.append(self.manager.create_project(name,
+ 'netuser',
+ name))
self.network = network.PublicNetworkController()
def tearDown(self):
super(NetworkTestCase, self).tearDown()
- for i in range(0, 6):
- name = 'project%s' % i
- self.manager.delete_project(name)
- self.manager.delete_user('netuser')
+ for project in self.projects:
+ self.manager.delete_project(project)
+ self.manager.delete_user(self.user)
def test_public_network_allocation(self):
pubnet = IPy.IP(flags.FLAGS.public_range)
- address = self.network.allocate_ip("netuser", "project0", "public")
+ address = self.network.allocate_ip(self.user.id, self.projects[0].id, "public")
self.assertTrue(IPy.IP(address) in pubnet)
self.assertTrue(IPy.IP(address) in self.network.network)
def test_allocate_deallocate_ip(self):
address = network.allocate_ip(
- "netuser", "project0", utils.generate_mac())
+ self.user.id, self.projects[0].id, utils.generate_mac())
logging.debug("Was allocated %s" % (address))
- net = network.get_project_network("project0", "default")
- self.assertEqual(True, is_in_project(address, "project0"))
+ net = network.get_project_network(self.projects[0].id, "default")
+ self.assertEqual(True, is_in_project(address, self.projects[0].id))
mac = utils.generate_mac()
hostname = "test-host"
self.dnsmasq.issue_ip(mac, address, hostname, net.bridge_name)
rv = network.deallocate_ip(address)
# Doesn't go away until it's dhcp released
- self.assertEqual(True, is_in_project(address, "project0"))
+ self.assertEqual(True, is_in_project(address, self.projects[0].id))
self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name)
- self.assertEqual(False, is_in_project(address, "project0"))
+ self.assertEqual(False, is_in_project(address, self.projects[0].id))
def test_range_allocation(self):
mac = utils.generate_mac()
secondmac = utils.generate_mac()
hostname = "test-host"
address = network.allocate_ip(
- "netuser", "project0", mac)
+ self.user.id, self.projects[0].id, mac)
secondaddress = network.allocate_ip(
- "netuser", "project1", secondmac)
- net = network.get_project_network("project0", "default")
- secondnet = network.get_project_network("project1", "default")
+ self.user, "project1", secondmac)
+ net = network.get_project_network(self.projects[0].id, "default")
+ secondnet = network.get_project_network(self.projects[1].id, "default")
- self.assertEqual(True, is_in_project(address, "project0"))
- self.assertEqual(True, is_in_project(secondaddress, "project1"))
- self.assertEqual(False, is_in_project(address, "project1"))
+ self.assertEqual(True, is_in_project(address, self.projects[0].id))
+ self.assertEqual(True, is_in_project(secondaddress, self.projects[1].id))
+ self.assertEqual(False, is_in_project(address, self.projects[1].id))
# Addresses are allocated before they're issued
self.dnsmasq.issue_ip(mac, address, hostname, net.bridge_name)
@@ -100,34 +102,34 @@ class NetworkTestCase(test.TrialTestCase):
rv = network.deallocate_ip(address)
self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name)
- self.assertEqual(False, is_in_project(address, "project0"))
+ self.assertEqual(False, is_in_project(address, self.projects[0].id))
# First address release shouldn't affect the second
- self.assertEqual(True, is_in_project(secondaddress, "project1"))
+ self.assertEqual(True, is_in_project(secondaddress, self.projects[1].id))
rv = network.deallocate_ip(secondaddress)
self.dnsmasq.release_ip(secondmac, secondaddress,
hostname, secondnet.bridge_name)
- self.assertEqual(False, is_in_project(secondaddress, "project1"))
+ self.assertEqual(False, is_in_project(secondaddress, self.projects[1].id))
def test_subnet_edge(self):
- secondaddress = network.allocate_ip("netuser", "project0",
+ secondaddress = network.allocate_ip(self.user.id, self.projects[0].id,
utils.generate_mac())
hostname = "toomany-hosts"
- for project in range(1,5):
- project_id = "project%s" % (project)
+ for i in range(1,5):
+ project_id = self.projects[i].id
mac = utils.generate_mac()
mac2 = utils.generate_mac()
mac3 = utils.generate_mac()
address = network.allocate_ip(
- "netuser", project_id, mac)
+ self.user, project_id, mac)
address2 = network.allocate_ip(
- "netuser", project_id, mac2)
+ self.user, project_id, mac2)
address3 = network.allocate_ip(
- "netuser", project_id, mac3)
- self.assertEqual(False, is_in_project(address, "project0"))
- self.assertEqual(False, is_in_project(address2, "project0"))
- self.assertEqual(False, is_in_project(address3, "project0"))
+ self.user, project_id, mac3)
+ self.assertEqual(False, is_in_project(address, self.projects[0].id))
+ self.assertEqual(False, is_in_project(address2, self.projects[0].id))
+ self.assertEqual(False, is_in_project(address3, self.projects[0].id))
rv = network.deallocate_ip(address)
rv = network.deallocate_ip(address2)
rv = network.deallocate_ip(address3)
@@ -135,7 +137,7 @@ class NetworkTestCase(test.TrialTestCase):
self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name)
self.dnsmasq.release_ip(mac2, address2, hostname, net.bridge_name)
self.dnsmasq.release_ip(mac3, address3, hostname, net.bridge_name)
- net = network.get_project_network("project0", "default")
+ net = network.get_project_network(self.projects[0].id, "default")
rv = network.deallocate_ip(secondaddress)
self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name)
@@ -150,22 +152,23 @@ class NetworkTestCase(test.TrialTestCase):
Network size is 32, there are 5 addresses reserved for VPN.
So we should get 23 usable addresses
"""
- net = network.get_project_network("project0", "default")
+ net = network.get_project_network(self.projects[0].id, "default")
hostname = "toomany-hosts"
macs = {}
addresses = {}
for i in range(0, 22):
macs[i] = utils.generate_mac()
- addresses[i] = network.allocate_ip("netuser", "project0", macs[i])
+ addresses[i] = network.allocate_ip(self.user.id, self.projects[0].id, macs[i])
self.dnsmasq.issue_ip(macs[i], addresses[i], hostname, net.bridge_name)
- self.assertRaises(NoMoreAddresses, network.allocate_ip, "netuser", "project0", utils.generate_mac())
+ self.assertRaises(NoMoreAddresses, network.allocate_ip, self.user.id, self.projects[0].id, utils.generate_mac())
for i in range(0, 22):
rv = network.deallocate_ip(addresses[i])
self.dnsmasq.release_ip(macs[i], addresses[i], hostname, net.bridge_name)
def is_in_project(address, project_id):
+ print address, list(network.get_project_network(project_id).list_addresses())
return address in network.get_project_network(project_id).list_addresses()
def _get_project_addresses(project_id):