diff options
author | Miloslav Trmač <mitr@redhat.com> | 2009-07-16 20:10:58 +0200 |
---|---|---|
committer | Martin Sivak <msivak@redhat.com> | 2009-07-17 16:44:30 +0200 |
commit | 0eff6d603623c6d506749f511bbd06b63a7b4d98 (patch) | |
tree | e02db913f683ae31ff59284f32810611f489e0ec | |
parent | 66743f15980987f74f930174786da6a6438bc40b (diff) | |
download | firstaidkit-0eff6d603623c6d506749f511bbd06b63a7b4d98.tar.gz firstaidkit-0eff6d603623c6d506749f511bbd06b63a7b4d98.tar.xz firstaidkit-0eff6d603623c6d506749f511bbd06b63a7b4d98.zip |
Add KeyRecoveryPlugin
Signed-off-by: Martin Sivak <msivak@redhat.com>
-rw-r--r-- | plugins/key_recovery.py | 242 |
1 files changed, 242 insertions, 0 deletions
diff --git a/plugins/key_recovery.py b/plugins/key_recovery.py new file mode 100644 index 0000000..ffabfc5 --- /dev/null +++ b/plugins/key_recovery.py @@ -0,0 +1,242 @@ +# First Aid Kit - diagnostic and repair tool for Linux +# encoding: utf-8 +# Copyright (C) 2009 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +# +# Red Hat author: Miloslav Trmač <mitr@redhat.com> + +import collections +import logging + +from pyfirstaidkit.plugins import Plugin,Flow +from pyfirstaidkit.returns import * +from pyfirstaidkit.reporting import PLUGIN + +import volume_key + +# A hackish alternative way to initialize NSS: +# import rpm; rpm.ts() +# (currently conflicts with rpm.py in the current directory) +import nss.nss +nss.nss.nss_init_nodb() + +class KeyRecoveryPlugin(Plugin): + """This plugin allows restoring access to encrypted volumes using escrow + packets generated by volume_key.""" + + flows = {} + flows["recoverEncryptionKey"] = Flow({ + Plugin.initial: {Return: "findEncryptedVolumes"}, + "findEncryptedVolumes" : {ReturnSuccess: "openPacket", + ReturnFailure: Plugin.final, + None: Plugin.final}, + "openPacket" : {ReturnSuccess: "addPassphrase", + ReturnFailure: Plugin.final, + None: Plugin.final}, + "addPassphrase" : {ReturnSuccess: Plugin.final, + ReturnFailure: Plugin.final, + None: Plugin.final} + }, description="Add a password to an encrypted volume " + "using a provided escrow packet") + + name = "Encryption key recovery plugin" + version = "0.0.1" + author = "Miloslav Trmač" + description = "Automates recovery of LUKS volume key" + + @classmethod + def getDeps(cls): + return set(["root"]) + + def __init__(self, *args, **kwargs): + Plugin.__init__(self, *args, **kwargs) + + self._ui = volume_key.UI() + self._ui.generic_cb = self._vk_ui_generic_cb + self._ui.passphrase_cb = self._vk_ui_passphrase_cb + + def _vk_ui_generic_cb(self, prompt, echo): + prompt = "%s:" % (prompt,) + if echo: + r = self._reporting.text_question_wait(prompt, origin = self) + else: + r = self._reporting.password_question_wait(prompt, origin = self) + if r == '': + return None + return r + + def _vk_ui_passphrase_cb(self, prompt, unused_failed_attempts): + r = self._reporting.password_question_wait("%s:" % (prompt,), + origin = self) + if r == '': + return None + return r + + def findEncryptedVolumes(self): + # Note: This assumes "the system" has automatically set up RAID, LVM + # etc. to make the encrypted volumes visible. + self._volumes = collections.defaultdict(list) + with open("/proc/partitions") as f: + for line in f.readlines()[2:]: + (major, minor, blocks, name) = line.split() + if name.startswith("ram"): + continue + path = "/dev/%s" % name + try: + vol = volume_key.Volume.open(path) + except RuntimeError, e: + self._reporting.info("Error examining %s: %s" % + (path, str(e)), origin = self, + importance = logging.DEBUG) + else: + if vol.format in (volume_key.VOLUME_FORMAT_LUKS,): + self._volumes[vol.uuid].append(path) + if len(self._volumes) == 0: + self._reporting.info("No encrypted volume found", origin = self) + self._result = ReturnFailure + return + self._result = ReturnSuccess + + def openPacket(self): + pack = self._openPacket() + if pack is None: + return + candidates = self._getCandidates(pack) + + volume_path = self._chooseCandidate(candidates) + if volume_path is None: + return + self._packet = pack + self._volume_path = volume_path + self._result = ReturnSuccess + + def _openPacket(self): + """Ask for a packet and open it. + + Return a packet if OK (setting self._packet_path), None otherwise. + + """ + packet_path = self._reporting.filename_question_wait \ + ("Select an escrow packet:", origin = self) + try: + with open(packet_path, "rb") as f: + packet_data = f.read() + except IOError, e: + self._reporting.error("Error reading %s: %s" % + (packet_path, str(e)), origin = self) + return None + try: + fmt = volume_key.Packet.get_format(packet_data) + # In particular, PACKET_FORMAT_ASSYMETRIC requries a NSS db and is + # not supported + if fmt not in (volume_key.PACKET_FORMAT_CLEARTEXT, + volume_key.PACKET_FORMAT_PASSPHRASE): + self._reporting.error("Unsupported format %s of %s" % + (fmt, packet_path), origin = self) + return None + pack = volume_key.Packet.open(packet_data, self._ui) + except RuntimeError, e: + self._reporting.error("Error opening %s: %s" % + (packet_path, str(e)), origin = self) + return None + self._packet_path = packet_path + return pack + + def _getCandidates(self, pack): + """Return a list of volume paths that match pack.""" + candidates = [] + for path in self._volumes[pack.uuid]: + try: + vol = volume_key.Volume.open(path) + (unused_r, unused_warnings) = pack.packet_match_volume(vol) + except RuntimeError: + # volume_key.Volume.open() passed previously, so this exception + # probably means the packet did not match + pass + else: + candidates.append(path) + return candidates + + def _chooseCandidate(self, candidates): + """Let the user choose a volume out of candidates. + + Return volume path, or None on error. + + """ + if len(candidates) == 0: + self._reporting.error("Packet %s does not match any known volume" % + (self._packet_path,), origin = self) + self._result = ReturnFailure + return None + elif len(candidates) == 1: + volume_path = candidates[0] + if not self._reporting.choice_question_wait \ + ("This packet can be applied to volume %s. Continue?" % + volume_path, ((True, "Yes"), (False, "No")), + origin = self): + self._result = ReturnFailure + return None + else: + # This really shouldn't happen, that's what UUIDs are for... + choices = [(p, p) for p in candidates] + choices.append((None, "Cancel operation")) + volume_path = self._reporting.choice_question_wait \ + ("Which volume do you want to apply this packet to?", choices, + origin = self) + if volume_path is None: + self._result = ReturnFailure + return None + return volume_path + + def addPassphrase(self): + try: + vol = volume_key.Volume.open(self._volume_path) + except RuntimeError, e: + self._reporting.error("Error opening %s: %s" % + (self._volume_path, str(e)), origin = self) + return + try: + (r, warnings) = self._packet.packet_match_volume(vol) + except RuntimeError, e: + self._reporting.error("%s does not match %s: %s" % + (self._packet_path, self._volume_path, + str(e)), origin = self) + return + if (r == volume_key.PACKET_MATCH_UNSURE and + not self._reporting.choice_question_wait \ + ("%s perhaps does not match %s:\n%s\nAre you sure you want " + " to use this packet?" % + (self._packet_path, self._volume_path, "\n".join(warnings)), + ((True, "Yes"), (False, "No")), origin = self)): + self._result = ReturnFailure + return + + self._reporting.info("Applying packet to set up a new passphrase...", + origin = self) + try: + vol.apply_packet(self._packet, volume_key.SECRET_DEFAULT, self._ui) + except RuntimeError, e: + self._reporting.error("Error applying %s to %s: %s" % + (self._packet_path, self._volume_path, + str(e)), origin = self) + self._result = ReturnFailure + return + self._reporting.info("Packet applied succesfully...", origin = self) + self._result = ReturnSuccess + return + +def get_plugin(): + return KeyRecoveryPlugin |