summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMiloslav Trmač <mitr@redhat.com>2009-07-16 20:10:58 +0200
committerMartin Sivak <msivak@redhat.com>2009-07-17 16:44:30 +0200
commit0eff6d603623c6d506749f511bbd06b63a7b4d98 (patch)
treee02db913f683ae31ff59284f32810611f489e0ec
parent66743f15980987f74f930174786da6a6438bc40b (diff)
downloadfirstaidkit-0eff6d603623c6d506749f511bbd06b63a7b4d98.tar.gz
firstaidkit-0eff6d603623c6d506749f511bbd06b63a7b4d98.tar.xz
firstaidkit-0eff6d603623c6d506749f511bbd06b63a7b4d98.zip
Add KeyRecoveryPlugin
Signed-off-by: Martin Sivak <msivak@redhat.com>
-rw-r--r--plugins/key_recovery.py242
1 files changed, 242 insertions, 0 deletions
diff --git a/plugins/key_recovery.py b/plugins/key_recovery.py
new file mode 100644
index 0000000..ffabfc5
--- /dev/null
+++ b/plugins/key_recovery.py
@@ -0,0 +1,242 @@
+# First Aid Kit - diagnostic and repair tool for Linux
+# encoding: utf-8
+# Copyright (C) 2009 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+# Red Hat author: Miloslav Trmač <mitr@redhat.com>
+
+import collections
+import logging
+
+from pyfirstaidkit.plugins import Plugin,Flow
+from pyfirstaidkit.returns import *
+from pyfirstaidkit.reporting import PLUGIN
+
+import volume_key
+
+# A hackish alternative way to initialize NSS:
+# import rpm; rpm.ts()
+# (currently conflicts with rpm.py in the current directory)
+import nss.nss
+nss.nss.nss_init_nodb()
+
+class KeyRecoveryPlugin(Plugin):
+ """This plugin allows restoring access to encrypted volumes using escrow
+ packets generated by volume_key."""
+
+ flows = {}
+ flows["recoverEncryptionKey"] = Flow({
+ Plugin.initial: {Return: "findEncryptedVolumes"},
+ "findEncryptedVolumes" : {ReturnSuccess: "openPacket",
+ ReturnFailure: Plugin.final,
+ None: Plugin.final},
+ "openPacket" : {ReturnSuccess: "addPassphrase",
+ ReturnFailure: Plugin.final,
+ None: Plugin.final},
+ "addPassphrase" : {ReturnSuccess: Plugin.final,
+ ReturnFailure: Plugin.final,
+ None: Plugin.final}
+ }, description="Add a password to an encrypted volume "
+ "using a provided escrow packet")
+
+ name = "Encryption key recovery plugin"
+ version = "0.0.1"
+ author = "Miloslav Trmač"
+ description = "Automates recovery of LUKS volume key"
+
+ @classmethod
+ def getDeps(cls):
+ return set(["root"])
+
+ def __init__(self, *args, **kwargs):
+ Plugin.__init__(self, *args, **kwargs)
+
+ self._ui = volume_key.UI()
+ self._ui.generic_cb = self._vk_ui_generic_cb
+ self._ui.passphrase_cb = self._vk_ui_passphrase_cb
+
+ def _vk_ui_generic_cb(self, prompt, echo):
+ prompt = "%s:" % (prompt,)
+ if echo:
+ r = self._reporting.text_question_wait(prompt, origin = self)
+ else:
+ r = self._reporting.password_question_wait(prompt, origin = self)
+ if r == '':
+ return None
+ return r
+
+ def _vk_ui_passphrase_cb(self, prompt, unused_failed_attempts):
+ r = self._reporting.password_question_wait("%s:" % (prompt,),
+ origin = self)
+ if r == '':
+ return None
+ return r
+
+ def findEncryptedVolumes(self):
+ # Note: This assumes "the system" has automatically set up RAID, LVM
+ # etc. to make the encrypted volumes visible.
+ self._volumes = collections.defaultdict(list)
+ with open("/proc/partitions") as f:
+ for line in f.readlines()[2:]:
+ (major, minor, blocks, name) = line.split()
+ if name.startswith("ram"):
+ continue
+ path = "/dev/%s" % name
+ try:
+ vol = volume_key.Volume.open(path)
+ except RuntimeError, e:
+ self._reporting.info("Error examining %s: %s" %
+ (path, str(e)), origin = self,
+ importance = logging.DEBUG)
+ else:
+ if vol.format in (volume_key.VOLUME_FORMAT_LUKS,):
+ self._volumes[vol.uuid].append(path)
+ if len(self._volumes) == 0:
+ self._reporting.info("No encrypted volume found", origin = self)
+ self._result = ReturnFailure
+ return
+ self._result = ReturnSuccess
+
+ def openPacket(self):
+ pack = self._openPacket()
+ if pack is None:
+ return
+ candidates = self._getCandidates(pack)
+
+ volume_path = self._chooseCandidate(candidates)
+ if volume_path is None:
+ return
+ self._packet = pack
+ self._volume_path = volume_path
+ self._result = ReturnSuccess
+
+ def _openPacket(self):
+ """Ask for a packet and open it.
+
+ Return a packet if OK (setting self._packet_path), None otherwise.
+
+ """
+ packet_path = self._reporting.filename_question_wait \
+ ("Select an escrow packet:", origin = self)
+ try:
+ with open(packet_path, "rb") as f:
+ packet_data = f.read()
+ except IOError, e:
+ self._reporting.error("Error reading %s: %s" %
+ (packet_path, str(e)), origin = self)
+ return None
+ try:
+ fmt = volume_key.Packet.get_format(packet_data)
+ # In particular, PACKET_FORMAT_ASSYMETRIC requries a NSS db and is
+ # not supported
+ if fmt not in (volume_key.PACKET_FORMAT_CLEARTEXT,
+ volume_key.PACKET_FORMAT_PASSPHRASE):
+ self._reporting.error("Unsupported format %s of %s" %
+ (fmt, packet_path), origin = self)
+ return None
+ pack = volume_key.Packet.open(packet_data, self._ui)
+ except RuntimeError, e:
+ self._reporting.error("Error opening %s: %s" %
+ (packet_path, str(e)), origin = self)
+ return None
+ self._packet_path = packet_path
+ return pack
+
+ def _getCandidates(self, pack):
+ """Return a list of volume paths that match pack."""
+ candidates = []
+ for path in self._volumes[pack.uuid]:
+ try:
+ vol = volume_key.Volume.open(path)
+ (unused_r, unused_warnings) = pack.packet_match_volume(vol)
+ except RuntimeError:
+ # volume_key.Volume.open() passed previously, so this exception
+ # probably means the packet did not match
+ pass
+ else:
+ candidates.append(path)
+ return candidates
+
+ def _chooseCandidate(self, candidates):
+ """Let the user choose a volume out of candidates.
+
+ Return volume path, or None on error.
+
+ """
+ if len(candidates) == 0:
+ self._reporting.error("Packet %s does not match any known volume" %
+ (self._packet_path,), origin = self)
+ self._result = ReturnFailure
+ return None
+ elif len(candidates) == 1:
+ volume_path = candidates[0]
+ if not self._reporting.choice_question_wait \
+ ("This packet can be applied to volume %s. Continue?" %
+ volume_path, ((True, "Yes"), (False, "No")),
+ origin = self):
+ self._result = ReturnFailure
+ return None
+ else:
+ # This really shouldn't happen, that's what UUIDs are for...
+ choices = [(p, p) for p in candidates]
+ choices.append((None, "Cancel operation"))
+ volume_path = self._reporting.choice_question_wait \
+ ("Which volume do you want to apply this packet to?", choices,
+ origin = self)
+ if volume_path is None:
+ self._result = ReturnFailure
+ return None
+ return volume_path
+
+ def addPassphrase(self):
+ try:
+ vol = volume_key.Volume.open(self._volume_path)
+ except RuntimeError, e:
+ self._reporting.error("Error opening %s: %s" %
+ (self._volume_path, str(e)), origin = self)
+ return
+ try:
+ (r, warnings) = self._packet.packet_match_volume(vol)
+ except RuntimeError, e:
+ self._reporting.error("%s does not match %s: %s" %
+ (self._packet_path, self._volume_path,
+ str(e)), origin = self)
+ return
+ if (r == volume_key.PACKET_MATCH_UNSURE and
+ not self._reporting.choice_question_wait \
+ ("%s perhaps does not match %s:\n%s\nAre you sure you want "
+ " to use this packet?" %
+ (self._packet_path, self._volume_path, "\n".join(warnings)),
+ ((True, "Yes"), (False, "No")), origin = self)):
+ self._result = ReturnFailure
+ return
+
+ self._reporting.info("Applying packet to set up a new passphrase...",
+ origin = self)
+ try:
+ vol.apply_packet(self._packet, volume_key.SECRET_DEFAULT, self._ui)
+ except RuntimeError, e:
+ self._reporting.error("Error applying %s to %s: %s" %
+ (self._packet_path, self._volume_path,
+ str(e)), origin = self)
+ self._result = ReturnFailure
+ return
+ self._reporting.info("Packet applied succesfully...", origin = self)
+ self._result = ReturnSuccess
+ return
+
+def get_plugin():
+ return KeyRecoveryPlugin