summaryrefslogtreecommitdiffstats
path: root/lib/puppet/indirector/queue.rb
blob: 0e9ff966cae2da7cd9a6196dc1e00b2195d4e9a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
require 'puppet/indirector/terminus'
require 'puppet/util/queue'
require 'puppet/util'

# Implements the <tt>:queue</tt> abstract indirector terminus type, for storing
# model instances to a message queue, presumably for the purpose of out-of-process
# handling of changes related to the model.
#
# Relies upon Puppet::Util::Queue for registry and client object management,
# and specifies a default queue type of <tt>:stomp</tt>, appropriate for use with a variety of message brokers.
#
# It's up to the queue client type to instantiate itself correctly based on Puppet configuration information.
#
# A single queue client is maintained for the abstract terminus, meaning that you can only use one type
# of queue client, one message broker solution, etc., with the indirection mechanism.
#
# Per-indirection queues are assumed, based on the indirection name.  If the <tt>:catalog</tt> indirection makes
# use of this <tt>:queue</tt> terminus, queue operations work against the "catalog" queue.  It is up to the queue
# client library to handle queue creation as necessary (for a number of popular queuing solutions, queue
# creation is automatic and not a concern).
class Puppet::Indirector::Queue < Puppet::Indirector::Terminus
    extend ::Puppet::Util::Queue
    include Puppet::Util

    def initialize(*args)
        super
        raise ArgumentError, "Queueing requires pson support" unless Puppet.features.pson?
    end

    # Queue has no idiomatic "find"
    def find(request)
        nil
    end

    # Place the request on the queue
    def save(request)
        begin
            result = nil
            benchmark :info, "Queued #{indirection.name} for #{request.key}" do
                result = client.send_message(queue, request.instance.render(:pson))
            end
            result
        rescue => detail
            raise Puppet::Error, "Could not write #{request.key} to queue: #{detail}\nInstance::#{request.instance}\n client : #{client}"
        end
    end

    def self.queue
        indirection_name
    end

    def queue
        self.class.queue
    end

    # Returns the singleton queue client object.
    def client
        self.class.client
    end

    # converts the _message_ from deserialized format to an actual model instance.
    def self.intern(message)
        result = nil
        benchmark :info, "Loaded queued #{indirection.name}" do
            result = model.convert_from(:pson, message)
        end
        result
    end

    # Provides queue subscription functionality; for a given indirection, use this method on the terminus
    # to subscribe to the indirection-specific queue.  Your _block_ will be executed per new indirection
    # model received from the queue, with _obj_ being the model instance.
    def self.subscribe
        client.subscribe(queue) do |msg|
            begin
                yield(self.intern(msg))
            rescue => detail
                puts detail.backtrace if Puppet[:trace]
                Puppet.err "Error occured with subscription to queue #{queue} for indirection #{indirection_name}: #{detail}"
            end
        end
    end
end