diff options
author | Colin Walters <walters@verbum.org> | 2010-10-03 12:55:07 -0400 |
---|---|---|
committer | Colin Walters <walters@verbum.org> | 2010-10-03 12:55:07 -0400 |
commit | db8df4a040883b631ff3c719c453246df9085776 (patch) | |
tree | 66e00f6726f6b0cdb612b00d069bba49fdd2bc0c /msgqueue.py | |
parent | 77ad7371c5906b83db49de309ba39d867b673975 (diff) | |
download | rpmci-db8df4a040883b631ff3c719c453246df9085776.tar.gz rpmci-db8df4a040883b631ff3c719c453246df9085776.tar.xz rpmci-db8df4a040883b631ff3c719c453246df9085776.zip |
rpmci-vcs-mirror: Basically appears to work
Diffstat (limited to 'msgqueue.py')
-rw-r--r-- | msgqueue.py | 87 |
1 files changed, 0 insertions, 87 deletions
diff --git a/msgqueue.py b/msgqueue.py deleted file mode 100644 index 737740f..0000000 --- a/msgqueue.py +++ /dev/null @@ -1,87 +0,0 @@ -# msgqueue.py: Persistent message queue bassed on files -# -# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php) -# Copyright (C) 2010 Red Hat, Inc. -# Written by Colin Walters <walters@verbum.org> - -import gio -import hashlib -import json - -class Message(object): - def __init__(self, ident, headers, payload): - self.ident = ident - self.headers = headers - self.payload = payload - - def serialize(self): - return json.dumps({'headers': self.headers, - 'payload': self.payload}) - - def __cmp__(self, other): - return cmp(self.ident, other.ident) - - @classmethod - def parse(cls, md5sum, stream): - contents = json.load(stream) - if 'headers' in contents: - headers = contents['headers'] - else: - headers = {} - if 'payload' in contents: - payload = contents['payload'] - else: - raise ValueError("Missing 'payload' in message %r" % (string, )) - return cls(md5sum, headers, payload) - -class MessageQueue(object): - def __init__(self, dirpath): - self._dirpath = dirpath - self._dir_gfile = gio.File(path=dirpath) - self._monitor = self._dir_gfile.monitor(gio.FILE_MONITOR_NONE) - self._monitor.connect('changed', self._on_dir_changed) - self._subscribers = [] - self._consumed = [] - - def connect(self, callback): - self._subscribers.append(callback) - - def consume(self, message): - self._consumed.append(message) - - def append(self, message): - serialized = message.serialize() - digest = hashlib.md5() - digest.update(serialized) - hexdigest = digest.hexdigest() - filename = os.path.join(self._dirpath, hexdigest) - temp_filename = os.path.join(self._dirpath, '_' + hexdigest) - f = open(temp_filename, 'w') - f.write(serialized) - f.close() - os.rename(temp_filename, filename) - - def _on_dir_changed(self, mon, gfile, other, event): - dir_contents = os.listdir(self._dirpath) - messages = set() - for filename in dir_contents: - if filename.startswith('_'): - continue - file_path = os.path.join(self._dirpath, filename) - if not os.path.isfile(file_path): - continue - f = open(file_path) - message = Message.parse(filename, f) - f.close() - messages.add(message) - - self._consumed = [] - for subscriber in self._subscribers: - subscriber(iter(messages)) - for msg in self._consumed: - messages.remove(msg) - self._consumed = [] - - - - |