summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLuke Kanies <luke@madstop.com>2009-05-28 10:10:40 -0500
committerJames Turnbull <james@lovedthanlost.net>2009-06-03 07:32:21 +1000
commit907b39b9708cbca10446336c424754e13d55e683 (patch)
tree9c56aa19d873d4aa6cd845e35c8ac2197c93fc4c
parent42247f0f5d08bce4d3fd2b020b770a12cc2f91db (diff)
downloadpuppet-907b39b9708cbca10446336c424754e13d55e683.tar.gz
puppet-907b39b9708cbca10446336c424754e13d55e683.tar.xz
puppet-907b39b9708cbca10446336c424754e13d55e683.zip
Using Message acknowledgement in queueing
This allows a more fine-grained load-balancing of the queue, which makes it easy to spin up multiple puppetqd instances and process the queues faster. Signed-off-by: Luke Kanies <luke@madstop.com>
-rw-r--r--lib/puppet/util/queue/stomp.rb5
-rwxr-xr-xspec/unit/util/queue/stomp.rb18
2 files changed, 20 insertions, 3 deletions
diff --git a/lib/puppet/util/queue/stomp.rb b/lib/puppet/util/queue/stomp.rb
index 62716fab2..a87268b72 100644
--- a/lib/puppet/util/queue/stomp.rb
+++ b/lib/puppet/util/queue/stomp.rb
@@ -33,7 +33,10 @@ class Puppet::Util::Queue::Stomp
end
def subscribe(target)
- stomp_client.subscribe(stompify_target(target)) {|stomp_message| yield(stomp_message.body)}
+ stomp_client.subscribe(stompify_target(target), :ack => :client) do |stomp_message|
+ yield(stomp_message.body)
+ stomp_client.acknowledge(stomp_message)
+ end
end
def stompify_target(target)
diff --git a/spec/unit/util/queue/stomp.rb b/spec/unit/util/queue/stomp.rb
index 2adf320e1..fec179018 100755
--- a/spec/unit/util/queue/stomp.rb
+++ b/spec/unit/util/queue/stomp.rb
@@ -93,7 +93,7 @@ describe 'Puppet::Util::Queue::Stomp' do
describe "when subscribing to a queue" do
before do
- @client = stub 'client'
+ @client = stub 'client', :acknowledge => true
Stomp::Client.stubs(:new).returns @client
@queue = Puppet::Util::Queue::Stomp.new
end
@@ -104,7 +104,12 @@ describe 'Puppet::Util::Queue::Stomp' do
end
it "should subscribe to the transformed queue name" do
- @client.expects(:subscribe).with("/queue/fooqueue")
+ @client.expects(:subscribe).with { |queue, options| queue == "/queue/fooqueue" }
+ @queue.subscribe('fooqueue')
+ end
+
+ it "should specify that its messages should be acknowledged" do
+ @client.expects(:subscribe).with { |queue, options| options[:ack] == :client }
@queue.subscribe('fooqueue')
end
@@ -118,6 +123,15 @@ describe 'Puppet::Util::Queue::Stomp' do
@queue.subscribe('fooqueue') { |b| body = b }
body.should == "mybody"
end
+
+ it "should acknowledge all successfully processed messages" do
+ message = stub 'message', :body => "mybode"
+
+ @client.stubs(:subscribe).yields(message)
+ @client.expects(:acknowledge).with(message)
+
+ @queue.subscribe('fooqueue') { |b| "eh" }
+ end
end
it 'should transform the simple queue name to "/queue/<queue_name>"' do