summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Heimes <cheimes@redhat.com>2017-02-09 14:55:45 +0100
committerMartin Basti <mbasti@redhat.com>2017-03-02 14:45:00 +0100
commit22d7492c94837342a559c368454c223f566490ac (patch)
treeb134cd427432932f5969a666371357fa4aff43c5
parentbc1f60b3ba74032cb0895e154e02971aa380a6b3 (diff)
downloadfreeipa-22d7492c94837342a559c368454c223f566490ac.tar.gz
freeipa-22d7492c94837342a559c368454c223f566490ac.tar.xz
freeipa-22d7492c94837342a559c368454c223f566490ac.zip
Cleanup certdb
* use with statement to open/close files * prefer fchmod/fchown when a file descriptor is available * set permission before data is written to file Signed-off-by: Christian Heimes <cheimes@redhat.com> Reviewed-By: Stanislav Laznicka <slaznick@redhat.com>
-rw-r--r--ipaserver/install/certs.py117
1 files changed, 55 insertions, 62 deletions
diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py
index 4f978012f..660da795f 100644
--- a/ipaserver/install/certs.py
+++ b/ipaserver/install/certs.py
@@ -117,10 +117,12 @@ class CertDB(object):
self.host_name = host_name
self.ca_subject = ca_subject
self.subject_base = subject_base
+
try:
- self.cwd = os.getcwd()
+ self.cwd = os.path.abspath(os.getcwd())
except OSError as e:
- raise RuntimeError("Unable to determine the current directory: %s" % str(e))
+ raise RuntimeError(
+ "Unable to determine the current directory: %s" % str(e))
self.cacert_name = get_ca_nickname(self.realm)
@@ -163,6 +165,8 @@ class CertDB(object):
def __del__(self):
if self.reqdir is not None:
shutil.rmtree(self.reqdir, ignore_errors=True)
+ self.reqdir = None
+ self.nssdb.close()
try:
os.chdir(self.cwd)
except OSError:
@@ -187,16 +191,16 @@ class CertDB(object):
# sure we are in a unique place when this happens
os.chdir(self.reqdir)
- def set_perms(self, fname, write=False, uid=None):
- if uid:
- pent = pwd.getpwnam(uid)
- os.chown(fname, pent.pw_uid, pent.pw_gid)
- else:
- os.chown(fname, self.uid, self.gid)
+ def set_perms(self, fname, write=False):
perms = stat.S_IRUSR
if write:
perms |= stat.S_IWUSR
- os.chmod(fname, perms)
+ if hasattr(fname, 'fileno'):
+ os.fchown(fname.fileno(), self.uid, self.gid)
+ os.fchmod(fname.fileno(), perms)
+ else:
+ os.chown(fname, self.uid, self.gid)
+ os.chmod(fname, perms)
def run_certutil(self, args, stdin=None, **kwargs):
return self.nssdb.run_certutil(args, stdin, **kwargs)
@@ -212,19 +216,18 @@ class CertDB(object):
def create_noise_file(self):
if ipautil.file_exists(self.noise_fname):
os.remove(self.noise_fname)
- f = open(self.noise_fname, "w")
- f.write(ipautil.ipa_generate_password())
- self.set_perms(self.noise_fname)
+ with open(self.noise_fname, "w") as f:
+ self.set_perms(f)
+ f.write(ipautil.ipa_generate_password())
def create_passwd_file(self, passwd=None):
ipautil.backup_file(self.passwd_fname)
- f = open(self.passwd_fname, "w")
- if passwd is not None:
- f.write("%s\n" % passwd)
- else:
- f.write(ipautil.ipa_generate_password())
- f.close()
- self.set_perms(self.passwd_fname)
+ with open(self.passwd_fname, "w") as f:
+ self.set_perms(f)
+ if passwd is not None:
+ f.write("%s\n" % passwd)
+ else:
+ f.write(ipautil.ipa_generate_password())
def create_certdbs(self):
self.nssdb.create_db(user=self.user, group=self.group, mode=self.mode,
@@ -262,13 +265,13 @@ class CertDB(object):
# export the CA cert for use with other apps
ipautil.backup_file(cacert_fname)
root_nicknames = self.find_root_cert(nickname)[:-1]
- fd = open(cacert_fname, "w")
- for root in root_nicknames:
- result = self.run_certutil(["-L", "-n", root, "-a"],
- capture_output=True)
- fd.write(result.output)
- fd.close()
- os.chmod(cacert_fname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+ with open(cacert_fname, "w") as f:
+ os.fchmod(f.fileno(), stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+ for root in root_nicknames:
+ result = self.run_certutil(["-L", "-n", root, "-a"],
+ capture_output=True)
+ f.write(result.output)
+
if create_pkcs12:
ipautil.backup_file(self.pk12_fname)
ipautil.run([paths.PK12UTIL, "-d", self.secdir,
@@ -283,9 +286,8 @@ class CertDB(object):
Load all the certificates from a given file. It is assumed that
this file creates CA certificates.
"""
- fd = open(cacert_fname)
- certs = fd.read()
- fd.close()
+ with open(cacert_fname) as f:
+ certs = f.read()
st = 0
while True:
@@ -360,14 +362,14 @@ class CertDB(object):
if subject is None:
subject=DN(('CN', hostname), self.subject_base)
self.request_cert(subject, san_dnsnames=[hostname])
- self.issue_server_cert(self.certreq_fname, self.certder_fname)
- self.import_cert(self.certder_fname, nickname)
- fd = open(self.certder_fname, "r")
- dercert = fd.read()
- fd.close()
-
- os.unlink(self.certreq_fname)
- os.unlink(self.certder_fname)
+ try:
+ self.issue_server_cert(self.certreq_fname, self.certder_fname)
+ self.import_cert(self.certder_fname, nickname)
+ with open(self.certder_fname, "r") as f:
+ dercert = f.read()
+ finally:
+ os.unlink(self.certreq_fname)
+ os.unlink(self.certder_fname)
return dercert
@@ -397,10 +399,8 @@ class CertDB(object):
if self.host_name is None:
raise RuntimeError("CA Host is not set.")
- f = open(certreq_fname, "r")
- csr = f.readlines()
- f.close()
- csr = "".join(csr)
+ with open(certreq_fname, "r") as f:
+ csr = f.read()
# We just want the CSR bits, make sure there is nothing else
csr = pkcs10.strip_header(csr)
@@ -442,9 +442,8 @@ class CertDB(object):
# Write the certificate to a file. It will be imported in a later
# step. This file will be read later to be imported.
- f = open(cert_fname, "w")
- f.write(cert)
- f.close()
+ with open(cert_fname, "w") as f:
+ f.write(cert)
def issue_signing_cert(self, certreq_fname, cert_fname):
self.setup_cert_request()
@@ -452,10 +451,8 @@ class CertDB(object):
if self.host_name is None:
raise RuntimeError("CA Host is not set.")
- f = open(certreq_fname, "r")
- csr = f.readlines()
- f.close()
- csr = "".join(csr)
+ with open(certreq_fname, "r") as f:
+ csr = f.read()
# We just want the CSR bits, make sure there is no thing else
csr = pkcs10.strip_header(csr)
@@ -489,9 +486,8 @@ class CertDB(object):
# Write the certificate to a file. It will be imported in a later
# step. This file will be read later to be imported.
- f = open(cert_fname, "w")
- f.write(cert)
- f.close()
+ with open(cert_fname, "w") as f:
+ f.write(cert)
def add_cert(self, cert, nick, flags, pem=False):
self.nssdb.add_cert(cert, nick, flags, pem)
@@ -514,13 +510,11 @@ class CertDB(object):
This is the format of Directory Server pin files.
"""
ipautil.backup_file(self.pin_fname)
- f = open(self.pin_fname, "w")
- f.write("Internal (Software) Token:")
- pwdfile = open(self.passwd_fname)
- f.write(pwdfile.read())
- f.close()
- pwdfile.close()
- self.set_perms(self.pin_fname)
+ with open(self.pin_fname, "w") as pinfile:
+ self.set_perms(pinfile)
+ pinfile.write("Internal (Software) Token:")
+ with open(self.passwd_fname) as pwdfile:
+ pinfile.write(pwdfile.read())
def find_root_cert(self, nickname):
"""
@@ -563,10 +557,9 @@ class CertDB(object):
if ipautil.file_exists(self.certdb_fname):
# We already have a cert db, see if it is for the same CA.
# If it is we leave things as they are.
- f = open(cacert_fname, "r")
- newca = f.readlines()
- f.close()
- newca = "".join(newca)
+ with open(cacert_fname, "r") as f:
+ newca = f.read()
+
newca, _st = find_cert_from_txt(newca)
cacert = self.get_cert_from_db(self.cacert_name)