summaryrefslogtreecommitdiffstats
path: root/func
diff options
context:
space:
mode:
authorMichael DeHaan <mdehaan@redhat.com>2008-01-29 16:19:30 -0500
committerMichael DeHaan <mdehaan@redhat.com>2008-01-29 16:19:30 -0500
commitb3c5591d70c1c354d14267e804ab64872af97b40 (patch)
treea361f4d0ea060df23ffbccf9961f38bb01a65d23 /func
parent1d60f197dab809e9a51c3377587d46370e698c52 (diff)
downloadthird_party-func-b3c5591d70c1c354d14267e804ab64872af97b40.tar.gz
third_party-func-b3c5591d70c1c354d14267e804ab64872af97b40.tar.xz
third_party-func-b3c5591d70c1c354d14267e804ab64872af97b40.zip
All exceptions, async or otherwise, now come back as easily detectable signatures. Use utils.is_error(result)
to determine if something is an error or isn't. Example scripts as well as func-inventory have been updated. See async_test.py for examples.
Diffstat (limited to 'func')
-rw-r--r--func/forkbomb.py8
-rw-r--r--func/jobthing.py35
-rwxr-xr-xfunc/minion/server.py21
-rwxr-xr-xfunc/overlord/client.py10
-rwxr-xr-xfunc/overlord/inventory.py18
-rwxr-xr-xfunc/utils.py58
6 files changed, 60 insertions, 90 deletions
diff --git a/func/forkbomb.py b/func/forkbomb.py
index 3dc12c8..3dfa6f2 100644
--- a/func/forkbomb.py
+++ b/func/forkbomb.py
@@ -21,6 +21,7 @@ import sys
import tempfile
import fcntl
import utils
+import xmlrpclib
DEFAULT_FORKS = 4
DEFAULT_CACHE_DIR = "/var/lib/func"
@@ -54,15 +55,12 @@ def __access_buckets(filename,clear,new_key=None,new_value=None):
if not storage.has_key("data"):
storage["data"] = {}
else:
- # print "DEBUG: existing: %s" % storage["data"]
pass
if new_key is not None:
# bsdb is a bit weird about this
newish = storage["data"].copy()
- new_value = utils.remove_exceptions(new_value)
newish[new_key] = new_value
- # print "DEBUG: newish: %s" % newish
storage["data"] = newish
rc = storage["data"].copy()
@@ -78,14 +76,12 @@ def __bucketize(pool, slots):
"""
buckets = {}
count = 0
- # print "DEBUG: slots: %s" % slots
for key in pool:
count = count + 1
slot = count % slots
if not buckets.has_key(slot):
buckets[slot] = []
buckets[slot].append(key)
- # print "DEBUG: buckets: %s" % buckets
return buckets
def __with_my_bucket(bucket_number,buckets,what_to_do,filename):
@@ -139,11 +135,9 @@ def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR):
if nforks <= 1:
# modulus voodoo gets crazy otherwise and bad things happen
nforks = 2
- # print "DEBUG: nforks=%s" % 2
shelf_file = __get_storage(cachedir)
__access_buckets(shelf_file,True,None)
buckets = __bucketize(pool, nforks)
- # print "DEBUG: buckets: %s" % buckets
__forkbomb(1,buckets,callback,shelf_file)
rc = __access_buckets(shelf_file,False,None)
os.remove(shelf_file)
diff --git a/func/jobthing.py b/func/jobthing.py
index ca8ee38..67ad1a6 100644
--- a/func/jobthing.py
+++ b/func/jobthing.py
@@ -24,6 +24,7 @@ import tempfile
import fcntl
import forkbomb
import utils
+import traceback
JOB_ID_RUNNING = 0
JOB_ID_FINISHED = 1
@@ -31,8 +32,6 @@ JOB_ID_LOST_IN_SPACE = 2
JOB_ID_ASYNC_PARTIAL = 3
JOB_ID_ASYNC_FINISHED = 4
-REMOTE_ERROR = utils.REMOTE_CANARY
-
# how long to retain old job records in the job id database
RETAIN_INTERVAL = 60 * 60
@@ -86,9 +85,6 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg
__purge_old_jobs(storage)
if write:
- results = utils.remove_exceptions(results)
- # print "DEBUG: status=%s" % status
- # print "DEBUG: results=%s" % results
storage[str(jobid)] = (status, results)
rc = jobid
elif not purge:
@@ -119,7 +115,6 @@ def batch_run(server, process_server, nforks):
job_id = time.time()
pid = os.fork()
if pid != 0:
- #print "DEBUG: UPDATE STATUS: r1: %s" % job_id
__update_status(job_id, JOB_ID_RUNNING, -1)
return job_id
else:
@@ -131,12 +126,14 @@ def batch_run(server, process_server, nforks):
__update_status(job_id, JOB_ID_ASYNC_PARTIAL, results)
sys.exit(0)
-def minion_async_run(function_ref, args):
+def minion_async_run(retriever, method, args):
"""
This is a simpler invocation for minion side async usage.
"""
# to avoid confusion of job id's (we use the same job database)
# minion jobs contain the string "minion".
+
+
job_id = "%s-minion" % time.time()
pid = os.fork()
if pid != 0:
@@ -145,19 +142,13 @@ def minion_async_run(function_ref, args):
else:
__update_status(job_id, JOB_ID_RUNNING, -1)
try:
- results = function_ref(*args)
+ function_ref = retriever(method)
+ rc = function_ref(*args)
except Exception, e:
- # FIXME: we need to make sure this is logged
- # NOTE: we need a return here, else the async engine won't get any results
- # so that is the reason for the catch
- # FIXME: it would be nice to store the string data from the exception here so that the caller
- # can read the exception data, however we can't store the exception directly in the DB.
- # some care must be made to also make this not suck for the user of the API,
- # when they are iterating over batch results, so they can tell good data from exceptions that
- # are represented as strings. Ideally, reconstructing the exceptions back into objects would be shiny
- # but if we go there I will need more caffeine first.
- __update_status(job_id, JOB_ID_FINISHED, REMOTE_ERROR)
- __update_status(job_id, JOB_ID_FINISHED, results)
+ (t, v, tb) = sys.exc_info()
+ rc = utils.nice_exception(t,v,tb)
+
+ __update_status(job_id, JOB_ID_FINISHED, rc)
sys.exit(0)
def job_status(jobid, client_class=None):
@@ -179,21 +170,19 @@ def job_status(jobid, client_class=None):
partial_results = {}
- # print "DEBUG: partial results for batch task: %s" % interim_results
+ some_missing = False
for host in interim_results.keys():
minion_job = interim_results[host]
client = client_class(host, noglobs=True, async=False)
minion_result = client.jobs.job_status(minion_job)
- # print "DEBUG: background task on minion (%s) has status %s" % (minion_job, minion_result)
(minion_interim_rc, minion_interim_result) = minion_result
- some_missing = False
if minion_interim_rc not in [ JOB_ID_RUNNING ]:
if minion_interim_rc in [ JOB_ID_LOST_IN_SPACE ]:
- partial_results[host] = REMOTE_ERROR
+ partial_results[host] = [ utils.REMOTE_ERROR, "lost job" ]
else:
partial_results[host] = minion_interim_result
else:
diff --git a/func/minion/server.py b/func/minion/server.py
index 7187783..7a760a0 100755
--- a/func/minion/server.py
+++ b/func/minion/server.py
@@ -131,11 +131,12 @@ class FuncApiMethod:
rc = self.__method(*args)
except codes.FuncException, e:
self.__log_exc()
- rc = e
+ (t, v, tb) = sys.exc_info()
+ rc = utils.nice_exception(t,v,tb)
except:
- self.logger.debug("Not a Func-specific exception")
self.__log_exc()
- raise
+ (t, v, tb) = sys.exc_info()
+ rc = utils.nice_exception(t,v,tb)
self.logger.debug("Return code for %s: %s" % (self.__name, rc))
return rc
@@ -218,11 +219,15 @@ class FuncSSLXMLRPCServer(AuthedXMLRPCServer.AuthedSSLXMLRPCServer,
sub_hash = peer_cert.subject_name_hash()
self.audit_logger.log_call(ip, cn, sub_hash, method, params)
- if not async_dispatch:
- return self.get_dispatch_method(method)(*params)
- else:
- meth = self.get_dispatch_method(method)
- return jobthing.minion_async_run(meth, params)
+ try:
+ if not async_dispatch:
+ return self.get_dispatch_method(method)(*params)
+ else:
+ return jobthing.minion_async_run(self.get_dispatch_method, method, params)
+ except:
+ (t, v, tb) = sys.exc_info()
+ rc = utils.nice_exception(t, v, tb)
+ return rc
def auth_cb(self, request, client_address):
peer_cert = request.get_peer_certificate()
diff --git a/func/overlord/client.py b/func/overlord/client.py
index f33bc4b..e293f1c 100755
--- a/func/overlord/client.py
+++ b/func/overlord/client.py
@@ -25,7 +25,7 @@ import command
import groups
import func.forkbomb as forkbomb
import func.jobthing as jobthing
-
+import func.utils as utils
# ===================================
# defaults
@@ -132,7 +132,7 @@ def isServer(server_string):
class Client(object):
def __init__(self, server_spec, port=DEFAULT_PORT, interactive=False,
- verbose=False, noglobs=False, nforks=1, config=None, async=False, noexceptions=True):
+ verbose=False, noglobs=False, nforks=1, config=None, async=False):
"""
Constructor.
@server_spec -- something like "*.example.org" or "foosball"
@@ -153,7 +153,6 @@ class Client(object):
self.noglobs = noglobs
self.nforks = nforks
self.async = async
- self.noexceptions= noexceptions
self.servers = expand_servers(self.server_spec, port=self.port, noglobs=self.noglobs,verbose=self.verbose)
@@ -234,12 +233,11 @@ class Client(object):
if self.interactive:
print retval
except Exception, e:
- retval = e
+ (t, v, tb) = sys.exc_info()
+ retval = utils.nice_exception(t,v,tb)
if self.interactive:
sys.stderr.write("remote exception on %s: %s\n" %
(server, str(e)))
- if self.noglob and not self.noexceptions:
- raise(e)
if self.noglobs:
return retval
diff --git a/func/overlord/inventory.py b/func/overlord/inventory.py
index 47fff6a..c2a1d30 100755
--- a/func/overlord/inventory.py
+++ b/func/overlord/inventory.py
@@ -21,8 +21,8 @@ import sys
import pprint
import xmlrpclib
from func.minion import sub_process
-
import func.overlord.client as func_client
+import func.utils as utils
DEFAULT_TREE = "/var/lib/func/inventory/"
@@ -80,19 +80,21 @@ class FuncInventory(object):
# call all remote info methods and handle them
if options.verbose:
- # print "- DEBUG: %s" % host_methods
print "- scanning ..."
# for (host, modules) in host_modules.iteritems():
for (host, methods) in host_methods.iteritems():
-
- for each_method in methods:
+ if utils.is_error(methods):
+ print "-- connection refused: %s" % host
+ break
+
+ for each_method in methods:
- if type(each_method) == int:
- if self.options.verbose:
- print "-- connection refused: %s" % host
- break
+ #if type(each_method) == int:
+ # if self.options.verbose:
+ # print "-- connection refused: %s" % host
+ # break
(module_name, method_name) = each_method.split(".")
diff --git a/func/utils.py b/func/utils.py
index c2fbb9f..1a4abb7 100755
--- a/func/utils.py
+++ b/func/utils.py
@@ -16,17 +16,13 @@ import sys
import traceback
import xmlrpclib
-REMOTE_CANARY = "***REMOTE_ERROR***"
+REMOTE_ERROR = "REMOTE_ERROR"
-# this is kind of handy, so keep it around for now
-# but we really need to fix out server side logging and error
-# reporting so we don't need it
def trace_me():
x = traceback.extract_stack()
bar = string.join(traceback.format_list(x))
return bar
-
def daemonize(pidfile=None):
"""
Daemonize this process with the UNIX double-fork trick.
@@ -41,41 +37,27 @@ def daemonize(pidfile=None):
os.umask(0)
pid = os.fork()
-
if pid > 0:
if pidfile is not None:
open(pidfile, "w").write(str(pid))
sys.exit(0)
-def remove_exceptions(results):
- """
- Used by forkbomb/jobthing to avoid storing exceptions in database
- because you know those don't serialize so well :)
- # FIXME: this needs cleanup
- """
-
- if results is None:
- return REMOTE_CANARY
-
- if str(results).startswith("<Fault"):
- return REMOTE_CANARY
-
- if type(results) == xmlrpclib.Fault:
- return REMOTE_CANARY
-
- if type(results) == dict:
- new_results = {}
- for x in results.keys():
- value = results[x]
- if str(value).find("<Fault") == -1:
- # there are interesting issues with the way it is imported and type()
- # so that is why this hack is here. type(x) != xmlrpclib.Fault appears to miss some things
- new_results[x] = value
- else:
- new_results[x] = REMOTE_CANARY
- return new_results
-
- return results
-
-
-
+def nice_exception(etype, evalue, etb):
+ etype = str(etype)
+ lefti = etype.index("'") + 1
+ righti = etype.rindex("'")
+ nicetype = etype[lefti:righti]
+ nicestack = string.join(traceback.format_list(traceback.extract_tb(etb)))
+ return [ REMOTE_ERROR, nicetype, str(evalue), nicestack ]
+
+def is_error(result):
+ if type(result) != list:
+ return False
+ if len(result) == 0:
+ return False
+ if result[0] == REMOTE_ERROR:
+ return True
+ return False
+
+
+