summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLuke Kanies <luke@madstop.com>2009-06-02 23:08:52 -0500
committerJames Turnbull <james@lovedthanlost.net>2009-06-06 19:57:59 +1000
commit0de70b7035ebc7f00ede73098684ee5db4b2de14 (patch)
treec7e4abf18f785b4959c46c78fed697e908ed304d
parent7b33b6da4bdcd2263e2c63b443e9bea6fbe8d161 (diff)
downloadpuppet-0de70b7035ebc7f00ede73098684ee5db4b2de14.tar.gz
puppet-0de70b7035ebc7f00ede73098684ee5db4b2de14.tar.xz
puppet-0de70b7035ebc7f00ede73098684ee5db4b2de14.zip
Switching Queueing to using JSON instead of YAML
This provides about a 75x speedup, so it's totally worth it. The downside is that queueing requires json, but only on the server side.
-rw-r--r--lib/puppet/defaults.rb2
-rw-r--r--lib/puppet/indirector/queue.rb33
-rw-r--r--lib/puppet/relationship.rb2
-rw-r--r--lib/puppet/resource/catalog.rb7
-rwxr-xr-xspec/integration/indirector/catalog/queue.rb59
-rwxr-xr-xspec/unit/indirector/queue.rb108
6 files changed, 158 insertions, 53 deletions
diff --git a/lib/puppet/defaults.rb b/lib/puppet/defaults.rb
index 3d0ad4bb8..8743f859d 100644
--- a/lib/puppet/defaults.rb
+++ b/lib/puppet/defaults.rb
@@ -168,7 +168,7 @@ module Puppet
:queue_source => ["stomp://localhost:61613/", "Which type of queue to use for asynchronous processing. If your stomp server requires
authentication, you can include it in the URI as long as your stomp client library is at least 1.1.1"],
:async_storeconfigs => {:default => false, :desc => "Whether to use a queueing system to provide asynchronous database integration.
- Requires that ``puppetqd`` be running.",
+ Requires that ``puppetqd`` be running and that 'JSON' support for ruby be installed.",
:hook => proc do |value|
if value
# This reconfigures the terminii for Node, Facts, and Catalog
diff --git a/lib/puppet/indirector/queue.rb b/lib/puppet/indirector/queue.rb
index cd0e0c833..1fc72d6c1 100644
--- a/lib/puppet/indirector/queue.rb
+++ b/lib/puppet/indirector/queue.rb
@@ -1,6 +1,7 @@
require 'puppet/indirector/terminus'
require 'puppet/util/queue'
-require 'yaml'
+require 'puppet/util'
+require 'json'
# Implements the <tt>:queue</tt> abstract indirector terminus type, for storing
# model instances to a message queue, presumably for the purpose of out-of-process
@@ -20,6 +21,12 @@ require 'yaml'
# creation is automatic and not a concern).
class Puppet::Indirector::Queue < Puppet::Indirector::Terminus
extend ::Puppet::Util::Queue
+ include Puppet::Util
+
+ def initialize(*args)
+ super
+ raise ArgumentError, "Queueing requires json support" unless Puppet.features.json?
+ end
# Queue has no idiomatic "find"
def find(request)
@@ -29,8 +36,11 @@ class Puppet::Indirector::Queue < Puppet::Indirector::Terminus
# Place the request on the queue
def save(request)
begin
- Puppet.info "Queueing catalog for %s" % request.key
- client.send_message(queue, render(request.instance))
+ result = nil
+ benchmark :info, "Queued %s for %s" % [indirection.name, request.key] do
+ result = client.send_message(queue, request.instance.render(:json))
+ end
+ result
rescue => detail
raise Puppet::Error, "Could not write %s to queue: %s\nInstance::%s\n client : %s" % [request.key, detail,request.instance.to_s,client.to_s]
end
@@ -49,15 +59,13 @@ class Puppet::Indirector::Queue < Puppet::Indirector::Terminus
self.class.client
end
- # Formats the model instance associated with _request_ appropriately for message delivery.
- # Uses YAML serialization.
- def render(obj)
- YAML::dump(obj)
- end
-
# converts the _message_ from deserialized format to an actual model instance.
def self.intern(message)
- YAML::load(message)
+ result = nil
+ benchmark :info, "Loaded queued %s" % [indirection.name] do
+ result = model.convert_from(:json, message)
+ end
+ result
end
# Provides queue subscription functionality; for a given indirection, use this method on the terminus
@@ -68,9 +76,8 @@ class Puppet::Indirector::Queue < Puppet::Indirector::Terminus
begin
yield(self.intern(msg))
rescue => detail
- # really, this should log the exception rather than raise it all the way up the stack;
- # we don't want exceptions resulting from a single message bringing down a listener
- raise Puppet::Error, "Error occured with subscription to queue %s for indirection %s: %s" % [queue, indirection_name, detail]
+ puts detail.backtrace if Puppet[:trace]
+ Puppet.err "Error occured with subscription to queue %s for indirection %s: %s" % [queue, indirection_name, detail]
end
end
end
diff --git a/lib/puppet/relationship.rb b/lib/puppet/relationship.rb
index 8efebf1e3..96a71c3f1 100644
--- a/lib/puppet/relationship.rb
+++ b/lib/puppet/relationship.rb
@@ -42,7 +42,7 @@ class Puppet::Relationship
def initialize(source, target, options = {})
@source, @target = source, target
- options ||= {}
+ options = (options || {}).inject({}) { |h,a| h[a[0].to_sym] = a[1]; h }
[:callback, :event].each do |option|
if value = options[option]
send(option.to_s + "=", value)
diff --git a/lib/puppet/resource/catalog.rb b/lib/puppet/resource/catalog.rb
index 68e6d7de5..42e92f407 100644
--- a/lib/puppet/resource/catalog.rb
+++ b/lib/puppet/resource/catalog.rb
@@ -402,12 +402,14 @@ class Puppet::Resource::Catalog < Puppet::SimpleGraph
end
if resources = data['resources']
+ resources = JSON.parse(resources) if resources.is_a?(String)
resources.each do |res|
resource_from_json(result, res)
end
end
if edges = data['edges']
+ edges = JSON.parse(edges) if edges.is_a?(String)
edges.each do |edge|
edge_from_json(result, edge)
end
@@ -436,7 +438,10 @@ class Puppet::Resource::Catalog < Puppet::SimpleGraph
def self.resource_from_json(result, res)
# If no json_class information was presented, we manually find
# the class.
- res = Puppet::Resource.from_json(res) if res.is_a?(Hash)
+ if res.is_a?(Hash)
+ res = res['data'] if res['json_class']
+ res = Puppet::Resource.from_json(res)
+ end
result.add_resource(res)
end
diff --git a/spec/integration/indirector/catalog/queue.rb b/spec/integration/indirector/catalog/queue.rb
new file mode 100755
index 000000000..22f29aac3
--- /dev/null
+++ b/spec/integration/indirector/catalog/queue.rb
@@ -0,0 +1,59 @@
+#!/usr/bin/env ruby
+
+Dir.chdir(File.dirname(__FILE__)) { (s = lambda { |f| File.exist?(f) ? require(f) : Dir.chdir("..") { s.call(f) } }).call("spec/spec_helper.rb") }
+
+require 'puppet/resource/catalog'
+
+Puppet::Resource::Catalog.indirection.terminus(:queue)
+
+describe Puppet::Resource::Catalog::Queue do
+ before do
+ @catalog = Puppet::Resource::Catalog.new
+
+ @one = Puppet::Resource.new(:file, "/one")
+ @two = Puppet::Resource.new(:file, "/two")
+ @catalog.add_resource(@one, @two)
+
+ @catalog.add_edge(@one, @two)
+
+ Puppet[:trace] = true
+ end
+
+ after { Puppet.settings.clear }
+
+ it "should render catalogs to json and send them via the queue client when catalogs are saved" do
+ terminus = Puppet::Resource::Catalog.indirection.terminus(:queue)
+
+ client = mock 'client'
+ terminus.stubs(:client).returns client
+
+ client.expects(:send_message).with(:catalog, @catalog.to_json)
+
+ request = Puppet::Indirector::Request.new(:catalog, :save, "foo", :instance => @catalog)
+
+ terminus.save(request)
+ end
+
+ it "should intern catalog messages when they are passed via a subscription" do
+ client = mock 'client'
+ Puppet::Resource::Catalog::Queue.stubs(:client).returns client
+
+ json = @catalog.to_json
+
+ client.expects(:subscribe).with(:catalog).yields(json)
+
+ Puppet.expects(:err).never
+
+ result = []
+ Puppet::Resource::Catalog::Queue.subscribe do |catalog|
+ result << catalog
+ end
+
+ catalog = result.shift
+ catalog.should be_instance_of(Puppet::Resource::Catalog)
+
+ catalog.resource(:file, "/one").should be_instance_of(Puppet::Resource)
+ catalog.resource(:file, "/two").should be_instance_of(Puppet::Resource)
+ catalog.should be_edge(catalog.resource(:file, "/one"), catalog.resource(:file, "/two"))
+ end
+end
diff --git a/spec/unit/indirector/queue.rb b/spec/unit/indirector/queue.rb
index 0e9074440..3bc066873 100755
--- a/spec/unit/indirector/queue.rb
+++ b/spec/unit/indirector/queue.rb
@@ -4,35 +4,32 @@ require File.dirname(__FILE__) + '/../../spec_helper'
require 'puppet/indirector/queue'
class Puppet::Indirector::Queue::TestClient
- def self.reset
- @queues = {}
- end
+end
+
+class FooExampleData
+ attr_accessor :name
- def self.queues
- @queues ||= {}
+ def self.json_create(json)
+ new(json['data'].to_sym)
end
- def subscribe(queue)
- stack = self.class.queues[queue] ||= []
- while stack.length > 0 do
- yield(stack.shift)
- end
+ def initialize(name = nil)
+ @name = name if name
end
- def send_message(queue, message)
- stack = self.class.queues[queue] ||= []
- stack.push(message)
- queue
+ def render(format = :json)
+ to_json
end
-end
-class FooExampleData
- attr_accessor :name
+ def to_json(*args)
+ {:json_class => self.class.to_s, :data => name}.to_json(*args)
+ end
end
describe Puppet::Indirector::Queue do
before :each do
- @indirection = stub 'indirection', :name => :my_queue, :register_terminus_type => nil
+ @model = mock 'model'
+ @indirection = stub 'indirection', :name => :my_queue, :register_terminus_type => nil, :model => @model
Puppet::Indirector::Indirection.stubs(:instance).with(:my_queue).returns(@indirection)
@store_class = Class.new(Puppet::Indirector::Queue) do
def self.to_s
@@ -48,40 +45,77 @@ describe Puppet::Indirector::Queue do
Puppet.settings.stubs(:value).returns("bogus setting data")
Puppet.settings.stubs(:value).with(:queue_type).returns(:test_client)
Puppet::Util::Queue.stubs(:queue_type_to_class).with(:test_client).returns(Puppet::Indirector::Queue::TestClient)
- Puppet::Indirector::Queue::TestClient.reset
@request = stub 'request', :key => :me, :instance => @subject
end
+ it "should require JSON" do
+ Puppet.features.expects(:json?).returns false
+
+ lambda { @store_class.new }.should raise_error(ArgumentError)
+ end
+
it 'should use the correct client type and queue' do
@store.queue.should == :my_queue
@store.client.should be_an_instance_of(Puppet::Indirector::Queue::TestClient)
end
- it 'should use render() to convert object to message' do
- @store.expects(:render).with(@subject).once
- @store.save(@request)
- end
+ describe "when saving" do
+ it 'should render the instance using json' do
+ @subject.expects(:render).with(:json)
+ @store.client.stubs(:send_message)
+ @store.save(@request)
+ end
+
+ it "should send the rendered message to the appropriate queue on the client" do
+ @subject.expects(:render).returns "myjson"
- it 'should save and restore with the appropriate queue, and handle subscribe block' do
- @subject_two = @subject_class.new
- @subject_two.name = :too
- @store.save(@request)
- @store.save(stub('request_two', :key => 'too', :instance => @subject_two))
+ @store.client.expects(:send_message).with(:my_queue, "myjson")
- received = []
- @store_class.subscribe do |obj|
- received.push(obj)
+ @store.save(@request)
end
- received[0].name.should == @subject.name
- received[1].name.should == @subject_two.name
+ it "should catch any exceptions raised" do
+ @store.client.expects(:send_message).raises ArgumentError
+
+ lambda { @store.save(@request) }.should raise_error(Puppet::Error)
+ end
end
- it 'should use intern() to convert message to object with subscribe()' do
- @store.save(@request)
- @store_class.expects(:intern).with(@store.render(@subject)).once
- @store_class.subscribe {|o| o }
+ describe "when subscribing to the queue" do
+ before do
+ @store_class.stubs(:model).returns @model
+ end
+
+ it "should use the model's Format support to intern the message from json" do
+ @model.expects(:convert_from).with(:json, "mymessage")
+
+ @store_class.client.expects(:subscribe).yields("mymessage")
+ @store_class.subscribe {|o| o }
+ end
+
+ it "should yield each interned received message" do
+ @model.stubs(:convert_from).returns "something"
+
+ @subject_two = @subject_class.new
+ @subject_two.name = :too
+
+ @store_class.client.expects(:subscribe).with(:my_queue).multiple_yields(@subject, @subject_two)
+
+ received = []
+ @store_class.subscribe do |obj|
+ received.push(obj)
+ end
+
+ received.should == %w{something something}
+ end
+
+ it "should log but not propagate errors" do
+ @store_class.client.expects(:subscribe).yields("foo")
+ @store_class.expects(:intern).raises ArgumentError
+ Puppet.expects(:err)
+ @store_class.subscribe {|o| o }
+ end
end
end