summaryrefslogtreecommitdiffstats
path: root/msgqueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'msgqueue.py')
-rw-r--r--msgqueue.py87
1 files changed, 87 insertions, 0 deletions
diff --git a/msgqueue.py b/msgqueue.py
new file mode 100644
index 0000000..737740f
--- /dev/null
+++ b/msgqueue.py
@@ -0,0 +1,87 @@
+# msgqueue.py: Persistent message queue bassed on files
+#
+# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php)
+# Copyright (C) 2010 Red Hat, Inc.
+# Written by Colin Walters <walters@verbum.org>
+
+import gio
+import hashlib
+import json
+
+class Message(object):
+ def __init__(self, ident, headers, payload):
+ self.ident = ident
+ self.headers = headers
+ self.payload = payload
+
+ def serialize(self):
+ return json.dumps({'headers': self.headers,
+ 'payload': self.payload})
+
+ def __cmp__(self, other):
+ return cmp(self.ident, other.ident)
+
+ @classmethod
+ def parse(cls, md5sum, stream):
+ contents = json.load(stream)
+ if 'headers' in contents:
+ headers = contents['headers']
+ else:
+ headers = {}
+ if 'payload' in contents:
+ payload = contents['payload']
+ else:
+ raise ValueError("Missing 'payload' in message %r" % (string, ))
+ return cls(md5sum, headers, payload)
+
+class MessageQueue(object):
+ def __init__(self, dirpath):
+ self._dirpath = dirpath
+ self._dir_gfile = gio.File(path=dirpath)
+ self._monitor = self._dir_gfile.monitor(gio.FILE_MONITOR_NONE)
+ self._monitor.connect('changed', self._on_dir_changed)
+ self._subscribers = []
+ self._consumed = []
+
+ def connect(self, callback):
+ self._subscribers.append(callback)
+
+ def consume(self, message):
+ self._consumed.append(message)
+
+ def append(self, message):
+ serialized = message.serialize()
+ digest = hashlib.md5()
+ digest.update(serialized)
+ hexdigest = digest.hexdigest()
+ filename = os.path.join(self._dirpath, hexdigest)
+ temp_filename = os.path.join(self._dirpath, '_' + hexdigest)
+ f = open(temp_filename, 'w')
+ f.write(serialized)
+ f.close()
+ os.rename(temp_filename, filename)
+
+ def _on_dir_changed(self, mon, gfile, other, event):
+ dir_contents = os.listdir(self._dirpath)
+ messages = set()
+ for filename in dir_contents:
+ if filename.startswith('_'):
+ continue
+ file_path = os.path.join(self._dirpath, filename)
+ if not os.path.isfile(file_path):
+ continue
+ f = open(file_path)
+ message = Message.parse(filename, f)
+ f.close()
+ messages.add(message)
+
+ self._consumed = []
+ for subscriber in self._subscribers:
+ subscriber(iter(messages))
+ for msg in self._consumed:
+ messages.remove(msg)
+ self._consumed = []
+
+
+
+