author     Michael DeHaan <mdehaan@redhat.com>    2008-01-23 18:51:33 -0500
committer  Michael DeHaan <mdehaan@redhat.com>    2008-01-23 18:51:33 -0500
commit     33c6e4013874878f05eec593d69e8afdeaae212b (patch)
tree       e84879dd215ac90d20822ab0574dc1ab99b7ed8a
parent     19e4d1ba808cbbe95568271a5b0075b8422e4fb6 (diff)
download   func-33c6e4013874878f05eec593d69e8afdeaae212b.tar.gz
           func-33c6e4013874878f05eec593d69e8afdeaae212b.tar.xz
           func-33c6e4013874878f05eec593d69e8afdeaae212b.zip
Working on async error handling; lots more to do...
(If it hits no exceptions, the returns are right; it's the partial-error case to deal with next...)
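As context for the change set below, here is a minimal sketch of the async call-and-poll flow this error handling feeds, modeled on test/async_test.py further down; the import paths, the helper name wait_for_async_result, and the assumption that job_status() returns a (status, results) tuple are mine, not verified against the rest of the tree.

import time
from func.overlord.client import Client
from func import jobthing

def wait_for_async_result(delay=1):
    # async=True makes each remote call return a job_id instead of results
    client = Client("*", nforks=10, async=True)
    job_id = client.test.add(1, 2)            # returns immediately
    while True:
        # assumption: job_status() hands back (status_code, results)
        (status, results) = client.job_status(job_id)
        if status == jobthing.JOB_ID_FINISHED:
            print "results = %s" % results
            return results
        time.sleep(delay)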
-rw-r--r--   func/forkbomb.py              8
-rw-r--r--   func/jobthing.py              6
-rw-r--r--   func/minion/modules/test.py   8
-rwxr-xr-x   func/minion/server.py         2
-rwxr-xr-x   func/utils.py                41
-rw-r--r--   test/async_test.py            7

6 files changed, 68 insertions, 4 deletions
diff --git a/func/forkbomb.py b/func/forkbomb.py
index daad28a..3dc12c8 100644
--- a/func/forkbomb.py
+++ b/func/forkbomb.py
@@ -20,6 +20,7 @@ import bsddb
 import sys
 import tempfile
 import fcntl
+import utils

 DEFAULT_FORKS = 4
 DEFAULT_CACHE_DIR = "/var/lib/func"
@@ -52,11 +53,16 @@ def __access_buckets(filename,clear,new_key=None,new_value=None):
     if not storage.has_key("data"):
         storage["data"] = {}
+    else:
+        # print "DEBUG: existing: %s" % storage["data"]
+        pass

     if new_key is not None:
         # bsdb is a bit weird about this
         newish = storage["data"].copy()
+        new_value = utils.remove_exceptions(new_value)
         newish[new_key] = new_value
+        # print "DEBUG: newish: %s" % newish
         storage["data"] = newish

     rc = storage["data"].copy()
@@ -118,7 +124,7 @@ def __demo(bucket_number, buckets, my_item):
     This is a demo handler for test purposes.
     It just multiplies all numbers by 1000, but slowly.
     """
-    print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item)
+    # print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item)
     # just to verify forks are not sequential
     sleep = random.randrange(0,4)
     time.sleep(sleep)
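A note on the "# bsdb is a bit weird about this" comment above: if the bucket storage behaves like a shelve opened without writeback (an assumption, since the storage setup is outside this hunk), in-place changes to a stored dict are silently lost, which is why the code copies, modifies, and reassigns. A minimal sketch of that pattern:

import shelve

def update_bucket(filename, new_key, new_value):
    # without writeback=True a shelf does not notice in-place mutation of a
    # value it already stores; copy it, change the copy, and assign it back
    storage = shelve.open(filename)
    newish = storage.get("data", {}).copy()
    newish[new_key] = new_value
    storage["data"] = newish    # the reassignment is what gets persisted
    storage.close()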
diff --git a/func/jobthing.py b/func/jobthing.py
index 486fe6b..5b89599 100644
--- a/func/jobthing.py
+++ b/func/jobthing.py
@@ -23,6 +23,7 @@ import sys
 import tempfile
 import fcntl
 import forkbomb
+import utils

 JOB_ID_RUNNING = 0
 JOB_ID_FINISHED = 1
@@ -82,6 +83,9 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg
         __purge_old_jobs(storage)

     if write:
+        results = utils.remove_exceptions(results)
+        # print "DEBUG: status=%s" % status
+        # print "DEBUG: results=%s" % results
         storage[str(jobid)] = (status, results)
         rc = jobid
     elif not purge:
@@ -137,7 +141,7 @@ def minion_async_run(function_ref, args):
         return job_id
     else:
         __update_status(job_id, JOB_ID_RUNNING, -1)
-        results = function_ref(args)
+        results = function_ref(*args)
         __update_status(job_id, JOB_ID_FINISHED, results)
         sys.exit(0)
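The function_ref(args) to function_ref(*args) change is the real fix in this hunk: the params tuple handed over by the server is now unpacked into positional arguments at call time. A quick illustration with a standalone add(), standing in for a minion module method:

def add(numb1, numb2):
    return numb1 + numb2

params = (1, 2)
print add(*params)       # unpacks the tuple: add(1, 2) -> 3
try:
    add(params)          # old behavior: the whole tuple arrives as one argument
except TypeError, e:
    print "fails the way it did before the fix: %s" % e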
diff --git a/func/minion/modules/test.py b/func/minion/modules/test.py
index 6718fed..ed65fd4 100644
--- a/func/minion/modules/test.py
+++ b/func/minion/modules/test.py
@@ -1,5 +1,6 @@
 import func_module
 import time
+import exceptions

 class Test(func_module.FuncModule):
     version = "11.11.11"
@@ -7,6 +8,7 @@ class Test(func_module.FuncModule):
     description = "Just a very simple example module"

     def add(self, numb1, numb2):
+        time.sleep(10)
         return numb1 + numb2

     def ping(self):
@@ -20,3 +22,9 @@ class Test(func_module.FuncModule):
         t = int(t)
         time.sleep(t)
         return time.time()
+
+    def explode(self):
+        """
+        Testing remote exception handling is useful
+        """
+        raise exceptions.Exception("khhhhhhaaaaaan!!!!!!")
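explode() gives the error-handling work something deterministic to trip over (in Python 2, exceptions.Exception is the same object as the builtin Exception). Over a plain XML-RPC round trip, an exception raised in a handler generally reaches the caller as an xmlrpclib.Fault, which is what the new canary logic in func/utils.py keys off. A standard-library sketch of that behavior, not func's own server; the port number is arbitrary:

import threading
import xmlrpclib
from SimpleXMLRPCServer import SimpleXMLRPCServer

def explode():
    raise Exception("khhhhhhaaaaaan!!!!!!")

server = SimpleXMLRPCServer(("127.0.0.1", 51234), logRequests=False)
server.register_function(explode)
threading.Thread(target=server.handle_request).start()

try:
    xmlrpclib.ServerProxy("http://127.0.0.1:51234").explode()
except xmlrpclib.Fault, f:
    # the remote exception comes back as a Fault; str(f) starts with "<Fault"
    print "remote fault: %s" % f.faultString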
diff --git a/func/minion/server.py b/func/minion/server.py
index 1b2a556..7187783 100755
--- a/func/minion/server.py
+++ b/func/minion/server.py
@@ -222,7 +222,7 @@ class FuncSSLXMLRPCServer(AuthedXMLRPCServer.AuthedSSLXMLRPCServer,
             return self.get_dispatch_method(method)(*params)
         else:
             meth = self.get_dispatch_method(method)
-            return jobthing.minion_async_run(meth, *params)
+            return jobthing.minion_async_run(meth, params)

     def auth_cb(self, request, client_address):
         peer_cert = request.get_peer_certificate()
diff --git a/func/utils.py b/func/utils.py
index 4149885..140b761 100755
--- a/func/utils.py
+++ b/func/utils.py
@@ -14,7 +14,9 @@ import os
 import string
 import sys
 import traceback
+import xmlrpclib
+REMOTE_CANARY = "***REMOTE_ERROR***"

 # this is kind of handy, so keep it around for now
 # but we really need to fix out server side logging and error
@@ -44,3 +46,42 @@ def daemonize(pidfile=None):
         if pidfile is not None:
             open(pidfile, "w").write(str(pid))
         sys.exit(0)
+
+def remove_exceptions(results):
+    """
+    Used by forkbomb/jobthing to avoid storing exceptions in database
+    because you know those don't serialize so well :)
+    # FIXME: this needs cleanup
+    """
+
+    if results is None:
+        print "DEBUG: A"
+        return REMOTE_CANARY
+
+    if str(results).startswith("<Fault"):
+        print "DEBUG: B"
+        return REMOTE_CANARY
+
+    if type(results) == xmlrpclib.Fault:
+        print "DEBUG: C"
+        return REMOTE_CANARY
+
+    if type(results) == dict:
+        new_results = {}
+        for x in results.keys():
+            value = results[x]
+            # print "DEBUG: checking against: %s" % str(value)
+            if str(value).find("<Fault") == -1:
+                # there are interesting issues with the way it is imported and type()
+                # so that is why this hack is here. type(x) != xmlrpclib.Fault appears to miss some things
+                new_results[x] = value
+            else:
+                new_results[x] = REMOTE_CANARY
+        # print "DEBUG: removed exceptions = %s" % new_results
+        return new_results
+
+    print "DEBUG: removed exceptions = %s" % results
+    return results
+
+
+
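For reference, what remove_exceptions() above is expected to do with a bare Fault and with a per-minion results dict; a sketch of expected values rather than captured output, the minion names are made up, and the leftover DEBUG prints will fire as well (assumes the func package is importable):

import xmlrpclib
from func import utils

print utils.remove_exceptions(xmlrpclib.Fault(1, "boom"))
# expected: "***REMOTE_ERROR***", since str(Fault) starts with "<Fault"

results = {
    "minion1.example.org" : 3,
    "minion2.example.org" : xmlrpclib.Fault(1, "khhhhhhaaaaaan!!!!!!"),
}
print utils.remove_exceptions(results)
# expected: only the Fault entry is replaced with "***REMOTE_ERROR***"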
diff --git a/test/async_test.py b/test/async_test.py
index cec512b..4c99a56 100644
--- a/test/async_test.py
+++ b/test/async_test.py
@@ -11,7 +11,12 @@ def __tester(async):
     client = Client("*",nforks=10,async=True)
     oldtime = time.time()
     print "asking minion to sleep for %s seconds" % TEST_SLEEP
-    job_id = client.test.sleep(TEST_SLEEP)
+
+    # job_id = client.test.sleep(TEST_SLEEP)
+    job_id = client.hardware.info()
+    # job_id = client.test.explode()
+    # job_id = client.test.does_not_exist(1,2)
+
     print "job_id = %s" % job_id
     while True:
         status = client.job_status(job_id)