diff options
author | Michael DeHaan <mdehaan@redhat.com> | 2008-07-11 16:35:58 -0400 |
---|---|---|
committer | Michael DeHaan <mdehaan@redhat.com> | 2008-07-11 16:35:58 -0400 |
commit | ed30c66bfc08e95c959f4dc9b14b09d4a1ae07de (patch) | |
tree | 34eb3f86fb4295bb45543b5a6d52d6bca50e3aa1 /func/overlord/client.py | |
parent | 287bf22c59f714c6925377a3a44f59a8906c2e50 (diff) | |
parent | 035a69e6bc140e7079ac1c4ca90679b577ad1b56 (diff) | |
download | func-ed30c66bfc08e95c959f4dc9b14b09d4a1ae07de.tar.gz func-ed30c66bfc08e95c959f4dc9b14b09d4a1ae07de.tar.xz func-ed30c66bfc08e95c959f4dc9b14b09d4a1ae07de.zip |
Merge branch 'ssalevan_delegation'
Conflicts:
func.spec
func/minion/modules/certmastermod.py
setup.py
Diffstat (limited to 'func/overlord/client.py')
-rwxr-xr-x | func/overlord/client.py | 103 |
1 files changed, 90 insertions, 13 deletions
diff --git a/func/overlord/client.py b/func/overlord/client.py index c46cc1f..01a31a7 100755 --- a/func/overlord/client.py +++ b/func/overlord/client.py @@ -16,6 +16,7 @@ import sys import glob import os +import yaml from certmaster.commonconfig import CMConfig from func.config import read_config, CONFIG_FILE @@ -24,6 +25,7 @@ import sslclient import command import groups +import delegation_tools as dtools import func.forkbomb as forkbomb import func.jobthing as jobthing import func.utils as utils @@ -35,6 +37,8 @@ from func.CommonErrors import * DEFAULT_PORT = 51234 FUNC_USAGE = "Usage: %s [ --help ] [ --verbose ] target.example.org module method arg1 [...]" +DEFAULT_MAPLOC = "/var/lib/func/map" +DELEGATION_METH = "delegation.run" # =================================== @@ -88,13 +92,16 @@ class CommandAutomagic(object): class Minions(object): def __init__(self, spec, port=51234, noglobs=None, verbose=None, - just_fqdns=False, groups_file=None): + just_fqdns=False, groups_file=None, + delegate=False, minionmap={}): self.spec = spec self.port = port self.noglobs = noglobs self.verbose = verbose self.just_fqdns = just_fqdns + self.delegate = delegate + self.minionmap = minionmap self.config = read_config(CONFIG_FILE, CMConfig) self.group_class = groups.Groups(filename=groups_file) @@ -153,12 +160,11 @@ def is_minion(minion_string): return minions.is_minion() - - class Overlord(object): def __init__(self, server_spec, port=DEFAULT_PORT, interactive=False, - verbose=False, noglobs=False, nforks=1, config=None, async=False, init_ssl=True): + verbose=False, noglobs=False, nforks=1, config=None, async=False, init_ssl=True, + delegate=False, mapfile=DEFAULT_MAPLOC): """ Constructor. @server_spec -- something like "*.example.org" or "foosball" @@ -179,9 +185,19 @@ class Overlord(object): self.noglobs = noglobs self.nforks = nforks self.async = async + self.delegate = delegate + self.mapfile = mapfile self.minions_class = Minions(self.server_spec, port=self.port, noglobs=self.noglobs,verbose=self.verbose) self.minions = self.minions_class.get_urls() + + if self.delegate: + try: + mapstream = file(self.mapfile, 'r') + self.minionmap = yaml.load(mapstream) + except e: + sys.stderr.write("mapfile load failed, switching delegation off") + self.delegate = False if init_ssl: self.setup_ssl() @@ -256,9 +272,50 @@ class Overlord(object): If Overlord() was constructed with noglobs=True, the return is instead just a single value, not a hash. """ + + if not self.delegate: #delegation is turned off, so run normally + return self.run_direct(module, method, args, nforks) + + resulthash = {} + + #First we get all call paths for minions not directly beneath this overlord + dele_paths = dtools.get_paths_for_glob(self.server_spec, self.minionmap) + non_single_paths = [path for path in dele_paths if len(path) > 1] + + for path in non_single_paths: + resulthash.update(self.run_direct(module, + method, + args, + nforks, + call_path=path)) + + #Next, we run everything that can be run directly beneath this overlord + #Why do we do this after delegation calls? Imagine what happens when + #reboot is called... + resulthash.update(self.run_direct(module,method,args,nforks)) + + return resulthash + + + # ----------------------------------------------- - results = {} + def run_direct(self, module, method, args, nforks=1, *extraargs, **kwargs): + """ + Invoke a remote method on one or more servers. + Run returns a hash, the keys are server names, the values are the + returns. + + The returns may include exception objects. + If Overlord() was constructed with noglobs=True, the return is instead + just a single value, not a hash. + """ + results = {} + spec = '' + minionurls = [] + use_delegate = False + delegation_path = [] + def process_server(bucketnumber, buckets, server): conn = sslclient.FuncServer(server, self.key, self.cert, self.ca ) @@ -275,7 +332,10 @@ class Overlord(object): # thats some pretty code right there aint it? -akl # we can't call "call" on s, since thats a rpc, so # we call gettatr around it. - meth = "%s.%s" % (module, method) + if use_delegate: + meth = DELEGATION_METH #call delegation module + else: + meth = "%s.%s" % (module, method) # async calling signature has an "imaginary" prefix # so async.abc.def does abc.def as a background task. @@ -284,7 +344,10 @@ class Overlord(object): meth = "async.%s" % meth # this is the point at which we make the remote call. - retval = getattr(conn, meth)(*args[:]) + if use_delegate: + retval = getattr(conn, meth)(module, method, args, delegation_path) + else: + retval = getattr(conn, meth)(*args[:]) if self.interactive: print retval @@ -302,29 +365,43 @@ class Overlord(object): right = server.rfind(":") server_name = server[left:right] return (server_name, retval) - + + if kwargs.has_key('call_path'): #we're delegating if this key exists + spec = kwargs['call_path'][0] #the sub-overlord directly beneath this one + minionobj = Minions(spec, port=self.port, verbose=self.verbose) + use_delegate = True #signal to process_server to call delegate method + delegation_path = kwargs['call_path'][1:len(kwargs['call_path'])] + minionurls = minionobj.get_urls() #the single-item url list to make async + #tools such as jobthing/forkbomb happy + else: #we're directly calling minions, so treat everything normally + spec = self.server_spec + minionurls = self.minions + if not self.noglobs: if self.nforks > 1 or self.async: # using forkbomb module to distribute job over multiple threads if not self.async: - results = forkbomb.batch_run(self.minions, process_server, nforks) + results = forkbomb.batch_run(minionurls, process_server, nforks) else: - results = jobthing.batch_run(self.minions, process_server, nforks) + results = jobthing.batch_run(minionurls, process_server, nforks) else: # no need to go through the fork code, we can do this directly results = {} - for x in self.minions: + for x in minionurls: (nkey,nvalue) = process_server(0, 0, x) results[nkey] = nvalue else: # globbing is not being used, but still need to make sure # URI is well formed. # expanded = expand_servers(self.server_spec, port=self.port, noglobs=True, verbose=self.verbose)[0] - expanded_minions = Minions(self.server_spec, port=self.port, noglobs=True, verbose=self.verbose) + expanded_minions = Minions(spec, port=self.port, noglobs=True, verbose=self.verbose) minions = expanded_minions.get_urls()[0] # print minions results = process_server(0, 0, minions) - + + if use_delegate: + return results[spec] + return results # ----------------------------------------------- |