summaryrefslogtreecommitdiffstats
path: root/lib/puppet/util/queue/stomp.rb
blob: 6f845c314acb747ac536ab6d6eeae50580a4d2bf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
require 'puppet/util/queue'
require 'stomp'

# Implements the Ruby Stomp client as a queue type within the Puppet::Indirector::Queue::Client
# registry, for use with the <tt>:queue</tt> indirection terminus type.
#
# Looks to <tt>Puppet[:queue_source]</tt> for the sole argument to the underlying Stomp::Client constructor;
# consequently, for this client to work, <tt>Puppet[:queue_source]</tt> must use the Stomp::Client URL-like
# syntax for identifying the Stomp message broker: <em>login:pass@host.port</em>
class Puppet::Util::Queue::Stomp
    attr_accessor :stomp_client

    def initialize
        self.stomp_client = Stomp::Client.new( Puppet[:queue_source] )
    end

    def send_message(target, msg)
        stomp_client.send(stompify_target(target), msg)
    end

    def subscribe(target)
        stomp_client.subscribe(stompify_target(target)) {|stomp_message| yield(stomp_message.body)}
    end

    def stompify_target(target)
        '/queue/' + target.to_s
    end

    Puppet::Util::Queue.register_queue_type(self, :stomp)
end