diff options
author | Colin Walters <walters@verbum.org> | 2010-10-03 12:55:07 -0400 |
---|---|---|
committer | Colin Walters <walters@verbum.org> | 2010-10-03 12:55:07 -0400 |
commit | db8df4a040883b631ff3c719c453246df9085776 (patch) | |
tree | 66e00f6726f6b0cdb612b00d069bba49fdd2bc0c /rpmci/rpmci_vcs_mirror_main.py | |
parent | 77ad7371c5906b83db49de309ba39d867b673975 (diff) | |
download | rpmci-db8df4a040883b631ff3c719c453246df9085776.tar.gz rpmci-db8df4a040883b631ff3c719c453246df9085776.tar.xz rpmci-db8df4a040883b631ff3c719c453246df9085776.zip |
rpmci-vcs-mirror: Basically appears to work
Diffstat (limited to 'rpmci/rpmci_vcs_mirror_main.py')
-rw-r--r-- | rpmci/rpmci_vcs_mirror_main.py | 239 |
1 files changed, 239 insertions, 0 deletions
diff --git a/rpmci/rpmci_vcs_mirror_main.py b/rpmci/rpmci_vcs_mirror_main.py new file mode 100644 index 0000000..85615a3 --- /dev/null +++ b/rpmci/rpmci_vcs_mirror_main.py @@ -0,0 +1,239 @@ +#!/usr/bin/python + +# rpmci_vcs_mirror_main.py: +# Implementation of rpm-vcs-mirror +# +# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php) +# Copyright (C) 2010 Red Hat, Inc. +# Written by Colin Walters <walters@verbum.org> + +import os +import sys +import time +import shutil +import optparse +from ConfigParser import SafeConfigParser +import logging +import urllib + +import glib +import gobject +import gio + +from . import msgqueue +from . import lame_vcs_abstraction + +class VCSMirror(object): + def __init__(self, config, urls): + self.config = config + self.urls = urls + + self._dir = config.get('VCS', 'mirror_dir') + self._process_dir = config.get('VCS', 'process_logdir') + if not os.path.isdir(self._process_dir): + os.makedirs(self._process_dir) + self._timeout_seconds = int(config.get('VCS', 'poll_seconds')) + self._max_host_concurrency = int(config.get('VCS', 'max_host_concurrency')) + self._max_concurrency = int(config.get('VCS', 'max_concurrency')) + msgqueue_dir = config.get('VCS', 'msgqueue') + self._msgqueue = msgqueue.MessageQueue(msgqueue_dir) + + self._vcslist = [] + for url in urls: + url = url.strip() + self._vcslist.append(lame_vcs_abstraction.Vcs.new_from_spec(url)) + + self._vcs_by_host = {} + for vcs in self._vcslist: + host = self._host_for_vcs(vcs) + if not host in self._vcs_by_host: + self._vcs_by_host[host] = [] + self._vcs_by_host[host].append(vcs) + + self._vcs_queue = [] + self._jobs_by_host = {} # string host -> Job + self._active_queue_timeout_seconds = 0 + self._active_queue_timeout_id = 0 + + def _host_for_vcs(self, vcs): + return vcs.get_url().netloc + + def _escape_vcs_url(self, vcs): + return urllib.quote(vcs.get_base_url_string(), '') + + def _cachedir_for_vcs(self, vcs): + return os.path.join(self._dir, self._escape_vcs_url(vcs)) + + def start(self): + logging.info("Starting poll of %d repositories on %d unique hosts" + % (len(self._vcslist), len(list(self._vcs_by_host.iterkeys())))) + + # Schedule jobs for now + for vcs in self._vcslist: + self._vcs_queue.append((vcs, 0)) + for host in self._vcs_by_host: + self._jobs_by_host[host] = [] + self._active_queue_timeout_seconds = self._timeout_seconds + self._poll() + + def _on_job_exited(self, process, condition): + vcs = process.__vcs + text = "VCS %r task %r exited with code %d" % (vcs, process, condition) + failed = condition != 0 + if failed: + logging.warning(text) + else: + logging.info(text) + host = self._host_for_vcs(vcs) + jobs = self._jobs_by_host[host] + previous_id = None + index = -1 + for i, (iter_process, previous_id) in enumerate(jobs): + if iter_process == process: + index = i + break + assert index >= 0 + del jobs[i] + + if not failed: + vcsdir = vcs.get_directory() + if vcsdir.endswith('.tmp'): + newname = vcsdir[:-4] + os.rename(vcsdir, newname) + vcs.set_directory(newname) + logging.info("vcs %r: Checkout complete in %r" % (vcs, newname)) + current_id = vcs.get_id() + if current_id != previous_id: + logging.info("vcs %r: New commit id %r differs from previous %r" % (vcs, current_id, previous_id)) + msg = msgqueue.Message(None, {'type': 'update'}, {'id': current_id}) + self._msgqueue.append(msg) + else: + logging.info("No changes in %r from previous commit id %r" % (vcs, previous_id)) + + target_time = int(time.time() + self._timeout_seconds) + self._vcs_queue.append((vcs, target_time)) + + self._poll() + + def _num_active_jobs(self): + active = 0 + for host in self._jobs_by_host: + active += len(self._jobs_by_host[host]) + return active + + def _job_for_vcs(self, vcs): + for job in self._jobs_by_host[self._host_for_vcs(vcs)]: + (iter_process, previous_id) = job + iter_vcs = iter_process.__vcs + if iter_vcs == vcs: + return job + return None + + def _adjust_timeout(self): + current_time = int(time.time()) + timeout = self._active_queue_timeout_seconds + for (vcs, target_time) in self._vcs_queue: + time_delta = target_time - current_time + if time_delta > 0: + if time_delta < timeout: + timeout = time_delta + continue + if timeout < self._active_queue_timeout_seconds: + logging.info("Rescheduling poll for %d seconds" % (timeout, )) + if self._active_queue_timeout_id > 0: + glib.source_remove(self._active_queue_timeout_id) + self._active_queue_timeout_seconds = timeout + self._active_queue_timeout_id = glib.timeout_add_seconds(timeout, self._poll) + + + def _poll(self): + current_time = int(time.time()) + + logging.info("Doing poll (%d active tasks)" % (self._num_active_jobs(), )) + logging.debug("Queue: %r" % (self._vcs_queue, )) + processed = [] + for vcs, target_time in self._vcs_queue: + active = self._job_for_vcs(vcs) + assert active is None + + if target_time > current_time: + continue + + host = self._host_for_vcs(vcs) + host_job_count = len(self._jobs_by_host[host]) + if host_job_count >= self._max_host_concurrency: + logging.debug("Skipping job %r; at concurrency limit %d" + % (vcs, self._max_host_concurrency)) + continue + else: + logging.debug("Have %d active jobs for host %r" + % (host_job_count, host)) + + vcsdir = self._cachedir_for_vcs(vcs) + job_logname = '%s-update-%s-%d.log' % (vcs.vcstype, self._escape_vcs_url(vcs), + int(time.time()), ) + job_logpath = os.path.join(self._process_dir, job_logname) + if not os.path.isdir(vcsdir): + previous_id = None + logging.info("Doing initial checkout for %r" % (vcs.get_base_url_string(), )) + vcs_tempdir = vcsdir + '.tmp' + process = vcs.checkout_async(vcs_tempdir, job_logpath, self._on_job_exited) + else: + vcs.set_directory(vcsdir) + previous_id = vcs.get_id() + logging.info("Doing update for %r (starting from commit id %s)" % (vcs.get_base_url_string(), previous_id)) + process = vcs.update_async(job_logpath, self._on_job_exited) + process.__vcs = vcs + self._jobs_by_host[host].append((process, previous_id)) + processed.append(vcs) + while processed: + vcs = processed[0] + del processed[0] + index = -1 + for i, (iter_vcs, target_time) in enumerate(self._vcs_queue): + if iter_vcs == vcs: + index = i + break + assert index >= 0 + del self._vcs_queue[index] + + self._adjust_timeout() + + return False + + +def main(): + glib.threads_init() + + opts = optparse.OptionParser("usage: %prog [options]") + opts.add_option('-c', '--config', dest='config', help="Path to configuration file") + opts.add_option('', '--debug', action='store_true', help="Print verbose debugging") + + (options, args) = opts.parse_args() + + if options.config is None: + print "Must specify --config" + sys.exit(1) + + config = SafeConfigParser({'home': os.environ['HOME']}) + config.read(options.config) + level = logging.DEBUG if options.debug else logging.INFO + logging.basicConfig(stream=sys.stderr, level=level) + + mirrordir = config.get('VCS', 'mirror_dir') + + vcslist_path = os.path.join(mirrordir, 'vcs.txt') + + if not os.path.isfile(vcslist_path): + print "Missing mirror configuration file %r" % (vcslist_path, ) + sys.exit(1) + + f = open(vcslist_path) + urls = f.readlines() + f.close() + + mirror = VCSMirror(config, urls) + mirror.start() + + loop = glib.MainLoop() + loop.run() |