summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2012-06-06 22:54:16 -0400
committerMartin Kosek <mkosek@redhat.com>2012-06-14 14:02:26 +0200
commit54135ecd9a96f59429cfd535f3add282b535d3e3 (patch)
treeff1fc78a7ca3f705844cdc1c39a2253426b675df
parent0c96f5935607e3825ed76330d3375dec9689c8ba (diff)
downloadfreeipa-54135ecd9a96f59429cfd535f3add282b535d3e3.zip
freeipa-54135ecd9a96f59429cfd535f3add282b535d3e3.tar.gz
freeipa-54135ecd9a96f59429cfd535f3add282b535d3e3.tar.xz
Store session cookie in ccache for cli users
Try to use the URI /ipa/session/xml if there is a key in the kernel keyring. If there is no cookie or it turns out to be invalid (expired, whatever) then use the standard URI /ipa/xml. This in turn will create a session that the user can then use later. https://fedorahosted.org/freeipa/ticket/2331
-rw-r--r--freeipa.spec.in1
-rw-r--r--install/conf/ipa.conf10
-rw-r--r--ipalib/rpc.py85
-rw-r--r--ipapython/kernel_keyring.py102
-rw-r--r--ipaserver/plugins/xmlserver.py3
-rw-r--r--ipaserver/rpcserver.py230
-rw-r--r--tests/test_ipapython/test_keyring.py147
7 files changed, 499 insertions, 79 deletions
diff --git a/freeipa.spec.in b/freeipa.spec.in
index ce8e1e6..64abcc9 100644
--- a/freeipa.spec.in
+++ b/freeipa.spec.in
@@ -154,6 +154,7 @@ Requires(preun): python initscripts chkconfig
Requires(postun): python initscripts chkconfig
%endif
Requires: python-dns
+Requires: keyutils
# We have a soft-requires on bind. It is an optional part of
# IPA but if it is configured we need a way to require versions
diff --git a/install/conf/ipa.conf b/install/conf/ipa.conf
index b52d9d2..b01a0c2 100644
--- a/install/conf/ipa.conf
+++ b/install/conf/ipa.conf
@@ -1,5 +1,7 @@
#
-# VERSION 5 - DO NOT REMOVE THIS LINE
+# VERSION 6 - DO NOT REMOVE THIS LINE
+#
+# This file may be overwritten on upgrades.
#
# LoadModule auth_kerb_module modules/mod_auth_kerb.so
@@ -66,6 +68,12 @@ KrbConstrainedDelegationLock ipa
Allow from all
</Location>
+<Location "/ipa/session/xml">
+ Satisfy Any
+ Order Deny,Allow
+ Allow from all
+</Location>
+
<Location "/ipa/session/login_password">
Satisfy Any
Order Deny,Allow
diff --git a/ipalib/rpc.py b/ipalib/rpc.py
index bd18b6b..6518cb2 100644
--- a/ipalib/rpc.py
+++ b/ipalib/rpc.py
@@ -47,6 +47,7 @@ from ipalib.errors import public_errors, PublicError, UnknownError, NetworkError
from ipalib import errors
from ipalib.request import context, Connection
from ipapython import ipautil
+from ipapython import kernel_keyring
import httplib
import socket
@@ -257,6 +258,13 @@ class SSLTransport(LanguageAwareTransport):
conn.connect()
return conn
+ def parse_response(self, response):
+ session_cookie = response.getheader('Set-Cookie')
+ if session_cookie:
+ kernel_keyring.update_key('ipa_session_cookie', session_cookie)
+ return LanguageAwareTransport.parse_response(self, response)
+
+
class KerbTransport(SSLTransport):
"""
Handles Kerberos Negotiation authentication to an XML-RPC server.
@@ -281,8 +289,20 @@ class KerbTransport(SSLTransport):
raise errors.KerberosError(major=major, minor=minor)
def get_host_info(self, host):
+ """
+ Two things can happen here. If we have a session we will add
+ a cookie for that. If not we will set an Authorization header.
+ """
(host, extra_headers, x509) = SSLTransport.get_host_info(self, host)
+ if not isinstance(extra_headers, list):
+ extra_headers = []
+
+ session_data = getattr(context, 'session_data', None)
+ if session_data:
+ extra_headers.append(('Cookie', session_data))
+ return (host, extra_headers, x509)
+
# Set the remote host principal
service = "HTTP@" + host.split(':')[0]
@@ -296,9 +316,6 @@ class KerbTransport(SSLTransport):
except kerberos.GSSError, e:
self._handle_exception(e, service=service)
- if not isinstance(extra_headers, list):
- extra_headers = []
-
for (h, v) in extra_headers:
if h == 'Authorization':
extra_headers.remove((h, v))
@@ -345,12 +362,12 @@ class xmlclient(Connectible):
server = '%s://%s%s' % (scheme, ipautil.format_netloc(self.conn._ServerProxy__host), self.conn._ServerProxy__handler)
return server
- def get_url_list(self):
+ def get_url_list(self, xmlrpc_uri):
"""
Create a list of urls consisting of the available IPA servers.
"""
# the configured URL defines what we use for the discovered servers
- (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(self.env.xmlrpc_uri)
+ (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(xmlrpc_uri)
servers = []
name = '_ldap._tcp.%s.' % self.env.domain
@@ -366,7 +383,7 @@ class xmlclient(Connectible):
servers = list(set(servers))
# the list/set conversion won't preserve order so stick in the
# local config file version here.
- cfg_server = self.env.xmlrpc_uri
+ cfg_server = xmlrpc_uri
if cfg_server in servers:
# make sure the configured master server is there just once and
# it is the first one
@@ -379,7 +396,22 @@ class xmlclient(Connectible):
def create_connection(self, ccache=None, verbose=False, fallback=True,
delegate=False):
- servers = self.get_url_list()
+ try:
+ session = False
+ session_data = None
+ xmlrpc_uri = self.env.xmlrpc_uri
+ # We have a session cookie, try using the session URI to see if it
+ # is still valid
+ if not delegate:
+ session_data = kernel_keyring.read_key('ipa_session_cookie')
+ setattr(context, 'session_data', session_data)
+ (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(self.env.xmlrpc_uri)
+ xmlrpc_uri = urlparse.urlunparse((scheme, netloc, '/ipa/session/xml', params, query, fragment))
+ session = True
+ except ValueError:
+ # No session key, do full Kerberos auth
+ pass
+ servers = self.get_url_list(xmlrpc_uri)
serverproxy = None
for server in servers:
kw = dict(allow_none=True, encoding='UTF-8')
@@ -393,9 +425,10 @@ class xmlclient(Connectible):
kw['transport'] = LanguageAwareTransport()
self.log.info('trying %s' % server)
serverproxy = ServerProxy(server, **kw)
- if len(servers) == 1 or not fallback:
- # if we have only 1 server to try then let the main
- # requester handle any errors
+ if len(servers) == 1:
+ # if we have only 1 server and then let the
+ # main requester handle any errors. This also means it
+ # must handle a 401 but we save a ping.
return serverproxy
try:
command = getattr(serverproxy, 'ping')
@@ -417,9 +450,23 @@ class xmlclient(Connectible):
except KerberosError, krberr:
# kerberos error on one server is likely on all
raise errors.KerberosError(major=str(krberr), minor='')
+ except ProtocolError, e:
+ if session_data and e.errcode == 401:
+ # Unauthorized. Remove the session and try again.
+ try:
+ kernel_keyring.del_key('ipa_session_cookie')
+ delattr(context, 'session_data')
+ except ValueError:
+ # This shouldn't happen if we have a session but
+ # it isn't fatal.
+ pass
+ return self.create_connection(ccache, verbose, fallback, delegate)
+ if not fallback:
+ raise
+ serverproxy = None
except Exception, e:
if not fallback:
- raise e
+ raise
serverproxy = None
if serverproxy is None:
@@ -466,6 +513,22 @@ class xmlclient(Connectible):
except NSPRError, e:
raise NetworkError(uri=server, error=str(e))
except ProtocolError, e:
+ # By catching a 401 here we can detect the case where we have
+ # a single IPA server and the session is invalid. Otherwise
+ # we always have to do a ping().
+ session_data = getattr(context, 'session_data', None)
+ if session_data and e.errcode == 401:
+ # Unauthorized. Remove the session and try again.
+ try:
+ kernel_keyring.del_key('ipa_session_cookie')
+ delattr(context, 'session_data')
+ except ValueError:
+ # This shouldn't happen if we have a session but
+ # it isn't fatal.
+ pass
+ serverproxy = self.create_connection(os.environ.get('KRB5CCNAME'), self.env.verbose, self.env.fallback, self.env.delegate)
+ setattr(context, self.id, Connection(serverproxy, self.disconnect))
+ return self.forward(name, *args, **kw)
raise NetworkError(uri=server, error=e.errmsg)
except socket.error, e:
raise NetworkError(uri=server, error=str(e))
diff --git a/ipapython/kernel_keyring.py b/ipapython/kernel_keyring.py
new file mode 100644
index 0000000..547dd3d
--- /dev/null
+++ b/ipapython/kernel_keyring.py
@@ -0,0 +1,102 @@
+# Authors: Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2012 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from ipapython.ipautil import run
+
+# NOTE: Absolute path not required for keyctl since we reset the environment
+# in ipautil.run.
+
+# Use the session keyring so the same user can have a different principal
+# in different shells. This was explicitly chosen over @us because then
+# it is not possible to use KRB5CCNAME to have a different user principal.
+# The same session would always be used and the first principal would
+# always win.
+KEYRING = '@s'
+KEYTYPE = 'user'
+
+def dump_keys():
+ """
+ Dump all keys
+ """
+ (stdout, stderr, rc) = run(['keyctl', 'list', KEYRING], raiseonerr=False)
+ return stdout
+
+def get_real_key(key):
+ """
+ One cannot request a key based on the description it was created with
+ so find the one we're looking for.
+ """
+ (stdout, stderr, rc) = run(['keyctl', 'search', KEYRING, KEYTYPE, key], raiseonerr=False)
+ if rc:
+ raise ValueError('key %s not found' % key)
+ return stdout.rstrip()
+
+def has_key(key):
+ """
+ Returns True/False whether the key exists in the keyring.
+ """
+ try:
+ get_real_key(key)
+ return True
+ except ValueError:
+ return False
+
+def read_key(key):
+ """
+ Read the keyring and return the value for key.
+
+ Use pipe instead of print here to ensure we always get the raw data.
+ """
+ real_key = get_real_key(key)
+ (stdout, stderr, rc) = run(['keyctl', 'pipe', real_key], raiseonerr=False)
+ if rc:
+ raise ValueError('keyctl pipe failed: %s' % stderr)
+
+ return stdout
+
+def update_key(key, value):
+ """
+ Update the keyring data. If they key doesn't exist it is created.
+ """
+ if has_key(key):
+ real_key = get_real_key(key)
+ (stdout, stderr, rc) = run(['keyctl', 'pupdate', real_key], stdin=value, raiseonerr=False)
+ if rc:
+ raise ValueError('keyctl pupdate failed: %s' % stderr)
+ else:
+ add_key(key, value)
+
+def add_key(key, value):
+ """
+ Add a key to the kernel keyring.
+ """
+ if has_key(key):
+ raise ValueError('key %s already exists' % key)
+ (stdout, stderr, rc) = run(['keyctl', 'padd', KEYTYPE, key, KEYRING], stdin=value, raiseonerr=False)
+ if rc:
+ raise ValueError('keyctl padd failed: %s' % stderr)
+
+def del_key(key):
+ """
+ Remove a key from the keyring
+ """
+ real_key = get_real_key(key)
+ (stdout, stderr, rc) = run(['keyctl', 'unlink', real_key, KEYRING], raiseonerr=False)
+ if rc:
+ raise ValueError('keyctl unlink failed: %s' % stderr)
diff --git a/ipaserver/plugins/xmlserver.py b/ipaserver/plugins/xmlserver.py
index bd9eb1f..8d96262 100644
--- a/ipaserver/plugins/xmlserver.py
+++ b/ipaserver/plugins/xmlserver.py
@@ -25,7 +25,7 @@ Loads WSGI server plugins.
from ipalib import api
if 'in_server' in api.env and api.env.in_server is True:
- from ipaserver.rpcserver import wsgi_dispatch, xmlserver, jsonserver_kerb, jsonserver_session, login_kerberos, login_password, change_password
+ from ipaserver.rpcserver import wsgi_dispatch, xmlserver, jsonserver_kerb, jsonserver_session, login_kerberos, login_password, change_password, xmlserver_session
api.register(wsgi_dispatch)
api.register(xmlserver)
api.register(jsonserver_kerb)
@@ -33,3 +33,4 @@ if 'in_server' in api.env and api.env.in_server is True:
api.register(login_kerberos)
api.register(login_password)
api.register(change_password)
+ api.register(xmlserver_session)
diff --git a/ipaserver/rpcserver.py b/ipaserver/rpcserver.py
index 5abbaf1..c770290 100644
--- a/ipaserver/rpcserver.py
+++ b/ipaserver/rpcserver.py
@@ -395,72 +395,6 @@ class WSGIExecutioner(Executioner):
raise NotImplementedError('%s.marshal()' % self.fullname)
-class xmlserver(WSGIExecutioner, HTTP_Status):
- """
- Execution backend plugin for XML-RPC server.
-
- Also see the `ipalib.rpc.xmlclient` plugin.
- """
-
- content_type = 'text/xml'
- key = '/xml'
-
- def _on_finalize(self):
- self.__system = {
- 'system.listMethods': self.listMethods,
- 'system.methodSignature': self.methodSignature,
- 'system.methodHelp': self.methodHelp,
- }
- super(xmlserver, self)._on_finalize()
-
- def __call__(self, environ, start_response):
- '''
- '''
-
- self.debug('WSGI xmlserver.__call__:')
- user_ccache=environ.get('KRB5CCNAME')
- if user_ccache is None:
- self.internal_error(environ, start_response,
- 'xmlserver.__call__: KRB5CCNAME not defined in HTTP request environment')
- return self.marshal(None, CCacheError())
- try:
- self.create_context(ccache=user_ccache)
- response = super(xmlserver, self).__call__(environ, start_response)
- except PublicError, e:
- status = HTTP_STATUS_SUCCESS
- response = status
- headers = [('Content-Type', 'text/plain; charset=utf-8')]
- start_response(status, headers)
- return self.marshal(None, e)
- finally:
- destroy_context()
- return response
-
- def listMethods(self, *params):
- return tuple(name.decode('UTF-8') for name in self.Command)
-
- def methodSignature(self, *params):
- return u'methodSignature not implemented'
-
- def methodHelp(self, *params):
- return u'methodHelp not implemented'
-
- def unmarshal(self, data):
- (params, name) = xml_loads(data)
- (args, options) = params_2_args_options(params)
- return (name, args, options, None)
-
- def marshal(self, result, error, _id=None):
- if error:
- self.debug('response: %s: %s', error.__class__.__name__, str(error))
- response = Fault(error.errno, error.strerror)
- else:
- if isinstance(result, dict):
- self.debug('response: entries returned %d', result.get('count', 1))
- response = (result,)
- return xml_dumps(response, methodresponse=True)
-
-
def json_encode_binary(val):
'''
JSON cannot encode binary values. We encode binary values in Python str
@@ -757,6 +691,76 @@ class KerberosSession(object):
return ['']
+class xmlserver(WSGIExecutioner, HTTP_Status, KerberosSession):
+ """
+ Execution backend plugin for XML-RPC server.
+
+ Also see the `ipalib.rpc.xmlclient` plugin.
+ """
+
+ content_type = 'text/xml'
+ key = '/xml'
+
+ def _on_finalize(self):
+ self.__system = {
+ 'system.listMethods': self.listMethods,
+ 'system.methodSignature': self.methodSignature,
+ 'system.methodHelp': self.methodHelp,
+ }
+ super(xmlserver, self)._on_finalize()
+ self.kerb_session_on_finalize()
+
+ def __call__(self, environ, start_response):
+ '''
+ '''
+
+ self.debug('WSGI xmlserver.__call__:')
+ user_ccache=environ.get('KRB5CCNAME')
+ if user_ccache is None:
+ self.internal_error(environ, start_response,
+ 'xmlserver.__call__: KRB5CCNAME not defined in HTTP request environment')
+ return self.marshal(None, CCacheError())
+ try:
+ self.create_context(ccache=user_ccache)
+ response = super(xmlserver, self).__call__(environ, start_response)
+ if getattr(context, 'session_data', None) is None and \
+ self.env.context != 'lite':
+ self.finalize_kerberos_acquisition('xmlserver', user_ccache, environ, start_response)
+ except PublicError, e:
+ status = HTTP_STATUS_SUCCESS
+ response = status
+ headers = [('Content-Type', 'text/plain; charset=utf-8')]
+ start_response(status, headers)
+ return self.marshal(None, e)
+ finally:
+ destroy_context()
+ return response
+
+ def listMethods(self, *params):
+ return tuple(name.decode('UTF-8') for name in self.Command)
+
+ def methodSignature(self, *params):
+ return u'methodSignature not implemented'
+
+ def methodHelp(self, *params):
+ return u'methodHelp not implemented'
+
+ def unmarshal(self, data):
+ (params, name) = xml_loads(data)
+ (args, options) = params_2_args_options(params)
+ return (name, args, options, None)
+
+ def marshal(self, result, error, _id=None):
+ if error:
+ self.debug('response: %s: %s', error.__class__.__name__, str(error))
+ response = Fault(error.errno, error.strerror)
+ else:
+ if isinstance(result, dict):
+ self.debug('response: entries returned %d', result.get('count', 1))
+ response = (result,)
+ return xml_dumps(response, methodresponse=True)
+
+
class jsonserver_session(jsonserver, KerberosSession):
"""
JSON RPC server protected with session auth.
@@ -1098,3 +1102,97 @@ class change_password(Backend, HTTP_Status):
output = _pwchange_template % dict(title=str(title),
message=str(message))
return [output]
+
+
+class xmlserver_session(xmlserver, KerberosSession):
+ """
+ XML RPC server protected with session auth.
+ """
+
+ key = '/session/xml'
+
+ def __init__(self):
+ super(xmlserver_session, self).__init__()
+ auth_mgr = AuthManagerKerb(self.__class__.__name__)
+ session_mgr.auth_mgr.register(auth_mgr.name, auth_mgr)
+
+ def _on_finalize(self):
+ super(xmlserver_session, self)._on_finalize()
+ self.kerb_session_on_finalize()
+
+ def need_login(self, start_response):
+ status = '401 Unauthorized'
+ headers = []
+ response = ''
+
+ self.debug('xmlserver_session: %s need login', status)
+
+ start_response(status, headers)
+ return [response]
+
+ def __call__(self, environ, start_response):
+ '''
+ '''
+
+ self.debug('WSGI xmlserver_session.__call__:')
+
+ # Load the session data
+ session_data = session_mgr.load_session_data(environ.get('HTTP_COOKIE'))
+ session_id = session_data['session_id']
+
+ self.debug('xmlserver_session.__call__: session_id=%s start_timestamp=%s access_timestamp=%s expiration_timestamp=%s',
+ session_id,
+ fmt_time(session_data['session_start_timestamp']),
+ fmt_time(session_data['session_access_timestamp']),
+ fmt_time(session_data['session_expiration_timestamp']))
+
+ ccache_data = session_data.get('ccache_data')
+
+ # Redirect to /ipa/xml if no Kerberos credentials
+ if ccache_data is None:
+ self.debug('xmlserver_session.__call_: no ccache, need TGT')
+ return self.need_login(start_response)
+
+ ipa_ccache_name = bind_ipa_ccache(ccache_data)
+
+ # Redirect to /ipa/xml if Kerberos credentials are expired
+ cc = KRB5_CCache(ipa_ccache_name)
+ if not cc.valid(self.api.env.host, self.api.env.realm):
+ self.debug('xmlserver_session.__call_: ccache expired, deleting session, need login')
+ # The request is finished with the ccache, destroy it.
+ release_ipa_ccache(ipa_ccache_name)
+ return self.need_login(start_response)
+
+ # Update the session expiration based on the Kerberos expiration
+ endtime = cc.endtime(self.api.env.host, self.api.env.realm)
+ self.update_session_expiration(session_data, endtime)
+
+ # Store the session data in the per-thread context
+ setattr(context, 'session_data', session_data)
+
+ environ['KRB5CCNAME'] = ipa_ccache_name
+
+ try:
+ response = super(xmlserver_session, self).__call__(environ, start_response)
+ finally:
+ # Kerberos may have updated the ccache data during the
+ # execution of the command therefore we need refresh our
+ # copy of it in the session data so the next command sees
+ # the same state of the ccache.
+ #
+ # However we must be careful not to restore the ccache
+ # data in the session data if it was explicitly deleted
+ # during the execution of the command. For example the
+ # logout command removes the ccache data from the session
+ # data to invalidate the session credentials.
+
+ if session_data.has_key('ccache_data'):
+ session_data['ccache_data'] = load_ccache_data(ipa_ccache_name)
+
+ # The request is finished with the ccache, destroy it.
+ release_ipa_ccache(ipa_ccache_name)
+ # Store the session data.
+ session_mgr.store_session_data(session_data)
+ destroy_context()
+
+ return response
diff --git a/tests/test_ipapython/test_keyring.py b/tests/test_ipapython/test_keyring.py
new file mode 100644
index 0000000..568fd5e
--- /dev/null
+++ b/tests/test_ipapython/test_keyring.py
@@ -0,0 +1,147 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2012 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test the `kernel_keyring.py` module.
+"""
+
+from nose.tools import raises, assert_raises # pylint: disable=E0611
+from ipapython import kernel_keyring
+
+TEST_KEY = 'ipa_test'
+TEST_VALUE = 'abc123'
+UPDATE_VALUE = '123abc'
+
+SIZE_256 = 'abcdefgh' * 32
+SIZE_512 = 'abcdefgh' * 64
+SIZE_1024 = 'abcdefgh' * 128
+
+class test_keyring(object):
+ """
+ Test the kernel keyring interface
+ """
+
+ def setUp(self):
+ try:
+ kernel_keyring.del_key(TEST_KEY)
+ except ValueError:
+ pass
+ try:
+ kernel_keyring.del_key(SIZE_256)
+ except ValueError:
+ pass
+
+ def test_01(self):
+ """
+ Add a new key and value, then remove it
+ """
+ kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
+ result = kernel_keyring.read_key(TEST_KEY)
+ assert(result == TEST_VALUE)
+
+ kernel_keyring.del_key(TEST_KEY)
+
+ # Make sure it is gone
+ try:
+ result = kernel_keyring.read_key(TEST_KEY)
+ except ValueError, e:
+ assert e.message == 'key %s not found' % TEST_KEY
+
+ def test_02(self):
+ """
+ Delete a non_existent key
+ """
+ try:
+ kernel_keyring.del_key(TEST_KEY)
+ raise AssertionError('key should not have been deleted')
+ except ValueError:
+ pass
+
+ @raises(ValueError)
+ def test_03(self):
+ """
+ Add a duplicate key
+ """
+ kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
+ kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
+
+ def test_04(self):
+ """
+ Update the value in a key
+ """
+ kernel_keyring.update_key(TEST_KEY, UPDATE_VALUE)
+ result = kernel_keyring.read_key(TEST_KEY)
+ assert(result == UPDATE_VALUE)
+
+ # Now update it 10 times
+ for i in xrange(10):
+ kernel_keyring.update_key(TEST_KEY, 'test %d' % i)
+ result = kernel_keyring.read_key(TEST_KEY)
+ assert(result == 'test %d' % i)
+
+ kernel_keyring.del_key(TEST_KEY)
+
+ @raises(ValueError)
+ def test_05(self):
+ """
+ Read a non-existent key
+ """
+ result = kernel_keyring.read_key(TEST_KEY)
+
+ def test_06(self):
+ """
+ See if a key is available
+ """
+ kernel_keyring.add_key(TEST_KEY, TEST_VALUE)
+
+ result = kernel_keyring.has_key(TEST_KEY)
+ assert(result == True)
+ kernel_keyring.del_key(TEST_KEY)
+
+ result = kernel_keyring.has_key(TEST_KEY)
+ assert(result == False)
+
+ def test_07(self):
+ """
+ Test a 256-byte key
+ """
+ kernel_keyring.add_key(SIZE_256, TEST_VALUE)
+ result = kernel_keyring.read_key(SIZE_256)
+ assert(result == TEST_VALUE)
+
+ kernel_keyring.del_key(SIZE_256)
+
+ def test_08(self):
+ """
+ Test 512-bytes of data
+ """
+ kernel_keyring.add_key(TEST_KEY, SIZE_512)
+ result = kernel_keyring.read_key(TEST_KEY)
+ assert(result == SIZE_512)
+
+ kernel_keyring.del_key(TEST_KEY)
+
+ def test_09(self):
+ """
+ Test 1k bytes of data
+ """
+ kernel_keyring.add_key(TEST_KEY, SIZE_1024)
+ result = kernel_keyring.read_key(TEST_KEY)
+ assert(result == SIZE_1024)
+
+ kernel_keyring.del_key(TEST_KEY)