1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
require 'puppet/util/queue'
require 'stomp'
require 'uri'
# Implements the Ruby Stomp client as a queue type within the Puppet::Indirector::Queue::Client
# registry, for use with the <tt>:queue</tt> indirection terminus type.
#
# Looks to <tt>Puppet[:queue_source]</tt> for the location of the Stomp message broker. The value is parsed
# as a URI, and its user, password, host, and port are passed to the underlying Stomp::Client constructor;
# consequently, for this client to work, <tt>Puppet[:queue_source]</tt> must be a URL using the
# <tt>stomp</tt> scheme: <em>stomp://login:pass@host:port</em>
class Puppet::Util::Queue::Stomp
  # The underlying Stomp::Client instance; assigned by #initialize.
  attr_accessor :stomp_client

  # Builds the Stomp client from the URL stored in Puppet[:queue_source].
  #
  # The setting is parsed with URI.parse and must use the "stomp" scheme; the
  # user, password, host, and port components of the URL are handed to
  # Stomp::Client.new.
  #
  # Raises ArgumentError if the setting cannot be parsed as a URI, if its
  # scheme is not "stomp", or if Stomp::Client.new itself raises.
  def initialize
    source = Puppet[:queue_source]

    begin
      uri = URI.parse(source)
    rescue => detail
      raise ArgumentError, "Could not create Stomp client instance - queue source #{source} is invalid: #{detail}"
    end

    # No exception is in flight on this path, so there is no rescued error to
    # report; describe the offending scheme instead.
    unless uri.scheme == "stomp"
      raise ArgumentError, "Could not create Stomp client instance - queue source #{source} is not a Stomp URL: scheme is #{uri.scheme.inspect}"
    end

    begin
      # NOTE(review): the trailing +true+ is presumably the stomp gem's
      # "reliable" flag -- confirm against the installed gem version.
      self.stomp_client = Stomp::Client.new(uri.user, uri.password, uri.host, uri.port, true)
    rescue => detail
      raise ArgumentError, "Could not create Stomp client instance with queue source #{source}: got internal Stomp client error #{detail}"
    end
  end

  # Publishes +msg+ to the queue named by +target+, sending the
  # <tt>:persistent => true</tt> header with it.
  def publish_message(target, msg)
    stomp_client.publish(stompify_target(target), msg, :persistent => true)
  end

  # Subscribes to the queue named by +target+ with <tt>:ack => :client</tt>,
  # yielding the body of each received message to the caller's block.
  #
  # The message is acknowledged only after the block returns, so a message
  # whose handler raises is not acknowledged here.
  def subscribe(target)
    stomp_client.subscribe(stompify_target(target), :ack => :client) do |stomp_message|
      yield(stomp_message.body)
      stomp_client.acknowledge(stomp_message)
    end
  end

  # Converts a queue name into the destination string used with the Stomp
  # client by prefixing it with "/queue/".
  def stompify_target(target)
    '/queue/' + target.to_s
  end

  # Make this client available under the :stomp queue type.
  Puppet::Util::Queue.register_queue_type(self, :stomp)
end
|