diff options
| author | Luke Kanies <luke@madstop.com> | 2009-06-02 23:08:52 -0500 |
|---|---|---|
| committer | James Turnbull <james@lovedthanlost.net> | 2009-06-06 19:57:59 +1000 |
| commit | 0de70b7035ebc7f00ede73098684ee5db4b2de14 (patch) | |
| tree | c7e4abf18f785b4959c46c78fed697e908ed304d | |
| parent | 7b33b6da4bdcd2263e2c63b443e9bea6fbe8d161 (diff) | |
| download | puppet-0de70b7035ebc7f00ede73098684ee5db4b2de14.tar.gz puppet-0de70b7035ebc7f00ede73098684ee5db4b2de14.tar.xz puppet-0de70b7035ebc7f00ede73098684ee5db4b2de14.zip | |
Switching Queueing to using JSON instead of YAML
This provides about a 75x speedup, so it's totally
worth it. The downside is that queueing requires json,
but only on the server side.
| -rw-r--r-- | lib/puppet/defaults.rb | 2 | ||||
| -rw-r--r-- | lib/puppet/indirector/queue.rb | 33 | ||||
| -rw-r--r-- | lib/puppet/relationship.rb | 2 | ||||
| -rw-r--r-- | lib/puppet/resource/catalog.rb | 7 | ||||
| -rwxr-xr-x | spec/integration/indirector/catalog/queue.rb | 59 | ||||
| -rwxr-xr-x | spec/unit/indirector/queue.rb | 108 |
6 files changed, 158 insertions, 53 deletions
diff --git a/lib/puppet/defaults.rb b/lib/puppet/defaults.rb index 3d0ad4bb8..8743f859d 100644 --- a/lib/puppet/defaults.rb +++ b/lib/puppet/defaults.rb @@ -168,7 +168,7 @@ module Puppet :queue_source => ["stomp://localhost:61613/", "Which type of queue to use for asynchronous processing. If your stomp server requires authentication, you can include it in the URI as long as your stomp client library is at least 1.1.1"], :async_storeconfigs => {:default => false, :desc => "Whether to use a queueing system to provide asynchronous database integration. - Requires that ``puppetqd`` be running.", + Requires that ``puppetqd`` be running and that 'JSON' support for ruby be installed.", :hook => proc do |value| if value # This reconfigures the terminii for Node, Facts, and Catalog diff --git a/lib/puppet/indirector/queue.rb b/lib/puppet/indirector/queue.rb index cd0e0c833..1fc72d6c1 100644 --- a/lib/puppet/indirector/queue.rb +++ b/lib/puppet/indirector/queue.rb @@ -1,6 +1,7 @@ require 'puppet/indirector/terminus' require 'puppet/util/queue' -require 'yaml' +require 'puppet/util' +require 'json' # Implements the <tt>:queue</tt> abstract indirector terminus type, for storing # model instances to a message queue, presumably for the purpose of out-of-process @@ -20,6 +21,12 @@ require 'yaml' # creation is automatic and not a concern). class Puppet::Indirector::Queue < Puppet::Indirector::Terminus extend ::Puppet::Util::Queue + include Puppet::Util + + def initialize(*args) + super + raise ArgumentError, "Queueing requires json support" unless Puppet.features.json? + end # Queue has no idiomatic "find" def find(request) @@ -29,8 +36,11 @@ class Puppet::Indirector::Queue < Puppet::Indirector::Terminus # Place the request on the queue def save(request) begin - Puppet.info "Queueing catalog for %s" % request.key - client.send_message(queue, render(request.instance)) + result = nil + benchmark :info, "Queued %s for %s" % [indirection.name, request.key] do + result = client.send_message(queue, request.instance.render(:json)) + end + result rescue => detail raise Puppet::Error, "Could not write %s to queue: %s\nInstance::%s\n client : %s" % [request.key, detail,request.instance.to_s,client.to_s] end @@ -49,15 +59,13 @@ class Puppet::Indirector::Queue < Puppet::Indirector::Terminus self.class.client end - # Formats the model instance associated with _request_ appropriately for message delivery. - # Uses YAML serialization. - def render(obj) - YAML::dump(obj) - end - # converts the _message_ from deserialized format to an actual model instance. def self.intern(message) - YAML::load(message) + result = nil + benchmark :info, "Loaded queued %s" % [indirection.name] do + result = model.convert_from(:json, message) + end + result end # Provides queue subscription functionality; for a given indirection, use this method on the terminus @@ -68,9 +76,8 @@ class Puppet::Indirector::Queue < Puppet::Indirector::Terminus begin yield(self.intern(msg)) rescue => detail - # really, this should log the exception rather than raise it all the way up the stack; - # we don't want exceptions resulting from a single message bringing down a listener - raise Puppet::Error, "Error occured with subscription to queue %s for indirection %s: %s" % [queue, indirection_name, detail] + puts detail.backtrace if Puppet[:trace] + Puppet.err "Error occured with subscription to queue %s for indirection %s: %s" % [queue, indirection_name, detail] end end end diff --git a/lib/puppet/relationship.rb b/lib/puppet/relationship.rb index 8efebf1e3..96a71c3f1 100644 --- a/lib/puppet/relationship.rb +++ b/lib/puppet/relationship.rb @@ -42,7 +42,7 @@ class Puppet::Relationship def initialize(source, target, options = {}) @source, @target = source, target - options ||= {} + options = (options || {}).inject({}) { |h,a| h[a[0].to_sym] = a[1]; h } [:callback, :event].each do |option| if value = options[option] send(option.to_s + "=", value) diff --git a/lib/puppet/resource/catalog.rb b/lib/puppet/resource/catalog.rb index 68e6d7de5..42e92f407 100644 --- a/lib/puppet/resource/catalog.rb +++ b/lib/puppet/resource/catalog.rb @@ -402,12 +402,14 @@ class Puppet::Resource::Catalog < Puppet::SimpleGraph end if resources = data['resources'] + resources = JSON.parse(resources) if resources.is_a?(String) resources.each do |res| resource_from_json(result, res) end end if edges = data['edges'] + edges = JSON.parse(edges) if edges.is_a?(String) edges.each do |edge| edge_from_json(result, edge) end @@ -436,7 +438,10 @@ class Puppet::Resource::Catalog < Puppet::SimpleGraph def self.resource_from_json(result, res) # If no json_class information was presented, we manually find # the class. - res = Puppet::Resource.from_json(res) if res.is_a?(Hash) + if res.is_a?(Hash) + res = res['data'] if res['json_class'] + res = Puppet::Resource.from_json(res) + end result.add_resource(res) end diff --git a/spec/integration/indirector/catalog/queue.rb b/spec/integration/indirector/catalog/queue.rb new file mode 100755 index 000000000..22f29aac3 --- /dev/null +++ b/spec/integration/indirector/catalog/queue.rb @@ -0,0 +1,59 @@ +#!/usr/bin/env ruby + +Dir.chdir(File.dirname(__FILE__)) { (s = lambda { |f| File.exist?(f) ? require(f) : Dir.chdir("..") { s.call(f) } }).call("spec/spec_helper.rb") } + +require 'puppet/resource/catalog' + +Puppet::Resource::Catalog.indirection.terminus(:queue) + +describe Puppet::Resource::Catalog::Queue do + before do + @catalog = Puppet::Resource::Catalog.new + + @one = Puppet::Resource.new(:file, "/one") + @two = Puppet::Resource.new(:file, "/two") + @catalog.add_resource(@one, @two) + + @catalog.add_edge(@one, @two) + + Puppet[:trace] = true + end + + after { Puppet.settings.clear } + + it "should render catalogs to json and send them via the queue client when catalogs are saved" do + terminus = Puppet::Resource::Catalog.indirection.terminus(:queue) + + client = mock 'client' + terminus.stubs(:client).returns client + + client.expects(:send_message).with(:catalog, @catalog.to_json) + + request = Puppet::Indirector::Request.new(:catalog, :save, "foo", :instance => @catalog) + + terminus.save(request) + end + + it "should intern catalog messages when they are passed via a subscription" do + client = mock 'client' + Puppet::Resource::Catalog::Queue.stubs(:client).returns client + + json = @catalog.to_json + + client.expects(:subscribe).with(:catalog).yields(json) + + Puppet.expects(:err).never + + result = [] + Puppet::Resource::Catalog::Queue.subscribe do |catalog| + result << catalog + end + + catalog = result.shift + catalog.should be_instance_of(Puppet::Resource::Catalog) + + catalog.resource(:file, "/one").should be_instance_of(Puppet::Resource) + catalog.resource(:file, "/two").should be_instance_of(Puppet::Resource) + catalog.should be_edge(catalog.resource(:file, "/one"), catalog.resource(:file, "/two")) + end +end diff --git a/spec/unit/indirector/queue.rb b/spec/unit/indirector/queue.rb index 0e9074440..3bc066873 100755 --- a/spec/unit/indirector/queue.rb +++ b/spec/unit/indirector/queue.rb @@ -4,35 +4,32 @@ require File.dirname(__FILE__) + '/../../spec_helper' require 'puppet/indirector/queue' class Puppet::Indirector::Queue::TestClient - def self.reset - @queues = {} - end +end + +class FooExampleData + attr_accessor :name - def self.queues - @queues ||= {} + def self.json_create(json) + new(json['data'].to_sym) end - def subscribe(queue) - stack = self.class.queues[queue] ||= [] - while stack.length > 0 do - yield(stack.shift) - end + def initialize(name = nil) + @name = name if name end - def send_message(queue, message) - stack = self.class.queues[queue] ||= [] - stack.push(message) - queue + def render(format = :json) + to_json end -end -class FooExampleData - attr_accessor :name + def to_json(*args) + {:json_class => self.class.to_s, :data => name}.to_json(*args) + end end describe Puppet::Indirector::Queue do before :each do - @indirection = stub 'indirection', :name => :my_queue, :register_terminus_type => nil + @model = mock 'model' + @indirection = stub 'indirection', :name => :my_queue, :register_terminus_type => nil, :model => @model Puppet::Indirector::Indirection.stubs(:instance).with(:my_queue).returns(@indirection) @store_class = Class.new(Puppet::Indirector::Queue) do def self.to_s @@ -48,40 +45,77 @@ describe Puppet::Indirector::Queue do Puppet.settings.stubs(:value).returns("bogus setting data") Puppet.settings.stubs(:value).with(:queue_type).returns(:test_client) Puppet::Util::Queue.stubs(:queue_type_to_class).with(:test_client).returns(Puppet::Indirector::Queue::TestClient) - Puppet::Indirector::Queue::TestClient.reset @request = stub 'request', :key => :me, :instance => @subject end + it "should require JSON" do + Puppet.features.expects(:json?).returns false + + lambda { @store_class.new }.should raise_error(ArgumentError) + end + it 'should use the correct client type and queue' do @store.queue.should == :my_queue @store.client.should be_an_instance_of(Puppet::Indirector::Queue::TestClient) end - it 'should use render() to convert object to message' do - @store.expects(:render).with(@subject).once - @store.save(@request) - end + describe "when saving" do + it 'should render the instance using json' do + @subject.expects(:render).with(:json) + @store.client.stubs(:send_message) + @store.save(@request) + end + + it "should send the rendered message to the appropriate queue on the client" do + @subject.expects(:render).returns "myjson" - it 'should save and restore with the appropriate queue, and handle subscribe block' do - @subject_two = @subject_class.new - @subject_two.name = :too - @store.save(@request) - @store.save(stub('request_two', :key => 'too', :instance => @subject_two)) + @store.client.expects(:send_message).with(:my_queue, "myjson") - received = [] - @store_class.subscribe do |obj| - received.push(obj) + @store.save(@request) end - received[0].name.should == @subject.name - received[1].name.should == @subject_two.name + it "should catch any exceptions raised" do + @store.client.expects(:send_message).raises ArgumentError + + lambda { @store.save(@request) }.should raise_error(Puppet::Error) + end end - it 'should use intern() to convert message to object with subscribe()' do - @store.save(@request) - @store_class.expects(:intern).with(@store.render(@subject)).once - @store_class.subscribe {|o| o } + describe "when subscribing to the queue" do + before do + @store_class.stubs(:model).returns @model + end + + it "should use the model's Format support to intern the message from json" do + @model.expects(:convert_from).with(:json, "mymessage") + + @store_class.client.expects(:subscribe).yields("mymessage") + @store_class.subscribe {|o| o } + end + + it "should yield each interned received message" do + @model.stubs(:convert_from).returns "something" + + @subject_two = @subject_class.new + @subject_two.name = :too + + @store_class.client.expects(:subscribe).with(:my_queue).multiple_yields(@subject, @subject_two) + + received = [] + @store_class.subscribe do |obj| + received.push(obj) + end + + received.should == %w{something something} + end + + it "should log but not propagate errors" do + @store_class.client.expects(:subscribe).yields("foo") + @store_class.expects(:intern).raises ArgumentError + Puppet.expects(:err) + @store_class.subscribe {|o| o } + end end end |
