summaryrefslogtreecommitdiffstats
path: root/rpmci/msgqueue.py
blob: 746a1d26a989023ceb02fa994a553dd032e47935 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# msgqueue.py: Persistent message queue based on files
#
# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php)
# Copyright (C) 2010 Red Hat, Inc.
# Written by Colin Walters <walters@verbum.org>

import errno
import os
import sys

import hashlib
import json

import glib
import gio

class Message(object):
    """A message is a JSON-serialized blob with a header dictionary
and a payload dictionary.  It has a MD5 sum which acts as a unique ID.

Messages compare equal, order, and hash by their ident alone."""
    def __init__(self, ident, headers, payload):
        """Create a message.

        ident: the message's MD5 hex digest, or None to compute it from the
          serialized headers and payload.
        headers: JSON-serializable header dictionary.
        payload: JSON-serializable payload.
        """
        self.headers = headers
        self.payload = payload
        if ident is None:
            self.ident = self.serialize()[0]
        else:
            self.ident = ident

    def serialize(self):
        """Return a (md5_hexdigest, json_text) tuple for this message."""
        serialized = json.dumps({'headers': self.headers,
                                 'payload': self.payload})
        data = serialized
        if not isinstance(data, bytes):
            # On Python 3 json.dumps returns text, but hashlib needs bytes.
            data = data.encode('utf-8')
        hexdigest = hashlib.md5(data).hexdigest()
        return (hexdigest, serialized)

    def __cmp__(self, other):
        # Python 2 ordering; the rich comparisons below carry the same
        # ident-based semantics and are what Python 3 uses.
        return cmp(self.ident, other.ident)

    def __eq__(self, other):
        if not isinstance(other, Message):
            return NotImplemented
        return self.ident == other.ident

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __lt__(self, other):
        if not isinstance(other, Message):
            return NotImplemented
        return self.ident < other.ident

    def __hash__(self):
        # Must agree with __eq__ so sets/dicts treat equal idents as one message.
        return hash(self.ident)

    @classmethod
    def parse(cls, md5sum, stream):
        """Read a message from a file-like object containing JSON.

        md5sum: the ident to assign to the message (callers pass the file name).
        stream: readable file-like object holding the JSON document.

        A missing 'headers' key yields an empty header dictionary.
        Raises ValueError if the JSON is invalid, is not an object, or
        lacks a 'payload' key.
        """
        contents = json.load(stream)
        if not isinstance(contents, dict):
            raise ValueError("Message %r is not a JSON object" % (md5sum, ))
        if 'payload' not in contents:
            raise ValueError("Missing 'payload' in message %r" % (md5sum, ))
        headers = contents.get('headers', {})
        return cls(md5sum, headers, contents['payload'])

class MessageQueue(object):
    """A message queue is multi producer, multi consumer.
Consumers subscribe, and are handed messages.  If a particular
consumer wants a message, it calls consume().  Otherwise, it
remains in the queue.

Each message is stored as one file in the queue directory, named by
its MD5 ident.  Files whose names begin with '_' are writes still in
progress and are ignored by readers.

BUGS:
* No effort is made to order the queue
* Consumers will be repeatedly handed messages even if they previously didn't want them.
* We re-read the directory and messages a lot.

In general, think of this as a stub class which needs
replacing with something Amazon SQS like.
"""
    def __init__(self, dirpath):
        """Open the queue stored in directory dirpath and watch it for changes.

        The directory is created if it does not exist."""
        self._dirpath = dirpath
        if not os.path.isdir(self._dirpath):
            try:
                os.makedirs(self._dirpath)
            except EnvironmentError as e:
                # Tolerate another producer/consumer creating it concurrently.
                if e.errno != errno.EEXIST or not os.path.isdir(self._dirpath):
                    raise
        self._dir_gfile = gio.File(path=dirpath)
        self._monitor = self._dir_gfile.monitor(gio.FILE_MONITOR_NONE)
        self._monitor.connect('changed', self._on_dir_changed)
        # Callbacks, each invoked as callback(queue, iterator_of_messages).
        self._subscribers = []
        # Messages passed to consume() during the current dispatch pass.
        self._consumed = []

    def connect(self, callback):
        """Subscribe callback and schedule a dispatch pass via glib.idle_add.

        The callback is called later from the main loop, not from here."""
        self._subscribers.append(callback)
        glib.idle_add(self._reprocess_queue)

    def consume(self, message):
        """Mark message as wanted; it is deleted after the current subscriber returns.

        Only has an effect when called from inside a subscriber callback."""
        self._consumed.append(message)

    def append(self, message):
        """Add message to the queue.

        The serialized message is written to a '_'-prefixed temp file, which is
        flushed, fsync'd and then renamed into place.  Readers therefore never
        see a partially written message.  On failure the temp file is removed
        and the exception propagates."""
        (hexdigest, serialized) = message.serialize()
        filename = os.path.join(self._dirpath, hexdigest)
        temp_filename = os.path.join(self._dirpath, '_' + hexdigest)
        renamed = False
        try:
            with open(temp_filename, 'w') as f:
                f.write(serialized)
                f.flush()
                # Make the data durable before the rename publishes it.
                os.fsync(f.fileno())
            os.rename(temp_filename, filename)
            renamed = True
        finally:
            if not renamed:
                # Best-effort cleanup; the original exception still propagates.
                try:
                    os.unlink(temp_filename)
                except EnvironmentError:
                    pass

    def _on_dir_changed(self, mon, gfile, other, event):
        self._reprocess_queue()

    def _unlink_if_exists(self, path):
        """Remove path; return False instead of failing if it is already gone."""
        try:
            os.unlink(path)
        except EnvironmentError as e:
            if e.errno != errno.ENOENT:
                raise
            return False
        return True

    def _read_messages(self):
        """Parse every committed message file in the queue directory into a set.

        Files that disappear between listing and opening (for example,
        consumed by another process) are skipped."""
        messages = set()
        for filename in os.listdir(self._dirpath):
            if filename.startswith('_'):
                continue
            file_path = os.path.join(self._dirpath, filename)
            if not os.path.isfile(file_path):
                continue
            try:
                f = open(file_path)
            except EnvironmentError as e:
                if e.errno == errno.ENOENT:
                    continue
                raise
            with f:
                messages.add(Message.parse(filename, f))
        return messages

    def _reprocess_queue(self):
        """Hand pending messages to each subscriber in turn, deleting consumed ones.

        Always returns False so that, as a glib idle callback, it runs once."""
        messages = self._read_messages()
        if not messages:
            return False

        self._consumed = []
        for subscriber in self._subscribers:
            subscriber(self, iter(messages))

            for msg in self._consumed:
                if msg not in messages:
                    # Consumed twice, or not a message handed out in this pass.
                    continue
                messages.remove(msg)
                self._unlink_if_exists(os.path.join(self._dirpath, msg.ident))
            self._consumed = []
        return False