diff options
-rw-r--r-- | lib/puppet/util/queue/stomp.rb | 5 | ||||
-rwxr-xr-x | spec/unit/util/queue/stomp.rb | 18 |
2 files changed, 20 insertions, 3 deletions
diff --git a/lib/puppet/util/queue/stomp.rb b/lib/puppet/util/queue/stomp.rb index 62716fab2..a87268b72 100644 --- a/lib/puppet/util/queue/stomp.rb +++ b/lib/puppet/util/queue/stomp.rb @@ -33,7 +33,10 @@ class Puppet::Util::Queue::Stomp end def subscribe(target) - stomp_client.subscribe(stompify_target(target)) {|stomp_message| yield(stomp_message.body)} + stomp_client.subscribe(stompify_target(target), :ack => :client) do |stomp_message| + yield(stomp_message.body) + stomp_client.acknowledge(stomp_message) + end end def stompify_target(target) diff --git a/spec/unit/util/queue/stomp.rb b/spec/unit/util/queue/stomp.rb index 2adf320e1..fec179018 100755 --- a/spec/unit/util/queue/stomp.rb +++ b/spec/unit/util/queue/stomp.rb @@ -93,7 +93,7 @@ describe 'Puppet::Util::Queue::Stomp' do describe "when subscribing to a queue" do before do - @client = stub 'client' + @client = stub 'client', :acknowledge => true Stomp::Client.stubs(:new).returns @client @queue = Puppet::Util::Queue::Stomp.new end @@ -104,7 +104,12 @@ describe 'Puppet::Util::Queue::Stomp' do end it "should subscribe to the transformed queue name" do - @client.expects(:subscribe).with("/queue/fooqueue") + @client.expects(:subscribe).with { |queue, options| queue == "/queue/fooqueue" } + @queue.subscribe('fooqueue') + end + + it "should specify that its messages should be acknowledged" do + @client.expects(:subscribe).with { |queue, options| options[:ack] == :client } @queue.subscribe('fooqueue') end @@ -118,6 +123,15 @@ describe 'Puppet::Util::Queue::Stomp' do @queue.subscribe('fooqueue') { |b| body = b } body.should == "mybody" end + + it "should acknowledge all successfully processed messages" do + message = stub 'message', :body => "mybode" + + @client.stubs(:subscribe).yields(message) + @client.expects(:acknowledge).with(message) + + @queue.subscribe('fooqueue') { |b| "eh" } + end end it 'should transform the simple queue name to "/queue/<queue_name>"' do |