summaryrefslogtreecommitdiffstats
path: root/lib/puppet
diff options
context:
space:
mode:
authorLuke Kanies <luke@madstop.com>2009-10-29 00:23:05 -0700
committertest branch <puppet-dev@googlegroups.com>2010-02-17 06:50:53 -0800
commit2cbd9e85259ed1742f8a54a7e5b9825d0bb79d5e (patch)
treebafbe77d88527b56c3d10a975180a1748fa971da /lib/puppet
parent6a450c51eedc38b73d79389c19b8d5e5964a9d71 (diff)
downloadpuppet-2cbd9e85259ed1742f8a54a7e5b9825d0bb79d5e.tar.gz
puppet-2cbd9e85259ed1742f8a54a7e5b9825d0bb79d5e.tar.xz
puppet-2cbd9e85259ed1742f8a54a7e5b9825d0bb79d5e.zip
Switching transactions to callback-based events
Events are now queued as they are created, and the queues are managed through simple interfaces, rather than collecting events over time and responding to them inline. This drastically simplifies event management, and will make moving it to a separate system essentially trivial. Signed-off-by: Luke Kanies <luke@madstop.com>
Diffstat (limited to 'lib/puppet')
-rw-r--r--lib/puppet/transaction.rb319
-rw-r--r--lib/puppet/transaction/event.rb19
2 files changed, 122 insertions, 216 deletions
diff --git a/lib/puppet/transaction.rb b/lib/puppet/transaction.rb
index 725b86dc5..6671fb1f0 100644
--- a/lib/puppet/transaction.rb
+++ b/lib/puppet/transaction.rb
@@ -4,8 +4,7 @@
require 'puppet'
require 'puppet/util/tagging'
-module Puppet
-class Transaction
+class Puppet::Transaction
require 'puppet/transaction/change'
require 'puppet/transaction/event'
@@ -15,6 +14,9 @@ class Transaction
# The list of events generated in this transaction.
attr_reader :events
+ # Mostly only used for tests
+ attr_reader :resourcemetrics, :changes
+
include Puppet::Util
include Puppet::Util::Tagging
@@ -58,9 +60,7 @@ class Transaction
begin
changes = resource.evaluate
rescue => detail
- if Puppet[:trace]
- puts detail.backtrace
- end
+ puts detail.backtrace if Puppet[:trace]
resource.err "Failed to retrieve current state of resource: %s" % detail
@@ -68,18 +68,16 @@ class Transaction
@failures[resource] += 1
# And then return
- return []
+ return
end
changes = [changes] unless changes.is_a?(Array)
- if changes.length > 0
- @resourcemetrics[:out_of_sync] += 1
- end
+ @resourcemetrics[:out_of_sync] += 1 if changes.length > 0
- return [] if changes.empty? or ! allow_processing?(resource, changes)
+ return if changes.empty? or ! allow_processing?(resource, changes)
- resourceevents = apply_changes(resource, changes)
+ apply_changes(resource, changes)
# If there were changes and the resource isn't in noop mode...
unless changes.empty? or resource.noop
@@ -90,63 +88,22 @@ class Transaction
if resource.respond_to?(:flush)
resource.flush
end
-
- # And set a trigger for refreshing this resource if it's a
- # self-refresher
- if resource.self_refresh? and ! resource.deleting?
- # Create an edge with this resource as both the source and
- # target. The triggering method treats these specially for
- # logging.
- events = resourceevents.collect { |e| e.name }
- set_trigger(Puppet::Relationship.new(resource, resource, :callback => :refresh, :event => events))
- end
end
-
- resourceevents
end
# Apply each change in turn.
def apply_changes(resource, changes)
- changes.collect { |change|
- @changes << change
- @count += 1
- events = nil
- begin
- # use an array, so that changes can return more than one
- # event if they want
- events = [change.forward].flatten.reject { |e| e.nil? }
- rescue => detail
- if Puppet[:trace]
- puts detail.backtrace
- end
- change.property.err "change from %s to %s failed: %s" %
- [change.property.is_to_s(change.is), change.property.should_to_s(change.should), detail]
- @failures[resource] += 1
- next
- # FIXME this should support using onerror to determine
- # behaviour; or more likely, the client calling us
- # should do so
- end
-
- # Mark that our change happened, so it can be reversed
- # if we ever get to that point
- unless events.nil? or (events.is_a?(Array) and (events.empty?) or events.include?(:noop))
- change.changed = true
- @resourcemetrics[:applied] += 1
- end
-
- events
- }.flatten.reject { |e| e.nil? }
+ changes.each { |change| apply_change(resource, change) }
end
# Find all of the changed resources.
def changed?
- @changes.find_all { |change| change.changed }.collect { |change|
+ @changes.find_all { |change| change.changed }.collect do |change|
unless change.property.resource
raise "No resource for %s" % change.inspect
end
change.property.resource
- }.uniq
+ end.uniq
end
# Do any necessary cleanup. If we don't get rid of the graphs, the
@@ -192,42 +149,18 @@ class Transaction
# Evaluate a single resource.
def eval_resource(resource)
- events = []
-
- if resource.is_a?(Puppet::Type::Component)
- raise Puppet::DevError, "Got a component to evaluate"
- end
-
if skip?(resource)
@resourcemetrics[:skipped] += 1
- else
- events += eval_children_and_apply_resource(resource)
+ return
end
- # Check to see if there are any events for this resource
- if triggedevents = trigger(resource)
- events += triggedevents
- end
+ eval_children_and_apply_resource(resource)
- # Collect the targets of any subscriptions to those events. We pass
- # the parent resource in so it will override the source in the events,
- # since eval_generated children can't have direct relationships.
- relationship_graph.matching_edges(events, resource).each do |orig_edge|
- # We have to dup the label here, else we modify the original edge label,
- # which affects whether a given event will match on the next run, which is,
- # of course, bad.
- edge = orig_edge.class.new(orig_edge.source, orig_edge.target, orig_edge.label)
- edge.event = events.collect { |e| e.name }
- set_trigger(edge)
- end
-
- # And return the events for collection
- events
+ # Check to see if there are any events queued for this resource
+ process_events(resource)
end
def eval_children_and_apply_resource(resource)
- events = []
-
@resourcemetrics[:scheduled] += 1
changecount = @changes.length
@@ -239,18 +172,18 @@ class Transaction
if ! children.empty? and resource.depthfirst?
children.each do |child|
# The child will never be skipped when the parent isn't
- events += eval_resource(child, false)
+ eval_resource(child, false)
end
end
# Perform the actual changes
seconds = thinmark do
- events += apply(resource)
+ apply(resource)
end
if ! children.empty? and ! resource.depthfirst?
children.each do |child|
- events += eval_resource(child)
+ eval_resource(child)
end
end
@@ -265,15 +198,14 @@ class Transaction
# Keep track of how long we spend in each type of resource
@timemetrics[resource.class.name] += seconds
-
- events
end
# This method does all the actual work of running a transaction. It
# collects all of the changes, executes them, and responds to any
# necessary events.
def evaluate
- @count = 0
+ # Start logging.
+ Puppet::Util::Log.newdestination(@report)
prepare()
@@ -295,11 +227,7 @@ class Transaction
ret
}.flatten.reject { |e| e.nil? }
- Puppet.debug "Finishing transaction %s with %s changes" %
- [self.object_id, @count]
-
- @events = allevents
- allevents
+ Puppet.debug "Finishing transaction #{object_id} with #{@changes.length} changes"
end
# Determine whether a given resource has failed.
@@ -417,26 +345,21 @@ class Transaction
# Metrics for distributing times across the different types.
@timemetrics = Hash.new(0)
- # The number of resources that were triggered in this run
- @triggered = Hash.new { |hash, key|
- hash[key] = Hash.new(0)
- }
-
- # Targets of being triggered.
- @targets = Hash.new do |hash, key|
- hash[key] = []
- end
+ @event_queues = {}
# The changes we're performing
@changes = []
+ # The complete list of events generated.
+ @events = []
+
# The resources that have failed and the number of failures each. This
# is used for skipping resources because of failed dependencies.
@failures = Hash.new do |h, key|
h[key] = 0
end
- @count = 0
+ @report = Report.new
end
# Prefetch any providers that support it. We don't support prefetching
@@ -477,15 +400,67 @@ class Transaction
@sorted_resources = relationship_graph.topsort
end
+ # Respond to any queued events for this resource.
+ def process_events(resource)
+ restarted = false
+ queued_events(resource) do |callback, events|
+ r = process_callback(resource, callback, events)
+ restarted ||= r
+ end
+
+ if restarted
+ queue_events(Puppet::Transaction::Event.new(:restarted, resource))
+
+ @resourcemetrics[:restarted] += 1
+ end
+ end
+
+ # Queue events for other resources to respond to. All of these events have
+ # to be from the same resource.
+ def queue_events(*events)
+ events.flatten!
+
+ @events += events
+
+ resource = events[0].resource
+
+ # Collect the targets of any subscriptions to those events. We pass
+ # the parent resource in so it will override the source in the events,
+ # since eval_generated children can't have direct relationships.
+ relationship_graph.matching_edges(events, resource).each do |edge|
+ next unless method = edge.callback
+ next unless edge.target.respond_to?(method)
+
+ queue_events_for_resource(resource, edge.target, method, events)
+ end
+
+ if resource.self_refresh? and ! resource.deleting?
+ queue_events_for_resource(resource, resource, :refresh, events)
+ end
+ end
+
+ def queue_events_for_resource(source, target, callback, events)
+ source.info "Scheduling #{callback} of #{target}"
+
+ @event_queues[target] ||= {}
+ @event_queues[target][callback] ||= []
+ @event_queues[target][callback] += events
+ end
+
+ def queued_events(resource)
+ return unless callbacks = @event_queues[resource]
+ callbacks.each do |callback, events|
+ yield callback, events
+ end
+ end
+
def relationship_graph
catalog.relationship_graph
end
# Roll all completed changes back.
def rollback
- @targets.clear
- @triggered.clear
- allevents = @changes.reverse.collect { |change|
+ @changes.reverse.collect do |change|
# skip changes that were never actually run
unless change.changed
Puppet.debug "%s was not changed" % change.to_s
@@ -507,19 +482,12 @@ class Transaction
# but a chmod failed? how would i handle that error? dern
end
- # FIXME This won't work right now.
- relationship_graph.matching_edges(events).each do |edge|
- @targets[edge.target] << edge
- end
+ # And queue the events
+ queue_events(events)
# Now check to see if there are any events for this child.
- # Kind of hackish, since going backwards goes a change at a
- # time, not a child at a time.
- trigger(change.property.resource)
-
- # And return the events for collection
- events
- }.flatten.reject { |e| e.nil? }
+ process_events(change.property.resource)
+ end
end
# Is the resource currently scheduled?
@@ -527,18 +495,6 @@ class Transaction
self.ignoreschedules or resource.scheduled?
end
- # Set an edge to be triggered when we evaluate its target.
- def set_trigger(edge)
- return unless method = edge.callback
- return unless edge.target.respond_to?(method)
- if edge.target.respond_to?(:ref)
- unless edge.source == edge.target
- edge.source.info "Scheduling %s of %s" % [edge.callback, edge.target.ref]
- end
- end
- @targets[edge.target] << edge
- end
-
# Should this resource be skipped?
def skip?(resource)
skip = false
@@ -581,87 +537,50 @@ class Transaction
self.ignore_tags? or tags.empty? or resource.tagged?(*tags)
end
- # Are there any edges that target this resource?
- def targeted?(resource)
- # The default value is a new array so we have to test the length of it.
- @targets.include?(resource) and @targets[resource].length > 0
- end
-
- # Trigger any subscriptions to a child. This does an upwardly recursive
- # search -- it triggers the passed resource, but also the resource's parent
- # and so on up the tree.
- def trigger(resource)
- return nil unless targeted?(resource)
- callbacks = Hash.new { |hash, key| hash[key] = [] }
-
- trigged = []
- @targets[resource].each do |edge|
- # Collect all of the subs for each callback
- callbacks[edge.callback] << edge
- end
-
- callbacks.each do |callback, subs|
- noop = true
- subs.each do |edge|
- if edge.event.nil? or ! edge.event.include?(:noop)
- noop = false
- end
- end
-
- if noop
- resource.notice "Would have triggered %s from %s dependencies" %
- [callback, subs.length]
-
- # And then add an event for it.
- return [Puppet::Transaction::Event.new(:noop, resource)]
- end
-
- if subs.length == 1 and subs[0].source == resource
- message = "Refreshing self"
- else
- message = "Triggering '%s' from %s dependencies" %
- [callback, subs.length]
- end
- resource.notice message
+ private
- # At this point, just log failures, don't try to react
- # to them in any way.
- begin
- resource.send(callback)
- @resourcemetrics[:restarted] += 1
- rescue => detail
- resource.err "Failed to call %s on %s: %s" %
- [callback, resource, detail]
+ def apply_change(resource, change)
+ @changes << change
- @resourcemetrics[:failed_restarts] += 1
+ # use an array, so that changes can return more than one
+ # event if they want
+ events = [change.forward].flatten.reject { |e| e.nil? }
- if Puppet[:trace]
- puts detail.backtrace
- end
- end
+ # Mark that our change happened, so it can be reversed
+ # if we ever get to that point
+ change.changed = true
+ @resourcemetrics[:applied] += 1
+ queue_events(events)
+ rescue => detail
+ puts detail.backtrace if Puppet[:trace]
+ is = change.property.is_to_s(change.is)
+ should = change.property.should_to_s(change.should)
+ change.property.err "change from #{is} to #{should} failed: #{detail}"
+ @failures[resource] += 1
+ end
- # And then add an event for it.
- trigged << Puppet::Transaction::Event.new(:triggered, resource)
+ def process_callback(resource, callback, events)
+ process_noop_events(resource, callback, events) and return false if events.detect { |e| e.name == :noop }
+ resource.send(callback)
- triggered(resource, callback)
- end
+ resource.notice "Triggered '#{callback}' from #{events.length} events"
+ return true
+ rescue => detail
+ resource.err "Failed to call #{callback}: #{detail}"
- if trigged.empty?
- return nil
- else
- return trigged
- end
+ @resourcemetrics[:failed_restarts] += 1
+ puts detail.backtrace if Puppet[:trace]
+ return false
end
- def triggered(resource, method)
- @triggered[resource][method] += 1
- end
+ def process_noop_events(resource, callback, events)
+ resource.notice "Would have triggered '#{callback}' from #{events.length} events"
- def triggered?(resource, method)
- @triggered[resource][method]
+ # And then add an event for it.
+ queue_events(Puppet::Transaction::Event.new(:noop, resource))
+ true # so the 'and if' works
end
end
-end
require 'puppet/transaction/report'
diff --git a/lib/puppet/transaction/event.rb b/lib/puppet/transaction/event.rb
index 54c092e65..5d422f93d 100644
--- a/lib/puppet/transaction/event.rb
+++ b/lib/puppet/transaction/event.rb
@@ -1,21 +1,8 @@
require 'puppet'
-require 'puppet/util/methodhelper'
-require 'puppet/util/errors'
-
-# events are transient packets of information; they result in one or more (or none)
-# subscriptions getting triggered, and then they get cleared
-# eventually, these will be passed on to some central event system
-class Puppet::Transaction::Event
- include Puppet::Util::MethodHelper
- include Puppet::Util::Errors
-
- attr_reader :name, :source
-
- def initialize(name, source)
- @name, @source = name, source
- end
+# A simple struct for storing what happens on the system.
+Puppet::Transaction::Event = Struct.new(:name, :resource, :property, :result, :log, :previous_value, :desired_value) do
def to_s
- source.to_s + " -> " + name.to_s
+ log
end
end