summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorColin Walters <walters@verbum.org>2010-10-03 12:55:07 -0400
committerColin Walters <walters@verbum.org>2010-10-03 12:55:07 -0400
commitdb8df4a040883b631ff3c719c453246df9085776 (patch)
tree66e00f6726f6b0cdb612b00d069bba49fdd2bc0c
parent77ad7371c5906b83db49de309ba39d867b673975 (diff)
downloadrpmci-db8df4a040883b631ff3c719c453246df9085776.tar.gz
rpmci-db8df4a040883b631ff3c719c453246df9085776.tar.xz
rpmci-db8df4a040883b631ff3c719c453246df9085776.zip
rpmci-vcs-mirror: Basically appears to work
-rw-r--r--.gitignore1
-rwxr-xr-x[-rw-r--r--]rpmci-vcs-mirror155
-rw-r--r--rpmci/__init__.py0
-rw-r--r--rpmci/async_subprocess.py (renamed from async_subprocess.py)66
-rw-r--r--rpmci/lame_vcs_abstraction.py (renamed from lame_vcs_abstraction.py)32
-rw-r--r--rpmci/msgqueue.py (renamed from msgqueue.py)37
-rw-r--r--rpmci/rpmci_vcs_mirror_main.py239
-rw-r--r--rpmci/spec.py (renamed from spec.py)0
-rw-r--r--sample.config12
-rw-r--r--tests/tests.py58
10 files changed, 396 insertions, 204 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0d20b64
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+*.pyc
diff --git a/rpmci-vcs-mirror b/rpmci-vcs-mirror
index af114ac..e223257 100644..100755
--- a/rpmci-vcs-mirror
+++ b/rpmci-vcs-mirror
@@ -1,7 +1,7 @@
#!/usr/bin/python
# rpmci-vcs-mirror:
-# Poll set of VCS URLs, caching local repositories,
+# Poll set of VCS URLs, caching local repositories
#
# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php)
# Copyright (C) 2010 Red Hat, Inc.
@@ -9,157 +9,12 @@
import os
import sys
-import time
-import shutil
-import optparse
-from ConfigParser import SafeConfigParser
-import logging
-import glib
-import gobject
-import gio
+if os.path.isdir('.git'):
+ sys.path.insert(0, os.getcwd())
-from . import msgqueue
-from . import lame_vcs_abstraction
-
-class VCSMirror(object):
- def __init__(self, config, urls):
- self.config = config
- self.urls = urls
-
- self._dir = config.get('VCS', 'mirrordir')
- self._process_dir = config.get('DEFAULT', 'process_logdir')
- self._timeout_seconds = config.get('VCS', 'poll_seconds')
- self._max_host_concurrency = config.get('VCS', 'max_host_concurrency')
- self._max_concurrency = config.get('VCS', 'max_concurrency')
-
- self._vcslist = []
- for url in urls:
- self._vcslist.append(lame_vcs_abstraction.Vcs.new_from_spec(url))
-
- self._vcs_by_host = {}
- for vcs in self._vcslist:
- host = self._host_for_vcs(vcs)
- if not host in self._vcs_by_host:
- self._vcs_by_host[host] = []
- self._vcs_by_host[host].append(vcs)
-
- self._vcs_queue = []
- self._jobs_by_host = {} # string host -> Job
-
- def _host_for_vcs(self, vcs):
- return vcs.get_url().netloc
-
- def _escape_vcs_url(self, vcs):
- return urllib.quote(vcs.get_url(), '')
-
- def _cachedir_for_vcs(self, vcs):
- return os.path.join(self._dir, self._escape_vcs_url(vcs))
-
- def start(self):
- logging.info("Starting poll of %d repositories on %d unique hosts"
- % (len(self._vcslist), len(self._vcs_by_host.iterkeys())))
-
- # Schedule jobs for now
- for vcs in self._vcslist:
- self._vcs_queue.append((vcs, 0))
- for host in self._vcs_by_host:
- self._jobs_by_host[host] = []
- self._poll()
-
- def _on_vcs_job_exited(self, vcs, process, condition):
- text = "VCS %r task %r exited with code %d" % (vcs, process, condition)
- if condition != 0:
- logging.warning(text)
- else:
- logging.info(text)
- host = self._host_for_vcs(vcs)
- jobs = self._jobs_by_host[host]
- index = -1
- for i, (process, _vcs) in enumerate(jobs):
- if _vcs == vcs:
- index = i
- break
- assert index >= 0
- del jobs[i]
-
- self._poll()
-
- target_time = int(time.time() + self._timeout_seconds)
- self._vcs_queue.append((vcs, target_time))
-
- def _num_active_jobs(self):
- active = 0
- for host in self._jobs_by_host:
- active += len(self._jobs_by_host[host])
- return active
-
- def _poll(self):
- current_time = int(time.time())
- timeout = self._timeout_seconds
- timeout_id = 0
-
- logging.info("Doing poll (%d active tasks)" % (self._num_active_jobs(), ))
- for (vcs, target_time) in self._vcs_queue:
- time_delta = target_time - current_time:
- if time_delta > 0 and time_delta < timeout:
- timeout = time_delta
- host = self._host_for_vcs(vcs)
- if len(self._jobs_by_host[host]) >= self._max_host_concurrency:
- logging.debug("Skipping job %r; at concurrency limit %d"
- % (vcs, self._max_host_concurrency))
- continue
-
- vcsdir = self._cachedir_for_vcs(vcs)
- job_logname = '%s-update-%s-%d.log' % (vcs.vcstype, self._escape_vcs_url(vcs),
- int(time.time()), )
- job_logpath = os.path.join(self._process_dir, job_logname)
- def _on_job_exited(process, condition):
- self._on_vcs_job_exited(vcs, process, condition)
- if not os.path.isdir(vcsdir):
- previous_id = None
- logging.info("Doing initial checkout for %r" % (vcs.get_url(), ))
- process = vcs.checkout_async(vcsdir, job_logpath, _on_job_exited)
- else:
- previous_id = vcs.get_id()
- logging.info("Doing update for %r (starting from commit id %s)" % (vcs.get_url(), previous_id))
- process = vcs.update_async(job_logpath, _on_job_exited)
- self._jobs_by_host[host].append((process, vcs, previous_id))
- glib.timeout_add_seconds(timeout, self._poll)
- return False
-
-
-def main():
- opts = optparse.OptionParser("usage: %prog [options]")
- opts.add_option('-c', '--config', dest='config', help="Path to configuration file")
- opts.add_option('', '--debug', dest='debug', help="Print verbose debugging")
-
- (options, args) = opts.parse_args()
-
- if options.config is None:
- print "Must specify --config"
- sys.exit(1)
-
- config = SafeConfigParser.read(options.config)
- level = logging.DEBUG if options.debug else logging.INFO
- logging.basicConfig(stream=sys.stderr, level=level)
-
- mirrordir = config.get('VCS', 'mirror_dir')
-
- vcslist = os.path.join(mirrordir, 'vcs.txt')
-
- if not os.path.isfile(vcslist):
- print "Missing mirror configuration file %r" % (vcslist, )
- sys.exit(1)
-
- f = open(options.vcs_url_list)
- urls = f.readlines()
- f.close()
-
- mirror = VCSMirror(config, urls)
- mirror.start()
-
- glib.main()
+import rpmci
+from rpmci.rpmci_vcs_mirror_main import main
if __name__ == '__main__':
main()
diff --git a/rpmci/__init__.py b/rpmci/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/rpmci/__init__.py
diff --git a/async_subprocess.py b/rpmci/async_subprocess.py
index 53abaa1..7988724 100644
--- a/async_subprocess.py
+++ b/rpmci/async_subprocess.py
@@ -5,9 +5,11 @@
# Copyright (C) 2010 Red Hat, Inc.
# Written by Colin Walters <walters@verbum.org>
-import glib
+import os
import logging
+import glib
+
PIPE_STDOUT = 'pipe-stdout'
def spawn_async_log_info(argv, cwd=None, stdout=None):
@@ -17,44 +19,60 @@ def spawn_async_log_info(argv, cwd=None, stdout=None):
def spawn_async_output_to_file(argv, output_filepath, on_exited, cwd=None):
f = open(output_filepath, 'w')
process = AsyncProcess(argv, cwd=cwd, stdout=PIPE_STDOUT)
- def _write_to_file(process, buf):
- f.write(buf)
- def _on_exited(process):
- f.close()
- on_exited()
- process.set_callbacks(_write_to_file, _on_exited)
+ def _on_io(process, buf):
+ if buf is not None:
+ f.write(buf)
+ else:
+ f.close()
+ def _on_exited(process, condition):
+ on_exited(process, condition)
+ process.set_callbacks(_on_io, _on_exited)
return process
class AsyncProcess(object):
def __init__(self, argv, cwd=None, stdout=None):
self.argv = argv
+ target_cwd = cwd or os.getcwd()
+
if stdout == PIPE_STDOUT:
+ (self.stdout, self._pipeout) = os.pipe()
(pid, stdin, stdout, stderr) = glib.spawn_async(argv,
- working_directory=cwd,
+ working_directory=target_cwd,
+ child_setup=self._child_setup_pipes,
+ user_data=None,
flags=(glib.SPAWN_SEARCH_PATH
- | glib.SPAWN_DO_NOT_REAP_CHILD),
- standard_output=True,
- standard_error=None)
- assert stdout == stderr
+ | glib.SPAWN_DO_NOT_REAP_CHILD))
+ os.close(self._pipeout)
else:
(pid, stdin, stdout, stderr) = glib.spawn_async(argv,
- working_directory=cwd,
- (glib.SPAWN_SEARCH_PATH
- | glib.SPAWN_DO_NOT_REAP_CHILD
- | glib.SPAWN_STDOUT_TO_DEV_NULL
- | glib.SPAWN_STDERR_TO_DEV_NULL))
+ working_directory=target_cwd,
+ child_setup=None,
+ user_data=None,
+ flags=(glib.SPAWN_SEARCH_PATH
+ | glib.SPAWN_DO_NOT_REAP_CHILD
+ | glib.SPAWN_STDOUT_TO_DEV_NULL
+ | glib.SPAWN_STDERR_TO_DEV_NULL))
+ self.stdout = None
assert stdout is None
assert stderr is None
assert stdin is None
self.pid = pid
self._child_watch_id = glib.child_watch_add(pid, self._on_child_exited)
- self._stdout_fd = stdout
+ self._output_callback = None
+ self._exited_callback = None
+
+ def _child_setup_pipes(self, *args):
+ # Sadly we have to do a child setup function to get merged stdout/stderr
+ os.dup2(self._pipeout, 1)
+ os.dup2(self._pipeout, 2)
def set_callbacks(self, output_callback, exited_callback):
+ assert self._output_callback is None
+ assert self._exited_callback is None
self._output_callback = output_callback
self._exited_callback = exited_callback
- if (self.stdout and self._stdout_watch_id == 0):
+ if self.stdout is not None:
glib.io_add_watch(self.stdout, glib.IO_IN | glib.IO_ERR | glib.IO_HUP,
self._on_io)
@@ -63,19 +81,13 @@ class AsyncProcess(object):
self._exited_callback(self, condition)
def _on_io(self, source, condition):
- have_read = condition & gobject.IO_IN
+ have_read = condition & glib.IO_IN
if have_read:
buf = os.read(source, 8192)
self._output_callback(self, buf)
if ((condition & glib.IO_ERR) > 0
or (condition & glib.IO_HUP) > 0):
+ self._output_callback(self, None)
os.close(source)
- self._stdout_watch_id = 0
return False
return have_read
-
-
-
-
-
-
diff --git a/lame_vcs_abstraction.py b/rpmci/lame_vcs_abstraction.py
index 67a3c1d..a96f23d 100644
--- a/lame_vcs_abstraction.py
+++ b/rpmci/lame_vcs_abstraction.py
@@ -15,12 +15,12 @@ import getopt
import subprocess
import shutil
import hashlib
-import logger
+import logging
from . import async_subprocess
class Vcs(object):
- def __init__(self, parsedurl):
+ def __init__(self, parsedurl, directory=None):
self._parsed_url = parsedurl
# Deliberately drop params/query
self._nonfragment_url_string = urlparse.urlunparse((parsedurl.scheme,
@@ -29,16 +29,26 @@ class Vcs(object):
'', '', ''))
self._branch = self._parsed_url.fragment
- self._dir = None
+ self._dir = directory
def get_url(self):
return self._parsed_url
+ def get_base_url_string(self):
+ return self._nonfragment_url_string
+
def checkout_async(self, destdir):
"""Retrieve a new copy of the source tree, saving as destdir"""
raise Exception("not implemented")
+
+ def set_directory(self, dirpath):
+ """Set the checkout directory."""
+ self._dir = dirpath
+
+ def get_directory(self):
+ return self._dir
- def update(self):
+ def update_async(self):
"""Update directory from the latest upstream"""
raise Exception("not implemented")
@@ -56,11 +66,11 @@ class Vcs(object):
raise Exception("not implemented")
def _vcs_exec_sync_log_error(self, args):
- logger.info("Synchronously executing: %r" % (args, ))
+ logging.info("Synchronously executing: %r in cwd %r" % (args, self._dir))
return subprocess.check_output(args, stderr=subprocess.STDOUT, close_fds=True, cwd=self._dir)
def _vcs_exec_async(self, args, logfile_path, on_exited):
- logger.info("Asynchronously executing: %r" % (args, ))
+ logging.info("Asynchronously executing: %r in cwd %r" % (args, self._dir))
return async_subprocess.spawn_async_output_to_file(args, logfile_path, on_exited, cwd=self._dir)
@classmethod
@@ -90,7 +100,11 @@ class GitVcs(Vcs):
if self._branch:
args.extend(['-b', self._branch])
args.extend([self._nonfragment_url_string, destdir])
- return self._vcs_exec_async(args, logfile, on_exited)
+ def _wrap_on_exited(process, condition):
+ if condition == 0:
+ self._dir = destdir
+ on_exited(process, condition)
+ return self._vcs_exec_async(args, logfile, _wrap_on_exited)
def update_async(self, logfile, on_exited):
assert self._dir is not None
@@ -100,7 +114,7 @@ class GitVcs(Vcs):
if not prefix.endswith('/'):
prefix += '/'
args = ['git', 'archive', '--format=tar', '--prefix=%s' % (prefix,), 'HEAD']
- logger.info("Synchronously executing: %r" % (args, ))
+ logging.info("Synchronously executing: %r" % (args, ))
log_f = open(logfile, 'w')
gitproc = subprocess.Popen(args, cwd=src_directory, stdout=subprocess.PIPE, stderr=log_f)
if target_filename.endswith('.bz2'):
@@ -110,7 +124,7 @@ class GitVcs(Vcs):
else:
raise ValueError("Unknown compression for filename %r" % (target_filename,))
args = [zipbin, '-c']
- logger.info("Synchronously executing: %r" % (args, ))
+ logging.info("Synchronously executing: %r" % (args, ))
f = open(target_filename, 'w')
zipproc = subprocess.Popen(args, cwd=src_directory, stdout=f, stdin=gitproc.stdout, stderr=log_f)
zipproc.wait()
diff --git a/msgqueue.py b/rpmci/msgqueue.py
index 737740f..7148773 100644
--- a/msgqueue.py
+++ b/rpmci/msgqueue.py
@@ -4,19 +4,30 @@
# Copyright (C) 2010 Red Hat, Inc.
# Written by Colin Walters <walters@verbum.org>
-import gio
+import os
+import sys
+
import hashlib
import json
+import gio
+
class Message(object):
def __init__(self, ident, headers, payload):
- self.ident = ident
self.headers = headers
self.payload = payload
+ if ident is None:
+ (self.ident, serialized) = self.serialize()
+ else:
+ self.ident = ident
def serialize(self):
- return json.dumps({'headers': self.headers,
- 'payload': self.payload})
+ serialized = json.dumps({'headers': self.headers,
+ 'payload': self.payload})
+ digest = hashlib.md5()
+ digest.update(serialized)
+ hexdigest = digest.hexdigest()
+ return (hexdigest, serialized)
def __cmp__(self, other):
return cmp(self.ident, other.ident)
@@ -37,6 +48,8 @@ class Message(object):
class MessageQueue(object):
def __init__(self, dirpath):
self._dirpath = dirpath
+ if not os.path.isdir(self._dirpath):
+ os.makedirs(self._dirpath)
self._dir_gfile = gio.File(path=dirpath)
self._monitor = self._dir_gfile.monitor(gio.FILE_MONITOR_NONE)
self._monitor.connect('changed', self._on_dir_changed)
@@ -50,10 +63,7 @@ class MessageQueue(object):
self._consumed.append(message)
def append(self, message):
- serialized = message.serialize()
- digest = hashlib.md5()
- digest.update(serialized)
- hexdigest = digest.hexdigest()
+ (hexdigest, serialized) = message.serialize()
filename = os.path.join(self._dirpath, hexdigest)
temp_filename = os.path.join(self._dirpath, '_' + hexdigest)
f = open(temp_filename, 'w')
@@ -74,14 +84,15 @@ class MessageQueue(object):
message = Message.parse(filename, f)
f.close()
messages.add(message)
+
+ if len(messages) == 0:
+ return
self._consumed = []
for subscriber in self._subscribers:
- subscriber(iter(messages))
+ subscriber(self, iter(messages))
+
for msg in self._consumed:
messages.remove(msg)
+ os.unlink(os.path.join(self._dirpath, msg.ident))
self._consumed = []
-
-
-
-
diff --git a/rpmci/rpmci_vcs_mirror_main.py b/rpmci/rpmci_vcs_mirror_main.py
new file mode 100644
index 0000000..85615a3
--- /dev/null
+++ b/rpmci/rpmci_vcs_mirror_main.py
@@ -0,0 +1,239 @@
+#!/usr/bin/python
+
+# rpmci_vcs_mirror_main.py:
+# Implementation of rpm-vcs-mirror
+#
+# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php)
+# Copyright (C) 2010 Red Hat, Inc.
+# Written by Colin Walters <walters@verbum.org>
+
+import os
+import sys
+import time
+import shutil
+import optparse
+from ConfigParser import SafeConfigParser
+import logging
+import urllib
+
+import glib
+import gobject
+import gio
+
+from . import msgqueue
+from . import lame_vcs_abstraction
+
+class VCSMirror(object):
+ def __init__(self, config, urls):
+ self.config = config
+ self.urls = urls
+
+ self._dir = config.get('VCS', 'mirror_dir')
+ self._process_dir = config.get('VCS', 'process_logdir')
+ if not os.path.isdir(self._process_dir):
+ os.makedirs(self._process_dir)
+ self._timeout_seconds = int(config.get('VCS', 'poll_seconds'))
+ self._max_host_concurrency = int(config.get('VCS', 'max_host_concurrency'))
+ self._max_concurrency = int(config.get('VCS', 'max_concurrency'))
+ msgqueue_dir = config.get('VCS', 'msgqueue')
+ self._msgqueue = msgqueue.MessageQueue(msgqueue_dir)
+
+ self._vcslist = []
+ for url in urls:
+ url = url.strip()
+ self._vcslist.append(lame_vcs_abstraction.Vcs.new_from_spec(url))
+
+ self._vcs_by_host = {}
+ for vcs in self._vcslist:
+ host = self._host_for_vcs(vcs)
+ if not host in self._vcs_by_host:
+ self._vcs_by_host[host] = []
+ self._vcs_by_host[host].append(vcs)
+
+ self._vcs_queue = []
+ self._jobs_by_host = {} # string host -> Job
+ self._active_queue_timeout_seconds = 0
+ self._active_queue_timeout_id = 0
+
+ def _host_for_vcs(self, vcs):
+ return vcs.get_url().netloc
+
+ def _escape_vcs_url(self, vcs):
+ return urllib.quote(vcs.get_base_url_string(), '')
+
+ def _cachedir_for_vcs(self, vcs):
+ return os.path.join(self._dir, self._escape_vcs_url(vcs))
+
+ def start(self):
+ logging.info("Starting poll of %d repositories on %d unique hosts"
+ % (len(self._vcslist), len(list(self._vcs_by_host.iterkeys()))))
+
+ # Schedule jobs for now
+ for vcs in self._vcslist:
+ self._vcs_queue.append((vcs, 0))
+ for host in self._vcs_by_host:
+ self._jobs_by_host[host] = []
+ self._active_queue_timeout_seconds = self._timeout_seconds
+ self._poll()
+
+ def _on_job_exited(self, process, condition):
+ vcs = process.__vcs
+ text = "VCS %r task %r exited with code %d" % (vcs, process, condition)
+ failed = condition != 0
+ if failed:
+ logging.warning(text)
+ else:
+ logging.info(text)
+ host = self._host_for_vcs(vcs)
+ jobs = self._jobs_by_host[host]
+ previous_id = None
+ index = -1
+ for i, (iter_process, previous_id) in enumerate(jobs):
+ if iter_process == process:
+ index = i
+ break
+ assert index >= 0
+ del jobs[i]
+
+ if not failed:
+ vcsdir = vcs.get_directory()
+ if vcsdir.endswith('.tmp'):
+ newname = vcsdir[:-4]
+ os.rename(vcsdir, newname)
+ vcs.set_directory(newname)
+ logging.info("vcs %r: Checkout complete in %r" % (vcs, newname))
+ current_id = vcs.get_id()
+ if current_id != previous_id:
+ logging.info("vcs %r: New commit id %r differs from previous %r" % (vcs, current_id, previous_id))
+ msg = msgqueue.Message(None, {'type': 'update'}, {'id': current_id})
+ self._msgqueue.append(msg)
+ else:
+ logging.info("No changes in %r from previous commit id %r" % (vcs, previous_id))
+
+ target_time = int(time.time() + self._timeout_seconds)
+ self._vcs_queue.append((vcs, target_time))
+
+ self._poll()
+
+ def _num_active_jobs(self):
+ active = 0
+ for host in self._jobs_by_host:
+ active += len(self._jobs_by_host[host])
+ return active
+
+ def _job_for_vcs(self, vcs):
+ for job in self._jobs_by_host[self._host_for_vcs(vcs)]:
+ (iter_process, previous_id) = job
+ iter_vcs = iter_process.__vcs
+ if iter_vcs == vcs:
+ return job
+ return None
+
+ def _adjust_timeout(self):
+ current_time = int(time.time())
+ timeout = self._active_queue_timeout_seconds
+ for (vcs, target_time) in self._vcs_queue:
+ time_delta = target_time - current_time
+ if time_delta > 0:
+ if time_delta < timeout:
+ timeout = time_delta
+ continue
+ if timeout < self._active_queue_timeout_seconds:
+ logging.info("Rescheduling poll for %d seconds" % (timeout, ))
+ if self._active_queue_timeout_id > 0:
+ glib.source_remove(self._active_queue_timeout_id)
+ self._active_queue_timeout_seconds = timeout
+ self._active_queue_timeout_id = glib.timeout_add_seconds(timeout, self._poll)
+
+
+ def _poll(self):
+ current_time = int(time.time())
+
+ logging.info("Doing poll (%d active tasks)" % (self._num_active_jobs(), ))
+ logging.debug("Queue: %r" % (self._vcs_queue, ))
+ processed = []
+ for vcs, target_time in self._vcs_queue:
+ active = self._job_for_vcs(vcs)
+ assert active is None
+
+ if target_time > current_time:
+ continue
+
+ host = self._host_for_vcs(vcs)
+ host_job_count = len(self._jobs_by_host[host])
+ if host_job_count >= self._max_host_concurrency:
+ logging.debug("Skipping job %r; at concurrency limit %d"
+ % (vcs, self._max_host_concurrency))
+ continue
+ else:
+ logging.debug("Have %d active jobs for host %r"
+ % (host_job_count, host))
+
+ vcsdir = self._cachedir_for_vcs(vcs)
+ job_logname = '%s-update-%s-%d.log' % (vcs.vcstype, self._escape_vcs_url(vcs),
+ int(time.time()), )
+ job_logpath = os.path.join(self._process_dir, job_logname)
+ if not os.path.isdir(vcsdir):
+ previous_id = None
+ logging.info("Doing initial checkout for %r" % (vcs.get_base_url_string(), ))
+ vcs_tempdir = vcsdir + '.tmp'
+ process = vcs.checkout_async(vcs_tempdir, job_logpath, self._on_job_exited)
+ else:
+ vcs.set_directory(vcsdir)
+ previous_id = vcs.get_id()
+ logging.info("Doing update for %r (starting from commit id %s)" % (vcs.get_base_url_string(), previous_id))
+ process = vcs.update_async(job_logpath, self._on_job_exited)
+ process.__vcs = vcs
+ self._jobs_by_host[host].append((process, previous_id))
+ processed.append(vcs)
+ while processed:
+ vcs = processed[0]
+ del processed[0]
+ index = -1
+ for i, (iter_vcs, target_time) in enumerate(self._vcs_queue):
+ if iter_vcs == vcs:
+ index = i
+ break
+ assert index >= 0
+ del self._vcs_queue[index]
+
+ self._adjust_timeout()
+
+ return False
+
+
+def main():
+ glib.threads_init()
+
+ opts = optparse.OptionParser("usage: %prog [options]")
+ opts.add_option('-c', '--config', dest='config', help="Path to configuration file")
+ opts.add_option('', '--debug', action='store_true', help="Print verbose debugging")
+
+ (options, args) = opts.parse_args()
+
+ if options.config is None:
+ print "Must specify --config"
+ sys.exit(1)
+
+ config = SafeConfigParser({'home': os.environ['HOME']})
+ config.read(options.config)
+ level = logging.DEBUG if options.debug else logging.INFO
+ logging.basicConfig(stream=sys.stderr, level=level)
+
+ mirrordir = config.get('VCS', 'mirror_dir')
+
+ vcslist_path = os.path.join(mirrordir, 'vcs.txt')
+
+ if not os.path.isfile(vcslist_path):
+ print "Missing mirror configuration file %r" % (vcslist_path, )
+ sys.exit(1)
+
+ f = open(vcslist_path)
+ urls = f.readlines()
+ f.close()
+
+ mirror = VCSMirror(config, urls)
+ mirror.start()
+
+ loop = glib.MainLoop()
+ loop.run()
diff --git a/spec.py b/rpmci/spec.py
index 40ef6af..40ef6af 100644
--- a/spec.py
+++ b/rpmci/spec.py
diff --git a/sample.config b/sample.config
index f7c977f..afeeda0 100644
--- a/sample.config
+++ b/sample.config
@@ -1,9 +1,11 @@
-[DEFAULT]
-basedir=%(home)/rpmci
-process_logdir=%(basedir)/process-logs
-
[VCS]
-mirrordir=%(basedir)/vcs-mirror
+# Standard stuff
+basedir=%(home)s/rpmci
+msg_basedir=%(basedir)s/msgqueue
+process_logdir=%(basedir)s/process-logs
+
+msgqueue=%(msg_basedir)s/vcs-queue
+mirror_dir=%(basedir)s/vcs-mirror
# Minimum seconds between poll requests for a respository
poll_seconds=60
# Max connections to a given host like "git.gnome.org"
diff --git a/tests/tests.py b/tests/tests.py
new file mode 100644
index 0000000..6ff7846
--- /dev/null
+++ b/tests/tests.py
@@ -0,0 +1,58 @@
+import os
+import sys
+import unittest
+import shutil
+import tempfile
+
+import glib
+
+if os.path.isdir('.git'):
+ sys.path.insert(0, os.getcwd())
+
+from rpmci import async_subprocess
+from rpmci import msgqueue
+
+class BaseTestCase(unittest.TestCase):
+ def setUp(self):
+ self._tmpdir = tempfile.mkdtemp()
+ self._loop = glib.MainLoop()
+
+ def tearDown(self):
+ del self._loop
+ shutil.rmtree(self._tmpdir)
+
+class TestAsyncSubprocess(BaseTestCase):
+ def testEchoAsync(self):
+ temp_filepath = os.path.join(self._tmpdir, 'echo-output')
+ def _on_exited(process, condition):
+ self._loop.quit()
+ if not os.path.isfile(temp_filepath):
+ raise AssertionError("Couldn't find output file %r" % (temp_filepath, ))
+ f = open(temp_filepath)
+ contents = f.read()
+ f.close()
+ self.assertEquals(contents, 'hello\n')
+ async_echo = async_subprocess.spawn_async_output_to_file(['echo', 'hello'], temp_filepath, _on_exited)
+ self._loop.run()
+
+class TestMsgQueue(BaseTestCase):
+ def testWriteReadMessage(self):
+ baseq_dir_path = os.path.join(self._tmpdir, 'msgq')
+ q = msgqueue.MessageQueue(baseq_dir_path)
+ def _on_message(queue, messages):
+ self._loop.quit()
+ msgs = list(messages)
+ for msg in msgs:
+ queue.consume(msg)
+ self.assertEquals(len(msgs), 1)
+ msg = msgs[0]
+ self.assertEquals(msg.headers['type'], 'update')
+ self.assertEquals(msg.payload['id'], 42)
+ self.assertEquals(msg.payload['otherdata'], 'foo bar baz')
+ q.connect(_on_message)
+ msg = msgqueue.Message(None, {'type': 'update'}, {'id': 42, 'otherdata': 'foo bar baz'})
+ q.append(msg)
+ self._loop.run()
+
+if __name__ == '__main__':
+ unittest.main()