summaryrefslogtreecommitdiffstats
path: root/lib/puppet/indirector/queue.rb
blob: fd089f43105903c8b0fc94af244ca144609a40e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
require 'puppet/indirector/terminus'
require 'puppet/util/queue'
require 'puppet/util'

# Implements the <tt>:queue</tt> abstract indirector terminus type, for storing
# model instances to a message queue, presumably for the purpose of out-of-process
# handling of changes related to the model.
#
# Relies upon Puppet::Util::Queue for registry and client object management,
# and specifies a default queue type of <tt>:stomp</tt>, appropriate for use with a variety of message brokers.
#
# It's up to the queue client type to instantiate itself correctly based on Puppet configuration information.
#
# A single queue client is maintained for the abstract terminus, meaning that you can only use one type
# of queue client, one message broker solution, etc., with the indirection mechanism.
#
# Per-indirection queues are assumed, based on the indirection name.  If the <tt>:catalog</tt> indirection makes
# use of this <tt>:queue</tt> terminus, queue operations work against the "catalog" queue.  It is up to the queue
# client library to handle queue creation as necessary (for a number of popular queuing solutions, queue
# creation is automatic and not a concern).
class Puppet::Indirector::Queue < Puppet::Indirector::Terminus
  extend ::Puppet::Util::Queue
  include Puppet::Util

  def initialize(*args)
    super
    raise ArgumentError, "Queueing requires pson support" unless Puppet.features.pson?
  end

  # Queue has no idiomatic "find"
  def find(request)
    nil
  end

  # Place the request on the queue
  def save(request)
      result = nil
      benchmark :info, "Queued #{indirection.name} for #{request.key}" do
        result = client.send_message(queue, request.instance.render(:pson))
      end
      result
  rescue => detail
      raise Puppet::Error, "Could not write #{request.key} to queue: #{detail}\nInstance::#{request.instance}\n client : #{client}"
  end

  def self.queue
    indirection_name
  end

  def queue
    self.class.queue
  end

  # Returns the singleton queue client object.
  def client
    self.class.client
  end

  # converts the _message_ from deserialized format to an actual model instance.
  def self.intern(message)
    result = nil
    benchmark :info, "Loaded queued #{indirection.name}" do
      result = model.convert_from(:pson, message)
    end
    result
  end

  # Provides queue subscription functionality; for a given indirection, use this method on the terminus
  # to subscribe to the indirection-specific queue.  Your _block_ will be executed per new indirection
  # model received from the queue, with _obj_ being the model instance.
  def self.subscribe
    client.subscribe(queue) do |msg|
      begin
        yield(self.intern(msg))
      rescue => detail
        puts detail.backtrace if Puppet[:trace]
        Puppet.err "Error occured with subscription to queue #{queue} for indirection #{indirection_name}: #{detail}"
      end
    end
  end
end