diff options
-rw-r--r-- | func/forkbomb.py | 2 | ||||
-rw-r--r-- | func/jobthing.py | 2 | ||||
-rw-r--r-- | func/minion/modules/delegation.py | 89 | ||||
-rw-r--r-- | func/minion/modules/overlord.py | 12 | ||||
-rwxr-xr-x | func/overlord/client.py | 84 | ||||
-rw-r--r-- | func/overlord/delegation_tools.py | 62 | ||||
-rwxr-xr-x | func/overlord/mapper.py | 6 |
7 files changed, 234 insertions, 23 deletions
diff --git a/func/forkbomb.py b/func/forkbomb.py index ef0817a..0eb14fe 100644 --- a/func/forkbomb.py +++ b/func/forkbomb.py @@ -111,7 +111,7 @@ def __forkbomb(mybucket,buckets,what_to_do,filename): raise ose else: __with_my_bucket(mybucket,buckets,what_to_do,filename) - sys.exit(0) + os._exit(0) def __demo(bucket_number, buckets, my_item): """ diff --git a/func/jobthing.py b/func/jobthing.py index 051480d..d919bb3 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -130,7 +130,7 @@ def batch_run(pool, callback, nforks): # we now have a list of job id's for each minion, kill the task __update_status(job_id, JOB_ID_PARTIAL, results) - sys.exit(0) + os._exit(0) def minion_async_run(retriever, method, args): """ diff --git a/func/minion/modules/delegation.py b/func/minion/modules/delegation.py index 9f34d1a..d7731bc 100644 --- a/func/minion/modules/delegation.py +++ b/func/minion/modules/delegation.py @@ -8,9 +8,17 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +import time import func_module import func.overlord.client as fc +import func.jobthing as jobthing + from func import utils +from func.overlord import delegation_tools as dtools + +#boolean value appended to job id list to denote direct/delegated calls +DIRECT = False +DELEGATED = True class DelegationModule(func_module.FuncModule): @@ -18,18 +26,83 @@ class DelegationModule(func_module.FuncModule): api_version = "0.0.1" description = "Minion-side module to support delegation on sub-Overlords." - def run(self,module,method,args,delegation_path): + def run(self,module,method,args,delegation_list,async,nforks): """ Delegates commands down the path of delegation supplied as an argument """ + result_dict = {} + job_id_list = [] + + #separate list passed to us into minions we can call directly and + #further delegation paths + (single_paths, grouped_paths) = dtools.group_paths(delegation_list) - next_hop = delegation_path[0] - overlord = fc.Overlord(next_hop) - if len(delegation_path) == 1: #minion exists under this overlord + #run delegated calls + for group in grouped_paths.keys(): + overlord = fc.Overlord(group, + async=async, + nforks=nforks) + path_list = grouped_paths[group] + delegation_results = overlord.delegation.run(module, + method, + args, + path_list, + async, + nforks) + if async: + job_id_list.append([overlord, + delegation_results, + group, + True]) + else: + #These are delegated calls, so we need to strip away the + #hash that surrounds the results + if utils.is_error(delegation_results[group]): + result_dict.update(delegation_results) + else: + result_dict.update(delegation_results[group]) + + #run direct calls + for minion in single_paths: + overlord = fc.Overlord(minion, + async=async, + nforks=nforks) overlord_module = getattr(overlord,module) - return getattr(overlord_module,method)(*args[:]) + results = getattr(overlord_module,method)(*args[:]) + if async: + job_id_list.append([overlord, + results, + minion, + False]) + else: + result_dict.update(results) + + #poll async calls + while len(job_id_list) > 0: + for job in job_id_list: + (return_code, async_results) = job[0].job_status(job[1]) + if return_code == jobthing.JOB_ID_RUNNING: + pass #it's still going, ignore it this cycle + elif return_code == jobthing.JOB_ID_PARTIAL: + pass #yep, it's still rolling + elif return_code == jobthing.JOB_ID_REMOTE_ERROR: + result_dict.update(async_results) + job_id_list.remove(job) + else: #it's done or it's had an error, pass it up + if job[3] == DIRECT: + #this is a direct call, so we only need to + #update the hash with the pertinent results + results = async_results + elif job[3] == DELEGATED: + #this is a delegated call, so we need to strip + #away the nesting hash + results = async_results[job[2]] + else: + #and this code should never be reached + results = {} + result_dict.update(results) + job_id_list.remove(job) + time.sleep(0.1) #pause a bit so that we don't flood our minions - stripped_list = delegation_path[1:len(delegation_path)] - delegation_results = overlord.delegation.run(module,method,args,stripped_list) - return delegation_results[next_hop] #strip away nested hash data from results + return result_dict diff --git a/func/minion/modules/overlord.py b/func/minion/modules/overlord.py index 710f6d1..743c672 100644 --- a/func/minion/modules/overlord.py +++ b/func/minion/modules/overlord.py @@ -30,12 +30,20 @@ class OverlordModule(func_module.FuncModule): ping_results = fc.Overlord("*").test.ping() for minion in ping_results.keys(): if ping_results[minion] == 1: #if minion is alive - current_minions.append(minion) #add it to the list of current minions + current_minions.append(minion) #add it to the list else: cm = certmaster.CertMaster() current_minions = cm.get_signed_certs() for current_minion in current_minions: - maphash[current_minion] = fc.Overlord(current_minion).overlord.map_minions()[current_minion] + if current_minion in utils.get_hostname(): + maphash[current_minion] = {} #prevent infinite recursion + else: + next_hop = fc.Overlord(current_minion) + mapresults = next_hop.overlord.map_minions()[current_minion] + if not utils.is_error(mapresults): + maphash[current_minion] = mapresults + else: + maphash[current_minion] = {} return maphash diff --git a/func/overlord/client.py b/func/overlord/client.py index 79ead06..b322f76 100755 --- a/func/overlord/client.py +++ b/func/overlord/client.py @@ -16,6 +16,7 @@ import sys import glob import os +import time import func.yaml as yaml from certmaster.commonconfig import CMConfig @@ -244,6 +245,21 @@ class Overlord(object): # ----------------------------------------------- + def list_minions(self, format='list'): + """ + Returns a flat list containing the minions this Overlord object currently + controls + """ + if self.delegate: + return dtools.match_glob_in_tree(self.server_spec, self.minionmap) + minionlist = [] #nasty ugly hack to remove duplicate minions from list + for minion in self.minions_class.get_all_hosts(): + if minion not in minionlist: #ugh, brute force :( + minionlist.append(minion) + return minionlist + + # ----------------------------------------------- + def run(self, module, method, args, nforks=1): """ Invoke a remote method on one or more servers. @@ -258,25 +274,61 @@ class Overlord(object): if not self.delegate: #delegation is turned off, so run normally return self.run_direct(module, method, args, nforks) - resulthash = {} + delegatedhash = {} + directhash = {} + completedhash = {} #First we get all call paths for minions not directly beneath this overlord dele_paths = dtools.get_paths_for_glob(self.server_spec, self.minionmap) - non_single_paths = [path for path in dele_paths if len(path) > 1] - for path in non_single_paths: - resulthash.update(self.run_direct(module, + #Then we group them together in a dictionary by a common next hop + (single_paths,grouped_paths) = dtools.group_paths(dele_paths) + + for group in grouped_paths.keys(): + delegatedhash.update(self.run_direct(module, method, args, nforks, - call_path=path)) + call_path=grouped_paths[group], + suboverlord=group)) #Next, we run everything that can be run directly beneath this overlord #Why do we do this after delegation calls? Imagine what happens when #reboot is called... - resulthash.update(self.run_direct(module,method,args,nforks)) + directhash.update(self.run_direct(module,method,args,nforks)) - return resulthash + #poll async results if we've async turned on + if self.async: + while (len(delegatedhash) + len(directhash)) > 0: + for minion in delegatedhash.keys(): + results = delegatedhash[minion] + (return_code, async_results) = self.job_status(results) + if return_code == jobthing.JOB_ID_RUNNING: + pass + elif return_code == jobthing.JOB_ID_PARTIAL: + pass + else: + completedhash.update(async_results[minion]) + del delegatedhash[minion] + + for minion in directhash.keys(): + results = directhash[minion] + (return_code, async_results) = self.job_status(results) + if return_code == jobthing.JOB_ID_RUNNING: + pass + elif return_code == jobthing.JOB_ID_PARTIAL: + pass + else: + completedhash.update(async_results) + del directhash[minion] + time.sleep(0.1) #pause a bit so we don't flood our minions + return completedhash + + #we didn't instantiate this Overlord in async mode, so we just return the + #result hash + completedhash.update(delegatedhash) + completedhash.update(directhash) + return completedhash # ----------------------------------------------- @@ -327,12 +379,18 @@ class Overlord(object): # this is the point at which we make the remote call. if use_delegate: - retval = getattr(conn, meth)(module, method, args, delegation_path) + retval = getattr(conn, meth)(module, + method, + args, + delegation_path, + self.async, + self.nforks) else: retval = getattr(conn, meth)(*args[:]) if self.interactive: print retval + except Exception, e: (t, v, tb) = sys.exc_info() retval = utils.nice_exception(t,v,tb) @@ -349,10 +407,10 @@ class Overlord(object): return (server_name, retval) if kwargs.has_key('call_path'): #we're delegating if this key exists - spec = kwargs['call_path'][0] #the sub-overlord directly beneath this one + delegation_path = kwargs['call_path'] + spec = kwargs['suboverlord'] #the sub-overlord directly beneath this one minionobj = Minions(spec, port=self.port, verbose=self.verbose) use_delegate = True #signal to process_server to call delegate method - delegation_path = kwargs['call_path'][1:len(kwargs['call_path'])] minionurls = minionobj.get_urls() #the single-item url list to make async #tools such as jobthing/forkbomb happy else: #we're directly calling minions, so treat everything normally @@ -380,7 +438,13 @@ class Overlord(object): minions = expanded_minions.get_urls()[0] results = process_server(0, 0, minions) + if self.delegate and self.async: + return {spec:results} + if use_delegate: + if utils.is_error(results[spec]): + print results + return results return results[spec] return results diff --git a/func/overlord/delegation_tools.py b/func/overlord/delegation_tools.py index f3445c0..0f3b43e 100644 --- a/func/overlord/delegation_tools.py +++ b/func/overlord/delegation_tools.py @@ -17,6 +17,49 @@ import fnmatch +class groupby(object): + """ + Borrowing the groupby iterator class directly + from the Python API as it does not exist in Pythons < 2.4 + """ + + def __init__(self, iterable, key=None): + if key is None: + key = lambda x: x + self.keyfunc = key + self.it = iter(iterable) + self.tgtkey = self.currkey = self.currvalue = xrange(0) + def __iter__(self): + return self + def next(self): + while self.currkey == self.tgtkey: + self.currvalue = self.it.next() # Exit on StopIteration + self.currkey = self.keyfunc(self.currvalue) + self.tgtkey = self.currkey + return (self.currkey, self._grouper(self.tgtkey)) + def _grouper(self, tgtkey): + while self.currkey == tgtkey: + yield self.currvalue + self.currvalue = self.it.next() # Exit on StopIteration + self.currkey = self.keyfunc(self.currvalue) + +def group_paths(ungrouped_list): + """ + Given a list of multi-element path lists, + groups them together into a list of single-element paths (which + exist directly under the current overlord) and a dictionary of paths + to send to next hops in the delegation chain, containing a list of lists + keyed by their common next hop. + """ + + single_paths = [path[0] for path in ungrouped_list if len(path) == 1] + non_single_paths = [path for path in ungrouped_list if len(path) > 1] + path_group = dict([(key,[path[1:len(path)] for path in list(gen)]) + for key, gen in groupby(non_single_paths, + key=lambda x:x[0])]) + + return (single_paths,path_group) + def get_paths_for_glob(glob, minionmap): """ Given a glob, returns shortest path to all minions @@ -30,6 +73,20 @@ def get_paths_for_glob(glob, minionmap): pathlist.append(result) return pathlist +def list_all_minions(minionmap): + """ + Given a minion map, returns a flat list of all minions + contained within it + """ + minionlist = [] + for minion in minionmap.keys(): + if minion not in minionlist: + minionlist.append(minion) + for minion in list_all_minions(minionmap[minion]): + if minion not in minionlist: + minionlist.append(minion) + return minionlist + def flatten_list(bumpy_list): """ Flattens gnarly nested lists into much @@ -151,5 +208,8 @@ if __name__ == "__main__": print "Element: %s, best path: %s" % (elem, get_shortest_path(elem,mymap)) print "- And finally, with all duplicates removed:" - for elem in get_paths_for_glob('*path*',mymap): + for elem in get_paths_for_glob('*',mymap): print "Valid Path: %s" % elem + + print "- And grouped together:" + print group_paths(get_paths_for_glob('*',mymap)) diff --git a/func/overlord/mapper.py b/func/overlord/mapper.py index 6a1131c..2ed33dd 100755 --- a/func/overlord/mapper.py +++ b/func/overlord/mapper.py @@ -21,6 +21,8 @@ import sys import func.yaml as yaml import func.overlord.client as func_client +from func import utils + DEFAULT_TREE = "/var/lib/func/map" class MapperTool(object): @@ -59,6 +61,10 @@ class MapperTool(object): minion_hash = func_client.Overlord("*").overlord.map_minions(self.options.only_alive==True) + for minion in minion_hash.keys(): #clean hash of any top-level errors + if utils.is_error(minion_hash[minion]): + minion_hash[minion] = {} + if self.options.verbose: print "- built the following map:" print minion_hash |