summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorYaguang Tang <heut2008@gmail.com>2012-09-23 12:33:18 +0800
committerYaguang Tang <heut2008@gmail.com>2012-09-23 16:47:26 +0800
commit821271113df9cfc2be6b24cd5574fbe08b4cd815 (patch)
treecafaef6899deac0519c4085d87f36ef7eb589573 /nova/tests
parent338fae4f2be127f65491860fc46dc7436a5610be (diff)
Move fakeldap.py from auth dir to tests.
Also removes the auth dir from nova. fix bug lp:1054848 Change-Id: I9c2710c9789ff8413bc80044eba94f3a851d8544
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/fake_ldap.py328
-rw-r--r--nova/tests/network/test_manager.py20
2 files changed, 344 insertions, 4 deletions
diff --git a/nova/tests/fake_ldap.py b/nova/tests/fake_ldap.py
new file mode 100644
index 000000000..b3fab03ab
--- /dev/null
+++ b/nova/tests/fake_ldap.py
@@ -0,0 +1,328 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Fake LDAP server for test harness.
+
+This class does very little error checking, and knows nothing about ldap
+class definitions. It implements the minimum emulation of the python ldap
+library to work with nova.
+
+"""
+
+import fnmatch
+
+from nova.openstack.common import jsonutils
+
+
+class Store(object):
+ def __init__(self):
+ if hasattr(self.__class__, '_instance'):
+ raise Exception(_('Attempted to instantiate singleton'))
+
+ @classmethod
+ def instance(cls):
+ if not hasattr(cls, '_instance'):
+ cls._instance = _StorageDict()
+ return cls._instance
+
+
+class _StorageDict(dict):
+ def keys(self, pat=None):
+ ret = super(_StorageDict, self).keys()
+ if pat is not None:
+ ret = fnmatch.filter(ret, pat)
+ return ret
+
+ def delete(self, key):
+ try:
+ del self[key]
+ except KeyError:
+ pass
+
+ def flushdb(self):
+ self.clear()
+
+ def hgetall(self, key):
+ """Returns the hash for the given key; creates
+ the hash if the key doesn't exist."""
+ try:
+ return self[key]
+ except KeyError:
+ self[key] = {}
+ return self[key]
+
+ def hget(self, key, field):
+ hashdict = self.hgetall(key)
+ try:
+ return hashdict[field]
+ except KeyError:
+ hashdict[field] = {}
+ return hashdict[field]
+
+ def hset(self, key, field, val):
+ hashdict = self.hgetall(key)
+ hashdict[field] = val
+
+ def hmset(self, key, value_dict):
+ hashdict = self.hgetall(key)
+ for field, val in value_dict.items():
+ hashdict[field] = val
+
+
+SCOPE_BASE = 0
+SCOPE_ONELEVEL = 1 # Not implemented
+SCOPE_SUBTREE = 2
+MOD_ADD = 0
+MOD_DELETE = 1
+MOD_REPLACE = 2
+
+
+class NO_SUCH_OBJECT(Exception): # pylint: disable=C0103
+ """Duplicate exception class from real LDAP module."""
+ pass
+
+
+class OBJECT_CLASS_VIOLATION(Exception): # pylint: disable=C0103
+ """Duplicate exception class from real LDAP module."""
+ pass
+
+
+class SERVER_DOWN(Exception): # pylint: disable=C0103
+ """Duplicate exception class from real LDAP module."""
+ pass
+
+
+def initialize(_uri):
+ """Opens a fake connection with an LDAP server."""
+ return FakeLDAP()
+
+
+def _match_query(query, attrs):
+ """Match an ldap query to an attribute dictionary.
+
+ The characters &, |, and ! are supported in the query. No syntax checking
+ is performed, so malformed queries will not work correctly.
+ """
+ # cut off the parentheses
+ inner = query[1:-1]
+ if inner.startswith('&'):
+ # cut off the &
+ l, r = _paren_groups(inner[1:])
+ return _match_query(l, attrs) and _match_query(r, attrs)
+ if inner.startswith('|'):
+ # cut off the |
+ l, r = _paren_groups(inner[1:])
+ return _match_query(l, attrs) or _match_query(r, attrs)
+ if inner.startswith('!'):
+ # cut off the ! and the nested parentheses
+ return not _match_query(query[2:-1], attrs)
+
+ (k, _sep, v) = inner.partition('=')
+ return _match(k, v, attrs)
+
+
+def _paren_groups(source):
+ """Split a string into parenthesized groups."""
+ count = 0
+ start = 0
+ result = []
+ for pos in xrange(len(source)):
+ if source[pos] == '(':
+ if count == 0:
+ start = pos
+ count += 1
+ if source[pos] == ')':
+ count -= 1
+ if count == 0:
+ result.append(source[start:pos + 1])
+ return result
+
+
+def _match(key, value, attrs):
+ """Match a given key and value against an attribute list."""
+ if key not in attrs:
+ return False
+ # This is a wild card search. Implemented as all or nothing for now.
+ if value == "*":
+ return True
+ if key != "objectclass":
+ return value in attrs[key]
+ # it is an objectclass check, so check subclasses
+ values = _subs(value)
+ for v in values:
+ if v in attrs[key]:
+ return True
+ return False
+
+
+def _subs(value):
+ """Returns a list of subclass strings.
+
+ The strings represent the ldap object class plus any subclasses that
+ inherit from it. Fakeldap doesn't know about the ldap object structure,
+ so subclasses need to be defined manually in the dictionary below.
+
+ """
+ subs = {'groupOfNames': ['novaProject']}
+ if value in subs:
+ return [value] + subs[value]
+ return [value]
+
+
+def _from_json(encoded):
+ """Convert attribute values from json representation.
+
+ Args:
+ encoded -- a json encoded string
+
+ Returns a list of strings
+
+ """
+ return [str(x) for x in jsonutils.loads(encoded)]
+
+
+def _to_json(unencoded):
+ """Convert attribute values into json representation.
+
+ Args:
+ unencoded -- an unencoded string or list of strings. If it
+ is a single string, it will be converted into a list.
+
+ Returns a json string
+
+ """
+ return jsonutils.dumps(list(unencoded))
+
+
+server_fail = False
+
+
+class FakeLDAP(object):
+ """Fake LDAP connection."""
+
+ def simple_bind_s(self, dn, password):
+ """This method is ignored, but provided for compatibility."""
+ if server_fail:
+ raise SERVER_DOWN
+ pass
+
+ def unbind_s(self):
+ """This method is ignored, but provided for compatibility."""
+ if server_fail:
+ raise SERVER_DOWN
+ pass
+
+ def add_s(self, dn, attr):
+ """Add an object with the specified attributes at dn."""
+ if server_fail:
+ raise SERVER_DOWN
+
+ key = "%s%s" % (self.__prefix, dn)
+ value_dict = dict([(k, _to_json(v)) for k, v in attr])
+ Store.instance().hmset(key, value_dict)
+
+ def delete_s(self, dn):
+ """Remove the ldap object at specified dn."""
+ if server_fail:
+ raise SERVER_DOWN
+
+ Store.instance().delete("%s%s" % (self.__prefix, dn))
+
+ def modify_s(self, dn, attrs):
+ """Modify the object at dn using the attribute list.
+
+ :param dn: a dn
+ :param attrs: a list of tuples in the following form::
+
+ ([MOD_ADD | MOD_DELETE | MOD_REPACE], attribute, value)
+
+ """
+ if server_fail:
+ raise SERVER_DOWN
+
+ store = Store.instance()
+ key = "%s%s" % (self.__prefix, dn)
+
+ for cmd, k, v in attrs:
+ values = _from_json(store.hget(key, k))
+ if cmd == MOD_ADD:
+ values.append(v)
+ elif cmd == MOD_REPLACE:
+ values = [v]
+ else:
+ values.remove(v)
+ values = store.hset(key, k, _to_json(values))
+
+ def modrdn_s(self, dn, newrdn):
+ oldobj = self.search_s(dn, SCOPE_BASE)
+ if not oldobj:
+ raise NO_SUCH_OBJECT()
+ newdn = "%s,%s" % (newrdn, dn.partition(',')[2])
+ newattrs = oldobj[0][1]
+
+ modlist = []
+ for attrtype in newattrs.keys():
+ modlist.append((attrtype, newattrs[attrtype]))
+
+ self.add_s(newdn, modlist)
+ self.delete_s(dn)
+
+ def search_s(self, dn, scope, query=None, fields=None):
+ """Search for all matching objects under dn using the query.
+
+ Args:
+ dn -- dn to search under
+ scope -- only SCOPE_BASE and SCOPE_SUBTREE are supported
+ query -- query to filter objects by
+ fields -- fields to return. Returns all fields if not specified
+
+ """
+ if server_fail:
+ raise SERVER_DOWN
+
+ if scope != SCOPE_BASE and scope != SCOPE_SUBTREE:
+ raise NotImplementedError(str(scope))
+ store = Store.instance()
+ if scope == SCOPE_BASE:
+ pattern = "%s%s" % (self.__prefix, dn)
+ keys = store.keys(pattern)
+ else:
+ keys = store.keys("%s*%s" % (self.__prefix, dn))
+
+ if not keys:
+ raise NO_SUCH_OBJECT()
+
+ objects = []
+ for key in keys:
+ # get the attributes from the store
+ attrs = store.hgetall(key)
+ # turn the values from the store into lists
+ # pylint: disable=E1103
+ attrs = dict([(k, _from_json(v))
+ for k, v in attrs.iteritems()])
+ # filter the objects by query
+ if not query or _match_query(query, attrs):
+ # filter the attributes by fields
+ attrs = dict([(k, v) for k, v in attrs.iteritems()
+ if not fields or k in fields])
+ objects.append((key[len(self.__prefix):], attrs))
+ return objects
+
+ @property
+ def __prefix(self): # pylint: disable=R0201
+ """Get the prefix to use for all keys."""
+ return 'ldap:'
diff --git a/nova/tests/network/test_manager.py b/nova/tests/network/test_manager.py
index 8ef37fe95..2f597547c 100644
--- a/nova/tests/network/test_manager.py
+++ b/nova/tests/network/test_manager.py
@@ -23,6 +23,7 @@ import tempfile
from nova import context
from nova import db
from nova import exception
+from nova.network import ldapdns
from nova.network import linux_net
from nova.network import manager as network_manager
from nova.openstack.common import importutils
@@ -30,6 +31,7 @@ from nova.openstack.common import log as logging
from nova.openstack.common import rpc
import nova.policy
from nova import test
+from nova.tests import fake_ldap
from nova.tests import fake_network
from nova import utils
@@ -1827,17 +1829,27 @@ domain1 = "example.org"
domain2 = "example.com"
+class FakeLdapDNS(ldapdns.LdapDNS):
+ """For testing purposes, a DNS driver backed with a fake ldap driver."""
+ def __init__(self):
+ self.lobj = fake_ldap.FakeLDAP()
+ attrs = {'objectClass': ['domainrelatedobject', 'dnsdomain',
+ 'domain', 'dcobject', 'top'],
+ 'associateddomain': ['root'],
+ 'dc': ['root']}
+ self.lobj.add_s("ou=hosts,dc=example,dc=org",
+ ldapdns.create_modlist(attrs))
+
+
class LdapDNSTestCase(test.TestCase):
"""Tests nova.network.ldapdns.LdapDNS"""
def setUp(self):
super(LdapDNSTestCase, self).setUp()
self.saved_ldap = sys.modules.get('ldap')
- import nova.auth.fakeldap
- sys.modules['ldap'] = nova.auth.fakeldap
+ sys.modules['ldap'] = fake_ldap
- temp = importutils.import_object('nova.network.ldapdns.FakeLdapDNS')
- self.driver = temp
+ self.driver = FakeLdapDNS()
self.driver.create_domain(domain1)
self.driver.create_domain(domain2)