summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--API.txt12
-rw-r--r--freeipa.spec.in2
-rw-r--r--ipaserver/plugins/certmap.py166
3 files changed, 179 insertions, 1 deletions
diff --git a/API.txt b/API.txt
index a8f8ff187..ace3101dc 100644
--- a/API.txt
+++ b/API.txt
@@ -824,6 +824,16 @@ option: Str('version?')
output: Entry('result')
output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
output: PrimaryKey('value')
+command: certmap_match/1
+args: 1,3,4
+arg: Bytes('certificate', cli_name='certificate')
+option: Flag('all', autofill=True, cli_name='all', default=False)
+option: Flag('raw', autofill=True, cli_name='raw', default=False)
+option: Str('version?')
+output: Output('count', type=[<type 'int'>])
+output: ListOfEntries('result')
+output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
+output: Output('truncated', type=[<type 'bool'>])
command: certmapconfig_mod/1
args: 0,8,3
option: Str('addattr*', cli_name='addattr')
@@ -6517,6 +6527,8 @@ default: cert_request/1
default: cert_revoke/1
default: cert_show/1
default: cert_status/1
+default: certmap/1
+default: certmap_match/1
default: certmapconfig/1
default: certmapconfig_mod/1
default: certmapconfig_show/1
diff --git a/freeipa.spec.in b/freeipa.spec.in
index 5b736b615..cc7422aad 100644
--- a/freeipa.spec.in
+++ b/freeipa.spec.in
@@ -284,6 +284,8 @@ Requires: gzip
Requires: oddjob
# Require 0.6.0 for the new delegation access control features
Requires: gssproxy >= 0.6.0
+# Require 1.15.1 for the certificate identity mapping feature
+Requires: sssd-dbus >= 1.15.1
Provides: %{alt_name}-server = %{version}
Conflicts: %{alt_name}-server
diff --git a/ipaserver/plugins/certmap.py b/ipaserver/plugins/certmap.py
index c37eae38e..e28b397c0 100644
--- a/ipaserver/plugins/certmap.py
+++ b/ipaserver/plugins/certmap.py
@@ -17,9 +17,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import base64
+import dbus
import six
-from ipalib import api, errors
+from ipalib import api, errors, x509
+from ipalib import Bytes
+from ipalib.crud import Search
+from ipalib.frontend import Object
from ipalib.parameters import Bool, DNSNameParam, Flag, Int, Str
from ipalib.plugable import Registry
from .baseldap import (
@@ -33,6 +38,7 @@ from .baseldap import (
pkey_to_value)
from ipalib import _, ngettext
from ipalib import output
+from ipaserver.plugins.service import validate_certificate
if six.PY3:
@@ -389,3 +395,161 @@ class certmaprule_disable(LDAPQuery):
result=True,
value=pkey_to_value(cn, options),
)
+
+
+DBUS_SSSD_NAME = 'org.freedesktop.sssd.infopipe'
+DBUS_PROPERTY_IF = 'org.freedesktop.DBus.Properties'
+DBUS_SSSD_USERS_PATH = '/org/freedesktop/sssd/infopipe/Users'
+DBUS_SSSD_USERS_IF = 'org.freedesktop.sssd.infopipe.Users'
+DBUS_SSSD_USER_IF = 'org.freedesktop.sssd.infopipe.Users.User'
+
+
+class _sssd(object):
+ """
+ Auxiliary class for SSSD infopipe DBus.
+ """
+ def __init__(self, log):
+ """
+ Initialize the Users object and interface.
+
+ :raise RemoteRetrieveError: if DBus error occurs
+ """
+ try:
+ self.log = log
+ self._bus = dbus.SystemBus()
+ self._users_obj = self._bus.get_object(
+ DBUS_SSSD_NAME, DBUS_SSSD_USERS_PATH)
+ self._users_iface = dbus.Interface(
+ self._users_obj, DBUS_SSSD_USERS_IF)
+ except dbus.DBusException as e:
+ self.log.error(
+ 'Failed to initialize DBus interface {iface}. DBus '
+ 'exception is {exc}.'.format(iface=DBUS_SSSD_USERS_IF, exc=e)
+ )
+ raise errors.RemoteRetrieveError(
+ reason=_('Failed to connect to sssd over SystemBus. '
+ 'See details in the error_log'))
+
+ def list_users_by_cert(self, cert):
+ """
+ Look for users matching the cert.
+
+ Call Users.ListByCertificate interface and return a dict
+ with key = domain, value = list of uids
+ corresponding to the users matching the provided cert
+ :param cert: DER cert
+ :raise RemoteRetrieveError: if DBus error occurs
+ """
+ try:
+ pem = x509.make_pem(base64.b64encode(cert))
+ # bug 3306 in sssd returns 0 entry when max_entries = 0
+ # Temp workaround is to use a non-null value, not too high
+ # to avoid reserving unneeded memory
+ max_entries = dbus.UInt32(100)
+ user_paths = self._users_iface.ListByCertificate(pem, max_entries)
+ users = dict()
+ for user_path in user_paths:
+ user_obj = self._bus.get_object(DBUS_SSSD_NAME, user_path)
+ user_iface = dbus.Interface(user_obj, DBUS_PROPERTY_IF)
+ user_login = user_iface.Get(DBUS_SSSD_USER_IF, 'name')
+
+ # Extract name@domain
+ items = user_login.split('@')
+ domain = api.env.realm if len(items) < 2 else items[1]
+ name = items[0]
+
+ # Retrieve the list of users for the given domain,
+ # or initialize to an empty list
+ # and add the name
+ users_for_dom = users.setdefault(domain, list())
+ users_for_dom.append(name)
+ return users
+ except dbus.DBusException as e:
+ err_name = e.get_dbus_name()
+ # If there is no matching user, do not consider this as an
+ # exception and return an empty list
+ if err_name == 'org.freedesktop.sssd.Error.NotFound':
+ return dict()
+ self.log.error(
+ 'Failed to use interface {iface}. DBus '
+ 'exception is {exc}.'.format(iface=DBUS_SSSD_USERS_IF, exc=e))
+ raise errors.RemoteRetrieveError(
+ reason=_('Failed to find users over SystemBus. '
+ ' See details in the error_log'))
+
+
+@register()
+class certmap(Object):
+ """
+ virtual object for certmatch_map API
+ """
+ takes_params = (
+ DNSNameParam(
+ 'domain',
+ label=_('Domain'),
+ flags={'no_search'},
+ ),
+ Str(
+ 'uid*',
+ label=_('User logins'),
+ flags={'no_search'},
+ ),
+ )
+
+
+@register()
+class certmap_match(Search):
+ __doc__ = _("""
+ Search for users matching the provided certificate.
+
+ This command relies on SSSD to retrieve the list of matching users and
+ may return cached data. For more information on purging SSSD cache,
+ please refer to sss_cache documentation.
+ """)
+
+ msg_summary = ngettext('%(count)s user matched',
+ '%(count)s users matched', 0)
+
+ def get_summary_default(self, output):
+ """
+ Need to sum the numbre of matching users for each domain.
+ """
+ count = sum(len(entry['uid']) for entry in output['result'])
+ return self.msg_summary % dict(count=count)
+
+ def get_args(self):
+ for arg in super(certmap_match, self).get_args():
+ if arg.name == 'criteria':
+ continue
+ yield arg
+ yield Bytes(
+ 'certificate', validate_certificate,
+ cli_name='certificate',
+ label=_('Certificate'),
+ doc=_('Base-64 encoded user certificate'),
+ flags=['virtual_attribute']
+ )
+
+ def execute(self, *args, **options):
+ """
+ Search for users matching the provided certificate.
+
+ The search is performed using SSSD's DBus interface
+ Users.ListByCertificate.
+ SSSD does the lookup based on certificate mapping rules, using
+ FreeIPA domain and trusted domains.
+ :raise RemoteRetrieveError: if DBus returns an exception
+ """
+ sssd = _sssd(self.log)
+
+ cert = args[0]
+ users = sssd.list_users_by_cert(cert)
+ result = [{'domain': domain, 'uid': userlist}
+ for (domain, userlist) in users.items()]
+ count = len(result)
+
+ return dict(
+ result=result,
+ count=count,
+ truncated=False,
+ )