1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
# msgqueue.py: Persistent message queue based on files
#
# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php)
# Copyright (C) 2010 Red Hat, Inc.
# Written by Colin Walters <walters@verbum.org>
import errno
import hashlib
import json
import os
import sys

import gio
import glib
class Message(object):
    """A message is a JSON-serialized blob with a header dictionary
    and a payload dictionary. It has an MD5 sum of its serialized form
    which acts as a unique ID."""

    def __init__(self, ident, headers, payload):
        """Create a message.

        ident -- the message ID (hex MD5 string), or None to compute it
                 from headers and payload via serialize().
        headers -- JSON-serializable dictionary of headers.
        payload -- JSON-serializable dictionary payload.
        """
        self.headers = headers
        self.payload = payload
        if ident is None:
            self.ident = self.serialize()[0]
        else:
            self.ident = ident

    def serialize(self):
        """Serialize this message to JSON.

        Returns a tuple (hexdigest, serialized): serialized is the JSON
        text and hexdigest is the hex MD5 of it. Keys are sorted so that
        messages with equal contents always get the same digest,
        independent of dictionary ordering.
        """
        serialized = json.dumps({'headers': self.headers,
                                 'payload': self.payload},
                                sort_keys=True)
        digest = hashlib.md5()
        # json.dumps() escapes non-ASCII by default (ensure_ascii=True), so
        # this encode is lossless; hashlib needs bytes on Python 3.
        digest.update(serialized.encode('utf-8'))
        return (digest.hexdigest(), serialized)

    # Messages are identified, compared and hashed solely by ident, so that
    # equality and hashing agree (required for use in sets/dicts).
    def __eq__(self, other):
        if not isinstance(other, Message):
            return NotImplemented
        return self.ident == other.ident

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __lt__(self, other):
        if not isinstance(other, Message):
            return NotImplemented
        return self.ident < other.ident

    def __hash__(self):
        return hash(self.ident)

    def __cmp__(self, other):
        # Kept for Python 2 callers of cmp(); Python 3 never calls this.
        return cmp(self.ident, other.ident)

    @classmethod
    def parse(cls, md5sum, stream):
        """Read a message from a file-like stream containing JSON.

        md5sum -- the ident to give the message (the queue passes the
                  file name).
        stream -- file-like object holding the serialized message.

        A missing 'headers' key yields empty headers. Raises ValueError
        if the JSON is invalid, is not an object, or lacks 'payload'.
        """
        contents = json.load(stream)
        if not isinstance(contents, dict):
            raise ValueError("Message %r is not a JSON object" % (md5sum, ))
        headers = contents.get('headers', {})
        if 'payload' not in contents:
            raise ValueError("Missing 'payload' in message %r" % (md5sum, ))
        return cls(md5sum, headers, contents['payload'])
class MessageQueue(object):
    """A message queue is multi producer, multi consumer.

    Messages are stored as files in a directory, one file per message,
    named by the message's ident. Consumers subscribe with connect(),
    and are handed messages. If a particular consumer wants a message,
    it calls consume(). Otherwise, it remains in the queue.

    BUGS:
    * No effort is made to order the queue.
    * Consumers will be repeatedly handed messages even if they previously
      didn't want them.
    * We re-read the directory and messages a lot.

    In general, think of this as a stub class which needs
    replacing with something Amazon SQS like.
    """

    def __init__(self, dirpath):
        """Open the queue stored in directory dirpath, creating it if needed.

        A gio directory monitor is installed so that any change in the
        directory re-runs delivery to subscribers.
        """
        self._dirpath = dirpath
        if not os.path.isdir(self._dirpath):
            try:
                os.makedirs(self._dirpath)
            except OSError as e:
                # Another producer/consumer may have created it concurrently.
                if e.errno != errno.EEXIST or not os.path.isdir(self._dirpath):
                    raise
        self._dir_gfile = gio.File(path=dirpath)
        self._monitor = self._dir_gfile.monitor(gio.FILE_MONITOR_NONE)
        self._monitor.connect('changed', self._on_dir_changed)
        self._subscribers = []
        self._consumed = []

    def connect(self, callback):
        """Subscribe callback, invoked as callback(queue, message_iterator).

        Delivery of the current queue contents is scheduled as a glib idle
        callback; it runs once because _reprocess_queue returns None.
        """
        self._subscribers.append(callback)
        glib.idle_add(self._reprocess_queue)

    def consume(self, message):
        """Mark message as taken by the current subscriber.

        The consumed list is cleared before each delivery round and after
        each subscriber runs, so this only has an effect when called from
        within a subscriber callback.
        """
        self._consumed.append(message)

    def append(self, message):
        """Add message to the queue directory.

        The file is written under a '_'-prefixed temporary name (which
        readers skip) and then renamed into place, so readers never parse
        a partially written message.
        """
        (hexdigest, serialized) = message.serialize()
        filename = os.path.join(self._dirpath, hexdigest)
        temp_filename = os.path.join(self._dirpath, '_' + hexdigest)
        with open(temp_filename, 'w') as f:
            f.write(serialized)
        os.rename(temp_filename, filename)

    def _on_dir_changed(self, mon, gfile, other, event):
        """gio monitor 'changed' handler: re-run delivery."""
        self._reprocess_queue()

    def _unlink_message(self, message):
        """Delete the file backing message; ignore it if already deleted."""
        try:
            os.unlink(os.path.join(self._dirpath, message.ident))
        except OSError as e:
            # Another consumer may have consumed (deleted) it first.
            if e.errno != errno.ENOENT:
                raise

    def _reprocess_queue(self):
        """Hand every queued message to each subscriber in turn.

        Messages consumed by a subscriber are removed from the set given
        to later subscribers, and their files are deleted. Returns None.
        """
        messages = set()
        for filename in os.listdir(self._dirpath):
            # '_'-prefixed files are in-progress writes from append().
            if filename.startswith('_'):
                continue
            file_path = os.path.join(self._dirpath, filename)
            if not os.path.isfile(file_path):
                continue
            try:
                f = open(file_path)
            except (IOError, OSError) as e:
                # Deleted by another consumer since listdir(); skip it.
                if e.errno == errno.ENOENT:
                    continue
                raise
            with f:
                messages.add(Message.parse(filename, f))
        if not messages:
            return
        self._consumed = []
        for subscriber in self._subscribers:
            subscriber(self, iter(messages))
            for msg in self._consumed:
                # Tolerate a subscriber calling consume() twice, or with a
                # message that is not part of this round.
                if msg not in messages:
                    continue
                messages.remove(msg)
                self._unlink_message(msg)
            self._consumed = []
|