# msgqueue.py: Persistent message queue based on files
#
# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php)
# Copyright (C) 2010 Red Hat, Inc.
# Written by Colin Walters

import os
import sys
import hashlib
import json

import glib
import gio


class Message(object):
    """A single queue entry: a headers object plus a payload.

    Each message carries an ``ident``.  When none is supplied it is the
    MD5 hex digest of the message's JSON serialization, which is also the
    file name MessageQueue.append() stores the message under.
    """

    def __init__(self, ident, headers, payload):
        """Create a message.

        ident   -- identifier to use, or None to derive it from the
                   serialized content.
        headers -- JSON-serializable headers object.
        payload -- JSON-serializable payload.
        """
        self.headers = headers
        self.payload = payload
        if ident is None:
            self.ident = self.serialize()[0]
        else:
            self.ident = ident

    def serialize(self):
        """Return ``(hexdigest, serialized)`` for this message.

        ``serialized`` is the JSON text of a dict with the keys 'headers'
        and 'payload'; ``hexdigest`` is the MD5 hex digest of that text.
        """
        serialized = json.dumps({'headers': self.headers,
                                 'payload': self.payload})
        digest = hashlib.md5()
        digest.update(serialized)
        return (digest.hexdigest(), serialized)

    def __cmp__(self, other):
        """Order and compare messages by their ident."""
        return cmp(self.ident, other.ident)

    def __hash__(self):
        """Hash by ident so that hashing agrees with __cmp__ equality."""
        return hash(self.ident)

    @classmethod
    def parse(cls, md5sum, stream):
        """Build a message from a JSON document read from ``stream``.

        md5sum -- ident given to the resulting message (the queue passes
                  the name of the file being read).
        stream -- file-like object containing the JSON document.

        A missing 'headers' key yields empty headers.  Raises ValueError
        when the document has no 'payload' key.
        """
        contents = json.load(stream)
        if 'headers' in contents:
            headers = contents['headers']
        else:
            headers = {}
        if 'payload' not in contents:
            # The original formatted an undefined name here, which turned
            # the intended ValueError into a NameError.
            raise ValueError("Missing 'payload' in message %r" % (md5sum, ))
        return cls(md5sum, headers, contents['payload'])


class MessageQueue(object):
    """A message queue persisted as one file per message in a directory.

    The directory is watched with a gio file monitor; every change makes
    the queue re-read the directory and hand the messages it finds to the
    subscribers registered through connect().
    """

    def __init__(self, dirpath):
        """Open the queue stored in ``dirpath``, creating the directory if
        it does not exist, and start monitoring it for changes."""
        self._dirpath = dirpath
        if not os.path.isdir(self._dirpath):
            os.makedirs(self._dirpath)
        self._dir_gfile = gio.File(path=dirpath)
        self._monitor = self._dir_gfile.monitor(gio.FILE_MONITOR_NONE)
        self._monitor.connect('changed', self._on_dir_changed)
        self._subscribers = []
        self._consumed = []

    def connect(self, callback):
        """Register ``callback`` as a subscriber.

        Subscribers are invoked as ``callback(queue, messages)`` where
        ``messages`` is an iterator over the pending Message objects.  A
        pass over the queue is scheduled with glib.idle_add so that the
        new subscriber is offered the messages already on disk.
        """
        self._subscribers.append(callback)
        glib.idle_add(self._reprocess_queue)

    def consume(self, message):
        """Mark ``message`` as handled.

        Meant to be called by a subscriber from inside its callback; the
        message's file is deleted once that callback returns.
        """
        self._consumed.append(message)

    def append(self, message):
        """Store ``message`` in the queue directory.

        The content is written to a temporary file whose name starts with
        '_' and then renamed to the message's hex digest, so that
        _reprocess_queue (which skips '_' names) never parses a file that
        is still being written.
        """
        (hexdigest, serialized) = message.serialize()
        filename = os.path.join(self._dirpath, hexdigest)
        temp_filename = os.path.join(self._dirpath, '_' + hexdigest)
        # 'with' closes the handle even when write() raises.
        with open(temp_filename, 'w') as f:
            f.write(serialized)
        os.rename(temp_filename, filename)

    def _on_dir_changed(self, mon, gfile, other, event):
        """File monitor 'changed' handler: rescan the directory."""
        self._reprocess_queue()

    def _reprocess_queue(self):
        """Read all message files and offer them to every subscriber.

        Entries whose name starts with '_' (temporary files) and entries
        that are not regular files are ignored.  After each subscriber
        returns, the messages it consumed are removed from the pending set
        (so later subscribers do not see them) and their files are
        deleted.

        Returns None, so when scheduled through glib.idle_add it runs
        once.
        """
        messages = set()
        for filename in os.listdir(self._dirpath):
            if filename.startswith('_'):
                continue
            file_path = os.path.join(self._dirpath, filename)
            if not os.path.isfile(file_path):
                continue
            # 'with' closes the file even when parsing fails.
            with open(file_path) as f:
                messages.add(Message.parse(filename, f))

        if not messages:
            return

        self._consumed = []
        for subscriber in self._subscribers:
            subscriber(self, iter(messages))
            for msg in self._consumed:
                # A message consumed twice, or one that is not part of
                # this scan, must not abort the pass with a KeyError.
                if msg not in messages:
                    continue
                messages.remove(msg)
                os.unlink(os.path.join(self._dirpath, msg.ident))
            self._consumed = []