From 33c6e4013874878f05eec593d69e8afdeaae212b Mon Sep 17 00:00:00 2001 From: Michael DeHaan Date: Wed, 23 Jan 2008 18:51:33 -0500 Subject: Working on async error handling, lots more to do... (If it hits no exceptions, returns are right, it's the partial error case to deal with next...) --- func/forkbomb.py | 8 +++++++- func/jobthing.py | 6 +++++- func/minion/modules/test.py | 8 ++++++++ func/minion/server.py | 2 +- func/utils.py | 41 +++++++++++++++++++++++++++++++++++++++++ test/async_test.py | 7 ++++++- 6 files changed, 68 insertions(+), 4 deletions(-) diff --git a/func/forkbomb.py b/func/forkbomb.py index daad28a..3dc12c8 100644 --- a/func/forkbomb.py +++ b/func/forkbomb.py @@ -20,6 +20,7 @@ import bsddb import sys import tempfile import fcntl +import utils DEFAULT_FORKS = 4 DEFAULT_CACHE_DIR = "/var/lib/func" @@ -52,11 +53,16 @@ def __access_buckets(filename,clear,new_key=None,new_value=None): if not storage.has_key("data"): storage["data"] = {} + else: + # print "DEBUG: existing: %s" % storage["data"] + pass if new_key is not None: # bsdb is a bit weird about this newish = storage["data"].copy() + new_value = utils.remove_exceptions(new_value) newish[new_key] = new_value + # print "DEBUG: newish: %s" % newish storage["data"] = newish rc = storage["data"].copy() @@ -118,7 +124,7 @@ def __demo(bucket_number, buckets, my_item): This is a demo handler for test purposes. It just multiplies all numbers by 1000, but slowly. """ - print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item) + # print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item) # just to verify forks are not sequential sleep = random.randrange(0,4) time.sleep(sleep) diff --git a/func/jobthing.py b/func/jobthing.py index 486fe6b..5b89599 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -23,6 +23,7 @@ import sys import tempfile import fcntl import forkbomb +import utils JOB_ID_RUNNING = 0 JOB_ID_FINISHED = 1 @@ -82,6 +83,9 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg __purge_old_jobs(storage) if write: + results = utils.remove_exceptions(results) + # print "DEBUG: status=%s" % status + # print "DEBUG: results=%s" % results storage[str(jobid)] = (status, results) rc = jobid elif not purge: @@ -137,7 +141,7 @@ def minion_async_run(function_ref, args): return job_id else: __update_status(job_id, JOB_ID_RUNNING, -1) - results = function_ref(args) + results = function_ref(*args) __update_status(job_id, JOB_ID_FINISHED, results) sys.exit(0) diff --git a/func/minion/modules/test.py b/func/minion/modules/test.py index 6718fed..ed65fd4 100644 --- a/func/minion/modules/test.py +++ b/func/minion/modules/test.py @@ -1,5 +1,6 @@ import func_module import time +import exceptions class Test(func_module.FuncModule): version = "11.11.11" @@ -7,6 +8,7 @@ class Test(func_module.FuncModule): description = "Just a very simple example module" def add(self, numb1, numb2): + time.sleep(10) return numb1 + numb2 def ping(self): @@ -20,3 +22,9 @@ class Test(func_module.FuncModule): t = int(t) time.sleep(t) return time.time() + + def explode(self): + """ + Testing remote exception handling is useful + """ + raise exceptions.Exception("khhhhhhaaaaaan!!!!!!") diff --git a/func/minion/server.py b/func/minion/server.py index 1b2a556..7187783 100755 --- a/func/minion/server.py +++ b/func/minion/server.py @@ -222,7 +222,7 @@ class FuncSSLXMLRPCServer(AuthedXMLRPCServer.AuthedSSLXMLRPCServer, return self.get_dispatch_method(method)(*params) else: meth = self.get_dispatch_method(method) - return jobthing.minion_async_run(meth, *params) + return jobthing.minion_async_run(meth, params) def auth_cb(self, request, client_address): peer_cert = request.get_peer_certificate() diff --git a/func/utils.py b/func/utils.py index 4149885..140b761 100755 --- a/func/utils.py +++ b/func/utils.py @@ -14,7 +14,9 @@ import os import string import sys import traceback +import xmlrpclib +REMOTE_CANARY = "***REMOTE_ERROR***" # this is kind of handy, so keep it around for now # but we really need to fix out server side logging and error @@ -44,3 +46,42 @@ def daemonize(pidfile=None): if pidfile is not None: open(pidfile, "w").write(str(pid)) sys.exit(0) + +def remove_exceptions(results): + """ + Used by forkbomb/jobthing to avoid storing exceptions in database + because you know those don't serialize so well :) + # FIXME: this needs cleanup + """ + + if results is None: + print "DEBUG: A" + return REMOTE_CANARY + + if str(results).startswith("