summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--func/forkbomb.py2
-rw-r--r--func/jobthing.py2
-rw-r--r--func/minion/modules/delegation.py89
-rw-r--r--func/minion/modules/overlord.py12
-rwxr-xr-xfunc/overlord/client.py84
-rw-r--r--func/overlord/delegation_tools.py62
-rwxr-xr-xfunc/overlord/mapper.py6
7 files changed, 234 insertions, 23 deletions
diff --git a/func/forkbomb.py b/func/forkbomb.py
index ef0817a..0eb14fe 100644
--- a/func/forkbomb.py
+++ b/func/forkbomb.py
@@ -111,7 +111,7 @@ def __forkbomb(mybucket,buckets,what_to_do,filename):
raise ose
else:
__with_my_bucket(mybucket,buckets,what_to_do,filename)
- sys.exit(0)
+ os._exit(0)
def __demo(bucket_number, buckets, my_item):
"""
diff --git a/func/jobthing.py b/func/jobthing.py
index 051480d..d919bb3 100644
--- a/func/jobthing.py
+++ b/func/jobthing.py
@@ -130,7 +130,7 @@ def batch_run(pool, callback, nforks):
# we now have a list of job id's for each minion, kill the task
__update_status(job_id, JOB_ID_PARTIAL, results)
- sys.exit(0)
+ os._exit(0)
def minion_async_run(retriever, method, args):
"""
diff --git a/func/minion/modules/delegation.py b/func/minion/modules/delegation.py
index 9f34d1a..d7731bc 100644
--- a/func/minion/modules/delegation.py
+++ b/func/minion/modules/delegation.py
@@ -8,9 +8,17 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+import time
import func_module
import func.overlord.client as fc
+import func.jobthing as jobthing
+
from func import utils
+from func.overlord import delegation_tools as dtools
+
+#boolean value appended to job id list to denote direct/delegated calls
+DIRECT = False
+DELEGATED = True
class DelegationModule(func_module.FuncModule):
@@ -18,18 +26,83 @@ class DelegationModule(func_module.FuncModule):
api_version = "0.0.1"
description = "Minion-side module to support delegation on sub-Overlords."
- def run(self,module,method,args,delegation_path):
+ def run(self,module,method,args,delegation_list,async,nforks):
"""
Delegates commands down the path of delegation
supplied as an argument
"""
+ result_dict = {}
+ job_id_list = []
+
+ #separate list passed to us into minions we can call directly and
+ #further delegation paths
+ (single_paths, grouped_paths) = dtools.group_paths(delegation_list)
- next_hop = delegation_path[0]
- overlord = fc.Overlord(next_hop)
- if len(delegation_path) == 1: #minion exists under this overlord
+ #run delegated calls
+ for group in grouped_paths.keys():
+ overlord = fc.Overlord(group,
+ async=async,
+ nforks=nforks)
+ path_list = grouped_paths[group]
+ delegation_results = overlord.delegation.run(module,
+ method,
+ args,
+ path_list,
+ async,
+ nforks)
+ if async:
+ job_id_list.append([overlord,
+ delegation_results,
+ group,
+ True])
+ else:
+ #These are delegated calls, so we need to strip away the
+ #hash that surrounds the results
+ if utils.is_error(delegation_results[group]):
+ result_dict.update(delegation_results)
+ else:
+ result_dict.update(delegation_results[group])
+
+ #run direct calls
+ for minion in single_paths:
+ overlord = fc.Overlord(minion,
+ async=async,
+ nforks=nforks)
overlord_module = getattr(overlord,module)
- return getattr(overlord_module,method)(*args[:])
+ results = getattr(overlord_module,method)(*args[:])
+ if async:
+ job_id_list.append([overlord,
+ results,
+ minion,
+ False])
+ else:
+ result_dict.update(results)
+
+ #poll async calls
+ while len(job_id_list) > 0:
+ for job in job_id_list:
+ (return_code, async_results) = job[0].job_status(job[1])
+ if return_code == jobthing.JOB_ID_RUNNING:
+ pass #it's still going, ignore it this cycle
+ elif return_code == jobthing.JOB_ID_PARTIAL:
+ pass #yep, it's still rolling
+ elif return_code == jobthing.JOB_ID_REMOTE_ERROR:
+ result_dict.update(async_results)
+ job_id_list.remove(job)
+ else: #it's done or it's had an error, pass it up
+ if job[3] == DIRECT:
+ #this is a direct call, so we only need to
+ #update the hash with the pertinent results
+ results = async_results
+ elif job[3] == DELEGATED:
+ #this is a delegated call, so we need to strip
+ #away the nesting hash
+ results = async_results[job[2]]
+ else:
+ #and this code should never be reached
+ results = {}
+ result_dict.update(results)
+ job_id_list.remove(job)
+ time.sleep(0.1) #pause a bit so that we don't flood our minions
- stripped_list = delegation_path[1:len(delegation_path)]
- delegation_results = overlord.delegation.run(module,method,args,stripped_list)
- return delegation_results[next_hop] #strip away nested hash data from results
+ return result_dict
diff --git a/func/minion/modules/overlord.py b/func/minion/modules/overlord.py
index 710f6d1..743c672 100644
--- a/func/minion/modules/overlord.py
+++ b/func/minion/modules/overlord.py
@@ -30,12 +30,20 @@ class OverlordModule(func_module.FuncModule):
ping_results = fc.Overlord("*").test.ping()
for minion in ping_results.keys():
if ping_results[minion] == 1: #if minion is alive
- current_minions.append(minion) #add it to the list of current minions
+ current_minions.append(minion) #add it to the list
else:
cm = certmaster.CertMaster()
current_minions = cm.get_signed_certs()
for current_minion in current_minions:
- maphash[current_minion] = fc.Overlord(current_minion).overlord.map_minions()[current_minion]
+ if current_minion in utils.get_hostname():
+ maphash[current_minion] = {} #prevent infinite recursion
+ else:
+ next_hop = fc.Overlord(current_minion)
+ mapresults = next_hop.overlord.map_minions()[current_minion]
+ if not utils.is_error(mapresults):
+ maphash[current_minion] = mapresults
+ else:
+ maphash[current_minion] = {}
return maphash
diff --git a/func/overlord/client.py b/func/overlord/client.py
index 79ead06..b322f76 100755
--- a/func/overlord/client.py
+++ b/func/overlord/client.py
@@ -16,6 +16,7 @@
import sys
import glob
import os
+import time
import func.yaml as yaml
from certmaster.commonconfig import CMConfig
@@ -244,6 +245,21 @@ class Overlord(object):
# -----------------------------------------------
+ def list_minions(self, format='list'):
+ """
+ Returns a flat list containing the minions this Overlord object currently
+ controls
+ """
+ if self.delegate:
+ return dtools.match_glob_in_tree(self.server_spec, self.minionmap)
+ minionlist = [] #nasty ugly hack to remove duplicate minions from list
+ for minion in self.minions_class.get_all_hosts():
+ if minion not in minionlist: #ugh, brute force :(
+ minionlist.append(minion)
+ return minionlist
+
+ # -----------------------------------------------
+
def run(self, module, method, args, nforks=1):
"""
Invoke a remote method on one or more servers.
@@ -258,25 +274,61 @@ class Overlord(object):
if not self.delegate: #delegation is turned off, so run normally
return self.run_direct(module, method, args, nforks)
- resulthash = {}
+ delegatedhash = {}
+ directhash = {}
+ completedhash = {}
#First we get all call paths for minions not directly beneath this overlord
dele_paths = dtools.get_paths_for_glob(self.server_spec, self.minionmap)
- non_single_paths = [path for path in dele_paths if len(path) > 1]
- for path in non_single_paths:
- resulthash.update(self.run_direct(module,
+ #Then we group them together in a dictionary by a common next hop
+ (single_paths,grouped_paths) = dtools.group_paths(dele_paths)
+
+ for group in grouped_paths.keys():
+ delegatedhash.update(self.run_direct(module,
method,
args,
nforks,
- call_path=path))
+ call_path=grouped_paths[group],
+ suboverlord=group))
#Next, we run everything that can be run directly beneath this overlord
#Why do we do this after delegation calls? Imagine what happens when
#reboot is called...
- resulthash.update(self.run_direct(module,method,args,nforks))
+ directhash.update(self.run_direct(module,method,args,nforks))
- return resulthash
+ #poll async results if we've async turned on
+ if self.async:
+ while (len(delegatedhash) + len(directhash)) > 0:
+ for minion in delegatedhash.keys():
+ results = delegatedhash[minion]
+ (return_code, async_results) = self.job_status(results)
+ if return_code == jobthing.JOB_ID_RUNNING:
+ pass
+ elif return_code == jobthing.JOB_ID_PARTIAL:
+ pass
+ else:
+ completedhash.update(async_results[minion])
+ del delegatedhash[minion]
+
+ for minion in directhash.keys():
+ results = directhash[minion]
+ (return_code, async_results) = self.job_status(results)
+ if return_code == jobthing.JOB_ID_RUNNING:
+ pass
+ elif return_code == jobthing.JOB_ID_PARTIAL:
+ pass
+ else:
+ completedhash.update(async_results)
+ del directhash[minion]
+ time.sleep(0.1) #pause a bit so we don't flood our minions
+ return completedhash
+
+ #we didn't instantiate this Overlord in async mode, so we just return the
+ #result hash
+ completedhash.update(delegatedhash)
+ completedhash.update(directhash)
+ return completedhash
# -----------------------------------------------
@@ -327,12 +379,18 @@ class Overlord(object):
# this is the point at which we make the remote call.
if use_delegate:
- retval = getattr(conn, meth)(module, method, args, delegation_path)
+ retval = getattr(conn, meth)(module,
+ method,
+ args,
+ delegation_path,
+ self.async,
+ self.nforks)
else:
retval = getattr(conn, meth)(*args[:])
if self.interactive:
print retval
+
except Exception, e:
(t, v, tb) = sys.exc_info()
retval = utils.nice_exception(t,v,tb)
@@ -349,10 +407,10 @@ class Overlord(object):
return (server_name, retval)
if kwargs.has_key('call_path'): #we're delegating if this key exists
- spec = kwargs['call_path'][0] #the sub-overlord directly beneath this one
+ delegation_path = kwargs['call_path']
+ spec = kwargs['suboverlord'] #the sub-overlord directly beneath this one
minionobj = Minions(spec, port=self.port, verbose=self.verbose)
use_delegate = True #signal to process_server to call delegate method
- delegation_path = kwargs['call_path'][1:len(kwargs['call_path'])]
minionurls = minionobj.get_urls() #the single-item url list to make async
#tools such as jobthing/forkbomb happy
else: #we're directly calling minions, so treat everything normally
@@ -380,7 +438,13 @@ class Overlord(object):
minions = expanded_minions.get_urls()[0]
results = process_server(0, 0, minions)
+ if self.delegate and self.async:
+ return {spec:results}
+
if use_delegate:
+ if utils.is_error(results[spec]):
+ print results
+ return results
return results[spec]
return results
diff --git a/func/overlord/delegation_tools.py b/func/overlord/delegation_tools.py
index f3445c0..0f3b43e 100644
--- a/func/overlord/delegation_tools.py
+++ b/func/overlord/delegation_tools.py
@@ -17,6 +17,49 @@
import fnmatch
+class groupby(object):
+ """
+ Borrowing the groupby iterator class directly
+ from the Python API as it does not exist in Pythons < 2.4
+ """
+
+ def __init__(self, iterable, key=None):
+ if key is None:
+ key = lambda x: x
+ self.keyfunc = key
+ self.it = iter(iterable)
+ self.tgtkey = self.currkey = self.currvalue = xrange(0)
+ def __iter__(self):
+ return self
+ def next(self):
+ while self.currkey == self.tgtkey:
+ self.currvalue = self.it.next() # Exit on StopIteration
+ self.currkey = self.keyfunc(self.currvalue)
+ self.tgtkey = self.currkey
+ return (self.currkey, self._grouper(self.tgtkey))
+ def _grouper(self, tgtkey):
+ while self.currkey == tgtkey:
+ yield self.currvalue
+ self.currvalue = self.it.next() # Exit on StopIteration
+ self.currkey = self.keyfunc(self.currvalue)
+
+def group_paths(ungrouped_list):
+ """
+ Given a list of multi-element path lists,
+ groups them together into a list of single-element paths (which
+ exist directly under the current overlord) and a dictionary of paths
+ to send to next hops in the delegation chain, containing a list of lists
+ keyed by their common next hop.
+ """
+
+ single_paths = [path[0] for path in ungrouped_list if len(path) == 1]
+ non_single_paths = [path for path in ungrouped_list if len(path) > 1]
+ path_group = dict([(key,[path[1:len(path)] for path in list(gen)])
+ for key, gen in groupby(non_single_paths,
+ key=lambda x:x[0])])
+
+ return (single_paths,path_group)
+
def get_paths_for_glob(glob, minionmap):
"""
Given a glob, returns shortest path to all minions
@@ -30,6 +73,20 @@ def get_paths_for_glob(glob, minionmap):
pathlist.append(result)
return pathlist
+def list_all_minions(minionmap):
+ """
+ Given a minion map, returns a flat list of all minions
+ contained within it
+ """
+ minionlist = []
+ for minion in minionmap.keys():
+ if minion not in minionlist:
+ minionlist.append(minion)
+ for minion in list_all_minions(minionmap[minion]):
+ if minion not in minionlist:
+ minionlist.append(minion)
+ return minionlist
+
def flatten_list(bumpy_list):
"""
Flattens gnarly nested lists into much
@@ -151,5 +208,8 @@ if __name__ == "__main__":
print "Element: %s, best path: %s" % (elem, get_shortest_path(elem,mymap))
print "- And finally, with all duplicates removed:"
- for elem in get_paths_for_glob('*path*',mymap):
+ for elem in get_paths_for_glob('*',mymap):
print "Valid Path: %s" % elem
+
+ print "- And grouped together:"
+ print group_paths(get_paths_for_glob('*',mymap))
diff --git a/func/overlord/mapper.py b/func/overlord/mapper.py
index 6a1131c..2ed33dd 100755
--- a/func/overlord/mapper.py
+++ b/func/overlord/mapper.py
@@ -21,6 +21,8 @@ import sys
import func.yaml as yaml
import func.overlord.client as func_client
+from func import utils
+
DEFAULT_TREE = "/var/lib/func/map"
class MapperTool(object):
@@ -59,6 +61,10 @@ class MapperTool(object):
minion_hash = func_client.Overlord("*").overlord.map_minions(self.options.only_alive==True)
+ for minion in minion_hash.keys(): #clean hash of any top-level errors
+ if utils.is_error(minion_hash[minion]):
+ minion_hash[minion] = {}
+
if self.options.verbose:
print "- built the following map:"
print minion_hash