summaryrefslogtreecommitdiffstats
path: root/rpmci/rpmci_vcs_mirror_main.py
diff options
context:
space:
mode:
authorColin Walters <walters@verbum.org>2010-10-03 12:55:07 -0400
committerColin Walters <walters@verbum.org>2010-10-03 12:55:07 -0400
commitdb8df4a040883b631ff3c719c453246df9085776 (patch)
tree66e00f6726f6b0cdb612b00d069bba49fdd2bc0c /rpmci/rpmci_vcs_mirror_main.py
parent77ad7371c5906b83db49de309ba39d867b673975 (diff)
downloadrpmci-db8df4a040883b631ff3c719c453246df9085776.tar.gz
rpmci-db8df4a040883b631ff3c719c453246df9085776.tar.xz
rpmci-db8df4a040883b631ff3c719c453246df9085776.zip
rpmci-vcs-mirror: Basically appears to work
Diffstat (limited to 'rpmci/rpmci_vcs_mirror_main.py')
-rw-r--r--rpmci/rpmci_vcs_mirror_main.py239
1 files changed, 239 insertions, 0 deletions
diff --git a/rpmci/rpmci_vcs_mirror_main.py b/rpmci/rpmci_vcs_mirror_main.py
new file mode 100644
index 0000000..85615a3
--- /dev/null
+++ b/rpmci/rpmci_vcs_mirror_main.py
@@ -0,0 +1,239 @@
+#!/usr/bin/python
+
+# rpmci_vcs_mirror_main.py:
+# Implementation of rpm-vcs-mirror
+#
+# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php)
+# Copyright (C) 2010 Red Hat, Inc.
+# Written by Colin Walters <walters@verbum.org>
+
+import os
+import sys
+import time
+import shutil
+import optparse
+from ConfigParser import SafeConfigParser
+import logging
+import urllib
+
+import glib
+import gobject
+import gio
+
+from . import msgqueue
+from . import lame_vcs_abstraction
+
+class VCSMirror(object):
+ def __init__(self, config, urls):
+ self.config = config
+ self.urls = urls
+
+ self._dir = config.get('VCS', 'mirror_dir')
+ self._process_dir = config.get('VCS', 'process_logdir')
+ if not os.path.isdir(self._process_dir):
+ os.makedirs(self._process_dir)
+ self._timeout_seconds = int(config.get('VCS', 'poll_seconds'))
+ self._max_host_concurrency = int(config.get('VCS', 'max_host_concurrency'))
+ self._max_concurrency = int(config.get('VCS', 'max_concurrency'))
+ msgqueue_dir = config.get('VCS', 'msgqueue')
+ self._msgqueue = msgqueue.MessageQueue(msgqueue_dir)
+
+ self._vcslist = []
+ for url in urls:
+ url = url.strip()
+ self._vcslist.append(lame_vcs_abstraction.Vcs.new_from_spec(url))
+
+ self._vcs_by_host = {}
+ for vcs in self._vcslist:
+ host = self._host_for_vcs(vcs)
+ if not host in self._vcs_by_host:
+ self._vcs_by_host[host] = []
+ self._vcs_by_host[host].append(vcs)
+
+ self._vcs_queue = []
+ self._jobs_by_host = {} # string host -> Job
+ self._active_queue_timeout_seconds = 0
+ self._active_queue_timeout_id = 0
+
+ def _host_for_vcs(self, vcs):
+ return vcs.get_url().netloc
+
+ def _escape_vcs_url(self, vcs):
+ return urllib.quote(vcs.get_base_url_string(), '')
+
+ def _cachedir_for_vcs(self, vcs):
+ return os.path.join(self._dir, self._escape_vcs_url(vcs))
+
+ def start(self):
+ logging.info("Starting poll of %d repositories on %d unique hosts"
+ % (len(self._vcslist), len(list(self._vcs_by_host.iterkeys()))))
+
+ # Schedule jobs for now
+ for vcs in self._vcslist:
+ self._vcs_queue.append((vcs, 0))
+ for host in self._vcs_by_host:
+ self._jobs_by_host[host] = []
+ self._active_queue_timeout_seconds = self._timeout_seconds
+ self._poll()
+
+ def _on_job_exited(self, process, condition):
+ vcs = process.__vcs
+ text = "VCS %r task %r exited with code %d" % (vcs, process, condition)
+ failed = condition != 0
+ if failed:
+ logging.warning(text)
+ else:
+ logging.info(text)
+ host = self._host_for_vcs(vcs)
+ jobs = self._jobs_by_host[host]
+ previous_id = None
+ index = -1
+ for i, (iter_process, previous_id) in enumerate(jobs):
+ if iter_process == process:
+ index = i
+ break
+ assert index >= 0
+ del jobs[i]
+
+ if not failed:
+ vcsdir = vcs.get_directory()
+ if vcsdir.endswith('.tmp'):
+ newname = vcsdir[:-4]
+ os.rename(vcsdir, newname)
+ vcs.set_directory(newname)
+ logging.info("vcs %r: Checkout complete in %r" % (vcs, newname))
+ current_id = vcs.get_id()
+ if current_id != previous_id:
+ logging.info("vcs %r: New commit id %r differs from previous %r" % (vcs, current_id, previous_id))
+ msg = msgqueue.Message(None, {'type': 'update'}, {'id': current_id})
+ self._msgqueue.append(msg)
+ else:
+ logging.info("No changes in %r from previous commit id %r" % (vcs, previous_id))
+
+ target_time = int(time.time() + self._timeout_seconds)
+ self._vcs_queue.append((vcs, target_time))
+
+ self._poll()
+
+ def _num_active_jobs(self):
+ active = 0
+ for host in self._jobs_by_host:
+ active += len(self._jobs_by_host[host])
+ return active
+
+ def _job_for_vcs(self, vcs):
+ for job in self._jobs_by_host[self._host_for_vcs(vcs)]:
+ (iter_process, previous_id) = job
+ iter_vcs = iter_process.__vcs
+ if iter_vcs == vcs:
+ return job
+ return None
+
+ def _adjust_timeout(self):
+ current_time = int(time.time())
+ timeout = self._active_queue_timeout_seconds
+ for (vcs, target_time) in self._vcs_queue:
+ time_delta = target_time - current_time
+ if time_delta > 0:
+ if time_delta < timeout:
+ timeout = time_delta
+ continue
+ if timeout < self._active_queue_timeout_seconds:
+ logging.info("Rescheduling poll for %d seconds" % (timeout, ))
+ if self._active_queue_timeout_id > 0:
+ glib.source_remove(self._active_queue_timeout_id)
+ self._active_queue_timeout_seconds = timeout
+ self._active_queue_timeout_id = glib.timeout_add_seconds(timeout, self._poll)
+
+
+ def _poll(self):
+ current_time = int(time.time())
+
+ logging.info("Doing poll (%d active tasks)" % (self._num_active_jobs(), ))
+ logging.debug("Queue: %r" % (self._vcs_queue, ))
+ processed = []
+ for vcs, target_time in self._vcs_queue:
+ active = self._job_for_vcs(vcs)
+ assert active is None
+
+ if target_time > current_time:
+ continue
+
+ host = self._host_for_vcs(vcs)
+ host_job_count = len(self._jobs_by_host[host])
+ if host_job_count >= self._max_host_concurrency:
+ logging.debug("Skipping job %r; at concurrency limit %d"
+ % (vcs, self._max_host_concurrency))
+ continue
+ else:
+ logging.debug("Have %d active jobs for host %r"
+ % (host_job_count, host))
+
+ vcsdir = self._cachedir_for_vcs(vcs)
+ job_logname = '%s-update-%s-%d.log' % (vcs.vcstype, self._escape_vcs_url(vcs),
+ int(time.time()), )
+ job_logpath = os.path.join(self._process_dir, job_logname)
+ if not os.path.isdir(vcsdir):
+ previous_id = None
+ logging.info("Doing initial checkout for %r" % (vcs.get_base_url_string(), ))
+ vcs_tempdir = vcsdir + '.tmp'
+ process = vcs.checkout_async(vcs_tempdir, job_logpath, self._on_job_exited)
+ else:
+ vcs.set_directory(vcsdir)
+ previous_id = vcs.get_id()
+ logging.info("Doing update for %r (starting from commit id %s)" % (vcs.get_base_url_string(), previous_id))
+ process = vcs.update_async(job_logpath, self._on_job_exited)
+ process.__vcs = vcs
+ self._jobs_by_host[host].append((process, previous_id))
+ processed.append(vcs)
+ while processed:
+ vcs = processed[0]
+ del processed[0]
+ index = -1
+ for i, (iter_vcs, target_time) in enumerate(self._vcs_queue):
+ if iter_vcs == vcs:
+ index = i
+ break
+ assert index >= 0
+ del self._vcs_queue[index]
+
+ self._adjust_timeout()
+
+ return False
+
+
+def main():
+ glib.threads_init()
+
+ opts = optparse.OptionParser("usage: %prog [options]")
+ opts.add_option('-c', '--config', dest='config', help="Path to configuration file")
+ opts.add_option('', '--debug', action='store_true', help="Print verbose debugging")
+
+ (options, args) = opts.parse_args()
+
+ if options.config is None:
+ print "Must specify --config"
+ sys.exit(1)
+
+ config = SafeConfigParser({'home': os.environ['HOME']})
+ config.read(options.config)
+ level = logging.DEBUG if options.debug else logging.INFO
+ logging.basicConfig(stream=sys.stderr, level=level)
+
+ mirrordir = config.get('VCS', 'mirror_dir')
+
+ vcslist_path = os.path.join(mirrordir, 'vcs.txt')
+
+ if not os.path.isfile(vcslist_path):
+ print "Missing mirror configuration file %r" % (vcslist_path, )
+ sys.exit(1)
+
+ f = open(vcslist_path)
+ urls = f.readlines()
+ f.close()
+
+ mirror = VCSMirror(config, urls)
+ mirror.start()
+
+ loop = glib.MainLoop()
+ loop.run()