diff options
author | Luke Kanies <luke@madstop.com> | 2009-10-29 00:23:05 -0700 |
---|---|---|
committer | test branch <puppet-dev@googlegroups.com> | 2010-02-17 06:50:53 -0800 |
commit | 2cbd9e85259ed1742f8a54a7e5b9825d0bb79d5e (patch) | |
tree | bafbe77d88527b56c3d10a975180a1748fa971da /lib/puppet | |
parent | 6a450c51eedc38b73d79389c19b8d5e5964a9d71 (diff) | |
download | puppet-2cbd9e85259ed1742f8a54a7e5b9825d0bb79d5e.tar.gz puppet-2cbd9e85259ed1742f8a54a7e5b9825d0bb79d5e.tar.xz puppet-2cbd9e85259ed1742f8a54a7e5b9825d0bb79d5e.zip |
Switching transactions to callback-based events
Events are now queued as they are created, and
the queues are managed through simple interfaces,
rather than collecting events over time and
responding to them inline.
This drastically simplifies event management,
and will make moving it to a separate system
essentially trivial.
Signed-off-by: Luke Kanies <luke@madstop.com>
Diffstat (limited to 'lib/puppet')
-rw-r--r-- | lib/puppet/transaction.rb | 319 | ||||
-rw-r--r-- | lib/puppet/transaction/event.rb | 19 |
2 files changed, 122 insertions, 216 deletions
diff --git a/lib/puppet/transaction.rb b/lib/puppet/transaction.rb index 725b86dc5..6671fb1f0 100644 --- a/lib/puppet/transaction.rb +++ b/lib/puppet/transaction.rb @@ -4,8 +4,7 @@ require 'puppet' require 'puppet/util/tagging' -module Puppet -class Transaction +class Puppet::Transaction require 'puppet/transaction/change' require 'puppet/transaction/event' @@ -15,6 +14,9 @@ class Transaction # The list of events generated in this transaction. attr_reader :events + # Mostly only used for tests + attr_reader :resourcemetrics, :changes + include Puppet::Util include Puppet::Util::Tagging @@ -58,9 +60,7 @@ class Transaction begin changes = resource.evaluate rescue => detail - if Puppet[:trace] - puts detail.backtrace - end + puts detail.backtrace if Puppet[:trace] resource.err "Failed to retrieve current state of resource: %s" % detail @@ -68,18 +68,16 @@ class Transaction @failures[resource] += 1 # And then return - return [] + return end changes = [changes] unless changes.is_a?(Array) - if changes.length > 0 - @resourcemetrics[:out_of_sync] += 1 - end + @resourcemetrics[:out_of_sync] += 1 if changes.length > 0 - return [] if changes.empty? or ! allow_processing?(resource, changes) + return if changes.empty? or ! allow_processing?(resource, changes) - resourceevents = apply_changes(resource, changes) + apply_changes(resource, changes) # If there were changes and the resource isn't in noop mode... unless changes.empty? or resource.noop @@ -90,63 +88,22 @@ class Transaction if resource.respond_to?(:flush) resource.flush end - - # And set a trigger for refreshing this resource if it's a - # self-refresher - if resource.self_refresh? and ! resource.deleting? - # Create an edge with this resource as both the source and - # target. The triggering method treats these specially for - # logging. - events = resourceevents.collect { |e| e.name } - set_trigger(Puppet::Relationship.new(resource, resource, :callback => :refresh, :event => events)) - end end - - resourceevents end # Apply each change in turn. def apply_changes(resource, changes) - changes.collect { |change| - @changes << change - @count += 1 - events = nil - begin - # use an array, so that changes can return more than one - # event if they want - events = [change.forward].flatten.reject { |e| e.nil? } - rescue => detail - if Puppet[:trace] - puts detail.backtrace - end - change.property.err "change from %s to %s failed: %s" % - [change.property.is_to_s(change.is), change.property.should_to_s(change.should), detail] - @failures[resource] += 1 - next - # FIXME this should support using onerror to determine - # behaviour; or more likely, the client calling us - # should do so - end - - # Mark that our change happened, so it can be reversed - # if we ever get to that point - unless events.nil? or (events.is_a?(Array) and (events.empty?) or events.include?(:noop)) - change.changed = true - @resourcemetrics[:applied] += 1 - end - - events - }.flatten.reject { |e| e.nil? } + changes.each { |change| apply_change(resource, change) } end # Find all of the changed resources. def changed? - @changes.find_all { |change| change.changed }.collect { |change| + @changes.find_all { |change| change.changed }.collect do |change| unless change.property.resource raise "No resource for %s" % change.inspect end change.property.resource - }.uniq + end.uniq end # Do any necessary cleanup. If we don't get rid of the graphs, the @@ -192,42 +149,18 @@ class Transaction # Evaluate a single resource. def eval_resource(resource) - events = [] - - if resource.is_a?(Puppet::Type::Component) - raise Puppet::DevError, "Got a component to evaluate" - end - if skip?(resource) @resourcemetrics[:skipped] += 1 - else - events += eval_children_and_apply_resource(resource) + return end - # Check to see if there are any events for this resource - if triggedevents = trigger(resource) - events += triggedevents - end + eval_children_and_apply_resource(resource) - # Collect the targets of any subscriptions to those events. We pass - # the parent resource in so it will override the source in the events, - # since eval_generated children can't have direct relationships. - relationship_graph.matching_edges(events, resource).each do |orig_edge| - # We have to dup the label here, else we modify the original edge label, - # which affects whether a given event will match on the next run, which is, - # of course, bad. - edge = orig_edge.class.new(orig_edge.source, orig_edge.target, orig_edge.label) - edge.event = events.collect { |e| e.name } - set_trigger(edge) - end - - # And return the events for collection - events + # Check to see if there are any events queued for this resource + process_events(resource) end def eval_children_and_apply_resource(resource) - events = [] - @resourcemetrics[:scheduled] += 1 changecount = @changes.length @@ -239,18 +172,18 @@ class Transaction if ! children.empty? and resource.depthfirst? children.each do |child| # The child will never be skipped when the parent isn't - events += eval_resource(child, false) + eval_resource(child, false) end end # Perform the actual changes seconds = thinmark do - events += apply(resource) + apply(resource) end if ! children.empty? and ! resource.depthfirst? children.each do |child| - events += eval_resource(child) + eval_resource(child) end end @@ -265,15 +198,14 @@ class Transaction # Keep track of how long we spend in each type of resource @timemetrics[resource.class.name] += seconds - - events end # This method does all the actual work of running a transaction. It # collects all of the changes, executes them, and responds to any # necessary events. def evaluate - @count = 0 + # Start logging. + Puppet::Util::Log.newdestination(@report) prepare() @@ -295,11 +227,7 @@ class Transaction ret }.flatten.reject { |e| e.nil? } - Puppet.debug "Finishing transaction %s with %s changes" % - [self.object_id, @count] - - @events = allevents - allevents + Puppet.debug "Finishing transaction #{object_id} with #{@changes.length} changes" end # Determine whether a given resource has failed. @@ -417,26 +345,21 @@ class Transaction # Metrics for distributing times across the different types. @timemetrics = Hash.new(0) - # The number of resources that were triggered in this run - @triggered = Hash.new { |hash, key| - hash[key] = Hash.new(0) - } - - # Targets of being triggered. - @targets = Hash.new do |hash, key| - hash[key] = [] - end + @event_queues = {} # The changes we're performing @changes = [] + # The complete list of events generated. + @events = [] + # The resources that have failed and the number of failures each. This # is used for skipping resources because of failed dependencies. @failures = Hash.new do |h, key| h[key] = 0 end - @count = 0 + @report = Report.new end # Prefetch any providers that support it. We don't support prefetching @@ -477,15 +400,67 @@ class Transaction @sorted_resources = relationship_graph.topsort end + # Respond to any queued events for this resource. + def process_events(resource) + restarted = false + queued_events(resource) do |callback, events| + r = process_callback(resource, callback, events) + restarted ||= r + end + + if restarted + queue_events(Puppet::Transaction::Event.new(:restarted, resource)) + + @resourcemetrics[:restarted] += 1 + end + end + + # Queue events for other resources to respond to. All of these events have + # to be from the same resource. + def queue_events(*events) + events.flatten! + + @events += events + + resource = events[0].resource + + # Collect the targets of any subscriptions to those events. We pass + # the parent resource in so it will override the source in the events, + # since eval_generated children can't have direct relationships. + relationship_graph.matching_edges(events, resource).each do |edge| + next unless method = edge.callback + next unless edge.target.respond_to?(method) + + queue_events_for_resource(resource, edge.target, method, events) + end + + if resource.self_refresh? and ! resource.deleting? + queue_events_for_resource(resource, resource, :refresh, events) + end + end + + def queue_events_for_resource(source, target, callback, events) + source.info "Scheduling #{callback} of #{target}" + + @event_queues[target] ||= {} + @event_queues[target][callback] ||= [] + @event_queues[target][callback] += events + end + + def queued_events(resource) + return unless callbacks = @event_queues[resource] + callbacks.each do |callback, events| + yield callback, events + end + end + def relationship_graph catalog.relationship_graph end # Roll all completed changes back. def rollback - @targets.clear - @triggered.clear - allevents = @changes.reverse.collect { |change| + @changes.reverse.collect do |change| # skip changes that were never actually run unless change.changed Puppet.debug "%s was not changed" % change.to_s @@ -507,19 +482,12 @@ class Transaction # but a chmod failed? how would i handle that error? dern end - # FIXME This won't work right now. - relationship_graph.matching_edges(events).each do |edge| - @targets[edge.target] << edge - end + # And queue the events + queue_events(events) # Now check to see if there are any events for this child. - # Kind of hackish, since going backwards goes a change at a - # time, not a child at a time. - trigger(change.property.resource) - - # And return the events for collection - events - }.flatten.reject { |e| e.nil? } + process_events(change.property.resource) + end end # Is the resource currently scheduled? @@ -527,18 +495,6 @@ class Transaction self.ignoreschedules or resource.scheduled? end - # Set an edge to be triggered when we evaluate its target. - def set_trigger(edge) - return unless method = edge.callback - return unless edge.target.respond_to?(method) - if edge.target.respond_to?(:ref) - unless edge.source == edge.target - edge.source.info "Scheduling %s of %s" % [edge.callback, edge.target.ref] - end - end - @targets[edge.target] << edge - end - # Should this resource be skipped? def skip?(resource) skip = false @@ -581,87 +537,50 @@ class Transaction self.ignore_tags? or tags.empty? or resource.tagged?(*tags) end - # Are there any edges that target this resource? - def targeted?(resource) - # The default value is a new array so we have to test the length of it. - @targets.include?(resource) and @targets[resource].length > 0 - end - - # Trigger any subscriptions to a child. This does an upwardly recursive - # search -- it triggers the passed resource, but also the resource's parent - # and so on up the tree. - def trigger(resource) - return nil unless targeted?(resource) - callbacks = Hash.new { |hash, key| hash[key] = [] } - - trigged = [] - @targets[resource].each do |edge| - # Collect all of the subs for each callback - callbacks[edge.callback] << edge - end - - callbacks.each do |callback, subs| - noop = true - subs.each do |edge| - if edge.event.nil? or ! edge.event.include?(:noop) - noop = false - end - end - - if noop - resource.notice "Would have triggered %s from %s dependencies" % - [callback, subs.length] - - # And then add an event for it. - return [Puppet::Transaction::Event.new(:noop, resource)] - end - - if subs.length == 1 and subs[0].source == resource - message = "Refreshing self" - else - message = "Triggering '%s' from %s dependencies" % - [callback, subs.length] - end - resource.notice message + private - # At this point, just log failures, don't try to react - # to them in any way. - begin - resource.send(callback) - @resourcemetrics[:restarted] += 1 - rescue => detail - resource.err "Failed to call %s on %s: %s" % - [callback, resource, detail] + def apply_change(resource, change) + @changes << change - @resourcemetrics[:failed_restarts] += 1 + # use an array, so that changes can return more than one + # event if they want + events = [change.forward].flatten.reject { |e| e.nil? } - if Puppet[:trace] - puts detail.backtrace - end - end + # Mark that our change happened, so it can be reversed + # if we ever get to that point + change.changed = true + @resourcemetrics[:applied] += 1 + queue_events(events) + rescue => detail + puts detail.backtrace if Puppet[:trace] + is = change.property.is_to_s(change.is) + should = change.property.should_to_s(change.should) + change.property.err "change from #{is} to #{should} failed: #{detail}" + @failures[resource] += 1 + end - # And then add an event for it. - trigged << Puppet::Transaction::Event.new(:triggered, resource) + def process_callback(resource, callback, events) + process_noop_events(resource, callback, events) and return false if events.detect { |e| e.name == :noop } + resource.send(callback) - triggered(resource, callback) - end + resource.notice "Triggered '#{callback}' from #{events.length} events" + return true + rescue => detail + resource.err "Failed to call #{callback}: #{detail}" - if trigged.empty? - return nil - else - return trigged - end + @resourcemetrics[:failed_restarts] += 1 + puts detail.backtrace if Puppet[:trace] + return false end - def triggered(resource, method) - @triggered[resource][method] += 1 - end + def process_noop_events(resource, callback, events) + resource.notice "Would have triggered '#{callback}' from #{events.length} events" - def triggered?(resource, method) - @triggered[resource][method] + # And then add an event for it. + queue_events(Puppet::Transaction::Event.new(:noop, resource)) + true # so the 'and if' works end end -end require 'puppet/transaction/report' diff --git a/lib/puppet/transaction/event.rb b/lib/puppet/transaction/event.rb index 54c092e65..5d422f93d 100644 --- a/lib/puppet/transaction/event.rb +++ b/lib/puppet/transaction/event.rb @@ -1,21 +1,8 @@ require 'puppet' -require 'puppet/util/methodhelper' -require 'puppet/util/errors' - -# events are transient packets of information; they result in one or more (or none) -# subscriptions getting triggered, and then they get cleared -# eventually, these will be passed on to some central event system -class Puppet::Transaction::Event - include Puppet::Util::MethodHelper - include Puppet::Util::Errors - - attr_reader :name, :source - - def initialize(name, source) - @name, @source = name, source - end +# A simple struct for storing what happens on the system. +Puppet::Transaction::Event = Struct.new(:name, :resource, :property, :result, :log, :previous_value, :desired_value) do def to_s - source.to_s + " -> " + name.to_s + log end end |