summaryrefslogtreecommitdiffstats
path: root/msgqueue.py
diff options
context:
space:
mode:
authorColin Walters <walters@verbum.org>2010-10-03 12:55:07 -0400
committerColin Walters <walters@verbum.org>2010-10-03 12:55:07 -0400
commitdb8df4a040883b631ff3c719c453246df9085776 (patch)
tree66e00f6726f6b0cdb612b00d069bba49fdd2bc0c /msgqueue.py
parent77ad7371c5906b83db49de309ba39d867b673975 (diff)
downloadrpmci-db8df4a040883b631ff3c719c453246df9085776.tar.gz
rpmci-db8df4a040883b631ff3c719c453246df9085776.tar.xz
rpmci-db8df4a040883b631ff3c719c453246df9085776.zip
rpmci-vcs-mirror: Basically appears to work
Diffstat (limited to 'msgqueue.py')
-rw-r--r--msgqueue.py87
1 files changed, 0 insertions, 87 deletions
diff --git a/msgqueue.py b/msgqueue.py
deleted file mode 100644
index 737740f..0000000
--- a/msgqueue.py
+++ /dev/null
@@ -1,87 +0,0 @@
-# msgqueue.py: Persistent message queue bassed on files
-#
-# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php)
-# Copyright (C) 2010 Red Hat, Inc.
-# Written by Colin Walters <walters@verbum.org>
-
-import gio
-import hashlib
-import json
-
-class Message(object):
- def __init__(self, ident, headers, payload):
- self.ident = ident
- self.headers = headers
- self.payload = payload
-
- def serialize(self):
- return json.dumps({'headers': self.headers,
- 'payload': self.payload})
-
- def __cmp__(self, other):
- return cmp(self.ident, other.ident)
-
- @classmethod
- def parse(cls, md5sum, stream):
- contents = json.load(stream)
- if 'headers' in contents:
- headers = contents['headers']
- else:
- headers = {}
- if 'payload' in contents:
- payload = contents['payload']
- else:
- raise ValueError("Missing 'payload' in message %r" % (string, ))
- return cls(md5sum, headers, payload)
-
-class MessageQueue(object):
- def __init__(self, dirpath):
- self._dirpath = dirpath
- self._dir_gfile = gio.File(path=dirpath)
- self._monitor = self._dir_gfile.monitor(gio.FILE_MONITOR_NONE)
- self._monitor.connect('changed', self._on_dir_changed)
- self._subscribers = []
- self._consumed = []
-
- def connect(self, callback):
- self._subscribers.append(callback)
-
- def consume(self, message):
- self._consumed.append(message)
-
- def append(self, message):
- serialized = message.serialize()
- digest = hashlib.md5()
- digest.update(serialized)
- hexdigest = digest.hexdigest()
- filename = os.path.join(self._dirpath, hexdigest)
- temp_filename = os.path.join(self._dirpath, '_' + hexdigest)
- f = open(temp_filename, 'w')
- f.write(serialized)
- f.close()
- os.rename(temp_filename, filename)
-
- def _on_dir_changed(self, mon, gfile, other, event):
- dir_contents = os.listdir(self._dirpath)
- messages = set()
- for filename in dir_contents:
- if filename.startswith('_'):
- continue
- file_path = os.path.join(self._dirpath, filename)
- if not os.path.isfile(file_path):
- continue
- f = open(file_path)
- message = Message.parse(filename, f)
- f.close()
- messages.add(message)
-
- self._consumed = []
- for subscriber in self._subscribers:
- subscriber(iter(messages))
- for msg in self._consumed:
- messages.remove(msg)
- self._consumed = []
-
-
-
-