summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xMakefile5
-rw-r--r--func/jobthing.py37
-rw-r--r--test/async_test.py4
3 files changed, 34 insertions, 12 deletions
diff --git a/Makefile b/Makefile
index 8a44702..974ca4f 100755
--- a/Makefile
+++ b/Makefile
@@ -87,6 +87,11 @@ pyflakes:
money: clean
-sloccount --addlang "makefile" $(TOPDIR) $(PYDIRS) $(EXAMPLEDIR) $(INITDIR)
+async: install
+ /sbin/service funcd restart
+ sleep 4
+ python test/async_test.py
+
testit: clean
-cd test; sh test-it.sh
diff --git a/func/jobthing.py b/func/jobthing.py
index a5b0820..ca8ee38 100644
--- a/func/jobthing.py
+++ b/func/jobthing.py
@@ -28,8 +28,10 @@ import utils
JOB_ID_RUNNING = 0
JOB_ID_FINISHED = 1
JOB_ID_LOST_IN_SPACE = 2
-JOB_ID_ASYNC_STATUS = 3
-JOB_ID_ASYNC_COMPLETE = 4
+JOB_ID_ASYNC_PARTIAL = 3
+JOB_ID_ASYNC_FINISHED = 4
+
+REMOTE_ERROR = utils.REMOTE_CANARY
# how long to retain old job records in the job id database
RETAIN_INTERVAL = 60 * 60
@@ -126,7 +128,7 @@ def batch_run(server, process_server, nforks):
results = forkbomb.batch_run(server, process_server, nforks)
# we now have a list of job id's for each minion, kill the task
- __update_status(job_id, JOB_ID_ASYNC_STATUS, results)
+ __update_status(job_id, JOB_ID_ASYNC_PARTIAL, results)
sys.exit(0)
def minion_async_run(function_ref, args):
@@ -142,7 +144,19 @@ def minion_async_run(function_ref, args):
return job_id
else:
__update_status(job_id, JOB_ID_RUNNING, -1)
- results = function_ref(*args)
+ try:
+ results = function_ref(*args)
+ except Exception, e:
+ # FIXME: we need to make sure this is logged
+ # NOTE: we need a return here, else the async engine won't get any results
+ # so that is the reason for the catch
+ # FIXME: it would be nice to store the string data from the exception here so that the caller
+ # can read the exception data, however we can't store the exception directly in the DB.
+ # some care must be made to also make this not suck for the user of the API,
+ # when they are iterating over batch results, so they can tell good data from exceptions that
+ # are represented as strings. Ideally, reconstructing the exceptions back into objects would be shiny
+ # but if we go there I will need more caffeine first.
+ __update_status(job_id, JOB_ID_FINISHED, REMOTE_ERROR)
__update_status(job_id, JOB_ID_FINISHED, results)
sys.exit(0)
@@ -155,13 +169,13 @@ def job_status(jobid, client_class=None):
got_status = __get_status(jobid)
- # if the status comes back as JOB_ID_ASYNC_STATUS what we have is actually a hash
+ # if the status comes back as JOB_ID_ASYNC_PARTIAL what we have is actually a hash
# of hostname/minion-jobid pairs. Instantiate a client handle for each and poll them
# for their actual status, filling in only the ones that are actually done.
(interim_rc, interim_results) = got_status
- if interim_rc == JOB_ID_ASYNC_STATUS:
+ if interim_rc == JOB_ID_ASYNC_PARTIAL:
partial_results = {}
@@ -172,20 +186,23 @@ def job_status(jobid, client_class=None):
minion_job = interim_results[host]
client = client_class(host, noglobs=True, async=False)
minion_result = client.jobs.job_status(minion_job)
- print "DEBUG: background task on minion (%s) has status %s" % (minion_job, minion_result)
+ # print "DEBUG: background task on minion (%s) has status %s" % (minion_job, minion_result)
(minion_interim_rc, minion_interim_result) = minion_result
some_missing = False
if minion_interim_rc not in [ JOB_ID_RUNNING ]:
- partial_results[host] = minion_interim_result
+ if minion_interim_rc in [ JOB_ID_LOST_IN_SPACE ]:
+ partial_results[host] = REMOTE_ERROR
+ else:
+ partial_results[host] = minion_interim_result
else:
some_missing = True
if some_missing:
- return (JOB_ID_ASYNC_STATUS, partial_results)
+ return (JOB_ID_ASYNC_PARTIAL, partial_results)
else:
- return (JOB_ID_ASYNC_COMPLETE, partial_results)
+ return (JOB_ID_ASYNC_FINISHED, partial_results)
else:
return got_status
diff --git a/test/async_test.py b/test/async_test.py
index af5e55a..39a22b1 100644
--- a/test/async_test.py
+++ b/test/async_test.py
@@ -44,12 +44,12 @@ def __tester(async,test):
return
if code == jobthing.JOB_ID_RUNNING:
print "task is still running, %s elapsed ..." % delta
- elif code == jobthing.JOB_ID_ASYNC_STATUS:
+ elif code == jobthing.JOB_ID_ASYNC_PARTIAL:
print "task reports partial status, %s elapsed, results = %s" % (delta, results)
elif code == jobthing.JOB_ID_FINISHED:
print "(non-async) task complete, %s elapsed, results = %s" % (delta, results)
return
- elif code == jobthing.JOB_ID_ASYNC_COMPLETE:
+ elif code == jobthing.JOB_ID_ASYNC_FINISHED:
print "(async) task complete, %s elapsed, results = %s" % (delta, results)
return
else: