From 1365aadb0242dd7635f37911d7afa57093b6ce87 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 7 Oct 2010 13:39:59 -0400 Subject: subtask: New wrapper for logging subprocess output consistently Assign taskids, rotating old logs. --- rpmci/dynrepo.py | 10 ++---- rpmci/rpmci_spec_vcs_main.py | 1 - rpmci/rpmci_srpm_builder_main.py | 22 +++++++------ rpmci/rpmci_vcs_mirror_main.py | 16 +++++----- rpmci/subtask.py | 69 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 93 insertions(+), 25 deletions(-) create mode 100644 rpmci/subtask.py (limited to 'rpmci') diff --git a/rpmci/dynrepo.py b/rpmci/dynrepo.py index 8db1a8d..2b8199a 100644 --- a/rpmci/dynrepo.py +++ b/rpmci/dynrepo.py @@ -10,7 +10,6 @@ import os import sys import logging -import subprocess import glib import gio @@ -18,7 +17,7 @@ import rpm import rpmUtils import rpmUtils.miscutils -from . import async_subprocess +from . import subtask class Repo(object): def __init__(self, dirpath): @@ -51,17 +50,14 @@ class Repo(object): os.link(srpm_path, os.path.join(self._dir, basename)) def _delete_old_rpms_in_dir(self, dirpath): - proc = subprocess.Popen(['repomanage', '-o', '.'], stdout=subprocess.PIPE, - stderr=sys.stderr, - cwd=dirpath) - output = proc.communicate()[0] + output = subtask.spawn_sync_get_output('srpm-repomanage', ['repomanage', '-o', '.'], cwd=dirpath) for line in output.split('\n'): if line.endswith('.rpm') and os.path.exists(line): os.unlink(line) def update_repo_sync(self): self._delete_old_rpms_in_dir(self._dir) - subprocess.check_call(['createrepo', '.'], cwd=self._dir) + subtask.spawn_sync('srpm-createrepo', ['createrepo', '.'], cwd=self._dir) def _headers_from_packages(self, rpmlist): result = {} diff --git a/rpmci/rpmci_spec_vcs_main.py b/rpmci/rpmci_spec_vcs_main.py index 1e0ded2..c444e35 100644 --- a/rpmci/rpmci_spec_vcs_main.py +++ b/rpmci/rpmci_spec_vcs_main.py @@ -13,7 +13,6 @@ import time import shutil import optparse import logging -import subprocess import datetime import glib diff --git a/rpmci/rpmci_srpm_builder_main.py b/rpmci/rpmci_srpm_builder_main.py index 1796d8b..d0e554e 100644 --- a/rpmci/rpmci_srpm_builder_main.py +++ b/rpmci/rpmci_srpm_builder_main.py @@ -23,6 +23,7 @@ import gobject import gio from . import msgqueue +from . import subtask from . import dynrepo from . import artifact from . import spec @@ -69,9 +70,10 @@ class SRPMBuilder(object): '--define', '_rpmdir ' + src_dirpath] def _on_vcs_message(self, q, messages): - num_msgs = len(messages) + msg_list = list(messages) + num_msgs = len(msg_list) logging.debug("Starting processing of %d messages" % (num_msgs, )) - for msg in messages: + for msg in msg_list: url = msg.payload['url'] logging.debug("Processing update for VCS url %r" % (url, )) self._handle_vcs_message_url(q, url) @@ -119,19 +121,18 @@ class SRPMBuilder(object): target_spec_obj.save() exec_basedir = os.path.dirname(sys.argv[0]) + args = [os.path.abspath(os.path.join(exec_basedir, 'rpmci-spec-vcs')), '--vcsdir=' + upstream_vcs.get_directory(), 'set-revision', 'HEAD'] - print "Synchronously executing: %r" % (args, ) - subprocess.check_call(args, - stdout=sys.stdout, stderr=sys.stderr, - cwd=fedora_dir) + subtask.spawn_sync('rpmci-spec-vcs-%s' % (target.module, ), + args, cwd=fedora_dir) + args = ['rpmbuild'] args.extend(self._get_base_rpmbuild_args(fedora_dir)) args.extend(['-bs', specname]) - print "Synchronously executing: %r" % (args, ) - subprocess.check_call(args, cwd=fedora_dir, stdout=sys.stdout, - stderr=sys.stderr) + subtask.spawn_sync('rpmbuild-srpm-%s' % (target.module, ), + args, cwd=fedora_dir) srpm_path = None srpm_basename = None for filename in os.listdir(fedora_dir): @@ -163,8 +164,11 @@ def main(): level = logging.DEBUG if options.debug else logging.INFO logging.basicConfig(stream=sys.stderr, level=level) + subtask.global_configure(config) + builder = SRPMBuilder(options, config) builder.start() + logging.info("Awaiting events") loop = glib.MainLoop() loop.run() diff --git a/rpmci/rpmci_vcs_mirror_main.py b/rpmci/rpmci_vcs_mirror_main.py index 51c706b..db24e58 100644 --- a/rpmci/rpmci_vcs_mirror_main.py +++ b/rpmci/rpmci_vcs_mirror_main.py @@ -21,6 +21,7 @@ import gobject import gio from . import msgqueue +from . import subtask from . import lame_vcs_abstraction class VCSMirror(object): @@ -31,9 +32,6 @@ class VCSMirror(object): self._options = options self._dir = config.get('VCS', 'mirror_dir') - self._process_dir = config.get('VCS', 'process_logdir') - if not os.path.isdir(self._process_dir): - os.makedirs(self._process_dir) self._timeout_seconds = int(config.get('VCS', 'poll_seconds')) self._max_host_concurrency = int(config.get('VCS', 'max_host_concurrency')) self._max_concurrency = int(config.get('VCS', 'max_concurrency')) @@ -180,16 +178,16 @@ class VCSMirror(object): % (host_job_count, host)) vcsdir = self._cachedir_for_vcs(vcs) - job_logname = '%s-update-%s-%d.log' % (vcs.vcstype, self._escape_vcs_url(vcs), - int(time.time()), ) - job_logpath = os.path.join(self._process_dir, job_logname) + escaped_url = self._escape_vcs_url(vcs) + taskid = 'vcs-update-%s' % (escaped_url, ) + task_logpath = subtask.prepare_task_logfile(taskid) if not os.path.isdir(vcsdir): previous_id = None logging.info("Doing initial checkout for %r" % (vcs.get_base_url_string(), )) vcs_tempdir = vcsdir + '.tmp' if os.path.isdir(vcs_tempdir): shutil.rmtree(vcs_tempdir) - process = vcs.checkout_async(vcs_tempdir, job_logpath, self._on_job_exited) + process = vcs.checkout_async(vcs_tempdir, task_logpath, self._on_job_exited) elif self._options.clone_then_exit: logging.info("Already have clone directory %s" % (vcsdir, )) processed.append(vcs) @@ -198,7 +196,7 @@ class VCSMirror(object): vcs.set_directory(vcsdir) previous_id = vcs.get_id() logging.info("Doing update for %r (starting from commit id %s)" % (vcs.get_base_url_string(), previous_id)) - process = vcs.update_async(job_logpath, self._on_job_exited) + process = vcs.update_async(task_logpath, self._on_job_exited) process.__vcs = vcs self._jobs_by_host[host].append((process, previous_id)) processed.append(vcs) @@ -249,6 +247,8 @@ def main(): level = logging.DEBUG if options.debug else logging.INFO logging.basicConfig(stream=sys.stderr, level=level) + subtask.global_configure(config) + mirrordir = config.get('VCS', 'mirror_dir') vcslist_path = os.path.join(mirrordir, 'vcs.txt') diff --git a/rpmci/subtask.py b/rpmci/subtask.py new file mode 100644 index 0000000..e97279e --- /dev/null +++ b/rpmci/subtask.py @@ -0,0 +1,69 @@ +# subprocess_msg.py: +# Wrapper for creating processes that logs their stderr to +# a given directory, rotating earlier logs. +# +# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php) +# Copyright (C) 2010 Red Hat, Inc. +# Written by Colin Walters + +import os +import shutil +import logging +import time + +import subprocess +import async_subprocess + +_base_path = None +_failed_path = None +_old_path = None +def global_configure(config): + global _base_path + global _failed_path + global _old_path + _base_path = config.get('subtask', 'subtask_dir') + _failed_path = os.path.join(_base_path, 'failed') + _old_path = os.path.join(_base_path, 'old') + if not os.path.isdir(_failed_path): + os.makedirs(_failed_path) + if not os.path.isdir(_old_path): + os.makedirs(_old_path) + +def prepare_task_logfile(taskid): + log_path = os.path.join(_base_path, '%s.log' % (taskid, )) + if os.path.isfile(log_path): + curtime = int(time.time()) + saved_name = '%s-%d.log' % (taskid, int(time.time()),) + os.rename(log_path, os.path.join(_old_path, saved_name)) + return log_path + +def _init_task_run(taskid, argv, cwd): + log_path = prepare_task_logfile(taskid) + logging.info("Running task %r synchronously, cwd=%r args=%r" % (taskid, cwd, argv)) + return log_path + +def spawn_sync(taskid, argv, cwd=None): + log_path = _init_task_run(taskid, argv, cwd) + f = open(log_path, 'w') + nullf = open(os.devnull, 'w') + try: + subprocess.check_call(argv, cwd=cwd, stdin=nullf, stdout=f, stderr=f) + except subprocess.CalledProcessError, e: + f.close() + shutil.move(log_path, _failed_path) + raise e + f.close() + +def spawn_sync_get_output(taskid, argv, cwd=None): + log_path = _init_task_run(taskid, argv, cwd) + f = open(log_path, 'w') + nullf = open(os.devnull, 'w') + try: + proc = subprocess.Popen(argv, cwd=cwd, stdin=nullf, stdout=subprocess.PIPE, stderr=f) + output = proc.communicate()[0] + except subprocess.CalledProcessError, e: + f.close() + shutil.move(log_path, _failed_path) + raise e + f.close() + return output -- cgit