diff options
Diffstat (limited to 'msgqueue.py')
-rw-r--r-- | msgqueue.py | 87 |
1 files changed, 87 insertions, 0 deletions
diff --git a/msgqueue.py b/msgqueue.py new file mode 100644 index 0000000..737740f --- /dev/null +++ b/msgqueue.py @@ -0,0 +1,87 @@ +# msgqueue.py: Persistent message queue bassed on files +# +# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php) +# Copyright (C) 2010 Red Hat, Inc. +# Written by Colin Walters <walters@verbum.org> + +import gio +import hashlib +import json + +class Message(object): + def __init__(self, ident, headers, payload): + self.ident = ident + self.headers = headers + self.payload = payload + + def serialize(self): + return json.dumps({'headers': self.headers, + 'payload': self.payload}) + + def __cmp__(self, other): + return cmp(self.ident, other.ident) + + @classmethod + def parse(cls, md5sum, stream): + contents = json.load(stream) + if 'headers' in contents: + headers = contents['headers'] + else: + headers = {} + if 'payload' in contents: + payload = contents['payload'] + else: + raise ValueError("Missing 'payload' in message %r" % (string, )) + return cls(md5sum, headers, payload) + +class MessageQueue(object): + def __init__(self, dirpath): + self._dirpath = dirpath + self._dir_gfile = gio.File(path=dirpath) + self._monitor = self._dir_gfile.monitor(gio.FILE_MONITOR_NONE) + self._monitor.connect('changed', self._on_dir_changed) + self._subscribers = [] + self._consumed = [] + + def connect(self, callback): + self._subscribers.append(callback) + + def consume(self, message): + self._consumed.append(message) + + def append(self, message): + serialized = message.serialize() + digest = hashlib.md5() + digest.update(serialized) + hexdigest = digest.hexdigest() + filename = os.path.join(self._dirpath, hexdigest) + temp_filename = os.path.join(self._dirpath, '_' + hexdigest) + f = open(temp_filename, 'w') + f.write(serialized) + f.close() + os.rename(temp_filename, filename) + + def _on_dir_changed(self, mon, gfile, other, event): + dir_contents = os.listdir(self._dirpath) + messages = set() + for filename in dir_contents: + if filename.startswith('_'): + continue + file_path = os.path.join(self._dirpath, filename) + if not os.path.isfile(file_path): + continue + f = open(file_path) + message = Message.parse(filename, f) + f.close() + messages.add(message) + + self._consumed = [] + for subscriber in self._subscribers: + subscriber(iter(messages)) + for msg in self._consumed: + messages.remove(msg) + self._consumed = [] + + + + |