#!/usr/bin/python
# rpmci_vcs_mirror_main.py:
# Implementation of rpm-vcs-mirror
#
# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php)
# Copyright (C) 2010 Red Hat, Inc.
# Written by Colin Walters

import os
import sys
import time
import shutil
import optparse
from ConfigParser import SafeConfigParser
import logging
import urllib

import glib
import gobject
import gio

from . import msgqueue
from . import lame_vcs_abstraction


class VCSMirror(object):
    def __init__(self, options, config, urls):
        self.config = config
        self.urls = urls
        self._options = options

        self._dir = config.get('VCS', 'mirror_dir')
        self._process_dir = config.get('VCS', 'process_logdir')
        if not os.path.isdir(self._process_dir):
            os.makedirs(self._process_dir)
        self._timeout_seconds = int(config.get('VCS', 'poll_seconds'))
        self._max_host_concurrency = int(config.get('VCS', 'max_host_concurrency'))
        self._max_concurrency = int(config.get('VCS', 'max_concurrency'))
        msgqueue_dir = config.get('VCS', 'msgqueue')
        self._msgqueue = msgqueue.MessageQueue(msgqueue_dir)

        self._vcslist = []
        for url in urls:
            url = url.strip()
            self._vcslist.append(lame_vcs_abstraction.Vcs.new_from_spec(url))

        self._vcs_by_host = {}
        for vcs in self._vcslist:
            host = self._host_for_vcs(vcs)
            if not host in self._vcs_by_host:
                self._vcs_by_host[host] = []
            self._vcs_by_host[host].append(vcs)

        self._vcs_queue = []
        self._jobs_by_host = {}  # string host -> list of (process, previous_id)
        self._active_queue_timeout_seconds = 0
        self._active_queue_timeout_id = 0

    def _host_for_vcs(self, vcs):
        return vcs.get_url().netloc

    def _escape_vcs_url(self, vcs):
        return urllib.quote(vcs.get_url_string(), '')

    def _cachedir_for_vcs(self, vcs):
        return os.path.join(self._dir, self._escape_vcs_url(vcs))

    def start(self):
        logging.info("Starting poll of %d repositories on %d unique hosts"
                     % (len(self._vcslist), len(list(self._vcs_by_host.iterkeys()))))
        # Schedule jobs for now
        for vcs in self._vcslist:
            self._vcs_queue.append((vcs, 0))
        for host in self._vcs_by_host:
            self._jobs_by_host[host] = []
        self._active_queue_timeout_seconds = self._timeout_seconds
        self._poll()

    def _on_job_exited(self, process, condition):
        vcs = process.__vcs
        text = "VCS %r task %r exited with code %d" % (vcs, process, condition)
        failed = condition != 0
        if failed:
            logging.warning(text)
        else:
            logging.info(text)

        host = self._host_for_vcs(vcs)
        jobs = self._jobs_by_host[host]
        previous_id = None
        index = -1
        for i, (iter_process, previous_id) in enumerate(jobs):
            if iter_process == process:
                index = i
                break
        assert index >= 0
        del jobs[index]

        if not failed:
            vcsdir = vcs.get_directory()
            if vcsdir.endswith('.tmp'):
                # Initial checkout finished; move the temporary clone into place.
                newname = vcsdir[:-4]
                os.rename(vcsdir, newname)
                vcs.set_directory(newname)
                logging.info("vcs %r: Checkout complete in %r" % (vcs, newname))
            current_id = vcs.get_id()
            if current_id != previous_id:
                logging.info("vcs %r: New commit id %r differs from previous %r"
                             % (vcs, current_id, previous_id))
                msg = msgqueue.Message(None, {'type': 'update'},
                                       {'url': vcs.get_url_string(),
                                        'id': current_id})
                self._msgqueue.append(msg)
            else:
                logging.info("No changes in %r from previous commit id %r" % (vcs, previous_id))

        if not failed and not self._options.clone_then_exit:
            # Reschedule this repository for the next poll interval.
            target_time = int(time.time() + self._timeout_seconds)
            self._vcs_queue.append((vcs, target_time))

        self._poll()

    def _num_active_jobs(self):
        active = 0
        for host in self._jobs_by_host:
            active += len(self._jobs_by_host[host])
        return active

    def _job_for_vcs(self, vcs):
        for job in self._jobs_by_host[self._host_for_vcs(vcs)]:
            (iter_process, previous_id) = job
            iter_vcs = iter_process.__vcs
            if iter_vcs == vcs:
                return job
        return None

    def _adjust_timeout(self):
        # Shrink the poll timeout to the soonest pending target time in the queue.
        current_time = int(time.time())
        timeout = self._active_queue_timeout_seconds
        for (vcs, target_time) in self._vcs_queue:
            time_delta = target_time - current_time
            if time_delta > 0:
                if time_delta < timeout:
                    timeout = time_delta
                continue
        if timeout < self._active_queue_timeout_seconds:
            logging.info("Rescheduling poll for %d seconds" % (timeout, ))
            if self._active_queue_timeout_id > 0:
                glib.source_remove(self._active_queue_timeout_id)
            self._active_queue_timeout_seconds = timeout
            self._active_queue_timeout_id = glib.timeout_add_seconds(timeout, self._poll)

    def _poll(self):
        current_time = int(time.time())
        orig_active_jobs = self._num_active_jobs()
        logging.debug("Queue: %r" % (self._vcs_queue, ))
        processed = []
        for vcs, target_time in self._vcs_queue:
            active = self._job_for_vcs(vcs)
            assert active is None
            if target_time > current_time:
                continue
            host = self._host_for_vcs(vcs)
            host_job_count = len(self._jobs_by_host[host])
            if host_job_count >= self._max_host_concurrency:
                logging.debug("Skipping job %r; at concurrency limit %d"
                              % (vcs, self._max_host_concurrency))
                continue
            else:
                logging.debug("Have %d active jobs for host %r" % (host_job_count, host))
            vcsdir = self._cachedir_for_vcs(vcs)
            job_logname = '%s-update-%s-%d.log' % (vcs.vcstype, self._escape_vcs_url(vcs),
                                                   int(time.time()), )
            job_logpath = os.path.join(self._process_dir, job_logname)
            if not os.path.isdir(vcsdir):
                previous_id = None
                logging.info("Doing initial checkout for %r" % (vcs.get_base_url_string(), ))
                # Clone into a temporary directory; it is renamed into place on success.
                vcs_tempdir = vcsdir + '.tmp'
                if os.path.isdir(vcs_tempdir):
                    shutil.rmtree(vcs_tempdir)
                process = vcs.checkout_async(vcs_tempdir, job_logpath, self._on_job_exited)
            elif self._options.clone_then_exit:
                processed.append(vcs)
                continue
            else:
                vcs.set_directory(vcsdir)
                previous_id = vcs.get_id()
                logging.info("Doing update for %r (starting from commit id %s)"
                             % (vcs.get_base_url_string(), previous_id))
                process = vcs.update_async(job_logpath, self._on_job_exited)
            process.__vcs = vcs
            self._jobs_by_host[host].append((process, previous_id))
            processed.append(vcs)
        added_job_count = len(processed)
        # Drop the repositories we just scheduled (or skipped) from the queue.
        while processed:
            vcs = processed[0]
            del processed[0]
            index = -1
            for i, (iter_vcs, target_time) in enumerate(self._vcs_queue):
                if iter_vcs == vcs:
                    index = i
                    break
            assert index >= 0
            del self._vcs_queue[index]
        new_active_jobs = self._num_active_jobs()
        if len(self._vcs_queue) == 0 and new_active_jobs == 0:
            logging.info("Queue is empty and no active jobs. Exiting.")
            sys.exit(0)
        self._adjust_timeout()
        if new_active_jobs == 0:
            logging.info("No active jobs; sleeping for %d seconds"
                         % (self._active_queue_timeout_seconds, ))
        else:
            logging.info("Poll complete, started %d jobs (%d total)"
                         % (new_active_jobs - orig_active_jobs, new_active_jobs))
        return False


def main():
    glib.threads_init()

    opts = optparse.OptionParser("usage: %prog [options]")
    opts.add_option('-c', '--config', dest='config',
                    help="Path to configuration file")
    opts.add_option('', '--debug', action='store_true',
                    help="Print verbose debugging")
    opts.add_option('', '--clone-then-exit', action='store_true',
                    help="If true, perform any necessary clones, then exit")

    (options, args) = opts.parse_args()
    if options.config is None:
        print "Must specify --config"
        sys.exit(1)

    config = SafeConfigParser({'home': os.environ['HOME']})
    config.read(options.config)

    level = logging.DEBUG if options.debug else logging.INFO
    logging.basicConfig(stream=sys.stderr, level=level)

    mirrordir = config.get('VCS', 'mirror_dir')
    vcslist_path = os.path.join(mirrordir, 'vcs.txt')
    if not os.path.isfile(vcslist_path):
        print "Missing mirror configuration file %r" % (vcslist_path, )
        sys.exit(1)

    f = open(vcslist_path)
    urls = f.readlines()
    f.close()

    mirror = VCSMirror(options, config, urls)
    mirror.start()

    loop = glib.MainLoop()
    loop.run()