# First Aid Kit - diagnostic and repair tool for Linux
# encoding: utf-8
# Copyright (C) 2009 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
# Red Hat author: Miloslav Trmač

import collections
import logging

from pyfirstaidkit.plugins import Plugin,Flow
from pyfirstaidkit.returns import *
from pyfirstaidkit.reporting import PLUGIN

import volume_key

# A hackish alternative way to initialize NSS:
# import rpm; rpm.ts()
# (currently conflicts with rpm.py in the current directory)
import nss.nss
nss.nss.nss_init_nodb()


class KeyRecoveryPlugin(Plugin):
    """This plugin allows restoring access to encrypted volumes using escrow
    packets generated by volume_key.

    The single flow "recoverEncryptionKey" runs three steps in order:
    findEncryptedVolumes -> openPacket -> addPassphrase.  Each step reports
    its outcome through self._result; anything other than ReturnSuccess
    (including an unset result) ends the flow.
    """

    flows = {}
    flows["recoverEncryptionKey"] = Flow({
            Plugin.initial: {Return: "findEncryptedVolumes"},
            "findEncryptedVolumes" : {ReturnSuccess: "openPacket",
                                      ReturnFailure: Plugin.final,
                                      None: Plugin.final},
            "openPacket" : {ReturnSuccess: "addPassphrase",
                            ReturnFailure: Plugin.final,
                            None: Plugin.final},
            "addPassphrase" : {ReturnSuccess: Plugin.final,
                               ReturnFailure: Plugin.final,
                               None: Plugin.final}
            }, description="Add a password to an encrypted volume "
                           "using a provided escrow packet")

    name = "Encryption key recovery plugin"
    version = "0.0.1"
    author = "Miloslav Trmač"
    description = "Automates recovery of LUKS volume key"

    @classmethod
    def getDeps(cls):
        """Return the set of dependency names this plugin declares."""
        return set(["root"])

    def __init__(self, *args, **kwargs):
        """Initialize the base plugin and wire the volume_key UI callbacks
        to this plugin's question helpers."""
        Plugin.__init__(self, *args, **kwargs)
        self._ui = volume_key.UI()
        self._ui.generic_cb = self._vk_ui_generic_cb
        self._ui.passphrase_cb = self._vk_ui_passphrase_cb

    def _vk_ui_generic_cb(self, prompt, echo):
        """volume_key generic prompt callback.

        Asks a plain-text question when echo is true and a password question
        otherwise.  An empty answer is returned as None (presumably the way
        to tell volume_key that nothing was entered -- confirm against the
        volume_key bindings).
        """
        prompt = "%s:" % (prompt,)
        if echo:
            r = self._reporting.text_question_wait(prompt, origin = self)
        else:
            r = self._reporting.password_question_wait(prompt, origin = self)
        if r == '':
            return None
        return r

    def _vk_ui_passphrase_cb(self, prompt, unused_failed_attempts):
        """volume_key passphrase prompt callback.

        Asks a password question; an empty answer is returned as None.  The
        failed-attempts count supplied by the caller is ignored.
        """
        r = self._reporting.password_question_wait("%s:" % (prompt,),
                                                   origin = self)
        if r == '':
            return None
        return r

    def findEncryptedVolumes(self):
        """Flow step: collect LUKS volumes listed in /proc/partitions.

        Fills self._volumes, a mapping from volume UUID to the list of device
        paths carrying that UUID.  Sets self._result to ReturnSuccess when at
        least one volume was found and to ReturnFailure otherwise.
        """
        # Note: This assumes "the system" has automatically set up RAID, LVM
        # etc. to make the encrypted volumes visible.
        self._volumes = collections.defaultdict(list)
        with open("/proc/partitions") as f:
            # Skip the first two lines (column header and blank separator).
            lines = f.readlines()[2:]
        for line in lines:
            fields = line.split()
            if len(fields) != 4:
                # Blank or unexpected line; the original 4-way unpacking
                # would have raised ValueError here.
                continue
            name = fields[3]
            if name.startswith("ram"):
                continue
            path = "/dev/%s" % name
            try:
                vol = volume_key.Volume.open(path)
            except RuntimeError as e:
                self._reporting.info("Error examining %s: %s" % (path, str(e)),
                                     origin = self,
                                     importance = logging.DEBUG)
            else:
                if vol.format in (volume_key.VOLUME_FORMAT_LUKS,):
                    self._volumes[vol.uuid].append(path)
        if not self._volumes:
            self._reporting.info("No encrypted volume found", origin = self)
            self._result = ReturnFailure
            return
        self._result = ReturnSuccess

    def openPacket(self):
        """Flow step: ask for an escrow packet and pick the volume it fits.

        On success stores self._packet and self._volume_path and sets
        self._result to ReturnSuccess.  On any failure self._result is set to
        ReturnFailure.
        """
        pack = self._openPacket()
        if pack is None:
            # Record the failure explicitly instead of relying on whatever
            # value self._result held before this step ran.
            self._result = ReturnFailure
            return
        candidates = self._getCandidates(pack)
        volume_path = self._chooseCandidate(candidates)
        if volume_path is None:
            # _chooseCandidate() has already set ReturnFailure.
            return
        self._packet = pack
        self._volume_path = volume_path
        self._result = ReturnSuccess

    def _openPacket(self):
        """Ask for a packet and open it.

        Return a packet if OK (setting self._packet_path), None otherwise.
        Every failure is reported through self._reporting.error().
        """
        packet_path = self._reporting.filename_question_wait(
            "Select an escrow packet:", origin = self)
        if not packet_path:
            # NOTE(review): guards against an empty/None answer (e.g. a
            # cancelled dialog); confirm what filename_question_wait returns
            # in that case.
            self._reporting.error("No escrow packet selected", origin = self)
            return None
        try:
            with open(packet_path, "rb") as f:
                packet_data = f.read()
        except IOError as e:
            self._reporting.error("Error reading %s: %s" %
                                  (packet_path, str(e)), origin = self)
            return None
        try:
            fmt = volume_key.Packet.get_format(packet_data)
            # In particular, PACKET_FORMAT_ASSYMETRIC requires a NSS db and is
            # not supported
            if fmt not in (volume_key.PACKET_FORMAT_CLEARTEXT,
                           volume_key.PACKET_FORMAT_PASSPHRASE):
                self._reporting.error("Unsupported format %s of %s" %
                                      (fmt, packet_path), origin = self)
                return None
            pack = volume_key.Packet.open(packet_data, self._ui)
        except RuntimeError as e:
            self._reporting.error("Error opening %s: %s" %
                                  (packet_path, str(e)), origin = self)
            return None
        self._packet_path = packet_path
        return pack

    def _getCandidates(self, pack):
        """Return a list of volume paths that match pack.

        Only volumes recorded under pack.uuid by findEncryptedVolumes() are
        considered.
        """
        candidates = []
        # .get() avoids inserting an empty entry into the defaultdict for a
        # UUID that was never seen.
        for path in self._volumes.get(pack.uuid, ()):
            try:
                vol = volume_key.Volume.open(path)
                (unused_r, unused_warnings) = pack.packet_match_volume(vol)
            except RuntimeError:
                # volume_key.Volume.open() passed previously, so this exception
                # probably means the packet did not match
                pass
            else:
                candidates.append(path)
        return candidates

    def _chooseCandidate(self, candidates):
        """Let the user choose a volume out of candidates.

        Return volume path, or None on error.  When None is returned,
        self._result has been set to ReturnFailure.
        """
        if not candidates:
            self._reporting.error("Packet %s does not match any known volume" %
                                  (self._packet_path,), origin = self)
            self._result = ReturnFailure
            return None
        elif len(candidates) == 1:
            volume_path = candidates[0]
            confirmed = self._reporting.choice_question_wait(
                "This packet can be applied to volume %s.  Continue?"
                % volume_path,
                ((True, "Yes"), (False, "No")), origin = self)
            if not confirmed:
                self._result = ReturnFailure
                return None
        else:
            # This really shouldn't happen, that's what UUIDs are for...
            choices = [(p, p) for p in candidates]
            choices.append((None, "Cancel operation"))
            volume_path = self._reporting.choice_question_wait(
                "Which volume do you want to apply this packet to?",
                choices, origin = self)
            if volume_path is None:
                self._result = ReturnFailure
                return None
        return volume_path

    def addPassphrase(self):
        """Flow step: apply the opened packet to the chosen volume.

        Re-opens self._volume_path, re-checks that self._packet matches it
        (asking for confirmation when the match is unsure) and then applies
        the packet.  Sets self._result to ReturnSuccess on success and to
        ReturnFailure on every error or refusal.
        """
        try:
            vol = volume_key.Volume.open(self._volume_path)
        except RuntimeError as e:
            self._reporting.error("Error opening %s: %s" %
                                  (self._volume_path, str(e)), origin = self)
            self._result = ReturnFailure
            return
        try:
            (r, warnings) = self._packet.packet_match_volume(vol)
        except RuntimeError as e:
            self._reporting.error("%s does not match %s: %s" %
                                  (self._packet_path, self._volume_path,
                                   str(e)), origin = self)
            self._result = ReturnFailure
            return
        if (r == volume_key.PACKET_MATCH_UNSURE and
            not self._reporting.choice_question_wait(
                "%s perhaps does not match %s:\n%s\nAre you sure you want "
                " to use this packet?" % (self._packet_path,
                                          self._volume_path,
                                          "\n".join(warnings)),
                ((True, "Yes"), (False, "No")), origin = self)):
            self._result = ReturnFailure
            return
        self._reporting.info("Applying packet to set up a new passphrase...",
                             origin = self)
        try:
            vol.apply_packet(self._packet, volume_key.SECRET_DEFAULT, self._ui)
        except RuntimeError as e:
            self._reporting.error("Error applying %s to %s: %s" %
                                  (self._packet_path, self._volume_path,
                                   str(e)), origin = self)
            self._result = ReturnFailure
            return
        self._reporting.info("Packet applied succesfully...", origin = self)
        self._result = ReturnSuccess


def get_plugin():
    """Return the plugin class exported by this module."""
    return KeyRecoveryPlugin