diff options
Diffstat (limited to 'ipaserver/plugins/certmap.py')
-rw-r--r-- | ipaserver/plugins/certmap.py | 166 |
1 files changed, 165 insertions, 1 deletions
diff --git a/ipaserver/plugins/certmap.py b/ipaserver/plugins/certmap.py index c37eae38e..e28b397c0 100644 --- a/ipaserver/plugins/certmap.py +++ b/ipaserver/plugins/certmap.py @@ -17,9 +17,14 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import base64 +import dbus import six -from ipalib import api, errors +from ipalib import api, errors, x509 +from ipalib import Bytes +from ipalib.crud import Search +from ipalib.frontend import Object from ipalib.parameters import Bool, DNSNameParam, Flag, Int, Str from ipalib.plugable import Registry from .baseldap import ( @@ -33,6 +38,7 @@ from .baseldap import ( pkey_to_value) from ipalib import _, ngettext from ipalib import output +from ipaserver.plugins.service import validate_certificate if six.PY3: @@ -389,3 +395,161 @@ class certmaprule_disable(LDAPQuery): result=True, value=pkey_to_value(cn, options), ) + + +DBUS_SSSD_NAME = 'org.freedesktop.sssd.infopipe' +DBUS_PROPERTY_IF = 'org.freedesktop.DBus.Properties' +DBUS_SSSD_USERS_PATH = '/org/freedesktop/sssd/infopipe/Users' +DBUS_SSSD_USERS_IF = 'org.freedesktop.sssd.infopipe.Users' +DBUS_SSSD_USER_IF = 'org.freedesktop.sssd.infopipe.Users.User' + + +class _sssd(object): + """ + Auxiliary class for SSSD infopipe DBus. + """ + def __init__(self, log): + """ + Initialize the Users object and interface. + + :raise RemoteRetrieveError: if DBus error occurs + """ + try: + self.log = log + self._bus = dbus.SystemBus() + self._users_obj = self._bus.get_object( + DBUS_SSSD_NAME, DBUS_SSSD_USERS_PATH) + self._users_iface = dbus.Interface( + self._users_obj, DBUS_SSSD_USERS_IF) + except dbus.DBusException as e: + self.log.error( + 'Failed to initialize DBus interface {iface}. DBus ' + 'exception is {exc}.'.format(iface=DBUS_SSSD_USERS_IF, exc=e) + ) + raise errors.RemoteRetrieveError( + reason=_('Failed to connect to sssd over SystemBus. ' + 'See details in the error_log')) + + def list_users_by_cert(self, cert): + """ + Look for users matching the cert. + + Call Users.ListByCertificate interface and return a dict + with key = domain, value = list of uids + corresponding to the users matching the provided cert + :param cert: DER cert + :raise RemoteRetrieveError: if DBus error occurs + """ + try: + pem = x509.make_pem(base64.b64encode(cert)) + # bug 3306 in sssd returns 0 entry when max_entries = 0 + # Temp workaround is to use a non-null value, not too high + # to avoid reserving unneeded memory + max_entries = dbus.UInt32(100) + user_paths = self._users_iface.ListByCertificate(pem, max_entries) + users = dict() + for user_path in user_paths: + user_obj = self._bus.get_object(DBUS_SSSD_NAME, user_path) + user_iface = dbus.Interface(user_obj, DBUS_PROPERTY_IF) + user_login = user_iface.Get(DBUS_SSSD_USER_IF, 'name') + + # Extract name@domain + items = user_login.split('@') + domain = api.env.realm if len(items) < 2 else items[1] + name = items[0] + + # Retrieve the list of users for the given domain, + # or initialize to an empty list + # and add the name + users_for_dom = users.setdefault(domain, list()) + users_for_dom.append(name) + return users + except dbus.DBusException as e: + err_name = e.get_dbus_name() + # If there is no matching user, do not consider this as an + # exception and return an empty list + if err_name == 'org.freedesktop.sssd.Error.NotFound': + return dict() + self.log.error( + 'Failed to use interface {iface}. DBus ' + 'exception is {exc}.'.format(iface=DBUS_SSSD_USERS_IF, exc=e)) + raise errors.RemoteRetrieveError( + reason=_('Failed to find users over SystemBus. ' + ' See details in the error_log')) + + +@register() +class certmap(Object): + """ + virtual object for certmatch_map API + """ + takes_params = ( + DNSNameParam( + 'domain', + label=_('Domain'), + flags={'no_search'}, + ), + Str( + 'uid*', + label=_('User logins'), + flags={'no_search'}, + ), + ) + + +@register() +class certmap_match(Search): + __doc__ = _(""" + Search for users matching the provided certificate. + + This command relies on SSSD to retrieve the list of matching users and + may return cached data. For more information on purging SSSD cache, + please refer to sss_cache documentation. + """) + + msg_summary = ngettext('%(count)s user matched', + '%(count)s users matched', 0) + + def get_summary_default(self, output): + """ + Need to sum the numbre of matching users for each domain. + """ + count = sum(len(entry['uid']) for entry in output['result']) + return self.msg_summary % dict(count=count) + + def get_args(self): + for arg in super(certmap_match, self).get_args(): + if arg.name == 'criteria': + continue + yield arg + yield Bytes( + 'certificate', validate_certificate, + cli_name='certificate', + label=_('Certificate'), + doc=_('Base-64 encoded user certificate'), + flags=['virtual_attribute'] + ) + + def execute(self, *args, **options): + """ + Search for users matching the provided certificate. + + The search is performed using SSSD's DBus interface + Users.ListByCertificate. + SSSD does the lookup based on certificate mapping rules, using + FreeIPA domain and trusted domains. + :raise RemoteRetrieveError: if DBus returns an exception + """ + sssd = _sssd(self.log) + + cert = args[0] + users = sssd.list_users_by_cert(cert) + result = [{'domain': domain, 'uid': userlist} + for (domain, userlist) in users.items()] + count = len(result) + + return dict( + result=result, + count=count, + truncated=False, + ) |