diff options
-rwxr-xr-x | Makefile | 5 | ||||
-rw-r--r-- | func/jobthing.py | 37 | ||||
-rw-r--r-- | test/async_test.py | 4 |
3 files changed, 34 insertions, 12 deletions
@@ -87,6 +87,11 @@ pyflakes: money: clean -sloccount --addlang "makefile" $(TOPDIR) $(PYDIRS) $(EXAMPLEDIR) $(INITDIR) +async: install + /sbin/service funcd restart + sleep 4 + python test/async_test.py + testit: clean -cd test; sh test-it.sh diff --git a/func/jobthing.py b/func/jobthing.py index a5b0820..ca8ee38 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -28,8 +28,10 @@ import utils JOB_ID_RUNNING = 0 JOB_ID_FINISHED = 1 JOB_ID_LOST_IN_SPACE = 2 -JOB_ID_ASYNC_STATUS = 3 -JOB_ID_ASYNC_COMPLETE = 4 +JOB_ID_ASYNC_PARTIAL = 3 +JOB_ID_ASYNC_FINISHED = 4 + +REMOTE_ERROR = utils.REMOTE_CANARY # how long to retain old job records in the job id database RETAIN_INTERVAL = 60 * 60 @@ -126,7 +128,7 @@ def batch_run(server, process_server, nforks): results = forkbomb.batch_run(server, process_server, nforks) # we now have a list of job id's for each minion, kill the task - __update_status(job_id, JOB_ID_ASYNC_STATUS, results) + __update_status(job_id, JOB_ID_ASYNC_PARTIAL, results) sys.exit(0) def minion_async_run(function_ref, args): @@ -142,7 +144,19 @@ def minion_async_run(function_ref, args): return job_id else: __update_status(job_id, JOB_ID_RUNNING, -1) - results = function_ref(*args) + try: + results = function_ref(*args) + except Exception, e: + # FIXME: we need to make sure this is logged + # NOTE: we need a return here, else the async engine won't get any results + # so that is the reason for the catch + # FIXME: it would be nice to store the string data from the exception here so that the caller + # can read the exception data, however we can't store the exception directly in the DB. + # some care must be made to also make this not suck for the user of the API, + # when they are iterating over batch results, so they can tell good data from exceptions that + # are represented as strings. Ideally, reconstructing the exceptions back into objects would be shiny + # but if we go there I will need more caffeine first. + __update_status(job_id, JOB_ID_FINISHED, REMOTE_ERROR) __update_status(job_id, JOB_ID_FINISHED, results) sys.exit(0) @@ -155,13 +169,13 @@ def job_status(jobid, client_class=None): got_status = __get_status(jobid) - # if the status comes back as JOB_ID_ASYNC_STATUS what we have is actually a hash + # if the status comes back as JOB_ID_ASYNC_PARTIAL what we have is actually a hash # of hostname/minion-jobid pairs. Instantiate a client handle for each and poll them # for their actual status, filling in only the ones that are actually done. (interim_rc, interim_results) = got_status - if interim_rc == JOB_ID_ASYNC_STATUS: + if interim_rc == JOB_ID_ASYNC_PARTIAL: partial_results = {} @@ -172,20 +186,23 @@ def job_status(jobid, client_class=None): minion_job = interim_results[host] client = client_class(host, noglobs=True, async=False) minion_result = client.jobs.job_status(minion_job) - print "DEBUG: background task on minion (%s) has status %s" % (minion_job, minion_result) + # print "DEBUG: background task on minion (%s) has status %s" % (minion_job, minion_result) (minion_interim_rc, minion_interim_result) = minion_result some_missing = False if minion_interim_rc not in [ JOB_ID_RUNNING ]: - partial_results[host] = minion_interim_result + if minion_interim_rc in [ JOB_ID_LOST_IN_SPACE ]: + partial_results[host] = REMOTE_ERROR + else: + partial_results[host] = minion_interim_result else: some_missing = True if some_missing: - return (JOB_ID_ASYNC_STATUS, partial_results) + return (JOB_ID_ASYNC_PARTIAL, partial_results) else: - return (JOB_ID_ASYNC_COMPLETE, partial_results) + return (JOB_ID_ASYNC_FINISHED, partial_results) else: return got_status diff --git a/test/async_test.py b/test/async_test.py index af5e55a..39a22b1 100644 --- a/test/async_test.py +++ b/test/async_test.py @@ -44,12 +44,12 @@ def __tester(async,test): return if code == jobthing.JOB_ID_RUNNING: print "task is still running, %s elapsed ..." % delta - elif code == jobthing.JOB_ID_ASYNC_STATUS: + elif code == jobthing.JOB_ID_ASYNC_PARTIAL: print "task reports partial status, %s elapsed, results = %s" % (delta, results) elif code == jobthing.JOB_ID_FINISHED: print "(non-async) task complete, %s elapsed, results = %s" % (delta, results) return - elif code == jobthing.JOB_ID_ASYNC_COMPLETE: + elif code == jobthing.JOB_ID_ASYNC_FINISHED: print "(async) task complete, %s elapsed, results = %s" % (delta, results) return else: |