diff options
author | Michael DeHaan <mdehaan@redhat.com> | 2008-01-29 16:19:30 -0500 |
---|---|---|
committer | Michael DeHaan <mdehaan@redhat.com> | 2008-01-29 16:19:30 -0500 |
commit | b3c5591d70c1c354d14267e804ab64872af97b40 (patch) | |
tree | a361f4d0ea060df23ffbccf9961f38bb01a65d23 /func | |
parent | 1d60f197dab809e9a51c3377587d46370e698c52 (diff) | |
download | third_party-func-b3c5591d70c1c354d14267e804ab64872af97b40.tar.gz third_party-func-b3c5591d70c1c354d14267e804ab64872af97b40.tar.xz third_party-func-b3c5591d70c1c354d14267e804ab64872af97b40.zip |
All exceptions, async or otherwise, now come back as easily detectable signatures. Use utils.is_error(result)
to determine if something is an error or isn't. Example scripts as well as func-inventory have been updated.
See async_test.py for examples.
Diffstat (limited to 'func')
-rw-r--r-- | func/forkbomb.py | 8 | ||||
-rw-r--r-- | func/jobthing.py | 35 | ||||
-rwxr-xr-x | func/minion/server.py | 21 | ||||
-rwxr-xr-x | func/overlord/client.py | 10 | ||||
-rwxr-xr-x | func/overlord/inventory.py | 18 | ||||
-rwxr-xr-x | func/utils.py | 58 |
6 files changed, 60 insertions, 90 deletions
diff --git a/func/forkbomb.py b/func/forkbomb.py index 3dc12c8..3dfa6f2 100644 --- a/func/forkbomb.py +++ b/func/forkbomb.py @@ -21,6 +21,7 @@ import sys import tempfile import fcntl import utils +import xmlrpclib DEFAULT_FORKS = 4 DEFAULT_CACHE_DIR = "/var/lib/func" @@ -54,15 +55,12 @@ def __access_buckets(filename,clear,new_key=None,new_value=None): if not storage.has_key("data"): storage["data"] = {} else: - # print "DEBUG: existing: %s" % storage["data"] pass if new_key is not None: # bsdb is a bit weird about this newish = storage["data"].copy() - new_value = utils.remove_exceptions(new_value) newish[new_key] = new_value - # print "DEBUG: newish: %s" % newish storage["data"] = newish rc = storage["data"].copy() @@ -78,14 +76,12 @@ def __bucketize(pool, slots): """ buckets = {} count = 0 - # print "DEBUG: slots: %s" % slots for key in pool: count = count + 1 slot = count % slots if not buckets.has_key(slot): buckets[slot] = [] buckets[slot].append(key) - # print "DEBUG: buckets: %s" % buckets return buckets def __with_my_bucket(bucket_number,buckets,what_to_do,filename): @@ -139,11 +135,9 @@ def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR): if nforks <= 1: # modulus voodoo gets crazy otherwise and bad things happen nforks = 2 - # print "DEBUG: nforks=%s" % 2 shelf_file = __get_storage(cachedir) __access_buckets(shelf_file,True,None) buckets = __bucketize(pool, nforks) - # print "DEBUG: buckets: %s" % buckets __forkbomb(1,buckets,callback,shelf_file) rc = __access_buckets(shelf_file,False,None) os.remove(shelf_file) diff --git a/func/jobthing.py b/func/jobthing.py index ca8ee38..67ad1a6 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -24,6 +24,7 @@ import tempfile import fcntl import forkbomb import utils +import traceback JOB_ID_RUNNING = 0 JOB_ID_FINISHED = 1 @@ -31,8 +32,6 @@ JOB_ID_LOST_IN_SPACE = 2 JOB_ID_ASYNC_PARTIAL = 3 JOB_ID_ASYNC_FINISHED = 4 -REMOTE_ERROR = utils.REMOTE_CANARY - # how long to retain old job records in the job id database RETAIN_INTERVAL = 60 * 60 @@ -86,9 +85,6 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg __purge_old_jobs(storage) if write: - results = utils.remove_exceptions(results) - # print "DEBUG: status=%s" % status - # print "DEBUG: results=%s" % results storage[str(jobid)] = (status, results) rc = jobid elif not purge: @@ -119,7 +115,6 @@ def batch_run(server, process_server, nforks): job_id = time.time() pid = os.fork() if pid != 0: - #print "DEBUG: UPDATE STATUS: r1: %s" % job_id __update_status(job_id, JOB_ID_RUNNING, -1) return job_id else: @@ -131,12 +126,14 @@ def batch_run(server, process_server, nforks): __update_status(job_id, JOB_ID_ASYNC_PARTIAL, results) sys.exit(0) -def minion_async_run(function_ref, args): +def minion_async_run(retriever, method, args): """ This is a simpler invocation for minion side async usage. """ # to avoid confusion of job id's (we use the same job database) # minion jobs contain the string "minion". + + job_id = "%s-minion" % time.time() pid = os.fork() if pid != 0: @@ -145,19 +142,13 @@ def minion_async_run(function_ref, args): else: __update_status(job_id, JOB_ID_RUNNING, -1) try: - results = function_ref(*args) + function_ref = retriever(method) + rc = function_ref(*args) except Exception, e: - # FIXME: we need to make sure this is logged - # NOTE: we need a return here, else the async engine won't get any results - # so that is the reason for the catch - # FIXME: it would be nice to store the string data from the exception here so that the caller - # can read the exception data, however we can't store the exception directly in the DB. - # some care must be made to also make this not suck for the user of the API, - # when they are iterating over batch results, so they can tell good data from exceptions that - # are represented as strings. Ideally, reconstructing the exceptions back into objects would be shiny - # but if we go there I will need more caffeine first. - __update_status(job_id, JOB_ID_FINISHED, REMOTE_ERROR) - __update_status(job_id, JOB_ID_FINISHED, results) + (t, v, tb) = sys.exc_info() + rc = utils.nice_exception(t,v,tb) + + __update_status(job_id, JOB_ID_FINISHED, rc) sys.exit(0) def job_status(jobid, client_class=None): @@ -179,21 +170,19 @@ def job_status(jobid, client_class=None): partial_results = {} - # print "DEBUG: partial results for batch task: %s" % interim_results + some_missing = False for host in interim_results.keys(): minion_job = interim_results[host] client = client_class(host, noglobs=True, async=False) minion_result = client.jobs.job_status(minion_job) - # print "DEBUG: background task on minion (%s) has status %s" % (minion_job, minion_result) (minion_interim_rc, minion_interim_result) = minion_result - some_missing = False if minion_interim_rc not in [ JOB_ID_RUNNING ]: if minion_interim_rc in [ JOB_ID_LOST_IN_SPACE ]: - partial_results[host] = REMOTE_ERROR + partial_results[host] = [ utils.REMOTE_ERROR, "lost job" ] else: partial_results[host] = minion_interim_result else: diff --git a/func/minion/server.py b/func/minion/server.py index 7187783..7a760a0 100755 --- a/func/minion/server.py +++ b/func/minion/server.py @@ -131,11 +131,12 @@ class FuncApiMethod: rc = self.__method(*args) except codes.FuncException, e: self.__log_exc() - rc = e + (t, v, tb) = sys.exc_info() + rc = utils.nice_exception(t,v,tb) except: - self.logger.debug("Not a Func-specific exception") self.__log_exc() - raise + (t, v, tb) = sys.exc_info() + rc = utils.nice_exception(t,v,tb) self.logger.debug("Return code for %s: %s" % (self.__name, rc)) return rc @@ -218,11 +219,15 @@ class FuncSSLXMLRPCServer(AuthedXMLRPCServer.AuthedSSLXMLRPCServer, sub_hash = peer_cert.subject_name_hash() self.audit_logger.log_call(ip, cn, sub_hash, method, params) - if not async_dispatch: - return self.get_dispatch_method(method)(*params) - else: - meth = self.get_dispatch_method(method) - return jobthing.minion_async_run(meth, params) + try: + if not async_dispatch: + return self.get_dispatch_method(method)(*params) + else: + return jobthing.minion_async_run(self.get_dispatch_method, method, params) + except: + (t, v, tb) = sys.exc_info() + rc = utils.nice_exception(t, v, tb) + return rc def auth_cb(self, request, client_address): peer_cert = request.get_peer_certificate() diff --git a/func/overlord/client.py b/func/overlord/client.py index f33bc4b..e293f1c 100755 --- a/func/overlord/client.py +++ b/func/overlord/client.py @@ -25,7 +25,7 @@ import command import groups import func.forkbomb as forkbomb import func.jobthing as jobthing - +import func.utils as utils # =================================== # defaults @@ -132,7 +132,7 @@ def isServer(server_string): class Client(object): def __init__(self, server_spec, port=DEFAULT_PORT, interactive=False, - verbose=False, noglobs=False, nforks=1, config=None, async=False, noexceptions=True): + verbose=False, noglobs=False, nforks=1, config=None, async=False): """ Constructor. @server_spec -- something like "*.example.org" or "foosball" @@ -153,7 +153,6 @@ class Client(object): self.noglobs = noglobs self.nforks = nforks self.async = async - self.noexceptions= noexceptions self.servers = expand_servers(self.server_spec, port=self.port, noglobs=self.noglobs,verbose=self.verbose) @@ -234,12 +233,11 @@ class Client(object): if self.interactive: print retval except Exception, e: - retval = e + (t, v, tb) = sys.exc_info() + retval = utils.nice_exception(t,v,tb) if self.interactive: sys.stderr.write("remote exception on %s: %s\n" % (server, str(e))) - if self.noglob and not self.noexceptions: - raise(e) if self.noglobs: return retval diff --git a/func/overlord/inventory.py b/func/overlord/inventory.py index 47fff6a..c2a1d30 100755 --- a/func/overlord/inventory.py +++ b/func/overlord/inventory.py @@ -21,8 +21,8 @@ import sys import pprint import xmlrpclib from func.minion import sub_process - import func.overlord.client as func_client +import func.utils as utils DEFAULT_TREE = "/var/lib/func/inventory/" @@ -80,19 +80,21 @@ class FuncInventory(object): # call all remote info methods and handle them if options.verbose: - # print "- DEBUG: %s" % host_methods print "- scanning ..." # for (host, modules) in host_modules.iteritems(): for (host, methods) in host_methods.iteritems(): - - for each_method in methods: + if utils.is_error(methods): + print "-- connection refused: %s" % host + break + + for each_method in methods: - if type(each_method) == int: - if self.options.verbose: - print "-- connection refused: %s" % host - break + #if type(each_method) == int: + # if self.options.verbose: + # print "-- connection refused: %s" % host + # break (module_name, method_name) = each_method.split(".") diff --git a/func/utils.py b/func/utils.py index c2fbb9f..1a4abb7 100755 --- a/func/utils.py +++ b/func/utils.py @@ -16,17 +16,13 @@ import sys import traceback import xmlrpclib -REMOTE_CANARY = "***REMOTE_ERROR***" +REMOTE_ERROR = "REMOTE_ERROR" -# this is kind of handy, so keep it around for now -# but we really need to fix out server side logging and error -# reporting so we don't need it def trace_me(): x = traceback.extract_stack() bar = string.join(traceback.format_list(x)) return bar - def daemonize(pidfile=None): """ Daemonize this process with the UNIX double-fork trick. @@ -41,41 +37,27 @@ def daemonize(pidfile=None): os.umask(0) pid = os.fork() - if pid > 0: if pidfile is not None: open(pidfile, "w").write(str(pid)) sys.exit(0) -def remove_exceptions(results): - """ - Used by forkbomb/jobthing to avoid storing exceptions in database - because you know those don't serialize so well :) - # FIXME: this needs cleanup - """ - - if results is None: - return REMOTE_CANARY - - if str(results).startswith("<Fault"): - return REMOTE_CANARY - - if type(results) == xmlrpclib.Fault: - return REMOTE_CANARY - - if type(results) == dict: - new_results = {} - for x in results.keys(): - value = results[x] - if str(value).find("<Fault") == -1: - # there are interesting issues with the way it is imported and type() - # so that is why this hack is here. type(x) != xmlrpclib.Fault appears to miss some things - new_results[x] = value - else: - new_results[x] = REMOTE_CANARY - return new_results - - return results - - - +def nice_exception(etype, evalue, etb): + etype = str(etype) + lefti = etype.index("'") + 1 + righti = etype.rindex("'") + nicetype = etype[lefti:righti] + nicestack = string.join(traceback.format_list(traceback.extract_tb(etb))) + return [ REMOTE_ERROR, nicetype, str(evalue), nicestack ] + +def is_error(result): + if type(result) != list: + return False + if len(result) == 0: + return False + if result[0] == REMOTE_ERROR: + return True + return False + + + |