summaryrefslogtreecommitdiffstats
path: root/func
diff options
context:
space:
mode:
authorMichael DeHaan <mdehaan@redhat.com>2008-01-23 18:51:33 -0500
committerMichael DeHaan <mdehaan@redhat.com>2008-01-23 18:51:33 -0500
commit33c6e4013874878f05eec593d69e8afdeaae212b (patch)
treee84879dd215ac90d20822ab0574dc1ab99b7ed8a /func
parent19e4d1ba808cbbe95568271a5b0075b8422e4fb6 (diff)
Working on async error handling, lots more to do...
(If it hits no exceptions, returns are right, it's the partial error case to deal with next...)
Diffstat (limited to 'func')
-rw-r--r--func/forkbomb.py8
-rw-r--r--func/jobthing.py6
-rw-r--r--func/minion/modules/test.py8
-rwxr-xr-xfunc/minion/server.py2
-rwxr-xr-xfunc/utils.py41
5 files changed, 62 insertions, 3 deletions
diff --git a/func/forkbomb.py b/func/forkbomb.py
index daad28a..3dc12c8 100644
--- a/func/forkbomb.py
+++ b/func/forkbomb.py
@@ -20,6 +20,7 @@ import bsddb
import sys
import tempfile
import fcntl
+import utils
DEFAULT_FORKS = 4
DEFAULT_CACHE_DIR = "/var/lib/func"
@@ -52,11 +53,16 @@ def __access_buckets(filename,clear,new_key=None,new_value=None):
if not storage.has_key("data"):
storage["data"] = {}
+ else:
+ # print "DEBUG: existing: %s" % storage["data"]
+ pass
if new_key is not None:
# bsdb is a bit weird about this
newish = storage["data"].copy()
+ new_value = utils.remove_exceptions(new_value)
newish[new_key] = new_value
+ # print "DEBUG: newish: %s" % newish
storage["data"] = newish
rc = storage["data"].copy()
@@ -118,7 +124,7 @@ def __demo(bucket_number, buckets, my_item):
This is a demo handler for test purposes.
It just multiplies all numbers by 1000, but slowly.
"""
- print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item)
+ # print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item)
# just to verify forks are not sequential
sleep = random.randrange(0,4)
time.sleep(sleep)
diff --git a/func/jobthing.py b/func/jobthing.py
index 486fe6b..5b89599 100644
--- a/func/jobthing.py
+++ b/func/jobthing.py
@@ -23,6 +23,7 @@ import sys
import tempfile
import fcntl
import forkbomb
+import utils
JOB_ID_RUNNING = 0
JOB_ID_FINISHED = 1
@@ -82,6 +83,9 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg
__purge_old_jobs(storage)
if write:
+ results = utils.remove_exceptions(results)
+ # print "DEBUG: status=%s" % status
+ # print "DEBUG: results=%s" % results
storage[str(jobid)] = (status, results)
rc = jobid
elif not purge:
@@ -137,7 +141,7 @@ def minion_async_run(function_ref, args):
return job_id
else:
__update_status(job_id, JOB_ID_RUNNING, -1)
- results = function_ref(args)
+ results = function_ref(*args)
__update_status(job_id, JOB_ID_FINISHED, results)
sys.exit(0)
diff --git a/func/minion/modules/test.py b/func/minion/modules/test.py
index 6718fed..ed65fd4 100644
--- a/func/minion/modules/test.py
+++ b/func/minion/modules/test.py
@@ -1,5 +1,6 @@
import func_module
import time
+import exceptions
class Test(func_module.FuncModule):
version = "11.11.11"
@@ -7,6 +8,7 @@ class Test(func_module.FuncModule):
description = "Just a very simple example module"
def add(self, numb1, numb2):
+ time.sleep(10)
return numb1 + numb2
def ping(self):
@@ -20,3 +22,9 @@ class Test(func_module.FuncModule):
t = int(t)
time.sleep(t)
return time.time()
+
+ def explode(self):
+ """
+ Testing remote exception handling is useful
+ """
+ raise exceptions.Exception("khhhhhhaaaaaan!!!!!!")
diff --git a/func/minion/server.py b/func/minion/server.py
index 1b2a556..7187783 100755
--- a/func/minion/server.py
+++ b/func/minion/server.py
@@ -222,7 +222,7 @@ class FuncSSLXMLRPCServer(AuthedXMLRPCServer.AuthedSSLXMLRPCServer,
return self.get_dispatch_method(method)(*params)
else:
meth = self.get_dispatch_method(method)
- return jobthing.minion_async_run(meth, *params)
+ return jobthing.minion_async_run(meth, params)
def auth_cb(self, request, client_address):
peer_cert = request.get_peer_certificate()
diff --git a/func/utils.py b/func/utils.py
index 4149885..140b761 100755
--- a/func/utils.py
+++ b/func/utils.py
@@ -14,7 +14,9 @@ import os
import string
import sys
import traceback
+import xmlrpclib
+REMOTE_CANARY = "***REMOTE_ERROR***"
# this is kind of handy, so keep it around for now
# but we really need to fix out server side logging and error
@@ -44,3 +46,42 @@ def daemonize(pidfile=None):
if pidfile is not None:
open(pidfile, "w").write(str(pid))
sys.exit(0)
+
+def remove_exceptions(results):
+ """
+ Used by forkbomb/jobthing to avoid storing exceptions in database
+ because you know those don't serialize so well :)
+ # FIXME: this needs cleanup
+ """
+
+ if results is None:
+ print "DEBUG: A"
+ return REMOTE_CANARY
+
+ if str(results).startswith("<Fault"):
+ print "DEBUG: B"
+ return REMOTE_CANARY
+
+ if type(results) == xmlrpclib.Fault:
+ print "DEBUG: C"
+ return REMOTE_CANARY
+
+ if type(results) == dict:
+ new_results = {}
+ for x in results.keys():
+ value = results[x]
+ # print "DEBUG: checking against: %s" % str(value)
+ if str(value).find("<Fault") == -1:
+ # there are interesting issues with the way it is imported and type()
+ # so that is why this hack is here. type(x) != xmlrpclib.Fault appears to miss some things
+ new_results[x] = value
+ else:
+ new_results[x] = REMOTE_CANARY
+ # print "DEBUG: removed exceptions = %s" % new_results
+ return new_results
+
+ print "DEBUG: removed exceptions = %s" % results
+ return results
+
+
+